timesource 0.1.3

Event sourcing with TimescaleDb
Documentation
#![allow(dead_code)]

use chrono::Utc;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
use timesource::aggregate::UncommittedEvent;
use timesource::consumer::{AggregateConsumerStore, Consumer, ConsumerBuilder, RootConsumerStore};
use timesource::repository::Repository;
use timesource::store::{EventStore, EventStoreBuilder, TimescaleStore};
use timesource::{Aggregate, AggregateRoot};
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;
use uuid::Uuid;

use crate::common::order::OrderItem;
use crate::common::user_cbor::UserCommand;

use super::order::{OrderAggregate, OrderCommand, OrderEvent};
use super::user_cbor::{UserAggregate, UserEvent};

/// Postgres connection string shared by the helpers in this file: it is handed to the
/// connection pool, the event store builder and the consumer builders alike.
pub const DSN: &str = "postgres://postgres@localhost/timesource";

use gabble::Gab;
use rand::{thread_rng, Rng};

/// Bundle of everything a test needs after bootstrapping: identifiers, a database pool,
/// the repository, a fresh aggregate root and two consumers.
///
/// `A` is the aggregate under test and `S` the event store whose event type matches
/// the aggregate's event type.
pub struct TestData<A, S>
where
    A: Aggregate + 'static,
    S: EventStore<Event = A::Event>,
{
    /// Id of `root` (the bootstrap functions copy it from `root.id()`).
    pub aggregate_id: Uuid,
    /// Aggregate type id as reported by the store that backs `repository`.
    pub aggregate_type_id: i32,
    /// Aggregate type name the store and both consumers were built with.
    pub aggregate_type_name: String,
    /// Connection pool opened against [`DSN`].
    pub pool: Pool<Postgres>,
    /// Repository wrapping the event store.
    pub repository: Repository<A, S>,
    /// Aggregate root created for this test.
    pub root: AggregateRoot<A>,
    /// Name `consumer` was built with.
    pub consumer_name: String,
    /// Consumer built for the aggregate type as a whole.
    pub consumer: Consumer<AggregateConsumerStore<A::Event>>,
    /// Consumer built for the aggregate type and the id of `root`.
    pub consumer_root: Consumer<RootConsumerStore<A::Event>>,
    /// Name `consumer_root` was built with.
    pub consumer_root_name: String,
}

pub async fn bootstrap_test<'a>(
    with_tracing: bool,
) -> TestData<OrderAggregate, TimescaleStore<OrderEvent>> {
    if with_tracing {
        let filter_layer = EnvFilter::try_from_default_env()
            .or_else(|_| EnvFilter::try_new("timesource=debug"))
            .unwrap();

        tracing_subscriber::registry()
            .with(filter_layer)
            .with(tracing_subscriber::fmt::layer())
            .try_init()
            .unwrap_or_else(|_| {});
    }

    let mut rng = thread_rng();
    let aggregate_type_name: Gab = rng.gen();
    let aggregate_type_name = aggregate_type_name.to_string();
    let consumer_name: Gab = rng.gen();
    let consumer_name = consumer_name.to_string();
    let consumer_root_name: Gab = rng.gen();
    let consumer_root_name = consumer_root_name.to_string();

    let pool = PgPoolOptions::new()
        .connect(DSN)
        .await
        .expect("being able to create Pg pool");

    let store = EventStoreBuilder::new(DSN)
        .build::<OrderEvent>(&aggregate_type_name)
        .await
        .expect("store to be created");

    let consumer = ConsumerBuilder::new(DSN)
        .aggregate_build(consumer_name.clone().into(), &aggregate_type_name)
        .await
        .unwrap();

    let root = OrderAggregate::root();

    let consumer_root = ConsumerBuilder::new(DSN)
        .aggregate_root_build(
            consumer_root_name.clone().into(),
            &aggregate_type_name,
            root.id(),
        )
        .await
        .unwrap();

    let aggregate_type_id = store.aggregate_type_id;
    let repository = Repository::new(store);

    TestData {
        aggregate_id: root.id(),
        aggregate_type_id,
        aggregate_type_name,
        repository,
        root,
        pool,
        consumer_name,
        consumer,
        consumer_root,
        consumer_root_name,
    }
}

