eventmill 0.4.0

Event sourcing and CQRS for Rust applications
use crate::aggregate::{AggregateIdOf, WithAggregateId};
use crate::event::{DomainEvent, EventType, Sequence};
use crate::query::ReceiveEvent;
use crate::store::{EventSink, EventSource};
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::sync::{Arc, RwLock};

type EventMap<E, A> = HashMap<String, Vec<DomainEvent<E, A>>>;

pub type InMemoryStoreError = Error;

#[derive(thiserror::Error, Debug, PartialEq)]
pub enum Error {
    #[error("can not acquire read access, cause: {0}")]
    NoReadAccess(String),
    #[error("can not acquire write access, cause: {0}")]
    NoWriteAccess(String),
}

#[derive(Debug)]
pub struct InMemoryStore<E, A>
where
    A: WithAggregateId,
{
    events: Arc<RwLock<EventMap<E, A>>>,
}

impl<E, A> Default for InMemoryStore<E, A>
where
    A: WithAggregateId,
{
    fn default() -> Self {
        Self {
            events: Arc::new(RwLock::new(EventMap::new())),
        }
    }
}

impl<E, A> InMemoryStore<E, A>
where
    A: WithAggregateId,
    AggregateIdOf<A>: Display,
{
    pub fn new() -> Self {
        Self {
            events: Arc::new(RwLock::new(EventMap::new())),
        }
    }

    pub fn with_events(events: impl IntoIterator<Item = DomainEvent<E, A>>) -> Self {
        let mut event_map = EventMap::with_capacity(4);
        events.into_iter().for_each(|ev| {
            event_map
                .entry(ev.aggregate_id.to_string())
                .or_insert_with(|| Vec::with_capacity(4))
                .push(ev)
        });
        Self {
            events: Arc::new(RwLock::new(event_map)),
        }
    }
}

impl<E, A> EventSink<E, A> for InMemoryStore<E, A>
where
    E: EventType,
    A: WithAggregateId,
    AggregateIdOf<A>: Display,
{
    type Error = Error;

    fn append(&self, event: DomainEvent<E, A>) -> Result<(), Self::Error> {
        let mut event_map = self
            .events
            .write()
            .map_err(|err| Error::NoWriteAccess(err.to_string()))?;

        event_map
            .entry(event.aggregate_id.to_string())
            .or_insert_with(|| Vec::with_capacity(4))
            .push(event);
        Ok(())
    }

    fn append_batch(
        &self,
        events: impl IntoIterator<Item = DomainEvent<E, A>>,
    ) -> Result<(), Self::Error> {
        let mut event_map = self
            .events
            .write()
            .map_err(|err| Error::NoWriteAccess(err.to_string()))?;

        events.into_iter().for_each(|ev| {
            event_map
                .entry(ev.aggregate_id.to_string())
                .or_insert_with(|| Vec::with_capacity(4))
                .push(ev)
        });
        Ok(())
    }
}

impl<E, A> EventSource<E, A> for InMemoryStore<E, A>
where
    E: EventType,
    A: WithAggregateId,
    AggregateIdOf<A>: Display,
{
    type Error = Error;

    fn read<R>(
        &self,
        aggregate_id: &AggregateIdOf<A>,
        subscriber: &mut R,
    ) -> Result<(), Self::Error>
    where
        R: ReceiveEvent<E, A>,
    {
        let event_map = self
            .events
            .read()
            .map_err(|err| Error::NoReadAccess(err.to_string()))?;

        event_map
            .get(&aggregate_id.to_string())
            .into_iter()
            .for_each(|events| {
                events
                    .iter()
                    .for_each(|ev| subscriber.receive_event(ev.as_view()))
            });
        Ok(())
    }

    fn read_from_offset<R>(
        &self,
        aggregate_id: &AggregateIdOf<A>,
        offset: Sequence,
        subscriber: &mut R,
    ) -> Result<(), Self::Error>
    where
        R: ReceiveEvent<E, A>,
    {
        let event_map = self
            .events
            .read()
            .map_err(|err| Error::NoReadAccess(err.to_string()))?;

        event_map
            .get(&aggregate_id.to_string())
            .into_iter()
            .for_each(|events| {
                events
                    .iter()
                    .skip(offset.number() as usize)
                    .for_each(|ev| subscriber.receive_event(ev.as_view()))
            });
        Ok(())
    }
}