use std::hash::Hash;
#[cfg(feature = "concurrency")]
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(feature = "concurrency")]
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
#[cfg(feature = "concurrency")]
use crate::store::traits::{ConcurrentStore, ConcurrentStoreFactory, ConcurrentStoreRead};
use crate::store::traits::{StoreCore, StoreFactory, StoreFull, StoreMetrics, StoreMut};
/// Handle naming one slot of a slab store.
///
/// The wrapped number is the slot's position in the store's entry vector. Slot positions are
/// recycled after a removal, so an id kept across a removal may later name a different entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[must_use]
pub struct EntryId(usize);

impl EntryId {
    /// Returns the raw slot position wrapped by this id.
    #[must_use]
    pub fn index(&self) -> usize {
        let Self(slot) = *self;
        slot
    }
}
/// Key/value pair occupying a single slab slot.
#[derive(Debug)]
struct Entry<K, V> {
/// Key owned by the slot; the remove-by-id paths read it to drop the matching index entry.
key: K,
/// Payload stored for `key`.
value: V,
}
/// Per-store operation counters.
///
/// Every counter is an atomic, so it can be incremented through a shared reference.
/// All accesses use relaxed ordering.
#[derive(Debug, Default)]
struct StoreCounters {
    hits: AtomicU64,
    misses: AtomicU64,
    inserts: AtomicU64,
    updates: AtomicU64,
    removes: AtomicU64,
    evictions: AtomicU64,
}

impl StoreCounters {
    /// Adds one to `counter`.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Reads the current value of `counter`.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Copies the current counter values into a plain `StoreMetrics` value.
    ///
    /// Each counter is loaded separately, so the result is not an atomic snapshot of all six.
    fn snapshot(&self) -> StoreMetrics {
        let Self {
            hits,
            misses,
            inserts,
            updates,
            removes,
            evictions,
        } = self;
        StoreMetrics {
            hits: Self::read(hits),
            misses: Self::read(misses),
            inserts: Self::read(inserts),
            updates: Self::read(updates),
            removes: Self::read(removes),
            evictions: Self::read(evictions),
        }
    }

    /// Records one successful lookup.
    fn inc_hit(&self) {
        Self::bump(&self.hits);
    }

    /// Records one failed lookup.
    fn inc_miss(&self) {
        Self::bump(&self.misses);
    }

    /// Records one insertion of a new key.
    fn inc_insert(&self) {
        Self::bump(&self.inserts);
    }

    /// Records one in-place value replacement.
    fn inc_update(&self) {
        Self::bump(&self.updates);
    }

    /// Records one removal.
    fn inc_remove(&self) {
        Self::bump(&self.removes);
    }

