1use anyhow::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use tracing::{debug, info};
7use uuid::Uuid;
8
9use crate::error::RustRabbitError;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub struct AggregateId(String);
14
15impl AggregateId {
16 pub fn new() -> Self {
17 Self(Uuid::new_v4().to_string())
18 }
19
20 pub fn from_string(id: String) -> Self {
21 Self(id)
22 }
23
24 pub fn as_str(&self) -> &str {
25 &self.0
26 }
27}
28
29impl Default for AggregateId {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35impl std::fmt::Display for AggregateId {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 write!(f, "{}", self.0)
38 }
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
43pub struct EventSequence(u64);
44
45impl EventSequence {
46 pub fn new(sequence: u64) -> Self {
47 Self(sequence)
48 }
49
50 pub fn next(&self) -> Self {
51 Self(self.0 + 1)
52 }
53
54 pub fn value(&self) -> u64 {
55 self.0
56 }
57}
58
59impl From<u64> for EventSequence {
60 fn from(value: u64) -> Self {
61 Self(value)
62 }
63}
64
/// An immutable record of something that happened to an aggregate.
///
/// Events are the unit of persistence in the event store; an aggregate's
/// state is rebuilt by replaying its events in `sequence` order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DomainEvent {
    /// Globally unique id of this event (UUID v4 string).
    pub event_id: String,
    /// The aggregate this event belongs to.
    pub aggregate_id: AggregateId,
    /// Type name of the owning aggregate (e.g. "TestAggregate").
    pub aggregate_type: String,
    /// Discriminator aggregates use to decide how to apply the event.
    pub event_type: String,
    /// Serialized event payload; the encoding is the caller's concern.
    pub event_data: Vec<u8>,
    /// Free-form key/value metadata (correlation ids, user info, ...).
    pub metadata: HashMap<String, String>,
    /// Position of this event within the aggregate's stream.
    pub sequence: EventSequence,
    /// Wall-clock time (UTC) the event object was constructed.
    pub timestamp: DateTime<Utc>,
    /// Schema version of the event payload; `new` sets it to 1.
    pub version: u32,
}
78
79impl DomainEvent {
80 pub fn new(
81 aggregate_id: AggregateId,
82 aggregate_type: String,
83 event_type: String,
84 event_data: Vec<u8>,
85 sequence: EventSequence,
86 ) -> Self {
87 Self {
88 event_id: Uuid::new_v4().to_string(),
89 aggregate_id,
90 aggregate_type,
91 event_type,
92 event_data,
93 metadata: HashMap::new(),
94 sequence,
95 timestamp: Utc::now(),
96 version: 1,
97 }
98 }
99
100 pub fn with_metadata(mut self, key: String, value: String) -> Self {
101 self.metadata.insert(key, value);
102 self
103 }
104
105 pub fn with_version(mut self, version: u32) -> Self {
106 self.version = version;
107 self
108 }
109}
110
/// A point-in-time capture of an aggregate's state.
///
/// Snapshots let loads skip replaying the full history: rebuild from the
/// snapshot, then apply only the events recorded after `sequence`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregateSnapshot {
    /// The aggregate this snapshot belongs to.
    pub aggregate_id: AggregateId,
    /// Type name of the owning aggregate.
    pub aggregate_type: String,
    /// Sequence of the last event folded into this snapshot.
    pub sequence: EventSequence,
    /// Serialized aggregate state; encoding is the aggregate's concern.
    pub data: Vec<u8>,
    /// Wall-clock time (UTC) the snapshot object was constructed.
    pub timestamp: DateTime<Utc>,
    /// Schema version of the snapshot data; `new` sets it to 1.
    pub version: u32,
}
121
122impl AggregateSnapshot {
123 pub fn new(
124 aggregate_id: AggregateId,
125 aggregate_type: String,
126 sequence: EventSequence,
127 data: Vec<u8>,
128 ) -> Self {
129 Self {
130 aggregate_id,
131 aggregate_type,
132 sequence,
133 data,
134 timestamp: Utc::now(),
135 version: 1,
136 }
137 }
138}
139
/// Filter criteria for reading a slice of one aggregate's event stream.
///
/// All bounds are inclusive and optional; an unset field means "no
/// restriction". Build instances with [`EventStreamQuery::for_aggregate`]
/// and the chainable setter methods.
#[derive(Debug, Clone)]
pub struct EventStreamQuery {
    /// Stream to read; a query never spans multiple aggregates.
    pub aggregate_id: AggregateId,
    /// Lowest sequence to include (inclusive).
    pub from_sequence: Option<EventSequence>,
    /// Highest sequence to include (inclusive).
    pub to_sequence: Option<EventSequence>,
    /// If set, only events whose type appears in this list are returned.
    pub event_types: Option<Vec<String>>,
    /// Maximum number of events to return, applied after filtering.
    pub limit: Option<usize>,
}
149
150impl EventStreamQuery {
151 pub fn for_aggregate(aggregate_id: AggregateId) -> Self {
152 Self {
153 aggregate_id,
154 from_sequence: None,
155 to_sequence: None,
156 event_types: None,
157 limit: None,
158 }
159 }
160
161 pub fn from_sequence(mut self, sequence: EventSequence) -> Self {
162 self.from_sequence = Some(sequence);
163 self
164 }
165
166 pub fn to_sequence(mut self, sequence: EventSequence) -> Self {
167 self.to_sequence = Some(sequence);
168 self
169 }
170
171 pub fn with_event_types(mut self, event_types: Vec<String>) -> Self {
172 self.event_types = Some(event_types);
173 self
174 }
175
176 pub fn with_limit(mut self, limit: usize) -> Self {
177 self.limit = Some(limit);
178 self
179 }
180}
181
/// Persistence abstraction for event streams and snapshots.
#[async_trait::async_trait]
pub trait EventStore {
    /// Appends `events` to their aggregates' streams; errors when a
    /// sequence would not advance the stream.
    async fn append_events(&self, events: Vec<DomainEvent>) -> Result<()>;

