#![cfg(not(target_arch = "wasm32"))]
use transientdb;
use rand::Rng;
use serde_json::json;
use std::fs;
use std::io::Result;
use std::sync::Arc;
use std::thread;
use tempfile::TempDir;
use transientdb::TransientDB;
use transientdb::{DirectoryConfig, DirectoryStore};
use transientdb::{MemoryConfig, MemoryStore};
/// Hammers a `MemoryStore`-backed `TransientDB` from 13 threads for ten seconds.
///
/// Thread mix:
/// - 6 writers appending a small, medium, or ten-field JSON payload (picked by `pattern % 3`),
/// - 4 readers fetching a random item count in `1..500`,
/// - 2 mixed-workload threads that randomly reset, burst-append, or fetch,
/// - 1 thread polling `has_data` in a tight loop.
///
/// Operation outcomes are only counted and printed, never asserted on: the test
/// fails solely when a worker thread panics, which is surfaced through `join`.
#[test]
fn test_memory_store_stress() -> Result<()> {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{Duration, Instant};

    /// Increments `succeeded` for an `Ok` outcome and `failed` for an `Err`.
    fn tally<T, E>(
        outcome: std::result::Result<T, E>,
        succeeded: &AtomicU64,
        failed: &AtomicU64,
    ) {
        let counter = if outcome.is_ok() { succeeded } else { failed };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    let config = MemoryConfig {
        write_key: "test-key-mem".to_string(),
        max_items: 10_000,
        max_fetch_size: 1024 * 1024,
    };
    let db = Arc::new(TransientDB::new(MemoryStore::new(config)));

    // Counters shared by every worker; they are read only after all threads are joined.
    let total_appends = Arc::new(AtomicU64::new(0));
    let total_fetches = Arc::new(AtomicU64::new(0));
    let total_resets = Arc::new(AtomicU64::new(0));
    let errors_count = Arc::new(AtomicU64::new(0));

    let mut handles = vec![];
    let test_duration = Duration::from_secs(10);
    // `Instant` and `Duration` are `Copy`, so each `move` closure gets its own copy.
    let start_time = Instant::now();

    // Writers: six threads, two per payload shape.
    for pattern in 0..6 {
        let db = Arc::clone(&db);
        let total_appends = Arc::clone(&total_appends);
        let errors_count = Arc::clone(&errors_count);
        handles.push(thread::spawn(move || {
            let mut rng = rand::rng();
            while start_time.elapsed() < test_duration {
                let data = match pattern % 3 {
                    0 => json!({ "small": rng.random::<u64>() }),
                    1 => json!({
                        "medium": {
                            "data": rng.random::<u64>(),
                            "array": (0..10).collect::<Vec<_>>()
                        }
                    }),
                    _ => {
                        let mut obj = json!({});
                        for i in 0..10 {
                            obj[format!("field_{}", i)] = json!(rng.random::<u64>());
                        }
                        obj
                    }
                };
                tally(db.append(data), &total_appends, &errors_count);
            }
        }));
    }

    // Readers: four threads fetching a random number of items.
    for _ in 0..4 {
        let db = Arc::clone(&db);
        let total_fetches = Arc::clone(&total_fetches);
        let errors_count = Arc::clone(&errors_count);
        handles.push(thread::spawn(move || {
            let mut rng = rand::rng();
            while start_time.elapsed() < test_duration {
                match db.fetch(Some(rng.random_range(1..500)), None) {
                    Ok(Some(_)) => {
                        total_fetches.fetch_add(1, Ordering::Relaxed);
                    }
                    // An empty fetch is neither counted as a fetch nor as an error.
                    Ok(None) => {}
                    Err(_) => {
                        errors_count.fetch_add(1, Ordering::Relaxed);
                    }
                }
            }
        }));
    }

    // Mixed workload: two threads picking one of four actions per iteration.
    for _ in 0..2 {
        let db = Arc::clone(&db);
        let total_resets = Arc::clone(&total_resets);
        let total_appends = Arc::clone(&total_appends);
        let errors_count = Arc::clone(&errors_count);
        handles.push(thread::spawn(move || {
            let mut rng = rand::rng();
            while start_time.elapsed() < test_duration {
                match rng.random_range(0..4) {
                    0 => {
                        db.reset();
                        total_resets.fetch_add(1, Ordering::Relaxed);
                    }
                    1 => {
                        // Burst of 100 tiny appends.
                        for _ in 0..100 {
                            let tiny = json!({"x": rng.random::<u64>()});
                            tally(db.append(tiny), &total_appends, &errors_count);
                        }
                    }
                    2 => {
                        // Unbounded fetch; the outcome is deliberately ignored.
                        let _ = db.fetch(None, None);
                    }
                    _ => {
                        // Ten small fetches; outcomes are deliberately ignored.
                        for _ in 0..10 {
                            let _ = db.fetch(Some(rng.random_range(1..50)), None);
                        }
                    }
                }
            }
        }));
    }

    // Poller: calls `has_data` continuously while the other threads run.
    {
        let db = Arc::clone(&db);
        handles.push(thread::spawn(move || {
            while start_time.elapsed() < test_duration {
                let _has_data = db.has_data();
            }
        }));
    }

    for handle in handles {
        handle.join().expect("Thread panicked");
    }

    println!("=== Memory Store Stress Test Results ===");
    println!("Total Duration: {:?}", start_time.elapsed());
    println!("Total Appends: {}", total_appends.load(Ordering::Relaxed));
    println!("Total Fetches: {}", total_fetches.load(Ordering::Relaxed));
    println!("Total Resets: {}", total_resets.load(Ordering::Relaxed));
    println!("Total Errors: {}", errors_count.load(Ordering::Relaxed));
    Ok(())
}
/// Hammers a `DirectoryStore`-backed `TransientDB` from 13 threads for ten seconds.
///
/// The store writes into a fresh temporary directory with a 2 KiB `max_file_size`.
///
/// Thread mix:
/// - 6 writers appending timestamped JSON payloads (shape picked by `pattern % 3`),
/// - 4 readers that fetch a small batch and remove whatever the fetch marked removable,
/// - 2 mixed-workload threads that randomly reset, burst-append, or fetch-and-remove,
/// - 1 monitor thread asserting the directory never holds 10000 or more files that
///   have an extension.
///
/// Operation outcomes are counted and printed. The test fails when a worker thread
/// panics (including the monitor's assertion) or when the final append returns an error.
///
/// # Errors
///
/// Propagates failures from creating the temporary directory, constructing the
/// store, and the final append.
#[test]
fn test_directory_store_stress() -> Result<()> {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{Duration, Instant};

    /// Increments `succeeded` for an `Ok` outcome and `failed` for an `Err`.
    fn tally<T, E>(
        outcome: std::result::Result<T, E>,
        succeeded: &AtomicU64,
        failed: &AtomicU64,
    ) {
        let counter = if outcome.is_ok() { succeeded } else { failed };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    let temp_dir = TempDir::new()?;
    let config = DirectoryConfig {
        write_key: "test-key-dir".to_string(),
        storage_location: temp_dir.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 1024 * 2,
    };
    let db = Arc::new(TransientDB::new(DirectoryStore::new(config)?));

    // Counters shared by every worker; they are read only after all threads are joined.
    let total_appends = Arc::new(AtomicU64::new(0));
    let total_fetches = Arc::new(AtomicU64::new(0));
    let total_resets = Arc::new(AtomicU64::new(0));
    let errors_count = Arc::new(AtomicU64::new(0));
    let files_removed = Arc::new(AtomicU64::new(0));

    let mut handles = vec![];
    let test_duration = Duration::from_secs(10);
    // `Instant` and `Duration` are `Copy`, so each `move` closure gets its own copy.
    let start_time = Instant::now();

    // Writers: six threads, two per payload shape, pausing 100µs between appends.
    for pattern in 0..6 {
        let db = Arc::clone(&db);
        let total_appends = Arc::clone(&total_appends);
        let errors_count = Arc::clone(&errors_count);
        handles.push(thread::spawn(move || {
            let mut rng = rand::rng();
            while start_time.elapsed() < test_duration {
                let data = match pattern % 3 {
                    0 => json!({
                        "small": rng.random::<u64>(),
                        "timestamp": chrono::Utc::now().to_rfc3339()
                    }),
                    1 => json!({
                        "medium": {
                            "data": rng.random::<u64>(),
                            "array": (0..10).collect::<Vec<_>>(),
                            "timestamp": chrono::Utc::now().to_rfc3339()
                        }
                    }),
                    _ => {
                        let mut obj = json!({
                            "timestamp": chrono::Utc::now().to_rfc3339()
                        });
                        for i in 0..10 {
                            obj[format!("field_{}", i)] = json!(rng.random::<u64>());
                        }
                        obj
                    }
                };
                tally(db.append(data), &total_appends, &errors_count);
                thread::sleep(Duration::from_micros(100));
            }
        }));
    }

    // Readers: four threads that fetch a batch, then remove what the fetch handed back.
    for _ in 0..4 {
        let db = Arc::clone(&db);
        let total_fetches = Arc::clone(&total_fetches);
        let errors_count = Arc::clone(&errors_count);
        let files_removed = Arc::clone(&files_removed);
        handles.push(thread::spawn(move || {
            let mut rng = rand::rng();
            while start_time.elapsed() < test_duration {
                // 70% of fetches ask for 1..5 items, the rest for 5..20.
                let count = if rng.random_bool(0.7) {
                    Some(rng.random_range(1..5))
                } else {
                    Some(rng.random_range(5..20))
                };
                match db.fetch(count, None) {
                    Ok(Some(result)) => {
                        total_fetches.fetch_add(1, Ordering::Relaxed);
                        if let Some(removable) = result.removable {
                            match db.remove(&removable) {
                                Ok(_) => {
                                    files_removed
                                        .fetch_add(removable.len() as u64, Ordering::Relaxed);
                                }
                                Err(_) => {
                                    errors_count.fetch_add(1, Ordering::Relaxed);
                                }
                            }
                        }
                    }
                    // An empty fetch is neither counted as a fetch nor as an error.
                    Ok(None) => {}
                    Err(_) => {
                        errors_count.fetch_add(1, Ordering::Relaxed);
                    }
                }
                thread::sleep(Duration::from_millis(rng.random_range(10..50)));
            }
        }));
    }

    // Mixed workload: two threads picking one of four actions per iteration.
    for _ in 0..2 {
        let db = Arc::clone(&db);
        let total_resets = Arc::clone(&total_resets);
        let total_appends = Arc::clone(&total_appends);
        let errors_count = Arc::clone(&errors_count);
        handles.push(thread::spawn(move || {
            let mut rng = rand::rng();
            while start_time.elapsed() < test_duration {
                match rng.random_range(0..4) {
                    0 => {
                        db.reset();
                        total_resets.fetch_add(1, Ordering::Relaxed);
                        thread::sleep(Duration::from_millis(100));
                    }
                    1 => {
                        // Burst of 20 tiny appends.
                        for _ in 0..20 {
                            let tiny = json!({
                                "x": rng.random::<u64>(),
                                "timestamp": chrono::Utc::now().to_rfc3339()
                            });
                            tally(db.append(tiny), &total_appends, &errors_count);
                        }
                    }
                    2 => {
                        // Size-limited fetch; fetch and remove failures are deliberately ignored.
                        if let Ok(Some(result)) = db.fetch(None, Some(1024 * 100)) {
                            if let Some(removable) = result.removable {
                                let _ = db.remove(&removable);
                            }
                        }
                    }
                    _ => {
                        // Five single-item fetches; failures are deliberately ignored.
                        for _ in 0..5 {
                            if let Ok(Some(result)) = db.fetch(Some(1), None) {
                                if let Some(removable) = result.removable {
                                    let _ = db.remove(&removable);
                                }
                            }
                        }
                    }
                }
            }
        }));
    }

    // Monitor: every 100ms, check that files are not piling up in the storage directory.
    {
        let temp_dir_path = temp_dir.path().to_owned();
        handles.push(thread::spawn(move || {
            while start_time.elapsed() < test_duration {
                if let Ok(entries) = fs::read_dir(&temp_dir_path) {
                    // Only entries whose path has an extension are counted, whatever
                    // that extension is; unreadable entries are skipped.
                    let tracked_files = entries
                        .filter_map(Result::ok)
                        .filter(|entry| entry.path().extension().is_some())
                        .count();
                    assert!(tracked_files < 10000, "Too many files accumulated");
                }
                thread::sleep(Duration::from_millis(100));
            }
        }));
    }

    for handle in handles {
        handle.join().expect("Thread panicked");
    }

    println!("=== Directory Store Stress Test Results ===");
    println!("Total Duration: {:?}", start_time.elapsed());
    println!("Total Appends: {}", total_appends.load(Ordering::Relaxed));
    println!("Total Fetches: {}", total_fetches.load(Ordering::Relaxed));
    println!("Total Resets: {}", total_resets.load(Ordering::Relaxed));
    println!("Files Removed: {}", files_removed.load(Ordering::Relaxed));
    println!("Total Errors: {}", errors_count.load(Ordering::Relaxed));

    // The store must still accept writes once the stress phase is over.
    db.append(json!({"final_test": true}))?;
    // Smoke check only: the store must still answer `has_data` without panicking.
    // Asserting `has_data() || !has_data()` would be a tautology that can never fail.
    // NOTE(review): whether `has_data` should be true right after a single append
    // depends on store internals not visible here; tighten this once confirmed.
    let _ = db.has_data();
    Ok(())
}