cala_ledger/transaction/
mod.rs1pub mod error;
2
3mod entity;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::primitives::{EntryId, TxTemplateId};
15use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
16
17pub use entity::*;
18use error::*;
19pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
20use repo::*;
21
22#[derive(Clone)]
23pub struct Transactions {
24 repo: TransactionRepo,
25 #[allow(dead_code)]
27 outbox: Outbox,
28 _pool: PgPool,
29}
30
31impl Transactions {
32 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
33 Self {
34 repo: TransactionRepo::new(pool),
35 outbox,
36 _pool: pool.clone(),
37 }
38 }
39
40 pub(crate) async fn create_in_op(
41 &self,
42 db: &mut LedgerOperation<'_>,
43 new_transaction: NewTransaction,
44 ) -> Result<Transaction, TransactionError> {
45 let transaction = self.repo.create_in_op(db, new_transaction).await?;
46 db.accumulate(transaction.last_persisted(1).map(|p| &p.event));
47 Ok(transaction)
48 }
49
50 pub async fn create_voided_tx_in_op(
51 &self,
52 db: &mut LedgerOperation<'_>,
53 voiding_tx_id: TransactionId,
54 existing_tx_id: TransactionId,
55 entry_ids: impl IntoIterator<Item = EntryId>,
56 ) -> Result<Transaction, TransactionError> {
57 let mut existing_tx = self.repo.find_by_id_in_op(&mut *db, existing_tx_id).await?;
58
59 let new_tx = existing_tx.void(voiding_tx_id, entry_ids.into_iter().collect(), db.now())?;
60
61 self.repo.update_in_op(db, &mut existing_tx).await?;
62 let voided_tx = self.repo.create_in_op(db, new_tx).await?;
63
64 db.accumulate(
65 existing_tx
66 .last_persisted(1)
67 .map(|p| &p.event)
68 .chain(voided_tx.last_persisted(1).map(|p| &p.event)),
69 );
70
71 Ok(voided_tx)
72 }
73
74 #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self))]
75 pub async fn find_by_external_id(
76 &self,
77 external_id: String,
78 ) -> Result<Transaction, TransactionError> {
79 self.repo.find_by_external_id(Some(external_id)).await
80 }
81
82 #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self))]
83 pub async fn find_by_id(
84 &self,
85 transaction_id: TransactionId,
86 ) -> Result<Transaction, TransactionError> {
87 self.repo.find_by_id(transaction_id).await
88 }
89
90 #[instrument(name = "cala_ledger.transactions.list_for_template_id", skip(self))]
91 pub async fn list_for_template_id(
92 &self,
93 template_id: TxTemplateId,
94 query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
95 direction: es_entity::ListDirection,
96 ) -> Result<
97 es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
98 TransactionError,
99 > {
100 self.repo
101 .list_for_tx_template_id_by_created_at(template_id, query, direction)
102 .await
103 }
104
105 #[instrument(name = "cala_ledger.transactions.find_all", skip(self))]
106 pub async fn find_all<T: From<Transaction>>(
107 &self,
108 transaction_ids: &[TransactionId],
109 ) -> Result<HashMap<TransactionId, T>, TransactionError> {
110 self.repo.find_all(transaction_ids).await
111 }
112
113 #[cfg(feature = "import")]
114 pub async fn sync_transaction_creation(
115 &self,
116 mut db: es_entity::DbOpWithTime<'_>,
117 origin: DataSourceId,
118 values: TransactionValues,
119 ) -> Result<(), TransactionError> {
120 let mut transaction = Transaction::import(origin, values);
121 self.repo
122 .import_in_op(&mut db, origin, &mut transaction)
123 .await?;
124 let outbox_events: Vec<_> = transaction
125 .last_persisted(1)
126 .map(|p| OutboxEventPayload::from(&p.event))
127 .collect();
128 let time = db.now();
129 self.outbox
130 .persist_events_at(db, outbox_events, time)
131 .await?;
132 Ok(())
133 }
134
135 #[cfg(feature = "import")]
136 pub async fn sync_transaction_update(
137 &self,
138 mut db: es_entity::DbOpWithTime<'_>,
139 origin: DataSourceId,
140 values: TransactionValues,
141 ) -> Result<(), TransactionError> {
142 let mut transaction = Transaction::import(origin, values);
143 self.repo
144 .import_in_op(&mut db, origin, &mut transaction)
145 .await?;
146 let outbox_events: Vec<_> = transaction
147 .last_persisted(1)
148 .map(|p| OutboxEventPayload::from(&p.event))
149 .collect();
150 let time = db.now();
151 self.outbox
152 .persist_events_at(db, outbox_events, time)
153 .await?;
154 Ok(())
155 }
156}
157
158impl From<&TransactionEvent> for OutboxEventPayload {
159 fn from(event: &TransactionEvent) -> Self {
160 match event {
161 #[cfg(feature = "import")]
162 TransactionEvent::Imported {
163 source,
164 values: transaction,
165 } => OutboxEventPayload::TransactionCreated {
166 source: *source,
167 transaction: transaction.clone(),
168 },
169 TransactionEvent::Initialized {
170 values: transaction,
171 } => OutboxEventPayload::TransactionCreated {
172 source: DataSource::Local,
173 transaction: transaction.clone(),
174 },
175 TransactionEvent::Updated { values, fields } => {
176 OutboxEventPayload::TransactionUpdated {
177 source: DataSource::Local,
178 transaction: values.clone(),
179 fields: fields.clone(),
180 }
181 }
182 }
183 }
184}