use sea_orm::Database;
use sea_orm_migration::MigratorTrait;
use ferro_queue::{claim, enqueue, requeue_claimed_by, CreateJobsTable};
/// Test-local migrator whose only migration is `CreateJobsTable`.
struct TestMigrator;

#[async_trait::async_trait]
impl MigratorTrait for TestMigrator {
    /// Returns the migration list: a single boxed `CreateJobsTable`.
    fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
        let jobs_table: Box<dyn sea_orm_migration::MigrationTrait> = Box::new(CreateJobsTable);
        vec![jobs_table]
    }
}
/// Checks that a job claimed under one worker name can be claimed again under
/// another name once `requeue_claimed_by` has been called for the first one.
///
/// Flow: migrate a fresh SQLite file, enqueue one `ShutdownTestJob`, claim it
/// as `"w-shutdown"`, call `requeue_claimed_by` for `"w-shutdown"`, then claim
/// as `"w-next"` and verify the job type of what comes back.
#[tokio::test]
async fn graceful_shutdown_requeues_claimed_jobs() {
    // The temp file handle is kept in scope for the whole test so the
    // database file is not removed while the connection is in use.
    let sqlite_file = tempfile::NamedTempFile::new().unwrap();
    let url = format!("sqlite://{}?mode=rwc", sqlite_file.path().display());
    let db = Database::connect(&url).await.unwrap();
    TestMigrator::up(&db, None).await.unwrap();

    // NOTE(review): the meaning of `3, None, None` depends on `enqueue`'s
    // signature, which is not visible here — confirm against ferro_queue.
    let enqueued_at = chrono::Utc::now();
    enqueue(&db, "default", "ShutdownTestJob", "{}", 3, None, None, enqueued_at)
        .await
        .expect("enqueue failed");

    // First claim: the single enqueued job must be handed out.
    let first_claim = claim(&db, "default", "w-shutdown")
        .await
        .expect("claim failed");
    assert!(
        first_claim.is_some(),
        "expected a job to be claimed, got None (queue may be empty)"
    );

    // Hand back everything claimed under "w-shutdown".
    requeue_claimed_by(&db, "w-shutdown")
        .await
        .expect("requeue_claimed_by failed");

    // Second claim under a different name must yield a job again.
    let second_claim = claim(&db, "default", "w-next")
        .await
        .expect("re-claim failed");
    let job = second_claim.expect(
        "job should be pending again after requeue_claimed_by; \
         expected Some, got None (re-queue did not reset status)",
    );
    assert_eq!(
        job.job_type, "ShutdownTestJob",
        "re-claimed job should be the original job"
    );
}