cala_ledger/transaction/
mod.rs1pub mod error;
2
3mod entity;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::primitives::TxTemplateId;
14use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
15
16pub use entity::*;
17use error::*;
18pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Transactions {
23 repo: TransactionRepo,
24 outbox: Outbox,
25 _pool: PgPool,
26}
27
28impl Transactions {
29 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
30 Self {
31 repo: TransactionRepo::new(pool),
32 outbox,
33 _pool: pool.clone(),
34 }
35 }
36
37 pub(crate) async fn create_in_op(
38 &self,
39 db: &mut LedgerOperation<'_>,
40 new_transaction: NewTransaction,
41 ) -> Result<Transaction, TransactionError> {
42 let transaction = self.repo.create_in_op(db.op(), new_transaction).await?;
43 db.accumulate(transaction.events.last_persisted(1).map(|p| &p.event));
44 Ok(transaction)
45 }
46
47 #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self), err)]
48 pub async fn find_by_external_id(
49 &self,
50 external_id: String,
51 ) -> Result<Transaction, TransactionError> {
52 self.repo.find_by_external_id(Some(external_id)).await
53 }
54
55 #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self), err)]
56 pub async fn find_by_id(
57 &self,
58 transaction_id: TransactionId,
59 ) -> Result<Transaction, TransactionError> {
60 self.repo.find_by_id(transaction_id).await
61 }
62
63 #[instrument(
64 name = "cala_ledger.transactions.list_for_template_id",
65 skip(self),
66 err
67 )]
68 pub async fn list_for_template_id(
69 &self,
70 template_id: TxTemplateId,
71 query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
72 direction: es_entity::ListDirection,
73 ) -> Result<
74 es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
75 TransactionError,
76 > {
77 self.repo
78 .list_for_tx_template_id_by_created_at(template_id, query, direction)
79 .await
80 }
81
82 #[instrument(name = "cala_ledger.transactions.find_all", skip(self), err)]
83 pub async fn find_all<T: From<Transaction>>(
84 &self,
85 transaction_ids: &[TransactionId],
86 ) -> Result<HashMap<TransactionId, T>, TransactionError> {
87 self.repo.find_all(transaction_ids).await
88 }
89
90 #[cfg(feature = "import")]
91 pub async fn sync_transaction_creation(
92 &self,
93 mut db: es_entity::DbOp<'_>,
94 origin: DataSourceId,
95 values: TransactionValues,
96 ) -> Result<(), TransactionError> {
97 let mut transaction = Transaction::import(origin, values);
98 self.repo
99 .import_in_op(&mut db, origin, &mut transaction)
100 .await?;
101 let recorded_at = db.now();
102 let outbox_events: Vec<_> = transaction
103 .events
104 .last_persisted(1)
105 .map(|p| OutboxEventPayload::from(&p.event))
106 .collect();
107 self.outbox
108 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
109 .await?;
110 Ok(())
111 }
112}
113
114impl From<&TransactionEvent> for OutboxEventPayload {
115 fn from(event: &TransactionEvent) -> Self {
116 match event {
117 #[cfg(feature = "import")]
118 TransactionEvent::Imported {
119 source,
120 values: transaction,
121 } => OutboxEventPayload::TransactionCreated {
122 source: *source,
123 transaction: transaction.clone(),
124 },
125 TransactionEvent::Initialized {
126 values: transaction,
127 } => OutboxEventPayload::TransactionCreated {
128 source: DataSource::Local,
129 transaction: transaction.clone(),
130 },
131 }
132 }
133}