use std::sync::Arc;
use serde_json::json;
use tempfile::tempdir;
use rustqueue::engine::queue::QueueManager;
use rustqueue::storage::{
BufferedRedbConfig, BufferedRedbStorage, HybridConfig, HybridStorage, MemoryStorage,
RedbStorage, StorageBackend,
};
#[tokio::test]
async fn crash_recovery_redb() {
    // Durability check: populate a redb-backed queue, simulate a crash by
    // dropping every handle, then reopen the same file and verify that all
    // waiting jobs survived and remain dequeueable.
    let dir = tempdir().expect("failed to create tempdir");
    let db_path = dir.path().join("crash-test.redb");

    // Phase 1: fill the queue, then "crash" by dropping manager and storage.
    {
        let backend = Arc::new(RedbStorage::new(&db_path).expect("open redb"));
        let manager = QueueManager::new(Arc::clone(&backend) as Arc<dyn StorageBackend>);
        for idx in 0..100 {
            let job_name = format!("crash-job-{idx}");
            manager
                .push("crash-q", &job_name, json!({"i": idx}), None)
                .await
                .expect("push should succeed");
        }
        let counts = backend.get_queue_counts("crash-q").await.unwrap();
        assert_eq!(counts.waiting, 100, "should have 100 waiting before crash");
        drop(manager);
        drop(backend);
    }

    // Phase 2: reopen the same database file and confirm full recovery.
    {
        let backend = Arc::new(RedbStorage::new(&db_path).expect("reopen redb"));
        let counts = backend.get_queue_counts("crash-q").await.unwrap();
        assert_eq!(
            counts.waiting, 100,
            "after crash recovery, all 100 waiting jobs should be present"
        );
        let recovered = backend.dequeue("crash-q", 100).await.unwrap();
        assert_eq!(recovered.len(), 100, "should be able to dequeue all 100 jobs");
    }
}
#[tokio::test]
async fn hybrid_dirty_flush() {
    // Push jobs through the hybrid layer, then poll the inner redb store
    // (with a bounded number of attempts) until the background flush has
    // persisted everything, and assert the final count.
    let dir = tempdir().expect("failed to create tempdir");
    let db_path = dir.path().join("hybrid-flush.redb");
    {
        let inner = Arc::new(RedbStorage::new(&db_path).expect("create redb"));
        let inner_ref = Arc::clone(&inner);
        let config = HybridConfig {
            snapshot_interval_ms: 100,
            max_dirty_before_flush: 10,
        };
        let manager = QueueManager::new(Arc::new(HybridStorage::new(inner, config)));

        for idx in 0..50 {
            let job_name = format!("flush-job-{idx}");
            manager
                .push("flush-q", &job_name, json!({"i": idx}), None)
                .await
                .expect("push should succeed");
        }

        // Bounded wait: up to 100 polls of 100ms each for the flush task.
        let mut attempt = 0;
        while attempt < 100 {
            let observed = inner_ref.get_queue_counts("flush-q").await.unwrap();
            if observed.waiting >= 50 {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            attempt += 1;
        }

        let counts = inner_ref.get_queue_counts("flush-q").await.unwrap();
        assert_eq!(
            counts.waiting, 50,
            "inner redb should have all 50 jobs after hybrid flush"
        );
        drop(manager);
    }
}
#[tokio::test]
#[ignore]
async fn memory_stability() {
    // Runs 10K push/pull/ack cycles against the in-memory backend and asserts
    // that process RSS growth stays under 50 MB. Ignored by default because
    // RSS measurements are noisy; run explicitly via `cargo test -- --ignored`.
    use sysinfo::{Pid, System};
    let storage = Arc::new(MemoryStorage::new());
    let mgr = QueueManager::new(storage);

    // Warm-up: let allocator pools and internal caches reach steady state
    // before taking the baseline measurement.
    for i in 0..100 {
        let name = format!("warmup-{i}");
        let id = mgr
            .push("mem-q", &name, json!({"i": i}), None)
            .await
            .unwrap();
        let pulled = mgr.pull("mem-q", 1).await.unwrap();
        assert_eq!(pulled.len(), 1);
        mgr.ack(id, None).await.unwrap();
    }

    let pid = Pid::from_u32(std::process::id());
    let mut sys = System::new();
    sys.refresh_processes();
    // Fail loudly if the current process cannot be found: falling back to 0
    // (the previous `unwrap_or(0)`) silently skews the baseline and makes the
    // growth computation — and therefore the assertion — meaningless.
    let rss_before = sys
        .process(pid)
        .expect("current process should be visible to sysinfo")
        .memory();

    // Measured workload: 10K full job lifecycles on a single queue.
    for i in 0..10_000 {
        let name = format!("stability-{i}");
        let id = mgr
            .push("mem-q", &name, json!({"i": i}), None)
            .await
            .unwrap();
        let pulled = mgr.pull("mem-q", 1).await.unwrap();
        assert_eq!(pulled.len(), 1);
        mgr.ack(id, None).await.unwrap();
    }

    sys.refresh_processes();
    let rss_after = sys
        .process(pid)
        .expect("current process should be visible to sysinfo")
        .memory();

    // NOTE(review): assumes `Process::memory()` reports bytes (sysinfo >= 0.30);
    // older sysinfo versions returned KiB — confirm against Cargo.toml.
    let growth_mb = (rss_after as f64 - rss_before as f64) / (1024.0 * 1024.0);
    eprintln!(
        "RSS before: {} MB, after: {} MB, growth: {:.1} MB",
        rss_before / (1024 * 1024),
        rss_after / (1024 * 1024),
        growth_mb
    );
    assert!(
        growth_mb < 50.0,
        "RSS grew by {growth_mb:.1} MB over 10K cycles, expected < 50 MB"
    );
}
#[tokio::test]
async fn concurrent_push_stress() {
    // Spawns 100 concurrent tasks that each push one job through the buffered
    // redb backend, then verifies every push was persisted and is pullable.
    let dir = tempdir().expect("failed to create tempdir");
    let db_path = dir.path().join("stress-test.redb");
    let inner = Arc::new(RedbStorage::new(&db_path).expect("create redb"));
    let buffered = BufferedRedbStorage::new(
        inner,
        BufferedRedbConfig {
            interval_ms: 5,
            max_batch: 50,
        },
    );
    let mgr = Arc::new(QueueManager::new(Arc::new(buffered)));

    let mut handles = Vec::with_capacity(100);
    for i in 0..100u64 {
        let mgr = Arc::clone(&mgr);
        handles.push(tokio::spawn(async move {
            let name = format!("stress-{i}");
            mgr.push("stress-q", &name, json!({"i": i}), None)
                .await
                .expect("concurrent push should succeed");
        }));
    }
    for h in handles {
        h.await.expect("task should not panic");
    }

    // Bounded poll for the buffered writer to flush, instead of a single fixed
    // 100ms sleep — a fixed sleep is flaky on loaded CI machines, and this
    // matches the wait pattern used by the hybrid flush test.
    for _ in 0..200 {
        let stats = mgr.get_queue_stats("stress-q").await.expect("queue stats");
        if stats.waiting >= 100 {
            break;
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }

    let stats = mgr.get_queue_stats("stress-q").await.expect("queue stats");
    assert_eq!(
        stats.waiting, 100,
        "all 100 concurrent pushes should be persisted"
    );
    let pulled = mgr.pull("stress-q", 100).await.expect("pull all");
    assert_eq!(
        pulled.len(),
        100,
        "should be able to pull all 100 concurrently pushed jobs"
    );
}