// cala_ledger/ledger/mod.rs

1pub mod config;
2pub mod error;
3
4use sqlx::PgPool;
5use std::sync::{Arc, Mutex};
6pub use tracing::instrument;
7
8pub use config::*;
9use error::*;
10
11use crate::{
12    account::Accounts,
13    account_set::AccountSets,
14    balance::Balances,
15    entry::Entries,
16    journal::Journals,
17    ledger_operation::*,
18    outbox::{server, EventSequence, Outbox, OutboxListener},
19    primitives::TransactionId,
20    transaction::{Transaction, Transactions},
21    tx_template::{Params, TxTemplates},
22    velocity::Velocities,
23};
24#[cfg(feature = "import")]
25mod import_deps {
26    pub use crate::primitives::DataSourceId;
27    pub use cala_types::outbox::OutboxEvent;
28}
29#[cfg(feature = "import")]
30use import_deps::*;
31
32#[derive(Clone)]
33pub struct CalaLedger {
34    pool: PgPool,
35    accounts: Accounts,
36    account_sets: AccountSets,
37    journals: Journals,
38    transactions: Transactions,
39    tx_templates: TxTemplates,
40    entries: Entries,
41    velocities: Velocities,
42    balances: Balances,
43    outbox: Outbox,
44    #[allow(clippy::type_complexity)]
45    outbox_handle: Arc<Mutex<Option<tokio::task::JoinHandle<Result<(), LedgerError>>>>>,
46}
47
48impl CalaLedger {
49    pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
50        let pool = match (config.pool, config.pg_con) {
51            (Some(pool), None) => pool,
52            (None, Some(pg_con)) => {
53                let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
54                if let Some(max_connections) = config.max_connections {
55                    pool_opts = pool_opts.max_connections(max_connections);
56                }
57                pool_opts.connect(&pg_con).await?
58            }
59            _ => {
60                return Err(LedgerError::ConfigError(
61                    "One of pg_con or pool must be set".to_string(),
62                ))
63            }
64        };
65        if config.exec_migrations {
66            sqlx::migrate!().run(&pool).await?;
67        }
68
69        let outbox = Outbox::init(&pool).await?;
70        let mut outbox_handle = None;
71        if let Some(outbox_config) = config.outbox {
72            outbox_handle = Some(Self::start_outbox_server(outbox_config, outbox.clone()));
73        }
74
75        let accounts = Accounts::new(&pool, outbox.clone());
76        let journals = Journals::new(&pool, outbox.clone());
77        let tx_templates = TxTemplates::new(&pool, outbox.clone());
78        let transactions = Transactions::new(&pool, outbox.clone());
79        let entries = Entries::new(&pool, outbox.clone());
80        let balances = Balances::new(&pool, outbox.clone(), &journals);
81        let velocities = Velocities::new(&pool, outbox.clone());
82        let account_sets = AccountSets::new(&pool, outbox.clone(), &accounts, &entries, &balances);
83        Ok(Self {
84            accounts,
85            account_sets,
86            journals,
87            tx_templates,
88            outbox,
89            transactions,
90            entries,
91            balances,
92            velocities,
93            outbox_handle: Arc::new(Mutex::new(outbox_handle)),
94            pool,
95        })
96    }
97
98    pub fn pool(&self) -> &PgPool {
99        &self.pool
100    }
101
102    pub async fn begin_operation(&self) -> Result<LedgerOperation<'static>, LedgerError> {
103        Ok(LedgerOperation::init(&self.pool, &self.outbox).await?)
104    }
105
106    pub fn ledger_operation_from_db_op<'a>(
107        &self,
108        db_op: es_entity::DbOpWithTime<'a>,
109    ) -> LedgerOperation<'a> {
110        LedgerOperation::new(db_op, &self.outbox)
111    }
112
113    // ledger.accounts().create()
114    // ledger.create_account()
115    pub fn accounts(&self) -> &Accounts {
116        &self.accounts
117    }
118
119    pub fn velocities(&self) -> &Velocities {
120        &self.velocities
121    }
122
123    pub fn account_sets(&self) -> &AccountSets {
124        &self.account_sets
125    }
126
127    pub fn journals(&self) -> &Journals {
128        &self.journals
129    }
130
131    pub fn tx_templates(&self) -> &TxTemplates {
132        &self.tx_templates
133    }
134
135    pub fn balances(&self) -> &Balances {
136        &self.balances
137    }
138
139    pub fn entries(&self) -> &Entries {
140        &self.entries
141    }
142
143    pub fn transactions(&self) -> &Transactions {
144        &self.transactions
145    }
146
147    pub async fn post_transaction(
148        &self,
149        tx_id: TransactionId,
150        tx_template_code: &str,
151        params: impl Into<Params> + std::fmt::Debug,
152    ) -> Result<Transaction, LedgerError> {
153        let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
154        let transaction = self
155            .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
156            .await?;
157        db.commit().await?;
158        Ok(transaction)
159    }
160
161    #[instrument(
162        name = "cala_ledger.transaction_post",
163        skip(self, db)
164        fields(transaction_id, external_id)
165        err
166    )]
167    pub async fn post_transaction_in_op(
168        &self,
169        db: &mut LedgerOperation<'_>,
170        tx_id: TransactionId,
171        tx_template_code: &str,
172        params: impl Into<Params> + std::fmt::Debug,
173    ) -> Result<Transaction, LedgerError> {
174        let prepared_tx = self
175            .tx_templates
176            .prepare_transaction(db.now(), tx_id, tx_template_code, params.into())
177            .await?;
178
179        let transaction = self
180            .transactions
181            .create_in_op(db, prepared_tx.transaction)
182            .await?;
183
184        let span = tracing::Span::current();
185        span.record("transaction_id", transaction.id().to_string());
186        span.record("external_id", &transaction.values().external_id);
187
188        let entries = self
189            .entries
190            .create_all_in_op(db, prepared_tx.entries)
191            .await?;
192
193        let account_ids = entries
194            .iter()
195            .map(|entry| entry.account_id)
196            .collect::<Vec<_>>();
197        let mappings = self
198            .account_sets
199            .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
200            .await?;
201
202        self.velocities
203            .update_balances_with_limit_enforcement_in_op(
204                db,
205                transaction.created_at(),
206                transaction.values(),
207                &entries,
208                &account_ids,
209                &mappings,
210            )
211            .await?;
212
213        self.balances
214            .update_balances_in_op(
215                db,
216                transaction.journal_id(),
217                entries,
218                transaction.effective(),
219                transaction.created_at(),
220                mappings,
221            )
222            .await?;
223        Ok(transaction)
224    }
225
226    pub async fn void_transaction(
227        &self,
228        voiding_tx_id: TransactionId,
229        existing_tx_id: TransactionId,
230    ) -> Result<Transaction, LedgerError> {
231        let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
232        let transaction = self
233            .void_transaction_in_op(&mut db, voiding_tx_id, existing_tx_id)
234            .await?;
235        db.commit().await?;
236        Ok(transaction)
237    }
238
239    #[instrument(
240        name = "cala_ledger.transaction_void",
241        skip(self, db)
242        fields(transaction_id, external_id)
243        err
244    )]
245    pub async fn void_transaction_in_op(
246        &self,
247        db: &mut LedgerOperation<'_>,
248        voiding_tx_id: TransactionId,
249        existing_tx_id: TransactionId,
250    ) -> Result<Transaction, LedgerError> {
251        let new_entries = self
252            .entries
253            .new_entries_for_voided_tx(voiding_tx_id, existing_tx_id)
254            .await?;
255
256        let transaction = self
257            .transactions()
258            .create_voided_tx_in_op(
259                db,
260                voiding_tx_id,
261                existing_tx_id,
262                new_entries.iter().map(|entry| entry.id),
263            )
264            .await?;
265
266        let span = tracing::Span::current();
267        span.record("transaction_id", transaction.id().to_string());
268        span.record("external_id", &transaction.values().external_id);
269
270        let entries = self.entries.create_all_in_op(db, new_entries).await?;
271
272        let account_ids = entries
273            .iter()
274            .map(|entry| entry.account_id)
275            .collect::<Vec<_>>();
276        let mappings = self
277            .account_sets
278            .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
279            .await?;
280
281        self.velocities
282            .update_balances_with_limit_enforcement_in_op(
283                db,
284                transaction.created_at(),
285                transaction.values(),
286                &entries,
287                &account_ids,
288                &mappings,
289            )
290            .await?;
291
292        self.balances
293            .update_balances_in_op(
294                db,
295                transaction.journal_id(),
296                entries,
297                transaction.effective(),
298                transaction.created_at(),
299                mappings,
300            )
301            .await?;
302        Ok(transaction)
303    }
304
305    pub async fn register_outbox_listener(
306        &self,
307        start_after: Option<EventSequence>,
308    ) -> Result<OutboxListener, LedgerError> {
309        Ok(self.outbox.register_listener(start_after).await?)
310    }
311
312    #[cfg(feature = "import")]
313    #[instrument(name = "cala_ledger.sync_outbox_event", skip(self, db))]
314    pub async fn sync_outbox_event(
315        &self,
316        db: sqlx::Transaction<'_, sqlx::Postgres>,
317        origin: DataSourceId,
318        event: OutboxEvent,
319    ) -> Result<(), LedgerError> {
320        use crate::outbox::OutboxEventPayload::*;
321
322        match event.payload {
323            Empty => (),
324            AccountCreated { account, .. } => {
325                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
326                self.accounts
327                    .sync_account_creation(op, origin, account)
328                    .await?
329            }
330            AccountUpdated {
331                account, fields, ..
332            } => {
333                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
334                self.accounts
335                    .sync_account_update(op, account, fields)
336                    .await?
337            }
338            AccountSetCreated { account_set, .. } => {
339                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
340                self.account_sets
341                    .sync_account_set_creation(op, origin, account_set)
342                    .await?
343            }
344            AccountSetUpdated {
345                account_set,
346                fields,
347                ..
348            } => {
349                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
350                self.account_sets
351                    .sync_account_set_update(op, account_set, fields)
352                    .await?
353            }
354            AccountSetMemberCreated {
355                account_set_id,
356                member_id,
357                ..
358            } => {
359                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
360                self.account_sets
361                    .sync_account_set_member_creation(op, origin, account_set_id, member_id)
362                    .await?
363            }
364            AccountSetMemberRemoved {
365                account_set_id,
366                member_id,
367                ..
368            } => {
369                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
370                self.account_sets
371                    .sync_account_set_member_removal(op, origin, account_set_id, member_id)
372                    .await?
373            }
374            JournalCreated { journal, .. } => {
375                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
376                self.journals
377                    .sync_journal_creation(op, origin, journal)
378                    .await?
379            }
380            JournalUpdated {
381                journal, fields, ..
382            } => {
383                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
384                self.journals
385                    .sync_journal_update(op, journal, fields)
386                    .await?
387            }
388            TransactionCreated { transaction, .. } => {
389                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
390                self.transactions
391                    .sync_transaction_creation(op, origin, transaction)
392                    .await?
393            }
394            TransactionUpdated { transaction, .. } => {
395                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
396                self.transactions
397                    .sync_transaction_update(op, origin, transaction)
398                    .await?
399            }
400            TxTemplateCreated { tx_template, .. } => {
401                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
402                self.tx_templates
403                    .sync_tx_template_creation(op, origin, tx_template)
404                    .await?
405            }
406            EntryCreated { entry, .. } => {
407                let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
408                self.entries.sync_entry_creation(op, origin, entry).await?
409            }
410            BalanceCreated { balance, .. } => {
411                self.balances
412                    .sync_balance_creation(db, origin, balance)
413                    .await?
414            }
415            BalanceUpdated { balance, .. } => {
416                self.balances
417                    .sync_balance_update(db, origin, balance)
418                    .await?
419            }
420        }
421        Ok(())
422    }
423
424    pub async fn await_outbox_handle(&self) -> Result<(), LedgerError> {
425        let handle = { self.outbox_handle.lock().expect("poisened mutex").take() };
426        if let Some(handle) = handle {
427            return handle.await.expect("Couldn't await outbox handle");
428        }
429        Ok(())
430    }
431
432    pub fn shutdown_outbox(&mut self) -> Result<(), LedgerError> {
433        if let Some(handle) = self.outbox_handle.lock().expect("poisened mutex").take() {
434            handle.abort();
435        }
436        Ok(())
437    }
438
439    fn start_outbox_server(
440        config: server::OutboxServerConfig,
441        outbox: Outbox,
442    ) -> tokio::task::JoinHandle<Result<(), LedgerError>> {
443        tokio::spawn(async move {
444            server::start(config, outbox).await?;
445            Ok(())
446        })
447    }
448}