cqrs-rust-lib 0.8.0

An opinionated implementation of CQRS/Event Sourcing with pluggable storage backends (InMemory, PostgreSQL, MongoDB, SurrealDB)
Documentation
use crate::errors::CqrsError;
use crate::es::storage::EventStream;
use crate::snapshot::Snapshot;
use crate::{Aggregate, CqrsContext, EventEnvelope};
use futures::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;

#[cfg(not(target_arch = "wasm32"))]
pub type DynEventStore<A> = Arc<dyn EventStore<A> + Send + Sync + 'static>;
#[cfg(target_arch = "wasm32")]
pub type DynEventStore<A> = Arc<dyn EventStore<A> + 'static>;

cqrs_async_trait! {
pub trait EventStore<A>
where
    A: Aggregate + 'static,
{
    async fn load_snapshot(&self, aggregate_id: &str) -> Result<Option<Snapshot<A>>, CqrsError>;

    async fn load_events_from_version(
        &self,
        aggregate_id: &str,
        version: usize,
    ) -> Result<EventStream<A>, CqrsError>;

    async fn load_events(&self, aggregate_id: &str) -> Result<EventStream<A>, CqrsError>;

    async fn load_events_paged(
        &self,
        aggregate_id: &str,
        page: usize,
        page_size: usize,
    ) -> Result<(Vec<EventEnvelope<A>>, i64), CqrsError>;

    async fn initialize_aggregate(&self, aggregate_id: &str) -> Result<(A, usize), CqrsError> {
        let maybe_snapshot = self.load_snapshot(aggregate_id).await?;
        if maybe_snapshot.is_some() {
            return Err(CqrsError::aggregate_already_exists(aggregate_id));
        }
        Ok((A::default().with_aggregate_id(aggregate_id.to_string()), 0))
    }

    async fn load_aggregate(&self, aggregate_id: &str) -> Result<(A, usize), CqrsError> {
        let maybe_snapshot = self.load_snapshot(aggregate_id).await?;
        if maybe_snapshot.is_none() {
            return Err(CqrsError::aggregate_not_found(aggregate_id));
        }
        let snapshot = maybe_snapshot.unwrap();
        let mut agg = snapshot.state;
        let version = snapshot.version;

        let mut latest_version = version;
        let mut event_stream = self.load_events_from_version(aggregate_id, version).await?;
        while let Some(event) = event_stream.next().await {
            let event = event?;
            agg.apply(event.payload).map_err(CqrsError::user_error)?;
            latest_version = event.version;
        }
        Ok((agg, latest_version))
    }

    async fn commit(
        &self,
        events: Vec<A::Event>,
        aggregate: &A,
        metadata: HashMap<String, String>,
        version: usize,
        context: &CqrsContext,
    ) -> Result<Vec<EventEnvelope<A>>, CqrsError>;
}
}