1use super::churn::{decrement_active_count_saturating, pool_churn_record_destroy};
5use super::lifecycle::{PgPoolInner, execute_simple_with_timeout};
6use crate::driver::{PgConnection, PgError, PgResult};
7use std::sync::Arc;
8use std::sync::atomic::Ordering;
9use std::time::Instant;
10
11pub(super) struct PooledConn {
13 pub(super) conn: PgConnection,
14 pub(super) created_at: Instant,
15 pub(super) last_used: Instant,
16}
17
18pub struct PooledConnection {
24 pub(super) conn: Option<PgConnection>,
25 pub(super) pool: Arc<PgPoolInner>,
26 pub(super) rls_dirty: bool,
27 pub(super) created_at: Instant,
28}
29
30impl PooledConnection {
31 pub(super) fn conn_ref(&self) -> PgResult<&PgConnection> {
34 self.conn
35 .as_ref()
36 .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
37 }
38
39 pub(super) fn conn_mut(&mut self) -> PgResult<&mut PgConnection> {
42 self.conn
43 .as_mut()
44 .ok_or_else(|| PgError::Connection("Connection already released back to pool".into()))
45 }
46
47 pub fn get(&self) -> PgResult<&PgConnection> {
51 self.conn_ref()
52 }
53
54 pub fn get_mut(&mut self) -> PgResult<&mut PgConnection> {
58 self.conn_mut()
59 }
60
61 pub fn cancel_token(&self) -> PgResult<crate::driver::CancelToken> {
63 let conn = self.conn_ref()?;
64 let (process_id, secret_key_bytes) = conn.get_cancel_key_bytes();
65 Ok(crate::driver::CancelToken {
66 host: self.pool.config.host.clone(),
67 port: self.pool.config.port,
68 process_id,
69 secret_key_bytes: secret_key_bytes.to_vec(),
70 })
71 }
72
73 fn reject_outer_transaction_control_in_rls(&self, operation: &str) -> PgResult<()> {
74 if self.rls_dirty {
75 return Err(PgError::Connection(format!(
76 "{operation} is not allowed on an RLS-bound pooled connection; \
77 use savepoint(), rollback_to(), and release_savepoint() for nested work, \
78 then release() to close the pool-managed RLS transaction"
79 )));
80 }
81 Ok(())
82 }
83
84 async fn finish_with_reset(
85 mut self,
86 reset_sql: &'static str,
87 operation: &'static str,
88 failure_reason: &'static str,
89 ) -> PgResult<()> {
90 let Some(mut conn) = self.conn.take() else {
91 return Ok(());
92 };
93
94 if conn.is_io_desynced() {
95 tracing::warn!(
96 host = %self.pool.config.host,
97 port = self.pool.config.port,
98 user = %self.pool.config.user,
99 db = %self.pool.config.database,
100 "pool_release_desynced: dropping connection due to prior I/O/protocol desync"
101 );
102 decrement_active_count_saturating(&self.pool.active_count);
103 self.pool.semaphore.add_permits(1);
104 pool_churn_record_destroy(&self.pool.config, "release_desynced");
105 return Err(PgError::Connection(
106 "connection is protocol-desynced; dropped instead of returning to pool".into(),
107 ));
108 }
109
110 let reset_timeout = self.pool.config.connect_timeout;
111 if let Err(e) =
112 execute_simple_with_timeout(&mut conn, reset_sql, reset_timeout, operation).await
113 {
114 tracing::error!(
115 host = %self.pool.config.host,
116 port = self.pool.config.port,
117 user = %self.pool.config.user,
118 db = %self.pool.config.database,
119 timeout_ms = reset_timeout.as_millis() as u64,
120 error = %e,
121 "pool_release_failed: reset failed; dropping connection to prevent state leak"
122 );
123 decrement_active_count_saturating(&self.pool.active_count);
124 self.pool.semaphore.add_permits(1);
125 pool_churn_record_destroy(&self.pool.config, failure_reason);
126 return Err(e);
127 }
128
129 self.pool.return_connection(conn, self.created_at).await;
130 Ok(())
131 }
132
133 pub async fn release(self) {
150 let _ = self.release_checked().await;
151 }
152
153 pub async fn release_checked(self) -> PgResult<()> {
158 self.finish_with_reset(
163 crate::driver::rls::reset_sql(),
164 "pool release reset/COMMIT",
165 "release_reset_failed",
166 )
167 .await
168 }
169
170 pub async fn rollback_and_release(self) -> PgResult<()> {
175 self.finish_with_reset(
176 "ROLLBACK",
177 "pool release rollback/ROLLBACK",
178 "release_rollback_failed",
179 )
180 .await
181 }
182
183 pub async fn begin(&mut self) -> PgResult<()> {
201 self.reject_outer_transaction_control_in_rls("BEGIN")?;
202 self.conn_mut()?.begin_transaction().await
203 }
204
205 pub async fn commit(&mut self) -> PgResult<()> {
208 self.reject_outer_transaction_control_in_rls("COMMIT")?;
209 self.conn_mut()?.commit().await
210 }
211
212 pub async fn rollback(&mut self) -> PgResult<()> {
215 self.reject_outer_transaction_control_in_rls("ROLLBACK")?;
216 self.conn_mut()?.rollback().await
217 }
218
219 pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
222 self.conn_mut()?.savepoint(name).await
223 }
224
225 pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
228 self.conn_mut()?.rollback_to(name).await
229 }
230
231 pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
234 self.conn_mut()?.release_savepoint(name).await
235 }
236
237 pub async fn pipeline_execute_rows_ast(
245 &mut self,
246 cmds: &[qail_core::ast::Qail],
247 ) -> PgResult<Vec<Vec<Vec<Option<Vec<u8>>>>>> {
248 let conn = self.conn_mut()?;
249 conn.pipeline_execute_rows_ast(cmds).await
250 }
251
252 pub async fn explain_estimate(
258 &mut self,
259 cmd: &qail_core::ast::Qail,
260 ) -> PgResult<Option<crate::driver::explain::ExplainEstimate>> {
261 use qail_core::transpiler::ToSql;
262
263 let sql = cmd.to_sql();
264 let explain_sql = format!("EXPLAIN (FORMAT JSON) {}", sql);
265
266 let rows = self.conn_mut()?.simple_query(&explain_sql).await?;
267
268 let mut json_output = String::new();
270 for row in &rows {
271 if let Some(Some(val)) = row.columns.first()
272 && let Ok(text) = std::str::from_utf8(val)
273 {
274 json_output.push_str(text);
275 }
276 }
277
278 Ok(crate::driver::explain::parse_explain_json(&json_output))
279 }
280
281 pub async fn listen(&mut self, channel: &str) -> PgResult<()> {
287 self.conn_mut()?.listen(channel).await
288 }
289
290 pub async fn unlisten(&mut self, channel: &str) -> PgResult<()> {
294 self.conn_mut()?.unlisten(channel).await
295 }
296
297 pub async fn unlisten_all(&mut self) -> PgResult<()> {
301 self.conn_mut()?.unlisten_all().await
302 }
303
304 pub async fn recv_notification(
309 &mut self,
310 ) -> PgResult<crate::driver::notification::Notification> {
311 self.conn_mut()?.recv_notification().await
312 }
313}
314
315impl Drop for PooledConnection {
316 fn drop(&mut self) {
317 if let Some(mut conn) = self.conn.take() {
318 tracing::warn!(
327 host = %self.pool.config.host,
328 port = self.pool.config.port,
329 user = %self.pool.config.user,
330 db = %self.pool.config.database,
331 rls_dirty = self.rls_dirty,
332 "pool_connection_leaked: dropped without release()"
333 );
334 if conn.is_io_desynced() {
335 tracing::warn!(
336 host = %self.pool.config.host,
337 port = self.pool.config.port,
338 user = %self.pool.config.user,
339 db = %self.pool.config.database,
340 "pool_connection_leaked_desynced: destroying immediately"
341 );
342 decrement_active_count_saturating(&self.pool.active_count);
343 self.pool.semaphore.add_permits(1);
344 pool_churn_record_destroy(&self.pool.config, "dropped_without_release_desynced");
345 return;
346 }
347
348 let mut inflight = self.pool.leaked_cleanup_inflight.load(Ordering::Relaxed);
349 let max_inflight = self.pool.config.leaked_cleanup_queue;
350 loop {
351 if inflight >= max_inflight {
352 tracing::warn!(
353 host = %self.pool.config.host,
354 port = self.pool.config.port,
355 user = %self.pool.config.user,
356 db = %self.pool.config.database,
357 max_inflight,
358 "pool_connection_leaked_cleanup_queue_full: destroying connection"
359 );
360 decrement_active_count_saturating(&self.pool.active_count);
361 self.pool.semaphore.add_permits(1);
362 pool_churn_record_destroy(
363 &self.pool.config,
364 "dropped_without_release_cleanup_queue_full",
365 );
366 return;
367 }
368
369 match self.pool.leaked_cleanup_inflight.compare_exchange_weak(
370 inflight,
371 inflight + 1,
372 Ordering::AcqRel,
373 Ordering::Relaxed,
374 ) {
375 Ok(_) => break,
376 Err(actual) => inflight = actual,
377 }
378 }
379
380 let pool = std::sync::Arc::clone(&self.pool);
381 let created_at = self.created_at;
382 let reset_timeout = pool.config.connect_timeout;
383 match tokio::runtime::Handle::try_current() {
384 Ok(handle) => {
385 handle.spawn(async move {
386 let cleanup_ok = execute_simple_with_timeout(
387 &mut conn,
388 "ROLLBACK",
389 reset_timeout,
390 "pool leaked cleanup ROLLBACK",
391 )
392 .await
393 .is_ok();
394
395 if cleanup_ok && !conn.is_io_desynced() {
396 pool.return_connection(conn, created_at).await;
397 } else {
398 tracing::warn!(
399 host = %pool.config.host,
400 port = pool.config.port,
401 user = %pool.config.user,
402 db = %pool.config.database,
403 timeout_ms = reset_timeout.as_millis() as u64,
404 "pool_connection_leaked_cleanup_failed: destroying connection"
405 );
406 decrement_active_count_saturating(&pool.active_count);
407 pool.semaphore.add_permits(1);
408 pool_churn_record_destroy(
409 &pool.config,
410 "dropped_without_release_cleanup_failed",
411 );
412 }
413
414 pool.leaked_cleanup_inflight.fetch_sub(1, Ordering::AcqRel);
415 });
416 }
417 Err(_) => {
418 pool.leaked_cleanup_inflight.fetch_sub(1, Ordering::AcqRel);
419 tracing::warn!(
420 host = %pool.config.host,
421 port = pool.config.port,
422 user = %pool.config.user,
423 db = %pool.config.database,
424 "pool_connection_leaked_no_runtime: destroying connection"
425 );
426 decrement_active_count_saturating(&pool.active_count);
427 pool.semaphore.add_permits(1);
428 pool_churn_record_destroy(&pool.config, "dropped_without_release_no_runtime");
429 }
430 }
431 }
432 }
433}