1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
use std::ops::Deref;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::state::AggregateState;
use crate::types::SequenceNumber;
#[cfg(feature = "postgres")]
pub mod postgres;
/// Marker trait for every [`EventStoreLockGuard`].
///
/// Implementors should unlock concurrent access to the guarded resource, when dropped.
pub trait UnlockOnDrop: Send + Sync + 'static {}
/// Lock guard preventing concurrent access to a resource.
///
/// The lock is released when this guard is dropped.
pub struct EventStoreLockGuard(Box<dyn UnlockOnDrop>);
impl EventStoreLockGuard {
/// Creates a new instance from any [`UnlockOnDrop`].
#[must_use]
pub fn new(lock: impl UnlockOnDrop) -> Self {
Self(Box::new(lock))
}
}
/// An EventStore is responsible for persisting events that an aggregate emits into a database, and loading the events
/// that represent an aggregate's history from the database.
#[async_trait]
pub trait EventStore {
type Aggregate: crate::Aggregate;
type Error: std::error::Error;
/// Acquires a lock for the given aggregate, or waits for outstanding guards to be released.
///
/// Used to prevent concurrent access to the aggregate state.
/// Note that any process which does *not* `lock` will get immediate (possibly shared!) access.
/// ALL accesses (regardless of this guard) are subject to the usual optimistic locking strategy on write.
async fn lock(&self, aggregate_id: Uuid) -> Result<EventStoreLockGuard, Self::Error>;
/// Loads the events that an aggregate instance has emitted in the past.
async fn by_aggregate_id(
&self,
aggregate_id: Uuid,
) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error>;
/// Persists multiple events into the database. This should be done in a single transaction - either
/// all the events are persisted correctly, or none are.
///
/// Persisting events may additionally trigger configured event handlers (transactional and non-transactional).
async fn persist(
&self,
aggregate_state: &mut AggregateState<<Self::Aggregate as crate::Aggregate>::State>,
events: Vec<<Self::Aggregate as crate::Aggregate>::Event>,
) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error>;
/// Publish multiple events on the configured events buses.
async fn publish(&self, store_events: &[StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>]);
/// Delete all events from events store related to given `aggregate_id`.
///
/// Moreover it should delete all the read side projections triggered by event handlers.
async fn delete(&self, aggregate_id: Uuid) -> Result<(), Self::Error>;
}
/// Blanket implementation making an [`EventStore`] every (smart) pointer to an [`EventStore`],
/// e.g. `&Store`, `Box<Store>`, `Arc<Store>`.
/// This is particularly useful when there's the need in your codebase to have a generic [`EventStore`].
#[async_trait]
impl<A, E, T, S> EventStore for T
where
A: crate::Aggregate,
A::Event: Send + Sync,
A::State: Send,
E: std::error::Error,
S: EventStore<Aggregate = A, Error = E> + ?Sized,
T: Deref<Target = S> + Sync,
for<'a> A::Event: 'a,
{
type Aggregate = A;
type Error = E;
/// Deref call to [`EventStore::lock`].
async fn lock(&self, aggregate_id: Uuid) -> Result<EventStoreLockGuard, Self::Error> {
self.deref().lock(aggregate_id).await
}
/// Deref call to [`EventStore::by_aggregate_id`].
async fn by_aggregate_id(
&self,
aggregate_id: Uuid,
) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error> {
self.deref().by_aggregate_id(aggregate_id).await
}
/// Deref call to [`EventStore::persist`].
async fn persist(
&self,
aggregate_state: &mut AggregateState<<Self::Aggregate as crate::Aggregate>::State>,
events: Vec<<Self::Aggregate as crate::Aggregate>::Event>,
) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error> {
self.deref().persist(aggregate_state, events).await
}
/// Deref call to [`EventStore::publish`].
async fn publish(&self, events: &[StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>]) {
self.deref().publish(events).await
}
/// Deref call to [`EventStore::delete`].
async fn delete(&self, aggregate_id: Uuid) -> Result<(), Self::Error> {
self.deref().delete(aggregate_id).await
}
}
/// A `StoreEvent` contains the payload (the original event) alongside the event's metadata.
#[derive(Serialize, Deserialize, Debug)]
pub struct StoreEvent<Event> {
/// Uniquely identifies an event among all events emitted from all aggregates.
pub id: Uuid,
/// The aggregate instance that emitted the event.
pub aggregate_id: Uuid,
/// The original, emitted, event.
pub payload: Event,
/// The timestamp of when the event is persisted.
pub occurred_on: DateTime<Utc>,
/// The sequence number of the event, within its specific aggregate instance.
pub sequence_number: SequenceNumber,
/// The version of the event.
pub version: Option<i32>,
}
impl<Event> StoreEvent<Event> {
/// Returns the sequence number of the event, within its specific aggregate instance.
pub const fn sequence_number(&self) -> &SequenceNumber {
&self.sequence_number
}
/// Returns the original, emitted, event.
pub const fn payload(&self) -> &Event {
&self.payload
}
}