#![cfg(feature = "persistent-artrie")]
use libdictenstein::persistent_artrie::{PersistentARTrie, SharedARTrie};
use libdictenstein::persistent_artrie_core::durability::DurabilityPolicy;
use libdictenstein::persistent_artrie_core::shared_access::SharedTrieAccess;
use libdictenstein::Dictionary;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
use tempfile::TempDir;
const NUM_READERS: usize = 8;
const OPS_PER_THREAD: usize = 100;
fn generate_terms(count: usize, prefix: &str) -> Vec<String> {
(0..count).map(|i| format!("{}{:05}", prefix, i)).collect()
}
fn make_shared<V: libdictenstein::DictionaryValue>(trie: PersistentARTrie<V>) -> SharedARTrie<V> {
Arc::new(trie)
}
#[test]
fn test_concurrent_readers() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("concurrent_readers.part");
let dict: PersistentARTrie<i32> = PersistentARTrie::create(&dict_path).expect("create dict");
let terms: Vec<String> = generate_terms(100, "term");
for (i, term) in terms.iter().enumerate() {
let _ = dict.insert_with_value(term, i as i32);
}
dict.sync().expect("sync");
let shared_dict = make_shared(dict);
let barrier = Arc::new(Barrier::new(NUM_READERS));
let terms_arc = Arc::new(terms);
let success_count = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..NUM_READERS)
.map(|_| {
let dict_clone = Arc::clone(&shared_dict);
let barrier_clone = barrier.clone();
let terms_clone = terms_arc.clone();
let success = success_count.clone();
thread::spawn(move || {
barrier_clone.wait();
let mut local_success = 0;
let dict_guard = dict_clone.read();
for term in terms_clone.iter() {
if dict_guard.contains(term) {
local_success += 1;
}
}
success.fetch_add(local_success, Ordering::SeqCst);
})
})
.collect();
for handle in handles {
handle.join().expect("thread join");
}
let total = success_count.load(Ordering::SeqCst);
assert_eq!(
total,
NUM_READERS * 100,
"All readers should find all terms"
);
}
#[test]
fn test_single_writer_multiple_readers() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("writer_readers.part");
let dict: PersistentARTrie<i32> = PersistentARTrie::create(&dict_path).expect("create dict");
let initial_terms: Vec<String> = generate_terms(50, "init");
for (i, term) in initial_terms.iter().enumerate() {
let _ = dict.insert_with_value(term, i as i32);
}
dict.sync().expect("sync");
let shared_dict = make_shared(dict);
let stop_flag = Arc::new(AtomicBool::new(false));
let read_count = Arc::new(AtomicUsize::new(0));
let terms_arc = Arc::new(initial_terms.clone());
let reader_handles: Vec<_> = (0..NUM_READERS)
.map(|_| {
let dict_clone = Arc::clone(&shared_dict);
let stop = stop_flag.clone();
let count = read_count.clone();
let terms = terms_arc.clone();
thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
let dict_guard = dict_clone.read();
for term in terms.iter() {
let _ = dict_guard.contains(term);
count.fetch_add(1, Ordering::Relaxed);
}
drop(dict_guard);
thread::yield_now();
}
})
})
.collect();
let new_terms: Vec<String> = generate_terms(50, "new_");
let writer_dict = Arc::clone(&shared_dict);
let writer_handle = thread::spawn(move || {
for (i, term) in new_terms.iter().enumerate() {
let dict_guard = writer_dict.write();
let _ = dict_guard.insert_with_value(term, (i + 1000) as i32);
drop(dict_guard);
thread::sleep(Duration::from_micros(100));
}
});
thread::sleep(Duration::from_millis(100));
stop_flag.store(true, Ordering::SeqCst);
writer_handle.join().expect("writer join");
for handle in reader_handles {
handle.join().expect("reader join");
}
let reads = read_count.load(Ordering::SeqCst);
assert!(reads > 0, "Readers should have performed reads");
let dict_guard = shared_dict.read();
for term in initial_terms.iter() {
assert!(
dict_guard.contains(term),
"Initial term should exist: {}",
term
);
}
}
#[test]
fn test_reader_during_checkpoint() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("checkpoint_readers.part");
let dict: PersistentARTrie<i32> = PersistentARTrie::create(&dict_path).expect("create dict");
let terms: Vec<String> = generate_terms(100, "chkp");
for (i, term) in terms.iter().enumerate() {
let _ = dict.insert_with_value(term, i as i32);
}
dict.sync().expect("sync");
let shared_dict = make_shared(dict);
let stop_flag = Arc::new(AtomicBool::new(false));
let read_errors = Arc::new(AtomicUsize::new(0));
let terms_arc = Arc::new(terms.clone());
let reader_handles: Vec<_> = (0..NUM_READERS)
.map(|_| {
let dict_clone = Arc::clone(&shared_dict);
let stop = stop_flag.clone();
let errors = read_errors.clone();
let terms = terms_arc.clone();
thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
let dict_guard = dict_clone.read();
for term in terms.iter() {
if !dict_guard.contains(term) {
errors.fetch_add(1, Ordering::Relaxed);
}
}
}
})
})
.collect();
for _ in 0..3 {
let dict_guard = shared_dict.write();
dict_guard.checkpoint().expect("checkpoint");
drop(dict_guard);
thread::sleep(Duration::from_millis(10));
}
stop_flag.store(true, Ordering::SeqCst);
for handle in reader_handles {
handle.join().expect("reader join");
}
let errors = read_errors.load(Ordering::SeqCst);
assert_eq!(errors, 0, "No read errors should occur during checkpoints");
}
#[test]
fn test_concurrent_value_lookups() {
use libdictenstein::MappedDictionary;
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("value_lookups.part");
let dict: PersistentARTrie<i32> = PersistentARTrie::create(&dict_path).expect("create dict");
let terms: Vec<(String, i32)> = (0..100)
.map(|i| (format!("value{:05}", i), i * 10))
.collect();
for (term, value) in &terms {
let _ = dict.insert_with_value(term, *value);
}
dict.sync().expect("sync");
let shared_dict = make_shared(dict);
let barrier = Arc::new(Barrier::new(NUM_READERS));
let terms_arc = Arc::new(terms);
let value_mismatches = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..NUM_READERS)
.map(|_| {
let dict_clone = Arc::clone(&shared_dict);
let barrier_clone = barrier.clone();
let terms_clone = terms_arc.clone();
let mismatches = value_mismatches.clone();
thread::spawn(move || {
barrier_clone.wait();
let dict_guard = dict_clone.read();
for (term, expected) in terms_clone.iter() {
if let Some(actual) = dict_guard.get_value(term) {
if actual != *expected {
mismatches.fetch_add(1, Ordering::Relaxed);
}
} else {
mismatches.fetch_add(1, Ordering::Relaxed);
}
}
})
})
.collect();
for handle in handles {
handle.join().expect("thread join");
}
let mismatches = value_mismatches.load(Ordering::SeqCst);
assert_eq!(mismatches, 0, "All values should match expected");
}
#[test]
fn test_writer_contention() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("writer_contention.part");
let dict: PersistentARTrie<i32> = PersistentARTrie::create(&dict_path).expect("create dict");
let shared_dict = make_shared(dict);
let barrier = Arc::new(Barrier::new(4));
let successful_inserts = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..4)
.map(|thread_id| {
let dict_clone = Arc::clone(&shared_dict);
let barrier_clone = barrier.clone();
let inserts = successful_inserts.clone();
thread::spawn(move || {
barrier_clone.wait();
let prefix = format!("t{}_", thread_id);
for i in 0..25 {
let term = format!("{}{:03}", prefix, i);
let dict_guard = dict_clone.write();
if dict_guard.insert_with_value(&term, (thread_id * 100 + i) as i32) {
inserts.fetch_add(1, Ordering::Relaxed);
}
}
})
})
.collect();
for handle in handles {
handle.join().expect("thread join");
}
let total = successful_inserts.load(Ordering::SeqCst);
assert_eq!(total, 100, "All inserts should succeed");
let dict_guard = shared_dict.read();
for thread_id in 0..4 {
for i in 0..25 {
let term = format!("t{}_{:03}", thread_id, i);
assert!(dict_guard.contains(&term), "Term should exist: {}", term);
}
}
}
#[test]
fn test_read_write_interleaving() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("interleaving.part");
let dict: PersistentARTrie<i32> = PersistentARTrie::create(&dict_path).expect("create dict");
let initial_terms: Vec<String> = generate_terms(50, "pre_");
for (i, term) in initial_terms.iter().enumerate() {
let _ = dict.insert_with_value(term, i as i32);
}
let shared_dict = make_shared(dict);
let operations = Arc::new(AtomicUsize::new(0));
let stop_flag = Arc::new(AtomicBool::new(false));
let handles: Vec<_> = (0..4)
.map(|thread_id| {
let dict_clone = Arc::clone(&shared_dict);
let ops = operations.clone();
let stop = stop_flag.clone();
let terms = initial_terms.clone();
thread::spawn(move || {
let mut local_ops = 0;
while !stop.load(Ordering::Relaxed) && local_ops < OPS_PER_THREAD {
if local_ops % 2 == 0 {
let dict_guard = dict_clone.read();
let term = &terms[local_ops % terms.len()];
let _ = dict_guard.contains(term);
} else {
let dict_guard = dict_clone.write();
let term = format!("new_t{}_{:04}", thread_id, local_ops);
let _ = dict_guard.insert_with_value(&term, local_ops as i32);
}
local_ops += 1;
ops.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
thread::sleep(Duration::from_millis(50));
stop_flag.store(true, Ordering::SeqCst);
for handle in handles {
handle.join().expect("thread join");
}
let total_ops = operations.load(Ordering::SeqCst);
assert!(total_ops > 0, "Operations should have been performed");
let dict_guard = shared_dict.read();
for term in &initial_terms {
assert!(
dict_guard.contains(term),
"Original term should exist: {}",
term
);
}
}
#[test]
fn test_many_short_lived_threads() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("short_lived.part");
let dict: PersistentARTrie<i32> = PersistentARTrie::create(&dict_path).expect("create dict");
for i in 0..50 {
let term = format!("base{:03}", i);
let _ = dict.insert_with_value(&term, i);
}
dict.sync().expect("sync");
let shared_dict = make_shared(dict);
let success_count = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..50)
.map(|i| {
let dict_clone = Arc::clone(&shared_dict);
let success = success_count.clone();
thread::spawn(move || {
let dict_guard = dict_clone.read();
let term = format!("base{:03}", i % 50);
if dict_guard.contains(&term) {
success.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
for handle in handles {
handle.join().expect("thread join");
}
let successes = success_count.load(Ordering::SeqCst);
assert_eq!(successes, 50, "All lookups should succeed");
}
#[test]
fn test_concurrent_opens_same_path() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("same_path.part");
let _dict: PersistentARTrie<()> = PersistentARTrie::create(&dict_path).expect("create dict");
let result: Result<PersistentARTrie<()>, _> = PersistentARTrie::create(&dict_path);
assert!(
result.is_err(),
"Creating another dictionary at same path should fail"
);
}
#[test]
fn test_shared_artrie_shares_state() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("shared_state.part");
let dict: PersistentARTrie<i32> = PersistentARTrie::create(&dict_path).expect("create dict");
let _ = dict.insert_with_value("hello", 42);
let shared_dict = make_shared(dict);
let dict_clone = Arc::clone(&shared_dict);
{
let dict_guard = dict_clone.read();
assert!(
dict_guard.contains("hello"),
"Clone should see original insert"
);
}
{
let dict_guard = dict_clone.write();
let _ = dict_guard.insert_with_value("world", 100);
}
{
let dict_guard = shared_dict.read();
assert!(
dict_guard.contains("world"),
"Original should see clone's insert"
);
}
}
#[test]
fn test_sync_from_multiple_threads() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("multi_sync.part");
let dict: PersistentARTrie<i32> = PersistentARTrie::create(&dict_path).expect("create dict");
for i in 0..50 {
let _ = dict.insert_with_value(&format!("sync{:03}", i), i);
}
let shared_dict = make_shared(dict);
let barrier = Arc::new(Barrier::new(4));
let sync_errors = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..4)
.map(|_| {
let dict_clone = Arc::clone(&shared_dict);
let barrier_clone = barrier.clone();
let errors = sync_errors.clone();
thread::spawn(move || {
barrier_clone.wait();
for _ in 0..5 {
let dict_guard = dict_clone.read();
if dict_guard.sync().is_err() {
errors.fetch_add(1, Ordering::Relaxed);
}
}
})
})
.collect();
for handle in handles {
handle.join().expect("thread join");
}
let errors = sync_errors.load(Ordering::SeqCst);
assert_eq!(errors, 0, "All sync calls should succeed");
}
#[test]
#[ignore = "Transducer concurrent tests require transducer module integration"]
fn test_concurrent_transducer_queries() {
}
#[test]
fn test_lockfree_insert_cas_basic() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("lockfree_basic.part");
let mut dict: PersistentARTrie<()> = PersistentARTrie::create(&dict_path).expect("create dict");
dict.set_durability_policy(DurabilityPolicy::Immediate);
assert!(
dict.insert_cas_durable(b"hello").expect("durable insert"),
"First insert should succeed"
);
assert!(
!dict.insert_cas_durable(b"hello").expect("durable insert"),
"Duplicate insert should return false"
);
assert!(
dict.insert_cas_durable(b"world").expect("durable insert"),
"Second term should succeed"
);
assert!(
dict.insert_cas_durable(b"foo").expect("durable insert"),
"Third term should succeed"
);
assert!(dict.contains_lockfree(b"hello"), "Should find hello");
assert!(dict.contains_lockfree(b"world"), "Should find world");
assert!(dict.contains_lockfree(b"foo"), "Should find foo");
assert!(!dict.contains_lockfree(b"bar"), "Should not find bar");
}
#[test]
fn test_lockfree_insert_cas_concurrent() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("lockfree_concurrent.part");
let mut dict: PersistentARTrie<()> = PersistentARTrie::create(&dict_path).expect("create dict");
dict.set_durability_policy(DurabilityPolicy::Immediate);
let dict = Arc::new(dict);
let barrier = Arc::new(Barrier::new(NUM_READERS));
let insert_count = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..NUM_READERS)
.map(|thread_id| {
let dict_clone = Arc::clone(&dict);
let barrier_clone = barrier.clone();
let count = insert_count.clone();
thread::spawn(move || {
barrier_clone.wait();
for i in 0..OPS_PER_THREAD {
let term = format!("t{}_{:05}", thread_id, i);
if dict_clone
.insert_cas_durable(term.as_bytes())
.expect("durable insert")
{
count.fetch_add(1, Ordering::Relaxed);
}
}
})
})
.collect();
for handle in handles {
handle.join().expect("thread join");
}
let total = insert_count.load(Ordering::SeqCst);
assert_eq!(
total,
NUM_READERS * OPS_PER_THREAD,
"All unique inserts should succeed"
);
for thread_id in 0..NUM_READERS {
for i in 0..OPS_PER_THREAD {
let term = format!("t{}_{:05}", thread_id, i);
assert!(
dict.contains_lockfree(term.as_bytes()),
"Term should be found: {}",
term
);
}
}
let retries = dict.cas_retry_count();
println!(
"CAS retries for {} inserts: {} ({:.2}%)",
NUM_READERS * OPS_PER_THREAD,
retries,
100.0 * retries as f64 / (NUM_READERS * OPS_PER_THREAD) as f64
);
}
#[test]
fn test_lockfree_insert_cas_same_terms() {
let temp_dir = TempDir::new().expect("create temp dir");
let dict_path = temp_dir.path().join("lockfree_same.part");
let mut dict: PersistentARTrie<()> = PersistentARTrie::create(&dict_path).expect("create dict");
dict.set_durability_policy(DurabilityPolicy::Immediate);
let dict = Arc::new(dict);
let barrier = Arc::new(Barrier::new(NUM_READERS));
let insert_success = Arc::new(AtomicUsize::new(0));
let terms: Vec<String> = (0..50).map(|i| format!("shared_{:03}", i)).collect();
let terms_arc = Arc::new(terms);
let handles: Vec<_> = (0..NUM_READERS)
.map(|_| {
let dict_clone = Arc::clone(&dict);
let barrier_clone = barrier.clone();
let count = insert_success.clone();
let terms = terms_arc.clone();
thread::spawn(move || {
barrier_clone.wait();
for term in terms.iter() {
if dict_clone
.insert_cas_durable(term.as_bytes())
.expect("durable insert")
{
count.fetch_add(1, Ordering::Relaxed);
}
}
})
})
.collect();
for handle in handles {
handle.join().expect("thread join");
}
let total = insert_success.load(Ordering::SeqCst);
assert_eq!(
total, 50,
"Only 50 unique terms should be inserted, got {}",
total
);
for term in terms_arc.iter() {
assert!(
dict.contains_lockfree(term.as_bytes()),
"Term should be found: {}",
term
);
}
}