Skip to main content

evento_core/
executor.rs

//! Event storage and retrieval abstraction.
//!
//! This module defines the [`Executor`] trait, the core abstraction for event
//! persistence. Implementations handle storing events, querying, managing
//! subscriptions, and storing aggregate snapshots.
//!
//! # Types
//!
//! - [`Executor`] - Core trait for event storage backends
//! - [`Evento`] - Type-erased wrapper around any executor
//! - [`EventoGroup`] - Multi-executor aggregation (feature: `group`)
//! - [`Rw`] - Read-write split executor (feature: `rw`)
//! - [`ReadAggregator`] - Query filter for reading events
14
15use std::{hash::Hash, sync::Arc};
16use ulid::Ulid;
17
18use crate::{
19    cursor::{Args, ReadResult, Value},
20    Event, RoutingKey, WriteError,
21};
22
/// Filter for querying events by aggregator.
///
/// A filter always names an aggregator type and may additionally narrow the
/// query to a single aggregate instance and/or a single event name. Use the
/// constructor methods to create filters.
///
/// # Example
///
/// ```rust,ignore
/// // All events for an aggregator type
/// let filter = ReadAggregator::aggregator("myapp/User");
///
/// // Events for a specific aggregate instance
/// let filter = ReadAggregator::id("myapp/User", "user-123");
///
/// // Events of a specific type
/// let filter = ReadAggregator::event("myapp/User", "UserCreated");
/// ```
// `Hash` is derived rather than hand-written so it always covers exactly the
// fields compared by the derived `PartialEq`/`Eq` (same fields, same order).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ReadAggregator {
    /// Aggregator type (e.g., "myapp/User")
    pub aggregator_type: String,
    /// Optional specific aggregate ID
    pub aggregator_id: Option<String>,
    /// Optional event name filter
    pub name: Option<String>,
}

impl ReadAggregator {
    /// Creates a filter with all fields specified.
    ///
    /// Filters events by aggregator type, specific aggregate ID, and event name.
    pub fn new(
        aggregator_type: impl Into<String>,
        id: impl Into<String>,
        name: impl Into<String>,
    ) -> Self {
        Self {
            aggregator_type: aggregator_type.into(),
            aggregator_id: Some(id.into()),
            name: Some(name.into()),
        }
    }

    /// Creates a filter for all events of an aggregator type.
    ///
    /// Neither the aggregate ID nor the event name is constrained.
    pub fn aggregator(value: impl Into<String>) -> Self {
        Self {
            aggregator_type: value.into(),
            aggregator_id: None,
            name: None,
        }
    }

    /// Creates a filter for a specific aggregate instance.
    ///
    /// Constrains the aggregator type and ID; the event name is left open.
    pub fn id(aggregator_type: impl Into<String>, id: impl Into<String>) -> Self {
        Self {
            aggregator_type: aggregator_type.into(),
            aggregator_id: Some(id.into()),
            name: None,
        }
    }

