use std::sync::Arc;
use std::time::Duration;
use apalis::prelude::{BoxDynError, Data};
use apalis_cron::Tick;
use chrono::Utc;
use sqlx::SqlitePool;
use tracing::{error, info, warn};
use gradatum_core::QueueStore;
use gradatum_db_sqlite::idempotency_cleanup;
/// Configuration entry describing one scheduled job.
///
/// The type derives serde's `Deserialize`/`Serialize`, so it can be loaded from
/// and written back to whatever configuration format the caller uses.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct ScheduleConfig {
/// Name of the schedule entry.
pub name: String,
/// Schedule expression. Presumably cron syntax, going by the field name; it is
/// stored as a plain string and not validated here (TODO confirm where it is parsed).
pub cron: String,
/// Retention in days, going by the field name. When the key is missing from the
/// deserialized input, serde calls `ScheduleConfig::default_retention`, which yields 30.
#[serde(default = "ScheduleConfig::default_retention")]
pub retention_days: u32,
}
impl ScheduleConfig {
    /// Value used for `retention_days` when the configuration omits the key.
    const DEFAULT_RETENTION_DAYS: u32 = 30;

    /// Serde default provider for `retention_days`.
    ///
    /// Referenced by path from the `#[serde(default = "...")]` attribute on the
    /// field, so the function name must stay in sync with that attribute.
    fn default_retention() -> u32 {
        Self::DEFAULT_RETENTION_DAYS
    }
}
/// Bundle of the state a cron handler needs: the SQLite pool and a DLQ retention value.
///
/// NOTE(review): nothing in the visible source constructs or reads this type;
/// `handle_cleanup_dlq` receives the pool and the retention as separate `Data`
/// arguments instead. That is presumably why `dead_code` is allowed here; confirm
/// the type is still needed.
#[derive(Clone)]
#[allow(dead_code)]
pub struct CronHandlerCtx {
/// Shared handle to the SQLite connection pool.
pub pool: Arc<SqlitePool>,
/// DLQ retention in days, going by the field name (TODO confirm against the code that fills it).
pub dlq_retention_days: u32,
}
pub async fn handle_cleanup_dlq(
_tick: Tick<Utc>,
pool: Data<Arc<SqlitePool>>,
retention: Data<u32>,
) -> Result<(), BoxDynError> {
let cutoff = Utc::now() - chrono::Duration::days(*retention as i64);
let cutoff_str = cutoff.to_rfc3339();
let result = sqlx::query(
r#"
DELETE FROM gradatum_jobs
WHERE status = 'DLQ'
AND COALESCE(completed_at, created_at) < ?
"#,
)
.bind(&cutoff_str)
.execute(pool.as_ref())
.await;
match result {
Ok(res) => {
let deleted = res.rows_affected();
if deleted > 0 {
info!(
deleted = deleted,
retention_days = *retention,
"cleanup_dlq_daily : {} jobs DLQ purgés",
deleted
);
}
Ok(())
}
Err(e) => {
error!(error = %e, "cleanup_dlq_daily : erreur SQL");
Err(BoxDynError::from(format!(
"cleanup_dlq_daily SQL error: {e}"
)))
}
}
}
pub async fn run_sweep_once(
store: &(impl QueueStore + ?Sized),
lease_ttl: Duration,
pool: Option<&SqlitePool>,
) {
let now = Utc::now();
match store.recover_stale_leases(lease_ttl).await {
Ok(ids) if !ids.is_empty() => {
info!(count = ids.len(), "sweep: leases expirés récupérés");
}
Ok(_) => {}
Err(e) => warn!(error = %e, "sweep: recover_stale_leases échoué"),
}
match store.cancel_expired_deadlines(now).await {
Ok(ids) if !ids.is_empty() => {
info!(count = ids.len(), "sweep: deadlines expirés annulés");
}
Ok(_) => {}
Err(e) => warn!(error = %e, "sweep: cancel_expired_deadlines échoué"),
}
match store.promote_retries(now).await {
Ok(ids) if !ids.is_empty() => {
info!(count = ids.len(), "sweep: retries promus en Pending");
}
Ok(_) => {}
Err(e) => warn!(error = %e, "sweep: promote_retries échoué"),
}
match pool {
Some(p) => {
let cutoff_ms = (now - chrono::Duration::hours(24)).timestamp_millis();
if let Err(e) = idempotency_cleanup(p, cutoff_ms).await {
warn!(error = %e, "sweep: idempotency_cleanup échoué — table peut croître");
}
}
None => {
warn!("sweep: pool non disponible — idempotency_cleanup ignoré (table peut croître)");
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use gradatum_core::{JobFilter, JobRecord, JobResult, QueueError, QueueEvent};
    use std::sync::atomic::{AtomicU32, Ordering};
    use tokio::sync::broadcast::Receiver;
    use ulid::Ulid;

    /// Queue store double that only counts how often each sweep operation is
    /// invoked. Every other trait method is a stub that panics if called.
    #[derive(Default)]
    struct MockStore {
        stale_calls: AtomicU32,
        deadline_calls: AtomicU32,
        retry_calls: AtomicU32,
    }

    impl MockStore {
        /// Creates a mock with all counters at zero.
        fn new() -> Self {
            Self::default()
        }

        /// Records one more call on `counter`.
        fn bump(counter: &AtomicU32) {
            counter.fetch_add(1, Ordering::SeqCst);
        }

        /// Reads the number of calls recorded on `counter`.
        fn seen(counter: &AtomicU32) -> u32 {
            counter.load(Ordering::SeqCst)
        }
    }

    #[async_trait::async_trait]
    impl QueueStore for MockStore {
        async fn enqueue(&self, _: JobRecord) -> Result<Ulid, QueueError> {
            unimplemented!()
        }

        async fn dequeue(&self) -> Result<Option<JobRecord>, QueueError> {
            unimplemented!()
        }

        async fn get(&self, _: Ulid) -> Result<Option<JobRecord>, QueueError> {
            unimplemented!()
        }

        async fn complete(&self, _: Ulid, _: JobResult) -> Result<(), QueueError> {
            unimplemented!()
        }

        async fn fail(&self, _: Ulid, _: &str, _: u32) -> Result<(), QueueError> {
            unimplemented!()
        }

        async fn cancel(&self, _: Ulid) -> Result<(), QueueError> {
            unimplemented!()
        }

        async fn fail_dlq(&self, _: Ulid, _: &str) -> Result<(), QueueError> {
            unimplemented!()
        }

        async fn find_awaiting(&self, _: Ulid) -> Result<Vec<JobRecord>, QueueError> {
            unimplemented!()
        }

        async fn set_pending(&self, _: Ulid) -> Result<(), QueueError> {
            unimplemented!()
        }

        async fn recover_stale_leases(&self, _: Duration) -> Result<Vec<Ulid>, QueueError> {
            Self::bump(&self.stale_calls);
            Ok(Vec::new())
        }

        async fn cancel_expired_deadlines(
            &self,
            _: DateTime<Utc>,
        ) -> Result<Vec<Ulid>, QueueError> {
            Self::bump(&self.deadline_calls);
            Ok(Vec::new())
        }

        async fn promote_retries(&self, _: DateTime<Utc>) -> Result<Vec<Ulid>, QueueError> {
            Self::bump(&self.retry_calls);
            Ok(Vec::new())
        }

        async fn schedule_retry(&self, _: Ulid, _: DateTime<Utc>) -> Result<(), QueueError> {
            unimplemented!()
        }

        async fn list(&self, _: JobFilter) -> Result<Vec<JobRecord>, QueueError> {
            unimplemented!()
        }

        fn subscribe(&self) -> Receiver<QueueEvent> {
            // The sender is dropped right away: the sweep under test never
            // reads events, it only needs a receiver of the right type.
            let (sender, receiver) = tokio::sync::broadcast::channel(1);
            drop(sender);
            receiver
        }
    }

    /// Without a pool, one sweep must still hit each of the three store
    /// operations exactly once.
    #[tokio::test]
    async fn sweep_once_calls_all_three_methods() {
        let store = MockStore::new();

        run_sweep_once(&store, Duration::from_secs(300), None).await;

        assert_eq!(MockStore::seen(&store.stale_calls), 1);
        assert_eq!(MockStore::seen(&store.deadline_calls), 1);
        assert_eq!(MockStore::seen(&store.retry_calls), 1);
    }
}