pg_pool/async_wire.rs
1//! pg-wired AsyncConn integration: implements [`Poolable`] for async connections.
2//!
3//! Unlike `WirePoolable` which pools raw `WireConn` (requiring a new `AsyncConn`
4//! per checkout), this pools `AsyncConn` directly so reader/writer tasks survive
5//! across checkout/return cycles.
6
7use crate::Poolable;
8
9/// Poolable wrapper around [`pg_wired::AsyncConn`].
10///
11/// The `AsyncConn` spawns reader/writer tasks on creation and keeps them
12/// running until the connection dies. Pooling `AsyncConn` directly means
13/// connections are reused without re-establishing TCP or re-authenticating.
14///
15/// The wrapped connection is exposed read-only via [`AsyncPoolable::conn`].
16/// Constructing an `AsyncPoolable` directly is intentionally not part of the
17/// public API: connections enter the wrapper through [`Poolable::connect`]
18/// and the pool's lifecycle hooks, which keeps pool accounting consistent.
19#[derive(Debug)]
20pub struct AsyncPoolable(pg_wired::AsyncConn);
21
22impl AsyncPoolable {
23 /// Borrow the wrapped connection. Use this for read-only access from
24 /// pool consumers (e.g., to check liveness or fetch a cancel token).
25 pub fn conn(&self) -> &pg_wired::AsyncConn {
26 &self.0
27 }
28}
29
30impl std::ops::Deref for AsyncPoolable {
31 type Target = pg_wired::AsyncConn;
32 fn deref(&self) -> &Self::Target {
33 &self.0
34 }
35}
36
37impl Poolable for AsyncPoolable {
38 type Error = pg_wired::PgWireError;
39
40 async fn connect(
41 addr: &str,
42 user: &str,
43 password: &str,
44 database: &str,
45 ) -> Result<Self, Self::Error> {
46 let wire = pg_wired::WireConn::connect(addr, user, password, database).await?;
47 let async_conn = pg_wired::AsyncConn::new(wire);
48 Ok(AsyncPoolable(async_conn))
49 }
50
51 fn has_pending_data(&self) -> bool {
52 // Treat both dead and explicitly-broken connections as un-returnable.
53 // `is_broken()` is set by callers (e.g., a `Transaction` dropped
54 // without commit/rollback) to declare the session unusable even though
55 // the reader/writer tasks are still running. The pool's
56 // `return_conn_async` checks `has_pending_data()` first and destroys
57 // the connection without attempting reset, which is correct here:
58 // sending DISCARD ALL into an aborted transaction returns an error
59 // that cannot be recovered from on the same connection.
60 !self.0.is_alive() || self.0.is_broken()
61 }
62
63 /// Reset connection state on return to pool.
64 ///
65 /// Fast path: if no operation since the last reset has mutated session
66 /// state, skip the round-trip. The reader task automatically flags the
67 /// connection state-mutated whenever ReadyForQuery reports a non-idle
68 /// transaction status; callers that issue `SET` / advisory-lock / temp
69 /// table / `LISTEN` / etc. via simple-query call
70 /// [`pg_wired::AsyncConn::mark_state_mutated`] explicitly. The bulk of pooled
71 /// usage is self-contained Bind/Execute/Sync queries which never set
72 /// the flag, so the fast path is the common case.
73 ///
74 /// Slow path: send `DISCARD ALL` to clear transactions, SET variables,
75 /// temp tables, advisory locks, and prepared statements. If the reset
76 /// fails, the connection is destroyed.
77 async fn reset(&self) -> bool {
78 if !self.0.is_alive() {
79 return false;
80 }
81 if !self.0.take_state_mutated() {
82 return true;
83 }
84 // DISCARD ALL: resets search_path, temp tables, prepared statements,
85 // advisory locks, LISTEN channels, and aborts any open transaction.
86 let mut buf = bytes::BytesMut::new();
87 pg_wired::protocol::frontend::encode_message(
88 &pg_wired::protocol::types::FrontendMsg::Query(b"DISCARD ALL"),
89 &mut buf,
90 );
91 match self.0.submit(buf, pg_wired::ResponseCollector::Drain).await {
92 Ok(_) => {
93 // DISCARD ALL destroys server-side prepared statements,
94 // so clear the client-side cache to stay in sync.
95 self.0.clear_statement_cache();
96 true
97 }
98 Err(e) => {
99 tracing::warn!(error = %e, "connection reset failed, destroying");
100 false
101 }
102 }
103 }
104}