sqlx_ledger/ledger/
mod.rs

1use sqlx::{Acquire, PgPool, Postgres, Transaction};
2use tracing::instrument;
3
4use std::collections::HashMap;
5
6use crate::{
7    account::Accounts, balance::*, entry::*, error::*, event::*, journal::*, primitives::*,
8    transaction::*, tx_template::*,
9};
10
11#[derive(Debug, Clone)]
12pub struct SqlxLedger {
13    pool: PgPool,
14    accounts: Accounts,
15    journals: Journals,
16    tx_templates: TxTemplates,
17    transactions: Transactions,
18    entries: Entries,
19    balances: Balances,
20}
21
22impl SqlxLedger {
23    pub fn new(pool: &PgPool) -> Self {
24        Self {
25            accounts: Accounts::new(pool),
26            journals: Journals::new(pool),
27            tx_templates: TxTemplates::new(pool),
28            transactions: Transactions::new(pool),
29            entries: Entries::new(pool),
30            balances: Balances::new(pool),
31            pool: pool.clone(),
32        }
33    }
34
35    pub fn accounts(&self) -> &Accounts {
36        &self.accounts
37    }
38
39    pub fn journals(&self) -> &Journals {
40        &self.journals
41    }
42
43    pub fn tx_templates(&self) -> &TxTemplates {
44        &self.tx_templates
45    }
46
47    pub fn entries(&self) -> &Entries {
48        &self.entries
49    }
50
51    pub fn balances(&self) -> &Balances {
52        &self.balances
53    }
54
55    pub fn transactions(&self) -> &Transactions {
56        &self.transactions
57    }
58
59    pub async fn post_transaction(
60        &self,
61        tx_id: TransactionId,
62        tx_template_code: &str,
63        params: Option<impl Into<TxParams> + std::fmt::Debug>,
64    ) -> Result<(), SqlxLedgerError> {
65        let tx = self.pool.begin().await?;
66        self.post_transaction_in_tx(tx, tx_id, tx_template_code, params)
67            .await?;
68        Ok(())
69    }
70
71    #[instrument(name = "sqlx_ledger.ledger.post_transaction", skip(self, tx))]
72    pub async fn post_transaction_in_tx(
73        &self,
74        mut tx: Transaction<'_, Postgres>,
75        tx_id: TransactionId,
76        tx_template_code: &str,
77        params: Option<impl Into<TxParams> + std::fmt::Debug>,
78    ) -> Result<(), SqlxLedgerError> {
79        let tx_template = self.tx_templates.find_core(tx_template_code).await?;
80        let (new_tx, new_entries) =
81            tx_template.prep_tx(params.map(|p| p.into()).unwrap_or_default())?;
82        let (journal_id, tx_id) = self
83            .transactions
84            .create_in_tx(&mut tx, tx_id, new_tx)
85            .await?;
86        let entries = self
87            .entries
88            .create_all(journal_id, tx_id, new_entries, &mut tx)
89            .await?;
90        {
91            let ids: Vec<(AccountId, &Currency)> = entries
92                .iter()
93                .map(|entry| (entry.account_id, &entry.currency))
94                .collect();
95            let mut balance_tx = tx.begin().await?;
96
97            let mut balances = self
98                .balances
99                .find_for_update(journal_id, ids.clone(), &mut balance_tx)
100                .await?;
101            let mut latest_balances: HashMap<(AccountId, &Currency), BalanceDetails> =
102                HashMap::new();
103            let mut new_balances = Vec::new();
104            for entry in entries.iter() {
105                let balance = match (
106                    latest_balances.remove(&(entry.account_id, &entry.currency)),
107                    balances.remove(&(entry.account_id, entry.currency)),
108                ) {
109                    (Some(latest), _) => {
110                        new_balances.push(latest.clone());
111                        latest
112                    }
113                    (_, Some(balance)) => balance,
114                    _ => {
115                        latest_balances.insert(
116                            (entry.account_id, &entry.currency),
117                            BalanceDetails::init(journal_id, entry),
118                        );
119                        continue;
120                    }
121                };
122                latest_balances.insert((entry.account_id, &entry.currency), balance.update(entry));
123            }
124            new_balances.extend(latest_balances.into_values());
125
126            self.balances
127                .update_balances(journal_id, new_balances, &mut balance_tx)
128                .await?;
129            balance_tx.commit().await?;
130        }
131        tx.commit().await?;
132        Ok(())
133    }
134
135    pub async fn events(
136        &self,
137        opts: EventSubscriberOpts,
138    ) -> Result<EventSubscriber, SqlxLedgerError> {
139        EventSubscriber::connect(&self.pool, opts).await
140    }
141}