cala_ledger/transaction/
mod.rs1pub mod error;
2
3mod entity;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::primitives::{EntryId, TxTemplateId};
14use crate::{outbox::*, primitives::DataSource};
15
16pub use entity::*;
17use error::*;
18pub use repo::transaction_cursor::TransactionsByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Transactions {
23 repo: TransactionRepo,
24}
25
26impl Transactions {
27 pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
28 Self {
29 repo: TransactionRepo::new(pool, publisher),
30 }
31 }
32
33 #[instrument(name = "cala_ledger.transactions.create_in_op", skip_all)]
34 pub(crate) async fn create_in_op(
35 &self,
36 db: &mut impl es_entity::AtomicOperation,
37 new_transaction: NewTransaction,
38 ) -> Result<Transaction, TransactionError> {
39 let transaction = self.repo.create_in_op(db, new_transaction).await?;
40 Ok(transaction)
41 }
42
43 #[instrument(name = "cala_ledger.transactions.create_voided_tx_in_op", skip_all)]
44 pub async fn create_voided_tx_in_op(
45 &self,
46 db: &mut impl es_entity::AtomicOperationWithTime,
47 voiding_tx_id: TransactionId,
48 existing_tx_id: TransactionId,
49 entry_ids: impl IntoIterator<Item = EntryId>,
50 ) -> Result<Transaction, TransactionError> {
51 let mut existing_tx = self.repo.find_by_id_in_op(&mut *db, existing_tx_id).await?;
52
53 let new_tx = existing_tx.void(voiding_tx_id, entry_ids.into_iter().collect(), db.now())?;
54
55 self.repo.update_in_op(db, &mut existing_tx).await?;
56 let voided_tx = self.repo.create_in_op(db, new_tx).await?;
57
58 Ok(voided_tx)
59 }
60
61 #[instrument(name = "cala_ledger.transactions.find_by_external_id", skip(self))]
62 pub async fn find_by_external_id(
63 &self,
64 external_id: String,
65 ) -> Result<Transaction, TransactionError> {
66 self.repo.find_by_external_id(Some(external_id)).await
67 }
68
69 #[instrument(name = "cala_ledger.transactions.find_by_id", skip(self))]
70 pub async fn find_by_id(
71 &self,
72 transaction_id: TransactionId,
73 ) -> Result<Transaction, TransactionError> {
74 self.repo.find_by_id(transaction_id).await
75 }
76
77 #[instrument(name = "cala_ledger.transactions.list_for_template_id", skip(self))]
78 pub async fn list_for_template_id(
79 &self,
80 template_id: TxTemplateId,
81 query: es_entity::PaginatedQueryArgs<TransactionsByCreatedAtCursor>,
82 direction: es_entity::ListDirection,
83 ) -> Result<
84 es_entity::PaginatedQueryRet<Transaction, TransactionsByCreatedAtCursor>,
85 TransactionError,
86 > {
87 self.repo
88 .list_for_tx_template_id_by_created_at(template_id, query, direction)
89 .await
90 }
91
92 #[instrument(name = "cala_ledger.transactions.find_all", skip(self))]
93 pub async fn find_all<T: From<Transaction>>(
94 &self,
95 transaction_ids: &[TransactionId],
96 ) -> Result<HashMap<TransactionId, T>, TransactionError> {
97 self.repo.find_all(transaction_ids).await
98 }
99
100 #[cfg(feature = "import")]
101 #[instrument(name = "cala_ledger.transactions.sync_transaction_creation", skip_all)]
102 pub async fn sync_transaction_creation(
103 &self,
104 mut db: es_entity::DbOpWithTime<'_>,
105 origin: DataSourceId,
106 values: TransactionValues,
107 ) -> Result<(), TransactionError> {
108 let mut transaction = Transaction::import(origin, values);
109 self.repo
110 .import_in_op(&mut db, origin, &mut transaction)
111 .await?;
112 db.commit().await?;
113 Ok(())
114 }
115
116 #[cfg(feature = "import")]
117 #[instrument(name = "cala_ledger.transactions.sync_transaction_update", skip_all)]
118 pub async fn sync_transaction_update(
119 &self,
120 mut db: es_entity::DbOpWithTime<'_>,
121 origin: DataSourceId,
122 values: TransactionValues,
123 ) -> Result<(), TransactionError> {
124 let mut transaction = Transaction::import(origin, values);
125 self.repo
126 .import_in_op(&mut db, origin, &mut transaction)
127 .await?;
128 db.commit().await?;
129 Ok(())
130 }
131}
132
133impl From<&TransactionEvent> for OutboxEventPayload {
134 fn from(event: &TransactionEvent) -> Self {
135 let source = es_entity::context::EventContext::current()
136 .data()
137 .lookup("data_source")
138 .ok()
139 .flatten()
140 .unwrap_or(DataSource::Local);
141
142 match event {
143 #[cfg(feature = "import")]
144 TransactionEvent::Imported {
145 source,
146 values: transaction,
147 } => OutboxEventPayload::TransactionCreated {
148 source: *source,
149 transaction: transaction.clone(),
150 },
151 TransactionEvent::Initialized {
152 values: transaction,
153 } => OutboxEventPayload::TransactionCreated {
154 source,
155 transaction: transaction.clone(),
156 },
157 TransactionEvent::Updated { values, fields } => {
158 OutboxEventPayload::TransactionUpdated {
159 source,
160 transaction: values.clone(),
161 fields: fields.clone(),
162 }
163 }
164 }
165 }
166}