cala_ledger/tx_template/
mod.rs1mod entity;
2mod repo;
3
4pub mod error;
5
6use chrono::{DateTime, NaiveDate, Utc};
7use rust_decimal::Decimal;
8use sqlx::PgPool;
9use std::collections::HashMap;
10use tracing::instrument;
11use uuid::Uuid;
12
13pub use crate::param::*;
14use crate::{
15 entry::NewEntry,
16 ledger_operation::*,
17 outbox::*,
18 primitives::{DataSource, *},
19 transaction::NewTransaction,
20};
21
22pub use entity::*;
23use error::*;
24use repo::*;
25
26pub(crate) struct PreparedTransaction {
27 pub transaction: NewTransaction,
28 pub entries: Vec<NewEntry>,
29}
30
31#[derive(Clone)]
32pub struct TxTemplates {
33 repo: TxTemplateRepo,
34 outbox: Outbox,
35 pool: PgPool,
36}
37
38impl TxTemplates {
39 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
40 Self {
41 repo: TxTemplateRepo::new(pool),
42 outbox,
43 pool: pool.clone(),
44 }
45 }
46
47 #[instrument(name = "cala_ledger.tx_template.create", skip(self))]
48 pub async fn create(
49 &self,
50 new_tx_template: NewTxTemplate,
51 ) -> Result<TxTemplate, TxTemplateError> {
52 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
53 let tx_template = self.create_in_op(&mut op, new_tx_template).await?;
54 op.commit().await?;
55 Ok(tx_template)
56 }
57
58 pub async fn create_in_op(
59 &self,
60 db: &mut LedgerOperation<'_>,
61 new_tx_template: NewTxTemplate,
62 ) -> Result<TxTemplate, TxTemplateError> {
63 let tx_template = self.repo.create_in_op(db.op(), new_tx_template).await?;
64 db.accumulate(tx_template.events.last_persisted(1).map(|p| &p.event));
65 Ok(tx_template)
66 }
67
68 #[instrument(name = "cala_ledger.tx_templates.find_all", skip(self), err)]
69 pub async fn find_all<T: From<TxTemplate>>(
70 &self,
71 tx_template_ids: &[TxTemplateId],
72 ) -> Result<HashMap<TxTemplateId, T>, TxTemplateError> {
73 self.repo.find_all(tx_template_ids).await
74 }
75
76 pub async fn find_by_code(&self, code: impl AsRef<str>) -> Result<TxTemplate, TxTemplateError> {
77 self.repo.find_by_code(code.as_ref().to_string()).await
78 }
79
80 #[instrument(
81 level = "trace",
82 name = "cala_ledger.tx_template.prepare_transaction",
83 skip(self)
84 )]
85 pub(crate) async fn prepare_transaction(
86 &self,
87 time: DateTime<Utc>,
88 tx_id: TransactionId,
89 code: &str,
90 params: Params,
91 ) -> Result<PreparedTransaction, TxTemplateError> {
92 let tmpl = self.repo.find_latest_version(code).await?;
93
94 let ctx = params.into_context(tmpl.params.as_ref())?;
95
96 let journal_id: Uuid = tmpl.transaction.journal_id.try_evaluate(&ctx)?;
97
98 let entries = self.prep_entries(&tmpl, tx_id, JournalId::from(journal_id), &ctx)?;
99
100 let mut tx_builder = NewTransaction::builder();
101 tx_builder
102 .id(tx_id)
103 .created_at(time)
104 .tx_template_id(tmpl.id)
105 .entry_ids(entries.iter().map(|e| e.id).collect());
106
107 tx_builder.journal_id(journal_id);
108
109 let effective: NaiveDate = tmpl.transaction.effective.try_evaluate(&ctx)?;
110 tx_builder.effective(effective);
111
112 if let Some(correlation_id) = tmpl.transaction.correlation_id.as_ref() {
113 let correlation_id: String = correlation_id.try_evaluate(&ctx)?;
114 tx_builder.correlation_id(correlation_id);
115 }
116
117 if let Some(external_id) = tmpl.transaction.external_id.as_ref() {
118 let external_id: String = external_id.try_evaluate(&ctx)?;
119 tx_builder.external_id(external_id);
120 }
121
122 if let Some(description) = tmpl.transaction.description.as_ref() {
123 let description: String = description.try_evaluate(&ctx)?;
124 tx_builder.description(description);
125 }
126
127 if let Some(metadata) = tmpl.transaction.metadata.as_ref() {
128 let metadata: serde_json::Value = metadata.try_evaluate(&ctx)?;
129 tx_builder.metadata(metadata);
130 }
131
132 let tx = tx_builder.build().expect("tx_build should succeed");
133
134 Ok(PreparedTransaction {
135 transaction: tx,
136 entries,
137 })
138 }
139
140 fn prep_entries(
141 &self,
142 tmpl: &TxTemplateValues,
143 transaction_id: TransactionId,
144 journal_id: JournalId,
145 ctx: &cel_interpreter::CelContext,
146 ) -> Result<Vec<NewEntry>, TxTemplateError> {
147 let mut new_entries = Vec::new();
148 let mut totals = HashMap::new();
149 for (zero_based_sequence, entry) in tmpl.entries.iter().enumerate() {
150 let mut builder = NewEntry::builder();
151 builder
152 .id(EntryId::new())
153 .transaction_id(transaction_id)
154 .journal_id(journal_id)
155 .sequence(zero_based_sequence as u32 + 1);
156 let account_id: Uuid = entry.account_id.try_evaluate(ctx)?;
157 builder.account_id(account_id);
158
159 let entry_type: String = entry.entry_type.try_evaluate(ctx)?;
160 builder.entry_type(entry_type);
161
162 let layer: Layer = entry.layer.try_evaluate(ctx)?;
163 builder.layer(layer);
164
165 let units: Decimal = entry.units.try_evaluate(ctx)?;
166 let currency: Currency = entry.currency.try_evaluate(ctx)?;
167 let direction: DebitOrCredit = entry.direction.try_evaluate(ctx)?;
168
169 let total = totals.entry((currency, layer)).or_insert(Decimal::ZERO);
170 match direction {
171 DebitOrCredit::Debit => *total -= units,
172 DebitOrCredit::Credit => *total += units,
173 };
174 builder.units(units);
175 builder.currency(currency);
176 builder.direction(direction);
177
178 if let Some(description) = entry.description.as_ref() {
179 let description: String = description.try_evaluate(ctx)?;
180 builder.description(description);
181 }
182
183 if let Some(metadata) = entry.metadata.as_ref() {
184 let metadata: serde_json::Value = metadata.try_evaluate(ctx)?;
185 builder.metadata(metadata);
186 }
187
188 new_entries.push(builder.build().expect("Couldn't build entry"));
189 }
190
191 for ((c, l), v) in totals {
192 if v != Decimal::ZERO {
193 return Err(TxTemplateError::UnbalancedTransaction(c, l, v));
194 }
195 }
196
197 Ok(new_entries)
198 }
199
200 #[cfg(feature = "import")]
201 pub async fn sync_tx_template_creation(
202 &self,
203 mut db: es_entity::DbOp<'_>,
204 origin: DataSourceId,
205 values: TxTemplateValues,
206 ) -> Result<(), TxTemplateError> {
207 let mut tx_template = TxTemplate::import(origin, values);
208 self.repo
209 .import_in_op(&mut db, origin, &mut tx_template)
210 .await?;
211 let recorded_at = db.now();
212 let outbox_events: Vec<_> = tx_template
213 .events
214 .last_persisted(1)
215 .map(|p| OutboxEventPayload::from(&p.event))
216 .collect();
217 self.outbox
218 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
219 .await?;
220 Ok(())
221 }
222}
223
224impl From<&TxTemplateEvent> for OutboxEventPayload {
225 fn from(event: &TxTemplateEvent) -> Self {
226 match event {
227 #[cfg(feature = "import")]
228 TxTemplateEvent::Imported { source, values } => OutboxEventPayload::TxTemplateCreated {
229 source: *source,
230 tx_template: values.clone(),
231 },
232 TxTemplateEvent::Initialized {
233 values: tx_template,
234 } => OutboxEventPayload::TxTemplateCreated {
235 source: DataSource::Local,
236 tx_template: tx_template.clone(),
237 },
238 }
239 }
240}