Skip to main content

cqrs_rust_lib/
event_store.rs

1use crate::errors::CqrsError;
2use crate::es::storage::EventStream;
3use crate::snapshot::Snapshot;
4use crate::{Aggregate, CqrsContext, EventEnvelope};
5use futures::StreamExt;
6use std::collections::HashMap;
7use std::sync::Arc;
8
9#[cfg(not(target_arch = "wasm32"))]
10pub type DynEventStore<A> = Arc<dyn EventStore<A> + Send + Sync + 'static>;
11#[cfg(target_arch = "wasm32")]
12pub type DynEventStore<A> = Arc<dyn EventStore<A> + 'static>;
13
14cqrs_async_trait! {
15pub trait EventStore<A>
16where
17    A: Aggregate + 'static,
18{
19    async fn load_snapshot(&self, aggregate_id: &str) -> Result<Option<Snapshot<A>>, CqrsError>;
20
21    async fn load_events_from_version(
22        &self,
23        aggregate_id: &str,
24        version: usize,
25    ) -> Result<EventStream<A>, CqrsError>;
26
27    async fn load_events(&self, aggregate_id: &str) -> Result<EventStream<A>, CqrsError>;
28
29    async fn load_events_paged(
30        &self,
31        aggregate_id: &str,
32        page: usize,
33        page_size: usize,
34    ) -> Result<(Vec<EventEnvelope<A>>, i64), CqrsError>;
35
36    async fn initialize_aggregate(&self, aggregate_id: &str) -> Result<(A, usize), CqrsError> {
37        let maybe_snapshot = self.load_snapshot(aggregate_id).await?;
38        if maybe_snapshot.is_some() {
39            return Err(CqrsError::aggregate_already_exists(aggregate_id));
40        }
41        Ok((A::default().with_aggregate_id(aggregate_id.to_string()), 0))
42    }
43
44    async fn load_aggregate(&self, aggregate_id: &str) -> Result<(A, usize), CqrsError> {
45        let maybe_snapshot = self.load_snapshot(aggregate_id).await?;
46        if maybe_snapshot.is_none() {
47            return Err(CqrsError::aggregate_not_found(aggregate_id));
48        }
49        let snapshot = maybe_snapshot.unwrap();
50        let mut agg = snapshot.state;
51        let version = snapshot.version;
52
53        let mut latest_version = version;
54        let mut event_stream = self.load_events_from_version(aggregate_id, version).await?;
55        while let Some(event) = event_stream.next().await {
56            let event = event?;
57            agg.apply(event.payload).map_err(CqrsError::user_error)?;
58            latest_version = event.version;
59        }
60        Ok((agg, latest_version))
61    }
62
63    async fn commit(
64        &self,
65        events: Vec<A::Event>,
66        aggregate: &A,
67        metadata: HashMap<String, String>,
68        version: usize,
69        context: &CqrsContext,
70    ) -> Result<Vec<EventEnvelope<A>>, CqrsError>;
71}
72}