spacetimedb_data_structures/
object_pool.rs1use core::any::type_name;
2use core::fmt;
3use core::sync::atomic::{AtomicUsize, Ordering};
4use crossbeam_queue::ArrayQueue;
5use std::sync::Arc;
6
/// Marker trait for values that may be stored in a [`Pool<T>`].
///
/// This definition is compiled when the `memory-usage` feature is disabled;
/// in that configuration the pool asks nothing of the objects it stores.
#[cfg(not(feature = "memory-usage"))]
pub trait PooledObject {}
10
/// Trait for values that may be stored in a [`Pool<T>`].
///
/// This definition is compiled when the `memory-usage` feature is enabled.
/// On top of `MemoryUsage`, it supplies the hooks the pool calls
/// to keep track of how many bytes its resident objects occupy.
#[cfg(feature = "memory-usage")]
pub trait PooledObject: spacetimedb_memory_usage::MemoryUsage {
    /// Bookkeeping state stored inside the pool and handed back to every hook below.
    ///
    /// The pool creates it through [`Default`].
    type ResidentBytesStorage: Default;

    /// Reports the number of bytes resident in the pool.
    ///
    /// The pool passes its `storage` together with `num_objects`,
    /// the number of objects currently sitting in its queue.
    fn resident_object_bytes(storage: &Self::ResidentBytesStorage, num_objects: usize) -> usize;

