// postgres-es2 0.2.3
//
// A Postgres implementation of an event store for cqrs-es2.
// Documentation
use std::marker::PhantomData;

use postgres::Client;

use cqrs_es2::{
    Aggregate,
    AggregateError,
    EventEnvelope,
    Query,
    QueryProcessor,
};

use super::query_context::QueryContext;

/// This provides a simple query repository that can be used both to
/// return deserialized views and to act as a query processor.
///
/// `V` is the view (query) type, which is read back from a JSON
/// `payload` column, and `A` is the aggregate whose events update it.
pub struct GenericQueryRepository<V, A>
where
    V: Query<A>,
    A: Aggregate, {
    /// Postgres client used to query the view table; it is also handed
    /// to the view context when an updated view is committed.
    conn: Client,
    /// Name of the view. It is interpolated verbatim as the table name
    /// in the SQL issued by this repository.
    query_name: String,
    /// Optional callback that receives errors from code paths that have
    /// no way of returning them to the caller.
    error_handler: Option<Box<ErrorHandler>>,
    /// Binds the repository to its view and aggregate types without
    /// storing a value of either.
    _phantom: PhantomData<(V, A)>,
}

/// Shape of the optional error callback: any `Fn` that takes an
/// `AggregateError` by value and returns nothing.
type ErrorHandler = dyn Fn(AggregateError);

impl<V, A> GenericQueryRepository<V, A>
where
    V: Query<A>,
    A: Aggregate,
{
    /// Creates a new `GenericQueryRepository` that will store its'
    /// views in the table named identically to the `query_name`
    /// value provided. This table should be created by the user
    /// previously (see `/db/init.sql`).
    #[must_use]
    pub fn new(
        query_name: &str,
        conn: Client,
    ) -> Self {
        GenericQueryRepository {
            conn,
            query_name: query_name.to_string(),
            error_handler: None,
            _phantom: PhantomData,
        }
    }
    /// Installs the callback that receives errors which cannot be
    /// returned to the caller: a failure to load the view in
    /// `apply_events`, or a payload that fails to deserialize in
    /// `load`. Any previously installed handler is replaced.
    pub fn with_error_handler(
        &mut self,
        error_handler: Box<ErrorHandler>,
    ) {
        let handler = Some(error_handler);
        self.error_handler = handler;
    }

    /// Returns an owned copy of the view name this repository was
    /// created with.
    #[must_use]
    pub fn view_name(&self) -> String {
        self.query_name.clone()
    }

    fn load_mut(
        &mut self,
        query_instance_id: String,
    ) -> Result<(V, QueryContext<V>), AggregateError> {
        let query = format!(
            "SELECT version,payload FROM {} WHERE \
             query_instance_id= $1",
            &self.query_name
        );
        let result = match self
            .conn
            .query(query.as_str(), &[&query_instance_id])
        {
            Ok(result) => result,
            Err(e) => {
                return Err(AggregateError::new(
                    e.to_string().as_str(),
                ));
            },
        };
        match result.iter().next() {
            Some(row) => {
                let view_name = self.query_name.clone();
                let version = row.get("version");
                let payload = row.get("payload");
                let view = serde_json::from_value(payload)?;
                let view_context = QueryContext::new(
                    view_name,
                    query_instance_id,
                    version,
                    PhantomData,
                );
                Ok((view, view_context))
            },
            None => {
                let view_context = QueryContext::new(
                    self.query_name.clone(),
                    query_instance_id,
                    0,
                    PhantomData,
                );
                Ok((Default::default(), view_context))
            },
        }
    }

    /// Applies committed events to the view identified by
    /// `query_instance_id` and commits the updated view.
    ///
    /// If the view cannot be loaded, the error is passed to the
    /// configured error handler (or dropped when none is set) and
    /// nothing is committed.
    pub fn apply_events(
        &mut self,
        query_instance_id: &str,
        events: &[EventEnvelope<A>],
    ) {
        let (mut view, mut view_context) =
            match self.load_mut(query_instance_id.to_string()) {
                Ok(loaded) => loaded,
                Err(e) => {
                    // This method returns nothing, so the handler is
                    // the only way the failure can be reported.
                    if let Some(handler) = &self.error_handler {
                        (handler)(e);
                    }
                    return;
                },
            };

        for envelope in events.iter() {
            view.update(envelope);
        }
        view_context.commit(&mut self.conn, view);
    }

    /// Loads and deserializes a view based on the view id.
    ///
    /// Returns `None` when no row exists for `query_instance_id`. It
    /// also returns `None` when the stored payload cannot be
    /// deserialized into `V`; in that case the error is first passed
    /// to the configured error handler, if any.
    ///
    /// # Panics
    /// Panics if the database query itself fails. The message names
    /// the view, the instance id and the underlying error.
    pub fn load(
        &mut self,
        query_instance_id: String,
    ) -> Option<V> {
        let query = format!(
            "SELECT version,payload FROM {} WHERE \
             query_instance_id= $1",
            &self.query_name
        );
        let rows = match self
            .conn
            .query(query.as_str(), &[&query_instance_id])
        {
            Ok(rows) => rows,
            Err(err) => {
                // Argument order must follow the placeholders: view
                // name first, then the instance id.
                panic!(
                    "unable to load view '{}' with id: '{}', \
                     encountered: {}",
                    &self.query_name, &query_instance_id, err
                );
            },
        };

        // No row means there is no view stored under this id.
        let row = rows.first()?;
        let payload = row.get("payload");
        match serde_json::from_value(payload) {
            Ok(view) => Some(view),
            Err(e) => {
                if let Some(handler) = &self.error_handler {
                    (handler)(e.into());
                }
                None
            },
        }
    }
}

/// Lets the repository act as a query processor: dispatched events are
/// applied directly to the stored view.
impl<Q, A> QueryProcessor<A> for GenericQueryRepository<Q, A>
where
    Q: Query<A>,
    A: Aggregate,
{
    /// Forwards the events for `query_instance_id` to `apply_events`.
    fn dispatch(
        &mut self,
        query_instance_id: &str,
        events: &[EventEnvelope<A>],
    ) {
        // Both arguments already have the types `apply_events` takes,
        // so they are passed through without an intermediate `String`
        // or an extra reference.
        self.apply_events(query_instance_id, events);
    }
}