1pub mod config;
2pub mod error;
3
4use es_entity::clock::ClockHandle;
5use sqlx::PgPool;
6use std::sync::{Arc, Mutex};
7pub use tracing::instrument;
8use tracing::Instrument;
9
10pub use config::*;
11use error::*;
12
13use crate::{
14 account::Accounts,
15 account_set::AccountSets,
16 balance::Balances,
17 entry::Entries,
18 journal::Journals,
19 outbox::{server, OutboxPublisher},
20 primitives::TransactionId,
21 transaction::{Transaction, Transactions},
22 tx_template::{Params, TxTemplates},
23 velocity::Velocities,
24};
// Names that are only needed when the `import` feature is enabled. They are
// grouped in a private module so a single `#[cfg]`-gated glob `use` can bring
// them into scope.
#[cfg(feature = "import")]
mod import_deps {
    pub use crate::primitives::DataSourceId;
}
29#[cfg(feature = "import")]
30use import_deps::*;
31
/// Entry point that bundles the database pool, the clock, and one handle per
/// ledger entity type (accounts, journals, transactions, ...).
///
/// The struct derives `Clone`; the outbox task handle sits behind an `Arc`, so
/// every clone refers to the same handle slot.
#[derive(Clone)]
pub struct CalaLedger {
    /// Postgres connection pool used to open database operations.
    pool: PgPool,
    /// Clock handed to the entity handles and to new database operations.
    clock: ClockHandle,
    accounts: Accounts,
    account_sets: AccountSets,
    journals: Journals,
    transactions: Transactions,
    tx_templates: TxTemplates,
    entries: Entries,
    velocities: Velocities,
    balances: Balances,
    /// Publisher whose inner outbox is exposed through `outbox()`.
    publisher: OutboxPublisher,
    /// Join handle of the spawned outbox server task, if one was started.
    /// The slot is emptied (`take`) by whichever of `await_outbox_handle` /
    /// `shutdown_outbox` gets to it first.
    // The nested `Arc<Mutex<Option<JoinHandle<..>>>>` trips clippy's
    // type-complexity lint; it is kept inline rather than aliased.
    #[allow(clippy::type_complexity)]
    outbox_handle: Arc<Mutex<Option<tokio::task::JoinHandle<Result<(), LedgerError>>>>>,
}
48
49impl CalaLedger {
50 #[instrument(name = "cala_ledger.init", skip_all)]
51 pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
52 let pool = match (config.pool, config.pg_con) {
53 (Some(pool), None) => pool,
54 (None, Some(pg_con)) => {
55 let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
56 if let Some(max_connections) = config.max_connections {
57 pool_opts = pool_opts.max_connections(max_connections);
58 }
59 pool_opts.connect(&pg_con).await?
60 }
61 _ => {
62 return Err(LedgerError::ConfigError(
63 "One of pg_con or pool must be set".to_string(),
64 ))
65 }
66 };
67 if config.exec_migrations {
68 sqlx::migrate!()
69 .run(&pool)
70 .instrument(tracing::info_span!("cala_ledger.migrations"))
71 .await?;
72 }
73
74 let clock = config.clock;
75 let publisher = OutboxPublisher::init(&pool, &clock).await?;
76 let mut outbox_handle = None;
77 if let Some(outbox_config) = config.outbox {
78 outbox_handle = Some(Self::start_outbox_server(
79 outbox_config,
80 publisher.inner().clone(),
81 ));
82 }
83 let accounts = Accounts::new(&pool, &publisher, &clock);
84 let journals = Journals::new(&pool, &publisher, &clock);
85 let tx_templates = TxTemplates::new(&pool, &publisher, &clock);
86 let transactions = Transactions::new(&pool, &publisher);
87 let entries = Entries::new(&pool, &publisher);
88 let balances = Balances::new(&pool, &publisher, &journals);
89 let velocities = Velocities::new(&pool, &clock);
90 let account_sets =
91 AccountSets::new(&pool, &publisher, &accounts, &entries, &balances, &clock);
92 Ok(Self {
93 accounts,
94 account_sets,
95 journals,
96 tx_templates,
97 publisher,
98 transactions,
99 entries,
100 balances,
101 velocities,
102 outbox_handle: Arc::new(Mutex::new(outbox_handle)),
103 pool,
104 clock,
105 })
106 }
107
    /// Returns the Postgres connection pool held by this ledger.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
