rust_rabbit/patterns/
event_sourcing.rs

1use anyhow::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use tracing::{debug, info};
7use uuid::Uuid;
8
9use crate::error::RustRabbitError;
10
11/// Unique identifier for aggregate roots
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub struct AggregateId(String);
14
15impl AggregateId {
16    pub fn new() -> Self {
17        Self(Uuid::new_v4().to_string())
18    }
19
20    pub fn from_string(id: String) -> Self {
21        Self(id)
22    }
23
24    pub fn as_str(&self) -> &str {
25        &self.0
26    }
27}
28
29impl Default for AggregateId {
30    fn default() -> Self {
31        Self::new()
32    }
33}
34
35impl std::fmt::Display for AggregateId {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        write!(f, "{}", self.0)
38    }
39}
40
41/// Event sequence number for ordering
42#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
43pub struct EventSequence(u64);
44
45impl EventSequence {
46    pub fn new(sequence: u64) -> Self {
47        Self(sequence)
48    }
49
50    pub fn next(&self) -> Self {
51        Self(self.0 + 1)
52    }
53
54    pub fn value(&self) -> u64 {
55        self.0
56    }
57}
58
59impl From<u64> for EventSequence {
60    fn from(value: u64) -> Self {
61        Self(value)
62    }
63}
64
65/// Domain event with metadata
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct DomainEvent {
68    pub event_id: String,
69    pub aggregate_id: AggregateId,
70    pub aggregate_type: String,
71    pub event_type: String,
72    pub event_data: Vec<u8>,
73    pub metadata: HashMap<String, String>,
74    pub sequence: EventSequence,
75    pub timestamp: DateTime<Utc>,
76    pub version: u32,
77}
78
79impl DomainEvent {
80    pub fn new(
81        aggregate_id: AggregateId,
82        aggregate_type: String,
83        event_type: String,
84        event_data: Vec<u8>,
85        sequence: EventSequence,
86    ) -> Self {
87        Self {
88            event_id: Uuid::new_v4().to_string(),
89            aggregate_id,
90            aggregate_type,
91            event_type,
92            event_data,
93            metadata: HashMap::new(),
94            sequence,
95            timestamp: Utc::now(),
96            version: 1,
97        }
98    }
99
100    pub fn with_metadata(mut self, key: String, value: String) -> Self {
101        self.metadata.insert(key, value);
102        self
103    }
104
105    pub fn with_version(mut self, version: u32) -> Self {
106        self.version = version;
107        self
108    }
109}
110
111/// Snapshot of aggregate state at a specific point in time
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct AggregateSnapshot {
114    pub aggregate_id: AggregateId,
115    pub aggregate_type: String,
116    pub sequence: EventSequence,
117    pub data: Vec<u8>,
118    pub timestamp: DateTime<Utc>,
119    pub version: u32,
120}
121
122impl AggregateSnapshot {
123    pub fn new(
124        aggregate_id: AggregateId,
125        aggregate_type: String,
126        sequence: EventSequence,
127        data: Vec<u8>,
128    ) -> Self {
129        Self {
130            aggregate_id,
131            aggregate_type,
132            sequence,
133            data,
134            timestamp: Utc::now(),
135            version: 1,
136        }
137    }
138}
139
140/// Event stream query parameters
141#[derive(Debug, Clone)]
142pub struct EventStreamQuery {
143    pub aggregate_id: AggregateId,
144    pub from_sequence: Option<EventSequence>,
145    pub to_sequence: Option<EventSequence>,
146    pub event_types: Option<Vec<String>>,
147    pub limit: Option<usize>,
148}
149
150impl EventStreamQuery {
151    pub fn for_aggregate(aggregate_id: AggregateId) -> Self {
152        Self {
153            aggregate_id,
154            from_sequence: None,
155            to_sequence: None,
156            event_types: None,
157            limit: None,
158        }
159    }
160
161    pub fn from_sequence(mut self, sequence: EventSequence) -> Self {
162        self.from_sequence = Some(sequence);
163        self
164    }
165
166    pub fn to_sequence(mut self, sequence: EventSequence) -> Self {
167        self.to_sequence = Some(sequence);
168        self
169    }
170
171    pub fn with_event_types(mut self, event_types: Vec<String>) -> Self {
172        self.event_types = Some(event_types);
173        self
174    }
175
176    pub fn with_limit(mut self, limit: usize) -> Self {
177        self.limit = Some(limit);
178        self
179    }
180}
181
182/// Trait for event store implementations
183#[async_trait::async_trait]
184pub trait EventStore {
185    /// Append events to the store
186    async fn append_events(&self, events: Vec<DomainEvent>) -> Result<()>;
187
188    /// Read events from the store
189    async fn read_events(&self, query: EventStreamQuery) -> Result<Vec<DomainEvent>>;
190
191    /// Get the latest sequence number for an aggregate
192    async fn get_latest_sequence(
193        &self,
194        aggregate_id: &AggregateId,
195    ) -> Result<Option<EventSequence>>;
196
197    /// Save a snapshot
198    async fn save_snapshot(&self, snapshot: AggregateSnapshot) -> Result<()>;
199
200    /// Load the latest snapshot for an aggregate
201    async fn load_snapshot(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateSnapshot>>;
202
203    /// Check if aggregate exists
204    async fn aggregate_exists(&self, aggregate_id: &AggregateId) -> Result<bool>;
205}
206
207/// In-memory event store implementation (for testing/development)
208#[derive(Debug)]
209pub struct InMemoryEventStore {
210    events: Arc<Mutex<HashMap<AggregateId, Vec<DomainEvent>>>>,
211    snapshots: Arc<Mutex<HashMap<AggregateId, AggregateSnapshot>>>,
212}
213
214impl InMemoryEventStore {
215    pub fn new() -> Self {
216        Self {
217            events: Arc::new(Mutex::new(HashMap::new())),
218            snapshots: Arc::new(Mutex::new(HashMap::new())),
219        }
220    }
221
222    pub fn event_count(&self) -> usize {
223        self.events
224            .lock()
225            .unwrap()
226            .values()
227            .map(|events| events.len())
228            .sum()
229    }
230
231    pub fn snapshot_count(&self) -> usize {
232        self.snapshots.lock().unwrap().len()
233    }
234}
235
236impl Default for InMemoryEventStore {
237    fn default() -> Self {
238        Self::new()
239    }
240}
241
242#[async_trait::async_trait]
243impl EventStore for InMemoryEventStore {
244    async fn append_events(&self, events: Vec<DomainEvent>) -> Result<()> {
245        let mut store = self.events.lock().unwrap();
246
247        for event in events {
248            let aggregate_id = event.aggregate_id.clone();
249
250            debug!(
251                aggregate_id = %aggregate_id,
252                event_type = %event.event_type,
253                sequence = event.sequence.value(),
254                "Appending event to store"
255            );
256
257            let aggregate_events = store.entry(aggregate_id).or_default();
258
259            // Ensure sequence ordering
260            if let Some(last_event) = aggregate_events.last() {
261                if event.sequence.value() <= last_event.sequence.value() {
262                    return Err(RustRabbitError::EventSequenceError.into());
263                }
264            }
265
266            aggregate_events.push(event);
267        }
268
269        Ok(())
270    }
271
272    async fn read_events(&self, query: EventStreamQuery) -> Result<Vec<DomainEvent>> {
273        let store = self.events.lock().unwrap();
274
275        let aggregate_events = store
276            .get(&query.aggregate_id)
277            .map(|events| events.as_slice())
278            .unwrap_or(&[]);
279
280        let mut filtered_events: Vec<DomainEvent> = aggregate_events
281            .iter()
282            .filter(|event| {
283                // Filter by sequence range
284                if let Some(from_seq) = query.from_sequence {
285                    if event.sequence < from_seq {
286                        return false;
287                    }
288                }
289                if let Some(to_seq) = query.to_sequence {
290                    if event.sequence > to_seq {
291                        return false;
292                    }
293                }
294
295                // Filter by event types
296                if let Some(ref event_types) = query.event_types {
297                    if !event_types.contains(&event.event_type) {
298                        return false;
299                    }
300                }
301
302                true
303            })
304            .cloned()
305            .collect();
306
307        // Apply limit
308        if let Some(limit) = query.limit {
309            filtered_events.truncate(limit);
310        }
311
312        debug!(
313            aggregate_id = %query.aggregate_id,
314            event_count = filtered_events.len(),
315            "Read events from store"
316        );
317
318        Ok(filtered_events)
319    }
320
321    async fn get_latest_sequence(
322        &self,
323        aggregate_id: &AggregateId,
324    ) -> Result<Option<EventSequence>> {
325        let store = self.events.lock().unwrap();
326
327        let latest_sequence = store
328            .get(aggregate_id)
329            .and_then(|events| events.last())
330            .map(|event| event.sequence);
331
332        Ok(latest_sequence)
333    }
334
335    async fn save_snapshot(&self, snapshot: AggregateSnapshot) -> Result<()> {
336        let mut store = self.snapshots.lock().unwrap();
337
338        debug!(
339            aggregate_id = %snapshot.aggregate_id,
340            sequence = snapshot.sequence.value(),
341            "Saving snapshot"
342        );
343
344        store.insert(snapshot.aggregate_id.clone(), snapshot);
345        Ok(())
346    }
347
348    async fn load_snapshot(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateSnapshot>> {
349        let store = self.snapshots.lock().unwrap();
350        Ok(store.get(aggregate_id).cloned())
351    }
352
353    async fn aggregate_exists(&self, aggregate_id: &AggregateId) -> Result<bool> {
354        let store = self.events.lock().unwrap();
355        Ok(store.contains_key(aggregate_id))
356    }
357}
358
359/// Event sourcing repository for managing aggregates
360pub struct EventSourcingRepository<T> {
361    event_store: Arc<dyn EventStore + Send + Sync>,
362    snapshot_frequency: u64,
363    _phantom: std::marker::PhantomData<T>,
364}
365
366impl<T> EventSourcingRepository<T>
367where
368    T: AggregateRoot + Send + Sync,
369{
370    pub fn new(event_store: Arc<dyn EventStore + Send + Sync>) -> Self {
371        Self {
372            event_store,
373            snapshot_frequency: 100, // Take snapshot every 100 events
374            _phantom: std::marker::PhantomData,
375        }
376    }
377
378    pub fn with_snapshot_frequency(mut self, frequency: u64) -> Self {
379        self.snapshot_frequency = frequency;
380        self
381    }
382
383    /// Load aggregate from event store
384    pub async fn load(&self, aggregate_id: &AggregateId) -> Result<Option<T>> {
385        info!(aggregate_id = %aggregate_id, "Loading aggregate");
386
387        // Try to load from snapshot first
388        let (aggregate, from_sequence) =
389            if let Some(snapshot) = self.event_store.load_snapshot(aggregate_id).await? {
390                debug!(
391                    aggregate_id = %aggregate_id,
392                    sequence = snapshot.sequence.value(),
393                    "Loaded from snapshot"
394                );
395
396                let aggregate = T::from_snapshot(snapshot)?;
397                let from_sequence = aggregate.sequence().next();
398                (Some(aggregate), Some(from_sequence))
399            } else {
400                (None, None)
401            };
402
403        // Load events since snapshot (or all events if no snapshot)
404        let query = EventStreamQuery::for_aggregate(aggregate_id.clone());
405        let query = if let Some(from_seq) = from_sequence {
406            query.from_sequence(from_seq)
407        } else {
408            query
409        };
410
411        let events = self.event_store.read_events(query).await?;
412
413        if events.is_empty() && aggregate.is_none() {
414            return Ok(None);
415        }
416
417        // Apply events to aggregate
418        let mut final_aggregate = aggregate.unwrap_or_else(|| T::new(aggregate_id.clone()));
419
420        for event in events {
421            final_aggregate.apply_event(&event)?;
422        }
423
424        debug!(
425            aggregate_id = %aggregate_id,
426            sequence = final_aggregate.sequence().value(),
427            "Aggregate loaded successfully"
428        );
429
430        Ok(Some(final_aggregate))
431    }
432
433    /// Save aggregate to event store
434    pub async fn save(&self, aggregate: &mut T) -> Result<()> {
435        let uncommitted_events = aggregate.uncommitted_events();
436
437        if uncommitted_events.is_empty() {
438            debug!(aggregate_id = %aggregate.id(), "No uncommitted events to save");
439            return Ok(());
440        }
441
442        info!(
443            aggregate_id = %aggregate.id(),
444            event_count = uncommitted_events.len(),
445            "Saving aggregate"
446        );
447
448        // Append events to store
449        self.event_store
450            .append_events(uncommitted_events.clone())
451            .await?;
452
453        // Take snapshot if needed
454        if aggregate
455            .sequence()
456            .value()
457            .is_multiple_of(self.snapshot_frequency)
458        {
459            let snapshot = aggregate.create_snapshot()?;
460            self.event_store.save_snapshot(snapshot).await?;
461
462            debug!(
463                aggregate_id = %aggregate.id(),
464                sequence = aggregate.sequence().value(),
465                "Snapshot created"
466            );
467        }
468
469        // Mark events as committed
470        aggregate.mark_events_committed();
471
472        debug!(
473            aggregate_id = %aggregate.id(),
474            sequence = aggregate.sequence().value(),
475            "Aggregate saved successfully"
476        );
477
478        Ok(())
479    }
480
481    /// Check if aggregate exists
482    pub async fn exists(&self, aggregate_id: &AggregateId) -> Result<bool> {
483        self.event_store.aggregate_exists(aggregate_id).await
484    }
485}
486
487/// Trait for aggregate roots in event sourcing
488pub trait AggregateRoot {
489    /// Create new aggregate with given ID
490    fn new(id: AggregateId) -> Self;
491
492    /// Get aggregate ID
493    fn id(&self) -> &AggregateId;
494
495    /// Get current sequence number
496    fn sequence(&self) -> EventSequence;
497
498    /// Apply an event to the aggregate
499    fn apply_event(&mut self, event: &DomainEvent) -> Result<()>;
500
501    /// Get uncommitted events
502    fn uncommitted_events(&self) -> Vec<DomainEvent>;
503
504    /// Mark events as committed
505    fn mark_events_committed(&mut self);
506
507    /// Create snapshot of current state
508    fn create_snapshot(&self) -> Result<AggregateSnapshot>;
509
510    /// Restore from snapshot
511    fn from_snapshot(snapshot: AggregateSnapshot) -> Result<Self>
512    where
513        Self: Sized;
514}
515
516/// Event replay service for rebuilding projections
517pub struct EventReplayService {
518    event_store: Arc<dyn EventStore + Send + Sync>,
519}
520
521impl EventReplayService {
522    pub fn new(event_store: Arc<dyn EventStore + Send + Sync>) -> Self {
523        Self { event_store }
524    }
525
526    /// Replay all events for an aggregate
527    pub async fn replay_aggregate(&self, aggregate_id: &AggregateId) -> Result<Vec<DomainEvent>> {
528        info!(aggregate_id = %aggregate_id, "Starting event replay");
529
530        let query = EventStreamQuery::for_aggregate(aggregate_id.clone());
531        let events = self.event_store.read_events(query).await?;
532
533        info!(
534            aggregate_id = %aggregate_id,
535            event_count = events.len(),
536            "Event replay completed"
537        );
538
539        Ok(events)
540    }
541
542    /// Replay events within a sequence range
543    pub async fn replay_range(
544        &self,
545        aggregate_id: &AggregateId,
546        from_sequence: EventSequence,
547        to_sequence: EventSequence,
548    ) -> Result<Vec<DomainEvent>> {
549        info!(
550            aggregate_id = %aggregate_id,
551            from_sequence = from_sequence.value(),
552            to_sequence = to_sequence.value(),
553            "Starting event replay for range"
554        );
555
556        let query = EventStreamQuery::for_aggregate(aggregate_id.clone())
557            .from_sequence(from_sequence)
558            .to_sequence(to_sequence);
559
560        let events = self.event_store.read_events(query).await?;
561
562        info!(
563            aggregate_id = %aggregate_id,
564            event_count = events.len(),
565            "Event replay range completed"
566        );
567
568        Ok(events)
569    }
570}
571
572#[cfg(test)]
573mod tests {
574    use super::*;
575
576    #[derive(Debug, Clone)]
577    struct TestAggregate {
578        id: AggregateId,
579        sequence: EventSequence,
580        value: String,
581        uncommitted_events: Vec<DomainEvent>,
582    }
583
584    impl AggregateRoot for TestAggregate {
585        fn new(id: AggregateId) -> Self {
586            Self {
587                id,
588                sequence: EventSequence::new(0),
589                value: String::new(),
590                uncommitted_events: Vec::new(),
591            }
592        }
593
594        fn id(&self) -> &AggregateId {
595            &self.id
596        }
597
598        fn sequence(&self) -> EventSequence {
599            self.sequence
600        }
601
602        fn apply_event(&mut self, event: &DomainEvent) -> Result<()> {
603            match event.event_type.as_str() {
604                "ValueChanged" => {
605                    self.value = String::from_utf8_lossy(&event.event_data).to_string();
606                    self.sequence = event.sequence;
607                }
608                _ => return Err(RustRabbitError::UnknownEventType(event.event_type.clone()).into()),
609            }
610            Ok(())
611        }
612
613        fn uncommitted_events(&self) -> Vec<DomainEvent> {
614            self.uncommitted_events.clone()
615        }
616
617        fn mark_events_committed(&mut self) {
618            self.uncommitted_events.clear();
619        }
620
621        fn create_snapshot(&self) -> Result<AggregateSnapshot> {
622            Ok(AggregateSnapshot::new(
623                self.id.clone(),
624                "TestAggregate".to_string(),
625                self.sequence,
626                self.value.as_bytes().to_vec(),
627            ))
628        }
629
630        fn from_snapshot(snapshot: AggregateSnapshot) -> Result<Self> {
631            Ok(Self {
632                id: snapshot.aggregate_id,
633                sequence: snapshot.sequence,
634                value: String::from_utf8_lossy(&snapshot.data).to_string(),
635                uncommitted_events: Vec::new(),
636            })
637        }
638    }
639
640    impl TestAggregate {
641        fn change_value(&mut self, new_value: String) {
642            let event = DomainEvent::new(
643                self.id.clone(),
644                "TestAggregate".to_string(),
645                "ValueChanged".to_string(),
646                new_value.as_bytes().to_vec(),
647                self.sequence.next(),
648            );
649
650            self.apply_event(&event).unwrap();
651            self.uncommitted_events.push(event);
652        }
653    }
654
655    #[tokio::test]
656    async fn test_aggregate_id_generation() {
657        let id1 = AggregateId::new();
658        let id2 = AggregateId::new();
659        assert_ne!(id1, id2);
660    }
661
662    #[tokio::test]
663    async fn test_event_sequence() {
664        let seq1 = EventSequence::new(1);
665        let seq2 = seq1.next();
666        assert_eq!(seq2.value(), 2);
667        assert!(seq2 > seq1);
668    }
669
670    #[tokio::test]
671    async fn test_in_memory_event_store() {
672        let store = InMemoryEventStore::new();
673        let aggregate_id = AggregateId::new();
674
675        let event = DomainEvent::new(
676            aggregate_id.clone(),
677            "TestAggregate".to_string(),
678            "TestEvent".to_string(),
679            b"test data".to_vec(),
680            EventSequence::new(1),
681        );
682
683        // Append event
684        store.append_events(vec![event.clone()]).await.unwrap();
685
686        // Read events
687        let query = EventStreamQuery::for_aggregate(aggregate_id.clone());
688        let events = store.read_events(query).await.unwrap();
689
690        assert_eq!(events.len(), 1);
691        assert_eq!(events[0].event_type, "TestEvent");
692
693        // Check latest sequence
694        let latest_seq = store.get_latest_sequence(&aggregate_id).await.unwrap();
695        assert_eq!(latest_seq, Some(EventSequence::new(1)));
696    }
697
698    #[tokio::test]
699    async fn test_event_sourcing_repository() {
700        let store = Arc::new(InMemoryEventStore::new());
701        let repo = EventSourcingRepository::<TestAggregate>::new(store.clone());
702
703        let aggregate_id = AggregateId::new();
704        let mut aggregate = TestAggregate::new(aggregate_id.clone());
705
706        // Modify aggregate
707        aggregate.change_value("Hello".to_string());
708        aggregate.change_value("World".to_string());
709
710        // Save aggregate
711        repo.save(&mut aggregate).await.unwrap();
712
713        // Load aggregate
714        let loaded_aggregate = repo.load(&aggregate_id).await.unwrap().unwrap();
715        assert_eq!(loaded_aggregate.value, "World");
716        assert_eq!(loaded_aggregate.sequence.value(), 2);
717    }
718
719    #[tokio::test]
720    async fn test_snapshot_functionality() {
721        let store = Arc::new(InMemoryEventStore::new());
722        let repo =
723            EventSourcingRepository::<TestAggregate>::new(store.clone()).with_snapshot_frequency(2); // Snapshot every 2 events
724
725        let aggregate_id = AggregateId::new();
726        let mut aggregate = TestAggregate::new(aggregate_id.clone());
727
728        // Generate events to trigger snapshot
729        aggregate.change_value("First".to_string());
730        aggregate.change_value("Second".to_string());
731
732        repo.save(&mut aggregate).await.unwrap();
733
734        // Should have created a snapshot
735        assert_eq!(store.snapshot_count(), 1);
736
737        // Load aggregate (should use snapshot)
738        let loaded_aggregate = repo.load(&aggregate_id).await.unwrap().unwrap();
739        assert_eq!(loaded_aggregate.value, "Second");
740        assert_eq!(loaded_aggregate.sequence.value(), 2);
741    }
742
743    #[tokio::test]
744    async fn test_event_replay_service() {
745        let store = Arc::new(InMemoryEventStore::new());
746        let repo = EventSourcingRepository::<TestAggregate>::new(store.clone());
747        let replay_service = EventReplayService::new(store);
748
749        let aggregate_id = AggregateId::new();
750        let mut aggregate = TestAggregate::new(aggregate_id.clone());
751
752        // Create some events
753        aggregate.change_value("Event1".to_string());
754        aggregate.change_value("Event2".to_string());
755        aggregate.change_value("Event3".to_string());
756
757        repo.save(&mut aggregate).await.unwrap();
758
759        // Replay all events
760        let replayed_events = replay_service
761            .replay_aggregate(&aggregate_id)
762            .await
763            .unwrap();
764        assert_eq!(replayed_events.len(), 3);
765
766        // Replay range
767        let range_events = replay_service
768            .replay_range(&aggregate_id, EventSequence::new(2), EventSequence::new(3))
769            .await
770            .unwrap();
771        assert_eq!(range_events.len(), 2);
772    }
773}