Skip to main content

qail_pg/driver/pool/
connection.rs

1//! Pooled connection wrapper: struct, accessors, RLS cleanup, transaction control,
2//! COPY export, pipeline, LISTEN/NOTIFY delegation, and Drop.
3
4use super::churn::{decrement_active_count_saturating, pool_churn_record_destroy};
5use super::lifecycle::{PgPoolInner, execute_simple_with_timeout};
6use crate::driver::{PgConnection, PgError, PgResult};
7use std::sync::Arc;
8use std::sync::atomic::Ordering;
9use std::time::Instant;
10
11/// A pooled connection with creation timestamp for idle tracking.
12pub(super) struct PooledConn {
13    pub(super) conn: PgConnection,
14    pub(super) created_at: Instant,
15    pub(super) last_used: Instant,
16}
17
18/// A pooled connection handle.
19///
20/// Use [`PooledConnection::release`] for deterministic reset+return behavior.
21/// If dropped without `release()`, the pool performs best-effort bounded async
22/// cleanup; on any uncertainty it destroys the connection (fail-closed).
23pub struct PooledConnection {
24    pub(super) conn: Option<PgConnection>,
25    pub(super) pool: Arc<PgPoolInner>,
26    pub(super) rls_dirty: bool,
27    pub(super) created_at: Instant,
28}
29
30impl PooledConnection {
31    /// Get a reference to the underlying connection, returning an error
32    /// if the connection has already been released.
33    pub(super) fn conn_ref(&self) -> PgResult<&PgConnection> {
34        self.conn
35            .as_ref()
36            .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
37    }
38
39    /// Get a mutable reference to the underlying connection, returning an error
40    /// if the connection has already been released.
41    pub(super) fn conn_mut(&mut self) -> PgResult<&mut PgConnection> {
42        self.conn
43            .as_mut()
44            .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
45    }
46
47    /// Get a shared reference to the underlying connection.
48    ///
49    /// Returns an error if the connection has already been released.
50    pub fn get(&self) -> PgResult<&PgConnection> {
51        self.conn_ref()
52    }
53
54    /// Get a mutable reference to the underlying connection.
55    ///
56    /// Returns an error if the connection has already been released.
57    pub fn get_mut(&mut self) -> PgResult<&mut PgConnection> {
58        self.conn_mut()
59    }
60
61    /// Get a token to cancel the currently running query.
62    pub fn cancel_token(&self) -> PgResult<crate::driver::CancelToken> {
63        let conn = self.conn_ref()?;
64        let (process_id, secret_key) = conn.get_cancel_key();
65        Ok(crate::driver::CancelToken {
66            host: self.pool.config.host.clone(),
67            port: self.pool.config.port,
68            process_id,
69            secret_key,
70        })
71    }
72
73    /// Deterministic connection cleanup and pool return.
74    ///
75    /// This is the **correct** way to return a connection to the pool.
76    /// COMMITs the transaction (which auto-resets transaction-local RLS
77    /// session variables) and returns the connection to the pool with
78    /// prepared statement caches intact.
79    ///
80    /// If cleanup fails, the connection is destroyed (not returned to pool).
81    ///
82    /// # Usage
83    /// ```ignore
84    /// let mut conn = pool.acquire_with_rls(ctx).await?;
85    /// let result = conn.fetch_all_cached(&cmd).await;
86    /// conn.release().await; // COMMIT + return to pool
87    /// result
88    /// ```
89    pub async fn release(mut self) {
90        if let Some(mut conn) = self.conn.take() {
91            if conn.is_io_desynced() {
92                tracing::warn!(
93                    host = %self.pool.config.host,
94                    port = self.pool.config.port,
95                    user = %self.pool.config.user,
96                    db = %self.pool.config.database,
97                    "pool_release_desynced: dropping connection due to prior I/O/protocol desync"
98                );
99                decrement_active_count_saturating(&self.pool.active_count);
100                self.pool.semaphore.add_permits(1);
101                pool_churn_record_destroy(&self.pool.config, "release_desynced");
102                return;
103            }
104            // COMMIT the transaction opened by acquire_with_rls.
105            // Transaction-local set_config values auto-reset on COMMIT,
106            // so no explicit RLS cleanup is needed.
107            // Prepared statements survive — they are NOT transaction-scoped.
108            let reset_timeout = self.pool.config.connect_timeout;
109            if let Err(e) = execute_simple_with_timeout(
110                &mut conn,
111                crate::driver::rls::reset_sql(),
112                reset_timeout,
113                "pool release reset/COMMIT",
114            )
115            .await
116            {
117                tracing::error!(
118                    host = %self.pool.config.host,
119                    port = self.pool.config.port,
120                    user = %self.pool.config.user,
121                    db = %self.pool.config.database,
122                    timeout_ms = reset_timeout.as_millis() as u64,
123                    error = %e,
124                    "pool_release_failed: reset/COMMIT failed; dropping connection to prevent state leak"
125                );
126                decrement_active_count_saturating(&self.pool.active_count);
127                self.pool.semaphore.add_permits(1);
128                pool_churn_record_destroy(&self.pool.config, "release_reset_failed");
129                return; // Connection destroyed — not returned to pool
130            }
131
132            self.pool.return_connection(conn, self.created_at).await;
133        }
134    }
135
136    // ==================== TRANSACTION CONTROL ====================
137
138    /// Begin an explicit transaction on this pooled connection.
139    ///
140    /// Use this when you need multi-statement atomicity beyond the
141    /// implicit transaction created by `acquire_with_rls()`.
142    ///
143    /// # Example
144    /// ```ignore
145    /// let mut conn = pool.acquire_with_rls(ctx).await?;
146    /// conn.begin().await?;
147    /// conn.execute(&insert1).await?;
148    /// conn.execute(&insert2).await?;
149    /// conn.commit().await?;
150    /// conn.release().await;
151    /// ```
152    pub async fn begin(&mut self) -> PgResult<()> {
153        self.conn_mut()?.begin_transaction().await
154    }
155
156    /// Commit the current transaction.
157    /// Makes all changes since `begin()` permanent.
158    pub async fn commit(&mut self) -> PgResult<()> {
159        self.conn_mut()?.commit().await
160    }
161
162    /// Rollback the current transaction.
163    /// Discards all changes since `begin()`.
164    pub async fn rollback(&mut self) -> PgResult<()> {
165        self.conn_mut()?.rollback().await
166    }
167
168    /// Create a named savepoint within the current transaction.
169    /// Use `rollback_to()` to return to this savepoint.
170    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
171        self.conn_mut()?.savepoint(name).await
172    }
173
174    /// Rollback to a previously created savepoint.
175    /// Discards changes since the savepoint, but keeps the transaction open.
176    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
177        self.conn_mut()?.rollback_to(name).await
178    }
179
180    /// Release a savepoint (free resources).
181    /// After release, the savepoint cannot be rolled back to.
182    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
183        self.conn_mut()?.release_savepoint(name).await
184    }
185
186    /// Execute multiple QAIL commands in a single PG pipeline round-trip.
187    ///
188    /// Sends all queries as Parse+Bind+Execute in one write, receives all
189    /// responses in one read. Returns raw column data per query per row.
190    ///
191    /// This is the fastest path for batch operations — amortizes TCP
192    /// overhead across N queries into a single syscall pair.
193    pub async fn pipeline_ast(
194        &mut self,
195        cmds: &[qail_core::ast::Qail],
196    ) -> PgResult<Vec<Vec<Vec<Option<Vec<u8>>>>>> {
197        let conn = self.conn_mut()?;
198        conn.pipeline_ast(cmds).await
199    }
200
201    /// Run `EXPLAIN (FORMAT JSON)` on a Qail command and return cost estimates.
202    ///
203    /// Uses `simple_query` under the hood — no additional round-trips beyond
204    /// the single EXPLAIN statement. Returns `None` if parsing fails or
205    /// the EXPLAIN output is unexpected.
206    pub async fn explain_estimate(
207        &mut self,
208        cmd: &qail_core::ast::Qail,
209    ) -> PgResult<Option<crate::driver::explain::ExplainEstimate>> {
210        use qail_core::transpiler::ToSql;
211
212        let sql = cmd.to_sql();
213        let explain_sql = format!("EXPLAIN (FORMAT JSON) {}", sql);
214
215        let rows = self.conn_mut()?.simple_query(&explain_sql).await?;
216
217        // PostgreSQL returns the JSON plan as a single text column across one or more rows
218        let mut json_output = String::new();
219        for row in &rows {
220            if let Some(Some(val)) = row.columns.first()
221                && let Ok(text) = std::str::from_utf8(val)
222            {
223                json_output.push_str(text);
224            }
225        }
226
227        Ok(crate::driver::explain::parse_explain_json(&json_output))
228    }
229
230    // ─── LISTEN / NOTIFY delegation ─────────────────────────────────
231
232    /// Subscribe to a PostgreSQL notification channel.
233    ///
234    /// Delegates to [`PgConnection::listen`].
235    pub async fn listen(&mut self, channel: &str) -> PgResult<()> {
236        self.conn_mut()?.listen(channel).await
237    }
238
239    /// Unsubscribe from a PostgreSQL notification channel.
240    ///
241    /// Delegates to [`PgConnection::unlisten`].
242    pub async fn unlisten(&mut self, channel: &str) -> PgResult<()> {
243        self.conn_mut()?.unlisten(channel).await
244    }
245
246    /// Unsubscribe from all notification channels.
247    ///
248    /// Delegates to [`PgConnection::unlisten_all`].
249    pub async fn unlisten_all(&mut self) -> PgResult<()> {
250        self.conn_mut()?.unlisten_all().await
251    }
252
253    /// Wait for the next notification, blocking until one arrives.
254    ///
255    /// Delegates to [`PgConnection::recv_notification`].
256    /// Useful for dedicated LISTEN connections in background tasks.
257    pub async fn recv_notification(
258        &mut self,
259    ) -> PgResult<crate::driver::notification::Notification> {
260        self.conn_mut()?.recv_notification().await
261    }
262}
263
264impl Drop for PooledConnection {
265    fn drop(&mut self) {
266        if let Some(mut conn) = self.conn.take() {
267            // Safety net: connection was NOT released via `release()`.
268            // Best-effort strategy:
269            // 1) If connection is already desynced, destroy immediately.
270            // 2) Else, queue bounded async reset+return cleanup.
271            // 3) If cleanup queue/runtime unavailable, destroy.
272            //
273            // This preserves security (fail-closed) while reducing churn under
274            // accidental early-returns in handler code.
275            tracing::warn!(
276                host = %self.pool.config.host,
277                port = self.pool.config.port,
278                user = %self.pool.config.user,
279                db = %self.pool.config.database,
280                rls_dirty = self.rls_dirty,
281                "pool_connection_leaked: dropped without release()"
282            );
283            if conn.is_io_desynced() {
284                tracing::warn!(
285                    host = %self.pool.config.host,
286                    port = self.pool.config.port,
287                    user = %self.pool.config.user,
288                    db = %self.pool.config.database,
289                    "pool_connection_leaked_desynced: destroying immediately"
290                );
291                decrement_active_count_saturating(&self.pool.active_count);
292                self.pool.semaphore.add_permits(1);
293                pool_churn_record_destroy(&self.pool.config, "dropped_without_release_desynced");
294                return;
295            }
296
297            let mut inflight = self.pool.leaked_cleanup_inflight.load(Ordering::Relaxed);
298            let max_inflight = self.pool.config.leaked_cleanup_queue;
299            loop {
300                if inflight >= max_inflight {
301                    tracing::warn!(
302                        host = %self.pool.config.host,
303                        port = self.pool.config.port,
304                        user = %self.pool.config.user,
305                        db = %self.pool.config.database,
306                        max_inflight,
307                        "pool_connection_leaked_cleanup_queue_full: destroying connection"
308                    );
309                    decrement_active_count_saturating(&self.pool.active_count);
310                    self.pool.semaphore.add_permits(1);
311                    pool_churn_record_destroy(
312                        &self.pool.config,
313                        "dropped_without_release_cleanup_queue_full",
314                    );
315                    return;
316                }
317
318                match self.pool.leaked_cleanup_inflight.compare_exchange_weak(
319                    inflight,
320                    inflight + 1,
321                    Ordering::AcqRel,
322                    Ordering::Relaxed,
323                ) {
324                    Ok(_) => break,
325                    Err(actual) => inflight = actual,
326                }
327            }
328
329            let pool = self.pool.clone();
330            let created_at = self.created_at;
331            let reset_timeout = pool.config.connect_timeout;
332            match tokio::runtime::Handle::try_current() {
333                Ok(handle) => {
334                    handle.spawn(async move {
335                        let cleanup_ok = execute_simple_with_timeout(
336                            &mut conn,
337                            crate::driver::rls::reset_sql(),
338                            reset_timeout,
339                            "pool leaked cleanup reset/COMMIT",
340                        )
341                        .await
342                        .is_ok();
343
344                        if cleanup_ok && !conn.is_io_desynced() {
345                            pool.return_connection(conn, created_at).await;
346                        } else {
347                            tracing::warn!(
348                                host = %pool.config.host,
349                                port = pool.config.port,
350                                user = %pool.config.user,
351                                db = %pool.config.database,
352                                timeout_ms = reset_timeout.as_millis() as u64,
353                                "pool_connection_leaked_cleanup_failed: destroying connection"
354                            );
355                            decrement_active_count_saturating(&pool.active_count);
356                            pool.semaphore.add_permits(1);
357                            pool_churn_record_destroy(
358                                &pool.config,
359                                "dropped_without_release_cleanup_failed",
360                            );
361                        }
362
363                        pool.leaked_cleanup_inflight.fetch_sub(1, Ordering::AcqRel);
364                    });
365                }
366                Err(_) => {
367                    pool.leaked_cleanup_inflight.fetch_sub(1, Ordering::AcqRel);
368                    tracing::warn!(
369                        host = %pool.config.host,
370                        port = pool.config.port,
371                        user = %pool.config.user,
372                        db = %pool.config.database,
373                        "pool_connection_leaked_no_runtime: destroying connection"
374                    );
375                    decrement_active_count_saturating(&pool.active_count);
376                    pool.semaphore.add_permits(1);
377                    pool_churn_record_destroy(&pool.config, "dropped_without_release_no_runtime");
378                }
379            }
380        }
381    }
382}