pingora_pool/
connection.rs

// Copyright 2025 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Generic connection pooling
16
17use log::{debug, warn};
18use parking_lot::{Mutex, RwLock};
19use pingora_timeout::{sleep, timeout};
20use std::collections::HashMap;
21use std::io;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::io::{AsyncRead, AsyncReadExt};
25use tokio::sync::{oneshot, watch, Notify, OwnedMutexGuard};
26
27use super::lru::Lru;
28
/// Key shared by all connections that may be used interchangeably.
type GroupKey = u64;

/// Unique identifier of a pooled connection (width is platform specific).
#[cfg(unix)]
type ID = i32;
/// Unique identifier of a pooled connection (width is platform specific).
#[cfg(windows)]
type ID = usize;
34
35/// the metadata of a connection
36#[derive(Clone, Debug)]
37pub struct ConnectionMeta {
38    /// The group key. All connections under the same key are considered the same for connection reuse.
39    pub key: GroupKey,
40    /// The unique ID of a connection.
41    pub id: ID,
42}
43
44impl ConnectionMeta {
45    /// Create a new [ConnectionMeta]
46    pub fn new(key: GroupKey, id: ID) -> Self {
47        ConnectionMeta { key, id }
48    }
49}
50
51struct PoolConnection<S> {
52    pub notify_use: oneshot::Sender<bool>,
53    pub connection: S,
54}
55
56impl<S> PoolConnection<S> {
57    pub fn new(notify_use: oneshot::Sender<bool>, connection: S) -> Self {
58        PoolConnection {
59            notify_use,
60            connection,
61        }
62    }
63
64    pub fn release(self) -> S {
65        // notify the idle watcher to release the connection
66        let _ = self.notify_use.send(true);
67        // wait for the watcher to release
68        self.connection
69    }
70}
71
72use crossbeam_queue::ArrayQueue;
73
74/// A pool of exchangeable items
75pub struct PoolNode<T> {
76    connections: Mutex<HashMap<ID, T>>,
77    // a small lock free queue to avoid lock contention
78    hot_queue: ArrayQueue<(ID, T)>,
79    // to avoid race between 2 evictions on the queue
80    hot_queue_remove_lock: Mutex<()>,
81    // TODO: store the GroupKey to avoid hash collision?
82}
83
84// Keep the queue size small because eviction is O(n) in the queue
85const HOT_QUEUE_SIZE: usize = 16;
86
87impl<T> PoolNode<T> {
88    /// Create a new [PoolNode]
89    pub fn new() -> Self {
90        PoolNode {
91            connections: Mutex::new(HashMap::new()),
92            hot_queue: ArrayQueue::new(HOT_QUEUE_SIZE),
93            hot_queue_remove_lock: Mutex::new(()),
94        }
95    }
96
97    /// Get any item from the pool
98    pub fn get_any(&self) -> Option<(ID, T)> {
99        let hot_conn = self.hot_queue.pop();
100        if hot_conn.is_some() {
101            return hot_conn;
102        }
103        let mut connections = self.connections.lock();
104        // find one connection, any connection will do
105        let id = match connections.iter().next() {
106            Some((k, _)) => *k, // OK to copy i32
107            None => return None,
108        };
109        // unwrap is safe since we just found it
110        let connection = connections.remove(&id).unwrap();
111        /* NOTE: we don't resize or drop empty connections hashmap
112         * We may want to do it if they consume too much memory
113         * maybe we should use trees to save memory */
114        Some((id, connection))
115        // connections.lock released here
116    }
117
118    /// Insert an item with the given unique ID into the pool
119    pub fn insert(&self, id: ID, conn: T) {
120        if let Err(node) = self.hot_queue.push((id, conn)) {
121            // hot queue is full
122            let mut connections = self.connections.lock();
123            connections.insert(node.0, node.1); // TODO: check dup
124        }
125    }
126
127    // This function acquires 2 locks and iterates over the entire hot queue.
128    // But it should be fine because remove() rarely happens on a busy PoolNode.
129    /// Remove the item associated with the id from the pool. The item is returned
130    /// if it is found and removed.
131    pub fn remove(&self, id: ID) -> Option<T> {
132        // check the table first as least recent used ones are likely there
133        let removed = self.connections.lock().remove(&id);
134        if removed.is_some() {
135            return removed;
136        } // lock drops here
137
138        let _queue_lock = self.hot_queue_remove_lock.lock();
139        // check the hot queue, note that the queue can be accessed in parallel by insert and get
140        let max_len = self.hot_queue.len();
141        for _ in 0..max_len {
142            if let Some((conn_id, conn)) = self.hot_queue.pop() {
143                if conn_id == id {
144                    // this is the item, it is already popped
145                    return Some(conn);
146                } else {
147                    // not this item, put back to hot queue, but it could also be full
148                    self.insert(conn_id, conn);
149                }
150            } else {
151                // other threads grab all the connections
152                return None;
153            }
154        }
155        None
156        // _queue_lock drops here
157    }
158}
159
160/// Connection pool
161///
162/// [ConnectionPool] holds reusable connections. A reusable connection is released to this pool to
163/// be picked up by another user/request.
164pub struct ConnectionPool<S> {
165    // TODO: n-way pools to reduce lock contention
166    pool: RwLock<HashMap<GroupKey, Arc<PoolNode<PoolConnection<S>>>>>,
167    lru: Lru<ID, ConnectionMeta>,
168}
169
170impl<S> ConnectionPool<S> {
171    /// Create a new [ConnectionPool] with a size limit.
172    ///
173    /// When a connection is released to this pool, the least recently used connection will be dropped.
174    pub fn new(size: usize) -> Self {
175        ConnectionPool {
176            pool: RwLock::new(HashMap::with_capacity(size)), // this is oversized since some connections will have the same key
177            lru: Lru::new(size),
178        }
179    }
180
181    /* get or create and insert a pool node for the hash key */
182    fn get_pool_node(&self, key: GroupKey) -> Arc<PoolNode<PoolConnection<S>>> {
183        {
184            let pool = self.pool.read();
185            if let Some(v) = pool.get(&key) {
186                return (*v).clone();
187            }
188        } // read lock released here
189
190        {
191            // write lock section
192            let mut pool = self.pool.write();
193            // check again since another task might have already added it
194            if let Some(v) = pool.get(&key) {
195                return (*v).clone();
196            }
197            let node = Arc::new(PoolNode::new());
198            let node_ret = node.clone();
199            pool.insert(key, node); // TODO: check dup
200            node_ret
201        }
202    }
203
204    // only remove from the pool because lru already removed it
205    fn pop_evicted(&self, meta: &ConnectionMeta) {
206        let pool_node = {
207            let pool = self.pool.read();
208            match pool.get(&meta.key) {
209                Some(v) => (*v).clone(),
210                None => {
211                    warn!("Fail to get pool node for {:?}", meta);
212                    return;
213                } // nothing to pop, should return error?
214            }
215        }; // read lock released here
216
217        pool_node.remove(meta.id);
218        debug!("evict fd: {} from key {}", meta.id, meta.key);
219    }
220
221    pub fn pop_closed(&self, meta: &ConnectionMeta) {
222        // NOTE: which of these should be done first?
223        self.pop_evicted(meta);
224        self.lru.pop(&meta.id);
225    }
226
227    /// Get a connection from this pool under the same group key
228    pub fn get(&self, key: &GroupKey) -> Option<S> {
229        let pool_node = {
230            let pool = self.pool.read();
231            match pool.get(key) {
232                Some(v) => (*v).clone(),
233                None => return None,
234            }
235        }; // read lock released here
236
237        if let Some((id, connection)) = pool_node.get_any() {
238            self.lru.pop(&id); // the notified is not needed
239            Some(connection.release())
240        } else {
241            None
242        }
243    }
244
245    /// Release a connection to this pool for reuse
246    ///
247    /// - The returned [`Arc<Notify>`] will notify any listen when the connection is evicted from the pool.
248    /// - The returned [`oneshot::Receiver<bool>`] will notify when the connection is being picked up by [Self::get()].
249    pub fn put(
250        &self,
251        meta: &ConnectionMeta,
252        connection: S,
253    ) -> (Arc<Notify>, oneshot::Receiver<bool>) {
254        let (notify_close, replaced) = self.lru.add(meta.id, meta.clone());
255        if let Some(meta) = replaced {
256            self.pop_evicted(&meta);
257        };
258        let pool_node = self.get_pool_node(meta.key);
259        let (notify_use, watch_use) = oneshot::channel();
260        let connection = PoolConnection::new(notify_use, connection);
261        pool_node.insert(meta.id, connection);
262        (notify_close, watch_use)
263    }
264
265    /// Actively monitor the health of a connection that is already released to this pool
266    ///
267    /// When the connection breaks, or the optional `timeout` is reached this function will
268    /// remove it from the pool and drop the connection.
269    ///
270    /// If the connection is reused via [Self::get()] or being evicted, this function will just exit.
271    pub async fn idle_poll<Stream>(
272        &self,
273        connection: OwnedMutexGuard<Stream>,
274        meta: &ConnectionMeta,
275        timeout: Option<Duration>,
276        notify_evicted: Arc<Notify>,
277        watch_use: oneshot::Receiver<bool>,
278    ) where
279        Stream: AsyncRead + Unpin + Send,
280    {
281        let read_result = tokio::select! {
282            biased;
283            _ = watch_use => {
284                debug!("idle connection is being picked up");
285                return
286            },
287            _ = notify_evicted.notified() => {
288                debug!("idle connection is being evicted");
289                // TODO: gracefully close the connection?
290                return
291            }
292            read_result = read_with_timeout(connection , timeout) => read_result
293        };
294
295        match read_result {
296            Ok(n) => {
297                if n > 0 {
298                    warn!("Data received on idle client connection, close it")
299                } else {
300                    debug!("Peer closed the idle connection or timeout")
301                }
302            }
303
304            Err(e) => {
305                debug!("error with the idle connection, close it {:?}", e);
306            }
307        }
308        // connection terminated from either peer or timer
309        self.pop_closed(meta);
310    }
311
312    /// Passively wait to close the connection after the timeout
313    ///
314    /// If this connection is not being picked up or evicted before the timeout is reach, this
315    /// function will remove it from the pool and close the connection.
316    pub async fn idle_timeout(
317        &self,
318        meta: &ConnectionMeta,
319        timeout: Duration,
320        notify_evicted: Arc<Notify>,
321        mut notify_closed: watch::Receiver<bool>,
322        watch_use: oneshot::Receiver<bool>,
323    ) {
324        tokio::select! {
325            biased;
326            _ = watch_use => {
327                debug!("idle connection is being picked up");
328            },
329            _ = notify_evicted.notified() => {
330                debug!("idle connection is being evicted");
331                // TODO: gracefully close the connection?
332            }
333            _ = notify_closed.changed() => {
334                // assume always changed from false to true
335                debug!("idle connection is being closed");
336                self.pop_closed(meta);
337            }
338            _ = sleep(timeout) => {
339                debug!("idle connection is being evicted");
340                self.pop_closed(meta);
341            }
342        };
343    }
344}
345
346async fn read_with_timeout<S>(
347    mut connection: OwnedMutexGuard<S>,
348    timeout_duration: Option<Duration>,
349) -> io::Result<usize>
350where
351    S: AsyncRead + Unpin + Send,
352{
353    let mut buf = [0; 1];
354    let read_event = connection.read(&mut buf[..]);
355    match timeout_duration {
356        Some(d) => match timeout(d, read_event).await {
357            Ok(res) => res,
358            Err(e) => {
359                debug!("keepalive timeout {:?} reached, {:?}", d, e);
360                Ok(0)
361            }
362        },
363        _ => read_event.await,
364    }
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use log::debug;
    use tokio::sync::Mutex as AsyncMutex;
    use tokio_test::io::{Builder, Mock};

    #[tokio::test]
    async fn test_lookup() {
        let meta1 = ConnectionMeta::new(101, 1);
        let value1 = "v1".to_string();
        let meta2 = ConnectionMeta::new(102, 2);
        let value2 = "v2".to_string();
        let meta3 = ConnectionMeta::new(101, 3);
        let value3 = "v3".to_string();
        let cp: ConnectionPool<String> = ConnectionPool::new(3); //#CP3
        cp.put(&meta1, value1.clone());
        cp.put(&meta2, value2.clone());
        cp.put(&meta3, value3.clone());

        let found_b = cp.get(&meta2.key).unwrap();
        assert_eq!(found_b, value2);

        let found_a1 = cp.get(&meta1.key).unwrap();
        let found_a2 = cp.get(&meta1.key).unwrap();

        assert!(
            found_a1 == value1 && found_a2 == value3 || found_a2 == value1 && found_a1 == value3
        );
    }

    #[tokio::test]
    async fn test_pop() {
        let meta1 = ConnectionMeta::new(101, 1);
        let value1 = "v1".to_string();
        let meta2 = ConnectionMeta::new(102, 2);
        let value2 = "v2".to_string();
        let meta3 = ConnectionMeta::new(101, 3);
        let value3 = "v3".to_string();
        let cp: ConnectionPool<String> = ConnectionPool::new(3); //#CP3
        cp.put(&meta1, value1);
        cp.put(&meta2, value2);
        cp.put(&meta3, value3.clone());

        cp.pop_closed(&meta1);

        let found_a1 = cp.get(&meta1.key).unwrap();
        assert_eq!(found_a1, value3);

        cp.pop_closed(&meta1);
        assert!(cp.get(&meta1.key).is_none())
    }

    #[tokio::test]
    async fn test_eviction() {
        let meta1 = ConnectionMeta::new(101, 1);
        let value1 = "v1".to_string();
        let meta2 = ConnectionMeta::new(102, 2);
        let value2 = "v2".to_string();
        let meta3 = ConnectionMeta::new(101, 3);
        let value3 = "v3".to_string();
        let cp: ConnectionPool<String> = ConnectionPool::new(2);
        let (notify_close1, _) = cp.put(&meta1, value1.clone());
        let (notify_close2, _) = cp.put(&meta2, value2.clone());
        let (notify_close3, _) = cp.put(&meta3, value3.clone()); // meta 1 should be evicted

        let closed_item = tokio::select! {
            _ = notify_close1.notified() => {debug!("notifier1"); 1},
            _ = notify_close2.notified() => {debug!("notifier2"); 2},
            _ = notify_close3.notified() => {debug!("notifier3"); 3},
        };
        assert_eq!(closed_item, 1);

        let found_a1 = cp.get(&meta1.key).unwrap();
        assert_eq!(found_a1, value3);
        assert_eq!(cp.get(&meta1.key), None)
    }

    // idle_poll reads a single byte of "garbage" from mock_io1, so the mock still holds unread
    // data and panics with this message when it is finally dropped.
    #[tokio::test]
    #[should_panic(expected = "There is still data left to read.")]
    async fn test_read_close() {
        let meta1 = ConnectionMeta::new(101, 1);
        let mock_io1 = Arc::new(AsyncMutex::new(Builder::new().read(b"garbage").build()));
        let meta2 = ConnectionMeta::new(102, 2);
        let mock_io2 = Arc::new(AsyncMutex::new(
            Builder::new().wait(Duration::from_secs(99)).build(),
        ));
        let meta3 = ConnectionMeta::new(101, 3);
        let mock_io3 = Arc::new(AsyncMutex::new(
            Builder::new().wait(Duration::from_secs(99)).build(),
        ));
        let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(3);
        let (c1, u1) = cp.put(&meta1, mock_io1.clone());
        let (c2, u2) = cp.put(&meta2, mock_io2.clone());
        let (c3, u3) = cp.put(&meta3, mock_io3.clone());

        // each poll must be given the meta of the connection it watches
        let closed_item = tokio::select! {
            _ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, None, c1, u1) => {debug!("notifier1"); 1},
            _ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta2, None, c2, u2) => {debug!("notifier2"); 2},
            _ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta3, None, c3, u3) => {debug!("notifier3"); 3},
        };
        assert_eq!(closed_item, 1);

        let _ = cp.get(&meta1.key).unwrap(); // mock_io3 should be selected
        assert!(cp.get(&meta1.key).is_none()) // mock_io1 should already be removed by idle_poll
    }

    #[tokio::test]
    async fn test_read_timeout() {
        let meta1 = ConnectionMeta::new(101, 1);
        let mock_io1 = Arc::new(AsyncMutex::new(
            Builder::new().wait(Duration::from_secs(99)).build(),
        ));
        let meta2 = ConnectionMeta::new(102, 2);
        let mock_io2 = Arc::new(AsyncMutex::new(
            Builder::new().wait(Duration::from_secs(99)).build(),
        ));
        let meta3 = ConnectionMeta::new(101, 3);
        let mock_io3 = Arc::new(AsyncMutex::new(
            Builder::new().wait(Duration::from_secs(99)).build(),
        ));
        let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(3);
        let (c1, u1) = cp.put(&meta1, mock_io1.clone());
        let (c2, u2) = cp.put(&meta2, mock_io2.clone());
        let (c3, u3) = cp.put(&meta3, mock_io3.clone());

        // the shortest timeout (connection 1) fires first
        let closed_item = tokio::select! {
            _ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, Some(Duration::from_secs(1)), c1, u1) => {debug!("notifier1"); 1},
            _ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta2, Some(Duration::from_secs(2)), c2, u2) => {debug!("notifier2"); 2},
            _ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta3, Some(Duration::from_secs(3)), c3, u3) => {debug!("notifier3"); 3},
        };
        assert_eq!(closed_item, 1);

        let _ = cp.get(&meta1.key).unwrap(); // mock_io3 should be selected
        assert!(cp.get(&meta1.key).is_none()) // mock_io1 should already be removed by idle_poll
    }

    #[tokio::test]
    async fn test_evict_poll() {
        let meta1 = ConnectionMeta::new(101, 1);
        let mock_io1 = Arc::new(AsyncMutex::new(
            Builder::new().wait(Duration::from_secs(99)).build(),
        ));
        let meta2 = ConnectionMeta::new(102, 2);
        let mock_io2 = Arc::new(AsyncMutex::new(
            Builder::new().wait(Duration::from_secs(99)).build(),
        ));
        let meta3 = ConnectionMeta::new(101, 3);
        let mock_io3 = Arc::new(AsyncMutex::new(
            Builder::new().wait(Duration::from_secs(99)).build(),
        ));
        let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(2);
        let (c1, u1) = cp.put(&meta1, mock_io1.clone());
        let (c2, u2) = cp.put(&meta2, mock_io2.clone());
        let (c3, u3) = cp.put(&meta3, mock_io3.clone()); // 1 should be evicted at this point

        let closed_item = tokio::select! {
            _ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, None, c1, u1) => {debug!("notifier1"); 1},
            _ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta2, None, c2, u2) => {debug!("notifier2"); 2},
            _ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta3, None, c3, u3) => {debug!("notifier3"); 3},
        };
        assert_eq!(closed_item, 1);

        let _ = cp.get(&meta1.key).unwrap(); // mock_io3 should be selected
        assert!(cp.get(&meta1.key).is_none()) // mock_io1 should already be removed by idle_poll
    }
}