evento-core 2.0.0-alpha.15

Core types and traits for evento event sourcing library.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
//! Event storage and retrieval abstraction.
//!
//! This module defines the [`Executor`] trait, the core abstraction for event
//! persistence. Implementations handle storing events, querying, and managing
//! subscriptions.
//!
//! # Types
//!
//! - [`Executor`] - Core trait for event storage backends
//! - [`Evento`] - Type-erased wrapper around any executor
//! - [`EventoGroup`] - Multi-executor aggregation (feature: `group`)
//! - [`Rw`] - Read-write split executor (feature: `rw`)
//! - [`ReadAggregator`] - Query filter for reading events

use std::{hash::Hash, sync::Arc};
use ulid::Ulid;

use crate::{
    cursor::{Args, ReadResult, Value},
    Event, RoutingKey, WriteError,
};

/// Filter for querying events by aggregator.
///
/// A filter always names an aggregator type and may additionally narrow the
/// match to a single aggregate ID and/or a single event name. Use the
/// constructor methods to create filters.
///
/// # Example
///
/// ```rust,ignore
/// // All events for an aggregator type
/// let filter = ReadAggregator::aggregator("myapp/User");
///
/// // Events for a specific aggregate instance
/// let filter = ReadAggregator::id("myapp/User", "user-123");
///
/// // Events of a specific type
/// let filter = ReadAggregator::event("myapp/User", "UserCreated");
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadAggregator {
    /// Aggregator type (e.g., "myapp/User")
    pub aggregator_type: String,
    /// Optional specific aggregate ID; `None` leaves the ID unconstrained
    pub aggregator_id: Option<String>,
    /// Optional event name filter; `None` leaves the event name unconstrained
    pub name: Option<String>,
}

impl ReadAggregator {
    /// Builds the most specific filter: aggregator type, aggregate ID and
    /// event name are all set.
    pub fn new(
        aggregator_type: impl Into<String>,
        id: impl Into<String>,
        name: impl Into<String>,
    ) -> Self {
        // Start from the (type, id) filter and then pin the event name.
        let mut filter = Self::id(aggregator_type, id);
        filter.name = Some(name.into());
        filter
    }

    /// Builds the broadest filter: only the aggregator type is set.
    ///
    /// Both the aggregate ID and the event name are left as `None`.
    pub fn aggregator(value: impl Into<String>) -> Self {
        let aggregator_type = value.into();
        Self {
            aggregator_type,
            aggregator_id: None,
            name: None,
        }
    }

    /// Builds a filter for one aggregate instance.
    ///
    /// The aggregator type and aggregate ID are set; the event name is `None`.
    pub fn id(aggregator_type: impl Into<String>, id: impl Into<String>) -> Self {
        let mut filter = Self::aggregator(aggregator_type);
        filter.aggregator_id = Some(id.into());
        filter
    }

    /// Builds a filter for one event name within an aggregator type.
    ///
    /// The aggregator type and event name are set; the aggregate ID is `None`.
    pub fn event(aggregator_type: impl Into<String>, name: impl Into<String>) -> Self {
        let mut filter = Self::aggregator(aggregator_type);
        filter.name = Some(name.into());
        filter
    }
}

impl Hash for ReadAggregator {
    /// Feeds the three filter fields into `state`, in declaration order.
    fn hash<H>(&self, state: &mut H)
    where
        H: std::hash::Hasher,
    {
        // Exhaustive destructuring: adding a field to the struct forces this
        // impl to be revisited instead of silently ignoring the new field.
        let Self {
            aggregator_type,
            aggregator_id,
            name,
        } = self;

        aggregator_type.hash(state);
        aggregator_id.hash(state);
        name.hash(state);
    }
}

/// Core trait for event storage backends.
///
/// Implementations handle persisting events, querying, and managing subscriptions.
/// The main implementation is [`evento_sql::Sql`](../evento_sql/struct.Sql.html).
///
/// Implementors must be `Send + Sync + 'static`. All methods are asynchronous
/// and are declared through `async_trait`.
///
/// # Methods
///
/// - `write` - Persist events atomically
/// - `read` - Query events with filtering and pagination
/// - `get_subscriber_cursor` - Get subscription position
/// - `is_subscriber_running` - Check if subscription is active
/// - `upsert_subscriber` - Create/update subscription
/// - `acknowledge` - Update subscription cursor
/// - `get_snapshot` - Load a stored aggregate snapshot
/// - `save_snapshot` - Store an aggregate snapshot
#[async_trait::async_trait]
pub trait Executor: Send + Sync + 'static {
    /// Persists events atomically.
    ///
    /// Returns `WriteError::InvalidOriginalVersion` if version conflicts occur.
    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError>;

    /// Gets the current cursor position for a subscription.
    ///
    /// `key` identifies the subscription.
    // NOTE(review): `Ok(None)` presumably means no cursor has been stored for
    // `key` yet — confirm against the backend implementations.
    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>>;

    /// Checks if a subscription is running with the given worker ID.
    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool>;

    /// Creates or updates a subscription record.
    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()>;

    /// Updates subscription cursor after processing events.
    // NOTE(review): the exact meaning and unit of `lag` are not visible in this
    // module — confirm with the callers before relying on it.
    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()>;

    /// Queries events with filtering and pagination.
    ///
    /// `aggregators` is an optional list of [`ReadAggregator`] filters,
    /// `routing_key` an optional routing-key filter, and `args` carries the
    /// cursor/pagination arguments.
    // NOTE(review): how `None` filters and multiple aggregators are combined is
    // decided by each backend — confirm there.
    async fn read(
        &self,
        aggregators: Option<Vec<ReadAggregator>>,
        routing_key: Option<RoutingKey>,
        args: Args,
    ) -> anyhow::Result<ReadResult<Event>>;

    /// Retrieves a stored snapshot for an aggregate.
    ///
    /// Returns the serialized snapshot data and cursor position, or `None`
    /// if no snapshot exists for the given aggregate.
    async fn get_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
    ) -> anyhow::Result<Option<(Vec<u8>, Value)>>;

    /// Stores a snapshot for an aggregate.
    ///
    /// Snapshots cache aggregate state to avoid replaying all events.
    /// The `cursor` indicates the event position up to which the snapshot is valid.
    async fn save_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
        data: Vec<u8>,
        cursor: Value,
    ) -> anyhow::Result<()>;
}

