1#![cfg_attr(not(test), deny(warnings, clippy::all, clippy::pedantic, clippy::cargo))]
19#![allow(clippy::single_match_else)]
20#![deny(missing_docs)]
21#![forbid(unsafe_code)]
22use self::wrapper::Wrapper;
23use core::future::Future;
24use core::iter::FromIterator;
25use core::ops::{Deref, DerefMut};
26use std::sync::Arc;
27
28#[cfg(feature = "async")]
29pub use async_::{AsyncLease, PoolStream};
30pub use init::InitPool;
31use parking_lot::lock_api::ArcMutexGuard;
32use parking_lot::RawMutex;
33
34#[cfg(feature = "async")]
35mod async_;
36pub mod init;
37mod wrapper;
38
39#[must_use]
44pub struct Pool<T> {
45 inner: Arc<PoolInner<T>>,
46}
47
48struct PoolInner<T> {
49 buffer: lockfree::set::Set<Wrapper<T>>,
50 #[cfg(feature = "async")]
51 waiting_futures: Arc<async_::WaitingFutures<T>>,
52}
53
54impl<T> Default for PoolInner<T> {
55 #[inline]
56 fn default() -> Self {
57 Self {
58 buffer: lockfree::set::Set::default(),
59 #[cfg(feature = "async")]
60 waiting_futures: Arc::default(),
61 }
62 }
63}
64
65impl<T: Send + Sync + 'static> Pool<T> {
66 pub fn into_init_pool<I: init::Init>(self, init: I) -> InitPool<T, I> {
68 InitPool::new_from_pool(self, init)
69 }
70}
71
72impl<T> Pool<T> {
73 pub fn try_into_locked_pool(mut self) -> Result<LockedPool<T>, (Pool<T>, PoolConversionError)> {
78 let len = self.len();
79 if len == 0 {
80 return Err((self, PoolConversionError::EmptyPool));
81 }
82 let Some(_) = Arc::get_mut(&mut self.inner) else {
87 let count = Arc::strong_count(&self.inner) - 1;
88 return Err((self, PoolConversionError::OtherCopies { count }));
89 };
90
91 Ok(LockedPool { pool: self, len })
92 }
93
94 #[inline]
96 pub fn new() -> Self {
97 Self::default()
98 }
99
100 #[inline]
102 pub fn with_initial_size(pool_size: usize, mut init: impl FnMut() -> T) -> Self {
103 (0..pool_size).map(|_| init()).collect()
104 }
105
106 #[inline]
111 pub fn try_with_initial_size<E>(pool_size: usize, mut init: impl FnMut() -> Result<T, E>) -> Result<Self, E> {
112 let buffer = lockfree::set::Set::new();
113 for _ in 0..pool_size {
114 buffer
115 .insert(Wrapper::new(init()?))
116 .unwrap_or_else(|_| unreachable!("Each new wrapper should be unique"));
117 }
118 Ok(Self {
119 inner: Arc::new(PoolInner {
120 buffer,
121 #[cfg(feature = "async")]
122 waiting_futures: Arc::default(),
123 }),
124 })
125 }
126
127 #[must_use]
131 #[inline]
132 pub fn try_get(&self) -> Option<Lease<T>> {
133 self.inner.buffer.iter().find_map(|wrapper| Lease::from_arc_mutex(&wrapper, self))
134 }
135
136 #[inline]
137 fn try_get_or_len(&self) -> Result<Lease<T>, usize> {
138 let mut count = 0;
139 let lease = self
140 .inner
141 .buffer
142 .iter()
143 .inspect(|_| count += 1)
144 .find_map(|wrapper| Lease::from_arc_mutex(&wrapper, self));
145 lease.ok_or(count)
146 }
147
148 #[cfg(feature = "async")]
152 #[inline]
153 pub async fn get(&self) -> Lease<T> {
154 self.get_async().await
155 }
156
157 #[cfg(feature = "async")]
159 #[inline]
160 fn get_async(&self) -> AsyncLease<T> {
161 let (sender, receiver) = futures_channel::oneshot::channel();
162 self.inner.waiting_futures.insert(sender);
163 if let Some(lease) = self.try_get() {
165 self.inner.waiting_futures.wake_next(lease);
166 }
167 AsyncLease::<T>::new(receiver)
168 }
169
170 #[cfg(feature = "async")]
172 #[inline]
173 pub fn stream(&self) -> impl futures_core::Stream<Item = Lease<T>> {
174 PoolStream::new(self)
175 }
176
177 #[inline]
183 pub fn try_get_or_new(&self, init: impl FnOnce() -> T) -> Lease<T> {
184 self.try_get().unwrap_or_else(|| self.insert_with_lease(init()))
185 }
186
187 #[inline]
193 pub async fn get_or_new<FUT: Future<Output = T>, FN: FnOnce() -> FUT>(&self, init: FN) -> Lease<T> {
194 match self.try_get() {
195 Some(lease) => lease,
196 None => self.insert_with_lease(init().await),
197 }
198 }
199
200 #[inline]
209 pub fn try_get_or_try_new<E>(&self, init: impl FnOnce() -> Result<T, E>) -> Result<Lease<T>, E> {
210 match self.try_get() {
211 Some(l) => Ok(l),
212 None => Ok(self.insert_with_lease(init()?)),
213 }
214 }
215
216 #[inline]
225 pub async fn get_or_try_new<E, FUT: Future<Output = Result<T, E>>, FN: FnOnce() -> FUT>(&self, init: FN) -> Result<Lease<T>, E> {
226 match self.try_get() {
227 None => Ok(self.insert_with_lease(init().await?)),
228 Some(l) => Ok(l),
229 }
230 }
231
232 #[inline]
236 pub fn try_get_or_new_with_cap(&self, cap: usize, init: impl FnOnce() -> T) -> Option<Lease<T>> {
237 match self.try_get_or_len() {
238 Ok(t) => Some(t),
239 Err(len) => (len < cap).then(|| self.insert_with_lease(init())),
240 }
241 }
242
243 #[cfg(feature = "async")]
247 #[inline]
248 pub async fn get_or_new_with_cap<FUT: Future<Output = T>, FN: FnOnce() -> FUT>(&self, cap: usize, init: FN) -> Lease<T> {
249 match self.try_get_or_len() {
250 Ok(t) => t,
251 Err(len) => {
252 if len >= cap {
253 return self.get().await;
254 }
255 self.insert_with_lease(init().await)
256 }
257 }
258 }
259
260 #[inline]
267 pub fn try_get_or_try_new_with_cap<E>(&self, cap: usize, init: impl FnOnce() -> Result<T, E>) -> Result<Option<Lease<T>>, E> {
268 match self.try_get_or_len() {
269 Ok(t) => Ok(Some(t)),
270 Err(len) => {
271 if len >= cap {
272 return Ok(None);
273 }
274 Ok(Some(self.insert_with_lease(init()?)))
275 }
276 }
277 }
278
279 #[cfg(feature = "async")]
286 #[inline]
287 pub async fn get_or_try_new_with_cap<E, FUT: Future<Output = Result<T, E>>, FN: FnOnce() -> FUT>(
288 &self,
289 cap: usize,
290 init: FN,
291 ) -> Result<Lease<T>, E> {
292 match self.try_get_or_len() {
293 Ok(t) => Ok(t),
294 Err(len) => {
295 if len >= cap {
296 return Ok(self.get().await);
297 }
298 Ok(self.insert_with_lease(init().await?))
299 }
300 }
301 }
302
303 #[inline]
305 #[must_use]
306 pub fn len(&self) -> usize {
307 self.inner.buffer.iter().count()
308 }
309
310 #[inline]
315 pub fn clear(&self) {
316 self.inner.buffer.iter().for_each(|g| {
317 let wrapper: &Wrapper<_> = &g;
318 self.inner.buffer.remove(wrapper);
319 });
320 }
321
322 #[inline]
326 pub fn resize(&self, pool_size: usize, mut init: impl FnMut() -> T) {
327 let set = &self.inner.buffer;
328 self.inner.buffer.iter().skip(pool_size).for_each(|g| {
329 self.inner.buffer.remove(&*g);
330 });
331 set.extend((self.len()..pool_size).map(|_| Wrapper::new(init())));
332 }
333
334 #[inline]
341 pub fn try_resize<E>(&self, pool_size: usize, mut init: impl FnMut() -> Result<T, E>) -> Result<(), E> {
342 let set = &self.inner.buffer;
343 set.iter().skip(pool_size).for_each(|g| {
344 set.remove(&*g);
345 });
346 for _ in self.len()..pool_size {
347 set
348 .insert(Wrapper::new(init()?))
349 .unwrap_or_else(|_| unreachable!("Each new wrapper should be unique"));
350 }
351 Ok(())
352 }
353
354 #[inline]
356 pub fn extend<I: IntoIterator<Item = T>>(&self, iter: I) {
357 self.inner.buffer.extend(iter.into_iter().map(Wrapper::new));
358 }
359
360 #[inline]
363 pub fn insert(&self, t: T) {
364 let lease = self.insert_with_lease(t);
365 self.notify(lease);
366 }
367
368 #[inline]
371 pub fn insert_with_lease(&self, t: T) -> Lease<T> {
372 let wrapper = Wrapper::new(t);
373 let lease = Lease::from_arc_mutex(&wrapper, self).unwrap_or_else(|| unreachable!("Wrapper is unlocked when new"));
374 self
375 .inner
376 .buffer
377 .insert(wrapper)
378 .unwrap_or_else(|_| unreachable!("Each new wrapper should be unique"));
379 lease
380 }
381
382 #[must_use]
385 #[inline]
386 pub fn available(&self) -> usize {
387 self.inner.buffer.iter().filter(|b| !b.is_locked()).count()
388 }
389
390 #[must_use]
392 #[inline]
393 pub fn is_empty(&self) -> bool {
394 self.inner.buffer.iter().next().is_none()
395 }
396
397 #[inline]
399 pub fn disassociate(&self, lease: &Lease<T>) {
400 self.inner.buffer.remove(&Wrapper(ArcMutexGuard::mutex(lease.guard()).clone()));
403 }
404
405 #[cfg_attr(not(feature = "async"), allow(clippy::unused_self))]
406 #[inline]
407 fn notify(&self, lease: Lease<T>) {
408 #[cfg(feature = "async")]
409 self.inner.waiting_futures.wake_next(lease);
410 #[cfg(not(feature = "async"))]
411 drop(lease);
412 }
413}
414
415impl<T: Default> Pool<T> {
416 #[inline]
418 pub fn get_or_default(&self) -> Lease<T> {
419 self.try_get_or_new(T::default)
420 }
421
422 #[must_use]
424 #[inline]
425 pub fn get_or_default_with_cap(&self, cap: usize) -> Option<Lease<T>> {
426 self.try_get_or_new_with_cap(cap, T::default)
427 }
428
429 #[inline]
431 pub fn resize_default(&self, pool_size: usize) {
432 self.resize(pool_size, T::default);
433 }
434}
435
436impl<T> Default for Pool<T> {
437 fn default() -> Self {
438 Self {
439 inner: Arc::new(PoolInner::default()),
440 }
441 }
442}
443
444impl<T> core::fmt::Debug for Pool<T> {
445 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
446 struct ListDebugger<'a, T> {
447 set: &'a lockfree::set::Set<Wrapper<T>>,
448 }
449
450 impl<T> core::fmt::Debug for ListDebugger<'_, T> {
451 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
452 f.debug_list().entries(self.set.iter().map(|m| !m.is_locked())).finish()
453 }
454 }
455
456 let mut s = f.debug_struct("Pool");
457 s.field("len", &self.len())
458 .field("available", &self.available())
459 .field("availabilities", &ListDebugger { set: &self.inner.buffer })
460 .finish()
461 }
462}
463
464impl<T> Clone for Pool<T> {
465 fn clone(&self) -> Self {
466 Self { inner: self.inner.clone() }
467 }
468}
469
470impl<T> FromIterator<T> for Pool<T> {
471 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
472 Self {
473 inner: Arc::new(PoolInner {
474 buffer: iter.into_iter().map(Wrapper::new).collect(),
475 #[cfg(feature = "async")]
476 waiting_futures: Arc::default(),
477 }),
478 }
479 }
480}
481
482#[derive(Default, Clone)]
487pub struct LockedPool<T> {
488 pool: Pool<T>,
489 len: usize,
490}
491
492impl<T> LockedPool<T> {
493 pub fn try_into_pool(mut self) -> Result<Pool<T>, (LockedPool<T>, PoolConversionError)> {
498 let Some(_) = Arc::get_mut(&mut self.pool.inner) else {
499 let count = Arc::strong_count(&self.pool.inner) - 1;
500 return Err((self, PoolConversionError::OtherCopies { count }));
501 };
502
503 Ok(self.pool)
504 }
505 #[must_use]
509 #[inline]
510 pub fn try_get(&self) -> Option<Lease<T>> {
511 self.pool.try_get()
512 }
513
514 #[cfg(feature = "async")]
518 #[inline]
519 pub async fn get(&self) -> Lease<T> {
520 self.pool.get().await
521 }
522
523 #[cfg(feature = "async")]
525 #[inline]
526 pub fn stream(&self) -> impl futures_core::Stream<Item = Lease<T>> {
527 self.pool.stream()
528 }
529
530 #[inline]
532 #[must_use]
533 pub fn len(&self) -> usize {
534 self.len
535 }
536
537 #[must_use]
539 #[inline]
540 pub fn is_empty(&self) -> bool {
541 self.len == 0
542 }
543
544 #[must_use]
547 #[inline]
548 pub fn available(&self) -> usize {
549 self.pool.available()
550 }
551}
552
553impl<T> core::fmt::Debug for LockedPool<T> {
554 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
555 self.pool.fmt(f)
556 }
557}
558
559#[derive(Debug, Clone, Copy, PartialEq, Eq)]
561pub enum PoolConversionError {
562 EmptyPool,
564 OtherCopies {
571 count: usize,
573 },
574}
575
576impl core::fmt::Display for PoolConversionError {
577 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
578 match self {
579 Self::EmptyPool => f.write_str("Pool is empty"),
580 Self::OtherCopies { count } => write!(f, "Pool has {count} other copies"),
581 }
583 }
584}
585
586impl std::error::Error for PoolConversionError {}
587
588#[must_use]
598pub struct Lease<T> {
599 guard: Option<ArcMutexGuard<RawMutex, T>>,
601 #[cfg(feature = "async")]
602 waiting_futures: Arc<async_::WaitingFutures<T>>,
603}
604
605impl<T> Drop for Lease<T> {
606 fn drop(&mut self) {
607 #[cfg(feature = "async")]
608 {
609 if let Some(guard) = self.guard.take() {
610 let lease = Self {
611 guard: Some(guard),
612 waiting_futures: self.waiting_futures.clone(),
613 };
614 self.waiting_futures.wake_next(lease);
615 }
616 }
617 #[cfg(not(feature = "async"))]
618 {
619 self.guard.take();
620 }
621 }
622}
623
624impl<T> Lease<T> {
625 #[inline]
626 fn from_arc_mutex(arc: &Wrapper<T>, #[allow(unused)] pool: &Pool<T>) -> Option<Self> {
627 arc.0.try_lock_arc().map(|guard| Self {
628 guard: Some(guard),
629 #[cfg(feature = "async")]
630 waiting_futures: pool.inner.waiting_futures.clone(),
631 })
632 }
633 fn guard(&self) -> &ArcMutexGuard<RawMutex, T> {
634 self.guard.as_ref().unwrap()
635 }
636 fn guard_mut(&mut self) -> &mut ArcMutexGuard<RawMutex, T> {
637 self.guard.as_mut().unwrap()
638 }
639 #[cfg(feature = "async")]
640 fn drop_without_recursion(mut self) {
641 self.guard.take();
642 }
643}
644
645impl<T: core::fmt::Debug> core::fmt::Debug for Lease<T> {
646 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
647 self.deref().fmt(f)
648 }
649}
650
651impl<T> Deref for Lease<T> {
652 type Target = T;
653
654 fn deref(&self) -> &Self::Target {
655 self.guard()
656 }
657}
658
659impl<T> DerefMut for Lease<T> {
660 fn deref_mut(&mut self) -> &mut Self::Target {
661 self.guard_mut()
662 }
663}
664
665impl<T, U: ?Sized> AsRef<U> for Lease<T>
666where
667 T: AsRef<U>,
668{
669 fn as_ref(&self) -> &U {
670 self.deref().as_ref()
671 }
672}
673
674impl<T, U: ?Sized> AsMut<U> for Lease<T>
675where
676 T: AsMut<U>,
677{
678 fn as_mut(&mut self) -> &mut U {
679 self.deref_mut().as_mut()
680 }
681}
682
683#[allow(unused)]
684fn asserts() {
685 fn bytes<B: AsRef<[u8]> + AsMut<[u8]>>() {}
686 fn send_sync_static_clone<F: Send + 'static + Clone>() {}
687 bytes::<Lease<Vec<u8>>>();
688 send_sync_static_clone::<Pool<u8>>();
689 send_sync_static_clone::<init::InitPool<u8, init::InitFn<u8>>>();
690}