cala_ledger/transaction/
mod.rs1pub mod error;
2
3mod entity;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::primitives::TxTemplateId;
15use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
16
17pub use entity::*;
18use error::*;
19pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
20use repo::*;
21
22#[derive(Clone)]
23pub struct Transactions {
24 repo: TransactionRepo,
25 outbox: Outbox,
26 _pool: PgPool,
27}
28
29impl Transactions {
30 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
31 Self {
32 repo: TransactionRepo::new(pool),
33 outbox,
34 _pool: pool.clone(),
35 }
36 }
37
38 pub(crate) async fn create_in_op(
39 &self,
40 db: &mut LedgerOperation<'_>,
41 new_transaction: NewTransaction,
42 ) -> Result<Transaction, TransactionError> {
43 let transaction = self.repo.create_in_op(db.op(), new_transaction).await?;
44 db.accumulate(transaction.last_persisted(1).map(|p| &p.event));
45 Ok(transaction)
46 }
47
48 #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self), err)]
49 pub async fn find_by_external_id(
50 &self,
51 external_id: String,
52 ) -> Result<Transaction, TransactionError> {
53 self.repo.find_by_external_id(Some(external_id)).await
54 }
55
56 #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self), err)]
57 pub async fn find_by_id(
58 &self,
59 transaction_id: TransactionId,
60 ) -> Result<Transaction, TransactionError> {
61 self.repo.find_by_id(transaction_id).await
62 }
63
64 #[instrument(
65 name = "cala_ledger.transactions.list_for_template_id",
66 skip(self),
67 err
68 )]
69 pub async fn list_for_template_id(
70 &self,
71 template_id: TxTemplateId,
72 query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
73 direction: es_entity::ListDirection,
74 ) -> Result<
75 es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
76 TransactionError,
77 > {
78 self.repo
79 .list_for_tx_template_id_by_created_at(template_id, query, direction)
80 .await
81 }
82
83 #[instrument(name = "cala_ledger.transactions.find_all", skip(self), err)]
84 pub async fn find_all<T: From<Transaction>>(
85 &self,
86 transaction_ids: &[TransactionId],
87 ) -> Result<HashMap<TransactionId, T>, TransactionError> {
88 self.repo.find_all(transaction_ids).await
89 }
90
91 #[cfg(feature = "import")]
92 pub async fn sync_transaction_creation(
93 &self,
94 mut db: es_entity::DbOp<'_>,
95 origin: DataSourceId,
96 values: TransactionValues,
97 ) -> Result<(), TransactionError> {
98 let mut transaction = Transaction::import(origin, values);
99 self.repo
100 .import_in_op(&mut db, origin, &mut transaction)
101 .await?;
102 let recorded_at = db.now();
103 let outbox_events: Vec<_> = transaction
104 .last_persisted(1)
105 .map(|p| OutboxEventPayload::from(&p.event))
106 .collect();
107 self.outbox
108 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
109 .await?;
110 Ok(())
111 }
112}
113
114impl From<&TransactionEvent> for OutboxEventPayload {
115 fn from(event: &TransactionEvent) -> Self {
116 match event {
117 #[cfg(feature = "import")]
118 TransactionEvent::Imported {
119 source,
120 values: transaction,
121 } => OutboxEventPayload::TransactionCreated {
122 source: *source,
123 transaction: transaction.clone(),
124 },
125 TransactionEvent::Initialized {
126 values: transaction,
127 } => OutboxEventPayload::TransactionCreated {
128 source: DataSource::Local,
129 transaction: transaction.clone(),
130 },
131 }
132 }
133}