// symbi_runtime/scheduler/task_manager.rs
1//! Task manager for executing and monitoring agent tasks
2
3use parking_lot::RwLock;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, SystemTime};
7use sysinfo::{Pid, System};
8use tokio::process::Command;
9use tokio::sync::mpsc;
10use tokio::time::timeout;
11
12use crate::types::*;
13
/// Task manager for agent execution
///
/// Owns the bounded command channel feeding the background execution loop
/// and the shared table of currently running tasks.
pub struct TaskManager {
    // Per-task wall-clock budget; enforced in `execute_task` (via `timeout`)
    // and in `check_task_health`.
    task_timeout: Duration,
    // Handles for started-but-not-terminated tasks, keyed by agent id.
    // Shared with the health-check and statistics accessors.
    running_tasks: Arc<RwLock<HashMap<AgentId, TaskHandle>>>,
    // Producer side of the bounded channel consumed by `start_task_loop`.
    task_sender: mpsc::Sender<TaskCommand>,
    // System-wide process snapshot; currently unused — the monitoring code
    // builds its own `System` instances instead.
    #[allow(dead_code)]
    system_info: Arc<RwLock<System>>,
}
22
23impl TaskManager {
24    /// Create a new task manager
25    pub fn new(task_timeout: Duration) -> Self {
26        let running_tasks = Arc::new(RwLock::new(HashMap::new()));
27        // Bounded channel prevents unbounded memory growth under load.
28        // 1024 is sufficient for most workloads; submitters block when full.
29        let (task_sender, task_receiver) = mpsc::channel(1024);
30
31        let manager = Self {
32            task_timeout,
33            running_tasks: running_tasks.clone(),
34            task_sender,
35            system_info: Arc::new(RwLock::new(System::new_all())),
36        };
37
38        // Start the task execution loop
39        manager.start_task_loop(task_receiver);
40
41        manager
42    }
43
44    /// Start a new task
45    pub async fn start_task(&self, task: super::ScheduledTask) -> Result<(), SchedulerError> {
46        let handle = TaskHandle::new(task.clone());
47        let agent_id = task.agent_id;
48
49        // Store the handle
50        self.running_tasks.write().insert(agent_id, handle.clone());
51
52        // Send command to start the task (try_send avoids blocking the caller;
53        // returns an error if the bounded channel is full).
54        self.task_sender
55            .try_send(TaskCommand::Start {
56                task: Box::new(task),
57                handle,
58            })
59            .map_err(|_| SchedulerError::SchedulingFailed {
60                agent_id,
61                reason: "Task channel full — backpressure applied".into(),
62            })?;
63
64        Ok(())
65    }
66
67    /// Terminate a task
68    pub async fn terminate_task(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
69        // Remove from running tasks
70        let handle = self.running_tasks.write().remove(&agent_id);
71
72        if let Some(handle) = handle {
73            // Send termination command (try_send to avoid blocking).
74            self.task_sender
75                .try_send(TaskCommand::Terminate { agent_id, handle })
76                .map_err(|_| SchedulerError::SchedulingFailed {
77                    agent_id,
78                    reason: "Task channel full — cannot send terminate command".into(),
79                })?;
80        }
81
82        Ok(())
83    }
84
85    /// Check task health
86    pub async fn check_task_health(&self, agent_id: AgentId) -> Result<TaskHealth, SchedulerError> {
87        let running_tasks = self.running_tasks.read();
88
89        if let Some(handle) = running_tasks.get(&agent_id) {
90            let health = handle.get_health();
91
92            // Check if task has exceeded timeout
93            if health.uptime > self.task_timeout {
94                return Err(SchedulerError::SchedulingFailed {
95                    agent_id,
96                    reason: "Task timeout exceeded".into(),
97                });
98            }
99
100            Ok(health)
101        } else {
102            Err(SchedulerError::AgentNotFound { agent_id })
103        }
104    }
105
106    /// Get task statistics
107    pub async fn get_task_statistics(&self) -> TaskStatistics {
108        let running_tasks = self.running_tasks.read();
109        let total_tasks = running_tasks.len();
110
111        let mut healthy_tasks = 0;
112        let mut total_uptime = Duration::from_secs(0);
113        let mut total_memory_usage = 0;
114
115        for handle in running_tasks.values() {
116            let health = handle.get_health();
117            if health.is_healthy {
118                healthy_tasks += 1;
119            }
120            total_uptime += health.uptime;
121            total_memory_usage += health.memory_usage;
122        }
123
124        TaskStatistics {
125            total_tasks,
126            healthy_tasks,
127            average_uptime: if total_tasks > 0 {
128                total_uptime / total_tasks as u32
129            } else {
130                Duration::from_secs(0)
131            },
132            total_memory_usage,
133        }
134    }
135
136    /// Start the task execution loop
137    fn start_task_loop(&self, mut task_receiver: mpsc::Receiver<TaskCommand>) {
138        let task_timeout = self.task_timeout;
139
140        tokio::spawn(async move {
141            while let Some(command) = task_receiver.recv().await {
142                match command {
143                    TaskCommand::Start { task, handle } => {
144                        Self::execute_task(*task, handle, task_timeout).await;
145                    }
146                    TaskCommand::Terminate { agent_id, handle } => {
147                        Self::terminate_task_execution(agent_id, handle).await;
148                    }
149                }
150            }
151        });
152    }
153
    /// Execute a task.
    ///
    /// Marks the handle `Running`, runs the agent under the configured
    /// timeout, and records the outcome on the handle:
    /// * `Ok(Ok(metrics))` → `Completed` (metrics stored on the handle)
    /// * `Ok(Err(_))`      → `Failed`
    /// * `Err(_)` (timer elapsed) → `TimedOut`
    ///
    /// NOTE(review): `execute_ephemeral_agent` returns `Ok` even when the
    /// child exits non-zero (the failure only shows in `metrics.success`),
    /// so such runs land in the `Completed` branch — confirm intended.
    async fn execute_task(task: super::ScheduledTask, handle: TaskHandle, task_timeout: Duration) {
        let agent_id = task.agent_id;

        tracing::info!("Starting execution of agent {}", agent_id);

        // Update handle status
        handle.set_status(TaskStatus::Running);

        // Execute the real agent task with proper monitoring and metrics
        let execution_result = timeout(
            task_timeout,
            Self::execute_agent_task(task.clone(), handle.clone()),
        )
        .await;

        match execution_result {
            Ok(Ok(execution_metrics)) => {
                tracing::info!("Agent {} completed successfully", agent_id);
                handle.set_status(TaskStatus::Completed);
                handle.update_execution_metrics(execution_metrics);
            }
            Ok(Err(e)) => {
                tracing::error!("Agent {} failed: {}", agent_id, e);
                handle.set_status(TaskStatus::Failed);
            }
            Err(_) => {
                tracing::error!("Agent {} timed out", agent_id);
                handle.set_status(TaskStatus::TimedOut);
            }
        }
    }
186
187    /// Terminate task execution
188    async fn terminate_task_execution(agent_id: AgentId, handle: TaskHandle) {
189        tracing::info!("Terminating agent {}", agent_id);
190
191        // If there's a process associated with this task, terminate it
192        if let Some(process_id) = handle.get_process_id() {
193            if let Err(e) = Self::terminate_process(process_id).await {
194                tracing::warn!("Failed to terminate process {}: {}", process_id, e);
195            }
196        }
197
198        handle.set_status(TaskStatus::Terminated);
199    }
200
201    /// Execute a real agent task with comprehensive monitoring
202    async fn execute_agent_task(
203        task: super::ScheduledTask,
204        handle: TaskHandle,
205    ) -> Result<ExecutionMetrics, String> {
206        let _start_time = SystemTime::now();
207        let agent_id = task.agent_id;
208
209        tracing::debug!(
210            "Executing agent {} with config: {:?}",
211            agent_id,
212            task.config
213        );
214
215        // Create execution context
216        let execution_context = AgentExecutionContext::new(task.clone(), handle.clone());
217
218        // Execute based on agent execution mode
219        match task.config.execution_mode {
220            ExecutionMode::Ephemeral => Self::execute_ephemeral_agent(execution_context).await,
221            ExecutionMode::Persistent => Self::execute_persistent_agent(execution_context).await,
222            ExecutionMode::Scheduled { interval } => {
223                Self::execute_scheduled_agent(execution_context, interval).await
224            }
225            ExecutionMode::CronScheduled { .. } => {
226                Self::execute_cron_scheduled_agent(execution_context).await
227            }
228            ExecutionMode::EventDriven => Self::execute_event_driven_agent(execution_context).await,
229            ExecutionMode::External { .. } => {
230                Err("External agents are not executed by the runtime".to_string())
231            }
232        }
233    }
234
    /// Execute an ephemeral agent (runs once and terminates).
    ///
    /// Launches the agent process, records its PID on the task handle,
    /// blocks until the process exits (via `monitor_process_execution`),
    /// and returns the collected metrics.
    ///
    /// Note: a non-zero exit is NOT propagated as `Err` — the run outcome is
    /// carried in `ExecutionMetrics::success` while this function returns
    /// `Ok`. Only a launch failure yields `Err`.
    async fn execute_ephemeral_agent(
        mut context: AgentExecutionContext,
    ) -> Result<ExecutionMetrics, String> {
        tracing::debug!("Executing ephemeral agent {}", context.task.agent_id);

        // Launch the agent process
        let process_handle = Self::launch_agent_process(&context.task).await?;
        context
            .handle
            .set_process_id(Some(process_handle.process_id));

        // Monitor the process execution (waits for exit; fills in
        // `context.exit_code`).
        let result = Self::monitor_process_execution(process_handle, &mut context).await;

        // Collect final metrics
        let end_time = SystemTime::now();
        let execution_time = end_time
            .duration_since(context.start_time)
            .unwrap_or_default();

        // NOTE(review): `memory_peak_mb`/`cpu_time_ms`/`error_count` are
        // never written by the monitors (only the task handle is updated),
        // so they are always zero here — confirm whether the monitor should
        // also update the context.
        Ok(ExecutionMetrics {
            execution_time,
            memory_peak_mb: context.memory_peak_mb,
            cpu_time_ms: context.cpu_time_ms,
            exit_code: context.exit_code,
            error_count: context.error_count,
            success: result.is_ok(),
        })
    }
265
    /// Execute a persistent agent (long-running).
    ///
    /// Launches the process, records its PID, then detaches a monitoring
    /// task (`monitor_persistent_process`) and returns immediately with
    /// zeroed placeholder metrics — the process keeps running and its
    /// liveness is tracked by the detached monitor.
    async fn execute_persistent_agent(
        context: AgentExecutionContext,
    ) -> Result<ExecutionMetrics, String> {
        tracing::debug!("Executing persistent agent {}", context.task.agent_id);

        // Launch the agent process
        let process_handle = Self::launch_agent_process(&context.task).await?;
        context
            .handle
            .set_process_id(Some(process_handle.process_id));

        // For persistent agents, we start monitoring but don't wait for
        // completion. The JoinHandle is intentionally dropped, so a panic
        // or result inside the monitor is never observed here.
        let _monitoring_handle =
            tokio::spawn(
                async move { Self::monitor_persistent_process(process_handle, context).await },
            );

        // Return immediately: placeholder metrics — `success: true` only
        // means the launch succeeded; the run's outcome is unknown yet.
        Ok(ExecutionMetrics {
            execution_time: Duration::from_secs(0),
            memory_peak_mb: 0,
            cpu_time_ms: 0,
            exit_code: None,
            error_count: 0,
            success: true,
        })
    }
294
295    /// Execute a scheduled agent (single run — the CronScheduler handles recurrence).
296    async fn execute_scheduled_agent(
297        context: AgentExecutionContext,
298        interval: Duration,
299    ) -> Result<ExecutionMetrics, String> {
300        tracing::debug!(
301            "Executing scheduled agent {} (interval {:?}) as ephemeral run",
302            context.task.agent_id,
303            interval
304        );
305        // Execute exactly once and return metrics.
306        // The external CronScheduler is responsible for computing the next
307        // fire time and re-invoking.
308        Self::execute_ephemeral_agent(context).await
309    }
310
311    /// Execute a cron-scheduled agent (single run triggered by CronScheduler).
312    async fn execute_cron_scheduled_agent(
313        context: AgentExecutionContext,
314    ) -> Result<ExecutionMetrics, String> {
315        tracing::debug!("Executing cron-scheduled agent {}", context.task.agent_id,);
316        Self::execute_ephemeral_agent(context).await
317    }
318
319    /// Execute an event-driven agent
320    async fn execute_event_driven_agent(
321        context: AgentExecutionContext,
322    ) -> Result<ExecutionMetrics, String> {
323        tracing::debug!("Executing event-driven agent {}", context.task.agent_id);
324
325        // Event-driven agents are similar to persistent but triggered by events
326        Self::execute_persistent_agent(context).await
327    }
328
329    /// Launch an agent process
330    async fn launch_agent_process(task: &super::ScheduledTask) -> Result<ProcessHandle, String> {
331        let agent_id = task.agent_id;
332
333        // Create a secure execution environment based on security tier
334        let execution_env = Self::create_execution_environment(task)?;
335
336        // Build the command to execute the agent
337        let mut command = Self::build_agent_command(task, &execution_env).await?;
338
339        tracing::debug!("Launching agent {} with command: {:?}", agent_id, command);
340
341        // Spawn the process
342        let child = command
343            .spawn()
344            .map_err(|e| format!("Failed to spawn agent process: {}", e))?;
345
346        let process_id = child.id().unwrap_or(0);
347
348        Ok(ProcessHandle {
349            process_id,
350            child: Arc::new(tokio::sync::Mutex::new(child)),
351            start_time: SystemTime::now(),
352        })
353    }
354
355    /// Create execution environment for the agent
356    fn create_execution_environment(
357        task: &super::ScheduledTask,
358    ) -> Result<ExecutionEnvironment, String> {
359        use std::env;
360
361        // For tests, use a temporary directory that we know exists
362        let working_dir = if cfg!(test) {
363            env::temp_dir()
364                .join(format!("agent_{}", task.agent_id))
365                .to_string_lossy()
366                .to_string()
367        } else {
368            format!("/tmp/agent_{}", task.agent_id)
369        };
370
371        Ok(ExecutionEnvironment {
372            working_directory: working_dir,
373            environment_variables: vec![
374                ("AGENT_ID".to_string(), task.agent_id.to_string()),
375                (
376                    "SECURITY_TIER".to_string(),
377                    format!("{:?}", task.config.security_tier),
378                ),
379            ],
380            resource_limits: task.config.resource_limits.clone(),
381        })
382    }
383
384    /// Build the command to execute the agent
385    async fn build_agent_command(
386        task: &super::ScheduledTask,
387        env: &ExecutionEnvironment,
388    ) -> Result<Command, String> {
389        let mut command = Command::new("sh");
390
391        // Create the working directory if it doesn't exist
392        if let Err(e) = tokio::fs::create_dir_all(&env.working_directory).await {
393            tracing::warn!(
394                "Failed to create working directory {}: {}",
395                env.working_directory,
396                e
397            );
398        }
399
400        // Set working directory
401        command.current_dir(&env.working_directory);
402
403        // Set environment variables
404        for (key, value) in &env.environment_variables {
405            command.env(key, value);
406        }
407
408        // Create a script that executes the agent DSL
409        let script_content = if cfg!(test) {
410            // Simplified script for testing that should always succeed
411            format!(
412                r#"echo "Test execution of agent {}" >&2
413echo "DSL Source: {}" >&2
414echo "Agent test execution completed" >&2"#,
415                task.agent_id, task.config.dsl_source
416            )
417        } else {
418            format!(
419                r#"#!/bin/bash
420set -e
421echo "Executing agent {}" >&2
422echo "DSL Source:" >&2
423echo "{}" >&2
424# In a real implementation, this would compile and execute the DSL
425sleep 1
426echo "Agent execution completed" >&2"#,
427                task.agent_id, task.config.dsl_source
428            )
429        };
430
431        command.args(["-c", &script_content]);
432
433        Ok(command)
434    }
435
436    /// Monitor process execution
437    async fn monitor_process_execution(
438        process_handle: ProcessHandle,
439        context: &mut AgentExecutionContext,
440    ) -> Result<(), String> {
441        let process_id = process_handle.process_id;
442
443        // Start resource monitoring
444        let resource_monitor = tokio::spawn(Self::monitor_process_resources(
445            process_id,
446            context.handle.clone(),
447        ));
448
449        // Wait for process completion
450        let mut child = process_handle.child.lock().await;
451        let exit_status = child
452            .wait()
453            .await
454            .map_err(|e| format!("Failed to wait for process: {}", e))?;
455
456        context.exit_code = exit_status.code();
457
458        // Stop resource monitoring
459        resource_monitor.abort();
460
461        if exit_status.success() {
462            Ok(())
463        } else {
464            Err(format!(
465                "Process exited with code: {:?}",
466                exit_status.code()
467            ))
468        }
469    }
470
    /// Monitor persistent process.
    ///
    /// Runs a background resource sampler and polls liveness every 30
    /// seconds, exiting when the process disappears from the process table
    /// or the task handle reports `Terminated`. The sampler is aborted on
    /// exit.
    async fn monitor_persistent_process(
        process_handle: ProcessHandle,
        context: AgentExecutionContext,
    ) -> Result<(), String> {
        let process_id = process_handle.process_id;

        // Continuous monitoring for persistent agents
        let monitor = tokio::spawn(Self::monitor_process_resources(
            process_id,
            context.handle.clone(),
        ));

        // Check if process is still running periodically. (The first
        // `tick()` of a tokio interval completes immediately, so the first
        // liveness check happens right away, then every 30s.)
        let mut check_interval = tokio::time::interval(Duration::from_secs(30));

        loop {
            check_interval.tick().await;

            if !Self::is_process_running(process_id).await {
                tracing::warn!(
                    "Persistent agent {} process {} died",
                    context.task.agent_id,
                    process_id
                );
                break;
            }

            // Check if termination was requested (set by
            // `terminate_task_execution` on the shared handle).
            if matches!(context.handle.get_status(), TaskStatus::Terminated) {
                break;
            }
        }

        monitor.abort();
        Ok(())
    }
508
    /// Monitor process resources.
    ///
    /// Samples memory and CPU for the PID every 5 seconds and pushes the
    /// values onto the task handle; exits when the process vanishes from
    /// the process table (or when the owning task aborts this future).
    async fn monitor_process_resources(process_id: u32, handle: TaskHandle) {
        let mut interval = tokio::time::interval(Duration::from_secs(5));
        let mut system = System::new();

        loop {
            interval.tick().await;

            system.refresh_process(Pid::from(process_id as usize));

            if let Some(process) = system.process(Pid::from(process_id as usize)) {
                // NOTE(review): `/ 1024 / 1024` assumes `Process::memory()`
                // returns bytes; older sysinfo versions returned KiB —
                // confirm against the pinned sysinfo version.
                let memory_mb = process.memory() / 1024 / 1024;
                let cpu_usage = process.cpu_usage();

                handle.update_resource_usage(memory_mb as usize, cpu_usage);

                tracing::trace!(
                    "Process {} - Memory: {}MB, CPU: {:.2}%",
                    process_id,
                    memory_mb,
                    cpu_usage
                );
            } else {
                // Process no longer exists
                break;
            }
        }
    }
537
538    /// Check if process is still running
539    async fn is_process_running(process_id: u32) -> bool {
540        let mut system = System::new();
541        system.refresh_process(Pid::from(process_id as usize));
542        system.process(Pid::from(process_id as usize)).is_some()
543    }
544
545    /// Terminate a process
546    async fn terminate_process(process_id: u32) -> Result<(), String> {
547        let mut system = System::new();
548        system.refresh_process(Pid::from(process_id as usize));
549
550        if let Some(process) = system.process(Pid::from(process_id as usize)) {
551            if process.kill() {
552                Ok(())
553            } else {
554                Err("Failed to terminate process".to_string())
555            }
556        } else {
557            Ok(()) // Process already terminated
558        }
559    }
560}
561
/// Task command for the execution loop
#[derive(Debug)]
enum TaskCommand {
    /// Begin executing a scheduled task. The task is boxed to keep the
    /// enum (and thus the channel payload) small.
    Start {
        task: Box<super::ScheduledTask>,
        handle: TaskHandle,
    },
    /// Stop the identified agent's process and mark its handle `Terminated`.
    Terminate {
        agent_id: AgentId,
        handle: TaskHandle,
    },
}
574
/// Handle for a running task
///
/// Cheap to clone: status and metrics live behind `Arc<RwLock<...>>`, so
/// every clone observes (and mutates) the same shared state.
#[derive(Debug, Clone)]
pub struct TaskHandle {
    // Identity of the agent this handle tracks.
    agent_id: AgentId,
    // Current lifecycle state, shared across clones.
    status: Arc<RwLock<TaskStatus>>,
    // Creation time; used to compute uptime in `get_health`.
    start_time: SystemTime,
    // Counters and samples updated by the monitors and executors.
    metrics: Arc<RwLock<TaskMetrics>>,
}
583
584impl TaskHandle {
585    fn new(task: super::ScheduledTask) -> Self {
586        Self {
587            agent_id: task.agent_id,
588            status: Arc::new(RwLock::new(TaskStatus::Pending)),
589            start_time: SystemTime::now(),
590            metrics: Arc::new(RwLock::new(TaskMetrics::new())),
591        }
592    }
593
594    fn set_status(&self, status: TaskStatus) {
595        *self.status.write() = status.clone();
596        self.metrics.write().update_status(&status);
597    }
598
599    fn get_status(&self) -> TaskStatus {
600        self.status.read().clone()
601    }
602
603    fn set_process_id(&self, process_id: Option<u32>) {
604        self.metrics.write().process_id = process_id;
605    }
606
607    fn get_process_id(&self) -> Option<u32> {
608        self.metrics.read().process_id
609    }
610
611    fn update_resource_usage(&self, memory_mb: usize, cpu_usage: f32) {
612        let mut metrics = self.metrics.write();
613        metrics.memory_usage = memory_mb * 1024 * 1024; // Convert to bytes
614        metrics.cpu_usage = cpu_usage;
615        metrics.last_activity = SystemTime::now();
616    }
617
618    fn update_execution_metrics(&self, execution_metrics: ExecutionMetrics) {
619        let mut metrics = self.metrics.write();
620        metrics.execution_time = Some(execution_metrics.execution_time);
621        metrics.memory_peak_mb = execution_metrics.memory_peak_mb;
622        metrics.cpu_time_ms = execution_metrics.cpu_time_ms;
623        metrics.exit_code = execution_metrics.exit_code;
624        metrics.error_count = execution_metrics.error_count;
625        metrics.last_activity = SystemTime::now();
626    }
627
628    fn get_health(&self) -> TaskHealth {
629        let status = self.status.read().clone();
630        let uptime = SystemTime::now()
631            .duration_since(self.start_time)
632            .unwrap_or_default();
633        let metrics = self.metrics.read();
634
635        TaskHealth {
636            agent_id: self.agent_id,
637            status: status.clone(),
638            uptime,
639            is_healthy: matches!(
640                status,
641                TaskStatus::Running | TaskStatus::Pending | TaskStatus::Completed
642            ),
643            memory_usage: metrics.memory_usage,
644            cpu_usage: metrics.cpu_usage,
645            last_activity: metrics.last_activity,
646        }
647    }
648}
649
/// Task execution status
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskStatus {
    /// Created but not yet picked up by the execution loop.
    Pending,
    /// Currently executing.
    Running,
    /// `execute_agent_task` returned `Ok` within the timeout.
    Completed,
    /// `execute_agent_task` returned `Err`.
    Failed,
    /// The run exceeded the manager's task timeout.
    TimedOut,
    /// Explicitly stopped via a `Terminate` command.
    Terminated,
}
660
/// Task health information
#[derive(Debug, Clone)]
pub struct TaskHealth {
    pub agent_id: AgentId,
    pub status: TaskStatus,
    /// Time since the task handle was created.
    pub uptime: Duration,
    /// True for `Pending`, `Running`, or `Completed` statuses.
    pub is_healthy: bool,
    /// Last sampled resident memory, in bytes.
    pub memory_usage: usize,
    /// Last sampled CPU usage, as reported by sysinfo.
    pub cpu_usage: f32,
    pub last_activity: SystemTime,
}
672
/// Task metrics for monitoring
#[derive(Debug, Clone)]
struct TaskMetrics {
    // Last sampled resident memory, in bytes (written by
    // `TaskHandle::update_resource_usage`).
    memory_usage: usize,
    // Last sampled CPU figure from the resource monitor.
    cpu_usage: f32,
    // Refreshed on every status change and metric update.
    last_activity: SystemTime,
    // Number of status transitions observed.
    status_changes: u32,
    // OS PID of the backing process, once one has been launched.
    process_id: Option<u32>,
    // Final wall-clock duration of a completed run.
    execution_time: Option<Duration>,
    // NOTE(review): peak memory / CPU time are only copied over from
    // `ExecutionMetrics`, which the visible monitors never populate — so
    // these stay zero; confirm intended.
    memory_peak_mb: usize,
    cpu_time_ms: u64,
    // Exit code of the finished process, if it exited normally.
    exit_code: Option<i32>,
    // Count of errors observed during the run.
    error_count: u32,
}
687
688impl TaskMetrics {
689    fn new() -> Self {
690        Self {
691            memory_usage: 0,
692            cpu_usage: 0.0,
693            last_activity: SystemTime::now(),
694            status_changes: 0,
695            process_id: None,
696            execution_time: None,
697            memory_peak_mb: 0,
698            cpu_time_ms: 0,
699            exit_code: None,
700            error_count: 0,
701        }
702    }
703
704    fn update_status(&mut self, _status: &TaskStatus) {
705        self.status_changes += 1;
706        self.last_activity = SystemTime::now();
707    }
708}
709
/// Metrics collected during agent execution
#[derive(Debug, Clone)]
struct ExecutionMetrics {
    /// Wall-clock duration of the run (zero for persistent agents, which
    /// return immediately after launch).
    execution_time: Duration,
    memory_peak_mb: usize,
    cpu_time_ms: u64,
    /// Exit code when the process exited normally.
    exit_code: Option<i32>,
    error_count: u32,
    /// Whether the monitored run succeeded; currently recorded but not
    /// read anywhere in this module.
    #[allow(dead_code)]
    success: bool,
}
721
/// Context for agent execution
///
/// Per-run bookkeeping threaded through the mode-specific executors.
#[derive(Debug)]
struct AgentExecutionContext {
    task: super::ScheduledTask,
    handle: TaskHandle,
    /// When this run began; used to compute `execution_time`.
    start_time: SystemTime,
    // NOTE(review): of the counters below, only `exit_code` is ever
    // written (by `monitor_process_execution`); the rest stay at their
    // initial zero values — confirm whether the monitors should fill them.
    memory_peak_mb: usize,
    cpu_time_ms: u64,
    exit_code: Option<i32>,
    error_count: u32,
}
733
734impl AgentExecutionContext {
735    fn new(task: super::ScheduledTask, handle: TaskHandle) -> Self {
736        Self {
737            task,
738            handle,
739            start_time: SystemTime::now(),
740            memory_peak_mb: 0,
741            cpu_time_ms: 0,
742            exit_code: None,
743            error_count: 0,
744        }
745    }
746}
747
/// Handle to a running process
#[derive(Debug)]
struct ProcessHandle {
    /// OS PID captured at spawn time.
    process_id: u32,
    /// The tokio child, shared behind an async mutex so waiters don't
    /// race on it.
    child: Arc<tokio::sync::Mutex<tokio::process::Child>>,
    #[allow(dead_code)]
    start_time: SystemTime,
}
756
/// Execution environment configuration
#[derive(Debug, Clone)]
struct ExecutionEnvironment {
    /// Directory the agent process runs in (created on demand in
    /// `build_agent_command`).
    working_directory: String,
    /// `(key, value)` pairs exported to the agent process.
    environment_variables: Vec<(String, String)>,
    /// Limits carried over from the agent config; not read anywhere in
    /// this module (hence the `dead_code` allow).
    #[allow(dead_code)]
    resource_limits: ResourceLimits,
}
765
/// Task statistics for monitoring
#[derive(Debug, Clone, serde::Serialize)]
pub struct TaskStatistics {
    /// Number of tasks currently tracked in `running_tasks`.
    pub total_tasks: usize,
    /// Tasks whose status is `Pending`, `Running`, or `Completed`.
    pub healthy_tasks: usize,
    /// Mean uptime across tracked tasks (zero when there are none).
    pub average_uptime: Duration,
    /// Sum of last-sampled memory usage across tasks, in bytes.
    pub total_memory_usage: usize,
}
774
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AgentConfig, ExecutionMode, Priority, ResourceLimits, SecurityTier};
    use std::collections::HashMap;

    /// Build a minimal ephemeral Tier-1 task for exercising the manager.
    /// (In `cfg(test)` builds, `build_agent_command` generates a script
    /// that always succeeds.)
    fn create_test_task() -> super::super::ScheduledTask {
        let agent_id = AgentId::new();
        let config = AgentConfig {
            id: agent_id,
            name: "test".to_string(),
            dsl_source: "test".to_string(),
            execution_mode: ExecutionMode::Ephemeral,
            security_tier: SecurityTier::Tier1,
            resource_limits: ResourceLimits::default(),
            capabilities: vec![],
            policies: vec![],
            metadata: HashMap::new(),
            priority: Priority::Normal,
        };
        super::super::ScheduledTask::new(config)
    }

    #[tokio::test]
    async fn test_task_start_and_health_check() {
        let task_manager = TaskManager::new(Duration::from_secs(60));
        let task = create_test_task();
        let agent_id = task.agent_id;

        // Start the task
        let result = task_manager.start_task(task).await;
        assert!(result.is_ok());

        // Give it a moment to start.
        // NOTE(review): fixed sleeps make these tests timing-sensitive on
        // slow CI runners.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Check health
        let health = task_manager.check_task_health(agent_id).await;
        assert!(health.is_ok());

        let health = health.unwrap();
        assert_eq!(health.agent_id, agent_id);
        assert!(health.is_healthy);
    }

    #[tokio::test]
    async fn test_task_termination() {
        let task_manager = TaskManager::new(Duration::from_secs(60));
        let task = create_test_task();
        let agent_id = task.agent_id;

        // Start the task
        task_manager.start_task(task).await.unwrap();

        // Give it a moment to start
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Terminate the task
        let result = task_manager.terminate_task(agent_id).await;
        assert!(result.is_ok());

        // Check that health check fails for terminated task
        // (`terminate_task` removes the handle, so `AgentNotFound`).
        let health = task_manager.check_task_health(agent_id).await;
        assert!(health.is_err());
    }

    #[tokio::test]
    async fn test_task_statistics() {
        let task_manager = TaskManager::new(Duration::from_secs(60));

        // Start multiple tasks
        for _ in 0..3 {
            let task = create_test_task();
            task_manager.start_task(task).await.unwrap();
        }

        // Give them a moment to start
        tokio::time::sleep(Duration::from_millis(50)).await;

        let stats = task_manager.get_task_statistics().await;
        assert_eq!(stats.total_tasks, 3);
        assert!(stats.healthy_tasks <= 3); // Some might have completed already
    }
}