1use std::collections::HashMap;
14use std::path::PathBuf;
15use std::process::Stdio;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use serde::{Deserialize, Serialize};
20use tokio::io::{AsyncBufReadExt, BufReader};
21use tokio::process::{Child, Command};
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use super::context::ToolContext;
27use super::error::ToolError;
28
/// Default cap on the number of background tasks that may run at the same time.
pub const DEFAULT_MAX_CONCURRENT: usize = 10;

/// Default maximum runtime of a background task, in seconds (30 minutes).
pub const DEFAULT_MAX_RUNTIME_SECS: u64 = 30 * 60;
34
/// Lifecycle state of a background task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskStatus {
    /// Initial state assigned by `TaskState::new`.
    Running,
    /// Set by `TaskState::mark_completed` when the exit code is 0.
    Completed,
    /// Set by `TaskState::mark_completed` when the exit code is non-zero.
    Failed,
    /// Set by `TaskState::mark_timed_out`.
    TimedOut,
    /// Set by `TaskState::mark_killed`.
    Killed,
}
51
52impl TaskStatus {
53 pub fn is_terminal(&self) -> bool {
55 matches!(
56 self,
57 TaskStatus::Completed | TaskStatus::Failed | TaskStatus::TimedOut | TaskStatus::Killed
58 )
59 }
60
61 pub fn is_running(&self) -> bool {
63 matches!(self, TaskStatus::Running)
64 }
65}
66
67impl std::fmt::Display for TaskStatus {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 match self {
70 TaskStatus::Running => write!(f, "running"),
71 TaskStatus::Completed => write!(f, "completed"),
72 TaskStatus::Failed => write!(f, "failed"),
73 TaskStatus::TimedOut => write!(f, "timed_out"),
74 TaskStatus::Killed => write!(f, "killed"),
75 }
76 }
77}
78
/// Snapshot of a background task: what was run, where, and how it ended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskState {
    /// Unique identifier of the task.
    pub task_id: String,
    /// Command line the task was started with.
    pub command: String,
    /// Current lifecycle status.
    pub status: TaskStatus,
    /// File that receives the task's captured output.
    pub output_file: PathBuf,
    /// Moment the state was created; (de)serialized via the `instant_serde` module.
    #[serde(with = "instant_serde")]
    pub start_time: Instant,
    /// Moment the task reached a terminal status, if it has;
    /// (de)serialized via the `option_instant_serde` module.
    #[serde(with = "option_instant_serde")]
    pub end_time: Option<Instant>,
    /// Exit code recorded by `mark_completed`; `None` otherwise.
    pub exit_code: Option<i32>,
    /// Working directory the task was started in.
    pub working_directory: PathBuf,
    /// Session that started the task.
    pub session_id: String,
}
106
107impl TaskState {
108 pub fn new(
110 task_id: String,
111 command: String,
112 output_file: PathBuf,
113 working_directory: PathBuf,
114 session_id: String,
115 ) -> Self {
116 Self {
117 task_id,
118 command,
119 status: TaskStatus::Running,
120 output_file,
121 start_time: Instant::now(),
122 end_time: None,
123 exit_code: None,
124 working_directory,
125 session_id,
126 }
127 }
128
129 pub fn duration(&self) -> Duration {
131 match self.end_time {
132 Some(end) => end.duration_since(self.start_time),
133 None => self.start_time.elapsed(),
134 }
135 }
136
137 pub fn mark_completed(&mut self, exit_code: i32) {
139 self.status = if exit_code == 0 {
140 TaskStatus::Completed
141 } else {
142 TaskStatus::Failed
143 };
144 self.end_time = Some(Instant::now());
145 self.exit_code = Some(exit_code);
146 }
147
148 pub fn mark_timed_out(&mut self) {
150 self.status = TaskStatus::TimedOut;
151 self.end_time = Some(Instant::now());
152 }
153
154 pub fn mark_killed(&mut self) {
156 self.status = TaskStatus::Killed;
157 self.end_time = Some(Instant::now());
158 }
159}
160
/// A running task: the live child process together with its tracked state.
struct TaskHandle {
    /// Handle to the spawned process.
    child: Child,
    /// Bookkeeping for the task.
    state: TaskState,
}
168
169impl std::fmt::Debug for TaskHandle {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 f.debug_struct("TaskHandle")
173 .field("state", &self.state)
174 .field("child", &"<Child process>")
175 .finish()
176 }
177}
178
/// Starts, tracks, and terminates background shell commands.
#[derive(Debug)]
pub struct TaskManager {
    /// Tasks that are currently running, keyed by task id.
    tasks: Arc<RwLock<HashMap<String, TaskHandle>>>,
    /// Final states of tasks that have finished, keyed by task id.
    completed_tasks: Arc<RwLock<HashMap<String, TaskState>>>,
    /// Maximum number of entries allowed in `tasks` when starting a new task.
    max_concurrent: usize,
    /// Maximum time a task may run before it is treated as timed out.
    max_runtime: Duration,
    /// Directory in which per-task `<task_id>.log` output files are created.
    output_directory: PathBuf,
}
201
202impl Default for TaskManager {
203 fn default() -> Self {
204 Self::new()
205 }
206}
207
208impl TaskManager {
209 pub fn new() -> Self {
211 let output_dir = std::env::temp_dir().join("aster_tasks");
212 Self {
213 tasks: Arc::new(RwLock::new(HashMap::new())),
214 completed_tasks: Arc::new(RwLock::new(HashMap::new())),
215 max_concurrent: DEFAULT_MAX_CONCURRENT,
216 max_runtime: Duration::from_secs(DEFAULT_MAX_RUNTIME_SECS),
217 output_directory: output_dir,
218 }
219 }
220
221 pub fn with_config(
223 max_concurrent: usize,
224 max_runtime: Duration,
225 output_directory: PathBuf,
226 ) -> Self {
227 Self {
228 tasks: Arc::new(RwLock::new(HashMap::new())),
229 completed_tasks: Arc::new(RwLock::new(HashMap::new())),
230 max_concurrent,
231 max_runtime,
232 output_directory,
233 }
234 }
235
236 pub fn with_max_concurrent(mut self, max: usize) -> Self {
238 self.max_concurrent = max;
239 self
240 }
241
242 pub fn with_max_runtime(mut self, duration: Duration) -> Self {
244 self.max_runtime = duration;
245 self
246 }
247
248 pub fn with_output_directory(mut self, dir: PathBuf) -> Self {
250 self.output_directory = dir;
251 self
252 }
253
254 pub async fn running_count(&self) -> usize {
256 self.tasks.read().await.len()
257 }
258
259 pub fn max_concurrent(&self) -> usize {
261 self.max_concurrent
262 }
263
264 pub fn max_runtime(&self) -> Duration {
266 self.max_runtime
267 }
268}
269
270mod instant_serde {
272 use serde::{Deserializer, Serialize, Serializer};
273 use std::time::Instant;
274
275 pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
276 where
277 S: Serializer,
278 {
279 instant.elapsed().as_secs().serialize(serializer)
280 }
281
282 pub fn deserialize<'de, D>(_deserializer: D) -> Result<Instant, D::Error>
283 where
284 D: Deserializer<'de>,
285 {
286 Ok(Instant::now())
289 }
290}
291
292mod option_instant_serde {
293 use serde::{Deserializer, Serialize, Serializer};
294 use std::time::Instant;
295
296 pub fn serialize<S>(instant: &Option<Instant>, serializer: S) -> Result<S::Ok, S::Error>
297 where
298 S: Serializer,
299 {
300 match instant {
301 Some(i) => Some(i.elapsed().as_secs()).serialize(serializer),
302 None => None::<u64>.serialize(serializer),
303 }
304 }
305
306 pub fn deserialize<'de, D>(_deserializer: D) -> Result<Option<Instant>, D::Error>
307 where
308 D: Deserializer<'de>,
309 {
310 Ok(None)
311 }
312}
313
314impl TaskManager {
319 pub async fn start(&self, command: &str, context: &ToolContext) -> Result<String, ToolError> {
326 let running_count = self.running_count().await;
328 if running_count >= self.max_concurrent {
329 return Err(ToolError::execution_failed(format!(
330 "Maximum concurrent task limit ({}) reached. {} tasks currently running.",
331 self.max_concurrent, running_count
332 )));
333 }
334
335 let task_id = Uuid::new_v4().to_string();
337
338 if let Err(e) = tokio::fs::create_dir_all(&self.output_directory).await {
340 warn!("Failed to create output directory: {}", e);
341 }
342
343 let output_file = self.output_directory.join(format!("{}.log", task_id));
345
346 let mut cmd = self.build_command(command, context);
348
349 let output_file_handle = tokio::fs::File::create(&output_file).await.map_err(|e| {
351 ToolError::execution_failed(format!("Failed to create output file: {}", e))
352 })?;
353
354 let child = cmd
356 .stdout(Stdio::piped())
357 .stderr(Stdio::piped())
358 .stdin(Stdio::null())
359 .kill_on_drop(true)
360 .spawn()
361 .map_err(|e| ToolError::execution_failed(format!("Failed to spawn process: {}", e)))?;
362
363 let state = TaskState::new(
365 task_id.clone(),
366 command.to_string(),
367 output_file.clone(),
368 context.working_directory.clone(),
369 context.session_id.clone(),
370 );
371
372 info!(
373 "Started background task {} for command: {}",
374 task_id, command
375 );
376
377 let handle = TaskHandle {
379 child,
380 state: state.clone(),
381 };
382
383 {
385 let mut tasks = self.tasks.write().await;
386 tasks.insert(task_id.clone(), handle);
387 }
388
389 let tasks_clone = Arc::clone(&self.tasks);
391 let completed_clone = Arc::clone(&self.completed_tasks);
392 let task_id_clone = task_id.clone();
393 let max_runtime = self.max_runtime;
394
395 tokio::spawn(async move {
396 Self::monitor_task(
397 tasks_clone,
398 completed_clone,
399 task_id_clone,
400 output_file_handle,
401 max_runtime,
402 )
403 .await;
404 });
405
406 Ok(task_id)
407 }
408
409 fn build_command(&self, command: &str, context: &ToolContext) -> Command {
411 let mut cmd = if cfg!(target_os = "windows") {
412 let mut cmd = Command::new("powershell");
413 cmd.args(["-NoProfile", "-NonInteractive", "-Command", command]);
414 cmd
415 } else {
416 let mut cmd = Command::new("sh");
417 cmd.args(["-c", command]);
418 cmd
419 };
420
421 cmd.current_dir(&context.working_directory);
422 cmd.env("ASTER_TERMINAL", "1");
423 cmd.env("ASTER_BACKGROUND", "1");
424
425 for (key, value) in &context.environment {
426 cmd.env(key, value);
427 }
428
429 cmd
430 }
431
432 async fn monitor_task(
434 tasks: Arc<RwLock<HashMap<String, TaskHandle>>>,
435 completed_tasks: Arc<RwLock<HashMap<String, TaskState>>>,
436 task_id: String,
437 output_file: tokio::fs::File,
438 max_runtime: Duration,
439 ) {
440 use tokio::io::AsyncWriteExt;
441
442 let output_file = Arc::new(tokio::sync::Mutex::new(output_file));
443
444 let (stdout, stderr) = {
446 let mut tasks_guard = tasks.write().await;
447 if let Some(handle) = tasks_guard.get_mut(&task_id) {
448 let stdout = handle.child.stdout.take();
449 let stderr = handle.child.stderr.take();
450 (stdout, stderr)
451 } else {
452 return;
453 }
454 };
455
456 let output_file_stdout = Arc::clone(&output_file);
458 let stdout_task = async move {
459 if let Some(stdout) = stdout {
460 let mut reader = BufReader::new(stdout).lines();
461 while let Ok(Some(line)) = reader.next_line().await {
462 let mut file = output_file_stdout.lock().await;
463 let _ = file.write_all(format!("{}\n", line).as_bytes()).await;
464 }
465 }
466 };
467
468 let output_file_stderr = Arc::clone(&output_file);
469 let stderr_task = async move {
470 if let Some(stderr) = stderr {
471 let mut reader = BufReader::new(stderr).lines();
472 while let Ok(Some(line)) = reader.next_line().await {
473 let mut file = output_file_stderr.lock().await;
474 let _ = file
475 .write_all(format!("[stderr] {}\n", line).as_bytes())
476 .await;
477 }
478 }
479 };
480
481 let timeout_result = tokio::time::timeout(max_runtime, async {
483 tokio::join!(stdout_task, stderr_task);
484 })
485 .await;
486
487 {
489 let mut file = output_file.lock().await;
490 let _ = file.flush().await;
491 }
492
493 let mut tasks_guard = tasks.write().await;
495 if let Some(mut handle) = tasks_guard.remove(&task_id) {
496 if timeout_result.is_err() {
497 warn!("Task {} timed out after {:?}", task_id, max_runtime);
499 handle.state.mark_timed_out();
500 let _ = handle.child.kill().await;
502 } else {
503 match handle.child.wait().await {
505 Ok(status) => {
506 let exit_code = status.code().unwrap_or(-1);
507 debug!("Task {} completed with exit code {}", task_id, exit_code);
508 handle.state.mark_completed(exit_code);
509 }
510 Err(e) => {
511 error!("Failed to wait for task {}: {}", task_id, e);
512 handle.state.mark_completed(-1);
513 }
514 }
515 }
516
517 let mut completed = completed_tasks.write().await;
519 completed.insert(task_id, handle.state);
520 }
521 }
522}
523
524impl TaskManager {
529 pub async fn get_status(&self, task_id: &str) -> Option<TaskState> {
535 {
537 let tasks = self.tasks.read().await;
538 if let Some(handle) = tasks.get(task_id) {
539 return Some(handle.state.clone());
540 }
541 }
542
543 {
545 let completed = self.completed_tasks.read().await;
546 if let Some(state) = completed.get(task_id) {
547 return Some(state.clone());
548 }
549 }
550
551 None
552 }
553
554 pub async fn get_output(
561 &self,
562 task_id: &str,
563 lines: Option<usize>,
564 ) -> Result<String, ToolError> {
565 let output_file = {
567 let tasks = self.tasks.read().await;
569 if let Some(handle) = tasks.get(task_id) {
570 handle.state.output_file.clone()
571 } else {
572 let completed = self.completed_tasks.read().await;
574 if let Some(state) = completed.get(task_id) {
575 state.output_file.clone()
576 } else {
577 return Err(ToolError::not_found(format!("Task not found: {}", task_id)));
578 }
579 }
580 };
581
582 let content = tokio::fs::read_to_string(&output_file).await.map_err(|e| {
584 ToolError::execution_failed(format!("Failed to read output file: {}", e))
585 })?;
586
587 match lines {
589 Some(n) if n > 0 => {
590 let all_lines: Vec<&str> = content.lines().collect();
591 let start = all_lines.len().saturating_sub(n);
592 Ok(all_lines[start..].join("\n"))
593 }
594 _ => Ok(content),
595 }
596 }
597
598 pub async fn list_tasks(&self) -> Vec<TaskState> {
600 let mut result = Vec::new();
601
602 {
604 let tasks = self.tasks.read().await;
605 for handle in tasks.values() {
606 result.push(handle.state.clone());
607 }
608 }
609
610 {
612 let completed = self.completed_tasks.read().await;
613 for state in completed.values() {
614 result.push(state.clone());
615 }
616 }
617
618 result
619 }
620
621 pub async fn list_running_tasks(&self) -> Vec<TaskState> {
623 let tasks = self.tasks.read().await;
624 tasks.values().map(|h| h.state.clone()).collect()
625 }
626
627 pub async fn task_exists(&self, task_id: &str) -> bool {
629 self.get_status(task_id).await.is_some()
630 }
631}
632
633impl TaskManager {
638 pub async fn kill(&self, task_id: &str) -> Result<(), ToolError> {
646 let mut tasks = self.tasks.write().await;
647
648 if let Some(mut handle) = tasks.remove(task_id) {
649 info!("Killing task {}", task_id);
650
651 let graceful_timeout = Duration::from_millis(100);
653 match tokio::time::timeout(graceful_timeout, handle.child.wait()).await {
654 Ok(Ok(status)) => {
655 let exit_code = status.code().unwrap_or(-1);
657 debug!(
658 "Task {} already finished with exit code {}",
659 task_id, exit_code
660 );
661 handle.state.mark_completed(exit_code);
662 }
663 _ => {
664 debug!("Task {} still running, sending kill signal", task_id);
666 let _ = handle.child.kill().await;
667 let _ = handle.child.wait().await;
669 handle.state.mark_killed();
670 }
671 }
672
673 let mut completed = self.completed_tasks.write().await;
675 completed.insert(task_id.to_string(), handle.state);
676
677 Ok(())
678 } else {
679 let completed = self.completed_tasks.read().await;
681 if completed.contains_key(task_id) {
682 Err(ToolError::execution_failed(format!(
683 "Task {} has already completed",
684 task_id
685 )))
686 } else {
687 Err(ToolError::not_found(format!("Task not found: {}", task_id)))
688 }
689 }
690 }
691
692 pub async fn kill_all(&self) -> usize {
694 let task_ids: Vec<String> = {
695 let tasks = self.tasks.read().await;
696 tasks.keys().cloned().collect()
697 };
698
699 let mut killed = 0;
700 for task_id in task_ids {
701 if self.kill(&task_id).await.is_ok() {
702 killed += 1;
703 }
704 }
705
706 killed
707 }
708}
709
710impl TaskManager {
715 pub async fn cleanup_timed_out(&self) -> usize {
722 let timed_out_ids: Vec<String> = {
723 let tasks = self.tasks.read().await;
724 tasks
725 .iter()
726 .filter(|(_, handle)| handle.state.duration() > self.max_runtime)
727 .map(|(id, _)| id.clone())
728 .collect()
729 };
730
731 let mut cleaned = 0;
732 for task_id in timed_out_ids {
733 warn!("Cleaning up timed-out task: {}", task_id);
734
735 let mut tasks = self.tasks.write().await;
736 if let Some(mut handle) = tasks.remove(&task_id) {
737 let _ = handle.child.kill().await;
739
740 handle.state.mark_timed_out();
742
743 let mut completed = self.completed_tasks.write().await;
745 completed.insert(task_id, handle.state);
746
747 cleaned += 1;
748 }
749 }
750
751 cleaned
752 }
753
754 pub async fn cleanup_old_completed(&self, max_age: Duration) -> usize {
759 let old_task_ids: Vec<String> = {
760 let completed = self.completed_tasks.read().await;
761 completed
762 .iter()
763 .filter(|(_, state)| state.end_time.is_some_and(|end| end.elapsed() > max_age))
764 .map(|(id, _)| id.clone())
765 .collect()
766 };
767
768 let mut cleaned = 0;
769 for task_id in old_task_ids {
770 let mut completed = self.completed_tasks.write().await;
771 if let Some(state) = completed.remove(&task_id) {
772 if let Err(e) = tokio::fs::remove_file(&state.output_file).await {
774 debug!("Failed to remove output file for task {}: {}", task_id, e);
775 }
776 cleaned += 1;
777 }
778 }
779
780 cleaned
781 }
782
783 pub fn start_cleanup_task(self: Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
785 tokio::spawn(async move {
786 let mut interval_timer = tokio::time::interval(interval);
787 loop {
788 interval_timer.tick().await;
789 let cleaned = self.cleanup_timed_out().await;
790 if cleaned > 0 {
791 info!("Cleaned up {} timed-out tasks", cleaned);
792 }
793 }
794 })
795 }
796}
797
/// Unit tests for `TaskStatus`, `TaskState`, and `TaskManager`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Builds a context rooted at `/tmp` with a fixed session id and user.
    fn create_test_context() -> ToolContext {
        ToolContext::new(PathBuf::from("/tmp"))
            .with_session_id("test-session")
            .with_user("test-user")
    }

    /// Builds a manager that writes into `temp_dir`, allows 5 concurrent tasks,
    /// and limits runtime to 60 seconds.
    fn create_test_manager(temp_dir: &TempDir) -> TaskManager {
        TaskManager::new()
            .with_output_directory(temp_dir.path().to_path_buf())
            .with_max_concurrent(5)
            .with_max_runtime(Duration::from_secs(60))
    }

    // ---- TaskStatus ----

    /// Every status except `Running` is terminal.
    #[test]
    fn test_task_status_is_terminal() {
        assert!(!TaskStatus::Running.is_terminal());
        assert!(TaskStatus::Completed.is_terminal());
        assert!(TaskStatus::Failed.is_terminal());
        assert!(TaskStatus::TimedOut.is_terminal());
        assert!(TaskStatus::Killed.is_terminal());
    }

    /// Only `Running` reports as running.
    #[test]
    fn test_task_status_is_running() {
        assert!(TaskStatus::Running.is_running());
        assert!(!TaskStatus::Completed.is_running());
        assert!(!TaskStatus::Failed.is_running());
        assert!(!TaskStatus::TimedOut.is_running());
        assert!(!TaskStatus::Killed.is_running());
    }

    /// `Display` produces the lowercase snake_case labels.
    #[test]
    fn test_task_status_display() {
        assert_eq!(TaskStatus::Running.to_string(), "running");
        assert_eq!(TaskStatus::Completed.to_string(), "completed");
        assert_eq!(TaskStatus::Failed.to_string(), "failed");
        assert_eq!(TaskStatus::TimedOut.to_string(), "timed_out");
        assert_eq!(TaskStatus::Killed.to_string(), "killed");
    }

    // ---- TaskState ----

    /// A new state is running and has neither an end time nor an exit code.
    #[test]
    fn test_task_state_new() {
        let state = TaskState::new(
            "task-123".to_string(),
            "echo hello".to_string(),
            PathBuf::from("/tmp/task-123.log"),
            PathBuf::from("/tmp"),
            "session-1".to_string(),
        );

        assert_eq!(state.task_id, "task-123");
        assert_eq!(state.command, "echo hello");
        assert_eq!(state.status, TaskStatus::Running);
        assert!(state.end_time.is_none());
        assert!(state.exit_code.is_none());
    }

    /// Exit code 0 maps to `Completed`; a non-zero code maps to `Failed`.
    #[test]
    fn test_task_state_mark_completed() {
        let mut state = TaskState::new(
            "task-123".to_string(),
            "echo hello".to_string(),
            PathBuf::from("/tmp/task-123.log"),
            PathBuf::from("/tmp"),
            "session-1".to_string(),
        );

        state.mark_completed(0);
        assert_eq!(state.status, TaskStatus::Completed);
        assert!(state.end_time.is_some());
        assert_eq!(state.exit_code, Some(0));

        let mut state2 = TaskState::new(
            "task-456".to_string(),
            "exit 1".to_string(),
            PathBuf::from("/tmp/task-456.log"),
            PathBuf::from("/tmp"),
            "session-1".to_string(),
        );

        state2.mark_completed(1);
        assert_eq!(state2.status, TaskStatus::Failed);
        assert_eq!(state2.exit_code, Some(1));
    }

    /// `mark_timed_out` sets the status and stamps the end time.
    #[test]
    fn test_task_state_mark_timed_out() {
        let mut state = TaskState::new(
            "task-123".to_string(),
            "sleep 100".to_string(),
            PathBuf::from("/tmp/task-123.log"),
            PathBuf::from("/tmp"),
            "session-1".to_string(),
        );

        state.mark_timed_out();
        assert_eq!(state.status, TaskStatus::TimedOut);
        assert!(state.end_time.is_some());
    }

    /// `mark_killed` sets the status and stamps the end time.
    #[test]
    fn test_task_state_mark_killed() {
        let mut state = TaskState::new(
            "task-123".to_string(),
            "sleep 100".to_string(),
            PathBuf::from("/tmp/task-123.log"),
            PathBuf::from("/tmp"),
            "session-1".to_string(),
        );

        state.mark_killed();
        assert_eq!(state.status, TaskStatus::Killed);
        assert!(state.end_time.is_some());
    }

    // ---- TaskManager configuration ----

    /// `new` applies the default limits.
    #[test]
    fn test_task_manager_default() {
        let manager = TaskManager::new();
        assert_eq!(manager.max_concurrent(), DEFAULT_MAX_CONCURRENT);
        assert_eq!(
            manager.max_runtime(),
            Duration::from_secs(DEFAULT_MAX_RUNTIME_SECS)
        );
    }

    /// Builder setters override the defaults.
    #[test]
    fn test_task_manager_builder() {
        let manager = TaskManager::new()
            .with_max_concurrent(20)
            .with_max_runtime(Duration::from_secs(3600))
            .with_output_directory(PathBuf::from("/custom/output"));

        assert_eq!(manager.max_concurrent(), 20);
        assert_eq!(manager.max_runtime(), Duration::from_secs(3600));
    }

    /// A fresh manager has no running tasks.
    #[tokio::test]
    async fn test_task_manager_running_count() {
        let manager = TaskManager::new();
        assert_eq!(manager.running_count().await, 0);
    }

    // ---- Starting tasks ----

    /// Starting a short command yields a non-empty id whose status can be queried.
    #[tokio::test]
    async fn test_start_simple_task() {
        let temp_dir = TempDir::new().unwrap();
        let manager = create_test_manager(&temp_dir);
        let context = create_test_context();

        let command = "echo hello";

        let result = manager.start(command, &context).await;
        assert!(result.is_ok());

        let task_id = result.unwrap();
        assert!(!task_id.is_empty());

        // Wait briefly before querying the status.
        tokio::time::sleep(Duration::from_millis(500)).await;

        let status = manager.get_status(&task_id).await;
        assert!(status.is_some());
    }

    /// With a limit of 1, a second start is rejected.
    #[tokio::test]
    async fn test_start_task_concurrent_limit() {
        let temp_dir = TempDir::new().unwrap();
        let manager = TaskManager::new()
            .with_output_directory(temp_dir.path().to_path_buf())
            .with_max_concurrent(1)
            .with_max_runtime(Duration::from_secs(60));
        let context = create_test_context();

        let command = if cfg!(target_os = "windows") {
            "timeout /t 10"
        } else {
            "sleep 10"
        };

        let result1 = manager.start(command, &context).await;
        assert!(result1.is_ok());

        let result2 = manager.start(command, &context).await;
        assert!(result2.is_err());

        // Do not leave the long-running command behind.
        let _ = manager.kill_all().await;
    }

    // ---- Queries ----

    /// An unknown id has no status.
    #[tokio::test]
    async fn test_get_status_not_found() {
        let manager = TaskManager::new();
        let status = manager.get_status("nonexistent-task").await;
        assert!(status.is_none());
    }

    /// Requesting output for an unknown id is a `NotFound` error.
    #[tokio::test]
    async fn test_get_output_not_found() {
        let manager = TaskManager::new();
        let result = manager.get_output("nonexistent-task", None).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), ToolError::NotFound(_)));
    }

    /// Requesting the last two lines of a task's output succeeds.
    #[tokio::test]
    async fn test_get_output_with_lines() {
        let temp_dir = TempDir::new().unwrap();
        let manager = create_test_manager(&temp_dir);
        let context = create_test_context();

        let command = if cfg!(target_os = "windows") {
            "echo line1 && echo line2 && echo line3"
        } else {
            "echo line1; echo line2; echo line3"
        };

        let task_id = manager.start(command, &context).await.unwrap();

        // Wait briefly before reading the output.
        tokio::time::sleep(Duration::from_millis(500)).await;

        let output = manager.get_output(&task_id, Some(2)).await;
        assert!(output.is_ok());
    }

    // ---- Killing tasks ----

    /// Killing a long-running task succeeds and leaves it with `Killed` status.
    #[tokio::test]
    async fn test_kill_task() {
        let temp_dir = TempDir::new().unwrap();
        let manager = create_test_manager(&temp_dir);
        let context = create_test_context();

        let command = if cfg!(target_os = "windows") {
            "timeout /t 60"
        } else {
            "sleep 60"
        };

        let task_id = manager.start(command, &context).await.unwrap();

        let result = manager.kill(&task_id).await;
        assert!(result.is_ok());

        let status = manager.get_status(&task_id).await;
        assert!(status.is_some());
        assert_eq!(status.unwrap().status, TaskStatus::Killed);
    }

    /// Killing an unknown id is a `NotFound` error.
    #[tokio::test]
    async fn test_kill_nonexistent_task() {
        let manager = TaskManager::new();
        let result = manager.kill("nonexistent-task").await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), ToolError::NotFound(_)));
    }

    /// A started task shows up in `list_tasks`.
    #[tokio::test]
    async fn test_list_tasks() {
        let temp_dir = TempDir::new().unwrap();
        let manager = create_test_manager(&temp_dir);
        let context = create_test_context();

        let command = "echo hello";
        let _ = manager.start(command, &context).await.unwrap();

        // Wait briefly before listing.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let tasks = manager.list_tasks().await;
        assert!(!tasks.is_empty());
    }

    /// `task_exists` is true for a started task and false for an unknown id.
    #[tokio::test]
    async fn test_task_exists() {
        let temp_dir = TempDir::new().unwrap();
        let manager = create_test_manager(&temp_dir);
        let context = create_test_context();

        let task_id = manager.start("echo hello", &context).await.unwrap();

        assert!(manager.task_exists(&task_id).await);
        assert!(!manager.task_exists("nonexistent").await);
    }

    /// `kill_all` terminates both started tasks and leaves none running.
    #[tokio::test]
    async fn test_kill_all() {
        let temp_dir = TempDir::new().unwrap();
        let manager = create_test_manager(&temp_dir);
        let context = create_test_context();

        let command = if cfg!(target_os = "windows") {
            "timeout /t 60"
        } else {
            "sleep 60"
        };

        let _ = manager.start(command, &context).await.unwrap();
        let _ = manager.start(command, &context).await.unwrap();

        let killed = manager.kill_all().await;
        assert_eq!(killed, 2);

        assert_eq!(manager.running_count().await, 0);
    }
}