cala_ledger/tx_template/
mod.rs

1mod entity;
2mod repo;
3
4pub mod error;
5
6use chrono::NaiveDate;
7use rust_decimal::Decimal;
8use sqlx::PgPool;
9use std::collections::HashMap;
10use tracing::instrument;
11use uuid::Uuid;
12
13pub use crate::param::*;
14use crate::{
15    entry::NewEntry,
16    outbox::*,
17    primitives::{DataSource, *},
18    transaction::NewTransaction,
19};
20
21pub use entity::*;
22use error::*;
23pub use repo::tx_template_cursor::TxTemplatesByCodeCursor;
24use repo::*;
25
26pub(crate) struct PreparedTransaction {
27    pub transaction: NewTransaction,
28    pub entries: Vec<NewEntry>,
29}
30
31#[derive(Clone)]
32pub struct TxTemplates {
33    repo: TxTemplateRepo,
34}
35
36impl TxTemplates {
37    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
38        Self {
39            repo: TxTemplateRepo::new(pool, publisher),
40        }
41    }
42
43    #[instrument(name = "cala_ledger.tx_template.create", skip(self))]
44    pub async fn create(
45        &self,
46        new_tx_template: NewTxTemplate,
47    ) -> Result<TxTemplate, TxTemplateError> {
48        let mut op = self.repo.begin_op().await?;
49        let tx_template = self.create_in_op(&mut op, new_tx_template).await?;
50        op.commit().await?;
51        Ok(tx_template)
52    }
53
54    #[instrument(name = "cala_ledger.tx_template.create_in_op", skip(self, db), err)]
55    pub async fn create_in_op(
56        &self,
57        db: &mut impl es_entity::AtomicOperation,
58        new_tx_template: NewTxTemplate,
59    ) -> Result<TxTemplate, TxTemplateError> {
60        let tx_template = self.repo.create_in_op(db, new_tx_template).await?;
61        Ok(tx_template)
62    }
63
64    #[instrument(name = "cala_ledger.tx_templates.find_all", skip(self))]
65    pub async fn find_all<T: From<TxTemplate>>(
66        &self,
67        tx_template_ids: &[TxTemplateId],
68    ) -> Result<HashMap<TxTemplateId, T>, TxTemplateError> {
69        self.repo.find_all(tx_template_ids).await
70    }
71
72    #[instrument(name = "cala_ledger.tx_templates.list", skip(self))]
73    pub async fn list(
74        &self,
75        cursor: es_entity::PaginatedQueryArgs<TxTemplatesByCodeCursor>,
76        direction: es_entity::ListDirection,
77    ) -> Result<es_entity::PaginatedQueryRet<TxTemplate, TxTemplatesByCodeCursor>, TxTemplateError>
78    {
79        self.repo.list_by_code(cursor, direction).await
80    }
81
82    #[instrument(name = "cala_ledger.tx_templates.find_by_code", skip(self), fields(code = %code.as_ref()), err)]
83    pub async fn find_by_code(&self, code: impl AsRef<str>) -> Result<TxTemplate, TxTemplateError> {
84        self.repo.find_by_code(code.as_ref().to_string()).await
85    }
86
87    #[instrument(
88        name = "cala_ledger.tx_template.prepare_transaction_in_op",
89        skip(self, db)
90    )]
91    pub(crate) async fn prepare_transaction_in_op(
92        &self,
93        db: &mut impl es_entity::AtomicOperation,
94        time: chrono::DateTime<chrono::Utc>,
95        tx_id: TransactionId,
96        code: &str,
97        params: Params,
98    ) -> Result<PreparedTransaction, TxTemplateError> {
99        let tmpl = self.repo.find_latest_version_in_op(db, code).await?;
100
101        let ctx = params.into_context(tmpl.params.as_ref())?;
102
103        let journal_id: Uuid = tmpl.transaction.journal_id.try_evaluate(&ctx)?;
104
105        let entries = self.prep_entries(&tmpl, tx_id, JournalId::from(journal_id), &ctx)?;
106
107        let mut tx_builder = NewTransaction::builder();
108        tx_builder
109            .id(tx_id)
110            .created_at(time)
111            .tx_template_id(tmpl.id)
112            .entry_ids(entries.iter().map(|e| e.id).collect());
113
114        tx_builder.journal_id(journal_id);
115
116        let effective: NaiveDate = tmpl.transaction.effective.try_evaluate(&ctx)?;
117        tx_builder.effective(effective);
118
119        if let Some(correlation_id) = tmpl.transaction.correlation_id.as_ref() {
120            let correlation_id: String = correlation_id.try_evaluate(&ctx)?;
121            tx_builder.correlation_id(correlation_id);
122        }
123
124        if let Some(external_id) = tmpl.transaction.external_id.as_ref() {
125            let external_id: String = external_id.try_evaluate(&ctx)?;
126            tx_builder.external_id(external_id);
127        }
128
129        if let Some(description) = tmpl.transaction.description.as_ref() {
130            let description: String = description.try_evaluate(&ctx)?;
131            tx_builder.description(description);
132        }
133
134        if let Some(metadata) = tmpl.transaction.metadata.as_ref() {
135            let metadata: serde_json::Value = metadata.try_evaluate(&ctx)?;
136            tx_builder.metadata(metadata);
137        }
138
139        let tx = tx_builder.build().expect("tx_build should succeed");
140
141        Ok(PreparedTransaction {
142            transaction: tx,
143            entries,
144        })
145    }
146
147    #[instrument(
148        name = "tx_template.prep_entries",
149        skip(self, tmpl, ctx),
150        fields(
151            template_id = %tmpl.id,
152            template_code = %tmpl.code,
153            transaction_id = %transaction_id,
154            journal_id = %journal_id,
155            entries_count = tmpl.entries.len()
156        ),
157        err
158    )]
159    fn prep_entries(
160        &self,
161        tmpl: &TxTemplateValues,
162        transaction_id: TransactionId,
163        journal_id: JournalId,
164        ctx: &cel_interpreter::CelContext,
165    ) -> Result<Vec<NewEntry>, TxTemplateError> {
166        let mut new_entries = Vec::new();
167        let mut totals = HashMap::new();
168        for (zero_based_sequence, entry) in tmpl.entries.iter().enumerate() {
169            let mut builder = NewEntry::builder();
170            builder
171                .id(EntryId::new())
172                .transaction_id(transaction_id)
173                .journal_id(journal_id)
174                .sequence(zero_based_sequence as u32 + 1);
175            let account_id: Uuid = entry.account_id.try_evaluate(ctx)?;
176            builder.account_id(account_id);
177
178            let entry_type: String = entry.entry_type.try_evaluate(ctx)?;
179            builder.entry_type(entry_type);
180
181            let layer: Layer = entry.layer.try_evaluate(ctx)?;
182            builder.layer(layer);
183
184            let units: Decimal = entry.units.try_evaluate(ctx)?;
185            let currency: Currency = entry.currency.try_evaluate(ctx)?;
186            let direction: DebitOrCredit = entry.direction.try_evaluate(ctx)?;
187
188            let total = totals.entry((currency, layer)).or_insert(Decimal::ZERO);
189            match direction {
190                DebitOrCredit::Debit => *total -= units,
191                DebitOrCredit::Credit => *total += units,
192            };
193            builder.units(units);
194            builder.currency(currency);
195            builder.direction(direction);
196
197            if let Some(description) = entry.description.as_ref() {
198                let description: String = description.try_evaluate(ctx)?;
199                builder.description(description);
200            }
201
202            if let Some(metadata) = entry.metadata.as_ref() {
203                let metadata: serde_json::Value = metadata.try_evaluate(ctx)?;
204                builder.metadata(metadata);
205            }
206
207            new_entries.push(builder.build().expect("Couldn't build entry"));
208        }
209
210        for ((c, l), v) in totals {
211            if v != Decimal::ZERO {
212                return Err(TxTemplateError::UnbalancedTransaction(c, l, v));
213            }
214        }
215
216        Ok(new_entries)
217    }
218
219    #[cfg(feature = "import")]
220    #[instrument(
221        name = "tx_template.sync_creation",
222        skip(self, db, values),
223        fields(
224            template_id = %values.id,
225            template_code = %values.code,
226            origin = ?origin
227        ),
228        err
229    )]
230    pub async fn sync_tx_template_creation(
231        &self,
232        mut db: es_entity::DbOpWithTime<'_>,
233        origin: DataSourceId,
234        values: TxTemplateValues,
235    ) -> Result<(), TxTemplateError> {
236        let mut tx_template = TxTemplate::import(origin, values);
237        self.repo
238            .import_in_op(&mut db, origin, &mut tx_template)
239            .await?;
240        db.commit().await?;
241        Ok(())
242    }
243}
244
245impl From<&TxTemplateEvent> for OutboxEventPayload {
246    fn from(event: &TxTemplateEvent) -> Self {
247        let source = es_entity::context::EventContext::current()
248            .data()
249            .lookup("data_source")
250            .ok()
251            .flatten()
252            .unwrap_or(DataSource::Local);
253
254        match event {
255            #[cfg(feature = "import")]
256            TxTemplateEvent::Imported { source, values } => OutboxEventPayload::TxTemplateCreated {
257                source: *source,
258                tx_template: values.clone(),
259            },
260            TxTemplateEvent::Initialized {
261                values: tx_template,
262            } => OutboxEventPayload::TxTemplateCreated {
263                source,
264                tx_template: tx_template.clone(),
265            },
266        }
267    }
268}