use apalis_core::{
backend::{Backend, BackendExt},
worker::context::WorkerContext,
};
use apalis_libsql::{LibsqlStorage, enable_wal_mode};
use futures::StreamExt;
use libsql::Builder;
use std::{sync::Arc, time::Duration};
use tempfile::TempDir;
use ulid::Ulid;
// Test fixture pairing a leaked `'static` database handle with the temp
// directory that backs its on-disk file.
struct TestDb {
// Leaked via `Box::leak` in `setup_test_db` so storages needing `'static` can borrow it.
db: &'static libsql::Database,
// Held only to keep the temp directory (and DB file) alive for the test's duration.
_temp_dir: Arc<TempDir>,
}
/// Creates a fresh on-disk libsql database inside a temp directory, applies
/// the initial migration, and returns it leaked to `'static` for test use.
async fn setup_test_db() -> TestDb {
    let temp_dir = Arc::new(TempDir::new().unwrap());
    let path = temp_dir.path().join("test_lib.db");
    let database = Builder::new_local(path.to_str().unwrap())
        .build()
        .await
        .unwrap();
    // Leak the handle: tests hand out `&'static libsql::Database` references.
    let db: &'static libsql::Database = Box::leak(Box::new(database));
    db.connect()
        .unwrap()
        .execute_batch(include_str!("../migrations/001_initial.sql"))
        .await
        .unwrap();
    TestDb {
        db,
        _temp_dir: temp_dir,
    }
}
// Constructing a storage with `new` should borrow the given database and
// apply the default config (buffer size 10, queue named after the task type).
#[tokio::test]
async fn test_storage_new() {
    let test_db = setup_test_db().await;
    let db = test_db.db;
    let storage = LibsqlStorage::<(), ()>::new(db);
    assert_eq!(storage.db() as *const _, db as *const _);
    assert_eq!(storage.config().buffer_size(), 10);
    assert_eq!(storage.config().queue().to_string(), "()");
}
// `new_with_config` must honor the caller-supplied queue name and buffer size.
#[tokio::test]
async fn test_storage_new_with_config() {
    let test_db = setup_test_db().await;
    let custom = apalis_libsql::Config::new("TestTask").set_buffer_size(20);
    let storage = LibsqlStorage::<(), ()>::new_with_config(test_db.db, custom);
    assert_eq!(storage.config().buffer_size(), 20);
    assert_eq!(storage.config().queue().to_string(), "TestTask");
}
// Smoke test: swapping the codec via `with_codec` should type-check and build.
#[tokio::test]
async fn test_storage_with_codec() {
    let test_db = setup_test_db().await;
    let base = LibsqlStorage::<(), ()>::new(test_db.db);
    let _recoded =
        base.with_codec::<apalis_core::backend::codec::json::JsonCodec<Vec<u8>>>();
    println!("Storage codec changed successfully");
}
// The `db()` accessor should return the very same database reference that
// was passed to the constructor (pointer identity, not just equality).
#[tokio::test]
async fn test_storage_db_getter() {
    let test_db = setup_test_db().await;
    let db = test_db.db;
    let storage = LibsqlStorage::<(), ()>::new(db);
    assert_eq!(storage.db() as *const _, db as *const _);
}
// The `config()` accessor should expose the config the storage was built with.
#[tokio::test]
async fn test_storage_config_getter() {
    let test_db = setup_test_db().await;
    let db = test_db.db;
    let config = apalis_libsql::Config::new("TestTask").set_buffer_size(25);
    // Renamed from `_storage`: the underscore prefix signalled "unused", but
    // the binding is read on the next line.
    let storage = LibsqlStorage::<(), ()>::new_with_config(db, config);
    let retrieved_config = storage.config();
    assert_eq!(retrieved_config.buffer_size(), 25);
    assert_eq!(retrieved_config.queue().to_string(), "TestTask");
}
#[test]
fn test_storage_debug() {
let rt = tokio::runtime::Runtime::new().unwrap();
let test_db = rt.block_on(setup_test_db());
let db = test_db.db;
let storage = LibsqlStorage::<(), ()>::new(db);
let debug_str = format!("{:?}", storage);
assert!(debug_str.contains("LibsqlStorage"));
assert!(debug_str.contains("Database"));
}
#[test]
fn test_storage_clone() {
let rt = tokio::runtime::Runtime::new().unwrap();
let test_db = rt.block_on(setup_test_db());
let db = test_db.db;
let storage1 = LibsqlStorage::<(), ()>::new(db);
let storage2 = storage1;
assert_eq!(storage2.db() as *const _, db as *const _);
}
// `enable_wal_mode` should switch the journal mode to WAL. The original
// `if let Some(row)` silently passed when the PRAGMA returned no rows;
// a missing row now fails the test explicitly.
#[tokio::test]
async fn test_enable_wal_mode() {
    let test_db = setup_test_db().await;
    let db = test_db.db;
    enable_wal_mode(db).await.unwrap();
    let conn = db.connect().unwrap();
    let mut rows = conn
        .query("PRAGMA journal_mode", libsql::params![])
        .await
        .unwrap();
    // `PRAGMA journal_mode` always reports the current mode as one row.
    let row = rows
        .next()
        .await
        .unwrap()
        .expect("PRAGMA journal_mode returned no rows");
    let mode: String = row.get(0).unwrap();
    assert_eq!(mode, "wal");
}
// Verifies `Backend::poll` ordering: the stream must first yield a worker
// registration acknowledgement (`Ok(None)`) before delivering — or failing
// to decode — the task seeded directly into the Jobs table.
#[tokio::test]
async fn test_backend_poll() {
let test_db = setup_test_db().await;
let db = test_db.db;
let job_type = "TestTask";
let worker_id = "test-worker";
let conn = db.connect().unwrap();
// Pre-register the worker row so the poll stream finds an existing worker.
conn.execute(
"INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen) VALUES (?1, ?2, 'LibsqlStorage', '', strftime('%s', 'now'))",
libsql::params![worker_id, job_type],
)
.await
.unwrap();
// Seed a pending job directly (raw bytes, bypassing the storage codec).
let task_id = Ulid::new();
conn.execute(
"INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata)
VALUES (?1, ?2, ?3, 'Pending', 0, 3, strftime('%s', 'now'), 0, '{}')",
libsql::params![b"test_job_data", task_id.to_string(), job_type],
)
.await
.unwrap();
let storage = LibsqlStorage::<(), ()>::new(db);
let worker = WorkerContext::new::<&str>(worker_id);
let mut stream = storage.poll(&worker);
// First item: only `Ok(None)` (registration) is acceptable here.
let first_result = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
match first_result {
Ok(Some(Ok(None))) => {
}
Ok(Some(Ok(Some(_task)))) => {
panic!("Expected registration result (None) first, but got a task");
}
Ok(Some(Err(e))) => {
panic!("Expected registration result, but got error: {}", e);
}
Ok(None) => {
panic!("Stream ended before registration result");
}
Err(_) => {
panic!("Timeout waiting for registration result");
}
}
// Second item: deliberately tolerant — the raw bytes may not decode as `()`,
// so a task, an empty poll, a decode error, stream end, or a timeout are all
// acceptable outcomes; only the ordering above is asserted strictly.
let second_result = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
match second_result {
Ok(Some(Ok(Some(_task)))) => {
}
Ok(Some(Ok(None))) => {
}
Ok(Some(Err(e))) => {
println!("Task decoding failed as expected: {}", e);
}
Ok(None) => {
println!("Stream ended");
}
Err(_) => {
println!("No task available within timeout");
}
}
}
// Smoke test for the heartbeat stream: it should produce an item (or time
// out) without panicking. Assertions are deliberately loose — every outcome
// is tolerated; the test's value is that polling the stream doesn't crash.
#[tokio::test]
async fn test_backend_heartbeat() {
let test_db = setup_test_db().await;
let db = test_db.db;
let _job_type = "TestTask";
let worker_id = "test-worker";
let storage = LibsqlStorage::<(), ()>::new(db);
let worker = WorkerContext::new::<&str>(worker_id);
let mut heartbeat_stream = storage.heartbeat(&worker);
let first_result = tokio::time::timeout(Duration::from_secs(5), heartbeat_stream.next()).await;
match first_result {
Ok(Some(Err(e))) => {
// An error item is acceptable as long as the stream itself was produced.
println!("Heartbeat error (stream still functional): {}", e);
}
Ok(Some(Ok(()))) | Ok(None) | Err(_) => {
}
}
// Query for a side effect of the heartbeat; result is intentionally
// unasserted (underscore) — heartbeat may or may not have registered the
// worker row yet at this point. NOTE(review): consider asserting once the
// registration timing is guaranteed.
let conn = db.connect().unwrap();
let mut rows = conn
.query(
"SELECT id FROM Workers WHERE id = ?1",
libsql::params![worker_id],
)
.await
.unwrap();
let _worker_exists = rows.next().await.unwrap().is_some();
}
// Smoke test: building the backend middleware should succeed without panic.
#[tokio::test]
async fn test_backend_middleware() {
    let test_db = setup_test_db().await;
    let storage = LibsqlStorage::<(), ()>::new(test_db.db);
    let _middleware = storage.middleware();
    println!("Storage middleware created successfully");
}
// `poll_compact` should register the worker first, then deliver the raw
// task bytes seeded into the Jobs table. The original's
// `if let ... = second { .. } else { panic!("...{:?}", second) }` moved
// `second` into the pattern and then formatted it in the `else` branch —
// a use-after-move (E0382). A `match` binding the non-matching value fixes
// that while still reporting it in the failure message.
#[tokio::test]
async fn test_backend_poll_compact() {
    let test_db = setup_test_db().await;
    let db = test_db.db;
    let job_type = "TestTask";
    let worker_id = "test-worker";
    let conn = db.connect().unwrap();
    // Pre-register the worker row the poll stream expects.
    conn.execute(
        "INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen) VALUES (?1, ?2, 'LibsqlStorage', '', strftime('%s', 'now'))",
        libsql::params![worker_id, job_type],
    )
    .await
    .unwrap();
    // Seed a pending job with known raw bytes.
    let task_id = Ulid::new();
    conn.execute(
        "INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata)
VALUES (?1, ?2, ?3, 'Pending', 0, 3, strftime('%s', 'now'), 0, '{}')",
        libsql::params![b"test_job_data", task_id.to_string(), job_type],
    )
    .await
    .unwrap();
    let storage =
        LibsqlStorage::<(), ()>::new_with_config(db, apalis_libsql::Config::new(job_type));
    let worker = WorkerContext::new::<&str>(worker_id);
    let mut stream = storage.poll_compact(&worker);
    // First item is the registration acknowledgement.
    let first = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
    assert!(first.is_ok());
    // Second item must be the seeded task, byte-for-byte.
    let second = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
    match second {
        Ok(Some(Ok(Some(task)))) => assert_eq!(task.args, b"test_job_data"),
        other => panic!("Should have received a task: {:?}", other),
    }
}
// `poll_default` should register the worker first, then deliver the seeded
// task's raw bytes unchanged.
#[tokio::test]
async fn test_storage_poll_default() {
    let test_db = setup_test_db().await;
    let db = test_db.db;
    let job_type = "TestTask";
    let worker_id = "test-worker";
    let conn = db.connect().unwrap();
    // Pre-register the worker row the poll stream expects.
    conn.execute(
        "INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen) VALUES (?1, ?2, 'LibsqlStorage', '', strftime('%s', 'now'))",
        libsql::params![worker_id, job_type],
    )
    .await
    .unwrap();
    // Seed a pending job with known raw bytes.
    let task_id = Ulid::new();
    conn.execute(
        "INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata)
VALUES (?1, ?2, ?3, 'Pending', 0, 3, strftime('%s', 'now'), 0, '{}')",
        libsql::params![b"test_job_data", task_id.to_string(), job_type],
    )
    .await
    .unwrap();
    let storage =
        LibsqlStorage::<(), ()>::new_with_config(db, apalis_libsql::Config::new(job_type));
    let worker = WorkerContext::new::<&str>(worker_id);
    let mut stream = storage.poll_default(&worker);
    // Registration acknowledgement first, then the task itself.
    let first = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
    assert!(first.is_ok());
    let second = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
    assert!(second.is_ok());
    match second {
        Ok(Some(Ok(Some(task)))) => assert_eq!(task.args, b"test_job_data"),
        _ => panic!("Should have received a task"),
    }
}
// Scenario test: a job locked by a worker whose `last_seen` is stale
// (> 300 s old here) should be re-enqueued by `reenqueue_orphaned` — status
// back to 'Pending' with its lock columns cleared.
#[tokio::test]
async fn test_orphan_task_reenqueue() {
let test_db = setup_test_db().await;
let db = test_db.db;
let job_type = "TestTask";
let dead_worker_id = "dead-worker";
let task_id = Ulid::new();
let conn = db.connect().unwrap();
// 301 seconds ago — presumably just past a 300 s liveness window; TODO confirm
// against the storage's orphan timeout.
let old_last_seen = chrono::Utc::now().timestamp() - 301;
// Register a "dead" worker with the stale heartbeat.
conn.execute(
"INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen) VALUES (?1, ?2, 'LibsqlStorage', '', ?3)",
libsql::params![dead_worker_id, job_type, old_last_seen],
)
.await
.unwrap();
// Seed a job that is 'Running' and locked by the dead worker.
conn.execute(
"INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata, lock_by, lock_at)
VALUES (?1, ?2, ?3, 'Running', 0, 3, strftime('%s', 'now'), 0, '{}', ?4, strftime('%s', 'now'))",
libsql::params![b"orphaned_task_data", task_id.to_string(), job_type, dead_worker_id],
)
.await
.unwrap();
let config = apalis_libsql::Config::new(job_type);
let reenqueued_count = apalis_libsql::reenqueue_orphaned(db, &config)
.await
.unwrap();
assert_eq!(
reenqueued_count, 1,
"Should have re-enqueued 1 orphaned task"
);
// Verify the row was reset: status 'Pending', lock columns NULL.
let mut rows = conn
.query(
"SELECT status, lock_by, lock_at FROM Jobs WHERE id = ?1",
libsql::params![task_id.to_string()],
)
.await
.unwrap();
if let Some(row) = rows.next().await.unwrap() {
let status: String = row.get(0).unwrap();
let lock_by: Option<String> = row.get(1).unwrap();
let lock_at: Option<i64> = row.get(2).unwrap();
assert_eq!(
status, "Pending",
"Task status should be 'Pending' after re-enqueue"
);
assert!(lock_by.is_none(), "lock_by should be NULL after re-enqueue");
assert!(lock_at.is_none(), "lock_at should be NULL after re-enqueue");
} else {
panic!("Task should exist after re-enqueue");
}
}