1use sqlx::{Pool, Postgres, Transaction as DbTransaction};
2use tracing::instrument;
3use uuid::Uuid;
4
5use super::entity::*;
6use crate::{error::*, primitives::*};
7
8#[derive(Debug, Clone)]
10pub struct Transactions {
11 pool: Pool<Postgres>,
12}
13
14impl Transactions {
15 pub fn new(pool: &Pool<Postgres>) -> Self {
16 Self { pool: pool.clone() }
17 }
18
19 #[instrument(level = "trace", name = "sqlx_ledger.transactions.create_in_tx")]
20 pub(crate) async fn create_in_tx(
21 &self,
22 tx: &mut DbTransaction<'_, Postgres>,
23 tx_id: TransactionId,
24 NewTransaction {
25 journal_id,
26 tx_template_id,
27 effective,
28 correlation_id,
29 external_id,
30 description,
31 metadata,
32 }: NewTransaction,
33 ) -> Result<(JournalId, TransactionId), SqlxLedgerError> {
34 let record = sqlx::query!(
35 r#"INSERT INTO sqlx_ledger_transactions (id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata)
36 VALUES ($1, 1, (SELECT id FROM sqlx_ledger_journals WHERE id = $2 LIMIT 1), (SELECT id FROM sqlx_ledger_tx_templates WHERE id = $3 LIMIT 1), $4, $5, $6, $7, $8)
37 RETURNING id, version, created_at"#,
38 tx_id as TransactionId,
39 journal_id as JournalId,
40 tx_template_id as TxTemplateId,
41 effective,
42 correlation_id.map(Uuid::from).unwrap_or(Uuid::from(tx_id)),
43 external_id.unwrap_or_else(|| tx_id.to_string()),
44 description,
45 metadata
46 )
47 .fetch_one(&mut **tx)
48 .await?;
49 Ok((journal_id, TransactionId::from(record.id)))
50 }
51
52 pub async fn list_by_external_ids(
53 &self,
54 ids: Vec<String>,
55 ) -> Result<Vec<Transaction>, SqlxLedgerError> {
56 let records = sqlx::query!(
57 r#"SELECT id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata, created_at, modified_at
58 FROM sqlx_ledger_transactions
59 WHERE external_id = ANY($1)"#,
60 &ids[..]
61 )
62 .fetch_all(&self.pool)
63 .await?;
64 Ok(records
65 .into_iter()
66 .map(|row| Transaction {
67 id: TransactionId::from(row.id),
68 version: row.version as u32,
69 journal_id: JournalId::from(row.journal_id),
70 tx_template_id: TxTemplateId::from(row.tx_template_id),
71 effective: row.effective,
72 correlation_id: CorrelationId::from(row.correlation_id),
73 external_id: row.external_id,
74 description: row.description,
75 metadata_json: row.metadata,
76 created_at: row.created_at,
77 modified_at: row.modified_at,
78 })
79 .collect())
80 }
81
82 pub async fn list_by_ids(
83 &self,
84 ids: impl IntoIterator<Item = impl std::borrow::Borrow<TransactionId>>,
85 ) -> Result<Vec<Transaction>, SqlxLedgerError> {
86 let ids: Vec<_> = ids.into_iter().map(|id| Uuid::from(id.borrow())).collect();
87 let records = sqlx::query!(
88 r#"SELECT id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata, created_at, modified_at
89 FROM sqlx_ledger_transactions
90 WHERE id = ANY($1)"#,
91 &ids[..]
92 )
93 .fetch_all(&self.pool)
94 .await?;
95 Ok(records
96 .into_iter()
97 .map(|row| Transaction {
98 id: TransactionId::from(row.id),
99 version: row.version as u32,
100 journal_id: JournalId::from(row.journal_id),
101 tx_template_id: TxTemplateId::from(row.tx_template_id),
102 effective: row.effective,
103 correlation_id: CorrelationId::from(row.correlation_id),
104 external_id: row.external_id,
105 description: row.description,
106 metadata_json: row.metadata,
107 created_at: row.created_at,
108 modified_at: row.modified_at,
109 })
110 .collect())
111 }
112
113 pub async fn list_by_template_id(
114 &self,
115 id: TxTemplateId,
116 ) -> Result<Vec<Transaction>, SqlxLedgerError> {
117 let records = sqlx::query!(
118 r#"SELECT id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata, created_at, modified_at
119 FROM sqlx_ledger_transactions
120 WHERE tx_template_id = $1"#,
121 id as TxTemplateId
122 )
123 .fetch_all(&self.pool)
124 .await?;
125 Ok(records
126 .into_iter()
127 .map(|row| Transaction {
128 id: TransactionId::from(row.id),
129 version: row.version as u32,
130 journal_id: JournalId::from(row.journal_id),
131 tx_template_id: TxTemplateId::from(row.tx_template_id),
132 effective: row.effective,
133 correlation_id: CorrelationId::from(row.correlation_id),
134 external_id: row.external_id,
135 description: row.description,
136 metadata_json: row.metadata,
137 created_at: row.created_at,
138 modified_at: row.modified_at,
139 })
140 .collect())
141 }
142}