use std::sync::OnceLock;
use sea_orm::{
ConnectOptions, ConnectionTrait, Database, DatabaseConnection, DbBackend, Statement,
};
use sea_orm_migration::MigratorTrait;
use tokio::sync::Mutex;
static DB_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
fn db_lock() -> &'static Mutex<()> {
DB_LOCK.get_or_init(|| Mutex::new(()))
}
fn database_url() -> String {
std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://durable:durable@localhost:5432/durable".to_string())
}
async fn setup() -> (DatabaseConnection, tokio::sync::MutexGuard<'static, ()>) {
let guard = db_lock().lock().await;
let mut opt = ConnectOptions::new(&database_url());
opt.max_connections(2);
let db = Database::connect(opt)
.await
.expect("failed to connect to database");
db.execute_unprepared("DROP TABLE IF EXISTS durable.executor_heartbeat CASCADE")
.await
.ok();
db.execute_unprepared("DROP TABLE IF EXISTS durable.task CASCADE")
.await
.ok();
db.execute_unprepared("DROP TABLE IF EXISTS durable.task_queue CASCADE")
.await
.ok();
db.execute_unprepared("DROP TYPE IF EXISTS durable.task_status CASCADE")
.await
.ok();
db.execute_unprepared(
"DELETE FROM seaql_migrations WHERE version IN ('m20260315_000001_init', 'm20260315_000002_drop_task_name_unique', 'm20260316_000001_add_task_handler', 'm20260316_000002_add_recovery_count')",
)
.await
.ok();
durable_db::Migrator::up(&db, None)
.await
.expect("migration failed");
(db, guard)
}
async fn query_one(db: &DatabaseConnection, sql: &str) -> serde_json::Value {
let row = db
.query_one(Statement::from_string(DbBackend::Postgres, sql))
.await
.expect("query failed")
.expect("no row returned");
let val: String = row.try_get_by_index(0).unwrap_or_default();
serde_json::Value::String(val)
}
async fn query_count(db: &DatabaseConnection, sql: &str) -> i64 {
let row = db
.query_one(Statement::from_string(DbBackend::Postgres, sql))
.await
.expect("query failed")
.expect("no row returned");
row.try_get_by_index::<i64>(0).expect("expected i64 count")
}
async fn exec(db: &DatabaseConnection, sql: &str) {
db.execute(Statement::from_string(DbBackend::Postgres, sql))
.await
.expect("exec failed");
}
#[tokio::test]
async fn migration_creates_schema_and_tables() {
let (db, _guard) = setup().await;
let count = query_count(
&db,
"SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'durable'",
)
.await;
assert_eq!(count, 1, "durable schema should exist");
let count = query_count(
&db,
"SELECT count(*) FROM information_schema.tables \
WHERE table_schema = 'durable' AND table_name = 'task'",
)
.await;
assert_eq!(count, 1, "task table should exist");
let count = query_count(
&db,
"SELECT count(*) FROM information_schema.tables \
WHERE table_schema = 'durable' AND table_name = 'task_queue'",
)
.await;
assert_eq!(count, 1, "task_queue table should exist");
}
#[tokio::test]
async fn migration_creates_indexes() {
let (db, _guard) = setup().await;
let expected_indexes = vec![
"idx-task-status",
"idx-task-parent_id",
"idx-task-queue_name-status",
"idx-task-executor_id-status",
"idx-task-next_run_at",
"task_parent_id_sequence_key",
"idx-task-parent_id-name",
];
for idx_name in expected_indexes {
let count = query_count(
&db,
&format!(
"SELECT count(*) FROM pg_indexes \
WHERE schemaname = 'durable' AND indexname = '{idx_name}'"
),
)
.await;
assert_eq!(count, 1, "index {idx_name} should exist");
}
}
#[tokio::test]
async fn migration_is_idempotent() {
let (db, _guard) = setup().await;
durable_db::Migrator::up(&db, None)
.await
.expect("re-running migration should succeed");
}
#[tokio::test]
async fn migration_down_drops_everything() {
let (db, _guard) = setup().await;
exec(&db, "DROP TABLE IF EXISTS durable.task CASCADE").await;
exec(&db, "DROP TABLE IF EXISTS durable.task_queue CASCADE").await;
let count = query_count(
&db,
"SELECT count(*) FROM information_schema.tables \
WHERE table_schema = 'durable' AND table_name = 'task'",
)
.await;
assert_eq!(count, 0, "task table should be gone after drop");
let count = query_count(
&db,
"SELECT count(*) FROM information_schema.tables \
WHERE table_schema = 'durable' AND table_name = 'task_queue'",
)
.await;
assert_eq!(count, 0, "task_queue table should be gone after drop");
}
#[tokio::test]
async fn insert_and_query_task_queue() {
let (db, _guard) = setup().await;
exec(
&db,
"INSERT INTO durable.task_queue (name, max_concurrency) VALUES ('ingest', 4)",
)
.await;
let name = query_one(
&db,
"SELECT name FROM durable.task_queue WHERE name = 'ingest'",
)
.await;
assert_eq!(name, "ingest");
}
#[tokio::test]
async fn insert_root_workflow() {
let (db, _guard) = setup().await;
exec(
&db,
"INSERT INTO durable.task (id, name, kind) \
VALUES ('a0000000-0000-0000-0000-000000000001', 'ingest', 'WORKFLOW')",
)
.await;
let status = query_one(
&db,
"SELECT status::text FROM durable.task WHERE id = 'a0000000-0000-0000-0000-000000000001'",
)
.await;
assert_eq!(status, "PENDING", "default status should be PENDING");
let kind = query_one(
&db,
"SELECT kind FROM durable.task WHERE id = 'a0000000-0000-0000-0000-000000000001'",
)
.await;
assert_eq!(kind, "WORKFLOW");
}
#[tokio::test]
async fn insert_child_steps() {
let (db, _guard) = setup().await;
let wf_id = "b0000000-0000-0000-0000-000000000001";
let step1_id = "b0000000-0000-0000-0000-000000000002";
let step2_id = "b0000000-0000-0000-0000-000000000003";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind, status) \
VALUES ('{wf_id}', 'ingest', 'WORKFLOW', 'RUNNING')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind, status) \
VALUES ('{step1_id}', '{wf_id}', 0, 'resolve_crawl', 'STEP', 'COMPLETED')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind, status) \
VALUES ('{step2_id}', '{wf_id}', 1, 'shard_0', 'STEP', 'RUNNING')"
),
)
.await;
let count = query_count(
&db,
&format!("SELECT count(*) FROM durable.task WHERE parent_id = '{wf_id}'"),
)
.await;
assert_eq!(count, 2);
let first_step = query_one(
&db,
&format!(
"SELECT name FROM durable.task \
WHERE parent_id = '{wf_id}' ORDER BY sequence ASC LIMIT 1"
),
)
.await;
assert_eq!(first_step, "resolve_crawl");
}
#[tokio::test]
async fn unique_constraint_parent_sequence() {
let (db, _guard) = setup().await;
let wf_id = "c0000000-0000-0000-0000-000000000001";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind) \
VALUES ('{wf_id}', 'test_wf', 'WORKFLOW')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind) \
VALUES ('c0000000-0000-0000-0000-000000000002', '{wf_id}', 0, 'step_a', 'STEP')"
),
)
.await;
let result = db
.execute(Statement::from_string(
DbBackend::Postgres,
format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind) \
VALUES ('c0000000-0000-0000-0000-000000000003', '{wf_id}', 0, 'step_b', 'STEP')"
),
))
.await;
assert!(
result.is_err(),
"duplicate (parent_id, sequence) should violate unique constraint"
);
}
#[tokio::test]
async fn unique_constraint_parent_name() {
let (db, _guard) = setup().await;
let wf_id = "d0000000-0000-0000-0000-000000000001";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind) \
VALUES ('{wf_id}', 'test_wf', 'WORKFLOW')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind) \
VALUES ('d0000000-0000-0000-0000-000000000002', '{wf_id}', 0, 'same_name', 'STEP')"
),
)
.await;
let result = db
.execute(Statement::from_string(
DbBackend::Postgres,
format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind) \
VALUES ('d0000000-0000-0000-0000-000000000003', '{wf_id}', 1, 'same_name', 'STEP')"
),
))
.await;
assert!(
result.is_ok(),
"duplicate (parent_id, name) should be allowed after dropping unique constraint"
);
}
#[tokio::test]
async fn cascade_delete_children_on_parent_delete() {
let (db, _guard) = setup().await;
let wf_id = "e0000000-0000-0000-0000-000000000001";
let step_id = "e0000000-0000-0000-0000-000000000002";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind) \
VALUES ('{wf_id}', 'parent_wf', 'WORKFLOW')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind) \
VALUES ('{step_id}', '{wf_id}', 0, 'child_step', 'STEP')"
),
)
.await;
exec(
&db,
&format!("DELETE FROM durable.task WHERE id = '{wf_id}'"),
)
.await;
let count = query_count(
&db,
&format!("SELECT count(*) FROM durable.task WHERE id = '{step_id}'"),
)
.await;
assert_eq!(count, 0, "child should be cascade-deleted");
}
#[tokio::test]
async fn queue_foreign_key() {
let (db, _guard) = setup().await;
exec(
&db,
"INSERT INTO durable.task_queue (name, max_concurrency) VALUES ('embed', 2)",
)
.await;
exec(
&db,
"INSERT INTO durable.task (id, name, kind, queue_name, status) \
VALUES ('f0000000-0000-0000-0000-000000000001', 'embed_job', 'WORKFLOW', 'embed', 'PENDING')",
)
.await;
let result = db
.execute(Statement::from_string(
DbBackend::Postgres,
"INSERT INTO durable.task (id, name, kind, queue_name) \
VALUES ('f0000000-0000-0000-0000-000000000002', 'bad_job', 'WORKFLOW', 'nonexistent')"
.to_string(),
))
.await;
assert!(result.is_err(), "FK to nonexistent queue should fail");
}
#[tokio::test]
async fn default_values() {
let (db, _guard) = setup().await;
exec(
&db,
"INSERT INTO durable.task (id, name, kind) \
VALUES ('10000000-0000-0000-0000-000000000001', 'defaults_test', 'WORKFLOW')",
)
.await;
let max_retries = query_count(
&db,
"SELECT max_retries::bigint FROM durable.task WHERE id = '10000000-0000-0000-0000-000000000001'",
)
.await;
assert_eq!(max_retries, 3, "default max_retries should be 3");
let retry_count = query_count(
&db,
"SELECT retry_count::bigint FROM durable.task WHERE id = '10000000-0000-0000-0000-000000000001'",
)
.await;
assert_eq!(retry_count, 0, "default retry_count should be 0");
}
#[tokio::test]
async fn task_with_jsonb_input_output() {
let (db, _guard) = setup().await;
let task_id = "20000000-0000-0000-0000-000000000001";
let input_json = r#"{"crawl": "CC-2026", "shards": [1, 2, 3]}"#;
let output_json = r#"{"result": "ok", "count": 42}"#;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind, input) \
VALUES ('{task_id}', 'jsonb_test', 'WORKFLOW', '{input_json}')"
),
)
.await;
exec(
&db,
&format!(
"UPDATE durable.task SET output = '{output_json}', status = 'COMPLETED' \
WHERE id = '{task_id}'"
),
)
.await;
let row = db
.query_one(Statement::from_string(
DbBackend::Postgres,
format!("SELECT input::text, output::text FROM durable.task WHERE id = '{task_id}'"),
))
.await
.expect("query failed")
.expect("no row returned");
let input_str: String = row.try_get_by_index(0).unwrap();
let output_str: String = row.try_get_by_index(1).unwrap();
let input_val: serde_json::Value = serde_json::from_str(&input_str).unwrap();
let output_val: serde_json::Value = serde_json::from_str(&output_str).unwrap();
assert_eq!(input_val["crawl"], "CC-2026");
assert_eq!(input_val["shards"][1], 2);
assert_eq!(output_val["result"], "ok");
assert_eq!(output_val["count"], 42);
}
#[tokio::test]
async fn task_queue_rate_limit_columns() {
let (db, _guard) = setup().await;
exec(
&db,
"INSERT INTO durable.task_queue (name, max_concurrency, rate_limit, rate_limit_window_ms) \
VALUES ('rate_limited_q', 10, 50, 5000)",
)
.await;
let row = db
.query_one(Statement::from_string(
DbBackend::Postgres,
"SELECT rate_limit::bigint, rate_limit_window_ms \
FROM durable.task_queue WHERE name = 'rate_limited_q'",
))
.await
.expect("query failed")
.expect("no row returned");
let rate_limit: i64 = row.try_get_by_index(0).unwrap();
let window_ms: i64 = row.try_get_by_index(1).unwrap();
assert_eq!(rate_limit, 50);
assert_eq!(window_ms, 5000);
}
#[tokio::test]
async fn task_timestamps_auto_set() {
let (db, _guard) = setup().await;
let task_id = "30000000-0000-0000-0000-000000000001";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind) \
VALUES ('{task_id}', 'ts_test', 'WORKFLOW')"
),
)
.await;
let count = query_count(
&db,
&format!(
"SELECT count(*) FROM durable.task \
WHERE id = '{task_id}' AND created_at IS NOT NULL \
AND created_at >= now() - interval '10 seconds'"
),
)
.await;
assert_eq!(count, 1, "created_at should be auto-set and recent");
}
#[tokio::test]
async fn null_parent_allows_multiple_root_tasks() {
let (db, _guard) = setup().await;
let id1 = "40000000-0000-0000-0000-000000000001";
let id2 = "40000000-0000-0000-0000-000000000002";
let id3 = "40000000-0000-0000-0000-000000000003";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind) \
VALUES ('{id1}', 'root_a', 'WORKFLOW')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind) \
VALUES ('{id2}', 'root_b', 'WORKFLOW')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind) \
VALUES ('{id3}', 'root_c', 'WORKFLOW')"
),
)
.await;
let count = query_count(
&db,
"SELECT count(*) FROM durable.task WHERE parent_id IS NULL",
)
.await;
assert!(count >= 3, "should have at least 3 root tasks, got {count}");
}
#[tokio::test]
async fn test_migration_creates_executor_heartbeat_table() {
let (db, _guard) = setup().await;
let count = query_count(
&db,
"SELECT count(*) FROM information_schema.tables \
WHERE table_schema = 'durable' AND table_name = 'executor_heartbeat'",
)
.await;
assert_eq!(
count, 1,
"executor_heartbeat table should exist after migration"
);
}
#[tokio::test]
async fn test_executor_heartbeat_crud() {
let (db, _guard) = setup().await;
exec(
&db,
"INSERT INTO durable.executor_heartbeat (executor_id, last_seen) \
VALUES ('test-worker-crud', now())",
)
.await;
let count = query_count(
&db,
"SELECT count(*) FROM durable.executor_heartbeat WHERE executor_id = 'test-worker-crud'",
)
.await;
assert_eq!(count, 1, "should have inserted one row");
exec(
&db,
"INSERT INTO durable.executor_heartbeat (executor_id, last_seen) \
VALUES ('test-worker-crud', now()) \
ON CONFLICT (executor_id) DO UPDATE SET last_seen = now()",
)
.await;
let count_after_upsert = query_count(
&db,
"SELECT count(*) FROM durable.executor_heartbeat WHERE executor_id = 'test-worker-crud'",
)
.await;
assert_eq!(count_after_upsert, 1, "upsert should leave exactly one row");
exec(
&db,
"DELETE FROM durable.executor_heartbeat WHERE executor_id = 'test-worker-crud'",
)
.await;
let count_after_delete = query_count(
&db,
"SELECT count(*) FROM durable.executor_heartbeat WHERE executor_id = 'test-worker-crud'",
)
.await;
assert_eq!(count_after_delete, 0, "row should be deleted");
}
#[tokio::test]
async fn insert_scheduled_task_with_cron_and_next_run() {
let (db, _guard) = setup().await;
let task_id = "50000000-0000-0000-0000-000000000001";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind, cron, next_run_at) \
VALUES ('{task_id}', 'daily_cleanup', 'WORKFLOW', '0 0 * * *', \
'2026-04-01T00:00:00Z')"
),
)
.await;
let row = db
.query_one(Statement::from_string(
DbBackend::Postgres,
format!("SELECT cron, next_run_at::text FROM durable.task WHERE id = '{task_id}'"),
))
.await
.expect("query failed")
.expect("no row returned");
let cron: String = row.try_get_by_index(0).unwrap();
let next_run: String = row.try_get_by_index(1).unwrap();
assert_eq!(cron, "0 0 * * *");
assert!(
next_run.contains("2026-04-01"),
"next_run_at should contain the scheduled date, got: {next_run}"
);
}
#[tokio::test]
async fn scheduled_task_null_cron_means_one_shot() {
let (db, _guard) = setup().await;
let task_id = "51000000-0000-0000-0000-000000000001";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind) \
VALUES ('{task_id}', 'one_shot_job', 'WORKFLOW')"
),
)
.await;
let count = query_count(
&db,
&format!(
"SELECT count(*) FROM durable.task \
WHERE id = '{task_id}' AND cron IS NULL AND next_run_at IS NULL"
),
)
.await;
assert_eq!(
count, 1,
"one-shot task should have NULL cron and next_run_at"
);
}
#[tokio::test]
async fn schedule_index_is_used_for_due_tasks_query() {
let (db, _guard) = setup().await;
let due_id = "52000000-0000-0000-0000-000000000001";
let future_id = "52000000-0000-0000-0000-000000000002";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind, cron, next_run_at, status) \
VALUES ('{due_id}', 'due_job', 'WORKFLOW', '0 */6 * * *', \
now() - interval '1 minute', 'PENDING')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind, cron, next_run_at, status) \
VALUES ('{future_id}', 'future_job', 'WORKFLOW', '0 */6 * * *', \
now() + interval '1 hour', 'PENDING')"
),
)
.await;
let due_count = query_count(
&db,
"SELECT count(*) FROM durable.task \
WHERE cron IS NOT NULL AND next_run_at <= now() AND status = 'PENDING'",
)
.await;
assert!(
due_count >= 1,
"should find at least 1 due scheduled task, got {due_count}"
);
let future_due = query_count(
&db,
&format!(
"SELECT count(*) FROM durable.task \
WHERE id = '{future_id}' AND cron IS NOT NULL AND next_run_at <= now()"
),
)
.await;
assert_eq!(future_due, 0, "future task should not be due yet");
}
#[tokio::test]
async fn scheduled_parent_can_spawn_child_instances() {
let (db, _guard) = setup().await;
let sched_id = "53000000-0000-0000-0000-000000000001";
let child1_id = "53000000-0000-0000-0000-000000000002";
let child2_id = "53000000-0000-0000-0000-000000000003";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind, cron, next_run_at) \
VALUES ('{sched_id}', 'etl_pipeline', 'WORKFLOW', '0 */6 * * *', \
now() + interval '6 hours')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind, status) \
VALUES ('{child1_id}', '{sched_id}', 0, 'etl_run_1', 'WORKFLOW', 'COMPLETED')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind, status) \
VALUES ('{child2_id}', '{sched_id}', 1, 'etl_run_2', 'WORKFLOW', 'RUNNING')"
),
)
.await;
let children = query_count(
&db,
&format!("SELECT count(*) FROM durable.task WHERE parent_id = '{sched_id}'"),
)
.await;
assert_eq!(
children, 2,
"scheduled parent should have 2 child instances"
);
let row = db
.query_one(Statement::from_string(
DbBackend::Postgres,
format!("SELECT cron FROM durable.task WHERE id = '{sched_id}'"),
))
.await
.expect("query failed")
.expect("no row");
let cron: String = row.try_get_by_index(0).unwrap();
assert_eq!(cron, "0 */6 * * *");
}
#[tokio::test]
async fn update_next_run_at_after_trigger() {
let (db, _guard) = setup().await;
let task_id = "54000000-0000-0000-0000-000000000001";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind, cron, next_run_at) \
VALUES ('{task_id}', 'hourly_sync', 'WORKFLOW', '0 * * * *', \
now() - interval '1 minute')"
),
)
.await;
exec(
&db,
&format!(
"UPDATE durable.task SET next_run_at = now() + interval '1 hour' \
WHERE id = '{task_id}'"
),
)
.await;
let due = query_count(
&db,
&format!(
"SELECT count(*) FROM durable.task \
WHERE id = '{task_id}' AND next_run_at <= now()"
),
)
.await;
assert_eq!(due, 0, "task should not be due after advancing next_run_at");
}
#[tokio::test]
async fn cascade_delete_removes_scheduled_children() {
let (db, _guard) = setup().await;
let sched_id = "55000000-0000-0000-0000-000000000001";
let child_id = "55000000-0000-0000-0000-000000000002";
exec(
&db,
&format!(
"INSERT INTO durable.task (id, name, kind, cron, next_run_at) \
VALUES ('{sched_id}', 'scheduled_parent', 'WORKFLOW', '0 0 * * *', \
now() + interval '1 day')"
),
)
.await;
exec(
&db,
&format!(
"INSERT INTO durable.task (id, parent_id, sequence, name, kind) \
VALUES ('{child_id}', '{sched_id}', 0, 'run_instance', 'WORKFLOW')"
),
)
.await;
exec(
&db,
&format!("DELETE FROM durable.task WHERE id = '{sched_id}'"),
)
.await;
let count = query_count(
&db,
&format!("SELECT count(*) FROM durable.task WHERE id = '{child_id}'"),
)
.await;
assert_eq!(
count, 0,
"child should be cascade-deleted with scheduled parent"
);
}