cala_ledger/ledger/
mod.rs

1pub mod config;
2pub mod error;
3
4use sqlx::PgPool;
5use std::sync::{Arc, Mutex};
6pub use tracing::instrument;
7use tracing::Instrument;
8
9pub use config::*;
10use error::*;
11
12use crate::{
13    account::Accounts,
14    account_set::AccountSets,
15    balance::Balances,
16    entry::Entries,
17    journal::Journals,
18    ledger_operation::*,
19    outbox::{server, EventSequence, Outbox, OutboxListener},
20    primitives::TransactionId,
21    transaction::{Transaction, Transactions},
22    tx_template::{Params, TxTemplates},
23    velocity::Velocities,
24};
25#[cfg(feature = "import")]
26mod import_deps {
27    pub use crate::primitives::DataSourceId;
28    pub use cala_types::outbox::OutboxEvent;
29}
30#[cfg(feature = "import")]
31use import_deps::*;
32
33#[derive(Clone)]
34pub struct CalaLedger {
35    pool: PgPool,
36    accounts: Accounts,
37    account_sets: AccountSets,
38    journals: Journals,
39    transactions: Transactions,
40    tx_templates: TxTemplates,
41    entries: Entries,
42    velocities: Velocities,
43    balances: Balances,
44    outbox: Outbox,
45    #[allow(clippy::type_complexity)]
46    outbox_handle: Arc<Mutex<Option<tokio::task::JoinHandle<Result<(), LedgerError>>>>>,
47}
48
49impl CalaLedger {
50    #[instrument(name = "cala_ledger.init", skip_all)]
51    pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
52        let pool = match (config.pool, config.pg_con) {
53            (Some(pool), None) => pool,
54            (None, Some(pg_con)) => {
55                let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
56                if let Some(max_connections) = config.max_connections {
57                    pool_opts = pool_opts.max_connections(max_connections);
58                }
59                pool_opts.connect(&pg_con).await?
60            }
61            _ => {
62                return Err(LedgerError::ConfigError(
63                    "One of pg_con or pool must be set".to_string(),
64                ))
65            }
66        };
67        if config.exec_migrations {
68            sqlx::migrate!()
69                .run(&pool)
70                .instrument(tracing::info_span!("cala_ledger.migrations"))
71                .await?;
72        }
73
74        let outbox = Outbox::init(&pool).await?;
75        let mut outbox_handle = None;
76        if let Some(outbox_config) = config.outbox {
77            outbox_handle = Some(Self::start_outbox_server(outbox_config, outbox.clone()));
78        }
79
80        let accounts = Accounts::new(&pool, outbox.clone());
81        let journals = Journals::new(&pool, outbox.clone());
82        let tx_templates = TxTemplates::new(&pool, outbox.clone());
83        let transactions = Transactions::new(&pool, outbox.clone());
84        let entries = Entries::new(&pool, outbox.clone());
85        let balances = Balances::new(&pool, outbox.clone(), &journals);
86        let velocities = Velocities::new(&pool, outbox.clone());
87        let account_sets = AccountSets::new(&pool, outbox.clone(), &accounts, &entries, &balances);
88        Ok(Self {
89            accounts,
90            account_sets,
91            journals,
92            tx_templates,
93            outbox,
94            transactions,
95            entries,
96            balances,
97            velocities,
98            outbox_handle: Arc::new(Mutex::new(outbox_handle)),
99            pool,
100        })
101    }
102
103    pub fn pool(&self) -> &PgPool {
104        &self.pool
105    }
106
107    pub async fn begin_operation(&self) -> Result<LedgerOperation<'static>, LedgerError> {
108        Ok(LedgerOperation::init(&self.pool, &self.outbox).await?)
109    }
110
111    pub fn ledger_operation_from_db_op<'a>(
112        &self,
113        db_op: es_entity::DbOpWithTime<'a>,
114    ) -> LedgerOperation<'a> {
115        LedgerOperation::new(db_op, &self.outbox)
116    }
117
118    pub fn accounts(&self) -> &Accounts {
119        &self.accounts
120    }
121
122    pub fn velocities(&self) -> &Velocities {
123        &self.velocities
124    }
125
126    pub fn account_sets(&self) -> &AccountSets {
127        &self.account_sets
128    }
129
130    pub fn journals(&self) -> &Journals {
131        &self.journals
132    }
133
134    pub fn tx_templates(&self) -> &TxTemplates {
135        &self.tx_templates
136    }
137
138    pub fn balances(&self) -> &Balances {
139        &self.balances
140    }
141
142    pub fn entries(&self) -> &Entries {
143        &self.entries
144    }
145
146    pub fn transactions(&self) -> &Transactions {
147        &self.transactions
148    }
149
150    #[instrument(
151        name = "cala_ledger.post_transaction",
152        skip(self, params),
153        fields(tx_template_code)
154    )]
155    pub async fn post_transaction(
156        &self,
157        tx_id: TransactionId,
158        tx_template_code: &str,
159        params: impl Into<Params> + std::fmt::Debug,
160    ) -> Result<Transaction, LedgerError> {
161        let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
162        let transaction = self
163            .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
164            .await?;
165        db.commit().await?;
166        Ok(transaction)
167    }
168
169    #[instrument(
170        name = "cala_ledger.post_transaction_in_op",
171        skip(self, db)
172        fields(transaction_id, external_id)
173    )]
174    pub async fn post_transaction_in_op(
175        &self,
176        db: &mut LedgerOperation<'_>,
177        tx_id: TransactionId,
178        tx_template_code: &str,
179        params: impl Into<Params> + std::fmt::Debug,
180    ) -> Result<Transaction, LedgerError> {
181        let prepared_tx = self
182            .tx_templates
183            .prepare_transaction_in_op(db, tx_id, tx_template_code, params.into())
184            .await?;
185
186        let transaction = self
187            .transactions
188            .create_in_op(db, prepared_tx.transaction)
189            .await?;
190
191        let span = tracing::Span::current();
192        span.record("transaction_id", transaction.id().to_string());
193        span.record("external_id", &transaction.values().external_id);
194
195        let entries = self
196            .entries
197            .create_all_in_op(db, prepared_tx.entries)
198            .await?;
199
200        let account_ids = entries
201            .iter()
202            .map(|entry| entry.account_id)
203            .collect::<Vec<_>>();
204        let mappings = self
205            .account_sets
206            .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
207            .await?;
208
209        self.velocities
210            .update_balances_with_limit_enforcement_in_op(
211                db,
212                transaction.created_at(),
213                transaction.values(),
214                &entries,
215                &account_ids,
216                &mappings,
217            )
218            .await?;
219
220        self.balances
221            .update_balances_in_op(
222                db,
223                transaction.journal_id(),
224                entries,
225                transaction.effective(),
226                transaction.created_at(),
227                mappings,
228            )
229            .await?;
230        Ok(transaction)
231    }
232
233    #[instrument(name = "cala_ledger.void_transaction", skip(self))]
234    pub async fn void_transaction(
235        &self,
236        voiding_tx_id: TransactionId,
237        existing_tx_id: TransactionId,
238    ) -> Result<Transaction, LedgerError> {
239        let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
240        let transaction = self
241            .void_transaction_in_op(&mut db, voiding_tx_id, existing_tx_id)
242            .await?;
243        db.commit().await?;
244        Ok(transaction)
245    }
246
247    #[instrument(
248        name = "cala_ledger.transaction_void",
249        skip(self, db)
250        fields(transaction_id, external_id)
251    )]
252    pub async fn void_transaction_in_op(
253        &self,
254        db: &mut LedgerOperation<'_>,
255        voiding_tx_id: TransactionId,
256        existing_tx_id: TransactionId,
257    ) -> Result<Transaction, LedgerError> {
258        let new_entries = self
259            .entries
260            .new_entries_for_voided_tx(voiding_tx_id, existing_tx_id)
261            .await?;
262
263        let transaction = self
264            .transactions()
265            .create_voided_tx_in_op(
266                db,
267                voiding_tx_id,
268                existing_tx_id,
269                new_entries.iter().map(|entry| entry.id),
270            )
271            .await?;
272
273        let span = tracing::Span::current();
274        span.record("transaction_id", transaction.id().to_string());
275        span.record("external_id", &transaction.values().external_id);
276
277        let entries = self.entries.create_all_in_op(db, new_entries).await?;
278
279        let account_ids = entries
280            .iter()
281            .map(|entry| entry.account_id)
282            .collect::<Vec<_>>();
283        let mappings = self
284            .account_sets
285            .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
286            .await?;
287
288        self.velocities
289            .update_balances_with_limit_enforcement_in_op(
290                db,
291                transaction.created_at(),
292                transaction.values(),
293                &entries,
294                &account_ids,
295                &mappings,
296            )
297            .await?;
298
299        self.balances
300            .update_balances_in_op(
301                db,
302                transaction.journal_id(),
303                entries,
304                transaction.effective(),
305                transaction.created_at(),
306                mappings,
307            )
308            .await?;
309        Ok(transaction)
310    }
311
312    pub async fn register_outbox_listener(
313        &self,
314        start_after: Option<EventSequence>,
315    ) -> Result<OutboxListener, LedgerError> {
316        Ok(self.outbox.register_listener(start_after).await?)
317    }
318
319    #[cfg(feature = "import")]
320    #[instrument(name = "cala_ledger.sync_outbox_event", skip(self, db))]
321    pub async fn sync_outbox_event(
322        &self,
323        db: sqlx::Transaction<'_, sqlx::Postgres>,
324        origin: DataSourceId,
325        event: OutboxEvent,
326    ) -> Result<(), LedgerError> {
327        use crate::outbox::OutboxEventPayload::*;
328
329        match event.payload {
330            Empty => (),
331            AccountCreated { account, .. } => {
332                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
333                self.accounts
334                    .sync_account_creation(op, origin, account)
335                    .await?
336            }
337            AccountUpdated {
338                account, fields, ..
339            } => {
340                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
341                self.accounts
342                    .sync_account_update(op, account, fields)
343                    .await?
344            }
345            AccountSetCreated { account_set, .. } => {
346                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
347                self.account_sets
348                    .sync_account_set_creation(op, origin, account_set)
349                    .await?
350            }
351            AccountSetUpdated {
352                account_set,
353                fields,
354                ..
355            } => {
356                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
357                self.account_sets
358                    .sync_account_set_update(op, account_set, fields)
359                    .await?
360            }
361            AccountSetMemberCreated {
362                account_set_id,
363                member_id,
364                ..
365            } => {
366                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
367                self.account_sets
368                    .sync_account_set_member_creation(op, origin, account_set_id, member_id)
369                    .await?
370            }
371            AccountSetMemberRemoved {
372                account_set_id,
373                member_id,
374                ..
375            } => {
376                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
377                self.account_sets
378                    .sync_account_set_member_removal(op, origin, account_set_id, member_id)
379                    .await?
380            }
381            JournalCreated { journal, .. } => {
382                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
383                self.journals
384                    .sync_journal_creation(op, origin, journal)
385                    .await?
386            }
387            JournalUpdated {
388                journal, fields, ..
389            } => {
390                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
391                self.journals
392                    .sync_journal_update(op, journal, fields)
393                    .await?
394            }
395            TransactionCreated { transaction, .. } => {
396                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
397                self.transactions
398                    .sync_transaction_creation(op, origin, transaction)
399                    .await?
400            }
401            TransactionUpdated { transaction, .. } => {
402                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
403                self.transactions
404                    .sync_transaction_update(op, origin, transaction)
405                    .await?
406            }
407            TxTemplateCreated { tx_template, .. } => {
408                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
409                self.tx_templates
410                    .sync_tx_template_creation(op, origin, tx_template)
411                    .await?
412            }
413            EntryCreated { entry, .. } => {
414                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
415                self.entries.sync_entry_creation(op, origin, entry).await?
416            }
417            BalanceCreated { balance, .. } => {
418                self.balances
419                    .sync_balance_creation(db, origin, balance)
420                    .await?
421            }
422            BalanceUpdated { balance, .. } => {
423                self.balances
424                    .sync_balance_update(db, origin, balance)
425                    .await?
426            }
427        }
428        Ok(())
429    }
430
431    pub async fn await_outbox_handle(&self) -> Result<(), LedgerError> {
432        let handle = { self.outbox_handle.lock().expect("poisened mutex").take() };
433        if let Some(handle) = handle {
434            return handle.await.expect("Couldn't await outbox handle");
435        }
436        Ok(())
437    }
438
439    pub fn shutdown_outbox(&mut self) -> Result<(), LedgerError> {
440        if let Some(handle) = self.outbox_handle.lock().expect("poisened mutex").take() {
441            handle.abort();
442        }
443        Ok(())
444    }
445
446    #[instrument(name = "cala_ledger.start_outbox_server", skip(outbox))]
447    fn start_outbox_server(
448        config: server::OutboxServerConfig,
449        outbox: Outbox,
450    ) -> tokio::task::JoinHandle<Result<(), LedgerError>> {
451        tokio::spawn(async move {
452            server::start(config, outbox).await?;
453            Ok(())
454        })
455    }
456}