sqlite_es/
cqrs.rs

1use cqrs_es::persist::PersistedEventStore;
2use cqrs_es::{Aggregate, CqrsFramework, Query};
3
4use crate::{SqliteCqrs, SqliteEventRepository};
5use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
6use sqlx::{Pool, Sqlite};
7use std::str::FromStr;
8
9/// A convenience method for building a simple connection pool for SQLite.
10/// A connection pool is needed for both the event and view repositories.
11///
12/// ```
13/// use sqlx::{Pool, Sqlite};
14/// use sqlite_es::default_sqlite_pool;
15///
16/// # async fn configure_pool() {
17/// let connection_string = "sqlite://db/test.db";
18/// let pool: Pool<Sqlite> = default_sqlite_pool(connection_string).await;
19/// # }
20/// ```
21pub async fn default_sqlite_pool(connection_string: &str) -> Pool<Sqlite> {
22    let options = SqliteConnectOptions::from_str(connection_string)
23        .unwrap()
24        .create_if_missing(true)
25        .journal_mode(SqliteJournalMode::Wal)
26        .synchronous(SqliteSynchronous::Normal)
27        .foreign_keys(true);
28
29    SqlitePoolOptions::new()
30        .max_connections(10)
31        .connect_with(options)
32        .await
33        .expect("unable to connect to database")
34}
35
36/// A convenience function for creating a CqrsFramework from a database connection pool
37/// and queries.
38pub fn sqlite_cqrs<A>(
39    pool: Pool<Sqlite>,
40    query_processor: Vec<Box<dyn Query<A>>>,
41    services: A::Services,
42) -> SqliteCqrs<A>
43where
44    A: Aggregate,
45{
46    let repo = SqliteEventRepository::new(pool);
47    let store = PersistedEventStore::new_event_store(repo);
48    CqrsFramework::new(store, query_processor, services)
49}
50
51/// A convenience function for creating a CqrsFramework using a snapshot store.
52pub fn sqlite_snapshot_cqrs<A>(
53    pool: Pool<Sqlite>,
54    query_processor: Vec<Box<dyn Query<A>>>,
55    snapshot_size: usize,
56    services: A::Services,
57) -> SqliteCqrs<A>
58where
59    A: Aggregate,
60{
61    let repo = SqliteEventRepository::new(pool);
62    let store = PersistedEventStore::new_snapshot_store(repo, snapshot_size);
63    CqrsFramework::new(store, query_processor, services)
64}
65
66/// A convenience function for creating a CqrsFramework using an aggregate store.
67pub fn sqlite_aggregate_cqrs<A>(
68    pool: Pool<Sqlite>,
69    query_processor: Vec<Box<dyn Query<A>>>,
70    services: A::Services,
71) -> SqliteCqrs<A>
72where
73    A: Aggregate,
74{
75    let repo = SqliteEventRepository::new(pool);
76    let store = PersistedEventStore::new_aggregate_store(repo);
77    CqrsFramework::new(store, query_processor, services)
78}
79
#[cfg(test)]
mod test {
    use crate::testing::tests::{
        TestAggregate, TestQueryRepository, TestServices, TestView, TEST_CONNECTION_STRING,
    };
    use crate::{default_sqlite_pool, sqlite_cqrs, SqliteViewRepository};
    use std::sync::Arc;

    /// Smoke test: a framework can be assembled from a migrated pool, a view
    /// repository-backed query and the test services without panicking.
    #[tokio::test]
    async fn test_valid_cqrs_framework() {
        let pool = default_sqlite_pool(TEST_CONNECTION_STRING).await;
        sqlx::migrate!().run(&pool).await.unwrap();

        // The view repository gets its own clone of the pool; the original is
        // moved into the framework below.
        let view_repo = Arc::new(SqliteViewRepository::<TestView, TestAggregate>::new(
            "test_view",
            pool.clone(),
        ));
        let query = TestQueryRepository::new(view_repo);
        let _framework = sqlite_cqrs(pool, vec![Box::new(query)], TestServices);
    }
}
97}