use apalis_core::worker::context::WorkerContext;
use apalis_libsql::{
Config,
fetcher::{LibsqlPollFetcher, fetch_next},
};
use futures::StreamExt;
use libsql::Builder;
use std::{sync::Arc, time::Duration};
use tempfile::TempDir;
use ulid::Ulid;
struct TestDb {
db: &'static libsql::Database,
_temp_dir: Arc<TempDir>,
}
async fn setup_test_db() -> TestDb {
let temp_dir = Arc::new(TempDir::new().unwrap());
let db_path = temp_dir.path().join("test_fetcher.db");
let db = Builder::new_local(db_path.to_str().unwrap())
.build()
.await
.unwrap();
let db_static: &'static libsql::Database = Box::leak(Box::new(db));
let conn = db_static.connect().unwrap();
conn.execute_batch(include_str!("../migrations/001_initial.sql"))
.await
.unwrap();
TestDb {
db: db_static,
_temp_dir: temp_dir,
}
}
#[tokio::test]
async fn test_fetch_next_empty() {
let test_db = setup_test_db().await;
let db = test_db.db;
let config = Config::new("TestTask");
let worker = WorkerContext::new::<&str>("test-worker");
let tasks = fetch_next(db, &config, &worker).await.unwrap();
assert!(tasks.is_empty());
}
#[tokio::test]
async fn test_fetch_next_multiple() {
let test_db = setup_test_db().await;
let db = test_db.db;
let job_type = "TestTask";
let worker_id = "test-worker";
let conn = db.connect().unwrap();
conn.execute(
"INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen) VALUES (?1, ?2, 'LibsqlStorage', '', strftime('%s', 'now'))",
libsql::params![worker_id, job_type],
)
.await
.unwrap();
for _i in 0..5 {
let task_id = Ulid::new();
conn.execute(
"INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata)
VALUES (?1, ?2, ?3, 'Pending', 0, 3, strftime('%s', 'now'), 0, '{}')",
libsql::params![b"test_job_data", task_id.to_string(), job_type],
)
.await
.unwrap();
}
let config = Config::new(job_type).set_buffer_size(3);
let worker = WorkerContext::new::<&str>(worker_id);
let tasks = fetch_next(db, &config, &worker).await.unwrap();
assert_eq!(tasks.len(), 3);
let mut rows = conn
.query(
"SELECT COUNT(*) FROM Jobs WHERE status = 'Queued' AND lock_by = ?1",
libsql::params![worker_id],
)
.await
.unwrap();
if let Some(row) = rows.next().await.unwrap() {
let count: i64 = row.get(0).unwrap();
assert_eq!(count, 3);
}
}
#[tokio::test]
async fn test_fetcher_clone() {
let test_db = setup_test_db().await;
let db = test_db.db;
let config = Config::new("TestTask");
let worker = WorkerContext::new::<&str>("test-worker");
let mut fetcher1 = LibsqlPollFetcher::<()>::new(db, &config, &worker);
let mut fetcher2 = fetcher1.clone();
let poll1 = tokio::time::timeout(Duration::from_millis(100), fetcher1.next()).await;
let poll2 = tokio::time::timeout(Duration::from_millis(100), fetcher2.next()).await;
match (poll1, poll2) {
(Ok(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Ok(_)) | (Err(_), Err(_)) => {
}
}
}
#[test]
fn test_fetcher_debug() {
let rt = tokio::runtime::Runtime::new().unwrap();
let test_db = rt.block_on(setup_test_db());
let db = test_db.db;
let config = Config::new("TestTask");
let worker = WorkerContext::new::<&str>("test-worker");
let fetcher = LibsqlPollFetcher::<()>::new(db, &config, &worker);
let debug_str = format!("{:?}", fetcher);
assert!(debug_str.contains("LibsqlPollFetcher"));
assert!(debug_str.contains("Config"));
}
#[tokio::test]
async fn test_fetcher_take_pending() {
let test_db = setup_test_db().await;
let db = test_db.db;
let job_type = "TestTask";
let worker_id = "test-worker";
let conn = db.connect().unwrap();
conn.execute(
"INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen) VALUES (?1, ?2, 'LibsqlStorage', '', strftime('%s', 'now'))",
libsql::params![worker_id, job_type],
)
.await
.unwrap();
for _i in 0..2 {
let task_id = Ulid::new();
conn.execute(
"INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata)
VALUES (?1, ?2, ?3, 'Pending', 0, 3, strftime('%s', 'now'), 0, '{}')",
libsql::params![b"test_job_data", task_id.to_string(), job_type],
)
.await
.unwrap();
}
let config = Config::new(job_type);
let worker = WorkerContext::new::<&str>(worker_id);
let mut fetcher = LibsqlPollFetcher::<()>::new(db, &config, &worker);
let pending = fetcher.take_pending();
assert!(pending.is_empty()); }
#[tokio::test]
async fn test_fetcher_stream_delay_on_empty() {
let test_db = setup_test_db().await;
let db = test_db.db;
let poll_interval = Duration::from_millis(100);
let config = Config::new("TestTask").set_poll_interval(poll_interval);
let worker = WorkerContext::new::<&str>("test-worker");
let mut stream = LibsqlPollFetcher::<()>::new(db, &config, &worker);
let mut successful_polls = 0;
let mut total_polls = 0;
for _i in 0..3 {
total_polls += 1;
let poll_result = tokio::time::timeout(Duration::from_millis(200), stream.next()).await;
match poll_result {
Ok(Some(result)) => match result {
Ok(_) => {
successful_polls += 1;
}
Err(_) => {
successful_polls += 1;
}
},
Ok(None) => {
successful_polls += 1;
}
Err(_) => {
successful_polls += 1;
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
successful_polls, total_polls,
"Stream should remain functional when empty with delays"
);
}
#[tokio::test]
async fn test_fetcher_stream_continuity() {
let test_db = setup_test_db().await;
let db = test_db.db;
let config = Config::new("TestTask");
let worker = WorkerContext::new::<&str>("test-worker");
let mut stream = LibsqlPollFetcher::<()>::new(db, &config, &worker);
let mut successful_polls = 0;
let mut total_polls = 0;
for _i in 0..3 {
total_polls += 1;
let result = tokio::time::timeout(Duration::from_millis(500), stream.next()).await;
match result {
Ok(Some(Ok(_))) => {
successful_polls += 1;
}
Ok(Some(Err(_))) => {
successful_polls += 1;
}
Ok(None) => {
successful_polls += 1;
}
Err(_) => {
successful_polls += 1;
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
successful_polls, total_polls,
"Stream should remain functional across multiple polls"
);
let mut new_stream = LibsqlPollFetcher::<()>::new(db, &config, &worker);
let new_result = tokio::time::timeout(Duration::from_millis(500), new_stream.next()).await;
let _ = new_result; }