cala_ledger/tx_template/
mod.rs

1mod entity;
2mod repo;
3
4pub mod error;
5
6use chrono::NaiveDate;
7use es_entity::clock::ClockHandle;
8use rust_decimal::Decimal;
9use sqlx::PgPool;
10use std::collections::HashMap;
11use tracing::instrument;
12use uuid::Uuid;
13
14pub use crate::param::*;
15use crate::{
16    entry::NewEntry,
17    outbox::*,
18    primitives::{DataSource, *},
19    transaction::NewTransaction,
20};
21
22pub use entity::*;
23use error::*;
24pub use repo::tx_template_cursor::TxTemplatesByCodeCursor;
25use repo::*;
26
27pub(crate) struct PreparedTransaction {
28    pub transaction: NewTransaction,
29    pub entries: Vec<NewEntry>,
30}
31
32#[derive(Clone)]
33pub struct TxTemplates {
34    repo: TxTemplateRepo,
35    clock: ClockHandle,
36}
37
38impl TxTemplates {
39    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher, clock: &ClockHandle) -> Self {
40        Self {
41            repo: TxTemplateRepo::new(pool, publisher),
42            clock: clock.clone(),
43        }
44    }
45
46    #[instrument(name = "cala_ledger.tx_template.create", skip(self))]
47    pub async fn create(
48        &self,
49        new_tx_template: NewTxTemplate,
50    ) -> Result<TxTemplate, TxTemplateError> {
51        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
52        let tx_template = self.create_in_op(&mut op, new_tx_template).await?;
53        op.commit().await?;
54        Ok(tx_template)
55    }
56
57    #[instrument(name = "cala_ledger.tx_template.create_in_op", skip(self, db), err)]
58    pub async fn create_in_op(
59        &self,
60        db: &mut impl es_entity::AtomicOperation,
61        new_tx_template: NewTxTemplate,
62    ) -> Result<TxTemplate, TxTemplateError> {
63        let tx_template = self.repo.create_in_op(db, new_tx_template).await?;
64        Ok(tx_template)
65    }
66
67    #[instrument(name = "cala_ledger.tx_templates.find_all", skip(self))]
68    pub async fn find_all<T: From<TxTemplate>>(
69        &self,
70        tx_template_ids: &[TxTemplateId],
71    ) -> Result<HashMap<TxTemplateId, T>, TxTemplateError> {
72        self.repo.find_all(tx_template_ids).await
73    }
74
75    #[instrument(name = "cala_ledger.tx_templates.list", skip(self))]
76    pub async fn list(
77        &self,
78        cursor: es_entity::PaginatedQueryArgs<TxTemplatesByCodeCursor>,
79        direction: es_entity::ListDirection,
80    ) -> Result<es_entity::PaginatedQueryRet<TxTemplate, TxTemplatesByCodeCursor>, TxTemplateError>
81    {
82        self.repo.list_by_code(cursor, direction).await
83    }
84
85    #[instrument(name = "cala_ledger.tx_templates.find_by_code", skip(self), fields(code = %code.as_ref()), err)]
86    pub async fn find_by_code(&self, code: impl AsRef<str>) -> Result<TxTemplate, TxTemplateError> {
87        self.repo.find_by_code(code.as_ref().to_string()).await
88    }
89
90    #[instrument(
91        name = "cala_ledger.tx_template.prepare_transaction_in_op",
92        skip(self, db)
93    )]
94    pub(crate) async fn prepare_transaction_in_op(
95        &self,
96        db: &mut impl es_entity::AtomicOperation,
97        time: chrono::DateTime<chrono::Utc>,
98        tx_id: TransactionId,
99        code: &str,
100        params: Params,
101    ) -> Result<PreparedTransaction, TxTemplateError> {
102        let tmpl = self.repo.find_latest_version_in_op(db, code).await?;
103
104        let ctx = params.into_context(&self.clock, tmpl.params.as_ref())?;
105
106        let journal_id: Uuid = tmpl.transaction.journal_id.try_evaluate(&ctx)?;
107
108        let entries = self.prep_entries(&tmpl, tx_id, JournalId::from(journal_id), &ctx)?;
109
110        let mut tx_builder = NewTransaction::builder();
111        tx_builder
112            .id(tx_id)
113            .created_at(time)
114            .tx_template_id(tmpl.id)
115            .entry_ids(entries.iter().map(|e| e.id).collect());
116
117        tx_builder.journal_id(journal_id);
118
119        let effective: NaiveDate = tmpl.transaction.effective.try_evaluate(&ctx)?;
120        tx_builder.effective(effective);
121
122        if let Some(correlation_id) = tmpl.transaction.correlation_id.as_ref() {
123            let correlation_id: String = correlation_id.try_evaluate(&ctx)?;
124            tx_builder.correlation_id(correlation_id);
125        }
126
127        if let Some(external_id) = tmpl.transaction.external_id.as_ref() {
128            let external_id: String = external_id.try_evaluate(&ctx)?;
129            tx_builder.external_id(external_id);
130        }
131
132        if let Some(description) = tmpl.transaction.description.as_ref() {
133            let description: String = description.try_evaluate(&ctx)?;
134            tx_builder.description(description);
135        }
136
137        if let Some(metadata) = tmpl.transaction.metadata.as_ref() {
138            let metadata: serde_json::Value = metadata.try_evaluate(&ctx)?;
139            tx_builder.metadata(metadata);
140        }
141
142        let tx = tx_builder.build().expect("tx_build should succeed");
143
144        Ok(PreparedTransaction {
145            transaction: tx,
146            entries,
147        })
148    }
149
150    #[instrument(
151        name = "tx_template.prep_entries",
152        skip(self, tmpl, ctx),
153        fields(
154            template_id = %tmpl.id,
155            template_code = %tmpl.code,
156            transaction_id = %transaction_id,
157            journal_id = %journal_id,
158            entries_count = tmpl.entries.len()
159        ),
160        err
161    )]
162    fn prep_entries(
163        &self,
164        tmpl: &TxTemplateValues,
165        transaction_id: TransactionId,
166        journal_id: JournalId,
167        ctx: &cel_interpreter::CelContext,
168    ) -> Result<Vec<NewEntry>, TxTemplateError> {
169        let mut new_entries = Vec::new();
170        let mut totals = HashMap::new();
171        for (zero_based_sequence, entry) in tmpl.entries.iter().enumerate() {
172            let mut builder = NewEntry::builder();
173            builder
174                .id(EntryId::new())
175                .transaction_id(transaction_id)
176                .journal_id(journal_id)
177                .sequence(zero_based_sequence as u32 + 1);
178            let account_id: Uuid = entry.account_id.try_evaluate(ctx)?;
179            builder.account_id(account_id);
180
181            let entry_type: String = entry.entry_type.try_evaluate(ctx)?;
182            builder.entry_type(entry_type);
183
184            let layer: Layer = entry.layer.try_evaluate(ctx)?;
185            builder.layer(layer);
186
187            let units: Decimal = entry.units.try_evaluate(ctx)?;
188            let currency: Currency = entry.currency.try_evaluate(ctx)?;
189            let direction: DebitOrCredit = entry.direction.try_evaluate(ctx)?;
190
191            let total = totals.entry((currency, layer)).or_insert(Decimal::ZERO);
192            match direction {
193                DebitOrCredit::Debit => *total -= units,
194                DebitOrCredit::Credit => *total += units,
195            };
196            builder.units(units);
197            builder.currency(currency);
198            builder.direction(direction);
199
200            if let Some(description) = entry.description.as_ref() {
201                let description: String = description.try_evaluate(ctx)?;
202                builder.description(description);
203            }
204
205            if let Some(metadata) = entry.metadata.as_ref() {
206                let metadata: serde_json::Value = metadata.try_evaluate(ctx)?;
207                builder.metadata(metadata);
208            }
209
210            new_entries.push(builder.build().expect("Couldn't build entry"));
211        }
212
213        for ((c, l), v) in totals {
214            if v != Decimal::ZERO {
215                return Err(TxTemplateError::UnbalancedTransaction(c, l, v));
216            }
217        }
218
219        Ok(new_entries)
220    }
221
222    #[cfg(feature = "import")]
223    #[instrument(
224        name = "tx_template.sync_creation",
225        skip(self, db, values),
226        fields(
227            template_id = %values.id,
228            template_code = %values.code,
229            origin = ?origin
230        ),
231        err
232    )]
233    pub async fn sync_tx_template_creation(
234        &self,
235        mut db: es_entity::DbOpWithTime<'_>,
236        origin: DataSourceId,
237        values: TxTemplateValues,
238    ) -> Result<(), TxTemplateError> {
239        let mut tx_template = TxTemplate::import(origin, values);
240        self.repo
241            .import_in_op(&mut db, origin, &mut tx_template)
242            .await?;
243        db.commit().await?;
244        Ok(())
245    }
246}
247
248impl From<&TxTemplateEvent> for OutboxEventPayload {
249    fn from(event: &TxTemplateEvent) -> Self {
250        let source = es_entity::context::EventContext::current()
251            .data()
252            .lookup("data_source")
253            .ok()
254            .flatten()
255            .unwrap_or(DataSource::Local);
256
257        match event {
258            #[cfg(feature = "import")]
259            TxTemplateEvent::Imported { source, values } => OutboxEventPayload::TxTemplateCreated {
260                source: *source,
261                tx_template: values.clone(),
262            },
263            TxTemplateEvent::Initialized {
264                values: tx_template,
265            } => OutboxEventPayload::TxTemplateCreated {
266                source,
267                tx_template: tx_template.clone(),
268            },
269        }
270    }
271}