111
    /// Returns the clock handle held by this ledger.
    pub fn clock(&self) -> &ClockHandle {
        &self.clock
    }
115
116 pub async fn begin_operation(&self) -> Result<es_entity::DbOpWithTime<'static>, LedgerError> {
117 let db_op = es_entity::DbOp::init_with_clock(&self.pool, &self.clock)
118 .await?
119 .with_clock_time();
120 Ok(db_op)
121 }
122
    /// Returns the ledger's `Accounts` handle.
    pub fn accounts(&self) -> &Accounts {
        &self.accounts
    }
126
    /// Returns the ledger's `Velocities` handle.
    pub fn velocities(&self) -> &Velocities {
        &self.velocities
    }
130
    /// Returns the ledger's `AccountSets` handle.
    pub fn account_sets(&self) -> &AccountSets {
        &self.account_sets
    }
134
    /// Returns the ledger's `Journals` handle.
    pub fn journals(&self) -> &Journals {
        &self.journals
    }
138
    /// Returns the ledger's `TxTemplates` handle.
    pub fn tx_templates(&self) -> &TxTemplates {
        &self.tx_templates
    }
142
    /// Returns the ledger's `Balances` handle.
    pub fn balances(&self) -> &Balances {
        &self.balances
    }
146
    /// Returns the ledger's `Entries` handle.
    pub fn entries(&self) -> &Entries {
        &self.entries
    }
150
    /// Returns the ledger's `Transactions` handle.
    pub fn transactions(&self) -> &Transactions {
        &self.transactions
    }
