Skip to main content

oxirs_stream/event_sourcing/
simple.rs

1//! Simplified in-memory event sourcing utilities (v1.1.0).
2//!
3//! Provides `SimpleEventStore`, `SimpleSnapshotStore`, `SimpleEventBus`,
4//! `ProjectionRunner`, and `EventStreamIter` for lightweight event sourcing
5//! without the async/persistence overhead of the full `EventStore`.
6
7use std::collections::HashMap;
8
9/// A simple domain event for in-memory append-only log.
10#[derive(Debug, Clone, PartialEq, Eq)]
11pub struct SimpleEvent {
12    /// Unique event identifier (auto-incremented).
13    pub id: u64,
14    /// ID of the aggregate this event belongs to.
15    pub aggregate_id: String,
16    /// Type label for the event (e.g. `"OrderPlaced"`).
17    pub event_type: String,
18    /// JSON or text payload.
19    pub payload: String,
20    /// Monotonically increasing version within the aggregate.
21    pub version: u64,
22    /// Unix timestamp in seconds.
23    pub timestamp: u64,
24}
25
26/// Append-only, in-memory simple event store.
27pub struct SimpleEventStore {
28    events: Vec<SimpleEvent>,
29    next_id: u64,
30    versions: HashMap<String, u64>,
31}
32
33impl SimpleEventStore {
34    /// Create an empty event store.
35    pub fn new() -> Self {
36        Self {
37            events: Vec::new(),
38            next_id: 1,
39            versions: HashMap::new(),
40        }
41    }
42
43    /// Append a new event for `aggregate_id`.
44    pub fn append(
45        &mut self,
46        aggregate_id: impl Into<String>,
47        event_type: impl Into<String>,
48        payload: impl Into<String>,
49    ) -> SimpleEvent {
50        use std::time::{SystemTime, UNIX_EPOCH};
51        let aggregate_id = aggregate_id.into();
52        let version = self.versions.entry(aggregate_id.clone()).or_insert(0);
53        *version += 1;
54        let event = SimpleEvent {
55            id: self.next_id,
56            aggregate_id: aggregate_id.clone(),
57            event_type: event_type.into(),
58            payload: payload.into(),
59            version: *version,
60            timestamp: SystemTime::now()
61                .duration_since(UNIX_EPOCH)
62                .map(|d| d.as_secs())
63                .unwrap_or(0),
64        };
65        self.next_id += 1;
66        self.events.push(event.clone());
67        event
68    }
69
70    /// Load all events for the given aggregate, ordered by version.
71    pub fn load_aggregate(&self, aggregate_id: &str) -> Vec<SimpleEvent> {
72        self.events
73            .iter()
74            .filter(|e| e.aggregate_id == aggregate_id)
75            .cloned()
76            .collect()
77    }
78
79    /// Load events for `aggregate_id` starting at `from_version` (inclusive).
80    pub fn load_from_version(&self, aggregate_id: &str, from_version: u64) -> Vec<SimpleEvent> {
81        self.events
82            .iter()
83            .filter(|e| e.aggregate_id == aggregate_id && e.version >= from_version)
84            .cloned()
85            .collect()
86    }
87
88    /// Load every event across all aggregates.
89    pub fn load_all_events(&self) -> Vec<SimpleEvent> {
90        self.events.clone()
91    }
92
93    /// Total number of events stored.
94    pub fn len(&self) -> usize {
95        self.events.len()
96    }
97
98    /// True when the store is empty.
99    pub fn is_empty(&self) -> bool {
100        self.events.is_empty()
101    }
102
103    /// Current version for `aggregate_id`.
104    pub fn current_version(&self, aggregate_id: &str) -> u64 {
105        self.versions.get(aggregate_id).copied().unwrap_or(0)
106    }
107}
108
109impl Default for SimpleEventStore {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115/// An iterator over simple events with optional filtering.
116pub struct EventStreamIter {
117    events: Vec<SimpleEvent>,
118    position: usize,
119    filter_aggregate: Option<String>,
120    filter_type: Option<String>,
121}
122
123impl EventStreamIter {
124    /// Create a stream over a list of events.
125    pub fn new(events: Vec<SimpleEvent>) -> Self {
126        Self {
127            events,
128            position: 0,
129            filter_aggregate: None,
130            filter_type: None,
131        }
132    }
133
134    /// Filter events to a single aggregate.
135    pub fn for_aggregate(mut self, aggregate_id: impl Into<String>) -> Self {
136        self.filter_aggregate = Some(aggregate_id.into());
137        self
138    }
139
140    /// Filter events to a single event type.
141    pub fn for_type(mut self, event_type: impl Into<String>) -> Self {
142        self.filter_type = Some(event_type.into());
143        self
144    }
145
146    fn matches(&self, event: &SimpleEvent) -> bool {
147        if let Some(ref agg) = self.filter_aggregate {
148            if &event.aggregate_id != agg {
149                return false;
150            }
151        }
152        if let Some(ref et) = self.filter_type {
153            if &event.event_type != et {
154                return false;
155            }
156        }
157        true
158    }
159}
160
161impl Iterator for EventStreamIter {
162    type Item = SimpleEvent;
163
164    fn next(&mut self) -> Option<Self::Item> {
165        while self.position < self.events.len() {
166            let ev = &self.events[self.position];
167            self.position += 1;
168            if self.matches(ev) {
169                return Some(ev.clone());
170            }
171        }
172        None
173    }
174}
175
176/// A serialized snapshot of aggregate state at a given version.
177#[derive(Debug, Clone, PartialEq, Eq)]
178pub struct SimpleSnapshot {
179    /// Aggregate this snapshot belongs to.
180    pub aggregate_id: String,
181    /// Serialized state.
182    pub state: String,
183    /// Aggregate version at the time of the snapshot.
184    pub version: u64,
185}
186
187/// In-memory store for aggregate snapshots.
188pub struct SimpleSnapshotStore {
189    snapshots: HashMap<String, SimpleSnapshot>,
190}
191
192impl SimpleSnapshotStore {
193    /// Create an empty snapshot store.
194    pub fn new() -> Self {
195        Self {
196            snapshots: HashMap::new(),
197        }
198    }
199
200    /// Save or overwrite the snapshot for an aggregate.
201    pub fn save(&mut self, snapshot: SimpleSnapshot) {
202        self.snapshots
203            .insert(snapshot.aggregate_id.clone(), snapshot);
204    }
205
206    /// Retrieve the latest snapshot for `aggregate_id`, if any.
207    pub fn load_snapshot(&self, aggregate_id: &str) -> Option<&SimpleSnapshot> {
208        self.snapshots.get(aggregate_id)
209    }
210
211    /// Remove the snapshot for `aggregate_id`.
212    pub fn delete(&mut self, aggregate_id: &str) -> bool {
213        self.snapshots.remove(aggregate_id).is_some()
214    }
215
216    /// Number of stored snapshots.
217    pub fn len(&self) -> usize {
218        self.snapshots.len()
219    }
220
221    /// True when no snapshots are stored.
222    pub fn is_empty(&self) -> bool {
223        self.snapshots.is_empty()
224    }
225}
226
227impl Default for SimpleSnapshotStore {
228    fn default() -> Self {
229        Self::new()
230    }
231}
232
233/// Handler function type for the simple event bus.
234pub type SimpleEventHandler = std::sync::Arc<dyn Fn(&SimpleEvent) + Send + Sync>;
235
236/// A simple pub/sub event bus.
237pub struct SimpleEventBus {
238    subscriptions: HashMap<String, Vec<SimpleEventHandler>>,
239    wildcard: Vec<SimpleEventHandler>,
240}
241
242impl SimpleEventBus {
243    /// Create an empty event bus.
244    pub fn new() -> Self {
245        Self {
246            subscriptions: HashMap::new(),
247            wildcard: Vec::new(),
248        }
249    }
250
251    /// Subscribe a handler to a specific `event_type`. Use `"*"` for all events.
252    pub fn subscribe(&mut self, event_type: impl Into<String>, handler: SimpleEventHandler) {
253        let key = event_type.into();
254        if key == "*" {
255            self.wildcard.push(handler);
256        } else {
257            self.subscriptions.entry(key).or_default().push(handler);
258        }
259    }
260
261    /// Publish an event, invoking all matching handlers synchronously.
262    pub fn publish(&self, event: &SimpleEvent) {
263        if let Some(handlers) = self.subscriptions.get(&event.event_type) {
264            for handler in handlers {
265                handler(event);
266            }
267        }
268        for handler in &self.wildcard {
269            handler(event);
270        }
271    }
272
273    /// Total number of type-specific subscriptions.
274    pub fn subscription_count(&self) -> usize {
275        self.subscriptions.values().map(|v| v.len()).sum()
276    }
277
278    /// Number of wildcard subscriptions.
279    pub fn wildcard_count(&self) -> usize {
280        self.wildcard.len()
281    }
282}
283
284impl Default for SimpleEventBus {
285    fn default() -> Self {
286        Self::new()
287    }
288}
289
290/// Applies events to build a read model (projection).
291pub struct ProjectionRunner {
292    /// Human-readable name for the projection.
293    pub name: String,
294    /// Number of events processed so far.
295    processed: u64,
296}
297
298impl ProjectionRunner {
299    /// Create a named projection runner.
300    pub fn new(name: impl Into<String>) -> Self {
301        Self {
302            name: name.into(),
303            processed: 0,
304        }
305    }
306
307    /// Apply a handler function over every event in `store`, returning the final state.
308    pub fn run<S, F>(&mut self, store: &SimpleEventStore, initial: S, mut handler: F) -> S
309    where
310        F: FnMut(S, &SimpleEvent) -> S,
311    {
312        let mut state = initial;
313        for event in store.load_all_events() {
314            state = handler(state, &event);
315            self.processed += 1;
316        }
317        state
318    }
319
320    /// Apply a handler over events for a single aggregate.
321    pub fn run_for_aggregate<S, F>(
322        &mut self,
323        store: &SimpleEventStore,
324        aggregate_id: &str,
325        initial: S,
326        mut handler: F,
327    ) -> S
328    where
329        F: FnMut(S, &SimpleEvent) -> S,
330    {
331        let mut state = initial;
332        for event in store.load_aggregate(aggregate_id) {
333            state = handler(state, &event);
334            self.processed += 1;
335        }
336        state
337    }
338
339    /// Events processed so far.
340    pub fn processed_count(&self) -> u64 {
341        self.processed
342    }
343}