sqlx_ledger/transaction/
repo.rs

1use sqlx::{Pool, Postgres, Transaction as DbTransaction};
2use tracing::instrument;
3use uuid::Uuid;
4
5use super::entity::*;
6use crate::{error::*, primitives::*};
7
8/// Repository for working with `TxTemplate` entities.
9#[derive(Debug, Clone)]
10pub struct Transactions {
11    pool: Pool<Postgres>,
12}
13
14impl Transactions {
15    pub fn new(pool: &Pool<Postgres>) -> Self {
16        Self { pool: pool.clone() }
17    }
18
19    #[instrument(level = "trace", name = "sqlx_ledger.transactions.create_in_tx")]
20    pub(crate) async fn create_in_tx(
21        &self,
22        tx: &mut DbTransaction<'_, Postgres>,
23        tx_id: TransactionId,
24        NewTransaction {
25            journal_id,
26            tx_template_id,
27            effective,
28            correlation_id,
29            external_id,
30            description,
31            metadata,
32        }: NewTransaction,
33    ) -> Result<(JournalId, TransactionId), SqlxLedgerError> {
34        let record = sqlx::query!(
35            r#"INSERT INTO sqlx_ledger_transactions (id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata)
36            VALUES ($1, 1, (SELECT id FROM sqlx_ledger_journals WHERE id = $2 LIMIT 1), (SELECT id FROM sqlx_ledger_tx_templates WHERE id = $3 LIMIT 1), $4, $5, $6, $7, $8)
37            RETURNING id, version, created_at"#,
38            tx_id as TransactionId,
39            journal_id as JournalId,
40            tx_template_id as TxTemplateId,
41            effective,
42            correlation_id.map(Uuid::from).unwrap_or(Uuid::from(tx_id)),
43            external_id.unwrap_or_else(|| tx_id.to_string()),
44            description,
45            metadata
46        )
47        .fetch_one(&mut **tx)
48        .await?;
49        Ok((journal_id, TransactionId::from(record.id)))
50    }
51
52    pub async fn list_by_external_ids(
53        &self,
54        ids: Vec<String>,
55    ) -> Result<Vec<Transaction>, SqlxLedgerError> {
56        let records = sqlx::query!(
57            r#"SELECT id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata, created_at, modified_at
58            FROM sqlx_ledger_transactions
59            WHERE external_id = ANY($1)"#,
60            &ids[..]
61        )
62        .fetch_all(&self.pool)
63        .await?;
64        Ok(records
65            .into_iter()
66            .map(|row| Transaction {
67                id: TransactionId::from(row.id),
68                version: row.version as u32,
69                journal_id: JournalId::from(row.journal_id),
70                tx_template_id: TxTemplateId::from(row.tx_template_id),
71                effective: row.effective,
72                correlation_id: CorrelationId::from(row.correlation_id),
73                external_id: row.external_id,
74                description: row.description,
75                metadata_json: row.metadata,
76                created_at: row.created_at,
77                modified_at: row.modified_at,
78            })
79            .collect())
80    }
81
82    pub async fn list_by_ids(
83        &self,
84        ids: impl IntoIterator<Item = impl std::borrow::Borrow<TransactionId>>,
85    ) -> Result<Vec<Transaction>, SqlxLedgerError> {
86        let ids: Vec<_> = ids.into_iter().map(|id| Uuid::from(id.borrow())).collect();
87        let records = sqlx::query!(
88            r#"SELECT id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata, created_at, modified_at
89            FROM sqlx_ledger_transactions
90            WHERE id = ANY($1)"#,
91            &ids[..]
92        )
93        .fetch_all(&self.pool)
94        .await?;
95        Ok(records
96            .into_iter()
97            .map(|row| Transaction {
98                id: TransactionId::from(row.id),
99                version: row.version as u32,
100                journal_id: JournalId::from(row.journal_id),
101                tx_template_id: TxTemplateId::from(row.tx_template_id),
102                effective: row.effective,
103                correlation_id: CorrelationId::from(row.correlation_id),
104                external_id: row.external_id,
105                description: row.description,
106                metadata_json: row.metadata,
107                created_at: row.created_at,
108                modified_at: row.modified_at,
109            })
110            .collect())
111    }
112
113    pub async fn list_by_template_id(
114        &self,
115        id: TxTemplateId,
116    ) -> Result<Vec<Transaction>, SqlxLedgerError> {
117        let records = sqlx::query!(
118            r#"SELECT id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata, created_at, modified_at
119            FROM sqlx_ledger_transactions
120            WHERE tx_template_id = $1"#,
121            id as TxTemplateId
122        )
123        .fetch_all(&self.pool)
124        .await?;
125        Ok(records
126            .into_iter()
127            .map(|row| Transaction {
128                id: TransactionId::from(row.id),
129                version: row.version as u32,
130                journal_id: JournalId::from(row.journal_id),
131                tx_template_id: TxTemplateId::from(row.tx_template_id),
132                effective: row.effective,
133                correlation_id: CorrelationId::from(row.correlation_id),
134                external_id: row.external_id,
135                description: row.description,
136                metadata_json: row.metadata,
137                created_at: row.created_at,
138                modified_at: row.modified_at,
139            })
140            .collect())
141    }
142}