use anyhow::Result;
use drifter::app::state::AppEvent;
use drifter::coordinator::Coordinator;
use drifter::core::config::Config;
use drifter::db;
use rusqlite::Connection;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, mpsc};
use std::time::Instant;
use tokio::sync::Mutex as AsyncMutex;
/// Opens a fresh in-memory SQLite database and creates the `jobs` and
/// `events` tables used by the stress test.
///
/// # Errors
/// Propagates any error from opening the connection or from executing the
/// schema batch.
fn setup_stress_db() -> Result<Connection> {
    // The literal stays flush-left so the SQL text handed to SQLite is
    // exactly the same as before.
    const SCHEMA: &str = "
CREATE TABLE jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL DEFAULT 'legacy',
created_at TEXT NOT NULL,
status TEXT NOT NULL,
source_path TEXT NOT NULL,
staged_path TEXT,
size_bytes INTEGER NOT NULL,
scan_status TEXT,
upload_status TEXT,
s3_bucket TEXT,
s3_key TEXT,
s3_upload_id TEXT,
checksum TEXT,
remote_checksum TEXT,
error TEXT,
priority INTEGER DEFAULT 0,
retry_count INTEGER DEFAULT 0,
next_retry_at TEXT,
scan_duration_ms INTEGER,
upload_duration_ms INTEGER
);
CREATE TABLE events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER NOT NULL,
event_type TEXT NOT NULL,
message TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(job_id) REFERENCES jobs(id)
);
";

    let database = Connection::open_in_memory()?;
    database.execute_batch(SCHEMA)?;
    Ok(database)
}
/// Stress test: seeds 1000 jobs into an in-memory database, drives the
/// coordinator through 100 `process_cycle` calls, and asserts that at least
/// one job is counted under the `scanning` or `uploading` status afterwards.
#[tokio::test]
async fn stress_test_coordinator_cycle() -> Result<()> {
    let conn = Arc::new(Mutex::new(setup_stress_db()?));
    let config = Arc::new(AsyncMutex::new(Config::default()));
    // The receiver is bound to a name (not `_`) so it lives until the end of
    // the test and the channel stays open.
    let (app_tx, _app_rx) = mpsc::channel::<AppEvent>();

    // The two maps start out empty; their key/value types are inferred from
    // the constructor's signature.
    let coordinator = Coordinator::new(
        Arc::clone(&conn),
        Arc::clone(&config),
        Arc::new(AsyncMutex::new(HashMap::new())),
        Arc::new(AsyncMutex::new(HashMap::new())),
        app_tx,
    )?;

    // Seed the database. The guard lives only inside this block so the
    // std mutex is released before any `.await` below.
    {
        let guard = conn.lock().unwrap();
        (0..1000).try_for_each(|i| -> Result<()> {
            let source_path = format!("/tmp/file_{}.txt", i);
            let job_id = db::create_job(&guard, "stress-session", &source_path, 1024, None)?;
            let staged_path = format!("/tmp/staged_{}.txt", i);
            // NOTE(review): "queued" is presumably the status recorded for
            // the staged job; confirm against `db::update_job_staged`.
            db::update_job_staged(&guard, job_id, &staged_path, "queued")?;
            Ok(())
        })?;
    }

    println!("Starting stress test: processing 1000 jobs through coordinator cycles...");
    let start = Instant::now();
    for _cycle in 0..100 {
        coordinator.process_cycle().await?;
    }
    println!("Processed cycles in {:?}", start.elapsed());

    // Re-lock briefly to read the per-status counts.
    let (scanning_count, uploading_count) = {
        let guard = conn.lock().unwrap();
        (
            db::count_jobs_with_status(&guard, "scanning")?,
            db::count_jobs_with_status(&guard, "uploading")?,
        )
    };
    println!(
        "Jobs in scanning: {}, Jobs in uploading: {}",
        scanning_count, uploading_count
    );

    assert!(scanning_count > 0 || uploading_count > 0);
    Ok(())
}