mysql_connector/pool/
async_pool.rs1use {
2 super::{AsyncPoolContent, AsyncPoolGetFuture, AsyncPoolTrait, PoolItem, PoolPut},
3 crossbeam::queue::{ArrayQueue, SegQueue},
4 std::{
5 future::Future,
6 mem::ManuallyDrop,
7 pin::Pin,
8 sync::atomic::{AtomicUsize, Ordering},
9 task::{self, Poll, Waker},
10 },
11};
12
13pub struct AsyncPool<T: AsyncPoolContent<C>, C, const N: usize> {
14 ctx: T::Ctx,
15 items: AtomicUsize,
16 pool: ArrayQueue<T>,
17 wakers: SegQueue<Waker>,
18}
19
20impl<T: AsyncPoolContent<C>, C, const N: usize> PoolPut<T> for AsyncPool<T, C, N> {
21 fn put(&self, value: T) {
22 let _ = self.pool.push(value);
24 }
25}
26
27impl<T: AsyncPoolContent<C>, C, const N: usize> AsyncPool<T, C, N> {
28 pub fn new(ctx: T::Ctx) -> Self {
29 Self {
30 ctx,
31 items: AtomicUsize::new(0),
32 pool: ArrayQueue::new(N),
33 wakers: SegQueue::new(),
34 }
35 }
36}
37
38impl<T: AsyncPoolContent<C>, C, const N: usize> AsyncPoolTrait<T> for AsyncPool<T, C, N> {
39 fn get(&self) -> Pin<Box<AsyncPoolGetFuture<'_, T>>> {
40 Box::pin(PoolTake {
41 pool: self,
42 add: None,
43 waker_added: false,
44 })
45 }
46}
47
48#[allow(clippy::type_complexity)]
49#[must_use = "futures do nothing unless you `.await` or poll them"]
50pub struct PoolTake<'a, T: AsyncPoolContent<C>, C, const N: usize> {
51 pool: &'a AsyncPool<T, C, N>,
52 add: Option<Pin<Box<dyn Future<Output = Result<T, T::Error>> + 'a>>>,
53 waker_added: bool,
54}
55
56impl<'a, T: AsyncPoolContent<C>, C, const N: usize> PoolTake<'a, T, C, N> {
57 fn poll_add(&mut self, cx: &mut task::Context<'_>) -> Option<Poll<<Self as Future>::Output>> {
58 self.add.as_mut().map(|add| {
59 add.as_mut().poll(cx).map(|res| {
60 res.map(|item| PoolItem {
61 item: ManuallyDrop::new(item),
62 pool: self.pool,
63 })
64 })
65 })
66 }
67}
68
69impl<'a, T: AsyncPoolContent<C>, C, const N: usize> Future for PoolTake<'a, T, C, N> {
70 type Output = Result<PoolItem<'a, T>, T::Error>;
71
72 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
73 let this = self.get_mut();
74 let pool = this.pool;
75 if let Some(res) = this.poll_add(cx) {
76 return res;
77 }
78 match pool.pool.pop() {
79 Some(item) => Poll::Ready(Ok(PoolItem {
80 item: ManuallyDrop::new(item),
81 pool,
82 })),
83 None => {
84 let item_count = pool.items.load(Ordering::Relaxed);
85 if item_count < N {
86 if pool
87 .items
88 .compare_exchange(
89 item_count,
90 item_count + 1,
91 Ordering::Relaxed,
92 Ordering::Relaxed,
93 )
94 .is_ok()
95 {
96 this.add = Some(Box::pin(T::new(&pool.ctx)));
97 if let Some(res) = this.poll_add(cx) {
98 return res;
99 }
100 } else {
101 cx.waker().wake_by_ref();
102 }
103 } else if !this.waker_added {
104 this.waker_added = true;
105 pool.wakers.push(cx.waker().clone());
106 cx.waker().wake_by_ref();
108 }
109 Poll::Pending
110 }
111 }
112 }
113}