cqrs-postgres 0.3.3

An implementation of CQRS for a PostgreSQL backend.
Documentation
use crate::util::Sequence;
use cqrs_core::{reactor::Reaction, CqrsError, EventNumber, RawEvent, Since};
use num_traits::ToPrimitive;
use postgres::{rows::Row, types::ToSql};
use r2d2::{Pool, PooledConnection};
use r2d2_postgres::PostgresConnectionManager;
use std::{error, fmt, sync::Arc};

/// Error produced while running a reaction, tagged by the stage that failed.
///
/// `P` is the pool error type (defaults to `r2d2::Error`) and `D` is the
/// database error type (defaults to `postgres::Error`). Both are stored behind
/// an `Arc`, which lets the enum be cloned without requiring `P` or `D` to be
/// `Clone`.
#[derive(Debug)]
pub enum ReactorError<R, P = r2d2::Error, D = postgres::Error>
where
    R: Reaction,
{
    /// A pool error of type `P`, shared through an `Arc`.
    Pool(Arc<P>),
    /// A database error of type `D`, shared through an `Arc`.
    Postgres(Arc<D>),
    /// The reaction's own error (`R::Error`), stored by value.
    React(R::Error),
}

impl<R, P, D> Clone for ReactorError<R, P, D>
where
    R: Reaction,
    R::Error: Clone,
{
    fn clone(&self) -> Self {
        match self {
            ReactorError::Pool(e) => ReactorError::Pool(e.clone()),
            ReactorError::Postgres(e) => ReactorError::Postgres(e.clone()),
            ReactorError::React(e) => ReactorError::React(e.clone()),
        }
    }
}

/// Constructors that build each variant from a bare error value.
impl<R, P, D> ReactorError<R, P, D>
where
    R: Reaction,
{
    /// Wraps `err` in an `Arc` and returns it as [`ReactorError::Pool`].
    pub fn pool(err: P) -> Self {
        Self::Pool(Arc::new(err))
    }

    /// Wraps `err` in an `Arc` and returns it as [`ReactorError::Postgres`].
    pub fn postgres(err: D) -> Self {
        Self::Postgres(Arc::new(err))
    }

    /// Returns `err` as [`ReactorError::React`]; the value is stored directly.
    pub fn react(err: R::Error) -> Self {
        Self::React(err)
    }
}

impl<R, P, D> fmt::Display for ReactorError<R, P, D>
where
    R: Reaction,
    R::Error: fmt::Display,
    P: fmt::Display,
    D: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            ReactorError::Pool(ref err) => write!(f, "Pool error during reaction: {}", err),
            ReactorError::Postgres(ref err) => write!(f, "Postgres error during reaction: {}", err),
            ReactorError::React(ref err) => write!(f, "React error during reaction: {}", err),
        }
    }
}

/// Abstraction over a connection pool that hands out [`DbConnection`]s.
///
/// `'conn` is the lifetime the connection type must outlive and is also passed
/// to `DbConnection`.
pub trait DbPool<'conn> {
    /// Connection type returned by [`DbPool::get`].
    type Connection: DbConnection<'conn> + 'conn;
    /// Error returned when a connection cannot be obtained.
    type Error: CqrsError;
    /// Obtains a connection from the pool.
    fn get(&self) -> Result<Self::Connection, Self::Error>;
}

/// Operations a reactor needs from a database connection: loading and saving
/// a per-reaction checkpoint, and reading raw events.
pub trait DbConnection<'conn> {
    /// Error returned by every operation on this connection.
    type Error: CqrsError;
    /// Returns the checkpoint recorded for `reaction_name`.
    ///
    /// The PostgreSQL implementation in this file returns
    /// `Since::BeginningOfStream` when nothing has been stored for that name.
    fn load_since(&self, reaction_name: &str) -> Result<Since, Self::Error>;
    /// Records `event_id` as the checkpoint for `reaction_name`.
    fn save_since(&self, reaction_name: &str, event_id: EventNumber) -> Result<(), Self::Error>;
    /// Runs `query` and returns the resulting rows as [`RawEvent`]s.
    ///
    /// The PostgreSQL implementation in this file binds the position derived
    /// from `since` as the first query parameter, followed by `params` in order.
    fn read_all_events(
        &self,
        query: &str,
        since: Since,
        params: &[Box<dyn ToSql>],
    ) -> Result<Vec<RawEvent>, Self::Error>;
}

impl<'conn> DbPool<'conn> for Pool<PostgresConnectionManager> {
    type Connection = PooledConnection<PostgresConnectionManager>;
    type Error = r2d2::Error;

    fn get(&self) -> Result<Self::Connection, Self::Error> {
        (*self).get()
    }
}

impl<'conn> DbConnection<'conn> for PooledConnection<PostgresConnectionManager> {
    type Error = postgres::Error;

    fn load_since(&self, reaction_name: &str) -> Result<Since, Self::Error> {
        let stmt = self.prepare_cached(
            "SELECT event_id \
             FROM reactions \
             WHERE reaction_name = $1 \
             LIMIT 1",
        )?;

        let rows = stmt.query(&[&reaction_name])?;

        if rows.len() > 0 {
            let event_id: Sequence = rows.get(0).get(0);
            return Ok(Since::Event(event_id.0));
        }

        Ok(Since::BeginningOfStream)
    }

    fn save_since(&self, reaction_name: &str, event_id: EventNumber) -> Result<(), Self::Error> {
        let stmt = self.prepare_cached(
            "INSERT INTO reactions (reaction_name, event_id) \
             VALUES ($1, $2) \
             ON CONFLICT (reaction_name) \
             DO UPDATE SET event_id = EXCLUDED.event_id",
        )?;

        stmt.execute(&[
            &reaction_name,
            &event_id
                .get()
                .to_i64()
                .expect("Not expecting event_id > several billions"),
        ])?;

        Ok(())
    }

    fn read_all_events(
        &self,
        query: &str,
        since: Since,
        params: &[Box<ToSql>],
    ) -> Result<Vec<RawEvent>, Self::Error> {
        let last_sequence = match since {
            Since::BeginningOfStream => 0,
            Since::Event(x) => x.get(),
        } as i64;

        fn handle_row(row: Row) -> RawEvent {
            let event_id: Sequence = row.get(0);
            let aggregate_type = row.get(1);
            let entity_id = row.get(2);
            let sequence: Sequence = row.get(3);
            let event_type = row.get(4);
            let payload = row.get_bytes(5).unwrap();
            RawEvent {
                event_id: event_id.0,
                aggregate_type,
                entity_id,
                sequence: sequence.0,
                event_type,
                payload: payload.to_owned(),
            }
        };

        let events: Vec<RawEvent>;
        {
            let rows = {
                let local_params: Vec<_> = ::std::iter::once::<&dyn ToSql>(&last_sequence)
                    .chain(params.iter().map(|p| &**p))
                    .collect();
                let stmt = self.prepare_cached(query)?;
                stmt.query(&local_params)?
            };

            events = (&rows).into_iter().map(handle_row).collect();
        }

        Ok(events)
    }
}