1use cqrs_es::persist::PersistedEventStore;
2use cqrs_es::{Aggregate, CqrsFramework, Query};
3
4use crate::{SqliteCqrs, SqliteEventRepository};
5use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
6use sqlx::{Pool, Sqlite};
7use std::str::FromStr;
8
9pub async fn default_sqlite_pool(connection_string: &str) -> Pool<Sqlite> {
22 let options = SqliteConnectOptions::from_str(connection_string)
23 .unwrap()
24 .create_if_missing(true)
25 .journal_mode(SqliteJournalMode::Wal)
26 .synchronous(SqliteSynchronous::Normal)
27 .foreign_keys(true);
28
29 SqlitePoolOptions::new()
30 .max_connections(10)
31 .connect_with(options)
32 .await
33 .expect("unable to connect to database")
34}
35
36pub fn sqlite_cqrs<A>(
39 pool: Pool<Sqlite>,
40 query_processor: Vec<Box<dyn Query<A>>>,
41 services: A::Services,
42) -> SqliteCqrs<A>
43where
44 A: Aggregate,
45{
46 let repo = SqliteEventRepository::new(pool);
47 let store = PersistedEventStore::new_event_store(repo);
48 CqrsFramework::new(store, query_processor, services)
49}
50
51pub fn sqlite_snapshot_cqrs<A>(
53 pool: Pool<Sqlite>,
54 query_processor: Vec<Box<dyn Query<A>>>,
55 snapshot_size: usize,
56 services: A::Services,
57) -> SqliteCqrs<A>
58where
59 A: Aggregate,
60{
61 let repo = SqliteEventRepository::new(pool);
62 let store = PersistedEventStore::new_snapshot_store(repo, snapshot_size);
63 CqrsFramework::new(store, query_processor, services)
64}
65
66pub fn sqlite_aggregate_cqrs<A>(
68 pool: Pool<Sqlite>,
69 query_processor: Vec<Box<dyn Query<A>>>,
70 services: A::Services,
71) -> SqliteCqrs<A>
72where
73 A: Aggregate,
74{
75 let repo = SqliteEventRepository::new(pool);
76 let store = PersistedEventStore::new_aggregate_store(repo);
77 CqrsFramework::new(store, query_processor, services)
78}
79
80#[cfg(test)]
81mod test {
82 use crate::testing::tests::{
83 TestAggregate, TestQueryRepository, TestServices, TestView, TEST_CONNECTION_STRING,
84 };
85 use crate::{default_sqlite_pool, sqlite_cqrs, SqliteViewRepository};
86 use std::sync::Arc;
87
88 #[tokio::test]
89 async fn test_valid_cqrs_framework() {
90 let pool = default_sqlite_pool(TEST_CONNECTION_STRING).await;
91 sqlx::migrate!().run(&pool).await.unwrap();
92
93 let repo = SqliteViewRepository::<TestView, TestAggregate>::new("test_view", pool.clone());
94 let query = TestQueryRepository::new(Arc::new(repo));
95 let _ps = sqlite_cqrs(pool, vec![Box::new(query)], TestServices);
96 }
97}