spacetimedb_data_structures/
object_pool.rs

1use core::any::type_name;
2use core::fmt;
3use core::sync::atomic::{AtomicUsize, Ordering};
4use crossbeam_queue::ArrayQueue;
5use std::sync::Arc;
6
/// An object that can be put into a [`Pool<T>`].
///
/// Without the `memory-usage` feature the pool needs no hooks from its objects,
/// so this is a pure marker trait with no items.
#[cfg(not(feature = "memory-usage"))]
pub trait PooledObject {}
10
/// An object that can be put into a [`Pool<T>`].
///
/// With the `memory-usage` feature enabled,
/// the trait exposes the hooks that the pool needs
/// so that it can e.g., implement `MemoryUsage`.
#[cfg(feature = "memory-usage")]
pub trait PooledObject: spacetimedb_memory_usage::MemoryUsage {
    /// The storage for the number of bytes in the pool.
    ///
    /// When each object in the pool takes up the same size, this can be `()`.
    /// Otherwise, it will typically be [`AtomicUsize`].
    type ResidentBytesStorage: Default;

    /// Returns the number of bytes resident in the pool.
    ///
    /// Both the `storage` and the number of objects currently in the pool,
    /// `num_objects`, are provided.
    /// Typically, exactly one of these will be used.
    fn resident_object_bytes(storage: &Self::ResidentBytesStorage, num_objects: usize) -> usize;

