Skip to main content

heliosdb_proxy/pool/
backend_pool.rs

1//! Data-path backend connection pool for Transaction / Statement pooling modes.
2//!
3//! This is the *raw-stream* pool that actually multiplexes clients onto a
4//! bounded set of backend connections — the piece that makes the
5//! `pool-modes` feature do real work on the wire. It is deliberately distinct
6//! from [`crate::pool::manager::ConnectionPoolManager`], which models pooling
7//! over the higher-level `BackendClient` message API; the proxy data path
8//! forwards raw PostgreSQL-wire bytes, so it needs a pool of authenticated
9//! `TcpStream`s.
10//!
11//! ## Identity keying (why this is safe)
12//!
13//! HeliosProxy authenticates backend connections by **passing the client's own
14//! credentials through** to PostgreSQL (the client SCRAM handshake is relayed).
15//! A parked connection is therefore authenticated as a specific
16//! `(user, database)` principal. The pool keys idle connections by
17//! `node\0user\0database`, so a connection is only ever handed to a client that
18//! connected with the *same* identity — and that client independently
19//! authenticated before it could reach the pool. This is exactly PgBouncer's
20//! per-(user,db) pooling model; it does not multiplex distinct users onto one
21//! backend identity (that would need proxy-terminated auth with a shared
22//! backend credential, which is a separate, larger change).
23//!
24//! ## Cleanliness
25//!
26//! A connection is `DISCARD ALL`-reset by the caller before it is parked
27//! (see the release path in `server.rs`), so the next borrower — possibly a
28//! *different* client of the same identity — never inherits GUCs, temp tables,
29//! prepared statements, or advisory locks. On checkout the connection is
30//! liveness-probed so a peer that closed the socket while idle is dropped
31//! rather than handed out.
32
33use dashmap::DashMap;
34use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
35use std::time::{Duration, Instant};
36use tokio::net::TcpStream;
37
/// Compose the idle-pool lookup key for one backend identity.
///
/// The result is `node`, `user` and `database` joined by single NUL bytes
/// (`node\0user\0database`). Provided no component itself contains a NUL, two
/// different triples can never produce the same key, because the delimiter
/// pins where each component starts and ends.
pub fn pool_key(node: &str, user: &str, database: &str) -> String {
    [node, user, database].join("\0")
}
43
44/// A bounded set of idle, authenticated backend connections, partitioned by
45/// connection identity. Cheap to clone-share behind an `Arc`.
46pub struct BackendIdlePool {
47    /// identity-key -> stack of (idle authenticated stream, park time).
48    idle: DashMap<String, Vec<(TcpStream, Instant)>>,
49    /// Hard cap on idle connections parked per identity key.
50    max_idle_per_key: usize,
51    /// Hard cap on idle connections parked across ALL identities — bounds total
52    /// file descriptors / memory regardless of how many distinct
53    /// `(node,user,db)` identities appear.
54    max_total_idle: usize,
55    /// Live count of parked connections (kept in step with `idle`) so the
56    /// global-cap check and `idle_count()` are O(1) instead of O(keys).
57    total_idle: AtomicUsize,
58    /// Checkout hits — a parked connection was reused.
59    reuses: AtomicU64,
60    /// Connections parked (checked in) successfully.
61    parked: AtomicU64,
62    /// Check-ins refused because an idle cap (per-key or global) was reached —
63    /// the connection is closed by the caller dropping it.
64    over_capacity: AtomicU64,
65    /// Parked connections dropped at checkout because the peer had closed
66    /// them (or left unexpected bytes) while idle.
67    stale_evicted: AtomicU64,
68    /// Parked connections dropped by the idle reaper for exceeding the TTL.
69    reaped: AtomicU64,
70}
71
72impl BackendIdlePool {
73    /// Create a pool that parks at most `max_idle_per_key` connections per
74    /// `(node,user,db)` identity and `max_total_idle` across all identities.
75    /// A floor of 1 is enforced on each so the pool always retains at least one
76    /// reusable connection.
77    pub fn new(max_idle_per_key: usize, max_total_idle: usize) -> Self {
78        Self {
79            idle: DashMap::new(),
80            max_idle_per_key: max_idle_per_key.max(1),
81            max_total_idle: max_total_idle.max(1),
82            total_idle: AtomicUsize::new(0),
83            reuses: AtomicU64::new(0),
84            parked: AtomicU64::new(0),
85            over_capacity: AtomicU64::new(0),
86            stale_evicted: AtomicU64::new(0),
87            reaped: AtomicU64::new(0),
88        }
89    }
90
91    /// Take a live idle connection for `key`, or `None` if the pool has no
92    /// usable one (caller then dials a fresh connection). Dead/stale parked
93    /// connections are evicted in passing.
94    pub fn checkout(&self, key: &str) -> Option<TcpStream> {
95        let mut guard = self.idle.get_mut(key)?;
96        while let Some((stream, _parked_at)) = guard.pop() {
97            self.total_idle.fetch_sub(1, Ordering::Relaxed);
98            if Self::probe_alive(&stream) {
99                self.reuses.fetch_add(1, Ordering::Relaxed);
100                return Some(stream);
101            }
102            // Peer closed (or desynced) while idle — drop it and try the next.
103            self.stale_evicted.fetch_add(1, Ordering::Relaxed);
104        }
105        None
106    }
107
108    /// Park a (freshly reset) connection for reuse under `key`. Returns `false`
109    /// when an idle cap (per-key OR global) is already reached — in that case
110    /// the connection is dropped (closed) by being moved in and discarded,
111    /// shedding excess capacity.
112    pub fn checkin(&self, key: &str, stream: TcpStream) -> bool {
113        // Global ceiling first — bounds total FDs across all identities. Reserve
114        // the slot atomically (fetch_add, inspect the prior value) rather than
115        // load-then-act: a plain load outside the lock lets N concurrent
116        // check-ins to distinct keys each observe `cap - 1` and all push,
117        // overshooting the ceiling. With a reservation only one racer sees a
118        // prior value below the cap; the rest roll their increment back.
119        if self.total_idle.fetch_add(1, Ordering::Relaxed) >= self.max_total_idle {
120            self.total_idle.fetch_sub(1, Ordering::Relaxed);
121            self.over_capacity.fetch_add(1, Ordering::Relaxed);
122            return false; // `stream` dropped here → socket closed.
123        }
124        let mut entry = self.idle.entry(key.to_string()).or_default();
125        if entry.len() >= self.max_idle_per_key {
126            // Per-key cap reached — release the global slot we reserved.
127            self.total_idle.fetch_sub(1, Ordering::Relaxed);
128            self.over_capacity.fetch_add(1, Ordering::Relaxed);
129            return false; // `stream` dropped here → socket closed.
130        }
131        entry.push((stream, Instant::now()));
132        self.parked.fetch_add(1, Ordering::Relaxed);
133        true
134    }
135
136    /// Drop parked connections that have been idle longer than `max_age` so a
137    /// connection the backend has (or will) close on its own idle timeout is
138    /// not handed out stale, and idle capacity is released back to the OS.
139    /// Returns the number reaped. Intended to be called periodically by a
140    /// background task.
141    pub fn reap_idle(&self, max_age: Duration) -> usize {
142        let mut reaped = 0usize;
143        for mut entry in self.idle.iter_mut() {
144            let before = entry.value().len();
145            entry
146                .value_mut()
147                .retain(|(_, parked_at)| parked_at.elapsed() < max_age);
148            reaped += before - entry.value().len();
149        }
150        if reaped > 0 {
151            self.total_idle.fetch_sub(reaped, Ordering::Relaxed);
152            self.reaped.fetch_add(reaped as u64, Ordering::Relaxed);
153        }
154        reaped
155    }
156
157    /// Liveness probe for an idle parked connection: a clean idle backend has
158    /// no pending bytes, so a non-blocking read should report `WouldBlock`.
159    /// `Ok(0)` means the peer closed; `Ok(n>0)` means unexpected data (protocol
160    /// desync) — both are treated as dead.
161    fn probe_alive(stream: &TcpStream) -> bool {
162        let mut probe = [0u8; 1];
163        matches!(
164            stream.try_read(&mut probe),
165            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock
166        )
167    }
168
169    /// Total idle connections currently parked across all identities (O(1)).
170    pub fn idle_count(&self) -> usize {
171        self.total_idle.load(Ordering::Relaxed)
172    }
173
174    /// Global ceiling on parked idle connections.
175    pub fn max_total_idle(&self) -> usize {
176        self.max_total_idle
177    }
178
179    /// Number of parked connections dropped by the idle reaper (TTL).
180    pub fn reaped(&self) -> u64 {
181        self.reaped.load(Ordering::Relaxed)
182    }
183
184    /// Number of checkout hits (connections reused rather than dialed fresh).
185    pub fn reuses(&self) -> u64 {
186        self.reuses.load(Ordering::Relaxed)
187    }
188
189    /// Number of successful check-ins.
190    pub fn parked(&self) -> u64 {
191        self.parked.load(Ordering::Relaxed)
192    }
193
194    /// Number of check-ins refused for exceeding the per-key idle cap.
195    pub fn over_capacity(&self) -> u64 {
196        self.over_capacity.load(Ordering::Relaxed)
197    }
198
199    /// Number of stale connections evicted at checkout.
200    pub fn stale_evicted(&self) -> u64 {
201        self.stale_evicted.load(Ordering::Relaxed)
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::net::TcpListener;

    /// Open a connected loopback pair against `listener` and return
    /// `(client, server)`.
    ///
    /// The accepted (server) half is handed back on purpose: dropping it
    /// closes the peer, which makes the client half a *dead* connection. Tests
    /// that want a live parked socket must keep the server half in scope for
    /// as long as the client half is expected to pass the liveness probe.
    async fn live_pair(listener: &TcpListener) -> (TcpStream, TcpStream) {
        let addr = listener.local_addr().unwrap();
        let (client, accepted) = tokio::join!(TcpStream::connect(addr), listener.accept());
        let (server, _peer_addr) = accepted.unwrap();
        (client.unwrap(), server)
    }

    #[test]
    fn pool_key_is_nul_delimited_and_distinct() {
        assert_eq!(pool_key("n", "u", "d"), "n\0u\0d");
        assert_ne!(pool_key("n", "ud", ""), pool_key("n", "u", "d"));
    }

    #[tokio::test]
    async fn checkin_then_checkout_reuses_same_connection() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(4, 1000);
        let key = pool_key("127.0.0.1:5432", "bench", "benchdb");

        // Park a live connection (peer held open), then check it back out.
        let (client, _server) = live_pair(&listener).await;
        let parked_addr = client.local_addr().unwrap();
        assert!(pool.checkin(&key, client));
        assert_eq!(pool.idle_count(), 1);

        let got = pool
            .checkout(&key)
            .expect("a parked connection is reusable");
        assert_eq!(got.local_addr().unwrap(), parked_addr, "same socket reused");
        assert_eq!(pool.reuses(), 1);
        assert_eq!(pool.idle_count(), 0);

        // Empty pool → miss.
        assert!(pool.checkout(&key).is_none());
    }

    #[tokio::test]
    async fn distinct_identities_do_not_share() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(4, 1000);
        let alice = pool_key("n", "alice", "db");
        let bob = pool_key("n", "bob", "db");

        let (client, _server) = live_pair(&listener).await;
        assert!(pool.checkin(&alice, client));
        // Bob must NOT see alice's connection.
        assert!(pool.checkout(&bob).is_none());
        assert!(pool.checkout(&alice).is_some());
    }

    #[tokio::test]
    async fn per_key_cap_sheds_excess() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(2, 1000);
        let key = pool_key("n", "u", "d");

        // Server halves are collected so the parked clients stay live.
        let mut peers = Vec::new();
        // The first two fit; the third exceeds the cap of 2 → refused (and
        // dropped/closed).
        for expect_parked in [true, true, false] {
            let (client, server) = live_pair(&listener).await;
            peers.push(server);
            assert_eq!(pool.checkin(&key, client), expect_parked);
        }
        assert_eq!(pool.over_capacity(), 1);
        assert_eq!(pool.idle_count(), 2);
    }

    #[tokio::test]
    async fn checkout_evicts_a_closed_connection() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(4, 1000);
        let key = pool_key("n", "u", "d");

        // Park a connection, then close the server side so the parked socket is
        // dead.
        let (client, server) = live_pair(&listener).await;
        assert!(pool.checkin(&key, client));
        drop(server); // peer closes
        // Give the close a moment to propagate to the runtime's I/O driver.
        tokio::task::yield_now().await;
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;

        // Checkout must not hand out the dead connection.
        assert!(pool.checkout(&key).is_none());
        assert_eq!(pool.stale_evicted(), 1);
    }

    #[tokio::test]
    async fn global_cap_sheds_across_distinct_identities() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        // Per-key cap is generous (10) but the GLOBAL cap is 2.
        let pool = BackendIdlePool::new(10, 2);

        // Three different identities, one connection each. The third exceeds
        // the global ceiling even though its per-key bucket is empty.
        let mut peers = Vec::new();
        for (user, expect_parked) in [("a", true), ("b", true), ("c", false)] {
            let (client, server) = live_pair(&listener).await;
            peers.push(server);
            assert_eq!(
                pool.checkin(&pool_key("n", user, "d"), client),
                expect_parked,
                "check-in for user {}",
                user
            );
        }
        assert_eq!(pool.idle_count(), 2);
        assert_eq!(pool.over_capacity(), 1);
    }

    #[tokio::test]
    async fn reaper_drops_aged_idle_connections() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(4, 100);
        let key = pool_key("n", "u", "d");
        let (client, _server) = live_pair(&listener).await;
        assert!(pool.checkin(&key, client));
        assert_eq!(pool.idle_count(), 1);

        // Nothing reaped while within the TTL.
        assert_eq!(pool.reap_idle(std::time::Duration::from_secs(60)), 0);
        assert_eq!(pool.idle_count(), 1);

        // Let it age, then reap with a tiny TTL.
        tokio::time::sleep(std::time::Duration::from_millis(15)).await;
        assert_eq!(pool.reap_idle(std::time::Duration::from_millis(5)), 1);
        assert_eq!(pool.idle_count(), 0);
        assert_eq!(pool.reaped(), 1);
        // A subsequent checkout misses (it was reaped).
        assert!(pool.checkout(&key).is_none());
    }
}