    /// Records one eviction.
    fn inc_eviction(&self) {
        Self::bump(&self.evictions);
    }
}
#[derive(Debug)]
#[must_use]
pub struct SlabStore<K, V> {
entries: Vec<Option<Entry<K, V>>>,
free_list: Vec<usize>,
index: FxHashMap<K, EntryId>,
capacity: usize,
metrics: StoreCounters,
}
impl<K, V> SlabStore<K, V>
where
K: Eq + Hash,
{
pub fn new(capacity: usize) -> Self {
Self {
entries: Vec::with_capacity(capacity),
free_list: Vec::new(),
index: FxHashMap::with_capacity_and_hasher(capacity, Default::default()),
capacity,
metrics: StoreCounters::default(),
}
}
pub fn entry_id(&self, key: &K) -> Option<EntryId> {
self.index.get(key).copied()
}
pub fn get_by_id(&self, id: EntryId) -> Option<&V> {
self.entries
.get(id.0)
.and_then(|slot| slot.as_ref().map(|entry| &entry.value))
}
pub fn get_by_id_mut(&mut self, id: EntryId) -> Option<&mut V> {
self.entries
.get_mut(id.0)
.and_then(|slot| slot.as_mut().map(|entry| &mut entry.value))
}
pub fn key_by_id(&self, id: EntryId) -> Option<&K> {
self.entries
.get(id.0)
.and_then(|slot| slot.as_ref().map(|entry| &entry.key))
}
pub fn peek(&self, key: &K) -> Option<&V> {
self.index.get(key).and_then(|id| self.get_by_id(*id))
}
fn allocate_slot(&mut self) -> usize {
if let Some(idx) = self.free_list.pop() {
idx
} else {
self.entries.push(None);
self.entries.len() - 1
}
}
pub fn record_eviction(&self) {
self.metrics.inc_eviction();
}
pub fn remove_by_id(&mut self, id: EntryId) -> Option<(K, V)> {
let entry = self.entries.get_mut(id.0)?.take()?;
self.index.remove(&entry.key);
self.free_list.push(id.0);
self.metrics.inc_remove();
Some((entry.key, entry.value))
}
}
impl<K, V> StoreCore<K, V> for SlabStore<K, V>
where
    K: Eq + Hash,
{
    /// Looks up `key` and records the outcome as a hit or a miss.
    fn get(&self, key: &K) -> Option<&V> {
        let found = match self.index.get(key) {
            Some(&id) => self.get_by_id(id),
            None => None,
        };
        if found.is_some() {
            self.metrics.inc_hit();
        } else {
            self.metrics.inc_miss();
        }
        found
    }

    /// Reports whether `key` is present in the index; no counters are touched.
    fn contains(&self, key: &K) -> bool {
        self.index.contains_key(key)
    }

    /// Number of keys currently indexed.
    fn len(&self) -> usize {
        self.index.len()
    }

    /// The entry limit given at construction.
    fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the current values of the operation counters.
    fn metrics(&self) -> StoreMetrics {
        self.metrics.snapshot()
    }
}
impl<K, V> StoreMut<K, V> for SlabStore<K, V>
where
K: Eq + Hash + Clone,
{
fn try_insert(&mut self, key: K, value: V) -> Result<Option<V>, StoreFull> {
if let Some(id) = self.index.get(&key).copied() {
let entry = self.entries[id.0].as_mut().expect("slab entry missing");
let previous = std::mem::replace(&mut entry.value, value);
self.metrics.inc_update();
return Ok(Some(previous));
}
if self.index.len() >= self.capacity {
return Err(StoreFull);
}
let idx = self.allocate_slot();
self.entries[idx] = Some(Entry {
key: key.clone(),
value,
});
self.index.insert(key, EntryId(idx));
self.metrics.inc_insert();
Ok(None)
}
fn remove(&mut self, key: &K) -> Option<V> {
let id = self.index.remove(key)?;
let entry = self.entries[id.0].take()?;
self.free_list.push(id.0);
self.metrics.inc_remove();
Some(entry.value)
}
fn clear(&mut self) {
self.entries.clear();
self.free_list.clear();
self.index.clear();
}
}
impl<K, V> StoreFactory<K, V> for SlabStore<K, V>
where
K: Eq + Hash + Clone,
{
type Store = SlabStore<K, V>;
fn create(capacity: usize) -> Self::Store {
Self::new(capacity)
}
}
#[cfg(feature = "concurrency")]
/// State of the concurrent slab store that sits behind its lock.
///
/// Mirrors the single-threaded layout, except that values are held as `Arc<V>`.
#[derive(Debug)]
struct SlabInner<K, V> {
    /// Slot storage; `None` marks a vacant slot.
    entries: Vec<Option<Entry<K, Arc<V>>>>,
    /// Positions in `entries` that were vacated and can be reused.
    free_list: Vec<usize>,
    /// Key-to-slot lookup table.
    index: FxHashMap<K, EntryId>,
    /// Maximum number of keys the index may hold before new keys are rejected.
    capacity: usize,
}

#[cfg(feature = "concurrency")]
impl<K: Eq + Hash, V> SlabInner<K, V> {
    /// Picks the slot position for a new entry: a recycled position if one is available,
    /// otherwise the position of a freshly appended vacant slot.
    fn allocate_slot(&mut self) -> usize {
        match self.free_list.pop() {
            Some(recycled) => recycled,
            None => {
                let fresh = self.entries.len();
                self.entries.push(None);
                fresh
            }
        }
    }
}
#[cfg(feature = "concurrency")]
/// Slab-backed key/value store whose state sits behind a read/write lock.
///
/// Every method takes `&self`. Lookups acquire the lock for reading, mutations acquire it
/// for writing, and values are handed out as cloned `Arc<V>` handles. The operation
/// counters are atomics kept outside the lock.
#[derive(Debug)]
#[must_use]
#[allow(clippy::type_complexity)]
pub struct ConcurrentSlabStore<K, V> {
    /// Slots, free list, index, and capacity, guarded together by one lock.
    inner: RwLock<SlabInner<K, V>>,
    /// Operation counters.
    metrics: StoreCounters,
}

