use std::sync::Arc;
use eventually_core::store::{EventStore, Expected, Persisted, Select};
use eventually_postgres::EventStoreBuilder;
use futures::stream::TryStreamExt;
use serde::{Deserialize, Serialize};
use testcontainers::core::Docker;
/// Minimal domain event used as the payload type for the event store
/// integration tests in this file.
///
/// The variants carry no data; the tests only care about which variant was
/// stored and in which order it comes back.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Serialize, Deserialize)]
enum Event {
    /// First marker event.
    A,
    /// Second marker event.
    B,
    /// Third marker event.
    C,
}
/// Exercises `stream_all` against a Postgres instance started through
/// `testcontainers`.
///
/// Two sources each receive three events. The test then checks that:
/// * `Select::All` returns all six events, carrying per-source versions
///   `1..=3` and sequence numbers `0..=5` in append order;
/// * `Select::From(3)` returns only the three events whose expected
///   sequence numbers are `3`, `4` and `5`.
#[tokio::test]
async fn stream_all_works() {
    // Names used for the event store and for the two sources it holds.
    let store_name = "stream_all_test";
    let first_id = String::from("stream_all_test_1");
    let second_id = String::from("stream_all_test_2");

    // Start the database container; `container` must stay bound until the
    // end of the test so the container is not dropped early.
    let cli = testcontainers::clients::Cli::default();
    let container = cli.run(testcontainers::images::postgres::Postgres::default());
    let port = container.get_host_port(5432).unwrap();
    let dsn = format!("postgres://postgres:postgres@localhost:{}/postgres", port);

    let (mut pg_client, pg_connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
        .await
        .expect("could not connect to the docker container");

    // The connection future has to be polled for the client to make progress.
    let connection_task = async move {
        pg_connection
            .await
            .expect("connection with the database exited with error")
    };
    tokio::spawn(connection_task);

    // Run the migrations first, then hand the client over to the builder.
    let migrated = EventStoreBuilder::migrate_database(&mut pg_client)
        .await
        .expect("failed to run database migrations");
    let builder = migrated.builder(Arc::new(pg_client));
    let mut store = builder
        .build::<String, Event>(store_name)
        .await
        .expect("failed to create event store");

    // Append one batch of three events per source, in this order.
    let batches = vec![
        (first_id.clone(), vec![Event::A, Event::B, Event::C]),
        (second_id.clone(), vec![Event::C, Event::A, Event::B]),
    ];
    for (id, batch) in batches {
        store
            .append(id, Expected::Exact(0), batch)
            .await
            .expect("failed while appending events");
    }

    // --- Select::All: every event from both sources. ---
    let all_stream = store
        .stream_all(Select::All)
        .await
        .expect("failed to create first stream");
    let all_events: Vec<Persisted<String, Event>> = all_stream
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    let expected_all = vec![
        Persisted::from(first_id.clone(), Event::A)
            .version(1)
            .sequence_number(0),
        Persisted::from(first_id.clone(), Event::B)
            .version(2)
            .sequence_number(1),
        Persisted::from(first_id.clone(), Event::C)
            .version(3)
            .sequence_number(2),
        Persisted::from(second_id.clone(), Event::C)
            .version(1)
            .sequence_number(3),
        Persisted::from(second_id.clone(), Event::A)
            .version(2)
            .sequence_number(4),
        Persisted::from(second_id.clone(), Event::B)
            .version(3)
            .sequence_number(5),
    ];
    assert_eq!(expected_all, all_events);

    // --- Select::From(3): only the second source's events are expected. ---
    let tail_stream = store
        .stream_all(Select::From(3))
        .await
        .expect("failed to create second stream");
    let tail_events: Vec<Persisted<String, Event>> = tail_stream
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    let expected_tail = vec![
        Persisted::from(second_id.clone(), Event::C)
            .version(1)
            .sequence_number(3),
        Persisted::from(second_id.clone(), Event::A)
            .version(2)
            .sequence_number(4),
        Persisted::from(second_id.clone(), Event::B)
            .version(3)
            .sequence_number(5),
    ];
    assert_eq!(expected_tail, tail_events);
}
/// Exercises `stream` (single-source streaming) against a Postgres instance
/// started through `testcontainers`.
///
/// Three events are appended to one source. The test then checks that:
/// * `Select::All` returns all three events with versions `1..=3` and
///   sequence numbers `0..=2`;
/// * `Select::From(3)` returns only the event expected at version `3`.
#[tokio::test]
async fn stream_works() {
    // The store name and the source id deliberately share the same value.
    let store_name = "stream_test";
    let stream_id = String::from("stream_test");

    // Start the database container; `container` must stay bound until the
    // end of the test so the container is not dropped early.
    let cli = testcontainers::clients::Cli::default();
    let container = cli.run(testcontainers::images::postgres::Postgres::default());
    let port = container.get_host_port(5432).unwrap();
    let dsn = format!("postgres://postgres:postgres@localhost:{}/postgres", port);

    let (mut pg_client, pg_connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
        .await
        .expect("could not connect to the docker container");

    // The connection future has to be polled for the client to make progress.
    let connection_task = async move {
        pg_connection
            .await
            .expect("connection with the database exited with error")
    };
    tokio::spawn(connection_task);

    // Run the migrations first, then hand the client over to the builder.
    let migrated = EventStoreBuilder::migrate_database(&mut pg_client)
        .await
        .expect("failed to run database migrations");
    let builder = migrated.builder(Arc::new(pg_client));
    let mut store = builder
        .build::<String, Event>(store_name)
        .await
        .expect("failed to create event store");

    // Seed the source with three events.
    let seed = vec![Event::A, Event::B, Event::C];
    store
        .append(stream_id.clone(), Expected::Exact(0), seed)
        .await
        .expect("failed while appending events");

    // --- Select::All: the full history of the source. ---
    let full_stream = store
        .stream(stream_id.clone(), Select::All)
        .await
        .expect("failed to create first stream");
    let full_history: Vec<Persisted<String, Event>> = full_stream
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    let expected_full = vec![
        Persisted::from(stream_id.clone(), Event::A)
            .version(1)
            .sequence_number(0),
        Persisted::from(stream_id.clone(), Event::B)
            .version(2)
            .sequence_number(1),
        Persisted::from(stream_id.clone(), Event::C)
            .version(3)
            .sequence_number(2),
    ];
    assert_eq!(expected_full, full_history);

    // --- Select::From(3): only the last event is expected. ---
    let tail_stream = store
        .stream(stream_id.clone(), Select::From(3))
        .await
        .expect("failed to create second stream");
    let tail: Vec<Persisted<String, Event>> = tail_stream
        .try_collect()
        .await
        .expect("failed to collect events from subscription");

    let expected_tail = vec![Persisted::from(stream_id.clone(), Event::C)
        .version(3)
        .sequence_number(2)];
    assert_eq!(expected_tail, tail);
}