1pub mod error;
2
3mod entity;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::primitives::{EntryId, TxTemplateId};
15use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
16
17pub use entity::*;
18use error::*;
19pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
20use repo::*;
21
22#[derive(Clone)]
23pub struct Transactions {
24 repo: TransactionRepo,
25 #[allow(dead_code)]
27 outbox: Outbox,
28 _pool: PgPool,
29}
30
31impl Transactions {
32 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
33 Self {
34 repo: TransactionRepo::new(pool),
35 outbox,
36 _pool: pool.clone(),
37 }
38 }
39
40 pub(crate) async fn create_in_op(
41 &self,
42 db: &mut LedgerOperation<'_>,
43 new_transaction: NewTransaction,
44 ) -> Result<Transaction, TransactionError> {
45 let transaction = self.repo.create_in_op(db, new_transaction).await?;
46 db.accumulate(transaction.last_persisted(1).map(|p| &p.event));
47 Ok(transaction)
48 }
49
50 pub async fn create_voided_tx_in_op(
51 &self,
52 db: &mut LedgerOperation<'_>,
53 voiding_tx_id: TransactionId,
54 existing_tx_id: TransactionId,
55 entry_ids: impl IntoIterator<Item = EntryId>,
56 ) -> Result<Transaction, TransactionError> {
57 let mut existing_tx = self.repo.find_by_id_in_op(&mut *db, existing_tx_id).await?;
58
59 let new_tx = existing_tx.void(voiding_tx_id, entry_ids.into_iter().collect(), db.now())?;
60
61 self.repo.update_in_op(db, &mut existing_tx).await?;
62 let voided_tx = self.repo.create_in_op(db, new_tx).await?;
63
64 db.accumulate(
65 existing_tx
66 .last_persisted(1)
67 .map(|p| &p.event)
68 .chain(voided_tx.last_persisted(1).map(|p| &p.event)),
69 );
70
71 Ok(voided_tx)
72 }
73
74 #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self), err)]
75 pub async fn find_by_external_id(
76 &self,
77 external_id: String,
78 ) -> Result<Transaction, TransactionError> {
79 self.repo.find_by_external_id(Some(external_id)).await
80 }
81
82 #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self), err)]
83 pub async fn find_by_id(
84 &self,
85 transaction_id: TransactionId,
86 ) -> Result<Transaction, TransactionError> {
87 self.repo.find_by_id(transaction_id).await
88 }
89
90 #[instrument(
91 name = "cala_ledger.transactions.list_for_template_id",
92 skip(self),
93 err
94 )]
95 pub async fn list_for_template_id(
96 &self,
97 template_id: TxTemplateId,
98 query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
99 direction: es_entity::ListDirection,
100 ) -> Result<
101 es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
102 TransactionError,
103 > {
104 self.repo
105 .list_for_tx_template_id_by_created_at(template_id, query, direction)
106 .await
107 }
108
109 #[instrument(name = "cala_ledger.transactions.find_all", skip(self), err)]
110 pub async fn find_all<T: From<Transaction>>(
111 &self,
112 transaction_ids: &[TransactionId],
113 ) -> Result<HashMap<TransactionId, T>, TransactionError> {
114 self.repo.find_all(transaction_ids).await
115 }
116
117 #[cfg(feature = "import")]
118 pub async fn sync_transaction_creation(
119 &self,
120 mut db: es_entity::DbOpWithTime<'_>,
121 origin: DataSourceId,
122 values: TransactionValues,
123 ) -> Result<(), TransactionError> {
124 let mut transaction = Transaction::import(origin, values);
125 self.repo
126 .import_in_op(&mut db, origin, &mut transaction)
127 .await?;
128 let outbox_events: Vec<_> = transaction
129 .last_persisted(1)
130 .map(|p| OutboxEventPayload::from(&p.event))
131 .collect();
132 let time = db.now();
133 self.outbox
134 .persist_events_at(db, outbox_events, time)
135 .await?;
136 Ok(())
137 }
138
139 #[cfg(feature = "import")]
140 pub async fn sync_transaction_update(
141 &self,
142 mut db: es_entity::DbOpWithTime<'_>,
143 origin: DataSourceId,
144 values: TransactionValues,
145 ) -> Result<(), TransactionError> {
146 let mut transaction = Transaction::import(origin, values);
147 self.repo
148 .import_in_op(&mut db, origin, &mut transaction)
149 .await?;
150 let outbox_events: Vec<_> = transaction
151 .last_persisted(1)
152 .map(|p| OutboxEventPayload::from(&p.event))
153 .collect();
154 let time = db.now();
155 self.outbox
156 .persist_events_at(db, outbox_events, time)
157 .await?;
158 Ok(())
159 }
160}
161
162impl From<&TransactionEvent> for OutboxEventPayload {
163 fn from(event: &TransactionEvent) -> Self {
164 match event {
165 #[cfg(feature = "import")]
166 TransactionEvent::Imported {
167 source,
168 values: transaction,
169 } => OutboxEventPayload::TransactionCreated {
170 source: *source,
171 transaction: transaction.clone(),
172 },
173 TransactionEvent::Initialized {
174 values: transaction,
175 } => OutboxEventPayload::TransactionCreated {
176 source: DataSource::Local,
177 transaction: transaction.clone(),
178 },
179 TransactionEvent::Updated { values, fields } => {
180 OutboxEventPayload::TransactionUpdated {
181 source: DataSource::Local,
182 transaction: values.clone(),
183 fields: fields.clone(),
184 }
185 }
186 }
187 }
188}