#![cfg(not(miri))]
use std::sync::Arc;
use std::thread;
use fastskip::ConcurrentSkipList;
#[test]
fn test_4_writer_threads_1k_each() {
    // Four writer threads each insert 1000 keys with disjoint key
    // prefixes into an 8-shard list; afterwards the main thread reads
    // back every single entry and checks its exact value.
    let list = Arc::new(ConcurrentSkipList::with_shards(8));
    let workers: Vec<_> = (0..4)
        .map(|tid| {
            let list: Arc<ConcurrentSkipList> = Arc::clone(&list);
            thread::spawn(move || {
                for i in 0..1000 {
                    let key = format!("t{}_k{:04}", tid, i);
                    let val = format!("t{}_v{:04}", tid, i);
                    list.insert(key.as_bytes(), val.as_bytes());
                }
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    // Verify every (tid, i) pair is present with the expected bytes.
    // NOTE(review): the bool in the returned tuple is presumably a
    // deletion/tombstone marker — confirm against the crate API.
    for tid in 0..4 {
        for i in 0..1000 {
            let key = format!("t{}_k{:04}", tid, i);
            match list.get(key.as_bytes()) {
                Some((v, false)) => {
                    let expected = format!("t{}_v{:04}", tid, i);
                    assert_eq!(v, expected.as_bytes());
                }
                other => panic!("expected value for t{} k{}, got {:?}", tid, i, other),
            }
        }
    }
    // All 4000 keys are unique, so the length must be at least 4000.
    assert!(list.len() >= 4000);
}
#[test]
fn test_8_writer_threads_1k_each() {
    // Heavier variant of the 4-writer test: 8 threads x 1000 inserts
    // on a 32-shard list. Only spot-checks the first key written by
    // each thread plus a lower bound on the total entry count.
    let list = Arc::new(ConcurrentSkipList::with_shards(32));
    let mut workers = Vec::new();
    for tid in 0..8 {
        let list: Arc<ConcurrentSkipList> = Arc::clone(&list);
        let handle = thread::spawn(move || {
            for i in 0..1000 {
                let key = format!("t{}_k{:06}", tid, i);
                let val = format!("t{}_v{:06}", tid, i);
                list.insert(key.as_bytes(), val.as_bytes());
            }
        });
        workers.push(handle);
    }
    workers.into_iter().for_each(|w| w.join().unwrap());
    // "t{}_k000000" is i == 0 rendered with the {:06} width used above.
    for tid in 0..8 {
        let key = format!("t{}_k000000", tid);
        assert!(list.get(key.as_bytes()).is_some());
    }
    assert!(list.len() >= 8000);
}
#[test]
fn test_concurrent_insert_and_read() {
    // Smoke test for concurrent readers and writers: seed 100 entries,
    // then run 4 writer threads alongside 2 reader threads. Passing
    // means no panic/deadlock — the readers make no value assertions.
    let list = Arc::new(ConcurrentSkipList::with_shards(32));
    for i in 0..100 {
        let key = format!("init_{:04}", i);
        let val = format!("val_{:04}", i);
        list.insert(key.as_bytes(), val.as_bytes());
    }
    let mut workers = Vec::new();
    // Writers: each inserts 500 keys under its own prefix.
    for tid in 0..4 {
        let list: Arc<ConcurrentSkipList> = Arc::clone(&list);
        workers.push(thread::spawn(move || {
            for i in 0..500 {
                let key = format!("w{}_k{:04}", tid, i);
                let val = format!("w{}_v{:04}", tid, i);
                list.insert(key.as_bytes(), val.as_bytes());
            }
        }));
    }
    // Readers: probe the seeded keys and a key that never exists.
    for _ in 0..2 {
        let list: Arc<ConcurrentSkipList> = Arc::clone(&list);
        workers.push(thread::spawn(move || {
            for i in 0..100 {
                let key = format!("init_{:04}", i);
                let _ = list.get(key.as_bytes());
            }
            for _ in 0..100 {
                let _ = list.get(b"nonexistent_key");
            }
        }));
    }
    workers.into_iter().for_each(|w| w.join().unwrap());
}
#[test]
fn test_no_data_races() {
    // All four threads write the SAME 2000 keys with thread-tagged
    // values, deliberately maximizing contention. Afterwards each key
    // must hold an intact value from exactly one of the writers —
    // any interleaved/torn bytes would fail the starts_with check.
    let list = Arc::new(ConcurrentSkipList::with_shards(8));
    let writers: Vec<_> = (0..4)
        .map(|tid| {
            let list: Arc<ConcurrentSkipList> = Arc::clone(&list);
            thread::spawn(move || {
                for i in 0..2000 {
                    let key = format!("shared_{:06}", i);
                    let val = format!("t{}_{:06}", tid, i);
                    list.insert(key.as_bytes(), val.as_bytes());
                }
            })
        })
        .collect();
    for w in writers {
        w.join().unwrap();
    }
    for i in 0..2000 {
        let key = format!("shared_{:06}", i);
        match list.get(key.as_bytes()) {
            Some((v, false)) => {
                // The winning value must be valid UTF-8 and carry a
                // "t<tid>_" prefix from one of the writer threads.
                let s = std::str::from_utf8(v).unwrap();
                assert!(s.starts_with("t"), "corrupted value: {:?}", s);
            }
            other => panic!("expected value for key {}, got {:?}", i, other),
        }
    }
}
#[test]
fn test_concurrent_snapshot_while_writing() {
    // Snapshot isolation: a snapshot taken before a writer starts must
    // keep showing exactly the 50 "pre_" entries even while the writer
    // concurrently inserts 200 "post_" entries into the live list.
    let list = Arc::new(ConcurrentSkipList::with_shards(4));
    for i in 0..50 {
        let key = format!("pre_{:04}", i);
        let val = format!("val_{:04}", i);
        list.insert(key.as_bytes(), val.as_bytes());
    }
    // Capture the snapshot BEFORE the writer is spawned.
    let snap = list.snapshot();
    assert_eq!(snap.iter().collect::<Vec<_>>().len(), 50);
    let writer_list = Arc::clone(&list);
    let writer = thread::spawn(move || {
        for i in 0..200 {
            let key = format!("post_{:04}", i);
            let val = format!("val_{:04}", i);
            writer_list.insert(key.as_bytes(), val.as_bytes());
        }
    });
    // Iterate the snapshot WHILE the writer runs: no "post_" key may
    // leak in, and the entry count must stay frozen at 50.
    let mut seen = 0;
    for entry in snap.iter() {
        assert!(
            entry.key.starts_with(b"pre_"),
            "snapshot should not see post_ keys, got {:?}",
            std::str::from_utf8(entry.key).unwrap_or("<non-utf8>")
        );
        seen += 1;
    }
    assert_eq!(
        seen, 50,
        "snapshot must remain consistent during concurrent writes"
    );
    writer.join().unwrap();
    // The live list, by contrast, sees both the pre_ and post_ keys.
    let live_count = list.iter().count();
    assert!(
        live_count >= 250,
        "live list should have >= 250 entries, got {}",
        live_count
    );
}