anchored_pool/
shared_bounded.rs

1#![expect(unsafe_code, reason = "let unsafe code in Pools rely on PooledResource Drop impl")]
2
3#[cfg(feature = "clone-behavior")]
4use clone_behavior::{MirroredClone, Speed};
5
6use crate::{
7    channel::{bounded_channel, Receiver, Sender},
8    other_utils::{ResetNothing, ResetResource, ResourcePoolEmpty},
9    pooled_resource::{PooledResource, SealedPool},
10};
11
12
13/// A threadsafe resource pool with a fixed number of `Resource`s.
14#[derive(Debug)]
15pub struct SharedBoundedPool<Resource, Reset> {
16    sender:         Sender<Resource>,
17    receiver:       Receiver<Resource>,
18    reset_resource: Reset,
19}
20
21impl<Resource, Reset> SharedBoundedPool<Resource, Reset> {
22    /// Create a new `SharedBoundedPool` which has the indicated, fixed number of `Resource`s.
23    ///
24    /// Each `Resource` is immediately initialized, using the provided function.
25    ///
26    /// Whenever a `Resource` is returned to the pool, `reset_resource` is run on it first.
27    #[inline]
28    #[must_use]
29    pub fn new<F>(pool_size: usize, mut init_resource: F, reset_resource: Reset) -> Self
30    where
31        F: FnMut() -> Resource,
32        Reset: ResetResource<Resource> + Clone,
33    {
34        #![expect(clippy::missing_panics_doc, reason = "false positive")]
35
36        let (sender, receiver) = bounded_channel(pool_size);
37        for _ in 0..pool_size {
38            #[expect(clippy::expect_used, reason = "works by inspection of the two channel impls")]
39            sender.send(init_resource()).expect("channel is not yet closed");
40        }
41
42        Self { sender, receiver, reset_resource }
43    }
44}
45
46impl<Resource: Default, Reset> SharedBoundedPool<Resource, Reset> {
47    /// Create a new `SharedBoundedPool` which has the indicated, fixed number of `Resource`s.
48    ///
49    /// Each `Resource` is immediately initialized to its default value.
50    ///
51    /// Whenever a `Resource` is returned to the pool, `reset_resource` is run on it first.
52    #[inline]
53    #[must_use]
54    pub fn new_default(pool_size: usize, reset_resource: Reset) -> Self
55    where
56        Reset: ResetResource<Resource> + Clone,
57    {
58        Self::new(pool_size, Resource::default, reset_resource)
59    }
60}
61
62impl<Resource> SharedBoundedPool<Resource, ResetNothing> {
63    /// Create a new `SharedBoundedPool` which has the indicated, fixed number of `Resource`s.
64    ///
65    /// Each `Resource` is immediately initialized, using the provided function.
66    ///
67    /// When a `Resource` is returned to the pool, it is not reset in any way.
68    #[inline]
69    #[must_use]
70    pub fn new_without_reset<F>(pool_size: usize, init_resource: F) -> Self
71    where
72        F: FnMut() -> Resource,
73    {
74        Self::new(pool_size, init_resource, ResetNothing)
75    }
76}
77
78impl<Resource: Default> SharedBoundedPool<Resource, ResetNothing> {
79    /// Create a new `SharedBoundedPool` which has the indicated, fixed number of `Resource`s.
80    ///
81    /// Each `Resource` is immediately initialized to its default value.
82    ///
83    /// When a `Resource` is returned to the pool, it is not reset in any way.
84    #[inline]
85    #[must_use]
86    pub fn new_default_without_reset(pool_size: usize) -> Self {
87        Self::new(pool_size, Resource::default, ResetNothing)
88    }
89}
90
91impl<Resource, Reset: ResetResource<Resource> + Clone> SharedBoundedPool<Resource, Reset> {
92    /// Get a `Resource` from the pool, if any are available.
93    pub fn try_get(&self) -> Result<PooledResource<Self, Resource>, ResourcePoolEmpty> {
94        #![expect(clippy::missing_panics_doc, reason = "false positive")]
95        #[expect(clippy::expect_used, reason = "works by inspection of the two channel impls")]
96        let resource = self.receiver
97            .try_recv()
98            .expect("channel is not closed; `self` has both a sender and receiver");
99
100        if let Some(resource) = resource {
101            let returner = (self.sender.clone(), self.reset_resource.clone());
102            // SAFETY:
103            // It's safe for the `PooledResource` to call `return_resource` however it
104            // likes, actually, and thus safe in the restricted guaranteed scenario.
105            Ok(unsafe { PooledResource::new(returner, resource) })
106        } else {
107            Err(ResourcePoolEmpty)
108        }
109    }
110
111    /// Get a `Resource` from the pool.
112    ///
113    /// May need to wait for a resource to become available.
114    ///
115    /// # Potential Panics or Deadlocks
116    /// If `self.pool_size() == 0`, then this method will panic or deadlock.
117    /// This method may also cause a deadlock if no `Resource`s are currently available, and the
118    /// current thread needs to make progress in order to release a `Resource`.
119    #[expect(clippy::missing_panics_doc, reason = "false positive")]
120    #[must_use]
121    pub fn get(&self) -> PooledResource<Self, Resource> {
122        #[expect(clippy::expect_used, reason = "works by inspection of the two channel impls")]
123        let resource = self.receiver
124            .recv()
125            .expect("channel is not closed; `self` has both a sender and receiver");
126        let returner = (self.sender.clone(), self.reset_resource.clone());
127
128        // SAFETY:
129        // It's safe for the `PooledResource` to call `return_resource` however it
130        // likes, actually, and thus safe in the restricted guaranteed scenario.
131        unsafe { PooledResource::new(returner, resource) }
132    }
133
134    /// Get the total number of `Resource`s in this pool, whether available or in-use.
135    #[inline]
136    #[must_use]
137    pub fn pool_size(&self) -> usize {
138        self.receiver.capacity()
139    }
140
141    /// Get the number of `Resource`s in the pool which are not currently being used.
142    #[must_use]
143    pub fn available_resources(&self) -> usize {
144        self.receiver.len()
145    }
146}
147
148impl<Resource, Reset> SealedPool<Resource> for SharedBoundedPool<Resource, Reset>
149where
150    Reset: ResetResource<Resource> + Clone,
151{
152    type Returner = (Sender<Resource>, Reset);
153
154    /// Used by [`PooledResource`] to return a `Resource` to a pool.
155    ///
156    /// # Safety
157    /// Must be called at most once in the `Drop` impl of a `PooledResource` constructed
158    /// via `PooledResource::new`, where `*returner` must be the `returner` value passed to
159    /// `PooledResource::new`.
160    unsafe fn return_resource(returner: &Self::Returner, mut resource: Resource) {
161        let (sender, reset_resource) = returner;
162
163        reset_resource.reset(&mut resource);
164        // If the pool already died, it's no issue to just drop the resource here.
165        let _err = sender.send(resource);
166    }
167}
168
169impl<Resource, ResetResource: Clone> Clone for SharedBoundedPool<Resource, ResetResource> {
170    #[inline]
171    fn clone(&self) -> Self {
172        Self {
173            sender:         self.sender.clone(),
174            receiver:       self.receiver.clone(),
175            reset_resource: self.reset_resource.clone(),
176        }
177    }
178
179    #[inline]
180    fn clone_from(&mut self, source: &Self) {
181        self.sender.clone_from(&source.sender);
182        self.receiver.clone_from(&source.receiver);
183        self.reset_resource.clone_from(&source.reset_resource);
184    }
185}
186
#[cfg(feature = "clone-behavior")]
impl<Resource, Reset, S> MirroredClone<S> for SharedBoundedPool<Resource, Reset>
where
    Reset: MirroredClone<S>,
    S: Speed,
{
    /// Clone both channel halves with `Clone`, and the `Reset` value with `mirrored_clone`.
    #[inline]
    fn mirrored_clone(&self) -> Self {
        let sender = self.sender.clone();
        let receiver = self.receiver.clone();
        let reset_resource = self.reset_resource.mirrored_clone();

        Self { sender, receiver, reset_resource }
    }
}
202
203
#[cfg(all(test, not(tests_with_leaks)))]
mod tests {
    use std::{array, sync::mpsc, thread};
    use super::*;

