use std::path::Path;
use std::time::Instant;
use microsandbox_db::DbWriteConnection;
use microsandbox_db::entity::{
maintenance_lease as lease_entity, run as run_entity, sandbox as sandbox_entity,
};
use sea_orm::sea_query::{Expr, OnConflict};
use sea_orm::{
ColumnTrait, Condition, DbErr, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set,
};
use crate::RuntimeResult;
/// Seconds a freshly claimed maintenance lease stays valid (added to "now" in `try_acquire_lease`).
const LEASE_DURATION_SECS: i64 = 10;
/// Seconds after a recorded completion during which no new sweep may claim the lease.
const MIN_SWEEP_INTERVAL_SECS: i64 = 30;
/// Default wall-clock budget of one sweep, in milliseconds (see `MaintenanceLimits::default`).
const MAX_SWEEP_DURATION_MS: u64 = 250;
/// Default cap on `Running`/`Draining` sandbox rows loaded by one sweep.
const MAX_STALE_ACTIVE_ROWS: u64 = 250;
/// Default cap on ephemeral `Stopped`/`Crashed` sandbox rows loaded by one sweep.
const MAX_TERMINAL_EPHEMERAL_ROWS: u64 = 250;
/// Result of [`cleanup_terminal_ephemeral_sandbox`] for a single sandbox id.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CleanupOutcome {
    /// The sandbox directory is gone (removed or already absent) and the row was deleted.
    Removed,
    /// The sandbox directory was not removed; the row was left in place for a later retry.
    DirRemoveFailed,
    /// No sandbox row exists for the id.
    AlreadyGone,
    /// All checks passed, but the guarded delete matched no row: the row was deleted, or
    /// stopped being ephemeral + `Stopped`/`Crashed`, in the meantime.
    AlreadyClaimed,
    /// The sandbox is not ephemeral, so this cleanup leaves it alone.
    SkippedPersistent,
    /// The sandbox status is neither `Stopped` nor `Crashed`.
    SkippedActive,
    /// At least one `Running` run of the sandbox has a pid that still appears alive.
    SkippedLivePid,
}
/// Bounds applied to one [`run_sandbox_lifecycle_maintenance`] sweep.
#[derive(Debug, Clone, Copy)]
pub struct MaintenanceLimits {
    /// Maximum number of `Running`/`Draining` sandbox rows loaded for stale reconciliation.
    pub max_stale_active: u64,
    /// Maximum number of ephemeral `Stopped`/`Crashed` sandbox rows loaded for cleanup.
    pub max_terminal_ephemeral: u64,
    /// Wall-clock budget, checked before each sandbox is processed.
    pub max_duration: std::time::Duration,
}
/// Counters produced by one [`run_sandbox_lifecycle_maintenance`] sweep.
#[derive(Debug, Default, Clone, Copy)]
pub struct MaintenanceReport {
    /// Active sandboxes whose status this sweep changed to `Crashed`.
    pub reconciled: u64,
    /// Terminal ephemeral sandboxes whose row this sweep deleted.
    pub removed: u64,
    /// Per-sandbox operations that returned an error; the sweep continued past them.
    pub errors: u64,
    /// `true` if the sweep stopped early because the time budget was exhausted.
    pub timed_out: bool,
}
impl Default for MaintenanceLimits {
    /// Builds the limits from the module-level `MAX_*` constants.
    fn default() -> Self {
        let max_duration = std::time::Duration::from_millis(MAX_SWEEP_DURATION_MS);
        Self {
            max_duration,
            max_terminal_ephemeral: MAX_TERMINAL_EPHEMERAL_ROWS,
            max_stale_active: MAX_STALE_ACTIVE_ROWS,
        }
    }
}
/// Runs one bounded lifecycle sweep at startup, but only if this process wins the
/// maintenance lease.
///
/// Nothing is returned to the caller:
/// - lease failures and lease contention are logged at debug level;
/// - a failed sweep is logged as a warning;
/// - a failure to record completion is logged at debug level.
///
/// Completion is recorded only after a sweep that returned `Ok`.
pub async fn run_startup_maintenance(db: &DbWriteConnection, sandboxes_dir: &Path) {
    let acquired = match try_acquire_lease(db).await {
        Ok(acquired) => acquired,
        Err(err) => {
            tracing::debug!(error = %err, "lifecycle maintenance lease attempt failed");
            return;
        }
    };
    if !acquired {
        tracing::debug!("lifecycle maintenance lease not acquired; skipping sweep");
        return;
    }

    let limits = MaintenanceLimits::default();
    let report = match run_sandbox_lifecycle_maintenance(db, sandboxes_dir, limits).await {
        Ok(report) => report,
        Err(err) => {
            tracing::warn!(error = %err, "sandbox lifecycle maintenance sweep failed");
            return;
        }
    };
    tracing::debug!(
        reconciled = report.reconciled,
        removed = report.removed,
        errors = report.errors,
        timed_out = report.timed_out,
        "sandbox lifecycle maintenance complete"
    );
    if let Err(err) = record_completion(db).await {
        tracing::debug!(error = %err, "failed to record maintenance completion");
    }
}
/// Runs one bounded maintenance sweep over sandbox rows.
///
/// Phase one:
/// - Loads up to `limits.max_stale_active` `Running`/`Draining` sandboxes in ascending id
///   order.
/// - Hands each one to `reconcile_stale_active`.
///
/// Phase two runs only if phase one did not time out:
/// - Loads up to `limits.max_terminal_ephemeral` ephemeral `Stopped`/`Crashed` sandboxes
///   in ascending id order.
/// - Passes each one to [`cleanup_terminal_ephemeral_sandbox`].
///
/// Elapsed time is checked before each sandbox. Once it reaches `limits.max_duration` the
/// sweep stops and sets `timed_out`. Per-sandbox failures are counted in `errors` and
/// logged; they do not abort the sweep.
///
/// NOTE(review): both queries always start from the lowest ids, so rows past the limits
/// are not looked at while the leading rows keep matching.
///
/// # Errors
///
/// Returns an error only if one of the two listing queries fails.
pub async fn run_sandbox_lifecycle_maintenance(
    db: &DbWriteConnection,
    sandboxes_dir: &Path,
    limits: MaintenanceLimits,
) -> RuntimeResult<MaintenanceReport> {
    let mut report = MaintenanceReport::default();
    let started = Instant::now();
    let budget_spent = || started.elapsed() >= limits.max_duration;

    // Phase 1: mark active rows whose process has disappeared as crashed.
    let stale_candidates = sandbox_entity::Entity::find()
        .filter(sandbox_entity::Column::Status.is_in([
            sandbox_entity::SandboxStatus::Running,
            sandbox_entity::SandboxStatus::Draining,
        ]))
        .order_by_asc(sandbox_entity::Column::Id)
        .limit(limits.max_stale_active)
        .all(db)
        .await?;
    for sandbox in &stale_candidates {
        if budget_spent() {
            report.timed_out = true;
            break;
        }
        match reconcile_stale_active(db, sandbox).await {
            Ok(changed) => report.reconciled += u64::from(changed),
            Err(err) => {
                report.errors += 1;
                tracing::debug!(sandbox = %sandbox.name, error = %err, "stale reconciliation failed");
            }
        }
    }
    if report.timed_out {
        return Ok(report);
    }

    // Phase 2: delete terminal ephemeral sandboxes (directory, then row).
    let terminal_ephemeral = sandbox_entity::Entity::find()
        .filter(sandbox_entity::Column::Ephemeral.eq(true))
        .filter(sandbox_entity::Column::Status.is_in([
            sandbox_entity::SandboxStatus::Stopped,
            sandbox_entity::SandboxStatus::Crashed,
        ]))
        .order_by_asc(sandbox_entity::Column::Id)
        .limit(limits.max_terminal_ephemeral)
        .all(db)
        .await?;
    for sandbox in &terminal_ephemeral {
        if budget_spent() {
            report.timed_out = true;
            break;
        }
        let outcome = cleanup_terminal_ephemeral_sandbox(db, sandboxes_dir, sandbox.id).await;
        match outcome {
            Ok(CleanupOutcome::Removed) => report.removed += 1,
            Ok(_) => {}
            Err(err) => {
                report.errors += 1;
                tracing::debug!(sandbox = %sandbox.name, error = %err, "ephemeral cleanup failed");
            }
        }
    }
    Ok(report)
}
/// Deletes one terminal ephemeral sandbox: first its on-disk directory, then its row.
///
/// The row is re-read by `sandbox_id` and every precondition is re-checked, so callers
/// may pass ids taken from an earlier listing. The outcome tells which check stopped
/// the cleanup, if any:
///
/// - [`CleanupOutcome::AlreadyGone`]: no row with this id exists.
/// - [`CleanupOutcome::SkippedPersistent`]: the sandbox is not ephemeral.
/// - [`CleanupOutcome::SkippedActive`]: the status is not `Stopped`/`Crashed`.
/// - [`CleanupOutcome::SkippedLivePid`]: a `Running` run still has a live pid.
/// - [`CleanupOutcome::DirRemoveFailed`]: the directory was not removed, either
///   because removal failed or because the name does not map to a path strictly inside
///   `sandboxes_dir`. The row is kept so a later sweep (or an operator) can deal with it.
/// - [`CleanupOutcome::AlreadyClaimed`]: the guarded delete matched no row.
/// - [`CleanupOutcome::Removed`]: the directory and the row are both gone.
///
/// The directory is removed before the row so that a failed removal leaves a row
/// behind for retry.
///
/// NOTE(review): the status/run checks and the delete are not atomic. A sandbox
/// restarted in between could lose its directory. Confirm that callers serialize
/// restarts against cleanup.
///
/// # Errors
///
/// Returns database errors from the lookup, the run check, or the delete.
pub async fn cleanup_terminal_ephemeral_sandbox(
    db: &DbWriteConnection,
    sandboxes_dir: &Path,
    sandbox_id: i32,
) -> RuntimeResult<CleanupOutcome> {
    let Some(sandbox) = sandbox_entity::Entity::find_by_id(sandbox_id)
        .one(db)
        .await?
    else {
        return Ok(CleanupOutcome::AlreadyGone);
    };
    if !sandbox.ephemeral {
        return Ok(CleanupOutcome::SkippedPersistent);
    }
    if !is_terminal(sandbox.status) {
        return Ok(CleanupOutcome::SkippedActive);
    }
    if has_live_active_run(db, sandbox.id).await? {
        return Ok(CleanupOutcome::SkippedLivePid);
    }
    // `Path::join` with an empty name yields `sandboxes_dir` itself. An absolute name,
    // or one containing `..`, yields a path outside it. `remove_dir_all` on either
    // would destroy unrelated data, so refuse and keep the row.
    let Some(dir) = sandbox_dir_within(sandboxes_dir, &sandbox.name) else {
        tracing::warn!(
            sandbox = %sandbox.name,
            dir = %sandboxes_dir.display(),
            "ephemeral cleanup refused sandbox name that does not resolve inside the sandboxes directory; keeping row"
        );
        return Ok(CleanupOutcome::DirRemoveFailed);
    };
    if let Err(err) = remove_dir_if_exists(&dir) {
        tracing::warn!(
            sandbox = %sandbox.name,
            dir = %dir.display(),
            error = %err,
            "ephemeral cleanup failed to remove sandbox directory; keeping row for retry"
        );
        return Ok(CleanupOutcome::DirRemoveFailed);
    }
    // Re-assert ephemeral + terminal in the delete itself, so a row that changed after
    // the checks above is not removed.
    let rows = sandbox_entity::Entity::delete_many()
        .filter(sandbox_entity::Column::Id.eq(sandbox.id))
        .filter(sandbox_entity::Column::Ephemeral.eq(true))
        .filter(sandbox_entity::Column::Status.is_in([
            sandbox_entity::SandboxStatus::Stopped,
            sandbox_entity::SandboxStatus::Crashed,
        ]))
        .exec(db)
        .await?
        .rows_affected;
    if rows == 0 {
        return Ok(CleanupOutcome::AlreadyClaimed);
    }
    Ok(CleanupOutcome::Removed)
}

