cala_ledger/transaction/
mod.rs1pub mod error;
2
3mod entity;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
14
15pub use entity::*;
16use error::*;
17use repo::*;
18
19#[derive(Clone)]
20pub struct Transactions {
21 repo: TransactionRepo,
22 outbox: Outbox,
23 _pool: PgPool,
24}
25
26impl Transactions {
27 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
28 Self {
29 repo: TransactionRepo::new(pool),
30 outbox,
31 _pool: pool.clone(),
32 }
33 }
34
35 pub(crate) async fn create_in_op(
36 &self,
37 db: &mut LedgerOperation<'_>,
38 new_transaction: NewTransaction,
39 ) -> Result<Transaction, TransactionError> {
40 let transaction = self.repo.create_in_op(db.op(), new_transaction).await?;
41 db.accumulate(transaction.events.last_persisted(1).map(|p| &p.event));
42 Ok(transaction)
43 }
44
45 #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self), err)]
46 pub async fn find_by_external_id(
47 &self,
48 external_id: String,
49 ) -> Result<Transaction, TransactionError> {
50 self.repo.find_by_external_id(Some(external_id)).await
51 }
52
53 #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self), err)]
54 pub async fn find_by_id(
55 &self,
56 transaction_id: TransactionId,
57 ) -> Result<Transaction, TransactionError> {
58 self.repo.find_by_id(transaction_id).await
59 }
60
61 #[instrument(name = "cala_ledger.transactions.find_all", skip(self), err)]
62 pub async fn find_all<T: From<Transaction>>(
63 &self,
64 transaction_ids: &[TransactionId],
65 ) -> Result<HashMap<TransactionId, T>, TransactionError> {
66 self.repo.find_all(transaction_ids).await
67 }
68
69 #[cfg(feature = "import")]
70 pub async fn sync_transaction_creation(
71 &self,
72 mut db: es_entity::DbOp<'_>,
73 origin: DataSourceId,
74 values: TransactionValues,
75 ) -> Result<(), TransactionError> {
76 let mut transaction = Transaction::import(origin, values);
77 self.repo
78 .import_in_op(&mut db, origin, &mut transaction)
79 .await?;
80 let recorded_at = db.now();
81 let outbox_events: Vec<_> = transaction
82 .events
83 .last_persisted(1)
84 .map(|p| OutboxEventPayload::from(&p.event))
85 .collect();
86 self.outbox
87 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
88 .await?;
89 Ok(())
90 }
91}
92
93impl From<&TransactionEvent> for OutboxEventPayload {
94 fn from(event: &TransactionEvent) -> Self {
95 match event {
96 #[cfg(feature = "import")]
97 TransactionEvent::Imported {
98 source,
99 values: transaction,
100 } => OutboxEventPayload::TransactionCreated {
101 source: *source,
102 transaction: transaction.clone(),
103 },
104 TransactionEvent::Initialized {
105 values: transaction,
106 } => OutboxEventPayload::TransactionCreated {
107 source: DataSource::Local,
108 transaction: transaction.clone(),
109 },
110 }
111 }
112}