this/events/log.rs
1//! EventLog trait — persistent, ordered, replayable event storage
2//!
3//! The EventLog is the source of truth for the event flow system.
4//! Unlike the EventBus (fire-and-forget broadcast), the EventLog persists
5//! events and supports replay from any position.
6
7use crate::core::events::EventEnvelope;
8use crate::events::types::{SeekPosition, SeqNo};
9use anyhow::Result;
10use async_trait::async_trait;
11use std::pin::Pin;
12use tokio_stream::Stream;
13
14/// Trait for persistent event storage backends
15///
16/// Implementations must be Send + Sync to allow sharing across tasks.
17///
18/// # Backends
19///
20/// - `InMemoryEventLog` — Vec-backed, suitable for dev/single-instance
21/// - Future: NATS JetStream, Kafka, Redis Streams
22///
23/// # Consumer Groups
24///
25/// Each consumer has an independent position in the log. The `ack` method
26/// advances the consumer's position, and `seek` allows repositioning.
27/// Consumer groups enable:
28/// - **Replay**: Start from `Beginning` to reprocess all events
29/// - **Resume**: Use `LastAcknowledged` to pick up where you left off
30/// - **Live**: Use `Latest` to only see new events
31#[async_trait]
32pub trait EventLog: Send + Sync {
33 /// Append an event envelope to the log
34 ///
35 /// Returns the sequence number assigned to the event.
36 /// Events are assigned monotonically increasing sequence numbers.
37 /// The envelope's `seq_no` field is set by the implementation.
38 async fn append(&self, envelope: EventEnvelope) -> Result<SeqNo>;
39
40 /// Subscribe to events from a given position
41 ///
42 /// Returns a stream of `EventEnvelope` starting from the specified position.
43 /// The stream is infinite — it will yield stored events first, then wait
44 /// for new events as they are appended.
45 ///
46 /// # Arguments
47 ///
48 /// * `consumer` - Consumer group name (for tracking position)
49 /// * `position` - Where to start reading from
50 async fn subscribe(
51 &self,
52 consumer: &str,
53 position: SeekPosition,
54 ) -> Result<Pin<Box<dyn Stream<Item = EventEnvelope> + Send>>>;
55
56 /// Acknowledge that a consumer has processed up to a sequence number
57 ///
58 /// This advances the consumer's `LastAcknowledged` position.
59 async fn ack(&self, consumer: &str, seq_no: SeqNo) -> Result<()>;
60
61 /// Seek a consumer to a new position
62 ///
63 /// This changes the consumer's position without acknowledging.
64 /// The next `subscribe` with `LastAcknowledged` will use this position.
65 async fn seek(&self, consumer: &str, position: SeekPosition) -> Result<()>;
66
67 /// Get the current last sequence number in the log
68 ///
69 /// Returns `None` if the log is empty.
70 async fn last_seq_no(&self) -> Option<SeqNo>;
71}