mappy_core/
concurrent.rs

1//! Thread-safe operations for maplets
2//! 
3//! Implements concurrent access patterns for maplets.
4
5use std::hash::Hash;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use crate::MapletResult;
9
10/// Thread-safe maplet wrapper
11#[derive(Debug)]
12pub struct ConcurrentMaplet<K, V, Op> 
13where
14    K: Hash + Eq + Clone + Send + Sync + std::fmt::Debug,
15    V: Clone + Send + Sync + std::fmt::Debug,
16    Op: crate::operators::MergeOperator<V> + Send + Sync,
17{
18    /// Inner maplet protected by read-write lock
19    inner: Arc<RwLock<crate::maplet::Maplet<K, V, Op>>>,
20}
21
22impl<K, V, Op> ConcurrentMaplet<K, V, Op>
23where
24    K: Hash + Eq + Clone + Send + Sync + std::fmt::Debug,
25    V: Clone + PartialEq + Send + Sync + std::fmt::Debug,
26    Op: crate::operators::MergeOperator<V> + Default + Send + Sync,
27{
28    /// Create a new concurrent maplet
29    pub fn new(capacity: usize, false_positive_rate: f64) -> MapletResult<Self> {
30        let maplet = crate::maplet::Maplet::<K, V, Op>::new(capacity, false_positive_rate)?;
31        Ok(Self {
32            inner: Arc::new(RwLock::new(maplet)),
33        })
34    }
35    
36    /// Insert a key-value pair (write lock)
37    pub async fn insert(&self, key: K, value: V) -> MapletResult<()> {
38        let maplet = self.inner.read().await;
39        maplet.insert(key, value).await
40    }
41    
42    /// Query a key (read lock)
43    pub async fn query(&self, key: &K) -> Option<V> {
44        let maplet = self.inner.read().await;
45        maplet.query(key).await
46    }
47    
48    /// Check if key exists (read lock)
49    pub async fn contains(&self, key: &K) -> bool {
50        let maplet = self.inner.read().await;
51        maplet.contains(key).await
52    }
53    
54    /// Delete a key-value pair (write lock)
55    pub async fn delete(&self, key: &K, value: &V) -> MapletResult<bool> {
56        let maplet = self.inner.read().await;
57        maplet.delete(key, value).await
58    }
59    
60    /// Get length (read lock)
61    pub async fn len(&self) -> usize {
62        let maplet = self.inner.read().await;
63        maplet.len().await
64    }
65    
66    /// Check if empty (read lock)
67    pub async fn is_empty(&self) -> bool {
68        let maplet = self.inner.read().await;
69        maplet.is_empty().await
70    }
71    
72    /// Get statistics (read lock)
73    pub async fn stats(&self) -> crate::MapletStats {
74        let maplet = self.inner.read().await;
75        maplet.stats().await
76    }
77}
78
79#[cfg(test)]
80mod tests {
81    use super::*;
82    use crate::operators::CounterOperator;
83    use std::sync::Arc;
84
85    #[tokio::test]
86    async fn test_concurrent_maplet() {
87        let maplet = ConcurrentMaplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
88        
89        // Test basic operations
90        assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
91        assert_eq!(maplet.query(&"key1".to_string()).await, Some(5));
92        assert!(maplet.contains(&"key1".to_string()).await);
93        assert_eq!(maplet.len().await, 1);
94    }
95
96    #[tokio::test]
97    async fn test_concurrent_access() {
98        let maplet = Arc::new(ConcurrentMaplet::<String, u64, CounterOperator>::new(1000, 0.01).unwrap());
99        let mut handles = vec![];
100        
101        // Spawn multiple tasks to insert data
102        for i in 0..4 {
103            let maplet = Arc::clone(&maplet);
104            let handle = tokio::spawn(async move {
105                for j in 0..100 {
106                    let key = format!("key_{}_{}", i, j);
107                    let _ = maplet.insert(key, (i * 100 + j) as u64).await;
108                }
109            });
110            handles.push(handle);
111        }
112        
113        // Wait for all tasks to complete
114        for handle in handles {
115            handle.await.unwrap();
116        }
117        
118        // Verify some data was inserted
119        assert!(maplet.len().await > 0);
120    }
121}