pub async fn bootstrap_test_cbor<'a>(
    with_tracing: bool,
) -> TestData<UserAggregate, TimescaleStore<UserEvent>> {
    if with_tracing {
        let filter_layer = EnvFilter::try_from_default_env()
            .or_else(|_| EnvFilter::try_new("timesource=debug"))
            .unwrap();

        tracing_subscriber::registry()
            .with(filter_layer)
            .with(tracing_subscriber::fmt::layer())
            .try_init()
            .unwrap_or_else(|_| {});
    }

    let mut rng = thread_rng();
    let aggregate_type_name: Gab = rng.gen();
    let aggregate_type_name = aggregate_type_name.to_string();
    let consumer_name: Gab = rng.gen();
    let consumer_name = consumer_name.to_string();
    let consumer_root_name: Gab = rng.gen();
    let consumer_root_name = consumer_root_name.to_string();

    let pool = PgPoolOptions::new()
        .connect(DSN)
        .await
        .expect("being able to create Pg pool");

    let store = EventStoreBuilder::new(DSN)
        .build::<UserEvent>(&aggregate_type_name)
        .await
        .expect("store to be created");

    let consumer = ConsumerBuilder::new(DSN)
        .aggregate_build(consumer_name.clone().into(), &aggregate_type_name)
        .await
        .unwrap();

    let root = UserAggregate::root();

    let consumer_root = ConsumerBuilder::new(DSN)
        .aggregate_root_build(
            consumer_root_name.clone().into(),
            &aggregate_type_name,
            root.id(),
        )
        .await
        .unwrap();

    let aggregate_type_id = store.aggregate_type_id;
    let repository = Repository::new(store);

    TestData {
        aggregate_id: root.id(),
        aggregate_type_id,
        aggregate_type_name,
        repository,
        root,
        pool,
        consumer_name,
        consumer,
        consumer_root,
        consumer_root_name,
    }
}

/// Returns a fixed sequence of order commands: create the order, add one item
/// (`sku-123`, quantity 1, price 12), then empty it.
#[allow(dead_code)]
pub fn sample_commands() -> Vec<OrderCommand> {
    let item = OrderItem {
        item_sku: "sku-123".into(),
        quantity: 1,
        price: 12,
    };
    let add_item = OrderCommand::AddItem { item };
    let empty = OrderCommand::Empty(String::from("for no reason"));

    vec![OrderCommand::Create, add_item, empty]
}

/// Returns a fixed sequence of user commands: create the user, then set the name to
/// `John` without a surname.
#[allow(dead_code)]
pub fn sample_cbor_commands() -> Vec<UserCommand> {
    let set_name = UserCommand::SetName {
        name: String::from("John"),
        surname: None,
    };

    vec![UserCommand::Create, set_name]
}

/// Returns a fixed sequence of user events: the user was created, then the name was
/// changed to `John` without a surname.
#[allow(dead_code)]
pub fn sample_cbor_events() -> Vec<UserEvent> {
    let name_changed = UserEvent::NameChanged {
        name: String::from("John"),
        surname: None,
    };

    vec![UserEvent::Created, name_changed]
}

/// Returns a fixed sequence of order events: the order was created, one item
/// (`sku-123`, quantity 1, price 12) was added, then the order was emptied.
pub fn sample_events() -> Vec<OrderEvent> {
    let item = OrderItem {
        item_sku: "sku-123".into(),
        quantity: 1,
        price: 12,
    };
    let item_added = OrderEvent::ItemAdded { item };
    let emptied = OrderEvent::Emptied("for no reason".into());

    vec![OrderEvent::Created, item_added, emptied]
}

/// Wraps every event from `sample_events` in an [`UncommittedEvent`], stamping each one
/// with the current UTC time at the moment it is wrapped.
#[allow(dead_code)]
pub fn sample_events_uncommitted() -> Vec<UncommittedEvent<OrderEvent>> {
    let events = sample_events();
    let mut uncommitted = Vec::with_capacity(events.len());

    for data in events {
        let utc = Utc::now();
        uncommitted.push(UncommittedEvent { utc, data });
    }

    uncommitted
}