1use cqrs_es::persist::PersistedEventStore;
2use cqrs_es::{Aggregate, CqrsFramework, Query};
3
4use crate::{PostgresCqrs, PostgresEventRepository};
5use sqlx::postgres::PgPoolOptions;
6use sqlx::{Pool, Postgres};
7
8pub async fn default_postgress_pool(connection_string: &str) -> Pool<Postgres> {
21 PgPoolOptions::new()
22 .max_connections(10)
23 .connect(connection_string)
24 .await
25 .expect("unable to connect to database")
26}
27
28pub fn postgres_cqrs<A>(
31 pool: Pool<Postgres>,
32 query_processor: Vec<Box<dyn Query<A>>>,
33 services: A::Services,
34) -> PostgresCqrs<A>
35where
36 A: Aggregate,
37{
38 let repo = PostgresEventRepository::new(pool);
39 let store = PersistedEventStore::new_event_store(repo);
40 CqrsFramework::new(store, query_processor, services)
41}
42
43pub fn postgres_snapshot_cqrs<A>(
45 pool: Pool<Postgres>,
46 query_processor: Vec<Box<dyn Query<A>>>,
47 snapshot_size: usize,
48 services: A::Services,
49) -> PostgresCqrs<A>
50where
51 A: Aggregate,
52{
53 let repo = PostgresEventRepository::new(pool);
54 let store = PersistedEventStore::new_snapshot_store(repo, snapshot_size);
55 CqrsFramework::new(store, query_processor, services)
56}
57
58pub fn postgres_aggregate_cqrs<A>(
60 pool: Pool<Postgres>,
61 query_processor: Vec<Box<dyn Query<A>>>,
62 services: A::Services,
63) -> PostgresCqrs<A>
64where
65 A: Aggregate,
66{
67 let repo = PostgresEventRepository::new(pool);
68 let store = PersistedEventStore::new_aggregate_store(repo);
69 CqrsFramework::new(store, query_processor, services)
70}
71
#[cfg(test)]
mod test {
    use crate::testing::tests::{
        TestAggregate, TestQueryRepository, TestServices, TestView, TEST_CONNECTION_STRING,
    };
    use crate::{default_postgress_pool, postgres_cqrs, PostgresViewRepository};
    use std::sync::Arc;

    /// Checks that a framework can be assembled from a pool, a view-backed
    /// query and the test services. The pool is opened with
    /// `TEST_CONNECTION_STRING`, so the test panics if that connection fails.
    #[tokio::test]
    async fn test_valid_cqrs_framework() {
        let connection_pool = default_postgress_pool(TEST_CONNECTION_STRING).await;

        // The view repository gets its own clone of the pool; the original
        // value is moved into the framework below.
        let view_repository = PostgresViewRepository::<TestView, TestAggregate>::new(
            "test_view",
            connection_pool.clone(),
        );
        let view_query = TestQueryRepository::new(Arc::new(view_repository));

        // Successful construction is the whole assertion; the result is unused.
        let _ps = postgres_cqrs(connection_pool, vec![Box::new(view_query)], TestServices);
    }
}