Skip to main content

qail_pg/driver/pool/
connection.rs

1//! Pooled connection wrapper: struct, accessors, RLS cleanup, transaction control,
2//! COPY export, pipeline, LISTEN/NOTIFY delegation, and Drop.
3
4use super::churn::{decrement_active_count_saturating, pool_churn_record_destroy};
5use super::lifecycle::{PgPoolInner, execute_simple_with_timeout};
6use crate::driver::{PgConnection, PgError, PgResult};
7use std::sync::Arc;
8use std::time::Instant;
9
10/// A pooled connection with creation timestamp for idle tracking.
11pub(super) struct PooledConn {
12    pub(super) conn: PgConnection,
13    pub(super) created_at: Instant,
14    pub(super) last_used: Instant,
15}
16
17/// A pooled connection that returns to the pool when dropped.
18///
19/// When `rls_dirty` is true (set by `acquire_with_rls`), the connection
20/// will automatically reset RLS session variables before returning to
21/// the pool. This prevents cross-tenant data leakage.
22pub struct PooledConnection {
23    pub(super) conn: Option<PgConnection>,
24    pub(super) pool: Arc<PgPoolInner>,
25    pub(super) rls_dirty: bool,
26    pub(super) created_at: Instant,
27}
28
29impl PooledConnection {
30    /// Get a reference to the underlying connection, returning an error
31    /// if the connection has already been released.
32    pub(super) fn conn_ref(&self) -> PgResult<&PgConnection> {
33        self.conn
34            .as_ref()
35            .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
36    }
37
38    /// Get a mutable reference to the underlying connection, returning an error
39    /// if the connection has already been released.
40    pub(super) fn conn_mut(&mut self) -> PgResult<&mut PgConnection> {
41        self.conn
42            .as_mut()
43            .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
44    }
45
46    /// Get a shared reference to the underlying connection.
47    ///
48    /// Returns an error if the connection has already been released.
49    pub fn get(&self) -> PgResult<&PgConnection> {
50        self.conn_ref()
51    }
52
53    /// Get a mutable reference to the underlying connection.
54    ///
55    /// Returns an error if the connection has already been released.
56    pub fn get_mut(&mut self) -> PgResult<&mut PgConnection> {
57        self.conn_mut()
58    }
59
60    /// Get a token to cancel the currently running query.
61    pub fn cancel_token(&self) -> PgResult<crate::driver::CancelToken> {
62        let conn = self.conn_ref()?;
63        let (process_id, secret_key) = conn.get_cancel_key();
64        Ok(crate::driver::CancelToken {
65            host: self.pool.config.host.clone(),
66            port: self.pool.config.port,
67            process_id,
68            secret_key,
69        })
70    }
71
72    /// Deterministic connection cleanup and pool return.
73    ///
74    /// This is the **correct** way to return a connection to the pool.
75    /// COMMITs the transaction (which auto-resets transaction-local RLS
76    /// session variables) and returns the connection to the pool with
77    /// prepared statement caches intact.
78    ///
79    /// If cleanup fails, the connection is destroyed (not returned to pool).
80    ///
81    /// # Usage
82    /// ```ignore
83    /// let mut conn = pool.acquire_with_rls(ctx).await?;
84    /// let result = conn.fetch_all_cached(&cmd).await;
85    /// conn.release().await; // COMMIT + return to pool
86    /// result
87    /// ```
88    pub async fn release(mut self) {
89        if let Some(mut conn) = self.conn.take() {
90            if conn.is_io_desynced() {
91                tracing::warn!(
92                    host = %self.pool.config.host,
93                    port = self.pool.config.port,
94                    user = %self.pool.config.user,
95                    db = %self.pool.config.database,
96                    "pool_release_desynced: dropping connection due to prior I/O/protocol desync"
97                );
98                decrement_active_count_saturating(&self.pool.active_count);
99                self.pool.semaphore.add_permits(1);
100                pool_churn_record_destroy(&self.pool.config, "release_desynced");
101                return;
102            }
103            // COMMIT the transaction opened by acquire_with_rls.
104            // Transaction-local set_config values auto-reset on COMMIT,
105            // so no explicit RLS cleanup is needed.
106            // Prepared statements survive — they are NOT transaction-scoped.
107            let reset_timeout = self.pool.config.connect_timeout;
108            if let Err(e) = execute_simple_with_timeout(
109                &mut conn,
110                crate::driver::rls::reset_sql(),
111                reset_timeout,
112                "pool release reset/COMMIT",
113            )
114            .await
115            {
116                tracing::error!(
117                    host = %self.pool.config.host,
118                    port = self.pool.config.port,
119                    user = %self.pool.config.user,
120                    db = %self.pool.config.database,
121                    timeout_ms = reset_timeout.as_millis() as u64,
122                    error = %e,
123                    "pool_release_failed: reset/COMMIT failed; dropping connection to prevent state leak"
124                );
125                decrement_active_count_saturating(&self.pool.active_count);
126                self.pool.semaphore.add_permits(1);
127                pool_churn_record_destroy(&self.pool.config, "release_reset_failed");
128                return; // Connection destroyed — not returned to pool
129            }
130
131            self.pool.return_connection(conn, self.created_at).await;
132        }
133    }
134
135    // ==================== TRANSACTION CONTROL ====================
136
137    /// Begin an explicit transaction on this pooled connection.
138    ///
139    /// Use this when you need multi-statement atomicity beyond the
140    /// implicit transaction created by `acquire_with_rls()`.
141    ///
142    /// # Example
143    /// ```ignore
144    /// let mut conn = pool.acquire_with_rls(ctx).await?;
145    /// conn.begin().await?;
146    /// conn.execute(&insert1).await?;
147    /// conn.execute(&insert2).await?;
148    /// conn.commit().await?;
149    /// conn.release().await;
150    /// ```
151    pub async fn begin(&mut self) -> PgResult<()> {
152        self.conn_mut()?.begin_transaction().await
153    }
154
155    /// Commit the current transaction.
156    /// Makes all changes since `begin()` permanent.
157    pub async fn commit(&mut self) -> PgResult<()> {
158        self.conn_mut()?.commit().await
159    }
160
161    /// Rollback the current transaction.
162    /// Discards all changes since `begin()`.
163    pub async fn rollback(&mut self) -> PgResult<()> {
164        self.conn_mut()?.rollback().await
165    }
166
167    /// Create a named savepoint within the current transaction.
168    /// Use `rollback_to()` to return to this savepoint.
169    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
170        self.conn_mut()?.savepoint(name).await
171    }
172
173    /// Rollback to a previously created savepoint.
174    /// Discards changes since the savepoint, but keeps the transaction open.
175    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
176        self.conn_mut()?.rollback_to(name).await
177    }
178
179    /// Release a savepoint (free resources).
180    /// After release, the savepoint cannot be rolled back to.
181    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
182        self.conn_mut()?.release_savepoint(name).await
183    }
184
185    /// Execute multiple QAIL commands in a single PG pipeline round-trip.
186    ///
187    /// Sends all queries as Parse+Bind+Execute in one write, receives all
188    /// responses in one read. Returns raw column data per query per row.
189    ///
190    /// This is the fastest path for batch operations — amortizes TCP
191    /// overhead across N queries into a single syscall pair.
192    pub async fn pipeline_ast(
193        &mut self,
194        cmds: &[qail_core::ast::Qail],
195    ) -> PgResult<Vec<Vec<Vec<Option<Vec<u8>>>>>> {
196        let conn = self.conn_mut()?;
197        conn.pipeline_ast(cmds).await
198    }
199
200    /// Run `EXPLAIN (FORMAT JSON)` on a Qail command and return cost estimates.
201    ///
202    /// Uses `simple_query` under the hood — no additional round-trips beyond
203    /// the single EXPLAIN statement. Returns `None` if parsing fails or
204    /// the EXPLAIN output is unexpected.
205    pub async fn explain_estimate(
206        &mut self,
207        cmd: &qail_core::ast::Qail,
208    ) -> PgResult<Option<crate::driver::explain::ExplainEstimate>> {
209        use qail_core::transpiler::ToSql;
210
211        let sql = cmd.to_sql();
212        let explain_sql = format!("EXPLAIN (FORMAT JSON) {}", sql);
213
214        let rows = self.conn_mut()?.simple_query(&explain_sql).await?;
215
216        // PostgreSQL returns the JSON plan as a single text column across one or more rows
217        let mut json_output = String::new();
218        for row in &rows {
219            if let Some(Some(val)) = row.columns.first()
220                && let Ok(text) = std::str::from_utf8(val)
221            {
222                json_output.push_str(text);
223            }
224        }
225
226        Ok(crate::driver::explain::parse_explain_json(&json_output))
227    }
228
229    // ─── LISTEN / NOTIFY delegation ─────────────────────────────────
230
231    /// Subscribe to a PostgreSQL notification channel.
232    ///
233    /// Delegates to [`PgConnection::listen`].
234    pub async fn listen(&mut self, channel: &str) -> PgResult<()> {
235        self.conn_mut()?.listen(channel).await
236    }
237
238    /// Unsubscribe from a PostgreSQL notification channel.
239    ///
240    /// Delegates to [`PgConnection::unlisten`].
241    pub async fn unlisten(&mut self, channel: &str) -> PgResult<()> {
242        self.conn_mut()?.unlisten(channel).await
243    }
244
245    /// Unsubscribe from all notification channels.
246    ///
247    /// Delegates to [`PgConnection::unlisten_all`].
248    pub async fn unlisten_all(&mut self) -> PgResult<()> {
249        self.conn_mut()?.unlisten_all().await
250    }
251
252    /// Wait for the next notification, blocking until one arrives.
253    ///
254    /// Delegates to [`PgConnection::recv_notification`].
255    /// Useful for dedicated LISTEN connections in background tasks.
256    pub async fn recv_notification(
257        &mut self,
258    ) -> PgResult<crate::driver::notification::Notification> {
259        self.conn_mut()?.recv_notification().await
260    }
261}
262
263impl Drop for PooledConnection {
264    fn drop(&mut self) {
265        if self.conn.is_some() {
266            // Safety net: connection was NOT released via `release()`.
267            // This happens when:
268            //   - Handler panicked
269            //   - Early return without calling release()
270            //   - Missed release() call (programming error)
271            //
272            // We DESTROY the connection (don't return to pool) to prevent
273            // dirty session state from being reused. But we MUST return the
274            // semaphore permit so the pool can create a replacement connection
275            // on the next acquire. Without this, leaked connections permanently
276            // reduce pool capacity until all slots are consumed.
277            //
278            // The `conn` field is dropped here, closing the TCP socket.
279            tracing::warn!(
280                host = %self.pool.config.host,
281                port = self.pool.config.port,
282                user = %self.pool.config.user,
283                db = %self.pool.config.database,
284                rls_dirty = self.rls_dirty,
285                "pool_connection_leaked: dropped without release(); connection destroyed to prevent state leak"
286            );
287            decrement_active_count_saturating(&self.pool.active_count);
288            // Return the semaphore permit so the pool slot can be reused.
289            // Without this, each leaked connection permanently reduces capacity.
290            self.pool.semaphore.add_permits(1);
291            pool_churn_record_destroy(&self.pool.config, "dropped_without_release");
292        }
293    }
294}