use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use sysinfo::{Pid, System};
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio::time::timeout;
use crate::types::*;
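/// Drives the execution lifecycle of scheduled agent tasks: launching
/// agent processes, monitoring their health and resource usage, and
/// terminating them on request or on timeout.
///
/// All commands flow through an unbounded channel to a background loop
/// spawned in [`TaskManager::new`]; finished tasks stay registered until
/// explicitly terminated.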
pub struct TaskManager {
task_timeout: Duration,
running_tasks: Arc<RwLock<HashMap<AgentId, TaskHandle>>>,
task_sender: mpsc::UnboundedSender<TaskCommand>,
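    /// Currently unused: the per-process monitors construct their own
    /// short-lived `System` instances instead.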
#[allow(dead_code)]
system_info: Arc<RwLock<System>>,
}
impl TaskManager {
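    /// Creates a manager that aborts any single execution running longer
    /// than `task_timeout` and spawns the background command loop.
    ///
    /// Must be called from within a Tokio runtime. A minimal usage sketch
    /// (assuming a `ScheduledTask` obtained from the scheduler):
    ///
    /// ```ignore
    /// let manager = TaskManager::new(Duration::from_secs(60));
    /// manager.start_task(task).await?;
    /// let health = manager.check_task_health(agent_id).await?;
    /// ```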
pub fn new(task_timeout: Duration) -> Self {
let running_tasks = Arc::new(RwLock::new(HashMap::new()));
let (task_sender, task_receiver) = mpsc::unbounded_channel();
let manager = Self {
task_timeout,
running_tasks: running_tasks.clone(),
task_sender,
system_info: Arc::new(RwLock::new(System::new_all())),
};
manager.start_task_loop(task_receiver);
manager
}
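    /// Registers the task and asks the command loop to begin executing it.
    ///
    /// Fails with `SchedulerError::SchedulingFailed` if the command loop
    /// has shut down.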
pub async fn start_task(&self, task: super::ScheduledTask) -> Result<(), SchedulerError> {
let handle = TaskHandle::new(task.clone());
let agent_id = task.agent_id;
self.running_tasks.write().insert(agent_id, handle.clone());
        self.task_sender
            .send(TaskCommand::Start {
                task: Box::new(task),
                handle,
            })
            .map_err(|_| {
                // The command loop has shut down; roll back the registration
                // so the task does not linger as a phantom entry.
                self.running_tasks.write().remove(&agent_id);
                SchedulerError::SchedulingFailed {
                    agent_id,
                    reason: "Failed to send start command".to_string(),
                }
            })?;
Ok(())
}
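    /// Deregisters the task and asks the command loop to kill its process.
    /// A no-op for agents that are not currently running.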
pub async fn terminate_task(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
let handle = self.running_tasks.write().remove(&agent_id);
if let Some(handle) = handle {
self.task_sender
.send(TaskCommand::Terminate { agent_id, handle })
.map_err(|_| SchedulerError::SchedulingFailed {
agent_id,
reason: "Failed to send terminate command".to_string(),
})?;
}
Ok(())
}
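    /// Reports the current health of a registered task.
    ///
    /// Errors with `AgentNotFound` for unknown agents, and with
    /// `SchedulingFailed` once uptime exceeds the configured timeout; note
    /// that uptime is measured from registration, so long-lived entries
    /// (including completed ones) eventually trip this check.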
pub async fn check_task_health(&self, agent_id: AgentId) -> Result<TaskHealth, SchedulerError> {
let running_tasks = self.running_tasks.read();
if let Some(handle) = running_tasks.get(&agent_id) {
let health = handle.get_health();
if health.uptime > self.task_timeout {
return Err(SchedulerError::SchedulingFailed {
agent_id,
reason: "Task timeout exceeded".to_string(),
});
}
Ok(health)
} else {
Err(SchedulerError::AgentNotFound { agent_id })
}
}
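    /// Aggregates per-task health into a single `TaskStatistics` snapshot.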
pub async fn get_task_statistics(&self) -> TaskStatistics {
let running_tasks = self.running_tasks.read();
let total_tasks = running_tasks.len();
let mut healthy_tasks = 0;
let mut total_uptime = Duration::from_secs(0);
let mut total_memory_usage = 0;
for handle in running_tasks.values() {
let health = handle.get_health();
if health.is_healthy {
healthy_tasks += 1;
}
total_uptime += health.uptime;
total_memory_usage += health.memory_usage;
}
TaskStatistics {
total_tasks,
healthy_tasks,
average_uptime: if total_tasks > 0 {
total_uptime / total_tasks as u32
} else {
Duration::from_secs(0)
},
total_memory_usage,
}
}
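    /// Spawns the background loop that consumes `TaskCommand`s until the
    /// sender side is dropped.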
fn start_task_loop(&self, mut task_receiver: mpsc::UnboundedReceiver<TaskCommand>) {
let task_timeout = self.task_timeout;
tokio::spawn(async move {
while let Some(command) = task_receiver.recv().await {
match command {
                    TaskCommand::Start { task, handle } => {
                        // Run each execution on its own Tokio task so a
                        // long-running agent cannot serialize the loop or
                        // stall pending Terminate commands.
                        tokio::spawn(async move {
                            Self::execute_task(*task, handle, task_timeout).await;
                        });
                    }
TaskCommand::Terminate { agent_id, handle } => {
Self::terminate_task_execution(agent_id, handle).await;
}
}
}
});
}
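    /// Runs one task under the global timeout and records the outcome on
    /// its handle.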
async fn execute_task(task: super::ScheduledTask, handle: TaskHandle, task_timeout: Duration) {
let agent_id = task.agent_id;
tracing::info!("Starting execution of agent {}", agent_id);
handle.set_status(TaskStatus::Running);
let execution_result = timeout(
task_timeout,
            Self::execute_agent_task(task, handle.clone()),
)
.await;
match execution_result {
Ok(Ok(execution_metrics)) => {
tracing::info!("Agent {} completed successfully", agent_id);
handle.set_status(TaskStatus::Completed);
handle.update_execution_metrics(execution_metrics);
}
Ok(Err(e)) => {
tracing::error!("Agent {} failed: {}", agent_id, e);
handle.set_status(TaskStatus::Failed);
}
            Err(_) => {
                tracing::error!("Agent {} timed out", agent_id);
                handle.set_status(TaskStatus::TimedOut);
                // The agent process is still alive when the timeout fires;
                // kill it so timed-out tasks do not leak past their deadline.
                if let Some(pid) = handle.get_process_id() {
                    if let Err(e) = Self::terminate_process(pid).await {
                        tracing::warn!("Failed to kill timed-out process {}: {}", pid, e);
                    }
                }
            }
}
}
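    /// Kills the task's process (if one was recorded) and marks the handle
    /// `Terminated`.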
async fn terminate_task_execution(agent_id: AgentId, handle: TaskHandle) {
tracing::info!("Terminating agent {}", agent_id);
if let Some(process_id) = handle.get_process_id() {
if let Err(e) = Self::terminate_process(process_id).await {
tracing::warn!("Failed to terminate process {}: {}", process_id, e);
}
}
handle.set_status(TaskStatus::Terminated);
}
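    /// Dispatches a task to the executor that matches its configured
    /// `ExecutionMode`.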
async fn execute_agent_task(
task: super::ScheduledTask,
handle: TaskHandle,
) -> Result<ExecutionMetrics, String> {
let _start_time = SystemTime::now();
let agent_id = task.agent_id;
tracing::debug!(
"Executing agent {} with config: {:?}",
agent_id,
task.config
);
let execution_context = AgentExecutionContext::new(task.clone(), handle.clone());
match task.config.execution_mode {
ExecutionMode::Ephemeral => Self::execute_ephemeral_agent(execution_context).await,
ExecutionMode::Persistent => Self::execute_persistent_agent(execution_context).await,
ExecutionMode::Scheduled { interval } => {
Self::execute_scheduled_agent(execution_context, interval).await
}
ExecutionMode::CronScheduled { .. } => {
Self::execute_cron_scheduled_agent(execution_context).await
}
ExecutionMode::EventDriven => Self::execute_event_driven_agent(execution_context).await,
ExecutionMode::External { .. } => {
Err("External agents are not executed by the runtime".to_string())
}
}
}
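    /// Launches the agent process, waits for it to exit, and collects
    /// metrics for the run.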
async fn execute_ephemeral_agent(
mut context: AgentExecutionContext,
) -> Result<ExecutionMetrics, String> {
tracing::debug!("Executing ephemeral agent {}", context.task.agent_id);
let process_handle = Self::launch_agent_process(&context.task).await?;
context
.handle
.set_process_id(Some(process_handle.process_id));
let result = Self::monitor_process_execution(process_handle, &mut context).await;
let end_time = SystemTime::now();
let execution_time = end_time
.duration_since(context.start_time)
.unwrap_or_default();
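        // The resource monitor reports live usage to the TaskHandle, not to
        // this context, so the resource fields below still hold their
        // defaults.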
Ok(ExecutionMetrics {
execution_time,
memory_peak_mb: context.memory_peak_mb,
cpu_time_ms: context.cpu_time_ms,
exit_code: context.exit_code,
error_count: context.error_count,
success: result.is_ok(),
})
}
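    /// Launches the agent process and detaches a monitor instead of
    /// waiting for completion.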
async fn execute_persistent_agent(
context: AgentExecutionContext,
) -> Result<ExecutionMetrics, String> {
tracing::debug!("Executing persistent agent {}", context.task.agent_id);
let process_handle = Self::launch_agent_process(&context.task).await?;
context
.handle
.set_process_id(Some(process_handle.process_id));
let _monitoring_handle =
tokio::spawn(
async move { Self::monitor_persistent_process(process_handle, context).await },
);
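        // A persistent agent has no finished run to measure, so report
        // placeholder metrics; live usage is tracked via the TaskHandle.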
Ok(ExecutionMetrics {
execution_time: Duration::from_secs(0),
memory_peak_mb: 0,
cpu_time_ms: 0,
exit_code: None,
error_count: 0,
success: true,
})
}
async fn execute_scheduled_agent(
context: AgentExecutionContext,
interval: Duration,
) -> Result<ExecutionMetrics, String> {
tracing::debug!(
"Executing scheduled agent {} (interval {:?}) as ephemeral run",
context.task.agent_id,
interval
);
Self::execute_ephemeral_agent(context).await
}
async fn execute_cron_scheduled_agent(
context: AgentExecutionContext,
) -> Result<ExecutionMetrics, String> {
tracing::debug!("Executing cron-scheduled agent {}", context.task.agent_id,);
Self::execute_ephemeral_agent(context).await
}
async fn execute_event_driven_agent(
context: AgentExecutionContext,
) -> Result<ExecutionMetrics, String> {
tracing::debug!("Executing event-driven agent {}", context.task.agent_id);
Self::execute_persistent_agent(context).await
}
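    /// Prepares the execution environment and spawns the agent's shell
    /// process, returning a handle used for monitoring and termination.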
async fn launch_agent_process(task: &super::ScheduledTask) -> Result<ProcessHandle, String> {
let agent_id = task.agent_id;
let execution_env = Self::create_execution_environment(task)?;
let mut command = Self::build_agent_command(task, &execution_env).await?;
tracing::debug!("Launching agent {} with command: {:?}", agent_id, command);
let child = command
.spawn()
.map_err(|e| format!("Failed to spawn agent process: {}", e))?;
        // `id()` is `None` if the child has already exited; a fallback PID
        // of 0 would point the monitors at a nonexistent process.
        let process_id = child
            .id()
            .ok_or_else(|| "Agent process exited before its PID could be read".to_string())?;
Ok(ProcessHandle {
process_id,
child: Arc::new(tokio::sync::Mutex::new(child)),
start_time: SystemTime::now(),
})
}
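    /// Derives the working directory, environment variables, and resource
    /// limits for a task; tests run under the OS temp dir, otherwise /tmp.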
fn create_execution_environment(
task: &super::ScheduledTask,
) -> Result<ExecutionEnvironment, String> {
use std::env;
let working_dir = if cfg!(test) {
env::temp_dir()
.join(format!("agent_{}", task.agent_id))
.to_string_lossy()
.to_string()
} else {
format!("/tmp/agent_{}", task.agent_id)
};
Ok(ExecutionEnvironment {
working_directory: working_dir,
environment_variables: vec![
("AGENT_ID".to_string(), task.agent_id.to_string()),
(
"SECURITY_TIER".to_string(),
format!("{:?}", task.config.security_tier),
),
],
resource_limits: task.config.resource_limits.clone(),
})
}
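    /// Assembles the `sh -c` invocation that stands in for real DSL
    /// execution: a short echo script in tests, an echo-and-sleep stub
    /// otherwise.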
async fn build_agent_command(
task: &super::ScheduledTask,
env: &ExecutionEnvironment,
) -> Result<Command, String> {
let mut command = Command::new("sh");
if let Err(e) = tokio::fs::create_dir_all(&env.working_directory).await {
tracing::warn!(
"Failed to create working directory {}: {}",
env.working_directory,
e
);
}
command.current_dir(&env.working_directory);
for (key, value) in &env.environment_variables {
command.env(key, value);
}
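        // NOTE: `dsl_source` is interpolated into the script unescaped; fine
        // for this stub, but a real implementation must not pass untrusted
        // DSL text through the shell.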
let script_content = if cfg!(test) {
format!(
r#"echo "Test execution of agent {}" >&2
echo "DSL Source: {}" >&2
echo "Agent test execution completed" >&2"#,
task.agent_id, task.config.dsl_source
)
} else {
format!(
r#"#!/bin/bash
set -e
echo "Executing agent {}" >&2
echo "DSL Source:" >&2
echo "{}" >&2
# In a real implementation, this would compile and execute the DSL
sleep 1
echo "Agent execution completed" >&2"#,
task.agent_id, task.config.dsl_source
)
};
command.args(["-c", &script_content]);
Ok(command)
}
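    /// Waits for an ephemeral process to exit while a side task samples its
    /// resource usage, then maps the exit status to a result.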
async fn monitor_process_execution(
process_handle: ProcessHandle,
context: &mut AgentExecutionContext,
) -> Result<(), String> {
let process_id = process_handle.process_id;
let resource_monitor = tokio::spawn(Self::monitor_process_resources(
process_id,
context.handle.clone(),
));
        let mut child = process_handle.child.lock().await;
        let wait_result = child.wait().await;
        // Stop the resource sampler before propagating any wait error so it
        // does not keep polling a reaped process.
        resource_monitor.abort();
        let exit_status =
            wait_result.map_err(|e| format!("Failed to wait for process: {}", e))?;
        context.exit_code = exit_status.code();
if exit_status.success() {
Ok(())
} else {
Err(format!(
"Process exited with code: {:?}",
exit_status.code()
))
}
}
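    /// Polls a persistent process every 30 seconds, returning once the
    /// process dies or the handle is marked `Terminated`.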
async fn monitor_persistent_process(
process_handle: ProcessHandle,
context: AgentExecutionContext,
) -> Result<(), String> {
let process_id = process_handle.process_id;
let monitor = tokio::spawn(Self::monitor_process_resources(
process_id,
context.handle.clone(),
));
let mut check_interval = tokio::time::interval(Duration::from_secs(30));
loop {
check_interval.tick().await;
if !Self::is_process_running(process_id).await {
tracing::warn!(
"Persistent agent {} process {} died",
context.task.agent_id,
process_id
);
break;
}
if matches!(context.handle.get_status(), TaskStatus::Terminated) {
break;
}
}
monitor.abort();
Ok(())
}
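    /// Samples memory and CPU every five seconds via `sysinfo`, pushing
    /// readings onto the handle until the process disappears.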
async fn monitor_process_resources(process_id: u32, handle: TaskHandle) {
let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut system = System::new();
loop {
interval.tick().await;
system.refresh_process(Pid::from(process_id as usize));
if let Some(process) = system.process(Pid::from(process_id as usize)) {
let memory_mb = process.memory() / 1024 / 1024;
let cpu_usage = process.cpu_usage();
handle.update_resource_usage(memory_mb as usize, cpu_usage);
tracing::trace!(
"Process {} - Memory: {}MB, CPU: {:.2}%",
process_id,
memory_mb,
cpu_usage
);
} else {
break;
}
}
}
async fn is_process_running(process_id: u32) -> bool {
let mut system = System::new();
system.refresh_process(Pid::from(process_id as usize));
system.process(Pid::from(process_id as usize)).is_some()
}
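    /// Kills the process via `sysinfo`; an already-dead process counts as
    /// successfully terminated.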
async fn terminate_process(process_id: u32) -> Result<(), String> {
let mut system = System::new();
system.refresh_process(Pid::from(process_id as usize));
if let Some(process) = system.process(Pid::from(process_id as usize)) {
if process.kill() {
Ok(())
} else {
Err("Failed to terminate process".to_string())
}
} else {
            Ok(())
        }
}
}
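/// Commands consumed by the background task loop.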
#[derive(Debug)]
enum TaskCommand {
Start {
task: Box<super::ScheduledTask>,
handle: TaskHandle,
},
Terminate {
agent_id: AgentId,
handle: TaskHandle,
},
}
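/// Cheaply cloneable view of a single task: shared status and metrics
/// behind `parking_lot` locks, plus the task's start time.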
#[derive(Debug, Clone)]
pub struct TaskHandle {
agent_id: AgentId,
status: Arc<RwLock<TaskStatus>>,
start_time: SystemTime,
metrics: Arc<RwLock<TaskMetrics>>,
}
impl TaskHandle {
fn new(task: super::ScheduledTask) -> Self {
Self {
agent_id: task.agent_id,
status: Arc::new(RwLock::new(TaskStatus::Pending)),
start_time: SystemTime::now(),
metrics: Arc::new(RwLock::new(TaskMetrics::new())),
}
}
fn set_status(&self, status: TaskStatus) {
*self.status.write() = status.clone();
self.metrics.write().update_status(&status);
}
fn get_status(&self) -> TaskStatus {
self.status.read().clone()
}
fn set_process_id(&self, process_id: Option<u32>) {
self.metrics.write().process_id = process_id;
}
fn get_process_id(&self) -> Option<u32> {
self.metrics.read().process_id
}
fn update_resource_usage(&self, memory_mb: usize, cpu_usage: f32) {
let mut metrics = self.metrics.write();
        metrics.memory_usage = memory_mb * 1024 * 1024;
        metrics.cpu_usage = cpu_usage;
metrics.last_activity = SystemTime::now();
}
fn update_execution_metrics(&self, execution_metrics: ExecutionMetrics) {
let mut metrics = self.metrics.write();
metrics.execution_time = Some(execution_metrics.execution_time);
metrics.memory_peak_mb = execution_metrics.memory_peak_mb;
metrics.cpu_time_ms = execution_metrics.cpu_time_ms;
metrics.exit_code = execution_metrics.exit_code;
metrics.error_count = execution_metrics.error_count;
metrics.last_activity = SystemTime::now();
}
fn get_health(&self) -> TaskHealth {
let status = self.status.read().clone();
let uptime = SystemTime::now()
.duration_since(self.start_time)
.unwrap_or_default();
let metrics = self.metrics.read();
TaskHealth {
agent_id: self.agent_id,
status: status.clone(),
uptime,
is_healthy: matches!(
status,
TaskStatus::Running | TaskStatus::Pending | TaskStatus::Completed
),
memory_usage: metrics.memory_usage,
cpu_usage: metrics.cpu_usage,
last_activity: metrics.last_activity,
}
}
}
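/// Lifecycle states a task passes through from scheduling to shutdown.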
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskStatus {
Pending,
Running,
Completed,
Failed,
TimedOut,
Terminated,
}
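/// Point-in-time snapshot returned by `TaskManager::check_task_health`.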
#[derive(Debug, Clone)]
pub struct TaskHealth {
pub agent_id: AgentId,
pub status: TaskStatus,
pub uptime: Duration,
pub is_healthy: bool,
pub memory_usage: usize,
pub cpu_usage: f32,
pub last_activity: SystemTime,
}
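/// Internal counters updated by the resource monitors and by status
/// changes.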
#[derive(Debug, Clone)]
struct TaskMetrics {
memory_usage: usize,
cpu_usage: f32,
last_activity: SystemTime,
status_changes: u32,
process_id: Option<u32>,
execution_time: Option<Duration>,
memory_peak_mb: usize,
cpu_time_ms: u64,
exit_code: Option<i32>,
error_count: u32,
}
impl TaskMetrics {
fn new() -> Self {
Self {
memory_usage: 0,
cpu_usage: 0.0,
last_activity: SystemTime::now(),
status_changes: 0,
process_id: None,
execution_time: None,
memory_peak_mb: 0,
cpu_time_ms: 0,
exit_code: None,
error_count: 0,
}
}
fn update_status(&mut self, _status: &TaskStatus) {
self.status_changes += 1;
self.last_activity = SystemTime::now();
}
}
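/// Outcome summary for a single execution run.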
#[derive(Debug, Clone)]
struct ExecutionMetrics {
execution_time: Duration,
memory_peak_mb: usize,
cpu_time_ms: u64,
exit_code: Option<i32>,
error_count: u32,
#[allow(dead_code)]
success: bool,
}
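/// Per-run scratch state threaded through the `execute_*` helpers.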
#[derive(Debug)]
struct AgentExecutionContext {
task: super::ScheduledTask,
handle: TaskHandle,
start_time: SystemTime,
memory_peak_mb: usize,
cpu_time_ms: u64,
exit_code: Option<i32>,
error_count: u32,
}
impl AgentExecutionContext {
fn new(task: super::ScheduledTask, handle: TaskHandle) -> Self {
Self {
task,
handle,
start_time: SystemTime::now(),
memory_peak_mb: 0,
cpu_time_ms: 0,
exit_code: None,
error_count: 0,
}
}
}
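/// A spawned agent process, shared with its monitoring tasks.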
#[derive(Debug)]
struct ProcessHandle {
process_id: u32,
child: Arc<tokio::sync::Mutex<tokio::process::Child>>,
#[allow(dead_code)]
start_time: SystemTime,
}
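/// Working directory, environment variables, and limits applied when
/// launching an agent process.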
#[derive(Debug, Clone)]
struct ExecutionEnvironment {
working_directory: String,
environment_variables: Vec<(String, String)>,
#[allow(dead_code)]
resource_limits: ResourceLimits,
}
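/// Aggregate snapshot across all currently registered tasks.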
#[derive(Debug, Clone, serde::Serialize)]
pub struct TaskStatistics {
pub total_tasks: usize,
pub healthy_tasks: usize,
pub average_uptime: Duration,
pub total_memory_usage: usize,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{AgentConfig, ExecutionMode, Priority, ResourceLimits, SecurityTier};
use std::collections::HashMap;
fn create_test_task() -> super::super::ScheduledTask {
let agent_id = AgentId::new();
let config = AgentConfig {
id: agent_id,
name: "test".to_string(),
dsl_source: "test".to_string(),
execution_mode: ExecutionMode::Ephemeral,
security_tier: SecurityTier::Tier1,
resource_limits: ResourceLimits::default(),
capabilities: vec![],
policies: vec![],
metadata: HashMap::new(),
priority: Priority::Normal,
};
super::super::ScheduledTask::new(config)
}
#[tokio::test]
async fn test_task_start_and_health_check() {
let task_manager = TaskManager::new(Duration::from_secs(60));
let task = create_test_task();
let agent_id = task.agent_id;
let result = task_manager.start_task(task).await;
assert!(result.is_ok());
tokio::time::sleep(Duration::from_millis(50)).await;
let health = task_manager.check_task_health(agent_id).await;
assert!(health.is_ok());
let health = health.unwrap();
assert_eq!(health.agent_id, agent_id);
assert!(health.is_healthy);
}
#[tokio::test]
async fn test_task_termination() {
let task_manager = TaskManager::new(Duration::from_secs(60));
let task = create_test_task();
let agent_id = task.agent_id;
task_manager.start_task(task).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let result = task_manager.terminate_task(agent_id).await;
assert!(result.is_ok());
let health = task_manager.check_task_health(agent_id).await;
assert!(health.is_err());
}
#[tokio::test]
async fn test_task_statistics() {
let task_manager = TaskManager::new(Duration::from_secs(60));
for _ in 0..3 {
let task = create_test_task();
task_manager.start_task(task).await.unwrap();
}
tokio::time::sleep(Duration::from_millis(50)).await;
let stats = task_manager.get_task_statistics().await;
assert_eq!(stats.total_tasks, 3);
assert!(stats.healthy_tasks <= 3); }
}