1use std::collections::{BTreeMap, HashSet, VecDeque};
2use std::fmt::{self, Debug, Formatter};
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::{
6 atomic::{AtomicU8, Ordering},
7 Arc,
8};
9use std::task::{Context, Poll};
10use std::time::{Duration, Instant};
11
12use concurrent_queue::ConcurrentQueue;
13
14use futures_util::FutureExt;
15
16use super::resource::{ResourceGuard, ResourceInfo, ResourceLock};
17use super::shared::{DisposeFn, ReleaseFn, Shared, SharedEvent};
18use super::util::{sentinel::Sentinel, thread_waker};
19
20mod acquire;
21pub use acquire::Acquire;
22
23mod config;
24pub use config::PoolConfig;
25
26mod error;
27pub use error::AcquireError;
28
29mod executor;
30pub use executor::{default_executor, Executor};
31
32mod operation;
33use operation::{
34 resource_create, resource_verify, ResourceFuture, ResourceOperation, ResourceResolve,
35};
36
37mod wait;
38use wait::{waiter_pair, WaitResponder, Waiter};
39
// Lifecycle states stored in `PoolManageState::state`.

/// Initial state: the value `PoolInternal::new` stores in the state atomic.
const ACTIVE: u8 = 0;
/// Draining: requested by `start_drain(false)`; `stop_drain` swaps it back to `ACTIVE`.
const DRAIN: u8 = 1;
/// Shutting down: requested by `start_drain(true)`.
const SHUTDOWN: u8 = 2;
/// Stored by the callback attached to the manager's sentinel in `manage`
/// (presumably invoked when that sentinel is dropped — confirm against `Sentinel`).
const STOPPED: u8 = 3;
44
/// A boxed, thread-safe resource operation (used for the `create` and `verify` steps).
type BoxedOperation<T, E> = Box<dyn ResourceOperation<T, E> + Send + Sync>;

/// A boxed, thread-safe callback that receives errors of type `E`.
type ErrorFn<E> = Box<dyn Fn(E) + Send + Sync>;

