use super::*;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
#[test]
fn test_concurrent_hash_map_stress() {
let map = Arc::new(ConcurrentHashMap::new());
let num_threads = 8;
let operations_per_thread = 10000;
let mut handles = vec![];
for thread_id in 0..num_threads {
let map = Arc::clone(&map);
let handle = thread::spawn(move || {
let mut local_sum = 0;
for i in 0..operations_per_thread {
let key = thread_id * operations_per_thread + i;
map.insert(key, key * 2);
if let Some(value) = map.get(&key) {
local_sum += *value;
}
if i % 100 == 0 {
map.remove(&key);
map.insert(key, key * 3);
}
}
local_sum
});
handles.push(handle);
}
let mut total_sum = 0;
for handle in handles {
total_sum += handle.join().unwrap();
}
let mut expected_sum = 0;
for thread_id in 0..num_threads {
for i in 0..operations_per_thread {
let key = thread_id * operations_per_thread + i;
let value = if i % 100 == 0 { key * 3 } else { key * 2 };
expected_sum += value;
}
}
assert_eq!(total_sum, expected_sum);
}
#[test]
fn test_concurrent_hash_map_resize_under_load() {
let map = Arc::new(ConcurrentHashMap::with_capacity(16));
let num_producers = 4;
let num_consumers = 4;
let items_per_producer = 1000;
let mut producer_handles = vec![];
for producer_id in 0..num_producers {
let map = Arc::clone(&map);
let handle = thread::spawn(move || {
for i in 0..items_per_producer {
let key = producer_id * items_per_producer + i;
map.insert(key, format!("value_{}_{}", producer_id, i));
if i % 100 == 0 {
thread::sleep(Duration::from_micros(1));
}
}
});
producer_handles.push(handle);
}
let mut consumer_handles = vec![];
for _ in 0..num_consumers {
let map = Arc::clone(&map);
let handle = thread::spawn(move || {
let mut count = 0;
let mut found_keys = vec![];
while count < items_per_producer * num_producers / num_consumers {
for key in 0..items_per_producer * num_producers {
if let Some(_value) = map.get(&key) {
if !found_keys.contains(&key) {
found_keys.push(key);
count += 1;
}
}
}
if found_keys.len() == count {
thread::yield_now();
}
}
count
});
consumer_handles.push(handle);
}
for handle in producer_handles {
handle.join().unwrap();
}
let mut total_consumed = 0;
for handle in consumer_handles {
total_consumed += handle.join().unwrap();
}
for key in 0..num_producers * items_per_producer {
assert!(map.get(&key).is_some(), "Missing key: {}", key);
}
assert!(map.capacity() > 16);
}
#[test]
fn test_concurrent_hash_map_memory_safety() {
use std::sync::atomic::{AtomicUsize, Ordering};
static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, PartialEq, Eq, Hash)]
struct DropTracker {
id: usize,
}
impl Drop for DropTracker {
fn drop(&mut self) {
DROP_COUNT.fetch_add(1, Ordering::Relaxed);
}
}
let map: Arc<ConcurrentHashMap<DropTracker, String>> = Arc::new(ConcurrentHashMap::new());
let num_threads = 4;
let items_per_thread = 100;
let mut handles = vec![];
for thread_id in 0..num_threads {
let map = Arc::clone(&map);
let handle = thread::spawn(move || {
for i in 0..items_per_thread {
let key = DropTracker { id: thread_id * items_per_thread + i };
let value = format!("value_{}", key.id);
map.insert(key, value);
if i % 2 == 1 {
let key = DropTracker { id: thread_id * items_per_thread + (i - 1) };
map.remove(&key);
}
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
drop(map);
let total_items = num_threads * items_per_thread;
let dropped_items = DROP_COUNT.load(Ordering::Relaxed);
assert!(dropped_items >= total_items / 2); }
#[test]
fn test_concurrent_hash_map_fairness() {
let map = Arc::new(ConcurrentHashMap::new());
let num_threads = 8;
let items_per_thread = 100;
let mut handles = vec![];
for thread_id in 0..num_threads {
let map = Arc::clone(&map);
let handle = thread::spawn(move || {
for i in 0..items_per_thread {
let key = thread_id * 10000 + i; let value = format!("thread_{}_item_{}", thread_id, i);
map.insert(key, value);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for thread_id in 0..num_threads {
for i in 0..items_per_thread {
let key = thread_id * 10000 + i;
let expected_value = format!("thread_{}_item_{}", thread_id, i);
assert_eq!(map.get(&key), Some(&expected_value));
}
}
assert_eq!(map.len(), num_threads * items_per_thread);
}
#[test]
fn test_concurrent_hash_map_high_contention() {
let map = Arc::new(ConcurrentHashMap::new());
let num_threads = 16;
let operations_per_thread = 1000;
let mut handles = vec![];
for thread_id in 0..num_threads {
let map = Arc::clone(&map);
let handle = thread::spawn(move || {
for i in 0..operations_per_thread {
let key = i % 10; let value = format!("thread_{}_op_{}", thread_id, i);
match i % 3 {
0 => map.insert(key, value),
1 => map.get(&key),
2 => map.remove(&key),
_ => unreachable!(),
};
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for key in 0..10 {
map.insert(key, format!("final_value_{}", key));
assert!(map.get(&key).is_some());
}
}
#[test]
fn test_concurrent_hash_map_complex_keys() {
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ComplexKey {
id: u64,
name: String,
tags: Vec<u32>,
}
let map: Arc<ConcurrentHashMap<ComplexKey, Vec<String>>> = Arc::new(ConcurrentHashMap::new());
let num_threads = 4;
let items_per_thread = 100;
let mut handles = vec![];
for thread_id in 0..num_threads {
let map = Arc::clone(&map);
let handle = thread::spawn(move || {
for i in 0..items_per_thread {
let key = ComplexKey {
id: (thread_id * items_per_thread + i) as u64,
name: format!("item_{}", i),
tags: vec![i as u32, (i + 1) as u32],
};
let value = vec![
format!("thread_{}", thread_id),
format!("item_{}", i),
format!("data_{}", i * 2),
];
map.insert(key.clone(), value.clone());
assert_eq!(map.get(&key), Some(&value));
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for thread_id in 0..num_threads {
for i in 0..items_per_thread {
let key = ComplexKey {
id: (thread_id * items_per_thread + i) as u64,
name: format!("item_{}", i),
tags: vec![i as u32, (i + 1) as u32],
};
assert!(map.get(&key).is_some(), "Missing complex key: {:?}", key);
}
}
}