#[cfg(feature = "concurrency")]
impl<K, V> ConcurrentSlabStore<K, V>
where
    K: Eq + Hash,
{
    /// Creates an empty store limited to `capacity` entries.
    ///
    /// Both the slot vector and the key index are pre-sized for `capacity` elements.
    pub fn new(capacity: usize) -> Self {
        let state = SlabInner {
            entries: Vec::with_capacity(capacity),
            free_list: Vec::new(),
            index: FxHashMap::with_capacity_and_hasher(capacity, Default::default()),
            capacity,
        };
        Self {
            inner: RwLock::new(state),
            metrics: StoreCounters::default(),
        }
    }

    /// Returns the id of the slot currently mapped to `key`, if the key is indexed.
    pub fn entry_id(&self, key: &K) -> Option<EntryId> {
        let guard = self.inner.read();
        guard.index.get(key).copied()
    }

    /// Returns a handle to the value in slot `id`, or `None` if the slot is out of range or
    /// vacant.
    pub fn get_by_id(&self, id: EntryId) -> Option<Arc<V>> {
        let guard = self.inner.read();
        let slot = guard.entries.get(id.0)?;
        slot.as_ref().map(|occupied| Arc::clone(&occupied.value))
    }

    /// Returns a clone of the key in slot `id`, or `None` if the slot is out of range or
    /// vacant.
    pub fn key_by_id(&self, id: EntryId) -> Option<K>
    where
        K: Clone,
    {
        let guard = self.inner.read();
        let slot = guard.entries.get(id.0)?;
        slot.as_ref().map(|occupied| occupied.key.clone())
    }

    /// Looks up `key` without touching the hit/miss counters.
    pub fn peek(&self, key: &K) -> Option<Arc<V>> {
        let guard = self.inner.read();
        let &EntryId(slot) = guard.index.get(key)?;
        match guard.entries.get(slot) {
            Some(Some(occupied)) => Some(Arc::clone(&occupied.value)),
            _ => None,
        }
    }

    /// Removes the entry in slot `id` and returns its key and value handle.
    ///
    /// Returns `None`, leaving the store untouched, when the slot is out of range or vacant.
    /// On success the key is dropped from the index, the slot position is pushed onto the
    /// free list, and the remove counter is incremented.
    pub fn remove_by_id(&self, id: EntryId) -> Option<(K, Arc<V>)> {
        let mut guard = self.inner.write();
        let EntryId(slot) = id;
        let Entry { key, value } = guard.entries.get_mut(slot)?.take()?;
        guard.index.remove(&key);
        guard.free_list.push(slot);
        self.metrics.inc_remove();
        Some((key, value))
    }

    /// Bumps the eviction counter; the store itself never evicts.
    pub fn record_eviction(&self) {
        self.metrics.inc_eviction();
    }
}
#[cfg(feature = "concurrency")]
impl<K, V> ConcurrentStoreRead<K, V> for ConcurrentSlabStore<K, V>
where
    K: Eq + Hash + Send + Sync,
    V: Send + Sync,
{
    /// Looks up `key` under the read lock and records the outcome as a hit or a miss.
    ///
    /// A miss is recorded both when the key is not indexed and when the indexed slot turns
    /// out to be missing or vacant, matching the single-threaded store's accounting.
    fn get(&self, key: &K) -> Option<Arc<V>> {
        let inner = self.inner.read();
        // Resolve the whole lookup before counting. An early `?` on the index lookup would
        // return before the miss counter is touched.
        let found = inner
            .index
            .get(key)
            .and_then(|id| inner.entries.get(id.0))
            .and_then(|slot| slot.as_ref())
            .map(|entry| Arc::clone(&entry.value));
        if found.is_some() {
            self.metrics.inc_hit();
        } else {
            self.metrics.inc_miss();
        }
        found
    }

    /// Reports whether `key` is present in the index; no counters are touched.
    fn contains(&self, key: &K) -> bool {
        self.inner.read().index.contains_key(key)
    }

    /// Number of keys currently indexed.
    fn len(&self) -> usize {
        self.inner.read().index.len()
    }

    /// The entry limit given at construction.
    fn capacity(&self) -> usize {
        self.inner.read().capacity
    }

    /// Returns the current values of the operation counters.
    fn metrics(&self) -> StoreMetrics {
        self.metrics.snapshot()
    }
}
#[cfg(feature = "concurrency")]
impl<K, V> ConcurrentStore<K, V> for ConcurrentSlabStore<K, V>
where
    K: Eq + Hash + Send + Sync + Clone,
    V: Send + Sync,
{
    /// Inserts `value` under `key`, or replaces the value if the key already has a live slot.
    ///
    /// The write lock is held for the whole call. Returns `Ok(Some(previous))` after a
    /// replacement and `Ok(None)` after inserting a new key.
    ///
    /// # Errors
    ///
    /// Returns `StoreFull` when no live slot exists for `key` and the index already holds
    /// `capacity` keys.
    fn try_insert(&self, key: K, value: Arc<V>) -> Result<Option<Arc<V>>, StoreFull> {
        let mut guard = self.inner.write();

        if let Some(EntryId(slot)) = guard.index.get(&key).copied() {
            // An indexed key whose slot is missing or vacant falls through to the insert
            // path below instead of panicking.
            if let Some(Some(occupied)) = guard.entries.get_mut(slot) {
                let previous = std::mem::replace(&mut occupied.value, value);
                self.metrics.inc_update();
                return Ok(Some(previous));
            }
        }

        if guard.index.len() >= guard.capacity {
            return Err(StoreFull);
        }

        let slot = guard.allocate_slot();
        // The key is kept twice: once in the slot and once in the index.
        let slot_key = key.clone();
        guard.entries[slot] = Some(Entry {
            key: slot_key,
            value,
        });
        guard.index.insert(key, EntryId(slot));
        self.metrics.inc_insert();
        Ok(None)
    }

    /// Removes `key` and returns its value handle, or `None` if the key is not indexed.
    ///
    /// The vacated slot position is pushed onto the free list for reuse.
    fn remove(&self, key: &K) -> Option<Arc<V>> {
        let mut guard = self.inner.write();
        let EntryId(slot) = guard.index.remove(key)?;
        let removed = guard.entries.get_mut(slot)?.take()?;
        guard.free_list.push(slot);
        self.metrics.inc_remove();
        Some(removed.value)
    }

    /// Drops every entry, the free list, and the index; counters are left as they are.
    fn clear(&self) {
        let mut guard = self.inner.write();
        guard.entries.clear();
        guard.free_list.clear();
        guard.index.clear();
    }
}
#[cfg(feature = "concurrency")]
impl<K, V> ConcurrentStoreFactory<K, V> for ConcurrentSlabStore<K, V>
where
    K: Eq + Hash + Send + Sync + Clone,
    V: Send + Sync,
{
    type Store = ConcurrentSlabStore<K, V>;

    /// Builds an empty [`ConcurrentSlabStore`] limited to `capacity` entries.
    fn create(capacity: usize) -> Self::Store {
        ConcurrentSlabStore::new(capacity)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Insert, lookup, size queries, and removal on the single-threaded store.
    #[test]
    fn slab_store_basic_ops() {
        let mut slab = SlabStore::new(2);
        let first_insert = slab.try_insert("k1", "v1".to_string());
        assert_eq!(first_insert, Ok(None));

        let expected = "v1".to_string();
        assert_eq!(slab.get(&"k1"), Some(&expected));
        assert!(slab.contains(&"k1"));
        assert_eq!(slab.len(), 1);
        assert_eq!(slab.capacity(), 2);

        assert_eq!(slab.remove(&"k1"), Some(expected));
        assert!(!slab.contains(&"k1"));
    }

    /// Lookups by key and by id both hand back borrowed values.
    #[test]
    fn slab_store_returns_reference() {
        let mut slab = SlabStore::new(2);
        slab.try_insert("k1", "hello".to_string()).unwrap();

        let by_key: &String = slab.get(&"k1").unwrap();
        assert_eq!(by_key, "hello");

        let handle = slab.entry_id(&"k1").unwrap();
        let by_id: &String = slab.get_by_id(handle).unwrap();
        assert_eq!(by_id, "hello");
    }

    /// A new key is rejected once the store holds `capacity` entries.
    #[test]
    fn slab_store_capacity_enforced() {
        let mut slab = SlabStore::new(1);
        let accepted = slab.try_insert("k1", "v1".to_string());
        assert_eq!(accepted, Ok(None));

        let rejected = slab.try_insert("k2", "v2".to_string());
        assert_eq!(rejected, Err(StoreFull));
        assert_eq!(slab.len(), 1);
    }

    /// The id reported for a key resolves back to that key's value.
    #[test]
    fn slab_store_entry_id_indirection() {
        let mut slab = SlabStore::new(2);
        let inserted = slab.try_insert("k1", "v1".to_string());
        assert_eq!(inserted, Ok(None));

        let handle = slab.entry_id(&"k1").expect("missing entry id");
        let expected = "v1".to_string();
        assert_eq!(slab.get_by_id(handle), Some(&expected));
    }

    /// The id reported for a key resolves back to the key itself.
    #[test]
    fn slab_store_key_by_id() {
        let mut slab = SlabStore::new(1);
        let inserted = slab.try_insert("k1", "v1".to_string());
        assert_eq!(inserted, Ok(None));

        let handle = slab.entry_id(&"k1").expect("missing entry id");
        assert_eq!(slab.key_by_id(handle), Some(&"k1"));
    }

    /// Removing by id yields the key/value pair and unindexes the key.
    #[test]
    fn slab_store_remove_by_id() {
        let mut slab = SlabStore::new(2);
        slab.try_insert("k1", "v1".to_string()).unwrap();

        let handle = slab.entry_id(&"k1").unwrap();
        let (removed_key, removed_value) = slab.remove_by_id(handle).unwrap();
        assert_eq!(removed_key, "k1");
        assert_eq!(removed_value, "v1".to_string());
        assert!(!slab.contains(&"k1"));
    }

    /// A vacated slot position is handed to the next inserted key.
    #[test]
    fn slab_store_slot_reuse() {
        let mut slab = SlabStore::new(2);
        slab.try_insert("k1", "v1".to_string()).unwrap();
        let before = slab.entry_id(&"k1").unwrap();

        slab.remove(&"k1");
        slab.try_insert("k2", "v2".to_string()).unwrap();
        let after = slab.entry_id(&"k2").unwrap();

        assert_eq!(before.index(), after.index());
    }

    /// Insert, lookup, size queries, and removal on the concurrent store.
    #[cfg(feature = "concurrency")]
    #[test]
    fn concurrent_slab_store_basic_ops() {
        let slab = ConcurrentSlabStore::new(2);
        let shared = Arc::new("v1".to_string());

        let first_insert = slab.try_insert("k1", Arc::clone(&shared));
        assert_eq!(first_insert, Ok(None));
        assert_eq!(slab.get(&"k1"), Some(Arc::clone(&shared)));
        assert!(slab.contains(&"k1"));
        assert_eq!(slab.len(), 1);
        assert_eq!(slab.capacity(), 2);

        assert_eq!(slab.remove(&"k1"), Some(shared));
        assert!(!slab.contains(&"k1"));
    }

    /// The id reported for a key resolves back to both the value and the key.
    #[cfg(feature = "concurrency")]
    #[test]
    fn concurrent_slab_store_entry_id_roundtrip() {
        let slab = ConcurrentSlabStore::new(2);
        let inserted = slab.try_insert("k1", Arc::new("v1".to_string()));
        assert_eq!(inserted, Ok(None));

        let handle = slab.entry_id(&"k1").expect("missing entry id");
        assert_eq!(slab.get_by_id(handle), Some(Arc::new("v1".to_string())));
        assert_eq!(slab.key_by_id(handle), Some("k1"));
    }

    /// Removing by id yields the key/value pair and unindexes the key.
    #[cfg(feature = "concurrency")]
    #[test]
    fn concurrent_slab_store_remove_by_id() {
        let slab = ConcurrentSlabStore::new(2);
        slab.try_insert("k1", Arc::new("v1".to_string())).unwrap();

        let handle = slab.entry_id(&"k1").unwrap();
        let (removed_key, removed_value) = slab.remove_by_id(handle).unwrap();
        assert_eq!(removed_key, "k1");
        assert_eq!(*removed_value, "v1".to_string());
        assert!(!slab.contains(&"k1"));
    }

    /// `peek` finds values but leaves the hit and miss counters at zero.
    #[cfg(feature = "concurrency")]
    #[test]
    fn concurrent_slab_store_peek() {
        let slab = ConcurrentSlabStore::new(2);
        slab.try_insert("k1", Arc::new("v1".to_string())).unwrap();

        let peeked = slab.peek(&"k1");
        assert_eq!(peeked, Some(Arc::new("v1".to_string())));
        assert_eq!(slab.metrics().hits, 0);

        assert!(slab.peek(&"missing").is_none());
        assert_eq!(slab.metrics().misses, 0);
    }
}