cala_ledger/transaction/
mod.rs

1pub mod error;
2
3mod entity;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::primitives::{EntryId, TxTemplateId};
15use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
16
17pub use entity::*;
18use error::*;
19pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
20use repo::*;
21
22#[derive(Clone)]
23pub struct Transactions {
24    repo: TransactionRepo,
25    // Used only for "import" feature
26    #[allow(dead_code)]
27    outbox: Outbox,
28    _pool: PgPool,
29}
30
31impl Transactions {
32    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
33        Self {
34            repo: TransactionRepo::new(pool),
35            outbox,
36            _pool: pool.clone(),
37        }
38    }
39
40    pub(crate) async fn create_in_op(
41        &self,
42        db: &mut LedgerOperation<'_>,
43        new_transaction: NewTransaction,
44    ) -> Result<Transaction, TransactionError> {
45        let transaction = self.repo.create_in_op(db, new_transaction).await?;
46        db.accumulate(transaction.last_persisted(1).map(|p| &p.event));
47        Ok(transaction)
48    }
49
50    pub async fn create_voided_tx_in_op(
51        &self,
52        db: &mut LedgerOperation<'_>,
53        voiding_tx_id: TransactionId,
54        existing_tx_id: TransactionId,
55        entry_ids: impl IntoIterator<Item = EntryId>,
56    ) -> Result<Transaction, TransactionError> {
57        let mut existing_tx = self.repo.find_by_id_in_op(&mut *db, existing_tx_id).await?;
58
59        let new_tx = existing_tx.void(voiding_tx_id, entry_ids.into_iter().collect(), db.now())?;
60
61        self.repo.update_in_op(db, &mut existing_tx).await?;
62        let voided_tx = self.repo.create_in_op(db, new_tx).await?;
63
64        db.accumulate(
65            existing_tx
66                .last_persisted(1)
67                .map(|p| &p.event)
68                .chain(voided_tx.last_persisted(1).map(|p| &p.event)),
69        );
70
71        Ok(voided_tx)
72    }
73
74    #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self))]
75    pub async fn find_by_external_id(
76        &self,
77        external_id: String,
78    ) -> Result<Transaction, TransactionError> {
79        self.repo.find_by_external_id(Some(external_id)).await
80    }
81
82    #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self))]
83    pub async fn find_by_id(
84        &self,
85        transaction_id: TransactionId,
86    ) -> Result<Transaction, TransactionError> {
87        self.repo.find_by_id(transaction_id).await
88    }
89
90    #[instrument(name = "cala_ledger.transactions.list_for_template_id", skip(self))]
91    pub async fn list_for_template_id(
92        &self,
93        template_id: TxTemplateId,
94        query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
95        direction: es_entity::ListDirection,
96    ) -> Result<
97        es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
98        TransactionError,
99    > {
100        self.repo
101            .list_for_tx_template_id_by_created_at(template_id, query, direction)
102            .await
103    }
104
105    #[instrument(name = "cala_ledger.transactions.find_all", skip(self))]
106    pub async fn find_all<T: From<Transaction>>(
107        &self,
108        transaction_ids: &[TransactionId],
109    ) -> Result<HashMap<TransactionId, T>, TransactionError> {
110        self.repo.find_all(transaction_ids).await
111    }
112
113    #[cfg(feature = "import")]
114    pub async fn sync_transaction_creation(
115        &self,
116        mut db: es_entity::DbOpWithTime<'_>,
117        origin: DataSourceId,
118        values: TransactionValues,
119    ) -> Result<(), TransactionError> {
120        let mut transaction = Transaction::import(origin, values);
121        self.repo
122            .import_in_op(&mut db, origin, &mut transaction)
123            .await?;
124        let outbox_events: Vec<_> = transaction
125            .last_persisted(1)
126            .map(|p| OutboxEventPayload::from(&p.event))
127            .collect();
128        let time = db.now();
129        self.outbox
130            .persist_events_at(db, outbox_events, time)
131            .await?;
132        Ok(())
133    }
134
135    #[cfg(feature = "import")]
136    pub async fn sync_transaction_update(
137        &self,
138        mut db: es_entity::DbOpWithTime<'_>,
139        origin: DataSourceId,
140        values: TransactionValues,
141    ) -> Result<(), TransactionError> {
142        let mut transaction = Transaction::import(origin, values);
143        self.repo
144            .import_in_op(&mut db, origin, &mut transaction)
145            .await?;
146        let outbox_events: Vec<_> = transaction
147            .last_persisted(1)
148            .map(|p| OutboxEventPayload::from(&p.event))
149            .collect();
150        let time = db.now();
151        self.outbox
152            .persist_events_at(db, outbox_events, time)
153            .await?;
154        Ok(())
155    }
156}
157
158impl From<&TransactionEvent> for OutboxEventPayload {
159    fn from(event: &TransactionEvent) -> Self {
160        match event {
161            #[cfg(feature = "import")]
162            TransactionEvent::Imported {
163                source,
164                values: transaction,
165            } => OutboxEventPayload::TransactionCreated {
166                source: *source,
167                transaction: transaction.clone(),
168            },
169            TransactionEvent::Initialized {
170                values: transaction,
171            } => OutboxEventPayload::TransactionCreated {
172                source: DataSource::Local,
173                transaction: transaction.clone(),
174            },
175            TransactionEvent::Updated { values, fields } => {
176                OutboxEventPayload::TransactionUpdated {
177                    source: DataSource::Local,
178                    transaction: values.clone(),
179                    fields: fields.clone(),
180                }
181            }
182        }
183    }
184}