/// Joins `name` onto `sandboxes_dir` only if `name` is a non-empty relative path made
/// purely of normal components. The result is then strictly inside `sandboxes_dir`.
fn sandbox_dir_within(sandboxes_dir: &Path, name: &str) -> Option<std::path::PathBuf> {
    let relative = Path::new(name);
    let contained = relative.components().next().is_some()
        && relative
            .components()
            .all(|component| matches!(component, std::path::Component::Normal(_)));
    contained.then(|| sandboxes_dir.join(relative))
}
/// Tries to claim the sandbox lifecycle maintenance lease for this process.
///
/// Steps:
/// 1. A pre-read returns `Ok(false)` without writing when the lease is still held
///    (`lease_expires_at` in the future), or when a sweep completed within the last
///    `MIN_SWEEP_INTERVAL_SECS` seconds.
/// 2. If the lease row does not exist yet, it is seeded with `lease_expires_at = now`.
///    A conflicting concurrent insert is tolerated.
/// 3. The claim itself is a single guarded `UPDATE` that repeats both conditions.
///
/// Returns `Ok(true)` only if the claiming update touched exactly one row.
///
/// # Errors
///
/// Returns database errors from the read, the seed insert (other than
/// `RecordNotInserted`), or the claiming update.
async fn try_acquire_lease(db: &DbWriteConnection) -> RuntimeResult<bool> {
    let now = chrono::Utc::now().naive_utc();
    let completion_cutoff = now - chrono::Duration::seconds(MIN_SWEEP_INTERVAL_SECS);

    let lease_row = lease_entity::Entity::find_by_id(lease_entity::SANDBOX_LIFECYCLE_MAINTENANCE)
        .one(db)
        .await?;
    match lease_row {
        Some(row) => {
            let still_held = row.lease_expires_at > now;
            let ran_recently = row
                .last_completed_at
                .is_some_and(|done| done > completion_cutoff);
            if still_held || ran_recently {
                return Ok(false);
            }
        }
        None => {
            // Seed the row at its expiry instant so the guarded update below can claim it.
            let seed = lease_entity::ActiveModel {
                name: Set(lease_entity::SANDBOX_LIFECYCLE_MAINTENANCE.to_string()),
                holder_pid: Set(None),
                lease_expires_at: Set(now),
                last_completed_at: Set(None),
            };
            let seeded = lease_entity::Entity::insert(seed)
                .on_conflict(
                    OnConflict::column(lease_entity::Column::Name)
                        .do_nothing()
                        .to_owned(),
                )
                .exec(db)
                .await;
            // With `do_nothing`, losing the insert race surfaces as `RecordNotInserted`.
            match seeded {
                Ok(_) | Err(DbErr::RecordNotInserted) => {}
                Err(err) => return Err(err.into()),
            }
        }
    }

    let claim_until = now + chrono::Duration::seconds(LEASE_DURATION_SECS);
    let claim = lease_entity::Entity::update_many()
        .col_expr(
            lease_entity::Column::HolderPid,
            Expr::value(std::process::id() as i32),
        )
        .col_expr(lease_entity::Column::LeaseExpiresAt, Expr::value(claim_until))
        .filter(lease_entity::Column::Name.eq(lease_entity::SANDBOX_LIFECYCLE_MAINTENANCE))
        .filter(lease_entity::Column::LeaseExpiresAt.lte(now))
        .filter(
            Condition::any()
                .add(lease_entity::Column::LastCompletedAt.is_null())
                .add(lease_entity::Column::LastCompletedAt.lte(completion_cutoff)),
        )
        .exec(db)
        .await?;
    Ok(claim.rows_affected == 1)
}
/// Stamps the maintenance lease row with the current completion time and clears its
/// holder pid.
///
/// The update is keyed only by lease name. It does not check which process holds the
/// lease, and it leaves `lease_expires_at` untouched.
///
/// # Errors
///
/// Returns the database error if the update fails.
async fn record_completion(db: &DbWriteConnection) -> RuntimeResult<()> {
    let finished_at = chrono::Utc::now().naive_utc();
    let no_holder: Option<i32> = None;
    let lease_name_matches =
        lease_entity::Column::Name.eq(lease_entity::SANDBOX_LIFECYCLE_MAINTENANCE);
    lease_entity::Entity::update_many()
        .col_expr(lease_entity::Column::LastCompletedAt, Expr::value(finished_at))
        .col_expr(lease_entity::Column::HolderPid, Expr::value(no_holder))
        .filter(lease_name_matches)
        .exec(db)
        .await?;
    Ok(())
}
/// Marks an active sandbox as `Crashed` when its most recent `Running` run has no live
/// process.
///
/// The run is chosen as the newest by `started_at`. The function returns `Ok(false)`
/// and changes nothing in two cases:
/// - no such run exists;
/// - the run's pid is still alive.
///
/// Otherwise it:
/// - marks that run `Terminated` with reason `InternalError`, but only if the run is
///   still `Running`;
/// - moves the sandbox to `Crashed`, but only if it is still `Running`/`Draining`.
///
/// Returns whether the sandbox row was updated.
///
/// # Errors
///
/// Returns database errors from the run lookup or either update.
async fn reconcile_stale_active(
    db: &DbWriteConnection,
    sandbox: &sandbox_entity::Model,
) -> RuntimeResult<bool> {
    let latest_running = run_entity::Entity::find()
        .filter(run_entity::Column::SandboxId.eq(sandbox.id))
        .filter(run_entity::Column::Status.eq(run_entity::RunStatus::Running))
        .order_by_desc(run_entity::Column::StartedAt)
        .one(db)
        .await?;
    let stale_run = match latest_running {
        Some(run) if !run.pid.is_some_and(pid_is_alive) => run,
        _ => return Ok(false),
    };

    let stamp = chrono::Utc::now().naive_utc();
    run_entity::Entity::update_many()
        .col_expr(
            run_entity::Column::Status,
            Expr::value(run_entity::RunStatus::Terminated),
        )
        .col_expr(
            run_entity::Column::TerminationReason,
            Expr::value(run_entity::TerminationReason::InternalError),
        )
        .col_expr(run_entity::Column::TerminatedAt, Expr::value(stamp))
        .filter(run_entity::Column::Id.eq(stale_run.id))
        .filter(run_entity::Column::Status.eq(run_entity::RunStatus::Running))
        .exec(db)
        .await?;

    let crashed = sandbox_entity::Entity::update_many()
        .col_expr(
            sandbox_entity::Column::Status,
            Expr::value(sandbox_entity::SandboxStatus::Crashed),
        )
        .col_expr(sandbox_entity::Column::UpdatedAt, Expr::value(stamp))
        .filter(sandbox_entity::Column::Id.eq(sandbox.id))
        .filter(sandbox_entity::Column::Status.is_in([
            sandbox_entity::SandboxStatus::Running,
            sandbox_entity::SandboxStatus::Draining,
        ]))
        .exec(db)
        .await?;
    Ok(crashed.rows_affected > 0)
}
/// Reports whether any `Running` run of `sandbox_id` has a pid that is still alive.
///
/// Runs without a pid are ignored.
///
/// # Errors
///
/// Returns the database error if the run query fails.
async fn has_live_active_run(db: &DbWriteConnection, sandbox_id: i32) -> RuntimeResult<bool> {
    let running = run_entity::Entity::find()
        .filter(run_entity::Column::SandboxId.eq(sandbox_id))
        .filter(run_entity::Column::Status.eq(run_entity::RunStatus::Running))
        .all(db)
        .await?;
    let any_alive = running
        .into_iter()
        .filter_map(|run| run.pid)
        .any(pid_is_alive);
    Ok(any_alive)
}
fn is_terminal(status: sandbox_entity::SandboxStatus) -> bool {
matches!(
status,
sandbox_entity::SandboxStatus::Stopped | sandbox_entity::SandboxStatus::Crashed
)
}
/// Reports whether a process with the given pid currently exists.
///
/// Uses `kill(pid, 0)`, which performs the existence and permission check without
/// delivering a signal. `EPERM` means the process exists but may not be signalled by
/// us, so it counts as alive.
///
/// Non-positive pids never identify a single process:
/// - `kill(0, ..)` targets the caller's own process group;
/// - `kill(-n, ..)` targets process group `n`;
/// - `kill(-1, ..)` targets every signallable process.
///
/// A probe with such a pid would therefore succeed for reasons unrelated to the stored
/// run, so these values are treated as not alive.
fn pid_is_alive(pid: i32) -> bool {
    if pid <= 0 {
        return false;
    }
    // SAFETY: `kill` takes two plain integers and touches no memory of this process;
    // signal 0 only checks that `pid` exists and may be signalled.
    if unsafe { libc::kill(pid, 0) } == 0 {
        return true;
    }
    matches!(
        std::io::Error::last_os_error().raw_os_error(),
        Some(code) if code == libc::EPERM
    )
}
/// Recursively removes `path`, treating an already-missing path as success.
///
/// Any other I/O error from `std::fs::remove_dir_all` is returned unchanged.
fn remove_dir_if_exists(path: &Path) -> std::io::Result<()> {
    std::fs::remove_dir_all(path).or_else(|err| {
        if err.kind() == std::io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(err)
        }
    })
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use microsandbox_migration::{Migrator, MigratorTrait};
    use sea_orm::ActiveModelTrait;
    use tempfile::TempDir;

    use super::*;

    /// A pid chosen to be far above any realistic `pid_max`, so `kill(pid, 0)` should report
    /// no such process. NOTE(review): assumes the host's pid_max is below this value.
    const DEAD_PID: i32 = 2_000_000_000;

    /// Opens a fresh migrated database in a temp dir. The `TempDir` must be kept alive.
    async fn test_db() -> (TempDir, DbWriteConnection) {
        let dir = tempfile::tempdir().unwrap();
        let db = DbWriteConnection::open(
            &dir.path().join("test.db"),
            Duration::from_secs(5),
            Duration::from_secs(5),
        )
        .await
        .unwrap();
        Migrator::up(db.inner(), None).await.unwrap();
        (dir, db)
    }

    /// Inserts a sandbox row with an empty config and returns its id.
    async fn insert_sandbox(
        db: &DbWriteConnection,
        name: &str,
        status: sandbox_entity::SandboxStatus,
        ephemeral: bool,
    ) -> i32 {
        let now = chrono::Utc::now().naive_utc();
        sandbox_entity::ActiveModel {
            name: Set(name.to_string()),
            config: Set("{}".to_string()),
            status: Set(status),
            ephemeral: Set(ephemeral),
            created_at: Set(Some(now)),
            updated_at: Set(Some(now)),
            ..Default::default()
        }
        .insert(db)
        .await
        .unwrap()
        .id
    }

    /// Inserts a run row for `sandbox_id` started "now".
    async fn insert_run(
        db: &DbWriteConnection,
        sandbox_id: i32,
        pid: Option<i32>,
        status: run_entity::RunStatus,
    ) {
        run_entity::ActiveModel {
            sandbox_id: Set(sandbox_id),
            pid: Set(pid),
            status: Set(status),
            started_at: Set(Some(chrono::Utc::now().naive_utc())),
            ..Default::default()
        }
        .insert(db)
        .await
        .unwrap();
    }

    /// Current status of a sandbox row, or `None` if the row no longer exists.
    async fn status_of(db: &DbWriteConnection, id: i32) -> Option<sandbox_entity::SandboxStatus> {
        sandbox_entity::Entity::find_by_id(id)
            .one(db)
            .await
            .unwrap()
            .map(|model| model.status)
    }

    #[tokio::test]
    async fn cleanup_removes_terminal_ephemeral_row_and_dir() {
        let (dir, db) = test_db().await;
        let id = insert_sandbox(&db, "eph", sandbox_entity::SandboxStatus::Stopped, true).await;
        let sandbox_dir = dir.path().join("eph");
        std::fs::create_dir_all(&sandbox_dir).unwrap();
        let outcome = cleanup_terminal_ephemeral_sandbox(&db, dir.path(), id)
            .await
            .unwrap();
        assert_eq!(outcome, CleanupOutcome::Removed);
        assert!(status_of(&db, id).await.is_none(), "row should be deleted");
        assert!(!sandbox_dir.exists(), "directory should be removed");
    }

    #[tokio::test]
    async fn cleanup_skips_persistent() {
        let (dir, db) = test_db().await;
        let id = insert_sandbox(&db, "keep", sandbox_entity::SandboxStatus::Stopped, false).await;
        let outcome = cleanup_terminal_ephemeral_sandbox(&db, dir.path(), id)
            .await
            .unwrap();
        assert_eq!(outcome, CleanupOutcome::SkippedPersistent);
        assert!(status_of(&db, id).await.is_some(), "row should remain");
    }

    #[tokio::test]
    async fn cleanup_skips_non_terminal() {
        let (dir, db) = test_db().await;
        let id = insert_sandbox(&db, "run", sandbox_entity::SandboxStatus::Running, true).await;
        let outcome = cleanup_terminal_ephemeral_sandbox(&db, dir.path(), id)
            .await
            .unwrap();
        assert_eq!(outcome, CleanupOutcome::SkippedActive);
        assert!(status_of(&db, id).await.is_some());
    }

    #[tokio::test]
    async fn cleanup_skips_when_run_pid_is_live() {
        let (dir, db) = test_db().await;
        let id = insert_sandbox(&db, "live", sandbox_entity::SandboxStatus::Stopped, true).await;
        // The test process itself is guaranteed to be alive.
        insert_run(
            &db,
            id,
            Some(std::process::id() as i32),
            run_entity::RunStatus::Running,
        )
        .await;
        let outcome = cleanup_terminal_ephemeral_sandbox(&db, dir.path(), id)
            .await
            .unwrap();
        assert_eq!(outcome, CleanupOutcome::SkippedLivePid);
        assert!(status_of(&db, id).await.is_some());
    }

    #[tokio::test]
    async fn cleanup_second_call_is_no_op() {
        let (dir, db) = test_db().await;
        let id = insert_sandbox(&db, "eph", sandbox_entity::SandboxStatus::Stopped, true).await;
        assert_eq!(
            cleanup_terminal_ephemeral_sandbox(&db, dir.path(), id)
                .await
                .unwrap(),
            CleanupOutcome::Removed
        );
        assert_eq!(
            cleanup_terminal_ephemeral_sandbox(&db, dir.path(), id)
                .await
                .unwrap(),
            CleanupOutcome::AlreadyGone
        );
    }

    #[tokio::test]
    async fn lease_is_read_gated_while_held() {
        let (_dir, db) = test_db().await;
        assert!(try_acquire_lease(&db).await.unwrap());
        assert!(!try_acquire_lease(&db).await.unwrap());
    }

    #[tokio::test]
    async fn lease_is_read_gated_after_recent_completion() {
        let (_dir, db) = test_db().await;
        assert!(try_acquire_lease(&db).await.unwrap());
        record_completion(&db).await.unwrap();
        assert!(!try_acquire_lease(&db).await.unwrap());
    }

    #[tokio::test]
    async fn lease_is_reacquirable_once_stale() {
        let (_dir, db) = test_db().await;
        assert!(try_acquire_lease(&db).await.unwrap());
        // Backdate the lease and clear the completion stamp to simulate an abandoned holder.
        let past = chrono::Utc::now().naive_utc() - chrono::Duration::seconds(120);
        lease_entity::Entity::update_many()
            .col_expr(lease_entity::Column::LeaseExpiresAt, Expr::value(past))
            .col_expr(
                lease_entity::Column::LastCompletedAt,
                Expr::value(None::<chrono::NaiveDateTime>),
            )
            .filter(lease_entity::Column::Name.eq(lease_entity::SANDBOX_LIFECYCLE_MAINTENANCE))
            .exec(&db)
            .await
            .unwrap();
        assert!(try_acquire_lease(&db).await.unwrap());
    }

    #[tokio::test]
    async fn sweep_reconciles_dead_active_and_cleans_terminal_ephemeral() {
        let (dir, db) = test_db().await;
        let dead = insert_sandbox(&db, "dead", sandbox_entity::SandboxStatus::Running, false).await;
        insert_run(&db, dead, Some(DEAD_PID), run_entity::RunStatus::Running).await;
        let eph = insert_sandbox(&db, "eph", sandbox_entity::SandboxStatus::Stopped, true).await;
        std::fs::create_dir_all(dir.path().join("eph")).unwrap();
        let report =
            run_sandbox_lifecycle_maintenance(&db, dir.path(), MaintenanceLimits::default())
                .await
                .unwrap();
        assert_eq!(report.reconciled, 1);
        assert_eq!(report.removed, 1);
        assert_eq!(report.errors, 0);
        assert_eq!(
            status_of(&db, dead).await,
            Some(sandbox_entity::SandboxStatus::Crashed)
        );
        assert!(status_of(&db, eph).await.is_none());
        assert!(!dir.path().join("eph").exists());
    }

    /// An active sandbox whose latest run is still alive must not be marked crashed.
    #[tokio::test]
    async fn sweep_leaves_active_sandbox_with_live_pid_running() {
        let (dir, db) = test_db().await;
        let live = insert_sandbox(&db, "live", sandbox_entity::SandboxStatus::Running, false).await;
        insert_run(
            &db,
            live,
            Some(std::process::id() as i32),
            run_entity::RunStatus::Running,
        )
        .await;
        let report =
            run_sandbox_lifecycle_maintenance(&db, dir.path(), MaintenanceLimits::default())
                .await
                .unwrap();
        assert_eq!(report.reconciled, 0);
        assert_eq!(report.errors, 0);
        assert_eq!(
            status_of(&db, live).await,
            Some(sandbox_entity::SandboxStatus::Running)
        );
    }

    /// A zero time budget stops the sweep before the first row and reports `timed_out`.
    #[tokio::test]
    async fn sweep_with_zero_budget_times_out_without_changes() {
        let (dir, db) = test_db().await;
        let dead = insert_sandbox(&db, "dead", sandbox_entity::SandboxStatus::Running, false).await;
        insert_run(&db, dead, Some(DEAD_PID), run_entity::RunStatus::Running).await;
        let limits = MaintenanceLimits {
            max_duration: Duration::ZERO,
            ..MaintenanceLimits::default()
        };
        let report = run_sandbox_lifecycle_maintenance(&db, dir.path(), limits)
            .await
            .unwrap();
        assert!(report.timed_out);
        assert_eq!(report.reconciled, 0);
        assert_eq!(report.removed, 0);
        assert_eq!(
            status_of(&db, dead).await,
            Some(sandbox_entity::SandboxStatus::Running)
        );
    }

    /// Directory removal deletes a whole tree and treats a missing path as success.
    #[test]
    fn remove_dir_if_exists_removes_tree_and_tolerates_missing() {
        let dir = tempfile::tempdir().unwrap();
        let target = dir.path().join("tree");
        std::fs::create_dir_all(target.join("nested")).unwrap();
        std::fs::write(target.join("nested").join("file.txt"), b"x").unwrap();
        remove_dir_if_exists(&target).unwrap();
        assert!(!target.exists());
        remove_dir_if_exists(&target).unwrap();
    }
}