use std::time::Duration;
use anyhow::{Context, Result};
use sqlx::SqlitePool;
use tracing::{info, warn};
/// Period between two maintenance sweeps of the background task (five minutes).
const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);

/// SQLite `datetime('now', …)` modifier: `pending` executions initiated before
/// that instant are marked `expired`.
const PENDING_TIMEOUT: &str = "-1 hour";

/// SQLite `datetime('now', …)` modifier: unfinished `execution_results` rows
/// started before that instant are reaped.
const INFLIGHT_TIMEOUT: &str = "-24 hours";

/// Sentinel `exit_code` written to reaped `execution_results` rows.
const REAPED_EXIT_CODE: i64 = -1;

/// Note stored in (or appended to) `stderr` of a reaped row.
///
/// The text hard-codes "24h"; keep it in sync with [`INFLIGHT_TIMEOUT`].
const REAPED_STDERR_NOTE: &str = concat!(
    "[backend: reaped — no ExecResult within 24h; agent likely died mid-run ",
    "or hit the pre-v0.43.14 kill-hang (#330)]",
);

/// Retention modifier for `inventory_history` rows (compared against `observed_at`).
const HISTORY_RETENTION: &str = "-90 days";

/// Retention modifier for `host_perf_samples` rows (compared against `at`).
const PERF_RETENTION: &str = "-30 days";

/// Retention modifier for `process_perf_samples` rows (compared against `at`).
const PROCESS_PERF_RETENTION: &str = "-7 days";

