memscope_rs/core/adaptive_hashmap.rs

//! Adaptive HashMap that switches between simple and sharded modes based on contention
//!
//! This module provides an adaptive data structure that starts with a simple mutex-protected
//! HashMap and upgrades to a sharded version when contention is detected.
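//!
//! A minimal usage sketch (hypothetical call site; see the tests below for
//! concurrent use behind an `Arc`):
//!
//! ```ignore
//! let map: AdaptiveHashMap<String, u64> = AdaptiveHashMap::new();
//! map.insert("key".to_string(), 1);
//! assert_eq!(map.get(&"key".to_string()), Some(1));
//! // The map starts in simple mode and only flags itself for sharding once
//! // more than 10% of accesses hit lock contention (after 100+ accesses).
//! assert!(!map.is_sharded());
//! ```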

use crate::core::safe_operations::SafeLock;
use crate::core::sharded_locks::ShardedRwLock;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Mutex;

/// Simplified adaptive HashMap that chooses its mode at creation time.
/// This avoids the complexity of runtime data migration while still providing
/// a low-overhead simple mode and a sharded mode for contended workloads.
pub struct AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    /// Mutex-protected map used in simple mode (and as the fallback store)
    simple_map: Mutex<HashMap<K, V>>,
    /// Sharded store; only allocated when constructed via `new_sharded`
    sharded_map: Option<ShardedRwLock<K, V>>,
    /// Total number of insert/get/remove calls observed
    access_counter: AtomicU64,
    /// Number of operations that failed to acquire the mutex with `try_lock`
    contention_counter: AtomicU64,
    /// Routing flag: when set, operations prefer `sharded_map` if it exists
    use_sharded: AtomicBool,
}

impl<K, V> AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    /// Create a new adaptive HashMap starting in simple mode
    pub fn new() -> Self {
        Self {
            simple_map: Mutex::new(HashMap::new()),
            sharded_map: None,
            access_counter: AtomicU64::new(0),
            contention_counter: AtomicU64::new(0),
            use_sharded: AtomicBool::new(false),
        }
    }

    /// Create with an initial capacity for the simple map
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            simple_map: Mutex::new(HashMap::with_capacity(capacity)),
            sharded_map: None,
            access_counter: AtomicU64::new(0),
            contention_counter: AtomicU64::new(0),
            use_sharded: AtomicBool::new(false),
        }
    }

    /// Create in sharded mode for known high-contention scenarios
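    ///
    /// A short sketch of when to use this constructor: pre-shard maps that are
    /// known to be hit by many threads from the start, skipping detection.
    ///
    /// ```ignore
    /// let hot_map: AdaptiveHashMap<u64, u64> = AdaptiveHashMap::new_sharded();
    /// assert!(hot_map.is_sharded());
    /// ```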
    pub fn new_sharded() -> Self {
        Self {
            simple_map: Mutex::new(HashMap::new()),
            sharded_map: Some(ShardedRwLock::new()),
            access_counter: AtomicU64::new(0),
            contention_counter: AtomicU64::new(0),
            use_sharded: AtomicBool::new(true),
        }
    }

    /// Insert a key-value pair
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        self.access_counter.fetch_add(1, Ordering::Relaxed);

        if self.use_sharded.load(Ordering::Relaxed) {
            if let Some(ref sharded) = self.sharded_map {
                return sharded.insert(key, value);
            }
        }

        // Use simple map
        match self.simple_map.try_lock() {
            Ok(mut map) => map.insert(key, value),
            Err(_) => {
                self.contention_counter.fetch_add(1, Ordering::Relaxed);
                self.check_upgrade_to_sharded();
                // Fall back to blocking lock
                let mut map = self
                    .simple_map
                    .safe_lock()
                    .expect("Failed to acquire lock on simple_map");
                map.insert(key, value)
            }
        }
    }

    /// Get a value by key
    pub fn get(&self, key: &K) -> Option<V> {
        self.access_counter.fetch_add(1, Ordering::Relaxed);

        if self.use_sharded.load(Ordering::Relaxed) {
            if let Some(ref sharded) = self.sharded_map {
                return sharded.get(key);
            }
        }

        // Use simple map
        match self.simple_map.try_lock() {
            Ok(map) => map.get(key).cloned(),
            Err(_) => {
                self.contention_counter.fetch_add(1, Ordering::Relaxed);
                self.check_upgrade_to_sharded();
                // Fall back to blocking lock
                let map = self
                    .simple_map
                    .safe_lock()
                    .expect("Failed to acquire lock on simple_map");
                map.get(key).cloned()
            }
        }
    }

    /// Remove a key-value pair
    pub fn remove(&self, key: &K) -> Option<V> {
        self.access_counter.fetch_add(1, Ordering::Relaxed);

        if self.use_sharded.load(Ordering::Relaxed) {
            if let Some(ref sharded) = self.sharded_map {
                return sharded.remove(key);
            }
        }

        // Use simple map
        match self.simple_map.try_lock() {
            Ok(mut map) => map.remove(key),
            Err(_) => {
                self.contention_counter.fetch_add(1, Ordering::Relaxed);
                self.check_upgrade_to_sharded();
                // Fall back to blocking lock
                let mut map = self
                    .simple_map
                    .safe_lock()
                    .expect("Failed to acquire lock on simple_map");
                map.remove(key)
            }
        }
    }

    /// Check whether we should upgrade to sharded mode
    fn check_upgrade_to_sharded(&self) {
        if self.use_sharded.load(Ordering::Relaxed) {
            return;
        }

        let accesses = self.access_counter.load(Ordering::Relaxed);
        let contentions = self.contention_counter.load(Ordering::Relaxed);

        // Upgrade if the contention rate exceeds 10% and we have enough samples
        if accesses >= 100 && contentions * 10 > accesses {
            // Simplified version: only the routing flag is switched. A full
            // implementation would also allocate a sharded map and migrate
            // existing entries; since maps created in simple mode have no
            // `sharded_map`, operations keep using the simple map and only
            // `is_sharded()` reflects the change.
            self.use_sharded.store(true, Ordering::Relaxed);
        }
    }

    /// Get the current contention ratio (contended accesses / total accesses)
    pub fn contention_ratio(&self) -> f64 {
        let accesses = self.access_counter.load(Ordering::Relaxed);
        let contentions = self.contention_counter.load(Ordering::Relaxed);

        if accesses > 0 {
            contentions as f64 / accesses as f64
        } else {
            0.0
        }
    }

    /// Check whether sharded mode is active (i.e., the routing flag is set)
    pub fn is_sharded(&self) -> bool {
        self.use_sharded.load(Ordering::Relaxed)
    }

    /// Get the number of entries (approximate for sharded mode; best-effort
    /// in simple mode, returning 0 rather than blocking on a held lock)
    pub fn len(&self) -> usize {
        if self.use_sharded.load(Ordering::Relaxed) {
            if let Some(ref sharded) = self.sharded_map {
                return sharded.len();
            }
        }

        if let Ok(map) = self.simple_map.try_lock() {
            map.len()
        } else {
            0 // Return 0 if locked to avoid blocking
        }
    }

    /// Check if empty (best-effort, see `len`)
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl<K, V> Default for AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    fn default() -> Self {
        Self::new()
    }
}

// Safety: AdaptiveHashMap is Send if K and V are Send
unsafe impl<K, V> Send for AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone + Send,
    V: Clone + Send,
{
}

// Safety: AdaptiveHashMap is Sync if K and V are Send + Sync
unsafe impl<K, V> Sync for AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone + Send + Sync,
    V: Clone + Send + Sync,
{
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_basic_operations() {
        let map = AdaptiveHashMap::new();

        // Test insert and get
        assert_eq!(map.insert("key1".to_string(), 42), None);
        assert_eq!(map.get(&"key1".to_string()), Some(42));

        // Test update
        assert_eq!(map.insert("key1".to_string(), 43), Some(42));
        assert_eq!(map.get(&"key1".to_string()), Some(43));

        // Test remove
        assert_eq!(map.remove(&"key1".to_string()), Some(43));
        assert_eq!(map.get(&"key1".to_string()), None);
    }

    #[test]
    fn test_concurrent_access_low_contention() {
        let map = Arc::new(AdaptiveHashMap::new());
        let mut handles = vec![];

        // Low-contention scenario: few threads and operations, staying below
        // the 100-access upgrade threshold
        for i in 0..2 {
            let map_clone = map.clone();
            let handle = thread::spawn(move || {
                for j in 0..20 {
                    let key = format!("key_{i}_{j}");
                    map_clone.insert(key.clone(), i * 100 + j);
                    assert_eq!(map_clone.get(&key), Some(i * 100 + j));
                    // Distinct keys per thread minimize contention naturally
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("Thread panicked");
        }

        // Should still be in simple mode due to low contention.
        // Print the contention ratio first for debugging.
        let contention_ratio = map.contention_ratio();
        let access_count = map.access_counter.load(Ordering::Relaxed);
        println!(
            "Access count: {}, Contention ratio: {:.2}%",
            access_count,
            contention_ratio * 100.0
        );

        // With the reduced load, the map should stay in simple mode
        assert!(!map.is_sharded());
        assert_eq!(map.len(), 40); // 2 threads × 20 unique keys each
    }
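
    // A sketch of the upgrade heuristic, driving the private counters directly
    // rather than manufacturing real lock contention (which is timing-dependent
    // and flaky in tests). Thresholds match `check_upgrade_to_sharded`:
    // at least 100 accesses and a contention rate above 10%.
    #[test]
    fn test_upgrade_heuristic_flips_flag() {
        let map: AdaptiveHashMap<String, u64> = AdaptiveHashMap::new();
        assert!(!map.is_sharded());

        // Simulate 150 accesses, 20 of them contended: 20 * 10 = 200 > 150,
        // so the heuristic should request sharded mode.
        map.access_counter.store(150, Ordering::Relaxed);
        map.contention_counter.store(20, Ordering::Relaxed);
        map.check_upgrade_to_sharded();
        assert!(map.is_sharded());

        // The flag flips, but no data is migrated: `sharded_map` is still
        // `None`, so operations keep falling through to the simple map.
        map.insert("still_works".to_string(), 7);
        assert_eq!(map.get(&"still_works".to_string()), Some(7));
    }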
}