postgres_es/
cqrs.rs

1use cqrs_es::persist::PersistedEventStore;
2use cqrs_es::{Aggregate, CqrsFramework, Query};
3
4use crate::{PostgresCqrs, PostgresEventRepository};
5use sqlx::postgres::PgPoolOptions;
6use sqlx::{Pool, Postgres};
7
8/// A convenience method for building a simple connection pool for PostgresDb.
9/// A connection pool is needed for both the event and view repositories.
10///
11/// ```
12/// use sqlx::{Pool, Postgres};
13/// use postgres_es::default_postgress_pool;
14///
15/// # async fn configure_pool() {
16/// let connection_string = "postgresql://test_user:test_pass@localhost:5432/test";
17/// let pool: Pool<Postgres> = default_postgress_pool(connection_string).await;
18/// # }
19/// ```
20pub async fn default_postgress_pool(connection_string: &str) -> Pool<Postgres> {
21    PgPoolOptions::new()
22        .max_connections(10)
23        .connect(connection_string)
24        .await
25        .expect("unable to connect to database")
26}
27
28/// A convenience function for creating a CqrsFramework from a database connection pool
29/// and queries.
30pub fn postgres_cqrs<A>(
31    pool: Pool<Postgres>,
32    query_processor: Vec<Box<dyn Query<A>>>,
33    services: A::Services,
34) -> PostgresCqrs<A>
35where
36    A: Aggregate,
37{
38    let repo = PostgresEventRepository::new(pool);
39    let store = PersistedEventStore::new_event_store(repo);
40    CqrsFramework::new(store, query_processor, services)
41}
42
43/// A convenience function for creating a CqrsFramework using a snapshot store.
44pub fn postgres_snapshot_cqrs<A>(
45    pool: Pool<Postgres>,
46    query_processor: Vec<Box<dyn Query<A>>>,
47    snapshot_size: usize,
48    services: A::Services,
49) -> PostgresCqrs<A>
50where
51    A: Aggregate,
52{
53    let repo = PostgresEventRepository::new(pool);
54    let store = PersistedEventStore::new_snapshot_store(repo, snapshot_size);
55    CqrsFramework::new(store, query_processor, services)
56}
57
58/// A convenience function for creating a CqrsFramework using an aggregate store.
59pub fn postgres_aggregate_cqrs<A>(
60    pool: Pool<Postgres>,
61    query_processor: Vec<Box<dyn Query<A>>>,
62    services: A::Services,
63) -> PostgresCqrs<A>
64where
65    A: Aggregate,
66{
67    let repo = PostgresEventRepository::new(pool);
68    let store = PersistedEventStore::new_aggregate_store(repo);
69    CqrsFramework::new(store, query_processor, services)
70}
71
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use crate::testing::tests::{
        TestAggregate, TestQueryRepository, TestServices, TestView, TEST_CONNECTION_STRING,
    };
    use crate::{default_postgress_pool, postgres_cqrs, PostgresViewRepository};

    /// Smoke test: a framework can be wired up from a pool, a view-backed
    /// query and the test services.
    ///
    /// Needs a database reachable at `TEST_CONNECTION_STRING`, because
    /// `default_postgress_pool` panics when the connection fails.
    #[tokio::test]
    async fn test_valid_cqrs_framework() {
        let pool = default_postgress_pool(TEST_CONNECTION_STRING).await;
        // The view repository gets its own handle to the pool; the original
        // handle is moved into the framework below.
        let view_repo =
            PostgresViewRepository::<TestView, TestAggregate>::new("test_view", pool.clone());
        let view_query = TestQueryRepository::new(Arc::new(view_repo));
        let _framework = postgres_cqrs(pool, vec![Box::new(view_query)], TestServices);
    }
}