cala_ledger/transaction/
mod.rs

1pub mod error;
2
3mod entity;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::primitives::{EntryId, TxTemplateId};
14use crate::{outbox::*, primitives::DataSource};
15
16pub use entity::*;
17use error::*;
18pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Transactions {
23    repo: TransactionRepo,
24}
25
26impl Transactions {
27    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
28        Self {
29            repo: TransactionRepo::new(pool, publisher),
30        }
31    }
32
33    #[instrument(name = "cala_ledger.transactions.create_in_op", skip_all)]
34    pub(crate) async fn create_in_op(
35        &self,
36        db: &mut impl es_entity::AtomicOperation,
37        new_transaction: NewTransaction,
38    ) -> Result<Transaction, TransactionError> {
39        let transaction = self.repo.create_in_op(db, new_transaction).await?;
40        Ok(transaction)
41    }
42
43    #[instrument(name = "cala_ledger.transactions.create_voided_tx_in_op", skip_all)]
44    pub async fn create_voided_tx_in_op(
45        &self,
46        db: &mut impl es_entity::AtomicOperationWithTime,
47        voiding_tx_id: TransactionId,
48        existing_tx_id: TransactionId,
49        entry_ids: impl IntoIterator<Item = EntryId>,
50    ) -> Result<Transaction, TransactionError> {
51        let mut existing_tx = self.repo.find_by_id_in_op(&mut *db, existing_tx_id).await?;
52
53        let new_tx = existing_tx.void(voiding_tx_id, entry_ids.into_iter().collect(), db.now())?;
54
55        self.repo.update_in_op(db, &mut existing_tx).await?;
56        let voided_tx = self.repo.create_in_op(db, new_tx).await?;
57
58        Ok(voided_tx)
59    }
60
61    #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self))]
62    pub async fn find_by_external_id(
63        &self,
64        external_id: String,
65    ) -> Result<Transaction, TransactionError> {
66        self.repo.find_by_external_id(Some(external_id)).await
67    }
68
69    #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self))]
70    pub async fn find_by_id(
71        &self,
72        transaction_id: TransactionId,
73    ) -> Result<Transaction, TransactionError> {
74        self.repo.find_by_id(transaction_id).await
75    }
76
77    #[instrument(name = "cala_ledger.transactions.list_for_template_id", skip(self))]
78    pub async fn list_for_template_id(
79        &self,
80        template_id: TxTemplateId,
81        query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
82        direction: es_entity::ListDirection,
83    ) -> Result<
84        es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
85        TransactionError,
86    > {
87        self.repo
88            .list_for_tx_template_id_by_created_at(template_id, query, direction)
89            .await
90    }
91
92    #[instrument(name = "cala_ledger.transactions.find_all", skip(self))]
93    pub async fn find_all<T: From<Transaction>>(
94        &self,
95        transaction_ids: &[TransactionId],
96    ) -> Result<HashMap<TransactionId, T>, TransactionError> {
97        self.repo.find_all(transaction_ids).await
98    }
99
100    #[cfg(feature = "import")]
101    #[instrument(name = "cala_ledger.transactions.sync_transaction_creation", skip_all)]
102    pub async fn sync_transaction_creation(
103        &self,
104        mut db: es_entity::DbOpWithTime<'_>,
105        origin: DataSourceId,
106        values: TransactionValues,
107    ) -> Result<(), TransactionError> {
108        let mut transaction = Transaction::import(origin, values);
109        self.repo
110            .import_in_op(&mut db, origin, &mut transaction)
111            .await?;
112        db.commit().await?;
113        Ok(())
114    }
115
116    #[cfg(feature = "import")]
117    #[instrument(name = "cala_ledger.transactions.sync_transaction_update", skip_all)]
118    pub async fn sync_transaction_update(
119        &self,
120        mut db: es_entity::DbOpWithTime<'_>,
121        origin: DataSourceId,
122        values: TransactionValues,
123    ) -> Result<(), TransactionError> {
124        let mut transaction = Transaction::import(origin, values);
125        self.repo
126            .import_in_op(&mut db, origin, &mut transaction)
127            .await?;
128        db.commit().await?;
129        Ok(())
130    }
131}
132
133impl From<&TransactionEvent> for OutboxEventPayload {
134    fn from(event: &TransactionEvent) -> Self {
135        let source = es_entity::context::EventContext::current()
136            .data()
137            .lookup("data_source")
138            .ok()
139            .flatten()
140            .unwrap_or(DataSource::Local);
141
142        match event {
143            #[cfg(feature = "import")]
144            TransactionEvent::Imported {
145                source,
146                values: transaction,
147            } => OutboxEventPayload::TransactionCreated {
148                source: *source,
149                transaction: transaction.clone(),
150            },
151            TransactionEvent::Initialized {
152                values: transaction,
153            } => OutboxEventPayload::TransactionCreated {
154                source,
155                transaction: transaction.clone(),
156            },
157            TransactionEvent::Updated { values, fields } => {
158                OutboxEventPayload::TransactionUpdated {
159                    source,
160                    transaction: values.clone(),
161                    fields: fields.clone(),
162                }
163            }
164        }
165    }
166}