monoio_transports/pool/
mod.rs

//! Provides connection pooling functionality for efficient connection reuse.
//!
//! This module includes the `ConnectionPool` for managing connections,
//! `Pooled` for representing pooled connections, and related traits and types
//! for implementing and interacting with connection pools.
6mod connector;
7mod map;
8mod reuse;
9use std::{
10    cell::{RefCell, RefMut, UnsafeCell},
11    collections::{HashMap, VecDeque},
12    fmt::Debug,
13    hash::Hash,
14    ops::{Deref, DerefMut},
15    rc::{Rc, Weak},
16    time::{Duration, Instant},
17};
18
19pub use connector::PooledConnector;
20pub use map::{ConnectorMap, ConnectorMapper};
21use monoio::io::{AsyncReadRent, AsyncWriteRent, Split};
22pub use reuse::{Reuse, ReuseConnector};
23
/// Per-key idle connection limit used when the caller does not supply one.
pub(crate) const DEFAULT_KEEPALIVE_CONNS: usize = 1024;
/// Initial capacity reserved for the map from key to idle queue.
pub(crate) const DEFAULT_POOL_SIZE: usize = 32;
/// Upper bound applied to any caller-supplied idle connection limit.
///
/// See <https://datatracker.ietf.org/doc/html/rfc6335>.
// NOTE(review): presumably sized after the dynamic port range
// (49152-65535, i.e. 16384 ports) described in that RFC -- confirm.
pub(crate) const MAX_KEEPALIVE_CONNS: usize = 16384;
28
/// A connection type that can be stored in a connection pool.
pub trait Poolable {
    /// Reports whether the connection is still open.
    ///
    /// A `Pooled` wrapper consults this when it is dropped: a connection that
    /// reports `false` is discarded instead of being returned to the pool.
    fn is_open(&self) -> bool;
}
32
33type SharedPool<K, IO> = Rc<UnsafeCell<PoolInner<K, IO>>>;
34type WeakPool<K, IO> = Weak<UnsafeCell<PoolInner<K, IO>>>;
35type WeakQueue<IO> = Weak<RefCell<VecDeque<Idle<IO>>>>;
/// Marker trait for types usable as a pool key.
///
/// It has no methods of its own; it only bundles the bounds the pool needs.
pub trait Key: Eq + Hash + Clone + 'static {}

