postgres-es2 0.2.3

A Postgres implementation of an event store for cqrs-es2.
Documentation
use std::{
    collections::HashMap,
    marker::PhantomData,
};

use postgres::Client;
use serde_json::Value;

use cqrs_es2::{
    Aggregate,
    AggregateError,
    EventEnvelope,
    EventStore,
};

use super::postgres_snapshot_store_aggregate_context::PostgresSnapshotStoreAggregateContext;

static INSERT_EVENT: &str = "INSERT INTO events (aggregate_type, \
                             aggregate_id, sequence, payload, \
                             metadata)
    VALUES ($1, $2, $3, $4, $5)";
static SELECT_EVENTS: &str = "SELECT aggregate_type, aggregate_id, \
                              sequence, payload, metadata
     FROM events
     WHERE aggregate_type = $1 AND aggregate_id = $2 ORDER BY \
                              sequence";

static INSERT_SNAPSHOT: &str = "INSERT INTO snapshots \
                                (aggregate_type, aggregate_id, \
                                last_sequence, payload)
    VALUES ($1, $2, $3, $4)";
static UPDATE_SNAPSHOT: &str = "UPDATE snapshots
    SET last_sequence= $3 , payload= $4
    WHERE aggregate_type= $1 AND aggregate_id= $2";
static SELECT_SNAPSHOT: &str = "SELECT aggregate_type, \
                                aggregate_id, last_sequence, payload
     FROM snapshots
     WHERE aggregate_type = $1 AND aggregate_id = $2";

/// Storage engine using an Postgres backing and relying on a
/// serialization of the aggregate rather than individual events. This
/// is similar to the "snapshot strategy" seen in many CQRS
/// frameworks.
pub struct PostgresSnapshotStore<A: Aggregate> {
    conn: Client,
    _phantom: PhantomData<A>,
}

impl<A: Aggregate> PostgresSnapshotStore<A> {
    /// Creates a new `PostgresSnapshotStore` from the provided
    /// database connection.
    pub fn new(conn: Client) -> Self {
        PostgresSnapshotStore {
            conn,
            _phantom: PhantomData,
        }
    }
}

