// eventually-postgres 0.2.0
//
// Event Store implementation using PostgreSQL for the Eventually crate.
use std::sync::Arc;

use eventually_core::store::{EventStore, Expected, Persisted};
use eventually_core::subscription::{EventSubscriber as EventSubscriberTrait, Subscription};
use eventually_postgres::{EventStoreBuilder, EventSubscriber, PersistentBuilder};

use futures::stream::{StreamExt, TryStreamExt};

use serde::{Deserialize, Serialize};

use testcontainers::core::Docker;

/// Minimal event payload used by the tests in this file.
///
/// The tests append the three variants in the order `A`, `B`, `C` and then
/// compare them for equality, which is why `PartialEq`/`Eq` and `Debug` are
/// derived alongside the serde traits.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
enum Event {
    /// First event appended by the tests.
    A,
    /// Second event appended by the tests.
    B,
    /// Third event appended by the tests.
    C,
}

/// Opens a `subscribe_all` stream, appends three events to the store and
/// checks that the first three items yielded by the stream are exactly those
/// events, with versions 1..=3 and sequence numbers 0..=2.
#[tokio::test]
async fn subscribe_all_works() {
    // Throwaway PostgreSQL container started through testcontainers.
    let cli = testcontainers::clients::Cli::default();
    let container = cli.run(testcontainers::images::postgres::Postgres::default());

    // Host port reported by the container for port 5432.
    let port = container.get_host_port(5432).unwrap();
    let dsn = format!("postgres://postgres:postgres@localhost:{}/postgres", port);

    let (mut pg_client, pg_connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
        .await
        .expect("could not connect to the docker container");

    // The connection future is awaited on a spawned task; that task panics if
    // it resolves to an error.
    tokio::spawn(async move {
        pg_connection
            .await
            .expect("connection with the database exited with error")
    });

    // The same string is used both as the store name and as the source id.
    let name = "subscriber_test";
    let id = "subscriber_test";

    // Migrations need mutable access to the client; afterwards the client is
    // handed over to the builder behind an `Arc`.
    let migrated = EventStoreBuilder::migrate_database(&mut pg_client)
        .await
        .expect("failed to run database migrations");
    let store_builder = migrated.builder(Arc::new(pg_client));

    let mut store = store_builder
        .build::<String, Event>(name)
        .await
        .expect("failed to create event store");

    let subscriber = EventSubscriber::<String, Event>::new(&dsn, name)
        .await
        .expect("failed to create event subscription");

    // The stream is opened before anything is appended to the store.
    let stream = subscriber
        .subscribe_all()
        .await
        .expect("failed to create subscription from event subscriber");

    // Append all three events in a single call.
    store
        .append(
            id.to_owned(),
            Expected::Exact(0),
            vec![Event::A, Event::B, Event::C],
        )
        .await
        .expect("failed while appending events");

    // Only the first three items of the stream are collected.
    let received: Vec<Persisted<String, Event>> = stream
        .take(3)
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    let expected = vec![
        Persisted::from(id.to_owned(), Event::A)
            .version(1)
            .sequence_number(0),
        Persisted::from(id.to_owned(), Event::B)
            .version(2)
            .sequence_number(1),
        Persisted::from(id.to_owned(), Event::C)
            .version(3)
            .sequence_number(2),
    ];

    assert_eq!(expected, received);
}

/// Creates a persistent subscription, appends three events to the store and
/// checks that resuming the subscription yields exactly those events, with
/// versions 1..=3 and sequence numbers 0..=2.
#[tokio::test]
async fn persistent_subscription_works() {
    // Throwaway PostgreSQL container started through testcontainers.
    let cli = testcontainers::clients::Cli::default();
    let container = cli.run(testcontainers::images::postgres::Postgres::default());

    // Host port reported by the container for port 5432.
    let port = container.get_host_port(5432).unwrap();
    let dsn = format!("postgres://postgres:postgres@localhost:{}/postgres", port);

    // First connection: used for the migrations and by the event store.
    let (mut store_client, store_connection) =
        tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
            .await
            .expect("could not connect to the docker container");

    // The connection future is awaited on a spawned task; that task panics if
    // it resolves to an error.
    tokio::spawn(async move {
        store_connection
            .await
            .expect("connection with the database exited with error")
    });

    // The same string serves as store name, subscription name and source id.
    let name = "persistent_subscription_test";
    let id = "persistent_subscription_test";

    let migrated = EventStoreBuilder::migrate_database(&mut store_client)
        .await
        .expect("failed to run database migrations");
    let store_builder = migrated.builder(Arc::new(store_client));

    let mut store = store_builder
        .build::<String, Event>(name)
        .await
        .expect("failed to create event store");

    let subscriber = EventSubscriber::<String, Event>::new(&dsn, name)
        .await
        .expect("failed to create event subscription");

    // Second connection: handed to the persistent subscription builder.
    let (subscription_client, subscription_connection) =
        tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
            .await
            .expect("could not connect to the docker container");

    tokio::spawn(async move {
        subscription_connection
            .await
            .expect("connection with the database exited with error")
    });

    // The builder receives its own clone of the store, so `store` remains
    // usable for the append below.
    let persistent =
        PersistentBuilder::new(Arc::new(subscription_client), store.clone(), subscriber)
            .get_or_create(name.to_owned())
            .await
            .expect("failed to create persistent subscription");

    // Append all three events in a single call.
    store
        .append(
            id.to_owned(),
            Expected::Exact(0),
            vec![Event::A, Event::B, Event::C],
        )
        .await
        .expect("failed while appending events");

    let resumed = persistent
        .resume()
        .await
        .expect("failed to resume subscription");

    // Only the first three items of the resumed stream are collected.
    let received: Vec<Persisted<String, Event>> = resumed
        .take(3)
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    let expected = vec![
        Persisted::from(id.to_owned(), Event::A)
            .version(1)
            .sequence_number(0),
        Persisted::from(id.to_owned(), Event::B)
            .version(2)
            .sequence_number(1),
        Persisted::from(id.to_owned(), Event::C)
            .version(3)
            .sequence_number(2),
    ];

    assert_eq!(expected, received);
}