cala_ledger/ledger/
mod.rs

1pub mod config;
2pub mod error;
3
4use sqlx::PgPool;
5use std::sync::{Arc, Mutex};
6pub use tracing::instrument;
7
8pub use config::*;
9use error::*;
10
11use crate::{
12    account::Accounts,
13    account_set::AccountSets,
14    balance::Balances,
15    entry::Entries,
16    journal::Journals,
17    ledger_operation::*,
18    outbox::{server, EventSequence, Outbox, OutboxListener},
19    primitives::TransactionId,
20    transaction::{Transaction, Transactions},
21    tx_template::{Params, TxTemplates},
22    velocity::Velocities,
23};
24#[cfg(feature = "import")]
25mod import_deps {
26    pub use crate::primitives::DataSourceId;
27    pub use cala_types::outbox::OutboxEvent;
28}
29#[cfg(feature = "import")]
30use import_deps::*;
31
32#[derive(Clone)]
33pub struct CalaLedger {
34    pool: PgPool,
35    accounts: Accounts,
36    account_sets: AccountSets,
37    journals: Journals,
38    transactions: Transactions,
39    tx_templates: TxTemplates,
40    entries: Entries,
41    velocities: Velocities,
42    balances: Balances,
43    outbox: Outbox,
44    #[allow(clippy::type_complexity)]
45    outbox_handle: Arc<Mutex<Option<tokio::task::JoinHandle<Result<(), LedgerError>>>>>,
46}
47
48impl CalaLedger {
49    pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
50        let pool = match (config.pool, config.pg_con) {
51            (Some(pool), None) => pool,
52            (None, Some(pg_con)) => {
53                let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
54                if let Some(max_connections) = config.max_connections {
55                    pool_opts = pool_opts.max_connections(max_connections);
56                }
57                pool_opts.connect(&pg_con).await?
58            }
59            _ => {
60                return Err(LedgerError::ConfigError(
61                    "One of pg_con or pool must be set".to_string(),
62                ))
63            }
64        };
65        if config.exec_migrations {
66            sqlx::migrate!().run(&pool).await?;
67        }
68
69        let outbox = Outbox::init(&pool).await?;
70        let mut outbox_handle = None;
71        if let Some(outbox_config) = config.outbox {
72            outbox_handle = Some(Self::start_outbox_server(outbox_config, outbox.clone()));
73        }
74
75        let accounts = Accounts::new(&pool, outbox.clone());
76        let journals = Journals::new(&pool, outbox.clone());
77        let tx_templates = TxTemplates::new(&pool, outbox.clone());
78        let transactions = Transactions::new(&pool, outbox.clone());
79        let entries = Entries::new(&pool, outbox.clone());
80        let balances = Balances::new(&pool, outbox.clone(), &journals);
81        let velocities = Velocities::new(&pool, outbox.clone());
82        let account_sets = AccountSets::new(&pool, outbox.clone(), &accounts, &entries, &balances);
83        Ok(Self {
84            accounts,
85            account_sets,
86            journals,
87            tx_templates,
88            outbox,
89            transactions,
90            entries,
91            balances,
92            velocities,
93            outbox_handle: Arc::new(Mutex::new(outbox_handle)),
94            pool,
95        })
96    }
97
98    pub fn pool(&self) -> &PgPool {
99        &self.pool
100    }
101
102    pub async fn begin_operation<'a>(&self) -> Result<LedgerOperation<'a>, LedgerError> {
103        Ok(LedgerOperation::init(&self.pool, &self.outbox).await?)
104    }
105
106    pub fn ledger_operation_from_db_op<'a>(
107        &self,
108        db_op: es_entity::DbOp<'a>,
109    ) -> LedgerOperation<'a> {
110        LedgerOperation::new(db_op, &self.outbox)
111    }
112
113    // ledger.accounts().create()
114    // ledger.create_account()
115    pub fn accounts(&self) -> &Accounts {
116        &self.accounts
117    }
118
119    pub fn velocities(&self) -> &Velocities {
120        &self.velocities
121    }
122
123    pub fn account_sets(&self) -> &AccountSets {
124        &self.account_sets
125    }
126
127    pub fn journals(&self) -> &Journals {
128        &self.journals
129    }
130
131    pub fn tx_templates(&self) -> &TxTemplates {
132        &self.tx_templates
133    }
134
135    pub fn balances(&self) -> &Balances {
136        &self.balances
137    }
138
139    pub fn entries(&self) -> &Entries {
140        &self.entries
141    }
142
143    pub fn transactions(&self) -> &Transactions {
144        &self.transactions
145    }
146
147    pub async fn post_transaction(
148        &self,
149        tx_id: TransactionId,
150        tx_template_code: &str,
151        params: impl Into<Params> + std::fmt::Debug,
152    ) -> Result<Transaction, LedgerError> {
153        let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
154        let transaction = self
155            .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
156            .await?;
157        db.commit().await?;
158        Ok(transaction)
159    }
160
161    #[instrument(
162        name = "cala_ledger.transaction_post",
163        skip(self, db)
164        fields(transaction_id, external_id)
165        err
166    )]
167    pub async fn post_transaction_in_op(
168        &self,
169        db: &mut LedgerOperation<'_>,
170        tx_id: TransactionId,
171        tx_template_code: &str,
172        params: impl Into<Params> + std::fmt::Debug,
173    ) -> Result<Transaction, LedgerError> {
174        let prepared_tx = self
175            .tx_templates
176            .prepare_transaction(db.op().now(), tx_id, tx_template_code, params.into())
177            .await?;
178
179        let transaction = self
180            .transactions
181            .create_in_op(db, prepared_tx.transaction)
182            .await?;
183
184        let span = tracing::Span::current();
185        span.record("transaction_id", transaction.id().to_string());
186        span.record("external_id", &transaction.values().external_id);
187
188        let entries = self
189            .entries
190            .create_all_in_op(db, prepared_tx.entries)
191            .await?;
192
193        let account_ids = entries
194            .iter()
195            .map(|entry| entry.account_id)
196            .collect::<Vec<_>>();
197        let mappings = self
198            .account_sets
199            .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
200            .await?;
201
202        self.velocities
203            .update_balances_in_op(
204                db,
205                transaction.created_at(),
206                transaction.values(),
207                &entries,
208                &account_ids,
209            )
210            .await?;
211
212        self.balances
213            .update_balances_in_op(
214                db,
215                transaction.journal_id(),
216                entries,
217                transaction.effective(),
218                transaction.created_at(),
219                mappings,
220            )
221            .await?;
222        Ok(transaction)
223    }
224
225    pub async fn register_outbox_listener(
226        &self,
227        start_after: Option<EventSequence>,
228    ) -> Result<OutboxListener, LedgerError> {
229        Ok(self.outbox.register_listener(start_after).await?)
230    }
231
232    #[cfg(feature = "import")]
233    #[instrument(name = "cala_ledger.sync_outbox_event", skip(self, db))]
234    pub async fn sync_outbox_event(
235        &self,
236        db: sqlx::Transaction<'_, sqlx::Postgres>,
237        origin: DataSourceId,
238        event: OutboxEvent,
239    ) -> Result<(), LedgerError> {
240        use crate::outbox::OutboxEventPayload::*;
241
242        match event.payload {
243            Empty => (),
244            AccountCreated { account, .. } => {
245                let op = es_entity::DbOp::new(db, event.recorded_at);
246                self.accounts
247                    .sync_account_creation(op, origin, account)
248                    .await?
249            }
250            AccountUpdated {
251                account, fields, ..
252            } => {
253                let op = es_entity::DbOp::new(db, event.recorded_at);
254                self.accounts
255                    .sync_account_update(op, account, fields)
256                    .await?
257            }
258            AccountSetCreated { account_set, .. } => {
259                let op = es_entity::DbOp::new(db, event.recorded_at);
260                self.account_sets
261                    .sync_account_set_creation(op, origin, account_set)
262                    .await?
263            }
264            AccountSetUpdated {
265                account_set,
266                fields,
267                ..
268            } => {
269                let op = es_entity::DbOp::new(db, event.recorded_at);
270                self.account_sets
271                    .sync_account_set_update(op, account_set, fields)
272                    .await?
273            }
274            AccountSetMemberCreated {
275                account_set_id,
276                member_id,
277                ..
278            } => {
279                let op = es_entity::DbOp::new(db, event.recorded_at);
280                self.account_sets
281                    .sync_account_set_member_creation(op, origin, account_set_id, member_id)
282                    .await?
283            }
284            AccountSetMemberRemoved {
285                account_set_id,
286                member_id,
287                ..
288            } => {
289                let op = es_entity::DbOp::new(db, event.recorded_at);
290                self.account_sets
291                    .sync_account_set_member_removal(op, origin, account_set_id, member_id)
292                    .await?
293            }
294            JournalCreated { journal, .. } => {
295                let op = es_entity::DbOp::new(db, event.recorded_at);
296                self.journals
297                    .sync_journal_creation(op, origin, journal)
298                    .await?
299            }
300            JournalUpdated {
301                journal, fields, ..
302            } => {
303                let op = es_entity::DbOp::new(db, event.recorded_at);
304                self.journals
305                    .sync_journal_update(op, journal, fields)
306                    .await?
307            }
308            TransactionCreated { transaction, .. } => {
309                let op = es_entity::DbOp::new(db, event.recorded_at);
310                self.transactions
311                    .sync_transaction_creation(op, origin, transaction)
312                    .await?
313            }
314            TxTemplateCreated { tx_template, .. } => {
315                let op = es_entity::DbOp::new(db, event.recorded_at);
316                self.tx_templates
317                    .sync_tx_template_creation(op, origin, tx_template)
318                    .await?
319            }
320            EntryCreated { entry, .. } => {
321                let op = es_entity::DbOp::new(db, event.recorded_at);
322                self.entries.sync_entry_creation(op, origin, entry).await?
323            }
324            BalanceCreated { balance, .. } => {
325                self.balances
326                    .sync_balance_creation(db, origin, balance)
327                    .await?
328            }
329            BalanceUpdated { balance, .. } => {
330                self.balances
331                    .sync_balance_update(db, origin, balance)
332                    .await?
333            }
334        }
335        Ok(())
336    }
337
338    pub async fn await_outbox_handle(&self) -> Result<(), LedgerError> {
339        let handle = { self.outbox_handle.lock().expect("poisened mutex").take() };
340        if let Some(handle) = handle {
341            return handle.await.expect("Couldn't await outbox handle");
342        }
343        Ok(())
344    }
345
346    pub fn shutdown_outbox(&mut self) -> Result<(), LedgerError> {
347        if let Some(handle) = self.outbox_handle.lock().expect("poisened mutex").take() {
348            handle.abort();
349        }
350        Ok(())
351    }
352
353    fn start_outbox_server(
354        config: server::OutboxServerConfig,
355        outbox: Outbox,
356    ) -> tokio::task::JoinHandle<Result<(), LedgerError>> {
357        tokio::spawn(async move {
358            server::start(config, outbox).await?;
359            Ok(())
360        })
361    }
362}