    /// Called by the pool to add `bytes` to `storage`, if necessary.
    ///
    /// The pool calls this after an object has been successfully put into it,
    /// passing the heap usage of that object as `bytes`.
    fn add_to_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize);

    /// Called by the pool to subtract `bytes` from `storage`, if necessary.
    ///
    /// The pool calls this after an object has been taken out of it for reuse,
    /// passing the heap usage of that object as `bytes`.
    fn sub_from_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize);
}
35
36/// A pool of some objects of type `T`.
37pub struct Pool<T: PooledObject> {
38    inner: Arc<Inner<T>>,
39}
40
41impl<T: PooledObject> fmt::Debug for Pool<T> {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        let dropped = self.dropped_count();
44        let new = self.new_allocated_count();
45        let reused = self.reused_count();
46        let returned = self.returned_count();
47
48        #[cfg(feature = "memory-usage")]
49        let bytes = T::resident_object_bytes(&self.inner.resident_object_bytes, self.inner.objects.len());
50
51        let mut builder = f.debug_struct(&format!("Pool<{}>", type_name::<T>()));
52
53        #[cfg(feature = "memory-usage")]
54        let builder = builder.field("resident_object_bytes", &bytes);
55
56        builder
57            .field("dropped_count", &dropped)
58            .field("new_allocated_count", &new)
59            .field("reused_count", &reused)
60            .field("returned_count", &returned)
61            .finish()
62    }
63}
64
65impl<T: PooledObject> Clone for Pool<T> {
66    fn clone(&self) -> Self {
67        let inner = self.inner.clone();
68        Self { inner }
69    }
70}
71
#[cfg(feature = "memory-usage")]
impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Pool<T> {
    /// Reports the heap usage of the pool by delegating to its `inner` field.
    fn heap_usage(&self) -> usize {
        // Destructure exhaustively so that adding a field to `Pool`
        // forces this method to be revisited.
        let Self { inner } = self;
        inner.heap_usage()
    }
}
79
80impl<T: PooledObject> Pool<T> {
81    /// Returns a new pool with a maximum capacity of `cap`.
82    /// This capacity is fixed over the lifetime of the pool.
83    pub fn new(cap: usize) -> Self {
84        let inner = Arc::new(Inner::new(cap));
85        Self { inner }
86    }
87
88    /// Puts back an object into the pool.
89    pub fn put(&self, object: T) {
90        self.inner.put(object);
91    }
92
93    /// Puts back an object into the pool.
94    pub fn put_many(&self, objects: impl Iterator<Item = T>) {
95        for obj in objects {
96            self.put(obj);
97        }
98    }
99
100    /// Takes an object from the pool or creates a new one.
101    pub fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T {
102        self.inner.take(clear, new)
103    }
104
105    /// Returns the number of pages dropped by the pool because the pool was at capacity.
106    pub fn dropped_count(&self) -> usize {
107        self.inner.dropped_count.load(Ordering::Relaxed)
108    }
109
110    /// Returns the number of fresh objects allocated through the pool.
111    pub fn new_allocated_count(&self) -> usize {
112        self.inner.new_allocated_count.load(Ordering::Relaxed)
113    }
114
115    /// Returns the number of objects reused from the pool.
116    pub fn reused_count(&self) -> usize {
117        self.inner.reused_count.load(Ordering::Relaxed)
118    }
119
120    /// Returns the number of objects returned to the pool.
121    pub fn returned_count(&self) -> usize {
122        self.inner.returned_count.load(Ordering::Relaxed)
123    }
124}
125
126/// The inner actual page pool containing all the logic.
127struct Inner<T: PooledObject> {
128    objects: ArrayQueue<T>,
129    dropped_count: AtomicUsize,
130    new_allocated_count: AtomicUsize,
131    reused_count: AtomicUsize,
132    returned_count: AtomicUsize,
133
134    #[cfg(feature = "memory-usage")]
135    resident_object_bytes: T::ResidentBytesStorage,
136}
137
#[cfg(feature = "memory-usage")]
impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Inner<T> {
    /// Reports the heap usage of the pool's state.
    ///
    /// The result is the sum of what the counters report,
    /// an estimate for the queue's own buffer,
    /// and what `T::resident_object_bytes` reports for the objects currently in the queue.
    fn heap_usage(&self) -> usize {
        // Destructure exhaustively so that adding a field to `Inner`
        // forces this method to be revisited.
        let Self {
            objects,
            dropped_count,
            new_allocated_count,
            reused_count,
            returned_count,
            resident_object_bytes,
        } = self;

        let counters = dropped_count.heap_usage()
            + new_allocated_count.heap_usage()
            + reused_count.heap_usage()
            + returned_count.heap_usage();

        // This is the amount the queue itself takes up on the heap.
        // NOTE(review): models each slot as an `(AtomicUsize, T)` pair;
        // confirm against the slot layout of `crossbeam_queue::ArrayQueue`.
        let queue = objects.capacity() * size_of::<(AtomicUsize, T)>();

        // This is the amount the objects take up on the heap, excluding the static size.
        let resident = T::resident_object_bytes(resident_object_bytes, objects.len());

        counters + queue + resident
    }
}
159
/// Increments `atomic` by one, using relaxed memory ordering.
#[inline]
fn inc(atomic: &AtomicUsize) {
    atomic.fetch_add(1, Ordering::Relaxed);
}
164
165impl<T: PooledObject> Inner<T> {
166    /// Creates a new pool capable of holding `cap` objects.
167    fn new(cap: usize) -> Self {
168        let objects = ArrayQueue::new(cap);
169        Self {
170            objects,
171            dropped_count: <_>::default(),
172            new_allocated_count: <_>::default(),
173            reused_count: <_>::default(),
174            returned_count: <_>::default(),
175
176            #[cfg(feature = "memory-usage")]
177            resident_object_bytes: <_>::default(),
178        }
179    }
180
181    /// Puts back an object into the pool.
182    fn put(&self, object: T) {
183        #[cfg(feature = "memory-usage")]
184        let bytes = object.heap_usage();
185        // Add it to the pool if there's room, or just drop it.
186        if self.objects.push(object).is_ok() {
187            #[cfg(feature = "memory-usage")]
188            T::add_to_resident_object_bytes(&self.resident_object_bytes, bytes);
189
190            inc(&self.returned_count);
191        } else {
192            inc(&self.dropped_count);
193        }
194    }
195
196    /// Takes an object from the pool or creates a new one.
197    ///
198    /// The closure `clear` provides the opportunity to clear the object before use.
199    /// The closure `new` is called to create a new object when the pool is empty.
200    fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T {
201        self.objects
202            .pop()
203            .map(|mut object| {
204                #[cfg(feature = "memory-usage")]
205                T::sub_from_resident_object_bytes(&self.resident_object_bytes, object.heap_usage());
206
207                inc(&self.reused_count);
208                clear(&mut object);
209                object
210            })
211            .unwrap_or_else(|| {
212                inc(&self.new_allocated_count);
213                new()
214            })
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use core::{iter, ptr::addr_eq};

    // The type of pools used for testing.
    // We want to include a `Box` so that we can do pointer comparisons.
    type P = Pool<Box<i32>>;

    #[cfg(not(feature = "memory-usage"))]
    impl PooledObject for Box<i32> {}

    // Every `Box<i32>` has the same heap size,
    // so no storage is needed and the byte count is derived from the object count.
    #[cfg(feature = "memory-usage")]
    impl PooledObject for Box<i32> {
        type ResidentBytesStorage = ();
        fn add_to_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
        fn sub_from_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
        fn resident_object_bytes(_: &Self::ResidentBytesStorage, num_objects: usize) -> usize {
            num_objects * size_of::<i32>()
        }
    }

    /// Returns a pool with room for 100 objects.
    fn new() -> P {
        P::new(100)
    }

    /// Asserts that all four statistics counters of `pool` have the given values.
    fn assert_metrics(pool: &P, dropped: usize, new: usize, reused: usize, returned: usize) {
        assert_eq!(pool.dropped_count(), dropped);
        assert_eq!(pool.new_allocated_count(), new);
        assert_eq!(pool.reused_count(), reused);
        assert_eq!(pool.returned_count(), returned);
    }

    /// Takes an object from `pool` without clearing it, allocating `Box::new(0)` if it is empty.
    fn take(pool: &P) -> Box<i32> {
        pool.take(|_| {}, || Box::new(0))
    }

    #[test]
    fn pool_returns_same_obj() {
        let pool = new();
        assert_metrics(&pool, 0, 0, 0, 0);

        // Create an object and put it back.
        let obj1 = take(&pool);
        assert_metrics(&pool, 0, 1, 0, 0);
        let obj1_ptr = &*obj1 as *const _;
        pool.put(obj1);
        assert_metrics(&pool, 0, 1, 0, 1);

        // Extract an object again.
        let obj2 = take(&pool);
        assert_metrics(&pool, 0, 1, 1, 1);
        let obj2_ptr = &*obj2 as *const _;
        // It should be the same allocation as `obj1`.
        assert!(addr_eq(obj1_ptr, obj2_ptr));
        pool.put(obj2);
        assert_metrics(&pool, 0, 1, 1, 2);

        // Extract an object again.
        let obj3 = take(&pool);
        assert_metrics(&pool, 0, 1, 2, 2);
        let obj3_ptr = &*obj3 as *const _;
        // It should still be the same allocation as `obj1`.
        assert!(addr_eq(obj1_ptr, obj3_ptr));

        // Manually create an object and put it in, followed by the pooled one.
        let obj4 = Box::new(0);
        let obj4_ptr = &*obj4 as *const _;
        pool.put(obj4);
        pool.put(obj3);
        assert_metrics(&pool, 0, 1, 2, 4);
        // `obj4` was put first, so taking an object should yield `obj4` and not `obj1`.
        let obj5 = take(&pool);
        assert_metrics(&pool, 0, 1, 3, 4);
        let obj5_ptr = &*obj5 as *const _;
        // Same as `obj4`.
        assert!(!addr_eq(obj5_ptr, obj1_ptr));
        assert!(addr_eq(obj5_ptr, obj4_ptr));
    }

    #[test]
    fn pool_drops_past_max_size() {
        const N: usize = 3;
        let pool = P::new(N);

        // The pool starts out empty, so every `take` allocates a fresh object.
        let objects = iter::repeat_with(|| take(&pool)).take(N + 1).collect::<Vec<_>>();
        assert_metrics(&pool, 0, N + 1, 0, 0);

        // Only `N` objects fit, so exactly one of the `N + 1` is dropped.
        pool.put_many(objects.into_iter());
        assert_metrics(&pool, 1, N + 1, 0, N);
        assert_eq!(pool.inner.objects.len(), N);
    }
}