use crate::application::metrics::Metrics;
use crate::application::ports::{EvictionPolicy, Storage};
use dashmap::DashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
#[derive(Debug)]
struct StorageEntry<V> {
value: V,
last_access_nanos: AtomicU64,
}
impl<V> StorageEntry<V> {
fn new(value: V, now: Instant, epoch: Instant) -> Self {
let nanos = now
.saturating_duration_since(epoch)
.as_nanos()
.min(u64::MAX as u128) as u64;
Self {
value,
last_access_nanos: AtomicU64::new(nanos),
}
}
fn update_access(&self, now: Instant, epoch: Instant) {
let nanos = now
.saturating_duration_since(epoch)
.as_nanos()
.min(u64::MAX as u128) as u64;
self.last_access_nanos.store(nanos, Ordering::Release);
}
fn last_access(&self, epoch: Instant) -> Instant {
let nanos = self.last_access_nanos.load(Ordering::Acquire);
epoch
.checked_add(std::time::Duration::from_nanos(nanos))
.unwrap_or(epoch)
}
}
pub struct ShardedStorage<K, V>
where
K: Eq + Hash + Clone,
{
map: DashMap<K, StorageEntry<V>>,
epoch: Instant,
metrics: Option<Metrics>,
eviction_policy: Option<Arc<dyn EvictionPolicy<K, V>>>,
current_memory_bytes: AtomicUsize,
}
impl<K, V> std::fmt::Debug for ShardedStorage<K, V>
where
K: Eq + Hash + Clone + std::fmt::Debug,
V: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ShardedStorage")
.field("map", &format!("{} entries", self.map.len()))
.field(
"eviction_policy",
&self.eviction_policy.as_ref().map(|_| "<policy>"),
)
.field(
"current_memory_bytes",
&self.current_memory_bytes.load(Ordering::Relaxed),
)
.finish()
}
}
impl<K, V> ShardedStorage<K, V>
where
K: Eq + Hash + Clone,
V: Clone,
{
pub fn new() -> Self {
Self {
map: DashMap::new(),
epoch: Instant::now(),
metrics: None,
eviction_policy: None,
current_memory_bytes: AtomicUsize::new(0),
}
}
pub fn with_metrics(mut self, metrics: Metrics) -> Self {
self.metrics = Some(metrics);
self
}
pub fn with_eviction_policy(mut self, policy: Arc<dyn EvictionPolicy<K, V>>) -> Self {
self.eviction_policy = Some(policy);
self
}
fn should_evict(&self) -> bool {
if let Some(policy) = &self.eviction_policy {
policy.should_evict(
self.len(),
self.current_memory_bytes.load(Ordering::Relaxed),
)
} else {
false
}
}
fn evict_one(&self) {
let policy = match &self.eviction_policy {
Some(p) => p,
None => return, };
const SAMPLE_SIZE: usize = 20;
let candidates: Vec<_> = self
.map
.iter()
.take(SAMPLE_SIZE)
.map(|entry| {
use crate::application::ports::EvictionCandidate;
EvictionCandidate {
key: entry.key().clone(),
value: entry.value().value.clone(),
last_access: entry.value().last_access(self.epoch),
}
})
.collect();
if let Some(key_to_evict) = policy.select_victim(&candidates) {
if let Some((_, entry)) = self.map.remove(&key_to_evict) {
if policy.tracks_memory() {
let size = policy.estimate_entry_size(&key_to_evict, &entry.value);
self.current_memory_bytes.fetch_sub(size, Ordering::Relaxed);
}
if let Some(ref metrics) = self.metrics {
metrics.record_eviction();
}
}
}
}
pub fn len(&self) -> usize {
self.map.len()
}
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
pub fn clear(&self) {
self.map.clear();
}
}
impl<K, V> Default for ShardedStorage<K, V>
where
K: Eq + Hash + Clone,
V: Clone,
{
fn default() -> Self {
Self::new()
}
}
impl<K, V> Clone for ShardedStorage<K, V>
where
K: Eq + Hash + Clone,
V: Clone,
{
fn clone(&self) -> Self {
let new_storage = Self {
map: DashMap::new(),
epoch: self.epoch,
metrics: self.metrics.clone(),
eviction_policy: self.eviction_policy.clone(),
current_memory_bytes: AtomicUsize::new(
self.current_memory_bytes.load(Ordering::Relaxed),
),
};
for entry in self.map.iter() {
let key = entry.key().clone();
let storage_entry = StorageEntry {
value: entry.value().value.clone(),
last_access_nanos: AtomicU64::new(
entry.value().last_access_nanos.load(Ordering::Relaxed),
),
};
new_storage.map.insert(key, storage_entry);
}
new_storage
}
}
impl<K, V> Storage<K, V> for ShardedStorage<K, V>
where
K: Hash + Eq + Clone + Send + Sync + std::fmt::Debug,
V: Send + Sync + std::fmt::Debug + Clone,
{
fn with_entry_mut<F, R>(&self, key: K, factory: impl FnOnce() -> V, accessor: F) -> R
where
F: FnOnce(&mut V) -> R,
{
let is_new_entry = !self.map.contains_key(&key);
if is_new_entry && self.should_evict() {
self.evict_one();
}
let now = Instant::now();
let epoch = self.epoch;
let entry = self.map.entry(key.clone());
let mut storage_entry = entry.or_insert_with(|| {
let value = factory();
if is_new_entry {
if let Some(policy) = &self.eviction_policy {
if policy.tracks_memory() {
let size = policy.estimate_entry_size(&key, &value);
self.current_memory_bytes.fetch_add(size, Ordering::Relaxed);
}
}
}
StorageEntry::new(value, now, epoch)
});
storage_entry.update_access(now, epoch);
accessor(&mut storage_entry.value)
}
fn len(&self) -> usize {
self.map.len()
}
fn is_empty(&self) -> bool {
self.map.is_empty()
}
fn clear(&self) {
self.map.clear()
}
fn for_each<F>(&self, mut f: F)
where
F: FnMut(&K, &V),
{
for entry in self.map.iter() {
f(entry.key(), &entry.value().value);
}
}
fn retain<F>(&self, mut f: F)
where
F: FnMut(&K, &mut V) -> bool,
{
self.map
.retain(|k, storage_entry| f(k, &mut storage_entry.value));
}
}
impl<K, V> Storage<K, V> for std::sync::Arc<ShardedStorage<K, V>>
where
K: Hash + Eq + Clone + Send + Sync + std::fmt::Debug,
V: Send + Sync + std::fmt::Debug + Clone,
{
fn with_entry_mut<F, R>(&self, key: K, factory: impl FnOnce() -> V, accessor: F) -> R
where
F: FnOnce(&mut V) -> R,
{
(**self).with_entry_mut(key, factory, accessor)
}
fn len(&self) -> usize {
(**self).len()
}
fn is_empty(&self) -> bool {
(**self).is_empty()
}
fn clear(&self) {
(**self).clear()
}
fn for_each<F>(&self, f: F)
where
F: FnMut(&K, &V),
{
(**self).for_each(f)
}
fn retain<F>(&self, f: F)
where
F: FnMut(&K, &mut V) -> bool,
{
(**self).retain(f)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::application::ports::Storage as StorageTrait;
#[test]
fn test_basic_operations() {
let storage = ShardedStorage::<String, i32>::new();
storage.with_entry_mut("key1".to_string(), || 100, |_v| {});
storage.with_entry_mut("key2".to_string(), || 200, |_v| {});
assert_eq!(storage.len(), 2);
assert!(!storage.is_empty());
}
#[test]
fn test_clear() {
let storage = ShardedStorage::<String, i32>::new();
storage.with_entry_mut("key1".to_string(), || 100, |_v| {});
storage.with_entry_mut("key2".to_string(), || 200, |_v| {});
assert_eq!(storage.len(), 2);
storage.clear();
assert_eq!(storage.len(), 0);
assert!(storage.is_empty());
}
#[test]
fn test_concurrent_access() {
use std::sync::Arc;
use std::thread;
let storage = Arc::new(ShardedStorage::<String, i32>::new());
let mut handles = vec![];
for i in 0..10 {
let storage_clone = Arc::clone(&storage);
let handle = thread::spawn(move || {
for j in 0..100 {
let key = format!("key_{}_{}", i, j);
let value = i * 100 + j;
storage_clone.with_entry_mut(key, || value, |_v| {});
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(storage.len(), 1000);
}
#[test]
fn test_lru_eviction() {
use crate::infrastructure::eviction::LruEviction;
let storage = ShardedStorage::<String, i32>::new()
.with_eviction_policy(Arc::new(LruEviction::new(5)));
for i in 0..5 {
storage.with_entry_mut(format!("key{}", i), || i, |_v| {});
}
assert_eq!(storage.len(), 5);
storage.with_entry_mut("key5".to_string(), || 5, |_v| {});
assert_eq!(storage.len(), 5);
for i in 6..10 {
storage.with_entry_mut(format!("key{}", i), || i, |_v| {});
}
assert_eq!(storage.len(), 5);
}
#[test]
fn test_lru_access_order() {
use crate::infrastructure::eviction::LruEviction;
let storage = ShardedStorage::<String, i32>::new()
.with_eviction_policy(Arc::new(LruEviction::new(3)));
storage.with_entry_mut("key0".to_string(), || 0, |_v| {});
storage.with_entry_mut("key1".to_string(), || 1, |_v| {});
storage.with_entry_mut("key2".to_string(), || 2, |_v| {});
std::thread::sleep(std::time::Duration::from_millis(10));
storage.with_entry_mut("key0".to_string(), || 0, |_v| {});
std::thread::sleep(std::time::Duration::from_millis(10));
storage.with_entry_mut("key3".to_string(), || 3, |_v| {});
let mut key0_exists = false;
storage.with_entry_mut(
"key0".to_string(),
|| 999,
|v| {
if *v == 0 {
key0_exists = true;
}
},
);
assert!(key0_exists, "key0 should not have been evicted");
assert_eq!(storage.len(), 3);
}
#[test]
fn test_for_each() {
let storage = ShardedStorage::<String, i32>::new();
storage.with_entry_mut("a".to_string(), || 1, |_v| {});
storage.with_entry_mut("b".to_string(), || 2, |_v| {});
storage.with_entry_mut("c".to_string(), || 3, |_v| {});
let mut sum = 0;
storage.for_each(|_k, v| {
sum += v;
});
assert_eq!(sum, 6);
}
#[test]
fn test_retain() {
let storage = ShardedStorage::<String, i32>::new();
storage.with_entry_mut("a".to_string(), || 1, |_v| {});
storage.with_entry_mut("b".to_string(), || 2, |_v| {});
storage.with_entry_mut("c".to_string(), || 3, |_v| {});
storage.retain(|_k, v| *v > 1);
assert_eq!(storage.len(), 2);
}
#[test]
fn test_memory_tracking_basic_insertion() {
use crate::infrastructure::eviction::MemoryEviction;
let storage = ShardedStorage::<String, i32>::new()
.with_eviction_policy(Arc::new(MemoryEviction::new(1024)));
assert_eq!(storage.current_memory_bytes.load(Ordering::Relaxed), 0);
storage.with_entry_mut("key1".to_string(), || 100, |_v| {});
let memory_after_first = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(
memory_after_first > 0,
"Memory should be tracked after insertion"
);
storage.with_entry_mut("key2".to_string(), || 200, |_v| {});
let memory_after_second = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(
memory_after_second > memory_after_first,
"Memory should increase after second insertion"
);
}
#[test]
fn test_memory_tracking_with_eviction() {
use crate::infrastructure::eviction::MemoryEviction;
let storage = ShardedStorage::<String, i32>::new()
.with_eviction_policy(Arc::new(MemoryEviction::new(100)));
for i in 0..10 {
storage.with_entry_mut(format!("key{}", i), || i, |_v| {});
}
let final_memory = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(
final_memory <= 500,
"Memory should stay reasonable after evictions, got {}",
final_memory
);
}
#[test]
fn test_memory_tracking_decreases_on_eviction() {
use crate::infrastructure::eviction::MemoryEviction;
let storage = ShardedStorage::<String, String>::new()
.with_eviction_policy(Arc::new(MemoryEviction::new(200)));
storage.with_entry_mut("key1".to_string(), || "value1".to_string(), |_v| {});
storage.with_entry_mut("key2".to_string(), || "value2".to_string(), |_v| {});
storage.with_entry_mut("key3".to_string(), || "value3".to_string(), |_v| {});
let memory_before_eviction = storage.current_memory_bytes.load(Ordering::Relaxed);
for i in 4..10 {
storage.with_entry_mut(format!("key{}", i), || format!("value{}", i), |_v| {});
}
let memory_after_eviction = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(
memory_after_eviction <= memory_before_eviction * 2,
"Memory should not grow unbounded with evictions"
);
}
#[test]
fn test_memory_tracking_without_policy() {
let storage = ShardedStorage::<String, i32>::new();
storage.with_entry_mut("key1".to_string(), || 100, |_v| {});
storage.with_entry_mut("key2".to_string(), || 200, |_v| {});
assert_eq!(storage.current_memory_bytes.load(Ordering::Relaxed), 0);
}
#[test]
fn test_memory_tracking_with_lru_policy() {
use crate::infrastructure::eviction::LruEviction;
let storage = ShardedStorage::<String, i32>::new()
.with_eviction_policy(Arc::new(LruEviction::new(5)));
storage.with_entry_mut("key1".to_string(), || 100, |_v| {});
storage.with_entry_mut("key2".to_string(), || 200, |_v| {});
assert_eq!(storage.current_memory_bytes.load(Ordering::Relaxed), 0);
}
#[test]
fn test_concurrent_memory_tracking() {
use crate::infrastructure::eviction::MemoryEviction;
use std::thread;
let storage = Arc::new(
ShardedStorage::<String, i32>::new()
.with_eviction_policy(Arc::new(MemoryEviction::new(10000))),
);
let mut handles = vec![];
for thread_id in 0..5 {
let storage_clone = Arc::clone(&storage);
let handle = thread::spawn(move || {
for i in 0..20 {
let key = format!("thread{}_key{}", thread_id, i);
storage_clone.with_entry_mut(key, || i, |_v| {});
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let final_memory = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(
final_memory > 0,
"Memory should be tracked after concurrent insertions"
);
assert!(!storage.is_empty(), "Should have some entries");
}
#[test]
fn test_memory_tracking_consistency_with_repeated_access() {
use crate::infrastructure::eviction::MemoryEviction;
let storage = ShardedStorage::<String, i32>::new()
.with_eviction_policy(Arc::new(MemoryEviction::new(10000)));
storage.with_entry_mut("key1".to_string(), || 100, |_v| {});
let memory_after_insert = storage.current_memory_bytes.load(Ordering::Relaxed);
for _ in 0..10 {
storage.with_entry_mut("key1".to_string(), || 100, |_v| {});
}
let memory_after_accesses = storage.current_memory_bytes.load(Ordering::Relaxed);
assert_eq!(
memory_after_insert, memory_after_accesses,
"Memory should not change when accessing existing entries"
);
}
#[test]
fn test_memory_tracking_with_priority_memory_policy() {
use crate::infrastructure::eviction::PriorityWithMemoryEviction;
let storage = ShardedStorage::<String, i32>::new().with_eviction_policy(Arc::new(
PriorityWithMemoryEviction::new(100, Arc::new(|_k: &String, _v: &i32| 1), 200),
));
assert_eq!(storage.current_memory_bytes.load(Ordering::Relaxed), 0);
storage.with_entry_mut("key1".to_string(), || 100, |_v| {});
storage.with_entry_mut("key2".to_string(), || 200, |_v| {});
let memory = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(
memory > 0,
"Memory should be tracked with priority+memory policy"
);
}
#[test]
fn test_memory_tracking_large_values() {
use crate::infrastructure::eviction::MemoryEviction;
let storage = ShardedStorage::<String, Vec<u8>>::new()
.with_eviction_policy(Arc::new(MemoryEviction::new(50000)));
let large_value = vec![0u8; 1000]; storage.with_entry_mut("key1".to_string(), || large_value.clone(), |_v| {});
let memory_after_one = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(memory_after_one > 100, "Should track large value size");
storage.with_entry_mut("key2".to_string(), || large_value.clone(), |_v| {});
let memory_after_two = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(
memory_after_two > memory_after_one,
"Memory should increase with second large value"
);
assert!(
(100..5000).contains(&memory_after_two),
"Memory estimate should be reasonable for 2x1KB values, got {}",
memory_after_two
);
}
#[test]
fn test_memory_tracking_after_clear() {
use crate::infrastructure::eviction::MemoryEviction;
let storage = ShardedStorage::<String, i32>::new()
.with_eviction_policy(Arc::new(MemoryEviction::new(10000)));
for i in 0..10 {
storage.with_entry_mut(format!("key{}", i), || i, |_v| {});
}
let memory_before_clear = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(memory_before_clear > 0);
storage.clear();
let memory_after_clear = storage.current_memory_bytes.load(Ordering::Relaxed);
assert_eq!(memory_before_clear, memory_after_clear);
}
#[test]
fn test_memory_overflow_resistance() {
use crate::infrastructure::eviction::MemoryEviction;
let storage = ShardedStorage::<String, i32>::new()
.with_eviction_policy(Arc::new(MemoryEviction::new(usize::MAX)));
for i in 0..1000 {
storage.with_entry_mut(format!("key{}", i), || i, |_v| {});
}
let memory = storage.current_memory_bytes.load(Ordering::Relaxed);
assert!(memory < usize::MAX, "Memory tracking should not overflow");
}
}