anchored_pool/
shared_unbounded.rs

1#![expect(unsafe_code, reason = "let unsafe code in Pools rely on PooledResource Drop impl")]
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6#[cfg(feature = "clone-behavior")]
7use clone_behavior::{MirroredClone, Speed};
8
9use crate::{
10    channel::{Receiver, Sender, unbounded_channel},
11    pooled_resource::{PooledResource, SealedPool},
12    other_utils::{ResetNothing, ResetResource},
13};
14
15
16/// A threadsafe resource pool with a growable number of `Resource`s.
17#[derive(Debug)]
18pub struct SharedUnboundedPool<Resource, Reset> {
19    pool_size:      Arc<AtomicUsize>,
20    sender:         Sender<Resource>,
21    receiver:       Receiver<Resource>,
22    reset_resource: Reset,
23}
24
25impl<Resource, Reset> SharedUnboundedPool<Resource, Reset> {
26    /// Create a new `SharedUnboundedPool`, which initially has zero `Resource`s.
27    ///
28    /// Whenever a `Resource` is returned to the pool, `reset_resource` is run on it first.
29    ///
30    /// # Potential Panics or Deadlocks
31    /// `reset_resource` must not call any method on the returned pool or any `Clone` or
32    /// `MirroredClone` of it; otherwise, a panic or deadlock may occur.
33    ///
34    /// Ideally, an `reset_resource` closure should not capture any [`SharedUnboundedPool`].
35    #[inline]
36    #[must_use]
37    pub fn new(reset_resource: Reset) -> Self
38    where
39        Reset: ResetResource<Resource> + Clone,
40    {
41        let (sender, receiver) = unbounded_channel();
42        Self {
43            pool_size: Arc::new(AtomicUsize::new(0)),
44            sender,
45            receiver,
46            reset_resource,
47        }
48    }
49}
50
51impl<Resource> SharedUnboundedPool<Resource, ResetNothing> {
52    /// Create a new `SharedUnboundedPool`, which initially has zero `Resource`s.
53    ///
54    /// When a `Resource` is returned to the pool, it is not reset in any way.
55    #[inline]
56    #[must_use]
57    pub fn new_without_reset() -> Self {
58        Self::new(ResetNothing)
59    }
60}
61
62impl<Resource, Reset> SharedUnboundedPool<Resource, Reset>
63where
64    Resource: Default,
65    Reset:    ResetResource<Resource> + Clone,
66{
67    /// Get a `Resource` from the pool, returning a default `Resource` if none were already
68    /// available in the pool.
69    #[inline]
70    #[must_use]
71    pub fn get_default(&self) -> PooledResource<Self, Resource> {
72        self.get(Resource::default)
73    }
74}
75
76impl<Resource, Reset: ResetResource<Resource> + Clone> SharedUnboundedPool<Resource, Reset> {
77    /// Get a `Resource` from the pool.
78    ///
79    /// # Potential Panics or Deadlocks
80    /// `init_resource` must not call any method on `self` or a `Clone` or `MirroredClone`
81    /// associated with `self`; otherwise, a panic or deadlock may occur.
82    ///
83    /// Ideally, an `init_resource` closure should not capture any [`SharedUnboundedPool`].
84    #[must_use]
85    pub fn get<F>(&self, init_resource: F) -> PooledResource<Self, Resource>
86    where
87        F: FnOnce() -> Resource,
88    {
89        #![expect(clippy::missing_panics_doc, reason = "false positive")]
90        #[expect(clippy::expect_used, reason = "works by inspection of the two channel impls")]
91        let resource = self.receiver
92            .try_recv()
93            .expect("channel is not closed; `self` has both a sender and receiver");
94
95        let resource = resource.unwrap_or_else(|| {
96            self.pool_size.fetch_add(1, Ordering::Relaxed);
97            init_resource()
98        });
99        let returner = (self.sender.clone(), self.reset_resource.clone());
100
101        // SAFETY:
102        // It's safe for the `PooledResource` to call `return_resource` however it
103        // likes, actually, and thus safe in the restricted guaranteed scenario.
104        unsafe { PooledResource::new(returner, resource) }
105    }
106
107    /// Get the total number of `Resource`s in this pool, whether available or in-use.
108    #[inline]
109    #[must_use]
110    pub fn pool_size(&self) -> usize {
111        self.pool_size.load(Ordering::Relaxed)
112    }
113
114    /// Get the number of `Resource`s in the pool which are not currently being used.
115    #[must_use]
116    pub fn available_resources(&self) -> usize {
117        self.receiver.len()
118    }
119}
120
121impl<Resource, Reset> SealedPool<Resource> for SharedUnboundedPool<Resource, Reset>
122where
123    Reset: ResetResource<Resource> + Clone,
124{
125    type Returner = (Sender<Resource>, Reset);
126
127    /// Used by [`PooledResource`] to return a `Resource` to a pool.
128    ///
129    /// # Safety
130    /// Must be called at most once in the `Drop` impl of a `PooledResource` constructed
131    /// via `PooledResource::new`, where `*returner` must be the `returner` value passed to
132    /// `PooledResource::new`.
133    unsafe fn return_resource(returner: &Self::Returner, mut resource: Resource) {
134        let (sender, reset_resource) = returner;
135
136        reset_resource.reset(&mut resource);
137        // If the pool already died, it's no issue to just drop the resource here.
138        let _err = sender.send(resource);
139        // eprintln!("{:?}", _err);
140    }
141}
142
143impl<Resource, Reset> Default for SharedUnboundedPool<Resource, Reset>
144where
145    Reset: ResetResource<Resource> + Clone + Default,
146{
147    #[inline]
148    fn default() -> Self {
149        Self::new(Reset::default())
150    }
151}
152
153impl<Resource, ResetResource: Clone> Clone for SharedUnboundedPool<Resource, ResetResource> {
154    #[inline]
155    fn clone(&self) -> Self {
156        Self {
157            pool_size:      Arc::clone(&self.pool_size),
158            sender:         self.sender.clone(),
159            receiver:       self.receiver.clone(),
160            reset_resource: self.reset_resource.clone(),
161        }
162    }
163
164    #[inline]
165    fn clone_from(&mut self, source: &Self) {
166        self.pool_size.clone_from(&source.pool_size);
167        self.sender.clone_from(&source.sender);
168        self.receiver.clone_from(&source.receiver);
169        self.reset_resource.clone_from(&source.reset_resource);
170    }
171}
172
173#[cfg(feature = "clone-behavior")]
174impl<Resource, ResetResource, S> MirroredClone<S> for SharedUnboundedPool<Resource, ResetResource>
175where
176    ResetResource: MirroredClone<S>,
177    S:             Speed,
178{
179    #[inline]
180    fn mirrored_clone(&self) -> Self {
181        Self {
182            pool_size:      Arc::clone(&self.pool_size),
183            sender:         self.sender.clone(),
184            receiver:       self.receiver.clone(),
185            reset_resource: self.reset_resource.mirrored_clone(),
186        }
187    }
188}
189
190
191#[cfg(all(test, not(tests_with_leaks)))]
192mod tests {
193    use std::{array, sync::mpsc, thread};
194    use super::*;
195
196
197    #[test]
198    fn zero_or_one_size() {
199        let pool: SharedUnboundedPool<(), ResetNothing> = SharedUnboundedPool::new_without_reset();
200        assert_eq!(pool.pool_size(), 0);
201        assert_eq!(pool.available_resources(), 0);
202
203        let unit = pool.get_default();
204        let _: &() = &unit;
205        assert_eq!(pool.pool_size(), 1);
206        assert_eq!(pool.available_resources(), 0);
207
208        drop(unit);
209        assert_eq!(pool.pool_size(), 1);
210        assert_eq!(pool.available_resources(), 1);
211    }
212
213    #[test]
214    fn init_and_reset() {
215        const SIZE: usize = 10;
216
217        let pool = SharedUnboundedPool::new(|int: &mut usize| *int = 1);
218        let integers: [_; SIZE] = array::from_fn(|_| pool.get(|| 1_usize));
219
220        for (idx, mut integer) in integers.into_iter().enumerate() {
221            assert_eq!(*integer, 1);
222            *integer = idx;
223            assert_eq!(*integer, idx);
224        }
225
226        // They've been reset to 1, and the new constructor is not used.
227        let integers: [_; SIZE] = array::from_fn(|_| pool.get(|| 2_usize));
228        for integer in integers {
229            assert_eq!(*integer, 1);
230        }
231    }
232
233    #[test]
234    fn no_reset() {
235        const SIZE: usize = 10;
236
237        let pool = SharedUnboundedPool::new(ResetNothing);
238        let integers: [_; SIZE] = array::from_fn(|_| pool.get(|| 1_usize));
239        for (idx, mut integer) in integers.into_iter().enumerate() {
240            assert_eq!(*integer, 1);
241            *integer = idx;
242            assert_eq!(*integer, idx);
243        }
244
245        // They haven't been reset, nor is the constructor used
246        let integers: [_; SIZE] = array::from_fn(|_| pool.get(|| 1_usize));
247        // This one is new.
248        assert_eq!(*pool.get(|| 11), 11);
249
250        for integer in integers {
251            assert!((0..SIZE).contains(&*integer));
252        }
253    }
254
255    #[test]
256    fn init_and_reset_disagreeing() {
257        let pool = SharedUnboundedPool::new(|int: &mut i32| *int = 2);
258        let first_int = pool.get_default();
259        assert_eq!(*first_int, 0);
260        let second_int = pool.get_default();
261        assert_eq!(*second_int, 0);
262        drop(first_int);
263        let mut reset_first_int = pool.get_default();
264        assert_eq!(*reset_first_int, 2);
265        *reset_first_int = 3;
266        assert_eq!(*reset_first_int, 3);
267        drop(reset_first_int);
268    }
269
270    #[test]
271    fn multithreaded_one_capacity() {
272        let pool: SharedUnboundedPool<i32, _> = SharedUnboundedPool::new_without_reset();
273
274        let cloned_pool = pool.clone();
275
276        assert_eq!(pool.pool_size(), 0);
277        assert_eq!(pool.available_resources(), 0);
278
279        let (signal_main, wait_for_thread) = mpsc::channel();
280        let (signal_thread, wait_for_main) = mpsc::channel();
281
282        thread::spawn(move || {
283            let mut int = cloned_pool.get_default();
284            signal_main.send(()).unwrap();
285            wait_for_main.recv().unwrap();
286            assert_eq!(*int, 0);
287            *int = 1;
288            drop(int);
289            signal_main.send(()).unwrap();
290        });
291
292        wait_for_thread.recv().unwrap();
293        assert_eq!(pool.pool_size(), 1);
294        assert_eq!(pool.available_resources(), 0);
295        signal_thread.send(()).unwrap();
296        wait_for_thread.recv().unwrap();
297        assert_eq!(pool.pool_size(), 1);
298        assert_eq!(pool.available_resources(), 1);
299        assert_eq!(*pool.get_default(), 1);
300    }
301}