// memscope_rs/core/adaptive_hashmap.rs
use crate::core::safe_operations::SafeLock;
use crate::core::sharded_locks::ShardedRwLock;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Mutex;
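/// A hash map that starts on a single `Mutex<HashMap>` path and can switch to
/// a sharded representation when lock contention becomes significant.
///
/// Every `insert`/`get`/`remove` bumps `access_counter`; every failed
/// `try_lock` bumps `contention_counter`. Once at least 100 accesses have been
/// recorded and more than 10% of them hit contention, `use_sharded` is flipped
/// to `true`. The sharded path is only taken if a `ShardedRwLock` was created
/// up front (via `new_sharded`); otherwise operations keep using the mutex.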
pub struct AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    /// Low-overhead path: a single mutex-protected map.
    simple_map: Mutex<HashMap<K, V>>,
    /// Optional sharded path, present only when constructed via `new_sharded`.
    sharded_map: Option<ShardedRwLock<K, V>>,
    /// Total number of insert/get/remove calls.
    access_counter: AtomicU64,
    /// Number of calls that failed to acquire the mutex on the first try.
    contention_counter: AtomicU64,
    /// Whether operations should prefer the sharded path.
    use_sharded: AtomicBool,
}
impl<K, V> AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
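    /// Creates an empty map that starts on the single-mutex path.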
    pub fn new() -> Self {
        Self {
            simple_map: Mutex::new(HashMap::new()),
            sharded_map: None,
            access_counter: AtomicU64::new(0),
            contention_counter: AtomicU64::new(0),
            use_sharded: AtomicBool::new(false),
        }
    }
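    /// Creates an empty map with space pre-allocated for at least `capacity` entries.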
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            simple_map: Mutex::new(HashMap::with_capacity(capacity)),
            sharded_map: None,
            access_counter: AtomicU64::new(0),
            contention_counter: AtomicU64::new(0),
            use_sharded: AtomicBool::new(false),
        }
    }
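    /// Creates a map that uses the sharded path from the start, for callers
    /// that already expect heavy concurrent access.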
    pub fn new_sharded() -> Self {
        Self {
            simple_map: Mutex::new(HashMap::new()),
            sharded_map: Some(ShardedRwLock::new()),
            access_counter: AtomicU64::new(0),
            contention_counter: AtomicU64::new(0),
            use_sharded: AtomicBool::new(true),
        }
    }
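    /// Inserts a key-value pair, returning the previous value if the key was
    /// already present. If the mutex cannot be taken immediately, the
    /// contention is recorded and the call falls back to `safe_lock`.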
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        self.access_counter.fetch_add(1, Ordering::Relaxed);

        // Prefer the sharded map once the upgrade flag is set and a sharded
        // map actually exists.
        if self.use_sharded.load(Ordering::Relaxed) {
            if let Some(ref sharded) = self.sharded_map {
                return sharded.insert(key, value);
            }
        }

        match self.simple_map.try_lock() {
            Ok(mut map) => map.insert(key, value),
            Err(_) => {
                // The mutex is busy: record the contention, possibly flip the
                // upgrade flag, then fall back to a blocking lock.
                self.contention_counter.fetch_add(1, Ordering::Relaxed);
                self.check_upgrade_to_sharded();
                let mut map = self
                    .simple_map
                    .safe_lock()
                    .expect("Failed to acquire lock on simple_map");
                map.insert(key, value)
            }
        }
    }
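    /// Returns a clone of the value stored for `key`, if any.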
    pub fn get(&self, key: &K) -> Option<V> {
        self.access_counter.fetch_add(1, Ordering::Relaxed);

        if self.use_sharded.load(Ordering::Relaxed) {
            if let Some(ref sharded) = self.sharded_map {
                return sharded.get(key);
            }
        }

        match self.simple_map.try_lock() {
            Ok(map) => map.get(key).cloned(),
            Err(_) => {
                self.contention_counter.fetch_add(1, Ordering::Relaxed);
                self.check_upgrade_to_sharded();
                let map = self
                    .simple_map
                    .safe_lock()
                    .expect("Failed to acquire lock on simple_map");
                map.get(key).cloned()
            }
        }
    }
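    /// Removes `key` from the map, returning its value if it was present.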
    pub fn remove(&self, key: &K) -> Option<V> {
        self.access_counter.fetch_add(1, Ordering::Relaxed);

        if self.use_sharded.load(Ordering::Relaxed) {
            if let Some(ref sharded) = self.sharded_map {
                return sharded.remove(key);
            }
        }

        match self.simple_map.try_lock() {
            Ok(mut map) => map.remove(key),
            Err(_) => {
                self.contention_counter.fetch_add(1, Ordering::Relaxed);
                self.check_upgrade_to_sharded();
                let mut map = self
                    .simple_map
                    .safe_lock()
                    .expect("Failed to acquire lock on simple_map");
                map.remove(key)
            }
        }
    }
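    /// Flips `use_sharded` once the map has seen at least 100 accesses and
    /// more than 10% of them hit lock contention. The flag only changes the
    /// access path if a sharded map was created at construction time.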
    fn check_upgrade_to_sharded(&self) {
        if self.use_sharded.load(Ordering::Relaxed) {
            return;
        }

        let accesses = self.access_counter.load(Ordering::Relaxed);
        let contentions = self.contention_counter.load(Ordering::Relaxed);

        if accesses >= 100 && contentions * 10 > accesses {
            self.use_sharded.store(true, Ordering::Relaxed);
        }
    }
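    /// Fraction of accesses that failed to take the mutex on the first try
    /// (0.0 when no accesses have been recorded yet).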
    pub fn contention_ratio(&self) -> f64 {
        let accesses = self.access_counter.load(Ordering::Relaxed);
        let contentions = self.contention_counter.load(Ordering::Relaxed);

        if accesses > 0 {
            contentions as f64 / accesses as f64
        } else {
            0.0
        }
    }
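    /// Returns `true` once the map has switched (or was constructed) to
    /// prefer the sharded path.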
    pub fn is_sharded(&self) -> bool {
        self.use_sharded.load(Ordering::Relaxed)
    }
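    /// Returns the number of entries. If the mutex is contended at the moment
    /// of the call, this returns 0 rather than blocking.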
    pub fn len(&self) -> usize {
        if self.use_sharded.load(Ordering::Relaxed) {
            if let Some(ref sharded) = self.sharded_map {
                return sharded.len();
            }
        }

        if let Ok(map) = self.simple_map.try_lock() {
            map.len()
        } else {
            0
        }
    }
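    /// Returns `true` when `len()` reports zero entries.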
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl<K, V> Default for AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    fn default() -> Self {
        Self::new()
    }
}
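// Manually mark the map as sendable and shareable across threads: all shared
// state is reached through the mutex, the sharded lock, or atomics, and the
// bounds below require `K` and `V` to be `Send`/`Sync` as appropriate.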
unsafe impl<K, V> Send for AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone + Send,
    V: Clone + Send,
{
}

unsafe impl<K, V> Sync for AdaptiveHashMap<K, V>
where
    K: Hash + Eq + Clone + Send + Sync,
    V: Clone + Send + Sync,
{
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_basic_operations() {
        let map = AdaptiveHashMap::new();

        assert_eq!(map.insert("key1".to_string(), 42), None);
        assert_eq!(map.get(&"key1".to_string()), Some(42));

        assert_eq!(map.insert("key1".to_string(), 43), Some(42));
        assert_eq!(map.get(&"key1".to_string()), Some(43));

        assert_eq!(map.remove(&"key1".to_string()), Some(43));
        assert_eq!(map.get(&"key1".to_string()), None);
    }
    #[test]
    fn test_concurrent_access_low_contention() {
        let map = Arc::new(AdaptiveHashMap::new());
        let mut handles = vec![];

        for i in 0..2 {
            let map_clone = map.clone();
            let handle = thread::spawn(move || {
                for j in 0..20 {
                    let key = format!("key_{i}_{j}");
                    map_clone.insert(key.clone(), i * 100 + j);
                    assert_eq!(map_clone.get(&key), Some(i * 100 + j));
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("Thread panicked");
        }

        let contention_ratio = map.contention_ratio();
        let access_count = map.access_counter.load(Ordering::Relaxed);
        println!(
            "Access count: {}, Contention ratio: {:.2}%",
            access_count,
            contention_ratio * 100.0
        );

        assert!(!map.is_sharded());
        assert_eq!(map.len(), 40);
    }
}