    /// Creates a filter for a specific event type.
    ///
    /// Constrains the aggregator type and event name; the aggregate ID is
    /// left open.
    pub fn event(aggregator_type: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            aggregator_type: aggregator_type.into(),
            aggregator_id: None,
            name: Some(name.into()),
        }
    }
}
106
107/// Core trait for event storage backends.
108///
109/// Implementations handle persisting events, querying, and managing subscriptions.
110/// The main implementation is [`evento_sql::Sql`](../evento_sql/struct.Sql.html).
111///
112/// # Methods
113///
114/// - `write` - Persist events atomically
115/// - `read` - Query events with filtering and pagination
116/// - `get_subscriber_cursor` - Get subscription position
117/// - `is_subscriber_running` - Check if subscription is active
118/// - `upsert_subscriber` - Create/update subscription
119/// - `acknowledge` - Update subscription cursor
120#[async_trait::async_trait]
121pub trait Executor: Send + Sync + 'static {
122    /// Persists events atomically.
123    ///
124    /// Returns `WriteError::InvalidOriginalVersion` if version conflicts occur.
125    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError>;
126
127    /// Gets the current cursor position for a subscription.
128    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>>;
129
130    /// Checks if a subscription is running with the given worker ID.
131    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool>;
132
133    /// Creates or updates a subscription record.
134    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()>;
135
136    /// Updates subscription cursor after processing events.
137    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()>;
138
139    /// Queries events with filtering and pagination.
140    async fn read(
141        &self,
142        aggregators: Option<Vec<ReadAggregator>>,
143        routing_key: Option<RoutingKey>,
144        args: Args,
145    ) -> anyhow::Result<ReadResult<Event>>;
146
147    /// Retrieves a stored snapshot for an aggregate.
148    ///
149    /// Returns the serialized snapshot data and cursor position, or `None`
150    /// if no snapshot exists for the given aggregate.
151    async fn get_snapshot(
152        &self,
153        aggregator_type: String,
154        aggregator_revision: String,
155        id: String,
156    ) -> anyhow::Result<Option<(Vec<u8>, Value)>>;
157
158    /// Stores a snapshot for an aggregate.
159    ///
160    /// Snapshots cache aggregate state to avoid replaying all events.
161    /// The `cursor` indicates the event position up to which the snapshot is valid.
162    async fn save_snapshot(
163        &self,
164        aggregator_type: String,
165        aggregator_revision: String,
166        id: String,
167        data: Vec<u8>,
168        cursor: Value,
169    ) -> anyhow::Result<()>;
170}
171
172/// Type-erased wrapper around any [`Executor`] implementation.
173///
174/// `Evento` wraps an executor in `Arc<Box<dyn Executor>>` for dynamic dispatch.
175/// This allows storing different executor implementations in the same collection.
176///
177/// # Example
178///
179/// ```rust,ignore
180/// let sql_executor: Sql<sqlx::Sqlite> = pool.into();
181/// let evento = Evento::new(sql_executor);
182///
183/// // Use like any executor
184/// evento.write(events).await?;
185/// ```
186pub struct Evento(Arc<Box<dyn Executor>>);
187
188impl Clone for Evento {
189    fn clone(&self) -> Self {
190        Self(self.0.clone())
191    }
192}
193
194#[async_trait::async_trait]
195impl Executor for Evento {
196    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
197        self.0.write(events).await
198    }
199
200    async fn read(
201        &self,
202        aggregators: Option<Vec<ReadAggregator>>,
203        routing_key: Option<RoutingKey>,
204        args: Args,
205    ) -> anyhow::Result<ReadResult<Event>> {
206        self.0.read(aggregators, routing_key, args).await
207    }
208
209    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
210        self.0.get_subscriber_cursor(key).await
211    }
212
213    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
214        self.0.is_subscriber_running(key, worker_id).await
215    }
216
217    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
218        self.0.upsert_subscriber(key, worker_id).await
219    }
220
221    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
222        self.0.acknowledge(key, cursor, lag).await
223    }
224
225    async fn get_snapshot(
226        &self,
227        aggregator_type: String,
228        aggregator_revision: String,
229        id: String,
230    ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
231        self.0
232            .get_snapshot(aggregator_type, aggregator_revision, id)
233            .await
234    }
235
236    async fn save_snapshot(
237        &self,
238        aggregator_type: String,
239        aggregator_revision: String,
240        id: String,
241        data: Vec<u8>,
242        cursor: Value,
243    ) -> anyhow::Result<()> {
244        self.0
245            .save_snapshot(aggregator_type, aggregator_revision, id, data, cursor)
246            .await
247    }
248}
249
250impl Evento {
251    /// Creates a new type-erased executor wrapper.
252    pub fn new<E: Executor>(executor: E) -> Self {
253        Self(Arc::new(Box::new(executor)))
254    }
255}
256
/// Multi-executor aggregation (requires `group` feature).
///
/// An `EventoGroup` presents several executors as one. Reads query all
/// executors and merge results; writes go only to the first executor.
///
/// Useful for aggregating events from multiple sources.
#[cfg(feature = "group")]
#[derive(Clone, Default)]
pub struct EventoGroup {
    executors: Vec<Evento>,
}

#[cfg(feature = "group")]
impl EventoGroup {
    /// Appends an executor to the group.
    ///
    /// Accepts anything convertible into an [`Evento`]. The group is taken by
    /// value and handed back, so calls can be chained.
    pub fn executor(mut self, executor: impl Into<Evento>) -> Self {
        let member: Evento = executor.into();
        self.executors.push(member);
        self
    }

