1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use async_trait::async_trait;
use std::collections::HashMap;
use std::marker::PhantomData;

use crate::event_repository::EventRepository;
use cqrs_es::{Aggregate, AggregateContext, AggregateError, EventEnvelope, EventStore};
use sqlx::{Pool, Postgres};

/// An `EventStore` implementation backed by a Postgres database.
///
/// This store is event-sourced: the aggregate's state is rebuilt from its
/// stored events, which act as the primary source of truth.
///
/// For a snapshot-based `EventStore`
/// see [`PostgresSnapshotStore`](struct.PostgresSnapshotStore.html).
pub struct PostgresStore<A>
where
    A: Aggregate + Send + Sync,
{
    /// Repository through which events are read and inserted.
    repo: EventRepository<A>,
    /// Zero-sized marker binding the store to the aggregate type `A`.
    _phantom: PhantomData<A>,
}

impl<A> PostgresStore<A>
where
    A: Aggregate,
{
    /// Builds a `PostgresStore` on top of the given connection pool.
    ///
    /// The resulting store is an `EventStore` that can be handed to a new
    /// cqrs framework instance.
    ///
    /// ```ignore
    /// # use postgres_es::PostgresStore;
    /// # use cqrs_es::CqrsFramework;
    /// let store = PostgresStore::<MyAggregate>::new(pool);
    /// let cqrs = CqrsFramework::new(store, vec![]);
    /// ```
    pub fn new(pool: Pool<Postgres>) -> Self {
        Self {
            repo: EventRepository::new(pool),
            _phantom: PhantomData,
        }
    }
}

#[async_trait]
impl<A: Aggregate> EventStore<A> for PostgresStore<A> {
    type AC = PostgresStoreAggregateContext<A>;

    async fn load(&self, aggregate_id: &str) -> Vec<EventEnvelope<A>> {
        match self.repo.get_events(aggregate_id).await {
            Ok(val) => val,
            Err(_err) => {
                // TODO: improved error handling
                Default::default()
            }
        }
    }
    async fn load_aggregate(&self, aggregate_id: &str) -> PostgresStoreAggregateContext<A> {
        let committed_events = self.load(aggregate_id).await;
        let mut aggregate = A::default();
        let mut current_sequence = 0;
        for envelope in committed_events {
            current_sequence = envelope.sequence;
            let event = envelope.payload;
            aggregate.apply(event);
        }
        PostgresStoreAggregateContext {
            aggregate_id: aggregate_id.to_string(),
            aggregate,
            current_sequence,
        }
    }

    async fn commit(
        &self,
        events: Vec<A::Event>,
        context: PostgresStoreAggregateContext<A>,
        metadata: HashMap<String, String>,
    ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
        let aggregate_id = context.aggregate_id.as_str();
        let current_sequence = context.current_sequence;
        let wrapped_events = self.wrap_events(aggregate_id, current_sequence, events, metadata);
        self.repo.insert_events(wrapped_events.clone()).await?;
        Ok(wrapped_events)
    }
}

/// Context carried by the pure event store implementation `PostgresStore`.
/// It is only used internally within the `EventStore`.
pub struct PostgresStoreAggregateContext<A>
where
    A: Aggregate,
{
    /// Identifier of the aggregate instance this context belongs to.
    aggregate_id: String,
    /// The aggregate instance held by this context.
    aggregate: A,
    /// Sequence value recorded for the aggregate when the context was built.
    current_sequence: usize,
}

impl<A> AggregateContext<A> for PostgresStoreAggregateContext<A>
where
    A: Aggregate,
{
    /// Returns a shared reference to the aggregate held by this context.
    fn aggregate(&self) -> &A {
        &self.aggregate
    }
}