// eventually-postgres 0.2.0
//
// Event Store implementation using PostgreSQL for the Eventually crate.
use std::sync::Arc;

use eventually_core::store::{EventStore, Expected, Persisted, Select};
use eventually_postgres::EventStoreBuilder;

use futures::stream::TryStreamExt;

use serde::{Deserialize, Serialize};

use testcontainers::core::Docker;

/// Minimal event payload used by the tests in this file.
///
/// The three variants carry no data; the tests only care about which variant
/// was stored and in what order it comes back. `PartialEq`/`Debug` are needed
/// by the `assert_eq!` comparisons below.
// NOTE(review): the serde derives are presumably what lets the event store
// persist these values — confirm against the store's trait bounds.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
enum Event {
    A,
    B,
    C,
}

/// Checks `stream_all` across two sources of the same event store.
///
/// Three events are appended to each of two source ids, then the test asserts
/// that `Select::All` yields all six events with sequence numbers 0 through 5,
/// and that `Select::From(3)` yields only the events with sequence numbers
/// 3 through 5 (the ones appended to the second source).
///
/// The test talks to a PostgreSQL instance started through `testcontainers`.
#[tokio::test]
async fn stream_all_works() {
    // Start PostgreSQL in a container.
    // NOTE(review): presumably the container is torn down when `container`
    // is dropped, so keep it bound until the end of the test — confirm.
    let docker = testcontainers::clients::Cli::default();
    let container = docker.run(testcontainers::images::postgres::Postgres::default());

    let host_port = container.get_host_port(5432).unwrap();
    let dsn = format!(
        "postgres://postgres:postgres@localhost:{}/postgres",
        host_port
    );

    let (mut client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
        .await
        .expect("could not connect to the docker container");

    // The connection future is driven on its own task for the rest of the test.
    let driver = async move {
        connection
            .await
            .expect("connection with the database exited with error")
    };
    tokio::spawn(driver);

    let source_name = "stream_all_test";
    let first_source = "stream_all_test_1";
    let second_source = "stream_all_test_2";

    // Migrations need exclusive access to the client; afterwards the client
    // is handed over to the builder behind an `Arc`.
    let migrated = EventStoreBuilder::migrate_database(&mut client)
        .await
        .expect("failed to run database migrations");
    let builder = migrated.builder(Arc::new(client));

    let mut store = builder
        .build::<String, Event>(source_name)
        .await
        .expect("failed to create event store");

    // Both sources start empty, hence `Expected::Exact(0)` for each append.
    let first_batch = vec![Event::A, Event::B, Event::C];
    store
        .append(first_source.to_owned(), Expected::Exact(0), first_batch)
        .await
        .expect("failed while appending events");

    let second_batch = vec![Event::C, Event::A, Event::B];
    store
        .append(second_source.to_owned(), Expected::Exact(0), second_batch)
        .await
        .expect("failed while appending events");

    // Select::All: every event from both sources is expected back.
    let full_stream = store
        .stream_all(Select::All)
        .await
        .expect("failed to create first stream");
    let all_events: Vec<Persisted<String, Event>> = full_stream
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    // Versions restart at 1 for each source, while sequence numbers keep
    // counting across sources.
    let expected_all = vec![
        Persisted::from(first_source.to_owned(), Event::A)
            .version(1)
            .sequence_number(0),
        Persisted::from(first_source.to_owned(), Event::B)
            .version(2)
            .sequence_number(1),
        Persisted::from(first_source.to_owned(), Event::C)
            .version(3)
            .sequence_number(2),
        Persisted::from(second_source.to_owned(), Event::C)
            .version(1)
            .sequence_number(3),
        Persisted::from(second_source.to_owned(), Event::A)
            .version(2)
            .sequence_number(4),
        Persisted::from(second_source.to_owned(), Event::B)
            .version(3)
            .sequence_number(5),
    ];
    assert_eq!(expected_all, all_events);

    // Select::From(3): for `stream_all` the bound applies to the sequence
    // number, so only the second source's events are expected.
    let tail_stream = store
        .stream_all(Select::From(3))
        .await
        .expect("failed to create second stream");
    let tail_events: Vec<Persisted<String, Event>> = tail_stream
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    let expected_tail = vec![
        Persisted::from(second_source.to_owned(), Event::C)
            .version(1)
            .sequence_number(3),
        Persisted::from(second_source.to_owned(), Event::A)
            .version(2)
            .sequence_number(4),
        Persisted::from(second_source.to_owned(), Event::B)
            .version(3)
            .sequence_number(5),
    ];
    assert_eq!(expected_tail, tail_events);
}

/// Checks `stream` for a single source id.
///
/// Three events are appended to one source, then the test asserts that
/// `Select::All` yields all of them (versions 1 through 3) and that
/// `Select::From(3)` yields only the event with version 3.
///
/// The test talks to a PostgreSQL instance started through `testcontainers`.
#[tokio::test]
async fn stream_works() {
    // Start PostgreSQL in a container.
    // NOTE(review): presumably the container is torn down when `container`
    // is dropped, so keep it bound until the end of the test — confirm.
    let docker = testcontainers::clients::Cli::default();
    let container = docker.run(testcontainers::images::postgres::Postgres::default());

    let host_port = container.get_host_port(5432).unwrap();
    let dsn = format!(
        "postgres://postgres:postgres@localhost:{}/postgres",
        host_port
    );

    let (mut client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
        .await
        .expect("could not connect to the docker container");

    // The connection future is driven on its own task for the rest of the test.
    let driver = async move {
        connection
            .await
            .expect("connection with the database exited with error")
    };
    tokio::spawn(driver);

    // The store name and the source id happen to share the same string here.
    let source_name = "stream_test";
    let source_id = "stream_test";

    // Migrations need exclusive access to the client; afterwards the client
    // is handed over to the builder behind an `Arc`.
    let migrated = EventStoreBuilder::migrate_database(&mut client)
        .await
        .expect("failed to run database migrations");
    let builder = migrated.builder(Arc::new(client));

    let mut store = builder
        .build::<String, Event>(source_name)
        .await
        .expect("failed to create event store");

    // The source starts empty, hence `Expected::Exact(0)`.
    let batch = vec![Event::A, Event::B, Event::C];
    store
        .append(source_id.to_owned(), Expected::Exact(0), batch)
        .await
        .expect("failed while appending events");

    // Select::All: the whole history of the source is expected back.
    let full_stream = store
        .stream(source_id.to_owned(), Select::All)
        .await
        .expect("failed to create first stream");
    let all_events: Vec<Persisted<String, Event>> = full_stream
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    let expected_all = vec![
        Persisted::from(source_id.to_owned(), Event::A)
            .version(1)
            .sequence_number(0),
        Persisted::from(source_id.to_owned(), Event::B)
            .version(2)
            .sequence_number(1),
        Persisted::from(source_id.to_owned(), Event::C)
            .version(3)
            .sequence_number(2),
    ];
    assert_eq!(expected_all, all_events);

    // Select::From(3): for `stream` the bound applies to the event version,
    // so only the last appended event is expected.
    let tail_stream = store
        .stream(source_id.to_owned(), Select::From(3))
        .await
        .expect("failed to create second stream");
    let tail_events: Vec<Persisted<String, Event>> = tail_stream
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    let expected_tail = vec![Persisted::from(source_id.to_owned(), Event::C)
        .version(3)
        .sequence_number(2)];
    assert_eq!(expected_tail, tail_events);
}