1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
use std::marker::PhantomData;

use cqrs_es::{Aggregate, AggregateError, DomainEvent, EventStore, MessageEnvelope};
use postgres::Connection;

/// Storage engine using an Postgres backing. This is the only persistent store currently
/// provided.
pub struct PostgresStore<A, E>
    where
        A: Aggregate,
        E: DomainEvent<A>
{
    conn: Connection,
    _phantom: PhantomData<(A, E)>,
}

impl<A, E> PostgresStore<A, E>
    where
        A: Aggregate,
        E: DomainEvent<A>
{
    /// Creates a new `PostgresStore` from the provided database connection.
    pub fn new(conn: Connection) -> Self {
        PostgresStore {
            conn,
            _phantom: PhantomData,
        }
    }
}

static INSERT_EVENT: &str = "INSERT INTO events (aggregate_type, aggregate_id, sequence, payload, metadata)
                               VALUES ($1, $2, $3, $4, $5)";
static SELECT_EVENTS: &str = "SELECT aggregate_type, aggregate_id, sequence, payload, metadata
                                FROM events
                                WHERE aggregate_type = $1 AND aggregate_id = $2 ORDER BY sequence";

impl<A, E> EventStore<A, E> for PostgresStore<A, E>
    where
        A: Aggregate,
        E: DomainEvent<A>
{
    fn load(&self, aggregate_id: &str) -> Vec<MessageEnvelope<A, E>> {
        let agg_type = A::aggregate_type();
        let id = aggregate_id.to_string();
        let mut result = Vec::new();
        match self.conn.query(SELECT_EVENTS, &[&agg_type, &id]) {
            Ok(rows) => {
                for row in rows.iter() {
                    let aggregate_type: String = row.get("aggregate_type");
                    let aggregate_id: String = row.get("aggregate_id");
                    let s: i64 = row.get("sequence");
                    let sequence = s as usize;
                    let payload: E = match serde_json::from_value(row.get("payload")) {
                        Ok(payload) => payload,
                        Err(err) => {
                            panic!("bad payload found in events table for aggregate id {} with error: {}", &id, err);
                        }
                    };
                    let event = MessageEnvelope::new(aggregate_id, sequence, aggregate_type, payload);
                    result.push(event);
                }
            }
            Err(e) => { println!("{:?}", e); }
        }
        result
    }

    fn commit(&self, events: Vec<MessageEnvelope<A, E>>) -> Result<(), AggregateError> {
        let trans = match self.conn.transaction() {
            Ok(t) => { t }
            Err(err) => {
                return Err(AggregateError::TechnicalError(err.to_string()));
            }
        };
        for event in events {
            let agg_type = event.aggregate_type.clone();
            let id = event.aggregate_id.clone();
            let sequence = event.sequence as i64;
            let payload = match serde_json::to_value(&event.payload) {
                Ok(payload) => payload,
                Err(err) => {
                    panic!("bad payload found in events table for aggregate id {} with error: {}", &id, err);
                }
            };
            let metadata = match serde_json::to_value(&event.metadata) {
                Ok(metadata) => metadata,
                Err(err) => {
                    panic!("bad metadata found in events table for aggregate id {} with error: {}", &id, err);
                }
            };
            match self.conn.execute(INSERT_EVENT, &[&agg_type, &id, &sequence, &payload, &metadata]) {
                Ok(_) => {}
                Err(err) => {
                    match err.code() {
                        None => {}
                        Some(state) => {
                            if state.code() == "23505" {
                                return Err(AggregateError::TechnicalError("optimistic lock error".to_string()));
                            }
                        }
                    }
                    panic!("unable to insert event table for aggregate id {} with error: {}\n  and payload: {}", &id, err, &payload);
                }
            };
        }
        match trans.commit() {
            Ok(_) => Ok(()),
            Err(err) => Err(AggregateError::TechnicalError(err.to_string())),
        }
    }
}