use proptest::prelude::*;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;
use tokio::task;
use zipora::error::{Result, ZiporaError};
use zipora::fsa::{
ZiporaTrie, ZiporaTrieConfig, TrieStrategy, StorageStrategy, CompressionStrategy, RankSelectType,
FiniteStateAutomaton, PrefixIterable, StatisticsProvider, Trie,
};
use zipora::succinct::RankSelectInterleaved256;
use zipora::memory::{SecureMemoryPool, SecurePoolConfig};
/// Concurrency access patterns exercised by this test suite.
///
/// Within these tests the level only controls the `enable_concurrency` flag
/// passed to the trie config (see `create_compressed_sparse_trie`): the last
/// two variants enable it, the rest do not. The individual levels are not
/// otherwise enforced at runtime here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConcurrencyLevel {
NoWriteReadOnly,
SingleThreadStrict,
SingleThreadShared,
// These two variants turn on `enable_concurrency` in the trie config.
OneWriteMultiRead,
MultiWriteMultiRead,
}
/// Build a `ZiporaTrie` configured for compressed-sparse operation.
///
/// All settings are fixed test defaults except `enable_concurrency`, which is
/// turned on only for the one-writer/multi-reader and multi-writer levels.
fn create_compressed_sparse_trie(level: ConcurrencyLevel) -> Result<ZiporaTrie<RankSelectInterleaved256>> {
    let concurrent = matches!(
        level,
        ConcurrencyLevel::OneWriteMultiRead | ConcurrencyLevel::MultiWriteMultiRead
    );
    Ok(ZiporaTrie::with_config(ZiporaTrieConfig {
        trie_strategy: TrieStrategy::CompressedSparse {
            sparse_threshold: 0.3,
            compression_level: 6,
            adaptive_sparse: true,
        },
        storage_strategy: StorageStrategy::Standard {
            initial_capacity: 256,
            growth_factor: 1.5,
        },
        compression_strategy: CompressionStrategy::PathCompression {
            min_path_length: 2,
            max_path_length: 64,
            adaptive_threshold: true,
        },
        rank_select_type: RankSelectType::Interleaved256,
        enable_simd: true,
        enable_concurrency: concurrent,
        cache_optimization: true,
    }))
}
/// Variant of `create_compressed_sparse_trie` that accepts a secure memory pool.
/// NOTE(review): the pool is currently ignored (`_pool`) and construction just
/// delegates to `create_compressed_sparse_trie` — confirm whether wiring the
/// pool into the trie is still pending.
fn create_compressed_sparse_trie_with_pool(level: ConcurrencyLevel, _pool: Arc<SecureMemoryPool>) -> Result<ZiporaTrie<RankSelectInterleaved256>> {
create_compressed_sparse_trie(level)
}
type ReaderToken = (); type WriterToken = ();
/// Fixed set of ten ASCII keys used by most of the basic tests.
fn generate_test_keys() -> Vec<Vec<u8>> {
    [
        "hello",
        "world",
        "compressed",
        "sparse",
        "trie",
        "patricia",
        "concurrency",
        "lock_free",
        "performance",
        "memory_safe",
    ]
    .iter()
    .map(|s| s.as_bytes().to_vec())
    .collect()
}
/// Keys with heavy prefix overlap ("app*", "compress*", "test*") used to
/// exercise shared-path handling.
fn generate_prefix_keys() -> Vec<Vec<u8>> {
    [
        "app",
        "apple",
        "application",
        "apply",
        "compress",
        "compressed",
        "compression",
        "test",
        "testing",
        "tester",
    ]
    .iter()
    .map(|s| s.as_bytes().to_vec())
    .collect()
}
/// Produce `count` zero-padded sequential keys: "key_000000", "key_000001", …
fn generate_sequential_keys(count: usize) -> Vec<Vec<u8>> {
    let mut keys = Vec::with_capacity(count);
    for i in 0..count {
        keys.push(format!("key_{:06}", i).into_bytes());
    }
    keys
}
fn generate_unicode_keys() -> Vec<Vec<u8>> {
vec![
"hello".as_bytes().to_vec(),
"world".as_bytes().to_vec(),
"世界".as_bytes().to_vec(),
"🌍".as_bytes().to_vec(),
"café".as_bytes().to_vec(),
"naïve".as_bytes().to_vec(),
"résumé".as_bytes().to_vec(),
"москва".as_bytes().to_vec(),
"東京".as_bytes().to_vec(),
"🚀🌟".as_bytes().to_vec(),
]
}
/// Keys designed to stress path compression: nested alphabet prefixes, a
/// shared long prefix with three variants, and one unrelated path.
fn generate_compressed_test_keys() -> Vec<Vec<u8>> {
    let mut keys = vec![
        b"abcdefghijklmnop".to_vec(),
        b"abcdefghijklmnopqrstuv".to_vec(),
        b"abcdefghijklmnopqrstuvwxyz".to_vec(),
    ];
    for n in 1..=3 {
        keys.push(format!("prefix_shared_long_path_{}", n).into_bytes());
    }
    keys.push(b"another_completely_different_path".to_vec());
    keys
}
/// 60 keys in 20 groups of three: a base key plus `_variant_a` / `_variant_b`
/// extensions sharing its full prefix.
fn generate_collision_keys() -> Vec<Vec<u8>> {
    (0..20)
        .flat_map(|i| {
            let base = format!("collision_test_prefix_{:03}", i);
            [
                base.clone().into_bytes(),
                format!("{}_variant_a", base).into_bytes(),
                format!("{}_variant_b", base).into_bytes(),
            ]
        })
        .collect()
}
#[test]
fn test_basic_creation_and_configuration() {
    // Construction must succeed for every supported concurrency level and
    // always start from an empty trie.
    let levels = [
        ConcurrencyLevel::NoWriteReadOnly,
        ConcurrencyLevel::SingleThreadStrict,
        ConcurrencyLevel::SingleThreadShared,
        ConcurrencyLevel::OneWriteMultiRead,
        ConcurrencyLevel::MultiWriteMultiRead,
    ];
    for &level in levels.iter() {
        let result = create_compressed_sparse_trie(level);
        assert!(
            result.is_ok(),
            "Failed to create trie with level {:?}",
            level
        );
        let trie = result.unwrap();
        assert!(trie.is_empty());
        assert_eq!(trie.len(), 0);
    }
}
#[test]
fn test_custom_memory_pool_creation() {
    // Building a trie through the pool-aware constructor should still yield
    // an empty trie.
    let config = SecurePoolConfig::new(8192, 64, 8);
    let pool = SecureMemoryPool::new(config).expect("Failed to create pool");
    let result =
        create_compressed_sparse_trie_with_pool(ConcurrencyLevel::SingleThreadStrict, pool);
    assert!(result.is_ok());
    assert_eq!(result.unwrap().len(), 0);
}
/// Populate a concurrency-enabled trie, then verify shared read access from
/// several tokio tasks.
///
/// Fix: the spawn loop bound its counter as `i` but never used it, producing
/// an unused-variable warning; renamed to `_`.
#[tokio::test]
async fn test_token_based_operations() {
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::OneWriteMultiRead).unwrap();
    let keys = generate_test_keys();
    for key in &keys {
        let result = trie.insert(key);
        assert!(result.is_ok(), "Failed to insert key: {:?}", key);
    }
    assert_eq!(trie.len(), keys.len());
    for key in &keys {
        assert!(trie.contains(key), "Failed to find key: {:?}", key);
        assert!(trie.lookup(key).is_some(), "Failed to lookup key: {:?}", key);
    }
    // Share the populated trie read-only across four concurrent tasks.
    let trie_clone = Arc::new(trie);
    let mut handles = vec![];
    for _ in 0..4 {
        let trie_ref = Arc::clone(&trie_clone);
        let keys_clone = keys.clone();
        let handle = tokio::spawn(async move {
            for key in &keys_clone {
                assert!(trie_ref.contains(key), "Concurrent read failed for key: {:?}", key);
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.await.unwrap();
    }
}
#[test]
fn test_path_compression_efficiency() {
    // Keys sharing long prefixes should compress into fewer states than the
    // total number of input characters.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let keys = generate_compressed_test_keys();
    for key in &keys {
        trie.insert(key).unwrap();
    }
    assert!(keys.iter().all(|key| trie.contains(key)));
    let stats = trie.stats();
    assert!(stats.memory_usage > 0);
    let total_chars: usize = keys.iter().map(|k| k.len()).sum();
    assert!(
        stats.num_states < total_chars,
        "Path compression should reduce state count below character count"
    );
}
#[tokio::test]
async fn test_concurrent_reader_operations() {
    // Ten reader tasks share one Arc'd trie; task `t` checks the keys whose
    // index satisfies `i % 10 == t`.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::OneWriteMultiRead).unwrap();
    let keys = generate_test_keys();
    for key in &keys {
        trie.insert(key).unwrap();
    }
    let shared = Arc::new(trie);
    let mut handles = Vec::new();
    for task_id in 0..10 {
        let reader = Arc::clone(&shared);
        let task_keys = keys.clone();
        handles.push(task::spawn(async move {
            for (i, key) in task_keys.iter().enumerate() {
                if i % 10 == task_id {
                    assert!(reader.contains(key));
                }
            }
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}
// Five writer tasks insert into one trie guarded by a tokio async mutex.
// NOTE(review): each task holds the mutex for its entire 20-insert loop, so
// the writers run serially; this checks correctness under the
// MultiWriteMultiRead config rather than true write parallelism.
#[tokio::test]
async fn test_concurrent_writer_operations() {
let trie = create_compressed_sparse_trie(ConcurrencyLevel::MultiWriteMultiRead).unwrap();
let trie = Arc::new(tokio::sync::Mutex::new(trie));
let mut handles = Vec::new();
for task_id in 0..5 {
let trie_clone = Arc::clone(&trie);
let handle = task::spawn(async move {
// Lock once per task; keys are task-unique so no duplicates arise.
let mut trie_guard = trie_clone.lock().await;
for i in 0..20 {
let key = format!("concurrent_key_{}_{:03}", task_id, i);
let result = trie_guard.insert(key.as_bytes());
assert!(result.is_ok());
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
// All 5 tasks * 20 keys = 100 distinct keys must be present.
let trie_guard = trie.lock().await;
assert_eq!(trie_guard.len(), 5 * 20); }
#[tokio::test]
async fn test_token_validation() {
    // Two independently-created tries must be fully isolated: neither sees
    // the other's keys.
    let mut first = create_compressed_sparse_trie(ConcurrencyLevel::OneWriteMultiRead).unwrap();
    let mut second = create_compressed_sparse_trie(ConcurrencyLevel::OneWriteMultiRead).unwrap();
    assert!(first.insert(b"test1").is_ok());
    assert!(second.insert(b"test2").is_ok());
    assert!(first.contains(b"test1"));
    assert!(!first.contains(b"test2"));
    assert!(second.contains(b"test2"));
    assert!(!second.contains(b"test1"));
}
#[test]
fn test_empty_key_handling() {
    // The empty key is a legal entry and must not be double-counted on
    // repeated insertion.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    assert!(trie.insert(b"").is_ok());
    assert_eq!(trie.len(), 1);
    assert!(trie.contains(b""));
    // Re-inserting the empty key is accepted but len() stays at 1.
    assert!(trie.insert(b"").is_ok());
    assert_eq!(trie.len(), 1);
}
#[test]
fn test_very_long_keys() {
    // Keys in the 5-15 KB range must round-trip through insert/contains.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let long_keys = [
        vec![42u8; 10000],
        vec![84u8; 5000],
        vec![126u8; 15000],
    ];
    for key in &long_keys {
        trie.insert(key).unwrap();
    }
    assert_eq!(trie.len(), 3);
    for key in &long_keys {
        assert!(trie.contains(key));
    }
}
#[test]
fn test_unicode_key_support() {
    // UTF-8 encoded keys are plain byte sequences to the trie and must all
    // round-trip.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let unicode_keys = generate_unicode_keys();
    for key in &unicode_keys {
        trie.insert(key).unwrap();
    }
    assert_eq!(trie.len(), unicode_keys.len());
    assert!(unicode_keys.iter().all(|key| trie.contains(key)));
}
#[test]
fn test_duplicate_insertion_handling() {
    // Re-inserting the same key repeatedly is idempotent for len().
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let key: &[u8] = b"duplicate_test";
    for _ in 0..5 {
        trie.insert(key).unwrap();
    }
    assert!(trie.contains(key));
    assert_eq!(trie.len(), 1);
}
#[test]
fn test_large_dataset_performance() {
    // Smoke-check bulk throughput: 10k sequential keys must insert and look
    // up within generous wall-clock bounds.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let keys = generate_sequential_keys(10000);

    let insert_start = Instant::now();
    for key in keys.iter() {
        trie.insert(key).unwrap();
    }
    let insert_duration = insert_start.elapsed();
    assert_eq!(trie.len(), keys.len());

    let lookup_start = Instant::now();
    assert!(keys.iter().all(|key| trie.contains(key)));
    let lookup_duration = lookup_start.elapsed();

    println!(
        "CSP Trie Performance - Insert: {:?}, Lookup: {:?}",
        insert_duration, lookup_duration
    );
    assert!(
        insert_duration.as_secs() < 30,
        "Insert time too slow: {:?}",
        insert_duration
    );
    assert!(
        lookup_duration.as_secs() < 10,
        "Lookup time too slow: {:?}",
        lookup_duration
    );
}
#[test]
fn test_memory_efficiency() {
    // Memory overhead relative to raw key bytes should stay bounded for keys
    // with heavily shared prefixes.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let keys = generate_collision_keys();
    for key in &keys {
        trie.insert(key).unwrap();
    }
    let stats = trie.stats();
    assert!(stats.memory_usage > 0);
    assert!(stats.bits_per_key > 0.0);
    let raw_bytes: usize = keys.iter().map(|k| k.len()).sum();
    let compression_ratio = stats.memory_usage as f64 / raw_bytes as f64;
    println!(
        "Memory efficiency - Compression ratio: {:.2}",
        compression_ratio
    );
    assert!(compression_ratio < 5.0, "Memory usage seems too high");
}
// Mixed writer/reader stress: 20 tasks share one async-mutex-guarded trie.
// Even task ids write `keys_per_task` task-unique keys; odd task ids sleep
// briefly and then probe half of the previous (even) task's keys.
// NOTE(review): each task holds the lock for its whole loop (including the
// reader's sleep), so the tasks are effectively serialized.
#[tokio::test]
async fn test_concurrent_stress() {
let trie = Arc::new(tokio::sync::Mutex::new(
create_compressed_sparse_trie(ConcurrencyLevel::MultiWriteMultiRead).unwrap(),
));
let mut handles = Vec::new();
let num_tasks = 20;
let keys_per_task = 100;
for task_id in 0..num_tasks {
let trie_clone = Arc::clone(&trie);
let handle = task::spawn(async move {
let mut trie_guard = trie_clone.lock().await;
if task_id % 2 == 0 {
// Writer: insert task-unique keys; duplicates are impossible.
for i in 0..keys_per_task {
let key = format!("stress_{}_{:04}", task_id, i);
trie_guard
.insert(key.as_bytes())
.unwrap();
}
} else {
// Reader: probe keys written by task `task_id - 1`. The result is
// deliberately ignored since ordering with that writer is not fixed.
tokio::time::sleep(Duration::from_millis(10)).await;
for i in 0..keys_per_task / 2 {
let key = format!("stress_{}_{:04}", task_id - 1, i);
let _ = trie_guard.contains(key.as_bytes());
}
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
// Only the 10 even (writer) tasks add keys: 10 * 100 = 1000 total.
let trie_guard = trie.lock().await;
let writer_tasks = num_tasks / 2;
assert_eq!(trie_guard.len(), writer_tasks * keys_per_task);
}
#[test]
fn test_fsa_interface_compliance() {
    // Root state id must be 0, and membership must match exactly the
    // inserted set.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let keys = generate_prefix_keys();
    for key in &keys {
        trie.insert(key).unwrap();
    }
    assert_eq!(trie.root(), 0);
    assert!(keys.iter().all(|key| trie.contains(key)));
    assert!(!trie.contains(b"nonexistent"));
}
#[test]
fn test_longest_prefix_functionality() {
    // Keys that are prefixes of one another must coexist in the trie.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let keys: [&[u8]; 3] = [b"test", b"testing", b"tester"];
    for key in &keys {
        trie.insert(key).unwrap();
    }
    for key in &keys {
        assert!(trie.contains(key));
    }
}
#[test]
fn test_prefix_iteration() {
    // Spot-check both the "app*" and "test*" families after bulk insert.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    for key in &generate_prefix_keys() {
        trie.insert(key).unwrap();
    }
    let expected: [&[u8]; 6] = [
        b"app",
        b"apple",
        b"application",
        b"test",
        b"testing",
        b"tester",
    ];
    for key in expected {
        assert!(trie.contains(key));
    }
}
#[test]
fn test_complete_iteration() {
    // Every inserted key must be reported present and counted exactly once.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let keys = generate_test_keys();
    for key in &keys {
        trie.insert(key).unwrap();
    }
    assert_eq!(trie.len(), keys.len());
    assert!(keys.iter().all(|key| trie.contains(key)));
}
#[test]
fn test_state_inspection() {
    // The root state id stays 0 after inserts, and all keys remain reachable.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let keys: [&[u8]; 3] = [b"hello", b"help", b"world"];
    for key in &keys {
        trie.insert(key).unwrap();
    }
    assert_eq!(trie.root(), 0);
    for key in &keys {
        assert!(trie.contains(key));
    }
}
#[test]
fn test_statistics_accuracy() {
    // stats() must reflect the inserted key count and report sane,
    // non-degenerate state/memory figures.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let keys = generate_test_keys();
    for key in &keys {
        trie.insert(key).unwrap();
    }
    let stats = trie.stats();
    assert_eq!(stats.num_keys, keys.len());
    assert!(stats.num_states > 0);
    assert!(stats.memory_usage > 0);
    // bits_per_key must be positive but not absurdly large.
    assert!(stats.bits_per_key > 0.0);
    assert!(stats.bits_per_key < 100_000.0);
}
proptest! {
// Property: after inserting arbitrary byte-string keys, every key whose
// insert reported Ok must be found, and len() must equal the number of
// distinct accepted keys.
#[test]
fn property_test_insert_lookup_consistency(
keys in prop::collection::vec(prop::collection::vec(any::<u8>(), 0..100), 0..200)
) {
let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict)
.unwrap();
// HashSet deduplicates repeated random keys so the len() check is exact.
let mut expected_keys = HashSet::new();
for key in &keys {
if trie.insert(key).is_ok() {
expected_keys.insert(key.clone());
}
}
for key in &expected_keys {
prop_assert!(trie.contains(key));
}
prop_assert_eq!(trie.len(), expected_keys.len());
}
// Property: lookup() and contains() must always agree for non-empty keys,
// however path compression laid out the inserted data.
#[test]
fn property_test_path_compression_correctness(
keys in prop::collection::vec(prop::collection::vec(any::<u8>(), 1..50), 1..100)
) {
let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict)
.unwrap();
// Insert results are ignored; the property only relates the two query APIs.
for key in &keys {
let _ = trie.insert(key);
}
for key in &keys {
if !key.is_empty() {
let lookup_result = trie.lookup(key);
let contains_result = trie.contains(key);
prop_assert_eq!(lookup_result.is_some(), contains_result);
}
}
}
}
#[tokio::test]
async fn test_no_write_read_only_mode() {
    // NOTE(review): despite the level name, this test expects inserts to
    // succeed — the level is not enforced as read-only here (it only affects
    // the enable_concurrency config flag). Confirm this is intentional.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::NoWriteReadOnly).unwrap();
    let result = trie.insert(b"test");
    assert!(result.is_ok(), "Insert should work in NoWriteReadOnly mode");
    assert!(trie.contains(b"test"), "Should be able to read inserted data");
    assert_eq!(trie.len(), 1, "Length should be correct");
}
#[tokio::test]
async fn test_single_thread_strict_mode() {
    // Minimal insert/lookup round-trip under the strict single-thread level.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    assert!(trie.insert(b"test").is_ok());
    assert!(trie.contains(b"test"));
}
#[tokio::test]
async fn test_one_write_multi_read_mode() {
    // One write followed by repeated reads of the same key.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::OneWriteMultiRead).unwrap();
    trie.insert(b"test").unwrap();
    for _ in 0..2 {
        assert!(trie.contains(b"test"));
    }
}
#[test]
fn test_integration_with_memory_pool() {
    // The pool-aware constructor must produce a fully functional trie.
    let pool = SecureMemoryPool::new(SecurePoolConfig::new(16384, 128, 16)).unwrap();
    let mut trie =
        create_compressed_sparse_trie_with_pool(ConcurrencyLevel::SingleThreadStrict, pool)
            .unwrap();
    let keys = generate_test_keys();
    for key in &keys {
        trie.insert(key).unwrap();
    }
    assert!(keys.iter().all(|key| trie.contains(key)));
    assert_eq!(trie.stats().num_keys, keys.len());
}
#[test]
fn test_comparison_with_standard_trie() {
    // Positive and negative membership must agree across both query APIs,
    // contains() and lookup().
    let mut csp_trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let keys = generate_collision_keys();
    for key in &keys {
        csp_trie.insert(key).unwrap();
    }
    for key in &keys {
        assert!(csp_trie.contains(key));
        assert!(csp_trie.lookup(key).is_some());
    }
    let missing: &[u8] = b"definitely_not_there";
    assert!(!csp_trie.contains(missing));
    assert!(csp_trie.lookup(missing).is_none());
}
#[test]
fn test_error_recovery_and_consistency() {
    // Seed with a couple of keys that must survive whatever happens next.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::SingleThreadStrict).unwrap();
    let initial_keys: [&[u8]; 2] = [b"stable1", b"stable2"];
    for key in &initial_keys {
        trie.insert(key).unwrap();
    }
    let initial_len = trie.len();
    // A 100 KB key may or may not be accepted; either way the trie must
    // remain internally consistent.
    let very_long_key = vec![255u8; 100000];
    match trie.insert(&very_long_key) {
        Ok(_) => {
            assert!(trie.contains(&very_long_key));
            assert_eq!(trie.len(), initial_len + 1);
        }
        Err(_) => assert_eq!(trie.len(), initial_len),
    }
    // The original keys are untouched in both outcomes.
    for key in &initial_keys {
        assert!(trie.contains(key));
    }
}
#[tokio::test]
async fn test_token_lifecycle_management() {
    // Insert ten keys one at a time, verifying visibility immediately after
    // each insert, then confirm the final count.
    let mut trie = create_compressed_sparse_trie(ConcurrencyLevel::OneWriteMultiRead).unwrap();
    for i in 0..10 {
        let key = format!("lifecycle_test_{:02}", i).into_bytes();
        trie.insert(&key).unwrap();
        assert!(trie.contains(&key));
    }
    assert_eq!(trie.len(), 10);
}