use std::time::Duration;
use chrono::Utc;
use gradatum_core::{
CurateSpec, Job, JobClass, JobLifecycle, JobLineage, JobMode, JobPriority, JobRecord, JobRetry,
JobScheduling, JobScope, JobSpec, JobStatus, QueueStore, RetryBackoff, TriggerSource,
};
use gradatum_db_sqlite::{apply_sqlite_pragmas, run_migrations, SqliteQueueStore};
use gradatum_worker::{ApalisConfig, WorkerMetrics};
use sqlx::SqlitePool;
use ulid::Ulid;
async fn test_pool() -> SqlitePool {
let pool = SqlitePool::connect("sqlite::memory:")
.await
.expect("pool in-memory");
apply_sqlite_pragmas(&pool).await.expect("pragmas");
run_migrations(&pool).await.expect("migrations");
pool
}
async fn test_store() -> SqliteQueueStore {
SqliteQueueStore::new(test_pool().await)
}
fn make_curate_record() -> JobRecord {
let now = Utc::now();
JobRecord {
id: Ulid::new(),
spec: JobSpec {
kind: Job::Curate(CurateSpec {
note_id: Ulid::new(),
tenant_id: "main".to_string(),
..Default::default()
}),
class: JobClass::Agent,
mode: JobMode::Batch,
scope: JobScope::VaultWide,
priority: JobPriority::default_for(&JobClass::Agent),
},
scheduling: JobScheduling {
trigger: TriggerSource::Demand,
scheduled_at: now,
await_jobs: vec![],
deadline: None,
cron_expr: None,
},
lifecycle: JobLifecycle {
status: JobStatus::Pending,
created_at: now,
started_at: None,
completed_at: None,
lease_until: None,
result: None,
},
retry: JobRetry {
count: 0,
max: 3,
backoff: RetryBackoff::Exponential { base: 5, max: 120 },
last_error: None,
errors: vec![],
},
lineage: JobLineage {
triggered_by: None,
parent_job: None,
pipeline_id: None,
pipeline_step: None,
children: vec![],
cost_usd: None,
},
}
}
#[tokio::test]
async fn sweep_no_stale_leases_returns_empty() {
let store = test_store().await;
let record = make_curate_record();
store.enqueue(record).await.expect("enqueue");
let _dequeued = store.dequeue().await.expect("dequeue").expect("déqueué");
let recovered = store
.recover_stale_leases(Duration::from_secs(300))
.await
.expect("recover_stale_leases");
assert!(
recovered.is_empty(),
"aucun lease expiré attendu (TTL=300s, lease_until=now+300s) : {recovered:?}"
);
}
#[tokio::test]
async fn sweep_recover_stale_leases_returns_ids() {
use gradatum_worker::schedules::run_sweep_once as sweep;
let store = test_store().await;
sweep(&store, Duration::from_secs(0), None).await;
let record = make_curate_record();
let id = store.enqueue(record).await.expect("enqueue");
sweep(&store, Duration::from_secs(0), None).await;
let job = store.get(id).await.expect("get").expect("job existe");
assert_eq!(
job.lifecycle.status,
JobStatus::Pending,
"job Pending ne doit pas être modifié par recover_stale_leases"
);
}
#[tokio::test]
async fn subscribe_broadcast_two_receivers() {
use gradatum_core::QueueEvent;
let store = test_store().await;
let mut rx1 = store.subscribe();
let mut rx2 = store.subscribe();
let record = make_curate_record();
let id = store.enqueue(record).await.expect("enqueue");
let ev1 = tokio::time::timeout(Duration::from_millis(500), rx1.recv())
.await
.expect("rx1 timeout")
.expect("rx1 recv");
let ev2 = tokio::time::timeout(Duration::from_millis(500), rx2.recv())
.await
.expect("rx2 timeout")
.expect("rx2 recv");
assert!(
matches!(ev1, QueueEvent::JobInserted(eid) if eid == id),
"rx1 doit recevoir JobInserted({id}), reçu : {ev1:?}"
);
assert!(
matches!(ev2, QueueEvent::JobInserted(eid) if eid == id),
"rx2 doit recevoir JobInserted({id}), reçu : {ev2:?}"
);
}
#[test]
fn metrics_prometheus_render_all_four() {
let m = WorkerMetrics::new();
m.inc_jobs_total("curate", "done");
m.inc_jobs_total("embed", "error");
m.observe_duration("reindex", 2.5);
m.inc_dlq("curate");
m.inc_workers_active("embed");
let rendered = m.render();
for metric_name in [
"gradatum_jobs_total",
"gradatum_jobs_duration_seconds",
"gradatum_jobs_dlq_total",
"gradatum_workers_active",
] {
assert!(
rendered.contains(metric_name),
"métrique absente du rendu Prometheus : {metric_name}"
);
}
}
#[test]
fn apalis_config_from_toml_schedules() {
let toml_str = r#"
[workers]
[workers.curate]
concurrency = 3
timeout_secs = 45
max_retries = 2
[[schedules]]
name = "cleanup_dlq_daily"
cron = "0 3 * * *"
retention_days = 14
"#;
let cfg: ApalisConfig = toml::from_str(toml_str).expect("désérialisation ApalisConfig");
assert_eq!(cfg.workers.curate.concurrency, 3);
assert_eq!(cfg.workers.curate.timeout_secs, 45);
assert_eq!(cfg.workers.curate.max_retries, 2);
assert_eq!(cfg.schedules.len(), 1);
assert_eq!(cfg.schedules[0].name, "cleanup_dlq_daily");
assert_eq!(cfg.schedules[0].cron, "0 3 * * *");
assert_eq!(cfg.schedules[0].retention_days, 14);
}
#[tokio::test]
async fn sweep_cancel_expired_deadlines_returns_ids() {
let store = test_store().await;
let mut record = make_curate_record();
record.scheduling.deadline = Some(Utc::now() - chrono::Duration::hours(1));
let id = store.enqueue(record).await.expect("enqueue");
let cancelled_ids = store
.cancel_expired_deadlines(Utc::now())
.await
.expect("cancel_expired_deadlines");
assert!(
cancelled_ids.contains(&id),
"l'ID du job deadline passée doit figurer dans cancelled_ids : {cancelled_ids:?}"
);
}
#[tokio::test]
async fn sweep_no_deadline_job_not_cancelled() {
let store = test_store().await;
let record = make_curate_record(); let id = store.enqueue(record).await.expect("enqueue");
let cancelled_ids = store
.cancel_expired_deadlines(Utc::now())
.await
.expect("cancel_expired_deadlines");
assert!(
!cancelled_ids.contains(&id),
"job sans deadline ne doit PAS être dans cancelled_ids : {cancelled_ids:?}"
);
}