Skip to main content

evento_core/
lib.rs

1//! Core types and traits for the Evento event sourcing library.
2//!
3//! This crate provides the foundational abstractions for building event-sourced applications
4//! with Evento. It defines the core traits, types, and builders used throughout the framework.
5//!
6//! # Features
7//!
8//! - **`macro`** (default) - Procedural macros from `evento-macro`
9//! - **`group`** - Multi-executor support via `EventoGroup`
10//! - **`rw`** - Read-write split executor pattern via `Rw`
11//! - **`sqlite`**, **`mysql`**, **`postgres`** - Database support via sqlx
12//! - **`fjall`** - Embedded key-value storage with Fjall
13//!
14//! # Core Concepts
15//!
16//! ## Events
17//!
18//! Events are immutable facts that represent something that happened in your domain.
19//! The [`Event`] struct stores serialized event data with metadata:
20//!
21//! ```rust,ignore
22//! // Define events using the aggregator macro
23//! #[evento::aggregator]
24//! pub enum BankAccount {
25//!     AccountOpened { owner_id: String, initial_balance: i64 },
26//!     MoneyDeposited { amount: i64 },
27//! }
28//! ```
29//!
30//! ## Executor
31//!
32//! The [`Executor`] trait abstracts event storage and retrieval. Implementations
33//! handle persisting events, querying, and managing subscriptions.
34//!
35//! ## Aggregator Builder
36//!
37//! Use [`create()`] or [`aggregator()`] to build and commit events:
38//!
39//! ```rust,ignore
40//! use evento::metadata::Metadata;
41//!
42//! let id = evento::create()
43//!     .event(&AccountOpened { owner_id: "user1".into(), initial_balance: 1000 })
44//!     .metadata(&Metadata::default())
45//!     .commit(&executor)
46//!     .await?;
47//! ```
48//!
49//! ## Projections
50//!
51//! Build read models by replaying events. Use the [`projection`] module for loading
52//! aggregate state:
53//!
54//! ```rust,ignore
55//! use evento::projection::Projection;
56//!
57//! #[evento::projection]
58//! #[derive(Debug)]
59//! pub struct AccountView {
60//!     pub balance: i64,
61//! }
62//!
63//! #[evento::handler]
64//! async fn on_deposited(
65//!     event: Event<MoneyDeposited>,
66//!     projection: &mut AccountView,
67//! ) -> anyhow::Result<()> {
68//!     projection.balance += event.data.amount;
69//!     Ok(())
70//! }
71//!
72//! let result = Projection::<AccountView, _>::new::<BankAccount>("account-123")
73//!     .handler(on_deposited())
74//!     .execute(&executor)
75//!     .await?;
76//! ```
77//!
78//! ## Subscriptions
79//!
80//! Process events continuously in real-time. See the [`subscription`] module:
81//!
82//! ```rust,ignore
83//! use evento::subscription::SubscriptionBuilder;
84//!
85//! #[evento::subscription]
86//! async fn on_deposited<E: Executor>(
87//!     context: &Context<'_, E>,
88//!     event: Event<MoneyDeposited>,
89//! ) -> anyhow::Result<()> {
90//!     // Perform side effects
91//!     Ok(())
92//! }
93//!
94//! let subscription = SubscriptionBuilder::<Sqlite>::new("deposit-processor")
95//!     .handler(on_deposited())
96//!     .routing_key("accounts")
97//!     .start(&executor)
98//!     .await?;
99//! ```
100//!
101//! ## Cursor-based Pagination
102//!
103//! GraphQL-style pagination for querying events. See the [`cursor`] module.
104//!
105//! # Modules
106//!
107//! - [`context`] - Type-safe request context for storing arbitrary data
108//! - [`cursor`] - Cursor-based pagination types and traits
109//! - [`metadata`] - Standard event metadata types
110//! - [`projection`] - Projections for loading aggregate state
111//! - [`subscription`] - Continuous event processing with subscriptions
112//!
113//! # Example
114//!
115//! ```rust,ignore
116//! use evento::{Executor, metadata::Metadata, cursor::Args, ReadAggregator};
117//!
118//! // Create and persist an event
119//! let id = evento::create()
120//!     .event(&AccountOpened { owner_id: "user1".into(), initial_balance: 1000 })
121//!     .metadata(&Metadata::default())
122//!     .commit(&executor)
123//!     .await?;
124//!
125//! // Query events with pagination
126//! let events = executor.read(
127//!     Some(vec![ReadAggregator::id("myapp/Account", &id)]),
128//!     None,
129//!     Args::forward(10, None),
130//! ).await?;
131//! ```
132
133mod aggregator;
134pub mod context;
135pub mod cursor;
136mod executor;
137pub mod metadata;
138pub mod projection;
139pub mod subscription;
140
141#[cfg(feature = "macro")]
142pub use evento_macro::*;
143
144pub use aggregator::*;
145pub use executor::*;
146pub use subscription::RoutingKey;
147
148use std::fmt::Debug;
149use ulid::Ulid;
150
151use crate::{cursor::Cursor, metadata::Metadata};
152
153/// Cursor data for event pagination.
154///
155/// Used internally for base64-encoded cursor values in paginated queries.
156/// Contains the essential fields needed to uniquely identify an event's position.
157#[derive(Debug, bitcode::Encode, bitcode::Decode)]
158pub struct EventCursor {
159    /// Event ID (ULID string)
160    pub i: String,
161    /// Event version
162    pub v: u16,
163    /// Event timestamp (Unix timestamp in seconds)
164    pub t: u64,
165    /// Sub-second precision (milliseconds)
166    pub s: u32,
167}
168
169/// A stored event in the event store.
170///
171/// Events are immutable records of facts that occurred in your domain.
172/// They contain serialized data and metadata, along with positioning
173/// information for the aggregate they belong to.
174///
175/// # Fields
176///
177/// - `id` - Unique event identifier (ULID format for time-ordering)
178/// - `aggregator_id` - The aggregate instance this event belongs to
179/// - `aggregator_type` - Type name like `"myapp/BankAccount"`
180/// - `version` - Sequence number within the aggregate (for optimistic concurrency)
181/// - `name` - Event type name like `"AccountOpened"`
182/// - `routing_key` - Optional key for event distribution/partitioning
183/// - `data` - Serialized event payload (bitcode format)
184/// - `metadata` - Event metadata (see [`metadata::Metadata`])
185/// - `timestamp` - When the event occurred (Unix seconds)
186/// - `timestamp_subsec` - Sub-second precision (milliseconds)
187///
188/// # Serialization
189///
190/// Event data is serialized using [bitcode](https://crates.io/crates/bitcode)
191/// for compact binary representation. Use [`metadata::Event`] to deserialize typed events.
192#[derive(Debug, Clone, PartialEq, Default)]
193pub struct Event {
194    /// Unique event identifier (ULID)
195    pub id: Ulid,
196    /// ID of the aggregate this event belongs to
197    pub aggregator_id: String,
198    /// Type name of the aggregate (e.g., "myapp/User")
199    pub aggregator_type: String,
200    /// Version number of the aggregate after this event
201    pub version: u16,
202    /// Event type name
203    pub name: String,
204    /// Optional routing key for event distribution
205    pub routing_key: Option<String>,
206    /// Serialized event data (bitcode format)
207    pub data: Vec<u8>,
208    /// Event metadata
209    pub metadata: Metadata,
210    /// Unix timestamp when the event occurred (seconds)
211    pub timestamp: u64,
212    /// Sub-second precision (milliseconds)
213    pub timestamp_subsec: u32,
214}
215
216impl Cursor for Event {
217    type T = EventCursor;
218
219    fn serialize(&self) -> Self::T {
220        EventCursor {
221            i: self.id.to_string(),
222            v: self.version,
223            t: self.timestamp,
224            s: self.timestamp_subsec,
225        }
226    }
227}
228
229impl cursor::Bind for Event {
230    type T = Self;
231
232    fn sort_by(data: &mut Vec<Self::T>, is_order_desc: bool) {
233        if !is_order_desc {
234            data.sort_by(|a, b| {
235                if a.timestamp_subsec != b.timestamp_subsec {
236                    return a.timestamp_subsec.cmp(&b.timestamp_subsec);
237                }
238
239                if a.timestamp != b.timestamp {
240                    return a.timestamp.cmp(&b.timestamp);
241                }
242
243                if a.version != b.version {
244                    return a.version.cmp(&b.version);
245                }
246
247                a.id.cmp(&b.id)
248            });
249        } else {
250            data.sort_by(|a, b| {
251                if a.timestamp_subsec != b.timestamp_subsec {
252                    return b.timestamp_subsec.cmp(&a.timestamp_subsec);
253                }
254
255                if a.timestamp != b.timestamp {
256                    return b.timestamp.cmp(&a.timestamp);
257                }
258
259                if a.version != b.version {
260                    return b.version.cmp(&a.version);
261                }
262
263                b.id.cmp(&a.id)
264            });
265        }
266    }
267
268    fn retain(
269        data: &mut Vec<Self::T>,
270        cursor: <<Self as cursor::Bind>::T as Cursor>::T,
271        is_order_desc: bool,
272    ) {
273        data.retain(|event| {
274            if is_order_desc {
275                event.timestamp < cursor.t
276                    || (event.timestamp == cursor.t
277                        && (event.timestamp_subsec < cursor.s
278                            || (event.timestamp_subsec == cursor.s
279                                && (event.version < cursor.v
280                                    || (event.version == cursor.v
281                                        && event.id.to_string() < cursor.i)))))
282            } else {
283                event.timestamp > cursor.t
284                    || (event.timestamp == cursor.t
285                        && (event.timestamp_subsec > cursor.s
286                            || (event.timestamp_subsec == cursor.s
287                                && (event.version > cursor.v
288                                    || (event.version == cursor.v
289                                        && event.id.to_string() > cursor.i)))))
290            }
291        });
292    }
293}