Skip to main content

pingora_pool/
connection.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Generic connection pooling
16
17use log::{debug, warn};
18use parking_lot::{Mutex, RwLock};
19use pingora_timeout::{sleep, timeout};
20use std::collections::HashMap;
21use std::io;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::io::{AsyncRead, AsyncReadExt};
25use tokio::sync::{oneshot, watch, Notify, OwnedMutexGuard};
26
27use super::lru::Lru;
28
// Key shared by all connections that are interchangeable for reuse.
type GroupKey = u64;
// Per-connection identifier. NOTE(review): presumably the raw fd (unix) or socket
// handle (windows), since the pool logs it as "fd" -- confirm against callers.
#[cfg(unix)]
type ID = i32;
#[cfg(windows)]
type ID = usize;

/// The identifying metadata of a pooled connection
#[derive(Clone, Debug)]
pub struct ConnectionMeta {
    /// The group key. Connections that share a key are interchangeable for connection reuse.
    pub key: GroupKey,
    /// The ID that uniquely identifies this connection.
    pub id: ID,
}

impl ConnectionMeta {
    /// Build a [ConnectionMeta] from a group key and a connection ID
    pub fn new(key: GroupKey, id: ID) -> Self {
        Self { key, id }
    }
}
50
51struct PoolConnection<S> {
52    pub notify_use: oneshot::Sender<bool>,
53    pub connection: S,
54}
55
56impl<S> PoolConnection<S> {
57    pub fn new(notify_use: oneshot::Sender<bool>, connection: S) -> Self {
58        PoolConnection {
59            notify_use,
60            connection,
61        }
62    }
63
64    pub fn release(self) -> S {
65        // notify the idle watcher to release the connection
66        let _ = self.notify_use.send(true);
67        // wait for the watcher to release
68        self.connection
69    }
70}
71
72use crossbeam_queue::ArrayQueue;
73
74/// A pool of exchangeable items
75pub struct PoolNode<T> {
76    connections: Mutex<HashMap<ID, T>>,
77    // a small lock free queue to avoid lock contention
78    hot_queue: ArrayQueue<(ID, T)>,
79    // to avoid race between 2 evictions on the queue
80    hot_queue_remove_lock: Mutex<()>,
81    // TODO: store the GroupKey to avoid hash collision?
82}
83
84// Keep the queue size small because eviction is O(n) in the queue
85const HOT_QUEUE_SIZE: usize = 16;
86
87impl<T> PoolNode<T> {
88    /// Create a new [PoolNode]
89    pub fn new() -> Self {
90        PoolNode {
91            connections: Mutex::new(HashMap::new()),
92            hot_queue: ArrayQueue::new(HOT_QUEUE_SIZE),
93            hot_queue_remove_lock: Mutex::new(()),
94        }
95    }
96
97    /// Get any item from the pool
98    pub fn get_any(&self) -> Option<(ID, T)> {
99        let hot_conn = self.hot_queue.pop();
100        if hot_conn.is_some() {
101            return hot_conn;
102        }
103        let mut connections = self.connections.lock();
104        // find one connection, any connection will do
105        let id = match connections.iter().next() {
106            Some((k, _)) => *k, // OK to copy i32
107            None => return None,
108        };
109        // unwrap is safe since we just found it
110        let connection = connections.remove(&id).unwrap();
111        /* NOTE: we don't resize or drop empty connections hashmap
112         * We may want to do it if they consume too much memory
113         * maybe we should use trees to save memory */
114        Some((id, connection))
115        // connections.lock released here
116    }
117
118    /// Insert an item with the given unique ID into the pool
119    pub fn insert(&self, id: ID, conn: T) {
120        if let Err(node) = self.hot_queue.push((id, conn)) {
121            // hot queue is full
122            let mut connections = self.connections.lock();
123            connections.insert(node.0, node.1); // TODO: check dup
124        }
125    }
126
127    // This function acquires 2 locks and iterates over the entire hot queue.
128    // But it should be fine because remove() rarely happens on a busy PoolNode.
129    /// Remove the item associated with the id from the pool. The item is returned
130    /// if it is found and removed.
131    pub fn remove(&self, id: ID) -> Option<T> {
132        // check the table first as least recent used ones are likely there
133        let removed = self.connections.lock().remove(&id);
134        if removed.is_some() {
135            return removed;
136        } // lock drops here
137
138        let _queue_lock = self.hot_queue_remove_lock.lock();
139        // check the hot queue, note that the queue can be accessed in parallel by insert and get
140        let max_len = self.hot_queue.len();
141        for _ in 0..max_len {
142            if let Some((conn_id, conn)) = self.hot_queue.pop() {
143                if conn_id == id {
144                    // this is the item, it is already popped
145                    return Some(conn);
146                } else {
147                    // not this item, put back to hot queue, but it could also be full
148                    self.insert(conn_id, conn);
149                }
150            } else {
151                // other threads grab all the connections
152                return None;
153            }
154        }
155        None
156        // _queue_lock drops here
157    }
158}
159
160/// Connection pool
161///
162/// [ConnectionPool] holds reusable connections. A reusable connection is released to this pool to
163/// be picked up by another user/request.
164pub struct ConnectionPool<S> {
165    // TODO: n-way pools to reduce lock contention
166    pool: RwLock<HashMap<GroupKey, Arc<PoolNode<PoolConnection<S>>>>>,
167    lru: Lru<ID, ConnectionMeta>,
168}
169
170impl<S> ConnectionPool<S> {
171    /// Create a new [ConnectionPool] with a size limit.
172    ///
173    /// When a connection is released to this pool, the least recently used connection will be dropped.
174    pub fn new(size: usize) -> Self {
175        ConnectionPool {
176            pool: RwLock::new(HashMap::with_capacity(size)), // this is oversized since some connections will have the same key
177            lru: Lru::new(size),
178        }
179    }
180
181    /* get or create and insert a pool node for the hash key */
182    fn get_pool_node(&self, key: GroupKey) -> Arc<PoolNode<PoolConnection<S>>> {
183        {
184            let pool = self.pool.read();
185            if let Some(v) = pool.get(&key) {
186                return (*v).clone();
187            }
188        } // read lock released here
189
190        {
191            // write lock section
192            let mut pool = self.pool.write();
193            // check again since another task might have already added it
194            if let Some(v) = pool.get(&key) {
195                return (*v).clone();
196            }
197            let node = Arc::new(PoolNode::new());
198            let node_ret = node.clone();
199            pool.insert(key, node); // TODO: check dup
200            node_ret
201        }
202    }
203
204    // only remove from the pool because lru already removed it
205    fn pop_evicted(&self, meta: &ConnectionMeta) {
206        let pool_node = {
207            let pool = self.pool.read();
208            match pool.get(&meta.key) {
209                Some(v) => (*v).clone(),
210                None => {
211                    warn!("Fail to get pool node for {:?}", meta);
212                    return;
213                } // nothing to pop, should return error?
214            }
215        }; // read lock released here
216
217        pool_node.remove(meta.id);
218        debug!("evict fd: {} from key {}", meta.id, meta.key);
219    }
220
221    pub fn pop_closed(&self, meta: &ConnectionMeta) {
222        // NOTE: which of these should be done first?
223        self.pop_evicted(meta);
224        self.lru.pop(&meta.id);
225    }
226
227    /// Get a connection from this pool under the same group key
228    pub fn get(&self, key: &GroupKey) -> Option<S> {
229        let pool_node = {
230            let pool = self.pool.read();
231            match pool.get(key) {
232                Some(v) => (*v).clone(),
233                None => return None,
234            }
235        }; // read lock released here
236
237        if let Some((id, connection)) = pool_node.get_any() {
238            self.lru.pop(&id); // the notified is not needed
239            Some(connection.release())
240        } else {
241            None
242        }
243    }
244
245    /// Release a connection to this pool for reuse
246    ///
247    /// - The returned [`Arc<Notify>`] will notify any listen when the connection is evicted from the pool.
248    /// - The returned [`oneshot::Receiver<bool>`] will notify when the connection is being picked up by [Self::get()].
249    pub fn put(
250        &self,
251        meta: &ConnectionMeta,
252        connection: S,
253    ) -> (Arc<Notify>, oneshot::Receiver<bool>) {
254        let (notify_close, replaced) = self.lru.add(meta.id, meta.clone());
255        if let Some(meta) = replaced {
256            self.pop_evicted(&meta);
257        };
258        let pool_node = self.get_pool_node(meta.key);
259        let (notify_use, watch_use) = oneshot::channel();
260        let connection = PoolConnection::new(notify_use, connection);
261        pool_node.insert(meta.id, connection);
262        (notify_close, watch_use)
263    }
264
265    /// Actively monitor the health of a connection that is already released to this pool
266    ///
267    /// When the connection breaks, or the optional `timeout` is reached this function will
268    /// remove it from the pool and drop the connection.
269    ///
270    /// If the connection is reused via [Self::get()] or being evicted, this function will just exit.
271    pub async fn idle_poll<Stream>(
272        &self,
273        connection: OwnedMutexGuard<Stream>,
274        meta: &ConnectionMeta,
275        timeout: Option<Duration>,
276        notify_evicted: Arc<Notify>,
277        watch_use: oneshot::Receiver<bool>,
278    ) where
279        Stream: AsyncRead + Unpin + Send,
280    {
281        let read_result = tokio::select! {
282            biased;
283            _ = watch_use => {
284                debug!("idle connection is being picked up");
285                return
286            },
287            _ = notify_evicted.notified() => {
288                debug!("idle connection is being evicted");
289                // TODO: gracefully close the connection?
290                return
291            }
292            read_result = read_with_timeout(connection , timeout) => read_result
293        };
294
295        match read_result {
296            Ok(n) => {
297                if n > 0 {
298                    warn!("Data received on idle client connection, close it")
299                } else {
300                    debug!("Peer closed the idle connection or timeout")
301                }
302            }
303
304            Err(e) => {
305                debug!("error with the idle connection, close it {:?}", e);
306            }
307        }
308        // connection terminated from either peer or timer
309        self.pop_closed(meta);
310    }
311
312    /// Passively wait to close the connection after the timeout
313    ///
314    /// If this connection is not being picked up or evicted before the timeout is reach, this
315    /// function will remove it from the pool and close the connection.
316    pub async fn idle_timeout(
317        &self,
318        meta: &ConnectionMeta,
319        timeout: Option<Duration>,
320        notify_evicted: Arc<Notify>,
321        mut notify_closed: watch::Receiver<bool>,
322        watch_use: oneshot::Receiver<bool>,
323    ) {
324        tokio::select! {
325            biased;
326            _ = watch_use => {
327                debug!("idle connection is being picked up");
328            },
329            _ = notify_evicted.notified() => {
330                debug!("idle connection is being evicted");
331                // TODO: gracefully close the connection?
332            }
333            _ = notify_closed.changed() => {
334                // assume always changed from false to true
335                debug!("idle connection is being closed");
336                self.pop_closed(meta);
337            }
338            // async expression is evaluated if timeout is None but it's never polled, set it to MAX
339            _ = sleep(timeout.unwrap_or(Duration::MAX)), if timeout.is_some() => {
340                debug!("idle connection is being evicted");
341                self.pop_closed(meta);
342            }
343        };
344    }
345}
346
347async fn read_with_timeout<S>(
348    mut connection: OwnedMutexGuard<S>,
349    timeout_duration: Option<Duration>,
350) -> io::Result<usize>
351where
352    S: AsyncRead + Unpin + Send,
353{
354    let mut buf = [0; 1];
355    let read_event = connection.read(&mut buf[..]);
356    match timeout_duration {
357        Some(d) => match timeout(d, read_event).await {
358            Ok(res) => res,
359            Err(e) => {
360                debug!("keepalive timeout {:?} reached, {:?}", d, e);
361                Ok(0)
362            }
363        },
364        _ => read_event.await,
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use log::debug;
    use tokio::sync::Mutex as AsyncMutex;
    use tokio_test::io::{Builder, Mock};

    /// A mock stream whose read stays pending for 99 seconds, i.e. for the whole test.
    fn idle_mock() -> Arc<AsyncMutex<Mock>> {
        Arc::new(AsyncMutex::new(
            Builder::new().wait(Duration::from_secs(99)).build(),
        ))
    }

    /// Connections are found by group key, and each one is handed out only once.
    #[tokio::test]
    async fn test_lookup() {
        let meta1 = ConnectionMeta::new(101, 1);
        let value1 = "v1".to_string();
        let meta2 = ConnectionMeta::new(102, 2);
        let value2 = "v2".to_string();
        let meta3 = ConnectionMeta::new(101, 3);
        let value3 = "v3".to_string();
        let cp: ConnectionPool<String> = ConnectionPool::new(3); //#CP3
        cp.put(&meta1, value1.clone());
        cp.put(&meta2, value2.clone());
        cp.put(&meta3, value3.clone());

        let found_b = cp.get(&meta2.key).unwrap();
        assert_eq!(found_b, value2);

        let found_a1 = cp.get(&meta1.key).unwrap();
        let found_a2 = cp.get(&meta1.key).unwrap();

        // meta1 and meta3 share a key, the order in which they come back is unspecified
        assert!(
            found_a1 == value1 && found_a2 == value3 || found_a2 == value1 && found_a1 == value3
        );
    }

    /// `pop_closed()` removes exactly the given connection, and is harmless when repeated.
    #[tokio::test]
    async fn test_pop() {
        let meta1 = ConnectionMeta::new(101, 1);
        let value1 = "v1".to_string();
        let meta2 = ConnectionMeta::new(102, 2);
        let value2 = "v2".to_string();
        let meta3 = ConnectionMeta::new(101, 3);
        let value3 = "v3".to_string();
        let cp: ConnectionPool<String> = ConnectionPool::new(3); //#CP3
        cp.put(&meta1, value1);
        cp.put(&meta2, value2);
        cp.put(&meta3, value3.clone());

        cp.pop_closed(&meta1);

        let found_a1 = cp.get(&meta1.key).unwrap();
        assert_eq!(found_a1, value3);

        cp.pop_closed(&meta1);
        assert!(cp.get(&meta1.key).is_none())
    }

    /// Exceeding the pool size evicts the least recently used connection and notifies it.
    #[tokio::test]
    async fn test_eviction() {
        let meta1 = ConnectionMeta::new(101, 1);
        let value1 = "v1".to_string();
        let meta2 = ConnectionMeta::new(102, 2);
        let value2 = "v2".to_string();
        let meta3 = ConnectionMeta::new(101, 3);
        let value3 = "v3".to_string();
        let cp: ConnectionPool<String> = ConnectionPool::new(2);
        let (notify_close1, _) = cp.put(&meta1, value1.clone());
        let (notify_close2, _) = cp.put(&meta2, value2.clone());
        let (notify_close3, _) = cp.put(&meta3, value3.clone()); // meta 1 should be evicted

        let closed_item = tokio::select! {
            _ = notify_close1.notified() => {debug!("notifier1"); 1},
            _ = notify_close2.notified() => {debug!("notifier2"); 2},
            _ = notify_close3.notified() => {debug!("notifier3"); 3},
        };
        assert_eq!(closed_item, 1);

        let found_a1 = cp.get(&meta1.key).unwrap();
        assert_eq!(found_a1, value3);
        assert_eq!(cp.get(&meta1.key), None)
    }

    /// Data arriving on an idle connection makes `idle_poll()` remove it from the pool.
    ///
    /// The expected panic comes from the mock: only one byte of "garbage" is ever read.
    #[tokio::test]
    #[should_panic(expected = "There is still data left to read.")]
    async fn test_read_close() {
        let meta1 = ConnectionMeta::new(101, 1);
        let mock_io1 = Arc::new(AsyncMutex::new(Builder::new().read(b"garbage").build()));
        let meta2 = ConnectionMeta::new(102, 2);
        let mock_io2 = idle_mock();
        let meta3 = ConnectionMeta::new(101, 3);
        let mock_io3 = idle_mock();
        let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(3);
        let (c1, u1) = cp.put(&meta1, mock_io1.clone());
        let (c2, u2) = cp.put(&meta2, mock_io2.clone());
        let (c3, u3) = cp.put(&meta3, mock_io3.clone());

        // each watcher gets the meta of the connection it watches
        let closed_item = tokio::select! {
            _ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, None, c1, u1) => {debug!("notifier1"); 1},
            _ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta2, None, c2, u2) => {debug!("notifier2"); 2},
            _ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta3, None, c3, u3) => {debug!("notifier3"); 3},
        };
        assert_eq!(closed_item, 1);

        let _ = cp.get(&meta1.key).unwrap(); // mock_io3 should be selected
        assert!(cp.get(&meta1.key).is_none()) // mock_io1 should already be removed by idle_poll
    }

    /// The connection with the shortest idle timeout is the first one `idle_poll()` removes.
    #[tokio::test]
    async fn test_read_timeout() {
        let meta1 = ConnectionMeta::new(101, 1);
        let mock_io1 = idle_mock();
        let meta2 = ConnectionMeta::new(102, 2);
        let mock_io2 = idle_mock();
        let meta3 = ConnectionMeta::new(101, 3);
        let mock_io3 = idle_mock();
        let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(3);
        let (c1, u1) = cp.put(&meta1, mock_io1.clone());
        let (c2, u2) = cp.put(&meta2, mock_io2.clone());
        let (c3, u3) = cp.put(&meta3, mock_io3.clone());

        // each watcher gets the meta of the connection it watches
        let closed_item = tokio::select! {
            _ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, Some(Duration::from_secs(1)), c1, u1) => {debug!("notifier1"); 1},
            _ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta2, Some(Duration::from_secs(2)), c2, u2) => {debug!("notifier2"); 2},
            _ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta3, Some(Duration::from_secs(3)), c3, u3) => {debug!("notifier3"); 3},
        };
        assert_eq!(closed_item, 1);

        let _ = cp.get(&meta1.key).unwrap(); // mock_io3 should be selected
        assert!(cp.get(&meta1.key).is_none()) // mock_io1 should already be removed by idle_poll
    }

    /// `idle_poll()` exits right away for a connection that was already evicted.
    #[tokio::test]
    async fn test_evict_poll() {
        let meta1 = ConnectionMeta::new(101, 1);
        let mock_io1 = idle_mock();
        let meta2 = ConnectionMeta::new(102, 2);
        let mock_io2 = idle_mock();
        let meta3 = ConnectionMeta::new(101, 3);
        let mock_io3 = idle_mock();
        let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(2);
        let (c1, u1) = cp.put(&meta1, mock_io1.clone());
        let (c2, u2) = cp.put(&meta2, mock_io2.clone());
        let (c3, u3) = cp.put(&meta3, mock_io3.clone()); // 1 should be evicted at this point

        // each watcher gets the meta of the connection it watches
        let closed_item = tokio::select! {
            _ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, None, c1, u1) => {debug!("notifier1"); 1},
            _ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta2, None, c2, u2) => {debug!("notifier2"); 2},
            _ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta3, None, c3, u3) => {debug!("notifier3"); 3},
        };
        assert_eq!(closed_item, 1);

        let _ = cp.get(&meta1.key).unwrap(); // mock_io3 should be selected
        assert!(cp.get(&meta1.key).is_none()) // mock_io1 was evicted by the third put
    }
}
534}