#![allow(dead_code)]
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash, RandomState};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use quick_cache::sync::{Cache, DefaultLifecycle};
use quick_cache::UnitWeighter;
/// A thread-safe multimap whose values live in a `quick_cache` cache.
///
/// Several values may be stored under the same key. Each one is placed in the
/// cache under `(key, index)`, where `index` is a per-key counter handed out
/// by `insert`; `remove` walks a second per-key counter over the same indexes,
/// so the values of a key are handed back oldest first.
// The nested generic field types below are what trips `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
pub struct ConcurrentLimitedMultimap<K, V, S = RandomState> {
/// Value storage, addressed by the key together with the index assigned at
/// insertion time.
inner: Cache<(Arc<K>, usize), Arc<V>, UnitWeighter, S>,
/// Per-key `(read index, write index)` pair: `insert` increments the second
/// counter and `remove` advances the first one towards it.
/// NOTE(review): no code visible in this file ever removes an entry from this
/// map, so it appears to grow with the number of distinct keys seen.
keys: RwLock<HashMap<Arc<K>, Arc<(AtomicUsize, AtomicUsize)>>>,
}
impl<K, V> ConcurrentLimitedMultimap<K, V>
where
K: Eq + Hash,
{
pub fn new(max_capacity: usize) -> Self {
Self {
inner: Cache::with(
max_capacity,
max_capacity as u64,
UnitWeighter,
RandomState::new(),
DefaultLifecycle::default(),
),
keys: RwLock::new(HashMap::new()),
}
}
pub fn unbounded() -> Self {
Self {
inner: Cache::with(
usize::MAX,
u64::MAX,
UnitWeighter,
RandomState::new(),
DefaultLifecycle::default(),
),
keys: RwLock::new(HashMap::new()),
}
}
}
impl<K, V, S> ConcurrentLimitedMultimap<K, V, S>
where
    K: Eq + Hash,
    S: BuildHasher + Clone,
{
    /// Creates an empty multimap limited to `max_capacity` stored values whose
    /// cache hashes its entries with `hasher`.
    ///
    /// `max_capacity` is passed to the cache both as the estimated number of
    /// items and as the weight capacity; with `UnitWeighter` the two coincide.
    pub fn with_hasher(max_capacity: usize, hasher: S) -> Self {
        Self {
            inner: Cache::with(
                max_capacity,
                max_capacity as u64,
                UnitWeighter,
                hasher,
                DefaultLifecycle::default(),
            ),
            keys: RwLock::new(HashMap::new()),
        }
    }

    /// Creates an empty multimap with the largest representable capacity
    /// (`usize::MAX` items, `u64::MAX` weight) whose cache hashes its entries
    /// with `hasher`.
    pub fn with_hasher_unbounded(hasher: S) -> Self {
        Self {
            inner: Cache::with(
                usize::MAX,
                u64::MAX,
                UnitWeighter,
                hasher,
                DefaultLifecycle::default(),
            ),
            keys: RwLock::new(HashMap::new()),
        }
    }

    /// Returns the `(read index, write index)` pair of `key`, registering a
    /// zeroed pair first when the key has not been seen yet.
    fn indexes_for(&self, key: &Arc<K>) -> Arc<(AtomicUsize, AtomicUsize)> {
        // Fast path under the shared lock. The guard is a temporary of this
        // statement, so it is released before the write lock is requested
        // below (taking both at once on one thread would deadlock).
        let known = self
            .keys
            .read()
            .expect("inner read-write lock poisoned")
            .get(key)
            .cloned();
        if let Some(indexes) = known {
            return indexes;
        }
        // Another thread may have registered the key between the two lock
        // acquisitions. Going through the entry API keeps its pair instead of
        // replacing it, so every inserter of a key shares the same counters.
        let mut keys = self.keys.write().expect("inner read-write lock poisoned");
        let indexes = keys
            .entry(Arc::clone(key))
            .or_insert_with(|| Arc::new((AtomicUsize::new(0), AtomicUsize::new(0))));
        Arc::clone(indexes)
    }

    /// Appends `value` to the values stored under `key`.
    ///
    /// The value is written to the cache under `(key, index)`, where `index`
    /// is the next value of the key's write counter. No lock on the key table
    /// is held while the cache is updated.
    ///
    /// NOTE(review): the index is reserved before the value is stored, so a
    /// concurrent `remove` of the same key can observe the reserved index,
    /// find nothing in the cache and move past it; that value is then no
    /// longer reachable through `remove`.
    ///
    /// # Panics
    ///
    /// Panics if the internal read-write lock is poisoned.
    pub fn insert(&self, key: Arc<K>, value: V) {
        let indexes = self.indexes_for(&key);
        let slot = indexes.1.fetch_add(1, Ordering::Relaxed);
        self.inner.insert((key, slot), Arc::new(value));
    }

    /// Removes and returns the oldest value still stored under `key`.
    ///
    /// Returns `None` when the key was never inserted or when its read index
    /// has caught up with its write index. Indexes whose value is no longer in
    /// the cache are skipped.
    ///
    /// # Panics
    ///
    /// Panics if the internal read-write lock is poisoned, or if the removed
    /// value is still shared so that it cannot be taken out of its `Arc`.
    pub fn remove(&self, key: Arc<K>) -> Option<V> {
        // Only the cloned `Arc` outlives this statement; the read guard is
        // released before the loop starts.
        let indexes = self
            .keys
            .read()
            .expect("inner read-write lock poisoned")
            .get(&key)
            .cloned()?;
        let (read_index, write_index) = &*indexes;
        // One lookup key reused across iterations; only the index changes.
        let mut probe = (key, 0);
        loop {
            let end = write_index.load(Ordering::Relaxed);
            // Claim the current read index unless the reader has caught up
            // with the writer. The counter wraps around like `fetch_add` does
            // on the write side.
            let claimed = read_index
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                    (current != end).then(|| current.wrapping_add(1))
                })
                .ok()?;
            probe.1 = claimed;
            if let Some((_, value)) = self.inner.remove(&probe) {
                return Some(Arc::into_inner(value).expect("can't obtain value"));
            }
            // Nothing stored under the claimed index: try the next one.
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::hash_map::RandomState;

    /// Wraps a string literal into the key type used by these tests.
    fn key(name: &'static str) -> Arc<&'static str> {
        Arc::new(name)
    }

    /// Inserts every `(key, value)` pair of `entries` into `target`, in order.
    fn fill(
        target: &ConcurrentLimitedMultimap<&'static str, i32>,
        entries: &[(&'static str, i32)],
    ) {
        for &(name, value) in entries {
            target.insert(key(name), value);
        }
    }

    /// Values inserted under distinct keys come back from `remove`, and an
    /// unknown key yields `None`.
    #[test]
    fn test_insert_and_get() {
        let multimap = ConcurrentLimitedMultimap::new(3);
        fill(&multimap, &[("a", 1), ("b", 2), ("c", 3)]);
        assert_eq!(multimap.remove(key("a")), Some(1));
        assert_eq!(multimap.remove(key("b")), Some(2));
        assert_eq!(multimap.remove(key("c")), Some(3));
        assert_eq!(multimap.remove(key("d")), None);
    }

    /// Inserting a fourth value into a multimap of capacity three leaves
    /// three values in the cache.
    #[test]
    fn test_eviction() {
        let multimap = ConcurrentLimitedMultimap::new(3);
        fill(&multimap, &[("a", 1), ("b", 2), ("c", 3), ("d", 4)]);
        assert_eq!(multimap.inner.len(), 3);
    }

    /// A value can be removed only once; other keys are unaffected.
    #[test]
    fn test_remove() {
        let multimap = ConcurrentLimitedMultimap::new(3);
        fill(&multimap, &[("a", 1), ("b", 2), ("c", 3)]);
        assert_eq!(multimap.remove(key("b")), Some(2));
        assert_eq!(multimap.remove(key("b")), None);
        assert_eq!(multimap.remove(key("a")), Some(1));
        assert_eq!(multimap.remove(key("c")), Some(3));
    }

    /// A multimap built with an explicit hasher behaves like the default one.
    #[test]
    fn test_with_hasher() {
        let multimap = ConcurrentLimitedMultimap::with_hasher(3, RandomState::new());
        fill(&multimap, &[("a", 1), ("b", 2), ("c", 3)]);
        assert_eq!(multimap.remove(key("a")), Some(1));
        assert_eq!(multimap.remove(key("b")), Some(2));
        assert_eq!(multimap.remove(key("c")), Some(3));
    }

    /// With two values under one key, the first one inserted is removed first.
    #[test]
    fn test_insert_same_key_multiple_times() {
        let multimap = ConcurrentLimitedMultimap::new(3);
        fill(&multimap, &[("a", 1), ("a", 2)]);
        assert_eq!(multimap.remove(key("a")), Some(1));
    }

    /// Removing a key before it was ever inserted does not disturb a later
    /// insertion of that key.
    #[test]
    fn test_remove_then_insert() {
        let multimap = ConcurrentLimitedMultimap::new(3);
        multimap.remove(key("a"));
        fill(&multimap, &[("a", 1)]);
        assert_eq!(multimap.remove(key("a")), Some(1));
    }
}