#![cfg(not(miri))]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use fastskip::ConcurrentSkipList;
#[test]
fn test_16_writers_5k_each() {
let sl = Arc::new(ConcurrentSkipList::with_shards(32));
let mut handles = vec![];
for tid in 0..16 {
let sl: Arc<ConcurrentSkipList> = Arc::clone(&sl);
handles.push(thread::spawn(move || {
for i in 0..5000 {
let key = format!("t{:02}_k{:06}", tid, i);
let val = format!("v{:02}_{:06}", tid, i);
sl.insert(key.as_bytes(), val.as_bytes());
}
}));
}
for h in handles {
h.join().unwrap();
}
assert!(sl.len() >= 80_000);
for tid in 0..16 {
for i in 0..5000 {
let key = format!("t{:02}_k{:06}", tid, i);
let val = sl.get_live(key.as_bytes()).unwrap();
let expected = format!("v{:02}_{:06}", tid, i);
assert_eq!(val, expected.as_bytes());
}
}
}
#[test]
fn test_concurrent_insert_delete_read() {
let sl = Arc::new(ConcurrentSkipList::with_shards(8));
for i in 0..500 {
let key = format!("init_{:06}", i);
sl.insert(key.as_bytes(), b"val");
}
let running = Arc::new(AtomicBool::new(true));
let mut handles = vec![];
for tid in 0..4 {
let sl: Arc<ConcurrentSkipList> = Arc::clone(&sl);
handles.push(thread::spawn(move || {
for i in 0..2000 {
let key = format!("w{}_{:06}", tid, i);
sl.insert(key.as_bytes(), b"val");
}
}));
}
for tid in 0..2 {
let sl: Arc<ConcurrentSkipList> = Arc::clone(&sl);
handles.push(thread::spawn(move || {
for i in 0..500 {
let key = format!("w{}_{:06}", tid, i);
sl.delete(key.as_bytes());
}
}));
}
let running_r = Arc::clone(&running);
let sl_r: Arc<ConcurrentSkipList> = Arc::clone(&sl);
handles.push(thread::spawn(move || {
while running_r.load(Ordering::Relaxed) {
for i in (0..500).step_by(10) {
let key = format!("init_{:06}", i);
let _ = sl_r.get(key.as_bytes());
}
}
}));
for h in handles.into_iter().take(6) {
h.join().unwrap();
}
running.store(false, Ordering::Relaxed);
}
#[test]
fn test_edge_empty_key() {
let sl = ConcurrentSkipList::new();
assert!(sl.insert(b"", b"empty_key"));
match sl.get(b"") {
Some((v, false)) => assert_eq!(v, b"empty_key"),
other => panic!(
"expected value for empty key, got {:?}",
other.map(|(v, _)| v.len())
),
}
}
#[test]
fn test_edge_empty_value() {
let sl = ConcurrentSkipList::new();
assert!(sl.insert(b"key", b""));
match sl.get(b"key") {
Some((v, false)) => assert_eq!(v, b""),
other => panic!("expected empty value, got {:?}", other),
}
}
#[test]
fn test_edge_large_key_value() {
let sl = ConcurrentSkipList::new();
let key = vec![0xABu8; 4096];
let val = vec![0xCDu8; 8192];
assert!(sl.insert(&key, &val));
match sl.get(&key) {
Some((v, false)) => assert_eq!(v, val.as_slice()),
_other => panic!("expected large value"),
}
}
#[test]
fn test_edge_single_element() {
let sl = ConcurrentSkipList::new();
assert!(sl.insert(b"only", b"one"));
assert_eq!(sl.len(), 1);
assert_eq!(sl.get_live(b"only"), Some(b"one".as_slice()));
assert_eq!(sl.get_live(b"nope"), None);
let entries: Vec<_> = sl.iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].key, b"only");
}
#[test]
fn test_edge_reverse_order_insert() {
let sl = ConcurrentSkipList::new();
for i in (0..100).rev() {
let key = format!("k{:04}", i);
let val = format!("v{:04}", i);
sl.insert(key.as_bytes(), val.as_bytes());
}
let snap = sl.snapshot();
let entries: Vec<_> = snap.iter().collect();
assert_eq!(entries.len(), 100);
for (i, entry) in entries.iter().enumerate() {
let expected = format!("k{:04}", i);
assert_eq!(entry.key, expected.as_bytes());
}
}
#[test]
fn test_concurrent_same_key_insert() {
let sl = Arc::new(ConcurrentSkipList::with_shards(16));
let mut handles = vec![];
for _ in 0..8 {
let sl: Arc<ConcurrentSkipList> = Arc::clone(&sl);
handles.push(thread::spawn(move || {
sl.insert(b"same_key", b"value");
}));
}
for h in handles {
h.join().unwrap();
}
match sl.get(b"same_key") {
Some((v, false)) => assert_eq!(v, b"value"),
other => panic!("expected value, got {:?}", other),
}
}
#[test]
fn test_delete_then_reinsert_different_thread() {
let sl = Arc::new(ConcurrentSkipList::new());
sl.insert(b"key", b"v1");
let sl2: Arc<ConcurrentSkipList> = Arc::clone(&sl);
let h = thread::spawn(move || {
sl2.delete(b"key");
});
h.join().unwrap();
match sl.get(b"key") {
Some((_, true)) => {}
_other => panic!("expected tombstone"),
}
assert_eq!(sl.get_live(b"key"), None);
}
#[test]
fn test_seal_full_lifecycle_with_concurrent_writers() {
let sl = Arc::new(ConcurrentSkipList::with_shards(8));
let mut handles = vec![];
for tid in 0..4 {
let sl = Arc::clone(&sl);
handles.push(thread::spawn(move || {
for i in 0..1000 {
let key = format!("t{}_k{:04}", tid, i);
let val = format!("v{}_{}", tid, i);
sl.insert(key.as_bytes(), val.as_bytes());
}
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(sl.len(), 4000);
let sl = Arc::try_unwrap(sl).unwrap();
let (frozen, fresh) = sl.seal().unwrap();
assert_eq!(frozen.len(), 4000);
assert!(fresh.is_empty());
let frozen = Arc::new(frozen);
let mut readers = vec![];
for _ in 0..4 {
let frozen = Arc::clone(&frozen);
readers.push(thread::spawn(move || {
let mut count = 0;
for entry in frozen.iter() {
if !entry.is_tombstone {
count += 1;
}
}
count
}));
}
let mut total = 0;
for r in readers {
total += r.join().unwrap();
}
assert_eq!(total, 4000 * 4);
let fresh = Arc::new(fresh);
let mut writers = vec![];
for tid in 0..4 {
let fresh = Arc::clone(&fresh);
writers.push(thread::spawn(move || {
for i in 0..500 {
let key = format!("new_t{}_k{:04}", tid, i);
fresh.insert(key.as_bytes(), b"val");
}
}));
}
for w in writers {
w.join().unwrap();
}
assert_eq!(fresh.len(), 2000);
let mem_before = frozen.memory_usage();
assert!(mem_before > 0);
std::mem::drop(frozen);
assert!(fresh.memory_usage() > 0);
}
#[test]
fn test_cursor_concurrent_reads() {
let sl = Arc::new(ConcurrentSkipList::new());
for i in 0..100 {
let key = format!("k{:04}", i);
let val = format!("v{:04}", i);
sl.insert(key.as_bytes(), val.as_bytes());
}
let mut handles = vec![];
for _ in 0..4 {
let sl = Arc::clone(&sl);
handles.push(thread::spawn(move || {
let cursor = sl.cursor_at(b"k0050").unwrap();
let keys: Vec<_> = cursor.map(|e| e.key.to_vec()).collect();
assert_eq!(keys.len(), 50); assert_eq!(keys[0], b"k0050");
assert_eq!(keys[49], b"k0099");
}));
}
for h in handles {
h.join().unwrap();
}
}
#[test]
fn test_snapshot_under_concurrent_writes() {
let sl = Arc::new(ConcurrentSkipList::with_shards(8));
for i in 0..100 {
let key = format!("key_{:04}", i);
sl.insert(key.as_bytes(), b"val");
}
let snap = sl.snapshot();
let mut handles = vec![];
for tid in 0..4 {
let sl = Arc::clone(&sl);
handles.push(thread::spawn(move || {
for i in 0..500 {
let key = format!("new_t{}_{:04}", tid, i);
sl.insert(key.as_bytes(), b"val");
}
}));
}
let snap_entries: Vec<_> = snap.iter().map(|e| e.key.to_vec()).collect();
assert_eq!(snap_entries.len(), 100);
for key in &snap_entries {
let s = std::str::from_utf8(key).unwrap();
assert!(s.starts_with("key_"), "snapshot leaked: {}", s);
}
for h in handles {
h.join().unwrap();
}
assert_eq!(sl.len(), 2100); }