1use std::hash::Hash;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use crate::MapletResult;
9
10#[derive(Debug)]
12pub struct ConcurrentMaplet<K, V, Op>
13where
14 K: Hash + Eq + Clone + Send + Sync + std::fmt::Debug,
15 V: Clone + Send + Sync + std::fmt::Debug,
16 Op: crate::operators::MergeOperator<V> + Send + Sync,
17{
18 inner: Arc<RwLock<crate::maplet::Maplet<K, V, Op>>>,
20}
21
22impl<K, V, Op> ConcurrentMaplet<K, V, Op>
23where
24 K: Hash + Eq + Clone + Send + Sync + std::fmt::Debug,
25 V: Clone + PartialEq + Send + Sync + std::fmt::Debug,
26 Op: crate::operators::MergeOperator<V> + Default + Send + Sync,
27{
28 pub fn new(capacity: usize, false_positive_rate: f64) -> MapletResult<Self> {
30 let maplet = crate::maplet::Maplet::<K, V, Op>::new(capacity, false_positive_rate)?;
31 Ok(Self {
32 inner: Arc::new(RwLock::new(maplet)),
33 })
34 }
35
36 pub async fn insert(&self, key: K, value: V) -> MapletResult<()> {
38 let maplet = self.inner.read().await;
39 maplet.insert(key, value).await
40 }
41
42 pub async fn query(&self, key: &K) -> Option<V> {
44 let maplet = self.inner.read().await;
45 maplet.query(key).await
46 }
47
48 pub async fn contains(&self, key: &K) -> bool {
50 let maplet = self.inner.read().await;
51 maplet.contains(key).await
52 }
53
54 pub async fn delete(&self, key: &K, value: &V) -> MapletResult<bool> {
56 let maplet = self.inner.read().await;
57 maplet.delete(key, value).await
58 }
59
60 pub async fn len(&self) -> usize {
62 let maplet = self.inner.read().await;
63 maplet.len().await
64 }
65
66 pub async fn is_empty(&self) -> bool {
68 let maplet = self.inner.read().await;
69 maplet.is_empty().await
70 }
71
72 pub async fn stats(&self) -> crate::MapletStats {
74 let maplet = self.inner.read().await;
75 maplet.stats().await
76 }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operators::CounterOperator;
    use std::sync::Arc;

    /// Single-task smoke test: one insert followed by query, contains and len.
    #[tokio::test]
    async fn test_concurrent_maplet() {
        let map = ConcurrentMaplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
        let key = "key1".to_string();

        let inserted = map.insert(key.clone(), 5).await;
        assert!(inserted.is_ok());

        assert_eq!(map.query(&key).await, Some(5));
        assert!(map.contains(&key).await);
        assert_eq!(map.len().await, 1);
    }

    /// Four spawned tasks each insert 100 distinct keys into one shared map;
    /// afterwards the map must report a non-zero length.
    #[tokio::test]
    async fn test_concurrent_access() {
        let shared =
            Arc::new(ConcurrentMaplet::<String, u64, CounterOperator>::new(1000, 0.01).unwrap());

        // Spawn every writer before awaiting any of them.
        let tasks: Vec<_> = (0..4u64)
            .map(|i| {
                let worker_map = Arc::clone(&shared);
                tokio::spawn(async move {
                    for j in 0..100u64 {
                        let key = format!("key_{}_{}", i, j);
                        // Insert errors are deliberately ignored here.
                        let _ = worker_map.insert(key, i * 100 + j).await;
                    }
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        assert!(shared.len().await > 0);
    }
}