cala_ledger/transaction/
mod.rs

1pub mod error;
2
3mod entity;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::primitives::TxTemplateId;
15use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
16
17pub use entity::*;
18use error::*;
19pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
20use repo::*;
21
22#[derive(Clone)]
23pub struct Transactions {
24    repo: TransactionRepo,
25    outbox: Outbox,
26    _pool: PgPool,
27}
28
29impl Transactions {
30    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
31        Self {
32            repo: TransactionRepo::new(pool),
33            outbox,
34            _pool: pool.clone(),
35        }
36    }
37
38    pub(crate) async fn create_in_op(
39        &self,
40        db: &mut LedgerOperation<'_>,
41        new_transaction: NewTransaction,
42    ) -> Result<Transaction, TransactionError> {
43        let transaction = self.repo.create_in_op(db.op(), new_transaction).await?;
44        db.accumulate(transaction.last_persisted(1).map(|p| &p.event));
45        Ok(transaction)
46    }
47
48    #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self), err)]
49    pub async fn find_by_external_id(
50        &self,
51        external_id: String,
52    ) -> Result<Transaction, TransactionError> {
53        self.repo.find_by_external_id(Some(external_id)).await
54    }
55
56    #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self), err)]
57    pub async fn find_by_id(
58        &self,
59        transaction_id: TransactionId,
60    ) -> Result<Transaction, TransactionError> {
61        self.repo.find_by_id(transaction_id).await
62    }
63
64    #[instrument(
65        name = "cala_ledger.transactions.list_for_template_id",
66        skip(self),
67        err
68    )]
69    pub async fn list_for_template_id(
70        &self,
71        template_id: TxTemplateId,
72        query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
73        direction: es_entity::ListDirection,
74    ) -> Result<
75        es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
76        TransactionError,
77    > {
78        self.repo
79            .list_for_tx_template_id_by_created_at(template_id, query, direction)
80            .await
81    }
82
83    #[instrument(name = "cala_ledger.transactions.find_all", skip(self), err)]
84    pub async fn find_all<T: From<Transaction>>(
85        &self,
86        transaction_ids: &[TransactionId],
87    ) -> Result<HashMap<TransactionId, T>, TransactionError> {
88        self.repo.find_all(transaction_ids).await
89    }
90
91    #[cfg(feature = "import")]
92    pub async fn sync_transaction_creation(
93        &self,
94        mut db: es_entity::DbOp<'_>,
95        origin: DataSourceId,
96        values: TransactionValues,
97    ) -> Result<(), TransactionError> {
98        let mut transaction = Transaction::import(origin, values);
99        self.repo
100            .import_in_op(&mut db, origin, &mut transaction)
101            .await?;
102        let recorded_at = db.now();
103        let outbox_events: Vec<_> = transaction
104            .last_persisted(1)
105            .map(|p| OutboxEventPayload::from(&p.event))
106            .collect();
107        self.outbox
108            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
109            .await?;
110        Ok(())
111    }
112}
113
114impl From<&TransactionEvent> for OutboxEventPayload {
115    fn from(event: &TransactionEvent) -> Self {
116        match event {
117            #[cfg(feature = "import")]
118            TransactionEvent::Imported {
119                source,
120                values: transaction,
121            } => OutboxEventPayload::TransactionCreated {
122                source: *source,
123                transaction: transaction.clone(),
124            },
125            TransactionEvent::Initialized {
126                values: transaction,
127            } => OutboxEventPayload::TransactionCreated {
128                source: DataSource::Local,
129                transaction: transaction.clone(),
130            },
131        }
132    }
133}