Skip to main content

cala_ledger/ledger/
mod.rs

1pub mod config;
2pub mod error;
3
4use es_entity::clock::ClockHandle;
5use sqlx::PgPool;
6pub use tracing::instrument;
7use tracing::Instrument;
8
9pub use config::*;
10use error::*;
11
12use crate::{
13    account::Accounts,
14    account_set::AccountSets,
15    balance::Balances,
16    entry::Entries,
17    journal::Journals,
18    outbox::OutboxPublisher,
19    primitives::TransactionId,
20    transaction::{Transaction, Transactions},
21    tx_template::{Params, TxTemplates},
22    velocity::Velocities,
23};
24
25#[derive(Clone)]
26pub struct CalaLedger {
27    pool: PgPool,
28    clock: ClockHandle,
29    accounts: Accounts,
30    account_sets: AccountSets,
31    journals: Journals,
32    transactions: Transactions,
33    tx_templates: TxTemplates,
34    entries: Entries,
35    velocities: Velocities,
36    balances: Balances,
37    publisher: OutboxPublisher,
38}
39
40impl CalaLedger {
41    #[instrument(name = "cala_ledger.init", skip_all)]
42    pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
43        let pool = match (config.pool, config.pg_con) {
44            (Some(pool), None) => pool,
45            (None, Some(pg_con)) => {
46                let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
47                if let Some(max_connections) = config.max_connections {
48                    pool_opts = pool_opts.max_connections(max_connections);
49                }
50                pool_opts.connect(&pg_con).await?
51            }
52            _ => {
53                return Err(LedgerError::ConfigError(
54                    "One of pg_con or pool must be set".to_string(),
55                ))
56            }
57        };
58        if config.exec_migrations {
59            sqlx::migrate!()
60                .run(&pool)
61                .instrument(tracing::info_span!("cala_ledger.migrations"))
62                .await?;
63        }
64
65        let clock = config.clock;
66        let publisher = OutboxPublisher::init(&pool, &clock).await?;
67        let accounts = Accounts::new(&pool, &publisher, &clock);
68        let journals = Journals::new(&pool, &publisher, &clock);
69        let tx_templates = TxTemplates::new(&pool, &publisher, &clock);
70        let transactions = Transactions::new(&pool, &publisher);
71        let entries = Entries::new(&pool, &publisher);
72        let balances = Balances::new(&pool, &publisher, &journals);
73        let velocities = Velocities::new(&pool, &clock);
74        let account_sets =
75            AccountSets::new(&pool, &publisher, &accounts, &entries, &balances, &clock);
76        Ok(Self {
77            accounts,
78            account_sets,
79            journals,
80            tx_templates,
81            publisher,
82            transactions,
83            entries,
84            balances,
85            velocities,
86            pool,
87            clock,
88        })
89    }
90
91    pub fn pool(&self) -> &PgPool {
92        &self.pool
93    }
94
95    pub fn clock(&self) -> &ClockHandle {
96        &self.clock
97    }
98
99    pub async fn begin_operation(&self) -> Result<es_entity::DbOpWithTime<'static>, LedgerError> {
100        let db_op = es_entity::DbOp::init_with_clock(&self.pool, &self.clock)
101            .await?
102            .with_clock_time();
103        Ok(db_op)
104    }
105
106    pub fn accounts(&self) -> &Accounts {
107        &self.accounts
108    }
109
110    pub fn velocities(&self) -> &Velocities {
111        &self.velocities
112    }
113
114    pub fn account_sets(&self) -> &AccountSets {
115        &self.account_sets
116    }
117
118    pub fn journals(&self) -> &Journals {
119        &self.journals
120    }
121
122    pub fn tx_templates(&self) -> &TxTemplates {
123        &self.tx_templates
124    }
125
126    pub fn balances(&self) -> &Balances {
127        &self.balances
128    }
129
130    pub fn entries(&self) -> &Entries {
131        &self.entries
132    }
133
134    pub fn transactions(&self) -> &Transactions {
135        &self.transactions
136    }
137
138    #[instrument(
139        name = "cala_ledger.post_transaction",
140        skip(self, params),
141        fields(tx_template_code)
142    )]
143    pub async fn post_transaction(
144        &self,
145        tx_id: TransactionId,
146        tx_template_code: &str,
147        params: impl Into<Params> + std::fmt::Debug,
148    ) -> Result<Transaction, LedgerError> {
149        let mut db = es_entity::DbOp::init_with_clock(&self.pool, &self.clock).await?;
150        let transaction = self
151            .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
152            .await?;
153        db.commit().await?;
154        Ok(transaction)
155    }
156
157    #[instrument(
158        name = "cala_ledger.post_transaction_in_op",
159        skip(self, db)
160        fields(transaction_id, external_id)
161    )]
162    pub async fn post_transaction_in_op(
163        &self,
164        db: &mut impl es_entity::AtomicOperation,
165        tx_id: TransactionId,
166        tx_template_code: &str,
167        params: impl Into<Params> + std::fmt::Debug,
168    ) -> Result<Transaction, LedgerError> {
169        let mut db = es_entity::OpWithTime::cached_or_db_time(db).await?;
170        let time = db.now();
171        let prepared_tx = self
172            .tx_templates
173            .prepare_transaction_in_op(&mut db, time, tx_id, tx_template_code, params.into())
174            .await?;
175
176        let transaction = self
177            .transactions
178            .create_in_op(&mut db, prepared_tx.transaction)
179            .await?;
180
181        let span = tracing::Span::current();
182        span.record("transaction_id", transaction.id().to_string());
183        span.record("external_id", &transaction.values().external_id);
184
185        let entries = self
186            .entries
187            .create_all_in_op(&mut db, prepared_tx.entries)
188            .await?;
189
190        let account_ids = entries
191            .iter()
192            .map(|entry| entry.account_id)
193            .collect::<Vec<_>>();
194        let mappings = self
195            .account_sets
196            .fetch_mappings_in_op(&mut db, transaction.values().journal_id, &account_ids)
197            .await?;
198
199        self.velocities
200            .update_balances_with_limit_enforcement_in_op(
201                &mut db,
202                transaction.created_at(),
203                transaction.values(),
204                &entries,
205                &account_ids,
206                &mappings,
207            )
208            .await?;
209
210        self.balances
211            .update_balances_in_op(
212                &mut db,
213                transaction.journal_id(),
214                entries,
215                transaction.effective(),
216                transaction.created_at(),
217                mappings,
218            )
219            .await?;
220        Ok(transaction)
221    }
222
223    #[instrument(name = "cala_ledger.void_transaction", skip(self))]
224    pub async fn void_transaction(
225        &self,
226        voiding_tx_id: TransactionId,
227        existing_tx_id: TransactionId,
228    ) -> Result<Transaction, LedgerError> {
229        let mut db = self.begin_operation().await?;
230        let transaction = self
231            .void_transaction_in_op(&mut db, voiding_tx_id, existing_tx_id)
232            .await?;
233        db.commit().await?;
234        Ok(transaction)
235    }
236
237    #[instrument(
238        name = "cala_ledger.transaction_void",
239        skip(self, db)
240        fields(transaction_id, external_id)
241    )]
242    pub async fn void_transaction_in_op(
243        &self,
244        db: &mut impl es_entity::AtomicOperationWithTime,
245        voiding_tx_id: TransactionId,
246        existing_tx_id: TransactionId,
247    ) -> Result<Transaction, LedgerError> {
248        let new_entries = self
249            .entries
250            .new_entries_for_voided_tx(voiding_tx_id, existing_tx_id)
251            .await?;
252
253        let transaction = self
254            .transactions()
255            .create_voided_tx_in_op(
256                db,
257                voiding_tx_id,
258                existing_tx_id,
259                new_entries.iter().map(|entry| entry.id),
260            )
261            .await?;
262
263        let span = tracing::Span::current();
264        span.record("transaction_id", transaction.id().to_string());
265        span.record("external_id", &transaction.values().external_id);
266
267        let entries = self.entries.create_all_in_op(db, new_entries).await?;
268
269        let account_ids = entries
270            .iter()
271            .map(|entry| entry.account_id)
272            .collect::<Vec<_>>();
273        let mappings = self
274            .account_sets
275            .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
276            .await?;
277
278        self.velocities
279            .update_balances_with_limit_enforcement_in_op(
280                db,
281                transaction.created_at(),
282                transaction.values(),
283                &entries,
284                &account_ids,
285                &mappings,
286            )
287            .await?;
288
289        self.balances
290            .update_balances_in_op(
291                db,
292                transaction.journal_id(),
293                entries,
294                transaction.effective(),
295                transaction.created_at(),
296                mappings,
297            )
298            .await?;
299        Ok(transaction)
300    }
301
302    pub fn outbox(&self) -> &crate::outbox::ObixOutbox {
303        self.publisher.inner()
304    }
305
306    pub fn register_outbox_listener(
307        &self,
308        start_after: Option<obix::EventSequence>,
309    ) -> obix::out::PersistentOutboxListener<crate::outbox::OutboxEventPayload> {
310        self.publisher.inner().listen_persisted(start_after)
311    }
312}