heliosdb_proxy/pool/backend_pool.rs
1//! Data-path backend connection pool for Transaction / Statement pooling modes.
2//!
3//! This is the *raw-stream* pool that actually multiplexes clients onto a
4//! bounded set of backend connections — the piece that makes the
5//! `pool-modes` feature do real work on the wire. It is deliberately distinct
6//! from [`crate::pool::manager::ConnectionPoolManager`], which models pooling
7//! over the higher-level `BackendClient` message API; the proxy data path
8//! forwards raw PostgreSQL-wire bytes, so it needs a pool of authenticated
9//! `TcpStream`s.
10//!
11//! ## Identity keying (why this is safe)
12//!
13//! HeliosProxy authenticates backend connections by **passing the client's own
14//! credentials through** to PostgreSQL (the client SCRAM handshake is relayed).
15//! A parked connection is therefore authenticated as a specific
16//! `(user, database)` principal. The pool keys idle connections by
17//! `node\0user\0database`, so a connection is only ever handed to a client that
18//! connected with the *same* identity — and that client independently
19//! authenticated before it could reach the pool. This is exactly PgBouncer's
20//! per-(user,db) pooling model; it does not multiplex distinct users onto one
21//! backend identity (that would need proxy-terminated auth with a shared
22//! backend credential, which is a separate, larger change).
23//!
24//! ## Cleanliness
25//!
26//! A connection is `DISCARD ALL`-reset by the caller before it is parked
27//! (see the release path in `server.rs`), so the next borrower — possibly a
28//! *different* client of the same identity — never inherits GUCs, temp tables,
29//! prepared statements, or advisory locks. On checkout the connection is
30//! liveness-probed so a peer that closed the socket while idle is dropped
31//! rather than handed out.
32
33use dashmap::DashMap;
34use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
35use std::time::{Duration, Instant};
36use tokio::net::TcpStream;
37
/// Derive the idle-pool lookup key for one backend identity.
///
/// The three components are concatenated as `node`, `user`, `database` with a
/// single NUL (`\0`) byte between neighbours, so moving characters from one
/// component into an adjacent one always yields a different key.
pub fn pool_key(node: &str, user: &str, database: &str) -> String {
    [node, user, database].join("\0")
}
43
44/// A bounded set of idle, authenticated backend connections, partitioned by
45/// connection identity. Cheap to clone-share behind an `Arc`.
46pub struct BackendIdlePool {
47 /// identity-key -> stack of (idle authenticated stream, park time).
48 idle: DashMap<String, Vec<(TcpStream, Instant)>>,
49 /// Hard cap on idle connections parked per identity key.
50 max_idle_per_key: usize,
51 /// Hard cap on idle connections parked across ALL identities — bounds total
52 /// file descriptors / memory regardless of how many distinct
53 /// `(node,user,db)` identities appear.
54 max_total_idle: usize,
55 /// Live count of parked connections (kept in step with `idle`) so the
56 /// global-cap check and `idle_count()` are O(1) instead of O(keys).
57 total_idle: AtomicUsize,
58 /// Checkout hits — a parked connection was reused.
59 reuses: AtomicU64,
60 /// Connections parked (checked in) successfully.
61 parked: AtomicU64,
62 /// Check-ins refused because an idle cap (per-key or global) was reached —
63 /// the connection is closed by the caller dropping it.
64 over_capacity: AtomicU64,
65 /// Parked connections dropped at checkout because the peer had closed
66 /// them (or left unexpected bytes) while idle.
67 stale_evicted: AtomicU64,
68 /// Parked connections dropped by the idle reaper for exceeding the TTL.
69 reaped: AtomicU64,
70}
71
72impl BackendIdlePool {
73 /// Create a pool that parks at most `max_idle_per_key` connections per
74 /// `(node,user,db)` identity and `max_total_idle` across all identities.
75 /// A floor of 1 is enforced on each so the pool always retains at least one
76 /// reusable connection.
77 pub fn new(max_idle_per_key: usize, max_total_idle: usize) -> Self {
78 Self {
79 idle: DashMap::new(),
80 max_idle_per_key: max_idle_per_key.max(1),
81 max_total_idle: max_total_idle.max(1),
82 total_idle: AtomicUsize::new(0),
83 reuses: AtomicU64::new(0),
84 parked: AtomicU64::new(0),
85 over_capacity: AtomicU64::new(0),
86 stale_evicted: AtomicU64::new(0),
87 reaped: AtomicU64::new(0),
88 }
89 }
90
91 /// Take a live idle connection for `key`, or `None` if the pool has no
92 /// usable one (caller then dials a fresh connection). Dead/stale parked
93 /// connections are evicted in passing.
94 pub fn checkout(&self, key: &str) -> Option<TcpStream> {
95 let mut guard = self.idle.get_mut(key)?;
96 while let Some((stream, _parked_at)) = guard.pop() {
97 self.total_idle.fetch_sub(1, Ordering::Relaxed);
98 if Self::probe_alive(&stream) {
99 self.reuses.fetch_add(1, Ordering::Relaxed);
100 return Some(stream);
101 }
102 // Peer closed (or desynced) while idle — drop it and try the next.
103 self.stale_evicted.fetch_add(1, Ordering::Relaxed);
104 }
105 None
106 }
107
108 /// Park a (freshly reset) connection for reuse under `key`. Returns `false`
109 /// when an idle cap (per-key OR global) is already reached — in that case
110 /// the connection is dropped (closed) by being moved in and discarded,
111 /// shedding excess capacity.
112 pub fn checkin(&self, key: &str, stream: TcpStream) -> bool {
113 // Global ceiling first — bounds total FDs across all identities. Reserve
114 // the slot atomically (fetch_add, inspect the prior value) rather than
115 // load-then-act: a plain load outside the lock lets N concurrent
116 // check-ins to distinct keys each observe `cap - 1` and all push,
117 // overshooting the ceiling. With a reservation only one racer sees a
118 // prior value below the cap; the rest roll their increment back.
119 if self.total_idle.fetch_add(1, Ordering::Relaxed) >= self.max_total_idle {
120 self.total_idle.fetch_sub(1, Ordering::Relaxed);
121 self.over_capacity.fetch_add(1, Ordering::Relaxed);
122 return false; // `stream` dropped here → socket closed.
123 }
124 let mut entry = self.idle.entry(key.to_string()).or_default();
125 if entry.len() >= self.max_idle_per_key {
126 // Per-key cap reached — release the global slot we reserved.
127 self.total_idle.fetch_sub(1, Ordering::Relaxed);
128 self.over_capacity.fetch_add(1, Ordering::Relaxed);
129 return false; // `stream` dropped here → socket closed.
130 }
131 entry.push((stream, Instant::now()));
132 self.parked.fetch_add(1, Ordering::Relaxed);
133 true
134 }
135
136 /// Drop parked connections that have been idle longer than `max_age` so a
137 /// connection the backend has (or will) close on its own idle timeout is
138 /// not handed out stale, and idle capacity is released back to the OS.
139 /// Returns the number reaped. Intended to be called periodically by a
140 /// background task.
141 pub fn reap_idle(&self, max_age: Duration) -> usize {
142 let mut reaped = 0usize;
143 for mut entry in self.idle.iter_mut() {
144 let before = entry.value().len();
145 entry
146 .value_mut()
147 .retain(|(_, parked_at)| parked_at.elapsed() < max_age);
148 reaped += before - entry.value().len();
149 }
150 if reaped > 0 {
151 self.total_idle.fetch_sub(reaped, Ordering::Relaxed);
152 self.reaped.fetch_add(reaped as u64, Ordering::Relaxed);
153 }
154 reaped
155 }
156
157 /// Liveness probe for an idle parked connection: a clean idle backend has
158 /// no pending bytes, so a non-blocking read should report `WouldBlock`.
159 /// `Ok(0)` means the peer closed; `Ok(n>0)` means unexpected data (protocol
160 /// desync) — both are treated as dead.
161 fn probe_alive(stream: &TcpStream) -> bool {
162 let mut probe = [0u8; 1];
163 matches!(
164 stream.try_read(&mut probe),
165 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock
166 )
167 }
168
169 /// Total idle connections currently parked across all identities (O(1)).
170 pub fn idle_count(&self) -> usize {
171 self.total_idle.load(Ordering::Relaxed)
172 }
173
174 /// Global ceiling on parked idle connections.
175 pub fn max_total_idle(&self) -> usize {
176 self.max_total_idle
177 }
178
179 /// Number of parked connections dropped by the idle reaper (TTL).
180 pub fn reaped(&self) -> u64 {
181 self.reaped.load(Ordering::Relaxed)
182 }
183
184 /// Number of checkout hits (connections reused rather than dialed fresh).
185 pub fn reuses(&self) -> u64 {
186 self.reuses.load(Ordering::Relaxed)
187 }
188
189 /// Number of successful check-ins.
190 pub fn parked(&self) -> u64 {
191 self.parked.load(Ordering::Relaxed)
192 }
193
194 /// Number of check-ins refused for exceeding the per-key idle cap.
195 pub fn over_capacity(&self) -> u64 {
196 self.over_capacity.load(Ordering::Relaxed)
197 }
198
199 /// Number of stale connections evicted at checkout.
200 pub fn stale_evicted(&self) -> u64 {
201 self.stale_evicted.load(Ordering::Relaxed)
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::net::TcpListener;

    /// Open a connected loopback socket against `listener` and return the
    /// client side.
    ///
    /// The accepted (server) side is moved into a detached task that never
    /// completes, so it stays open until the test's runtime shuts down. If it
    /// were dropped here the returned stream would already be half-closed, and
    /// a test expecting reuse would pass only while the reactor had not yet
    /// noticed the FIN.
    async fn live_stream(listener: &TcpListener) -> TcpStream {
        let addr = listener.local_addr().unwrap();
        let (client, accepted) = tokio::join!(TcpStream::connect(addr), listener.accept());
        let (server, _peer_addr) = accepted.unwrap();
        tokio::spawn(async move {
            let _keep_open = server;
            std::future::pending::<()>().await;
        });
        client.unwrap()
    }

    #[test]
    fn pool_key_is_nul_delimited_and_distinct() {
        assert_eq!(pool_key("n", "u", "d"), "n\0u\0d");
        assert_ne!(pool_key("n", "ud", ""), pool_key("n", "u", "d"));
    }

    #[tokio::test]
    async fn checkin_then_checkout_reuses_same_connection() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(4, 1000);
        let key = pool_key("127.0.0.1:5432", "bench", "benchdb");

        // Park a live connection, then check it back out.
        let s = live_stream(&listener).await;
        let parked_addr = s.local_addr().unwrap();
        assert!(pool.checkin(&key, s));
        assert_eq!(pool.idle_count(), 1);

        let got = pool
            .checkout(&key)
            .expect("a parked connection is reusable");
        assert_eq!(got.local_addr().unwrap(), parked_addr, "same socket reused");
        assert_eq!(pool.reuses(), 1);
        assert_eq!(pool.idle_count(), 0);

        // Empty pool → miss.
        assert!(pool.checkout(&key).is_none());
    }

    #[tokio::test]
    async fn distinct_identities_do_not_share() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(4, 1000);
        let alice = pool_key("n", "alice", "db");
        let bob = pool_key("n", "bob", "db");

        assert!(pool.checkin(&alice, live_stream(&listener).await));
        // Bob must NOT see alice's connection.
        assert!(pool.checkout(&bob).is_none());
        assert!(pool.checkout(&alice).is_some());
    }

    #[tokio::test]
    async fn per_key_cap_sheds_excess() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(2, 1000);
        let key = pool_key("n", "u", "d");

        assert!(pool.checkin(&key, live_stream(&listener).await));
        assert!(pool.checkin(&key, live_stream(&listener).await));
        // Third exceeds the cap of 2 → refused (and dropped/closed).
        assert!(!pool.checkin(&key, live_stream(&listener).await));
        assert_eq!(pool.over_capacity(), 1);
        assert_eq!(pool.idle_count(), 2);
    }

    #[tokio::test]
    async fn checkout_evicts_a_closed_connection() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(4, 1000);
        let key = pool_key("n", "u", "d");

        // Park a connection, then close the server side so the parked socket is
        // dead. This test manages both halves itself because it needs to drop
        // the peer at a chosen moment.
        let addr = listener.local_addr().unwrap();
        let client = TcpStream::connect(addr).await.unwrap();
        let (server, _) = listener.accept().await.unwrap();
        assert!(pool.checkin(&key, client));
        drop(server); // peer closes
        // Give the close a moment to propagate and the reactor a chance to
        // record the readiness change.
        tokio::task::yield_now().await;
        tokio::time::sleep(Duration::from_millis(20)).await;

        // Checkout must not hand out the dead connection.
        assert!(pool.checkout(&key).is_none());
        assert_eq!(pool.stale_evicted(), 1);
    }

    #[tokio::test]
    async fn global_cap_sheds_across_distinct_identities() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        // Per-key cap is generous (10) but the GLOBAL cap is 2.
        let pool = BackendIdlePool::new(10, 2);
        // Three different identities, one connection each.
        assert!(pool.checkin(&pool_key("n", "a", "d"), live_stream(&listener).await));
        assert!(pool.checkin(&pool_key("n", "b", "d"), live_stream(&listener).await));
        // Third exceeds the global ceiling even though its per-key bucket is empty.
        assert!(!pool.checkin(&pool_key("n", "c", "d"), live_stream(&listener).await));
        assert_eq!(pool.idle_count(), 2);
        assert_eq!(pool.over_capacity(), 1);
    }

    #[tokio::test]
    async fn reaper_drops_aged_idle_connections() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let pool = BackendIdlePool::new(4, 100);
        let key = pool_key("n", "u", "d");
        assert!(pool.checkin(&key, live_stream(&listener).await));
        assert_eq!(pool.idle_count(), 1);

        // Nothing reaped while within the TTL.
        assert_eq!(pool.reap_idle(Duration::from_secs(60)), 0);
        assert_eq!(pool.idle_count(), 1);

        // Let it age, then reap with a tiny TTL.
        tokio::time::sleep(Duration::from_millis(15)).await;
        assert_eq!(pool.reap_idle(Duration::from_millis(5)), 1);
        assert_eq!(pool.idle_count(), 0);
        assert_eq!(pool.reaped(), 1);
        // A subsequent checkout misses (it was reaped).
        assert!(pool.checkout(&key).is_none());
    }
}
335}