prax_postgres/
connection.rs

1//! PostgreSQL connection wrapper.
2
3use std::sync::Arc;
4
5use deadpool_postgres::Object;
6use tokio_postgres::Row;
7use tracing::debug;
8
9use crate::error::PgResult;
10use crate::statement::PreparedStatementCache;
11
12/// A wrapper around a PostgreSQL connection with statement caching.
13pub struct PgConnection {
14    client: Object,
15    statement_cache: Arc<PreparedStatementCache>,
16}
17
18impl PgConnection {
19    /// Create a new connection wrapper.
20    pub(crate) fn new(client: Object, statement_cache: Arc<PreparedStatementCache>) -> Self {
21        Self {
22            client,
23            statement_cache,
24        }
25    }
26
27    /// Execute a query and return all rows.
28    pub async fn query(
29        &self,
30        sql: &str,
31        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
32    ) -> PgResult<Vec<Row>> {
33        debug!(sql = %sql, "Executing query");
34
35        // Try to get a cached prepared statement
36        let stmt = self
37            .statement_cache
38            .get_or_prepare(&self.client, sql)
39            .await?;
40
41        let rows = self.client.query(&stmt, params).await?;
42        Ok(rows)
43    }
44
45    /// Execute a query and return exactly one row.
46    pub async fn query_one(
47        &self,
48        sql: &str,
49        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
50    ) -> PgResult<Row> {
51        debug!(sql = %sql, "Executing query_one");
52
53        let stmt = self
54            .statement_cache
55            .get_or_prepare(&self.client, sql)
56            .await?;
57
58        let row = self.client.query_one(&stmt, params).await?;
59        Ok(row)
60    }
61
62    /// Execute a query and return zero or one row.
63    pub async fn query_opt(
64        &self,
65        sql: &str,
66        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
67    ) -> PgResult<Option<Row>> {
68        debug!(sql = %sql, "Executing query_opt");
69
70        let stmt = self
71            .statement_cache
72            .get_or_prepare(&self.client, sql)
73            .await?;
74
75        let row = self.client.query_opt(&stmt, params).await?;
76        Ok(row)
77    }
78
79    /// Execute a statement and return the number of affected rows.
80    pub async fn execute(
81        &self,
82        sql: &str,
83        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
84    ) -> PgResult<u64> {
85        debug!(sql = %sql, "Executing statement");
86
87        let stmt = self
88            .statement_cache
89            .get_or_prepare(&self.client, sql)
90            .await?;
91
92        let count = self.client.execute(&stmt, params).await?;
93        Ok(count)
94    }
95
96    /// Execute a batch of statements in a single round-trip.
97    pub async fn batch_execute(&self, sql: &str) -> PgResult<()> {
98        debug!(sql = %sql, "Executing batch");
99        self.client.batch_execute(sql).await?;
100        Ok(())
101    }
102
103    /// Begin a transaction.
104    pub async fn transaction(&mut self) -> PgResult<PgTransaction<'_>> {
105        debug!("Beginning transaction");
106        let txn = self.client.transaction().await?;
107        Ok(PgTransaction {
108            txn,
109            statement_cache: self.statement_cache.clone(),
110        })
111    }
112
113    /// Get the underlying tokio-postgres client.
114    ///
115    /// This is useful for advanced operations not covered by this wrapper.
116    pub fn inner(&self) -> &Object {
117        &self.client
118    }
119
120    /// Execute a query using the prepared statement cache.
121    ///
122    /// This is an alias for `query` that makes it explicit that statement caching
123    /// is being used. All query methods already use prepared statement caching,
124    /// but this method name makes it more explicit for benchmark comparisons.
125    #[inline]
126    pub async fn query_cached(
127        &self,
128        sql: &str,
129        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
130    ) -> PgResult<Vec<Row>> {
131        self.query(sql, params).await
132    }
133
134    /// Execute a raw query without using the prepared statement cache.
135    ///
136    /// This is useful for one-off queries where the overhead of preparing
137    /// a statement isn't worth it.
138    pub async fn query_raw(
139        &self,
140        sql: &str,
141        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
142    ) -> PgResult<Vec<Row>> {
143        debug!(sql = %sql, "Executing raw query (no statement cache)");
144        let rows = self.client.query(sql, params).await?;
145        Ok(rows)
146    }
147
148    /// Execute a raw query and return zero or one row without using statement cache.
149    pub async fn query_opt_raw(
150        &self,
151        sql: &str,
152        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
153    ) -> PgResult<Option<Row>> {
154        debug!(sql = %sql, "Executing raw query_opt (no statement cache)");
155        let row = self.client.query_opt(sql, params).await?;
156        Ok(row)
157    }
158}
159
160/// A PostgreSQL transaction.
161pub struct PgTransaction<'a> {
162    txn: deadpool_postgres::Transaction<'a>,
163    statement_cache: Arc<PreparedStatementCache>,
164}
165
166impl<'a> PgTransaction<'a> {
167    /// Execute a query and return all rows.
168    pub async fn query(
169        &self,
170        sql: &str,
171        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
172    ) -> PgResult<Vec<Row>> {
173        debug!(sql = %sql, "Executing query in transaction");
174
175        let stmt = self
176            .statement_cache
177            .get_or_prepare_in_txn(&self.txn, sql)
178            .await?;
179
180        let rows = self.txn.query(&stmt, params).await?;
181        Ok(rows)
182    }
183
184    /// Execute a query and return exactly one row.
185    pub async fn query_one(
186        &self,
187        sql: &str,
188        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
189    ) -> PgResult<Row> {
190        let stmt = self
191            .statement_cache
192            .get_or_prepare_in_txn(&self.txn, sql)
193            .await?;
194
195        let row = self.txn.query_one(&stmt, params).await?;
196        Ok(row)
197    }
198
199    /// Execute a query and return zero or one row.
200    pub async fn query_opt(
201        &self,
202        sql: &str,
203        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
204    ) -> PgResult<Option<Row>> {
205        let stmt = self
206            .statement_cache
207            .get_or_prepare_in_txn(&self.txn, sql)
208            .await?;
209
210        let row = self.txn.query_opt(&stmt, params).await?;
211        Ok(row)
212    }
213
214    /// Execute a statement and return the number of affected rows.
215    pub async fn execute(
216        &self,
217        sql: &str,
218        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
219    ) -> PgResult<u64> {
220        let stmt = self
221            .statement_cache
222            .get_or_prepare_in_txn(&self.txn, sql)
223            .await?;
224
225        let count = self.txn.execute(&stmt, params).await?;
226        Ok(count)
227    }
228
229    /// Create a savepoint.
230    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
231        debug!(name = %name, "Creating savepoint");
232        self.txn
233            .batch_execute(&format!("SAVEPOINT {}", name))
234            .await?;
235        Ok(())
236    }
237
238    /// Rollback to a savepoint.
239    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
240        debug!(name = %name, "Rolling back to savepoint");
241        self.txn
242            .batch_execute(&format!("ROLLBACK TO SAVEPOINT {}", name))
243            .await?;
244        Ok(())
245    }
246
247    /// Release a savepoint.
248    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
249        debug!(name = %name, "Releasing savepoint");
250        self.txn
251            .batch_execute(&format!("RELEASE SAVEPOINT {}", name))
252            .await?;
253        Ok(())
254    }
255
256    /// Commit the transaction.
257    pub async fn commit(self) -> PgResult<()> {
258        debug!("Committing transaction");
259        self.txn.commit().await?;
260        Ok(())
261    }
262
263    /// Rollback the transaction.
264    pub async fn rollback(self) -> PgResult<()> {
265        debug!("Rolling back transaction");
266        self.txn.rollback().await?;
267        Ok(())
268    }
269}
270
271#[cfg(test)]
272mod tests {
273    // Intentionally empty: integration tests would require a real PostgreSQL
274    // connection, and unit tests for the wrapper are limited without mocking.
275}