    /// A pool built with zero resources reports size zero and never hands anything out.
    #[test]
    fn zero_capacity() {
        let empty_pool: SharedBoundedPool<(), _> =
            SharedBoundedPool::new_default_without_reset(0);

        assert_eq!(empty_pool.pool_size(), 0);
        assert_eq!(empty_pool.available_resources(), 0);
        assert!(empty_pool.try_get().is_err());
    }

    /// With a single resource, taking it empties the pool and dropping it refills the pool.
    #[test]
    fn one_capacity() {
        let single_pool: SharedBoundedPool<(), _> =
            SharedBoundedPool::new_default_without_reset(1);

        let borrowed = single_pool.get();
        assert_eq!(single_pool.pool_size(), 1);
        assert_eq!(single_pool.available_resources(), 0);
        assert!(single_pool.try_get().is_err());

        drop(borrowed);
        assert_eq!(single_pool.available_resources(), 1);
    }

    /// Resources start at the init value and are put back to it by the reset function.
    #[test]
    fn init_and_reset() {
        const CAPACITY: usize = 10;

        let pool = SharedBoundedPool::new(CAPACITY, || 1_usize, |int: &mut usize| *int = 1);

        // Take everything out, then overwrite each value; each handle is dropped (and so
        // returned to the pool) at the end of its loop iteration.
        let first_round: [_; CAPACITY] = array::from_fn(|_| pool.get());
        for (position, mut value) in first_round.into_iter().enumerate() {
            assert_eq!(*value, 1);
            *value = position;
            assert_eq!(*value, position);
        }

        // Every value must have been reset to 1 on its way back into the pool.
        let second_round: [_; CAPACITY] = array::from_fn(|_| pool.get());
        for value in second_round {
            assert_eq!(*value, 1);
        }
    }

