evento_core/lib.rs
1//! Core types and traits for the Evento event sourcing library.
2//!
3//! This crate provides the foundational abstractions for building event-sourced applications
4//! with Evento. It defines the core traits, types, and builders used throughout the framework.
5//!
6//! # Features
7//!
8//! - **`macro`** (default) - Procedural macros from `evento-macro`
9//! - **`group`** - Multi-executor support via `EventoGroup`
10//! - **`rw`** - Read-write split executor pattern via `Rw`
11//! - **`sqlite`**, **`mysql`**, **`postgres`** - Database support via sqlx
12//! - **`fjall`** - Embedded key-value storage with Fjall
13//!
14//! # Core Concepts
15//!
16//! ## Events
17//!
18//! Events are immutable facts that represent something that happened in your domain.
19//! The [`Event`] struct stores serialized event data with metadata:
20//!
21//! ```rust,ignore
22//! // Define events using the aggregator macro
23//! #[evento::aggregator]
24//! pub enum BankAccount {
25//! AccountOpened { owner_id: String, initial_balance: i64 },
26//! MoneyDeposited { amount: i64 },
27//! }
28//! ```
29//!
30//! ## Executor
31//!
32//! The [`Executor`] trait abstracts event storage and retrieval. Implementations
33//! handle persisting events, querying, and managing subscriptions.
34//!
35//! ## Aggregator Builder
36//!
37//! Use [`create()`] or [`aggregator()`] to build and commit events:
38//!
39//! ```rust,ignore
40//! use evento::metadata::Metadata;
41//!
42//! let id = evento::create()
43//! .event(&AccountOpened { owner_id: "user1".into(), initial_balance: 1000 })?
44//! .metadata(&Metadata::default())?
45//! .commit(&executor)
46//! .await?;
47//! ```
48//!
49//! ## Projections
50//!
51//! Build read models by subscribing to events. See the [`projection`] module.
52//!
53//! ## Cursor-based Pagination
54//!
55//! GraphQL-style pagination for querying events. See the [`cursor`] module.
56//!
57//! # Modules
58//!
59//! - [`context`] - Type-safe request context for storing arbitrary data
60//! - [`cursor`] - Cursor-based pagination types and traits
61//! - [`metadata`] - Standard event metadata types
62//! - [`projection`] - Projections, subscriptions, and event handlers
63//!
64//! # Example
65//!
66//! ```rust,ignore
67//! use evento::{Executor, metadata::Metadata, cursor::Args, ReadAggregator};
68//!
69//! // Create and persist an event
70//! let id = evento::create()
71//! .event(&AccountOpened { owner_id: "user1".into(), initial_balance: 1000 })?
72//! .metadata(&Metadata::default())?
73//! .commit(&executor)
74//! .await?;
75//!
76//! // Query events with pagination
77//! let events = executor.read(
78//! Some(vec![ReadAggregator::id("myapp/Account", &id)]),
79//! None,
80//! Args::forward(10, None),
81//! ).await?;
82//! ```
83
84mod aggregator;
85pub mod context;
86pub mod cursor;
87mod executor;
88pub mod metadata;
89pub mod projection;
90
91#[cfg(feature = "macro")]
92pub use evento_macro::*;
93
94pub use aggregator::*;
95pub use executor::*;
96pub use projection::RoutingKey;
97
98use std::fmt::Debug;
99use ulid::Ulid;
100
101use crate::cursor::Cursor;
102
103/// Cursor data for event pagination.
104///
105/// Used internally for base64-encoded cursor values in paginated queries.
106/// Contains the essential fields needed to uniquely identify an event's position.
107#[derive(Debug, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
108pub struct EventCursor {
109 /// Event ID (ULID string)
110 pub i: String,
111 /// Event version
112 pub v: u16,
113 /// Event timestamp (Unix timestamp in seconds)
114 pub t: u64,
115 /// Sub-second precision (milliseconds)
116 pub s: u32,
117}
118
119/// A stored event in the event store.
120///
121/// Events are immutable records of facts that occurred in your domain.
122/// They contain serialized data and metadata, along with positioning
123/// information for the aggregate they belong to.
124///
125/// # Fields
126///
127/// - `id` - Unique event identifier (ULID format for time-ordering)
128/// - `aggregator_id` - The aggregate instance this event belongs to
129/// - `aggregator_type` - Type name like `"myapp/BankAccount"`
130/// - `version` - Sequence number within the aggregate (for optimistic concurrency)
131/// - `name` - Event type name like `"AccountOpened"`
132/// - `routing_key` - Optional key for event distribution/partitioning
133/// - `data` - Serialized event payload (rkyv format)
134/// - `metadata` - Serialized metadata (rkyv format)
135/// - `timestamp` - When the event occurred (Unix seconds)
136/// - `timestamp_subsec` - Sub-second precision (milliseconds)
137///
138/// # Serialization
139///
140/// Event data and metadata are serialized using [rkyv](https://rkyv.org/) for
141/// zero-copy deserialization. Use [`projection::EventData`] to deserialize.
142#[derive(Debug, Clone, PartialEq, Default)]
143pub struct Event {
144 /// Unique event identifier (ULID)
145 pub id: Ulid,
146 /// ID of the aggregate this event belongs to
147 pub aggregator_id: String,
148 /// Type name of the aggregate (e.g., "myapp/User")
149 pub aggregator_type: String,
150 /// Version number of the aggregate after this event
151 pub version: u16,
152 /// Event type name
153 pub name: String,
154 /// Optional routing key for event distribution
155 pub routing_key: Option<String>,
156 /// Serialized event data (rkyv format)
157 pub data: Vec<u8>,
158 /// Serialized event metadata (rkyv format)
159 pub metadata: Vec<u8>,
160 /// Unix timestamp when the event occurred (seconds)
161 pub timestamp: u64,
162 /// Sub-second precision (milliseconds)
163 pub timestamp_subsec: u32,
164}
165
166impl Cursor for Event {
167 type T = EventCursor;
168
169 fn serialize(&self) -> Self::T {
170 EventCursor {
171 i: self.id.to_string(),
172 v: self.version,
173 t: self.timestamp,
174 s: self.timestamp_subsec,
175 }
176 }
177
178 fn serialize_cursor(&self) -> Result<cursor::Value, cursor::CursorError> {
179 use base64::{alphabet, engine::general_purpose, engine::GeneralPurpose, Engine};
180
181 let cursor = self.serialize();
182 let encoded = rkyv::to_bytes::<rkyv::rancor::Error>(&cursor)
183 .map_err(|e| cursor::CursorError::Rkyv(e.to_string()))?;
184
185 let engine = GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::PAD);
186
187 Ok(cursor::Value(engine.encode(&encoded)))
188 }
189
190 fn deserialize_cursor(value: &cursor::Value) -> Result<Self::T, cursor::CursorError> {
191 use base64::{alphabet, engine::general_purpose, engine::GeneralPurpose, Engine};
192
193 let engine = GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::PAD);
194 let decoded = engine.decode(value)?;
195
196 let result = rkyv::from_bytes::<Self::T, rkyv::rancor::Error>(&decoded)
197 .map_err(|e| cursor::CursorError::Rkyv(e.to_string()))?;
198
199 Ok(result)
200 }
201}
202
203impl cursor::Bind for Event {
204 type T = Self;
205
206 fn sort_by(data: &mut Vec<Self::T>, is_order_desc: bool) {
207 if !is_order_desc {
208 data.sort_by(|a, b| {
209 if a.timestamp_subsec != b.timestamp_subsec {
210 return a.timestamp_subsec.cmp(&b.timestamp_subsec);
211 }
212
213 if a.timestamp != b.timestamp {
214 return a.timestamp.cmp(&b.timestamp);
215 }
216
217 if a.version != b.version {
218 return a.version.cmp(&b.version);
219 }
220
221 a.id.cmp(&b.id)
222 });
223 } else {
224 data.sort_by(|a, b| {
225 if a.timestamp_subsec != b.timestamp_subsec {
226 return b.timestamp_subsec.cmp(&a.timestamp_subsec);
227 }
228
229 if a.timestamp != b.timestamp {
230 return b.timestamp.cmp(&a.timestamp);
231 }
232
233 if a.version != b.version {
234 return b.version.cmp(&a.version);
235 }
236
237 b.id.cmp(&a.id)
238 });
239 }
240 }
241
242 fn retain(
243 data: &mut Vec<Self::T>,
244 cursor: <<Self as cursor::Bind>::T as Cursor>::T,
245 is_order_desc: bool,
246 ) {
247 data.retain(|event| {
248 if is_order_desc {
249 event.timestamp < cursor.t
250 || (event.timestamp == cursor.t
251 && (event.timestamp_subsec < cursor.s
252 || (event.timestamp_subsec == cursor.s
253 && (event.version < cursor.v
254 || (event.version == cursor.v
255 && event.id.to_string() < cursor.i)))))
256 } else {
257 event.timestamp > cursor.t
258 || (event.timestamp == cursor.t
259 && (event.timestamp_subsec > cursor.s
260 || (event.timestamp_subsec == cursor.s
261 && (event.version > cursor.v
262 || (event.version == cursor.v
263 && event.id.to_string() > cursor.i)))))
264 }
265 });
266 }
267}
268
269#[cfg(any(feature = "sqlite", feature = "mysql", feature = "postgres"))]
270impl<R: sqlx::Row> sqlx::FromRow<'_, R> for Event
271where
272 i32: sqlx::Type<R::Database> + for<'r> sqlx::Decode<'r, R::Database>,
273 Vec<u8>: sqlx::Type<R::Database> + for<'r> sqlx::Decode<'r, R::Database>,
274 String: sqlx::Type<R::Database> + for<'r> sqlx::Decode<'r, R::Database>,
275 i64: sqlx::Type<R::Database> + for<'r> sqlx::Decode<'r, R::Database>,
276 for<'r> &'r str: sqlx::Type<R::Database> + sqlx::Decode<'r, R::Database>,
277 for<'r> &'r str: sqlx::ColumnIndex<R>,
278{
279 fn from_row(row: &R) -> Result<Self, sqlx::Error> {
280 let timestamp: i64 = sqlx::Row::try_get(row, "timestamp")?;
281 let timestamp_subsec: i64 = sqlx::Row::try_get(row, "timestamp_subsec")?;
282 let version: i32 = sqlx::Row::try_get(row, "version")?;
283
284 Ok(Event {
285 id: Ulid::from_string(sqlx::Row::try_get(row, "id")?)
286 .map_err(|err| sqlx::Error::InvalidArgument(err.to_string()))?,
287 aggregator_id: sqlx::Row::try_get(row, "aggregator_id")?,
288 aggregator_type: sqlx::Row::try_get(row, "aggregator_type")?,
289 version: version as u16,
290 name: sqlx::Row::try_get(row, "name")?,
291 routing_key: sqlx::Row::try_get(row, "routing_key")?,
292 data: sqlx::Row::try_get(row, "data")?,
293 metadata: sqlx::Row::try_get(row, "metadata")?,
294 timestamp: timestamp as u64,
295 timestamp_subsec: timestamp_subsec as u32,
296 })
297 }
298}