concurrent_pool/
entry.rs

use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::Ordering::*;
use std::{ops::Deref, ptr::NonNull, sync::atomic::AtomicUsize};

use crate::Pool;

/// An entry in the pool.
///
/// `Entry` holds a reference-counted pointer to an item from the pool and a
/// borrowed reference to the [`Pool`] itself.
/// When the last `Entry` pointing at an item is dropped, the item is returned
/// to the pool.
///
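/// # Example
///
/// A minimal sketch of the lifecycle. `pull_with` is a hypothetical
/// borrowing constructor standing in for however the [`Pool`] hands out
/// `Entry` values, so the snippet is not compiled:
///
/// ```ignore
/// use concurrent_pool::Pool;
///
/// let pool = Pool::<usize>::with_capacity(1);
/// {
///     let entry = pool.pull_with(|i| *i = 7).unwrap(); // hypothetical
///     assert_eq!(*entry.get(), 7);
/// } // Last `Entry` dropped: the item returns to the pool.
/// ```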
#[derive(Debug)]
pub struct Entry<'a, T: Default> {
    // When the last reference is dropped, the item is returned to the pool.
    // `item` is always `Some` before the last reference is dropped.
    pub(crate) item: Option<Prc<T>>,
    pub(crate) pool: &'a Pool<T>,
}

impl<'a, T: Default> Clone for Entry<'a, T> {
    /// Makes a clone of the `Entry` that points to the same allocation.
    fn clone(&self) -> Self {
        Self {
            item: self.item.clone(),
            pool: self.pool,
        }
    }
}

impl<'a, T: Default + PartialEq> PartialEq for Entry<'a, T> {
    fn eq(&self, other: &Self) -> bool {
        self.item.eq(&other.item)
    }
}

impl<'a, T: Default + Eq> Eq for Entry<'a, T> {}

impl<'a, T: Default + PartialOrd> PartialOrd for Entry<'a, T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.item.partial_cmp(&other.item)
    }
}

impl<'a, T: Default + Ord> Ord for Entry<'a, T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.item.cmp(&other.item)
    }
}

impl<'a, T: Default + Hash> Hash for Entry<'a, T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.item.hash(state)
    }
}

impl<'a, T: Default> Drop for Entry<'a, T> {
    fn drop(&mut self) {
        if self.item.as_ref().is_some_and(|i| i.dec_ref() == 1) {
            // This was the last reference. Synchronize with the `Release`
            // decrements of all other clones before the item can be reused,
            // then return it to the pool (the same protocol `Arc` uses).
            std::sync::atomic::fence(Acquire);
            let item = self.item.take().unwrap();
            self.pool.recycle(item);
        }
    }
}

impl<'a, T: Default> Deref for Entry<'a, T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        self.item.as_ref().unwrap()
    }
}

#[cfg(feature = "serde")]
impl<'a, T: Default + serde::Serialize> serde::Serialize for Entry<'a, T> {
    /// Serializes the inner item directly, with no wrapper around it.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.get().serialize(serializer)
    }
}

impl<'a, T: Default> Entry<'a, T> {
    /// Get a reference to the inner item.
    pub fn get(&self) -> &T {
        self
    }

    /// Get a mutable reference to the inner item if there are no other
    /// references to it; otherwise return `None`.
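    ///
    /// A sketch mirroring the `OwnedEntry::get_mut` example below; `pull_with`
    /// is a hypothetical borrowing counterpart of `pull_owned_with`, so the
    /// snippet is not compiled:
    ///
    /// ```ignore
    /// let pool = concurrent_pool::Pool::<usize>::with_capacity(1);
    /// let mut entry = pool.pull_with(|i| *i = 1).unwrap(); // hypothetical
    /// assert!(entry.get_mut().is_some()); // sole reference
    /// let alias = entry.clone();
    /// assert!(entry.get_mut().is_none()); // aliased
    /// ```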
    pub fn get_mut(&mut self) -> Option<&mut T> {
        Prc::get_mut(self.item.as_mut().unwrap())
    }

    /// Get a mutable reference to the inner item without checking for other
    /// references.
    ///
    /// # Safety
    ///
    /// The caller must ensure that no other reference to the same item is
    /// used while the returned borrow is alive; otherwise this is a data
    /// race.
    pub unsafe fn get_mut_unchecked(&mut self) -> &mut T {
        unsafe { Prc::get_mut_unchecked(self.item.as_mut().unwrap()) }
    }
}

/// An owned entry in the pool.
///
/// `OwnedEntry` holds a reference-counted pointer to an item from the pool
/// and an `Arc` reference to the [`Pool`].
/// When the last `OwnedEntry` pointing at an item is dropped, the item is
/// returned to the pool.
///
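/// # Example
///
/// A minimal sketch of the lifecycle, mirroring the `Ord` doc-test further
/// down:
///
/// ```rust
/// use concurrent_pool::Pool;
/// use std::sync::Arc;
///
/// let pool = Arc::new(Pool::<usize>::with_capacity(1));
/// {
///     let entry = pool.pull_owned_with(|i| *i = 7).unwrap();
///     let alias = entry.clone(); // Points at the same pooled item.
///     assert_eq!(*alias.get(), 7);
/// } // Last `OwnedEntry` dropped: the item returns to the pool.
/// ```
///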
pub struct OwnedEntry<T: Default> {
    // When the last reference is dropped, the item is returned to the pool.
    // `item` is always `Some` before the last reference is dropped.
    pub(crate) item: Option<Prc<T>>,
    pub(crate) pool: Arc<Pool<T>>,
}

impl<T: Default> Clone for OwnedEntry<T> {
    /// Makes a clone of the `OwnedEntry` that points to the same allocation.
    fn clone(&self) -> Self {
        Self {
            item: self.item.clone(),
            pool: self.pool.clone(),
        }
    }
}

impl<T: Default + PartialEq> PartialEq for OwnedEntry<T> {
    fn eq(&self, other: &Self) -> bool {
        self.item.eq(&other.item)
    }
}

impl<T: Default + Eq> Eq for OwnedEntry<T> {}

impl<T: Default + PartialOrd> PartialOrd for OwnedEntry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.item.partial_cmp(&other.item)
    }
}

impl<T: Default + Ord> Ord for OwnedEntry<T> {
    /// Compares two `OwnedEntry` values by their inner items.
    ///
    /// # Example
    ///
    /// ```rust
    /// use concurrent_pool::Pool;
    /// use std::sync::Arc;
    ///
    /// let pool = Arc::new(Pool::<usize>::with_capacity(2));
    /// let item1 = pool.pull_owned_with(|i| *i = 1).unwrap();
    /// let item2 = pool.pull_owned_with(|i| *i = 2).unwrap();
    /// assert!(item1 < item2);
    /// ```
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.item.cmp(&other.item)
    }
}

impl<T: Default + Hash> Hash for OwnedEntry<T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.item.hash(state)
    }
}

impl<T: Default> Drop for OwnedEntry<T> {
    fn drop(&mut self) {
        if self.item.as_ref().is_some_and(|i| i.dec_ref() == 1) {
            // This was the last reference. Synchronize with the `Release`
            // decrements of all other clones before the item can be reused,
            // then return it to the pool (the same protocol `Arc` uses).
            std::sync::atomic::fence(Acquire);
            let item = self.item.take().unwrap();
            self.pool.recycle(item);
        }
    }
}

impl<T: Default> Deref for OwnedEntry<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        self.item.as_ref().unwrap()
    }
}

#[cfg(feature = "serde")]
impl<T: Default + serde::Serialize> serde::Serialize for OwnedEntry<T> {
    /// Serializes the inner item directly, with no wrapper around it.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.get().serialize(serializer)
    }
}

impl<T: Default> OwnedEntry<T> {
    /// Get a reference to the inner item.
    pub fn get(&self) -> &T {
        self
    }

    /// Get a mutable reference to the inner item if there are no other
    /// references to it; otherwise return `None`.
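    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a pool with at least one free item:
    ///
    /// ```rust
    /// use concurrent_pool::Pool;
    /// use std::sync::Arc;
    ///
    /// let pool = Arc::new(Pool::<usize>::with_capacity(1));
    /// let mut entry = pool.pull_owned_with(|i| *i = 1).unwrap();
    /// *entry.get_mut().unwrap() += 1; // sole reference: `Some`
    /// let alias = entry.clone();
    /// assert!(entry.get_mut().is_none()); // aliased: `None`
    /// assert_eq!(*alias.get(), 2);
    /// ```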
    pub fn get_mut(&mut self) -> Option<&mut T> {
        Prc::get_mut(self.item.as_mut().unwrap())
    }

    /// Get a mutable reference to the inner item without checking for other
    /// references.
    ///
    /// # Safety
    ///
    /// The caller must ensure that no other reference to the same item is
    /// used while the returned borrow is alive; otherwise this is a data
    /// race.
    pub unsafe fn get_mut_unchecked(&mut self) -> &mut T {
        unsafe { Prc::get_mut_unchecked(self.item.as_mut().unwrap()) }
    }
}

/// A thread-safe reference-counting pointer. `Prc` stands for "Pooled
/// Reference Counted". It is like `Arc`, but used only by the pool
/// implemented in this crate.
///
/// **Note**: `Drop` is not implemented for `Prc<T>`, so its memory must be
/// managed manually: call `drop_slow` once the last reference is gone.
///
/// # Thread Safety
///
/// `Prc<T>` uses atomic operations for its reference counting, so it is
/// thread-safe.
///
/// # Cloning references
///
/// Creating a new reference from an existing reference-counted pointer is
/// done using the `Clone` trait implemented for [`Prc<T>`][Prc].
///
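/// # Example
///
/// A sketch of the internal protocol (`Prc` is crate-private, so the
/// snippet is not compiled as a doc-test):
///
/// ```ignore
/// let p = Prc::new(5usize);     // count = 1
/// let q = p.clone();            // count = 2
/// assert_eq!(q.dec_ref(), 2);   // back to one reference
/// if p.dec_ref() == 1 {
///     // Last reference gone: free (or recycle) the allocation.
///     unsafe { p.drop_slow() };
/// }
/// ```
///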
pub(crate) struct Prc<T: ?Sized> {
    ptr: NonNull<PrcInner<T>>,
}

unsafe impl<T: ?Sized + Send + Sync> Send for Prc<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for Prc<T> {}

impl<T: ?Sized> Deref for Prc<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.inner().data
    }
}

impl<T: ?Sized + PartialEq> PartialEq for Prc<T> {
    fn eq(&self, other: &Self) -> bool {
        self.inner().data.eq(other)
    }
}

impl<T: ?Sized + Eq> Eq for Prc<T> {}

impl<T: ?Sized + PartialOrd> PartialOrd for Prc<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.inner().data.partial_cmp(other)
    }
}

impl<T: ?Sized + Ord> Ord for Prc<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.inner().data.cmp(other)
    }
}

impl<T: ?Sized + Hash> Hash for Prc<T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.inner().data.hash(state)
    }
}

impl<T: ?Sized + Debug> Debug for Prc<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner().data.fmt(f)
    }
}

impl<T: ?Sized + Display> Display for Prc<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner().data.fmt(f)
    }
}

impl<T: ?Sized> Clone for Prc<T> {
    fn clone(&self) -> Self {
        // `Relaxed` is enough for an increment: a new reference can only be
        // created through an existing one, which keeps the count above zero.
        self.inc_ref();
        Self { ptr: self.ptr }
    }
}

impl<T> Prc<T> {
    /// Create a new `Prc<T>` with the reference count starting at 0, which
    /// means the item sits in the pool with no outstanding references.
    #[inline]
    pub(crate) fn new_zero(data: T) -> Self {
        let x: Box<_> = Box::new(PrcInner {
            count: AtomicUsize::new(0),
            data,
        });
        Self {
            ptr: Box::leak(x).into(),
        }
    }

    /// Create a new `Prc<T>` with the reference count starting at 1.
    #[inline]
    pub(crate) fn new(data: T) -> Self {
        let x: Box<_> = Box::new(PrcInner {
            count: AtomicUsize::new(1),
            data,
        });
        Self {
            ptr: Box::leak(x).into(),
        }
    }
}

impl<T: ?Sized> Prc<T> {
    /// Increase the reference count and return the previous count.
    #[inline]
    pub(crate) fn inc_ref(&self) -> usize {
        self.inner().count.fetch_add(1, Relaxed)
    }

    /// Decrease the reference count and return the previous count.
    ///
    /// The `Release` ordering pairs with the `Acquire` fence (or load) that
    /// runs before the item is reused or freed.
    #[inline]
    pub(crate) fn dec_ref(&self) -> usize {
        self.inner().count.fetch_sub(1, Release)
    }

    /// Drops the inner data and frees the allocation.
    ///
    /// # Safety
    ///
    /// The caller must ensure this is called at most once, only after the
    /// last reference is gone, and that the pointer is never used again.
    pub(crate) unsafe fn drop_slow(&self) {
        unsafe {
            drop(Box::from_raw(self.ptr.as_ptr()));
        }
    }

    /// Get a mutable reference to the inner data without checking the count.
    ///
    /// # Safety
    ///
    /// The caller must ensure that no other reference to the same data is
    /// used while the returned borrow is alive.
    #[inline]
    pub unsafe fn get_mut_unchecked(this: &mut Self) -> &mut T {
        unsafe { &mut (*this.ptr.as_ptr()).data }
    }

    /// Get a mutable reference to the inner data if at most one reference
    /// exists; otherwise return `None`.
    #[inline]
    pub fn get_mut(this: &mut Self) -> Option<&mut T> {
        // A count of 1 means this is the only live reference; a count of 0
        // means the item is sitting in the pool. Either way access is unique.
        if this.inner().count.load(Acquire) <= 1 {
            unsafe { Some(Prc::get_mut_unchecked(this)) }
        } else {
            None
        }
    }

    #[inline]
    fn inner(&self) -> &PrcInner<T> {
        unsafe { self.ptr.as_ref() }
    }
}

struct PrcInner<T: ?Sized> {
    count: AtomicUsize,
    data: T,
}

unsafe impl<T: ?Sized + Send + Sync> Send for PrcInner<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for PrcInner<T> {}
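
// A minimal sanity check of the `Prc` counting protocol described above,
// exercising `new`, `clone`, `get_mut`, `dec_ref`, and `drop_slow`.
#[cfg(test)]
mod prc_tests {
    use super::Prc;

    #[test]
    fn count_tracks_clones() {
        let mut p = Prc::new(5usize); // count = 1
        assert_eq!(*p, 5);
        assert!(Prc::get_mut(&mut p).is_some()); // sole reference
        let q = p.clone(); // count = 2
        assert!(Prc::get_mut(&mut p).is_none()); // aliased
        assert_eq!(q.dec_ref(), 2); // back to one reference
        assert_eq!(p.dec_ref(), 1); // last reference gone
        // `Prc` has no `Drop`, so free the allocation by hand.
        unsafe { p.drop_slow() };
    }
}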