1#![expect(unsafe_code, reason = "let unsafe code in Pools rely on PooledResource Drop impl")]
2
3#[cfg(feature = "clone-behavior")]
4use clone_behavior::{MirroredClone, Speed};
5
6use crate::{
7 channel::{bounded_channel, Receiver, Sender},
8 other_utils::{ResetNothing, ResetResource, ResourcePoolEmpty},
9 pooled_resource::{PooledResource, SealedPool},
10};
11
12
13#[derive(Debug)]
15pub struct SharedBoundedPool<Resource, Reset> {
16 sender: Sender<Resource>,
17 receiver: Receiver<Resource>,
18 reset_resource: Reset,
19}
20
21impl<Resource, Reset> SharedBoundedPool<Resource, Reset> {
22 #[inline]
28 #[must_use]
29 pub fn new<F>(pool_size: usize, mut init_resource: F, reset_resource: Reset) -> Self
30 where
31 F: FnMut() -> Resource,
32 Reset: ResetResource<Resource> + Clone,
33 {
34 #![expect(clippy::missing_panics_doc, reason = "false positive")]
35
36 let (sender, receiver) = bounded_channel(pool_size);
37 for _ in 0..pool_size {
38 #[expect(clippy::expect_used, reason = "works by inspection of the two channel impls")]
39 sender.send(init_resource()).expect("channel is not yet closed");
40 }
41
42 Self { sender, receiver, reset_resource }
43 }
44}
45
46impl<Resource: Default, Reset> SharedBoundedPool<Resource, Reset> {
47 #[inline]
53 #[must_use]
54 pub fn new_default(pool_size: usize, reset_resource: Reset) -> Self
55 where
56 Reset: ResetResource<Resource> + Clone,
57 {
58 Self::new(pool_size, Resource::default, reset_resource)
59 }
60}
61
62impl<Resource> SharedBoundedPool<Resource, ResetNothing> {
63 #[inline]
69 #[must_use]
70 pub fn new_without_reset<F>(pool_size: usize, init_resource: F) -> Self
71 where
72 F: FnMut() -> Resource,
73 {
74 Self::new(pool_size, init_resource, ResetNothing)
75 }
76}
77
78impl<Resource: Default> SharedBoundedPool<Resource, ResetNothing> {
79 #[inline]
85 #[must_use]
86 pub fn new_default_without_reset(pool_size: usize) -> Self {
87 Self::new(pool_size, Resource::default, ResetNothing)
88 }
89}
90
91impl<Resource, Reset: ResetResource<Resource> + Clone> SharedBoundedPool<Resource, Reset> {
92 pub fn try_get(&self) -> Result<PooledResource<Self, Resource>, ResourcePoolEmpty> {
94 #![expect(clippy::missing_panics_doc, reason = "false positive")]
95 #[expect(clippy::expect_used, reason = "works by inspection of the two channel impls")]
96 let resource = self.receiver
97 .try_recv()
98 .expect("channel is not closed; `self` has both a sender and receiver");
99
100 if let Some(resource) = resource {
101 let returner = (self.sender.clone(), self.reset_resource.clone());
102 Ok(unsafe { PooledResource::new(returner, resource) })
106 } else {
107 Err(ResourcePoolEmpty)
108 }
109 }
110
111 #[expect(clippy::missing_panics_doc, reason = "false positive")]
120 #[must_use]
121 pub fn get(&self) -> PooledResource<Self, Resource> {
122 #[expect(clippy::expect_used, reason = "works by inspection of the two channel impls")]
123 let resource = self.receiver
124 .recv()
125 .expect("channel is not closed; `self` has both a sender and receiver");
126 let returner = (self.sender.clone(), self.reset_resource.clone());
127
128 unsafe { PooledResource::new(returner, resource) }
132 }
133
134 #[inline]
136 #[must_use]
137 pub fn pool_size(&self) -> usize {
138 self.receiver.capacity()
139 }
140
141 #[must_use]
143 pub fn available_resources(&self) -> usize {
144 self.receiver.len()
145 }
146}
147
148impl<Resource, Reset> SealedPool<Resource> for SharedBoundedPool<Resource, Reset>
149where
150 Reset: ResetResource<Resource> + Clone,
151{
152 type Returner = (Sender<Resource>, Reset);
153
154 unsafe fn return_resource(returner: &Self::Returner, mut resource: Resource) {
161 let (sender, reset_resource) = returner;
162
163 reset_resource.reset(&mut resource);
164 let _err = sender.send(resource);
166 }
167}
168
169impl<Resource, ResetResource: Clone> Clone for SharedBoundedPool<Resource, ResetResource> {
170 #[inline]
171 fn clone(&self) -> Self {
172 Self {
173 sender: self.sender.clone(),
174 receiver: self.receiver.clone(),
175 reset_resource: self.reset_resource.clone(),
176 }
177 }
178
179 #[inline]
180 fn clone_from(&mut self, source: &Self) {
181 self.sender.clone_from(&source.sender);
182 self.receiver.clone_from(&source.receiver);
183 self.reset_resource.clone_from(&source.reset_resource);
184 }
185}
186
187#[cfg(feature = "clone-behavior")]
188impl<Resource, ResetResource, S> MirroredClone<S> for SharedBoundedPool<Resource, ResetResource>
189where
190 ResetResource: MirroredClone<S>,
191 S: Speed,
192{
193 #[inline]
194 fn mirrored_clone(&self) -> Self {
195 Self {
196 sender: self.sender.clone(),
197 receiver: self.receiver.clone(),
198 reset_resource: self.reset_resource.mirrored_clone(),
199 }
200 }
201}
202
203
204#[cfg(all(test, not(tests_with_leaks)))]
205mod tests {
206 use std::{array, sync::mpsc, thread};
207 use super::*;
208
209
210 #[test]
211 fn zero_capacity() {
212 let pool: SharedBoundedPool<(), _> = SharedBoundedPool::new_default_without_reset(0);
213 assert_eq!(pool.pool_size(), 0);
214 assert_eq!(pool.available_resources(), 0);
215 assert!(pool.try_get().is_err());
216 }
217
218 #[test]
219 fn one_capacity() {
220 let pool: SharedBoundedPool<(), _> = SharedBoundedPool::new_default_without_reset(1);
221 let unit = pool.get();
222 assert_eq!(pool.pool_size(), 1);
223 assert_eq!(pool.available_resources(), 0);
224 assert!(pool.try_get().is_err());
225 drop(unit);
226 assert_eq!(pool.available_resources(), 1);
227 }
228
229 #[test]
230 fn init_and_reset() {
231 const CAPACITY: usize = 10;
232
233 let pool = SharedBoundedPool::new(CAPACITY, || 1_usize, |int: &mut usize| *int = 1);
234 let integers: [_; CAPACITY] = array::from_fn(|_| pool.get());
235 for (idx, mut integer) in integers.into_iter().enumerate() {
236 assert_eq!(*integer, 1);
237 *integer = idx;
238 assert_eq!(*integer, idx);
239 }
240
241 let integers: [_; CAPACITY] = array::from_fn(|_| pool.get());
243 for integer in integers {
244 assert_eq!(*integer, 1);
245 }
246 }
247
248 #[test]
249 fn no_reset() {
250 const CAPACITY: usize = 10;
251
252 let pool = SharedBoundedPool::new(CAPACITY, || 1_usize, ResetNothing);
253 let integers: [_; CAPACITY] = array::from_fn(|_| pool.get());
254 for (idx, mut integer) in integers.into_iter().enumerate() {
255 assert_eq!(*integer, 1);
256 *integer = idx;
257 assert_eq!(*integer, idx);
258 }
259
260 let integers: [_; CAPACITY] = array::from_fn(|_| pool.get());
262 for integer in integers {
263 assert!((0..CAPACITY).contains(&*integer));
264 }
265 }
266
267 #[test]
268 fn init_and_reset_disagreeing() {
269 let pool = SharedBoundedPool::new(2, || 1, |int: &mut i32| *int = 2);
270 let first_int = pool.get();
271 let second_int = pool.get();
272 assert_eq!(*first_int, 1);
273 assert_eq!(*second_int, 1);
274 drop(first_int);
275 let mut reset_first_int = pool.get();
276 assert_eq!(*reset_first_int, 2);
277 *reset_first_int = 3;
278 assert_eq!(*reset_first_int, 3);
279 drop(reset_first_int);
280 }
281
282 #[test]
283 fn multithreaded_one_capacity() {
284 let pool: SharedBoundedPool<i32, _> = SharedBoundedPool::new_default_without_reset(1);
285
286 let cloned_pool = pool.clone();
287
288 assert_eq!(pool.available_resources(), 1);
289
290 let (signal_main, wait_for_thread) = mpsc::channel();
291 let (signal_thread, wait_for_main) = mpsc::channel();
292
293 thread::spawn(move || {
294 let mut int = cloned_pool.get();
295 signal_main.send(()).unwrap();
296 wait_for_main.recv().unwrap();
297 assert_eq!(*int, 0);
298 *int = 1;
299 drop(int);
300 signal_main.send(()).unwrap();
301 });
302
303 wait_for_thread.recv().unwrap();
304 assert_eq!(pool.available_resources(), 0);
305 signal_thread.send(()).unwrap();
306 wait_for_thread.recv().unwrap();
307 assert_eq!(pool.available_resources(), 1);
308 assert_eq!(*pool.get(), 1);
309 }
310}