Skip to main content

systemprompt_database/services/postgres/
transaction.rs

1//! Concrete [`DatabaseTransaction`] for `PostgreSQL`.
2//!
3//! Part of the documented sqlx allowlist — the SQL strings here come from
4//! runtime-supplied [`QuerySelector`] values.
5
6use crate::error::{DatabaseResult, RepositoryError};
7use async_trait::async_trait;
8
9use super::conversion::{bind_params, row_to_json};
10use crate::models::{DatabaseTransaction, JsonRow, QuerySelector, ToDbValue};
11
12pub struct PostgresTransaction {
13    tx: Option<sqlx::Transaction<'static, sqlx::Postgres>>,
14}
15
16impl std::fmt::Debug for PostgresTransaction {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        f.debug_struct("PostgresTransaction")
19            .field("tx", &self.tx.is_some())
20            .finish()
21    }
22}
23
24impl PostgresTransaction {
25    #[must_use]
26    pub const fn new(tx: sqlx::Transaction<'static, sqlx::Postgres>) -> Self {
27        Self { tx: Some(tx) }
28    }
29}
30
31#[async_trait]
32impl DatabaseTransaction for PostgresTransaction {
33    async fn execute(
34        &mut self,
35        query: &dyn QuerySelector,
36        params: &[&dyn ToDbValue],
37    ) -> DatabaseResult<u64> {
38        let sql = query.select_query();
39        let tx = self
40            .tx
41            .as_mut()
42            .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
43
44        let query_obj = sqlx::query(sql);
45        let query_obj = bind_params(query_obj, params);
46
47        let result = query_obj.execute(&mut **tx).await?;
48
49        Ok(result.rows_affected())
50    }
51
52    async fn fetch_all(
53        &mut self,
54        query: &dyn QuerySelector,
55        params: &[&dyn ToDbValue],
56    ) -> DatabaseResult<Vec<JsonRow>> {
57        let sql = query.select_query();
58        let tx = self
59            .tx
60            .as_mut()
61            .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
62
63        let query_obj = sqlx::query(sql);
64        let query_obj = bind_params(query_obj, params);
65
66        let rows = query_obj.fetch_all(&mut **tx).await?;
67
68        Ok(rows.iter().map(row_to_json).collect())
69    }
70
71    async fn fetch_one(
72        &mut self,
73        query: &dyn QuerySelector,
74        params: &[&dyn ToDbValue],
75    ) -> DatabaseResult<JsonRow> {
76        let sql = query.select_query();
77        let tx = self
78            .tx
79            .as_mut()
80            .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
81
82        let query_obj = sqlx::query(sql);
83        let query_obj = bind_params(query_obj, params);
84
85        let row = query_obj.fetch_one(&mut **tx).await?;
86
87        Ok(row_to_json(&row))
88    }
89
90    async fn fetch_optional(
91        &mut self,
92        query: &dyn QuerySelector,
93        params: &[&dyn ToDbValue],
94    ) -> DatabaseResult<Option<JsonRow>> {
95        let sql = query.select_query();
96        let tx = self
97            .tx
98            .as_mut()
99            .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
100
101        let query_obj = sqlx::query(sql);
102        let query_obj = bind_params(query_obj, params);
103
104        let row = query_obj.fetch_optional(&mut **tx).await?;
105
106        Ok(row.map(|r| row_to_json(&r)))
107    }
108
109    async fn commit(mut self: Box<Self>) -> DatabaseResult<()> {
110        let tx = self
111            .tx
112            .take()
113            .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
114
115        tx.commit().await?;
116
117        Ok(())
118    }
119
120    async fn rollback(mut self: Box<Self>) -> DatabaseResult<()> {
121        let tx = self
122            .tx
123            .take()
124            .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
125
126        tx.rollback().await?;
127
128        Ok(())
129    }
130}