use core::any::type_name;
use core::fmt;
use core::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_queue::ArrayQueue;
use std::sync::Arc;
#[cfg(not(feature = "memory-usage"))]
pub trait PooledObject {}
#[cfg(feature = "memory-usage")]
pub trait PooledObject: spacetimedb_memory_usage::MemoryUsage {
type ResidentBytesStorage: Default;
fn resident_object_bytes(storage: &Self::ResidentBytesStorage, num_objects: usize) -> usize;
fn add_to_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize);
fn sub_from_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize);
}
pub struct Pool<T: PooledObject> {
inner: Arc<Inner<T>>,
}
impl<T: PooledObject> fmt::Debug for Pool<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let dropped = self.dropped_count();
let new = self.new_allocated_count();
let reused = self.reused_count();
let returned = self.returned_count();
#[cfg(feature = "memory-usage")]
let bytes = T::resident_object_bytes(&self.inner.resident_object_bytes, self.inner.objects.len());
let mut builder = f.debug_struct(&format!("Pool<{}>", type_name::<T>()));
#[cfg(feature = "memory-usage")]
let builder = builder.field("resident_object_bytes", &bytes);
builder
.field("dropped_count", &dropped)
.field("new_allocated_count", &new)
.field("reused_count", &reused)
.field("returned_count", &returned)
.finish()
}
}
impl<T: PooledObject> Clone for Pool<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self { inner }
}
}
#[cfg(feature = "memory-usage")]
impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Pool<T> {
fn heap_usage(&self) -> usize {
let Self { inner } = self;
inner.heap_usage()
}
}
impl<T: PooledObject> Pool<T> {
pub fn new(cap: usize) -> Self {
let inner = Arc::new(Inner::new(cap));
Self { inner }
}
pub fn put(&self, object: T) {
self.inner.put(object);
}
pub fn put_many(&self, objects: impl Iterator<Item = T>) {
for obj in objects {
self.put(obj);
}
}
pub fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T {
self.inner.take(clear, new)
}
pub fn dropped_count(&self) -> usize {
self.inner.dropped_count.load(Ordering::Relaxed)
}
pub fn new_allocated_count(&self) -> usize {
self.inner.new_allocated_count.load(Ordering::Relaxed)
}
pub fn reused_count(&self) -> usize {
self.inner.reused_count.load(Ordering::Relaxed)
}
pub fn returned_count(&self) -> usize {
self.inner.returned_count.load(Ordering::Relaxed)
}
}
struct Inner<T: PooledObject> {
objects: ArrayQueue<T>,
dropped_count: AtomicUsize,
new_allocated_count: AtomicUsize,
reused_count: AtomicUsize,
returned_count: AtomicUsize,
#[cfg(feature = "memory-usage")]
resident_object_bytes: T::ResidentBytesStorage,
}
#[cfg(feature = "memory-usage")]
impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Inner<T> {
fn heap_usage(&self) -> usize {
let Self {
objects,
dropped_count,
new_allocated_count,
reused_count,
returned_count,
resident_object_bytes,
} = self;
dropped_count.heap_usage() +
new_allocated_count.heap_usage() +
reused_count.heap_usage() +
returned_count.heap_usage() +
objects.capacity() * size_of::<(AtomicUsize, T)>() +
T::resident_object_bytes(resident_object_bytes, objects.len())
}
}
#[inline]
fn inc(atomic: &AtomicUsize) {
atomic.fetch_add(1, Ordering::Relaxed);
}
impl<T: PooledObject> Inner<T> {
fn new(cap: usize) -> Self {
let objects = ArrayQueue::new(cap);
Self {
objects,
dropped_count: <_>::default(),
new_allocated_count: <_>::default(),
reused_count: <_>::default(),
returned_count: <_>::default(),
#[cfg(feature = "memory-usage")]
resident_object_bytes: <_>::default(),
}
}
fn put(&self, object: T) {
#[cfg(feature = "memory-usage")]
let bytes = object.heap_usage();
if self.objects.push(object).is_ok() {
#[cfg(feature = "memory-usage")]
T::add_to_resident_object_bytes(&self.resident_object_bytes, bytes);
inc(&self.returned_count);
} else {
inc(&self.dropped_count);
}
}
fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T {
self.objects
.pop()
.map(|mut object| {
#[cfg(feature = "memory-usage")]
T::sub_from_resident_object_bytes(&self.resident_object_bytes, object.heap_usage());
inc(&self.reused_count);
clear(&mut object);
object
})
.unwrap_or_else(|| {
inc(&self.new_allocated_count);
new()
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use core::{iter, ptr::addr_eq};
type P = Pool<Box<i32>>;
#[cfg(not(feature = "memory-usage"))]
impl PooledObject for Box<i32> {}
#[cfg(feature = "memory-usage")]
impl PooledObject for Box<i32> {
type ResidentBytesStorage = ();
fn add_to_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
fn sub_from_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
fn resident_object_bytes(_: &Self::ResidentBytesStorage, num_objects: usize) -> usize {
num_objects * size_of::<i32>()
}
}
fn new() -> P {
P::new(100)
}
fn assert_metrics(pool: &P, dropped: usize, new: usize, reused: usize, returned: usize) {
assert_eq!(pool.dropped_count(), dropped);
assert_eq!(pool.new_allocated_count(), new);
assert_eq!(pool.reused_count(), reused);
assert_eq!(pool.returned_count(), returned);
}
fn take(pool: &P) -> Box<i32> {
pool.take(|_| {}, || Box::new(0))
}
#[test]
fn pool_returns_same_obj() {
let pool = new();
assert_metrics(&pool, 0, 0, 0, 0);
let obj1 = take(&pool);
assert_metrics(&pool, 0, 1, 0, 0);
let obj1_ptr = &*obj1 as *const _;
pool.put(obj1);
assert_metrics(&pool, 0, 1, 0, 1);
let obj2 = take(&pool);
assert_metrics(&pool, 0, 1, 1, 1);
let obj2_ptr = &*obj2 as *const _;
assert!(addr_eq(obj1_ptr, obj2_ptr));
pool.put(obj2);
assert_metrics(&pool, 0, 1, 1, 2);
let obj3 = take(&pool);
assert_metrics(&pool, 0, 1, 2, 2);
let obj3_ptr = &*obj3 as *const _;
assert!(addr_eq(obj1_ptr, obj3_ptr));
let obj4 = Box::new(0);
let obj4_ptr = &*obj4 as *const _;
pool.put(obj4);
pool.put(obj3);
assert_metrics(&pool, 0, 1, 2, 4);
let obj5 = take(&pool);
assert_metrics(&pool, 0, 1, 3, 4);
let obj5_ptr = &*obj5 as *const _;
assert!(!addr_eq(obj5_ptr, obj1_ptr));
assert!(addr_eq(obj5_ptr, obj4_ptr));
}
#[test]
fn pool_drops_past_max_size() {
const N: usize = 3;
let pool = P::new(N);
let pages = iter::repeat_with(|| take(&pool)).take(N + 1).collect::<Vec<_>>();
assert_metrics(&pool, 0, N + 1, 0, 0);
pool.put_many(pages.into_iter());
assert_metrics(&pool, 1, N + 1, 0, N);
assert_eq!(pool.inner.objects.len(), N);
}
}