use chrono::{DateTime, Duration, Utc};
use uuid::Uuid;
/// Evicts finished executions from the in-memory execution map.
///
/// Cleanup happens in two passes while the write lock on `executions` is held:
///
/// 1. **Age-based:** every execution whose status is terminal and whose
///    `started_at` is more than `retention_secs` seconds in the past is removed.
/// 2. **Count-based:** if the map still holds more than `max_in_memory` entries,
///    the terminal executions with the earliest `started_at` are removed until
///    the limit is met or no terminal executions remain.
///
/// Non-terminal executions are never removed, so the map may stay above
/// `max_in_memory` when too few entries are terminal. For every execution
/// removed, the entry with the same id is also removed from
/// `cancellation_tokens`.
///
/// `retention_secs` values larger than what `chrono::Duration` can represent are
/// clamped to the largest representable duration, which effectively disables
/// the age-based pass instead of wrapping to a negative retention or panicking.
///
/// Returns the number of executions removed from `executions`.
pub async fn cleanup_old_executions(
    executions: &crate::ExecutionMap,
    cancellation_tokens: &crate::CancellationTokenMap,
    retention_secs: u64,
    max_in_memory: usize,
) -> usize {
    // `Duration::seconds` panics above `i64::MAX / 1000` seconds, and a plain
    // `as i64` cast would wrap very large values to a negative retention.
    const MAX_RETENTION_SECS: u64 = (i64::MAX / 1000) as u64;

    let mut executions_lock = executions.write().await;
    let original_count = executions_lock.len();
    if original_count == 0 {
        return 0;
    }

    let now = Utc::now();
    // The clamp above guarantees this cast is lossless.
    let retention_duration = Duration::seconds(retention_secs.min(MAX_RETENTION_SECS) as i64);

    // Pass 1: collect ids first; the map cannot be mutated while it is iterated.
    let mut expired: Vec<Uuid> = Vec::new();
    for (id, state_arc) in executions_lock.iter() {
        let state = state_arc.read().await;
        if state.status.is_terminal()
            && now.signed_duration_since(state.started_at) > retention_duration
        {
            expired.push(*id);
        }
    }

    // The token map is locked after the execution map, and is locked even when
    // nothing expired, so the lock acquisition order is the same on every call.
    let mut tokens_lock = cancellation_tokens.write().await;
    for id in &expired {
        executions_lock.remove(id);
        tokens_lock.remove(id);
    }

    // Pass 2: enforce the entry-count ceiling using terminal executions only.
    let excess = executions_lock.len().saturating_sub(max_in_memory);
    if excess > 0 {
        let mut terminal_executions: Vec<(Uuid, DateTime<Utc>)> = Vec::new();
        for (id, state_arc) in executions_lock.iter() {
            let state = state_arc.read().await;
            if state.status.is_terminal() {
                terminal_executions.push((*id, state.started_at));
            }
        }

        // Earliest start first, so the oldest terminal executions go first.
        terminal_executions.sort_by_key(|&(_, started_at)| started_at);
        for (id, _) in terminal_executions.iter().take(excess) {
            executions_lock.remove(id);
            tokens_lock.remove(id);
        }
    }

    // Entries are only ever removed under the write lock, so this cannot underflow.
    original_count - executions_lock.len()
}
/// Removes the execution stored under `execution_id` from `executions`.
///
/// Takes the map's write lock for the duration of the removal. Only the
/// execution map passed in is modified.
///
/// Returns `true` when an entry existed and was removed, `false` when no entry
/// with that id was present.
pub async fn remove_execution(executions: &crate::ExecutionMap, execution_id: Uuid) -> bool {
    let removed_entry = executions.write().await.remove(&execution_id);
    removed_entry.is_some()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExecutionRequest, ExecutionState, ExecutionStatus};
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Builds an `ExecutionState` with the given status whose `started_at` lies
    /// `age_secs` seconds in the past. Terminal states also get a `completed_at`
    /// five seconds after the start.
    fn create_test_state(id: Uuid, status: ExecutionStatus, age_secs: i64) -> ExecutionState {
        let began = Utc::now() - Duration::seconds(age_secs);
        let request = ExecutionRequest {
            id,
            command: crate::types::Command::Shell {
                command: "test".to_string(),
                shell: "bash".to_string(),
            },
            env: HashMap::new(),
            working_dir: None,
            timeout_ms: Some(5000),
            output_log_path: None,
            metadata: Default::default(),
        };

        let mut state = ExecutionState::new(request);
        state.status = status;
        state.started_at = began;
        if status.is_terminal() {
            state.completed_at = Some(began + Duration::seconds(5));
        }
        state
    }

    /// Wraps a freshly built test state in the `Arc<RwLock<_>>` form that the
    /// tests insert into the execution map.
    fn shared_state(
        id: Uuid,
        status: ExecutionStatus,
        age_secs: i64,
    ) -> Arc<RwLock<ExecutionState>> {
        Arc::new(RwLock::new(create_test_state(id, status, age_secs)))
    }

    /// A completed execution older than the retention window is removed, a
    /// younger one is kept.
    #[tokio::test]
    async fn test_cleanup_removes_old_executions() {
        let executions = Arc::new(RwLock::new(HashMap::new()));
        let tokens = Arc::new(RwLock::new(HashMap::new()));
        let (old_id, recent_id) = (Uuid::new_v4(), Uuid::new_v4());

        {
            let mut map = executions.write().await;
            // Two hours old: beyond the one-hour retention used below.
            map.insert(old_id, shared_state(old_id, ExecutionStatus::Completed, 7200));
            // Thirty minutes old: still inside the retention window.
            map.insert(
                recent_id,
                shared_state(recent_id, ExecutionStatus::Completed, 1800),
            );
        }

        let removed = cleanup_old_executions(&executions, &tokens, 3600, 1000).await;
        assert_eq!(removed, 1);

        let map = executions.read().await;
        assert!(!map.contains_key(&old_id));
        assert!(map.contains_key(&recent_id));
    }

    /// A running execution survives cleanup even when it is older than the
    /// retention window.
    #[tokio::test]
    async fn test_cleanup_preserves_running_executions() {
        let executions = Arc::new(RwLock::new(HashMap::new()));
        let tokens = Arc::new(RwLock::new(HashMap::new()));
        let (running_id, old_id) = (Uuid::new_v4(), Uuid::new_v4());

        {
            let mut map = executions.write().await;
            map.insert(
                running_id,
                shared_state(running_id, ExecutionStatus::Running, 7200),
            );
            map.insert(old_id, shared_state(old_id, ExecutionStatus::Completed, 7200));
        }

        let removed = cleanup_old_executions(&executions, &tokens, 3600, 1000).await;
        assert_eq!(removed, 1);

        let map = executions.read().await;
        assert!(map.contains_key(&running_id));
        assert!(!map.contains_key(&old_id));
    }

    /// With ten recent completed executions and a limit of five, the five
    /// with the earliest start times are evicted.
    #[tokio::test]
    async fn test_cleanup_enforces_count_limit() {
        let executions = Arc::new(RwLock::new(HashMap::new()));
        let tokens = Arc::new(RwLock::new(HashMap::new()));
        let ids: Vec<Uuid> = (0..10).map(|_| Uuid::new_v4()).collect();

        {
            let mut map = executions.write().await;
            // Ages run from 600s (first id) down to 60s (last id), all inside
            // the retention window, so only the count limit can evict them.
            for (i, id) in (0_i64..).zip(ids.iter().copied()) {
                map.insert(
                    id,
                    shared_state(id, ExecutionStatus::Completed, (10 - i) * 60),
                );
            }
        }

        let removed = cleanup_old_executions(&executions, &tokens, 3600, 5).await;
        assert_eq!(removed, 5);

        let map = executions.read().await;
        assert_eq!(map.len(), 5);

        let (evicted, kept) = ids.split_at(5);
        for id in evicted {
            assert!(!map.contains_key(id), "Oldest should be removed");
        }
        for id in kept {
            assert!(map.contains_key(id), "Newest should be kept");
        }
    }

    /// Removing a present id reports `true` and deletes it; removing an
    /// unknown id reports `false`.
    #[tokio::test]
    async fn test_remove_execution() {
        let executions = Arc::new(RwLock::new(HashMap::new()));
        let id = Uuid::new_v4();

        {
            let mut map = executions.write().await;
            map.insert(id, shared_state(id, ExecutionStatus::Completed, 60));
        }

        let removed = remove_execution(&executions, id).await;
        assert!(removed);

        {
            let map = executions.read().await;
            assert!(!map.contains_key(&id));
        }

        let removed = remove_execution(&executions, Uuid::new_v4()).await;
        assert!(!removed);
    }

    /// Cleaning an empty map removes nothing.
    #[tokio::test]
    async fn test_cleanup_empty_map() {
        let executions = Arc::new(RwLock::new(HashMap::new()));
        let tokens = Arc::new(RwLock::new(HashMap::new()));

        let removed = cleanup_old_executions(&executions, &tokens, 3600, 1000).await;
        assert_eq!(removed, 0);
    }
}