1pub mod config;
2pub mod error;
3
4use sqlx::PgPool;
5use std::sync::{Arc, Mutex};
6pub use tracing::instrument;
7
8pub use config::*;
9use error::*;
10
11use crate::{
12 account::Accounts,
13 account_set::AccountSets,
14 balance::Balances,
15 entry::Entries,
16 journal::Journals,
17 ledger_operation::*,
18 outbox::{server, EventSequence, Outbox, OutboxListener},
19 primitives::TransactionId,
20 transaction::{Transaction, Transactions},
21 tx_template::{Params, TxTemplates},
22 velocity::Velocities,
23};
// Names that are only referenced when the `import` feature is enabled.
// They are grouped in a private module so that one cfg-gated glob import
// (`use import_deps::*;`) brings all of them into scope.
#[cfg(feature = "import")]
mod import_deps {
    pub use crate::primitives::DataSourceId;
    pub use cala_types::outbox::OutboxEvent;
}
29#[cfg(feature = "import")]
30use import_deps::*;
31
/// Entry point that bundles the Postgres pool, the per-entity handles
/// (accounts, account sets, journals, ...) and the outbox.
///
/// The type derives `Clone`; the outbox task handle sits behind an `Arc`, so
/// every clone refers to the same handle slot.
#[derive(Clone)]
pub struct CalaLedger {
    /// Connection pool either supplied through the config or created in `init`.
    pool: PgPool,
    accounts: Accounts,
    account_sets: AccountSets,
    journals: Journals,
    transactions: Transactions,
    tx_templates: TxTemplates,
    entries: Entries,
    velocities: Velocities,
    balances: Balances,
    outbox: Outbox,
    /// Join handle of the outbox server task spawned by `init` when an outbox
    /// config is present, `None` otherwise. `await_outbox_handle` and
    /// `shutdown_outbox` both `take()` it, so it can be consumed only once.
    #[allow(clippy::type_complexity)]
    outbox_handle: Arc<Mutex<Option<tokio::task::JoinHandle<Result<(), LedgerError>>>>>,
}
47
48impl CalaLedger {
49 pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
50 let pool = match (config.pool, config.pg_con) {
51 (Some(pool), None) => pool,
52 (None, Some(pg_con)) => {
53 let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
54 if let Some(max_connections) = config.max_connections {
55 pool_opts = pool_opts.max_connections(max_connections);
56 }
57 pool_opts.connect(&pg_con).await?
58 }
59 _ => {
60 return Err(LedgerError::ConfigError(
61 "One of pg_con or pool must be set".to_string(),
62 ))
63 }
64 };
65 if config.exec_migrations {
66 sqlx::migrate!().run(&pool).await?;
67 }
68
69 let outbox = Outbox::init(&pool).await?;
70 let mut outbox_handle = None;
71 if let Some(outbox_config) = config.outbox {
72 outbox_handle = Some(Self::start_outbox_server(outbox_config, outbox.clone()));
73 }
74
75 let accounts = Accounts::new(&pool, outbox.clone());
76 let journals = Journals::new(&pool, outbox.clone());
77 let tx_templates = TxTemplates::new(&pool, outbox.clone());
78 let transactions = Transactions::new(&pool, outbox.clone());
79 let entries = Entries::new(&pool, outbox.clone());
80 let balances = Balances::new(&pool, outbox.clone());
81 let velocities = Velocities::new(&pool, outbox.clone());
82 let account_sets = AccountSets::new(&pool, outbox.clone(), &accounts, &entries, &balances);
83 Ok(Self {
84 accounts,
85 account_sets,
86 journals,
87 tx_templates,
88 outbox,
89 transactions,
90 entries,
91 balances,
92 velocities,
93 outbox_handle: Arc::new(Mutex::new(outbox_handle)),
94 pool,
95 })
96 }
97
    /// Returns the Postgres connection pool held by this ledger.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
