1use cqrs_es::persist::PersistedEventStore;
2use cqrs_es::{Aggregate, CqrsFramework, Query};
3
4use crate::{MysqlCqrs, MysqlEventRepository};
5use sqlx::mysql::MySqlPoolOptions;
6use sqlx::{MySql, Pool};
7
8pub async fn default_mysql_pool(connection_string: &str) -> Pool<MySql> {
10 MySqlPoolOptions::new()
11 .max_connections(10)
12 .connect(connection_string)
13 .await
14 .expect("unable to connect to database")
15}
16
17pub fn mysql_cqrs<A>(
30 pool: Pool<MySql>,
31 query_processor: Vec<Box<dyn Query<A>>>,
32 services: A::Services,
33) -> MysqlCqrs<A>
34where
35 A: Aggregate,
36{
37 let repo = MysqlEventRepository::new(pool);
38 let store = PersistedEventStore::new_event_store(repo);
39 CqrsFramework::new(store, query_processor, services)
40}
41
42pub fn mysql_snapshot_cqrs<A>(
44 pool: Pool<MySql>,
45 query_processor: Vec<Box<dyn Query<A>>>,
46 snapshot_size: usize,
47 services: A::Services,
48) -> MysqlCqrs<A>
49where
50 A: Aggregate,
51{
52 let repo = MysqlEventRepository::new(pool);
53 let store = PersistedEventStore::new_snapshot_store(repo, snapshot_size);
54 CqrsFramework::new(store, query_processor, services)
55}
56
57pub fn mysql_aggregate_cqrs<A>(
59 pool: Pool<MySql>,
60 query_processor: Vec<Box<dyn Query<A>>>,
61 services: A::Services,
62) -> MysqlCqrs<A>
63where
64 A: Aggregate,
65{
66 let repo = MysqlEventRepository::new(pool);
67 let store = PersistedEventStore::new_aggregate_store(repo);
68 CqrsFramework::new(store, query_processor, services)
69}
70
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use crate::testing::tests::{
        TestAggregate, TestQueryRepository, TestServices, TestView, TEST_CONNECTION_STRING,
    };
    use crate::{default_mysql_pool, mysql_cqrs, MysqlViewRepository};

    /// Smoke test: a framework can be constructed from a pool, a view-backed
    /// query and the test services. Connects using `TEST_CONNECTION_STRING`,
    /// so it panics if that connection cannot be established.
    #[tokio::test]
    async fn test_valid_cqrs_framework() {
        let pool = default_mysql_pool(TEST_CONNECTION_STRING).await;

        // The view repository gets its own pool handle; the original is moved
        // into the framework below.
        let view_repo =
            MysqlViewRepository::<TestView, TestAggregate>::new("test_view", pool.clone());
        let view_query = TestQueryRepository::new(Arc::new(view_repo));

        let _framework = mysql_cqrs(pool, vec![Box::new(view_query)], TestServices);
    }
}