1mod connector;
7mod map;
8mod reuse;
9use std::{
10 cell::{RefCell, RefMut, UnsafeCell},
11 collections::{HashMap, VecDeque},
12 fmt::Debug,
13 hash::Hash,
14 ops::{Deref, DerefMut},
15 rc::{Rc, Weak},
16 time::{Duration, Instant},
17};
18
19pub use connector::PooledConnector;
20pub use map::{ConnectorMap, ConnectorMapper};
21use monoio::io::{AsyncReadRent, AsyncWriteRent, Split};
22pub use reuse::{Reuse, ReuseConnector};
23
24pub(crate) const DEFAULT_KEEPALIVE_CONNS: usize = 1024;
25pub(crate) const DEFAULT_POOL_SIZE: usize = 32;
26pub(crate) const MAX_KEEPALIVE_CONNS: usize = 16384;
28
29pub trait Poolable {
30 fn is_open(&self) -> bool;
31}
32
33type SharedPool<K, IO> = Rc<UnsafeCell<PoolInner<K, IO>>>;
34type WeakPool<K, IO> = Weak<UnsafeCell<PoolInner<K, IO>>>;
35type WeakQueue<IO> = Weak<RefCell<VecDeque<Idle<IO>>>>;
36pub trait Key: Eq + Hash + Clone + 'static {}
37
38impl<T: Eq + Hash + Clone + 'static> Key for T {}
39
40pub struct Pooled<K: Key, T: Poolable> {
42 value: Option<T>,
43 is_reused: bool,
44 key: Option<K>,
45 pool: Option<WeakPool<K, T>>,
46 queue: Option<WeakQueue<T>>,
47 max_idle: usize,
48}
49
50unsafe impl<K: Key, T: Poolable + Split> Split for Pooled<K, T> {}
51
52impl<K: Key, I: Poolable + AsyncReadRent> AsyncReadRent for Pooled<K, I> {
53 #[inline]
54 fn read<T: monoio::buf::IoBufMut>(
55 &mut self,
56 buf: T,
57 ) -> impl std::future::Future<Output = monoio::BufResult<usize, T>> {
58 unsafe { self.value.as_mut().unwrap_unchecked() }.read(buf)
59 }
60
61 #[inline]
62 fn readv<T: monoio::buf::IoVecBufMut>(
63 &mut self,
64 buf: T,
65 ) -> impl std::future::Future<Output = monoio::BufResult<usize, T>> {
66 unsafe { self.value.as_mut().unwrap_unchecked() }.readv(buf)
67 }
68}
69
70impl<K: Key, I: Poolable + AsyncWriteRent> AsyncWriteRent for Pooled<K, I> {
71 #[inline]
72 fn write<T: monoio::buf::IoBuf>(
73 &mut self,
74 buf: T,
75 ) -> impl std::future::Future<Output = monoio::BufResult<usize, T>> {
76 unsafe { self.value.as_mut().unwrap_unchecked() }.write(buf)
77 }
78
79 #[inline]
80 fn writev<T: monoio::buf::IoVecBuf>(
81 &mut self,
82 buf_vec: T,
83 ) -> impl std::future::Future<Output = monoio::BufResult<usize, T>> {
84 unsafe { self.value.as_mut().unwrap_unchecked() }.writev(buf_vec)
85 }
86
87 #[inline]
88 fn flush(&mut self) -> impl std::future::Future<Output = std::io::Result<()>> {
89 unsafe { self.value.as_mut().unwrap_unchecked() }.flush()
90 }
91
92 #[inline]
93 fn shutdown(&mut self) -> impl std::future::Future<Output = std::io::Result<()>> {
94 unsafe { self.value.as_mut().unwrap_unchecked() }.shutdown()
95 }
96}
97
98impl<T: Poolable, K: Key> Pooled<K, T> {
99 #[inline]
100 pub(crate) const fn new(
101 key: K,
102 value: T,
103 is_reused: bool,
104 pool: WeakPool<K, T>,
105 queue: Option<WeakQueue<T>>,
106 max_idle: usize,
107 ) -> Self {
108 Self {
109 value: Some(value),
110 is_reused,
111 key: Some(key),
112 pool: Some(pool),
113 queue,
114 max_idle,
115 }
116 }
117
118 #[inline]
119 pub(crate) const fn unpooled(value: T) -> Self {
120 Self {
121 value: Some(value),
122 is_reused: false,
123 key: None,
124 pool: None,
125 queue: None,
126 max_idle: DEFAULT_KEEPALIVE_CONNS,
127 }
128 }
129
130 #[inline]
131 pub fn is_reused(&self) -> bool {
132 self.is_reused
133 }
134
135 #[inline]
136 fn as_ref(&self) -> &T {
137 self.value.as_ref().expect("not dropped")
138 }
139
140 #[inline]
141 fn as_mut(&mut self) -> &mut T {
142 self.value.as_mut().expect("not dropped")
143 }
144}
145
146impl<T: Poolable, K: Key> Deref for Pooled<K, T> {
147 type Target = T;
148
149 #[inline]
150 fn deref(&self) -> &T {
151 self.as_ref()
152 }
153}
154
155impl<T: Poolable, K: Key> DerefMut for Pooled<K, T> {
156 #[inline]
157 fn deref_mut(&mut self) -> &mut T {
158 self.as_mut()
159 }
160}
161
162impl<T: Poolable, K: Key> Drop for Pooled<K, T> {
163 fn drop(&mut self) {
164 if let Some(value) = self.value.take() {
165 if !value.is_open() {
166 return;
169 }
170
171 if let Some(weak_queue) = &self.queue {
173 if let Some(queue) = weak_queue.upgrade() {
174 let len = queue.borrow().len();
175 if len >= self.max_idle {
176 for _ in 0..len - self.max_idle {
177 let _ = queue.borrow_mut().pop_front();
178 }
179 }
180 let idle = Idle::new(value);
181 queue.borrow_mut().push_back(idle);
182 return;
183 }
184 }
185
186 if let Some(weak) = &self.pool {
187 if let Some(pool) = weak.upgrade() {
188 let pool = unsafe { &mut *pool.get() };
189 let key = self.key.take().expect("key is not empty");
190 let queue = pool.idle_conns.entry(key).or_insert(Rc::new(RefCell::new(
191 VecDeque::with_capacity(pool.max_idle),
192 )));
193
194 let len = queue.borrow().len();
195 if len >= pool.max_idle {
196 for _ in 0..len - pool.max_idle {
197 let _ = queue.borrow_mut().pop_front();
198 }
199 }
200
201 let idle = Idle::new(value);
202 queue.borrow_mut().push_back(idle);
203 }
204 }
205 }
206 }
207}
208
209pub(crate) struct Idle<IO> {
210 pub(crate) conn: IO,
211 idle_at: Instant,
212}
213
214impl<IO> Idle<IO> {
215 #[inline]
216 pub(crate) fn new(io: IO) -> Self {
217 Self {
218 conn: io,
219 idle_at: Instant::now(),
220 }
221 }
222
223 #[allow(unused)]
224 #[inline]
225 pub(crate) fn expired(&self, max_elapsed: Duration) -> bool {
226 self.idle_at.elapsed() > max_elapsed
227 }
228
229 #[allow(unused)]
230 #[inline]
231 pub(crate) fn expired_opt(&self, max_elapsed: Option<Duration>) -> bool {
232 match max_elapsed {
233 Some(e) => self.idle_at.elapsed() > e,
234 None => false,
235 }
236 }
237
238 #[allow(unused)]
239 #[inline]
240 pub(crate) fn reset_idle(&mut self) {
241 self.idle_at = Instant::now()
242 }
243}
244
245pub(crate) struct PoolInner<K, IO> {
246 idle_conns: HashMap<K, Rc<RefCell<VecDeque<Idle<IO>>>>>,
247 max_idle: usize,
248 #[cfg(feature = "time")]
249 idle_dur: Option<Duration>,
250 #[cfg(feature = "time")]
251 _drop: Option<local_sync::oneshot::Receiver<()>>,
252}
253
254impl<K, IO> PoolInner<K, IO> {
255 #[cfg(feature = "time")]
256 fn new_with_dropper(max_idle: Option<usize>) -> (local_sync::oneshot::Sender<()>, Self) {
257 let idle_conns = HashMap::with_capacity(DEFAULT_POOL_SIZE);
258 let max_idle = max_idle
259 .map(|n| n.min(MAX_KEEPALIVE_CONNS))
260 .unwrap_or(DEFAULT_KEEPALIVE_CONNS);
261
262 let (tx, drop) = local_sync::oneshot::channel();
263 (
264 tx,
265 Self {
266 idle_conns,
267 max_idle,
268 idle_dur: None,
269 _drop: Some(drop),
270 },
271 )
272 }
273
274 fn new(max_idle: Option<usize>) -> Self {
275 let idle_conns = HashMap::with_capacity(DEFAULT_POOL_SIZE);
276 let max_idle = max_idle
277 .map(|n| n.min(MAX_KEEPALIVE_CONNS))
278 .unwrap_or(DEFAULT_KEEPALIVE_CONNS);
279 Self {
280 idle_conns,
281 max_idle,
282 #[cfg(feature = "time")]
283 idle_dur: None,
284 #[cfg(feature = "time")]
285 _drop: None,
286 }
287 }
288
289 #[allow(unused)]
290 fn clear_expired(&mut self, dur: Duration) {
291 self.idle_conns.retain(|_, values| {
292 let mut values = values.borrow_mut();
293 values.retain(|entry| !entry.expired(dur));
294 !values.is_empty()
295 });
296 }
297}
298
299#[derive(Debug)]
300pub struct ConnectionPool<K, T> {
301 shared: SharedPool<K, T>,
302}
303
304impl<K, T> Clone for ConnectionPool<K, T> {
305 #[inline]
306 fn clone(&self) -> Self {
307 Self {
308 shared: self.shared.clone(),
309 }
310 }
311}
312
313impl<K: 'static, T: 'static> ConnectionPool<K, T> {
314 #[cfg(feature = "time")]
315 pub fn new_with_idle_interval(
316 idle_interval: Option<Duration>,
319 max_idle: Option<usize>,
321 ) -> Self {
322 const MIN_INTERVAL: Duration = Duration::from_secs(1);
323
324 if let Some(idle_interval) = idle_interval {
325 let idle_dur = idle_interval;
326 let idle_interval = idle_interval.max(MIN_INTERVAL);
327
328 let (tx, inner) = PoolInner::new_with_dropper(max_idle);
329 let shared = Rc::new(UnsafeCell::new(inner));
330 monoio::spawn(IdleTask {
331 tx,
332 conns: Rc::downgrade(&shared),
333 interval: monoio::time::interval(idle_interval),
334 idle_dur,
335 });
336
337 Self { shared }
338 } else {
339 let shared = Rc::new(UnsafeCell::new(PoolInner::new(max_idle)));
340 Self { shared }
341 }
342 }
343
344 #[inline]
345 pub fn new(max_idle: Option<usize>) -> Self {
346 Self {
347 shared: Rc::new(UnsafeCell::new(PoolInner::new(max_idle))),
348 }
349 }
350}
351
352impl<K: 'static, T: 'static> Default for ConnectionPool<K, T> {
353 fn default() -> Self {
354 Self::new(None)
355 }
356}
357
358impl<K: Key, T: Poolable> ConnectionPool<K, T> {
359 #[inline]
362 pub fn get(&self, key: &K) -> Option<Pooled<K, T>> {
363 let inner = unsafe { &mut *self.shared.get() };
364
365 #[cfg(feature = "time")]
366 loop {
367 let r = match inner.idle_conns.get_mut(key) {
368 Some(v) => match v.borrow_mut().pop_front() {
369 Some(idle) if !idle.expired_opt(inner.idle_dur) => Some(Pooled::new(
370 key.to_owned(),
371 idle.conn,
372 true,
373 Rc::downgrade(&self.shared),
374 Some(Rc::downgrade(v)),
375 inner.max_idle,
376 )),
377 Some(_) => {
378 continue;
379 }
380 None => None,
381 },
382 None => None,
383 };
384 return r;
385 }
386
387 #[cfg(not(feature = "time"))]
388 match inner.idle_conns.get_mut(key) {
389 Some(v) => match v.pop_front() {
390 Some(idle) => Some(Pooled::new(
391 key.to_owned(),
392 idle.conn,
393 true,
394 Rc::downgrade(&self.shared),
395 )),
396 None => None,
397 },
398 None => None,
399 }
400 }
401
402 #[inline]
403 pub fn put(&self, key: K, conn: T) {
404 let inner = unsafe { &mut *self.shared.get() };
405 let queue =
406 inner
407 .idle_conns
408 .entry(key)
409 .or_insert(Rc::new(RefCell::new(VecDeque::with_capacity(
410 inner.max_idle,
411 ))));
412
413 if queue.borrow().len() > inner.max_idle {
414 for _ in 0..queue.borrow().len() - inner.max_idle {
415 let _ = queue.borrow_mut().pop_front();
416 }
417 let _ = queue.borrow_mut().pop_front();
418 }
419
420 let idle = Idle::new(conn);
421 queue.borrow_mut().push_back(idle);
422 }
423
424 #[inline]
427 #[allow(unused)]
428 pub(crate) fn map_mut<F: FnOnce(RefMut<VecDeque<Idle<T>>>) -> O, O>(
429 &self,
430 key: &K,
431 f: F,
432 ) -> Option<O> {
433 let inner = unsafe { &mut *self.shared.get() };
434 inner.idle_conns.get_mut(key).map(|l| l.borrow_mut()).map(f)
435 }
436
437 #[inline]
440 #[allow(unused)]
441 pub(crate) fn and_then_mut<F: FnOnce(RefMut<VecDeque<Idle<T>>>) -> Option<O>, O>(
442 &self,
443 key: &K,
444 f: F,
445 ) -> Option<O> {
446 let inner = unsafe { &mut *self.shared.get() };
447 inner
448 .idle_conns
449 .get_mut(key)
450 .map(|l| l.borrow_mut())
451 .and_then(f)
452 }
453
454 #[inline]
455 pub fn link(&self, key: K, conn: T) -> Pooled<K, T> {
456 #[cfg(feature = "logging")]
457 tracing::debug!("linked new connection to the pool");
458
459 let inner = unsafe { &mut *self.shared.get() };
460 let queue = inner.idle_conns.get_mut(&key).map(|v| Rc::downgrade(v));
461
462 Pooled::new(
463 key,
464 conn,
465 false,
466 Rc::downgrade(&self.shared),
467 queue,
468 inner.max_idle,
469 )
470 }
471
472 #[inline]
473 pub fn get_idle_connection_count(&self) -> usize {
474 let inner: &PoolInner<K, T> = unsafe { &*self.shared.get() };
475 inner.idle_conns.values().map(|v| v.borrow().len()).sum()
476 }
477}
478
479#[cfg(feature = "time")]
481struct IdleTask<K, T> {
482 tx: local_sync::oneshot::Sender<()>,
483 conns: WeakPool<K, T>,
484 interval: monoio::time::Interval,
485 idle_dur: Duration,
486}
487
488#[cfg(feature = "time")]
489impl<K, T> std::future::Future for IdleTask<K, T> {
490 type Output = ();
491
492 fn poll(
493 self: std::pin::Pin<&mut Self>,
494 cx: &mut std::task::Context<'_>,
495 ) -> std::task::Poll<Self::Output> {
496 let this = self.get_mut();
497 loop {
498 match this.tx.poll_closed(cx) {
499 std::task::Poll::Ready(_) => {
500 #[cfg(feature = "logging")]
501 tracing::debug!("pool rx dropped, idle task exit");
502 return std::task::Poll::Ready(());
503 }
504 std::task::Poll::Pending => (),
505 }
506
507 std::task::ready!(this.interval.poll_tick(cx));
508 if let Some(inner) = this.conns.upgrade() {
509 let inner_mut = unsafe { &mut *inner.get() };
510 inner_mut.clear_expired(this.idle_dur);
511 #[cfg(feature = "logging")]
512 tracing::debug!("pool clear expired");
513 continue;
514 }
515 #[cfg(feature = "logging")]
516 tracing::debug!("pool upgrade failed, idle task exit");
517 return std::task::Poll::Ready(());
518 }
519 }
520}