// concurrent_pool/entry.rs

1use std::ptr;
2use std::sync::Arc;
3use std::sync::atomic::Ordering::*;
4use std::{ops::Deref, ptr::NonNull, sync::atomic::AtomicUsize};
5
6use crate::Pool;
7
/// An entry in the pool.
///
/// `Entry` holds a reference-counted pointer ([`Prc`]) to an item from the
/// pool and a borrowed reference to the [`Pool`] it came from.
/// When the last `Entry` clone is dropped, the item is returned to the pool
/// instead of being deallocated.
///
pub struct Entry<'a, T: Default> {
    // When the last reference is dropped, the item is returned to the pool.
    // `item` is always `Some` before the last reference is dropped.
    pub(crate) item: Option<Prc<T>>,
    pub(crate) pool: &'a Pool<T>,
}
20
21impl<'a, T: Default> Clone for Entry<'a, T> {
22    /// Makes a clone of the `Entry` that points to the same allocation.
23    fn clone(&self) -> Self {
24        Self {
25            item: self.item.clone(),
26            pool: self.pool,
27        }
28    }
29}
30
31impl<'a, T: Default> Drop for Entry<'a, T> {
32    fn drop(&mut self) {
33        if self.item.as_ref().is_some_and(|i| i.dec_ref() == 1) {
34            // This was the last reference, return to the pool.
35            let item = self.item.take().unwrap();
36            self.pool.recycle(item);
37        }
38    }
39}
40
41impl<'a, T: Default> Deref for Entry<'a, T> {
42    type Target = T;
43    fn deref(&self) -> &Self::Target {
44        self.item.as_ref().unwrap()
45    }
46}
47
#[cfg(feature = "serde")]
impl<'a, T: Default + serde::Serialize> serde::Serialize for Entry<'a, T> {
    /// Serializes the pooled item itself; the pool bookkeeping and the
    /// reference count are not encoded.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.get().serialize(serializer)
    }
}
57
58impl<'a, T: Default> Entry<'a, T> {
59    /// Get reference to the inner item.
60    pub fn get(&self) -> &T {
61        &self
62    }
63
64    /// Get mutable reference to the inner item if there are no other references.
65    /// Otherwise, return `None`.
66    pub fn get_mut(&mut self) -> Option<&mut T> {
67        Prc::get_mut(self.item.as_mut().unwrap())
68    }
69
70    /// Get mutable reference to the inner item without checking for other references.
71    ///
72    pub unsafe fn get_mut_unchecked(&mut self) -> &mut T {
73        unsafe { Prc::get_mut_unchecked(self.item.as_mut().unwrap()) }
74    }
75}
76
/// An owned entry in the pool.
///
/// `OwnedEntry` holds a reference-counted pointer ([`Prc`]) to an item from
/// the pool and an `Arc` reference to the [`Pool`], so it carries no borrow
/// lifetime. When the last `OwnedEntry` clone is dropped, the item is
/// returned to the pool.
///
pub struct OwnedEntry<T: Default> {
    // When the last reference is dropped, the item is returned to the pool.
    // `item` is always `Some` before the last reference is dropped.
    pub(crate) item: Option<Prc<T>>,
    pub(crate) pool: Arc<Pool<T>>,
}
89
90impl<T: Default> Clone for OwnedEntry<T> {
91    /// Makes a clone of the `OwnedEntry` that points to the same allocation.
92    fn clone(&self) -> Self {
93        Self {
94            item: self.item.clone(),
95            pool: self.pool.clone(),
96        }
97    }
98}
99
100impl<T: Default> Drop for OwnedEntry<T> {
101    fn drop(&mut self) {
102        if self.item.as_ref().is_some_and(|i| i.dec_ref() == 1) {
103            // This was the last reference, return to the pool.
104            let item = self.item.take().unwrap();
105            self.pool.recycle(item);
106        }
107    }
108}
109
110impl<T: Default> Deref for OwnedEntry<T> {
111    type Target = T;
112    fn deref(&self) -> &Self::Target {
113        self.item.as_ref().unwrap()
114    }
115}
116
#[cfg(feature = "serde")]
impl<T: Default + serde::Serialize> serde::Serialize for OwnedEntry<T> {
    /// Serializes the pooled item itself; the pool bookkeeping and the
    /// reference count are not encoded.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.get().serialize(serializer)
    }
}
126
127impl<T: Default> OwnedEntry<T> {
128    /// Get reference to the inner item.
129    pub fn get(&self) -> &T {
130        &self
131    }
132
133    /// Get mutable reference to the inner item if there are no other references.
134    /// Otherwise, return `None`.
135    pub fn get_mut(&mut self) -> Option<&mut T> {
136        Prc::get_mut(self.item.as_mut().unwrap())
137    }
138
139    /// Get mutable reference to the inner item without checking for other references.
140    ///
141    pub unsafe fn get_mut_unchecked(&mut self) -> &mut T {
142        unsafe { Prc::get_mut_unchecked(self.item.as_mut().unwrap()) }
143    }
144}
145
/// A thread-safe reference-counting pointer. `Prc` stands for 'Pooled
/// Reference Counted'. This is like `Arc`, but only used in the pool
/// implemented in this crate.
///
/// **Note**: `Drop` is not implemented for `Prc<T>`. The user should
/// carefully manage the memory of `Prc<T>` and call `drop_slow` when the
/// last reference is dropped; the entry types in this module return the item
/// to the pool instead of dropping it.
///
/// # Thread Safety
///
/// `Prc<T>` uses atomic operations for its reference counting. This means
/// that it is thread-safe.
///
/// # Cloning references
///
/// Creating a new reference from an existing reference-counted pointer is
/// done using the `Clone` trait implemented for [`Prc<T>`][Prc].
///
pub(crate) struct Prc<T: ?Sized> {
    // Always points to a live, leaked `PrcInner` created by `new`/`new_zero`.
    ptr: NonNull<PrcInner<T>>,
}
167
// SAFETY: `Prc<T>` behaves like `Arc<T>`: it hands out shared `&T` access and
// gates mutable access behind a uniqueness check (`get_mut`), so it may be
// sent and shared across threads whenever `T: Send + Sync`.
unsafe impl<T: ?Sized + Send + Sync> Send for Prc<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for Prc<T> {}
170
171impl<T: ?Sized> Deref for Prc<T> {
172    type Target = T;
173    fn deref(&self) -> &Self::Target {
174        &self.inner().data
175    }
176}
177
impl<T: ?Sized> Clone for Prc<T> {
    fn clone(&self) -> Self {
        // Bump the count before copying the pointer. `inc_ref` uses a
        // `Relaxed` increment, which is sufficient here (as in `Arc::clone`)
        // because the caller already holds a live reference.
        // NOTE(review): unlike `Arc`, there is no guard against refcount
        // overflow — confirm this is acceptable for pool-internal use.
        self.inc_ref();
        Self { ptr: self.ptr }
    }
}
184
185impl<T> Prc<T> {
186    /// Starting the pointer count as 0 which means it is in the pool without
187    /// any clone instance.
188    #[inline]
189    pub(crate) fn new_zero(data: T) -> Self {
190        let x: Box<_> = Box::new(PrcInner {
191            count: AtomicUsize::new(0),
192            data,
193        });
194        Self {
195            ptr: Box::leak(x).into(),
196        }
197    }
198
199    /// Create a new `Prc<T>` with the reference count starting at 1.
200    #[inline]
201    pub(crate) fn new(data: T) -> Self {
202        let x: Box<_> = Box::new(PrcInner {
203            count: AtomicUsize::new(1),
204            data,
205        });
206        Self {
207            ptr: Box::leak(x).into(),
208        }
209    }
210}
211
212impl<T: ?Sized> Prc<T> {
213    /// Increase the reference count and return the previous count.
214    #[inline]
215    pub(crate) fn inc_ref(&self) -> usize {
216        self.inner().count.fetch_add(1, Relaxed)
217    }
218
219    /// Decrease the reference count and return the previous count.
220    #[inline]
221    pub(crate) fn dec_ref(&self) -> usize {
222        self.inner().count.fetch_sub(1, Release)
223    }
224
225    /// Drops the inner data.
226    pub(crate) unsafe fn drop_slow(&self) {
227        unsafe { ptr::drop_in_place(&mut (*self.ptr.as_ptr()).data) };
228    }
229
230    #[inline]
231    pub unsafe fn get_mut_unchecked(this: &mut Self) -> &mut T {
232        unsafe { &mut (*this.ptr.as_ptr()).data }
233    }
234
235    #[inline]
236    pub fn get_mut(this: &mut Self) -> Option<&mut T> {
237        // Only one reference exists or in the pool.
238        if this.inner().count.load(Acquire) <= 1 {
239            unsafe { Some(Prc::get_mut_unchecked(this)) }
240        } else {
241            None
242        }
243    }
244
245    #[inline]
246    fn inner(&self) -> &PrcInner<T> {
247        unsafe { self.ptr.as_ref() }
248    }
249}
250
/// Heap allocation shared by all `Prc` clones: the atomic reference count
/// followed by the pooled data.
struct PrcInner<T: ?Sized> {
    // Number of live `Prc` handles; 0 means the item is resting in the pool
    // (see `Prc::new_zero`).
    count: AtomicUsize,
    data: T,
}

// SAFETY: `PrcInner` adds only an atomic counter on top of `T`, so it may be
// sent and shared across threads whenever `T: Send + Sync`.
unsafe impl<T: ?Sized + Send + Sync> Send for PrcInner<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for PrcInner<T> {}