1pub mod config;
2pub mod error;
3
4use sqlx::PgPool;
5use std::sync::{Arc, Mutex};
6pub use tracing::instrument;
7
8pub use config::*;
9use error::*;
10
11use crate::{
12 account::Accounts,
13 account_set::AccountSets,
14 balance::Balances,
15 entry::Entries,
16 journal::Journals,
17 ledger_operation::*,
18 outbox::{server, EventSequence, Outbox, OutboxListener},
19 primitives::TransactionId,
20 transaction::{Transaction, Transactions},
21 tx_template::{Params, TxTemplates},
22 velocity::Velocities,
23};
24#[cfg(feature = "import")]
25mod import_deps {
26 pub use crate::primitives::DataSourceId;
27 pub use cala_types::outbox::OutboxEvent;
28}
29#[cfg(feature = "import")]
30use import_deps::*;
31
32#[derive(Clone)]
33pub struct CalaLedger {
34 pool: PgPool,
35 accounts: Accounts,
36 account_sets: AccountSets,
37 journals: Journals,
38 transactions: Transactions,
39 tx_templates: TxTemplates,
40 entries: Entries,
41 velocities: Velocities,
42 balances: Balances,
43 outbox: Outbox,
44 #[allow(clippy::type_complexity)]
45 outbox_handle: Arc<Mutex<Option<tokio::task::JoinHandle<Result<(), LedgerError>>>>>,
46}
47
48impl CalaLedger {
49 pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
50 let pool = match (config.pool, config.pg_con) {
51 (Some(pool), None) => pool,
52 (None, Some(pg_con)) => {
53 let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
54 if let Some(max_connections) = config.max_connections {
55 pool_opts = pool_opts.max_connections(max_connections);
56 }
57 pool_opts.connect(&pg_con).await?
58 }
59 _ => {
60 return Err(LedgerError::ConfigError(
61 "One of pg_con or pool must be set".to_string(),
62 ))
63 }
64 };
65 if config.exec_migrations {
66 sqlx::migrate!().run(&pool).await?;
67 }
68
69 let outbox = Outbox::init(&pool).await?;
70 let mut outbox_handle = None;
71 if let Some(outbox_config) = config.outbox {
72 outbox_handle = Some(Self::start_outbox_server(outbox_config, outbox.clone()));
73 }
74
75 let accounts = Accounts::new(&pool, outbox.clone());
76 let journals = Journals::new(&pool, outbox.clone());
77 let tx_templates = TxTemplates::new(&pool, outbox.clone());
78 let transactions = Transactions::new(&pool, outbox.clone());
79 let entries = Entries::new(&pool, outbox.clone());
80 let balances = Balances::new(&pool, outbox.clone(), &journals);
81 let velocities = Velocities::new(&pool, outbox.clone());
82 let account_sets = AccountSets::new(&pool, outbox.clone(), &accounts, &entries, &balances);
83 Ok(Self {
84 accounts,
85 account_sets,
86 journals,
87 tx_templates,
88 outbox,
89 transactions,
90 entries,
91 balances,
92 velocities,
93 outbox_handle: Arc::new(Mutex::new(outbox_handle)),
94 pool,
95 })
96 }
97
98 pub fn pool(&self) -> &PgPool {
99 &self.pool
100 }
101
102 pub async fn begin_operation(&self) -> Result<LedgerOperation<'static>, LedgerError> {
103 Ok(LedgerOperation::init(&self.pool, &self.outbox).await?)
104 }
105
106 pub fn ledger_operation_from_db_op<'a>(
107 &self,
108 db_op: es_entity::DbOpWithTime<'a>,
109 ) -> LedgerOperation<'a> {
110 LedgerOperation::new(db_op, &self.outbox)
111 }
112
113 pub fn accounts(&self) -> &Accounts {
116 &self.accounts
117 }
118
119 pub fn velocities(&self) -> &Velocities {
120 &self.velocities
121 }
122
123 pub fn account_sets(&self) -> &AccountSets {
124 &self.account_sets
125 }
126
127 pub fn journals(&self) -> &Journals {
128 &self.journals
129 }
130
131 pub fn tx_templates(&self) -> &TxTemplates {
132 &self.tx_templates
133 }
134
135 pub fn balances(&self) -> &Balances {
136 &self.balances
137 }
138
139 pub fn entries(&self) -> &Entries {
140 &self.entries
141 }
142
143 pub fn transactions(&self) -> &Transactions {
144 &self.transactions
145 }
146
147 pub async fn post_transaction(
148 &self,
149 tx_id: TransactionId,
150 tx_template_code: &str,
151 params: impl Into<Params> + std::fmt::Debug,
152 ) -> Result<Transaction, LedgerError> {
153 let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
154 let transaction = self
155 .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
156 .await?;
157 db.commit().await?;
158 Ok(transaction)
159 }
160
161 #[instrument(
162 name = "cala_ledger.transaction_post",
163 skip(self, db)
164 fields(transaction_id, external_id)
165 err
166 )]
167 pub async fn post_transaction_in_op(
168 &self,
169 db: &mut LedgerOperation<'_>,
170 tx_id: TransactionId,
171 tx_template_code: &str,
172 params: impl Into<Params> + std::fmt::Debug,
173 ) -> Result<Transaction, LedgerError> {
174 let prepared_tx = self
175 .tx_templates
176 .prepare_transaction(db.now(), tx_id, tx_template_code, params.into())
177 .await?;
178
179 let transaction = self
180 .transactions
181 .create_in_op(db, prepared_tx.transaction)
182 .await?;
183
184 let span = tracing::Span::current();
185 span.record("transaction_id", transaction.id().to_string());
186 span.record("external_id", &transaction.values().external_id);
187
188 let entries = self
189 .entries
190 .create_all_in_op(db, prepared_tx.entries)
191 .await?;
192
193 let account_ids = entries
194 .iter()
195 .map(|entry| entry.account_id)
196 .collect::<Vec<_>>();
197 let mappings = self
198 .account_sets
199 .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
200 .await?;
201
202 self.velocities
203 .update_balances_with_limit_enforcement_in_op(
204 db,
205 transaction.created_at(),
206 transaction.values(),
207 &entries,
208 &account_ids,
209 &mappings,
210 )
211 .await?;
212
213 self.balances
214 .update_balances_in_op(
215 db,
216 transaction.journal_id(),
217 entries,
218 transaction.effective(),
219 transaction.created_at(),
220 mappings,
221 )
222 .await?;
223 Ok(transaction)
224 }
225
226 pub async fn void_transaction(
227 &self,
228 voiding_tx_id: TransactionId,
229 existing_tx_id: TransactionId,
230 ) -> Result<Transaction, LedgerError> {
231 let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
232 let transaction = self
233 .void_transaction_in_op(&mut db, voiding_tx_id, existing_tx_id)
234 .await?;
235 db.commit().await?;
236 Ok(transaction)
237 }
238
239 #[instrument(
240 name = "cala_ledger.transaction_void",
241 skip(self, db)
242 fields(transaction_id, external_id)
243 err
244 )]
245 pub async fn void_transaction_in_op(
246 &self,
247 db: &mut LedgerOperation<'_>,
248 voiding_tx_id: TransactionId,
249 existing_tx_id: TransactionId,
250 ) -> Result<Transaction, LedgerError> {
251 let new_entries = self
252 .entries
253 .new_entries_for_voided_tx(voiding_tx_id, existing_tx_id)
254 .await?;
255
256 let transaction = self
257 .transactions()
258 .create_voided_tx_in_op(
259 db,
260 voiding_tx_id,
261 existing_tx_id,
262 new_entries.iter().map(|entry| entry.id),
263 )
264 .await?;
265
266 let span = tracing::Span::current();
267 span.record("transaction_id", transaction.id().to_string());
268 span.record("external_id", &transaction.values().external_id);
269
270 let entries = self.entries.create_all_in_op(db, new_entries).await?;
271
272 let account_ids = entries
273 .iter()
274 .map(|entry| entry.account_id)
275 .collect::<Vec<_>>();
276 let mappings = self
277 .account_sets
278 .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
279 .await?;
280
281 self.velocities
282 .update_balances_with_limit_enforcement_in_op(
283 db,
284 transaction.created_at(),
285 transaction.values(),
286 &entries,
287 &account_ids,
288 &mappings,
289 )
290 .await?;
291
292 self.balances
293 .update_balances_in_op(
294 db,
295 transaction.journal_id(),
296 entries,
297 transaction.effective(),
298 transaction.created_at(),
299 mappings,
300 )
301 .await?;
302 Ok(transaction)
303 }
304
305 pub async fn register_outbox_listener(
306 &self,
307 start_after: Option<EventSequence>,
308 ) -> Result<OutboxListener, LedgerError> {
309 Ok(self.outbox.register_listener(start_after).await?)
310 }
311
312 #[cfg(feature = "import")]
313 #[instrument(name = "cala_ledger.sync_outbox_event", skip(self, db))]
314 pub async fn sync_outbox_event(
315 &self,
316 db: sqlx::Transaction<'_, sqlx::Postgres>,
317 origin: DataSourceId,
318 event: OutboxEvent,
319 ) -> Result<(), LedgerError> {
320 use crate::outbox::OutboxEventPayload::*;
321
322 match event.payload {
323 Empty => (),
324 AccountCreated { account, .. } => {
325 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
326 self.accounts
327 .sync_account_creation(op, origin, account)
328 .await?
329 }
330 AccountUpdated {
331 account, fields, ..
332 } => {
333 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
334 self.accounts
335 .sync_account_update(op, account, fields)
336 .await?
337 }
338 AccountSetCreated { account_set, .. } => {
339 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
340 self.account_sets
341 .sync_account_set_creation(op, origin, account_set)
342 .await?
343 }
344 AccountSetUpdated {
345 account_set,
346 fields,
347 ..
348 } => {
349 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
350 self.account_sets
351 .sync_account_set_update(op, account_set, fields)
352 .await?
353 }
354 AccountSetMemberCreated {
355 account_set_id,
356 member_id,
357 ..
358 } => {
359 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
360 self.account_sets
361 .sync_account_set_member_creation(op, origin, account_set_id, member_id)
362 .await?
363 }
364 AccountSetMemberRemoved {
365 account_set_id,
366 member_id,
367 ..
368 } => {
369 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
370 self.account_sets
371 .sync_account_set_member_removal(op, origin, account_set_id, member_id)
372 .await?
373 }
374 JournalCreated { journal, .. } => {
375 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
376 self.journals
377 .sync_journal_creation(op, origin, journal)
378 .await?
379 }
380 JournalUpdated {
381 journal, fields, ..
382 } => {
383 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
384 self.journals
385 .sync_journal_update(op, journal, fields)
386 .await?
387 }
388 TransactionCreated { transaction, .. } => {
389 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
390 self.transactions
391 .sync_transaction_creation(op, origin, transaction)
392 .await?
393 }
394 TransactionUpdated { transaction, .. } => {
395 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
396 self.transactions
397 .sync_transaction_update(op, origin, transaction)
398 .await?
399 }
400 TxTemplateCreated { tx_template, .. } => {
401 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
402 self.tx_templates
403 .sync_tx_template_creation(op, origin, tx_template)
404 .await?
405 }
406 EntryCreated { entry, .. } => {
407 let op = es_entity::DbOp::from(db).with_time(event.recorded_at);
408 self.entries.sync_entry_creation(op, origin, entry).await?
409 }
410 BalanceCreated { balance, .. } => {
411 self.balances
412 .sync_balance_creation(db, origin, balance)
413 .await?
414 }
415 BalanceUpdated { balance, .. } => {
416 self.balances
417 .sync_balance_update(db, origin, balance)
418 .await?
419 }
420 }
421 Ok(())
422 }
423
424 pub async fn await_outbox_handle(&self) -> Result<(), LedgerError> {
425 let handle = { self.outbox_handle.lock().expect("poisened mutex").take() };
426 if let Some(handle) = handle {
427 return handle.await.expect("Couldn't await outbox handle");
428 }
429 Ok(())
430 }
431
432 pub fn shutdown_outbox(&mut self) -> Result<(), LedgerError> {
433 if let Some(handle) = self.outbox_handle.lock().expect("poisened mutex").take() {
434 handle.abort();
435 }
436 Ok(())
437 }
438
439 fn start_outbox_server(
440 config: server::OutboxServerConfig,
441 outbox: Outbox,
442 ) -> tokio::task::JoinHandle<Result<(), LedgerError>> {
443 tokio::spawn(async move {
444 server::start(config, outbox).await?;
445 Ok(())
446 })
447 }
448}