evento/lib.rs
1//! # Evento - Event Sourcing and CQRS Framework
2//!
3//! Evento is a comprehensive library for building event-sourced applications using Domain-Driven Design (DDD)
4//! and Command Query Responsibility Segregation (CQRS) patterns in Rust.
5//!
6//! ## Overview
7//!
8//! Event sourcing is a pattern where state changes are stored as a sequence of events. Instead of persisting
9//! just the current state, you persist all the events that led to the current state. This provides:
10//!
11//! - **Complete audit trail**: Every change is recorded as an event
12//! - **Time travel**: Replay events to see state at any point in time
13//! - **Event-driven architecture**: React to events with handlers
14//! - **CQRS support**: Separate read and write models
15//!
16//! ## Core Concepts
17//!
18//! - **Events**: Immutable facts representing something that happened
19//! - **Aggregates**: Domain objects that process events and maintain state
20//! - **Event Handlers**: Functions that react to events and trigger side effects
21//! - **Event Store**: Persistent storage for events (SQL databases supported)
22//! - **Snapshots**: Periodic state captures to optimize loading
23//!
24//! ## Quick Start
25//!
26//! ```no_run
27//! use evento::{EventDetails, AggregatorName};
28//! use serde::{Deserialize, Serialize};
29//! use bincode::{Decode, Encode};
30//!
31//! // Define events
32//! #[derive(AggregatorName, Encode, Decode)]
33//! struct UserCreated {
34//! name: String,
35//! email: String,
36//! }
37//!
38//! // Define aggregate
39//! #[derive(Default, Serialize, Deserialize, Encode, Decode, Clone, Debug)]
40//! struct User {
41//! name: String,
42//! email: String,
43//! }
44//!
45//! // Implement event handlers on the aggregate
46//! #[evento::aggregator]
47//! impl User {
48//! async fn user_created(&mut self, event: EventDetails<UserCreated>) -> anyhow::Result<()> {
49//! self.name = event.data.name;
50//! self.email = event.data.email;
51//! Ok(())
52//! }
53//! }
54//!
55//! #[tokio::main]
56//! async fn main() -> anyhow::Result<()> {
57//! // Setup SQLite executor
58//! let pool = sqlx::SqlitePool::connect("sqlite:events.db").await?;
59//! let executor: evento::Sqlite = pool.into();
60//!
61//! // Create and save events
62//! let user_id = evento::create::<User>()
63//! .data(&UserCreated {
64//! name: "John Doe".to_string(),
65//! email: "john@example.com".to_string(),
66//! })?
67//! .metadata(&true)?
68//! .commit(&executor)
69//! .await?;
70//!
71//! // Load aggregate from events
72//! let user = evento::load::<User, _>(&executor, &user_id).await?;
73//! println!("User: {:?}", user.item);
74//!
75//! Ok(())
76//! }
77//! ```
78//!
79//! ## Features
80//!
81//! - **SQL Database Support**: SQLite, PostgreSQL, MySQL
82//! - **Event Handlers**: Async event processing with retries
83//! - **Event Subscriptions**: Continuous event processing
84//! - **Streaming**: Real-time event streams (with `stream` feature)
85//! - **Migrations**: Database schema management
86//! - **Macros**: Procedural macros for cleaner code
87//!
88//! ## Feature Flags
89//!
90//! - `macro` - Enable procedural macros (default)
91//! - `handler` - Enable event handlers (default)
92//! - `stream` - Enable streaming support
93//! - `sql` - Enable all SQL database backends
94//! - `sqlite` - SQLite support
95//! - `postgres` - PostgreSQL support
96//! - `mysql` - MySQL support
97
98pub mod context;
99pub mod cursor;
100mod executor;
101mod load;
102pub mod metadata;
103mod save;
104mod subscribe;
105
106#[cfg(any(feature = "sqlite", feature = "mysql", feature = "postgres"))]
107pub mod sql;
108#[cfg(any(feature = "sqlite", feature = "mysql", feature = "postgres"))]
109pub mod sql_migrator;
110#[cfg(any(feature = "sqlite", feature = "mysql", feature = "postgres"))]
111pub mod sql_types;
112
113#[cfg(feature = "macro")]
114pub use evento_macro::*;
115
116pub use executor::*;
117pub use load::*;
118pub use save::*;
119pub use subscribe::*;
120
121use std::{fmt::Debug, ops::Deref};
122use ulid::Ulid;
123
124use crate::cursor::Cursor;
125
126/// Stream utilities for working with event streams
127///
128/// This module provides stream processing capabilities when the `stream` feature is enabled.
129///
130/// ```no_run
131/// use evento::stream::StreamExt;
132/// ```
133#[cfg(feature = "stream")]
134pub mod stream {
135 pub use tokio_stream::StreamExt;
136}
137
138/// Database migration utilities
139///
140/// This module provides migration support for SQL databases. Migrations are automatically
141/// included when using any SQL database feature (sqlite, postgres, mysql).
142///
143/// ```no_run
144/// use evento::migrator::{Migrate, Plan};
145/// ```
146#[cfg(any(feature = "sqlite", feature = "postgres", feature = "mysql"))]
147pub mod migrator {
148 pub use sqlx_migrator::{Migrate, Plan};
149}
150
151/// Event with typed data and metadata
152///
153/// `EventDetails` wraps a raw [`Event`] with typed data and metadata. This provides
154/// type-safe access to event payloads in event handlers and aggregators.
155///
156/// # Type Parameters
157///
158/// - `D`: The type of the event data (must implement [`AggregatorName`] and be decodable)
159/// - `M`: The type of the metadata (defaults to `bool`, must be decodable)
160///
161/// # Examples
162///
163/// ```no_run
164/// use evento::{EventDetails, AggregatorName};
165/// use bincode::{Encode, Decode};
166///
167/// #[derive(AggregatorName, Encode, Decode)]
168/// struct UserCreated {
169/// name: String,
170/// email: String,
171/// }
172///
173/// // In an event handler
174/// async fn handle_user_created(event: EventDetails<UserCreated>) -> anyhow::Result<()> {
175/// println!("User created: {} ({})", event.data.name, event.data.email);
176/// println!("Event ID: {}", event.id);
177/// Ok(())
178/// }
179/// ```
180pub struct EventDetails<D, M = bool> {
181 inner: Event,
182 /// The typed event data
183 pub data: D,
184 /// The typed event metadata
185 pub metadata: M,
186}
187
188impl<D, M> Deref for EventDetails<D, M> {
189 type Target = Event;
190
191 fn deref(&self) -> &Self::Target {
192 &self.inner
193 }
194}
195
196#[cfg(feature = "mysql")]
197pub use sql::MySql;
198
199#[cfg(feature = "postgres")]
200pub use sql::Postgres;
201
202#[cfg(feature = "sqlite")]
203pub use sql::Sqlite;
204
205/// Cursor for event pagination and positioning
206///
207/// `EventCursor` represents a position in the event stream. It contains the event ID,
208/// version, and timestamp to enable efficient pagination and resuming from specific points.
209///
210/// This is primarily used internally for event stream pagination and subscription tracking.
211#[derive(Debug, bincode::Encode, bincode::Decode)]
212pub struct EventCursor {
213 /// Event ID (ULID string)
214 pub i: String,
215 /// Event version
216 pub v: i32,
217 /// Event timestamp (Unix timestamp in seconds)
218 pub t: u64,
219 pub s: u32,
220}
221
222/// Raw event stored in the event store
223///
224/// `Event` represents a single immutable event in the event stream. Events are the
225/// fundamental building blocks of event sourcing - they represent facts that have
226/// occurred in the domain.
227///
228/// Events are typically not used directly in application code. Instead, use
229/// [`EventDetails`] which provides typed access to the event data.
230///
231/// # Fields
232///
233/// - `id`: Unique identifier for the event (ULID)
234/// - `aggregator_id`: ID of the aggregate this event belongs to
235/// - `aggregator_type`: Type name of the aggregate
236/// - `version`: Version number of the aggregate after this event
237/// - `name`: Event type name
238/// - `routing_key`: Optional routing key for event distribution
239/// - `data`: Serialized event data (bincode)
240/// - `metadata`: Serialized event metadata (bincode)
241/// - `timestamp`: Unix timestamp when the event occurred
242///
243/// # Examples
244///
245/// Events are usually created through the [`create`] and [`save`] functions:
246///
247/// ```no_run
248/// use evento::create;
249/// # use evento::*;
250/// # use bincode::{Encode, Decode};
251/// # #[derive(AggregatorName, Encode, Decode)]
252/// # struct UserCreated { name: String }
253/// # #[derive(Default, Encode, Decode, Clone, Debug)]
254/// # struct User;
255/// # #[evento::aggregator]
256/// # impl User {}
257///
258/// async fn create_user(executor: &evento::Sqlite) -> anyhow::Result<String> {
259/// let user_id = create::<User>()
260/// .data(&UserCreated { name: "John".to_string() })?
261/// .metadata(&true)?
262/// .commit(executor)
263/// .await?;
264/// Ok(user_id)
265/// }
266/// ```
267#[derive(Debug, Clone, PartialEq, Default)]
268pub struct Event {
269 /// Unique event identifier (ULID)
270 pub id: Ulid,
271 /// ID of the aggregate this event belongs to
272 pub aggregator_id: String,
273 /// Type name of the aggregate (e.g., "myapp/User")
274 pub aggregator_type: String,
275 /// Version number of the aggregate after this event
276 pub version: i32,
277 /// Event type name
278 pub name: String,
279 /// Optional routing key for event distribution
280 pub routing_key: Option<String>,
281 /// Serialized event data (bincode format)
282 pub data: Vec<u8>,
283 /// Serialized event metadata (bincode format)
284 pub metadata: Vec<u8>,
285 /// Unix timestamp when the event occurred (seconds)
286 pub timestamp: u64,
287 /// Unix timestamp when the event occurred (seconds)
288 pub timestamp_subsec: u32,
289}
290
291impl Event {
292 /// Convert this raw event to typed [`EventDetails`]
293 ///
294 /// This method attempts to deserialize the event data and metadata into the specified types.
295 /// Returns `None` if the event name doesn't match the expected type `D`.
296 ///
297 /// # Type Parameters
298 ///
299 /// - `D`: The expected event data type (must implement [`AggregatorName`])
300 /// - `M`: The expected metadata type
301 ///
302 /// # Errors
303 ///
304 /// Returns a [`bincode::error::DecodeError`] if deserialization fails.
305 ///
306 /// # Examples
307 ///
308 /// ```no_run
309 /// use evento::{Event, EventDetails};
310 /// # use evento::*;
311 /// # use bincode::{Encode, Decode};
312 /// # #[derive(AggregatorName, Encode, Decode)]
313 /// # struct UserCreated { name: String }
314 ///
315 /// fn handle_event(event: &Event) -> anyhow::Result<()> {
316 /// if let Some(details) = event.to_details::<UserCreated, bool>()? {
317 /// println!("User created: {}", details.data.name);
318 /// }
319 /// Ok(())
320 /// }
321 /// ```
322 pub fn to_details<D: AggregatorName + bincode::Decode<()>, M: bincode::Decode<()>>(
323 &self,
324 ) -> Result<Option<EventDetails<D, M>>, bincode::error::DecodeError> {
325 if D::name() != self.name {
326 return Ok(None);
327 }
328
329 let config = bincode::config::standard();
330
331 let (data, _) = bincode::decode_from_slice(&self.data[..], config)?;
332 let (metadata, _) = bincode::decode_from_slice(&self.metadata[..], config)?;
333
334 Ok(Some(EventDetails {
335 data,
336 metadata,
337 inner: self.clone(),
338 }))
339 }
340}
341
342impl Cursor for Event {
343 type T = EventCursor;
344
345 fn serialize(&self) -> Self::T {
346 EventCursor {
347 i: self.id.to_string(),
348 v: self.version,
349 t: self.timestamp,
350 s: self.timestamp_subsec,
351 }
352 }
353}
354
355/// Trait for domain aggregates that process events
356///
357/// `Aggregator` defines the contract for objects that maintain state by processing events.
358/// Aggregates are the core building blocks in event sourcing - they represent domain entities
359/// that rebuild their state by replaying events from the event store.
360///
361/// # Implementation
362///
363/// Instead of implementing this trait manually, use the `#[evento::aggregator]` attribute macro
364/// which generates the implementation automatically based on your event handler methods.
365///
366/// # Requirements
367///
368/// Aggregators must:
369/// - Implement `Default` (initial empty state)
370/// - Be `Send + Sync` for async processing
371/// - Be serializable with `bincode::Encode + bincode::Decode`
372/// - Be `Clone`able for snapshots
373/// - Implement [`AggregatorName`] for type identification
374/// - Be `Debug`gable for diagnostics
375///
376/// # Examples
377///
378/// ```no_run
379/// use evento::{Aggregator, AggregatorName, EventDetails};
380/// use serde::{Deserialize, Serialize};
381/// use bincode::{Encode, Decode};
382///
383/// #[derive(AggregatorName, Encode, Decode)]
384/// struct UserCreated {
385/// name: String,
386/// email: String,
387/// }
388///
389/// #[derive(Default, Serialize, Deserialize, Encode, Decode, Clone, Debug)]
390/// struct User {
391/// name: String,
392/// email: String,
393/// is_active: bool,
394/// }
395///
396/// #[evento::aggregator]
397/// impl User {
398/// async fn user_created(&mut self, event: EventDetails<UserCreated>) -> anyhow::Result<()> {
399/// self.name = event.data.name;
400/// self.email = event.data.email;
401/// self.is_active = true;
402/// Ok(())
403/// }
404/// }
405/// ```
406pub trait Aggregator:
407 Default + Send + Sync + bincode::Encode + bincode::Decode<()> + Clone + AggregatorName + Debug
408{
409 /// Process an event and update the aggregate's state
410 ///
411 /// This method is called for each event when rebuilding the aggregate from the event store.
412 /// The implementation should update the aggregate's state based on the event.
413 ///
414 /// Typically, this method is generated automatically by the `#[evento::aggregator]` macro
415 /// and dispatches to specific handler methods based on the event type.
416 fn aggregate<'async_trait>(
417 &'async_trait mut self,
418 event: &'async_trait Event,
419 ) -> std::pin::Pin<
420 Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'async_trait>,
421 >
422 where
423 Self: Sync + 'async_trait;
424
425 /// Get the revision hash for this aggregate implementation
426 ///
427 /// The revision changes when the aggregate's event handling logic changes.
428 /// This is used for versioning and ensuring compatibility.
429 fn revision() -> &'static str;
430}
431
432/// Trait for types that have a name identifier
433///
434/// `AggregatorName` provides a way to get the name of a type at runtime.
435/// This is used to identify event types and aggregate types in the event store.
436///
437/// # Implementation
438///
439/// For events, derive this trait using `#[derive(AggregatorName)]`.
440/// For aggregates, this is automatically implemented by the `#[evento::aggregator]` macro.
441///
442/// # Examples
443///
444/// ```no_run
445/// use evento::AggregatorName;
446/// use bincode::{Encode, Decode};
447///
448/// #[derive(AggregatorName, Encode, Decode)]
449/// struct UserCreated {
450/// name: String,
451/// }
452///
453/// assert_eq!(UserCreated::name(), "UserCreated");
454/// ```
455pub trait AggregatorName {
456 /// Get the name of this type
457 fn name() -> &'static str;
458}