qail_pg/driver/pool/connection.rs
1//! Pooled connection wrapper: struct, accessors, RLS cleanup, transaction control,
2//! COPY export, pipeline, LISTEN/NOTIFY delegation, and Drop.
3
4use super::churn::{decrement_active_count_saturating, pool_churn_record_destroy};
5use super::lifecycle::{PgPoolInner, execute_simple_with_timeout};
6use crate::driver::{PgConnection, PgError, PgResult};
7use std::sync::Arc;
8use std::sync::atomic::Ordering;
9use std::time::Instant;
10
11/// A pooled connection with creation timestamp for idle tracking.
12pub(super) struct PooledConn {
13 pub(super) conn: PgConnection,
14 pub(super) created_at: Instant,
15 pub(super) last_used: Instant,
16}
17
18/// A pooled connection handle.
19///
20/// Use [`PooledConnection::release`] for deterministic reset+return behavior.
21/// If dropped without `release()`, the pool performs best-effort bounded async
22/// cleanup; on any uncertainty it destroys the connection (fail-closed).
23pub struct PooledConnection {
24 pub(super) conn: Option<PgConnection>,
25 pub(super) pool: Arc<PgPoolInner>,
26 pub(super) rls_dirty: bool,
27 pub(super) created_at: Instant,
28}
29
30impl PooledConnection {
31 /// Get a reference to the underlying connection, returning an error
32 /// if the connection has already been released.
33 pub(super) fn conn_ref(&self) -> PgResult<&PgConnection> {
34 self.conn
35 .as_ref()
36 .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
37 }
38
39 /// Get a mutable reference to the underlying connection, returning an error
40 /// if the connection has already been released.
41 pub(super) fn conn_mut(&mut self) -> PgResult<&mut PgConnection> {
42 self.conn
43 .as_mut()
44 .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
45 }
46
47 /// Get a shared reference to the underlying connection.
48 ///
49 /// Returns an error if the connection has already been released.
50 pub fn get(&self) -> PgResult<&PgConnection> {
51 self.conn_ref()
52 }
53
54 /// Get a mutable reference to the underlying connection.
55 ///
56 /// Returns an error if the connection has already been released.
57 pub fn get_mut(&mut self) -> PgResult<&mut PgConnection> {
58 self.conn_mut()
59 }
60
61 /// Get a token to cancel the currently running query.
62 pub fn cancel_token(&self) -> PgResult<crate::driver::CancelToken> {
63 let conn = self.conn_ref()?;
64 let (process_id, secret_key) = conn.get_cancel_key();
65 Ok(crate::driver::CancelToken {
66 host: self.pool.config.host.clone(),
67 port: self.pool.config.port,
68 process_id,
69 secret_key,
70 })
71 }
72
73 /// Deterministic connection cleanup and pool return.
74 ///
75 /// This is the **correct** way to return a connection to the pool.
76 /// COMMITs the transaction (which auto-resets transaction-local RLS
77 /// session variables) and returns the connection to the pool with
78 /// prepared statement caches intact.
79 ///
80 /// If cleanup fails, the connection is destroyed (not returned to pool).
81 ///
82 /// # Usage
83 /// ```ignore
84 /// let mut conn = pool.acquire_with_rls(ctx).await?;
85 /// let result = conn.fetch_all_cached(&cmd).await;
86 /// conn.release().await; // COMMIT + return to pool
87 /// result
88 /// ```
89 pub async fn release(mut self) {
90 if let Some(mut conn) = self.conn.take() {
91 if conn.is_io_desynced() {
92 tracing::warn!(
93 host = %self.pool.config.host,
94 port = self.pool.config.port,
95 user = %self.pool.config.user,
96 db = %self.pool.config.database,
97 "pool_release_desynced: dropping connection due to prior I/O/protocol desync"
98 );
99 decrement_active_count_saturating(&self.pool.active_count);
100 self.pool.semaphore.add_permits(1);
101 pool_churn_record_destroy(&self.pool.config, "release_desynced");
102 return;
103 }
104 // COMMIT the transaction opened by acquire_with_rls.
105 // Transaction-local set_config values auto-reset on COMMIT,
106 // so no explicit RLS cleanup is needed.
107 // Prepared statements survive — they are NOT transaction-scoped.
108 let reset_timeout = self.pool.config.connect_timeout;
109 if let Err(e) = execute_simple_with_timeout(
110 &mut conn,
111 crate::driver::rls::reset_sql(),
112 reset_timeout,
113 "pool release reset/COMMIT",
114 )
115 .await
116 {
117 tracing::error!(
118 host = %self.pool.config.host,
119 port = self.pool.config.port,
120 user = %self.pool.config.user,
121 db = %self.pool.config.database,
122 timeout_ms = reset_timeout.as_millis() as u64,
123 error = %e,
124 "pool_release_failed: reset/COMMIT failed; dropping connection to prevent state leak"
125 );
126 decrement_active_count_saturating(&self.pool.active_count);
127 self.pool.semaphore.add_permits(1);
128 pool_churn_record_destroy(&self.pool.config, "release_reset_failed");
129 return; // Connection destroyed — not returned to pool
130 }
131
132 self.pool.return_connection(conn, self.created_at).await;
133 }
134 }
135
136 // ==================== TRANSACTION CONTROL ====================
137
138 /// Begin an explicit transaction on this pooled connection.
139 ///
140 /// Use this when you need multi-statement atomicity beyond the
141 /// implicit transaction created by `acquire_with_rls()`.
142 ///
143 /// # Example
144 /// ```ignore
145 /// let mut conn = pool.acquire_with_rls(ctx).await?;
146 /// conn.begin().await?;
147 /// conn.execute(&insert1).await?;
148 /// conn.execute(&insert2).await?;
149 /// conn.commit().await?;
150 /// conn.release().await;
151 /// ```
152 pub async fn begin(&mut self) -> PgResult<()> {
153 self.conn_mut()?.begin_transaction().await
154 }
155
156 /// Commit the current transaction.
157 /// Makes all changes since `begin()` permanent.
158 pub async fn commit(&mut self) -> PgResult<()> {
159 self.conn_mut()?.commit().await
160 }
161
162 /// Rollback the current transaction.
163 /// Discards all changes since `begin()`.
164 pub async fn rollback(&mut self) -> PgResult<()> {
165 self.conn_mut()?.rollback().await
166 }
167
168 /// Create a named savepoint within the current transaction.
169 /// Use `rollback_to()` to return to this savepoint.
170 pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
171 self.conn_mut()?.savepoint(name).await
172 }
173
174 /// Rollback to a previously created savepoint.
175 /// Discards changes since the savepoint, but keeps the transaction open.
176 pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
177 self.conn_mut()?.rollback_to(name).await
178 }
179
180 /// Release a savepoint (free resources).
181 /// After release, the savepoint cannot be rolled back to.
182 pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
183 self.conn_mut()?.release_savepoint(name).await
184 }
185
186 /// Execute multiple QAIL commands in a single PG pipeline round-trip.
187 ///
188 /// Sends all queries as Parse+Bind+Execute in one write, receives all
189 /// responses in one read. Returns raw column data per query per row.
190 ///
191 /// This is the fastest path for batch operations — amortizes TCP
192 /// overhead across N queries into a single syscall pair.
193 pub async fn pipeline_ast(
194 &mut self,
195 cmds: &[qail_core::ast::Qail],
196 ) -> PgResult<Vec<Vec<Vec<Option<Vec<u8>>>>>> {
197 let conn = self.conn_mut()?;
198 conn.pipeline_ast(cmds).await
199 }
200
201 /// Run `EXPLAIN (FORMAT JSON)` on a Qail command and return cost estimates.
202 ///
203 /// Uses `simple_query` under the hood — no additional round-trips beyond
204 /// the single EXPLAIN statement. Returns `None` if parsing fails or
205 /// the EXPLAIN output is unexpected.
206 pub async fn explain_estimate(
207 &mut self,
208 cmd: &qail_core::ast::Qail,
209 ) -> PgResult<Option<crate::driver::explain::ExplainEstimate>> {
210 use qail_core::transpiler::ToSql;
211
212 let sql = cmd.to_sql();
213 let explain_sql = format!("EXPLAIN (FORMAT JSON) {}", sql);
214
215 let rows = self.conn_mut()?.simple_query(&explain_sql).await?;
216
217 // PostgreSQL returns the JSON plan as a single text column across one or more rows
218 let mut json_output = String::new();
219 for row in &rows {
220 if let Some(Some(val)) = row.columns.first()
221 && let Ok(text) = std::str::from_utf8(val)
222 {
223 json_output.push_str(text);
224 }
225 }
226
227 Ok(crate::driver::explain::parse_explain_json(&json_output))
228 }
229
230 // ─── LISTEN / NOTIFY delegation ─────────────────────────────────
231
232 /// Subscribe to a PostgreSQL notification channel.
233 ///
234 /// Delegates to [`PgConnection::listen`].
235 pub async fn listen(&mut self, channel: &str) -> PgResult<()> {
236 self.conn_mut()?.listen(channel).await
237 }
238
239 /// Unsubscribe from a PostgreSQL notification channel.
240 ///
241 /// Delegates to [`PgConnection::unlisten`].
242 pub async fn unlisten(&mut self, channel: &str) -> PgResult<()> {
243 self.conn_mut()?.unlisten(channel).await
244 }
245
246 /// Unsubscribe from all notification channels.
247 ///
248 /// Delegates to [`PgConnection::unlisten_all`].
249 pub async fn unlisten_all(&mut self) -> PgResult<()> {
250 self.conn_mut()?.unlisten_all().await
251 }
252
253 /// Wait for the next notification, blocking until one arrives.
254 ///
255 /// Delegates to [`PgConnection::recv_notification`].
256 /// Useful for dedicated LISTEN connections in background tasks.
257 pub async fn recv_notification(
258 &mut self,
259 ) -> PgResult<crate::driver::notification::Notification> {
260 self.conn_mut()?.recv_notification().await
261 }
262}
263
264impl Drop for PooledConnection {
265 fn drop(&mut self) {
266 if let Some(mut conn) = self.conn.take() {
267 // Safety net: connection was NOT released via `release()`.
268 // Best-effort strategy:
269 // 1) If connection is already desynced, destroy immediately.
270 // 2) Else, queue bounded async reset+return cleanup.
271 // 3) If cleanup queue/runtime unavailable, destroy.
272 //
273 // This preserves security (fail-closed) while reducing churn under
274 // accidental early-returns in handler code.
275 tracing::warn!(
276 host = %self.pool.config.host,
277 port = self.pool.config.port,
278 user = %self.pool.config.user,
279 db = %self.pool.config.database,
280 rls_dirty = self.rls_dirty,
281 "pool_connection_leaked: dropped without release()"
282 );
283 if conn.is_io_desynced() {
284 tracing::warn!(
285 host = %self.pool.config.host,
286 port = self.pool.config.port,
287 user = %self.pool.config.user,
288 db = %self.pool.config.database,
289 "pool_connection_leaked_desynced: destroying immediately"
290 );
291 decrement_active_count_saturating(&self.pool.active_count);
292 self.pool.semaphore.add_permits(1);
293 pool_churn_record_destroy(&self.pool.config, "dropped_without_release_desynced");
294 return;
295 }
296
297 let mut inflight = self.pool.leaked_cleanup_inflight.load(Ordering::Relaxed);
298 let max_inflight = self.pool.config.leaked_cleanup_queue;
299 loop {
300 if inflight >= max_inflight {
301 tracing::warn!(
302 host = %self.pool.config.host,
303 port = self.pool.config.port,
304 user = %self.pool.config.user,
305 db = %self.pool.config.database,
306 max_inflight,
307 "pool_connection_leaked_cleanup_queue_full: destroying connection"
308 );
309 decrement_active_count_saturating(&self.pool.active_count);
310 self.pool.semaphore.add_permits(1);
311 pool_churn_record_destroy(
312 &self.pool.config,
313 "dropped_without_release_cleanup_queue_full",
314 );
315 return;
316 }
317
318 match self.pool.leaked_cleanup_inflight.compare_exchange_weak(
319 inflight,
320 inflight + 1,
321 Ordering::AcqRel,
322 Ordering::Relaxed,
323 ) {
324 Ok(_) => break,
325 Err(actual) => inflight = actual,
326 }
327 }
328
329 let pool = self.pool.clone();
330 let created_at = self.created_at;
331 let reset_timeout = pool.config.connect_timeout;
332 match tokio::runtime::Handle::try_current() {
333 Ok(handle) => {
334 handle.spawn(async move {
335 let cleanup_ok = execute_simple_with_timeout(
336 &mut conn,
337 crate::driver::rls::reset_sql(),
338 reset_timeout,
339 "pool leaked cleanup reset/COMMIT",
340 )
341 .await
342 .is_ok();
343
344 if cleanup_ok && !conn.is_io_desynced() {
345 pool.return_connection(conn, created_at).await;
346 } else {
347 tracing::warn!(
348 host = %pool.config.host,
349 port = pool.config.port,
350 user = %pool.config.user,
351 db = %pool.config.database,
352 timeout_ms = reset_timeout.as_millis() as u64,
353 "pool_connection_leaked_cleanup_failed: destroying connection"
354 );
355 decrement_active_count_saturating(&pool.active_count);
356 pool.semaphore.add_permits(1);
357 pool_churn_record_destroy(
358 &pool.config,
359 "dropped_without_release_cleanup_failed",
360 );
361 }
362
363 pool.leaked_cleanup_inflight.fetch_sub(1, Ordering::AcqRel);
364 });
365 }
366 Err(_) => {
367 pool.leaked_cleanup_inflight.fetch_sub(1, Ordering::AcqRel);
368 tracing::warn!(
369 host = %pool.config.host,
370 port = pool.config.port,
371 user = %pool.config.user,
372 db = %pool.config.database,
373 "pool_connection_leaked_no_runtime: destroying connection"
374 );
375 decrement_active_count_saturating(&pool.active_count);
376 pool.semaphore.add_permits(1);
377 pool_churn_record_destroy(&pool.config, "dropped_without_release_no_runtime");
378 }
379 }
380 }
381 }
382}