use apalis_core::backend::{Backend, TaskSink};
use apalis_libsql::LibsqlStorage;
use futures::StreamExt;
use libsql::Builder;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicUsize, Ordering};
use tempfile::TempDir;
static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
fn unique_db_path() -> String {
let id = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
format!("test_db_{}.db", id)
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestTask {
pub message: String,
pub value: i32,
}
#[tokio::test]
async fn test_push_task() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let db_path = temp_dir.path().join(unique_db_path());
let db = Builder::new_local(db_path.to_str().unwrap())
.build()
.await?;
let db_static: &'static libsql::Database = Box::leak(Box::new(db));
let mut storage = LibsqlStorage::<TestTask, ()>::new(db_static);
storage.setup().await?;
apalis_libsql::enable_wal_mode(db_static).await?;
let test_task = TestTask {
message: "Hello World".to_string(),
value: 42,
};
storage.push(test_task.clone()).await?;
let conn = db_static.connect()?;
let mut rows = conn
.query(
"SELECT COUNT(*) FROM Jobs WHERE status = 'Pending'",
libsql::params![],
)
.await?;
if let Some(row) = rows.next().await? {
let count: i64 = row.get(0)?;
assert_eq!(count, 1, "Should have exactly 1 pending task");
}
Ok(())
}
#[tokio::test]
async fn test_poll_task() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let db_path = temp_dir.path().join(unique_db_path());
let db = Builder::new_local(db_path.to_str().unwrap())
.build()
.await?;
let db_static: &'static libsql::Database = Box::leak(Box::new(db));
let storage = LibsqlStorage::<TestTask, ()>::new(db_static);
storage.setup().await?;
apalis_libsql::enable_wal_mode(db_static).await?;
let conn = db_static.connect()?;
let task_data = serde_json::to_vec(&TestTask {
message: "Test Poll".to_string(),
value: 123,
})?;
let task_id = ulid::Ulid::new().to_string();
let job_type = std::any::type_name::<TestTask>();
conn.execute(
"INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata)
VALUES (?1, ?2, ?3, 'Pending', 0, 25, strftime('%s', 'now'), 0, '{}')",
libsql::params![task_data, task_id, job_type]
).await?;
drop(conn);
use apalis_core::worker::context::WorkerContext;
let worker = WorkerContext::new::<&'static str>("test-worker");
let mut task_stream = storage.poll_default(&worker).boxed();
let first = tokio::time::timeout(std::time::Duration::from_secs(2), task_stream.next())
.await?
.ok_or("Stream ended unexpectedly")?;
match first? {
None => {
let task_result =
tokio::time::timeout(std::time::Duration::from_secs(2), task_stream.next())
.await?
.ok_or("Stream ended unexpectedly")?;
let task = task_result?.expect("Should receive a task");
let decoded: TestTask = serde_json::from_slice(&task.args)?;
assert_eq!(decoded.message, "Test Poll");
assert_eq!(decoded.value, 123);
}
Some(task) => {
let decoded: TestTask = serde_json::from_slice(&task.args)?;
assert_eq!(decoded.message, "Test Poll");
assert_eq!(decoded.value, 123);
}
}
Ok(())
}
#[tokio::test]
async fn test_full_lifecycle() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let db_path = temp_dir.path().join(unique_db_path());
let db = Builder::new_local(db_path.to_str().unwrap())
.build()
.await?;
let db_static: &'static libsql::Database = Box::leak(Box::new(db));
let mut storage = LibsqlStorage::<TestTask, ()>::new(db_static);
storage.setup().await?;
apalis_libsql::enable_wal_mode(db_static).await?;
let conn = db_static.connect()?;
let test_task = TestTask {
message: "Lifecycle Test".to_string(),
value: 999,
};
let task_data = serde_json::to_vec(&test_task)?;
let task_id = ulid::Ulid::new();
let task_id_str = task_id.to_string();
let job_type = std::any::type_name::<TestTask>();
conn.execute(
"INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata)
VALUES (?1, ?2, ?3, 'Pending', 0, 25, strftime('%s', 'now'), 0, '{}')",
libsql::params![task_data, task_id_str.clone(), job_type]
).await?;
let mut rows = conn
.query(
"SELECT status FROM Jobs WHERE id = ?1",
libsql::params![task_id_str.clone()],
)
.await?;
if let Some(row) = rows.next().await? {
let status: String = row.get(0)?;
assert_eq!(status, "Pending");
}
drop(conn);
use apalis_core::worker::context::WorkerContext;
let worker = WorkerContext::new::<&'static str>("test-worker");
let storage_clone = storage.clone();
let mut task_stream = storage_clone.poll_default(&worker).boxed();
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), task_stream.next()).await?;
let task_result = tokio::time::timeout(std::time::Duration::from_secs(2), task_stream.next())
.await?
.ok_or("Stream ended unexpectedly")?;
let task = task_result?.expect("Should receive a task");
let decoded: TestTask = serde_json::from_slice(&task.args)?;
assert_eq!(decoded.message, "Lifecycle Test");
assert_eq!(decoded.value, 999);
let conn = db_static.connect()?;
let mut rows = conn
.query(
"SELECT status, lock_by FROM Jobs WHERE id = ?1",
libsql::params![task_id_str.clone()],
)
.await?;
if let Some(row) = rows.next().await? {
let status: String = row.get(0)?;
let lock_by: Option<String> = row.get(1)?;
assert_eq!(status, "Queued");
assert_eq!(lock_by, Some("test-worker".to_string()));
}
drop(conn);
let result: Result<(), Box<dyn std::error::Error + Send + Sync>> = Ok(());
storage.ack(&task_id, result).await?;
let conn = db_static.connect()?;
let mut rows = conn
.query(
"SELECT status, done_at FROM Jobs WHERE id = ?1",
libsql::params![task_id_str.clone()],
)
.await?;
if let Some(row) = rows.next().await? {
let status: String = row.get(0)?;
let done_at: Option<i64> = row.get(1)?;
assert_eq!(status, "Done");
assert!(done_at.is_some(), "done_at should be set");
}
Ok(())
}
#[tokio::test]
async fn test_schema_setup() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let db_path = temp_dir.path().join(unique_db_path());
let db = Builder::new_local(db_path.to_str().unwrap())
.build()
.await?;
let db_static: &'static libsql::Database = Box::leak(Box::new(db));
let storage = LibsqlStorage::<TestTask, ()>::new(db_static);
storage.setup().await?;
let conn = db_static.connect()?;
let mut rows = conn
.query(
"SELECT name FROM sqlite_master WHERE type='table' AND name='Jobs'",
libsql::params![],
)
.await?;
assert!(rows.next().await?.is_some(), "Jobs table should exist");
let mut rows = conn
.query(
"SELECT name FROM sqlite_master WHERE type='table' AND name='Workers'",
libsql::params![],
)
.await?;
assert!(rows.next().await?.is_some(), "Workers table should exist");
Ok(())
}
#[tokio::test]
async fn test_worker_heartbeat() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let db_path = temp_dir.path().join(unique_db_path());
let db = Builder::new_local(db_path.to_str().unwrap())
.build()
.await?;
let db_static: &'static libsql::Database = Box::leak(Box::new(db));
let mut config = apalis_libsql::Config::new(std::any::type_name::<TestTask>());
config = config.set_keep_alive(std::time::Duration::from_millis(500)); let storage = LibsqlStorage::<TestTask, ()>::new_with_config(db_static, config);
storage.setup().await?;
apalis_libsql::enable_wal_mode(db_static).await?;
let conn = db_static.connect()?;
use apalis_core::worker::context::WorkerContext;
let worker = WorkerContext::new::<&'static str>("heartbeat-test-worker");
let mut poll_stream = storage.clone().poll_default(&worker).boxed();
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), poll_stream.next())
.await?
.ok_or("Poll stream ended unexpectedly")??;
let mut heartbeat_stream = storage.heartbeat(&worker).boxed();
tokio::time::timeout(std::time::Duration::from_secs(1), heartbeat_stream.next())
.await?
.ok_or("Heartbeat stream ended unexpectedly")??;
let mut rows = conn
.query(
"SELECT id, last_seen FROM Workers WHERE id = ?1",
libsql::params!["heartbeat-test-worker"],
)
.await?;
if let Some(row) = rows.next().await? {
let worker_id: String = row.get(0)?;
let last_seen: i64 = row.get(1)?;
assert_eq!(worker_id, "heartbeat-test-worker");
assert!(last_seen > 0, "last_seen should be set");
} else {
panic!("Worker should be registered after initial poll");
}
tokio::time::timeout(std::time::Duration::from_secs(2), heartbeat_stream.next())
.await?
.ok_or("Heartbeat stream ended unexpectedly")??;
let mut rows = conn
.query(
"SELECT last_seen FROM Workers WHERE id = ?1",
libsql::params!["heartbeat-test-worker"],
)
.await?;
if let Some(row) = rows.next().await? {
let last_seen_after: i64 = row.get(0)?;
assert!(last_seen_after > 0, "last_seen should still be set");
}
Ok(())
}