154
155 #[instrument(
156 name = "cala_ledger.post_transaction",
157 skip(self, params),
158 fields(tx_template_code)
159 )]
160 pub async fn post_transaction(
161 &self,
162 tx_id: TransactionId,
163 tx_template_code: &str,
164 params: impl Into<Params> + std::fmt::Debug,
165 ) -> Result<Transaction, LedgerError> {
166 let mut db = es_entity::DbOp::init_with_clock(&self.pool, &self.clock).await?;
167 let transaction = self
168 .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
169 .await?;
170 db.commit().await?;
171 Ok(transaction)
172 }
173
174 #[instrument(
175 name = "cala_ledger.post_transaction_in_op",
176 skip(self, db)
177 fields(transaction_id, external_id)
178 )]
179 pub async fn post_transaction_in_op(
180 &self,
181 db: &mut impl es_entity::AtomicOperation,
182 tx_id: TransactionId,
183 tx_template_code: &str,
184 params: impl Into<Params> + std::fmt::Debug,
185 ) -> Result<Transaction, LedgerError> {
186 let mut db = es_entity::OpWithTime::cached_or_db_time(db).await?;
187 let time = db.now();
188 let prepared_tx = self
189 .tx_templates
190 .prepare_transaction_in_op(&mut db, time, tx_id, tx_template_code, params.into())
191 .await?;
192
193 let transaction = self
194 .transactions
195 .create_in_op(&mut db, prepared_tx.transaction)
196 .await?;
197
198 let span = tracing::Span::current();
199 span.record("transaction_id", transaction.id().to_string());
200 span.record("external_id", &transaction.values().external_id);
201
202 let entries = self
203 .entries
204 .create_all_in_op(&mut db, prepared_tx.entries)
205 .await?;
206
207 let account_ids = entries
208 .iter()
209 .map(|entry| entry.account_id)
210 .collect::<Vec<_>>();
211 let mappings = self
212 .account_sets
213 .fetch_mappings_in_op(&mut db, transaction.values().journal_id, &account_ids)
214 .await?;
215
216 self.velocities
217 .update_balances_with_limit_enforcement_in_op(
218 &mut db,
219 transaction.created_at(),
220 transaction.values(),
221 &entries,
222 &account_ids,
223 &mappings,
224 )
225 .await?;
226
227 self.balances
228 .update_balances_in_op(
229 &mut db,
230 transaction.journal_id(),
231 entries,
232 transaction.effective(),
233 transaction.created_at(),
234 mappings,
235 )
236 .await?;
237 Ok(transaction)
238 }
239
240 #[instrument(name = "cala_ledger.void_transaction", skip(self))]
241 pub async fn void_transaction(
242 &self,
243 voiding_tx_id: TransactionId,
244 existing_tx_id: TransactionId,
245 ) -> Result<Transaction, LedgerError> {
246 let mut db = self.begin_operation().await?;
247 let transaction = self
248 .void_transaction_in_op(&mut db, voiding_tx_id, existing_tx_id)
249 .await?;
250 db.commit().await?;
251 Ok(transaction)
252 }
253
254 #[instrument(
255 name = "cala_ledger.transaction_void",
256 skip(self, db)
257 fields(transaction_id, external_id)
258 )]
259 pub async fn void_transaction_in_op(
260 &self,
261 db: &mut impl es_entity::AtomicOperationWithTime,
262 voiding_tx_id: TransactionId,
263 existing_tx_id: TransactionId,
264 ) -> Result<Transaction, LedgerError> {
265 let new_entries = self
266 .entries
267 .new_entries_for_voided_tx(voiding_tx_id, existing_tx_id)
268 .await?;
269
270 let transaction = self
271 .transactions()
272 .create_voided_tx_in_op(
273 db,
274 voiding_tx_id,
275 existing_tx_id,
276 new_entries.iter().map(|entry| entry.id),
277 )
278 .await?;
279
280 let span = tracing::Span::current();
281 span.record("transaction_id", transaction.id().to_string());
282 span.record("external_id", &transaction.values().external_id);
283
284 let entries = self.entries.create_all_in_op(db, new_entries).await?;
285
286 let account_ids = entries
287 .iter()
288 .map(|entry| entry.account_id)
289 .collect::<Vec<_>>();
290 let mappings = self
291 .account_sets
292 .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
293 .await?;
294
295 self.velocities
296 .update_balances_with_limit_enforcement_in_op(
297 db,
298 transaction.created_at(),
299 transaction.values(),
300 &entries,
301 &account_ids,
302 &mappings,
303 )
304 .await?;
305
306 self.balances
307 .update_balances_in_op(
308 db,
309 transaction.journal_id(),
310 entries,
311 transaction.effective(),
312 transaction.created_at(),
313 mappings,
314 )
315 .await?;
316 Ok(transaction)
317 }
318
    /// Returns the outbox held inside the ledger's publisher.
    pub fn outbox(&self) -> &crate::outbox::ObixOutbox {
        self.publisher.inner()
    }
