prax_postgres/
connection.rs1use std::sync::Arc;
4
5use deadpool_postgres::Object;
6use tokio_postgres::Row;
7use tracing::debug;
8
9use crate::error::PgResult;
10use crate::statement::PreparedStatementCache;
11
12pub struct PgConnection {
14 client: Object,
15 statement_cache: Arc<PreparedStatementCache>,
16}
17
18impl PgConnection {
19 pub(crate) fn new(client: Object, statement_cache: Arc<PreparedStatementCache>) -> Self {
21 Self {
22 client,
23 statement_cache,
24 }
25 }
26
27 pub async fn query(
29 &self,
30 sql: &str,
31 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
32 ) -> PgResult<Vec<Row>> {
33 debug!(sql = %sql, "Executing query");
34
35 let stmt = self
37 .statement_cache
38 .get_or_prepare(&self.client, sql)
39 .await?;
40
41 let rows = self.client.query(&stmt, params).await?;
42 Ok(rows)
43 }
44
45 pub async fn query_one(
47 &self,
48 sql: &str,
49 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
50 ) -> PgResult<Row> {
51 debug!(sql = %sql, "Executing query_one");
52
53 let stmt = self
54 .statement_cache
55 .get_or_prepare(&self.client, sql)
56 .await?;
57
58 let row = self.client.query_one(&stmt, params).await?;
59 Ok(row)
60 }
61
62 pub async fn query_opt(
64 &self,
65 sql: &str,
66 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
67 ) -> PgResult<Option<Row>> {
68 debug!(sql = %sql, "Executing query_opt");
69
70 let stmt = self
71 .statement_cache
72 .get_or_prepare(&self.client, sql)
73 .await?;
74
75 let row = self.client.query_opt(&stmt, params).await?;
76 Ok(row)
77 }
78
79 pub async fn execute(
81 &self,
82 sql: &str,
83 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
84 ) -> PgResult<u64> {
85 debug!(sql = %sql, "Executing statement");
86
87 let stmt = self
88 .statement_cache
89 .get_or_prepare(&self.client, sql)
90 .await?;
91
92 let count = self.client.execute(&stmt, params).await?;
93 Ok(count)
94 }
95
96 pub async fn batch_execute(&self, sql: &str) -> PgResult<()> {
98 debug!(sql = %sql, "Executing batch");
99 self.client.batch_execute(sql).await?;
100 Ok(())
101 }
102
103 pub async fn transaction(&mut self) -> PgResult<PgTransaction<'_>> {
105 debug!("Beginning transaction");
106 let txn = self.client.transaction().await?;
107 Ok(PgTransaction {
108 txn,
109 statement_cache: self.statement_cache.clone(),
110 })
111 }
112
113 pub fn inner(&self) -> &Object {
117 &self.client
118 }
119
120 #[inline]
126 pub async fn query_cached(
127 &self,
128 sql: &str,
129 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
130 ) -> PgResult<Vec<Row>> {
131 self.query(sql, params).await
132 }
133
134 pub async fn query_raw(
139 &self,
140 sql: &str,
141 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
142 ) -> PgResult<Vec<Row>> {
143 debug!(sql = %sql, "Executing raw query (no statement cache)");
144 let rows = self.client.query(sql, params).await?;
145 Ok(rows)
146 }
147
148 pub async fn query_opt_raw(
150 &self,
151 sql: &str,
152 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
153 ) -> PgResult<Option<Row>> {
154 debug!(sql = %sql, "Executing raw query_opt (no statement cache)");
155 let row = self.client.query_opt(sql, params).await?;
156 Ok(row)
157 }
158}
159
160pub struct PgTransaction<'a> {
162 txn: deadpool_postgres::Transaction<'a>,
163 statement_cache: Arc<PreparedStatementCache>,
164}
165
166impl<'a> PgTransaction<'a> {
167 pub async fn query(
169 &self,
170 sql: &str,
171 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
172 ) -> PgResult<Vec<Row>> {
173 debug!(sql = %sql, "Executing query in transaction");
174
175 let stmt = self
176 .statement_cache
177 .get_or_prepare_in_txn(&self.txn, sql)
178 .await?;
179
180 let rows = self.txn.query(&stmt, params).await?;
181 Ok(rows)
182 }
183
184 pub async fn query_one(
186 &self,
187 sql: &str,
188 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
189 ) -> PgResult<Row> {
190 let stmt = self
191 .statement_cache
192 .get_or_prepare_in_txn(&self.txn, sql)
193 .await?;
194
195 let row = self.txn.query_one(&stmt, params).await?;
196 Ok(row)
197 }
198
199 pub async fn query_opt(
201 &self,
202 sql: &str,
203 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
204 ) -> PgResult<Option<Row>> {
205 let stmt = self
206 .statement_cache
207 .get_or_prepare_in_txn(&self.txn, sql)
208 .await?;
209
210 let row = self.txn.query_opt(&stmt, params).await?;
211 Ok(row)
212 }
213
214 pub async fn execute(
216 &self,
217 sql: &str,
218 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
219 ) -> PgResult<u64> {
220 let stmt = self
221 .statement_cache
222 .get_or_prepare_in_txn(&self.txn, sql)
223 .await?;
224
225 let count = self.txn.execute(&stmt, params).await?;
226 Ok(count)
227 }
228
229 pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
231 debug!(name = %name, "Creating savepoint");
232 self.txn
233 .batch_execute(&format!("SAVEPOINT {}", name))
234 .await?;
235 Ok(())
236 }
237
238 pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
240 debug!(name = %name, "Rolling back to savepoint");
241 self.txn
242 .batch_execute(&format!("ROLLBACK TO SAVEPOINT {}", name))
243 .await?;
244 Ok(())
245 }
246
247 pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
249 debug!(name = %name, "Releasing savepoint");
250 self.txn
251 .batch_execute(&format!("RELEASE SAVEPOINT {}", name))
252 .await?;
253 Ok(())
254 }
255
256 pub async fn commit(self) -> PgResult<()> {
258 debug!("Committing transaction");
259 self.txn.commit().await?;
260 Ok(())
261 }
262
263 pub async fn rollback(self) -> PgResult<()> {
265 debug!("Rolling back transaction");
266 self.txn.rollback().await?;
267 Ok(())
268 }
269}
270
#[cfg(test)]
mod tests {
    // No unit tests are defined in this module yet.
}