mysql_es/
cqrs.rs

1use cqrs_es::persist::PersistedEventStore;
2use cqrs_es::{Aggregate, CqrsFramework, Query};
3
4use crate::{MysqlCqrs, MysqlEventRepository};
5use sqlx::mysql::MySqlPoolOptions;
6use sqlx::{MySql, Pool};
7
8/// A convenience building a simple connection pool for MySql database.
9pub async fn default_mysql_pool(connection_string: &str) -> Pool<MySql> {
10    MySqlPoolOptions::new()
11        .max_connections(10)
12        .connect(connection_string)
13        .await
14        .expect("unable to connect to database")
15}
16
17/// A convenience method for building a simple connection pool for MySql.
18/// A connection pool is needed for both the event and view repositories.
19///
20/// ```
21/// use sqlx::{MySql, Pool};
22/// use mysql_es::default_mysql_pool;
23///
24/// # async fn configure_pool() {
25/// let connection_string = "mysql://test_user:test_pass@localhost:3306/test";
26/// let pool: Pool<MySql> = default_mysql_pool(connection_string).await;
27/// # }
28/// ```
29pub fn mysql_cqrs<A>(
30    pool: Pool<MySql>,
31    query_processor: Vec<Box<dyn Query<A>>>,
32    services: A::Services,
33) -> MysqlCqrs<A>
34where
35    A: Aggregate,
36{
37    let repo = MysqlEventRepository::new(pool);
38    let store = PersistedEventStore::new_event_store(repo);
39    CqrsFramework::new(store, query_processor, services)
40}
41
42/// A convenience function for creating a CqrsFramework using a snapshot store.
43pub fn mysql_snapshot_cqrs<A>(
44    pool: Pool<MySql>,
45    query_processor: Vec<Box<dyn Query<A>>>,
46    snapshot_size: usize,
47    services: A::Services,
48) -> MysqlCqrs<A>
49where
50    A: Aggregate,
51{
52    let repo = MysqlEventRepository::new(pool);
53    let store = PersistedEventStore::new_snapshot_store(repo, snapshot_size);
54    CqrsFramework::new(store, query_processor, services)
55}
56
57/// A convenience function for creating a CqrsFramework using an aggregate store.
58pub fn mysql_aggregate_cqrs<A>(
59    pool: Pool<MySql>,
60    query_processor: Vec<Box<dyn Query<A>>>,
61    services: A::Services,
62) -> MysqlCqrs<A>
63where
64    A: Aggregate,
65{
66    let repo = MysqlEventRepository::new(pool);
67    let store = PersistedEventStore::new_aggregate_store(repo);
68    CqrsFramework::new(store, query_processor, services)
69}
70
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use crate::testing::tests::{
        TestAggregate, TestQueryRepository, TestServices, TestView, TEST_CONNECTION_STRING,
    };
    use crate::{default_mysql_pool, mysql_cqrs, MysqlViewRepository};

    /// Checks that a framework can be assembled from a pool, a view-backed query and the
    /// test services. Needs a database reachable at `TEST_CONNECTION_STRING`, since
    /// `default_mysql_pool` panics when it cannot connect.
    #[tokio::test]
    async fn test_valid_cqrs_framework() {
        let connection_pool = default_mysql_pool(TEST_CONNECTION_STRING).await;
        let view_repository = MysqlViewRepository::<TestView, TestAggregate>::new(
            "test_view",
            connection_pool.clone(),
        );
        let view_query = TestQueryRepository::new(Arc::new(view_repository));
        // Bound to a named placeholder so the framework lives until the end of the test.
        let _framework = mysql_cqrs(connection_pool, vec![Box::new(view_query)], TestServices);
    }
}