impl<A: Aggregate>
    EventStore<A, PostgresSnapshotStoreAggregateContext<A>>
    for PostgresSnapshotStore<A>
{
    fn load(
        &mut self,
        aggregate_id: &str,
    ) -> Vec<EventEnvelope<A>> {
        let agg_type = A::aggregate_type();
        let id = aggregate_id.to_string();
        let mut result = Vec::new();
        match self
            .conn
            .query(SELECT_EVENTS, &[&agg_type, &id])
        {
            Ok(rows) => {
                for row in rows.iter() {
                    let aggregate_type: String =
                        row.get("aggregate_type");
                    let aggregate_id: String =
                        row.get("aggregate_id");
                    let s: i64 = row.get("sequence");
                    let sequence = s as usize;
                    let payload: A::Event =
                        match serde_json::from_value(
                            row.get("payload"),
                        ) {
                            Ok(payload) => payload,
                            Err(err) => {
                                panic!(
                                    "bad payload found in events \
                                     table for aggregate id {} with \
                                     error: {}",
                                    &id, err
                                );
                            },
                        };
                    let event = EventEnvelope::new(
                        aggregate_id,
                        sequence,
                        aggregate_type,
                        payload,
                    );
                    result.push(event);
                }
            },
            Err(e) => {
                panic!("{:?}", e);
            },
        }
        result
    }
    fn load_aggregate(
        &mut self,
        aggregate_id: &str,
    ) -> PostgresSnapshotStoreAggregateContext<A> {
        let agg_type = A::aggregate_type();
        match self.conn.query(
            SELECT_SNAPSHOT,
            &[&agg_type, &aggregate_id.to_string()],
        ) {
            Ok(rows) => {
                match rows.iter().next() {
                    None => {
                        let current_sequence = 0;
                        PostgresSnapshotStoreAggregateContext::new(
                            aggregate_id.to_string(),
                            A::default(),
                            current_sequence,
                        )
                    },
                    Some(row) => {
                        let s: i64 = row.get("last_sequence");
                        let val: Value = row.get("payload");
                        let aggregate =
                            serde_json::from_value(val).unwrap();
                        PostgresSnapshotStoreAggregateContext::new(
                            aggregate_id.to_string(),
                            aggregate,
                            s as usize,
                        )
                    },
                }
            },
            Err(e) => {
                panic!("{:?}", e);
            },
        }
    }

    fn commit(
        &mut self,
        events: Vec<A::Event>,
        context: PostgresSnapshotStoreAggregateContext<A>,
        metadata: HashMap<String, String>,
    ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
        let mut updated_aggregate = context.aggregate_copy();
        let aggregate_id = context.aggregate_id.as_str();
        let current_sequence = context.current_sequence;
        let wrapped_events = self.wrap_events(
            aggregate_id,
            current_sequence,
            events,
            metadata,
        );
        let mut trans = match self.conn.transaction() {
            Ok(t) => t,
            Err(err) => {
                return Err(AggregateError::TechnicalError(
                    err.to_string(),
                ));
            },
        };
        let mut last_sequence = current_sequence as i64;
        for event in wrapped_events.clone() {
            let agg_type = event.aggregate_type.clone();
            let id = context.aggregate_id.clone();
            let sequence = event.sequence as i64;
            last_sequence = sequence;
            let payload = match serde_json::to_value(&event.payload) {
                Ok(payload) => payload,
                Err(err) => {
                    panic!(
                        "bad payload found in events table for \
                         aggregate id {} with error: {}",
                        &id, err
                    );
                },
            };
            let metadata = match serde_json::to_value(&event.metadata)
            {
                Ok(metadata) => metadata,
                Err(err) => {
                    panic!(
                        "bad metadata found in events table for \
                         aggregate id {} with error: {}",
                        &id, err
                    );
                },
            };
            match trans.execute(
                INSERT_EVENT,
                &[
                    &agg_type, &id, &sequence, &payload, &metadata,
                ],
            ) {
                Ok(_) => {},
                Err(err) => {
                    match err.code() {
                        None => {},
                        Some(state) => {
                            if state.code() == "23505" {
                                return Err(
                                    AggregateError::TechnicalError(
                                        "optimistic lock error"
                                            .to_string(),
                                    ),
                                );
                            }
                        },
                    }
                    panic!(
                        "unable to insert event table for aggregate \
                         id {} with error: {}\n  and payload: {}",
                        &id, err, &payload
                    );
                },
            };
            updated_aggregate.apply(&event.payload);
        }

        let agg_type = A::aggregate_type();
        let aggregate_payload =
            match serde_json::to_value(updated_aggregate) {
                Ok(val) => val,
                Err(err) => {
                    panic!(
                        "bad metadata found in events table for \
                         aggregate id {} with error: {}",
                        &aggregate_id, err
                    );
                },
            };
        if context.current_sequence == 0 {
            match trans.execute(
                INSERT_SNAPSHOT,
                &[
                    &agg_type,
                    &aggregate_id,
                    &last_sequence,
                    &aggregate_payload,
                ],
            ) {
                Ok(_) => {},
                Err(err) => {
                    panic!(
                        "unable to insert snapshot for aggregate id \
                         {} with error: {}\n  and payload: {}",
                        &aggregate_id, err, &aggregate_payload
                    );
                },
            };
        }
        else {
            match trans.execute(
                UPDATE_SNAPSHOT,
                &[
                    &agg_type,
                    &aggregate_id,
                    &last_sequence,
                    &aggregate_payload,
                ],
            ) {
                Ok(_) => {},
                Err(err) => {
                    panic!(
                        "unable to update snapshot for aggregate id \
                         {} with error: {}\n  and payload: {}",
                        &aggregate_id, err, &aggregate_payload
                    );
                },
            };
        }

        match trans.commit() {
            Ok(_) => Ok(wrapped_events),
            Err(err) => {
                Err(AggregateError::TechnicalError(
                    err.to_string(),
                ))
            },
        }
    }
}