#![cfg(feature = "object-store")]
use object_store::memory::InMemory;
use seerdb::storage::{ObjectStoreBackend, RetryConfig, Storage};
use std::path::Path;
use std::sync::Arc;
#[test]
fn test_high_concurrency_writes() {
    // 100 OS threads each write a distinct SSTable through one shared backend;
    // every write must succeed and every file must be visible afterwards.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let store = Arc::new(InMemory::new());
    let backend =
        Arc::new(runtime.block_on(async { ObjectStoreBackend::new(store, String::new()) }));
    let _guard = runtime.enter();

    let writers: Vec<_> = (0..100)
        .map(|idx| {
            let backend = Arc::clone(&backend);
            std::thread::spawn(move || {
                let name = format!("concurrent_{}.sst", idx);
                let payload = format!("data_{}", idx);
                backend.write_sstable(Path::new(&name), payload.as_bytes())
            })
        })
        .collect();

    let successes = writers
        .into_iter()
        .map(|w| w.join().unwrap())
        .filter(|outcome| outcome.is_ok())
        .count();
    println!("High concurrency writes: {}/100 succeeded", successes);
    assert_eq!(successes, 100, "all concurrent writes should succeed");

    // Every path written above must now report as existing.
    let read_successes = (0..100)
        .filter(|idx| {
            let name = format!("concurrent_{}.sst", idx);
            backend.exists(Path::new(&name)).unwrap_or(false)
        })
        .count();
    assert_eq!(read_successes, 100, "all written files should exist");
}
#[test]
fn test_high_concurrency_reads() {
    // 200 threads read 10 shared files (20 readers per file) and verify that
    // the bytes returned match what was originally written.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let store = Arc::new(InMemory::new());
    let backend =
        Arc::new(runtime.block_on(async { ObjectStoreBackend::new(store, String::new()) }));
    let _guard = runtime.enter();

    // Seed the 10 shared files.
    for idx in 0..10 {
        let name = format!("shared_{}.sst", idx);
        let payload = format!("shared_data_{}", idx);
        backend
            .write_sstable(Path::new(&name), payload.as_bytes())
            .unwrap();
    }

    let readers: Vec<_> = (0..200)
        .map(|thread_no| {
            let backend = Arc::clone(&backend);
            std::thread::spawn(move || {
                let file_idx = thread_no % 10;
                let name = format!("shared_{}.sst", file_idx);
                let expected = format!("shared_data_{}", file_idx);
                backend
                    .read_block(Path::new(&name), 0, expected.len() as u32)
                    .map_or(false, |bytes| bytes == expected.as_bytes())
            })
        })
        .collect();

    let successes = readers
        .into_iter()
        .map(|r| r.join().unwrap())
        .filter(|&matched| matched)
        .count();
    println!("High concurrency reads: {}/200 succeeded", successes);
    assert_eq!(successes, 200, "all concurrent reads should succeed");
}
#[test]
fn test_mixed_workload() {
    // Race existence checks, fresh writes, and deletes against the same
    // backend from 100 threads; a small failure margin is tolerated since
    // deletes race the existence checks.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let store = Arc::new(InMemory::new());
    let backend =
        Arc::new(runtime.block_on(async { ObjectStoreBackend::new(store, String::new()) }));
    let _guard = runtime.enter();

    // Seed 20 files targeted by the reader and deleter threads below.
    for idx in 0..20 {
        let name = format!("mixed_{}.sst", idx);
        let payload = format!("initial_data_{}", idx);
        backend
            .write_sstable(Path::new(&name), payload.as_bytes())
            .unwrap();
    }

    let mut workers = Vec::new();

    // 50 existence probes over the seeded files (is_ok: any answer counts).
    for idx in 0..50 {
        let backend = Arc::clone(&backend);
        workers.push(std::thread::spawn(move || {
            let name = format!("mixed_{}.sst", idx % 20);
            backend.exists(Path::new(&name)).is_ok()
        }));
    }

    // 30 writes to fresh, non-conflicting paths (mixed_100..mixed_129).
    for idx in 100..130 {
        let backend = Arc::clone(&backend);
        workers.push(std::thread::spawn(move || {
            let name = format!("mixed_{}.sst", idx);
            let payload = format!("new_data_{}", idx);
            backend
                .write_sstable(Path::new(&name), payload.as_bytes())
                .is_ok()
        }));
    }

    // 20 deletes of the seeded files, racing the existence probes above.
    for idx in 0..20 {
        let backend = Arc::clone(&backend);
        workers.push(std::thread::spawn(move || {
            let name = format!("mixed_{}.sst", idx);
            backend.delete_sstable(Path::new(&name)).is_ok()
        }));
    }

    let successes = workers
        .into_iter()
        .map(|w| w.join().unwrap())
        .filter(|&ok| ok)
        .count();
    println!("Mixed workload: {}/100 operations succeeded", successes);
    assert!(
        successes >= 90,
        "at least 90% of mixed operations should succeed"
    );
}
#[test]
fn test_retry_config_presets() {
    // Pin the exact values of each built-in retry preset so accidental
    // changes to the defaults show up as test failures.
    let defaults = RetryConfig::default();
    assert_eq!(defaults.max_attempts, 3);
    assert_eq!(defaults.base_delay_ms, 100);
    assert_eq!(defaults.max_delay_ms, 5000);
    assert!(defaults.jitter);

    // `none` disables retries entirely.
    let disabled = RetryConfig::none();
    assert_eq!(disabled.max_attempts, 0);

    // `aggressive` retries more often with a shorter initial backoff.
    let eager = RetryConfig::aggressive();
    assert_eq!(eager.max_attempts, 5);
    assert_eq!(eager.base_delay_ms, 50);
    assert_eq!(eager.max_delay_ms, 10000);
}
#[test]
fn test_custom_retry_config() {
    // A user-supplied RetryConfig must not interfere with a basic
    // write / exists / read round-trip.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let store = Arc::new(InMemory::new());
    let config = RetryConfig {
        max_attempts: 10,
        base_delay_ms: 50,
        max_delay_ms: 2000,
        jitter: false,
    };
    let backend = runtime
        .block_on(async { ObjectStoreBackend::with_retry_config(store, String::new(), config) });
    let _guard = runtime.enter();

    let path = Path::new("custom_config.sst");
    let payload = b"test data";
    backend.write_sstable(path, payload).unwrap();
    assert!(backend.exists(path).unwrap());
    let roundtrip = backend.read_block(path, 0, payload.len() as u32).unwrap();
    assert_eq!(&roundtrip, payload);
}
#[test]
fn test_list_operations_stress() {
    // 20 threads list the store while it holds 50 seeded files; every listing
    // must succeed and report at least those 50 entries.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let store = Arc::new(InMemory::new());
    let backend =
        Arc::new(runtime.block_on(async { ObjectStoreBackend::new(store, String::new()) }));
    let _guard = runtime.enter();

    for idx in 0..50 {
        let name = format!("list_test_{}.sst", idx);
        let payload = format!("data_{}", idx);
        backend
            .write_sstable(Path::new(&name), payload.as_bytes())
            .unwrap();
    }

    let listers: Vec<_> = (0..20)
        .map(|_| {
            let backend = Arc::clone(&backend);
            std::thread::spawn(move || {
                backend
                    .list_sstables(Path::new(""))
                    .map_or(false, |files| files.len() >= 50)
            })
        })
        .collect();

    let successes = listers
        .into_iter()
        .map(|l| l.join().unwrap())
        .filter(|&ok| ok)
        .count();
    println!("List stress test: {}/20 succeeded", successes);
    assert_eq!(successes, 20, "all list operations should succeed");
}
#[test]
fn test_nonexistent_file_operations() {
    // Probing a path that was never written: `exists` reports false and
    // `read_block` returns an error rather than fabricating data.
    let rt = tokio::runtime::Runtime::new().unwrap();
    let store = Arc::new(InMemory::new());
    let backend = rt.block_on(async { ObjectStoreBackend::new(store, String::new()) });
    let _guard = rt.enter();

    let path = Path::new("nonexistent.sst");
    // Idiomatic boolean assertion (clippy::bool_assert_comparison) instead of
    // the original `assert_eq!(…, false)`.
    assert!(!backend.exists(path).unwrap());
    assert!(backend.read_block(path, 0, 100).is_err());
}
#[test]
#[ignore]
fn bench_write_throughput() {
    // Rough write-throughput benchmark; `#[ignore]`d so it only runs on
    // demand via `cargo test -- --ignored`.
    let rt = tokio::runtime::Runtime::new().unwrap();
    let store = Arc::new(InMemory::new());
    let backend = rt.block_on(async { ObjectStoreBackend::new(store, String::new()) });
    let _guard = rt.enter();

    let count = 1000;
    // Hoisted out of the loop: the 4 KiB payload is loop-invariant, and
    // re-allocating it per iteration would charge allocator time to the
    // measured write throughput.
    let data = vec![0u8; 4096];
    let start = std::time::Instant::now();
    for i in 0..count {
        let path_str = format!("bench_{}.sst", i);
        let path = Path::new(&path_str);
        backend.write_sstable(path, &data).unwrap();
    }
    let elapsed = start.elapsed();
    let ops_per_sec = count as f64 / elapsed.as_secs_f64();
    println!(
        "Write throughput: {:.0} ops/sec ({} files in {:?})",
        ops_per_sec, count, elapsed
    );
}