evento/
lib.rs

1//! # Evento - Event Sourcing and CQRS Framework
2//!
3//! Evento is a comprehensive library for building event-sourced applications using Domain-Driven Design (DDD)
4//! and Command Query Responsibility Segregation (CQRS) patterns in Rust.
5//!
6//! ## Overview
7//!
8//! Event sourcing is a pattern where state changes are stored as a sequence of events. Instead of persisting
9//! just the current state, you persist all the events that led to the current state. This provides:
10//!
11//! - **Complete audit trail**: Every change is recorded as an event
12//! - **Time travel**: Replay events to see state at any point in time  
13//! - **Event-driven architecture**: React to events with handlers
14//! - **CQRS support**: Separate read and write models
15//!
16//! ## Core Concepts
17//!
18//! - **Events**: Immutable facts representing something that happened
19//! - **Aggregates**: Domain objects that process events and maintain state
20//! - **Event Handlers**: Functions that react to events and trigger side effects
21//! - **Event Store**: Persistent storage for events (SQL databases supported)
22//! - **Snapshots**: Periodic state captures to optimize loading
23//!
24//! ## Quick Start
25//!
26//! ```no_run
27//! use evento::{EventDetails, AggregatorName};
28//! use serde::{Deserialize, Serialize};
29//! use bincode::{Decode, Encode};
30//!
31//! // Define events
32//! #[derive(AggregatorName, Encode, Decode)]
33//! struct UserCreated {
34//!     name: String,
35//!     email: String,
36//! }
37//!
38//! // Define aggregate
39//! #[derive(Default, Serialize, Deserialize, Encode, Decode, Clone, Debug)]
40//! struct User {
41//!     name: String,
42//!     email: String,
43//! }
44//!
45//! // Implement event handlers on the aggregate
46//! #[evento::aggregator]
47//! impl User {
48//!     async fn user_created(&mut self, event: EventDetails<UserCreated>) -> anyhow::Result<()> {
49//!         self.name = event.data.name;
50//!         self.email = event.data.email;
51//!         Ok(())
52//!     }
53//! }
54//!
55//! #[tokio::main]
56//! async fn main() -> anyhow::Result<()> {
57//!     // Setup SQLite executor
58//!     let pool = sqlx::SqlitePool::connect("sqlite:events.db").await?;
59//!     let executor: evento::Sqlite = pool.into();
60//!
61//!     // Create and save events
62//!     let user_id = evento::create::<User>()
63//!         .data(&UserCreated {
64//!             name: "John Doe".to_string(),
65//!             email: "john@example.com".to_string(),
66//!         })?
67//!         .metadata(&true)?
68//!         .commit(&executor)
69//!         .await?;
70//!
71//!     // Load aggregate from events
72//!     let user = evento::load::<User, _>(&executor, &user_id).await?;
73//!     println!("User: {:?}", user.item);
74//!
75//!     Ok(())
76//! }
77//! ```
78//!
79//! ## Features
80//!
81//! - **SQL Database Support**: SQLite, PostgreSQL, MySQL
82//! - **Event Handlers**: Async event processing with retries
83//! - **Event Subscriptions**: Continuous event processing
84//! - **Streaming**: Real-time event streams (with `stream` feature)
85//! - **Migrations**: Database schema management
86//! - **Macros**: Procedural macros for cleaner code
87//!
88//! ## Feature Flags
89//!
90//! - `macro` - Enable procedural macros (default)
91//! - `handler` - Enable event handlers (default)  
92//! - `stream` - Enable streaming support
93//! - `sql` - Enable all SQL database backends
94//! - `sqlite` - SQLite support
95//! - `postgres` - PostgreSQL support
96//! - `mysql` - MySQL support
97
98pub mod context;
99pub mod cursor;
100mod executor;
101mod load;
102pub mod metadata;
103mod save;
104mod subscribe;
105
106#[cfg(any(feature = "sqlite", feature = "mysql", feature = "postgres"))]
107pub mod sql;
108#[cfg(any(feature = "sqlite", feature = "mysql", feature = "postgres"))]
109pub mod sql_migrator;
110#[cfg(any(feature = "sqlite", feature = "mysql", feature = "postgres"))]
111pub mod sql_types;
112
113#[cfg(feature = "macro")]
114pub use evento_macro::*;
115
116pub use executor::*;
117pub use load::*;
118pub use save::*;
119pub use subscribe::*;
120
121use std::{fmt::Debug, ops::Deref};
122use ulid::Ulid;
123
124use crate::cursor::Cursor;
125
/// Re-exports for consuming event streams
///
/// Compiled only when the `stream` feature is enabled. It re-exports
/// `tokio_stream::StreamExt` under `evento::stream`.
///
/// ```no_run
/// use evento::stream::StreamExt;
/// ```
#[cfg(feature = "stream")]
pub mod stream {
    pub use tokio_stream::StreamExt;
}
137
/// Re-exports of the SQL migration primitives
///
/// Available whenever at least one SQL backend feature (`sqlite`, `postgres`, or `mysql`)
/// is enabled. It exposes `Migrate` and `Plan` from `sqlx_migrator` under
/// `evento::migrator`.
///
/// ```no_run
/// use evento::migrator::{Migrate, Plan};
/// ```
#[cfg(any(feature = "sqlite", feature = "postgres", feature = "mysql"))]
pub mod migrator {
    pub use sqlx_migrator::{Migrate, Plan};
}
150
151/// Event with typed data and metadata
152///
153/// `EventDetails` wraps a raw [`Event`] with typed data and metadata. This provides
154/// type-safe access to event payloads in event handlers and aggregators.
155///
156/// # Type Parameters
157///
158/// - `D`: The type of the event data (must implement [`AggregatorName`] and be decodable)
159/// - `M`: The type of the metadata (defaults to `bool`, must be decodable)
160///
161/// # Examples
162///
163/// ```no_run
164/// use evento::{EventDetails, AggregatorName};
165/// use bincode::{Encode, Decode};
166///
167/// #[derive(AggregatorName, Encode, Decode)]
168/// struct UserCreated {
169///     name: String,
170///     email: String,
171/// }
172///
173/// // In an event handler
174/// async fn handle_user_created(event: EventDetails<UserCreated>) -> anyhow::Result<()> {
175///     println!("User created: {} ({})", event.data.name, event.data.email);
176///     println!("Event ID: {}", event.id);
177///     Ok(())
178/// }
179/// ```
180pub struct EventDetails<D, M = bool> {
181    inner: Event,
182    /// The typed event data
183    pub data: D,
184    /// The typed event metadata
185    pub metadata: M,
186}
187
188impl<D, M> Deref for EventDetails<D, M> {
189    type Target = Event;
190
191    fn deref(&self) -> &Self::Target {
192        &self.inner
193    }
194}
195
196#[cfg(feature = "mysql")]
197pub use sql::MySql;
198
199#[cfg(feature = "postgres")]
200pub use sql::Postgres;
201
202#[cfg(feature = "sqlite")]
203pub use sql::Sqlite;
204
205/// Cursor for event pagination and positioning
206///
207/// `EventCursor` represents a position in the event stream. It contains the event ID,
208/// version, and timestamp to enable efficient pagination and resuming from specific points.
209///
210/// This is primarily used internally for event stream pagination and subscription tracking.
211#[derive(Debug, bincode::Encode, bincode::Decode)]
212pub struct EventCursor {
213    /// Event ID (ULID string)
214    pub i: String,
215    /// Event version
216    pub v: i32,
217    /// Event timestamp (Unix timestamp in seconds)
218    pub t: u64,
219    pub s: u32,
220}
221
222/// Raw event stored in the event store
223///
224/// `Event` represents a single immutable event in the event stream. Events are the
225/// fundamental building blocks of event sourcing - they represent facts that have
226/// occurred in the domain.
227///
228/// Events are typically not used directly in application code. Instead, use
229/// [`EventDetails`] which provides typed access to the event data.
230///
231/// # Fields
232///
233/// - `id`: Unique identifier for the event (ULID)
234/// - `aggregator_id`: ID of the aggregate this event belongs to
235/// - `aggregator_type`: Type name of the aggregate
236/// - `version`: Version number of the aggregate after this event
237/// - `name`: Event type name
238/// - `routing_key`: Optional routing key for event distribution
239/// - `data`: Serialized event data (bincode)
240/// - `metadata`: Serialized event metadata (bincode)
241/// - `timestamp`: Unix timestamp when the event occurred
242///
243/// # Examples
244///
245/// Events are usually created through the [`create`] and [`save`] functions:
246///
247/// ```no_run
248/// use evento::create;
249/// # use evento::*;
250/// # use bincode::{Encode, Decode};
251/// # #[derive(AggregatorName, Encode, Decode)]
252/// # struct UserCreated { name: String }
253/// # #[derive(Default, Encode, Decode, Clone, Debug)]
254/// # struct User;
255/// # #[evento::aggregator]
256/// # impl User {}
257///
258/// async fn create_user(executor: &evento::Sqlite) -> anyhow::Result<String> {
259///     let user_id = create::<User>()
260///         .data(&UserCreated { name: "John".to_string() })?
261///         .metadata(&true)?
262///         .commit(executor)
263///         .await?;
264///     Ok(user_id)
265/// }
266/// ```
267#[derive(Debug, Clone, PartialEq, Default)]
268pub struct Event {
269    /// Unique event identifier (ULID)
270    pub id: Ulid,
271    /// ID of the aggregate this event belongs to
272    pub aggregator_id: String,
273    /// Type name of the aggregate (e.g., "myapp/User")
274    pub aggregator_type: String,
275    /// Version number of the aggregate after this event
276    pub version: i32,
277    /// Event type name
278    pub name: String,
279    /// Optional routing key for event distribution
280    pub routing_key: Option<String>,
281    /// Serialized event data (bincode format)
282    pub data: Vec<u8>,
283    /// Serialized event metadata (bincode format)
284    pub metadata: Vec<u8>,
285    /// Unix timestamp when the event occurred (seconds)
286    pub timestamp: u64,
287    /// Unix timestamp when the event occurred (seconds)
288    pub timestamp_subsec: u32,
289}
290
291impl Event {
292    /// Convert this raw event to typed [`EventDetails`]
293    ///
294    /// This method attempts to deserialize the event data and metadata into the specified types.
295    /// Returns `None` if the event name doesn't match the expected type `D`.
296    ///
297    /// # Type Parameters
298    ///
299    /// - `D`: The expected event data type (must implement [`AggregatorName`])
300    /// - `M`: The expected metadata type
301    ///
302    /// # Errors
303    ///
304    /// Returns a [`bincode::error::DecodeError`] if deserialization fails.
305    ///
306    /// # Examples
307    ///
308    /// ```no_run
309    /// use evento::{Event, EventDetails};
310    /// # use evento::*;
311    /// # use bincode::{Encode, Decode};
312    /// # #[derive(AggregatorName, Encode, Decode)]
313    /// # struct UserCreated { name: String }
314    ///
315    /// fn handle_event(event: &Event) -> anyhow::Result<()> {
316    ///     if let Some(details) = event.to_details::<UserCreated, bool>()? {
317    ///         println!("User created: {}", details.data.name);
318    ///     }
319    ///     Ok(())
320    /// }
321    /// ```
322    pub fn to_details<D: AggregatorName + bincode::Decode<()>, M: bincode::Decode<()>>(
323        &self,
324    ) -> Result<Option<EventDetails<D, M>>, bincode::error::DecodeError> {
325        if D::name() != self.name {
326            return Ok(None);
327        }
328
329        let config = bincode::config::standard();
330
331        let (data, _) = bincode::decode_from_slice(&self.data[..], config)?;
332        let (metadata, _) = bincode::decode_from_slice(&self.metadata[..], config)?;
333
334        Ok(Some(EventDetails {
335            data,
336            metadata,
337            inner: self.clone(),
338        }))
339    }
340}
341
342impl Cursor for Event {
343    type T = EventCursor;
344
345    fn serialize(&self) -> Self::T {
346        EventCursor {
347            i: self.id.to_string(),
348            v: self.version,
349            t: self.timestamp,
350            s: self.timestamp_subsec,
351        }
352    }
353}
354
355/// Trait for domain aggregates that process events
356///
357/// `Aggregator` defines the contract for objects that maintain state by processing events.
358/// Aggregates are the core building blocks in event sourcing - they represent domain entities
359/// that rebuild their state by replaying events from the event store.
360///
361/// # Implementation
362///
363/// Instead of implementing this trait manually, use the `#[evento::aggregator]` attribute macro
364/// which generates the implementation automatically based on your event handler methods.
365///
366/// # Requirements
367///
368/// Aggregators must:
369/// - Implement `Default` (initial empty state)
370/// - Be `Send + Sync` for async processing
371/// - Be serializable with `bincode::Encode + bincode::Decode`
372/// - Be `Clone`able for snapshots
373/// - Implement [`AggregatorName`] for type identification
374/// - Be `Debug`gable for diagnostics
375///
376/// # Examples
377///
378/// ```no_run
379/// use evento::{Aggregator, AggregatorName, EventDetails};
380/// use serde::{Deserialize, Serialize};
381/// use bincode::{Encode, Decode};
382///
383/// #[derive(AggregatorName, Encode, Decode)]
384/// struct UserCreated {
385///     name: String,
386///     email: String,
387/// }
388///
389/// #[derive(Default, Serialize, Deserialize, Encode, Decode, Clone, Debug)]
390/// struct User {
391///     name: String,
392///     email: String,
393///     is_active: bool,
394/// }
395///
396/// #[evento::aggregator]
397/// impl User {
398///     async fn user_created(&mut self, event: EventDetails<UserCreated>) -> anyhow::Result<()> {
399///         self.name = event.data.name;
400///         self.email = event.data.email;
401///         self.is_active = true;
402///         Ok(())
403///     }
404/// }
405/// ```
406pub trait Aggregator:
407    Default + Send + Sync + bincode::Encode + bincode::Decode<()> + Clone + AggregatorName + Debug
408{
409    /// Process an event and update the aggregate's state
410    ///
411    /// This method is called for each event when rebuilding the aggregate from the event store.
412    /// The implementation should update the aggregate's state based on the event.
413    ///
414    /// Typically, this method is generated automatically by the `#[evento::aggregator]` macro
415    /// and dispatches to specific handler methods based on the event type.
416    fn aggregate<'async_trait>(
417        &'async_trait mut self,
418        event: &'async_trait Event,
419    ) -> std::pin::Pin<
420        Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'async_trait>,
421    >
422    where
423        Self: Sync + 'async_trait;
424
425    /// Get the revision hash for this aggregate implementation
426    ///
427    /// The revision changes when the aggregate's event handling logic changes.
428    /// This is used for versioning and ensuring compatibility.
429    fn revision() -> &'static str;
430}
431
/// Static name lookup for event and aggregate types
///
/// `AggregatorName` associates a type with a `&'static str` name that can be read at
/// runtime. The name identifies event types and aggregate types in the event store.
///
/// # Implementation
///
/// For events, derive this trait using `#[derive(AggregatorName)]`.
/// For aggregates, the `#[evento::aggregator]` macro implements it automatically.
///
/// # Examples
///
/// ```no_run
/// use evento::AggregatorName;
/// use bincode::{Encode, Decode};
///
/// #[derive(AggregatorName, Encode, Decode)]
/// struct UserCreated {
///     name: String,
/// }
///
/// assert_eq!(UserCreated::name(), "UserCreated");
/// ```
pub trait AggregatorName {
    /// Name identifying this type
    fn name() -> &'static str;
}