mod common;
use chrono::Utc;
use futures::StreamExt;
use timesource::consumer::store::ConsumerStore;
use timesource::consumer::{ConsumerBuilder, RootConsumerStore};
use timesource::repository::Repository;
use timesource::store::{CommitOrder, EventStore, EventStoreBuilder};
use timesource::Aggregate;
use tokio::task::yield_now;
use uuid::Uuid;
use crate::common::data::{
bootstrap_test, bootstrap_test_cbor, sample_cbor_commands, sample_cbor_events, sample_commands,
sample_events, sample_events_uncommitted, TestData, DSN,
};
use crate::common::order::{OrderAggregate, OrderCommand, OrderEvent};
use crate::common::user_cbor::{UserCommand, UserEvent};
#[macro_use]
extern crate test_case;
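// Resuming a consumer after a JSON-encoded event has already been committed
// should replay that prior event from the store.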
#[tokio::test]
async fn consumer_should_fetch_prior_json_events_on_resume() {
let TestData {
aggregate_id,
repository,
mut root,
consumer,
..
} = bootstrap_test(false).await;
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let mut stream = consumer.resume().await.expect("should resume consumer");
let message = stream.next().await.unwrap().unwrap();
assert_eq!(message.aggregate_id(), aggregate_id);
assert_eq!(message.into_data(), OrderEvent::Created);
}
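// Same check for a CBOR-encoded aggregate: the previously committed event is
// replayed when the consumer resumes.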
#[tokio::test]
async fn consumer_should_fetch_prior_cbor_events_on_resume() {
let TestData {
aggregate_id,
repository,
mut root,
consumer,
..
} = bootstrap_test_cbor(false).await;
root.handle(UserCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let mut stream = consumer.resume().await.expect("should resume consumer");
let message = stream.next().await.unwrap().unwrap();
assert_eq!(message.aggregate_id(), aggregate_id);
assert_eq!(message.into_data(), UserEvent::Created);
}
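// A consumer that has already resumed should receive newly committed JSON events
// through notifications, in commit order.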
#[tokio::test]
async fn consumer_should_stream_json_events_after_notification() {
let TestData {
aggregate_id,
repository,
mut root,
consumer,
..
} = bootstrap_test(false).await;
let mut stream = consumer.resume().await.expect("should resume consumer");
for command in sample_commands() {
root.handle(command)
.expect("Should be able to submit command");
}
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
for event in sample_events() {
let message = stream.next().await.unwrap().unwrap();
assert_eq!(message.aggregate_id(), aggregate_id);
assert_eq!(message.into_data(), event);
}
}
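// A single consumer stream should deliver events for every root of the subscribed
// aggregate type, not only the first root it observed.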
#[tokio::test]
async fn consumer_should_stream_events_for_different_roots_after_notification() {
let TestData {
aggregate_id,
repository,
mut root,
consumer,
..
} = bootstrap_test(false).await;
let mut stream = consumer.resume().await.expect("should resume consumer");
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let message = stream.next().await.unwrap().unwrap();
assert_eq!(message.aggregate_id(), aggregate_id);
assert_eq!(message.into_data(), OrderEvent::Created);
let mut root2 = OrderAggregate::root();
root2
.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root2)
.await
.expect("Should be able to commit root");
let message = stream.next().await.unwrap().unwrap();
assert_eq!(message.aggregate_id(), root2.id());
assert_eq!(message.into_data(), OrderEvent::Created);
}
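// Newly committed CBOR events should also be streamed to an already-resumed consumer.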
#[tokio::test]
async fn consumer_should_stream_cbor_events_after_notification() {
let TestData {
aggregate_id,
repository,
mut root,
consumer,
..
} = bootstrap_test_cbor(false).await;
let mut stream = consumer.resume().await.expect("should resume consumer");
for command in sample_cbor_commands() {
root.handle(command)
.expect("Should be able to submit command");
}
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
for event in sample_cbor_events() {
let message = stream.next().await.unwrap().unwrap();
assert_eq!(message.aggregate_id(), aggregate_id);
assert_eq!(message.into_data(), event);
}
}
#[tokio::test]
async fn it_should_recover_if_postgres_notifications_buffer_gets_full() {
let TestData {
aggregate_type_name,
consumer_name,
..
} = bootstrap_test(false).await;
let store = EventStoreBuilder::new(DSN)
.build::<OrderEvent>(&aggregate_type_name)
.await
.expect("store to be created");
let events1 = sample_events_uncommitted();
let events2 = sample_events_uncommitted();
let events3 = sample_events_uncommitted();
let consumer = ConsumerBuilder::new(DSN)
.with_notification_buffer_capacity(1)
.with_event_buffer_capacity(events1.len() - 1)
.aggregate_build::<OrderEvent>(consumer_name.into(), &aggregate_type_name)
.await
.unwrap();
let stream = consumer.resume().await.expect("should resume consumer");
let aggregate_id = Uuid::new_v4();
store
.commit(aggregate_id, CommitOrder::None, &events1)
.await
.expect("Failed appending events");
store
.commit(aggregate_id, CommitOrder::None, &events2)
.await
.expect("Failed appending events");
store
.commit(aggregate_id, CommitOrder::None, &events3)
.await
.expect("Failed appending events");
yield_now().await;
    let expected_total = events1.len() + events2.len() + events3.len();
    let received = stream.take(expected_total).map(|x| x.unwrap()).collect::<Vec<_>>().await;
    assert_eq!(received.len(), expected_total, "it should receive all committed events");
}
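// RootConsumerStore queries (events_after_offset, events_after, events_range) should be
// scoped to a single aggregate root and respect the given offsets.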
#[tokio::test]
async fn root_store_should_return_queries() {
let TestData {
repository,
mut root,
aggregate_type_name,
consumer_root_name,
aggregate_type_id,
pool,
..
} = bootstrap_test(false).await;
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let store = EventStoreBuilder::new(DSN)
.build::<OrderEvent>(&aggregate_type_name)
.await
.expect("store to be created");
let repository = Repository::new(store);
let mut root2 = OrderAggregate::root();
root2
.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root2)
.await
.expect("Should be able to commit root");
let store = RootConsumerStore::<OrderEvent>::new(
root2.id(),
aggregate_type_id,
consumer_root_name.into(),
pool.clone(),
);
let events_after_offset = store.events_after_offset().collect::<Vec<_>>().await;
    assert_eq!(
        events_after_offset.len(),
        1,
        "should only return the single event stored for root2"
    );
let event = events_after_offset[0].as_ref().unwrap();
let offset = event.id();
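    // Cancel the first root; the store is scoped to root2, so this commit must not appear below.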
root.handle(OrderCommand::Cancel).unwrap();
repository.commit_orderly(&mut root).await.unwrap();
let events_after = store.events_after(offset).collect::<Vec<_>>().await;
    assert_eq!(
        events_after.len(),
        0,
        "no further events for root2 have been stored after the offset"
    );
root2.handle(OrderCommand::Cancel).unwrap();
repository.commit_orderly(&mut root2).await.unwrap();
let events_after = store.events_after(offset).collect::<Vec<_>>().await;
    assert_eq!(events_after.len(), 1, "should return the root2 event committed after the offset");
let events_range = store
.events_range(offset, Utc::now().timestamp_nanos() as u64)
.collect::<Vec<_>>()
.await;
    assert_eq!(
        events_range.len(),
        1,
        "should return only the root2 event that falls within the range"
    );
}
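// An aggregate-root consumer resumed after commits should only replay events that belong
// to its own root, ignoring other roots of the same aggregate type.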
#[tokio::test]
async fn should_consume_prior_root_events_only() {
let TestData {
repository,
mut root,
aggregate_type_name,
consumer_root_name,
..
} = bootstrap_test(false).await;
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let store = EventStoreBuilder::new(DSN)
.build::<OrderEvent>(&aggregate_type_name)
.await
.expect("store to be created");
let repository = Repository::new(store);
let mut root2 = OrderAggregate::root();
root2
.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root2)
.await
.expect("Should be able to commit root");
let consumer = ConsumerBuilder::new(DSN)
.aggregate_root_build::<OrderEvent>(
consumer_root_name.clone().into(),
&aggregate_type_name,
root2.id(),
)
.await
.unwrap();
let mut stream = consumer.resume().await.expect("should resume consumer");
let message = stream.next().await.unwrap().unwrap();
assert_eq!(message.aggregate_id(), root2.id());
}
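// An aggregate-root consumer should only be notified about events for its own root, even
// when another root of the same type commits first.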
#[tokio::test]
async fn should_get_notified_for_root_events_only() {
let TestData {
repository,
mut root,
aggregate_type_name,
consumer_root_name,
..
} = bootstrap_test(false).await;
let mut root2 = OrderAggregate::root();
root2
.handle(OrderCommand::Create)
.expect("Should be able to submit command");
let consumer = ConsumerBuilder::new(DSN)
.aggregate_root_build::<OrderEvent>(
consumer_root_name.clone().into(),
&aggregate_type_name,
root2.id(),
)
.await
.unwrap();
let mut stream = consumer.resume().await.expect("should resume consumer");
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
repository
.commit_orderly(&mut root2)
.await
.expect("Should be able to commit root");
let first_message = stream.next().await.unwrap().unwrap();
assert_eq!(first_message.aggregate_id(), root2.id());
}
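// Acknowledging a delivered message should persist the consumer's checkpoint, and a
// consumer recreated with the same name should resume from the event after that offset.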
#[test_case(false ; "aggregate")]
#[test_case(true ; "aggregate_root")]
#[tokio::test]
async fn handles_checkpoints_ok(root_case: bool) {
let TestData {
repository,
mut root,
consumer_root,
consumer,
consumer_name,
consumer_root_name,
pool,
aggregate_type_name,
..
} = bootstrap_test(false).await;
let mut stream = if root_case {
consumer_root
.resume()
.await
.expect("should resume consumer")
} else {
consumer.resume().await.expect("should resume consumer")
};
let consumer_name = if root_case {
consumer_root_name
} else {
consumer_name
};
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let message = stream.next().await.unwrap().unwrap();
if root_case {
consumer_root.ack(message.id()).await.unwrap();
} else {
consumer.ack(message.id()).await.unwrap();
}
let saved_offset =
sqlx::query_file_scalar!("tests/queries/consumer_offset.sql", &consumer_name)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(message.id(), saved_offset.unwrap() as u64);
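    // A fresh consumer reusing the same name should pick up after the saved checkpoint.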
let consumer_2 = ConsumerBuilder::new(DSN)
.aggregate_build::<OrderEvent>(consumer_name.clone().into(), &aggregate_type_name)
.await
.unwrap();
let mut stream_2 = consumer_2.resume().await.expect("should resume consumer");
root.handle(OrderCommand::Cancel)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let message = stream_2.next().await.unwrap().unwrap().into_data();
assert_eq!(
message,
OrderEvent::Cancelled,
"should deliver next message after offset"
);
}
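// Acknowledging the last of several delivered messages should persist that message's id
// as the saved offset.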
#[test_case(false ; "aggregate")]
#[test_case(true ; "aggregate_root")]
#[tokio::test]
async fn it_should_save_last_checkpoint(root_case: bool) {
let TestData {
repository,
mut root,
consumer,
consumer_root,
consumer_name,
consumer_root_name,
pool,
..
} = bootstrap_test(false).await;
let stream = if root_case {
consumer_root
.resume()
.await
.expect("should resume consumer")
} else {
consumer.resume().await.expect("should resume consumer")
};
let consumer_name = if root_case {
consumer_root_name
} else {
consumer_name
};
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
root.handle(OrderCommand::Cancel)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let messages = stream.take(2).collect::<Vec<_>>().await;
let last_message = messages.last().unwrap().as_ref().unwrap();
if root_case {
consumer_root.ack(last_message.id()).await.unwrap();
} else {
consumer.ack(last_message.id()).await.unwrap();
}
let saved_offset =
sqlx::query_file_scalar!("tests/queries/consumer_offset.sql", &consumer_name)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(last_message.id(), saved_offset.unwrap() as u64);
}
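// The very first ack for a consumer (no prior checkpoint) should succeed and persist the
// acknowledged message id as the offset.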
#[test_case(false ; "aggregate")]
#[test_case(true ; "aggregate_root")]
#[tokio::test]
async fn it_should_handle_first_try_ack(root_case: bool) {
let TestData {
repository,
mut root,
consumer,
consumer_root,
consumer_name,
consumer_root_name,
pool,
..
} = bootstrap_test(false).await;
let mut stream = if root_case {
consumer_root
.resume()
.await
.expect("should resume consumer")
} else {
consumer.resume().await.expect("should resume consumer")
};
let consumer_name = if root_case {
consumer_root_name
} else {
consumer_name
};
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let message = stream.next().await.unwrap().unwrap();
if root_case {
consumer_root.ack(message.id()).await.unwrap();
} else {
consumer.ack(message.id()).await.unwrap();
}
let saved_offset =
sqlx::query_file_scalar!("tests/queries/consumer_offset.sql", &consumer_name)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(message.id(), saved_offset.unwrap() as u64);
}
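// try_ack should fail with "checkpoint out of order" when acknowledging the second
// delivered event without having acknowledged the first.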
#[test_case(false ; "aggregate")]
#[test_case(true ; "aggregate_root")]
#[tokio::test]
#[should_panic(expected = "checkpoint out of order")]
async fn it_should_return_err_for_out_of_order_try_ack(root_case: bool) {
let TestData {
repository,
mut root,
consumer,
consumer_root,
..
} = bootstrap_test(false).await;
let stream = if root_case {
consumer_root
.resume()
.await
.expect("should resume consumer")
} else {
consumer.resume().await.expect("should resume consumer")
};
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
root.handle(OrderCommand::Cancel)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let messages = stream.take(2).collect::<Vec<_>>().await;
let last_message = messages.last().unwrap().as_ref().unwrap();
consumer.try_ack(last_message.id()).await.unwrap();
}
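// try_ack should fail with "checkpoint behind current offset" when re-submitting an
// offset older than the one already acknowledged.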
#[test_case(false ; "aggregate")]
#[test_case(true ; "aggregate_root")]
#[tokio::test]
#[should_panic(expected = "checkpoint behind current offset")]
async fn it_should_return_err_for_already_submitted_checkpoint(root_case: bool) {
let TestData {
repository,
mut root,
consumer,
consumer_root,
..
} = bootstrap_test(false).await;
let mut stream = if root_case {
consumer_root
.resume()
.await
.expect("should resume consumer")
} else {
consumer.resume().await.expect("should resume consumer")
};
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
root.handle(OrderCommand::Cancel)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let first = stream.next().await.unwrap().unwrap();
let last = stream.next().await.unwrap().unwrap();
consumer.try_ack(first.id()).await.unwrap();
consumer.try_ack(last.id()).await.unwrap();
consumer.try_ack(first.id()).await.unwrap();
}
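// try_ack with an offset that does not correspond to any stored event should fail with a
// foreign key violation from Postgres.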
#[test_case(false ; "aggregate")]
#[test_case(true ; "aggregate_root")]
#[tokio::test]
#[should_panic(
expected = "update or delete on table aggregate_consumer violates foreign key constraint"
)]
async fn it_should_return_err_for_non_existent_offset_when_trying_checkpoint(root_case: bool) {
let TestData {
repository,
mut root,
consumer,
consumer_root,
..
} = bootstrap_test(false).await;
let mut stream = if root_case {
consumer_root
.resume()
.await
.expect("should resume consumer")
} else {
consumer.resume().await.expect("should resume consumer")
};
root.handle(OrderCommand::Create)
.expect("Should be able to submit command");
repository
.commit_orderly(&mut root)
.await
.expect("Should be able to commit root");
let event = stream.next().await.unwrap().unwrap();
consumer.try_ack(event.id() + 1).await.unwrap();
}