cqrs 0.3.1

An event-sourced command-query system
Documentation
use std::{
    cell::RefCell,
    collections::{hash_map::Entry, HashMap},
};

use cqrs::{
    AggregateId, EventNumber, EventSink, EventSource, Precondition, Since, Version, VersionedEvent,
};
use cqrs_todo_core::{TodoAggregate, TodoEvent, TodoIdRef, TodoMetadata};
use void::Void;

#[derive(Debug)]
struct EventMap(RefCell<HashMap<String, Vec<cqrs::VersionedEvent<cqrs_todo_core::TodoEvent>>>>);

impl EventSource<TodoAggregate, TodoEvent> for EventMap {
    type Error = Void;
    type Events = Vec<VersionedEvent<TodoEvent>>;

    fn read_events<I>(
        &self,
        id: &I,
        since: Since,
        max_count: Option<u64>,
    ) -> Result<Option<Self::Events>, Self::Error>
    where
        I: AggregateId<TodoAggregate>,
    {
        let borrow = self.0.borrow();
        let stream = borrow.get(id.as_str());
        match (since, max_count) {
            (Since::BeginningOfStream, Some(max_count)) => Ok(stream.map(|e| {
                e.into_iter()
                    .take(max_count.min(usize::max_value() as u64) as usize)
                    .map(ToOwned::to_owned)
                    .collect()
            })),
            (Since::Event(event_number), Some(max_count)) => Ok(stream.map(|e| {
                e.into_iter()
                    .skip(event_number.get() as usize)
                    .take(max_count.min(usize::max_value() as u64) as usize)
                    .map(ToOwned::to_owned)
                    .collect()
            })),
            (Since::BeginningOfStream, None) => {
                Ok(stream.map(|e| e.into_iter().map(ToOwned::to_owned).collect()))
            }
            (Since::Event(event_number), None) => Ok(stream.map(|e| {
                e.into_iter()
                    .skip(event_number.get() as usize)
                    .map(ToOwned::to_owned)
                    .collect()
            })),
        }
    }
}

impl EventSink<TodoAggregate, TodoEvent, TodoMetadata> for EventMap {
    type Error = Void;

    fn append_events<I>(
        &self,
        id: &I,
        events: &[TodoEvent],
        precondition: Option<Precondition>,
        _metadata: TodoMetadata,
    ) -> Result<EventNumber, Self::Error>
    where
        I: AggregateId<TodoAggregate>,
    {
        let mut borrow = self.0.borrow_mut();
        let entry = borrow.entry(id.as_str().into());

        match entry {
            Entry::Occupied(_) if precondition == Some(Precondition::New) => {
                panic!("Need error type here")
            }
            Entry::Vacant(_) => {
                if let Some(Precondition::ExpectedVersion(_)) = precondition {
                    panic!("Need error type here")
                }
            }
            _ => {}
        }

        let stream = entry.or_insert_with(Vec::default);
        let mut sequence = Version::new(stream.len() as u64);
        match precondition {
            Some(Precondition::ExpectedVersion(evt)) => {
                if evt != sequence {
                    panic!("Need error type here")
                }
            }
            Some(Precondition::New) => {
                if sequence != Version::Initial {
                    panic!("Need error type here")
                }
            }
            _ => {}
        }

        sequence.incr();
        stream.reserve(events.len());
        let initial = sequence;
        for event in events {
            stream.push(VersionedEvent {
                sequence: sequence.event_number().unwrap(),
                event: event.to_owned(),
            });
            sequence.incr();
        }

        Ok(initial.event_number().unwrap())
    }
}

#[test]
fn main_test() {
    let em = EventMap(RefCell::new(HashMap::default()));
    let id = TodoIdRef("test");
    let metadata = TodoMetadata {
        initiated_by: String::from("test"),
    };

    assert_eq!(
        em.read_events(&id, Since::BeginningOfStream, None),
        Ok(None)
    );
    let event_num = em
        .append_events(
            &id,
            &[cqrs_todo_core::TodoEvent::Completed(
                cqrs_todo_core::events::Completed {},
            )],
            Some(Precondition::New),
            metadata.clone(),
        )
        .unwrap();
    assert_eq!(event_num, EventNumber::MIN_VALUE);
    let event_num = em
        .append_events(
            &id,
            &[cqrs_todo_core::TodoEvent::Uncompleted(
                cqrs_todo_core::events::Uncompleted {},
            )],
            Some(Precondition::ExpectedVersion(Version::Number(
                EventNumber::MIN_VALUE,
            ))),
            metadata,
        )
        .unwrap();
    assert_eq!(event_num, EventNumber::MIN_VALUE.next());

    let expected_events = vec![
        VersionedEvent {
            sequence: EventNumber::MIN_VALUE,
            event: cqrs_todo_core::TodoEvent::Completed(cqrs_todo_core::events::Completed {}),
        },
        VersionedEvent {
            sequence: EventNumber::MIN_VALUE.next(),
            event: cqrs_todo_core::TodoEvent::Uncompleted(cqrs_todo_core::events::Uncompleted {}),
        },
    ];

    assert_eq!(
        em.read_events(&id, Since::BeginningOfStream, None),
        Ok(Some(expected_events.clone()))
    );
    assert_eq!(
        em.read_events(&id, Since::Event(EventNumber::MIN_VALUE), None),
        Ok(Some(expected_events[1..].to_owned()))
    );
    assert_eq!(
        em.read_events(&id, Since::Event(EventNumber::MIN_VALUE.next()), None),
        Ok(Some(Vec::default()))
    );
}