concurrent_pool/
entry.rs

use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::ptr;
use std::sync::Arc;
use std::sync::atomic::Ordering::*;
use std::{ops::Deref, ptr::NonNull, sync::atomic::AtomicUsize};

use crate::Pool;

/// An entry in the pool.
///
/// `Entry` holds a reference-counted pointer to an item from the pool and a
/// borrowed reference to the [`Pool`]. When the last `Entry` pointing to an
/// item is dropped, the item is returned to the pool.
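///
/// # Example
///
/// A minimal sketch of the clone/drop lifecycle. It assumes a hypothetical
/// borrowing `pull_with` constructor analogous to `pull_owned_with`;
/// substitute the pool method that actually hands out `Entry` values.
///
/// ```ignore
/// use concurrent_pool::Pool;
///
/// let pool = Pool::<usize>::with_capacity(1);
/// let first = pool.pull_with(|i| *i = 7).unwrap();
/// let second = first.clone(); // same allocation, two references
/// drop(first);                // item stays checked out
/// drop(second);               // last reference: item returns to the pool
/// ```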
#[derive(Debug)]
pub struct Entry<'a, T: Default> {
    // When the last reference is dropped, the item is returned to the pool.
    // `item` is always `Some` before the last reference is dropped.
    pub(crate) item: Option<Prc<T>>,
    pub(crate) pool: &'a Pool<T>,
}

impl<'a, T: Default> Clone for Entry<'a, T> {
    /// Makes a clone of the `Entry` that points to the same allocation.
    fn clone(&self) -> Self {
        Self {
            item: self.item.clone(),
            pool: self.pool,
        }
    }
}

impl<'a, T: Default + PartialEq> PartialEq for Entry<'a, T> {
    fn eq(&self, other: &Self) -> bool {
        self.item.eq(&other.item)
    }
}

impl<'a, T: Default + Eq> Eq for Entry<'a, T> {}

impl<'a, T: Default + PartialOrd> PartialOrd for Entry<'a, T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.item.partial_cmp(&other.item)
    }
}

impl<'a, T: Default + Ord> Ord for Entry<'a, T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.item.cmp(&other.item)
    }
}

impl<'a, T: Default + Hash> Hash for Entry<'a, T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.item.hash(state)
    }
}

impl<'a, T: Default> Drop for Entry<'a, T> {
    fn drop(&mut self) {
        if self.item.as_ref().is_some_and(|i| i.dec_ref() == 1) {
            // This was the last reference. Synchronize with the `Release`
            // decrements of other clones so their writes are visible before
            // the item is recycled.
            std::sync::atomic::fence(Acquire);
            let item = self.item.take().unwrap();
            self.pool.recycle(item);
        }
    }
}

impl<'a, T: Default> Deref for Entry<'a, T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        self.item.as_ref().unwrap()
    }
}

#[cfg(feature = "serde")]
impl<'a, T: Default + serde::Serialize> serde::Serialize for Entry<'a, T> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.get().serialize(serializer)
    }
}

impl<'a, T: Default> Entry<'a, T> {
    /// Get a reference to the inner item.
    pub fn get(&self) -> &T {
        self
    }

    /// Get a mutable reference to the inner item if there are no other
    /// references. Otherwise, return `None`.
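    ///
    /// # Example
    ///
    /// A sketch assuming a hypothetical borrowing `pull_with` constructor;
    /// substitute the pool method that actually returns an `Entry`.
    ///
    /// ```ignore
    /// use concurrent_pool::Pool;
    ///
    /// let pool = Pool::<usize>::with_capacity(1);
    /// let mut entry = pool.pull_with(|i| *i = 1).unwrap();
    /// assert!(entry.get_mut().is_some()); // sole reference
    /// let other = entry.clone();
    /// assert!(entry.get_mut().is_none()); // shared, mutation refused
    /// drop(other);
    /// ```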
    pub fn get_mut(&mut self) -> Option<&mut T> {
        Prc::get_mut(self.item.as_mut().unwrap())
    }

    /// Get a mutable reference to the inner item without checking for other
    /// references.
    ///
    /// # Safety
    ///
    /// The caller must ensure that no other reference to the same item is
    /// being read or written while the returned reference is alive.
    pub unsafe fn get_mut_unchecked(&mut self) -> &mut T {
        unsafe { Prc::get_mut_unchecked(self.item.as_mut().unwrap()) }
    }
}

/// An owned entry in the pool.
///
/// `OwnedEntry` holds a reference-counted pointer to an item from the pool
/// and an `Arc` reference to the [`Pool`]. When the last `OwnedEntry`
/// pointing to an item is dropped, the item is returned to the pool.
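///
/// # Example
///
/// A minimal sketch of the clone/drop lifecycle, using the same owned pull
/// API as the `Ord` example below:
///
/// ```rust
/// use concurrent_pool::Pool;
/// use std::sync::Arc;
///
/// let pool = Arc::new(Pool::<usize>::with_capacity(1));
/// let first = pool.pull_owned_with(|i| *i = 7).unwrap();
/// let second = first.clone(); // same allocation, two references
/// drop(first);                // item stays checked out
/// drop(second);               // last reference: item returns to the pool
/// ```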
pub struct OwnedEntry<T: Default> {
    // When the last reference is dropped, the item is returned to the pool.
    // `item` is always `Some` before the last reference is dropped.
    pub(crate) item: Option<Prc<T>>,
    pub(crate) pool: Arc<Pool<T>>,
}

impl<T: Default> Clone for OwnedEntry<T> {
    /// Makes a clone of the `OwnedEntry` that points to the same allocation.
    fn clone(&self) -> Self {
        Self {
            item: self.item.clone(),
            pool: self.pool.clone(),
        }
    }
}

impl<T: Default + PartialEq> PartialEq for OwnedEntry<T> {
    fn eq(&self, other: &Self) -> bool {
        self.item.eq(&other.item)
    }
}

impl<T: Default + Eq> Eq for OwnedEntry<T> {}

impl<T: Default + PartialOrd> PartialOrd for OwnedEntry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.item.partial_cmp(&other.item)
    }
}

impl<T: Default + Ord> Ord for OwnedEntry<T> {
    /// Comparison of two `OwnedEntry` values.
    ///
    /// # Example
    ///
    /// ```rust
    /// use concurrent_pool::Pool;
    /// use std::sync::Arc;
    ///
    /// let pool = Arc::new(Pool::<usize>::with_capacity(2));
    /// let item1 = pool.pull_owned_with(|i| *i = 1).unwrap();
    /// let item2 = pool.pull_owned_with(|i| *i = 2).unwrap();
    /// assert!(item1 < item2);
    /// ```
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.item.cmp(&other.item)
    }
}

impl<T: Default + Hash> Hash for OwnedEntry<T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.item.hash(state)
    }
}

impl<T: Default> Drop for OwnedEntry<T> {
    fn drop(&mut self) {
        if self.item.as_ref().is_some_and(|i| i.dec_ref() == 1) {
            // This was the last reference. Synchronize with the `Release`
            // decrements of other clones so their writes are visible before
            // the item is recycled.
            std::sync::atomic::fence(Acquire);
            let item = self.item.take().unwrap();
            self.pool.recycle(item);
        }
    }
}

impl<T: Default> Deref for OwnedEntry<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        self.item.as_ref().unwrap()
    }
}

#[cfg(feature = "serde")]
impl<T: Default + serde::Serialize> serde::Serialize for OwnedEntry<T> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.get().serialize(serializer)
    }
}

impl<T: Default> OwnedEntry<T> {
    /// Get a reference to the inner item.
    pub fn get(&self) -> &T {
        self
    }

    /// Get a mutable reference to the inner item if there are no other
    /// references. Otherwise, return `None`.
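    ///
    /// # Example
    ///
    /// A short check of the exclusive-access rule, using the owned pull API
    /// from the `Ord` example above:
    ///
    /// ```rust
    /// use concurrent_pool::Pool;
    /// use std::sync::Arc;
    ///
    /// let pool = Arc::new(Pool::<usize>::with_capacity(1));
    /// let mut entry = pool.pull_owned_with(|i| *i = 1).unwrap();
    /// assert!(entry.get_mut().is_some()); // sole reference
    /// let other = entry.clone();
    /// assert!(entry.get_mut().is_none()); // shared, mutation refused
    /// drop(other);
    /// ```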
    pub fn get_mut(&mut self) -> Option<&mut T> {
        Prc::get_mut(self.item.as_mut().unwrap())
    }

    /// Get a mutable reference to the inner item without checking for other
    /// references.
    ///
    /// # Safety
    ///
    /// The caller must ensure that no other reference to the same item is
    /// being read or written while the returned reference is alive.
    pub unsafe fn get_mut_unchecked(&mut self) -> &mut T {
        unsafe { Prc::get_mut_unchecked(self.item.as_mut().unwrap()) }
    }
}

/// A thread-safe reference-counting pointer. `Prc` stands for "Pooled
/// Reference Counted". It is like `Arc`, but is only used by the pool
/// implemented in this crate.
///
/// **Note**: `Drop` is not implemented for `Prc<T>`, so the caller must
/// manage its memory carefully and call `drop_slow` once the last reference
/// is dropped.
///
/// # Thread Safety
///
/// `Prc<T>` uses atomic operations for its reference counting, so it is
/// thread-safe.
///
/// # Cloning references
///
/// Creating a new reference from an existing reference-counted pointer is
/// done using the `Clone` trait implemented for [`Prc<T>`][Prc].
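///
/// # Example
///
/// A sketch of the manual lifecycle. `Prc` is crate-private, so this is
/// illustrative rather than a runnable doc test:
///
/// ```ignore
/// let prc = Prc::new(42usize);    // count starts at 1
/// let other = prc.clone();        // count: 2
/// assert_eq!(other.dec_ref(), 2); // returns the previous count; now 1
/// if prc.dec_ref() == 1 {
///     // Last reference gone: the caller must release the data itself.
///     unsafe { prc.drop_slow() };
/// }
/// ```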
pub(crate) struct Prc<T: ?Sized> {
    ptr: NonNull<PrcInner<T>>,
}

unsafe impl<T: ?Sized + Send + Sync> Send for Prc<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for Prc<T> {}

impl<T: ?Sized> Deref for Prc<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.inner().data
    }
}

impl<T: ?Sized + PartialEq> PartialEq for Prc<T> {
    fn eq(&self, other: &Self) -> bool {
        self.inner().data.eq(&other.inner().data)
    }
}
impl<T: ?Sized + Eq> Eq for Prc<T> {}

impl<T: ?Sized + PartialOrd> PartialOrd for Prc<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.inner().data.partial_cmp(&other.inner().data)
    }
}

impl<T: ?Sized + Ord> Ord for Prc<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.inner().data.cmp(&other.inner().data)
    }
}

impl<T: ?Sized + Hash> Hash for Prc<T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.inner().data.hash(state)
    }
}

impl<T: ?Sized + Debug> Debug for Prc<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner().data.fmt(f)
    }
}

impl<T: ?Sized + Display> Display for Prc<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner().data.fmt(f)
    }
}

impl<T: ?Sized> Clone for Prc<T> {
    fn clone(&self) -> Self {
        self.inc_ref();
        Self { ptr: self.ptr }
    }
}

impl<T> Prc<T> {
    /// Create a new `Prc<T>` with the reference count starting at 0, which
    /// means the item sits in the pool with no outstanding clones.
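    ///
    /// Illustrative (crate-private, not a runnable doc test):
    ///
    /// ```ignore
    /// let prc = Prc::new_zero(0u32); // parked in the pool, count 0
    /// assert_eq!(prc.inc_ref(), 0);  // handed out: count becomes 1
    /// ```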
    #[inline]
    pub(crate) fn new_zero(data: T) -> Self {
        let x: Box<_> = Box::new(PrcInner {
            count: AtomicUsize::new(0),
            data,
        });
        Self {
            ptr: Box::leak(x).into(),
        }
    }

    /// Create a new `Prc<T>` with the reference count starting at 1.
    #[inline]
    pub(crate) fn new(data: T) -> Self {
        let x: Box<_> = Box::new(PrcInner {
            count: AtomicUsize::new(1),
            data,
        });
        Self {
            ptr: Box::leak(x).into(),
        }
    }
}

impl<T: ?Sized> Prc<T> {
    /// Increase the reference count and return the previous count.
    ///
    /// `Relaxed` suffices here: the caller already holds a reference, so the
    /// allocation cannot be released concurrently.
    #[inline]
    pub(crate) fn inc_ref(&self) -> usize {
        self.inner().count.fetch_add(1, Relaxed)
    }

    /// Decrease the reference count and return the previous count.
    ///
    /// `Release` publishes this reference's writes to the thread that
    /// performs the final decrement and recycles the item.
    #[inline]
    pub(crate) fn dec_ref(&self) -> usize {
        self.inner().count.fetch_sub(1, Release)
    }

    /// Drops the inner data in place.
    ///
    /// # Safety
    ///
    /// Must be called at most once, and only after the last reference to the
    /// data has been dropped.
    pub(crate) unsafe fn drop_slow(&self) {
        unsafe { ptr::drop_in_place(&mut (*self.ptr.as_ptr()).data) };
    }

    /// Get a mutable reference to the data without checking the count.
    ///
    /// # Safety
    ///
    /// The caller must guarantee exclusive access to the data for the
    /// lifetime of the returned reference.
    #[inline]
    pub unsafe fn get_mut_unchecked(this: &mut Self) -> &mut T {
        unsafe { &mut (*this.ptr.as_ptr()).data }
    }

    /// Get a mutable reference to the data if this is the only live
    /// reference, or `None` otherwise.
    #[inline]
    pub fn get_mut(this: &mut Self) -> Option<&mut T> {
        // A count of 1 means `this` is the sole reference; a count of 0
        // means the item is parked in the pool. Either way access is
        // exclusive. `Acquire` pairs with the `Release` in `dec_ref` so
        // writes made through dropped references are visible here.
        if this.inner().count.load(Acquire) <= 1 {
            unsafe { Some(Prc::get_mut_unchecked(this)) }
        } else {
            None
        }
    }

    #[inline]
    fn inner(&self) -> &PrcInner<T> {
        unsafe { self.ptr.as_ref() }
    }
}

struct PrcInner<T: ?Sized> {
    count: AtomicUsize,
    data: T,
}

unsafe impl<T: ?Sized + Send + Sync> Send for PrcInner<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for PrcInner<T> {}