use std::time::Duration;
use anyhow::{Context, Result};
use sqlx::SqlitePool;
use tracing::{info, warn};
const CLEANUP_INTERVAL: Duration = Duration::from_secs(5 * 60);
const PENDING_TIMEOUT: &str = "-1 hour";
const HISTORY_RETENTION: &str = "-90 days";
pub fn spawn(pool: SqlitePool) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
info!(
interval_secs = CLEANUP_INTERVAL.as_secs(),
pending_timeout = PENDING_TIMEOUT,
"executions cleanup task started",
);
let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
loop {
interval.tick().await;
match expire_stale_pending(&pool).await {
Ok(n) if n > 0 => info!(
expired = n,
"executions cleanup: marked {n} stale pending rows as expired",
),
Ok(_) => {}
Err(e) => warn!(error = %e, "executions cleanup failed"),
}
match prune_inventory_history(&pool).await {
Ok(n) if n > 0 => info!(
deleted = n,
"inventory_history cleanup: pruned {n} rows older than {HISTORY_RETENTION}",
),
Ok(_) => {}
Err(e) => warn!(error = %e, "inventory_history cleanup failed"),
}
}
})
}
async fn prune_inventory_history(pool: &SqlitePool) -> Result<u64> {
let rows = sqlx::query(
"DELETE FROM inventory_history
WHERE observed_at < datetime('now', ?)",
)
.bind(HISTORY_RETENTION)
.execute(pool)
.await
.context("DELETE inventory_history retention sweep")?;
Ok(rows.rows_affected())
}
async fn expire_stale_pending(pool: &SqlitePool) -> Result<u64> {
let rows = sqlx::query(
"UPDATE executions
SET status = 'expired'
WHERE status = 'pending'
AND initiated_at < datetime('now', ?)",
)
.bind(PENDING_TIMEOUT)
.execute(pool)
.await
.context("UPDATE executions expire stale pending")?;
Ok(rows.rows_affected())
}
#[cfg(test)]
mod tests {
use super::*;
use sqlx::sqlite::SqlitePoolOptions;
async fn fresh_pool() -> SqlitePool {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
pool
}
async fn insert_exec(pool: &SqlitePool, exec_id: &str, status: &str, offset_minutes: i64) {
let sql = format!(
"INSERT INTO executions
(exec_id, job_id, version, initiated_by, target_count, status, initiated_at)
VALUES (?, 'j', '1.0', 'tester', 1, ?, datetime('now', '{offset_minutes} minutes'))"
);
sqlx::query(&sql)
.bind(exec_id)
.bind(status)
.execute(pool)
.await
.unwrap();
}
#[tokio::test]
async fn pending_older_than_1h_becomes_expired() {
let pool = fresh_pool().await;
insert_exec(&pool, "e-stale", "pending", -120).await; let affected = expire_stale_pending(&pool).await.unwrap();
assert_eq!(affected, 1);
let status: (String,) = sqlx::query_as("SELECT status FROM executions WHERE exec_id = ?")
.bind("e-stale")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(status.0, "expired");
}
#[tokio::test]
async fn pending_within_1h_is_left_alone() {
let pool = fresh_pool().await;
insert_exec(&pool, "e-fresh", "pending", -30).await; let affected = expire_stale_pending(&pool).await.unwrap();
assert_eq!(affected, 0, "fresh pending must not be touched");
let status: (String,) = sqlx::query_as("SELECT status FROM executions WHERE exec_id = ?")
.bind("e-fresh")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(status.0, "pending");
}
#[tokio::test]
async fn other_statuses_are_never_touched() {
let pool = fresh_pool().await;
insert_exec(&pool, "e-run", "running", -180).await;
insert_exec(&pool, "e-done", "completed", -180).await;
insert_exec(&pool, "e-exp", "expired", -180).await;
let affected = expire_stale_pending(&pool).await.unwrap();
assert_eq!(affected, 0);
for (id, expected) in [
("e-run", "running"),
("e-done", "completed"),
("e-exp", "expired"),
] {
let status: (String,) =
sqlx::query_as("SELECT status FROM executions WHERE exec_id = ?")
.bind(id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(status.0, expected, "{id} status should be unchanged");
}
}
#[tokio::test]
async fn pending_exactly_1h_is_left_alone() {
let pool = fresh_pool().await;
insert_exec(&pool, "e-boundary", "pending", -60).await;
let affected = expire_stale_pending(&pool).await.unwrap();
assert_eq!(affected, 0, "exactly-1h-old pending is at the boundary");
let status: (String,) = sqlx::query_as("SELECT status FROM executions WHERE exec_id = ?")
.bind("e-boundary")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(status.0, "pending");
}
#[tokio::test]
async fn cleanup_is_idempotent() {
let pool = fresh_pool().await;
insert_exec(&pool, "e-old", "pending", -120).await;
let first = expire_stale_pending(&pool).await.unwrap();
let second = expire_stale_pending(&pool).await.unwrap();
assert_eq!(first, 1);
assert_eq!(second, 0, "second run finds nothing to expire");
}
}