/// Retention modifier for `obs_events` rows (compared against `at`).
const OBS_EVENTS_RETENTION: &str = "-90 days";
/// Spawns the background maintenance task on the current Tokio runtime.
///
/// Every [`CLEANUP_INTERVAL`] the task runs, in this order:
///
/// 1. `expire_stale_pending` — mark stale `pending` executions as `expired`;
/// 2. `reap_orphaned_results` — close in-flight results that never reported back;
/// 3. `prune_inventory_history`, `prune_host_perf_samples`,
///    `prune_process_perf_samples`, `prune_obs_events` — retention deletes.
///
/// Each step is independent: a failure is logged at `warn` level and the
/// remaining steps (and later sweeps) still run. Steps that touched zero rows
/// log nothing.
///
/// The returned handle belongs to a task that loops forever; it only ends when
/// it is aborted or the runtime shuts down.
pub fn spawn(pool: SqlitePool) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        info!(
            interval_secs = CLEANUP_INTERVAL.as_secs(),
            pending_timeout = PENDING_TIMEOUT,
            "executions cleanup task started",
        );

        // Per the Tokio docs the first tick completes immediately, so one sweep
        // runs right at startup.
        let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
        // The default policy (`Burst`) fires all missed ticks back to back once a
        // slow sweep finally finishes. The sweeps are idempotent, so catching up
        // is pure wasted work; wait a full period after an overrun instead.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            interval.tick().await;

            match expire_stale_pending(&pool).await {
                Ok(n) if n > 0 => info!(
                    expired = n,
                    "executions cleanup: marked {n} stale pending rows as expired",
                ),
                Ok(_) => {}
                Err(e) => warn!(error = %e, "executions cleanup failed"),
            }

            match reap_orphaned_results(&pool).await {
                Ok(n) if n > 0 => info!(
                    reaped = n,
                    "execution_results cleanup: reaped {n} orphaned in-flight rows (no result within {INFLIGHT_TIMEOUT})",
                ),
                Ok(_) => {}
                Err(e) => warn!(error = %e, "execution_results reap failed"),
            }

            match prune_inventory_history(&pool).await {
                Ok(n) if n > 0 => info!(
                    deleted = n,
                    "inventory_history cleanup: pruned {n} rows older than {HISTORY_RETENTION}",
                ),
                Ok(_) => {}
                Err(e) => warn!(error = %e, "inventory_history cleanup failed"),
            }

            match prune_host_perf_samples(&pool).await {
                Ok(n) if n > 0 => info!(
                    deleted = n,
                    "host_perf_samples cleanup: pruned {n} rows older than {PERF_RETENTION}",
                ),
                Ok(_) => {}
                Err(e) => warn!(error = %e, "host_perf_samples cleanup failed"),
            }

            match prune_process_perf_samples(&pool).await {
                Ok(n) if n > 0 => info!(
                    deleted = n,
                    "process_perf_samples cleanup: pruned {n} rows older than {PROCESS_PERF_RETENTION}",
                ),
                Ok(_) => {}
                Err(e) => warn!(error = %e, "process_perf_samples cleanup failed"),
            }

            match prune_obs_events(&pool).await {
                Ok(n) if n > 0 => info!(
                    deleted = n,
                    "obs_events cleanup: pruned {n} rows older than {OBS_EVENTS_RETENTION}",
                ),
                Ok(_) => {}
                Err(e) => warn!(error = %e, "obs_events cleanup failed"),
            }
        }
    })
}
/// Deletes every `obs_events` row whose `at` is earlier than
/// `datetime('now', OBS_EVENTS_RETENTION)`.
///
/// Returns the number of rows deleted.
///
/// # Errors
///
/// Returns the database error, wrapped with context, if the `DELETE` fails.
async fn prune_obs_events(pool: &SqlitePool) -> Result<u64> {
    const SQL: &str = "DELETE FROM obs_events\n\
                       WHERE at < datetime('now', ?)";

    sqlx::query(SQL)
        .bind(OBS_EVENTS_RETENTION)
        .execute(pool)
        .await
        .context("DELETE obs_events retention sweep")
        .map(|outcome| outcome.rows_affected())
}
/// Deletes every `process_perf_samples` row whose `at` is earlier than
/// `datetime('now', PROCESS_PERF_RETENTION)`.
///
/// Returns the number of rows deleted.
///
/// # Errors
///
/// Returns the database error, wrapped with context, if the `DELETE` fails.
async fn prune_process_perf_samples(pool: &SqlitePool) -> Result<u64> {
    const SQL: &str = "DELETE FROM process_perf_samples\n\
                       WHERE at < datetime('now', ?)";

    sqlx::query(SQL)
        .bind(PROCESS_PERF_RETENTION)
        .execute(pool)
        .await
        .context("DELETE process_perf_samples retention sweep")
        .map(|outcome| outcome.rows_affected())
}
/// Deletes every `host_perf_samples` row whose `at` is earlier than
/// `datetime('now', PERF_RETENTION)`.
///
/// Returns the number of rows deleted.
///
/// # Errors
///
/// Returns the database error, wrapped with context, if the `DELETE` fails.
async fn prune_host_perf_samples(pool: &SqlitePool) -> Result<u64> {
    const SQL: &str = "DELETE FROM host_perf_samples\n\
                       WHERE at < datetime('now', ?)";

    sqlx::query(SQL)
        .bind(PERF_RETENTION)
        .execute(pool)
        .await
        .context("DELETE host_perf_samples retention sweep")
        .map(|outcome| outcome.rows_affected())
}
/// Deletes every `inventory_history` row whose `observed_at` is earlier than
/// `datetime('now', HISTORY_RETENTION)`.
///
/// Returns the number of rows deleted.
///
/// # Errors
///
/// Returns the database error, wrapped with context, if the `DELETE` fails.
async fn prune_inventory_history(pool: &SqlitePool) -> Result<u64> {
    const SQL: &str = "DELETE FROM inventory_history\n\
                       WHERE observed_at < datetime('now', ?)";

    sqlx::query(SQL)
        .bind(HISTORY_RETENTION)
        .execute(pool)
        .await
        .context("DELETE inventory_history retention sweep")
        .map(|outcome| outcome.rows_affected())
}
/// Flips `executions` rows from `pending` to `expired` when their
/// `initiated_at` is strictly earlier than `datetime('now', PENDING_TIMEOUT)`.
///
/// Rows in any other status are not matched by the `WHERE` clause, so running
/// the sweep again right away updates nothing.
///
/// Returns the number of rows updated.
///
/// # Errors
///
/// Returns the database error, wrapped with context, if the `UPDATE` fails.
async fn expire_stale_pending(pool: &SqlitePool) -> Result<u64> {
    const SQL: &str = "UPDATE executions\n\
                       SET status = 'expired'\n\
                       WHERE status = 'pending'\n\
                       AND initiated_at < datetime('now', ?)";

    sqlx::query(SQL)
        .bind(PENDING_TIMEOUT)
        .execute(pool)
        .await
        .context("UPDATE executions expire stale pending")
        .map(|outcome| outcome.rows_affected())
}
/// Closes `execution_results` rows that are still in flight (`finished_at IS
/// NULL`) although they started before `datetime('now', INFLIGHT_TIMEOUT)`.
///
/// Each matched row gets:
///
/// * `finished_at` stamped with `CURRENT_TIMESTAMP`;
/// * `exit_code` set to [`REAPED_EXIT_CODE`];
/// * `reaped` set to `1`;
/// * [`REAPED_STDERR_NOTE`] stored in `stderr` — alone when `stderr` is empty
///   or NULL, otherwise appended after a newline so partial output is kept.
///
/// Once `finished_at` is set the row no longer matches, so a repeated sweep
/// leaves it alone.
///
/// Returns the number of rows updated.
///
/// # Errors
///
/// Returns the database error, wrapped with context, if the `UPDATE` fails.
async fn reap_orphaned_results(pool: &SqlitePool) -> Result<u64> {
    // `COALESCE(stderr, '')`: in SQLite both `NULL = ''` and `NULL || x` yield
    // NULL, so a NULL stderr would fall into the ELSE branch and stay NULL,
    // silently dropping the note. Treat NULL like the empty string instead.
    let rows = sqlx::query(
        "UPDATE execution_results\n\
         SET finished_at = CURRENT_TIMESTAMP,\n\
         exit_code = ?,\n\
         reaped = 1,\n\
         stderr = CASE\n\
         WHEN COALESCE(stderr, '') = '' THEN ?\n\
         ELSE stderr || char(10) || ?\n\
         END\n\
         WHERE finished_at IS NULL\n\
         AND started_at < datetime('now', ?)",
    )
    .bind(REAPED_EXIT_CODE)
    // The note is bound twice: once per CASE branch.
    .bind(REAPED_STDERR_NOTE)
    .bind(REAPED_STDERR_NOTE)
    .bind(INFLIGHT_TIMEOUT)
    .execute(pool)
    .await
    .context("UPDATE execution_results reap orphaned in-flight")?;

    Ok(rows.rows_affected())
}
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::sqlite::SqlitePoolOptions;

    /// Opens an in-memory SQLite pool limited to one connection and applies the
    /// migrations from `./migrations`.
    ///
    /// NOTE(review): the single connection is presumably required because every
    /// connection to `sqlite::memory:` gets its own private database — confirm.
    async fn fresh_pool() -> SqlitePool {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// Returns SQLite's current `datetime('now')` string (one-second resolution).
    async fn sqlite_now(pool: &SqlitePool) -> String {
        let (now,): (String,) = sqlx::query_as("SELECT datetime('now')")
            .fetch_one(pool)
            .await
            .unwrap();
        now
    }

    /// Inserts an `executions` row with the given status whose `initiated_at`
    /// is `offset_minutes` away from now (negative = in the past).
    ///
    /// The offset is interpolated into the SQL text; it is an `i64`, so the
    /// resulting statement is safe to mark with `AssertSqlSafe`.
    async fn insert_exec(pool: &SqlitePool, exec_id: &str, status: &str, offset_minutes: i64) {
        let sql = format!(
            "INSERT INTO executions\n\
             (exec_id, job_id, version, initiated_by, target_count, status, initiated_at)\n\
             VALUES (?, 'j', '1.0', 'tester', 1, ?, datetime('now', '{offset_minutes} minutes'))"
        );
        sqlx::query(sqlx::AssertSqlSafe(sql))
            .bind(exec_id)
            .bind(status)
            .execute(pool)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn pending_older_than_1h_becomes_expired() {
        let pool = fresh_pool().await;
        insert_exec(&pool, "e-stale", "pending", -120).await;

        let affected = expire_stale_pending(&pool).await.unwrap();
        assert_eq!(affected, 1);

        let status: (String,) = sqlx::query_as("SELECT status FROM executions WHERE exec_id = ?")
            .bind("e-stale")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(status.0, "expired");
    }

    #[tokio::test]
    async fn pending_within_1h_is_left_alone() {
        let pool = fresh_pool().await;
        insert_exec(&pool, "e-fresh", "pending", -30).await;

        let affected = expire_stale_pending(&pool).await.unwrap();
        assert_eq!(affected, 0, "fresh pending must not be touched");

        let status: (String,) = sqlx::query_as("SELECT status FROM executions WHERE exec_id = ?")
            .bind("e-fresh")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(status.0, "pending");
    }

    #[tokio::test]
    async fn other_statuses_are_never_touched() {
        let pool = fresh_pool().await;
        insert_exec(&pool, "e-run", "running", -180).await;
        insert_exec(&pool, "e-done", "completed", -180).await;
        insert_exec(&pool, "e-exp", "expired", -180).await;

        let affected = expire_stale_pending(&pool).await.unwrap();
        assert_eq!(affected, 0);

        for (id, expected) in [
            ("e-run", "running"),
            ("e-done", "completed"),
            ("e-exp", "expired"),
        ] {
            let status: (String,) =
                sqlx::query_as("SELECT status FROM executions WHERE exec_id = ?")
                    .bind(id)
                    .fetch_one(&pool)
                    .await
                    .unwrap();
            assert_eq!(status.0, expected, "{id} status should be unchanged");
        }
    }

    #[tokio::test]
    async fn pending_exactly_1h_is_left_alone() {
        // `datetime('now')` has one-second resolution. The row sits exactly on
        // the boundary only when the insert and the sweep observe the same
        // second; if the clock ticks in between, the row really is older than
        // 1h and is correctly expired. Detect that case and retry on a fresh
        // database instead of failing spuriously.
        const MAX_ATTEMPTS: usize = 5;

        for _ in 0..MAX_ATTEMPTS {
            let pool = fresh_pool().await;

            let before = sqlite_now(&pool).await;
            insert_exec(&pool, "e-boundary", "pending", -60).await;
            let affected = expire_stale_pending(&pool).await.unwrap();
            let after = sqlite_now(&pool).await;

            if before != after {
                // Second boundary crossed somewhere in between: outcome is
                // indeterminate, try again.
                continue;
            }

            assert_eq!(affected, 0, "exactly-1h-old pending is at the boundary");

            let status: (String,) =
                sqlx::query_as("SELECT status FROM executions WHERE exec_id = ?")
                    .bind("e-boundary")
                    .fetch_one(&pool)
                    .await
                    .unwrap();
            assert_eq!(status.0, "pending");
            return;
        }

        panic!("SQLite clock ticked during every one of {MAX_ATTEMPTS} attempts");
    }

    #[tokio::test]
    async fn cleanup_is_idempotent() {
        let pool = fresh_pool().await;
        insert_exec(&pool, "e-old", "pending", -120).await;

        let first = expire_stale_pending(&pool).await.unwrap();
        let second = expire_stale_pending(&pool).await.unwrap();

        assert_eq!(first, 1);
        assert_eq!(second, 0, "second run finds nothing to expire");
    }

    /// Inserts an unfinished `execution_results` row (`finished_at` and
    /// `exit_code` NULL) started `offset_minutes` away from now, carrying the
    /// given partial `stderr`.
    async fn insert_inflight_result(
        pool: &SqlitePool,
        result_id: &str,
        offset_minutes: i64,
        stderr: &str,
    ) {
        let sql = format!(
            "INSERT INTO execution_results\n\
             (result_id, request_id, pc_id, exit_code, stdout, stderr,\n\
             started_at, finished_at)\n\
             VALUES (?, 'req', 'pc-1', NULL, '', ?,\n\
             datetime('now', '{offset_minutes} minutes'), NULL)"
        );
        sqlx::query(sqlx::AssertSqlSafe(sql))
            .bind(result_id)
            .bind(stderr)
            .execute(pool)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn inflight_older_than_24h_is_reaped() {
        let pool = fresh_pool().await;
        insert_inflight_result(&pool, "r-stale", -25 * 60, "").await;

        let n = reap_orphaned_results(&pool).await.unwrap();
        assert_eq!(n, 1);

        let row: (Option<String>, Option<i64>, String, i64) = sqlx::query_as(
            "SELECT finished_at, exit_code, stderr, reaped \
             FROM execution_results WHERE result_id = ?",
        )
        .bind("r-stale")
        .fetch_one(&pool)
        .await
        .unwrap();

        assert!(row.0.is_some(), "finished_at must be stamped");
        assert_eq!(row.1, Some(REAPED_EXIT_CODE), "sentinel exit_code set");
        assert!(row.2.contains("reaped"), "stderr must carry the reap note");
        assert_eq!(row.3, 1, "row must be flagged reaped = 1");
    }

    #[tokio::test]
    async fn inflight_within_24h_is_left_alone() {
        let pool = fresh_pool().await;
        insert_inflight_result(&pool, "r-fresh", -60, "").await;

        let n = reap_orphaned_results(&pool).await.unwrap();
        assert_eq!(n, 0, "fresh in-flight row must not be touched");

        let fin: (Option<String>,) =
            sqlx::query_as("SELECT finished_at FROM execution_results WHERE result_id = ?")
                .bind("r-fresh")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert!(fin.0.is_none(), "row stays in-flight (finished_at NULL)");
    }

    #[tokio::test]
    async fn already_finished_rows_are_untouched() {
        let pool = fresh_pool().await;
        sqlx::query(
            "INSERT INTO execution_results\n\
             (result_id, request_id, pc_id, exit_code, stdout, stderr,\n\
             started_at, finished_at)\n\
             VALUES ('r-done', 'req', 'pc-1', 0, '', '',\n\
             datetime('now', '-48 hours'), datetime('now', '-47 hours'))",
        )
        .execute(&pool)
        .await
        .unwrap();

        let n = reap_orphaned_results(&pool).await.unwrap();
        assert_eq!(n, 0);

        let row: (Option<i64>, String) =
            sqlx::query_as("SELECT exit_code, stderr FROM execution_results WHERE result_id = ?")
                .bind("r-done")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(row.0, Some(0), "finished row's exit_code unchanged");
        assert!(!row.1.contains("reaped"), "no note added to finished row");
    }

    #[tokio::test]
    async fn reap_appends_note_after_partial_stderr() {
        let pool = fresh_pool().await;
        insert_inflight_result(&pool, "r-partial", -25 * 60, "partial output").await;

        reap_orphaned_results(&pool).await.unwrap();

        let s: (String,) =
            sqlx::query_as("SELECT stderr FROM execution_results WHERE result_id = ?")
                .bind("r-partial")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert!(
            s.0.starts_with("partial output"),
            "partial capture kept first"
        );
        assert!(
            s.0.contains("reaped"),
            "note appended after the partial bytes"
        );
    }

    #[tokio::test]
    async fn reap_is_idempotent() {
        let pool = fresh_pool().await;
        insert_inflight_result(&pool, "r-old", -30 * 60, "").await;

        let first = reap_orphaned_results(&pool).await.unwrap();
        let second = reap_orphaned_results(&pool).await.unwrap();

        assert_eq!(first, 1);
        assert_eq!(second, 0, "reaped row is no longer in-flight");
    }
}