1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use sqlx::{Pool, Postgres, Transaction as DbTransaction};
use tracing::instrument;
use uuid::Uuid;

use super::entity::*;
use crate::{error::*, primitives::*};

/// Repository for working with `Transaction` entities.
///
/// Holds a Postgres connection pool; the methods on this type insert into and
/// query the `sqlx_ledger_transactions` table.
#[derive(Debug, Clone)]
pub struct Transactions {
    // Pool used by the `list_by_*` queries; `create_in_tx` runs on the
    // caller-supplied database transaction instead.
    pool: Pool<Postgres>,
}

impl Transactions {
    pub fn new(pool: &Pool<Postgres>) -> Self {
        Self { pool: pool.clone() }
    }

    #[instrument(level = "trace", name = "sqlx_ledger.transactions.create_in_tx")]
    pub(crate) async fn create_in_tx(
        &self,
        tx: &mut DbTransaction<'_, Postgres>,
        tx_id: TransactionId,
        NewTransaction {
            journal_id,
            tx_template_id,
            effective,
            correlation_id,
            external_id,
            description,
            metadata,
        }: NewTransaction,
    ) -> Result<(JournalId, TransactionId), SqlxLedgerError> {
        let record = sqlx::query!(
            r#"INSERT INTO sqlx_ledger_transactions (id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata)
            VALUES ($1, 1, (SELECT id FROM sqlx_ledger_journals WHERE id = $2 LIMIT 1), (SELECT id FROM sqlx_ledger_tx_templates WHERE id = $3 LIMIT 1), $4, $5, $6, $7, $8)
            RETURNING id, version, created_at"#,
            tx_id as TransactionId,
            journal_id as JournalId,
            tx_template_id as TxTemplateId,
            effective,
            correlation_id.map(Uuid::from).unwrap_or(Uuid::from(tx_id)),
            external_id.unwrap_or_else(|| tx_id.to_string()),
            description,
            metadata
        )
        .fetch_one(&mut *tx)
        .await?;
        Ok((journal_id, TransactionId::from(record.id)))
    }

    pub async fn list_by_external_ids(
        &self,
        ids: Vec<String>,
    ) -> Result<Vec<Transaction>, SqlxLedgerError> {
        let records = sqlx::query!(
            r#"SELECT id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata, created_at, modified_at
            FROM sqlx_ledger_transactions
            WHERE external_id = ANY($1)"#,
            &ids[..]
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(records
            .into_iter()
            .map(|row| Transaction {
                id: TransactionId::from(row.id),
                version: row.version as u32,
                journal_id: JournalId::from(row.journal_id),
                tx_template_id: TxTemplateId::from(row.tx_template_id),
                effective: row.effective,
                correlation_id: CorrelationId::from(row.correlation_id),
                external_id: row.external_id,
                description: row.description,
                metadata_json: row.metadata,
                created_at: row.created_at,
                modified_at: row.modified_at,
            })
            .collect())
    }

    pub async fn list_by_ids(
        &self,
        ids: impl IntoIterator<Item = impl std::borrow::Borrow<TransactionId>>,
    ) -> Result<Vec<Transaction>, SqlxLedgerError> {
        let ids: Vec<_> = ids.into_iter().map(|id| Uuid::from(id.borrow())).collect();
        let records = sqlx::query!(
            r#"SELECT id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata, created_at, modified_at
            FROM sqlx_ledger_transactions
            WHERE id = ANY($1)"#,
            &ids[..]
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(records
            .into_iter()
            .map(|row| Transaction {
                id: TransactionId::from(row.id),
                version: row.version as u32,
                journal_id: JournalId::from(row.journal_id),
                tx_template_id: TxTemplateId::from(row.tx_template_id),
                effective: row.effective,
                correlation_id: CorrelationId::from(row.correlation_id),
                external_id: row.external_id,
                description: row.description,
                metadata_json: row.metadata,
                created_at: row.created_at,
                modified_at: row.modified_at,
            })
            .collect())
    }

    pub async fn list_by_template_id(
        &self,
        id: TxTemplateId,
    ) -> Result<Vec<Transaction>, SqlxLedgerError> {
        let records = sqlx::query!(
            r#"SELECT id, version, journal_id, tx_template_id, effective, correlation_id, external_id, description, metadata, created_at, modified_at
            FROM sqlx_ledger_transactions
            WHERE tx_template_id = $1"#,
            id as TxTemplateId
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(records
            .into_iter()
            .map(|row| Transaction {
                id: TransactionId::from(row.id),
                version: row.version as u32,
                journal_id: JournalId::from(row.journal_id),
                tx_template_id: TxTemplateId::from(row.tx_template_id),
                effective: row.effective,
                correlation_id: CorrelationId::from(row.correlation_id),
                external_id: row.external_id,
                description: row.description,
                metadata_json: row.metadata,
                created_at: row.created_at,
                modified_at: row.modified_at,
            })
            .collect())
    }
}