use std::collections::BTreeMap;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use scry_index::{Config, LearnedMap};
#[test]
fn concurrent_readers_bulk_loaded() {
let pairs: Vec<(u64, u64)> = (0..10_000).map(|i| (i, i * 10)).collect();
let map = Arc::new(LearnedMap::bulk_load(&pairs).unwrap());
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..10_000u64 {
assert_eq!(
map.get(&i, &guard),
Some(&(i * 10)),
"reader missed key {i}"
);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
}
#[test]
fn readers_writers_disjoint_ranges() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let barrier = Arc::new(Barrier::new(12));
let writer_handles: Vec<_> = (0..4u64)
.map(|w| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = w * 1000;
for i in 0..1000 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
let reader_handles: Vec<_> = (0..8)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let mut found = 0;
for i in 0..1000u64 {
if map.get(&i, &guard).is_some() {
found += 1;
}
}
found
})
})
.collect();
for h in writer_handles {
h.join().unwrap();
}
for h in reader_handles {
h.join().unwrap();
}
let guard = map.guard();
assert_eq!(map.len(), 4000);
for i in 0..4000u64 {
assert_eq!(map.get(&i, &guard), Some(&i), "key {i} missing after join");
}
}
#[test]
fn writer_contention_shared_keys() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..1000u64 {
map.insert(i, t * 1000 + i, &guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
assert_eq!(map.len(), 1000);
for i in 0..1000u64 {
assert!(
map.get(&i, &guard).is_some(),
"key {i} missing after contention"
);
}
}
#[test]
fn insert_remove_interleaving() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
{
let guard = map.guard();
for i in 0..2000u64 {
map.insert(i, i, &guard);
}
}
assert_eq!(map.len(), 2000);
let barrier = Arc::new(Barrier::new(8));
let remove_handles: Vec<_> = (0..4)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in (0..2000u64).step_by(2) {
if i % 4 == (t * 2) % 4 {
map.remove(&i, &guard);
}
}
})
})
.collect();
let insert_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = 2000 + t * 500;
for i in 0..500 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
for h in remove_handles {
h.join().unwrap();
}
for h in insert_handles {
h.join().unwrap();
}
let guard = map.guard();
for i in (1..2000u64).step_by(2) {
assert_eq!(map.get(&i, &guard), Some(&i), "odd key {i} missing");
}
for i in 2000..4000u64 {
assert_eq!(map.get(&i, &guard), Some(&i), "new key {i} missing");
}
}
#[test]
fn stress_vs_oracle() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let oracle = Arc::new(Mutex::new(BTreeMap::new()));
let barrier = Arc::new(Barrier::new(4));
let handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let oracle = Arc::clone(&oracle);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = t * 500;
for i in 0..500 {
let key = base + i;
map.insert(key, key, &guard);
oracle.lock().unwrap().insert(key, key);
}
for i in (0..500).step_by(3) {
let key = base + i;
map.remove(&key, &guard);
oracle.lock().unwrap().remove(&key);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let oracle = oracle.lock().unwrap();
let guard = map.guard();
assert_eq!(map.len(), oracle.len());
for (&k, &v) in oracle.iter() {
assert_eq!(
map.get(&k, &guard),
Some(&v),
"oracle has key {k} but map doesn't"
);
}
}
#[test]
fn map_ref_concurrent() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let barrier = Arc::new(Barrier::new(4));
let handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let m = map.pin();
let base = t * 250;
for i in 0..250 {
m.insert(base + i, base + i);
}
for i in 0..250 {
assert!(m.contains_key(&(base + i)));
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(map.len(), 1000);
}
#[test]
fn insert_collision_contention() {
let pairs: Vec<(u64, u64)> = (0..100).map(|i| (i * 2, i)).collect();
let config = Config::new().auto_rebuild(false);
let map = Arc::new(LearnedMap::bulk_load_with_config(&pairs, config).unwrap());
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..100u64 {
if i % 8 == t {
let key = i * 2 + 1;
map.insert(key, key, &guard);
}
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..200u64 {
assert!(
map.get(&i, &guard).is_some(),
"key {i} missing after collision contention"
);
}
}
#[test]
fn concurrent_insert_remove_same_key() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let barrier = Arc::new(Barrier::new(8));
{
let guard = map.guard();
for i in 0..100u64 {
map.insert(i, i, &guard);
}
}
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for round in 0..50u64 {
for key in 0..100u64 {
if t < 4 {
map.insert(key, round * 1000 + t, &guard);
} else {
map.remove(&key, &guard);
}
}
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
for key in 0..100u64 {
let got = map.get(&key, &guard);
let contains = map.contains_key(&key, &guard);
assert_eq!(
got.is_some(),
contains,
"inconsistency at key {key}: get={got:?}, contains={contains}"
);
}
}
#[test]
fn rebuild_with_concurrent_readers() {
let pairs: Vec<(u64, u64)> = (0..5000).map(|i| (i, i)).collect();
let map = Arc::new(LearnedMap::bulk_load(&pairs).unwrap());
let barrier = Arc::new(Barrier::new(5));
let reader_handles: Vec<_> = (0..4)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..10 {
let guard = map.guard();
for i in 0..5000u64 {
let _ = map.get(&i, &guard);
}
}
})
})
.collect();
let map_clone = Arc::clone(&map);
let barrier_clone = Arc::clone(&barrier);
let rebuild_handle = thread::spawn(move || {
barrier_clone.wait();
for _ in 0..3 {
let guard = map_clone.guard();
map_clone.rebuild(&guard);
}
});
rebuild_handle.join().unwrap();
for h in reader_handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..5000u64 {
assert_eq!(map.get(&i, &guard), Some(&i), "key {i} lost after rebuild");
}
}
#[test]
fn global_rebuild_lockfree() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let barrier = Arc::new(Barrier::new(5));
let insert_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = t * 750;
for i in 0..750 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
let map_r = Arc::clone(&map);
let barrier_r = Arc::clone(&barrier);
let rebuild_handle = thread::spawn(move || {
barrier_r.wait();
for _ in 0..10 {
let guard = map_r.guard();
map_r.rebuild(&guard);
}
});
for h in insert_handles {
h.join().unwrap();
}
rebuild_handle.join().unwrap();
let guard = map.guard();
let actual = map.iter_sorted(&guard).len();
assert!(
actual <= 3000,
"actual count {actual} exceeds total inserts"
);
for i in 0..3000u64 {
map.insert(i, i, &guard);
}
map.rebuild(&guard);
let g2 = map.guard();
assert_eq!(map.len(), 3000);
for i in 0..3000u64 {
assert!(map.get(&i, &g2).is_some(), "key {i} missing after recovery");
}
}
#[test]
fn rebuild_with_concurrent_removes_no_corruption() {
let pairs: Vec<(u64, u64)> = (0..2000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(3));
let remove_handles: Vec<_> = (0..2u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in (0..2000u64).step_by(2) {
if i % 4 == t * 2 {
map.remove(&i, &guard);
}
}
})
})
.collect();
let map_r = Arc::clone(&map);
let barrier_r = Arc::clone(&barrier);
let rebuild_handle = thread::spawn(move || {
barrier_r.wait();
for _ in 0..5 {
let guard = map_r.guard();
map_r.rebuild(&guard);
}
});
for h in remove_handles {
h.join().unwrap();
}
rebuild_handle.join().unwrap();
let guard = map.guard();
for i in (1..2000u64).step_by(2) {
assert!(
map.get(&i, &guard).is_some(),
"odd key {i} should still be present"
);
}
let actual = map.iter_sorted(&guard).len();
assert!(
actual >= 1000,
"at least 1000 odd keys should be present, got {actual}"
);
}
#[test]
fn auto_rebuild_concurrent_no_corruption() {
let map = Arc::new(LearnedMap::new());
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = t * 1000;
for i in 0..1000 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
let actual = map.iter_sorted(&guard).len();
assert!(actual > 0, "map should not be empty after 8000 inserts");
let depth = map.max_depth(&guard);
assert!(
depth <= 20,
"depth {depth} too high with localized rebuilds"
);
for i in 0..8000u64 {
map.insert(i, i, &guard);
}
map.rebuild(&guard);
let g2 = map.guard();
assert_eq!(map.len(), 8000);
for i in 0..8000u64 {
assert!(map.get(&i, &g2).is_some(), "key {i} missing after recovery");
}
}
#[test]
fn multiple_concurrent_rebuilds() {
let pairs: Vec<(u64, u64)> = (0..5000).map(|i| (i, i)).collect();
let map = Arc::new(LearnedMap::bulk_load(&pairs).unwrap());
let barrier = Arc::new(Barrier::new(4));
let handles: Vec<_> = (0..4)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..5 {
let guard = map.guard();
map.rebuild(&guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..5000u64 {
assert_eq!(
map.get(&i, &guard),
Some(&i),
"key {i} corrupted after concurrent rebuilds"
);
}
assert_eq!(map.len(), 5000);
}
#[test]
fn auto_rebuild_keeps_depth_bounded() {
let map = LearnedMap::new();
let g = map.guard();
for i in 0..10_000u64 {
map.insert(i, i, &g);
}
assert_eq!(map.len(), 10_000);
let g2 = map.guard();
let depth = map.max_depth(&g2);
assert!(
depth <= 15,
"depth {depth} too high after 10k inserts with auto root + subtree rebuild"
);
for i in 0..10_000u64 {
assert_eq!(
map.get(&i, &g2),
Some(&i),
"key {i} missing after auto-rebuild"
);
}
map.rebuild(&g2);
let g3 = map.guard();
assert!(
map.max_depth(&g3) <= 5,
"depth {} too high after explicit rebuild",
map.max_depth(&g3)
);
}
#[test]
fn auto_rebuild_disabled_depth_grows() {
let map = LearnedMap::with_config(Config::new().auto_rebuild(false));
let g = map.guard();
for i in 0..200u64 {
map.insert(i, i, &g);
}
let depth = map.max_depth(&g);
assert!(
depth > 5,
"depth {depth} suspiciously low without auto-rebuild"
);
}
#[test]
fn concurrent_range_during_inserts() {
let pairs: Vec<(u64, u64)> = (0..1000).map(|i| (i * 2, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(6));
let writer_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..1000u64 {
if i % 4 == t {
map.insert(i * 2 + 1, i + 10_000, &guard);
}
}
})
})
.collect();
let reader_handles: Vec<_> = (0..2)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..20 {
let guard = map.guard();
let items: Vec<u64> = map.range(100..200, &guard).map(|(k, _)| *k).collect();
for w in items.windows(2) {
assert!(w[0] < w[1], "range not sorted: {} >= {}", w[0], w[1]);
}
for &k in &items {
assert!((100..200).contains(&k), "key {k} out of range");
}
}
})
})
.collect();
for h in writer_handles {
h.join().unwrap();
}
for h in reader_handles {
h.join().unwrap();
}
}
#[test]
fn concurrent_first_last_during_inserts() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let barrier = Arc::new(Barrier::new(6));
let writer_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = t * 1000;
for i in 0..1000 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
let reader_handles: Vec<_> = (0..2)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..50 {
let guard = map.guard();
let _ = map.first_key_value(&guard);
let _ = map.last_key_value(&guard);
}
})
})
.collect();
for h in writer_handles {
h.join().unwrap();
}
for h in reader_handles {
h.join().unwrap();
}
let guard = map.guard();
assert_eq!(map.first_key_value(&guard).map(|(k, _)| *k), Some(0));
assert_eq!(map.last_key_value(&guard).map(|(k, _)| *k), Some(3999));
}
#[test]
fn localized_rebuild_bounds_depth() {
let anchors: Vec<(u64, u64)> = (0..8000).step_by(80).map(|i| (i, i)).collect();
let map = Arc::new(LearnedMap::bulk_load(&anchors).unwrap());
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = t * 1000;
for i in 0..1000 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
let depth = map.max_depth(&guard);
assert!(
depth <= 20,
"depth {depth} too high after 8x1000 inserts with localized rebuilds"
);
}
#[test]
fn localized_rebuild_no_data_loss() {
let anchors: Vec<(u64, u64)> = (0..8000).step_by(80).map(|i| (i, i)).collect();
let map = Arc::new(LearnedMap::bulk_load(&anchors).unwrap());
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = t * 1000;
for i in 0..1000 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..8000u64 {
assert!(
map.get(&i, &guard).is_some(),
"key {i} lost during localized rebuild"
);
}
assert_eq!(map.len(), 8000);
}
#[test]
fn auto_root_rebuild_concurrent_insert() {
let map = Arc::new(LearnedMap::new());
let barrier = Arc::new(Barrier::new(4));
let handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = t * 2000;
for i in 0..2000 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
let depth = map.max_depth(&guard);
assert!(
depth <= 15,
"depth {depth} too high with auto root rebuild under concurrency"
);
for i in 0..8000u64 {
map.insert(i, i, &guard);
}
map.rebuild(&guard);
let g2 = map.guard();
assert_eq!(map.len(), 8000);
for i in 0..8000u64 {
assert!(map.get(&i, &g2).is_some(), "key {i} missing after recovery");
}
}
#[test]
fn concurrent_remove_tombstone_compaction() {
let pairs: Vec<(u64, u64)> = (0..4000).map(|i| (i, i)).collect();
let map = Arc::new(LearnedMap::bulk_load(&pairs).unwrap());
let barrier = Arc::new(Barrier::new(4));
let handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..4000u64 {
if i % 4 == t {
map.remove(&i, &guard);
}
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
assert_eq!(map.len(), 0);
for i in 0..1000u64 {
map.insert(i, i, &guard);
}
let g2 = map.guard();
assert_eq!(map.len(), 1000);
for i in 0..1000u64 {
assert!(
map.get(&i, &g2).is_some(),
"key {i} missing after tombstone compaction recovery"
);
}
}
#[test]
fn concurrent_get_or_insert_same_key() {
let map = Arc::new(LearnedMap::new());
let barrier = Arc::new(Barrier::new(8));
let winner_values = Arc::new(Mutex::new(Vec::new()));
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
let winner_values = Arc::clone(&winner_values);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let val = map.get_or_insert(42u64, t, &guard);
winner_values.lock().unwrap().push(*val);
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let values = winner_values.lock().unwrap();
assert_eq!(values.len(), 8);
let first = values[0];
for &v in values.iter() {
assert_eq!(v, first, "all threads should see the same value");
}
assert!(first < 8, "winner should be a thread id");
drop(values);
assert_eq!(map.len(), 1);
}
#[test]
fn concurrent_get_or_insert_disjoint_keys() {
let map = Arc::new(LearnedMap::new());
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = t * 500;
for i in 0..500 {
let key = base + i;
let val = map.get_or_insert(key, key * 10, &guard);
assert_eq!(*val, key * 10);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(map.len(), 4000);
let guard = map.guard();
for i in 0..4000u64 {
assert_eq!(
map.get(&i, &guard),
Some(&(i * 10)),
"key {i} has wrong value"
);
}
}
#[test]
fn concurrent_get_or_insert_mixed_ops() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let barrier = Arc::new(Barrier::new(8));
let writer_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..200 {
let key = i * 4 + t;
map.insert(key, key, &guard);
}
})
})
.collect();
let goi_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..200 {
let key = i * 4 + t;
let _val = map.get_or_insert(key, key + 10_000, &guard);
}
})
})
.collect();
for h in writer_handles {
h.join().unwrap();
}
for h in goi_handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..800u64 {
assert!(
map.get(&i, &guard).is_some(),
"key {i} missing after mixed concurrent ops"
);
}
}
#[test]
fn rebuild_does_not_resurrect_removed_keys() {
let pairs: Vec<(u64, u64)> = (0..1000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(3));
let map1 = Arc::clone(&map);
let b1 = Arc::clone(&barrier);
let h1 = thread::spawn(move || {
b1.wait();
let guard = map1.guard();
for i in (0..1000u64).step_by(2) {
map1.remove(&i, &guard);
}
});
let map2 = Arc::clone(&map);
let b2 = Arc::clone(&barrier);
let h2 = thread::spawn(move || {
b2.wait();
for _ in 0..20 {
let guard = map2.guard();
map2.rebuild(&guard);
}
});
barrier.wait();
{
let guard = map.guard();
for i in (1..1000u64).step_by(2) {
map.remove(&i, &guard);
}
}
h1.join().unwrap();
h2.join().unwrap();
let guard = map.guard();
for i in 0..1000u64 {
assert!(
map.get(&i, &guard).is_none(),
"key {i} was resurrected by rebuild"
);
}
assert_eq!(map.iter_sorted(&guard).len(), 0);
}
#[test]
fn localized_rebuild_does_not_resurrect_removed_keys() {
let config = Config::new()
.auto_rebuild(true)
.rebuild_depth_threshold(3)
.tombstone_ratio_threshold(0.2);
let map = Arc::new(LearnedMap::with_config(config));
let barrier = Arc::new(Barrier::new(3));
let map1 = Arc::clone(&map);
let b1 = Arc::clone(&barrier);
let h1 = thread::spawn(move || {
b1.wait();
let guard = map1.guard();
for i in 0..2000u64 {
map1.insert(i, i, &guard);
}
});
let map2 = Arc::clone(&map);
let b2 = Arc::clone(&barrier);
let h2 = thread::spawn(move || {
b2.wait();
let guard = map2.guard();
for _ in 0..5 {
for i in 0..2000u64 {
map2.remove(&i, &guard);
}
}
});
barrier.wait();
h1.join().unwrap();
h2.join().unwrap();
{
let guard = map.guard();
for i in 0..2000u64 {
map.remove(&i, &guard);
}
}
let guard = map.guard();
assert_eq!(
map.iter_sorted(&guard).len(),
0,
"keys remain after exhaustive remove"
);
}
#[test]
fn insert_during_concurrent_rebuild_retry() {
let pairs: Vec<(u64, u64)> = (0..2000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(12));
let insert_handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = 2000 + t * 500;
for i in 0..500 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
let rebuild_handles: Vec<_> = (0..4)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..20 {
let guard = map.guard();
map.rebuild(&guard);
}
})
})
.collect();
for h in insert_handles {
h.join().unwrap();
}
for h in rebuild_handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..6000u64 {
map.insert(i, i, &guard);
}
map.rebuild(&guard);
let g2 = map.guard();
assert_eq!(map.len(), 6000);
for i in 0..6000u64 {
assert!(map.get(&i, &g2).is_some(), "key {i} missing after recovery");
}
}
#[test]
fn remove_during_concurrent_rebuild_retry() {
let pairs: Vec<(u64, u64)> = (0..4000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(6));
let remove_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..4000u64 {
if i % 4 == t {
map.remove(&i, &guard);
}
}
})
})
.collect();
let rebuild_handles: Vec<_> = (0..2)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..30 {
let guard = map.guard();
map.rebuild(&guard);
}
})
})
.collect();
for h in remove_handles {
h.join().unwrap();
}
for h in rebuild_handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..4000u64 {
map.remove(&i, &guard);
}
assert_eq!(
map.iter_sorted(&guard).len(),
0,
"keys remain after exhaustive remove"
);
}
#[test]
fn get_or_insert_during_concurrent_rebuild_retry() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let barrier = Arc::new(Barrier::new(6));
let goi_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = t * 500;
for i in 0..500 {
let key = base + i;
let _val = map.get_or_insert(key, key * 10, &guard);
}
})
})
.collect();
let rebuild_handles: Vec<_> = (0..2)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..30 {
let guard = map.guard();
map.rebuild(&guard);
}
})
})
.collect();
for h in goi_handles {
h.join().unwrap();
}
for h in rebuild_handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..2000u64 {
map.insert(i, i * 10, &guard);
}
map.rebuild(&guard);
let g2 = map.guard();
assert_eq!(map.len(), 2000);
for i in 0..2000u64 {
assert!(map.get(&i, &g2).is_some(), "key {i} missing");
}
}
#[test]
fn rebuild_freeze_cas_contention() {
let pairs: Vec<(u64, u64)> = (0..1000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..50 {
let guard = map.guard();
map.rebuild(&guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
assert_eq!(map.len(), 1000);
for i in 0..1000u64 {
assert_eq!(map.get(&i, &guard), Some(&i), "key {i} corrupted");
}
}
#[test]
fn concurrent_drain_contention() {
let pairs: Vec<(u64, u64)> = (0..2000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(4));
let results = Arc::new(Mutex::new(Vec::new()));
let handles: Vec<_> = (0..4)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
let results = Arc::clone(&results);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let drained = map.drain(&guard);
results.lock().unwrap().push(drained.len());
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let total: usize = results.lock().unwrap().iter().sum();
assert_eq!(
total, 2000,
"total drained entries should be 2000, got {total}"
);
assert!(map.is_empty(), "map should be empty after drain");
}
#[test]
fn drain_during_concurrent_inserts() {
let pairs: Vec<(u64, u64)> = (0..1000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(5));
let insert_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = 1000 + t * 500;
for i in 0..500 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
barrier.wait();
{
let guard = map.guard();
let _drained = map.drain(&guard);
}
for h in insert_handles {
h.join().unwrap();
}
let guard = map.guard();
let actual = map.iter_sorted(&guard).len();
assert!(actual <= 3000, "count {actual} exceeds maximum possible");
}
#[test]
fn concurrent_clear_contention() {
let pairs: Vec<(u64, u64)> = (0..2000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(4));
let handles: Vec<_> = (0..4)
.map(|_| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
map.clear(&guard);
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert!(
map.is_empty(),
"map should be empty after concurrent clears"
);
}
#[test]
fn clear_during_concurrent_inserts() {
let pairs: Vec<(u64, u64)> = (0..1000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(5));
let insert_handles: Vec<_> = (0..4u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
let base = 1000 + t * 500;
for i in 0..500 {
map.insert(base + i, base + i, &guard);
}
})
})
.collect();
barrier.wait();
{
let guard = map.guard();
map.clear(&guard);
}
for h in insert_handles {
h.join().unwrap();
}
let guard = map.guard();
let actual = map.iter_sorted(&guard).len();
assert!(actual <= 3000, "count {actual} exceeds maximum possible");
}
#[test]
fn drain_clear_rebuild_contention() {
let pairs: Vec<(u64, u64)> = (0..2000).map(|i| (i, i)).collect();
let map = Arc::new(
LearnedMap::bulk_load_with_config(&pairs, Config::new().auto_rebuild(false)).unwrap(),
);
let barrier = Arc::new(Barrier::new(3));
let map1 = Arc::clone(&map);
let b1 = Arc::clone(&barrier);
let h1 = thread::spawn(move || {
b1.wait();
for _ in 0..20 {
let guard = map1.guard();
let _ = map1.drain(&guard);
for i in 0..100u64 {
map1.insert(i, i, &guard);
}
}
});
let map2 = Arc::clone(&map);
let b2 = Arc::clone(&barrier);
let h2 = thread::spawn(move || {
b2.wait();
for _ in 0..20 {
let guard = map2.guard();
map2.clear(&guard);
for i in 100..200u64 {
map2.insert(i, i, &guard);
}
}
});
barrier.wait();
for _ in 0..20 {
let guard = map.guard();
map.rebuild(&guard);
}
h1.join().unwrap();
h2.join().unwrap();
let guard = map.guard();
let _ = map.iter_sorted(&guard);
}
#[test]
fn insert_writing_state_spin() {
let map = Arc::new(LearnedMap::with_config(Config::new().auto_rebuild(false)));
let barrier = Arc::new(Barrier::new(16));
let handles: Vec<_> = (0..16u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..100u64 {
map.insert(i, t * 1000 + i, &guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
assert_eq!(map.len(), 100);
for i in 0..100u64 {
assert!(map.get(&i, &guard).is_some(), "key {i} missing");
}
}
#[test]
fn insert_descent_snapshot_retry_under_localized_rebuild() {
let config = Config::new().auto_rebuild(true).rebuild_depth_threshold(3);
let map = Arc::new(LearnedMap::with_config(config));
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..200u64 {
map.insert(i, t * 1000 + i, &guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..200u64 {
map.insert(i, i, &guard);
}
map.rebuild(&guard);
let g2 = map.guard();
assert_eq!(map.len(), 200);
for i in 0..200u64 {
assert!(map.get(&i, &g2).is_some(), "key {i} missing");
}
}
#[test]
fn get_or_insert_descent_snapshot_retry() {
let config = Config::new().auto_rebuild(true).rebuild_depth_threshold(3);
let map = Arc::new(LearnedMap::with_config(config));
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8u64)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let guard = map.guard();
for i in 0..200u64 {
let _val = map.get_or_insert(i, t * 1000 + i, &guard);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let guard = map.guard();
for i in 0..200u64 {
map.insert(i, i, &guard);
}
map.rebuild(&guard);
let g2 = map.guard();
assert_eq!(map.len(), 200);
for i in 0..200u64 {
assert!(map.get(&i, &g2).is_some(), "key {i} missing");
}
}