use dashmap::DashMap;
use std::{hash::Hash, sync::Arc};
/// Upper bound on the capacity reserved up front for a result vector.
///
/// `filter_limited` reserves `result_limit.min(MAX_PREALLOC_SIZE)` slots, so a
/// very large caller-supplied `result_limit` cannot force a large allocation
/// before any entry has matched. The vector can still grow past this size.
const MAX_PREALLOC_SIZE: usize = 1024;
/// Generic in-memory key/value store backed by a `DashMap` held in an `Arc`.
///
/// All methods take `&self`; mutation goes through the map's interior
/// mutability. Cloning a store (see the `Clone` impl in this file) clones the
/// `Arc`, so every clone reads and writes the same underlying map.
///
/// The bounds are repeated on the struct itself because `#[derive(Debug)]`
/// copies them onto the generated impl.
#[derive(Debug)]
pub struct InMemoryStore<K, V>
where
K: Eq + Hash + Clone + Send + Sync,
V: Clone + Send + Sync,
{
// Shared handle to the concurrent map that holds every entry.
data: Arc<DashMap<K, V>>,
}
impl<K, V> InMemoryStore<K, V>
where
    K: Eq + Hash + Clone + Send + Sync,
    V: Clone + Send + Sync,
{
    /// Creates an empty store.
    pub fn new() -> Self {
        Self {
            data: Arc::new(DashMap::new()),
        }
    }

    /// Returns the number of entries currently stored.
    pub fn count(&self) -> usize {
        self.data.len()
    }

    /// Removes every entry from the store.
    pub fn clear(&self) {
        self.data.clear();
    }

    /// Returns a cloned snapshot of all keys, in the map's iteration order.
    pub fn all_keys(&self) -> Vec<K> {
        self.data.iter().map(|entry| entry.key().clone()).collect()
    }

    /// Returns a clone of the value stored under `key`, or `None` if absent.
    pub fn get(&self, key: &K) -> Option<V> {
        self.data.get(key).map(|entry| entry.value().clone())
    }

    /// Stores `value` under `key`, returning the value it replaced, if any.
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        self.data.insert(key, value)
    }

    /// Removes `key` and returns the value that was stored under it, if any.
    pub fn remove(&self, key: &K) -> Option<V> {
        self.data.remove(key).map(|(_k, v)| v)
    }

    /// Returns clones of every value for which `predicate` returns `true`.
    ///
    /// The whole map is scanned; use [`Self::filter_limited`] to bound the
    /// work. `predicate` runs while a reference into the map is held.
    /// NOTE(review): per the DashMap docs, touching this store from inside
    /// `predicate` may deadlock — confirm for the dashmap version in use.
    pub fn filter<P>(&self, predicate: P) -> Vec<V>
    where
        P: Fn(&V) -> bool,
    {
        self.data
            .iter()
            .filter(|entry| predicate(entry.value()))
            .map(|entry| entry.value().clone())
            .collect()
    }

    /// Like [`Self::filter`], but bounded in both output size and scan work.
    ///
    /// * `result_limit` — maximum number of values returned.
    /// * `scan_limit` — maximum number of entries examined.
    ///
    /// Returns `(results, limit_reached)`. `limit_reached` is `true` when the
    /// scan stopped early: either `scan_limit` entries had been examined and
    /// more remained, or the result limit was hit. The flag is set as soon as
    /// `results` reaches `result_limit`, even if no further entry would match.
    ///
    /// With `result_limit == 0` nothing is ever returned; the scan stops at
    /// the first matching entry and reports `limit_reached = true`.
    ///
    /// Entries are visited in the map's iteration order, so which matches
    /// survive a limit is not chosen by this method.
    pub fn filter_limited<P>(
        &self,
        predicate: P,
        result_limit: usize,
        scan_limit: usize,
    ) -> (Vec<V>, bool)
    where
        P: Fn(&V) -> bool,
    {
        // Cap the reservation so a huge `result_limit` cannot trigger a huge
        // allocation before anything has matched.
        let mut results = Vec::with_capacity(result_limit.min(MAX_PREALLOC_SIZE));
        let mut limit_reached = false;

        for (scanned, entry) in self.data.iter().enumerate() {
            if scanned >= scan_limit {
                limit_reached = true;
                break;
            }
            if !predicate(entry.value()) {
                continue;
            }
            // A zero limit must never yield a value. Previously the first
            // match was pushed before the limit was checked, returning one
            // item for a limit of zero.
            if result_limit == 0 {
                limit_reached = true;
                break;
            }
            results.push(entry.value().clone());
            if results.len() >= result_limit {
                limit_reached = true;
                break;
            }
        }

        (results, limit_reached)
    }

    /// Returns `true` if an entry exists for `key`.
    pub fn contains_key(&self, key: &K) -> bool {
        self.data.contains_key(key)
    }

    /// Returns `true` if the store holds no entries.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Applies `f` to a mutable reference to the value stored under `key`.
    ///
    /// Returns `Some` with the closure's result, or `None` (without calling
    /// `f`) when the key is absent. `f` runs while the mutable reference
    /// obtained from `get_mut` is held.
    /// NOTE(review): per the DashMap docs, re-entering this store from inside
    /// `f` may deadlock — confirm for the dashmap version in use.
    pub fn update_with<F, R>(&self, key: &K, f: F) -> Option<R>
    where
        F: FnOnce(&mut V) -> R,
    {
        self.data.get_mut(key).map(|mut entry| f(entry.value_mut()))
    }

    /// Iterates over references to the live entries without cloning them.
    ///
    /// Each item is a DashMap `RefMulti` exposing `key()` and `value()`.
    /// NOTE(review): these items borrow from the map; the DashMap docs warn
    /// that mutating the map while holding such a reference may deadlock, so
    /// callers should drop items before writing — confirm against the crate
    /// version in use.
    pub fn iter(&self) -> impl Iterator<Item = dashmap::mapref::multiple::RefMulti<'_, K, V>> {
        self.data.iter()
    }
}
impl<K, V> Default for InMemoryStore<K, V>
where
K: Eq + Hash + Clone + Send + Sync,
V: Clone + Send + Sync,
{
fn default() -> Self {
Self::new()
}
}
impl<K, V> Clone for InMemoryStore<K, V>
where
    K: Eq + Hash + Clone + Send + Sync,
    V: Clone + Send + Sync,
{
    /// Produces another handle to the same underlying map.
    ///
    /// Only the `Arc` is cloned; no entries are copied, so writes made through
    /// either handle are visible through the other.
    fn clone(&self) -> Self {
        let shared = Arc::clone(&self.data);
        Self { data: shared }
    }
}
use crate::domain::{
aggregates::StreamSession,
entities::Stream,
value_objects::{SessionId, StreamId},
};
/// In-memory store of `StreamSession` aggregates keyed by `SessionId`.
pub type SessionStore = InMemoryStore<SessionId, StreamSession>;
/// In-memory store of `Stream` entities keyed by `StreamId`.
pub type StreamStore = InMemoryStore<StreamId, Stream>;
#[cfg(test)]
mod tests {
    use super::*;

