use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use uuid::Uuid;
use crate::context::ContextManager;
use crate::orchestrator::job_manager::ContainerJobManager;
use crate::sandbox::connect_docker;
#[derive(Debug, Clone)]
pub struct ReaperConfig {
pub scan_interval: Duration,
pub orphan_threshold: Duration,
pub container_label: String,
}
impl Default for ReaperConfig {
fn default() -> Self {
Self {
scan_interval: Duration::from_secs(300),
orphan_threshold: Duration::from_secs(600),
container_label: "ironclaw.job_id".to_string(),
}
}
}
pub struct SandboxReaper {
docker: bollard::Docker,
job_manager: Arc<ContainerJobManager>,
context_manager: Arc<ContextManager>,
config: ReaperConfig,
}
impl SandboxReaper {
pub async fn new(
job_manager: Arc<ContainerJobManager>,
context_manager: Arc<ContextManager>,
config: ReaperConfig,
) -> Result<Self, crate::sandbox::SandboxError> {
let docker = connect_docker().await?;
Ok(Self {
docker,
job_manager,
context_manager,
config,
})
}
pub async fn run(self) {
if self.config.scan_interval.as_secs() == 0 {
tracing::error!(
"Reaper: scan_interval must be > 0, got {:?}. Reaper will not start.",
self.config.scan_interval
);
return;
}
let mut interval = tokio::time::interval(self.config.scan_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
self.scan_and_reap().await;
}
}
async fn scan_and_reap(&self) {
let containers = match self.list_ironclaw_containers().await {
Ok(c) => c,
Err(e) => {
tracing::error!(error = %e, "Reaper: failed to list Docker containers");
return;
}
};
let now = Utc::now();
let threshold = match chrono::Duration::from_std(self.config.orphan_threshold) {
Ok(d) => d,
Err(e) => {
tracing::warn!(
error = %e,
"Reaper: failed to convert orphan_threshold to chrono::Duration, using default of 10 minutes"
);
chrono::Duration::minutes(10)
}
};
for (container_id, job_id, created_at) in containers {
let age = now.signed_duration_since(created_at);
if age < threshold {
continue; }
let is_active = match self.context_manager.get_context(job_id).await {
Ok(ctx) => ctx.state.is_active(),
Err(_) => false, };
if is_active {
tracing::debug!(
job_id = %job_id,
container_id = %&container_id[..12.min(container_id.len())],
"Reaper: container has active job, skipping"
);
continue;
}
tracing::info!(
job_id = %job_id,
container_id = %&container_id[..12.min(container_id.len())],
age_secs = age.num_seconds(),
"Reaper: orphaned container detected, cleaning up"
);
self.reap_container(&container_id, job_id).await;
}
}
async fn list_ironclaw_containers(
&self,
) -> Result<Vec<(String, Uuid, DateTime<Utc>)>, bollard::errors::Error> {
use bollard::container::ListContainersOptions;
let mut filters = HashMap::new();
filters.insert("label", vec![self.config.container_label.as_str()]);
let options = ListContainersOptions {
all: true, filters,
..Default::default()
};
let summaries = self.docker.list_containers(Some(options)).await?;
let mut result = Vec::new();
for summary in summaries {
let container_id = match summary.id {
Some(id) => id,
None => continue,
};
let labels = summary.labels.unwrap_or_default();
let job_id = match labels
.get(&self.config.container_label)
.and_then(|s| s.parse::<Uuid>().ok())
{
Some(id) => id,
None => {
tracing::warn!(
container_id = %&container_id[..12.min(container_id.len())],
label_key = %&self.config.container_label,
"Reaper: ironclaw container missing valid job_id label"
);
continue;
}
};
let created_at = match labels
.get("ironclaw.created_at")
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
.or_else(|| {
summary
.created
.and_then(|ts| DateTime::from_timestamp(ts, 0))
}) {
Some(ts) => ts,
None => {
tracing::warn!(
container_id = %&container_id[..12.min(container_id.len())],
"Reaper: could not determine creation time for container, skipping"
);
continue;
}
};
result.push((container_id, job_id, created_at));
}
Ok(result)
}
async fn reap_container(&self, container_id: &str, job_id: Uuid) {
match self.job_manager.stop_job(job_id).await {
Ok(()) => {
tracing::info!(
job_id = %job_id,
"Reaper: cleaned up orphaned container via job_manager"
);
return;
}
Err(e) => {
tracing::debug!(
job_id = %job_id,
error = %e,
"Reaper: job_manager.stop_job failed (likely no handle after restart), falling back to direct Docker cleanup"
);
}
}
if let Err(e) = self
.docker
.stop_container(
container_id,
Some(bollard::container::StopContainerOptions { t: 10 }),
)
.await
{
tracing::debug!(
job_id = %job_id,
container_id = %&container_id[..12.min(container_id.len())],
error = %e,
"Reaper: stop_container failed (may already be stopped)"
);
}
if let Err(e) = self
.docker
.remove_container(
container_id,
Some(bollard::container::RemoveContainerOptions {
force: true,
..Default::default()
}),
)
.await
{
tracing::error!(
job_id = %job_id,
container_id = %&container_id[..12.min(container_id.len())],
error = %e,
"Reaper: failed to remove orphaned container"
);
} else {
tracing::info!(
job_id = %job_id,
container_id = %&container_id[..12.min(container_id.len())],
"Reaper: removed orphaned container via direct Docker API"
);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
#[test]
fn orphan_threshold_filters_young_containers() {
let threshold = chrono::Duration::minutes(10);
let young_age = chrono::Duration::minutes(2);
assert!(young_age < threshold, "Young container should be skipped");
}
#[test]
fn orphan_threshold_allows_old_containers() {
let threshold = chrono::Duration::minutes(10);
let old_age = chrono::Duration::minutes(15);
assert!(old_age >= threshold, "Old container should be reaped");
}
#[tokio::test]
async fn active_job_is_not_orphaned() {
let ctx_mgr = Arc::new(ContextManager::new(5));
let job_id = ctx_mgr
.create_job_for_user("default", "test", "test description")
.await
.unwrap();
let ctx = ctx_mgr.get_context(job_id).await.unwrap();
assert!(ctx.state.is_active(), "Pending job should be active");
}
#[tokio::test]
async fn missing_job_is_treated_as_orphaned() {
let ctx_mgr = Arc::new(ContextManager::new(5));
let job_id = Uuid::new_v4(); let is_active = match ctx_mgr.get_context(job_id).await {
Ok(ctx) => ctx.state.is_active(),
Err(_) => false,
};
assert!(!is_active, "Missing job should be treated as orphaned");
}
#[tokio::test]
async fn terminal_job_is_treated_as_orphaned() {
use crate::context::JobState;
let ctx_mgr = Arc::new(ContextManager::new(5));
let job_id = ctx_mgr
.create_job_for_user("default", "test", "test description")
.await
.unwrap();
ctx_mgr
.update_context(job_id, |ctx| {
ctx.state = JobState::Failed;
})
.await
.unwrap();
let ctx = ctx_mgr.get_context(job_id).await.unwrap();
assert!(
!ctx.state.is_active(),
"Failed job should be treated as orphaned"
);
}
#[allow(dead_code)]
struct MockDocker {
containers: Arc<std::sync::Mutex<Vec<ContainerSummary>>>,
stop_called: Arc<AtomicU32>,
remove_called: Arc<AtomicU32>,
stop_error: Arc<AtomicBool>,
remove_error: Arc<AtomicBool>,
}
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ContainerSummary {
id: String,
labels: HashMap<String, String>,
created: Option<i64>,
}
#[allow(dead_code)]
impl MockDocker {
fn new() -> Self {
Self {
containers: Arc::new(std::sync::Mutex::new(Vec::new())),
stop_called: Arc::new(AtomicU32::new(0)),
remove_called: Arc::new(AtomicU32::new(0)),
stop_error: Arc::new(AtomicBool::new(false)),
remove_error: Arc::new(AtomicBool::new(false)),
}
}
fn add_container(&self, id: String, labels: HashMap<String, String>, created: Option<i64>) {
let mut cs = self.containers.lock().unwrap();
cs.push(ContainerSummary {
id,
labels,
created,
});
}
fn set_stop_error(&self, error: bool) {
self.stop_error.store(error, Ordering::SeqCst);
}
fn set_remove_error(&self, error: bool) {
self.remove_error.store(error, Ordering::SeqCst);
}
fn stop_call_count(&self) -> u32 {
self.stop_called.load(Ordering::SeqCst)
}
fn remove_call_count(&self) -> u32 {
self.remove_called.load(Ordering::SeqCst)
}
}
#[test]
fn parse_container_labels_extracts_job_id_and_timestamp() {
let mut labels = HashMap::new();
let job_id = Uuid::new_v4();
labels.insert("ironclaw.job_id".to_string(), job_id.to_string());
labels.insert(
"ironclaw.created_at".to_string(),
"2024-01-15T10:30:45+00:00".to_string(),
);
let parsed_id: Option<Uuid> = labels
.get("ironclaw.job_id")
.and_then(|s| s.parse::<Uuid>().ok());
assert_eq!(parsed_id, Some(job_id));
let parsed_time = labels
.get("ironclaw.created_at")
.and_then(|s| DateTime::parse_from_rfc3339(s).ok());
assert!(parsed_time.is_some());
}
#[test]
fn missing_job_id_label_is_skipped() {
let labels: HashMap<String, String> = HashMap::new();
let job_id: Option<Uuid> = labels
.get("ironclaw.job_id")
.and_then(|s| s.parse::<Uuid>().ok());
assert_eq!(job_id, None);
}
#[test]
fn malformed_timestamp_fallback_works() {
let mut labels: HashMap<String, String> = HashMap::new();
labels.insert(
"ironclaw.created_at".to_string(),
"invalid-date".to_string(),
);
let parsed_time = labels
.get("ironclaw.created_at")
.and_then(|s| DateTime::parse_from_rfc3339(s).ok());
assert!(
parsed_time.is_none(),
"Malformed timestamp should fail to parse"
);
let docker_timestamp: Option<i64> = Some(1705324245); let fallback = docker_timestamp.and_then(|ts| DateTime::from_timestamp(ts, 0));
assert!(
fallback.is_some(),
"Docker timestamp fallback should parse successfully"
);
}
#[tokio::test]
async fn age_calculation_correctly_filters_containers() {
let now = Utc::now();
let young_container = now - chrono::Duration::minutes(2);
let old_container = now - chrono::Duration::minutes(20);
let threshold = chrono::Duration::minutes(10);
let young_age = now.signed_duration_since(young_container);
let old_age = now.signed_duration_since(old_container);
assert!(
young_age < threshold,
"Young container should not be cleaned"
);
assert!(old_age >= threshold, "Old container should be cleaned");
}
#[tokio::test]
async fn active_job_prevents_cleanup_of_old_container() {
let ctx_mgr = Arc::new(ContextManager::new(5));
let job_id = ctx_mgr
.create_job_for_user("default", "test", "test job")
.await
.unwrap();
let ctx = ctx_mgr.get_context(job_id).await.unwrap();
assert!(ctx.state.is_active());
let is_active = match ctx_mgr.get_context(job_id).await {
Ok(ctx) => ctx.state.is_active(),
Err(_) => false,
};
assert!(is_active, "Active job should prevent cleanup");
}
#[tokio::test]
async fn failed_job_allows_cleanup() {
use crate::context::JobState;
let ctx_mgr = Arc::new(ContextManager::new(5));
let job_id = ctx_mgr
.create_job_for_user("default", "test", "test")
.await
.unwrap();
ctx_mgr
.update_context(job_id, |ctx| {
ctx.state = JobState::Failed;
})
.await
.unwrap();
let ctx = ctx_mgr.get_context(job_id).await.unwrap();
assert!(
!ctx.state.is_active(),
"Failed job (terminal state) should allow cleanup"
);
}
#[test]
fn reaper_config_defaults_are_reasonable() {
let cfg = ReaperConfig::default();
assert_eq!(
cfg.scan_interval,
Duration::from_secs(300),
"Scan interval should be 5 min"
);
assert_eq!(
cfg.orphan_threshold,
Duration::from_secs(600),
"Orphan threshold should be 10 min"
);
assert_eq!(cfg.container_label, "ironclaw.job_id");
}
#[test]
fn reaper_config_can_be_customized() {
let cfg = ReaperConfig {
scan_interval: Duration::from_secs(60),
orphan_threshold: Duration::from_secs(300),
container_label: "custom.label".to_string(),
};
assert_eq!(cfg.scan_interval, Duration::from_secs(60));
assert_eq!(cfg.orphan_threshold, Duration::from_secs(300));
assert_eq!(cfg.container_label, "custom.label");
}
#[tokio::test]
async fn reaper_cleanup_decision_matrix() {
use crate::context::JobState;
let ctx_mgr = Arc::new(ContextManager::new(5));
let job1 = ctx_mgr
.create_job_for_user("default", "test", "test1")
.await
.unwrap();
let ctx1 = ctx_mgr.get_context(job1).await.unwrap();
assert!(ctx1.state.is_active(), "Pending job is active");
assert!(ctx1.state.is_active(), "Should NOT cleanup active jobs");
let job2 = ctx_mgr
.create_job_for_user("default", "test", "test2")
.await
.unwrap();
ctx_mgr
.update_context(job2, |ctx| {
ctx.state = JobState::InProgress;
})
.await
.unwrap();
let ctx2 = ctx_mgr.get_context(job2).await.unwrap();
assert!(ctx2.state.is_active(), "InProgress job is active");
assert!(ctx2.state.is_active(), "Should NOT cleanup active jobs");
let job3 = ctx_mgr
.create_job_for_user("default", "test", "test3")
.await
.unwrap();
ctx_mgr
.update_context(job3, |ctx| {
ctx.state = JobState::Completed;
})
.await
.unwrap();
let ctx3 = ctx_mgr.get_context(job3).await.unwrap();
assert!(ctx3.state.is_active(), "Completed is still active");
let job4 = ctx_mgr
.create_job_for_user("default", "test", "test4")
.await
.unwrap();
ctx_mgr
.update_context(job4, |ctx| {
ctx.state = JobState::Failed;
})
.await
.unwrap();
let ctx4 = ctx_mgr.get_context(job4).await.unwrap();
assert!(
!ctx4.state.is_active(),
"Failed job is terminal (should cleanup if old)"
);
let job5 = ctx_mgr
.create_job_for_user("default", "test", "test5")
.await
.unwrap();
ctx_mgr
.update_context(job5, |ctx| {
ctx.state = JobState::Cancelled;
})
.await
.unwrap();
let ctx5 = ctx_mgr.get_context(job5).await.unwrap();
assert!(!ctx5.state.is_active(), "Cancelled job is terminal");
let missing_job = Uuid::new_v4();
let is_active = match ctx_mgr.get_context(missing_job).await {
Ok(ctx) => ctx.state.is_active(),
Err(_) => false,
};
assert!(!is_active, "Missing job should be treated as inactive");
}
#[cfg(all(test, not(target_env = "msvc")))]
mod e2e_tests {
use super::*;
fn should_run_e2e() -> bool {
std::env::var("IRONCLAW_E2E_DOCKER_TESTS").is_ok()
}
#[tokio::test]
async fn e2e_reaper_lists_ironclaw_containers() {
if !should_run_e2e() {
eprintln!("Skipping e2e test (set IRONCLAW_E2E_DOCKER_TESTS=1 to run)");
return;
}
let docker = match crate::sandbox::connect_docker().await {
Ok(d) => d,
Err(e) => {
eprintln!("Skipping e2e test: Docker unavailable: {e}");
return;
}
};
let job_id = Uuid::new_v4();
let test_name = format!("ironclaw-reaper-test-{}", &job_id.to_string()[..8]);
let job_id_str = job_id.to_string();
let created_at_str = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
let mut labels_str: std::collections::HashMap<&str, &str> =
std::collections::HashMap::new();
labels_str.insert("ironclaw.job_id", &job_id_str);
labels_str.insert("ironclaw.created_at", &created_at_str);
let config = bollard::container::CreateContainerOptions {
name: test_name.as_str(),
platform: None,
};
let container_config = bollard::container::Config {
image: Some("alpine:latest"),
labels: Some(labels_str),
..Default::default()
};
let response = match docker
.create_container(Some(config), container_config)
.await
{
Ok(r) => r,
Err(e) => {
eprintln!("Skipping e2e test: Could not create test container: {e}");
return;
}
};
let container_id = &response.id;
tracing::info!(
container_id = %&container_id[..12.min(container_id.len())],
job_id = %job_id,
"e2e test: created test container"
);
let inspect = match docker.inspect_container(container_id, None).await {
Ok(c) => c,
Err(e) => {
let _ = docker.remove_container(container_id, None).await;
eprintln!("Failed to inspect container: {e}");
return;
}
};
let labels = inspect.config.and_then(|c| c.labels).unwrap_or_default();
assert!(
labels.contains_key("ironclaw.job_id"),
"Container should have ironclaw.job_id label"
);
assert_eq!(
labels.get("ironclaw.job_id").map(|s| s.as_str()),
Some(job_id.to_string().as_str()),
"job_id label should match"
);
tracing::info!("e2e test: verified container labels");
let _ = docker.remove_container(container_id, None).await;
tracing::info!("e2e test: cleaned up test container");
}
#[tokio::test]
async fn e2e_reaper_removes_orphaned_containers() {
if !should_run_e2e() {
eprintln!("Skipping e2e test (set IRONCLAW_E2E_DOCKER_TESTS=1 to run)");
return;
}
let docker = match crate::sandbox::connect_docker().await {
Ok(d) => d,
Err(e) => {
eprintln!("Skipping e2e test: Docker unavailable: {e}");
return;
}
};
let orphaned_job_id = Uuid::new_v4();
let test_name = format!("ironclaw-orphan-test-{}", &orphaned_job_id.to_string()[..8]);
let job_id_str = orphaned_job_id.to_string();
let created_at_str = (Utc::now() - chrono::Duration::hours(2)).to_rfc3339();
let mut labels: std::collections::HashMap<&str, &str> =
std::collections::HashMap::new();
labels.insert("ironclaw.job_id", &job_id_str);
labels.insert("ironclaw.created_at", &created_at_str);
let config = bollard::container::CreateContainerOptions {
name: test_name.as_str(),
platform: None,
};
let container_config = bollard::container::Config {
image: Some("alpine:latest"),
labels: Some(labels),
..Default::default()
};
let response = match docker
.create_container(Some(config), container_config)
.await
{
Ok(r) => r,
Err(e) => {
eprintln!("Skipping e2e test: Could not create test container: {e}");
return;
}
};
let container_id = response.id.clone();
tracing::info!(
container_id = %&container_id[..12.min(container_id.len())],
job_id = %orphaned_job_id,
"e2e test: created orphaned test container"
);
let exists_before = docker.inspect_container(&container_id, None).await.is_ok();
assert!(exists_before, "Container should exist before cleanup");
let _ = docker
.stop_container(
&container_id,
Some(bollard::container::StopContainerOptions { t: 10 }),
)
.await;
let removal_result = docker
.remove_container(
&container_id,
Some(bollard::container::RemoveContainerOptions {
force: true,
..Default::default()
}),
)
.await;
match removal_result {
Ok(()) => {
tracing::info!(
container_id = %&container_id[..12.min(container_id.len())],
"e2e test: successfully removed orphaned container"
);
let exists_after = docker.inspect_container(&container_id, None).await.is_ok();
assert!(!exists_after, "Container should not exist after removal");
}
Err(e) => {
eprintln!("Warning: failed to remove test container: {e}");
let _ = docker.remove_container(&container_id, None).await;
}
}
}
#[tokio::test]
async fn e2e_reaper_respects_age_threshold() {
if !should_run_e2e() {
eprintln!("Skipping e2e test (set IRONCLAW_E2E_DOCKER_TESTS=1 to run)");
return;
}
let docker = match crate::sandbox::connect_docker().await {
Ok(d) => d,
Err(e) => {
eprintln!("Skipping e2e test: Docker unavailable: {e}");
return;
}
};
let old_job_id = Uuid::new_v4();
let new_job_id = Uuid::new_v4();
let old_id_str = old_job_id.to_string();
let old_time_str = (Utc::now() - chrono::Duration::hours(2)).to_rfc3339();
let mut old_labels: std::collections::HashMap<&str, &str> =
std::collections::HashMap::new();
old_labels.insert("ironclaw.job_id", &old_id_str);
old_labels.insert("ironclaw.created_at", &old_time_str);
let new_id_str = new_job_id.to_string();
let new_time_str = (Utc::now() - chrono::Duration::minutes(1)).to_rfc3339();
let mut new_labels: std::collections::HashMap<&str, &str> =
std::collections::HashMap::new();
new_labels.insert("ironclaw.job_id", &new_id_str);
new_labels.insert("ironclaw.created_at", &new_time_str);
let mut containers_to_cleanup = Vec::new();
let old_name = format!("ironclaw-age-old-{}", &old_job_id.to_string()[..8]);
if let Ok(r) = docker
.create_container(
Some(bollard::container::CreateContainerOptions {
name: old_name.as_str(),
platform: None,
}),
bollard::container::Config {
image: Some("alpine:latest"),
labels: Some(old_labels),
..Default::default()
},
)
.await
{
containers_to_cleanup.push(r.id.clone());
tracing::info!("e2e test: created old orphaned container for age threshold test");
}
let new_name = format!("ironclaw-age-new-{}", &new_job_id.to_string()[..8]);
if let Ok(r) = docker
.create_container(
Some(bollard::container::CreateContainerOptions {
name: new_name.as_str(),
platform: None,
}),
bollard::container::Config {
image: Some("alpine:latest"),
labels: Some(new_labels),
..Default::default()
},
)
.await
{
containers_to_cleanup.push(r.id.clone());
tracing::info!("e2e test: created new orphaned container for age threshold test");
}
assert_eq!(
containers_to_cleanup.len(),
2,
"Should have created 2 test containers"
);
for container_id in containers_to_cleanup {
let _ = docker
.stop_container(
&container_id,
Some(bollard::container::StopContainerOptions { t: 10 }),
)
.await;
let _ = docker
.remove_container(
&container_id,
Some(bollard::container::RemoveContainerOptions {
force: true,
..Default::default()
}),
)
.await;
}
tracing::info!("e2e test: age threshold test completed and cleaned up");
}
}
}