/// Every type that satisfies the bounds is automatically a [`Key`].
impl<T> Key for T where T: Eq + Hash + Clone + 'static {}
39
40// Partly borrow from hyper-util. All rights reserved.
41pub struct Pooled<K: Key, T: Poolable> {
42    value: Option<T>,
43    is_reused: bool,
44    key: Option<K>,
45    pool: Option<WeakPool<K, T>>,
46    queue: Option<WeakQueue<T>>,
47    max_idle: usize,
48}
49
50unsafe impl<K: Key, T: Poolable + Split> Split for Pooled<K, T> {}
51
52impl<K: Key, I: Poolable + AsyncReadRent> AsyncReadRent for Pooled<K, I> {
53    #[inline]
54    fn read<T: monoio::buf::IoBufMut>(
55        &mut self,
56        buf: T,
57    ) -> impl std::future::Future<Output = monoio::BufResult<usize, T>> {
58        unsafe { self.value.as_mut().unwrap_unchecked() }.read(buf)
59    }
60
61    #[inline]
62    fn readv<T: monoio::buf::IoVecBufMut>(
63        &mut self,
64        buf: T,
65    ) -> impl std::future::Future<Output = monoio::BufResult<usize, T>> {
66        unsafe { self.value.as_mut().unwrap_unchecked() }.readv(buf)
67    }
68}
69
70impl<K: Key, I: Poolable + AsyncWriteRent> AsyncWriteRent for Pooled<K, I> {
71    #[inline]
72    fn write<T: monoio::buf::IoBuf>(
73        &mut self,
74        buf: T,
75    ) -> impl std::future::Future<Output = monoio::BufResult<usize, T>> {
76        unsafe { self.value.as_mut().unwrap_unchecked() }.write(buf)
77    }
78
79    #[inline]
80    fn writev<T: monoio::buf::IoVecBuf>(
81        &mut self,
82        buf_vec: T,
83    ) -> impl std::future::Future<Output = monoio::BufResult<usize, T>> {
84        unsafe { self.value.as_mut().unwrap_unchecked() }.writev(buf_vec)
85    }
86
87    #[inline]
88    fn flush(&mut self) -> impl std::future::Future<Output = std::io::Result<()>> {
89        unsafe { self.value.as_mut().unwrap_unchecked() }.flush()
90    }
91
92    #[inline]
93    fn shutdown(&mut self) -> impl std::future::Future<Output = std::io::Result<()>> {
94        unsafe { self.value.as_mut().unwrap_unchecked() }.shutdown()
95    }
96}
97
98impl<T: Poolable, K: Key> Pooled<K, T> {
99    #[inline]
100    pub(crate) const fn new(
101        key: K,
102        value: T,
103        is_reused: bool,
104        pool: WeakPool<K, T>,
105        queue: Option<WeakQueue<T>>,
106        max_idle: usize,
107    ) -> Self {
108        Self {
109            value: Some(value),
110            is_reused,
111            key: Some(key),
112            pool: Some(pool),
113            queue,
114            max_idle,
115        }
116    }
117
118    #[inline]
119    pub(crate) const fn unpooled(value: T) -> Self {
120        Self {
121            value: Some(value),
122            is_reused: false,
123            key: None,
124            pool: None,
125            queue: None,
126            max_idle: DEFAULT_KEEPALIVE_CONNS,
127        }
128    }
129
130    #[inline]
131    pub fn is_reused(&self) -> bool {
132        self.is_reused
133    }
134
135    #[inline]
136    fn as_ref(&self) -> &T {
137        self.value.as_ref().expect("not dropped")
138    }
139
140    #[inline]
141    fn as_mut(&mut self) -> &mut T {
142        self.value.as_mut().expect("not dropped")
143    }
144}
145
146impl<T: Poolable, K: Key> Deref for Pooled<K, T> {
147    type Target = T;
148
149    #[inline]
150    fn deref(&self) -> &T {
151        self.as_ref()
152    }
153}
154
155impl<T: Poolable, K: Key> DerefMut for Pooled<K, T> {
156    #[inline]
157    fn deref_mut(&mut self) -> &mut T {
158        self.as_mut()
159    }
160}
161
162impl<T: Poolable, K: Key> Drop for Pooled<K, T> {
163    fn drop(&mut self) {
164        if let Some(value) = self.value.take() {
165            if !value.is_open() {
166                // If we *already* know the connection is done here,
167                // it shouldn't be re-inserted back into the pool.
168                return;
169            }
170
171            // Add the connection back to the queue directly if the weak reference is still alive.
172            if let Some(weak_queue) = &self.queue {
173                if let Some(queue) = weak_queue.upgrade() {
174                    let len = queue.borrow().len();
175                    if len >= self.max_idle {
176                        for _ in 0..len - self.max_idle {
177                            let _ = queue.borrow_mut().pop_front();
178                        }
179                    }
180                    let idle = Idle::new(value);
181                    queue.borrow_mut().push_back(idle);
182                    return;
183                }
184            }
185
186            if let Some(weak) = &self.pool {
187                if let Some(pool) = weak.upgrade() {
188                    let pool = unsafe { &mut *pool.get() };
189                    let key = self.key.take().expect("key is not empty");
190                    let queue = pool.idle_conns.entry(key).or_insert(Rc::new(RefCell::new(
191                        VecDeque::with_capacity(pool.max_idle),
192                    )));
193
194                    let len = queue.borrow().len();
195                    if len >= pool.max_idle {
196                        for _ in 0..len - pool.max_idle {
197                            let _ = queue.borrow_mut().pop_front();
198                        }
199                    }
200
201                    let idle = Idle::new(value);
202                    queue.borrow_mut().push_back(idle);
203                }
204            }
205        }
206    }
207}
208
/// An idle connection together with the instant at which it became idle.
pub(crate) struct Idle<IO> {
    /// The idle connection itself.
    pub(crate) conn: IO,
    /// When the connection was (last) marked idle.
    idle_at: Instant,
}

impl<IO> Idle<IO> {
    /// Wraps `io`, recording the current instant as the start of its idle period.
    #[inline]
    pub(crate) fn new(io: IO) -> Self {
        let idle_at = Instant::now();
        Self { conn: io, idle_at }
    }

    /// Returns `true` once the connection has been idle for strictly longer
    /// than `max_elapsed`.
    #[allow(unused)]
    #[inline]
    pub(crate) fn expired(&self, max_elapsed: Duration) -> bool {
        max_elapsed < self.idle_at.elapsed()
    }

    /// Like [`Idle::expired`], but a limit of `None` means "never expires".
    #[allow(unused)]
    #[inline]
    pub(crate) fn expired_opt(&self, max_elapsed: Option<Duration>) -> bool {
        max_elapsed.is_some_and(|limit| limit < self.idle_at.elapsed())
    }