    /// Hook the pool invokes after it has accepted an object,
    /// where `bytes` is that object's `heap_usage()` measured just before it was queued.
    fn add_to_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize);

    /// Hook the pool invokes when it hands out a previously stored object,
    /// where `bytes` is that object's `heap_usage()` at the time it is taken.
    fn sub_from_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize);
}
35
36pub struct Pool<T: PooledObject> {
38 inner: Arc<Inner<T>>,
39}
40
41impl<T: PooledObject> fmt::Debug for Pool<T> {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 let dropped = self.dropped_count();
44 let new = self.new_allocated_count();
45 let reused = self.reused_count();
46 let returned = self.returned_count();
47
48 #[cfg(feature = "memory-usage")]
49 let bytes = T::resident_object_bytes(&self.inner.resident_object_bytes, self.inner.objects.len());
50
51 let mut builder = f.debug_struct(&format!("Pool<{}>", type_name::<T>()));
52
53 #[cfg(feature = "memory-usage")]
54 let builder = builder.field("resident_object_bytes", &bytes);
55
56 builder
57 .field("dropped_count", &dropped)
58 .field("new_allocated_count", &new)
59 .field("reused_count", &reused)
60 .field("returned_count", &returned)
61 .finish()
62 }
63}
64
65impl<T: PooledObject> Clone for Pool<T> {
66 fn clone(&self) -> Self {
67 let inner = self.inner.clone();
68 Self { inner }
69 }
70}
71
#[cfg(feature = "memory-usage")]
impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Pool<T> {
    /// Reports the pool's heap usage by delegating to `heap_usage` on the `inner` field.
    fn heap_usage(&self) -> usize {
        // The pattern names every field of `Pool`,
        // so adding a field without accounting for it here fails to compile.
        let Pool { inner: shared } = self;
        shared.heap_usage()
    }
}
79
80impl<T: PooledObject> Pool<T> {
81 pub fn new(cap: usize) -> Self {
84 let inner = Arc::new(Inner::new(cap));
85 Self { inner }
86 }
87
88 pub fn put(&self, object: T) {
90 self.inner.put(object);
91 }
92
93 pub fn put_many(&self, objects: impl Iterator<Item = T>) {
95 for obj in objects {
96 self.put(obj);
97 }
98 }
99
100 pub fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T {
102 self.inner.take(clear, new)
103 }
104
105 pub fn dropped_count(&self) -> usize {
107 self.inner.dropped_count.load(Ordering::Relaxed)
108 }
109
110 pub fn new_allocated_count(&self) -> usize {
112 self.inner.new_allocated_count.load(Ordering::Relaxed)
113 }
114
115 pub fn reused_count(&self) -> usize {
117 self.inner.reused_count.load(Ordering::Relaxed)
118 }
119
120 pub fn returned_count(&self) -> usize {
122 self.inner.returned_count.load(Ordering::Relaxed)
123 }
124}
125
126struct Inner<T: PooledObject> {
128 objects: ArrayQueue<T>,
129 dropped_count: AtomicUsize,
130 new_allocated_count: AtomicUsize,
131 reused_count: AtomicUsize,
132 returned_count: AtomicUsize,
133
134 #[cfg(feature = "memory-usage")]
135 resident_object_bytes: T::ResidentBytesStorage,
136}
137
#[cfg(feature = "memory-usage")]
impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Inner<T> {
    /// Sums the heap usage of the counters, the queue's slots, and the resident objects.
    fn heap_usage(&self) -> usize {
        // The pattern names every field of `Inner`,
        // so adding a field without accounting for it here fails to compile.
        let Inner {
            objects: queue,
            dropped_count: dropped,
            new_allocated_count: fresh,
            reused_count: reused,
            returned_count: returned,
            resident_object_bytes: resident,
        } = self;

        let counter_bytes = dropped.heap_usage() + fresh.heap_usage() + reused.heap_usage() + returned.heap_usage();

        // The queue's own storage is counted as one `(AtomicUsize, T)` per unit of capacity.
        let slot_bytes = queue.capacity() * size_of::<(AtomicUsize, T)>();

        // What the queued objects occupy is left to `T` to report,
        // based on the pool's storage and the current queue length.
        let object_bytes = T::resident_object_bytes(resident, queue.len());

        counter_bytes + slot_bytes + object_bytes
    }
}
159
/// Adds one to `atomic` using [`Ordering::Relaxed`], discarding the previous value.
#[inline]
fn inc(atomic: &AtomicUsize) {
    let _previous = atomic.fetch_add(1, Ordering::Relaxed);
}
164
165impl<T: PooledObject> Inner<T> {
166 fn new(cap: usize) -> Self {
168 let objects = ArrayQueue::new(cap);
169 Self {
170 objects,
171 dropped_count: <_>::default(),
172 new_allocated_count: <_>::default(),
173 reused_count: <_>::default(),
174 returned_count: <_>::default(),
175
176 #[cfg(feature = "memory-usage")]
177 resident_object_bytes: <_>::default(),
178 }
179 }
180
181 fn put(&self, object: T) {
183 #[cfg(feature = "memory-usage")]
184 let bytes = object.heap_usage();
185 if self.objects.push(object).is_ok() {
187 #[cfg(feature = "memory-usage")]
188 T::add_to_resident_object_bytes(&self.resident_object_bytes, bytes);
189
190 inc(&self.returned_count);
191 } else {
192 inc(&self.dropped_count);
193 }
194 }
195
196 fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T {
201 self.objects
202 .pop()
203 .map(|mut object| {
204 #[cfg(feature = "memory-usage")]
205 T::sub_from_resident_object_bytes(&self.resident_object_bytes, object.heap_usage());
206
207 inc(&self.reused_count);
208 clear(&mut object);
209 object
210 })
211 .unwrap_or_else(|| {
212 inc(&self.new_allocated_count);
213 new()
214 })
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use core::{iter, ptr::addr_eq};

    /// The pool type exercised by these tests.
    type P = Pool<Box<i32>>;

    #[cfg(not(feature = "memory-usage"))]
    impl PooledObject for Box<i32> {}

    #[cfg(feature = "memory-usage")]
    impl PooledObject for Box<i32> {
        // Nothing needs to be stored: the byte count is derived from the object count alone.
        type ResidentBytesStorage = ();

        fn resident_object_bytes(_: &Self::ResidentBytesStorage, num_objects: usize) -> usize {
            size_of::<i32>() * num_objects
        }

        fn add_to_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}

        fn sub_from_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
    }

    /// Builds a pool with room for 100 objects.
    fn roomy_pool() -> P {
        P::new(100)
    }

    /// Asserts that all four counters of `pool` hold the expected values.
    fn check_counters(pool: &P, dropped: usize, new: usize, reused: usize, returned: usize) {
        let observed = [
            pool.dropped_count(),
            pool.new_allocated_count(),
            pool.reused_count(),
            pool.returned_count(),
        ];
        assert_eq!(observed, [dropped, new, reused, returned]);
    }

    /// Takes an object from `pool`, leaving reused objects as they are
    /// and allocating a boxed `0` when the pool is empty.
    fn grab(pool: &P) -> Box<i32> {
        pool.take(|_| {}, || Box::new(0))
    }

    /// Address of the heap allocation behind a box, used to tell allocations apart.
    fn heap_addr(value: &i32) -> *const i32 {
        value
    }

    #[test]
    fn pool_returns_same_obj() {
        let pool = roomy_pool();
        check_counters(&pool, 0, 0, 0, 0);

        // The pool starts out empty, so the first take allocates.
        let first = grab(&pool);
        check_counters(&pool, 0, 1, 0, 0);
        let first_addr = heap_addr(&first);
        pool.put(first);
        check_counters(&pool, 0, 1, 0, 1);

        // Taking again yields the very allocation that was just returned.
        let second = grab(&pool);
        check_counters(&pool, 0, 1, 1, 1);
        assert!(addr_eq(first_addr, heap_addr(&second)));
        pool.put(second);
        check_counters(&pool, 0, 1, 1, 2);

        // And once more.
        let third = grab(&pool);
        check_counters(&pool, 0, 1, 2, 2);
        assert!(addr_eq(first_addr, heap_addr(&third)));

        // Put an object the pool has never seen, followed by the old one.
        let outsider: Box<i32> = Box::new(0);
        let outsider_addr = heap_addr(&outsider);
        pool.put(outsider);
        pool.put(third);
        check_counters(&pool, 0, 1, 2, 4);

        // The object that was put first is the one handed back.
        let fourth = grab(&pool);
        check_counters(&pool, 0, 1, 3, 4);
        let fourth_addr = heap_addr(&fourth);
        assert!(!addr_eq(fourth_addr, first_addr));
        assert!(addr_eq(fourth_addr, outsider_addr));
    }

    #[test]
    fn pool_drops_past_max_size() {
        const CAP: usize = 3;
        let pool = P::new(CAP);

        // Take one object more than the pool can hold; every take allocates.
        let taken: Vec<_> = iter::repeat_with(|| grab(&pool)).take(CAP + 1).collect();
        check_counters(&pool, 0, CAP + 1, 0, 0);

        // Returning them all fills the pool and drops the one that does not fit.
        pool.put_many(taken.into_iter());
        check_counters(&pool, 1, CAP + 1, 0, CAP);
        assert_eq!(pool.inner.objects.len(), CAP);
    }
}