1pub mod config;
2pub mod error;
3
4use es_entity::clock::ClockHandle;
5use sqlx::PgPool;
6pub use tracing::instrument;
7use tracing::Instrument;
8
9pub use config::*;
10use error::*;
11
12use crate::{
13 account::Accounts,
14 account_set::AccountSets,
15 balance::Balances,
16 entry::Entries,
17 journal::Journals,
18 outbox::OutboxPublisher,
19 primitives::TransactionId,
20 transaction::{Transaction, Transactions},
21 tx_template::{Params, TxTemplates},
22 velocity::Velocities,
23};
24
/// Entry point to the ledger: bundles the database pool, the clock and one
/// handle per ledger component, all wired together by [`CalaLedger::init`].
#[derive(Clone)]
pub struct CalaLedger {
    /// Postgres pool handed to every component at construction time and used
    /// to open new database operations.
    pool: PgPool,
    /// Clock taken from the configuration; passed to the components that take
    /// one and used when opening database operations.
    clock: ClockHandle,
    /// Handle exposed through [`CalaLedger::accounts`].
    accounts: Accounts,
    /// Handle exposed through [`CalaLedger::account_sets`]; also used to fetch
    /// mappings while posting or voiding a transaction.
    account_sets: AccountSets,
    /// Handle exposed through [`CalaLedger::journals`].
    journals: Journals,
    /// Handle exposed through [`CalaLedger::transactions`]; creates the
    /// transaction records when posting or voiding.
    transactions: Transactions,
    /// Handle exposed through [`CalaLedger::tx_templates`]; prepares a
    /// transaction from a template code and parameters.
    tx_templates: TxTemplates,
    /// Handle exposed through [`CalaLedger::entries`]; persists the entries of
    /// posted and voiding transactions.
    entries: Entries,
    /// Handle exposed through [`CalaLedger::velocities`]; invoked before the
    /// balance update when posting or voiding a transaction.
    velocities: Velocities,
    /// Handle exposed through [`CalaLedger::balances`]; updated last when
    /// posting or voiding a transaction.
    balances: Balances,
    /// Outbox publisher shared with the components; backs
    /// [`CalaLedger::outbox`] and [`CalaLedger::register_outbox_listener`].
    publisher: OutboxPublisher,
}
39
40impl CalaLedger {
41 #[instrument(name = "cala_ledger.init", skip_all)]
42 pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
43 let pool = match (config.pool, config.pg_con) {
44 (Some(pool), None) => pool,
45 (None, Some(pg_con)) => {
46 let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
47 if let Some(max_connections) = config.max_connections {
48 pool_opts = pool_opts.max_connections(max_connections);
49 }
50 pool_opts.connect(&pg_con).await?
51 }
52 _ => {
53 return Err(LedgerError::ConfigError(
54 "One of pg_con or pool must be set".to_string(),
55 ))
56 }
57 };
58 if config.exec_migrations {
59 sqlx::migrate!()
60 .run(&pool)
61 .instrument(tracing::info_span!("cala_ledger.migrations"))
62 .await?;
63 }
64
65 let clock = config.clock;
66 let publisher = OutboxPublisher::init(&pool, &clock).await?;
67 let accounts = Accounts::new(&pool, &publisher, &clock);
68 let journals = Journals::new(&pool, &publisher, &clock);
69 let tx_templates = TxTemplates::new(&pool, &publisher, &clock);
70 let transactions = Transactions::new(&pool, &publisher);
71 let entries = Entries::new(&pool, &publisher);
72 let balances = Balances::new(&pool, &publisher, &journals);
73 let velocities = Velocities::new(&pool, &clock);
74 let account_sets =
75 AccountSets::new(&pool, &publisher, &accounts, &entries, &balances, &clock);
76 Ok(Self {
77 accounts,
78 account_sets,
79 journals,
80 tx_templates,
81 publisher,
82 transactions,
83 entries,
84 balances,
85 velocities,
86 pool,
87 clock,
88 })
89 }
90
91 pub fn pool(&self) -> &PgPool {
92 &self.pool
93 }
94
95 pub fn clock(&self) -> &ClockHandle {
96 &self.clock
97 }
98
99 pub async fn begin_operation(&self) -> Result<es_entity::DbOpWithTime<'static>, LedgerError> {
100 let db_op = es_entity::DbOp::init_with_clock(&self.pool, &self.clock)
101 .await?
102 .with_clock_time();
103 Ok(db_op)
104 }
105
106 pub fn accounts(&self) -> &Accounts {
107 &self.accounts
108 }
109
110 pub fn velocities(&self) -> &Velocities {
111 &self.velocities
112 }
113
114 pub fn account_sets(&self) -> &AccountSets {
115 &self.account_sets
116 }
117
118 pub fn journals(&self) -> &Journals {
119 &self.journals
120 }
121
122 pub fn tx_templates(&self) -> &TxTemplates {
123 &self.tx_templates
124 }
125
126 pub fn balances(&self) -> &Balances {
127 &self.balances
128 }
129
130 pub fn entries(&self) -> &Entries {
131 &self.entries
132 }
133
134 pub fn transactions(&self) -> &Transactions {
135 &self.transactions
136 }
137
138 #[instrument(
139 name = "cala_ledger.post_transaction",
140 skip(self, params),
141 fields(tx_template_code)
142 )]
143 pub async fn post_transaction(
144 &self,
145 tx_id: TransactionId,
146 tx_template_code: &str,
147 params: impl Into<Params> + std::fmt::Debug,
148 ) -> Result<Transaction, LedgerError> {
149 let mut db = es_entity::DbOp::init_with_clock(&self.pool, &self.clock).await?;
150 let transaction = self
151 .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
152 .await?;
153 db.commit().await?;
154 Ok(transaction)
155 }
156
157 #[instrument(
158 name = "cala_ledger.post_transaction_in_op",
159 skip(self, db)
160 fields(transaction_id, external_id)
161 )]
162 pub async fn post_transaction_in_op(
163 &self,
164 db: &mut impl es_entity::AtomicOperation,
165 tx_id: TransactionId,
166 tx_template_code: &str,
167 params: impl Into<Params> + std::fmt::Debug,
168 ) -> Result<Transaction, LedgerError> {
169 let mut db = es_entity::OpWithTime::cached_or_db_time(db).await?;
170 let time = db.now();
171 let prepared_tx = self
172 .tx_templates
173 .prepare_transaction_in_op(&mut db, time, tx_id, tx_template_code, params.into())
174 .await?;
175
176 let transaction = self
177 .transactions
178 .create_in_op(&mut db, prepared_tx.transaction)
179 .await?;
180
181 let span = tracing::Span::current();
182 span.record("transaction_id", transaction.id().to_string());
183 span.record("external_id", &transaction.values().external_id);
184
185 let entries = self
186 .entries
187 .create_all_in_op(&mut db, prepared_tx.entries)
188 .await?;
189
190 let account_ids = entries
191 .iter()
192 .map(|entry| entry.account_id)
193 .collect::<Vec<_>>();
194 let mappings = self
195 .account_sets
196 .fetch_mappings_in_op(&mut db, transaction.values().journal_id, &account_ids)
197 .await?;
198
199 self.velocities
200 .update_balances_with_limit_enforcement_in_op(
201 &mut db,
202 transaction.created_at(),
203 transaction.values(),
204 &entries,
205 &account_ids,
206 &mappings,
207 )
208 .await?;
209
210 self.balances
211 .update_balances_in_op(
212 &mut db,
213 transaction.journal_id(),
214 entries,
215 transaction.effective(),
216 transaction.created_at(),
217 mappings,
218 )
219 .await?;
220 Ok(transaction)
221 }
222
223 #[instrument(name = "cala_ledger.void_transaction", skip(self))]
224 pub async fn void_transaction(
225 &self,
226 voiding_tx_id: TransactionId,
227 existing_tx_id: TransactionId,
228 ) -> Result<Transaction, LedgerError> {
229 let mut db = self.begin_operation().await?;
230 let transaction = self
231 .void_transaction_in_op(&mut db, voiding_tx_id, existing_tx_id)
232 .await?;
233 db.commit().await?;
234 Ok(transaction)
235 }
236
237 #[instrument(
238 name = "cala_ledger.transaction_void",
239 skip(self, db)
240 fields(transaction_id, external_id)
241 )]
242 pub async fn void_transaction_in_op(
243 &self,
244 db: &mut impl es_entity::AtomicOperationWithTime,
245 voiding_tx_id: TransactionId,
246 existing_tx_id: TransactionId,
247 ) -> Result<Transaction, LedgerError> {
248 let new_entries = self
249 .entries
250 .new_entries_for_voided_tx(voiding_tx_id, existing_tx_id)
251 .await?;
252
253 let transaction = self
254 .transactions()
255 .create_voided_tx_in_op(
256 db,
257 voiding_tx_id,
258 existing_tx_id,
259 new_entries.iter().map(|entry| entry.id),
260 )
261 .await?;
262
263 let span = tracing::Span::current();
264 span.record("transaction_id", transaction.id().to_string());
265 span.record("external_id", &transaction.values().external_id);
266
267 let entries = self.entries.create_all_in_op(db, new_entries).await?;
268
269 let account_ids = entries
270 .iter()
271 .map(|entry| entry.account_id)
272 .collect::<Vec<_>>();
273 let mappings = self
274 .account_sets
275 .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
276 .await?;
277
278 self.velocities
279 .update_balances_with_limit_enforcement_in_op(
280 db,
281 transaction.created_at(),
282 transaction.values(),
283 &entries,
284 &account_ids,
285 &mappings,
286 )
287 .await?;
288
289 self.balances
290 .update_balances_in_op(
291 db,
292 transaction.journal_id(),
293 entries,
294 transaction.effective(),
295 transaction.created_at(),
296 mappings,
297 )
298 .await?;
299 Ok(transaction)
300 }
301
302 pub fn outbox(&self) -> &crate::outbox::ObixOutbox {
303 self.publisher.inner()
304 }
305
306 pub fn register_outbox_listener(
307 &self,
308 start_after: Option<obix::EventSequence>,
309 ) -> obix::out::PersistentOutboxListener<crate::outbox::OutboxEventPayload> {
310 self.publisher.inner().listen_persisted(start_after)
311 }
312}