systemprompt_database/services/postgres/
transaction.rs1use crate::error::{DatabaseResult, RepositoryError};
7use async_trait::async_trait;
8
9use super::conversion::{bind_params, row_to_json};
10use crate::models::{DatabaseTransaction, JsonRow, QuerySelector, ToDbValue};
11
12pub struct PostgresTransaction {
13 tx: Option<sqlx::Transaction<'static, sqlx::Postgres>>,
14}
15
16impl std::fmt::Debug for PostgresTransaction {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 f.debug_struct("PostgresTransaction")
19 .field("tx", &self.tx.is_some())
20 .finish()
21 }
22}
23
24impl PostgresTransaction {
25 #[must_use]
26 pub const fn new(tx: sqlx::Transaction<'static, sqlx::Postgres>) -> Self {
27 Self { tx: Some(tx) }
28 }
29}
30
31#[async_trait]
32impl DatabaseTransaction for PostgresTransaction {
33 async fn execute(
34 &mut self,
35 query: &dyn QuerySelector,
36 params: &[&dyn ToDbValue],
37 ) -> DatabaseResult<u64> {
38 let sql = query.select_query();
39 let tx = self
40 .tx
41 .as_mut()
42 .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
43
44 let query_obj = sqlx::query(sql);
45 let query_obj = bind_params(query_obj, params);
46
47 let result = query_obj.execute(&mut **tx).await?;
48
49 Ok(result.rows_affected())
50 }
51
52 async fn fetch_all(
53 &mut self,
54 query: &dyn QuerySelector,
55 params: &[&dyn ToDbValue],
56 ) -> DatabaseResult<Vec<JsonRow>> {
57 let sql = query.select_query();
58 let tx = self
59 .tx
60 .as_mut()
61 .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
62
63 let query_obj = sqlx::query(sql);
64 let query_obj = bind_params(query_obj, params);
65
66 let rows = query_obj.fetch_all(&mut **tx).await?;
67
68 Ok(rows.iter().map(row_to_json).collect())
69 }
70
71 async fn fetch_one(
72 &mut self,
73 query: &dyn QuerySelector,
74 params: &[&dyn ToDbValue],
75 ) -> DatabaseResult<JsonRow> {
76 let sql = query.select_query();
77 let tx = self
78 .tx
79 .as_mut()
80 .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
81
82 let query_obj = sqlx::query(sql);
83 let query_obj = bind_params(query_obj, params);
84
85 let row = query_obj.fetch_one(&mut **tx).await?;
86
87 Ok(row_to_json(&row))
88 }
89
90 async fn fetch_optional(
91 &mut self,
92 query: &dyn QuerySelector,
93 params: &[&dyn ToDbValue],
94 ) -> DatabaseResult<Option<JsonRow>> {
95 let sql = query.select_query();
96 let tx = self
97 .tx
98 .as_mut()
99 .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
100
101 let query_obj = sqlx::query(sql);
102 let query_obj = bind_params(query_obj, params);
103
104 let row = query_obj.fetch_optional(&mut **tx).await?;
105
106 Ok(row.map(|r| row_to_json(&r)))
107 }
108
109 async fn commit(mut self: Box<Self>) -> DatabaseResult<()> {
110 let tx = self
111 .tx
112 .take()
113 .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
114
115 tx.commit().await?;
116
117 Ok(())
118 }
119
120 async fn rollback(mut self: Box<Self>) -> DatabaseResult<()> {
121 let tx = self
122 .tx
123 .take()
124 .ok_or_else(|| RepositoryError::invalid_state("Transaction already consumed"))?;
125
126 tx.rollback().await?;
127
128 Ok(())
129 }
130}