    /// Reads the events matching `query`, in stored order.
    async fn read_events(&self, query: EventStreamQuery) -> Result<Vec<DomainEvent>>;

    /// Returns the sequence of the last stored event for the aggregate,
    /// or `None` when it has no events.
    async fn get_latest_sequence(
        &self,
        aggregate_id: &AggregateId,
    ) -> Result<Option<EventSequence>>;

    /// Stores `snapshot` for its aggregate.
    async fn save_snapshot(&self, snapshot: AggregateSnapshot) -> Result<()>;

    /// Loads the stored snapshot for the aggregate, if any.
    async fn load_snapshot(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateSnapshot>>;

    /// Returns `true` if any events exist for the aggregate.
    async fn aggregate_exists(&self, aggregate_id: &AggregateId) -> Result<bool>;
}
206
/// A mutex-guarded, in-memory implementation of [`EventStore`].
///
/// All data lives in process memory and is lost when the store is
/// dropped — there is no durable persistence.
#[derive(Debug)]
pub struct InMemoryEventStore {
    /// Per-aggregate event streams, kept in append order.
    events: Arc<Mutex<HashMap<AggregateId, Vec<DomainEvent>>>>,
    /// Latest snapshot per aggregate (each save overwrites the previous).
    snapshots: Arc<Mutex<HashMap<AggregateId, AggregateSnapshot>>>,
}
213
214impl InMemoryEventStore {
215 pub fn new() -> Self {
216 Self {
217 events: Arc::new(Mutex::new(HashMap::new())),
218 snapshots: Arc::new(Mutex::new(HashMap::new())),
219 }
220 }
221
222 pub fn event_count(&self) -> usize {
223 self.events
224 .lock()
225 .unwrap()
226 .values()
227 .map(|events| events.len())
228 .sum()
229 }
230
231 pub fn snapshot_count(&self) -> usize {
232 self.snapshots.lock().unwrap().len()
233 }
234}
235
236impl Default for InMemoryEventStore {
237 fn default() -> Self {
238 Self::new()
239 }
240}
241
242#[async_trait::async_trait]
243impl EventStore for InMemoryEventStore {
244 async fn append_events(&self, events: Vec<DomainEvent>) -> Result<()> {
245 let mut store = self.events.lock().unwrap();
246
247 for event in events {
248 let aggregate_id = event.aggregate_id.clone();
249
250 debug!(
251 aggregate_id = %aggregate_id,
252 event_type = %event.event_type,
253 sequence = event.sequence.value(),
254 "Appending event to store"
255 );
256
257 let aggregate_events = store.entry(aggregate_id).or_default();
258
259 if let Some(last_event) = aggregate_events.last() {
261 if event.sequence.value() <= last_event.sequence.value() {
262 return Err(RustRabbitError::EventSequenceError.into());
263 }
264 }
265
266 aggregate_events.push(event);
267 }
268
269 Ok(())
270 }
271
272 async fn read_events(&self, query: EventStreamQuery) -> Result<Vec<DomainEvent>> {
273 let store = self.events.lock().unwrap();
274
275 let aggregate_events = store
276 .get(&query.aggregate_id)
277 .map(|events| events.as_slice())
278 .unwrap_or(&[]);
279
280 let mut filtered_events: Vec<DomainEvent> = aggregate_events
281 .iter()
282 .filter(|event| {
283 if let Some(from_seq) = query.from_sequence {
285 if event.sequence < from_seq {
286 return false;
287 }
288 }
289 if let Some(to_seq) = query.to_sequence {
290 if event.sequence > to_seq {
291 return false;
292 }
293 }
294
295 if let Some(ref event_types) = query.event_types {
297 if !event_types.contains(&event.event_type) {
298 return false;
299 }
300 }
301
302 true
303 })
304 .cloned()
305 .collect();
306
307 if let Some(limit) = query.limit {
309 filtered_events.truncate(limit);
310 }
311
312 debug!(
313 aggregate_id = %query.aggregate_id,
314 event_count = filtered_events.len(),
315 "Read events from store"
316 );
317
318 Ok(filtered_events)
319 }
320
321 async fn get_latest_sequence(
322 &self,
323 aggregate_id: &AggregateId,
324 ) -> Result<Option<EventSequence>> {
325 let store = self.events.lock().unwrap();
326
327 let latest_sequence = store
328 .get(aggregate_id)
329 .and_then(|events| events.last())
330 .map(|event| event.sequence);
331
332 Ok(latest_sequence)
333 }
334
335 async fn save_snapshot(&self, snapshot: AggregateSnapshot) -> Result<()> {
336 let mut store = self.snapshots.lock().unwrap();
337
338 debug!(
339 aggregate_id = %snapshot.aggregate_id,
340 sequence = snapshot.sequence.value(),
341 "Saving snapshot"
342 );
343
344 store.insert(snapshot.aggregate_id.clone(), snapshot);
345 Ok(())
346 }
347
348 async fn load_snapshot(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateSnapshot>> {
349 let store = self.snapshots.lock().unwrap();
350 Ok(store.get(aggregate_id).cloned())
351 }
352
353 async fn aggregate_exists(&self, aggregate_id: &AggregateId) -> Result<bool> {
354 let store = self.events.lock().unwrap();
355 Ok(store.contains_key(aggregate_id))
356 }
357}
358
/// Loads and saves event-sourced aggregates of type `T` through an
/// [`EventStore`], periodically writing snapshots to speed up future loads.
pub struct EventSourcingRepository<T> {
    /// Backing store for both events and snapshots.
    event_store: Arc<dyn EventStore + Send + Sync>,
    /// A snapshot is written when the aggregate's sequence is an exact
    /// multiple of this value (default 100; see `new`).
    snapshot_frequency: u64,
    /// Marker only: the repository stores no `T` values itself.
    _phantom: std::marker::PhantomData<T>,
}
365
impl<T> EventSourcingRepository<T>
where
    T: AggregateRoot + Send + Sync,
{
    /// Creates a repository over `event_store` with the default snapshot
    /// frequency of 100 events.
    pub fn new(event_store: Arc<dyn EventStore + Send + Sync>) -> Self {
        Self {
            event_store,
            snapshot_frequency: 100,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Builder-style override of the snapshot frequency.
    pub fn with_snapshot_frequency(mut self, frequency: u64) -> Self {
        self.snapshot_frequency = frequency;
        self
    }

    /// Loads an aggregate by rebuilding it from the latest snapshot (if
    /// any) plus all events recorded after that snapshot.
    ///
    /// Returns `Ok(None)` when neither events nor a snapshot exist for
    /// `aggregate_id`.
    pub async fn load(&self, aggregate_id: &AggregateId) -> Result<Option<T>> {
        info!(aggregate_id = %aggregate_id, "Loading aggregate");

        // Prefer a snapshot as the starting point and remember the first
        // sequence that still needs to be replayed on top of it.
        let (aggregate, from_sequence) =
            if let Some(snapshot) = self.event_store.load_snapshot(aggregate_id).await? {
                debug!(
                    aggregate_id = %aggregate_id,
                    sequence = snapshot.sequence.value(),
                    "Loaded from snapshot"
                );

                let aggregate = T::from_snapshot(snapshot)?;
                let from_sequence = aggregate.sequence().next();
                (Some(aggregate), Some(from_sequence))
            } else {
                (None, None)
            };

        // Query only the events that post-date the snapshot, or the whole
        // stream when there is no snapshot.
        let query = EventStreamQuery::for_aggregate(aggregate_id.clone());
        let query = if let Some(from_seq) = from_sequence {
            query.from_sequence(from_seq)
        } else {
            query
        };

        let events = self.event_store.read_events(query).await?;

        // No snapshot and no events: the aggregate has never been saved.
        if events.is_empty() && aggregate.is_none() {
            return Ok(None);
        }

        // Start from the snapshot state (or a fresh aggregate) and fold in
        // the remaining events in stream order.
        let mut final_aggregate = aggregate.unwrap_or_else(|| T::new(aggregate_id.clone()));

        for event in events {
            final_aggregate.apply_event(&event)?;
        }

        debug!(
            aggregate_id = %aggregate_id,
            sequence = final_aggregate.sequence().value(),
            "Aggregate loaded successfully"
        );

        Ok(Some(final_aggregate))
    }

    /// Persists the aggregate's uncommitted events, writes a snapshot when
    /// the final sequence lands on a multiple of `snapshot_frequency`, and
    /// then clears the aggregate's pending-event buffer.
    ///
    /// NOTE(review): the snapshot trigger checks the final sequence only,
    /// so a multi-event save can step over a multiple without snapshotting
    /// — confirm this is acceptable for the chosen frequency.
    pub async fn save(&self, aggregate: &mut T) -> Result<()> {
        let uncommitted_events = aggregate.uncommitted_events();

        // Nothing to do when the aggregate has no pending changes.
        if uncommitted_events.is_empty() {
            debug!(aggregate_id = %aggregate.id(), "No uncommitted events to save");
            return Ok(());
        }

        info!(
            aggregate_id = %aggregate.id(),
            event_count = uncommitted_events.len(),
            "Saving aggregate"
        );

        self.event_store
            .append_events(uncommitted_events.clone())
            .await?;

        // Snapshot when the aggregate's current sequence is an exact
        // multiple of the configured frequency.
        if aggregate
            .sequence()
            .value()
            .is_multiple_of(self.snapshot_frequency)
        {
            let snapshot = aggregate.create_snapshot()?;
            self.event_store.save_snapshot(snapshot).await?;

            debug!(
                aggregate_id = %aggregate.id(),
                sequence = aggregate.sequence().value(),
                "Snapshot created"
            );
        }

        // Only after a successful append (and optional snapshot) is the
        // pending-event buffer cleared.
        aggregate.mark_events_committed();

        debug!(
            aggregate_id = %aggregate.id(),
            sequence = aggregate.sequence().value(),
            "Aggregate saved successfully"
        );

        Ok(())
    }

    /// Delegates to [`EventStore::aggregate_exists`].
    pub async fn exists(&self, aggregate_id: &AggregateId) -> Result<bool> {
        self.event_store.aggregate_exists(aggregate_id).await
    }
}
486
/// Contract for event-sourced aggregates managed by
/// [`EventSourcingRepository`].
///
/// Implementors buffer new events in an "uncommitted" list, mutate state
/// through `apply_event`, and support snapshot round-trips.
pub trait AggregateRoot {
    /// Creates an empty aggregate with the given identity.
    fn new(id: AggregateId) -> Self;

    /// The aggregate's identity.
    fn id(&self) -> &AggregateId;

    /// Sequence of the last event applied to this aggregate.
    fn sequence(&self) -> EventSequence;

    /// Mutates state according to `event`.
    fn apply_event(&mut self, event: &DomainEvent) -> Result<()>;

    /// Events produced since the last save, in application order.
    fn uncommitted_events(&self) -> Vec<DomainEvent>;

    /// Clears the uncommitted-event buffer after a successful save.
    fn mark_events_committed(&mut self);

    /// Serializes the current state into a snapshot.
    fn create_snapshot(&self) -> Result<AggregateSnapshot>;

    /// Rebuilds an aggregate from a previously created snapshot.
    fn from_snapshot(snapshot: AggregateSnapshot) -> Result<Self>
    where
        Self: Sized;
}
515
/// Read-only helper that fetches stored event streams (full or by sequence
/// range); it only calls `read_events` and never mutates the store.
pub struct EventReplayService {
    /// Store the events are replayed from.
    event_store: Arc<dyn EventStore + Send + Sync>,
}
520
521impl EventReplayService {
522 pub fn new(event_store: Arc<dyn EventStore + Send + Sync>) -> Self {
523 Self { event_store }
524 }
525
526 pub async fn replay_aggregate(&self, aggregate_id: &AggregateId) -> Result<Vec<DomainEvent>> {
528 info!(aggregate_id = %aggregate_id, "Starting event replay");
529
530 let query = EventStreamQuery::for_aggregate(aggregate_id.clone());
531 let events = self.event_store.read_events(query).await?;
532
533 info!(
534 aggregate_id = %aggregate_id,
535 event_count = events.len(),
536 "Event replay completed"
537 );
538
539 Ok(events)
540 }
541
542 pub async fn replay_range(
544 &self,
545 aggregate_id: &AggregateId,
546 from_sequence: EventSequence,
547 to_sequence: EventSequence,
548 ) -> Result<Vec<DomainEvent>> {
549 info!(
550 aggregate_id = %aggregate_id,
551 from_sequence = from_sequence.value(),
552 to_sequence = to_sequence.value(),
553 "Starting event replay for range"
554 );
555
556 let query = EventStreamQuery::for_aggregate(aggregate_id.clone())
557 .from_sequence(from_sequence)
558 .to_sequence(to_sequence);
559
560 let events = self.event_store.read_events(query).await?;
561
562 info!(
563 aggregate_id = %aggregate_id,
564 event_count = events.len(),
565 "Event replay range completed"
566 );
567
568 Ok(events)
569 }
570}
571
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal aggregate used to exercise the store/repository machinery.
    /// Its whole state is a single string, updated by "ValueChanged"
    /// events carrying the new value as UTF-8 bytes.
    #[derive(Debug, Clone)]
    struct TestAggregate {
        id: AggregateId,
        sequence: EventSequence,
        value: String,
        uncommitted_events: Vec<DomainEvent>,
    }

    impl AggregateRoot for TestAggregate {
        fn new(id: AggregateId) -> Self {
            Self {
                id,
                // Sequence 0 means "no events applied yet".
                sequence: EventSequence::new(0),
                value: String::new(),
                uncommitted_events: Vec::new(),
            }
        }

        fn id(&self) -> &AggregateId {
            &self.id
        }

        fn sequence(&self) -> EventSequence {
            self.sequence
        }

        /// Applies the only supported event type, "ValueChanged", taking
        /// both the new value and the event's sequence; any other type
        /// is reported as an unknown-event error.
        fn apply_event(&mut self, event: &DomainEvent) -> Result<()> {
            match event.event_type.as_str() {
                "ValueChanged" => {
                    self.value = String::from_utf8_lossy(&event.event_data).to_string();
                    self.sequence = event.sequence;
                }
                _ => return Err(RustRabbitError::UnknownEventType(event.event_type.clone()).into()),
            }
            Ok(())
        }

        fn uncommitted_events(&self) -> Vec<DomainEvent> {
            self.uncommitted_events.clone()
        }

        fn mark_events_committed(&mut self) {
            self.uncommitted_events.clear();
        }

        /// Snapshot payload is simply the current value's UTF-8 bytes.
        fn create_snapshot(&self) -> Result<AggregateSnapshot> {
            Ok(AggregateSnapshot::new(
                self.id.clone(),
                "TestAggregate".to_string(),
                self.sequence,
                self.value.as_bytes().to_vec(),
            ))
        }

        fn from_snapshot(snapshot: AggregateSnapshot) -> Result<Self> {
            Ok(Self {
                id: snapshot.aggregate_id,
                sequence: snapshot.sequence,
                value: String::from_utf8_lossy(&snapshot.data).to_string(),
                uncommitted_events: Vec::new(),
            })
        }
    }

    impl TestAggregate {
        /// Command helper: emits a "ValueChanged" event with the next
        /// sequence, applies it locally, then buffers it as uncommitted.
        fn change_value(&mut self, new_value: String) {
            let event = DomainEvent::new(
                self.id.clone(),
                "TestAggregate".to_string(),
                "ValueChanged".to_string(),
                new_value.as_bytes().to_vec(),
                self.sequence.next(),
            );

            self.apply_event(&event).unwrap();
            self.uncommitted_events.push(event);
        }
    }

    /// Two freshly generated ids must differ (UUID v4 randomness).
    #[tokio::test]
    async fn test_aggregate_id_generation() {
        let id1 = AggregateId::new();
        let id2 = AggregateId::new();
        assert_ne!(id1, id2);
    }

    /// `next` advances by exactly one and preserves ordering.
    #[tokio::test]
    async fn test_event_sequence() {
        let seq1 = EventSequence::new(1);
        let seq2 = seq1.next();
        assert_eq!(seq2.value(), 2);
        assert!(seq2 > seq1);
    }

    /// Round-trips a single event through the in-memory store and checks
    /// the latest-sequence bookkeeping.
    #[tokio::test]
    async fn test_in_memory_event_store() {
        let store = InMemoryEventStore::new();
        let aggregate_id = AggregateId::new();

        let event = DomainEvent::new(
            aggregate_id.clone(),
            "TestAggregate".to_string(),
            "TestEvent".to_string(),
            b"test data".to_vec(),
            EventSequence::new(1),
        );

        store.append_events(vec![event.clone()]).await.unwrap();

        let query = EventStreamQuery::for_aggregate(aggregate_id.clone());
        let events = store.read_events(query).await.unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, "TestEvent");

        let latest_seq = store.get_latest_sequence(&aggregate_id).await.unwrap();
        assert_eq!(latest_seq, Some(EventSequence::new(1)));
    }

    /// Saves two events via the repository, then reloads the aggregate and
    /// verifies the replayed state and sequence.
    #[tokio::test]
    async fn test_event_sourcing_repository() {
        let store = Arc::new(InMemoryEventStore::new());
        let repo = EventSourcingRepository::<TestAggregate>::new(store.clone());

        let aggregate_id = AggregateId::new();
        let mut aggregate = TestAggregate::new(aggregate_id.clone());

        aggregate.change_value("Hello".to_string());
        aggregate.change_value("World".to_string());

        repo.save(&mut aggregate).await.unwrap();

        let loaded_aggregate = repo.load(&aggregate_id).await.unwrap().unwrap();
        assert_eq!(loaded_aggregate.value, "World");
        assert_eq!(loaded_aggregate.sequence.value(), 2);
    }

    /// With frequency 2, a save ending at sequence 2 must produce exactly
    /// one snapshot, and loading must still reconstruct the latest state.
    #[tokio::test]
    async fn test_snapshot_functionality() {
        let store = Arc::new(InMemoryEventStore::new());
        let repo =
            EventSourcingRepository::<TestAggregate>::new(store.clone()).with_snapshot_frequency(2);
        let aggregate_id = AggregateId::new();
        let mut aggregate = TestAggregate::new(aggregate_id.clone());

        aggregate.change_value("First".to_string());
        aggregate.change_value("Second".to_string());

        repo.save(&mut aggregate).await.unwrap();

        assert_eq!(store.snapshot_count(), 1);

        let loaded_aggregate = repo.load(&aggregate_id).await.unwrap().unwrap();
        assert_eq!(loaded_aggregate.value, "Second");
        assert_eq!(loaded_aggregate.sequence.value(), 2);
    }

    /// Full replay returns all three events; the inclusive range [2, 3]
    /// returns the last two.
    #[tokio::test]
    async fn test_event_replay_service() {
        let store = Arc::new(InMemoryEventStore::new());
        let repo = EventSourcingRepository::<TestAggregate>::new(store.clone());
        let replay_service = EventReplayService::new(store);

        let aggregate_id = AggregateId::new();
        let mut aggregate = TestAggregate::new(aggregate_id.clone());

        aggregate.change_value("Event1".to_string());
        aggregate.change_value("Event2".to_string());
        aggregate.change_value("Event3".to_string());

        repo.save(&mut aggregate).await.unwrap();

        let replayed_events = replay_service
            .replay_aggregate(&aggregate_id)
            .await
            .unwrap();
        assert_eq!(replayed_events.len(), 3);

        let range_events = replay_service
            .replay_range(&aggregate_id, EventSequence::new(2), EventSequence::new(3))
            .await
            .unwrap();
        assert_eq!(range_events.len(), 2);
    }
}