1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use sqlx::{Acquire, PgPool, Postgres, Transaction};
use tracing::instrument;
use std::collections::HashMap;
use crate::{
account::Accounts, balance::*, entry::*, error::*, journal::*, primitives::*, transaction::*,
tx_template::*,
};
#[derive(Debug, Clone)]
pub struct SqlxLedger {
pool: PgPool,
accounts: Accounts,
journals: Journals,
tx_templates: TxTemplates,
transactions: Transactions,
entries: Entries,
balances: Balances,
}
impl SqlxLedger {
pub fn new(pool: &PgPool) -> Self {
Self {
accounts: Accounts::new(pool),
journals: Journals::new(pool),
tx_templates: TxTemplates::new(pool),
transactions: Transactions::new(pool),
entries: Entries::new(pool),
balances: Balances::new(pool),
pool: pool.clone(),
}
}
pub fn accounts(&self) -> &Accounts {
&self.accounts
}
pub fn journals(&self) -> &Journals {
&self.journals
}
pub fn tx_templates(&self) -> &TxTemplates {
&self.tx_templates
}
pub fn entries(&self) -> &Entries {
&self.entries
}
pub fn balances(&self) -> &Balances {
&self.balances
}
pub async fn post_transaction(
&self,
tx_template_code: &str,
params: Option<impl Into<TxParams> + std::fmt::Debug>,
) -> Result<(), SqlxLedgerError> {
let tx = self.pool.begin().await?;
self.post_transaction_in_tx(tx, tx_template_code, params)
.await?;
Ok(())
}
#[instrument(name = "sqlx_ledger.ledger.post_transaction", skip(self, tx))]
pub async fn post_transaction_in_tx(
&self,
mut tx: Transaction<'_, Postgres>,
tx_template_code: &str,
params: Option<impl Into<TxParams> + std::fmt::Debug>,
) -> Result<(), SqlxLedgerError> {
let (new_tx, new_entries) = {
let tx_template = self.tx_templates.find_core(tx_template_code).await?;
tx_template.prep_tx(params.map(|p| p.into()).unwrap_or_else(TxParams::new))?
};
let (journal_id, tx_id) = self.transactions.create_in_tx(&mut tx, new_tx).await?;
let entries = self
.entries
.create_all(journal_id, tx_id, new_entries, &mut tx)
.await?;
{
let ids: Vec<(AccountId, &Currency)> = entries
.iter()
.map(|entry| (entry.account_id, &entry.currency))
.collect();
let mut balance_tx = tx.begin().await?;
let mut balances = self
.balances
.find_for_update(journal_id, ids.clone(), &mut balance_tx)
.await?;
let mut latest_balances: HashMap<AccountId, BalanceDetails> = HashMap::new();
let mut new_balances = Vec::new();
for entry in entries.iter() {
let balance = match (
latest_balances.remove(&entry.account_id),
balances.remove(&entry.account_id),
) {
(Some(latest), _) => {
new_balances.push(latest.clone());
latest
}
(_, Some(balance)) => balance,
_ => {
latest_balances
.insert(entry.account_id, BalanceDetails::init(journal_id, entry));
continue;
}
};
latest_balances.insert(entry.account_id, balance.update(entry));
}
new_balances.extend(latest_balances.into_values());
self.balances
.update_balances(journal_id, new_balances, &mut balance_tx)
.await?;
balance_tx.commit().await?;
}
tx.commit().await?;
Ok(())
}
}