use std::ops::Deref;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::state::AggregateState;
use crate::types::SequenceNumber;
#[cfg(feature = "postgres")]
pub mod postgres;
/// Marker trait for values that can be stored inside an [`EventStoreLockGuard`].
///
/// The trait declares no methods; it only requires implementors to be
/// `Send + Sync + 'static`.
///
/// NOTE(review): the name suggests implementors release their lock in `Drop`,
/// but nothing in this trait enforces that — confirm for each implementor.
pub trait UnlockOnDrop: Send + Sync + 'static {}
/// Opaque guard that owns a boxed [`UnlockOnDrop`] value.
///
/// Dropping the guard drops the boxed value.
///
/// NOTE(review): presumably dropping the boxed value is what releases the
/// underlying lock; that depends on the `UnlockOnDrop` implementor — confirm.
// The boxed value is only held, never read, in this module, hence the
// `dead_code` allowance on the otherwise unused field.
#[allow(dead_code)]
pub struct EventStoreLockGuard(Box<dyn UnlockOnDrop>);
impl EventStoreLockGuard {
    /// Builds a guard that takes ownership of `lock`.
    ///
    /// The value is moved onto the heap and type-erased behind
    /// `Box<dyn UnlockOnDrop>`; it is dropped together with the returned guard.
    #[must_use]
    pub fn new(lock: impl UnlockOnDrop) -> Self {
        // Spell out the unsizing step so the type erasure is visible.
        let erased: Box<dyn UnlockOnDrop> = Box::new(lock);
        Self(erased)
    }
}
/// Asynchronous storage interface for the events of one aggregate type.
///
/// The trait is declared through `#[async_trait]`, which is what allows the
/// `async fn` methods below.
#[async_trait]
pub trait EventStore {
/// Aggregate type whose events and state this store works with.
type Aggregate: crate::Aggregate;
/// Error type returned by the fallible methods of this store.
type Error: std::error::Error;
/// Requests a lock for `aggregate_id`, yielding an [`EventStoreLockGuard`] on success.
///
/// NOTE(review): presumably the lock stays held until the returned guard is
/// dropped — confirm against the concrete implementation.
///
/// # Errors
///
/// Returns `Self::Error` when the implementation fails to provide the guard.
async fn lock(&self, aggregate_id: Uuid) -> Result<EventStoreLockGuard, Self::Error>;
/// Returns the [`StoreEvent`]s found for `aggregate_id`.
///
/// NOTE(review): the ordering of the returned vector is not specified by this
/// signature — confirm against the concrete implementation.
///
/// # Errors
///
/// Returns `Self::Error` when the lookup fails.
async fn by_aggregate_id(
&self,
aggregate_id: Uuid,
) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error>;
/// Persists `events` for the aggregate described by `aggregate_state` and
/// returns them wrapped as [`StoreEvent`]s.
///
/// `aggregate_state` is taken by mutable reference, so an implementation may
/// modify it. NOTE(review): which parts of the state are updated is not
/// visible here — confirm.
///
/// # Errors
///
/// Returns `Self::Error` when persisting fails.
async fn persist(
&self,
aggregate_state: &mut AggregateState<<Self::Aggregate as crate::Aggregate>::State>,
events: Vec<<Self::Aggregate as crate::Aggregate>::Event>,
) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error>;
/// Publishes the given, already built `store_events`.
///
/// The method has no return value, so a failure cannot be reported to the
/// caller through it.
async fn publish(&self, store_events: &[StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>]);
/// Deletes the data held for `aggregate_id`.
///
/// NOTE(review): exactly what is removed is left to the implementation —
/// confirm.
///
/// # Errors
///
/// Returns `Self::Error` when the deletion fails.
async fn delete(&self, aggregate_id: Uuid) -> Result<(), Self::Error>;
}
/// Blanket implementation: any `Sync` type `T` that dereferences to an event
/// store `S` is itself an [`EventStore`] with the same aggregate and error types.
///
/// Every method simply forwards to the dereferenced store, which lets smart
/// pointers such as `Box<S>` or `Arc<S>` be used wherever a store is expected.
#[async_trait]
impl<A, E, T, S> EventStore for T
where
    A: crate::Aggregate,
    A::Event: Send + Sync,
    A::State: Send,
    E: std::error::Error,
    S: EventStore<Aggregate = A, Error = E> + ?Sized,
    T: Deref<Target = S> + Sync,
    for<'a> A::Event: 'a,
{
    type Aggregate = A;
    type Error = E;

    /// Forwards to `S::lock` on the dereferenced store.
    async fn lock(&self, aggregate_id: Uuid) -> Result<EventStoreLockGuard, E> {
        T::deref(self).lock(aggregate_id).await
    }

    /// Forwards to `S::by_aggregate_id` on the dereferenced store.
    async fn by_aggregate_id(&self, aggregate_id: Uuid) -> Result<Vec<StoreEvent<A::Event>>, E> {
        T::deref(self).by_aggregate_id(aggregate_id).await
    }

    /// Forwards to `S::persist` on the dereferenced store.
    async fn persist(
        &self,
        aggregate_state: &mut AggregateState<A::State>,
        events: Vec<A::Event>,
    ) -> Result<Vec<StoreEvent<A::Event>>, E> {
        T::deref(self).persist(aggregate_state, events).await
    }

    /// Forwards to `S::publish` on the dereferenced store.
    async fn publish(&self, store_events: &[StoreEvent<A::Event>]) {
        T::deref(self).publish(store_events).await
    }

    /// Forwards to `S::delete` on the dereferenced store.
    async fn delete(&self, aggregate_id: Uuid) -> Result<(), E> {
        T::deref(self).delete(aggregate_id).await
    }
}
/// An event of type `Event` together with the metadata stored alongside it.
///
/// Derives `Serialize`, `Deserialize` and `Debug`.
#[derive(Serialize, Deserialize, Debug)]
pub struct StoreEvent<Event> {
/// Identifier of this stored event.
pub id: Uuid,
/// Identifier of the aggregate this event is associated with.
pub aggregate_id: Uuid,
/// The event value itself.
pub payload: Event,
/// UTC timestamp attached to the event.
/// NOTE(review): presumably the moment the event happened — confirm who sets it.
pub occurred_on: DateTime<Utc>,
/// Sequence number attached to the event.
/// NOTE(review): presumably its position in the aggregate's event stream — confirm.
pub sequence_number: SequenceNumber,
/// Optional version value.
/// NOTE(review): its meaning is not visible here (schema version?) — confirm.
pub version: Option<i32>,
}
impl<Event> StoreEvent<Event> {
    /// Borrows the sequence number stored in this event.
    pub const fn sequence_number(&self) -> &SequenceNumber {
        let Self {
            sequence_number, ..
        } = self;
        sequence_number
    }

    /// Borrows the payload carried by this event.
    pub const fn payload(&self) -> &Event {
        let Self { payload, .. } = self;
        payload
    }
}