use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::{MemoryPool, PoolIndex, DEFAULT_CHUNK_SIZE};
/// A memory pool split into several independently locked shards.
///
/// Every shard is a [`MemoryPool`] guarded by its own [`Mutex`], so operations that land on
/// different shards take different locks. New allocations are spread over the shards in
/// round-robin order.
pub struct ConcurrentMemoryPool<T> {
/// One mutex-guarded pool per shard; a handle's shard number indexes into this vector.
shards: Vec<Mutex<MemoryPool<T>>>,
/// Number of shards created by `new` (always at least one); the modulus used by `allocate`.
num_shards: usize,
/// Wrapping counter that `allocate` increments to derive the next shard index.
next_shard: AtomicUsize,
}
impl<T> ConcurrentMemoryPool<T> {
    /// Creates a pool made of `num_shards` independently locked shards.
    ///
    /// A `num_shards` of `0` is raised to `1`, so the pool always has at least one shard.
    /// `chunk_size` is forwarded unchanged to [`MemoryPool::new`] for every shard.
    #[must_use]
    pub fn new(num_shards: usize, chunk_size: usize) -> Self {
        // `allocate` takes a remainder by `num_shards`, so it must never be zero.
        let num_shards = num_shards.max(1);
        let shards = (0..num_shards)
            .map(|_| Mutex::new(MemoryPool::new(chunk_size)))
            .collect();
        Self {
            shards,
            num_shards,
            next_shard: AtomicUsize::new(0),
        }
    }

    /// Creates a pool with four shards, each built with [`DEFAULT_CHUNK_SIZE`].
    #[must_use]
    pub fn with_defaults() -> Self {
        Self::new(4, DEFAULT_CHUNK_SIZE)
    }

    /// Allocates a slot and returns the handle that identifies it.
    ///
    /// Shards are picked in round-robin order from a shared counter; the chosen shard is locked
    /// only for the duration of its own `allocate` call. The returned handle is what
    /// [`store`](Self::store), [`with_value`](Self::with_value) and
    /// [`deallocate`](Self::deallocate) expect.
    #[must_use = "discarding the handle leaves the slot allocated with no way to release it"]
    pub fn allocate(&self) -> ConcurrentPoolHandle {
        // `Relaxed` suffices: the counter only spreads load over the shards, and the shard's
        // mutex is what synchronizes access to the pool data. The counter wraps on overflow,
        // which at worst perturbs the rotation once.
        let shard_idx = self.next_shard.fetch_add(1, Ordering::Relaxed) % self.num_shards;
        let index = self.shards[shard_idx].lock().allocate();
        ConcurrentPoolHandle {
            shard: shard_idx,
            index,
        }
    }

    /// Stores `value` in the slot identified by `handle`.
    ///
    /// Locks the handle's shard and delegates to that shard's `store`; how an index that is not
    /// currently allocated is treated is decided by [`MemoryPool`], not here.
    ///
    /// # Panics
    ///
    /// Panics if the handle's shard number is out of range for this pool, which can happen
    /// when the handle was produced by a pool with more shards.
    pub fn store(&self, handle: ConcurrentPoolHandle, value: T) {
        self.shards[handle.shard].lock().store(handle.index, value);
    }

    /// Runs `f` on a shared reference to the value behind `handle` and returns its result.
    ///
    /// Returns `None` when the shard's `get` yields nothing for the handle's index; `f` is not
    /// called in that case.
    ///
    /// `f` executes while the shard's lock is held. It must not call back into this pool in a
    /// way that locks the same shard (including [`allocated_count`](Self::allocated_count) and
    /// [`capacity`](Self::capacity), which lock every shard): `parking_lot::Mutex` is not
    /// reentrant, so doing so deadlocks.
    ///
    /// # Panics
    ///
    /// Panics if the handle's shard number is out of range for this pool.
    pub fn with_value<R>(
        &self,
        handle: ConcurrentPoolHandle,
        f: impl FnOnce(&T) -> R,
    ) -> Option<R> {
        // Keep the guard in a named binding so the lock clearly spans the call to `f`.
        let guard = self.shards[handle.shard].lock();
        guard.get(handle.index).map(f)
    }

    /// Releases the slot identified by `handle`.
    ///
    /// Locks the handle's shard and delegates to that shard's `deallocate`. Handles are `Copy`,
    /// so copies can outlive this call; what happens when such a stale copy is used afterwards
    /// is decided by [`MemoryPool`], not here.
    ///
    /// # Panics
    ///
    /// Panics if the handle's shard number is out of range for this pool.
    pub fn deallocate(&self, handle: ConcurrentPoolHandle) {
        self.shards[handle.shard].lock().deallocate(handle.index);
    }

    /// Returns the sum of `allocated_count` over all shards.
    ///
    /// Shards are locked one after another rather than all at once, so while other threads are
    /// allocating or deallocating the total is not a consistent snapshot.
    #[must_use]
    pub fn allocated_count(&self) -> usize {
        self.shards.iter().map(|s| s.lock().allocated_count()).sum()
    }

    /// Returns the sum of `capacity` over all shards.
    ///
    /// Shards are locked one after another rather than all at once, so while other threads are
    /// using the pool the total is not a consistent snapshot.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.shards.iter().map(|s| s.lock().capacity()).sum()
    }
}
impl<T> Default for ConcurrentMemoryPool<T> {
fn default() -> Self {
Self::with_defaults()
}
}
/// Identifies a slot handed out by [`ConcurrentMemoryPool::allocate`].
///
/// The handle records which shard owns the slot and the slot's index inside that shard. It is
/// plain `Copy` data and holds no reference to the pool that produced it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ConcurrentPoolHandle {
/// Position of the owning shard in the pool's shard list.
shard: usize,
/// Index of the slot within that shard, as returned by the shard's `allocate`.
index: PoolIndex,
}
impl ConcurrentPoolHandle {
    /// Returns the number of the shard that owns the slot.
    #[must_use]
    pub fn shard(&self) -> usize {
        let Self { shard, .. } = *self;
        shard
    }

    /// Returns the slot's index within its shard.
    #[must_use]
    pub fn index(&self) -> PoolIndex {
        let Self { index, .. } = *self;
        index
    }
}
// Compile-time check that the pool (instantiated with `u64`) is both `Send` and `Sync`.
// The closure is bound to an unnamed constant and never called, hence the `dead_code`
// allowance; it only has to type-check.
#[allow(dead_code)]
const _: fn() = || {
    fn require_send_sync<P>()
    where
        P: Send + Sync,
    {
    }
    require_send_sync::<ConcurrentMemoryPool<u64>>();
};