1pub mod config;
2pub mod error;
3
4use sqlx::PgPool;
5use std::sync::{Arc, Mutex};
6pub use tracing::instrument;
7use tracing::Instrument;
8
9pub use config::*;
10use error::*;
11
12use crate::{
13 account::Accounts,
14 account_set::AccountSets,
15 balance::Balances,
16 entry::Entries,
17 journal::Journals,
18 ledger_operation::*,
19 outbox::{server, EventSequence, Outbox, OutboxListener},
20 primitives::TransactionId,
21 transaction::{Transaction, Transactions},
22 tx_template::{Params, TxTemplates},
23 velocity::Velocities,
24};
25#[cfg(feature = "import")]
26mod import_deps {
27 pub use crate::primitives::DataSourceId;
28 pub use cala_types::outbox::OutboxEvent;
29}
30#[cfg(feature = "import")]
31use import_deps::*;
32
33#[derive(Clone)]
34pub struct CalaLedger {
35 pool: PgPool,
36 accounts: Accounts,
37 account_sets: AccountSets,
38 journals: Journals,
39 transactions: Transactions,
40 tx_templates: TxTemplates,
41 entries: Entries,
42 velocities: Velocities,
43 balances: Balances,
44 outbox: Outbox,
45 #[allow(clippy::type_complexity)]
46 outbox_handle: Arc<Mutex<Option<tokio::task::JoinHandle<Result<(), LedgerError>>>>>,
47}
48
49impl CalaLedger {
50 #[instrument(name = "cala_ledger.init", skip_all)]
51 pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
52 let pool = match (config.pool, config.pg_con) {
53 (Some(pool), None) => pool,
54 (None, Some(pg_con)) => {
55 let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
56 if let Some(max_connections) = config.max_connections {
57 pool_opts = pool_opts.max_connections(max_connections);
58 }
59 pool_opts.connect(&pg_con).await?
60 }
61 _ => {
62 return Err(LedgerError::ConfigError(
63 "One of pg_con or pool must be set".to_string(),
64 ))
65 }
66 };
67 if config.exec_migrations {
68 sqlx::migrate!()
69 .run(&pool)
70 .instrument(tracing::info_span!("cala_ledger.migrations"))
71 .await?;
72 }
73
74 let outbox = Outbox::init(&pool).await?;
75 let mut outbox_handle = None;
76 if let Some(outbox_config) = config.outbox {
77 outbox_handle = Some(Self::start_outbox_server(outbox_config, outbox.clone()));
78 }
79
80 let accounts = Accounts::new(&pool, outbox.clone());
81 let journals = Journals::new(&pool, outbox.clone());
82 let tx_templates = TxTemplates::new(&pool, outbox.clone());
83 let transactions = Transactions::new(&pool, outbox.clone());
84 let entries = Entries::new(&pool, outbox.clone());
85 let balances = Balances::new(&pool, outbox.clone(), &journals);
86 let velocities = Velocities::new(&pool, outbox.clone());
87 let account_sets = AccountSets::new(&pool, outbox.clone(), &accounts, &entries, &balances);
88 Ok(Self {
89 accounts,
90 account_sets,
91 journals,
92 tx_templates,
93 outbox,
94 transactions,
95 entries,
96 balances,
97 velocities,
98 outbox_handle: Arc::new(Mutex::new(outbox_handle)),
99 pool,
100 })
101 }
102
103 pub fn pool(&self) -> &PgPool {
104 &self.pool
105 }
106
107 pub async fn begin_operation(&self) -> Result<LedgerOperation<'static>, LedgerError> {
108 Ok(LedgerOperation::init(&self.pool, &self.outbox).await?)
109 }
110
111 pub fn ledger_operation_from_db_op<'a>(
112 &self,
113 db_op: es_entity::DbOpWithTime<'a>,
114 ) -> LedgerOperation<'a> {
115 LedgerOperation::new(db_op, &self.outbox)
116 }
117
118 pub fn accounts(&self) -> &Accounts {
119 &self.accounts
120 }
121
122 pub fn velocities(&self) -> &Velocities {
123 &self.velocities
124 }
125
126 pub fn account_sets(&self) -> &AccountSets {
127 &self.account_sets
128 }
129
130 pub fn journals(&self) -> &Journals {
131 &self.journals
132 }
133
134 pub fn tx_templates(&self) -> &TxTemplates {
135 &self.tx_templates
136 }
137
138 pub fn balances(&self) -> &Balances {
139 &self.balances
140 }
141
142 pub fn entries(&self) -> &Entries {
143 &self.entries
144 }
145
146 pub fn transactions(&self) -> &Transactions {
147 &self.transactions
148 }
149
150 #[instrument(
151 name = "cala_ledger.post_transaction",
152 skip(self, params),
153 fields(tx_template_code)
154 )]
155 pub async fn post_transaction(
156 &self,
157 tx_id: TransactionId,
158 tx_template_code: &str,
159 params: impl Into<Params> + std::fmt::Debug,
160 ) -> Result<Transaction, LedgerError> {
161 let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
162 let transaction = self
163 .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
164 .await?;
165 db.commit().await?;
166 Ok(transaction)
167 }
168
169 #[instrument(
170 name = "cala_ledger.post_transaction_in_op",
171 skip(self, db)
172 fields(transaction_id, external_id)
173 )]
174 pub async fn post_transaction_in_op(
175 &self,
176 db: &mut LedgerOperation<'_>,
177 tx_id: TransactionId,
178 tx_template_code: &str,
179 params: impl Into<Params> + std::fmt::Debug,
180 ) -> Result<Transaction, LedgerError> {
181 let prepared_tx = self
182 .tx_templates
183 .prepare_transaction_in_op(db, tx_id, tx_template_code, params.into())
184 .await?;
185
186 let transaction = self
187 .transactions
188 .create_in_op(db, prepared_tx.transaction)
189 .await?;
190
191 let span = tracing::Span::current();
192 span.record("transaction_id", transaction.id().to_string());
193 span.record("external_id", &transaction.values().external_id);
194
195 let entries = self
196 .entries
197 .create_all_in_op(db, prepared_tx.entries)
198 .await?;
199
200 let account_ids = entries
201 .iter()
202 .map(|entry| entry.account_id)
203 .collect::<Vec<_>>();
204 let mappings = self
205 .account_sets
206 .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
207 .await?;
208
209 self.velocities
210 .update_balances_with_limit_enforcement_in_op(
211 db,
212 transaction.created_at(),
213 transaction.values(),
214 &entries,
215 &account_ids,
216 &mappings,
217 )
218 .await?;
219
220 self.balances
221 .update_balances_in_op(
222 db,
223 transaction.journal_id(),
224 entries,
225 transaction.effective(),
226 transaction.created_at(),
227 mappings,
228 )
229 .await?;
230 Ok(transaction)
231 }
232
233 #[instrument(name = "cala_ledger.void_transaction", skip(self))]
234 pub async fn void_transaction(
235 &self,
236 voiding_tx_id: TransactionId,
237 existing_tx_id: TransactionId,
238 ) -> Result<Transaction, LedgerError> {
239 let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
240 let transaction = self
241 .void_transaction_in_op(&mut db, voiding_tx_id, existing_tx_id)
242 .await?;
243 db.commit().await?;
244 Ok(transaction)
245 }
246
247 #[instrument(
248 name = "cala_ledger.transaction_void",
249 skip(self, db)
250 fields(transaction_id, external_id)
251 )]
252 pub async fn void_transaction_in_op(
253 &self,
254 db: &mut LedgerOperation<'_>,
255 voiding_tx_id: TransactionId,
256 existing_tx_id: TransactionId,
257 ) -> Result<Transaction, LedgerError> {
258 let new_entries = self
259 .entries
260 .new_entries_for_voided_tx(voiding_tx_id, existing_tx_id)
261 .await?;
262
263 let transaction = self
264 .transactions()
265 .create_voided_tx_in_op(
266 db,
267 voiding_tx_id,
268 existing_tx_id,
269 new_entries.iter().map(|entry| entry.id),
270 )
271 .await?;
272
273 let span = tracing::Span::current();
274 span.record("transaction_id", transaction.id().to_string());
275 span.record("external_id", &transaction.values().external_id);
276
277 let entries = self.entries.create_all_in_op(db, new_entries).await?;
278
279 let account_ids = entries
280 .iter()
281 .map(|entry| entry.account_id)
282 .collect::<Vec<_>>();
283 let mappings = self
284 .account_sets
285 .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
286 .await?;
287
288 self.velocities
289 .update_balances_with_limit_enforcement_in_op(
290 db,
291 transaction.created_at(),
292 transaction.values(),
293 &entries,
294 &account_ids,
295 &mappings,
296 )
297 .await?;
298
299 self.balances
300 .update_balances_in_op(
301 db,
302 transaction.journal_id(),
303 entries,
304 transaction.effective(),
305 transaction.created_at(),
306 mappings,
307 )
308 .await?;
309 Ok(transaction)
310 }
311
312 pub async fn register_outbox_listener(
313 &self,
314 start_after: Option<EventSequence>,
315 ) -> Result<OutboxListener, LedgerError> {
316 Ok(self.outbox.register_listener(start_after).await?)
317 }
318
319 #[cfg(feature = "import")]
320 #[instrument(name = "cala_ledger.sync_outbox_event", skip(self, db))]
321 pub async fn sync_outbox_event(
322 &self,
323 db: sqlx::Transaction<'_, sqlx::Postgres>,
324 origin: DataSourceId,
325 event: OutboxEvent,
326 ) -> Result<(), LedgerError> {
327 use crate::outbox::OutboxEventPayload::*;
328
329 match event.payload {
330 Empty => (),
331 AccountCreated { account, .. } => {
332 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
333 self.accounts
334 .sync_account_creation(op, origin, account)
335 .await?
336 }
337 AccountUpdated {
338 account, fields, ..
339 } => {
340 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
341 self.accounts
342 .sync_account_update(op, account, fields)
343 .await?
344 }
345 AccountSetCreated { account_set, .. } => {
346 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
347 self.account_sets
348 .sync_account_set_creation(op, origin, account_set)
349 .await?
350 }
351 AccountSetUpdated {
352 account_set,
353 fields,
354 ..
355 } => {
356 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
357 self.account_sets
358 .sync_account_set_update(op, account_set, fields)
359 .await?
360 }
361 AccountSetMemberCreated {
362 account_set_id,
363 member_id,
364 ..
365 } => {
366 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
367 self.account_sets
368 .sync_account_set_member_creation(op, origin, account_set_id, member_id)
369 .await?
370 }
371 AccountSetMemberRemoved {
372 account_set_id,
373 member_id,
374 ..
375 } => {
376 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
377 self.account_sets
378 .sync_account_set_member_removal(op, origin, account_set_id, member_id)
379 .await?
380 }
381 JournalCreated { journal, .. } => {
382 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
383 self.journals
384 .sync_journal_creation(op, origin, journal)
385 .await?
386 }
387 JournalUpdated {
388 journal, fields, ..
389 } => {
390 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
391 self.journals
392 .sync_journal_update(op, journal, fields)
393 .await?
394 }
395 TransactionCreated { transaction, .. } => {
396 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
397 self.transactions
398 .sync_transaction_creation(op, origin, transaction)
399 .await?
400 }
401 TransactionUpdated { transaction, .. } => {
402 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
403 self.transactions
404 .sync_transaction_update(op, origin, transaction)
405 .await?
406 }
407 TxTemplateCreated { tx_template, .. } => {
408 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
409 self.tx_templates
410 .sync_tx_template_creation(op, origin, tx_template)
411 .await?
412 }
413 EntryCreated { entry, .. } => {
414 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
415 self.entries.sync_entry_creation(op, origin, entry).await?
416 }
417 BalanceCreated { balance, .. } => {
418 self.balances
419 .sync_balance_creation(db, origin, balance)
420 .await?
421 }
422 BalanceUpdated { balance, .. } => {
423 self.balances
424 .sync_balance_update(db, origin, balance)
425 .await?
426 }
427 }
428 Ok(())
429 }
430
431 pub async fn await_outbox_handle(&self) -> Result<(), LedgerError> {
432 let handle = { self.outbox_handle.lock().expect("poisened mutex").take() };
433 if let Some(handle) = handle {
434 return handle.await.expect("Couldn't await outbox handle");
435 }
436 Ok(())
437 }
438
439 pub fn shutdown_outbox(&mut self) -> Result<(), LedgerError> {
440 if let Some(handle) = self.outbox_handle.lock().expect("poisened mutex").take() {
441 handle.abort();
442 }
443 Ok(())
444 }
445
446 #[instrument(name = "cala_ledger.start_outbox_server", skip(outbox))]
447 fn start_outbox_server(
448 config: server::OutboxServerConfig,
449 outbox: Outbox,
450 ) -> tokio::task::JoinHandle<Result<(), LedgerError>> {
451 tokio::spawn(async move {
452 server::start(config, outbox).await?;
453 Ok(())
454 })
455 }
456}