postgres-es 0.5.0

A Postgres implementation of an event repository for cqrs-es.
Documentation
use std::collections::HashMap;

use cqrs_es::doc::{Customer, CustomerEvent};
use cqrs_es::persist::{PersistedEventStore, SemanticVersionEventUpcaster};
use cqrs_es::EventStore;
use postgres_es::{default_postgress_pool, PostgresEventRepository};
use serde_json::Value;
use sqlx::{Pool, Postgres};

/// Connection string handed to `default_postgress_pool` by every test in this file.
///
/// Points at database `test` on `127.0.0.1:5432`, authenticating as `test_user`.
const TEST_CONNECTION_STRING: &str = "postgresql://test_user:test_pass@127.0.0.1:5432/test";

/// Builds a `Customer` event store for tests.
///
/// Wraps `pool` in a `PostgresEventRepository` and passes that repository to
/// `PersistedEventStore::new_event_store`.
fn new_test_event_store(
    pool: Pool<Postgres>,
) -> PersistedEventStore<PostgresEventRepository, Customer> {
    // The declared return type fixes the repository and aggregate type parameters,
    // so no turbofish is needed on the constructor call.
    PersistedEventStore::new_event_store(PostgresEventRepository::new(pool))
}

#[tokio::test]
async fn commit_and_load_events() {
    let pool = default_postgress_pool(TEST_CONNECTION_STRING).await;
    let repo = PostgresEventRepository::new(pool);
    let event_store =
        PersistedEventStore::<PostgresEventRepository, Customer>::new_event_store(repo);

    simple_es_commit_and_load_test(event_store).await;
}

#[tokio::test]
async fn commit_and_load_events_snapshot_store() {
    let pool = default_postgress_pool(TEST_CONNECTION_STRING).await;
    let repo = PostgresEventRepository::new(pool);
    let event_store =
        PersistedEventStore::<PostgresEventRepository, Customer>::new_aggregate_store(repo);

    simple_es_commit_and_load_test(event_store).await;
}

/// Shared scenario used by the commit-and-load tests.
///
/// Against a freshly generated aggregate id it:
/// 1. asserts that no events are stored yet,
/// 2. commits two events and asserts that two events can be loaded,
/// 3. commits one more event and asserts that three events can be loaded.
///
/// The aggregate context is re-loaded from `event_store` before each commit.
///
/// # Panics
///
/// Panics if any store call returns an error or if an event count does not match.
async fn simple_es_commit_and_load_test(
    event_store: PersistedEventStore<PostgresEventRepository, Customer>,
) {
    // A new random UUID per run, so the first load is expected to find nothing.
    let generated_id = uuid::Uuid::new_v4().to_string();
    let aggregate_id: &str = &generated_id;

    let initial_count = event_store.load_events(aggregate_id).await.unwrap().len();
    assert_eq!(0, initial_count);

    // First commit: two events in a single batch.
    let first_context = event_store.load_aggregate(aggregate_id).await.unwrap();
    let first_batch = vec![
        CustomerEvent::NameAdded {
            name: String::from("test_event_A"),
        },
        CustomerEvent::EmailUpdated {
            new_email: String::from("email A"),
        },
    ];
    event_store
        .commit(first_batch, first_context, HashMap::new())
        .await
        .unwrap();

    let count_after_first = event_store.load_events(aggregate_id).await.unwrap().len();
    assert_eq!(2, count_after_first);

    // Second commit: one further event, using a freshly loaded context.
    let second_context = event_store.load_aggregate(aggregate_id).await.unwrap();
    let second_batch = vec![CustomerEvent::EmailUpdated {
        new_email: String::from("email B"),
    }];
    event_store
        .commit(second_batch, second_context, HashMap::new())
        .await
        .unwrap();

    let count_after_second = event_store.load_events(aggregate_id).await.unwrap().len();
    assert_eq!(3, count_after_second);
}

#[tokio::test]
async fn upcasted_event() {
    let pool = default_postgress_pool(TEST_CONNECTION_STRING).await;
    let upcaster = SemanticVersionEventUpcaster::new(
        "NameAdded",
        "1.0.1",
        Box::new(|mut event| match event.get_mut("NameAdded").unwrap() {
            Value::Object(object) => {
                object.insert("name".to_string(), Value::String("UNKNOWN".to_string()));
                event
            }
            _ => panic!("not the expected object"),
        }),
    );
    let event_store = new_test_event_store(pool).with_upcasters(vec![Box::new(upcaster)]);

    let id = "previous_event_in_need_of_upcast".to_string();
    let result = event_store.load_aggregate(id.as_str()).await.unwrap();
    assert_eq!(1, result.current_sequence);
    assert_eq!(None, result.current_snapshot);
}