/// Type-erased wrapper around any [`Executor`] implementation.
///
/// `Evento` stores the executor as a shared `Arc<dyn Executor>` trait object
/// for dynamic dispatch. This allows storing different executor
/// implementations in the same collection.
///
/// Cloning an `Evento` only clones the `Arc`, so every clone forwards to the
/// same underlying executor.
///
/// # Example
///
/// ```rust,ignore
/// let sql_executor: Sql<sqlx::Sqlite> = pool.into();
/// let evento = Evento::new(sql_executor);
///
/// // Use like any executor
/// evento.write(events).await?;
/// ```
// The trait object lives directly inside the `Arc`; wrapping it in an extra
// `Box` would cost a second allocation and a second pointer hop per call.
#[derive(Clone)]
pub struct Evento(Arc<dyn Executor>);

/// Every method forwards unchanged to the wrapped executor.
#[async_trait::async_trait]
impl Executor for Evento {
    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
        self.0.write(events).await
    }

    async fn read(
        &self,
        aggregators: Option<Vec<ReadAggregator>>,
        routing_key: Option<RoutingKey>,
        args: Args,
    ) -> anyhow::Result<ReadResult<Event>> {
        self.0.read(aggregators, routing_key, args).await
    }

    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
        self.0.get_subscriber_cursor(key).await
    }

    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
        self.0.is_subscriber_running(key, worker_id).await
    }

    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
        self.0.upsert_subscriber(key, worker_id).await
    }

    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
        self.0.acknowledge(key, cursor, lag).await
    }

    async fn get_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
    ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
        self.0
            .get_snapshot(aggregator_type, aggregator_revision, id)
            .await
    }

    async fn save_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
        data: Vec<u8>,
        cursor: Value,
    ) -> anyhow::Result<()> {
        self.0
            .save_snapshot(aggregator_type, aggregator_revision, id, data, cursor)
            .await
    }
}

impl Evento {
    /// Creates a new type-erased executor wrapper.
    ///
    /// Takes ownership of `executor` and moves it into a single shared
    /// allocation.
    pub fn new<E: Executor>(executor: E) -> Self {
        // `Arc<E>` is unsize-coerced to `Arc<dyn Executor>` at the constructor call.
        Self(Arc::new(executor))
    }
}

/// Multi-executor aggregation (requires `group` feature).
///
/// `EventoGroup` combines multiple executors into one. Reads query all executors
/// and merge results; writes go only to the first executor. Subscriber and
/// snapshot operations are also forwarded to the first executor only.
///
/// Useful for aggregating events from multiple sources.
///
/// A group created through `Default` is empty; add executors with
/// [`EventoGroup::executor`]. Operations that are forwarded to the first
/// executor panic while the group is still empty.
#[cfg(feature = "group")]
#[derive(Clone, Default)]
pub struct EventoGroup {
    /// Executors in insertion order; index 0 is the one that receives writes.
    executors: Vec<Evento>,
}

#[cfg(feature = "group")]
impl EventoGroup {
    /// Appends an executor to the end of the group.
    ///
    /// Consumes and returns `self` so calls can be chained builder-style.
    pub fn executor(mut self, executor: impl Into<Evento>) -> Self {
        let evento: Evento = executor.into();
        self.executors.push(evento);
        self
    }

    /// Borrows the executor that was added first.
    ///
    /// # Panics
    ///
    /// Panics if no executor has been added to the group.
    pub fn first(&self) -> &Evento {
        match self.executors.as_slice() {
            [head, ..] => head,
            [] => panic!("EventoGroup must have at least one executor"),
        }
    }
}