322
323 pub fn register_outbox_listener(
324 &self,
325 start_after: Option<obix::EventSequence>,
326 ) -> obix::out::PersistentOutboxListener<crate::outbox::OutboxEventPayload> {
327 self.publisher.inner().listen_persisted(start_after)
328 }
329
330 #[cfg(feature = "import")]
331 #[instrument(name = "cala_ledger.sync_outbox_event", skip(self, db))]
332 pub async fn sync_outbox_event(
333 &self,
334 db: es_entity::DbOp<'_>,
335 origin: DataSourceId,
336 event: obix::out::PersistentOutboxEvent<crate::outbox::OutboxEventPayload>,
337 ) -> Result<(), LedgerError> {
338 use crate::outbox::OutboxEventPayload::*;
339 use es_entity::WithEventContext;
340
341 let Some(payload) = event.payload else {
342 return Ok(());
343 };
344
345 match payload {
346 Empty => (),
347 AccountCreated { account, .. } => {
348 let op = db.with_time(event.recorded_at);
349 self.accounts
350 .sync_account_creation(op, origin, account)
351 .await?
352 }
353 AccountUpdated {
354 account, fields, ..
355 } => {
356 let data = {
357 let mut ctx = es_entity::context::EventContext::current();
358 let _ = ctx.insert("data_source", &origin);
359 ctx.data()
360 };
361 let op = db.with_time(event.recorded_at);
362 self.accounts
363 .sync_account_update(op, account, fields)
364 .with_event_context(data)
365 .await?
366 }
367 AccountSetCreated { account_set, .. } => {
368 let op = db.with_time(event.recorded_at);
369 self.account_sets
370 .sync_account_set_creation(op, origin, account_set)
371 .await?
372 }
373 AccountSetUpdated {
374 account_set,
375 fields,
376 ..
377 } => {
378 let data = {
379 let mut ctx = es_entity::context::EventContext::current();
380 let _ = ctx.insert("data_source", &origin);
381 ctx.data()
382 };
383 let op = db.with_time(event.recorded_at);
384 self.account_sets
385 .sync_account_set_update(op, account_set, fields)
386 .with_event_context(data)
387 .await?
388 }
389 AccountSetMemberCreated {
390 account_set_id,
391 member_id,
392 ..
393 } => {
394 let op = db.with_time(event.recorded_at);
395 self.account_sets
396 .sync_account_set_member_creation(op, origin, account_set_id, member_id)
397 .await?
398 }
399 AccountSetMemberRemoved {
400 account_set_id,
401 member_id,
402 ..
403 } => {
404 let op = db.with_time(event.recorded_at);
405 self.account_sets
406 .sync_account_set_member_removal(op, origin, account_set_id, member_id)
407 .await?
408 }
409 JournalCreated { journal, .. } => {
410 let op = db.with_time(event.recorded_at);
411 self.journals
412 .sync_journal_creation(op, origin, journal)
413 .await?
414 }
415 JournalUpdated {
416 journal, fields, ..
417 } => {
418 let data = {
419 let mut ctx = es_entity::context::EventContext::current();
420 let _ = ctx.insert("data_source", &origin);
421 ctx.data()
422 };
423 let op = db.with_time(event.recorded_at);
424 self.journals
425 .sync_journal_update(op, journal, fields)
426 .with_event_context(data)
427 .await?
428 }
429 TransactionCreated { transaction, .. } => {
430 let op = db.with_time(event.recorded_at);
431 self.transactions
432 .sync_transaction_creation(op, origin, transaction)
433 .await?
434 }
435 TransactionUpdated { transaction, .. } => {
436 let data = {
437 let mut ctx = es_entity::context::EventContext::current();
438 let _ = ctx.insert("data_source", &origin);
439 ctx.data()
440 };
441 let op = db.with_time(event.recorded_at);
442 self.transactions
443 .sync_transaction_update(op, origin, transaction)
444 .with_event_context(data)
445 .await?
446 }
447 TxTemplateCreated { tx_template, .. } => {
448 let op = db.with_time(event.recorded_at);
449 self.tx_templates
450 .sync_tx_template_creation(op, origin, tx_template)
451 .await?
452 }
453 EntryCreated { entry, .. } => {
454 let op = db.with_time(event.recorded_at);
455 self.entries.sync_entry_creation(op, origin, entry).await?
456 }
457 BalanceCreated { balance, .. } => {
458 let op = db.with_time(event.recorded_at);
459 self.balances
460 .sync_balance_creation(op, origin, balance)
461 .await?
462 }
463 BalanceUpdated { balance, .. } => {
464 let op = db.with_time(event.recorded_at);
465 self.balances
466 .sync_balance_update(op, origin, balance)
467 .await?
468 }
469 }
470 Ok(())
471 }
472
473 pub async fn await_outbox_handle(&self) -> Result<(), LedgerError> {
474 let handle = { self.outbox_handle.lock().expect("poisened mutex").take() };
475 if let Some(handle) = handle {
476 return handle.await.expect("Couldn't await outbox handle");
477 }
478 Ok(())
479 }
480
481 pub fn shutdown_outbox(&mut self) -> Result<(), LedgerError> {
482 if let Some(handle) = self.outbox_handle.lock().expect("poisened mutex").take() {
483 handle.abort();
484 }
485 Ok(())
486 }
487
488 #[instrument(name = "cala_ledger.start_outbox_server", skip(outbox))]
489 fn start_outbox_server(
490 config: server::OutboxServerConfig,
491 outbox: crate::outbox::ObixOutbox,
492 ) -> tokio::task::JoinHandle<Result<(), LedgerError>> {
493 tokio::spawn(async move {
494 server::start(config, outbox).await?;
495 Ok(())
496 })
497 }
498}