timesource 0.1.3

Event sourcing with TimescaleDb
Documentation
mod common;

use chrono::Utc;
use common::order::OrderCommand;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use serde_json::Value as Json;
use sqlx::FromRow;
use test_case::test_case;
use timesource::aggregate::UncommittedEvent;
use timesource::store::{CommitOrder, EventStore, EventStoreBuilder};
use timesource::{Aggregate, TimesourceEvent};
use uuid::Uuid;

use crate::common::data::{
    bootstrap_test, sample_commands, sample_events_uncommitted, TestData, DSN,
};
use crate::common::order::{OrderAggregate, OrderEvent};

#[allow(dead_code)]
#[derive(Debug, FromRow)]
struct EventRow {
    aggregate_type_id: i32,
    name: String,
    payload: Option<Json>,
    time: i64,
}

#[allow(dead_code)]
#[derive(Debug, FromRow)]
struct EventRowName {
    aggregate_type_id: i32,
    id: i32,
    name: String,
}

#[tokio::test]
async fn different_aggregate_types_can_share_id() {
    let TestData {
        aggregate_type_name,
        ..
    } = bootstrap_test(false).await;

    let shared_id = Uuid::new_v4();

    #[derive(Debug, Clone, Serialize, Deserialize, TimesourceEvent)]
    enum Store2Event {
        Something,
    }

    let event_store_builder = EventStoreBuilder::new(DSN);
    let store = event_store_builder
        .build::<OrderEvent>(&aggregate_type_name)
        .await
        .expect("store to be created");

    let store2 = event_store_builder
        .build::<Store2Event>("orders2")
        .await
        .expect("store to be created");

    let events1 = sample_events_uncommitted();

    let events2 = vec![Store2Event::Something; events1.len()]
        .into_iter()
        .map(|data| UncommittedEvent {
            utc: Utc::now(),
            data,
        })
        .collect::<Vec<_>>();

    store
        .commit(shared_id, CommitOrder::None, &events1)
        .await
        .expect("Failed appending events");

    store2
        .commit(shared_id, CommitOrder::None, &events2)
        .await
        .expect("Failed appending events");

    let read_events = store
        .aggregate_stream(shared_id)
        .await
        .try_collect::<Vec<_>>()
        .await
        .expect("failed to collect first stream of events from consumer");

    let read_events2 = store2
        .aggregate_stream(shared_id)
        .await
        .try_collect::<Vec<_>>()
        .await
        .expect("failed to collect second stream of events from consumer");

    for (i, event) in read_events.iter().enumerate() {
        assert_eq!(
            event.aggregate_id(),
            read_events2[i].aggregate_id(),
            "Expected event source ids to be the same between stores"
        );
    }
}

#[tokio::test]
#[should_panic(expected = "list of events can't be empty")]
async fn return_err_given_no_events() {
    let TestData {
        aggregate_type_name,
        ..
    } = bootstrap_test(false).await;

    let store = EventStoreBuilder::new(DSN)
        .build::<OrderEvent>(&aggregate_type_name)
        .await
        .expect("store to be created");

    store
        .commit(Uuid::new_v4(), CommitOrder::None, &[])
        .await
        .unwrap();
}

#[tokio::test]
#[should_panic(expected = "AlreadyCreated")]
async fn should_propagate_aggregate_err_when_handling_command() {
    let TestData {
        repository,
        mut root,
        ..
    } = bootstrap_test(false).await;

    // these are invalid events
    root.handle(OrderCommand::Create).unwrap();
    root.handle(OrderCommand::Create).unwrap();

    repository.commit_unorderly(&mut root).await.unwrap();
}

// same as before, but when committed orderly, it shouldn't allow it
#[tokio::test]
#[should_panic(expected = "invalid aggregate time provided")]
async fn return_err_for_ordered_events_which_are_not_first() {
    let TestData {
        aggregate_id,
        repository,
        root,
        ..
    } = bootstrap_test(false).await;

    let mut system_a_root = root;
    let mut system_b_root = OrderAggregate::root_with_id(aggregate_id);

    // given system A handles first commands first
    for command in sample_commands() {
        system_a_root
            .handle(command)
            .expect("Failed handling event for systemA");
    }

    // given system B handles first commands later on
    for command in sample_commands() {
        system_b_root
            .handle(command)
            .expect("Failed handling event for systemA");
    }

    let system_a_events = system_a_root.uncommitted_events();
    let system_b_events = system_b_root.uncommitted_events();
    for (index, event) in system_a_events.iter().enumerate() {
        assert!(
            event.utc < system_b_events[index].utc,
            "Something is not right with this test. Events in System A should be created before B"
        );
    }

    // but then system B commit first
    repository.commit_orderly(&mut system_b_root).await.unwrap();

    // and system A events arrive later so they should not be saved
    repository.commit_orderly(&mut system_a_root).await.unwrap();
}

#[tokio::test]
#[should_panic(expected = "invalid aggregate time provided")]
async fn return_err_given_unordered_events_and_existing_offset() {
    let TestData {
        aggregate_id,
        repository,
        root,
        ..
    } = bootstrap_test(false).await;

    let mut system_a_root = root;
    let mut system_b_root = OrderAggregate::root_with_id(aggregate_id);

    // given system A handles commands first
    for command in sample_commands() {
        system_a_root
            .handle(command)
            .expect("Failed handling event for systemA");
    }

    // given system B handles commands later on
    for command in sample_commands() {
        system_b_root
            .handle(command)
            .expect("Failed handling event for systemA");
    }

    let system_a_events = system_a_root.uncommitted_events();
    let system_b_events = system_b_root.uncommitted_events();
    for (index, event) in system_a_events.iter().enumerate() {
        assert!(
            event.utc < system_b_events[index].utc,
            "Something is not right with this test. Events in System A should be created before B"
        );
    }

    // but then system B commit first
    repository.commit_orderly(&mut system_b_root).await.unwrap();

    // and system A events arrive later so they should not be saved
    repository.commit_orderly(&mut system_a_root).await.unwrap();
}

#[test_case(true; "orderly")]
#[test_case(false; "unorderly")]
#[tokio::test]
async fn should_return_committed_ids(orderly: bool) {
    let TestData {
        repository,
        mut root,
        ..
    } = bootstrap_test(true).await;

    let commands = sample_commands();
    let expected_total = commands.len();
    for command in commands {
        root.handle(command).unwrap();
    }

    let output = if orderly {
        repository.commit_orderly(&mut root).await.unwrap()
    } else {
        repository.commit_unorderly(&mut root).await.unwrap()
    };

    assert_eq!(output.len(), expected_total);
}