async_resource/pool/
config.rs

1use std::time::Duration;
2
3use futures_util::future::TryFuture;
4
5use super::{
6    default_executor, resource_create, resource_verify, DisposeFn, ErrorFn, Executor, ReleaseFn,
7    ResourceInfo, ResourceOperation,
8};
9use super::{Pool, PoolInternal};
10
11pub struct PoolConfig<T: Send, E> {
12    acquire_timeout: Option<Duration>,
13    executor: Option<Box<dyn Executor>>,
14    handle_error: Option<ErrorFn<E>>,
15    idle_timeout: Option<Duration>,
16    min_count: usize,
17    max_count: usize,
18    max_waiters: Option<usize>,
19    on_create: Box<dyn ResourceOperation<T, E> + Send + Sync>,
20    on_dispose: Option<DisposeFn<T>>,
21    on_release: Option<ReleaseFn<T>>,
22    on_verify: Option<Box<dyn ResourceOperation<T, E> + Send + Sync>>,
23}
24
25impl<T: Send, E> PoolConfig<T, E> {
26    pub fn new<C, F>(create: C) -> Self
27    where
28        C: Fn() -> F + Send + Sync + 'static,
29        F: TryFuture<Ok = T, Error = E> + Send + 'static,
30        T: Send + 'static,
31        E: 'static,
32    {
33        Self {
34            acquire_timeout: None,
35            executor: None,
36            handle_error: None,
37            idle_timeout: None,
38            min_count: 0,
39            max_count: 0,
40            max_waiters: None,
41            on_create: Box::new(resource_create(create)),
42            on_dispose: None,
43            on_release: None,
44            on_verify: None,
45        }
46    }
47
48    pub fn acquire_timeout(mut self, val: Duration) -> Self {
49        if val.as_micros() > 0 {
50            self.acquire_timeout.replace(val);
51        } else {
52            self.acquire_timeout.take();
53        }
54        self
55    }
56
57    pub fn dispose<F>(mut self, dispose: F) -> Self
58    where
59        F: Fn(T, ResourceInfo) -> () + Send + Sync + 'static,
60    {
61        self.on_dispose.replace(Box::new(dispose));
62        self
63    }
64
65    pub fn handle_error<F>(mut self, handler: F) -> Self
66    where
67        F: Fn(E) + Send + Sync + 'static,
68    {
69        self.handle_error.replace(Box::new(handler));
70        self
71    }
72
73    pub fn idle_timeout(mut self, val: Duration) -> Self {
74        if val.as_micros() > 0 {
75            self.idle_timeout.replace(val);
76        } else {
77            self.idle_timeout.take();
78        }
79        self
80    }
81
82    pub fn verify<V, F>(mut self, verify: V) -> Self
83    where
84        V: Fn(&mut T, ResourceInfo) -> F + Send + Sync + 'static,
85        F: TryFuture<Ok = Option<T>, Error = E> + Send + 'static,
86        T: Send + 'static,
87        E: 'static,
88    {
89        self.on_verify.replace(Box::new(resource_verify(verify)));
90        self
91    }
92
93    pub fn max_count(mut self, val: usize) -> Self {
94        self.max_count = val;
95        self
96    }
97
98    pub fn max_waiters(mut self, val: usize) -> Self {
99        self.max_waiters.replace(val);
100        self
101    }
102
103    pub fn min_count(mut self, val: usize) -> Self {
104        self.min_count = val;
105        self
106    }
107
108    pub fn release<F>(mut self, release: F) -> Self
109    where
110        F: Fn(&mut T, ResourceInfo) -> bool + Send + Sync + 'static,
111    {
112        self.on_release.replace(Box::new(release));
113        self
114    }
115
116    pub fn build(self) -> Pool<T, E> {
117        let inner = PoolInternal::new(
118            self.acquire_timeout,
119            self.on_create,
120            self.executor.unwrap_or_else(default_executor),
121            self.handle_error,
122            self.idle_timeout,
123            self.min_count,
124            self.max_count,
125            self.max_waiters,
126            self.on_dispose,
127            self.on_release,
128            self.on_verify,
129        );
130        Pool::new(inner)
131        // let exec = Executor::new(self.thread_count.unwrap_or(1));
132        // Pool::new(queue, mgr, exec)
133    }
134}