#[cfg(feature = "group")]
#[async_trait::async_trait]
impl Executor for EventoGroup {
    /// Writes go to the first executor only.
    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
        let primary = self.first();
        primary.write(events).await
    }

    /// Reads from every executor with the same filters and arguments, then
    /// runs the combined events through a cursor reader with `args` again.
    async fn read(
        &self,
        aggregators: Option<Vec<ReadAggregator>>,
        routing_key: Option<RoutingKey>,
        args: Args,
    ) -> anyhow::Result<ReadResult<Event>> {
        use crate::cursor;

        // Each executor gets its own copy of the filters and arguments.
        let pending = self
            .executors
            .iter()
            .map(|executor| executor.read(aggregators.clone(), routing_key.clone(), args.clone()));

        // All reads are awaited together; the first failed result (in executor
        // order) aborts the merge.
        let mut merged = Vec::new();
        for outcome in futures_util::future::join_all(pending).await {
            let page = outcome?;
            merged.extend(page.edges.into_iter().map(|edge| edge.node));
        }

        let result = cursor::Reader::new(merged).args(args).execute()?;
        Ok(result)
    }

    /// Subscription cursors are looked up on the first executor.
    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
        let primary = self.first();
        primary.get_subscriber_cursor(key).await
    }

    /// Subscription liveness is checked on the first executor.
    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
        let primary = self.first();
        primary.is_subscriber_running(key, worker_id).await
    }

    /// Subscription records are created/updated on the first executor.
    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
        let primary = self.first();
        primary.upsert_subscriber(key, worker_id).await
    }

    /// Acknowledgements are recorded on the first executor.
    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
        let primary = self.first();
        primary.acknowledge(key, cursor, lag).await
    }

    /// Snapshots are loaded from the first executor.
    async fn get_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
    ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
        let primary = self.first();
        primary
            .get_snapshot(aggregator_type, aggregator_revision, id)
            .await
    }

    /// Snapshots are stored on the first executor.
    async fn save_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
        data: Vec<u8>,
        cursor: Value,
    ) -> anyhow::Result<()> {
        let primary = self.first();
        primary
            .save_snapshot(aggregator_type, aggregator_revision, id, data, cursor)
            .await
    }
}

/// Read-write split executor (requires `rw` feature).
///
/// Separates read and write operations to different executors.
/// Useful for CQRS patterns where read and write databases differ.
///
/// In the [`Executor`] implementation, `read`, `get_subscriber_cursor`,
/// `is_subscriber_running` and `get_snapshot` are served by the read executor,
/// while `write`, `upsert_subscriber`, `acknowledge` and `save_snapshot` are
/// sent to the write executor.
///
/// Build one from a `(read, write)` tuple via `From`/`Into`.
///
/// # Example
///
/// ```rust,ignore
/// let rw: Rw<ReadReplica, Primary> = (read_executor, write_executor).into();
/// ```
#[cfg(feature = "rw")]
pub struct Rw<R: Executor, W: Executor> {
    /// Executor that serves the read-side operations.
    r: R,
    /// Executor that receives the write-side operations.
    w: W,
}

#[cfg(feature = "rw")]
impl<R: Executor + Clone, W: Executor + Clone> Clone for Rw<R, W> {
    /// Clones the read executor and then the write executor into a new `Rw`.
    fn clone(&self) -> Self {
        let Self { r, w } = self;
        let r = r.clone();
        let w = w.clone();
        Self { r, w }
    }
}

#[cfg(feature = "rw")]
#[async_trait::async_trait]
impl<R: Executor, W: Executor> Executor for Rw<R, W> {
    /// Sent to the write executor.
    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
        let writer = &self.w;
        writer.write(events).await
    }

    /// Served by the read executor.
    async fn read(
        &self,
        aggregators: Option<Vec<ReadAggregator>>,
        routing_key: Option<RoutingKey>,
        args: Args,
    ) -> anyhow::Result<ReadResult<Event>> {
        let reader = &self.r;
        reader.read(aggregators, routing_key, args).await
    }

    /// Served by the read executor.
    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
        let reader = &self.r;
        reader.get_subscriber_cursor(key).await
    }

    /// Served by the read executor.
    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
        let reader = &self.r;
        reader.is_subscriber_running(key, worker_id).await
    }

    /// Sent to the write executor.
    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
        let writer = &self.w;
        writer.upsert_subscriber(key, worker_id).await
    }

    /// Sent to the write executor.
    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
        let writer = &self.w;
        writer.acknowledge(key, cursor, lag).await
    }

    /// Served by the read executor.
    async fn get_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
    ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
        let reader = &self.r;
        reader
            .get_snapshot(aggregator_type, aggregator_revision, id)
            .await
    }

    /// Sent to the write executor.
    async fn save_snapshot(
        &self,
        aggregator_type: String,
        aggregator_revision: String,
        id: String,
        data: Vec<u8>,
        cursor: Value,
    ) -> anyhow::Result<()> {
        let writer = &self.w;
        writer
            .save_snapshot(aggregator_type, aggregator_revision, id, data, cursor)
            .await
    }
}

#[cfg(feature = "rw")]
impl<R: Executor, W: Executor> From<(R, W)> for Rw<R, W> {
    /// Builds an `Rw` from a `(read, write)` pair: the first element becomes
    /// the read executor, the second the write executor.
    fn from(pair: (R, W)) -> Self {
        let (r, w) = pair;
        Self { r, w }
    }
}