use std::ops::Deref;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::state::AggregateState;
use crate::types::SequenceNumber;

#[cfg(feature = "postgres")]
pub mod postgres;

/// Marker trait for the lock types wrapped by an [`EventStoreLockGuard`].
///
/// Implementors should release (unlock) concurrent access to the guarded resource when dropped.
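///
/// # Example
///
/// A minimal sketch of a guard for a hypothetical in-memory store. `InMemoryLockGuard`
/// is not part of this crate, and `tokio` is assumed as a dependency:
///
/// ```ignore
/// use tokio::sync::OwnedMutexGuard;
///
/// // Holds the per-aggregate mutex until dropped; dropping it releases the lock.
/// struct InMemoryLockGuard(OwnedMutexGuard<()>);
///
/// impl UnlockOnDrop for InMemoryLockGuard {}
///
/// // The guard can then be type-erased behind an `EventStoreLockGuard`:
/// // let guard = EventStoreLockGuard::new(InMemoryLockGuard(mutex.clone().lock_owned().await));
/// ```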
pub trait UnlockOnDrop: Send + Sync + 'static {}

/// Lock guard preventing concurrent access to a resource.
///
/// The lock is released when this guard is dropped.
pub struct EventStoreLockGuard(Box<dyn UnlockOnDrop>);

impl EventStoreLockGuard {
    /// Creates a new instance from any [`UnlockOnDrop`].
    #[must_use]
    pub fn new(lock: impl UnlockOnDrop) -> Self {
        Self(Box::new(lock))
    }
}

/// An `EventStore` is responsible for persisting the events that an aggregate emits into a database,
/// and for loading the events that make up an aggregate's history back from that database.
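///
/// # Example
///
/// A rough usage sketch; `store`, `aggregate_id`, `aggregate_state` and `new_events` are
/// placeholders, and error handling is elided:
///
/// ```ignore
/// // Serialize access to this aggregate instance while it is read and written.
/// let _guard = store.lock(aggregate_id).await?;
///
/// // Load the aggregate's history from its past events.
/// let history = store.by_aggregate_id(aggregate_id).await?;
///
/// // Persist the newly emitted events atomically, then notify the event buses.
/// let store_events = store.persist(&mut aggregate_state, new_events).await?;
/// store.publish(&store_events).await;
/// ```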
#[async_trait]
pub trait EventStore {
    type Aggregate: crate::Aggregate;
    type Error: std::error::Error;

    /// Acquires a lock for the given aggregate, waiting for any outstanding guard to be released first.
    ///
    /// Used to prevent concurrent access to the aggregate state.
    /// Note that any process which does *not* call `lock` gets immediate (possibly shared!) access.
    /// All accesses, whether or not they hold this guard, are still subject to the usual optimistic locking strategy on write.
    async fn lock(&self, aggregate_id: Uuid) -> Result<EventStoreLockGuard, Self::Error>;

    /// Loads the events that an aggregate instance has emitted in the past.
    async fn by_aggregate_id(
        &self,
        aggregate_id: Uuid,
    ) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error>;

    /// Persists multiple events into the database. This should be done in a single transaction - either
    /// all the events are persisted correctly, or none are.
    ///
    /// Persisting events may additionally trigger configured event handlers (transactional and non-transactional).
    async fn persist(
        &self,
        aggregate_state: &mut AggregateState<<Self::Aggregate as crate::Aggregate>::State>,
        events: Vec<<Self::Aggregate as crate::Aggregate>::Event>,
    ) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error>;

    /// Publishes multiple events on the configured event buses.
    async fn publish(&self, store_events: &[StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>]);

    /// Deletes all events related to the given `aggregate_id` from the event store.
    ///
    /// It should also delete all the read-side projections built by the event handlers.
    async fn delete(&self, aggregate_id: Uuid) -> Result<(), Self::Error>;
}

/// Blanket implementation that makes every (smart) pointer to an [`EventStore`] an [`EventStore`] itself,
/// e.g. `&Store`, `Box<Store>`, `Arc<Store>`.
/// This is particularly useful when your codebase needs to work with a generic [`EventStore`].
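///
/// # Example
///
/// A sketch of why this is convenient; `MyStore` stands in for any concrete [`EventStore`]
/// implementation, and `my_store` and `id` are placeholders:
///
/// ```ignore
/// async fn count_events<S: EventStore>(store: S, id: Uuid) -> Result<usize, S::Error> {
///     Ok(store.by_aggregate_id(id).await?.len())
/// }
///
/// // Thanks to the blanket implementation, all of these calls compile:
/// count_events(&my_store, id).await?;
/// count_events(Arc::new(my_store), id).await?;
/// ```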
#[async_trait]
impl<A, E, T, S> EventStore for T
where
    A: crate::Aggregate,
    A::Event: Send + Sync,
    A::State: Send,
    E: std::error::Error,
    S: EventStore<Aggregate = A, Error = E> + ?Sized,
    T: Deref<Target = S> + Sync,
    for<'a> A::Event: 'a,
{
    type Aggregate = A;
    type Error = E;

    /// Deref call to [`EventStore::lock`].
    async fn lock(&self, aggregate_id: Uuid) -> Result<EventStoreLockGuard, Self::Error> {
        self.deref().lock(aggregate_id).await
    }

    /// Deref call to [`EventStore::by_aggregate_id`].
    async fn by_aggregate_id(
        &self,
        aggregate_id: Uuid,
    ) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error> {
        self.deref().by_aggregate_id(aggregate_id).await
    }

    /// Deref call to [`EventStore::persist`].
    async fn persist(
        &self,
        aggregate_state: &mut AggregateState<<Self::Aggregate as crate::Aggregate>::State>,
        events: Vec<<Self::Aggregate as crate::Aggregate>::Event>,
    ) -> Result<Vec<StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>>, Self::Error> {
        self.deref().persist(aggregate_state, events).await
    }

    /// Deref call to [`EventStore::publish`].
    async fn publish(&self, events: &[StoreEvent<<Self::Aggregate as crate::Aggregate>::Event>]) {
        self.deref().publish(events).await
    }

    /// Deref call to [`EventStore::delete`].
    async fn delete(&self, aggregate_id: Uuid) -> Result<(), Self::Error> {
        self.deref().delete(aggregate_id).await
    }
}

/// A `StoreEvent` contains the payload (the original event) alongside the event's metadata.
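///
/// # Example
///
/// A sketch of inspecting loaded events; `store`, `aggregate_id` and `handle` are placeholders:
///
/// ```ignore
/// for event in store.by_aggregate_id(aggregate_id).await? {
///     // `payload()` returns the domain event; the remaining fields are metadata.
///     println!("event {} occurred on {}", event.id, event.occurred_on);
///     handle(event.payload());
/// }
/// ```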
#[derive(Serialize, Deserialize, Debug)]
pub struct StoreEvent<Event> {
    /// Uniquely identifies an event among all events emitted from all aggregates.
    pub id: Uuid,
    /// The aggregate instance that emitted the event.
    pub aggregate_id: Uuid,
    /// The original, emitted event.
    pub payload: Event,
    /// The timestamp of when the event was persisted.
    pub occurred_on: DateTime<Utc>,
    /// The sequence number of the event, within its specific aggregate instance.
    pub sequence_number: SequenceNumber,
    /// The version of the event.
    pub version: Option<i32>,
}

impl<Event> StoreEvent<Event> {
    /// Returns the sequence number of the event, within its specific aggregate instance.
    pub const fn sequence_number(&self) -> &SequenceNumber {
        &self.sequence_number
    }

    /// Returns the original, emitted event.
    pub const fn payload(&self) -> &Event {
        &self.payload
    }
}