#![cfg(feature = "sqlite")]

use std::collections::HashSet;
use std::sync::Arc;

use crawlex::queue::sqlite::SqliteQueue;
use crawlex::queue::{FetchMethod, Job, JobQueue};
use url::Url;
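
/// Pushes N jobs, then drains the queue from M concurrent workers and asserts
/// that no job id is ever handed out twice and that all N jobs are consumed.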
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn concurrent_pop_returns_unique_ids() {
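    // Create a fresh SQLite-backed queue in a temporary directory so each
    // test run starts from an empty database.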
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("q.db").to_string_lossy().to_string();
let queue = Arc::new(SqliteQueue::open(&path).unwrap());
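
    // Seed the queue with N jobs up front. `id: 0` is a placeholder: the
    // unique-id assertions below only make sense if the queue assigns the
    // real id on push.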
const N: usize = 200;
for i in 0..N {
queue
.push(Job {
                id: 0,
                url: Url::parse(&format!("https://example.com/p/{i}")).unwrap(),
depth: 0,
priority: 0,
method: FetchMethod::HttpSpoof,
attempts: 0,
last_error: None,
})
.await
.unwrap();
}
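
    // Spawn M workers that race to drain the queue concurrently; each one
    // keeps popping until the queue reports empty and records the ids it got.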
const M: usize = 32;
let mut tasks = Vec::with_capacity(M);
for _ in 0..M {
let q = queue.clone();
tasks.push(tokio::spawn(async move {
let mut got = Vec::new();
while let Some(job) = q.pop().await.unwrap() {
got.push(job.id);
}
got
}));
}
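
    // Join all workers and gather every id that was handed out.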
let mut all = Vec::new();
for t in tasks {
all.extend(t.await.unwrap());
}
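
    // A duplicate id would mean two workers received the same job, i.e.
    // pop() failed to claim rows atomically under concurrency.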
let unique: HashSet<u64> = all.iter().copied().collect();
assert_eq!(
unique.len(),
all.len(),
"duplicate ids returned: total={} unique={}",
all.len(),
unique.len()
);
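    // And nothing may be lost: every one of the N seeded jobs must show up.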
    assert_eq!(unique.len(), N, "expected all {N} jobs to be consumed");
}