cala_ledger/transaction/
mod.rs

1pub mod error;
2
3mod entity;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::primitives::{EntryId, TxTemplateId};
15use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
16
17pub use entity::*;
18use error::*;
19pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
20use repo::*;
21
22#[derive(Clone)]
23pub struct Transactions {
24    repo: TransactionRepo,
25    // Used only for "import" feature
26    #[allow(dead_code)]
27    outbox: Outbox,
28    _pool: PgPool,
29}
30
31impl Transactions {
32    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
33        Self {
34            repo: TransactionRepo::new(pool),
35            outbox,
36            _pool: pool.clone(),
37        }
38    }
39
40    pub(crate) async fn create_in_op(
41        &self,
42        db: &mut LedgerOperation<'_>,
43        new_transaction: NewTransaction,
44    ) -> Result<Transaction, TransactionError> {
45        let transaction = self.repo.create_in_op(db, new_transaction).await?;
46        db.accumulate(transaction.last_persisted(1).map(|p| &p.event));
47        Ok(transaction)
48    }
49
50    pub async fn create_voided_tx_in_op(
51        &self,
52        db: &mut LedgerOperation<'_>,
53        voiding_tx_id: TransactionId,
54        existing_tx_id: TransactionId,
55        entry_ids: impl IntoIterator<Item = EntryId>,
56    ) -> Result<Transaction, TransactionError> {
57        let mut existing_tx = self.repo.find_by_id_in_op(&mut *db, existing_tx_id).await?;
58
59        let new_tx = existing_tx.void(voiding_tx_id, entry_ids.into_iter().collect(), db.now())?;
60
61        self.repo.update_in_op(db, &mut existing_tx).await?;
62        let voided_tx = self.repo.create_in_op(db, new_tx).await?;
63
64        db.accumulate(
65            existing_tx
66                .last_persisted(1)
67                .map(|p| &p.event)
68                .chain(voided_tx.last_persisted(1).map(|p| &p.event)),
69        );
70
71        Ok(voided_tx)
72    }
73
74    #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self), err)]
75    pub async fn find_by_external_id(
76        &self,
77        external_id: String,
78    ) -> Result<Transaction, TransactionError> {
79        self.repo.find_by_external_id(Some(external_id)).await
80    }
81
82    #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self), err)]
83    pub async fn find_by_id(
84        &self,
85        transaction_id: TransactionId,
86    ) -> Result<Transaction, TransactionError> {
87        self.repo.find_by_id(transaction_id).await
88    }
89
90    #[instrument(
91        name = "cala_ledger.transactions.list_for_template_id",
92        skip(self),
93        err
94    )]
95    pub async fn list_for_template_id(
96        &self,
97        template_id: TxTemplateId,
98        query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
99        direction: es_entity::ListDirection,
100    ) -> Result<
101        es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
102        TransactionError,
103    > {
104        self.repo
105            .list_for_tx_template_id_by_created_at(template_id, query, direction)
106            .await
107    }
108
109    #[instrument(name = "cala_ledger.transactions.find_all", skip(self), err)]
110    pub async fn find_all<T: From<Transaction>>(
111        &self,
112        transaction_ids: &[TransactionId],
113    ) -> Result<HashMap<TransactionId, T>, TransactionError> {
114        self.repo.find_all(transaction_ids).await
115    }
116
117    #[cfg(feature = "import")]
118    pub async fn sync_transaction_creation(
119        &self,
120        mut db: es_entity::DbOpWithTime<'_>,
121        origin: DataSourceId,
122        values: TransactionValues,
123    ) -> Result<(), TransactionError> {
124        let mut transaction = Transaction::import(origin, values);
125        self.repo
126            .import_in_op(&mut db, origin, &mut transaction)
127            .await?;
128        let outbox_events: Vec<_> = transaction
129            .last_persisted(1)
130            .map(|p| OutboxEventPayload::from(&p.event))
131            .collect();
132        let time = db.now();
133        self.outbox
134            .persist_events_at(db, outbox_events, time)
135            .await?;
136        Ok(())
137    }
138
139    #[cfg(feature = "import")]
140    pub async fn sync_transaction_update(
141        &self,
142        mut db: es_entity::DbOpWithTime<'_>,
143        origin: DataSourceId,
144        values: TransactionValues,
145    ) -> Result<(), TransactionError> {
146        let mut transaction = Transaction::import(origin, values);
147        self.repo
148            .import_in_op(&mut db, origin, &mut transaction)
149            .await?;
150        let outbox_events: Vec<_> = transaction
151            .last_persisted(1)
152            .map(|p| OutboxEventPayload::from(&p.event))
153            .collect();
154        let time = db.now();
155        self.outbox
156            .persist_events_at(db, outbox_events, time)
157            .await?;
158        Ok(())
159    }
160}
161
162impl From<&TransactionEvent> for OutboxEventPayload {
163    fn from(event: &TransactionEvent) -> Self {
164        match event {
165            #[cfg(feature = "import")]
166            TransactionEvent::Imported {
167                source,
168                values: transaction,
169            } => OutboxEventPayload::TransactionCreated {
170                source: *source,
171                transaction: transaction.clone(),
172            },
173            TransactionEvent::Initialized {
174                values: transaction,
175            } => OutboxEventPayload::TransactionCreated {
176                source: DataSource::Local,
177                transaction: transaction.clone(),
178            },
179            TransactionEvent::Updated { values, fields } => {
180                OutboxEventPayload::TransactionUpdated {
181                    source: DataSource::Local,
182                    transaction: values.clone(),
183                    fields: fields.clone(),
184                }
185            }
186        }
187    }
188}