    // Round trip through insert / get / all_keys / remove / clear.
    #[test]
    fn test_basic_operations() {
        let store: InMemoryStore<String, i32> = InMemoryStore::new();
        assert!(store.is_empty());
        assert_eq!(store.count(), 0);

        store.insert("a".to_string(), 1);
        store.insert("b".to_string(), 2);
        assert_eq!(store.count(), 2);
        assert_eq!(store.get(&"a".to_string()), Some(1));
        assert_eq!(store.get(&"c".to_string()), None);

        let keys = store.all_keys();
        assert_eq!(keys.len(), 2);

        store.remove(&"a".to_string());
        assert_eq!(store.count(), 1);

        store.clear();
        assert!(store.is_empty());
    }

    // `filter` returns only the values accepted by the predicate.
    #[test]
    fn test_filter() {
        let store: InMemoryStore<String, i32> = InMemoryStore::new();
        store.insert("a".to_string(), 1);
        store.insert("b".to_string(), 2);
        store.insert("c".to_string(), 3);

        let evens = store.filter(|v| v % 2 == 0);
        assert_eq!(evens, vec![2]);
    }

    // More matches than `result_limit`: output is capped and flagged.
    #[test]
    fn test_filter_limited_returns_at_most_limit_items() {
        let store: InMemoryStore<i32, i32> = InMemoryStore::new();
        for i in 0..100 {
            store.insert(i, i);
        }

        let (results, limit_reached) = store.filter_limited(|_| true, 10, 1000);
        assert_eq!(results.len(), 10);
        assert!(limit_reached);
    }

    // More entries than `scan_limit`: the scan stops early and is flagged.
    #[test]
    fn test_filter_limited_sets_limit_reached_when_scan_exceeded() {
        let store: InMemoryStore<i32, i32> = InMemoryStore::new();
        for i in 0..100 {
            store.insert(i, i);
        }

        let (results, limit_reached) = store.filter_limited(|v| *v > 1000, 100, 50);
        assert!(results.is_empty());
        assert!(limit_reached);
    }