101
102 pub async fn begin_operation<'a>(&self) -> Result<LedgerOperation<'a>, LedgerError> {
103 Ok(LedgerOperation::init(&self.pool, &self.outbox).await?)
104 }
105
    /// Wraps an already existing `es_entity::DbOp` in a [`LedgerOperation`]
    /// that is tied to this ledger's outbox.
    pub fn ledger_operation_from_db_op<'a>(
        &self,
        db_op: es_entity::DbOp<'a>,
    ) -> LedgerOperation<'a> {
        LedgerOperation::new(db_op, &self.outbox)
    }
112
    /// Returns the [`Accounts`] handle held by this ledger.
    pub fn accounts(&self) -> &Accounts {
        &self.accounts
    }
116
    /// Returns the [`Velocities`] handle held by this ledger.
    pub fn velocities(&self) -> &Velocities {
        &self.velocities
    }
120
    /// Returns the [`AccountSets`] handle held by this ledger.
    pub fn account_sets(&self) -> &AccountSets {
        &self.account_sets
    }
124
    /// Returns the [`Journals`] handle held by this ledger.
    pub fn journals(&self) -> &Journals {
        &self.journals
    }
128
    /// Returns the [`TxTemplates`] handle held by this ledger.
    pub fn tx_templates(&self) -> &TxTemplates {
        &self.tx_templates
    }
132
    /// Returns the [`Balances`] handle held by this ledger.
    pub fn balances(&self) -> &Balances {
        &self.balances
    }
136
    /// Returns the [`Entries`] handle held by this ledger.
    pub fn entries(&self) -> &Entries {
        &self.entries
    }
140
    /// Returns the [`Transactions`] handle held by this ledger.
    pub fn transactions(&self) -> &Transactions {
        &self.transactions
    }
