escher-execution-engine 0.1.2

Production-ready async execution engine for system commands
Documentation
//! Memory cleanup for execution engine
//!
//! Provides automatic cleanup of old executions based on retention policies.

use chrono::{DateTime, Duration, Utc};
use uuid::Uuid;

/// Clean up old executions from the execution map.
///
/// Two passes run while the write lock on `executions` is held:
/// 1. Age: every terminal execution whose `started_at` lies more than
///    `retention_secs` seconds in the past is removed.
/// 2. Count: if the map still holds more than `max_in_memory` entries, the
///    oldest terminal executions (ordered by `started_at`) are removed until
///    the map is back at the limit or no terminal executions are left.
///
/// Non-terminal executions are never removed, so the map may stay above
/// `max_in_memory` when too many executions are still in flight.
///
/// For every execution removed, the entry with the same id is also removed
/// from `cancellation_tokens`.
///
/// `retention_secs` values too large for a `chrono::Duration` are clamped to
/// the largest representable whole number of seconds, so a very large value
/// means "never expire by age" instead of wrapping to a negative duration or
/// panicking.
///
/// Returns the number of executions removed.
pub async fn cleanup_old_executions(
    executions: &crate::ExecutionMap,
    cancellation_tokens: &crate::CancellationTokenMap,
    retention_secs: u64,
    max_in_memory: usize,
) -> usize {
    // Largest whole-second count `Duration::seconds` accepts: its range is
    // bounded by `i64::MAX` milliseconds and it panics beyond that.
    const MAX_RETENTION_SECS: u64 = (i64::MAX / 1_000) as u64;

    let mut executions_lock = executions.write().await;
    let original_count = executions_lock.len();

    // If empty, nothing to clean
    if original_count == 0 {
        return 0;
    }

    let now = Utc::now();
    // Clamp before casting: a plain `as i64` would turn values above
    // `i64::MAX` into negative durations, which would expire everything.
    let retention_duration = Duration::seconds(retention_secs.min(MAX_RETENTION_SECS) as i64);

    // Step 1: collect terminal executions older than the retention period.
    // Ids are gathered first because the map cannot be mutated while it is
    // being iterated.
    let mut expired_ids: Vec<Uuid> = Vec::new();
    for (id, state_arc) in executions_lock.iter() {
        let state = state_arc.read().await;

        if state.status.is_terminal()
            && now.signed_duration_since(state.started_at) > retention_duration
        {
            expired_ids.push(*id);
        }
    }

    // Remove aged-out executions and their cancellation tokens
    let mut tokens_lock = cancellation_tokens.write().await;
    for id in &expired_ids {
        executions_lock.remove(id);
        tokens_lock.remove(id);
    }

    // Step 2: enforce the count limit by evicting the oldest terminal executions.
    let excess = executions_lock.len().saturating_sub(max_in_memory);
    if excess > 0 {
        // Collect terminal executions with their start times
        let mut terminal_executions: Vec<(Uuid, DateTime<Utc>)> = Vec::new();
        for (id, state_arc) in executions_lock.iter() {
            let state = state_arc.read().await;
            if state.status.is_terminal() {
                terminal_executions.push((*id, state.started_at));
            }
        }

        // Oldest first, so `take(excess)` selects the entries to evict.
        terminal_executions.sort_by_key(|&(_, started_at)| started_at);

        // `take` stops early when fewer terminal executions exist than `excess`.
        for (id, _) in terminal_executions.iter().take(excess) {
            executions_lock.remove(id);
            tokens_lock.remove(id);
        }
    }

    original_count - executions_lock.len()
}