    // Hitting `result_limit` sets the flag even with a generous scan limit.
    #[test]
    fn test_filter_limited_sets_limit_reached_when_results_exceeded() {
        let store: InMemoryStore<i32, i32> = InMemoryStore::new();
        for i in 0..100 {
            store.insert(i, i);
        }

        let (results, limit_reached) = store.filter_limited(|_| true, 5, 1000);
        assert_eq!(results.len(), 5);
        assert!(limit_reached);
    }

    // An empty store yields nothing and never reports a limit.
    #[test]
    fn test_filter_limited_empty_store() {
        let store: InMemoryStore<i32, i32> = InMemoryStore::new();

        let (results, limit_reached) = store.filter_limited(|_| true, 10, 100);
        assert!(results.is_empty());
        assert!(!limit_reached);
    }

    // A full scan with no matches is not a limit hit.
    #[test]
    fn test_filter_limited_no_matches() {
        let store: InMemoryStore<i32, i32> = InMemoryStore::new();
        for i in 0..10 {
            store.insert(i, i);
        }

        let (results, limit_reached) = store.filter_limited(|v| *v > 100, 10, 100);
        assert!(results.is_empty());
        assert!(!limit_reached);
    }

    // Fewer matches than either limit: everything is returned, no flag.
    #[test]
    fn test_filter_limited_partial_match_within_limits() {
        let store: InMemoryStore<i32, i32> = InMemoryStore::new();
        for i in 0..10 {
            store.insert(i, i);
        }

        let (results, limit_reached) = store.filter_limited(|v| v % 2 == 0, 100, 100);
        assert_eq!(results.len(), 5);
        assert!(!limit_reached);
    }

    // Clones are handles onto the same map, in both directions.
    #[test]
    fn test_clone_shares_data() {
        let store1: InMemoryStore<String, i32> = InMemoryStore::new();
        store1.insert("key".to_string(), 42);

        let store2 = store1.clone();
        assert_eq!(store2.get(&"key".to_string()), Some(42));

        store2.insert("another".to_string(), 100);
        assert_eq!(store1.get(&"another".to_string()), Some(100));
    }

    #[test]
    fn test_contains_key() {
        let store: InMemoryStore<String, i32> = InMemoryStore::new();
        assert!(!store.contains_key(&"key".to_string()));

        store.insert("key".to_string(), 42);
        assert!(store.contains_key(&"key".to_string()));
    }

    // Two threads write disjoint key ranges, then two threads read everything.
    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let store: InMemoryStore<i32, String> = InMemoryStore::new();
        let store_clone = store.clone();

        let write_handle = thread::spawn(move || {
            for i in 0..100 {
                store_clone.insert(i, format!("thread1-{}", i));
            }
        });
        for i in 100..200 {
            store.insert(i, format!("thread2-{}", i));
        }
        write_handle.join().unwrap();

        assert_eq!(store.count(), 200);
        assert_eq!(store.get(&50), Some("thread1-50".to_string()));
        assert_eq!(store.get(&150), Some("thread2-150".to_string()));

        let read_store = store.clone();
        let read_handle = thread::spawn(move || {
            for i in 0..200 {
                read_store.get(&i);
            }
        });
        for i in 0..200 {
            store.get(&i);
        }
        read_handle.join().unwrap();
    }

    // `iter` visits every entry exactly once.
    #[test]
    fn test_iter() {
        let store: InMemoryStore<i32, i32> = InMemoryStore::new();
        store.insert(1, 10);
        store.insert(2, 20);
        store.insert(3, 30);

        let mut count = 0;
        for entry in store.iter() {
            assert!(entry.value() == &10 || entry.value() == &20 || entry.value() == &30);
            count += 1;
        }
        assert_eq!(count, 3);
    }

    // A huge `result_limit` must not translate into a huge preallocation.
    #[test]
    fn test_max_prealloc_size_limits_allocation() {
        let store: InMemoryStore<i32, i32> = InMemoryStore::new();
        store.insert(1, 1);

        let (results, _) = store.filter_limited(|_| true, 1_000_000, 1_000_000);
        assert_eq!(results.len(), 1);
        // Checking only `len` would pass even without the cap. `with_capacity`
        // may round up, so compare against the requested limit rather than
        // the exact constant.
        assert!(results.capacity() < 1_000_000);
    }
}