evento_core/
executor.rs

1//! Event storage and retrieval abstraction.
2//!
3//! This module defines the [`Executor`] trait, the core abstraction for event
4//! persistence. Implementations handle storing events, querying, and managing
5//! subscriptions.
6//!
7//! # Types
8//!
9//! - [`Executor`] - Core trait for event storage backends
10//! - [`Evento`] - Type-erased wrapper around any executor
11//! - [`EventoGroup`] - Multi-executor aggregation (feature: `group`)
12//! - [`Rw`] - Read-write split executor (feature: `rw`)
13//! - [`ReadAggregator`] - Query filter for reading events
14
15use std::{hash::Hash, sync::Arc};
16use ulid::Ulid;
17
18use crate::{
19    cursor::{Args, ReadResult, Value},
20    Event, RoutingKey, WriteError,
21};
22
23/// Filter for querying events by aggregator.
24///
25/// Use the constructor methods to create filters:
26///
27/// # Example
28///
29/// ```rust,ignore
30/// // All events for an aggregator type
31/// let filter = ReadAggregator::aggregator("myapp/User");
32///
33/// // Events for a specific aggregate instance
34/// let filter = ReadAggregator::id("myapp/User", "user-123");
35///
36/// // Events of a specific type
37/// let filter = ReadAggregator::event("myapp/User", "UserCreated");
38/// ```
39#[derive(Clone, PartialEq, Eq)]
40pub struct ReadAggregator {
41    /// Aggregator type (e.g., "myapp/User")
42    pub aggregator_type: String,
43    /// Optional specific aggregate ID
44    pub aggregator_id: Option<String>,
45    /// Optional event name filter
46    pub name: Option<String>,
47}
48
49impl ReadAggregator {
50    pub fn new(
51        aggregator_type: impl Into<String>,
52        id: impl Into<String>,
53        name: impl Into<String>,
54    ) -> Self {
55        Self {
56            aggregator_type: aggregator_type.into(),
57            aggregator_id: Some(id.into()),
58            name: Some(name.into()),
59        }
60    }
61
62    pub fn aggregator(value: impl Into<String>) -> Self {
63        Self {
64            aggregator_type: value.into(),
65            aggregator_id: None,
66            name: None,
67        }
68    }
69
70    pub fn id(aggregator_type: impl Into<String>, id: impl Into<String>) -> Self {
71        Self {
72            aggregator_type: aggregator_type.into(),
73            aggregator_id: Some(id.into()),
74            name: None,
75        }
76    }
77
78    pub fn event(aggregator_type: impl Into<String>, name: impl Into<String>) -> Self {
79        Self {
80            aggregator_type: aggregator_type.into(),
81            aggregator_id: None,
82            name: Some(name.into()),
83        }
84    }
85}
86
87impl Hash for ReadAggregator {
88    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
89        self.aggregator_type.hash(state);
90        self.aggregator_id.hash(state);
91        self.name.hash(state);
92    }
93}
94
95/// Core trait for event storage backends.
96///
97/// Implementations handle persisting events, querying, and managing subscriptions.
98/// The main implementation is [`evento_sql::Sql`](../evento_sql/struct.Sql.html).
99///
100/// # Methods
101///
102/// - `write` - Persist events atomically
103/// - `read` - Query events with filtering and pagination
104/// - `get_subscriber_cursor` - Get subscription position
105/// - `is_subscriber_running` - Check if subscription is active
106/// - `upsert_subscriber` - Create/update subscription
107/// - `acknowledge` - Update subscription cursor
108#[async_trait::async_trait]
109pub trait Executor: Send + Sync + 'static {
110    /// Persists events atomically.
111    ///
112    /// Returns `WriteError::InvalidOriginalVersion` if version conflicts occur.
113    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError>;
114
115    /// Gets the current cursor position for a subscription.
116    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>>;
117
118    /// Checks if a subscription is running with the given worker ID.
119    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool>;
120
121    /// Creates or updates a subscription record.
122    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()>;
123
124    /// Updates subscription cursor after processing events.
125    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()>;
126
127    /// Queries events with filtering and pagination.
128    async fn read(
129        &self,
130        aggregators: Option<Vec<ReadAggregator>>,
131        routing_key: Option<RoutingKey>,
132        args: Args,
133    ) -> anyhow::Result<ReadResult<Event>>;
134}
135
136/// Type-erased wrapper around any [`Executor`] implementation.
137///
138/// `Evento` wraps an executor in `Arc<Box<dyn Executor>>` for dynamic dispatch.
139/// This allows storing different executor implementations in the same collection.
140///
141/// # Example
142///
143/// ```rust,ignore
144/// let sql_executor: Sql<sqlx::Sqlite> = pool.into();
145/// let evento = Evento::new(sql_executor);
146///
147/// // Use like any executor
148/// evento.write(events).await?;
149/// ```
150pub struct Evento(Arc<Box<dyn Executor>>);
151
152impl Clone for Evento {
153    fn clone(&self) -> Self {
154        Self(self.0.clone())
155    }
156}
157
158#[async_trait::async_trait]
159impl Executor for Evento {
160    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
161        self.0.write(events).await
162    }
163
164    async fn read(
165        &self,
166        aggregators: Option<Vec<ReadAggregator>>,
167        routing_key: Option<RoutingKey>,
168        args: Args,
169    ) -> anyhow::Result<ReadResult<Event>> {
170        self.0.read(aggregators, routing_key, args).await
171    }
172
173    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
174        self.0.get_subscriber_cursor(key).await
175    }
176
177    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
178        self.0.is_subscriber_running(key, worker_id).await
179    }
180
181    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
182        self.0.upsert_subscriber(key, worker_id).await
183    }
184
185    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
186        self.0.acknowledge(key, cursor, lag).await
187    }
188}
189
190impl Evento {
191    pub fn new<E: Executor>(executor: E) -> Self {
192        Self(Arc::new(Box::new(executor)))
193    }
194}
195
196/// Multi-executor aggregation (requires `group` feature).
197///
198/// `EventoGroup` combines multiple executors into one. Reads query all executors
199/// and merge results; writes go only to the first executor.
200///
201/// Useful for aggregating events from multiple sources.
202#[cfg(feature = "group")]
203#[derive(Clone, Default)]
204pub struct EventoGroup {
205    executors: Vec<Evento>,
206}
207
208#[cfg(feature = "group")]
209impl EventoGroup {
210    pub fn executor(mut self, executor: impl Into<Evento>) -> Self {
211        self.executors.push(executor.into());
212
213        self
214    }
215
216    pub fn first(&self) -> &Evento {
217        self.executors
218            .first()
219            .expect("EventoGroup must have at least one executor")
220    }
221}
222
223#[cfg(feature = "group")]
224#[async_trait::async_trait]
225impl Executor for EventoGroup {
226    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
227        self.first().write(events).await
228    }
229
230    async fn read(
231        &self,
232        aggregators: Option<Vec<ReadAggregator>>,
233        routing_key: Option<RoutingKey>,
234        args: Args,
235    ) -> anyhow::Result<ReadResult<Event>> {
236        use crate::cursor;
237        let futures = self
238            .executors
239            .iter()
240            .map(|e| e.read(aggregators.to_owned(), routing_key.to_owned(), args.clone()));
241
242        let results = futures_util::future::join_all(futures).await;
243        let mut events = vec![];
244        for res in results {
245            for edge in res?.edges {
246                events.push(edge.node);
247            }
248        }
249
250        Ok(cursor::Reader::new(events).args(args).execute()?)
251    }
252
253    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
254        self.first().get_subscriber_cursor(key).await
255    }
256
257    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
258        self.first().is_subscriber_running(key, worker_id).await
259    }
260
261    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
262        self.first().upsert_subscriber(key, worker_id).await
263    }
264
265    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
266        self.first().acknowledge(key, cursor, lag).await
267    }
268}
269
270/// Read-write split executor (requires `rw` feature).
271///
272/// Separates read and write operations to different executors.
273/// Useful for CQRS patterns where read and write databases differ.
274///
275/// # Example
276///
277/// ```rust,ignore
278/// let rw: Rw<ReadReplica, Primary> = (read_executor, write_executor).into();
279/// ```
280#[cfg(feature = "rw")]
281pub struct Rw<R: Executor, W: Executor> {
282    r: R,
283    w: W,
284}
285
286#[cfg(feature = "rw")]
287impl<R: Executor + Clone, W: Executor + Clone> Clone for Rw<R, W> {
288    fn clone(&self) -> Self {
289        Self {
290            r: self.r.clone(),
291            w: self.w.clone(),
292        }
293    }
294}
295
296#[cfg(feature = "rw")]
297#[async_trait::async_trait]
298impl<R: Executor, W: Executor> Executor for Rw<R, W> {
299    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
300        self.w.write(events).await
301    }
302
303    async fn read(
304        &self,
305        aggregators: Option<Vec<ReadAggregator>>,
306        routing_key: Option<RoutingKey>,
307        args: Args,
308    ) -> anyhow::Result<ReadResult<Event>> {
309        self.r.read(aggregators, routing_key, args).await
310    }
311
312    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
313        self.r.get_subscriber_cursor(key).await
314    }
315
316    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
317        self.r.is_subscriber_running(key, worker_id).await
318    }
319
320    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
321        self.w.upsert_subscriber(key, worker_id).await
322    }
323
324    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
325        self.w.acknowledge(key, cursor, lag).await
326    }
327}
328
329#[cfg(feature = "rw")]
330impl<R: Executor, W: Executor> From<(R, W)> for Rw<R, W> {
331    fn from((r, w): (R, W)) -> Self {
332        Self { r, w }
333    }
334}