Skip to main content

qail_pg/driver/pool/
connection.rs

1//! Pooled connection wrapper: struct, accessors, RLS cleanup, transaction control,
2//! COPY export, pipeline, LISTEN/NOTIFY delegation, and Drop.
3
4use super::churn::{decrement_active_count_saturating, pool_churn_record_destroy};
5use super::lifecycle::{PgPoolInner, execute_simple_with_timeout};
6use crate::driver::{PgConnection, PgError, PgResult};
7use std::sync::Arc;
8use std::sync::atomic::Ordering;
9use std::time::Instant;
10
11/// A pooled connection with creation timestamp for idle tracking.
12pub(super) struct PooledConn {
13    pub(super) conn: PgConnection,
14    pub(super) created_at: Instant,
15    pub(super) last_used: Instant,
16}
17
18/// A pooled connection handle.
19///
20/// Use [`PooledConnection::release`] for deterministic reset+return behavior.
21/// If dropped without `release()`, the pool performs best-effort bounded async
22/// cleanup; on any uncertainty it destroys the connection (fail-closed).
23pub struct PooledConnection {
24    pub(super) conn: Option<PgConnection>,
25    pub(super) pool: Arc<PgPoolInner>,
26    pub(super) rls_dirty: bool,
27    pub(super) created_at: Instant,
28}
29
30impl PooledConnection {
31    /// Get a reference to the underlying connection, returning an error
32    /// if the connection has already been released.
33    pub(super) fn conn_ref(&self) -> PgResult<&PgConnection> {
34        self.conn
35            .as_ref()
36            .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
37    }
38
39    /// Get a mutable reference to the underlying connection, returning an error
40    /// if the connection has already been released.
41    pub(super) fn conn_mut(&mut self) -> PgResult<&mut PgConnection> {
42        self.conn
43            .as_mut()
44            .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
45    }
46
47    /// Get a shared reference to the underlying connection.
48    ///
49    /// Returns an error if the connection has already been released.
50    pub fn get(&self) -> PgResult<&PgConnection> {
51        self.conn_ref()
52    }
53
54    /// Get a mutable reference to the underlying connection.
55    ///
56    /// Returns an error if the connection has already been released.
57    pub fn get_mut(&mut self) -> PgResult<&mut PgConnection> {
58        self.conn_mut()
59    }
60
61    /// Get a token to cancel the currently running query.
62    pub fn cancel_token(&self) -> PgResult<crate::driver::CancelToken> {
63        let conn = self.conn_ref()?;
64        let (process_id, secret_key_bytes) = conn.get_cancel_key_bytes();
65        Ok(crate::driver::CancelToken {
66            host: self.pool.config.host.clone(),
67            port: self.pool.config.port,
68            process_id,
69            secret_key_bytes: secret_key_bytes.to_vec(),
70        })
71    }
72
73    fn reject_outer_transaction_control_in_rls(&self, operation: &str) -> PgResult<()> {
74        if self.rls_dirty {
75            return Err(PgError::Connection(format!(
76                "{operation} is not allowed on an RLS-bound pooled connection; \
77                 use savepoint(), rollback_to(), and release_savepoint() for nested work, \
78                 then release() to close the pool-managed RLS transaction"
79            )));
80        }
81        Ok(())
82    }
83
84    async fn finish_with_reset(
85        mut self,
86        reset_sql: &'static str,
87        operation: &'static str,
88        failure_reason: &'static str,
89    ) -> PgResult<()> {
90        let Some(mut conn) = self.conn.take() else {
91            return Ok(());
92        };
93
94        if conn.is_io_desynced() {
95            tracing::warn!(
96                host = %self.pool.config.host,
97                port = self.pool.config.port,
98                user = %self.pool.config.user,
99                db = %self.pool.config.database,
100                "pool_release_desynced: dropping connection due to prior I/O/protocol desync"
101            );
102            decrement_active_count_saturating(&self.pool.active_count);
103            self.pool.semaphore.add_permits(1);
104            pool_churn_record_destroy(&self.pool.config, "release_desynced");
105            return Err(PgError::Connection(
106                "connection is protocol-desynced; dropped instead of returning to pool".into(),
107            ));
108        }
109
110        let reset_timeout = self.pool.config.connect_timeout;
111        if let Err(e) =
112            execute_simple_with_timeout(&mut conn, reset_sql, reset_timeout, operation).await
113        {
114            tracing::error!(
115                host = %self.pool.config.host,
116                port = self.pool.config.port,
117                user = %self.pool.config.user,
118                db = %self.pool.config.database,
119                timeout_ms = reset_timeout.as_millis() as u64,
120                error = %e,
121                "pool_release_failed: reset failed; dropping connection to prevent state leak"
122            );
123            decrement_active_count_saturating(&self.pool.active_count);
124            self.pool.semaphore.add_permits(1);
125            pool_churn_record_destroy(&self.pool.config, failure_reason);
126            return Err(e);
127        }
128
129        self.pool.return_connection(conn, self.created_at).await;
130        Ok(())
131    }
132
133    /// Deterministic connection cleanup and pool return.
134    ///
135    /// This is the **correct** way to return a connection to the pool.
136    /// COMMITs the transaction (which auto-resets transaction-local RLS
137    /// session variables) and returns the connection to the pool with
138    /// prepared statement caches intact.
139    ///
140    /// If cleanup fails, the connection is destroyed (not returned to pool).
141    ///
142    /// # Usage
143    /// ```ignore
144    /// let mut conn = pool.acquire_with_rls(ctx).await?;
145    /// let result = conn.fetch_all_cached(&cmd).await;
146    /// conn.release().await; // COMMIT + return to pool
147    /// result
148    /// ```
149    pub async fn release(self) {
150        let _ = self.release_checked().await;
151    }
152
153    /// Commit the pool-managed transaction and return the connection to the pool.
154    ///
155    /// This is the checked form of [`Self::release`]. It is useful for callers
156    /// that need to report commit/reset failures rather than only logging them.
157    pub async fn release_checked(self) -> PgResult<()> {
158        // COMMIT the transaction opened by acquire_with_rls.
159        // Transaction-local set_config values auto-reset on COMMIT,
160        // so no explicit RLS cleanup is needed.
161        // Prepared statements survive — they are NOT transaction-scoped.
162        self.finish_with_reset(
163            crate::driver::rls::reset_sql(),
164            "pool release reset/COMMIT",
165            "release_reset_failed",
166        )
167        .await
168    }
169
170    /// Roll back the pool-managed transaction and return the connection to the pool.
171    ///
172    /// Use this for abandoned RLS-bound work, expired transaction sessions, or
173    /// request-level savepoint failures that must fail closed.
174    pub async fn rollback_and_release(self) -> PgResult<()> {
175        self.finish_with_reset(
176            "ROLLBACK",
177            "pool release rollback/ROLLBACK",
178            "release_rollback_failed",
179        )
180        .await
181    }
182
183    // ==================== TRANSACTION CONTROL ====================
184
185    /// Begin an explicit transaction on this pooled connection.
186    ///
187    /// Use this only on raw pooled connections. Connections acquired with
188    /// `acquire_with_rls()` already run inside the pool-managed RLS
189    /// transaction; use savepoints there instead.
190    ///
191    /// # Example
192    /// ```ignore
193    /// let mut conn = pool.acquire_raw().await?;
194    /// conn.begin().await?;
195    /// conn.execute(&insert1).await?;
196    /// conn.execute(&insert2).await?;
197    /// conn.commit().await?;
198    /// conn.release().await;
199    /// ```
200    pub async fn begin(&mut self) -> PgResult<()> {
201        self.reject_outer_transaction_control_in_rls("BEGIN")?;
202        self.conn_mut()?.begin_transaction().await
203    }
204
205    /// Commit the current transaction.
206    /// Makes all changes since `begin()` permanent.
207    pub async fn commit(&mut self) -> PgResult<()> {
208        self.reject_outer_transaction_control_in_rls("COMMIT")?;
209        self.conn_mut()?.commit().await
210    }
211
212    /// Rollback the current transaction.
213    /// Discards all changes since `begin()`.
214    pub async fn rollback(&mut self) -> PgResult<()> {
215        self.reject_outer_transaction_control_in_rls("ROLLBACK")?;
216        self.conn_mut()?.rollback().await
217    }
218
219    /// Create a named savepoint within the current transaction.
220    /// Use `rollback_to()` to return to this savepoint.
221    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
222        self.conn_mut()?.savepoint(name).await
223    }
224
225    /// Rollback to a previously created savepoint.
226    /// Discards changes since the savepoint, but keeps the transaction open.
227    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
228        self.conn_mut()?.rollback_to(name).await
229    }
230
231    /// Release a savepoint (free resources).
232    /// After release, the savepoint cannot be rolled back to.
233    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
234        self.conn_mut()?.release_savepoint(name).await
235    }
236
237    /// Execute multiple QAIL commands in a single PG pipeline round-trip.
238    ///
239    /// Sends all queries as Parse+Bind+Execute in one write, receives all
240    /// responses in one read. Returns raw column data per query per row.
241    ///
242    /// This is the fastest path for batch operations — amortizes TCP
243    /// overhead across N queries into a single syscall pair.
244    pub async fn pipeline_execute_rows_ast(
245        &mut self,
246        cmds: &[qail_core::ast::Qail],
247    ) -> PgResult<Vec<Vec<Vec<Option<Vec<u8>>>>>> {
248        let conn = self.conn_mut()?;
249        conn.pipeline_execute_rows_ast(cmds).await
250    }
251
252    /// Run `EXPLAIN (FORMAT JSON)` on a Qail command and return cost estimates.
253    ///
254    /// Uses `simple_query` under the hood — no additional round-trips beyond
255    /// the single EXPLAIN statement. Returns `None` if parsing fails or
256    /// the EXPLAIN output is unexpected.
257    pub async fn explain_estimate(
258        &mut self,
259        cmd: &qail_core::ast::Qail,
260    ) -> PgResult<Option<crate::driver::explain::ExplainEstimate>> {
261        use qail_core::transpiler::ToSql;
262
263        let sql = cmd.to_sql();
264        let explain_sql = format!("EXPLAIN (FORMAT JSON) {}", sql);
265
266        let rows = self.conn_mut()?.simple_query(&explain_sql).await?;
267
268        // PostgreSQL returns the JSON plan as a single text column across one or more rows
269        let mut json_output = String::new();
270        for row in &rows {
271            if let Some(Some(val)) = row.columns.first()
272                && let Ok(text) = std::str::from_utf8(val)
273            {
274                json_output.push_str(text);
275            }
276        }
277
278        Ok(crate::driver::explain::parse_explain_json(&json_output))
279    }
280
281    // ─── LISTEN / NOTIFY delegation ─────────────────────────────────
282
283    /// Subscribe to a PostgreSQL notification channel.
284    ///
285    /// Delegates to [`PgConnection::listen`].
286    pub async fn listen(&mut self, channel: &str) -> PgResult<()> {
287        self.conn_mut()?.listen(channel).await
288    }
289
290    /// Unsubscribe from a PostgreSQL notification channel.
291    ///
292    /// Delegates to [`PgConnection::unlisten`].
293    pub async fn unlisten(&mut self, channel: &str) -> PgResult<()> {
294        self.conn_mut()?.unlisten(channel).await
295    }
296
297    /// Unsubscribe from all notification channels.
298    ///
299    /// Delegates to [`PgConnection::unlisten_all`].
300    pub async fn unlisten_all(&mut self) -> PgResult<()> {
301        self.conn_mut()?.unlisten_all().await
302    }
303
304    /// Wait for the next notification, blocking until one arrives.
305    ///
306    /// Delegates to [`PgConnection::recv_notification`].
307    /// Useful for dedicated LISTEN connections in background tasks.
308    pub async fn recv_notification(
309        &mut self,
310    ) -> PgResult<crate::driver::notification::Notification> {
311        self.conn_mut()?.recv_notification().await
312    }
313}
314
315impl Drop for PooledConnection {
316    fn drop(&mut self) {
317        if let Some(mut conn) = self.conn.take() {
318            // Safety net: connection was NOT released via `release()`.
319            // Best-effort strategy:
320            // 1) If connection is already desynced, destroy immediately.
321            // 2) Else, queue bounded async rollback+return cleanup.
322            // 3) If cleanup queue/runtime unavailable, destroy.
323            //
324            // This preserves security (fail-closed) while reducing churn under
325            // accidental early-returns in handler code.
326            tracing::warn!(
327                host = %self.pool.config.host,
328                port = self.pool.config.port,
329                user = %self.pool.config.user,
330                db = %self.pool.config.database,
331                rls_dirty = self.rls_dirty,
332                "pool_connection_leaked: dropped without release()"
333            );
334            if conn.is_io_desynced() {
335                tracing::warn!(
336                    host = %self.pool.config.host,
337                    port = self.pool.config.port,
338                    user = %self.pool.config.user,
339                    db = %self.pool.config.database,
340                    "pool_connection_leaked_desynced: destroying immediately"
341                );
342                decrement_active_count_saturating(&self.pool.active_count);
343                self.pool.semaphore.add_permits(1);
344                pool_churn_record_destroy(&self.pool.config, "dropped_without_release_desynced");
345                return;
346            }
347
348            let mut inflight = self.pool.leaked_cleanup_inflight.load(Ordering::Relaxed);
349            let max_inflight = self.pool.config.leaked_cleanup_queue;
350            loop {
351                if inflight >= max_inflight {
352                    tracing::warn!(
353                        host = %self.pool.config.host,
354                        port = self.pool.config.port,
355                        user = %self.pool.config.user,
356                        db = %self.pool.config.database,
357                        max_inflight,
358                        "pool_connection_leaked_cleanup_queue_full: destroying connection"
359                    );
360                    decrement_active_count_saturating(&self.pool.active_count);
361                    self.pool.semaphore.add_permits(1);
362                    pool_churn_record_destroy(
363                        &self.pool.config,
364                        "dropped_without_release_cleanup_queue_full",
365                    );
366                    return;
367                }
368
369                match self.pool.leaked_cleanup_inflight.compare_exchange_weak(
370                    inflight,
371                    inflight + 1,
372                    Ordering::AcqRel,
373                    Ordering::Relaxed,
374                ) {
375                    Ok(_) => break,
376                    Err(actual) => inflight = actual,
377                }
378            }
379
380            let pool = std::sync::Arc::clone(&self.pool);
381            let created_at = self.created_at;
382            let reset_timeout = pool.config.connect_timeout;
383            match tokio::runtime::Handle::try_current() {
384                Ok(handle) => {
385                    handle.spawn(async move {
386                        let cleanup_ok = execute_simple_with_timeout(
387                            &mut conn,
388                            "ROLLBACK",
389                            reset_timeout,
390                            "pool leaked cleanup ROLLBACK",
391                        )
392                        .await
393                        .is_ok();
394
395                        if cleanup_ok && !conn.is_io_desynced() {
396                            pool.return_connection(conn, created_at).await;
397                        } else {
398                            tracing::warn!(
399                                host = %pool.config.host,
400                                port = pool.config.port,
401                                user = %pool.config.user,
402                                db = %pool.config.database,
403                                timeout_ms = reset_timeout.as_millis() as u64,
404                                "pool_connection_leaked_cleanup_failed: destroying connection"
405                            );
406                            decrement_active_count_saturating(&pool.active_count);
407                            pool.semaphore.add_permits(1);
408                            pool_churn_record_destroy(
409                                &pool.config,
410                                "dropped_without_release_cleanup_failed",
411                            );
412                        }
413
414                        pool.leaked_cleanup_inflight.fetch_sub(1, Ordering::AcqRel);
415                    });
416                }
417                Err(_) => {
418                    pool.leaked_cleanup_inflight.fetch_sub(1, Ordering::AcqRel);
419                    tracing::warn!(
420                        host = %pool.config.host,
421                        port = pool.config.port,
422                        user = %pool.config.user,
423                        db = %pool.config.database,
424                        "pool_connection_leaked_no_runtime: destroying connection"
425                    );
426                    decrement_active_count_saturating(&pool.active_count);
427                    pool.semaphore.add_permits(1);
428                    pool_churn_record_destroy(&pool.config, "dropped_without_release_no_runtime");
429                }
430            }
431        }
432    }
433}