// cala_ledger/tx_template/mod.rs

1mod entity;
2mod repo;
3
4pub mod error;
5
6use chrono::{DateTime, NaiveDate, Utc};
7use rust_decimal::Decimal;
8use sqlx::PgPool;
9use std::collections::HashMap;
10use tracing::instrument;
11use uuid::Uuid;
12
13pub use crate::param::*;
14use crate::{
15    entry::NewEntry,
16    ledger_operation::*,
17    outbox::*,
18    primitives::{DataSource, *},
19    transaction::NewTransaction,
20};
21
22pub use entity::*;
23use error::*;
24pub use repo::tx_template_cursor::TxTemplatesByCodeCursor;
25use repo::*;
26
27pub(crate) struct PreparedTransaction {
28    pub transaction: NewTransaction,
29    pub entries: Vec<NewEntry>,
30}
31
32#[derive(Clone)]
33pub struct TxTemplates {
34    repo: TxTemplateRepo,
35    outbox: Outbox,
36    pool: PgPool,
37}
38
39impl TxTemplates {
40    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
41        Self {
42            repo: TxTemplateRepo::new(pool),
43            outbox,
44            pool: pool.clone(),
45        }
46    }
47
48    #[instrument(name = "cala_ledger.tx_template.create", skip(self))]
49    pub async fn create(
50        &self,
51        new_tx_template: NewTxTemplate,
52    ) -> Result<TxTemplate, TxTemplateError> {
53        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
54        let tx_template = self.create_in_op(&mut op, new_tx_template).await?;
55        op.commit().await?;
56        Ok(tx_template)
57    }
58
59    pub async fn create_in_op(
60        &self,
61        db: &mut LedgerOperation<'_>,
62        new_tx_template: NewTxTemplate,
63    ) -> Result<TxTemplate, TxTemplateError> {
64        let tx_template = self.repo.create_in_op(db.op(), new_tx_template).await?;
65        db.accumulate(tx_template.events.last_persisted(1).map(|p| &p.event));
66        Ok(tx_template)
67    }
68
69    #[instrument(name = "cala_ledger.tx_templates.find_all", skip(self), err)]
70    pub async fn find_all<T: From<TxTemplate>>(
71        &self,
72        tx_template_ids: &[TxTemplateId],
73    ) -> Result<HashMap<TxTemplateId, T>, TxTemplateError> {
74        self.repo.find_all(tx_template_ids).await
75    }
76
77    #[instrument(name = "cala_ledger.tx_templates.list", skip(self), err)]
78    pub async fn list(
79        &self,
80        cursor: es_entity::PaginatedQueryArgs<TxTemplatesByCodeCursor>,
81        direction: es_entity::ListDirection,
82    ) -> Result<es_entity::PaginatedQueryRet<TxTemplate, TxTemplatesByCodeCursor>, TxTemplateError>
83    {
84        self.repo.list_by_code(cursor, direction).await
85    }
86
87    pub async fn find_by_code(&self, code: impl AsRef<str>) -> Result<TxTemplate, TxTemplateError> {
88        self.repo.find_by_code(code.as_ref().to_string()).await
89    }
90
91    #[instrument(
92        level = "trace",
93        name = "cala_ledger.tx_template.prepare_transaction",
94        skip(self)
95    )]
96    pub(crate) async fn prepare_transaction(
97        &self,
98        time: DateTime<Utc>,
99        tx_id: TransactionId,
100        code: &str,
101        params: Params,
102    ) -> Result<PreparedTransaction, TxTemplateError> {
103        let tmpl = self.repo.find_latest_version(code).await?;
104
105        let ctx = params.into_context(tmpl.params.as_ref())?;
106
107        let journal_id: Uuid = tmpl.transaction.journal_id.try_evaluate(&ctx)?;
108
109        let entries = self.prep_entries(&tmpl, tx_id, JournalId::from(journal_id), &ctx)?;
110
111        let mut tx_builder = NewTransaction::builder();
112        tx_builder
113            .id(tx_id)
114            .created_at(time)
115            .tx_template_id(tmpl.id)
116            .entry_ids(entries.iter().map(|e| e.id).collect());
117
118        tx_builder.journal_id(journal_id);
119
120        let effective: NaiveDate = tmpl.transaction.effective.try_evaluate(&ctx)?;
121        tx_builder.effective(effective);
122
123        if let Some(correlation_id) = tmpl.transaction.correlation_id.as_ref() {
124            let correlation_id: String = correlation_id.try_evaluate(&ctx)?;
125            tx_builder.correlation_id(correlation_id);
126        }
127
128        if let Some(external_id) = tmpl.transaction.external_id.as_ref() {
129            let external_id: String = external_id.try_evaluate(&ctx)?;
130            tx_builder.external_id(external_id);
131        }
132
133        if let Some(description) = tmpl.transaction.description.as_ref() {
134            let description: String = description.try_evaluate(&ctx)?;
135            tx_builder.description(description);
136        }
137
138        if let Some(metadata) = tmpl.transaction.metadata.as_ref() {
139            let metadata: serde_json::Value = metadata.try_evaluate(&ctx)?;
140            tx_builder.metadata(metadata);
141        }
142
143        let tx = tx_builder.build().expect("tx_build should succeed");
144
145        Ok(PreparedTransaction {
146            transaction: tx,
147            entries,
148        })
149    }
150
151    fn prep_entries(
152        &self,
153        tmpl: &TxTemplateValues,
154        transaction_id: TransactionId,
155        journal_id: JournalId,
156        ctx: &cel_interpreter::CelContext,
157    ) -> Result<Vec<NewEntry>, TxTemplateError> {
158        let mut new_entries = Vec::new();
159        let mut totals = HashMap::new();
160        for (zero_based_sequence, entry) in tmpl.entries.iter().enumerate() {
161            let mut builder = NewEntry::builder();
162            builder
163                .id(EntryId::new())
164                .transaction_id(transaction_id)
165                .journal_id(journal_id)
166                .sequence(zero_based_sequence as u32 + 1);
167            let account_id: Uuid = entry.account_id.try_evaluate(ctx)?;
168            builder.account_id(account_id);
169
170            let entry_type: String = entry.entry_type.try_evaluate(ctx)?;
171            builder.entry_type(entry_type);
172
173            let layer: Layer = entry.layer.try_evaluate(ctx)?;
174            builder.layer(layer);
175
176            let units: Decimal = entry.units.try_evaluate(ctx)?;
177            let currency: Currency = entry.currency.try_evaluate(ctx)?;
178            let direction: DebitOrCredit = entry.direction.try_evaluate(ctx)?;
179
180            let total = totals.entry((currency, layer)).or_insert(Decimal::ZERO);
181            match direction {
182                DebitOrCredit::Debit => *total -= units,
183                DebitOrCredit::Credit => *total += units,
184            };
185            builder.units(units);
186            builder.currency(currency);
187            builder.direction(direction);
188
189            if let Some(description) = entry.description.as_ref() {
190                let description: String = description.try_evaluate(ctx)?;
191                builder.description(description);
192            }
193
194            if let Some(metadata) = entry.metadata.as_ref() {
195                let metadata: serde_json::Value = metadata.try_evaluate(ctx)?;
196                builder.metadata(metadata);
197            }
198
199            new_entries.push(builder.build().expect("Couldn't build entry"));
200        }
201
202        for ((c, l), v) in totals {
203            if v != Decimal::ZERO {
204                return Err(TxTemplateError::UnbalancedTransaction(c, l, v));
205            }
206        }
207
208        Ok(new_entries)
209    }
210
211    #[cfg(feature = "import")]
212    pub async fn sync_tx_template_creation(
213        &self,
214        mut db: es_entity::DbOp<'_>,
215        origin: DataSourceId,
216        values: TxTemplateValues,
217    ) -> Result<(), TxTemplateError> {
218        let mut tx_template = TxTemplate::import(origin, values);
219        self.repo
220            .import_in_op(&mut db, origin, &mut tx_template)
221            .await?;
222        let recorded_at = db.now();
223        let outbox_events: Vec<_> = tx_template
224            .events
225            .last_persisted(1)
226            .map(|p| OutboxEventPayload::from(&p.event))
227            .collect();
228        self.outbox
229            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
230            .await?;
231        Ok(())
232    }
233}
234
235impl From<&TxTemplateEvent> for OutboxEventPayload {
236    fn from(event: &TxTemplateEvent) -> Self {
237        match event {
238            #[cfg(feature = "import")]
239            TxTemplateEvent::Imported { source, values } => OutboxEventPayload::TxTemplateCreated {
240                source: *source,
241                tx_template: values.clone(),
242            },
243            TxTemplateEvent::Initialized {
244                values: tx_template,
245            } => OutboxEventPayload::TxTemplateCreated {
246                source: DataSource::Local,
247                tx_template: tx_template.clone(),
248            },
249        }
250    }
251}