1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use sqlx::{Acquire, PgPool, Postgres, Transaction};
use tracing::instrument;

use std::collections::HashMap;

use crate::{
    account::Accounts, balance::*, entry::*, error::*, journal::*, primitives::*, transaction::*,
    tx_template::*,
};

/// Top-level ledger handle: a Postgres pool plus one component handle per
/// ledger concept, all of them constructed from that same pool in
/// [`SqlxLedger::new`].
#[derive(Debug, Clone)]
pub struct SqlxLedger {
    /// Pool used to open the database transaction in `post_transaction`.
    pool: PgPool,
    /// Account component, exposed through `accounts()`.
    accounts: Accounts,
    /// Journal component, exposed through `journals()`.
    journals: Journals,
    /// Template component; used to look up the template when posting.
    tx_templates: TxTemplates,
    /// Transaction component; used to create the transaction row when posting.
    transactions: Transactions,
    /// Entry component; used to create the entries of a posted transaction.
    entries: Entries,
    /// Balance component; used to lock and update balances when posting.
    balances: Balances,
}

impl SqlxLedger {
    /// Builds a ledger whose components are all constructed from `pool`.
    ///
    /// The ledger keeps its own clone of `pool` for opening transactions.
    pub fn new(pool: &PgPool) -> Self {
        Self {
            accounts: Accounts::new(pool),
            journals: Journals::new(pool),
            tx_templates: TxTemplates::new(pool),
            transactions: Transactions::new(pool),
            entries: Entries::new(pool),
            balances: Balances::new(pool),
            pool: pool.clone(),
        }
    }

    /// Returns the ledger's [`Accounts`] component.
    pub fn accounts(&self) -> &Accounts {
        &self.accounts
    }

    /// Returns the ledger's [`Journals`] component.
    pub fn journals(&self) -> &Journals {
        &self.journals
    }

    /// Returns the ledger's [`TxTemplates`] component.
    pub fn tx_templates(&self) -> &TxTemplates {
        &self.tx_templates
    }

    /// Returns the ledger's [`Entries`] component.
    pub fn entries(&self) -> &Entries {
        &self.entries
    }

    /// Returns the ledger's [`Balances`] component.
    pub fn balances(&self) -> &Balances {
        &self.balances
    }

    /// Posts a transaction from the template identified by `tx_template_code`,
    /// using a database transaction freshly opened on the ledger's pool.
    ///
    /// `params` are converted into [`TxParams`]; `None` means empty params.
    ///
    /// # Errors
    ///
    /// Returns a [`SqlxLedgerError`] if the database transaction cannot be
    /// started or if [`SqlxLedger::post_transaction_in_tx`] fails.
    pub async fn post_transaction(
        &self,
        tx_template_code: &str,
        params: Option<impl Into<TxParams> + std::fmt::Debug>,
    ) -> Result<(), SqlxLedgerError> {
        let tx = self.pool.begin().await?;
        self.post_transaction_in_tx(tx, tx_template_code, params)
            .await
    }

    /// Posts a transaction inside the caller-supplied database transaction
    /// `tx`, and commits `tx` on success.
    ///
    /// Steps, in order:
    /// 1. look up the template and turn `params` into a new transaction plus
    ///    its new entries,
    /// 2. create the transaction record and its entries in `tx`,
    /// 3. inside a nested transaction, fetch the affected balances for
    ///    update, apply every entry to them and write the results back,
    /// 4. commit the nested transaction and then `tx`.
    ///
    /// # Errors
    ///
    /// Returns a [`SqlxLedgerError`] as soon as any step fails; in that case
    /// `tx` is dropped without being committed.
    #[instrument(name = "sqlx_ledger.ledger.post_transaction", skip(self, tx))]
    pub async fn post_transaction_in_tx(
        &self,
        mut tx: Transaction<'_, Postgres>,
        tx_template_code: &str,
        params: Option<impl Into<TxParams> + std::fmt::Debug>,
    ) -> Result<(), SqlxLedgerError> {
        let (new_tx, new_entries) = {
            let tx_template = self.tx_templates.find_core(tx_template_code).await?;
            // tx_template is not Send (Rc<String> nested in CelExpression)
            // so we need to drop it before the next await
            tx_template.prep_tx(params.map(|p| p.into()).unwrap_or_else(TxParams::new))?
        };
        let (journal_id, tx_id) = self.transactions.create_in_tx(&mut tx, new_tx).await?;
        let entries = self
            .entries
            .create_all(journal_id, tx_id, new_entries, &mut tx)
            .await?;
        {
            let ids: Vec<(AccountId, &Currency)> = entries
                .iter()
                .map(|entry| (entry.account_id, &entry.currency))
                .collect();
            // Nested transaction on top of `tx`; it mutably borrows `tx`, so it
            // must be finished before `tx.commit()` below.
            let mut balance_tx = tx.begin().await?;

            // `ids` is not needed afterwards, so it is moved rather than cloned.
            let mut balances = self
                .balances
                .find_for_update(journal_id, ids, &mut balance_tx)
                .await?;
            // NOTE(review): balances are tracked per account id only, while the
            // lookup ids pair each account with a currency. If one transaction
            // touches the same account in two currencies these would collide —
            // confirm against `Balances::find_for_update`.
            let mut latest_balances: HashMap<AccountId, BalanceDetails> = HashMap::new();
            let mut new_balances = Vec::new();
            for entry in entries.iter() {
                // Most recent known balance for this account: one produced by
                // an earlier entry of this transaction wins over the fetched one.
                let previous = match latest_balances.remove(&entry.account_id) {
                    Some(latest) => {
                        // The superseded in-memory balance is queued for
                        // persistence as well, not just the final one.
                        new_balances.push(latest.clone());
                        Some(latest)
                    }
                    None => balances.remove(&entry.account_id),
                };
                let next = match previous {
                    Some(balance) => balance.update(entry),
                    // No balance known yet for this account: start one from
                    // this entry.
                    None => BalanceDetails::init(journal_id, entry),
                };
                latest_balances.insert(entry.account_id, next);
            }
            new_balances.extend(latest_balances.into_values());

            self.balances
                .update_balances(journal_id, new_balances, &mut balance_tx)
                .await?;
            balance_tx.commit().await?;
        }
        tx.commit().await?;
        Ok(())
    }
}