Skip to main content

pg_pool/
async_wire.rs

1//! pg-wired AsyncConn integration: implements [`Poolable`] for async connections.
2//!
3//! Unlike `WirePoolable` which pools raw `WireConn` (requiring a new `AsyncConn`
4//! per checkout), this pools `AsyncConn` directly so reader/writer tasks survive
5//! across checkout/return cycles.
6
7use crate::Poolable;
8
9/// Poolable wrapper around [`pg_wired::AsyncConn`].
10///
11/// The `AsyncConn` spawns reader/writer tasks on creation and keeps them
12/// running until the connection dies. Pooling `AsyncConn` directly means
13/// connections are reused without re-establishing TCP or re-authenticating.
14///
15/// The wrapped connection is exposed read-only via [`AsyncPoolable::conn`].
16/// Constructing an `AsyncPoolable` directly is intentionally not part of the
17/// public API: connections enter the wrapper through [`Poolable::connect`]
18/// and the pool's lifecycle hooks, which keeps pool accounting consistent.
19#[derive(Debug)]
20pub struct AsyncPoolable(pg_wired::AsyncConn);
21
22impl AsyncPoolable {
23    /// Borrow the wrapped connection. Use this for read-only access from
24    /// pool consumers (e.g., to check liveness or fetch a cancel token).
25    pub fn conn(&self) -> &pg_wired::AsyncConn {
26        &self.0
27    }
28}
29
30impl std::ops::Deref for AsyncPoolable {
31    type Target = pg_wired::AsyncConn;
32    fn deref(&self) -> &Self::Target {
33        &self.0
34    }
35}
36
37impl Poolable for AsyncPoolable {
38    type Error = pg_wired::PgWireError;
39
40    async fn connect(
41        addr: &str,
42        user: &str,
43        password: &str,
44        database: &str,
45    ) -> Result<Self, Self::Error> {
46        let wire = pg_wired::WireConn::connect(addr, user, password, database).await?;
47        let async_conn = pg_wired::AsyncConn::new(wire);
48        Ok(AsyncPoolable(async_conn))
49    }
50
51    fn has_pending_data(&self) -> bool {
52        // Treat both dead and explicitly-broken connections as un-returnable.
53        // `is_broken()` is set by callers (e.g., a `Transaction` dropped
54        // without commit/rollback) to declare the session unusable even though
55        // the reader/writer tasks are still running. The pool's
56        // `return_conn_async` checks `has_pending_data()` first and destroys
57        // the connection without attempting reset, which is correct here:
58        // sending DISCARD ALL into an aborted transaction returns an error
59        // that cannot be recovered from on the same connection.
60        !self.0.is_alive() || self.0.is_broken()
61    }
62
63    /// Reset connection state on return to pool.
64    ///
65    /// Fast path: if no operation since the last reset has mutated session
66    /// state, skip the round-trip. The reader task automatically flags the
67    /// connection state-mutated whenever ReadyForQuery reports a non-idle
68    /// transaction status; callers that issue `SET` / advisory-lock / temp
69    /// table / `LISTEN` / etc. via simple-query call
70    /// [`pg_wired::AsyncConn::mark_state_mutated`] explicitly. The bulk of pooled
71    /// usage is self-contained Bind/Execute/Sync queries which never set
72    /// the flag, so the fast path is the common case.
73    ///
74    /// Slow path: send `DISCARD ALL` to clear transactions, SET variables,
75    /// temp tables, advisory locks, and prepared statements. If the reset
76    /// fails, the connection is destroyed.
77    async fn reset(&self) -> bool {
78        if !self.0.is_alive() {
79            return false;
80        }
81        if !self.0.take_state_mutated() {
82            return true;
83        }
84        // DISCARD ALL: resets search_path, temp tables, prepared statements,
85        // advisory locks, LISTEN channels, and aborts any open transaction.
86        let mut buf = bytes::BytesMut::new();
87        pg_wired::protocol::frontend::encode_message(
88            &pg_wired::protocol::types::FrontendMsg::Query(b"DISCARD ALL"),
89            &mut buf,
90        );
91        match self.0.submit(buf, pg_wired::ResponseCollector::Drain).await {
92            Ok(_) => {
93                // DISCARD ALL destroys server-side prepared statements,
94                // so clear the client-side cache to stay in sync.
95                self.0.clear_statement_cache();
96                true
97            }
98            Err(e) => {
99                tracing::warn!(error = %e, "connection reset failed, destroying");
100                false
101            }
102        }
103    }
104}