/// Remove a single execution from the map by id.
///
/// Takes the write lock on `executions` for the duration of the removal.
/// Only the execution map is touched; this function has no access to the
/// cancellation-token map.
///
/// Returns `true` if an entry with `execution_id` existed and was removed,
/// `false` if no such entry was present.
pub async fn remove_execution(executions: &crate::ExecutionMap, execution_id: Uuid) -> bool {
    executions.write().await.remove(&execution_id).is_some()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExecutionRequest, ExecutionState, ExecutionStatus};
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Build an `ExecutionState` with the given `status` whose `started_at`
    /// is `age_secs` seconds before now. Terminal states also get a
    /// `completed_at` five seconds after `started_at`.
    fn create_test_state(id: Uuid, status: ExecutionStatus, age_secs: i64) -> ExecutionState {
        let started_at = Utc::now() - Duration::seconds(age_secs);

        let request = ExecutionRequest {
            id,
            command: crate::types::Command::Shell {
                command: "test".to_string(),
                shell: "bash".to_string(),
            },
            env: HashMap::new(),
            working_dir: None,
            timeout_ms: Some(5000),
            output_log_path: None,
            metadata: Default::default(),
        };

        let mut state = ExecutionState::new(request);
        state.status = status;
        state.started_at = started_at;
        if status.is_terminal() {
            state.completed_at = Some(started_at + Duration::seconds(5));
        }
        state
    }

    /// Store a test state under `id` in `map`, wrapped the same way the
    /// engine stores states.
    async fn insert_state(
        map: &crate::ExecutionMap,
        id: Uuid,
        status: ExecutionStatus,
        age_secs: i64,
    ) {
        let state = create_test_state(id, status, age_secs);
        map.write().await.insert(id, Arc::new(RwLock::new(state)));
    }

    #[tokio::test]
    async fn test_cleanup_removes_old_executions() {
        let executions = Arc::new(RwLock::new(HashMap::new()));
        let tokens = Arc::new(RwLock::new(HashMap::new()));

        // One completed execution past the retention window (2 hours old)
        // and one inside it (30 minutes old).
        let old_id = Uuid::new_v4();
        let recent_id = Uuid::new_v4();
        insert_state(&executions, old_id, ExecutionStatus::Completed, 7200).await;
        insert_state(&executions, recent_id, ExecutionStatus::Completed, 1800).await;

        // 1 hour retention, count limit far above the map size.
        let removed = cleanup_old_executions(&executions, &tokens, 3600, 1000).await;
        assert_eq!(removed, 1);

        let map = executions.read().await;
        assert!(!map.contains_key(&old_id));
        assert!(map.contains_key(&recent_id));
    }

    #[tokio::test]
    async fn test_cleanup_preserves_running_executions() {
        let executions = Arc::new(RwLock::new(HashMap::new()));
        let tokens = Arc::new(RwLock::new(HashMap::new()));

        // Both are 2 hours old; only the completed one may be evicted.
        let running_id = Uuid::new_v4();
        let old_id = Uuid::new_v4();
        insert_state(&executions, running_id, ExecutionStatus::Running, 7200).await;
        insert_state(&executions, old_id, ExecutionStatus::Completed, 7200).await;

        // 1 hour retention
        let removed = cleanup_old_executions(&executions, &tokens, 3600, 1000).await;
        assert_eq!(removed, 1);

        let map = executions.read().await;
        assert!(map.contains_key(&running_id)); // Running preserved
        assert!(!map.contains_key(&old_id)); // Completed removed
    }

    #[tokio::test]
    async fn test_cleanup_enforces_count_limit() {
        let executions = Arc::new(RwLock::new(HashMap::new()));
        let tokens = Arc::new(RwLock::new(HashMap::new()));

        // Ten completed executions, inserted oldest first: 10min, 9min, ..., 1min old.
        let mut ids = Vec::new();
        for minutes_old in (1..=10).rev() {
            let id = Uuid::new_v4();
            insert_state(&executions, id, ExecutionStatus::Completed, minutes_old * 60).await;
            ids.push(id);
        }

        // All are within the 1 hour retention, so only the limit of 5 applies.
        let removed = cleanup_old_executions(&executions, &tokens, 3600, 5).await;
        assert_eq!(removed, 5);

        let map = executions.read().await;
        assert_eq!(map.len(), 5);

        // The first five ids are the oldest and must be gone; the rest must remain.
        let (oldest, newest) = ids.split_at(5);
        for id in oldest {
            assert!(!map.contains_key(id), "Oldest should be removed");
        }
        for id in newest {
            assert!(map.contains_key(id), "Newest should be kept");
        }
    }

    #[tokio::test]
    async fn test_remove_execution() {
        let executions = Arc::new(RwLock::new(HashMap::new()));

        let id = Uuid::new_v4();
        insert_state(&executions, id, ExecutionStatus::Completed, 60).await;

        // Removing an existing execution reports success and drops the entry.
        assert!(remove_execution(&executions, id).await);
        assert!(!executions.read().await.contains_key(&id));

        // Removing an unknown id reports that nothing was removed.
        let removed_unknown = remove_execution(&executions, Uuid::new_v4()).await;
        assert!(!removed_unknown);
    }

    #[tokio::test]
    async fn test_cleanup_empty_map() {
        let executions = Arc::new(RwLock::new(HashMap::new()));
        let tokens = Arc::new(RwLock::new(HashMap::new()));

        // An empty map hits the early return and reports zero removals.
        let removed = cleanup_old_executions(&executions, &tokens, 3600, 1000).await;
        assert_eq!(removed, 0);
    }
}