    /// Restarts the idle period from the current instant.
    #[allow(unused)]
    #[inline]
    pub(crate) fn reset_idle(&mut self) {
        self.idle_at = Instant::now();
    }
}
244
245pub(crate) struct PoolInner<K, IO> {
246    idle_conns: HashMap<K, Rc<RefCell<VecDeque<Idle<IO>>>>>,
247    max_idle: usize,
248    #[cfg(feature = "time")]
249    idle_dur: Option<Duration>,
250    #[cfg(feature = "time")]
251    _drop: Option<local_sync::oneshot::Receiver<()>>,
252}
253
254impl<K, IO> PoolInner<K, IO> {
255    #[cfg(feature = "time")]
256    fn new_with_dropper(max_idle: Option<usize>) -> (local_sync::oneshot::Sender<()>, Self) {
257        let idle_conns = HashMap::with_capacity(DEFAULT_POOL_SIZE);
258        let max_idle = max_idle
259            .map(|n| n.min(MAX_KEEPALIVE_CONNS))
260            .unwrap_or(DEFAULT_KEEPALIVE_CONNS);
261
262        let (tx, drop) = local_sync::oneshot::channel();
263        (
264            tx,
265            Self {
266                idle_conns,
267                max_idle,
268                idle_dur: None,
269                _drop: Some(drop),
270            },
271        )
272    }
273
274    fn new(max_idle: Option<usize>) -> Self {
275        let idle_conns = HashMap::with_capacity(DEFAULT_POOL_SIZE);
276        let max_idle = max_idle
277            .map(|n| n.min(MAX_KEEPALIVE_CONNS))
278            .unwrap_or(DEFAULT_KEEPALIVE_CONNS);
279        Self {
280            idle_conns,
281            max_idle,
282            #[cfg(feature = "time")]
283            idle_dur: None,
284            #[cfg(feature = "time")]
285            _drop: None,
286        }
287    }
288
289    #[allow(unused)]
290    fn clear_expired(&mut self, dur: Duration) {
291        self.idle_conns.retain(|_, values| {
292            let mut values = values.borrow_mut();
293            values.retain(|entry| !entry.expired(dur));
294            !values.is_empty()
295        });
296    }
297}
298
299#[derive(Debug)]
300pub struct ConnectionPool<K, T> {
301    shared: SharedPool<K, T>,
302}
303
304impl<K, T> Clone for ConnectionPool<K, T> {
305    #[inline]
306    fn clone(&self) -> Self {
307        Self {
308            shared: self.shared.clone(),
309        }
310    }
311}
312
313impl<K: 'static, T: 'static> ConnectionPool<K, T> {
314    #[cfg(feature = "time")]
315    pub fn new_with_idle_interval(
316        // `idle_interval` controls how often the pool will check for idle connections.
317        // It is also used to determine if a connection is expired.
318        idle_interval: Option<Duration>,
319        // `max_idle` is max idle connection count
320        max_idle: Option<usize>,
321    ) -> Self {
322        const MIN_INTERVAL: Duration = Duration::from_secs(1);
323
324        if let Some(idle_interval) = idle_interval {
325            let idle_dur = idle_interval;
326            let idle_interval = idle_interval.max(MIN_INTERVAL);
327
328            let (tx, inner) = PoolInner::new_with_dropper(max_idle);
329            let shared = Rc::new(UnsafeCell::new(inner));
330            monoio::spawn(IdleTask {
331                tx,
332                conns: Rc::downgrade(&shared),
333                interval: monoio::time::interval(idle_interval),
334                idle_dur,
335            });
336
337            Self { shared }
338        } else {
339            let shared = Rc::new(UnsafeCell::new(PoolInner::new(max_idle)));
340            Self { shared }
341        }
342    }
343
344    #[inline]
345    pub fn new(max_idle: Option<usize>) -> Self {
346        Self {
347            shared: Rc::new(UnsafeCell::new(PoolInner::new(max_idle))),
348        }
349    }
350}
351
352impl<K: 'static, T: 'static> Default for ConnectionPool<K, T> {
353    fn default() -> Self {
354        Self::new(None)
355    }
356}
357
358impl<K: Key, T: Poolable> ConnectionPool<K, T> {
359    /// Consume the element and return.
360    /// Mostly use by h1.
361    #[inline]
362    pub fn get(&self, key: &K) -> Option<Pooled<K, T>> {
363        let inner = unsafe { &mut *self.shared.get() };
364
365        #[cfg(feature = "time")]
366        loop {
367            let r = match inner.idle_conns.get_mut(key) {
368                Some(v) => match v.borrow_mut().pop_front() {
369                    Some(idle) if !idle.expired_opt(inner.idle_dur) => Some(Pooled::new(
370                        key.to_owned(),
371                        idle.conn,
372                        true,
373                        Rc::downgrade(&self.shared),
374                        Some(Rc::downgrade(v)),
375                        inner.max_idle,
376                    )),
377                    Some(_) => {
378                        continue;
379                    }
380                    None => None,
381                },
382                None => None,
383            };
384            return r;
385        }
386
387        #[cfg(not(feature = "time"))]
388        match inner.idle_conns.get_mut(key) {
389            Some(v) => match v.pop_front() {
390                Some(idle) => Some(Pooled::new(
391                    key.to_owned(),
392                    idle.conn,
393                    true,
394                    Rc::downgrade(&self.shared),
395                )),
396                None => None,
397            },
398            None => None,
399        }
400    }
401
402    #[inline]
403    pub fn put(&self, key: K, conn: T) {
404        let inner = unsafe { &mut *self.shared.get() };
405        let queue =
406            inner
407                .idle_conns
408                .entry(key)
409                .or_insert(Rc::new(RefCell::new(VecDeque::with_capacity(
410                    inner.max_idle,
411                ))));
412
413        if queue.borrow().len() > inner.max_idle {
414            for _ in 0..queue.borrow().len() - inner.max_idle {
415                let _ = queue.borrow_mut().pop_front();
416            }
417            let _ = queue.borrow_mut().pop_front();
418        }
419
420        let idle = Idle::new(conn);
421        queue.borrow_mut().push_back(idle);
422    }
423
424    /// Get a reference to the element and apply f with map.
425    /// Mostly use by h2.
426    #[inline]
427    #[allow(unused)]
428    pub(crate) fn map_mut<F: FnOnce(RefMut<VecDeque<Idle<T>>>) -> O, O>(
429        &self,
430        key: &K,
431        f: F,
432    ) -> Option<O> {
433        let inner = unsafe { &mut *self.shared.get() };
434        inner.idle_conns.get_mut(key).map(|l| l.borrow_mut()).map(f)
435    }
436
437    /// Get a reference to the element and apply f with and_then.
438    /// Mostly use by h2.
439    #[inline]
440    #[allow(unused)]
441    pub(crate) fn and_then_mut<F: FnOnce(RefMut<VecDeque<Idle<T>>>) -> Option<O>, O>(
442        &self,
443        key: &K,
444        f: F,
445    ) -> Option<O> {
446        let inner = unsafe { &mut *self.shared.get() };
447        inner
448            .idle_conns
449            .get_mut(key)
450            .map(|l| l.borrow_mut())
451            .and_then(f)
452    }
453
454    #[inline]
455    pub fn link(&self, key: K, conn: T) -> Pooled<K, T> {
456        #[cfg(feature = "logging")]
457        tracing::debug!("linked new connection to the pool");
458
459        let inner = unsafe { &mut *self.shared.get() };
460        let queue = inner.idle_conns.get_mut(&key).map(|v| Rc::downgrade(v));
461
462        Pooled::new(
463            key,
464            conn,
465            false,
466            Rc::downgrade(&self.shared),
467            queue,
468            inner.max_idle,
469        )
470    }
471
472    #[inline]
473    pub fn get_idle_connection_count(&self) -> usize {
474        let inner: &PoolInner<K, T> = unsafe { &*self.shared.get() };
475        inner.idle_conns.values().map(|v| v.borrow().len()).sum()
476    }
477}
478
// TODO: make interval not eq to idle_dur
/// Background future that periodically evicts expired idle connections.
#[cfg(feature = "time")]
struct IdleTask<K, T> {
    /// Sender half of a oneshot channel; polled for closure to detect that
    /// the receiver stored in the pool state has gone away.
    tx: local_sync::oneshot::Sender<()>,
    /// Weak handle to the pool state, so the task does not keep the pool alive.
    conns: WeakPool<K, T>,
    /// Timer that paces the eviction sweeps.
    interval: monoio::time::Interval,
    /// Idle time after which a connection is evicted.
    idle_dur: Duration,
}
487
#[cfg(feature = "time")]
impl<K, T> std::future::Future for IdleTask<K, T> {
    type Output = ();

    /// Runs one eviction sweep per timer tick.
    ///
    /// Completes as soon as the channel is reported closed or the pool state
    /// can no longer be upgraded; stays pending between ticks.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        use std::task::Poll;

        let task = self.get_mut();
        loop {
            // Checked first on every iteration, before waiting for a tick.
            if task.tx.poll_closed(cx).is_ready() {
                #[cfg(feature = "logging")]
                tracing::debug!("pool rx dropped, idle task exit");
                return Poll::Ready(());
            }

            std::task::ready!(task.interval.poll_tick(cx));

            let Some(pool) = task.conns.upgrade() else {
                #[cfg(feature = "logging")]
                tracing::debug!("pool upgrade failed, idle task exit");
                return Poll::Ready(());
            };

            // SAFETY: NOTE(review) -- relies on the pool being confined to
            // this thread and on no other reference into `PoolInner` being
            // live while the task is polled; confirm.
            let inner = unsafe { &mut *pool.get() };
            inner.clear_expired(task.idle_dur);
            #[cfg(feature = "logging")]
            tracing::debug!("pool clear expired");
        }
    }
}