cala_ledger/tx_template/
mod.rs

1mod entity;
2mod repo;
3
4pub mod error;
5
6use chrono::{DateTime, NaiveDate, Utc};
7use rust_decimal::Decimal;
8use sqlx::PgPool;
9use std::collections::HashMap;
10use tracing::instrument;
11use uuid::Uuid;
12
13pub use crate::param::*;
14use crate::{
15    entry::NewEntry,
16    ledger_operation::*,
17    outbox::*,
18    primitives::{DataSource, *},
19    transaction::NewTransaction,
20};
21
22pub use entity::*;
23use error::*;
24use repo::*;
25
26pub(crate) struct PreparedTransaction {
27    pub transaction: NewTransaction,
28    pub entries: Vec<NewEntry>,
29}
30
31#[derive(Clone)]
32pub struct TxTemplates {
33    repo: TxTemplateRepo,
34    outbox: Outbox,
35    pool: PgPool,
36}
37
38impl TxTemplates {
39    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
40        Self {
41            repo: TxTemplateRepo::new(pool),
42            outbox,
43            pool: pool.clone(),
44        }
45    }
46
47    #[instrument(name = "cala_ledger.tx_template.create", skip(self))]
48    pub async fn create(
49        &self,
50        new_tx_template: NewTxTemplate,
51    ) -> Result<TxTemplate, TxTemplateError> {
52        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
53        let tx_template = self.create_in_op(&mut op, new_tx_template).await?;
54        op.commit().await?;
55        Ok(tx_template)
56    }
57
58    pub async fn create_in_op(
59        &self,
60        db: &mut LedgerOperation<'_>,
61        new_tx_template: NewTxTemplate,
62    ) -> Result<TxTemplate, TxTemplateError> {
63        let tx_template = self.repo.create_in_op(db.op(), new_tx_template).await?;
64        db.accumulate(tx_template.events.last_persisted(1).map(|p| &p.event));
65        Ok(tx_template)
66    }
67
68    #[instrument(name = "cala_ledger.tx_templates.find_all", skip(self), err)]
69    pub async fn find_all<T: From<TxTemplate>>(
70        &self,
71        tx_template_ids: &[TxTemplateId],
72    ) -> Result<HashMap<TxTemplateId, T>, TxTemplateError> {
73        self.repo.find_all(tx_template_ids).await
74    }
75
76    pub async fn find_by_code(&self, code: impl AsRef<str>) -> Result<TxTemplate, TxTemplateError> {
77        self.repo.find_by_code(code.as_ref().to_string()).await
78    }
79
80    #[instrument(
81        level = "trace",
82        name = "cala_ledger.tx_template.prepare_transaction",
83        skip(self)
84    )]
85    pub(crate) async fn prepare_transaction(
86        &self,
87        time: DateTime<Utc>,
88        tx_id: TransactionId,
89        code: &str,
90        params: Params,
91    ) -> Result<PreparedTransaction, TxTemplateError> {
92        let tmpl = self.repo.find_latest_version(code).await?;
93
94        let ctx = params.into_context(tmpl.params.as_ref())?;
95
96        let journal_id: Uuid = tmpl.transaction.journal_id.try_evaluate(&ctx)?;
97
98        let entries = self.prep_entries(&tmpl, tx_id, JournalId::from(journal_id), &ctx)?;
99
100        let mut tx_builder = NewTransaction::builder();
101        tx_builder
102            .id(tx_id)
103            .created_at(time)
104            .tx_template_id(tmpl.id)
105            .entry_ids(entries.iter().map(|e| e.id).collect());
106
107        tx_builder.journal_id(journal_id);
108
109        let effective: NaiveDate = tmpl.transaction.effective.try_evaluate(&ctx)?;
110        tx_builder.effective(effective);
111
112        if let Some(correlation_id) = tmpl.transaction.correlation_id.as_ref() {
113            let correlation_id: String = correlation_id.try_evaluate(&ctx)?;
114            tx_builder.correlation_id(correlation_id);
115        }
116
117        if let Some(external_id) = tmpl.transaction.external_id.as_ref() {
118            let external_id: String = external_id.try_evaluate(&ctx)?;
119            tx_builder.external_id(external_id);
120        }
121
122        if let Some(description) = tmpl.transaction.description.as_ref() {
123            let description: String = description.try_evaluate(&ctx)?;
124            tx_builder.description(description);
125        }
126
127        if let Some(metadata) = tmpl.transaction.metadata.as_ref() {
128            let metadata: serde_json::Value = metadata.try_evaluate(&ctx)?;
129            tx_builder.metadata(metadata);
130        }
131
132        let tx = tx_builder.build().expect("tx_build should succeed");
133
134        Ok(PreparedTransaction {
135            transaction: tx,
136            entries,
137        })
138    }
139
140    fn prep_entries(
141        &self,
142        tmpl: &TxTemplateValues,
143        transaction_id: TransactionId,
144        journal_id: JournalId,
145        ctx: &cel_interpreter::CelContext,
146    ) -> Result<Vec<NewEntry>, TxTemplateError> {
147        let mut new_entries = Vec::new();
148        let mut totals = HashMap::new();
149        for (zero_based_sequence, entry) in tmpl.entries.iter().enumerate() {
150            let mut builder = NewEntry::builder();
151            builder
152                .id(EntryId::new())
153                .transaction_id(transaction_id)
154                .journal_id(journal_id)
155                .sequence(zero_based_sequence as u32 + 1);
156            let account_id: Uuid = entry.account_id.try_evaluate(ctx)?;
157            builder.account_id(account_id);
158
159            let entry_type: String = entry.entry_type.try_evaluate(ctx)?;
160            builder.entry_type(entry_type);
161
162            let layer: Layer = entry.layer.try_evaluate(ctx)?;
163            builder.layer(layer);
164
165            let units: Decimal = entry.units.try_evaluate(ctx)?;
166            let currency: Currency = entry.currency.try_evaluate(ctx)?;
167            let direction: DebitOrCredit = entry.direction.try_evaluate(ctx)?;
168
169            let total = totals.entry((currency, layer)).or_insert(Decimal::ZERO);
170            match direction {
171                DebitOrCredit::Debit => *total -= units,
172                DebitOrCredit::Credit => *total += units,
173            };
174            builder.units(units);
175            builder.currency(currency);
176            builder.direction(direction);
177
178            if let Some(description) = entry.description.as_ref() {
179                let description: String = description.try_evaluate(ctx)?;
180                builder.description(description);
181            }
182
183            new_entries.push(builder.build().expect("Couldn't build entry"));
184        }
185
186        for ((c, l), v) in totals {
187            if v != Decimal::ZERO {
188                return Err(TxTemplateError::UnbalancedTransaction(c, l, v));
189            }
190        }
191
192        Ok(new_entries)
193    }
194
195    #[cfg(feature = "import")]
196    pub async fn sync_tx_template_creation(
197        &self,
198        mut db: es_entity::DbOp<'_>,
199        origin: DataSourceId,
200        values: TxTemplateValues,
201    ) -> Result<(), TxTemplateError> {
202        let mut tx_template = TxTemplate::import(origin, values);
203        self.repo
204            .import_in_op(&mut db, origin, &mut tx_template)
205            .await?;
206        let recorded_at = db.now();
207        let outbox_events: Vec<_> = tx_template
208            .events
209            .last_persisted(1)
210            .map(|p| OutboxEventPayload::from(&p.event))
211            .collect();
212        self.outbox
213            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
214            .await?;
215        Ok(())
216    }
217}
218
219impl From<&TxTemplateEvent> for OutboxEventPayload {
220    fn from(event: &TxTemplateEvent) -> Self {
221        match event {
222            #[cfg(feature = "import")]
223            TxTemplateEvent::Imported { source, values } => OutboxEventPayload::TxTemplateCreated {
224                source: *source,
225                tx_template: values.clone(),
226            },
227            TxTemplateEvent::Initialized {
228                values: tx_template,
229            } => OutboxEventPayload::TxTemplateCreated {
230                source: DataSource::Local,
231                tx_template: tx_template.clone(),
232            },
233        }
234    }
235}