cala_ledger/transaction/
mod.rs

1pub mod error;
2
3mod entity;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::primitives::TxTemplateId;
14use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
15
16pub use entity::*;
17use error::*;
18pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Transactions {
23    repo: TransactionRepo,
24    outbox: Outbox,
25    _pool: PgPool,
26}
27
28impl Transactions {
29    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
30        Self {
31            repo: TransactionRepo::new(pool),
32            outbox,
33            _pool: pool.clone(),
34        }
35    }
36
37    pub(crate) async fn create_in_op(
38        &self,
39        db: &mut LedgerOperation<'_>,
40        new_transaction: NewTransaction,
41    ) -> Result<Transaction, TransactionError> {
42        let transaction = self.repo.create_in_op(db.op(), new_transaction).await?;
43        db.accumulate(transaction.events.last_persisted(1).map(|p| &p.event));
44        Ok(transaction)
45    }
46
47    #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self), err)]
48    pub async fn find_by_external_id(
49        &self,
50        external_id: String,
51    ) -> Result<Transaction, TransactionError> {
52        self.repo.find_by_external_id(Some(external_id)).await
53    }
54
55    #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self), err)]
56    pub async fn find_by_id(
57        &self,
58        transaction_id: TransactionId,
59    ) -> Result<Transaction, TransactionError> {
60        self.repo.find_by_id(transaction_id).await
61    }
62
63    #[instrument(
64        name = "cala_ledger.transactions.list_for_template_id",
65        skip(self),
66        err
67    )]
68    pub async fn list_for_template_id(
69        &self,
70        template_id: TxTemplateId,
71        query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
72        direction: es_entity::ListDirection,
73    ) -> Result<
74        es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
75        TransactionError,
76    > {
77        self.repo
78            .list_for_tx_template_id_by_created_at(template_id, query, direction)
79            .await
80    }
81
82    #[instrument(name = "cala_ledger.transactions.find_all", skip(self), err)]
83    pub async fn find_all<T: From<Transaction>>(
84        &self,
85        transaction_ids: &[TransactionId],
86    ) -> Result<HashMap<TransactionId, T>, TransactionError> {
87        self.repo.find_all(transaction_ids).await
88    }
89
90    #[cfg(feature = "import")]
91    pub async fn sync_transaction_creation(
92        &self,
93        mut db: es_entity::DbOp<'_>,
94        origin: DataSourceId,
95        values: TransactionValues,
96    ) -> Result<(), TransactionError> {
97        let mut transaction = Transaction::import(origin, values);
98        self.repo
99            .import_in_op(&mut db, origin, &mut transaction)
100            .await?;
101        let recorded_at = db.now();
102        let outbox_events: Vec<_> = transaction
103            .events
104            .last_persisted(1)
105            .map(|p| OutboxEventPayload::from(&p.event))
106            .collect();
107        self.outbox
108            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
109            .await?;
110        Ok(())
111    }
112}
113
114impl From<&TransactionEvent> for OutboxEventPayload {
115    fn from(event: &TransactionEvent) -> Self {
116        match event {
117            #[cfg(feature = "import")]
118            TransactionEvent::Imported {
119                source,
120                values: transaction,
121            } => OutboxEventPayload::TransactionCreated {
122                source: *source,
123                transaction: transaction.clone(),
124            },
125            TransactionEvent::Initialized {
126                values: transaction,
127            } => OutboxEventPayload::TransactionCreated {
128                source: DataSource::Local,
129                transaction: transaction.clone(),
130            },
131        }
132    }
133}