cala_ledger/ledger/
mod.rs

1pub mod config;
2pub mod error;
3
4use es_entity::clock::ClockHandle;
5use sqlx::PgPool;
6use std::sync::{Arc, Mutex};
7pub use tracing::instrument;
8use tracing::Instrument;
9
10pub use config::*;
11use error::*;
12
13use crate::{
14    account::Accounts,
15    account_set::AccountSets,
16    balance::Balances,
17    entry::Entries,
18    journal::Journals,
19    outbox::{server, OutboxPublisher},
20    primitives::TransactionId,
21    transaction::{Transaction, Transactions},
22    tx_template::{Params, TxTemplates},
23    velocity::Velocities,
24};
/// Names that are only needed when the `import` feature is enabled.
///
/// Grouping them in a private module lets a single `#[cfg]`-gated glob import
/// bring them into scope for the feature-gated items of this file.
#[cfg(feature = "import")]
mod import_deps {
    pub use crate::primitives::DataSourceId;
}
29#[cfg(feature = "import")]
30use import_deps::*;
31
32#[derive(Clone)]
33pub struct CalaLedger {
34    pool: PgPool,
35    clock: ClockHandle,
36    accounts: Accounts,
37    account_sets: AccountSets,
38    journals: Journals,
39    transactions: Transactions,
40    tx_templates: TxTemplates,
41    entries: Entries,
42    velocities: Velocities,
43    balances: Balances,
44    publisher: OutboxPublisher,
45    #[allow(clippy::type_complexity)]
46    outbox_handle: Arc<Mutex<Option<tokio::task::JoinHandle<Result<(), LedgerError>>>>>,
47}
48
49impl CalaLedger {
50    #[instrument(name = "cala_ledger.init", skip_all)]
51    pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
52        let pool = match (config.pool, config.pg_con) {
53            (Some(pool), None) => pool,
54            (None, Some(pg_con)) => {
55                let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
56                if let Some(max_connections) = config.max_connections {
57                    pool_opts = pool_opts.max_connections(max_connections);
58                }
59                pool_opts.connect(&pg_con).await?
60            }
61            _ => {
62                return Err(LedgerError::ConfigError(
63                    "One of pg_con or pool must be set".to_string(),
64                ))
65            }
66        };
67        if config.exec_migrations {
68            sqlx::migrate!()
69                .run(&pool)
70                .instrument(tracing::info_span!("cala_ledger.migrations"))
71                .await?;
72        }
73
74        let clock = config.clock;
75        let publisher = OutboxPublisher::init(&pool, &clock).await?;
76        let mut outbox_handle = None;
77        if let Some(outbox_config) = config.outbox {
78            outbox_handle = Some(Self::start_outbox_server(
79                outbox_config,
80                publisher.inner().clone(),
81            ));
82        }
83        let accounts = Accounts::new(&pool, &publisher, &clock);
84        let journals = Journals::new(&pool, &publisher, &clock);
85        let tx_templates = TxTemplates::new(&pool, &publisher, &clock);
86        let transactions = Transactions::new(&pool, &publisher);
87        let entries = Entries::new(&pool, &publisher);
88        let balances = Balances::new(&pool, &publisher, &journals);
89        let velocities = Velocities::new(&pool, &clock);
90        let account_sets =
91            AccountSets::new(&pool, &publisher, &accounts, &entries, &balances, &clock);
92        Ok(Self {
93            accounts,
94            account_sets,
95            journals,
96            tx_templates,
97            publisher,
98            transactions,
99            entries,
100            balances,
101            velocities,
102            outbox_handle: Arc::new(Mutex::new(outbox_handle)),
103            pool,
104            clock,
105        })
106    }
107
108    pub fn pool(&self) -> &PgPool {
109        &self.pool
110    }
111
112    pub fn clock(&self) -> &ClockHandle {
113        &self.clock
114    }
115
116    pub async fn begin_operation(&self) -> Result<es_entity::DbOpWithTime<'static>, LedgerError> {
117        let db_op = es_entity::DbOp::init_with_clock(&self.pool, &self.clock)
118            .await?
119            .with_clock_time();
120        Ok(db_op)
121    }
122
123    pub fn accounts(&self) -> &Accounts {
124        &self.accounts
125    }
126
127    pub fn velocities(&self) -> &Velocities {
128        &self.velocities
129    }
130
131    pub fn account_sets(&self) -> &AccountSets {
132        &self.account_sets
133    }
134
135    pub fn journals(&self) -> &Journals {
136        &self.journals
137    }
138
139    pub fn tx_templates(&self) -> &TxTemplates {
140        &self.tx_templates
141    }
142
143    pub fn balances(&self) -> &Balances {
144        &self.balances
145    }
146
147    pub fn entries(&self) -> &Entries {
148        &self.entries
149    }
150
151    pub fn transactions(&self) -> &Transactions {
152        &self.transactions
153    }
154
155    #[instrument(
156        name = "cala_ledger.post_transaction",
157        skip(self, params),
158        fields(tx_template_code)
159    )]
160    pub async fn post_transaction(
161        &self,
162        tx_id: TransactionId,
163        tx_template_code: &str,
164        params: impl Into<Params> + std::fmt::Debug,
165    ) -> Result<Transaction, LedgerError> {
166        let mut db = es_entity::DbOp::init_with_clock(&self.pool, &self.clock).await?;
167        let transaction = self
168            .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
169            .await?;
170        db.commit().await?;
171        Ok(transaction)
172    }
173
174    #[instrument(
175        name = "cala_ledger.post_transaction_in_op",
176        skip(self, db)
177        fields(transaction_id, external_id)
178    )]
179    pub async fn post_transaction_in_op(
180        &self,
181        db: &mut impl es_entity::AtomicOperation,
182        tx_id: TransactionId,
183        tx_template_code: &str,
184        params: impl Into<Params> + std::fmt::Debug,
185    ) -> Result<Transaction, LedgerError> {
186        let mut db = es_entity::OpWithTime::cached_or_db_time(db).await?;
187        let time = db.now();
188        let prepared_tx = self
189            .tx_templates
190            .prepare_transaction_in_op(&mut db, time, tx_id, tx_template_code, params.into())
191            .await?;
192
193        let transaction = self
194            .transactions
195            .create_in_op(&mut db, prepared_tx.transaction)
196            .await?;
197
198        let span = tracing::Span::current();
199        span.record("transaction_id", transaction.id().to_string());
200        span.record("external_id", &transaction.values().external_id);
201
202        let entries = self
203            .entries
204            .create_all_in_op(&mut db, prepared_tx.entries)
205            .await?;
206
207        let account_ids = entries
208            .iter()
209            .map(|entry| entry.account_id)
210            .collect::<Vec<_>>();
211        let mappings = self
212            .account_sets
213            .fetch_mappings_in_op(&mut db, transaction.values().journal_id, &account_ids)
214            .await?;
215
216        self.velocities
217            .update_balances_with_limit_enforcement_in_op(
218                &mut db,
219                transaction.created_at(),
220                transaction.values(),
221                &entries,
222                &account_ids,
223                &mappings,
224            )
225            .await?;
226
227        self.balances
228            .update_balances_in_op(
229                &mut db,
230                transaction.journal_id(),
231                entries,
232                transaction.effective(),
233                transaction.created_at(),
234                mappings,
235            )
236            .await?;
237        Ok(transaction)
238    }
239
240    #[instrument(name = "cala_ledger.void_transaction", skip(self))]
241    pub async fn void_transaction(
242        &self,
243        voiding_tx_id: TransactionId,
244        existing_tx_id: TransactionId,
245    ) -> Result<Transaction, LedgerError> {
246        let mut db = self.begin_operation().await?;
247        let transaction = self
248            .void_transaction_in_op(&mut db, voiding_tx_id, existing_tx_id)
249            .await?;
250        db.commit().await?;
251        Ok(transaction)
252    }
253
254    #[instrument(
255        name = "cala_ledger.transaction_void",
256        skip(self, db)
257        fields(transaction_id, external_id)
258    )]
259    pub async fn void_transaction_in_op(
260        &self,
261        db: &mut impl es_entity::AtomicOperationWithTime,
262        voiding_tx_id: TransactionId,
263        existing_tx_id: TransactionId,
264    ) -> Result<Transaction, LedgerError> {
265        let new_entries = self
266            .entries
267            .new_entries_for_voided_tx(voiding_tx_id, existing_tx_id)
268            .await?;
269
270        let transaction = self
271            .transactions()
272            .create_voided_tx_in_op(
273                db,
274                voiding_tx_id,
275                existing_tx_id,
276                new_entries.iter().map(|entry| entry.id),
277            )
278            .await?;
279
280        let span = tracing::Span::current();
281        span.record("transaction_id", transaction.id().to_string());
282        span.record("external_id", &transaction.values().external_id);
283
284        let entries = self.entries.create_all_in_op(db, new_entries).await?;
285
286        let account_ids = entries
287            .iter()
288            .map(|entry| entry.account_id)
289            .collect::<Vec<_>>();
290        let mappings = self
291            .account_sets
292            .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
293            .await?;
294
295        self.velocities
296            .update_balances_with_limit_enforcement_in_op(
297                db,
298                transaction.created_at(),
299                transaction.values(),
300                &entries,
301                &account_ids,
302                &mappings,
303            )
304            .await?;
305
306        self.balances
307            .update_balances_in_op(
308                db,
309                transaction.journal_id(),
310                entries,
311                transaction.effective(),
312                transaction.created_at(),
313                mappings,
314            )
315            .await?;
316        Ok(transaction)
317    }
318
319    pub fn outbox(&self) -> &crate::outbox::ObixOutbox {
320        self.publisher.inner()
321    }
322
323    pub fn register_outbox_listener(
324        &self,
325        start_after: Option<obix::EventSequence>,
326    ) -> obix::out::PersistentOutboxListener<crate::outbox::OutboxEventPayload> {
327        self.publisher.inner().listen_persisted(start_after)
328    }
329
330    #[cfg(feature = "import")]
331    #[instrument(name = "cala_ledger.sync_outbox_event", skip(self, db))]
332    pub async fn sync_outbox_event(
333        &self,
334        db: es_entity::DbOp<'_>,
335        origin: DataSourceId,
336        event: obix::out::PersistentOutboxEvent<crate::outbox::OutboxEventPayload>,
337    ) -> Result<(), LedgerError> {
338        use crate::outbox::OutboxEventPayload::*;
339        use es_entity::WithEventContext;
340
341        let Some(payload) = event.payload else {
342            return Ok(());
343        };
344
345        match payload {
346            Empty => (),
347            AccountCreated { account, .. } => {
348                let op = db.with_time(event.recorded_at);
349                self.accounts
350                    .sync_account_creation(op, origin, account)
351                    .await?
352            }
353            AccountUpdated {
354                account, fields, ..
355            } => {
356                let data = {
357                    let mut ctx = es_entity::context::EventContext::current();
358                    let _ = ctx.insert("data_source", &origin);
359                    ctx.data()
360                };
361                let op = db.with_time(event.recorded_at);
362                self.accounts
363                    .sync_account_update(op, account, fields)
364                    .with_event_context(data)
365                    .await?
366            }
367            AccountSetCreated { account_set, .. } => {
368                let op = db.with_time(event.recorded_at);
369                self.account_sets
370                    .sync_account_set_creation(op, origin, account_set)
371                    .await?
372            }
373            AccountSetUpdated {
374                account_set,
375                fields,
376                ..
377            } => {
378                let data = {
379                    let mut ctx = es_entity::context::EventContext::current();
380                    let _ = ctx.insert("data_source", &origin);
381                    ctx.data()
382                };
383                let op = db.with_time(event.recorded_at);
384                self.account_sets
385                    .sync_account_set_update(op, account_set, fields)
386                    .with_event_context(data)
387                    .await?
388            }
389            AccountSetMemberCreated {
390                account_set_id,
391                member_id,
392                ..
393            } => {
394                let op = db.with_time(event.recorded_at);
395                self.account_sets
396                    .sync_account_set_member_creation(op, origin, account_set_id, member_id)
397                    .await?
398            }
399            AccountSetMemberRemoved {
400                account_set_id,
401                member_id,
402                ..
403            } => {
404                let op = db.with_time(event.recorded_at);
405                self.account_sets
406                    .sync_account_set_member_removal(op, origin, account_set_id, member_id)
407                    .await?
408            }
409            JournalCreated { journal, .. } => {
410                let op = db.with_time(event.recorded_at);
411                self.journals
412                    .sync_journal_creation(op, origin, journal)
413                    .await?
414            }
415            JournalUpdated {
416                journal, fields, ..
417            } => {
418                let data = {
419                    let mut ctx = es_entity::context::EventContext::current();
420                    let _ = ctx.insert("data_source", &origin);
421                    ctx.data()
422                };
423                let op = db.with_time(event.recorded_at);
424                self.journals
425                    .sync_journal_update(op, journal, fields)
426                    .with_event_context(data)
427                    .await?
428            }
429            TransactionCreated { transaction, .. } => {
430                let op = db.with_time(event.recorded_at);
431                self.transactions
432                    .sync_transaction_creation(op, origin, transaction)
433                    .await?
434            }
435            TransactionUpdated { transaction, .. } => {
436                let data = {
437                    let mut ctx = es_entity::context::EventContext::current();
438                    let _ = ctx.insert("data_source", &origin);
439                    ctx.data()
440                };
441                let op = db.with_time(event.recorded_at);
442                self.transactions
443                    .sync_transaction_update(op, origin, transaction)
444                    .with_event_context(data)
445                    .await?
446            }
447            TxTemplateCreated { tx_template, .. } => {
448                let op = db.with_time(event.recorded_at);
449                self.tx_templates
450                    .sync_tx_template_creation(op, origin, tx_template)
451                    .await?
452            }
453            EntryCreated { entry, .. } => {
454                let op = db.with_time(event.recorded_at);
455                self.entries.sync_entry_creation(op, origin, entry).await?
456            }
457            BalanceCreated { balance, .. } => {
458                let op = db.with_time(event.recorded_at);
459                self.balances
460                    .sync_balance_creation(op, origin, balance)
461                    .await?
462            }
463            BalanceUpdated { balance, .. } => {
464                let op = db.with_time(event.recorded_at);
465                self.balances
466                    .sync_balance_update(op, origin, balance)
467                    .await?
468            }
469        }
470        Ok(())
471    }
472
473    pub async fn await_outbox_handle(&self) -> Result<(), LedgerError> {
474        let handle = { self.outbox_handle.lock().expect("poisened mutex").take() };
475        if let Some(handle) = handle {
476            return handle.await.expect("Couldn't await outbox handle");
477        }
478        Ok(())
479    }
480
481    pub fn shutdown_outbox(&mut self) -> Result<(), LedgerError> {
482        if let Some(handle) = self.outbox_handle.lock().expect("poisened mutex").take() {
483            handle.abort();
484        }
485        Ok(())
486    }
487
488    #[instrument(name = "cala_ledger.start_outbox_server", skip(outbox))]
489    fn start_outbox_server(
490        config: server::OutboxServerConfig,
491        outbox: crate::outbox::ObixOutbox,
492    ) -> tokio::task::JoinHandle<Result<(), LedgerError>> {
493        tokio::spawn(async move {
494            server::start(config, outbox).await?;
495            Ok(())
496        })
497    }
498}