use std::collections::HashMap;
use std::marker::PhantomData;
use async_trait::async_trait;
use cqrs_es::{Aggregate, AggregateError, EventEnvelope, EventStore};
use crate::{EventStoreAggregateContext, PersistedEventRepository};
/// Event store backed by a [`PersistedEventRepository`].
///
/// `R` is the repository implementation and `A` is the aggregate type whose
/// events this store handles.
pub struct PersistedEventStore<R, A>
where
    A: Aggregate + Send + Sync,
    R: PersistedEventRepository<A>,
{
    /// Repository that this store delegates to.
    repo: R,
    /// Marker binding the store to `A`; no aggregate value is held here.
    _phantom: PhantomData<A>,
}
impl<R, A> PersistedEventStore<R, A>
where
    A: Aggregate + Send + Sync,
    R: PersistedEventRepository<A>,
{
    /// Wraps `repo` in a new event store.
    pub fn new(repo: R) -> Self {
        Self {
            repo,
            _phantom: PhantomData,
        }
    }
}
#[async_trait]
impl<R, A> EventStore<A> for PersistedEventStore<R, A>
where
    A: Aggregate + Send + Sync,
    R: PersistedEventRepository<A>,
{
    type AC = EventStoreAggregateContext<A>;

    /// Fetches the events stored for `aggregate_id` from the repository.
    ///
    /// A repository error is not reported to the caller: it is discarded and
    /// an empty list is returned in its place.
    async fn load(&self, aggregate_id: &str) -> Vec<EventEnvelope<A>> {
        self.repo
            .get_events(aggregate_id)
            .await
            .unwrap_or_default()
    }

    /// Rebuilds the aggregate for `aggregate_id` by replaying its events.
    ///
    /// Starts from `A::default()` and applies each loaded event in the order
    /// returned by [`load`](Self::load). The returned context carries the
    /// sequence of the last applied event, or `0` when there were none.
    async fn load_aggregate(&self, aggregate_id: &str) -> EventStoreAggregateContext<A> {
        let history = self.load(aggregate_id).await;
        let mut context = EventStoreAggregateContext {
            aggregate_id: aggregate_id.to_string(),
            aggregate: A::default(),
            current_sequence: 0,
        };
        for envelope in history {
            context.current_sequence = envelope.sequence;
            context.aggregate.apply(envelope.payload);
        }
        context
    }

    /// Wraps `events` into envelopes and persists them through the repository.
    ///
    /// The envelopes are produced by `wrap_events` from the context's
    /// aggregate id and current sequence together with `metadata`, then handed
    /// to `persist` with `None` as its second argument.
    ///
    /// # Errors
    ///
    /// Returns the error from `persist`, converted by `?` into an
    /// [`AggregateError`], when the repository fails to store the envelopes.
    async fn commit(
        &self,
        events: Vec<A::Event>,
        context: EventStoreAggregateContext<A>,
        metadata: HashMap<String, String>,
    ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
        let envelopes = self.wrap_events(
            context.aggregate_id.as_str(),
            context.current_sequence,
            events,
            metadata,
        );
        self.repo.persist(&envelopes, None).await?;
        Ok(envelopes)
    }
}
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use cqrs_es::EventStore;

    use crate::snapshot_store::test::{
        test_event_envelope, MockRepo, TestAggregate, TestEvents, EVENT_VERSION,
        TEST_AGGREGATE_ID,
    };
    use crate::{EventStoreAggregateContext, PersistedEventStore, PersistenceError};

    /// Checks the sequence, event type and event version of the envelope
    /// returned by `load`.
    #[tokio::test]
    async fn load() {
        let stored = vec![test_event_envelope(1, TestEvents::SomethingWasDone)];
        let store = PersistedEventStore::new(MockRepo::with_events(Ok(stored)));

        let loaded = store.load(TEST_AGGREGATE_ID).await;

        let first = loaded.first().unwrap();
        assert_eq!(1, first.sequence);
        assert_eq!("SomethingWasDone", first.event_type);
        assert_eq!(EVENT_VERSION, first.event_version);
    }

    /// A repository error makes `load` return no events.
    #[tokio::test]
    async fn load_error() {
        let failing = MockRepo::with_events(Err(PersistenceError::OptimisticLockError));
        let store = PersistedEventStore::new(failing);

        let loaded = store.load(TEST_AGGREGATE_ID).await;

        assert_eq!(0, loaded.len());
    }

    /// With no stored events the context holds a default aggregate at sequence 0.
    #[tokio::test]
    async fn load_aggregate_new() {
        let store = PersistedEventStore::new(MockRepo::with_events(Ok(vec![])));

        let context = store.load_aggregate(TEST_AGGREGATE_ID).await;

        assert_eq!(0, context.current_sequence);
        assert_eq!(TEST_AGGREGATE_ID, context.aggregate_id);
        assert_eq!(TestAggregate::default(), context.aggregate);
    }

    /// Stored events are replayed onto the aggregate and the last sequence is kept.
    #[tokio::test]
    async fn load_aggregate_existing() {
        let stored = vec![
            test_event_envelope(1, TestEvents::Started),
            test_event_envelope(2, TestEvents::SomethingWasDone),
        ];
        let store = PersistedEventStore::new(MockRepo::with_events(Ok(stored)));

        let context = store.load_aggregate(TEST_AGGREGATE_ID).await;

        assert_eq!(2, context.current_sequence);
        assert_eq!(TEST_AGGREGATE_ID, context.aggregate_id);
        let expected = TestAggregate {
            something_happened: 1,
        };
        assert_eq!(expected, context.aggregate);
    }

    // NOTE(review): the mock is configured through `with_snapshot`, yet
    // `load_aggregate` on this store only reads events via `get_events`.
    // Presumably the expected panic comes from the mock having no events
    // result configured rather than from the store itself - confirm.
    #[tokio::test]
    #[should_panic]
    async fn load_aggregate_error() {
        let failing = MockRepo::with_snapshot(Err(PersistenceError::OptimisticLockError));
        let store = PersistedEventStore::new(failing);

        store.load_aggregate(TEST_AGGREGATE_ID).await;
    }

    /// Committing three events persists three envelopes without a snapshot
    /// update and returns them in order.
    #[tokio::test]
    async fn commit() {
        let repo = MockRepo::with_commit(Box::new(|persisted, snapshot_update| {
            assert_eq!(3, persisted.len());
            let third = persisted.get(2).unwrap();
            assert_eq!(TEST_AGGREGATE_ID, third.aggregate_id);
            assert_eq!(3, third.sequence);
            assert!(snapshot_update.is_none());
        }));
        let store = PersistedEventStore::new(repo);
        let context = EventStoreAggregateContext {
            aggregate_id: TEST_AGGREGATE_ID.to_string(),
            aggregate: TestAggregate::default(),
            current_sequence: 0,
        };
        let new_events = vec![
            TestEvents::Started,
            TestEvents::SomethingWasDone,
            TestEvents::SomethingWasDone,
        ];

        let committed = store
            .commit(new_events, context, HashMap::default())
            .await
            .unwrap();

        assert_eq!(3, committed.len());
        let first = committed.first().unwrap();
        assert_eq!(TEST_AGGREGATE_ID, first.aggregate_id);
        assert_eq!(TestEvents::Started, first.payload);
        assert_eq!(
            TestEvents::SomethingWasDone,
            committed.get(2).unwrap().payload
        );
    }
}