pg_pool/async_wire.rs
1//! pg-wired AsyncConn integration: implements [`Poolable`] for async connections.
2//!
3//! Unlike `WirePoolable` which pools raw `WireConn` (requiring a new `AsyncConn`
4//! per checkout), this pools `AsyncConn` directly so reader/writer tasks survive
5//! across checkout/return cycles.
6
7use crate::Poolable;
8
9/// Poolable wrapper around [`pg_wired::AsyncConn`].
10///
11/// The `AsyncConn` spawns reader/writer tasks on creation and keeps them
12/// running until the connection dies. Pooling `AsyncConn` directly means
13/// connections are reused without re-establishing TCP or re-authenticating.
14///
15/// The wrapped connection is exposed read-only via [`AsyncPoolable::conn`].
16/// Constructing an `AsyncPoolable` directly is intentionally not part of the
17/// public API: connections enter the wrapper through [`Poolable::connect`]
18/// and the pool's lifecycle hooks, which keeps pool accounting consistent.
19#[derive(Debug)]
20pub struct AsyncPoolable(pg_wired::AsyncConn);
21
22impl AsyncPoolable {
23 /// Borrow the wrapped connection. Use this for read-only access from
24 /// pool consumers (e.g., to check liveness or fetch a cancel token).
25 pub fn conn(&self) -> &pg_wired::AsyncConn {
26 &self.0
27 }
28}
29
30impl std::ops::Deref for AsyncPoolable {
31 type Target = pg_wired::AsyncConn;
32 fn deref(&self) -> &Self::Target {
33 &self.0
34 }
35}
36
37impl Poolable for AsyncPoolable {
38 type Error = pg_wired::PgWireError;
39
40 async fn connect(
41 addr: &str,
42 user: &str,
43 password: &str,
44 database: &str,
45 ) -> Result<Self, Self::Error> {
46 let wire = pg_wired::WireConn::connect(addr, user, password, database).await?;
47 let async_conn = pg_wired::AsyncConn::new(wire);
48 Ok(AsyncPoolable(async_conn))
49 }
50
51 fn has_pending_data(&self) -> bool {
52 !self.0.is_alive()
53 }
54
55 /// Reset connection state on return to pool.
56 ///
57 /// Fast path: if no operation since the last reset has mutated session
58 /// state, skip the round-trip. The reader task automatically flags the
59 /// connection state-mutated whenever ReadyForQuery reports a non-idle
60 /// transaction status; callers that issue `SET` / advisory-lock / temp
61 /// table / `LISTEN` / etc. via simple-query call
62 /// [`pg_wired::AsyncConn::mark_state_mutated`] explicitly. The bulk of pooled
63 /// usage is self-contained Bind/Execute/Sync queries which never set
64 /// the flag, so the fast path is the common case.
65 ///
66 /// Slow path: send `DISCARD ALL` to clear transactions, SET variables,
67 /// temp tables, advisory locks, and prepared statements. If the reset
68 /// fails, the connection is destroyed.
69 async fn reset(&self) -> bool {
70 if !self.0.is_alive() {
71 return false;
72 }
73 if !self.0.take_state_mutated() {
74 return true;
75 }
76 // DISCARD ALL: resets search_path, temp tables, prepared statements,
77 // advisory locks, LISTEN channels, and aborts any open transaction.
78 let mut buf = bytes::BytesMut::new();
79 pg_wired::protocol::frontend::encode_message(
80 &pg_wired::protocol::types::FrontendMsg::Query(b"DISCARD ALL"),
81 &mut buf,
82 );
83 match self.0.submit(buf, pg_wired::ResponseCollector::Drain).await {
84 Ok(_) => {
85 // DISCARD ALL destroys server-side prepared statements,
86 // so clear the client-side cache to stay in sync.
87 self.0.clear_statement_cache();
88 true
89 }
90 Err(e) => {
91 tracing::warn!(error = %e, "connection reset failed, destroying");
92 false
93 }
94 }
95 }
96}