/// The responder half of a waiter pair that carries a `ResourceResolve`.
type WaitResource<T, E> = WaitResponder<ResourceResolve<T, E>>;
50
/// Configuration and coordination state used by the pool's management loop.
pub(crate) struct PoolManageState<T: Send + 'static, E: 'static> {
    /// Optional duration added to a waiter's start instant to schedule its
    /// cancellation timer in `manage`.
    acquire_timeout: Option<Duration>,
    /// Operation applied by `create` to the guard of a newly constructed resource lock.
    create: BoxedOperation<T, E>,
    /// Executor used by `complete_resolve` to spawn pending resolutions.
    executor: Box<dyn Executor>,
    /// Optional callback that receives errors passed to `handle_error`.
    handle_error: Option<ErrorFn<E>>,
    /// Optional waiter limit; stored here but not read in this part of the file.
    max_waiters: Option<usize>,
    /// Waiter half returned by `Shared::new`; the manage loop blocks on it.
    shared_waiter: thread_waker::Waiter,
    /// Queue of `Register` requests pushed by `register` and drained by the manage loop.
    register_inject: ConcurrentQueue<Register<T, E>>,
    /// Lifecycle state: one of `ACTIVE`, `DRAIN`, `SHUTDOWN` or `STOPPED`.
    state: AtomicU8,
    /// Optional operation applied by `verify` to reusable resources.
    verify: Option<BoxedOperation<T, E>>,
}
62
/// Internal pool state: the management configuration plus the `Shared` resource state.
pub struct PoolInternal<T: Send + 'static, E: 'static> {
    /// State owned by the management side of the pool.
    manage: PoolManageState<T, E>,
    /// Shared resource state, also handed out together with acquired resources.
    shared: Arc<Shared<T>>,
}
67
68impl<T: Send, E> PoolInternal<T, E> {
69 pub fn new(
70 acquire_timeout: Option<Duration>,
71 create: Box<dyn ResourceOperation<T, E> + Send + Sync>,
72 executor: Box<dyn Executor>,
73 handle_error: Option<Box<dyn Fn(E) + Send + Sync>>,
74 idle_timeout: Option<Duration>,
75 min_count: usize,
76 max_count: usize,
77 max_waiters: Option<usize>,
78 on_dispose: Option<DisposeFn<T>>,
79 on_release: Option<ReleaseFn<T>>,
80 verify: Option<Box<dyn ResourceOperation<T, E> + Send + Sync>>,
81 ) -> Self {
82 let (shared, shared_waiter) =
83 Shared::new(on_release, on_dispose, min_count, max_count, idle_timeout);
84 let manage = PoolManageState {
85 acquire_timeout,
86 create,
87 executor,
88 handle_error,
89 max_waiters,
90 register_inject: ConcurrentQueue::unbounded(),
91 shared_waiter,
92 state: AtomicU8::new(ACTIVE),
93 verify,
94 };
95 Self {
96 manage,
97 shared: Arc::new(shared),
98 }
99 }
100
101 pub fn create(self: &Arc<Self>) -> ResourceResolve<T, E> {
102 let mut info = ResourceInfo::default();
103 info.reusable = self.shared.can_reuse();
104 let lock = ResourceLock::new(info, None);
105 let guard = lock.try_lock().unwrap();
106
107 self.shared
110 .push_event(SharedEvent::Created(guard.as_lock()));
111
112 self.manage.create.apply(guard, self)
113 }
114
115 pub fn create_from_count(&self) -> Option<usize> {
116 let count = self.shared.count();
117 let max = self.shared.max_count();
118 if max == 0 || max > count {
119 Some(count)
120 } else {
121 None
122 }
123 }
124
125 pub fn handle_error(&self, err: E) {
126 if let Some(handler) = self.manage.handle_error.as_ref() {
127 (handler)(err)
128 }
129 }
130
131 fn manage(self: Arc<Self>) {
132 let mut drain_count: usize = 0;
133 let inner = Sentinel::new(self, |inner, _| {
134 inner.manage.state.store(STOPPED, Ordering::Release);
137 });
138 let mut last_dispose_count = 0;
139 let mut next_check = None;
140 let mut repo = HashSet::<ResourceLock<T>>::new();
141 let mut timer_id_source: usize = 0;
142 let mut timers = BTreeMap::<(Instant, usize), Timer<T, E>>::new();
143 let mut waiters = VecDeque::<WaitResource<T, E>>::new();
144
145 loop {
146 let remain_timers = timers.split_off(&(Instant::now(), 0));
147 for ((inst, _), timer) in timers {
148 match timer {
149 Timer::Drain(_) => {
150 drain_count -= 1;
152 if drain_count == 0 {
153 inner.stop_drain();
154 }
155 }
156 Timer::Verify(res) => {
157 if let Some(mut guard) = res.try_lock() {
158 repo.remove(&guard.as_lock());
162 guard = guard.detach();
163
164 if guard.is_some() && guard.info().verify_at == Some(inst) {
165 inner.verify(guard);
167 } else {
168 repo.insert(guard.as_lock());
173 inner.shared.release(guard);
175 }
176 } else {
177 }
181 }
182 Timer::Waiter(waiter) => {
183 waiter.cancel();
187 }
188 }
189 }
190 timers = remain_timers;
192
193 if let Some(fst) = timers.keys().next() {
194 next_check = Some(next_check.map_or(fst.0, |t| std::cmp::min(t, fst.0)))
195 }
196
197 inner.manage.shared_waiter.prepare_wait();
200
201 while let Some(event) = inner.shared.pop_event() {
202 match event {
203 SharedEvent::Created(res) => {
204 repo.insert(res);
205 }
206 SharedEvent::Verify(expire, res) => {
207 timer_id_source += 1;
208 timers.insert((expire, timer_id_source), Timer::Verify(res));
209 }
210 }
211 }
212
213 while let Ok(register) = inner.manage.register_inject.pop() {
214 match register {
215 Register::Drain(expire, notify) => {
216 inner.start_drain(false);
217 timer_id_source += 1;
218 timers.insert((expire, timer_id_source), Timer::Drain(notify));
219 drain_count += 1;
220 }
221 Register::Waiter(start, waiter) => {
222 if let Some(expire) =
223 inner.manage.acquire_timeout.clone().map(|dur| start + dur)
224 {
225 timer_id_source += 1;
226 timers.insert((expire, timer_id_source), Timer::Waiter(waiter.clone()));
227 }
228 waiters.push_back(waiter);
229 }
230 }
231 }
232
233 if waiters.is_empty() {
234 inner.shared.set_busy(false);
237 } else {
238 inner.shared.set_busy(true);
242
243 if inner.shared.have_idle() || inner.create_from_count().is_some() {
244 let mut res = None;
245 while let Some(waiter) = waiters.pop_front() {
246 if waiter.is_canceled() {
247 continue;
248 }
249 if let Some(idle) = res.take().or_else(|| inner.shared.try_acquire_idle()) {
251 if let Err(mut failed) =
252 waiter.send((idle, inner.shared.clone()).into())
253 {
254 res = failed.take_resource()
255 }
256 } else {
257 waiters.push_front(waiter);
259 break;
260 }
261 }
262 if let Some(res) = res {
264 inner.shared.release(res);
265 }
266 }
267 }
268
269 let state = inner.manage.state.load(Ordering::Acquire);
270 if state == DRAIN {
271 for res in repo.iter() {
272 if let Some(guard) = res.try_lock() {
273 inner.shared.dispose(guard);
274 }
275 }
276 repo.retain(|res| !res.is_none());
277 if repo.is_empty() {
278 break;
279 }
280 } else {
281 let dispose_count = inner.shared.dispose_count();
282 if last_dispose_count != dispose_count {
283 repo.retain(|res| !res.is_none());
284 last_dispose_count = dispose_count;
285 }
286 }
287
288 if let Some(next_check) = next_check {
289 inner.manage.shared_waiter.wait_until(next_check);
290 } else {
291 inner.manage.shared_waiter.wait();
292 }
293 next_check = None;
294 }
295
296 for (_, timer) in timers {
299 match timer {
300 Timer::Drain(waiter) => {
301 waiter.send(true).unwrap_or(());
304 }
305 _ => (),
307 }
308 }
309 }
310
311 pub fn complete_resolve(self: &Arc<Self>, res: ResourceResolve<T, E>) {
312 if res.is_pending() {
313 let inner = self.clone();
314 self.manage.executor.spawn_ok(
315 res.map(move |res| match res {
316 Some(Ok(res)) => {
317 inner.shared.release(res);
318 }
319 Some(Err(err)) => {
320 inner.handle_error(err);
321 }
322 None => (),
323 })
324 .boxed(),
325 )
326 }
327 }
328
329 pub fn release_future(self: &Arc<Self>, fut: ResourceFuture<T, E>) {
330 self.complete_resolve(ResourceResolve::from((fut.boxed(), self.clone())));
331 }
332
333 fn register(&self, reg: Register<T, E>) {
334 self.manage
335 .register_inject
336 .push(reg)
337 .unwrap_or_else(|_| panic!("Pool manager injector error"));
338 self.shared.notify();
339 }
340
    /// Returns a reference to the pool's shared resource state.
    pub fn shared(&self) -> &Arc<Shared<T>> {
        &self.shared
    }
344
345 pub fn start_drain(self: &Arc<Self>, shutdown: bool) {
346 let mut state = ACTIVE;
347 let next_state = if shutdown { SHUTDOWN } else { DRAIN };
348 loop {
349 match self.manage.state.compare_exchange_weak(
350 state,
351 next_state,
352 Ordering::AcqRel,
353 Ordering::Acquire,
354 ) {
355 Ok(_) => {
356 self.shared.notify();
357 }
358 Err(DRAIN) if next_state == DRAIN => {
359 break;
360 }
361 Err(s @ ACTIVE) | Err(s @ DRAIN) => {
362 state = s;
363 }
364 Err(SHUTDOWN) | Err(STOPPED) => {
365 break;
366 }
367 Err(_) => panic!("Invalid pool state"),
368 }
369 }
370 }
371
372 pub fn stop_drain(self: &Arc<Self>) {
373 self.manage
374 .state
375 .compare_and_swap(DRAIN, ACTIVE, Ordering::AcqRel);
376 }
377
378 pub fn try_acquire_idle(self: &Arc<Self>) -> ResourceResolve<T, E> {
379 if self.shared.is_busy() {
380 ResourceResolve::empty()
381 } else {
382 self.shared
384 .try_acquire_idle()
385 .map(|res| (res, self.shared.clone()))
386 .into()
387 }
388 }
389
390 pub fn try_create(self: &Arc<Self>) -> ResourceResolve<T, E> {
391 let mut count = match self.create_from_count() {
392 Some(c) => c,
393 None => return ResourceResolve::empty(),
394 };
395 let max = self.shared.max_count();
396
397 loop {
398 match self.shared.try_update_count(count, count + 1) {
399 Ok(_) => {
400 break self.create();
401 }
402 Err(c) => {
403 if c > count && max != 0 && c > max {
404 break ResourceResolve::empty();
406 }
407 count = c;
408 continue;
409 }
410 }
411 }
412 }
413
414 pub fn try_wait(self: &Arc<Self>, started: Instant) -> Waiter<ResourceResolve<T, E>> {
415 let (send, receive) = waiter_pair();
416 self.register(Register::Waiter(started, send));
417 self.shared.notify();
418 receive
419 }
420
421 pub fn verify(self: &Arc<Self>, guard: ResourceGuard<T>) {
422 if guard.info().reusable {
423 if let Some(verify) = self.manage.verify.as_ref() {
424 if self.shared.count() <= self.shared.min_count() {
425 return self.complete_resolve(verify.apply(guard, &self));
426 }
427 }
428 }
429 self.shared.dispose(guard);
430 }
431}
432
/// A cloneable handle to a resource pool.
pub struct Pool<T: Send + 'static, E: 'static> {
    /// Sentinel-wrapped internal state; its callback starts a shutdown when
    /// the count it reports reaches zero (see `Pool::new`).
    pub(crate) inner: Sentinel<PoolInternal<T, E>>,
}
436
437impl<T: Send, E> Pool<T, E> {
438 pub(crate) fn new(inner: PoolInternal<T, E>) -> Self {
439 let inner = Arc::new(inner);
440 let mgr = inner.clone();
441 let sentinel = Sentinel::new(inner, |inner, count| {
442 if count == 0 {
443 inner.start_drain(true);
444 }
445 });
446 std::thread::spawn(move || mgr.manage());
447 Self { inner: sentinel }
448 }
449
450 pub fn acquire(&self) -> Acquire<T, E> {
451 Acquire::new(self.clone())
452 }
453
454 pub fn count(&self) -> usize {
455 self.inner.shared.count()
456 }
457
458 pub fn drain(self, timeout: Duration) -> PoolDrain<T, E> {
459 let (send, receive) = waiter_pair();
460 self.inner
461 .register(Register::Drain(Instant::now() + timeout, send));
462 PoolDrain {
463 inner: Some(self.inner),
464 receive,
465 }
466 }
467}
468
469impl<T: Send, E> Clone for Pool<T, E> {
470 fn clone(&self) -> Self {
471 Self {
472 inner: self.inner.clone(),
473 }
474 }
475}
476
477impl<T: Send, E> Debug for Pool<T, E> {
478 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
479 f.debug_struct("Pool")
480 .field("count", &self.count())
481 .finish()
482 }
483}
484
/// A request queued for the manage loop through `PoolInternal::register`.
#[derive(Debug)]
enum Register<T: Send + 'static, E: 'static> {
    /// Start draining; carries the drain deadline and the responder for the outcome.
    Drain(Instant, WaitResponder<bool>),
    /// Queue a waiter; carries the instant the wait started and the responder
    /// that receives the resource.
    Waiter(Instant, WaitResource<T, E>),
}
490
/// A deadline-driven action scheduled inside the manage loop.
#[derive(Debug)]
enum Timer<T: Send + 'static, E: 'static> {
    /// A drain request whose deadline is being tracked.
    Drain(WaitResponder<bool>),
    /// A resource lock to verify when the deadline passes.
    Verify(ResourceLock<T>),
    /// A waiter to cancel when its acquire timeout passes.
    Waiter(WaitResource<T, E>),
}

