1mod config;
32mod errors;
33
34use std::{
35 convert::TryInto,
36 ops::{Deref, DerefMut},
37 sync::{
38 atomic::{AtomicIsize, AtomicUsize, Ordering},
39 Arc, Mutex, Weak,
40 },
41 time::Duration,
42};
43
44use tokio::sync::{Semaphore, TryAcquireError};
45
46pub use crate::Status;
47
48pub use self::{config::PoolConfig, errors::PoolError};
49
/// Wrapper around an object of type `T` that was handed out by a [`Pool`].
///
/// Dropping the wrapper pushes the wrapped object back onto the pool's queue,
/// provided the pool still exists. Use [`Object::take`] to detach the object
/// from the pool permanently instead.
#[derive(Debug)]
#[must_use]
pub struct Object<T> {
    /// The wrapped object. It is `Some` on construction and becomes `None`
    /// once [`Object::take`] or the `Drop` implementation has moved it out.
    obj: Option<T>,

    /// Weak handle to the shared state of the originating pool. Being weak,
    /// it does not keep that state alive; upgrading fails once the state is gone.
    pool: Weak<PoolInner<T>>,
}
64
65impl<T> Object<T> {
66 #[must_use]
70 pub fn take(mut this: Self) -> T {
71 if let Some(pool) = this.pool.upgrade() {
72 let _ = pool.size.fetch_sub(1, Ordering::Relaxed);
73 pool.size_semaphore.add_permits(1);
74 }
75 this.obj.take().unwrap()
76 }
77}
78
79impl<T> Drop for Object<T> {
80 fn drop(&mut self) {
81 if let Some(obj) = self.obj.take() {
82 if let Some(pool) = self.pool.upgrade() {
83 {
84 let mut queue = pool.queue.lock().unwrap();
85 queue.push(obj);
86 }
87 let _ = pool.available.fetch_add(1, Ordering::Relaxed);
88 pool.semaphore.add_permits(1);
89 pool.clean_up();
90 }
91 }
92 }
93}
94
95impl<T> Deref for Object<T> {
96 type Target = T;
97 fn deref(&self) -> &T {
98 self.obj.as_ref().unwrap()
99 }
100}
101
102impl<T> DerefMut for Object<T> {
103 fn deref_mut(&mut self) -> &mut T {
104 self.obj.as_mut().unwrap()
105 }
106}
107
108impl<T> AsRef<T> for Object<T> {
109 fn as_ref(&self) -> &T {
110 self
111 }
112}
113
114impl<T> AsMut<T> for Object<T> {
115 fn as_mut(&mut self) -> &mut T {
116 self
117 }
118}
119
/// Handle to a pool of objects of type `T`.
///
/// The handle only holds an [`Arc`] to the pool state, so every clone refers to
/// the same pool.
#[derive(Debug)]
pub struct Pool<T> {
    /// Pool state shared between all clones of this handle.
    inner: Arc<PoolInner<T>>,
}
136
137impl<T> Clone for Pool<T> {
138 fn clone(&self) -> Self {
139 Self {
140 inner: self.inner.clone(),
141 }
142 }
143}
144
145impl<T> Default for Pool<T> {
146 fn default() -> Self {
147 Self::from_config(&PoolConfig::default())
148 }
149}
150
151impl<T> Pool<T> {
152 #[must_use]
154 pub fn new(max_size: usize) -> Self {
155 Self::from_config(&PoolConfig::new(max_size))
156 }
157
158 #[must_use]
160 pub fn from_config(config: &PoolConfig) -> Self {
161 Self {
162 inner: Arc::new(PoolInner {
163 config: *config,
164 queue: Mutex::new(Vec::with_capacity(config.max_size)),
165 size: AtomicUsize::new(0),
166 size_semaphore: Semaphore::new(config.max_size),
167 available: AtomicIsize::new(0),
168 semaphore: Semaphore::new(0),
169 }),
170 }
171 }
172
173 pub async fn get(&self) -> Result<Object<T>, PoolError> {
180 self.timeout_get(self.inner.config.timeout).await
181 }
182
183 pub fn try_get(&self) -> Result<Object<T>, PoolError> {
191 let inner = self.inner.as_ref();
192 let permit = inner.semaphore.try_acquire().map_err(|e| match e {
193 TryAcquireError::NoPermits => PoolError::Timeout,
194 TryAcquireError::Closed => PoolError::Closed,
195 })?;
196 let obj = {
197 let mut queue = inner.queue.lock().unwrap();
198 queue.pop().unwrap()
199 };
200 permit.forget();
201 let _ = inner.available.fetch_sub(1, Ordering::Relaxed);
202 Ok(Object {
203 pool: Arc::downgrade(&self.inner),
204 obj: Some(obj),
205 })
206 }
207
208 pub async fn timeout_get(&self, timeout: Option<Duration>) -> Result<Object<T>, PoolError> {
215 let inner = self.inner.as_ref();
216 let permit = match (timeout, inner.config.runtime) {
217 (None, _) => inner
218 .semaphore
219 .acquire()
220 .await
221 .map_err(|_| PoolError::Closed),
222 (Some(timeout), _) if timeout.as_nanos() == 0 => {
223 inner.semaphore.try_acquire().map_err(|e| match e {
224 TryAcquireError::NoPermits => PoolError::Timeout,
225 TryAcquireError::Closed => PoolError::Closed,
226 })
227 }
228 (Some(timeout), Some(runtime)) => runtime
229 .timeout(timeout, inner.semaphore.acquire())
230 .await
231 .ok_or(PoolError::Timeout)?
232 .map_err(|_| PoolError::Closed),
233 (Some(_), None) => Err(PoolError::NoRuntimeSpecified),
234 }?;
235 let obj = {
236 let mut queue = inner.queue.lock().unwrap();
237 queue.pop().unwrap()
238 };
239 permit.forget();
240 let _ = inner.available.fetch_sub(1, Ordering::Relaxed);
241 Ok(Object {
242 pool: Arc::downgrade(&self.inner),
243 obj: Some(obj),
244 })
245 }
246
247 pub async fn add(&self, object: T) -> Result<(), (T, PoolError)> {
257 match self.inner.size_semaphore.acquire().await {
258 Ok(permit) => {
259 permit.forget();
260 self._add(object);
261 Ok(())
262 }
263 Err(_) => Err((object, PoolError::Closed)),
264 }
265 }
266
267 pub fn try_add(&self, object: T) -> Result<(), (T, PoolError)> {
275 match self.inner.size_semaphore.try_acquire() {
276 Ok(permit) => {
277 permit.forget();
278 self._add(object);
279 Ok(())
280 }
281 Err(e) => Err(match e {
282 TryAcquireError::NoPermits => (object, PoolError::Timeout),
283 TryAcquireError::Closed => (object, PoolError::Closed),
284 }),
285 }
286 }
287
288 fn _add(&self, object: T) {
294 let _ = self.inner.size.fetch_add(1, Ordering::Relaxed);
295 {
296 let mut queue = self.inner.queue.lock().unwrap();
297 queue.push(object);
298 }
299 let _ = self.inner.available.fetch_add(1, Ordering::Relaxed);
300 self.inner.semaphore.add_permits(1);
301 }
302
303 pub async fn remove(&self) -> Result<T, PoolError> {
305 self.get().await.map(Object::take)
306 }
307
308 pub fn try_remove(&self) -> Result<T, PoolError> {
310 self.try_get().map(Object::take)
311 }
312
313 pub async fn timeout_remove(&self, timeout: Option<Duration>) -> Result<T, PoolError> {
316 self.timeout_get(timeout).await.map(Object::take)
317 }
318
319 pub fn close(&self) {
324 self.inner.semaphore.close();
325 self.inner.size_semaphore.close();
326 self.inner.clear();
327 }
328
329 pub fn is_closed(&self) -> bool {
331 self.inner.is_closed()
332 }
333
334 #[must_use]
336 pub fn status(&self) -> Status {
337 let max_size = self.inner.config.max_size;
338 let size = self.inner.size.load(Ordering::Relaxed);
339 let available = self.inner.available.load(Ordering::Relaxed);
340 Status {
341 max_size,
342 size,
343 available: if available > 0 { available as usize } else { 0 },
344 waiting: if available < 0 {
345 (-available) as usize
346 } else {
347 0
348 },
349 }
350 }
351}
352
/// State shared by all [`Pool`] handles and referenced weakly by every
/// [`Object`] that was handed out.
#[derive(Debug)]
struct PoolInner<T> {
    /// Configuration the pool was created with.
    config: PoolConfig,
    /// Objects that are currently idle in the pool.
    queue: Mutex<Vec<T>>,
    /// Number of objects that belong to the pool, whether idle or handed out.
    size: AtomicUsize,
    /// Capacity bookkeeping: a permit is consumed for every object added and
    /// given back when an object is taken out of the pool for good.
    size_semaphore: Semaphore,
    /// Counter of idle objects; incremented when an object is queued and
    /// decremented when one is handed out or cleared.
    available: AtomicIsize,
    /// Availability bookkeeping: a permit is added for every object pushed
    /// onto `queue` and consumed by the getter that pops it.
    semaphore: Semaphore,
}
371
372impl<T> PoolInner<T> {
373 fn clean_up(&self) {
379 if self.is_closed() {
380 self.clear();
381 }
382 }
383
384 fn clear(&self) {
386 let mut queue = self.queue.lock().unwrap();
387 let _ = self.size.fetch_sub(queue.len(), Ordering::Relaxed);
388 let _ = self
389 .available
390 .fetch_sub(queue.len() as isize, Ordering::Relaxed);
391 queue.clear();
392 }
393
394 fn is_closed(&self) -> bool {
396 matches!(
397 self.semaphore.try_acquire_many(0),
398 Err(TryAcquireError::Closed)
399 )
400 }
401}
402
403impl<T, I> From<I> for Pool<T>
404where
405 I: IntoIterator<Item = T>,
406 <I as IntoIterator>::IntoIter: ExactSizeIterator,
407{
408 fn from(iter: I) -> Self {
411 let queue = iter.into_iter().collect::<Vec<_>>();
412 let len = queue.len();
413 Self {
414 inner: Arc::new(PoolInner {
415 queue: Mutex::new(queue),
416 config: PoolConfig::new(len),
417 size: AtomicUsize::new(len),
418 size_semaphore: Semaphore::new(0),
419 available: AtomicIsize::new(len.try_into().unwrap()),
420 semaphore: Semaphore::new(len),
421 }),
422 }
423 }
424}