Skip to main content

this/events/
log.rs

1//! EventLog trait — persistent, ordered, replayable event storage
2//!
3//! The EventLog is the source of truth for the event flow system.
4//! Unlike the EventBus (fire-and-forget broadcast), the EventLog persists
5//! events and supports replay from any position.
6
7use crate::core::events::EventEnvelope;
8use crate::events::types::{SeekPosition, SeqNo};
9use anyhow::Result;
10use async_trait::async_trait;
11use std::pin::Pin;
12use tokio_stream::Stream;
13
14/// Trait for persistent event storage backends
15///
16/// Implementations must be Send + Sync to allow sharing across tasks.
17///
18/// # Backends
19///
20/// - `InMemoryEventLog` — Vec-backed, suitable for dev/single-instance
21/// - Future: NATS JetStream, Kafka, Redis Streams
22///
23/// # Consumer Groups
24///
25/// Each consumer has an independent position in the log. The `ack` method
26/// advances the consumer's position, and `seek` allows repositioning.
27/// Consumer groups enable:
28/// - **Replay**: Start from `Beginning` to reprocess all events
29/// - **Resume**: Use `LastAcknowledged` to pick up where you left off
30/// - **Live**: Use `Latest` to only see new events
31#[async_trait]
32pub trait EventLog: Send + Sync {
33    /// Append an event envelope to the log
34    ///
35    /// Returns the sequence number assigned to the event.
36    /// Events are assigned monotonically increasing sequence numbers.
37    /// The envelope's `seq_no` field is set by the implementation.
38    async fn append(&self, envelope: EventEnvelope) -> Result<SeqNo>;
39
40    /// Subscribe to events from a given position
41    ///
42    /// Returns a stream of `EventEnvelope` starting from the specified position.
43    /// The stream is infinite — it will yield stored events first, then wait
44    /// for new events as they are appended.
45    ///
46    /// # Arguments
47    ///
48    /// * `consumer` - Consumer group name (for tracking position)
49    /// * `position` - Where to start reading from
50    async fn subscribe(
51        &self,
52        consumer: &str,
53        position: SeekPosition,
54    ) -> Result<Pin<Box<dyn Stream<Item = EventEnvelope> + Send>>>;
55
56    /// Acknowledge that a consumer has processed up to a sequence number
57    ///
58    /// This advances the consumer's `LastAcknowledged` position.
59    async fn ack(&self, consumer: &str, seq_no: SeqNo) -> Result<()>;
60
61    /// Seek a consumer to a new position
62    ///
63    /// This changes the consumer's position without acknowledging.
64    /// The next `subscribe` with `LastAcknowledged` will use this position.
65    async fn seek(&self, consumer: &str, position: SeekPosition) -> Result<()>;
66
67    /// Get the current last sequence number in the log
68    ///
69    /// Returns `None` if the log is empty.
70    async fn last_seq_no(&self) -> Option<SeqNo>;
71}