1use parking_lot::RwLock;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, SystemTime};
7use sysinfo::{Pid, System};
8use tokio::process::Command;
9use tokio::sync::mpsc;
10use tokio::time::timeout;
11
12use crate::types::*;
13
14pub struct TaskManager {
16 task_timeout: Duration,
17 running_tasks: Arc<RwLock<HashMap<AgentId, TaskHandle>>>,
18 task_sender: mpsc::Sender<TaskCommand>,
19 #[allow(dead_code)]
20 system_info: Arc<RwLock<System>>,
21}
22
23impl TaskManager {
24 pub fn new(task_timeout: Duration) -> Self {
26 let running_tasks = Arc::new(RwLock::new(HashMap::new()));
27 let (task_sender, task_receiver) = mpsc::channel(1024);
30
31 let manager = Self {
32 task_timeout,
33 running_tasks: running_tasks.clone(),
34 task_sender,
35 system_info: Arc::new(RwLock::new(System::new_all())),
36 };
37
38 manager.start_task_loop(task_receiver);
40
41 manager
42 }
43
44 pub async fn start_task(&self, task: super::ScheduledTask) -> Result<(), SchedulerError> {
46 let handle = TaskHandle::new(task.clone());
47 let agent_id = task.agent_id;
48
49 self.running_tasks.write().insert(agent_id, handle.clone());
51
52 self.task_sender
55 .try_send(TaskCommand::Start {
56 task: Box::new(task),
57 handle,
58 })
59 .map_err(|_| SchedulerError::SchedulingFailed {
60 agent_id,
61 reason: "Task channel full — backpressure applied".into(),
62 })?;
63
64 Ok(())
65 }
66
67 pub async fn terminate_task(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
69 let handle = self.running_tasks.write().remove(&agent_id);
71
72 if let Some(handle) = handle {
73 self.task_sender
75 .try_send(TaskCommand::Terminate { agent_id, handle })
76 .map_err(|_| SchedulerError::SchedulingFailed {
77 agent_id,
78 reason: "Task channel full — cannot send terminate command".into(),
79 })?;
80 }
81
82 Ok(())
83 }
84
85 pub async fn check_task_health(&self, agent_id: AgentId) -> Result<TaskHealth, SchedulerError> {
87 let running_tasks = self.running_tasks.read();
88
89 if let Some(handle) = running_tasks.get(&agent_id) {
90 let health = handle.get_health();
91
92 if health.uptime > self.task_timeout {
94 return Err(SchedulerError::SchedulingFailed {
95 agent_id,
96 reason: "Task timeout exceeded".into(),
97 });
98 }
99
100 Ok(health)
101 } else {
102 Err(SchedulerError::AgentNotFound { agent_id })
103 }
104 }
105
106 pub async fn get_task_statistics(&self) -> TaskStatistics {
108 let running_tasks = self.running_tasks.read();
109 let total_tasks = running_tasks.len();
110
111 let mut healthy_tasks = 0;
112 let mut total_uptime = Duration::from_secs(0);
113 let mut total_memory_usage = 0;
114
115 for handle in running_tasks.values() {
116 let health = handle.get_health();
117 if health.is_healthy {
118 healthy_tasks += 1;
119 }
120 total_uptime += health.uptime;
121 total_memory_usage += health.memory_usage;
122 }
123
124 TaskStatistics {
125 total_tasks,
126 healthy_tasks,
127 average_uptime: if total_tasks > 0 {
128 total_uptime / total_tasks as u32
129 } else {
130 Duration::from_secs(0)
131 },
132 total_memory_usage,
133 }
134 }
135
136 fn start_task_loop(&self, mut task_receiver: mpsc::Receiver<TaskCommand>) {
138 let task_timeout = self.task_timeout;
139
140 tokio::spawn(async move {
141 while let Some(command) = task_receiver.recv().await {
142 match command {
143 TaskCommand::Start { task, handle } => {
144 Self::execute_task(*task, handle, task_timeout).await;
145 }
146 TaskCommand::Terminate { agent_id, handle } => {
147 Self::terminate_task_execution(agent_id, handle).await;
148 }
149 }
150 }
151 });
152 }
153
154 async fn execute_task(task: super::ScheduledTask, handle: TaskHandle, task_timeout: Duration) {
156 let agent_id = task.agent_id;
157
158 tracing::info!("Starting execution of agent {}", agent_id);
159
160 handle.set_status(TaskStatus::Running);
162
163 let execution_result = timeout(
165 task_timeout,
166 Self::execute_agent_task(task.clone(), handle.clone()),
167 )
168 .await;
169
170 match execution_result {
171 Ok(Ok(execution_metrics)) => {
172 tracing::info!("Agent {} completed successfully", agent_id);
173 handle.set_status(TaskStatus::Completed);
174 handle.update_execution_metrics(execution_metrics);
175 }
176 Ok(Err(e)) => {
177 tracing::error!("Agent {} failed: {}", agent_id, e);
178 handle.set_status(TaskStatus::Failed);
179 }
180 Err(_) => {
181 tracing::error!("Agent {} timed out", agent_id);
182 handle.set_status(TaskStatus::TimedOut);
183 }
184 }
185 }
186
187 async fn terminate_task_execution(agent_id: AgentId, handle: TaskHandle) {
189 tracing::info!("Terminating agent {}", agent_id);
190
191 if let Some(process_id) = handle.get_process_id() {
193 if let Err(e) = Self::terminate_process(process_id).await {
194 tracing::warn!("Failed to terminate process {}: {}", process_id, e);
195 }
196 }
197
198 handle.set_status(TaskStatus::Terminated);
199 }
200
201 async fn execute_agent_task(
203 task: super::ScheduledTask,
204 handle: TaskHandle,
205 ) -> Result<ExecutionMetrics, String> {
206 let _start_time = SystemTime::now();
207 let agent_id = task.agent_id;
208
209 tracing::debug!(
210 "Executing agent {} with config: {:?}",
211 agent_id,
212 task.config
213 );
214
215 let execution_context = AgentExecutionContext::new(task.clone(), handle.clone());
217
218 match task.config.execution_mode {
220 ExecutionMode::Ephemeral => Self::execute_ephemeral_agent(execution_context).await,
221 ExecutionMode::Persistent => Self::execute_persistent_agent(execution_context).await,
222 ExecutionMode::Scheduled { interval } => {
223 Self::execute_scheduled_agent(execution_context, interval).await
224 }
225 ExecutionMode::CronScheduled { .. } => {
226 Self::execute_cron_scheduled_agent(execution_context).await
227 }
228 ExecutionMode::EventDriven => Self::execute_event_driven_agent(execution_context).await,
229 ExecutionMode::External { .. } => {
230 Err("External agents are not executed by the runtime".to_string())
231 }
232 }
233 }
234
235 async fn execute_ephemeral_agent(
237 mut context: AgentExecutionContext,
238 ) -> Result<ExecutionMetrics, String> {
239 tracing::debug!("Executing ephemeral agent {}", context.task.agent_id);
240
241 let process_handle = Self::launch_agent_process(&context.task).await?;
243 context
244 .handle
245 .set_process_id(Some(process_handle.process_id));
246
247 let result = Self::monitor_process_execution(process_handle, &mut context).await;
249
250 let end_time = SystemTime::now();
252 let execution_time = end_time
253 .duration_since(context.start_time)
254 .unwrap_or_default();
255
256 Ok(ExecutionMetrics {
257 execution_time,
258 memory_peak_mb: context.memory_peak_mb,
259 cpu_time_ms: context.cpu_time_ms,
260 exit_code: context.exit_code,
261 error_count: context.error_count,
262 success: result.is_ok(),
263 })
264 }
265
266 async fn execute_persistent_agent(
268 context: AgentExecutionContext,
269 ) -> Result<ExecutionMetrics, String> {
270 tracing::debug!("Executing persistent agent {}", context.task.agent_id);
271
272 let process_handle = Self::launch_agent_process(&context.task).await?;
274 context
275 .handle
276 .set_process_id(Some(process_handle.process_id));
277
278 let _monitoring_handle =
280 tokio::spawn(
281 async move { Self::monitor_persistent_process(process_handle, context).await },
282 );
283
284 Ok(ExecutionMetrics {
286 execution_time: Duration::from_secs(0),
287 memory_peak_mb: 0,
288 cpu_time_ms: 0,
289 exit_code: None,
290 error_count: 0,
291 success: true,
292 })
293 }
294
295 async fn execute_scheduled_agent(
297 context: AgentExecutionContext,
298 interval: Duration,
299 ) -> Result<ExecutionMetrics, String> {
300 tracing::debug!(
301 "Executing scheduled agent {} (interval {:?}) as ephemeral run",
302 context.task.agent_id,
303 interval
304 );
305 Self::execute_ephemeral_agent(context).await
309 }
310
311 async fn execute_cron_scheduled_agent(
313 context: AgentExecutionContext,
314 ) -> Result<ExecutionMetrics, String> {
315 tracing::debug!("Executing cron-scheduled agent {}", context.task.agent_id,);
316 Self::execute_ephemeral_agent(context).await
317 }
318
319 async fn execute_event_driven_agent(
321 context: AgentExecutionContext,
322 ) -> Result<ExecutionMetrics, String> {
323 tracing::debug!("Executing event-driven agent {}", context.task.agent_id);
324
325 Self::execute_persistent_agent(context).await
327 }
328
329 async fn launch_agent_process(task: &super::ScheduledTask) -> Result<ProcessHandle, String> {
331 let agent_id = task.agent_id;
332
333 let execution_env = Self::create_execution_environment(task)?;
335
336 let mut command = Self::build_agent_command(task, &execution_env).await?;
338
339 tracing::debug!("Launching agent {} with command: {:?}", agent_id, command);
340
341 let child = command
343 .spawn()
344 .map_err(|e| format!("Failed to spawn agent process: {}", e))?;
345
346 let process_id = child.id().unwrap_or(0);
347
348 Ok(ProcessHandle {
349 process_id,
350 child: Arc::new(tokio::sync::Mutex::new(child)),
351 start_time: SystemTime::now(),
352 })
353 }
354
355 fn create_execution_environment(
357 task: &super::ScheduledTask,
358 ) -> Result<ExecutionEnvironment, String> {
359 use std::env;
360
361 let working_dir = if cfg!(test) {
363 env::temp_dir()
364 .join(format!("agent_{}", task.agent_id))
365 .to_string_lossy()
366 .to_string()
367 } else {
368 format!("/tmp/agent_{}", task.agent_id)
369 };
370
371 Ok(ExecutionEnvironment {
372 working_directory: working_dir,
373 environment_variables: vec![
374 ("AGENT_ID".to_string(), task.agent_id.to_string()),
375 (
376 "SECURITY_TIER".to_string(),
377 format!("{:?}", task.config.security_tier),
378 ),
379 ],
380 resource_limits: task.config.resource_limits.clone(),
381 })
382 }
383
384 async fn build_agent_command(
386 task: &super::ScheduledTask,
387 env: &ExecutionEnvironment,
388 ) -> Result<Command, String> {
389 let mut command = Command::new("sh");
390
391 if let Err(e) = tokio::fs::create_dir_all(&env.working_directory).await {
393 tracing::warn!(
394 "Failed to create working directory {}: {}",
395 env.working_directory,
396 e
397 );
398 }
399
400 command.current_dir(&env.working_directory);
402
403 for (key, value) in &env.environment_variables {
405 command.env(key, value);
406 }
407
408 let script_content = if cfg!(test) {
410 format!(
412 r#"echo "Test execution of agent {}" >&2
413echo "DSL Source: {}" >&2
414echo "Agent test execution completed" >&2"#,
415 task.agent_id, task.config.dsl_source
416 )
417 } else {
418 format!(
419 r#"#!/bin/bash
420set -e
421echo "Executing agent {}" >&2
422echo "DSL Source:" >&2
423echo "{}" >&2
424# In a real implementation, this would compile and execute the DSL
425sleep 1
426echo "Agent execution completed" >&2"#,
427 task.agent_id, task.config.dsl_source
428 )
429 };
430
431 command.args(["-c", &script_content]);
432
433 Ok(command)
434 }
435
436 async fn monitor_process_execution(
438 process_handle: ProcessHandle,
439 context: &mut AgentExecutionContext,
440 ) -> Result<(), String> {
441 let process_id = process_handle.process_id;
442
443 let resource_monitor = tokio::spawn(Self::monitor_process_resources(
445 process_id,
446 context.handle.clone(),
447 ));
448
449 let mut child = process_handle.child.lock().await;
451 let exit_status = child
452 .wait()
453 .await
454 .map_err(|e| format!("Failed to wait for process: {}", e))?;
455
456 context.exit_code = exit_status.code();
457
458 resource_monitor.abort();
460
461 if exit_status.success() {
462 Ok(())
463 } else {
464 Err(format!(
465 "Process exited with code: {:?}",
466 exit_status.code()
467 ))
468 }
469 }
470
471 async fn monitor_persistent_process(
473 process_handle: ProcessHandle,
474 context: AgentExecutionContext,
475 ) -> Result<(), String> {
476 let process_id = process_handle.process_id;
477
478 let monitor = tokio::spawn(Self::monitor_process_resources(
480 process_id,
481 context.handle.clone(),
482 ));
483
484 let mut check_interval = tokio::time::interval(Duration::from_secs(30));
486
487 loop {
488 check_interval.tick().await;
489
490 if !Self::is_process_running(process_id).await {
491 tracing::warn!(
492 "Persistent agent {} process {} died",
493 context.task.agent_id,
494 process_id
495 );
496 break;
497 }
498
499 if matches!(context.handle.get_status(), TaskStatus::Terminated) {
501 break;
502 }
503 }
504
505 monitor.abort();
506 Ok(())
507 }
508
509 async fn monitor_process_resources(process_id: u32, handle: TaskHandle) {
511 let mut interval = tokio::time::interval(Duration::from_secs(5));
512 let mut system = System::new();
513
514 loop {
515 interval.tick().await;
516
517 system.refresh_process(Pid::from(process_id as usize));
518
519 if let Some(process) = system.process(Pid::from(process_id as usize)) {
520 let memory_mb = process.memory() / 1024 / 1024;
521 let cpu_usage = process.cpu_usage();
522
523 handle.update_resource_usage(memory_mb as usize, cpu_usage);
524
525 tracing::trace!(
526 "Process {} - Memory: {}MB, CPU: {:.2}%",
527 process_id,
528 memory_mb,
529 cpu_usage
530 );
531 } else {
532 break;
534 }
535 }
536 }
537
538 async fn is_process_running(process_id: u32) -> bool {
540 let mut system = System::new();
541 system.refresh_process(Pid::from(process_id as usize));
542 system.process(Pid::from(process_id as usize)).is_some()
543 }
544
545 async fn terminate_process(process_id: u32) -> Result<(), String> {
547 let mut system = System::new();
548 system.refresh_process(Pid::from(process_id as usize));
549
550 if let Some(process) = system.process(Pid::from(process_id as usize)) {
551 if process.kill() {
552 Ok(())
553 } else {
554 Err("Failed to terminate process".to_string())
555 }
556 } else {
557 Ok(()) }
559 }
560}
561
562#[derive(Debug)]
564enum TaskCommand {
565 Start {
566 task: Box<super::ScheduledTask>,
567 handle: TaskHandle,
568 },
569 Terminate {
570 agent_id: AgentId,
571 handle: TaskHandle,
572 },
573}
574
575#[derive(Debug, Clone)]
577pub struct TaskHandle {
578 agent_id: AgentId,
579 status: Arc<RwLock<TaskStatus>>,
580 start_time: SystemTime,
581 metrics: Arc<RwLock<TaskMetrics>>,
582}
583
584impl TaskHandle {
585 fn new(task: super::ScheduledTask) -> Self {
586 Self {
587 agent_id: task.agent_id,
588 status: Arc::new(RwLock::new(TaskStatus::Pending)),
589 start_time: SystemTime::now(),
590 metrics: Arc::new(RwLock::new(TaskMetrics::new())),
591 }
592 }
593
594 fn set_status(&self, status: TaskStatus) {
595 *self.status.write() = status.clone();
596 self.metrics.write().update_status(&status);
597 }
598
599 fn get_status(&self) -> TaskStatus {
600 self.status.read().clone()
601 }
602
603 fn set_process_id(&self, process_id: Option<u32>) {
604 self.metrics.write().process_id = process_id;
605 }
606
607 fn get_process_id(&self) -> Option<u32> {
608 self.metrics.read().process_id
609 }
610
611 fn update_resource_usage(&self, memory_mb: usize, cpu_usage: f32) {
612 let mut metrics = self.metrics.write();
613 metrics.memory_usage = memory_mb * 1024 * 1024; metrics.cpu_usage = cpu_usage;
615 metrics.last_activity = SystemTime::now();
616 }
617
618 fn update_execution_metrics(&self, execution_metrics: ExecutionMetrics) {
619 let mut metrics = self.metrics.write();
620 metrics.execution_time = Some(execution_metrics.execution_time);
621 metrics.memory_peak_mb = execution_metrics.memory_peak_mb;
622 metrics.cpu_time_ms = execution_metrics.cpu_time_ms;
623 metrics.exit_code = execution_metrics.exit_code;
624 metrics.error_count = execution_metrics.error_count;
625 metrics.last_activity = SystemTime::now();
626 }
627
628 fn get_health(&self) -> TaskHealth {
629 let status = self.status.read().clone();
630 let uptime = SystemTime::now()
631 .duration_since(self.start_time)
632 .unwrap_or_default();
633 let metrics = self.metrics.read();
634
635 TaskHealth {
636 agent_id: self.agent_id,
637 status: status.clone(),
638 uptime,
639 is_healthy: matches!(
640 status,
641 TaskStatus::Running | TaskStatus::Pending | TaskStatus::Completed
642 ),
643 memory_usage: metrics.memory_usage,
644 cpu_usage: metrics.cpu_usage,
645 last_activity: metrics.last_activity,
646 }
647 }
648}
649
/// Lifecycle state of a managed task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskStatus {
    /// Registered but not yet picked up by the executor.
    Pending,
    /// Execution has started.
    Running,
    /// Execution finished successfully (or a persistent agent was launched).
    Completed,
    /// Execution returned an error.
    Failed,
    /// Execution did not finish within the task timeout.
    TimedOut,
    /// Explicitly stopped through `terminate_task`.
    Terminated,
}
660
661#[derive(Debug, Clone)]
663pub struct TaskHealth {
664 pub agent_id: AgentId,
665 pub status: TaskStatus,
666 pub uptime: Duration,
667 pub is_healthy: bool,
668 pub memory_usage: usize,
669 pub cpu_usage: f32,
670 pub last_activity: SystemTime,
671}
672
/// Mutable per-task bookkeeping shared behind a `TaskHandle`.
#[derive(Debug, Clone)]
struct TaskMetrics {
    /// Last sampled memory usage, in bytes.
    memory_usage: usize,
    /// Last sampled CPU usage.
    cpu_usage: f32,
    /// Time of the most recent update of any kind.
    last_activity: SystemTime,
    /// Number of status transitions recorded so far.
    status_changes: u32,
    /// OS process id of the agent, once launched.
    process_id: Option<u32>,
    /// Wall-clock duration of the last finished execution.
    execution_time: Option<Duration>,
    /// Peak memory reported by the last finished execution.
    memory_peak_mb: usize,
    /// CPU time reported by the last finished execution.
    cpu_time_ms: u64,
    /// Exit code of the last finished execution, if it exited normally.
    exit_code: Option<i32>,
    /// Error count reported by the last finished execution.
    error_count: u32,
}
687
688impl TaskMetrics {
689 fn new() -> Self {
690 Self {
691 memory_usage: 0,
692 cpu_usage: 0.0,
693 last_activity: SystemTime::now(),
694 status_changes: 0,
695 process_id: None,
696 execution_time: None,
697 memory_peak_mb: 0,
698 cpu_time_ms: 0,
699 exit_code: None,
700 error_count: 0,
701 }
702 }
703
704 fn update_status(&mut self, _status: &TaskStatus) {
705 self.status_changes += 1;
706 self.last_activity = SystemTime::now();
707 }
708}
709
/// Results of one agent execution, copied into `TaskMetrics` when it finishes.
#[derive(Debug, Clone)]
struct ExecutionMetrics {
    /// Wall-clock time from context creation to process exit.
    execution_time: Duration,
    /// Peak memory observed during the run.
    memory_peak_mb: usize,
    /// CPU time consumed by the run.
    cpu_time_ms: u64,
    /// Process exit code, if it exited normally.
    exit_code: Option<i32>,
    /// Number of errors recorded during the run.
    error_count: u32,
    /// Whether the run succeeded.
    // The attribute silences the dead-code lint when no caller reads this field.
    #[allow(dead_code)]
    success: bool,
}
721
722#[derive(Debug)]
724struct AgentExecutionContext {
725 task: super::ScheduledTask,
726 handle: TaskHandle,
727 start_time: SystemTime,
728 memory_peak_mb: usize,
729 cpu_time_ms: u64,
730 exit_code: Option<i32>,
731 error_count: u32,
732}
733
734impl AgentExecutionContext {
735 fn new(task: super::ScheduledTask, handle: TaskHandle) -> Self {
736 Self {
737 task,
738 handle,
739 start_time: SystemTime::now(),
740 memory_peak_mb: 0,
741 cpu_time_ms: 0,
742 exit_code: None,
743 error_count: 0,
744 }
745 }
746}
747
748#[derive(Debug)]
750struct ProcessHandle {
751 process_id: u32,
752 child: Arc<tokio::sync::Mutex<tokio::process::Child>>,
753 #[allow(dead_code)]
754 start_time: SystemTime,
755}
756
757#[derive(Debug, Clone)]
759struct ExecutionEnvironment {
760 working_directory: String,
761 environment_variables: Vec<(String, String)>,
762 #[allow(dead_code)]
763 resource_limits: ResourceLimits,
764}
765
766#[derive(Debug, Clone, serde::Serialize)]
768pub struct TaskStatistics {
769 pub total_tasks: usize,
770 pub healthy_tasks: usize,
771 pub average_uptime: Duration,
772 pub total_memory_usage: usize,
773}
774
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AgentConfig, ExecutionMode, Priority, ResourceLimits, SecurityTier};
    use std::collections::HashMap;

    /// Grace period that lets the background loop pick up queued commands.
    const SETTLE: Duration = Duration::from_millis(50);

    /// Builds a minimal ephemeral, normal-priority task with a fresh agent id.
    fn create_test_task() -> super::super::ScheduledTask {
        let id = AgentId::new();
        super::super::ScheduledTask::new(AgentConfig {
            id,
            name: "test".to_string(),
            dsl_source: "test".to_string(),
            execution_mode: ExecutionMode::Ephemeral,
            security_tier: SecurityTier::Tier1,
            resource_limits: ResourceLimits::default(),
            capabilities: Vec::new(),
            policies: Vec::new(),
            metadata: HashMap::new(),
            priority: Priority::Normal,
        })
    }

    #[tokio::test]
    async fn test_task_start_and_health_check() {
        let manager = TaskManager::new(Duration::from_secs(60));
        let task = create_test_task();
        let agent_id = task.agent_id;

        manager
            .start_task(task)
            .await
            .expect("start_task should queue the task");

        tokio::time::sleep(SETTLE).await;

        let health = manager
            .check_task_health(agent_id)
            .await
            .expect("a freshly started task should be tracked");
        assert_eq!(health.agent_id, agent_id);
        assert!(health.is_healthy);
    }

    #[tokio::test]
    async fn test_task_termination() {
        let manager = TaskManager::new(Duration::from_secs(60));
        let task = create_test_task();
        let agent_id = task.agent_id;

        manager.start_task(task).await.unwrap();
        tokio::time::sleep(SETTLE).await;

        assert!(manager.terminate_task(agent_id).await.is_ok());
        // Terminated tasks are no longer tracked.
        assert!(manager.check_task_health(agent_id).await.is_err());
    }

    #[tokio::test]
    async fn test_task_statistics() {
        let manager = TaskManager::new(Duration::from_secs(60));

        for _ in 0..3 {
            manager.start_task(create_test_task()).await.unwrap();
        }

        tokio::time::sleep(SETTLE).await;

        let stats = manager.get_task_statistics().await;
        assert_eq!(stats.total_tasks, 3);
        assert!(stats.healthy_tasks <= stats.total_tasks);
    }
}