mod common;
use chrono::Utc;
use common::order::OrderCommand;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use serde_json::Value as Json;
use sqlx::FromRow;
use test_case::test_case;
use timesource::aggregate::UncommittedEvent;
use timesource::store::{CommitOrder, EventStore, EventStoreBuilder};
use timesource::{Aggregate, TimesourceEvent};
use uuid::Uuid;
use crate::common::data::{
bootstrap_test, sample_commands, sample_events_uncommitted, TestData, DSN,
};
use crate::common::order::{OrderAggregate, OrderEvent};
#[allow(dead_code)]
#[derive(Debug, FromRow)]
// Row shape for reading raw event records back out of the store's table via
// sqlx. Fields mirror the persisted columns; only used by tests in this file.
struct EventRow {
    // Foreign key identifying which aggregate type the event belongs to.
    aggregate_type_id: i32,
    // Event variant name as persisted.
    name: String,
    // Optional JSON payload; NULL for payload-less variants.
    payload: Option<Json>,
    // Persisted timestamp (integer column — presumably epoch-based; units not
    // established by this file, confirm against the schema).
    time: i64,
}
#[allow(dead_code)]
#[derive(Debug, FromRow)]
// Slim projection of the event table: just identity columns plus the event
// name, for assertions that don't need payload or time.
struct EventRowName {
    // Foreign key identifying which aggregate type the event belongs to.
    aggregate_type_id: i32,
    // Primary key / sequence id of the event row.
    id: i32,
    // Event variant name as persisted.
    name: String,
}
#[tokio::test]
async fn different_aggregate_types_can_share_id() {
    // Two stores built over different aggregate types must be able to commit
    // events under the same aggregate UUID without interfering, and each must
    // read back its own stream keyed by that shared id.
    let TestData {
        aggregate_type_name,
        ..
    } = bootstrap_test(false).await;
    let shared_id = Uuid::new_v4();
    // Minimal second event type so the two stores have distinct aggregates.
    #[derive(Debug, Clone, Serialize, Deserialize, TimesourceEvent)]
    enum Store2Event {
        Something,
    }
    let event_store_builder = EventStoreBuilder::new(DSN);
    let store = event_store_builder
        .build::<OrderEvent>(&aggregate_type_name)
        .await
        .expect("store to be created");
    let store2 = event_store_builder
        .build::<Store2Event>("orders2")
        .await
        .expect("store to be created");
    let events1 = sample_events_uncommitted();
    // Mirror the event count in the second store so the two streams line up
    // one-to-one for the pairwise comparison below.
    let events2 = vec![Store2Event::Something; events1.len()]
        .into_iter()
        .map(|data| UncommittedEvent {
            utc: Utc::now(),
            data,
        })
        .collect::<Vec<_>>();
    store
        .commit(shared_id, CommitOrder::None, &events1)
        .await
        .expect("Failed appending events");
    store2
        .commit(shared_id, CommitOrder::None, &events2)
        .await
        .expect("Failed appending events");
    let read_events = store
        .aggregate_stream(shared_id)
        .await
        .try_collect::<Vec<_>>()
        .await
        .expect("failed to collect first stream of events from consumer");
    let read_events2 = store2
        .aggregate_stream(shared_id)
        .await
        .try_collect::<Vec<_>>()
        .await
        .expect("failed to collect second stream of events from consumer");
    // FIX: the original loop passed vacuously when `read_events` was empty and
    // indexed `read_events2[i]` without a bounds guarantee. Pin both lengths
    // first, then compare pairwise via zip (no panic path).
    assert!(
        !read_events.is_empty(),
        "expected at least one event in the first stream"
    );
    assert_eq!(
        read_events.len(),
        read_events2.len(),
        "both stores should return the same number of events"
    );
    for (event, event2) in read_events.iter().zip(read_events2.iter()) {
        assert_eq!(
            event.aggregate_id(),
            event2.aggregate_id(),
            "Expected event source ids to be the same between stores"
        );
    }
}
#[tokio::test]
#[should_panic(expected = "list of events can't be empty")]
async fn return_err_given_no_events() {
    // Committing an empty slice of events must be rejected by the store; the
    // unwrap below surfaces that rejection as the expected panic.
    let TestData {
        aggregate_type_name,
        ..
    } = bootstrap_test(false).await;
    let builder = EventStoreBuilder::new(DSN);
    let store = builder
        .build::<OrderEvent>(&aggregate_type_name)
        .await
        .expect("store to be created");
    let fresh_id = Uuid::new_v4();
    store
        .commit(fresh_id, CommitOrder::None, &[])
        .await
        .unwrap();
}
#[tokio::test]
#[should_panic(expected = "AlreadyCreated")]
async fn should_propagate_aggregate_err_when_handling_command() {
    // Issuing Create twice against the same root should surface the
    // aggregate's AlreadyCreated error via one of the unwraps below.
    let TestData {
        repository,
        mut root,
        ..
    } = bootstrap_test(false).await;
    // Two identical Create commands: the duplicate is the error trigger.
    for _ in 0..2 {
        root.handle(OrderCommand::Create).unwrap();
    }
    repository.commit_unorderly(&mut root).await.unwrap();
}
#[tokio::test]
#[should_panic(expected = "invalid aggregate time provided")]
async fn return_err_for_ordered_events_which_are_not_first() {
    // Simulate two systems producing events for the same aggregate id, with
    // system A's events strictly older. Committing B first and then attempting
    // an ordered commit of A must fail with an invalid-time error.
    let TestData {
        aggregate_id,
        repository,
        root,
        ..
    } = bootstrap_test(false).await;
    let mut system_a_root = root;
    let mut system_b_root = OrderAggregate::root_with_id(aggregate_id);
    for command in sample_commands() {
        system_a_root
            .handle(command)
            .expect("Failed handling event for systemA");
    }
    for command in sample_commands() {
        system_b_root
            .handle(command)
            // FIX: this message previously said "systemA" (copy-paste).
            .expect("Failed handling event for systemB");
    }
    let system_a_events = system_a_root.uncommitted_events();
    let system_b_events = system_b_root.uncommitted_events();
    // Sanity-check the precondition: same event count, and every A event was
    // created before its B counterpart. Zip avoids the original's unchecked
    // `system_b_events[index]` indexing.
    assert_eq!(
        system_a_events.len(),
        system_b_events.len(),
        "both systems handled the same commands"
    );
    for (event_a, event_b) in system_a_events.iter().zip(system_b_events.iter()) {
        assert!(
            event_a.utc < event_b.utc,
            "Something is not right with this test. Events in System A should be created before B"
        );
    }
    repository.commit_orderly(&mut system_b_root).await.unwrap();
    repository.commit_orderly(&mut system_a_root).await.unwrap();
}
#[tokio::test]
#[should_panic(expected = "invalid aggregate time provided")]
async fn return_err_given_unordered_events_and_existing_offset() {
    // NOTE(review): this body is currently identical to
    // `return_err_for_ordered_events_which_are_not_first` even though the name
    // suggests an *unordered* commit against an existing offset — confirm
    // whether one of the commits below was meant to be `commit_unorderly`.
    let TestData {
        aggregate_id,
        repository,
        root,
        ..
    } = bootstrap_test(false).await;
    let mut system_a_root = root;
    let mut system_b_root = OrderAggregate::root_with_id(aggregate_id);
    for command in sample_commands() {
        system_a_root
            .handle(command)
            .expect("Failed handling event for systemA");
    }
    for command in sample_commands() {
        system_b_root
            .handle(command)
            // FIX: this message previously said "systemA" (copy-paste).
            .expect("Failed handling event for systemB");
    }
    let system_a_events = system_a_root.uncommitted_events();
    let system_b_events = system_b_root.uncommitted_events();
    // Sanity-check the precondition: same event count, and every A event was
    // created before its B counterpart. Zip avoids the original's unchecked
    // `system_b_events[index]` indexing.
    assert_eq!(
        system_a_events.len(),
        system_b_events.len(),
        "both systems handled the same commands"
    );
    for (event_a, event_b) in system_a_events.iter().zip(system_b_events.iter()) {
        assert!(
            event_a.utc < event_b.utc,
            "Something is not right with this test. Events in System A should be created before B"
        );
    }
    repository.commit_orderly(&mut system_b_root).await.unwrap();
    repository.commit_orderly(&mut system_a_root).await.unwrap();
}
#[test_case(true; "orderly")]
#[test_case(false; "unorderly")]
#[tokio::test]
async fn should_return_committed_ids(orderly: bool) {
    // Whichever commit mode is used, the repository should report exactly one
    // committed id per event produced by the handled commands.
    let TestData {
        repository,
        mut root,
        ..
    } = bootstrap_test(true).await;
    let commands = sample_commands();
    let expected_total = commands.len();
    for command in commands {
        root.handle(command).unwrap();
    }
    let committed = match orderly {
        true => repository.commit_orderly(&mut root).await.unwrap(),
        false => repository.commit_unorderly(&mut root).await.unwrap(),
    };
    assert_eq!(committed.len(), expected_total);
}