1#![expect(unsafe_code, reason = "let unsafe code in Pools rely on PooledResource Drop impl")]
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6#[cfg(feature = "clone-behavior")]
7use clone_behavior::{MirroredClone, Speed};
8
9use crate::{
10 channel::{Receiver, Sender, unbounded_channel},
11 pooled_resource::{PooledResource, SealedPool},
12 other_utils::{ResetNothing, ResetResource},
13};
14
15
16#[derive(Debug)]
18pub struct SharedUnboundedPool<Resource, Reset> {
19 pool_size: Arc<AtomicUsize>,
20 sender: Sender<Resource>,
21 receiver: Receiver<Resource>,
22 reset_resource: Reset,
23}
24
25impl<Resource, Reset> SharedUnboundedPool<Resource, Reset> {
26 #[inline]
36 #[must_use]
37 pub fn new(reset_resource: Reset) -> Self
38 where
39 Reset: ResetResource<Resource> + Clone,
40 {
41 let (sender, receiver) = unbounded_channel();
42 Self {
43 pool_size: Arc::new(AtomicUsize::new(0)),
44 sender,
45 receiver,
46 reset_resource,
47 }
48 }
49}
50
51impl<Resource> SharedUnboundedPool<Resource, ResetNothing> {
52 #[inline]
56 #[must_use]
57 pub fn new_without_reset() -> Self {
58 Self::new(ResetNothing)
59 }
60}
61
62impl<Resource, Reset> SharedUnboundedPool<Resource, Reset>
63where
64 Resource: Default,
65 Reset: ResetResource<Resource> + Clone,
66{
67 #[inline]
70 #[must_use]
71 pub fn get_default(&self) -> PooledResource<Self, Resource> {
72 self.get(Resource::default)
73 }
74}
75
76impl<Resource, Reset: ResetResource<Resource> + Clone> SharedUnboundedPool<Resource, Reset> {
77 #[must_use]
85 pub fn get<F>(&self, init_resource: F) -> PooledResource<Self, Resource>
86 where
87 F: FnOnce() -> Resource,
88 {
89 #![expect(clippy::missing_panics_doc, reason = "false positive")]
90 #[expect(clippy::expect_used, reason = "works by inspection of the two channel impls")]
91 let resource = self.receiver
92 .try_recv()
93 .expect("channel is not closed; `self` has both a sender and receiver");
94
95 let resource = resource.unwrap_or_else(|| {
96 self.pool_size.fetch_add(1, Ordering::Relaxed);
97 init_resource()
98 });
99 let returner = (self.sender.clone(), self.reset_resource.clone());
100
101 unsafe { PooledResource::new(returner, resource) }
105 }
106
107 #[inline]
109 #[must_use]
110 pub fn pool_size(&self) -> usize {
111 self.pool_size.load(Ordering::Relaxed)
112 }
113
114 #[must_use]
116 pub fn available_resources(&self) -> usize {
117 self.receiver.len()
118 }
119}
120
121impl<Resource, Reset> SealedPool<Resource> for SharedUnboundedPool<Resource, Reset>
122where
123 Reset: ResetResource<Resource> + Clone,
124{
125 type Returner = (Sender<Resource>, Reset);
126
127 unsafe fn return_resource(returner: &Self::Returner, mut resource: Resource) {
134 let (sender, reset_resource) = returner;
135
136 reset_resource.reset(&mut resource);
137 let _err = sender.send(resource);
139 }
141}
142
143impl<Resource, Reset> Default for SharedUnboundedPool<Resource, Reset>
144where
145 Reset: ResetResource<Resource> + Clone + Default,
146{
147 #[inline]
148 fn default() -> Self {
149 Self::new(Reset::default())
150 }
151}
152
153impl<Resource, ResetResource: Clone> Clone for SharedUnboundedPool<Resource, ResetResource> {
154 #[inline]
155 fn clone(&self) -> Self {
156 Self {
157 pool_size: Arc::clone(&self.pool_size),
158 sender: self.sender.clone(),
159 receiver: self.receiver.clone(),
160 reset_resource: self.reset_resource.clone(),
161 }
162 }
163
164 #[inline]
165 fn clone_from(&mut self, source: &Self) {
166 self.pool_size.clone_from(&source.pool_size);
167 self.sender.clone_from(&source.sender);
168 self.receiver.clone_from(&source.receiver);
169 self.reset_resource.clone_from(&source.reset_resource);
170 }
171}
172
173#[cfg(feature = "clone-behavior")]
174impl<Resource, ResetResource, S> MirroredClone<S> for SharedUnboundedPool<Resource, ResetResource>
175where
176 ResetResource: MirroredClone<S>,
177 S: Speed,
178{
179 #[inline]
180 fn mirrored_clone(&self) -> Self {
181 Self {
182 pool_size: Arc::clone(&self.pool_size),
183 sender: self.sender.clone(),
184 receiver: self.receiver.clone(),
185 reset_resource: self.reset_resource.mirrored_clone(),
186 }
187 }
188}
189
190
191#[cfg(all(test, not(tests_with_leaks)))]
192mod tests {
193 use std::{array, sync::mpsc, thread};
194 use super::*;
195
196
197 #[test]
198 fn zero_or_one_size() {
199 let pool: SharedUnboundedPool<(), ResetNothing> = SharedUnboundedPool::new_without_reset();
200 assert_eq!(pool.pool_size(), 0);
201 assert_eq!(pool.available_resources(), 0);
202
203 let unit = pool.get_default();
204 let _: &() = &unit;
205 assert_eq!(pool.pool_size(), 1);
206 assert_eq!(pool.available_resources(), 0);
207
208 drop(unit);
209 assert_eq!(pool.pool_size(), 1);
210 assert_eq!(pool.available_resources(), 1);
211 }
212
213 #[test]
214 fn init_and_reset() {
215 const SIZE: usize = 10;
216
217 let pool = SharedUnboundedPool::new(|int: &mut usize| *int = 1);
218 let integers: [_; SIZE] = array::from_fn(|_| pool.get(|| 1_usize));
219
220 for (idx, mut integer) in integers.into_iter().enumerate() {
221 assert_eq!(*integer, 1);
222 *integer = idx;
223 assert_eq!(*integer, idx);
224 }
225
226 let integers: [_; SIZE] = array::from_fn(|_| pool.get(|| 2_usize));
228 for integer in integers {
229 assert_eq!(*integer, 1);
230 }
231 }
232
233 #[test]
234 fn no_reset() {
235 const SIZE: usize = 10;
236
237 let pool = SharedUnboundedPool::new(ResetNothing);
238 let integers: [_; SIZE] = array::from_fn(|_| pool.get(|| 1_usize));
239 for (idx, mut integer) in integers.into_iter().enumerate() {
240 assert_eq!(*integer, 1);
241 *integer = idx;
242 assert_eq!(*integer, idx);
243 }
244
245 let integers: [_; SIZE] = array::from_fn(|_| pool.get(|| 1_usize));
247 assert_eq!(*pool.get(|| 11), 11);
249
250 for integer in integers {
251 assert!((0..SIZE).contains(&*integer));
252 }
253 }
254
255 #[test]
256 fn init_and_reset_disagreeing() {
257 let pool = SharedUnboundedPool::new(|int: &mut i32| *int = 2);
258 let first_int = pool.get_default();
259 assert_eq!(*first_int, 0);
260 let second_int = pool.get_default();
261 assert_eq!(*second_int, 0);
262 drop(first_int);
263 let mut reset_first_int = pool.get_default();
264 assert_eq!(*reset_first_int, 2);
265 *reset_first_int = 3;
266 assert_eq!(*reset_first_int, 3);
267 drop(reset_first_int);
268 }
269
270 #[test]
271 fn multithreaded_one_capacity() {
272 let pool: SharedUnboundedPool<i32, _> = SharedUnboundedPool::new_without_reset();
273
274 let cloned_pool = pool.clone();
275
276 assert_eq!(pool.pool_size(), 0);
277 assert_eq!(pool.available_resources(), 0);
278
279 let (signal_main, wait_for_thread) = mpsc::channel();
280 let (signal_thread, wait_for_main) = mpsc::channel();
281
282 thread::spawn(move || {
283 let mut int = cloned_pool.get_default();
284 signal_main.send(()).unwrap();
285 wait_for_main.recv().unwrap();
286 assert_eq!(*int, 0);
287 *int = 1;
288 drop(int);
289 signal_main.send(()).unwrap();
290 });
291
292 wait_for_thread.recv().unwrap();
293 assert_eq!(pool.pool_size(), 1);
294 assert_eq!(pool.available_resources(), 0);
295 signal_thread.send(()).unwrap();
296 wait_for_thread.recv().unwrap();
297 assert_eq!(pool.pool_size(), 1);
298 assert_eq!(pool.available_resources(), 1);
299 assert_eq!(*pool.get_default(), 1);
300 }
301}