Skip to main content

pg_pool/
async_wire.rs

1//! pg-wired AsyncConn integration: implements [`Poolable`] for async connections.
2//!
3//! Unlike `WirePoolable` which pools raw `WireConn` (requiring a new `AsyncConn`
4//! per checkout), this pools `AsyncConn` directly so reader/writer tasks survive
5//! across checkout/return cycles.
6
7use crate::Poolable;
8
9/// Poolable wrapper around [`pg_wired::AsyncConn`].
10///
11/// The `AsyncConn` spawns reader/writer tasks on creation and keeps them
12/// running until the connection dies. Pooling `AsyncConn` directly means
13/// connections are reused without re-establishing TCP or re-authenticating.
14///
15/// The wrapped connection is exposed read-only via [`AsyncPoolable::conn`].
16/// Constructing an `AsyncPoolable` directly is intentionally not part of the
17/// public API: connections enter the wrapper through [`Poolable::connect`]
18/// and the pool's lifecycle hooks, which keeps pool accounting consistent.
19#[derive(Debug)]
20pub struct AsyncPoolable(pg_wired::AsyncConn);
21
22impl AsyncPoolable {
23    /// Borrow the wrapped connection. Use this for read-only access from
24    /// pool consumers (e.g., to check liveness or fetch a cancel token).
25    pub fn conn(&self) -> &pg_wired::AsyncConn {
26        &self.0
27    }
28}
29
30impl std::ops::Deref for AsyncPoolable {
31    type Target = pg_wired::AsyncConn;
32    fn deref(&self) -> &Self::Target {
33        &self.0
34    }
35}
36
37impl Poolable for AsyncPoolable {
38    type Error = pg_wired::PgWireError;
39
40    async fn connect(
41        addr: &str,
42        user: &str,
43        password: &str,
44        database: &str,
45    ) -> Result<Self, Self::Error> {
46        let wire = pg_wired::WireConn::connect(addr, user, password, database).await?;
47        let async_conn = pg_wired::AsyncConn::new(wire);
48        Ok(AsyncPoolable(async_conn))
49    }
50
51    fn has_pending_data(&self) -> bool {
52        !self.0.is_alive()
53    }
54
55    /// Reset connection state on return to pool.
56    ///
57    /// Fast path: if no operation since the last reset has mutated session
58    /// state, skip the round-trip. The reader task automatically flags the
59    /// connection state-mutated whenever ReadyForQuery reports a non-idle
60    /// transaction status; callers that issue `SET` / advisory-lock / temp
61    /// table / `LISTEN` / etc. via simple-query call
62    /// [`pg_wired::AsyncConn::mark_state_mutated`] explicitly. The bulk of pooled
63    /// usage is self-contained Bind/Execute/Sync queries which never set
64    /// the flag, so the fast path is the common case.
65    ///
66    /// Slow path: send `DISCARD ALL` to clear transactions, SET variables,
67    /// temp tables, advisory locks, and prepared statements. If the reset
68    /// fails, the connection is destroyed.
69    async fn reset(&self) -> bool {
70        if !self.0.is_alive() {
71            return false;
72        }
73        if !self.0.take_state_mutated() {
74            return true;
75        }
76        // DISCARD ALL: resets search_path, temp tables, prepared statements,
77        // advisory locks, LISTEN channels, and aborts any open transaction.
78        let mut buf = bytes::BytesMut::new();
79        pg_wired::protocol::frontend::encode_message(
80            &pg_wired::protocol::types::FrontendMsg::Query(b"DISCARD ALL"),
81            &mut buf,
82        );
83        match self.0.submit(buf, pg_wired::ResponseCollector::Drain).await {
84            Ok(_) => {
85                // DISCARD ALL destroys server-side prepared statements,
86                // so clear the client-side cache to stay in sync.
87                self.0.clear_statement_cache();
88                true
89            }
90            Err(e) => {
91                tracing::warn!(error = %e, "connection reset failed, destroying");
92                false
93            }
94        }
95    }
96}