sqlx_ledger/ledger/
mod.rs1use sqlx::{Acquire, PgPool, Postgres, Transaction};
2use tracing::instrument;
3
4use std::collections::HashMap;
5
6use crate::{
7 account::Accounts, balance::*, entry::*, error::*, event::*, journal::*, primitives::*,
8 transaction::*, tx_template::*,
9};
10
11#[derive(Debug, Clone)]
12pub struct SqlxLedger {
13 pool: PgPool,
14 accounts: Accounts,
15 journals: Journals,
16 tx_templates: TxTemplates,
17 transactions: Transactions,
18 entries: Entries,
19 balances: Balances,
20}
21
22impl SqlxLedger {
23 pub fn new(pool: &PgPool) -> Self {
24 Self {
25 accounts: Accounts::new(pool),
26 journals: Journals::new(pool),
27 tx_templates: TxTemplates::new(pool),
28 transactions: Transactions::new(pool),
29 entries: Entries::new(pool),
30 balances: Balances::new(pool),
31 pool: pool.clone(),
32 }
33 }
34
35 pub fn accounts(&self) -> &Accounts {
36 &self.accounts
37 }
38
39 pub fn journals(&self) -> &Journals {
40 &self.journals
41 }
42
43 pub fn tx_templates(&self) -> &TxTemplates {
44 &self.tx_templates
45 }
46
47 pub fn entries(&self) -> &Entries {
48 &self.entries
49 }
50
51 pub fn balances(&self) -> &Balances {
52 &self.balances
53 }
54
55 pub fn transactions(&self) -> &Transactions {
56 &self.transactions
57 }
58
59 pub async fn post_transaction(
60 &self,
61 tx_id: TransactionId,
62 tx_template_code: &str,
63 params: Option<impl Into<TxParams> + std::fmt::Debug>,
64 ) -> Result<(), SqlxLedgerError> {
65 let tx = self.pool.begin().await?;
66 self.post_transaction_in_tx(tx, tx_id, tx_template_code, params)
67 .await?;
68 Ok(())
69 }
70
71 #[instrument(name = "sqlx_ledger.ledger.post_transaction", skip(self, tx))]
72 pub async fn post_transaction_in_tx(
73 &self,
74 mut tx: Transaction<'_, Postgres>,
75 tx_id: TransactionId,
76 tx_template_code: &str,
77 params: Option<impl Into<TxParams> + std::fmt::Debug>,
78 ) -> Result<(), SqlxLedgerError> {
79 let tx_template = self.tx_templates.find_core(tx_template_code).await?;
80 let (new_tx, new_entries) =
81 tx_template.prep_tx(params.map(|p| p.into()).unwrap_or_default())?;
82 let (journal_id, tx_id) = self
83 .transactions
84 .create_in_tx(&mut tx, tx_id, new_tx)
85 .await?;
86 let entries = self
87 .entries
88 .create_all(journal_id, tx_id, new_entries, &mut tx)
89 .await?;
90 {
91 let ids: Vec<(AccountId, &Currency)> = entries
92 .iter()
93 .map(|entry| (entry.account_id, &entry.currency))
94 .collect();
95 let mut balance_tx = tx.begin().await?;
96
97 let mut balances = self
98 .balances
99 .find_for_update(journal_id, ids.clone(), &mut balance_tx)
100 .await?;
101 let mut latest_balances: HashMap<(AccountId, &Currency), BalanceDetails> =
102 HashMap::new();
103 let mut new_balances = Vec::new();
104 for entry in entries.iter() {
105 let balance = match (
106 latest_balances.remove(&(entry.account_id, &entry.currency)),
107 balances.remove(&(entry.account_id, entry.currency)),
108 ) {
109 (Some(latest), _) => {
110 new_balances.push(latest.clone());
111 latest
112 }
113 (_, Some(balance)) => balance,
114 _ => {
115 latest_balances.insert(
116 (entry.account_id, &entry.currency),
117 BalanceDetails::init(journal_id, entry),
118 );
119 continue;
120 }
121 };
122 latest_balances.insert((entry.account_id, &entry.currency), balance.update(entry));
123 }
124 new_balances.extend(latest_balances.into_values());
125
126 self.balances
127 .update_balances(journal_id, new_balances, &mut balance_tx)
128 .await?;
129 balance_tx.commit().await?;
130 }
131 tx.commit().await?;
132 Ok(())
133 }
134
135 pub async fn events(
136 &self,
137 opts: EventSubscriberOpts,
138 ) -> Result<EventSubscriber, SqlxLedgerError> {
139 EventSubscriber::connect(&self.pool, opts).await
140 }
141}