cala_ledger/transaction/
mod.rs

1pub mod error;
2
3mod entity;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
14
15pub use entity::*;
16use error::*;
17use repo::*;
18
19#[derive(Clone)]
20pub struct Transactions {
21    repo: TransactionRepo,
22    outbox: Outbox,
23    _pool: PgPool,
24}
25
26impl Transactions {
27    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
28        Self {
29            repo: TransactionRepo::new(pool),
30            outbox,
31            _pool: pool.clone(),
32        }
33    }
34
35    pub(crate) async fn create_in_op(
36        &self,
37        db: &mut LedgerOperation<'_>,
38        new_transaction: NewTransaction,
39    ) -> Result<Transaction, TransactionError> {
40        let transaction = self.repo.create_in_op(db.op(), new_transaction).await?;
41        db.accumulate(transaction.events.last_persisted(1).map(|p| &p.event));
42        Ok(transaction)
43    }
44
45    #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self), err)]
46    pub async fn find_by_external_id(
47        &self,
48        external_id: String,
49    ) -> Result<Transaction, TransactionError> {
50        self.repo.find_by_external_id(Some(external_id)).await
51    }
52
53    #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self), err)]
54    pub async fn find_by_id(
55        &self,
56        transaction_id: TransactionId,
57    ) -> Result<Transaction, TransactionError> {
58        self.repo.find_by_id(transaction_id).await
59    }
60
61    #[instrument(name = "cala_ledger.transactions.find_all", skip(self), err)]
62    pub async fn find_all<T: From<Transaction>>(
63        &self,
64        transaction_ids: &[TransactionId],
65    ) -> Result<HashMap<TransactionId, T>, TransactionError> {
66        self.repo.find_all(transaction_ids).await
67    }
68
69    #[cfg(feature = "import")]
70    pub async fn sync_transaction_creation(
71        &self,
72        mut db: es_entity::DbOp<'_>,
73        origin: DataSourceId,
74        values: TransactionValues,
75    ) -> Result<(), TransactionError> {
76        let mut transaction = Transaction::import(origin, values);
77        self.repo
78            .import_in_op(&mut db, origin, &mut transaction)
79            .await?;
80        let recorded_at = db.now();
81        let outbox_events: Vec<_> = transaction
82            .events
83            .last_persisted(1)
84            .map(|p| OutboxEventPayload::from(&p.event))
85            .collect();
86        self.outbox
87            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
88            .await?;
89        Ok(())
90    }
91}
92
93impl From<&TransactionEvent> for OutboxEventPayload {
94    fn from(event: &TransactionEvent) -> Self {
95        match event {
96            #[cfg(feature = "import")]
97            TransactionEvent::Imported {
98                source,
99                values: transaction,
100            } => OutboxEventPayload::TransactionCreated {
101                source: *source,
102                transaction: transaction.clone(),
103            },
104            TransactionEvent::Initialized {
105                values: transaction,
106            } => OutboxEventPayload::TransactionCreated {
107                source: DataSource::Local,
108                transaction: transaction.clone(),
109            },
110        }
111    }
112}