#![allow(dead_code)]
use chrono::Utc;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
use timesource::aggregate::UncommittedEvent;
use timesource::consumer::{AggregateConsumerStore, Consumer, ConsumerBuilder, RootConsumerStore};
use timesource::repository::Repository;
use timesource::store::{EventStore, EventStoreBuilder, TimescaleStore};
use timesource::{Aggregate, AggregateRoot};
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;
use uuid::Uuid;
use crate::common::order::OrderItem;
use crate::common::user_cbor::UserCommand;
use super::order::{OrderAggregate, OrderCommand, OrderEvent};
use super::user_cbor::{UserAggregate, UserEvent};
pub const DSN: &str = "postgres://postgres@localhost/timesource";
use gabble::Gab;
use rand::{thread_rng, Rng};
pub struct TestData<A, S>
where
A: Aggregate + 'static,
S: EventStore<Event = A::Event>,
{
pub aggregate_id: Uuid,
pub aggregate_type_id: i32,
pub aggregate_type_name: String,
pub pool: Pool<Postgres>,
pub repository: Repository<A, S>,
pub root: AggregateRoot<A>,
pub consumer_name: String,
pub consumer: Consumer<AggregateConsumerStore<A::Event>>,
pub consumer_root: Consumer<RootConsumerStore<A::Event>>,
pub consumer_root_name: String,
}
pub async fn bootstrap_test<'a>(
with_tracing: bool,
) -> TestData<OrderAggregate, TimescaleStore<OrderEvent>> {
if with_tracing {
let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("timesource=debug"))
.unwrap();
tracing_subscriber::registry()
.with(filter_layer)
.with(tracing_subscriber::fmt::layer())
.try_init()
.unwrap_or_else(|_| {});
}
let mut rng = thread_rng();
let aggregate_type_name: Gab = rng.gen();
let aggregate_type_name = aggregate_type_name.to_string();
let consumer_name: Gab = rng.gen();
let consumer_name = consumer_name.to_string();
let consumer_root_name: Gab = rng.gen();
let consumer_root_name = consumer_root_name.to_string();
let pool = PgPoolOptions::new()
.connect(DSN)
.await
.expect("being able to create Pg pool");
let store = EventStoreBuilder::new(DSN)
.build::<OrderEvent>(&aggregate_type_name)
.await
.expect("store to be created");
let consumer = ConsumerBuilder::new(DSN)
.aggregate_build(consumer_name.clone().into(), &aggregate_type_name)
.await
.unwrap();
let root = OrderAggregate::root();
let consumer_root = ConsumerBuilder::new(DSN)
.aggregate_root_build(
consumer_root_name.clone().into(),
&aggregate_type_name,
root.id(),
)
.await
.unwrap();
let aggregate_type_id = store.aggregate_type_id;
let repository = Repository::new(store);
TestData {
aggregate_id: root.id(),
aggregate_type_id,
aggregate_type_name,
repository,
root,
pool,
consumer_name,
consumer,
consumer_root,
consumer_root_name,
}
}
pub async fn bootstrap_test_cbor<'a>(
with_tracing: bool,
) -> TestData<UserAggregate, TimescaleStore<UserEvent>> {
if with_tracing {
let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("timesource=debug"))
.unwrap();
tracing_subscriber::registry()
.with(filter_layer)
.with(tracing_subscriber::fmt::layer())
.try_init()
.unwrap_or_else(|_| {});
}
let mut rng = thread_rng();
let aggregate_type_name: Gab = rng.gen();
let aggregate_type_name = aggregate_type_name.to_string();
let consumer_name: Gab = rng.gen();
let consumer_name = consumer_name.to_string();
let consumer_root_name: Gab = rng.gen();
let consumer_root_name = consumer_root_name.to_string();
let pool = PgPoolOptions::new()
.connect(DSN)
.await
.expect("being able to create Pg pool");
let store = EventStoreBuilder::new(DSN)
.build::<UserEvent>(&aggregate_type_name)
.await
.expect("store to be created");
let consumer = ConsumerBuilder::new(DSN)
.aggregate_build(consumer_name.clone().into(), &aggregate_type_name)
.await
.unwrap();
let root = UserAggregate::root();
let consumer_root = ConsumerBuilder::new(DSN)
.aggregate_root_build(
consumer_root_name.clone().into(),
&aggregate_type_name,
root.id(),
)
.await
.unwrap();
let aggregate_type_id = store.aggregate_type_id;
let repository = Repository::new(store);
TestData {
aggregate_id: root.id(),
aggregate_type_id,
aggregate_type_name,
repository,
root,
pool,
consumer_name,
consumer,
consumer_root,
consumer_root_name,
}
}
#[allow(dead_code)]
pub fn sample_commands() -> Vec<OrderCommand> {
vec![
OrderCommand::Create,
OrderCommand::AddItem {
item: OrderItem {
item_sku: "sku-123".into(),
quantity: 1,
price: 12,
},
},
OrderCommand::Empty("for no reason".to_string()),
]
}
#[allow(dead_code)]
pub fn sample_cbor_commands() -> Vec<UserCommand> {
vec![
UserCommand::Create,
UserCommand::SetName {
name: "John".to_string(),
surname: None,
},
]
}
#[allow(dead_code)]
pub fn sample_cbor_events() -> Vec<UserEvent> {
vec![
UserEvent::Created,
UserEvent::NameChanged {
name: "John".to_string(),
surname: None,
},
]
}
pub fn sample_events() -> Vec<OrderEvent> {
vec![
OrderEvent::Created,
OrderEvent::ItemAdded {
item: OrderItem {
item_sku: "sku-123".into(),
quantity: 1,
price: 12,
},
},
OrderEvent::Emptied("for no reason".into()),
]
}
#[allow(dead_code)]
pub fn sample_events_uncommitted() -> Vec<UncommittedEvent<OrderEvent>> {
sample_events()
.into_iter()
.map(|data| UncommittedEvent {
utc: Utc::now(),
data,
})
.collect::<Vec<_>>()
}