cala_ledger/tx_template/
mod.rs

1mod entity;
2mod repo;
3
4pub mod error;
5
6use chrono::{DateTime, NaiveDate, Utc};
7use es_entity::EsEntity;
8use rust_decimal::Decimal;
9use sqlx::PgPool;
10use std::collections::HashMap;
11use tracing::instrument;
12use uuid::Uuid;
13
14pub use crate::param::*;
15use crate::{
16    entry::NewEntry,
17    ledger_operation::*,
18    outbox::*,
19    primitives::{DataSource, *},
20    transaction::NewTransaction,
21};
22
23pub use entity::*;
24use error::*;
25pub use repo::tx_template_cursor::TxTemplatesByCodeCursor;
26use repo::*;
27
28pub(crate) struct PreparedTransaction {
29    pub transaction: NewTransaction,
30    pub entries: Vec<NewEntry>,
31}
32
33#[derive(Clone)]
34pub struct TxTemplates {
35    repo: TxTemplateRepo,
36    outbox: Outbox,
37    pool: PgPool,
38}
39
40impl TxTemplates {
41    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
42        Self {
43            repo: TxTemplateRepo::new(pool),
44            outbox,
45            pool: pool.clone(),
46        }
47    }
48
49    #[instrument(name = "cala_ledger.tx_template.create", skip(self))]
50    pub async fn create(
51        &self,
52        new_tx_template: NewTxTemplate,
53    ) -> Result<TxTemplate, TxTemplateError> {
54        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
55        let tx_template = self.create_in_op(&mut op, new_tx_template).await?;
56        op.commit().await?;
57        Ok(tx_template)
58    }
59
60    pub async fn create_in_op(
61        &self,
62        db: &mut LedgerOperation<'_>,
63        new_tx_template: NewTxTemplate,
64    ) -> Result<TxTemplate, TxTemplateError> {
65        let tx_template = self.repo.create_in_op(db.op(), new_tx_template).await?;
66        db.accumulate(tx_template.last_persisted(1).map(|p| &p.event));
67        Ok(tx_template)
68    }
69
70    #[instrument(name = "cala_ledger.tx_templates.find_all", skip(self), err)]
71    pub async fn find_all<T: From<TxTemplate>>(
72        &self,
73        tx_template_ids: &[TxTemplateId],
74    ) -> Result<HashMap<TxTemplateId, T>, TxTemplateError> {
75        self.repo.find_all(tx_template_ids).await
76    }
77
78    #[instrument(name = "cala_ledger.tx_templates.list", skip(self), err)]
79    pub async fn list(
80        &self,
81        cursor: es_entity::PaginatedQueryArgs<TxTemplatesByCodeCursor>,
82        direction: es_entity::ListDirection,
83    ) -> Result<es_entity::PaginatedQueryRet<TxTemplate, TxTemplatesByCodeCursor>, TxTemplateError>
84    {
85        self.repo.list_by_code(cursor, direction).await
86    }
87
88    pub async fn find_by_code(&self, code: impl AsRef<str>) -> Result<TxTemplate, TxTemplateError> {
89        self.repo.find_by_code(code.as_ref().to_string()).await
90    }
91
92    #[instrument(
93        level = "trace",
94        name = "cala_ledger.tx_template.prepare_transaction",
95        skip(self)
96    )]
97    pub(crate) async fn prepare_transaction(
98        &self,
99        time: DateTime<Utc>,
100        tx_id: TransactionId,
101        code: &str,
102        params: Params,
103    ) -> Result<PreparedTransaction, TxTemplateError> {
104        let tmpl = self.repo.find_latest_version(code).await?;
105
106        let ctx = params.into_context(tmpl.params.as_ref())?;
107
108        let journal_id: Uuid = tmpl.transaction.journal_id.try_evaluate(&ctx)?;
109
110        let entries = self.prep_entries(&tmpl, tx_id, JournalId::from(journal_id), &ctx)?;
111
112        let mut tx_builder = NewTransaction::builder();
113        tx_builder
114            .id(tx_id)
115            .created_at(time)
116            .tx_template_id(tmpl.id)
117            .entry_ids(entries.iter().map(|e| e.id).collect());
118
119        tx_builder.journal_id(journal_id);
120
121        let effective: NaiveDate = tmpl.transaction.effective.try_evaluate(&ctx)?;
122        tx_builder.effective(effective);
123
124        if let Some(correlation_id) = tmpl.transaction.correlation_id.as_ref() {
125            let correlation_id: String = correlation_id.try_evaluate(&ctx)?;
126            tx_builder.correlation_id(correlation_id);
127        }
128
129        if let Some(external_id) = tmpl.transaction.external_id.as_ref() {
130            let external_id: String = external_id.try_evaluate(&ctx)?;
131            tx_builder.external_id(external_id);
132        }
133
134        if let Some(description) = tmpl.transaction.description.as_ref() {
135            let description: String = description.try_evaluate(&ctx)?;
136            tx_builder.description(description);
137        }
138
139        if let Some(metadata) = tmpl.transaction.metadata.as_ref() {
140            let metadata: serde_json::Value = metadata.try_evaluate(&ctx)?;
141            tx_builder.metadata(metadata);
142        }
143
144        let tx = tx_builder.build().expect("tx_build should succeed");
145
146        Ok(PreparedTransaction {
147            transaction: tx,
148            entries,
149        })
150    }
151
152    fn prep_entries(
153        &self,
154        tmpl: &TxTemplateValues,
155        transaction_id: TransactionId,
156        journal_id: JournalId,
157        ctx: &cel_interpreter::CelContext,
158    ) -> Result<Vec<NewEntry>, TxTemplateError> {
159        let mut new_entries = Vec::new();
160        let mut totals = HashMap::new();
161        for (zero_based_sequence, entry) in tmpl.entries.iter().enumerate() {
162            let mut builder = NewEntry::builder();
163            builder
164                .id(EntryId::new())
165                .transaction_id(transaction_id)
166                .journal_id(journal_id)
167                .sequence(zero_based_sequence as u32 + 1);
168            let account_id: Uuid = entry.account_id.try_evaluate(ctx)?;
169            builder.account_id(account_id);
170
171            let entry_type: String = entry.entry_type.try_evaluate(ctx)?;
172            builder.entry_type(entry_type);
173
174            let layer: Layer = entry.layer.try_evaluate(ctx)?;
175            builder.layer(layer);
176
177            let units: Decimal = entry.units.try_evaluate(ctx)?;
178            let currency: Currency = entry.currency.try_evaluate(ctx)?;
179            let direction: DebitOrCredit = entry.direction.try_evaluate(ctx)?;
180
181            let total = totals.entry((currency, layer)).or_insert(Decimal::ZERO);
182            match direction {
183                DebitOrCredit::Debit => *total -= units,
184                DebitOrCredit::Credit => *total += units,
185            };
186            builder.units(units);
187            builder.currency(currency);
188            builder.direction(direction);
189
190            if let Some(description) = entry.description.as_ref() {
191                let description: String = description.try_evaluate(ctx)?;
192                builder.description(description);
193            }
194
195            if let Some(metadata) = entry.metadata.as_ref() {
196                let metadata: serde_json::Value = metadata.try_evaluate(ctx)?;
197                builder.metadata(metadata);
198            }
199
200            new_entries.push(builder.build().expect("Couldn't build entry"));
201        }
202
203        for ((c, l), v) in totals {
204            if v != Decimal::ZERO {
205                return Err(TxTemplateError::UnbalancedTransaction(c, l, v));
206            }
207        }
208
209        Ok(new_entries)
210    }
211
212    #[cfg(feature = "import")]
213    pub async fn sync_tx_template_creation(
214        &self,
215        mut db: es_entity::DbOp<'_>,
216        origin: DataSourceId,
217        values: TxTemplateValues,
218    ) -> Result<(), TxTemplateError> {
219        let mut tx_template = TxTemplate::import(origin, values);
220        self.repo
221            .import_in_op(&mut db, origin, &mut tx_template)
222            .await?;
223        let recorded_at = db.now();
224        let outbox_events: Vec<_> = tx_template
225            .last_persisted(1)
226            .map(|p| OutboxEventPayload::from(&p.event))
227            .collect();
228        self.outbox
229            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
230            .await?;
231        Ok(())
232    }
233}
234
235impl From<&TxTemplateEvent> for OutboxEventPayload {
236    fn from(event: &TxTemplateEvent) -> Self {
237        match event {
238            #[cfg(feature = "import")]
239            TxTemplateEvent::Imported { source, values } => OutboxEventPayload::TxTemplateCreated {
240                source: *source,
241                tx_template: values.clone(),
242            },
243            TxTemplateEvent::Initialized {
244                values: tx_template,
245            } => OutboxEventPayload::TxTemplateCreated {
246                source: DataSource::Local,
247                tx_template: tx_template.clone(),
248            },
249        }
250    }
251}