cqrs_rust_lib/
event_store.rs1use crate::errors::CqrsError;
2use crate::es::storage::EventStream;
3use crate::snapshot::Snapshot;
4use crate::{Aggregate, CqrsContext, EventEnvelope};
5use futures::StreamExt;
6use std::collections::HashMap;
7use std::sync::Arc;
8
9#[cfg(not(target_arch = "wasm32"))]
10pub type DynEventStore<A> = Arc<dyn EventStore<A> + Send + Sync + 'static>;
11#[cfg(target_arch = "wasm32")]
12pub type DynEventStore<A> = Arc<dyn EventStore<A> + 'static>;
13
14cqrs_async_trait! {
15pub trait EventStore<A>
16where
17 A: Aggregate + 'static,
18{
19 async fn load_snapshot(&self, aggregate_id: &str) -> Result<Option<Snapshot<A>>, CqrsError>;
20
21 async fn load_events_from_version(
22 &self,
23 aggregate_id: &str,
24 version: usize,
25 ) -> Result<EventStream<A>, CqrsError>;
26
27 async fn load_events(&self, aggregate_id: &str) -> Result<EventStream<A>, CqrsError>;
28
29 async fn load_events_paged(
30 &self,
31 aggregate_id: &str,
32 page: usize,
33 page_size: usize,
34 ) -> Result<(Vec<EventEnvelope<A>>, i64), CqrsError>;
35
36 async fn initialize_aggregate(&self, aggregate_id: &str) -> Result<(A, usize), CqrsError> {
37 let maybe_snapshot = self.load_snapshot(aggregate_id).await?;
38 if maybe_snapshot.is_some() {
39 return Err(CqrsError::aggregate_already_exists(aggregate_id));
40 }
41 Ok((A::default().with_aggregate_id(aggregate_id.to_string()), 0))
42 }
43
44 async fn load_aggregate(&self, aggregate_id: &str) -> Result<(A, usize), CqrsError> {
45 let maybe_snapshot = self.load_snapshot(aggregate_id).await?;
46 if maybe_snapshot.is_none() {
47 return Err(CqrsError::aggregate_not_found(aggregate_id));
48 }
49 let snapshot = maybe_snapshot.unwrap();
50 let mut agg = snapshot.state;
51 let version = snapshot.version;
52
53 let mut latest_version = version;
54 let mut event_stream = self.load_events_from_version(aggregate_id, version).await?;
55 while let Some(event) = event_stream.next().await {
56 let event = event?;
57 agg.apply(event.payload).map_err(CqrsError::user_error)?;
58 latest_version = event.version;
59 }
60 Ok((agg, latest_version))
61 }
62
63 async fn commit(
64 &self,
65 events: Vec<A::Event>,
66 aggregate: &A,
67 metadata: HashMap<String, String>,
68 version: usize,
69 context: &CqrsContext,
70 ) -> Result<Vec<EventEnvelope<A>>, CqrsError>;
71}
72}