    /// With `ResetNothing`, values written by one user are still there for the next one.
    #[test]
    fn no_reset() {
        const CAPACITY: usize = 10;

        let pool = SharedBoundedPool::new(CAPACITY, || 1_usize, ResetNothing);

        let first_round: [_; CAPACITY] = array::from_fn(|_| pool.get());
        for (position, mut value) in first_round.into_iter().enumerate() {
            assert_eq!(*value, 1);
            *value = position;
            assert_eq!(*value, position);
        }

        // Nothing was reset, so each value is one of the positions written above.
        let second_round: [_; CAPACITY] = array::from_fn(|_| pool.get());
        for value in second_round {
            assert!((0..CAPACITY).contains(&*value));
        }
    }

    /// The init value is seen first; the reset value is seen only after a round trip.
    #[test]
    fn init_and_reset_disagreeing() {
        let pool = SharedBoundedPool::new(2, || 1, |int: &mut i32| *int = 2);

        let first = pool.get();
        let second = pool.get();
        assert_eq!(*first, 1);
        assert_eq!(*second, 1);

        // `second` is still held, so the only resource available is the one just returned.
        drop(first);
        let mut recycled = pool.get();
        assert_eq!(*recycled, 2);

        *recycled = 3;
        assert_eq!(*recycled, 3);
        drop(recycled);
    }

    /// A cloned pool used from another thread shares its resources with the original.
    #[test]
    fn multithreaded_one_capacity() {
        let pool: SharedBoundedPool<i32, _> = SharedBoundedPool::new_default_without_reset(1);
        let worker_pool = pool.clone();

        assert_eq!(pool.available_resources(), 1);

        // Two one-way channels used purely as hand-shake signals between the threads.
        let (signal_main, wait_for_thread) = mpsc::channel();
        let (signal_thread, wait_for_main) = mpsc::channel();

        thread::spawn(move || {
            let mut value = worker_pool.get();
            // Tell main the resource is taken, then wait until main has checked the count.
            signal_main.send(()).unwrap();
            wait_for_main.recv().unwrap();

            assert_eq!(*value, 0);
            *value = 1;
            drop(value);

            // Tell main the resource is back in the pool.
            signal_main.send(()).unwrap();
        });

        wait_for_thread.recv().unwrap();
        assert_eq!(pool.available_resources(), 0);

        signal_thread.send(()).unwrap();
        wait_for_thread.recv().unwrap();
        assert_eq!(pool.available_resources(), 1);

        // No reset is configured, so the worker's write is visible through the original pool.
        assert_eq!(*pool.get(), 1);
    }
}