qail_pg/driver/pool/connection.rs
1//! Pooled connection wrapper: struct, accessors, RLS cleanup, transaction control,
2//! COPY export, pipeline, LISTEN/NOTIFY delegation, and Drop.
3
4use super::churn::{decrement_active_count_saturating, pool_churn_record_destroy};
5use super::lifecycle::{PgPoolInner, execute_simple_with_timeout};
6use crate::driver::{PgConnection, PgError, PgResult};
7use std::sync::Arc;
8use std::time::Instant;
9
10/// A pooled connection with creation timestamp for idle tracking.
11pub(super) struct PooledConn {
12 pub(super) conn: PgConnection,
13 pub(super) created_at: Instant,
14 pub(super) last_used: Instant,
15}
16
17/// A pooled connection that returns to the pool when dropped.
18///
19/// When `rls_dirty` is true (set by `acquire_with_rls`), the connection
20/// will automatically reset RLS session variables before returning to
21/// the pool. This prevents cross-tenant data leakage.
22pub struct PooledConnection {
23 pub(super) conn: Option<PgConnection>,
24 pub(super) pool: Arc<PgPoolInner>,
25 pub(super) rls_dirty: bool,
26 pub(super) created_at: Instant,
27}
28
29impl PooledConnection {
30 /// Get a reference to the underlying connection, returning an error
31 /// if the connection has already been released.
32 pub(super) fn conn_ref(&self) -> PgResult<&PgConnection> {
33 self.conn
34 .as_ref()
35 .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
36 }
37
38 /// Get a mutable reference to the underlying connection, returning an error
39 /// if the connection has already been released.
40 pub(super) fn conn_mut(&mut self) -> PgResult<&mut PgConnection> {
41 self.conn
42 .as_mut()
43 .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
44 }
45
46 /// Get a shared reference to the underlying connection.
47 ///
48 /// Returns an error if the connection has already been released.
49 pub fn get(&self) -> PgResult<&PgConnection> {
50 self.conn_ref()
51 }
52
53 /// Get a mutable reference to the underlying connection.
54 ///
55 /// Returns an error if the connection has already been released.
56 pub fn get_mut(&mut self) -> PgResult<&mut PgConnection> {
57 self.conn_mut()
58 }
59
60 /// Get a token to cancel the currently running query.
61 pub fn cancel_token(&self) -> PgResult<crate::driver::CancelToken> {
62 let conn = self.conn_ref()?;
63 let (process_id, secret_key) = conn.get_cancel_key();
64 Ok(crate::driver::CancelToken {
65 host: self.pool.config.host.clone(),
66 port: self.pool.config.port,
67 process_id,
68 secret_key,
69 })
70 }
71
72 /// Deterministic connection cleanup and pool return.
73 ///
74 /// This is the **correct** way to return a connection to the pool.
75 /// COMMITs the transaction (which auto-resets transaction-local RLS
76 /// session variables) and returns the connection to the pool with
77 /// prepared statement caches intact.
78 ///
79 /// If cleanup fails, the connection is destroyed (not returned to pool).
80 ///
81 /// # Usage
82 /// ```ignore
83 /// let mut conn = pool.acquire_with_rls(ctx).await?;
84 /// let result = conn.fetch_all_cached(&cmd).await;
85 /// conn.release().await; // COMMIT + return to pool
86 /// result
87 /// ```
88 pub async fn release(mut self) {
89 if let Some(mut conn) = self.conn.take() {
90 if conn.is_io_desynced() {
91 tracing::warn!(
92 host = %self.pool.config.host,
93 port = self.pool.config.port,
94 user = %self.pool.config.user,
95 db = %self.pool.config.database,
96 "pool_release_desynced: dropping connection due to prior I/O/protocol desync"
97 );
98 decrement_active_count_saturating(&self.pool.active_count);
99 self.pool.semaphore.add_permits(1);
100 pool_churn_record_destroy(&self.pool.config, "release_desynced");
101 return;
102 }
103 // COMMIT the transaction opened by acquire_with_rls.
104 // Transaction-local set_config values auto-reset on COMMIT,
105 // so no explicit RLS cleanup is needed.
106 // Prepared statements survive — they are NOT transaction-scoped.
107 let reset_timeout = self.pool.config.connect_timeout;
108 if let Err(e) = execute_simple_with_timeout(
109 &mut conn,
110 crate::driver::rls::reset_sql(),
111 reset_timeout,
112 "pool release reset/COMMIT",
113 )
114 .await
115 {
116 tracing::error!(
117 host = %self.pool.config.host,
118 port = self.pool.config.port,
119 user = %self.pool.config.user,
120 db = %self.pool.config.database,
121 timeout_ms = reset_timeout.as_millis() as u64,
122 error = %e,
123 "pool_release_failed: reset/COMMIT failed; dropping connection to prevent state leak"
124 );
125 decrement_active_count_saturating(&self.pool.active_count);
126 self.pool.semaphore.add_permits(1);
127 pool_churn_record_destroy(&self.pool.config, "release_reset_failed");
128 return; // Connection destroyed — not returned to pool
129 }
130
131 self.pool.return_connection(conn, self.created_at).await;
132 }
133 }
134
135 // ==================== TRANSACTION CONTROL ====================
136
137 /// Begin an explicit transaction on this pooled connection.
138 ///
139 /// Use this when you need multi-statement atomicity beyond the
140 /// implicit transaction created by `acquire_with_rls()`.
141 ///
142 /// # Example
143 /// ```ignore
144 /// let mut conn = pool.acquire_with_rls(ctx).await?;
145 /// conn.begin().await?;
146 /// conn.execute(&insert1).await?;
147 /// conn.execute(&insert2).await?;
148 /// conn.commit().await?;
149 /// conn.release().await;
150 /// ```
151 pub async fn begin(&mut self) -> PgResult<()> {
152 self.conn_mut()?.begin_transaction().await
153 }
154
155 /// Commit the current transaction.
156 /// Makes all changes since `begin()` permanent.
157 pub async fn commit(&mut self) -> PgResult<()> {
158 self.conn_mut()?.commit().await
159 }
160
161 /// Rollback the current transaction.
162 /// Discards all changes since `begin()`.
163 pub async fn rollback(&mut self) -> PgResult<()> {
164 self.conn_mut()?.rollback().await
165 }
166
167 /// Create a named savepoint within the current transaction.
168 /// Use `rollback_to()` to return to this savepoint.
169 pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
170 self.conn_mut()?.savepoint(name).await
171 }
172
173 /// Rollback to a previously created savepoint.
174 /// Discards changes since the savepoint, but keeps the transaction open.
175 pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
176 self.conn_mut()?.rollback_to(name).await
177 }
178
179 /// Release a savepoint (free resources).
180 /// After release, the savepoint cannot be rolled back to.
181 pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
182 self.conn_mut()?.release_savepoint(name).await
183 }
184
185 /// Execute multiple QAIL commands in a single PG pipeline round-trip.
186 ///
187 /// Sends all queries as Parse+Bind+Execute in one write, receives all
188 /// responses in one read. Returns raw column data per query per row.
189 ///
190 /// This is the fastest path for batch operations — amortizes TCP
191 /// overhead across N queries into a single syscall pair.
192 pub async fn pipeline_ast(
193 &mut self,
194 cmds: &[qail_core::ast::Qail],
195 ) -> PgResult<Vec<Vec<Vec<Option<Vec<u8>>>>>> {
196 let conn = self.conn_mut()?;
197 conn.pipeline_ast(cmds).await
198 }
199
200 /// Run `EXPLAIN (FORMAT JSON)` on a Qail command and return cost estimates.
201 ///
202 /// Uses `simple_query` under the hood — no additional round-trips beyond
203 /// the single EXPLAIN statement. Returns `None` if parsing fails or
204 /// the EXPLAIN output is unexpected.
205 pub async fn explain_estimate(
206 &mut self,
207 cmd: &qail_core::ast::Qail,
208 ) -> PgResult<Option<crate::driver::explain::ExplainEstimate>> {
209 use qail_core::transpiler::ToSql;
210
211 let sql = cmd.to_sql();
212 let explain_sql = format!("EXPLAIN (FORMAT JSON) {}", sql);
213
214 let rows = self.conn_mut()?.simple_query(&explain_sql).await?;
215
216 // PostgreSQL returns the JSON plan as a single text column across one or more rows
217 let mut json_output = String::new();
218 for row in &rows {
219 if let Some(Some(val)) = row.columns.first()
220 && let Ok(text) = std::str::from_utf8(val)
221 {
222 json_output.push_str(text);
223 }
224 }
225
226 Ok(crate::driver::explain::parse_explain_json(&json_output))
227 }
228
229 // ─── LISTEN / NOTIFY delegation ─────────────────────────────────
230
231 /// Subscribe to a PostgreSQL notification channel.
232 ///
233 /// Delegates to [`PgConnection::listen`].
234 pub async fn listen(&mut self, channel: &str) -> PgResult<()> {
235 self.conn_mut()?.listen(channel).await
236 }
237
238 /// Unsubscribe from a PostgreSQL notification channel.
239 ///
240 /// Delegates to [`PgConnection::unlisten`].
241 pub async fn unlisten(&mut self, channel: &str) -> PgResult<()> {
242 self.conn_mut()?.unlisten(channel).await
243 }
244
245 /// Unsubscribe from all notification channels.
246 ///
247 /// Delegates to [`PgConnection::unlisten_all`].
248 pub async fn unlisten_all(&mut self) -> PgResult<()> {
249 self.conn_mut()?.unlisten_all().await
250 }
251
252 /// Wait for the next notification, blocking until one arrives.
253 ///
254 /// Delegates to [`PgConnection::recv_notification`].
255 /// Useful for dedicated LISTEN connections in background tasks.
256 pub async fn recv_notification(
257 &mut self,
258 ) -> PgResult<crate::driver::notification::Notification> {
259 self.conn_mut()?.recv_notification().await
260 }
261}
262
263impl Drop for PooledConnection {
264 fn drop(&mut self) {
265 if self.conn.is_some() {
266 // Safety net: connection was NOT released via `release()`.
267 // This happens when:
268 // - Handler panicked
269 // - Early return without calling release()
270 // - Missed release() call (programming error)
271 //
272 // We DESTROY the connection (don't return to pool) to prevent
273 // dirty session state from being reused. But we MUST return the
274 // semaphore permit so the pool can create a replacement connection
275 // on the next acquire. Without this, leaked connections permanently
276 // reduce pool capacity until all slots are consumed.
277 //
278 // The `conn` field is dropped here, closing the TCP socket.
279 tracing::warn!(
280 host = %self.pool.config.host,
281 port = self.pool.config.port,
282 user = %self.pool.config.user,
283 db = %self.pool.config.database,
284 rls_dirty = self.rls_dirty,
285 "pool_connection_leaked: dropped without release(); connection destroyed to prevent state leak"
286 );
287 decrement_active_count_saturating(&self.pool.active_count);
288 // Return the semaphore permit so the pool slot can be reused.
289 // Without this, each leaked connection permanently reduces capacity.
290 self.pool.semaphore.add_permits(1);
291 pool_churn_record_destroy(&self.pool.config, "dropped_without_release");
292 }
293 }
294}