async_resource/pool/
mod.rs

1use std::collections::{BTreeMap, HashSet, VecDeque};
2use std::fmt::{self, Debug, Formatter};
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::{
6    atomic::{AtomicU8, Ordering},
7    Arc,
8};
9use std::task::{Context, Poll};
10use std::time::{Duration, Instant};
11
12use concurrent_queue::ConcurrentQueue;
13
14use futures_util::FutureExt;
15
16use super::resource::{ResourceGuard, ResourceInfo, ResourceLock};
17use super::shared::{DisposeFn, ReleaseFn, Shared, SharedEvent};
18use super::util::{sentinel::Sentinel, thread_waker};
19
20mod acquire;
21pub use acquire::Acquire;
22
23mod config;
24pub use config::PoolConfig;
25
26mod error;
27pub use error::AcquireError;
28
29mod executor;
30pub use executor::{default_executor, Executor};
31
32mod operation;
33use operation::{
34    resource_create, resource_verify, ResourceFuture, ResourceOperation, ResourceResolve,
35};
36
37mod wait;
38use wait::{waiter_pair, WaitResponder, Waiter};
39
/// Pool state: the value the state is initialised with in `PoolInternal::new`.
const ACTIVE: u8 = 0;
/// Pool state: requested by `start_drain(false)` and reverted by `stop_drain`.
const DRAIN: u8 = 1;
/// Pool state: requested by `start_drain(true)`.
const SHUTDOWN: u8 = 2;
/// Pool state: stored when the manager thread exits (normally or by panic).
const STOPPED: u8 = 3;
44
45type BoxedOperation<T, E> = Box<dyn ResourceOperation<T, E> + Send + Sync>;
46
47type ErrorFn<E> = Box<dyn Fn(E) + Send + Sync>;
48
49type WaitResource<T, E> = WaitResponder<ResourceResolve<T, E>>;
50
51pub(crate) struct PoolManageState<T: Send + 'static, E: 'static> {
52    acquire_timeout: Option<Duration>,
53    create: BoxedOperation<T, E>,
54    executor: Box<dyn Executor>,
55    handle_error: Option<ErrorFn<E>>,
56    max_waiters: Option<usize>,
57    shared_waiter: thread_waker::Waiter,
58    register_inject: ConcurrentQueue<Register<T, E>>,
59    state: AtomicU8,
60    verify: Option<BoxedOperation<T, E>>,
61}
62
63pub struct PoolInternal<T: Send + 'static, E: 'static> {
64    manage: PoolManageState<T, E>,
65    shared: Arc<Shared<T>>,
66}
67
68impl<T: Send, E> PoolInternal<T, E> {
69    pub fn new(
70        acquire_timeout: Option<Duration>,
71        create: Box<dyn ResourceOperation<T, E> + Send + Sync>,
72        executor: Box<dyn Executor>,
73        handle_error: Option<Box<dyn Fn(E) + Send + Sync>>,
74        idle_timeout: Option<Duration>,
75        min_count: usize,
76        max_count: usize,
77        max_waiters: Option<usize>,
78        on_dispose: Option<DisposeFn<T>>,
79        on_release: Option<ReleaseFn<T>>,
80        verify: Option<Box<dyn ResourceOperation<T, E> + Send + Sync>>,
81    ) -> Self {
82        let (shared, shared_waiter) =
83            Shared::new(on_release, on_dispose, min_count, max_count, idle_timeout);
84        let manage = PoolManageState {
85            acquire_timeout,
86            create,
87            executor,
88            handle_error,
89            max_waiters,
90            register_inject: ConcurrentQueue::unbounded(),
91            shared_waiter,
92            state: AtomicU8::new(ACTIVE),
93            verify,
94        };
95        Self {
96            manage,
97            shared: Arc::new(shared),
98        }
99    }
100
101    pub fn create(self: &Arc<Self>) -> ResourceResolve<T, E> {
102        let mut info = ResourceInfo::default();
103        info.reusable = self.shared.can_reuse();
104        let lock = ResourceLock::new(info, None);
105        let guard = lock.try_lock().unwrap();
106
107        // Send the resource lock into the repo, for collection later
108        // Not concerned with failure here
109        self.shared
110            .push_event(SharedEvent::Created(guard.as_lock()));
111
112        self.manage.create.apply(guard, self)
113    }
114
115    pub fn create_from_count(&self) -> Option<usize> {
116        let count = self.shared.count();
117        let max = self.shared.max_count();
118        if max == 0 || max > count {
119            Some(count)
120        } else {
121            None
122        }
123    }
124
125    pub fn handle_error(&self, err: E) {
126        if let Some(handler) = self.manage.handle_error.as_ref() {
127            (handler)(err)
128        }
129    }
130
131    fn manage(self: Arc<Self>) {
132        let mut drain_count: usize = 0;
133        let inner = Sentinel::new(self, |inner, _| {
134            // Set the state to Stopped when this thread exits, whether normally
135            // or due to a panic.
136            inner.manage.state.store(STOPPED, Ordering::Release);
137        });
138        let mut last_dispose_count = 0;
139        let mut next_check = None;
140        let mut repo = HashSet::<ResourceLock<T>>::new();
141        let mut timer_id_source: usize = 0;
142        let mut timers = BTreeMap::<(Instant, usize), Timer<T, E>>::new();
143        let mut waiters = VecDeque::<WaitResource<T, E>>::new();
144
145        loop {
146            let remain_timers = timers.split_off(&(Instant::now(), 0));
147            for ((inst, _), timer) in timers {
148                match timer {
149                    Timer::Drain(_) => {
150                        // Cancel drain (waiter is notified by dropping it)
151                        drain_count -= 1;
152                        if drain_count == 0 {
153                            inner.stop_drain();
154                        }
155                    }
156                    Timer::Verify(res) => {
157                        if let Some(mut guard) = res.try_lock() {
158                            // Any clone of the resource lock in the idle queue can't be
159                            // acquired at the moment, so forget that lock and create a
160                            // new one
161                            repo.remove(&guard.as_lock());
162                            guard = guard.detach();
163
164                            if guard.is_some() && guard.info().verify_at == Some(inst) {
165                                // Act upon the verify timer
166                                inner.verify(guard);
167                            } else {
168                                // The resource was reacquired and released after the
169                                // verify timer was set, put it back into the idle queue.
170                                // FIXME attach timer to the resource lock so it can be
171                                // checked without acquiring it
172                                repo.insert(guard.as_lock());
173                                // FIXME this can trigger another verify!
174                                inner.shared.release(guard);
175                            }
176                        } else {
177                            // If the resource is locked, then a consumer thread
178                            // has acquired it or is in the process of doing so,
179                            // nothing to do here
180                        }
181                    }
182                    Timer::Waiter(waiter) => {
183                        // Try to cancel the waiter. This will only succeed if the
184                        // waiter hasn't been fulfilled already, or cancelled by the
185                        // other side.
186                        waiter.cancel();
187                    }
188                }
189            }
190            // remain_timers.drain_filter(|_, timer| timer.is_canceled());
191            timers = remain_timers;
192
193            if let Some(fst) = timers.keys().next() {
194                next_check = Some(next_check.map_or(fst.0, |t| std::cmp::min(t, fst.0)))
195            }
196
197            // Set the waiter state to Idle.
198            // If anything is subsequently added to the queues, it will move to Busy.
199            inner.manage.shared_waiter.prepare_wait();
200
201            while let Some(event) = inner.shared.pop_event() {
202                match event {
203                    SharedEvent::Created(res) => {
204                        repo.insert(res);
205                    }
206                    SharedEvent::Verify(expire, res) => {
207                        timer_id_source += 1;
208                        timers.insert((expire, timer_id_source), Timer::Verify(res));
209                    }
210                }
211            }
212
213            while let Ok(register) = inner.manage.register_inject.pop() {
214                match register {
215                    Register::Drain(expire, notify) => {
216                        inner.start_drain(false);
217                        timer_id_source += 1;
218                        timers.insert((expire, timer_id_source), Timer::Drain(notify));
219                        drain_count += 1;
220                    }
221                    Register::Waiter(start, waiter) => {
222                        if let Some(expire) =
223                            inner.manage.acquire_timeout.clone().map(|dur| start + dur)
224                        {
225                            timer_id_source += 1;
226                            timers.insert((expire, timer_id_source), Timer::Waiter(waiter.clone()));
227                        }
228                        waiters.push_back(waiter);
229                    }
230                }
231            }
232
233            if waiters.is_empty() {
234                // FIXME dispose idle resources over min_count
235                // because 'busy' allows them to return to the idle queue
236                inner.shared.set_busy(false);
237            } else {
238                // This triggers released resources to go to the idle queue even when
239                // there is no idle timeout, and prevents subsequent acquires from stealing
240                // directly from the idle queue so that the waiters are completed first
241                inner.shared.set_busy(true);
242
243                if inner.shared.have_idle() || inner.create_from_count().is_some() {
244                    let mut res = None;
245                    while let Some(waiter) = waiters.pop_front() {
246                        if waiter.is_canceled() {
247                            continue;
248                        }
249                        // Bypasses busy check because we are satisfying waiters
250                        if let Some(idle) = res.take().or_else(|| inner.shared.try_acquire_idle()) {
251                            if let Err(mut failed) =
252                                waiter.send((idle, inner.shared.clone()).into())
253                            {
254                                res = failed.take_resource()
255                            }
256                        } else {
257                            // Return waiter, no resources available
258                            waiters.push_front(waiter);
259                            break;
260                        }
261                    }
262                    // No active waiters were found, return the resource to the queue
263                    if let Some(res) = res {
264                        inner.shared.release(res);
265                    }
266                }
267            }
268
269            let state = inner.manage.state.load(Ordering::Acquire);
270            if state == DRAIN {
271                for res in repo.iter() {
272                    if let Some(guard) = res.try_lock() {
273                        inner.shared.dispose(guard);
274                    }
275                }
276                repo.retain(|res| !res.is_none());
277                if repo.is_empty() {
278                    break;
279                }
280            } else {
281                let dispose_count = inner.shared.dispose_count();
282                if last_dispose_count != dispose_count {
283                    repo.retain(|res| !res.is_none());
284                    last_dispose_count = dispose_count;
285                }
286            }
287
288            if let Some(next_check) = next_check {
289                inner.manage.shared_waiter.wait_until(next_check);
290            } else {
291                inner.manage.shared_waiter.wait();
292            }
293            next_check = None;
294        }
295
296        // FIXME wait for executor to complete
297
298        for (_, timer) in timers {
299            match timer {
300                Timer::Drain(waiter) => {
301                    // Send true to indicate that the shutdown succeeded.
302                    // Not bothered if the send fails.
303                    waiter.send(true).unwrap_or(());
304                }
305                // Drop any other waiters, leading them to be cancelled
306                _ => (),
307            }
308        }
309    }
310
311    pub fn complete_resolve(self: &Arc<Self>, res: ResourceResolve<T, E>) {
312        if res.is_pending() {
313            let inner = self.clone();
314            self.manage.executor.spawn_ok(
315                res.map(move |res| match res {
316                    Some(Ok(res)) => {
317                        inner.shared.release(res);
318                    }
319                    Some(Err(err)) => {
320                        inner.handle_error(err);
321                    }
322                    None => (),
323                })
324                .boxed(),
325            )
326        }
327    }
328
329    pub fn release_future(self: &Arc<Self>, fut: ResourceFuture<T, E>) {
330        self.complete_resolve(ResourceResolve::from((fut.boxed(), self.clone())));
331    }
332
333    fn register(&self, reg: Register<T, E>) {
334        self.manage
335            .register_inject
336            .push(reg)
337            .unwrap_or_else(|_| panic!("Pool manager injector error"));
338        self.shared.notify();
339    }
340
341    pub fn shared(&self) -> &Arc<Shared<T>> {
342        &self.shared
343    }
344
345    pub fn start_drain(self: &Arc<Self>, shutdown: bool) {
346        let mut state = ACTIVE;
347        let next_state = if shutdown { SHUTDOWN } else { DRAIN };
348        loop {
349            match self.manage.state.compare_exchange_weak(
350                state,
351                next_state,
352                Ordering::AcqRel,
353                Ordering::Acquire,
354            ) {
355                Ok(_) => {
356                    self.shared.notify();
357                }
358                Err(DRAIN) if next_state == DRAIN => {
359                    break;
360                }
361                Err(s @ ACTIVE) | Err(s @ DRAIN) => {
362                    state = s;
363                }
364                Err(SHUTDOWN) | Err(STOPPED) => {
365                    break;
366                }
367                Err(_) => panic!("Invalid pool state"),
368            }
369        }
370    }
371
372    pub fn stop_drain(self: &Arc<Self>) {
373        self.manage
374            .state
375            .compare_and_swap(DRAIN, ACTIVE, Ordering::AcqRel);
376    }
377
378    pub fn try_acquire_idle(self: &Arc<Self>) -> ResourceResolve<T, E> {
379        if self.shared.is_busy() {
380            ResourceResolve::empty()
381        } else {
382            // Wrap the resource guard to ensure it is returned to the queue
383            self.shared
384                .try_acquire_idle()
385                .map(|res| (res, self.shared.clone()))
386                .into()
387        }
388    }
389
390    pub fn try_create(self: &Arc<Self>) -> ResourceResolve<T, E> {
391        let mut count = match self.create_from_count() {
392            Some(c) => c,
393            None => return ResourceResolve::empty(),
394        };
395        let max = self.shared.max_count();
396
397        loop {
398            match self.shared.try_update_count(count, count + 1) {
399                Ok(_) => {
400                    break self.create();
401                }
402                Err(c) => {
403                    if c > count && max != 0 && c > max {
404                        // Count was increased beyond max by another thread
405                        break ResourceResolve::empty();
406                    }
407                    count = c;
408                    continue;
409                }
410            }
411        }
412    }
413
414    pub fn try_wait(self: &Arc<Self>, started: Instant) -> Waiter<ResourceResolve<T, E>> {
415        let (send, receive) = waiter_pair();
416        self.register(Register::Waiter(started, send));
417        self.shared.notify();
418        receive
419    }
420
421    pub fn verify(self: &Arc<Self>, guard: ResourceGuard<T>) {
422        if guard.info().reusable {
423            if let Some(verify) = self.manage.verify.as_ref() {
424                if self.shared.count() <= self.shared.min_count() {
425                    return self.complete_resolve(verify.apply(guard, &self));
426                }
427            }
428        }
429        self.shared.dispose(guard);
430    }
431}
432
433pub struct Pool<T: Send + 'static, E: 'static> {
434    pub(crate) inner: Sentinel<PoolInternal<T, E>>,
435}
436
437impl<T: Send, E> Pool<T, E> {
438    pub(crate) fn new(inner: PoolInternal<T, E>) -> Self {
439        let inner = Arc::new(inner);
440        let mgr = inner.clone();
441        let sentinel = Sentinel::new(inner, |inner, count| {
442            if count == 0 {
443                inner.start_drain(true);
444            }
445        });
446        std::thread::spawn(move || mgr.manage());
447        Self { inner: sentinel }
448    }
449
450    pub fn acquire(&self) -> Acquire<T, E> {
451        Acquire::new(self.clone())
452    }
453
454    pub fn count(&self) -> usize {
455        self.inner.shared.count()
456    }
457
458    pub fn drain(self, timeout: Duration) -> PoolDrain<T, E> {
459        let (send, receive) = waiter_pair();
460        self.inner
461            .register(Register::Drain(Instant::now() + timeout, send));
462        PoolDrain {
463            inner: Some(self.inner),
464            receive,
465        }
466    }
467}
468
469impl<T: Send, E> Clone for Pool<T, E> {
470    fn clone(&self) -> Self {
471        Self {
472            inner: self.inner.clone(),
473        }
474    }
475}
476
477impl<T: Send, E> Debug for Pool<T, E> {
478    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
479        f.debug_struct("Pool")
480            .field("count", &self.count())
481            .finish()
482    }
483}
484
485#[derive(Debug)]
486enum Register<T: Send + 'static, E: 'static> {
487    Drain(Instant, WaitResponder<bool>),
488    Waiter(Instant, WaitResource<T, E>),
489}
490
491#[derive(Debug)]
492enum Timer<T: Send + 'static, E: 'static> {
493    Drain(WaitResponder<bool>),
494    Verify(ResourceLock<T>),
495    Waiter(WaitResource<T, E>),
496}
497
498impl<T: Send + 'static, E: 'static> Timer<T, E> {
499    // fn is_canceled(&self) -> bool {
500    //     match self {
501    //         Self::Drain(ref wait) => wait.is_canceled(),
502    //         Self::Verify(..) => false,
503    //         Self::Waiter(ref wait) => wait.is_canceled(),
504    //     }
505    // }
506}
507
508pub struct PoolDrain<T: Send + 'static, E: 'static> {
509    inner: Option<Sentinel<PoolInternal<T, E>>>,
510    receive: Waiter<bool>,
511}
512
513impl<T: Send + 'static, E: 'static> Debug for PoolDrain<T, E> {
514    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
515        f.debug_struct("PoolDrain").finish()
516    }
517}
518
519impl<T: Send, E> Future for PoolDrain<T, E> {
520    type Output = Result<(), Pool<T, E>>;
521
522    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
523        if self.inner.is_none() {
524            return Poll::Ready(Ok(()));
525        }
526
527        match Pin::new(&mut *self.receive).poll(cx) {
528            Poll::Ready(done) => {
529                if done.unwrap_or(false) {
530                    Poll::Ready(Ok(()))
531                } else {
532                    Poll::Ready(Err(Pool {
533                        inner: self.inner.take().unwrap(),
534                    }))
535                }
536            }
537            Poll::Pending => Poll::Pending,
538        }
539    }
540}
541
542// FIXME test behaviour of cloned WaitResponder + close