    /// Borrows the executor that was added first.
    ///
    /// # Panics
    ///
    /// Panics if the group has no executors.
    pub fn first(&self) -> &Evento {
        let head = self.executors.first();
        head.expect("EventoGroup must have at least one executor")
    }
}
291
/// `read` fans out to every member of the group; every other operation is
/// forwarded to the first member only (and therefore panics on an empty group).
#[cfg(feature = "group")]
#[async_trait::async_trait]
impl Executor for EventoGroup {
    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
        let primary = self.first();
        primary.write(events).await
    }

    /// Runs the same query against every member concurrently, pools the
    /// returned events, then applies `args` once more over the pooled set.
    async fn read(
        &self,
        aggregators: Option<Vec<ReadAggregator>>,
        routing_key: Option<RoutingKey>,
        args: Args,
    ) -> anyhow::Result<ReadResult<Event>> {
        use crate::cursor;

        // Each member gets its own copy of the filters and arguments.
        let pending = self
            .executors
            .iter()
            .map(|member| member.read(aggregators.clone(), routing_key.clone(), args.clone()));
        let outcomes = futures_util::future::join_all(pending).await;

        // All reads have completed by now; the first failure (in member
        // order) aborts the merge.
        let mut merged = Vec::new();
        for outcome in outcomes {
            let page = outcome?;
            merged.extend(page.edges.into_iter().map(|edge| edge.node));
        }

        let combined = cursor::Reader::new(merged).args(args).execute()?;
        Ok(combined)
    }

    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
        let primary = self.first();
        primary.get_subscriber_cursor(key).await
    }

    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
        let primary = self.first();
        primary.is_subscriber_running(key, worker_id).await
    }

    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
        let primary = self.first();
        primary.upsert_subscriber(key, worker_id).await
    }

    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
        let primary = self.first();
        primary.acknowledge(key, cursor, lag).await
    }

    async fn get_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
    ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
        let primary = self.first();
        primary
            .get_snapshot(aggregator_type, aggregator_revision, id)
            .await
    }

    async fn save_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
        data: Vec<u8>,
        cursor: Value,
    ) -> anyhow::Result<()> {
        let primary = self.first();
        primary
            .save_snapshot(aggregator_type, aggregator_revision, id, data, cursor)
            .await
    }
}
362
/// Read-write split executor (requires `rw` feature).
///
/// Separates read and write operations to different executors.
/// Useful for CQRS patterns where read and write databases differ.
///
/// `Rw` is `Clone` whenever both wrapped executors are `Clone`.
///
/// # Example
///
/// ```rust,ignore
/// let rw: Rw<ReadReplica, Primary> = (read_executor, write_executor).into();
/// ```
// The derive produces the same impl the file used to spell out by hand:
// `impl<R: Executor + Clone, W: Executor + Clone> Clone for Rw<R, W>`.
#[cfg(feature = "rw")]
#[derive(Clone)]
pub struct Rw<R: Executor, W: Executor> {
    /// Executor that serves the read-side operations.
    r: R,
    /// Executor that serves the write-side operations.
    w: W,
}
388
/// Routes each operation to one side of the pair: `read`,
/// `get_subscriber_cursor`, `is_subscriber_running` and `get_snapshot` go to
/// the read executor; `write`, `upsert_subscriber`, `acknowledge` and
/// `save_snapshot` go to the write executor.
#[cfg(feature = "rw")]
#[async_trait::async_trait]
impl<R: Executor, W: Executor> Executor for Rw<R, W> {
    // Write side.
    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
        W::write(&self.w, events).await
    }

    // Read side.
    async fn read(
        &self,
        aggregators: Option<Vec<ReadAggregator>>,
        routing_key: Option<RoutingKey>,
        args: Args,
    ) -> anyhow::Result<ReadResult<Event>> {
        R::read(&self.r, aggregators, routing_key, args).await
    }

    // Read side.
    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
        R::get_subscriber_cursor(&self.r, key).await
    }

    // Read side.
    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
        R::is_subscriber_running(&self.r, key, worker_id).await
    }

    // Write side.
    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
        W::upsert_subscriber(&self.w, key, worker_id).await
    }

    // Write side.
    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
        W::acknowledge(&self.w, key, cursor, lag).await
    }

    // Read side.
    async fn get_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
    ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
        R::get_snapshot(&self.r, aggregator_type, aggregator_revision, id).await
    }

    // Write side.
    async fn save_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
        data: Vec<u8>,
        cursor: Value,
    ) -> anyhow::Result<()> {
        W::save_snapshot(
            &self.w,
            aggregator_type,
            aggregator_revision,
            id,
            data,
            cursor,
        )
        .await
    }
}
445
#[cfg(feature = "rw")]
impl<R: Executor, W: Executor> From<(R, W)> for Rw<R, W> {
    /// Builds an [`Rw`] from a `(read, write)` pair: the first element
    /// becomes the read executor, the second the write executor.
    fn from(pair: (R, W)) -> Self {
        let (r, w) = pair;
        Self { r, w }
    }
}