144
145 pub async fn post_transaction(
146 &self,
147 tx_id: TransactionId,
148 tx_template_code: &str,
149 params: impl Into<Params> + std::fmt::Debug,
150 ) -> Result<Transaction, LedgerError> {
151 let mut db = LedgerOperation::init(&self.pool, &self.outbox).await?;
152 let transaction = self
153 .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
154 .await?;
155 db.commit().await?;
156 Ok(transaction)
157 }
158
159 #[instrument(
160 name = "cala_ledger.transaction_post",
161 skip(self, db)
162 fields(transaction_id, external_id)
163 err
164 )]
165 pub async fn post_transaction_in_op(
166 &self,
167 db: &mut LedgerOperation<'_>,
168 tx_id: TransactionId,
169 tx_template_code: &str,
170 params: impl Into<Params> + std::fmt::Debug,
171 ) -> Result<Transaction, LedgerError> {
172 let prepared_tx = self
173 .tx_templates
174 .prepare_transaction(db.op().now(), tx_id, tx_template_code, params.into())
175 .await?;
176
177 let transaction = self
178 .transactions
179 .create_in_op(db, prepared_tx.transaction)
180 .await?;
181
182 let span = tracing::Span::current();
183 span.record("transaction_id", transaction.id().to_string());
184 span.record("external_id", &transaction.values().external_id);
185
186 let entries = self
187 .entries
188 .create_all_in_op(db, prepared_tx.entries)
189 .await?;
190
191 let account_ids = entries
192 .iter()
193 .map(|entry| entry.account_id)
194 .collect::<Vec<_>>();
195 let mappings = self
196 .account_sets
197 .fetch_mappings(transaction.values().journal_id, &account_ids)
198 .await?;
199
200 self.velocities
201 .update_balances_in_op(
202 db,
203 transaction.created_at(),
204 transaction.values(),
205 &entries,
206 &account_ids,
207 )
208 .await?;
209
210 self.balances
211 .update_balances_in_op(
212 db,
213 transaction.created_at(),
214 transaction.journal_id(),
215 entries,
216 mappings,
217 )
218 .await?;
219 Ok(transaction)
220 }
221
222 pub async fn register_outbox_listener(
223 &self,
224 start_after: Option<EventSequence>,
225 ) -> Result<OutboxListener, LedgerError> {
226 Ok(self.outbox.register_listener(start_after).await?)
227 }
228
229 #[cfg(feature = "import")]
230 #[instrument(name = "cala_ledger.sync_outbox_event", skip(self, db))]
231 pub async fn sync_outbox_event(
232 &self,
233 db: sqlx::Transaction<'_, sqlx::Postgres>,
234 origin: DataSourceId,
235 event: OutboxEvent,
236 ) -> Result<(), LedgerError> {
237 use crate::outbox::OutboxEventPayload::*;
238
239 match event.payload {
240 Empty => (),
241 AccountCreated { account, .. } => {
242 let op = es_entity::DbOp::new(db, event.recorded_at);
243 self.accounts
244 .sync_account_creation(op, origin, account)
245 .await?
246 }
247 AccountUpdated {
248 account, fields, ..
249 } => {
250 let op = es_entity::DbOp::new(db, event.recorded_at);
251 self.accounts
252 .sync_account_update(op, account, fields)
253 .await?
254 }
255 AccountSetCreated { account_set, .. } => {
256 let op = es_entity::DbOp::new(db, event.recorded_at);
257 self.account_sets
258 .sync_account_set_creation(op, origin, account_set)
259 .await?
260 }
261 AccountSetUpdated {
262 account_set,
263 fields,
264 ..
265 } => {
266 let op = es_entity::DbOp::new(db, event.recorded_at);
267 self.account_sets
268 .sync_account_set_update(op, account_set, fields)
269 .await?
270 }
271 AccountSetMemberCreated {
272 account_set_id,
273 member_id,
274 ..
275 } => {
276 let op = es_entity::DbOp::new(db, event.recorded_at);
277 self.account_sets
278 .sync_account_set_member_creation(op, origin, account_set_id, member_id)
279 .await?
280 }
281 AccountSetMemberRemoved {
282 account_set_id,
283 member_id,
284 ..
285 } => {
286 let op = es_entity::DbOp::new(db, event.recorded_at);
287 self.account_sets
288 .sync_account_set_member_removal(op, origin, account_set_id, member_id)
289 .await?
290 }
291 JournalCreated { journal, .. } => {
292 let op = es_entity::DbOp::new(db, event.recorded_at);
293 self.journals
294 .sync_journal_creation(op, origin, journal)
295 .await?
296 }
297 JournalUpdated {
298 journal, fields, ..
299 } => {
300 let op = es_entity::DbOp::new(db, event.recorded_at);
301 self.journals
302 .sync_journal_update(op, journal, fields)
303 .await?
304 }
305 TransactionCreated { transaction, .. } => {
306 let op = es_entity::DbOp::new(db, event.recorded_at);
307 self.transactions
308 .sync_transaction_creation(op, origin, transaction)
309 .await?
310 }
311 TxTemplateCreated { tx_template, .. } => {
312 let op = es_entity::DbOp::new(db, event.recorded_at);
313 self.tx_templates
314 .sync_tx_template_creation(op, origin, tx_template)
315 .await?
316 }
317 EntryCreated { entry, .. } => {
318 let op = es_entity::DbOp::new(db, event.recorded_at);
319 self.entries.sync_entry_creation(op, origin, entry).await?
320 }
321 BalanceCreated { balance, .. } => {
322 self.balances
323 .sync_balance_creation(db, origin, balance)
324 .await?
325 }
326 BalanceUpdated { balance, .. } => {
327 self.balances
328 .sync_balance_update(db, origin, balance)
329 .await?
330 }
331 }
332 Ok(())
333 }
334
335 pub async fn await_outbox_handle(&self) -> Result<(), LedgerError> {
336 let handle = { self.outbox_handle.lock().expect("poisened mutex").take() };
337 if let Some(handle) = handle {
338 return handle.await.expect("Couldn't await outbox handle");
339 }
340 Ok(())
341 }
342
343 pub fn shutdown_outbox(&mut self) -> Result<(), LedgerError> {
344 if let Some(handle) = self.outbox_handle.lock().expect("poisened mutex").take() {
345 handle.abort();
346 }
347 Ok(())
348 }
349
350 fn start_outbox_server(
351 config: server::OutboxServerConfig,
352 outbox: Outbox,
353 ) -> tokio::task::JoinHandle<Result<(), LedgerError>> {
354 tokio::spawn(async move {
355 server::start(config, outbox).await?;
356 Ok(())
357 })
358 }
359}