// No inherent methods are defined in this view of the file.
impl<T: Send + 'static, E: 'static> Timer<T, E> {
}
507
/// Future returned by `Pool::drain`.
pub struct PoolDrain<T: Send + 'static, E: 'static> {
    /// The pool's sentinel; taken out when the pool is handed back on failure.
    inner: Option<Sentinel<PoolInternal<T, E>>>,
    /// Receives the drain outcome from the manage loop.
    receive: Waiter<bool>,
}
512
513impl<T: Send + 'static, E: 'static> Debug for PoolDrain<T, E> {
514 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
515 f.debug_struct("PoolDrain").finish()
516 }
517}
518
519impl<T: Send, E> Future for PoolDrain<T, E> {
520 type Output = Result<(), Pool<T, E>>;
521
522 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
523 if self.inner.is_none() {
524 return Poll::Ready(Ok(()));
525 }
526
527 match Pin::new(&mut *self.receive).poll(cx) {
528 Poll::Ready(done) => {
529 if done.unwrap_or(false) {
530 Poll::Ready(Ok(()))
531 } else {
532 Poll::Ready(Err(Pool {
533 inner: self.inner.take().unwrap(),
534 }))
535 }
536 }
537 Poll::Pending => Poll::Pending,
538 }
539 }
540}
541
542