Skip to main content

aster/tools/
task.rs

1//! Task Manager for Background Task Execution
2//!
3//! This module implements the `TaskManager` for managing background tasks:
4//! - Starting background tasks with unique task_id
5//! - Querying task status and output
6//! - Killing running tasks
7//! - Enforcing maximum concurrent task limit
8//! - Automatic cleanup of timed-out tasks
9//! - Persisting task output to files for retrieval
10//!
11//! Requirements: 10.1, 10.2, 10.3, 10.4, 10.5, 10.6
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::process::Stdio;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use serde::{Deserialize, Serialize};
20use tokio::io::{AsyncBufReadExt, BufReader};
21use tokio::process::{Child, Command};
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use super::context::ToolContext;
27use super::error::ToolError;
28
/// Default cap on the number of background tasks that may run at once.
pub const DEFAULT_MAX_CONCURRENT: usize = 10;

/// Default runtime budget for a single task, in seconds (30 minutes).
pub const DEFAULT_MAX_RUNTIME_SECS: u64 = 30 * 60;
34
/// Task status enumeration
///
/// Represents the current state of a background task. `Running` is the only
/// non-terminal variant; all others describe a way in which the task ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskStatus {
    /// Task is currently running
    Running,
    /// Task completed successfully (exit code 0)
    Completed,
    /// Task failed with an error (non-zero exit code)
    Failed,
    /// Task was terminated due to timeout
    TimedOut,
    /// Task was killed by user request
    Killed,
}
51
52impl TaskStatus {
53    /// Check if the task is in a terminal state
54    pub fn is_terminal(&self) -> bool {
55        matches!(
56            self,
57            TaskStatus::Completed | TaskStatus::Failed | TaskStatus::TimedOut | TaskStatus::Killed
58        )
59    }
60
61    /// Check if the task is still running
62    pub fn is_running(&self) -> bool {
63        matches!(self, TaskStatus::Running)
64    }
65}
66
67impl std::fmt::Display for TaskStatus {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        match self {
70            TaskStatus::Running => write!(f, "running"),
71            TaskStatus::Completed => write!(f, "completed"),
72            TaskStatus::Failed => write!(f, "failed"),
73            TaskStatus::TimedOut => write!(f, "timed_out"),
74            TaskStatus::Killed => write!(f, "killed"),
75        }
76    }
77}
78
/// Task state information
///
/// Contains all information about a background task. The two `Instant`
/// fields are (de)serialized through the `instant_serde` and
/// `option_instant_serde` helper modules, because `Instant` does not
/// implement the serde traits itself.
/// Requirements: 10.1
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskState {
    /// Unique task identifier
    pub task_id: String,
    /// The command being executed
    pub command: String,
    /// Current task status
    pub status: TaskStatus,
    /// Path to the output file
    pub output_file: PathBuf,
    /// Task start time (serialized as the whole seconds elapsed since this
    /// instant, not as a wall-clock timestamp)
    #[serde(with = "instant_serde")]
    pub start_time: Instant,
    /// Task end time (`None` until one of the `mark_*` methods is called)
    #[serde(with = "option_instant_serde")]
    pub end_time: Option<Instant>,
    /// Exit code (only set by `mark_completed`)
    pub exit_code: Option<i32>,
    /// Working directory
    pub working_directory: PathBuf,
    /// Session ID
    pub session_id: String,
}
106
107impl TaskState {
108    /// Create a new TaskState for a starting task
109    pub fn new(
110        task_id: String,
111        command: String,
112        output_file: PathBuf,
113        working_directory: PathBuf,
114        session_id: String,
115    ) -> Self {
116        Self {
117            task_id,
118            command,
119            status: TaskStatus::Running,
120            output_file,
121            start_time: Instant::now(),
122            end_time: None,
123            exit_code: None,
124            working_directory,
125            session_id,
126        }
127    }
128
129    /// Get the duration the task has been running
130    pub fn duration(&self) -> Duration {
131        match self.end_time {
132            Some(end) => end.duration_since(self.start_time),
133            None => self.start_time.elapsed(),
134        }
135    }
136
137    /// Mark the task as completed
138    pub fn mark_completed(&mut self, exit_code: i32) {
139        self.status = if exit_code == 0 {
140            TaskStatus::Completed
141        } else {
142            TaskStatus::Failed
143        };
144        self.end_time = Some(Instant::now());
145        self.exit_code = Some(exit_code);
146    }
147
148    /// Mark the task as timed out
149    pub fn mark_timed_out(&mut self) {
150        self.status = TaskStatus::TimedOut;
151        self.end_time = Some(Instant::now());
152    }
153
154    /// Mark the task as killed
155    pub fn mark_killed(&mut self) {
156        self.status = TaskStatus::Killed;
157        self.end_time = Some(Instant::now());
158    }
159}
160
/// Internal task handle for managing running processes
///
/// Pairs the spawned child process with the state record kept for it while
/// the task sits in the manager's running-task map.
struct TaskHandle {
    /// The child process
    child: Child,
    /// Task state
    state: TaskState,
}
168
169// Manual Debug implementation for TaskHandle since Child doesn't implement Debug
170impl std::fmt::Debug for TaskHandle {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        f.debug_struct("TaskHandle")
173            .field("state", &self.state)
174            .field("child", &"<Child process>")
175            .finish()
176    }
177}
178
/// Task Manager for background task execution
///
/// Manages background tasks with:
/// - Unique task IDs
/// - Concurrent task limits
/// - Timeout enforcement
/// - Output persistence
///
/// A task lives in `tasks` while its process is being tracked and is moved
/// to `completed_tasks` once it has finished, timed out or been killed.
///
/// Requirements: 10.1, 10.2, 10.3, 10.4, 10.5, 10.6
#[derive(Debug)]
pub struct TaskManager {
    /// Running tasks (task_id -> TaskHandle)
    tasks: Arc<RwLock<HashMap<String, TaskHandle>>>,
    /// Completed task states (for status queries after completion)
    completed_tasks: Arc<RwLock<HashMap<String, TaskState>>>,
    /// Maximum number of concurrent tasks
    max_concurrent: usize,
    /// Maximum runtime for a task before timeout
    max_runtime: Duration,
    /// Directory for storing task output files (one `<task_id>.log` per task)
    output_directory: PathBuf,
}
201
202impl Default for TaskManager {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
208impl TaskManager {
209    /// Create a new TaskManager with default settings
210    pub fn new() -> Self {
211        let output_dir = std::env::temp_dir().join("aster_tasks");
212        Self {
213            tasks: Arc::new(RwLock::new(HashMap::new())),
214            completed_tasks: Arc::new(RwLock::new(HashMap::new())),
215            max_concurrent: DEFAULT_MAX_CONCURRENT,
216            max_runtime: Duration::from_secs(DEFAULT_MAX_RUNTIME_SECS),
217            output_directory: output_dir,
218        }
219    }
220
221    /// Create a TaskManager with custom settings
222    pub fn with_config(
223        max_concurrent: usize,
224        max_runtime: Duration,
225        output_directory: PathBuf,
226    ) -> Self {
227        Self {
228            tasks: Arc::new(RwLock::new(HashMap::new())),
229            completed_tasks: Arc::new(RwLock::new(HashMap::new())),
230            max_concurrent,
231            max_runtime,
232            output_directory,
233        }
234    }
235
236    /// Set maximum concurrent tasks
237    pub fn with_max_concurrent(mut self, max: usize) -> Self {
238        self.max_concurrent = max;
239        self
240    }
241
242    /// Set maximum runtime
243    pub fn with_max_runtime(mut self, duration: Duration) -> Self {
244        self.max_runtime = duration;
245        self
246    }
247
248    /// Set output directory
249    pub fn with_output_directory(mut self, dir: PathBuf) -> Self {
250        self.output_directory = dir;
251        self
252    }
253
254    /// Get the number of currently running tasks
255    pub async fn running_count(&self) -> usize {
256        self.tasks.read().await.len()
257    }
258
259    /// Get the maximum concurrent tasks limit
260    pub fn max_concurrent(&self) -> usize {
261        self.max_concurrent
262    }
263
264    /// Get the maximum runtime
265    pub fn max_runtime(&self) -> Duration {
266        self.max_runtime
267    }
268}
269
270// Serde helpers for Instant (which doesn't implement Serialize/Deserialize)
271mod instant_serde {
272    use serde::{Deserializer, Serialize, Serializer};
273    use std::time::Instant;
274
275    pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
276    where
277        S: Serializer,
278    {
279        instant.elapsed().as_secs().serialize(serializer)
280    }
281
282    pub fn deserialize<'de, D>(_deserializer: D) -> Result<Instant, D::Error>
283    where
284        D: Deserializer<'de>,
285    {
286        // We can't truly deserialize an Instant, so we return now
287        // This is acceptable since we mainly use this for display purposes
288        Ok(Instant::now())
289    }
290}
291
292mod option_instant_serde {
293    use serde::{Deserializer, Serialize, Serializer};
294    use std::time::Instant;
295
296    pub fn serialize<S>(instant: &Option<Instant>, serializer: S) -> Result<S::Ok, S::Error>
297    where
298        S: Serializer,
299    {
300        match instant {
301            Some(i) => Some(i.elapsed().as_secs()).serialize(serializer),
302            None => None::<u64>.serialize(serializer),
303        }
304    }
305
306    pub fn deserialize<'de, D>(_deserializer: D) -> Result<Option<Instant>, D::Error>
307    where
308        D: Deserializer<'de>,
309    {
310        Ok(None)
311    }
312}
313
314// =============================================================================
315// Task Start Implementation (Requirements: 10.1, 10.4)
316// =============================================================================
317
318impl TaskManager {
319    /// Start a background task
320    ///
321    /// Creates a new background task with a unique task_id.
322    /// The task runs asynchronously and output is persisted to a file.
323    ///
324    /// Requirements: 10.1, 10.4
325    pub async fn start(&self, command: &str, context: &ToolContext) -> Result<String, ToolError> {
326        // Check concurrent task limit
327        let running_count = self.running_count().await;
328        if running_count >= self.max_concurrent {
329            return Err(ToolError::execution_failed(format!(
330                "Maximum concurrent task limit ({}) reached. {} tasks currently running.",
331                self.max_concurrent, running_count
332            )));
333        }
334
335        // Generate unique task ID
336        let task_id = Uuid::new_v4().to_string();
337
338        // Ensure output directory exists
339        if let Err(e) = tokio::fs::create_dir_all(&self.output_directory).await {
340            warn!("Failed to create output directory: {}", e);
341        }
342
343        // Create output file path
344        let output_file = self.output_directory.join(format!("{}.log", task_id));
345
346        // Build the command
347        let mut cmd = self.build_command(command, context);
348
349        // Create output file for writing
350        let output_file_handle = tokio::fs::File::create(&output_file).await.map_err(|e| {
351            ToolError::execution_failed(format!("Failed to create output file: {}", e))
352        })?;
353
354        // Spawn the process
355        let child = cmd
356            .stdout(Stdio::piped())
357            .stderr(Stdio::piped())
358            .stdin(Stdio::null())
359            .kill_on_drop(true)
360            .spawn()
361            .map_err(|e| ToolError::execution_failed(format!("Failed to spawn process: {}", e)))?;
362
363        // Create task state
364        let state = TaskState::new(
365            task_id.clone(),
366            command.to_string(),
367            output_file.clone(),
368            context.working_directory.clone(),
369            context.session_id.clone(),
370        );
371
372        info!(
373            "Started background task {} for command: {}",
374            task_id, command
375        );
376
377        // Create task handle
378        let handle = TaskHandle {
379            child,
380            state: state.clone(),
381        };
382
383        // Store the task
384        {
385            let mut tasks = self.tasks.write().await;
386            tasks.insert(task_id.clone(), handle);
387        }
388
389        // Spawn a task to monitor the process and capture output
390        let tasks_clone = Arc::clone(&self.tasks);
391        let completed_clone = Arc::clone(&self.completed_tasks);
392        let task_id_clone = task_id.clone();
393        let max_runtime = self.max_runtime;
394
395        tokio::spawn(async move {
396            Self::monitor_task(
397                tasks_clone,
398                completed_clone,
399                task_id_clone,
400                output_file_handle,
401                max_runtime,
402            )
403            .await;
404        });
405
406        Ok(task_id)
407    }
408
409    /// Build a platform-specific command
410    fn build_command(&self, command: &str, context: &ToolContext) -> Command {
411        let mut cmd = if cfg!(target_os = "windows") {
412            let mut cmd = Command::new("powershell");
413            cmd.args(["-NoProfile", "-NonInteractive", "-Command", command]);
414            cmd
415        } else {
416            let mut cmd = Command::new("sh");
417            cmd.args(["-c", command]);
418            cmd
419        };
420
421        cmd.current_dir(&context.working_directory);
422        cmd.env("ASTER_TERMINAL", "1");
423        cmd.env("ASTER_BACKGROUND", "1");
424
425        for (key, value) in &context.environment {
426            cmd.env(key, value);
427        }
428
429        cmd
430    }
431
432    /// Monitor a running task and capture its output
433    async fn monitor_task(
434        tasks: Arc<RwLock<HashMap<String, TaskHandle>>>,
435        completed_tasks: Arc<RwLock<HashMap<String, TaskState>>>,
436        task_id: String,
437        output_file: tokio::fs::File,
438        max_runtime: Duration,
439    ) {
440        use tokio::io::AsyncWriteExt;
441
442        let output_file = Arc::new(tokio::sync::Mutex::new(output_file));
443
444        // Get the child process handles
445        let (stdout, stderr) = {
446            let mut tasks_guard = tasks.write().await;
447            if let Some(handle) = tasks_guard.get_mut(&task_id) {
448                let stdout = handle.child.stdout.take();
449                let stderr = handle.child.stderr.take();
450                (stdout, stderr)
451            } else {
452                return;
453            }
454        };
455
456        // Read stdout and stderr concurrently using shared output file
457        let output_file_stdout = Arc::clone(&output_file);
458        let stdout_task = async move {
459            if let Some(stdout) = stdout {
460                let mut reader = BufReader::new(stdout).lines();
461                while let Ok(Some(line)) = reader.next_line().await {
462                    let mut file = output_file_stdout.lock().await;
463                    let _ = file.write_all(format!("{}\n", line).as_bytes()).await;
464                }
465            }
466        };
467
468        let output_file_stderr = Arc::clone(&output_file);
469        let stderr_task = async move {
470            if let Some(stderr) = stderr {
471                let mut reader = BufReader::new(stderr).lines();
472                while let Ok(Some(line)) = reader.next_line().await {
473                    let mut file = output_file_stderr.lock().await;
474                    let _ = file
475                        .write_all(format!("[stderr] {}\n", line).as_bytes())
476                        .await;
477                }
478            }
479        };
480
481        // Wait for output streams with timeout
482        let timeout_result = tokio::time::timeout(max_runtime, async {
483            tokio::join!(stdout_task, stderr_task);
484        })
485        .await;
486
487        // Flush output file
488        {
489            let mut file = output_file.lock().await;
490            let _ = file.flush().await;
491        }
492
493        // Update task state based on result
494        let mut tasks_guard = tasks.write().await;
495        if let Some(mut handle) = tasks_guard.remove(&task_id) {
496            if timeout_result.is_err() {
497                // Task timed out
498                warn!("Task {} timed out after {:?}", task_id, max_runtime);
499                handle.state.mark_timed_out();
500                // Try to kill the process
501                let _ = handle.child.kill().await;
502            } else {
503                // Wait for the process to complete
504                match handle.child.wait().await {
505                    Ok(status) => {
506                        let exit_code = status.code().unwrap_or(-1);
507                        debug!("Task {} completed with exit code {}", task_id, exit_code);
508                        handle.state.mark_completed(exit_code);
509                    }
510                    Err(e) => {
511                        error!("Failed to wait for task {}: {}", task_id, e);
512                        handle.state.mark_completed(-1);
513                    }
514                }
515            }
516
517            // Move to completed tasks
518            let mut completed = completed_tasks.write().await;
519            completed.insert(task_id, handle.state);
520        }
521    }
522}
523
524// =============================================================================
525// Task Query Implementation (Requirements: 10.2, 10.6)
526// =============================================================================
527
528impl TaskManager {
529    /// Get the status of a task
530    ///
531    /// Returns the current state of a task, or None if not found.
532    ///
533    /// Requirements: 10.2
534    pub async fn get_status(&self, task_id: &str) -> Option<TaskState> {
535        // Check running tasks first
536        {
537            let tasks = self.tasks.read().await;
538            if let Some(handle) = tasks.get(task_id) {
539                return Some(handle.state.clone());
540            }
541        }
542
543        // Check completed tasks
544        {
545            let completed = self.completed_tasks.read().await;
546            if let Some(state) = completed.get(task_id) {
547                return Some(state.clone());
548            }
549        }
550
551        None
552    }
553
554    /// Get the output of a task
555    ///
556    /// Returns the output from the task's output file.
557    /// Optionally limits to the last N lines.
558    ///
559    /// Requirements: 10.6
560    pub async fn get_output(
561        &self,
562        task_id: &str,
563        lines: Option<usize>,
564    ) -> Result<String, ToolError> {
565        // Find the output file path
566        let output_file = {
567            // Check running tasks
568            let tasks = self.tasks.read().await;
569            if let Some(handle) = tasks.get(task_id) {
570                handle.state.output_file.clone()
571            } else {
572                // Check completed tasks
573                let completed = self.completed_tasks.read().await;
574                if let Some(state) = completed.get(task_id) {
575                    state.output_file.clone()
576                } else {
577                    return Err(ToolError::not_found(format!("Task not found: {}", task_id)));
578                }
579            }
580        };
581
582        // Read the output file
583        let content = tokio::fs::read_to_string(&output_file).await.map_err(|e| {
584            ToolError::execution_failed(format!("Failed to read output file: {}", e))
585        })?;
586
587        // Apply line limit if specified
588        match lines {
589            Some(n) if n > 0 => {
590                let all_lines: Vec<&str> = content.lines().collect();
591                let start = all_lines.len().saturating_sub(n);
592                Ok(all_lines[start..].join("\n"))
593            }
594            _ => Ok(content),
595        }
596    }
597
598    /// List all tasks (running and completed)
599    pub async fn list_tasks(&self) -> Vec<TaskState> {
600        let mut result = Vec::new();
601
602        // Add running tasks
603        {
604            let tasks = self.tasks.read().await;
605            for handle in tasks.values() {
606                result.push(handle.state.clone());
607            }
608        }
609
610        // Add completed tasks
611        {
612            let completed = self.completed_tasks.read().await;
613            for state in completed.values() {
614                result.push(state.clone());
615            }
616        }
617
618        result
619    }
620
621    /// List only running tasks
622    pub async fn list_running_tasks(&self) -> Vec<TaskState> {
623        let tasks = self.tasks.read().await;
624        tasks.values().map(|h| h.state.clone()).collect()
625    }
626
627    /// Check if a task exists
628    pub async fn task_exists(&self, task_id: &str) -> bool {
629        self.get_status(task_id).await.is_some()
630    }
631}
632
633// =============================================================================
634// Task Termination Implementation (Requirements: 10.3)
635// =============================================================================
636
637impl TaskManager {
638    /// Kill a running task
639    ///
640    /// Attempts to terminate the task. On Unix systems, we use tokio's
641    /// kill method which sends SIGKILL. For graceful termination,
642    /// we first try to wait with a short timeout.
643    ///
644    /// Requirements: 10.3
645    pub async fn kill(&self, task_id: &str) -> Result<(), ToolError> {
646        let mut tasks = self.tasks.write().await;
647
648        if let Some(mut handle) = tasks.remove(task_id) {
649            info!("Killing task {}", task_id);
650
651            // Try to wait briefly first (gives process a chance to finish naturally)
652            let graceful_timeout = Duration::from_millis(100);
653            match tokio::time::timeout(graceful_timeout, handle.child.wait()).await {
654                Ok(Ok(status)) => {
655                    // Process already finished
656                    let exit_code = status.code().unwrap_or(-1);
657                    debug!(
658                        "Task {} already finished with exit code {}",
659                        task_id, exit_code
660                    );
661                    handle.state.mark_completed(exit_code);
662                }
663                _ => {
664                    // Process still running, kill it
665                    debug!("Task {} still running, sending kill signal", task_id);
666                    let _ = handle.child.kill().await;
667                    // Wait for the process to actually terminate
668                    let _ = handle.child.wait().await;
669                    handle.state.mark_killed();
670                }
671            }
672
673            // Move to completed tasks
674            let mut completed = self.completed_tasks.write().await;
675            completed.insert(task_id.to_string(), handle.state);
676
677            Ok(())
678        } else {
679            // Check if it's already completed
680            let completed = self.completed_tasks.read().await;
681            if completed.contains_key(task_id) {
682                Err(ToolError::execution_failed(format!(
683                    "Task {} has already completed",
684                    task_id
685                )))
686            } else {
687                Err(ToolError::not_found(format!("Task not found: {}", task_id)))
688            }
689        }
690    }
691
692    /// Kill all running tasks
693    pub async fn kill_all(&self) -> usize {
694        let task_ids: Vec<String> = {
695            let tasks = self.tasks.read().await;
696            tasks.keys().cloned().collect()
697        };
698
699        let mut killed = 0;
700        for task_id in task_ids {
701            if self.kill(&task_id).await.is_ok() {
702                killed += 1;
703            }
704        }
705
706        killed
707    }
708}
709
710// =============================================================================
711// Timeout Cleanup Implementation (Requirements: 10.5)
712// =============================================================================
713
714impl TaskManager {
715    /// Cleanup timed-out tasks
716    ///
717    /// Checks all running tasks and terminates those that have exceeded
718    /// the maximum runtime. Returns the number of tasks cleaned up.
719    ///
720    /// Requirements: 10.5
721    pub async fn cleanup_timed_out(&self) -> usize {
722        let timed_out_ids: Vec<String> = {
723            let tasks = self.tasks.read().await;
724            tasks
725                .iter()
726                .filter(|(_, handle)| handle.state.duration() > self.max_runtime)
727                .map(|(id, _)| id.clone())
728                .collect()
729        };
730
731        let mut cleaned = 0;
732        for task_id in timed_out_ids {
733            warn!("Cleaning up timed-out task: {}", task_id);
734
735            let mut tasks = self.tasks.write().await;
736            if let Some(mut handle) = tasks.remove(&task_id) {
737                // Kill the process
738                let _ = handle.child.kill().await;
739
740                // Update state
741                handle.state.mark_timed_out();
742
743                // Move to completed tasks
744                let mut completed = self.completed_tasks.write().await;
745                completed.insert(task_id, handle.state);
746
747                cleaned += 1;
748            }
749        }
750
751        cleaned
752    }
753
754    /// Cleanup old completed tasks
755    ///
756    /// Removes completed task records older than the specified duration.
757    /// Also removes their output files.
758    pub async fn cleanup_old_completed(&self, max_age: Duration) -> usize {
759        let old_task_ids: Vec<String> = {
760            let completed = self.completed_tasks.read().await;
761            completed
762                .iter()
763                .filter(|(_, state)| state.end_time.is_some_and(|end| end.elapsed() > max_age))
764                .map(|(id, _)| id.clone())
765                .collect()
766        };
767
768        let mut cleaned = 0;
769        for task_id in old_task_ids {
770            let mut completed = self.completed_tasks.write().await;
771            if let Some(state) = completed.remove(&task_id) {
772                // Remove output file
773                if let Err(e) = tokio::fs::remove_file(&state.output_file).await {
774                    debug!("Failed to remove output file for task {}: {}", task_id, e);
775                }
776                cleaned += 1;
777            }
778        }
779
780        cleaned
781    }
782
783    /// Start a background cleanup task that periodically cleans up timed-out tasks
784    pub fn start_cleanup_task(self: Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
785        tokio::spawn(async move {
786            let mut interval_timer = tokio::time::interval(interval);
787            loop {
788                interval_timer.tick().await;
789                let cleaned = self.cleanup_timed_out().await;
790                if cleaned > 0 {
791                    info!("Cleaned up {} timed-out tasks", cleaned);
792                }
793            }
794        })
795    }
796}
797
798// =============================================================================
799// Unit Tests
800// =============================================================================
801
802#[cfg(test)]
803mod tests {
804    use super::*;
805    use std::path::PathBuf;
806    use tempfile::TempDir;
807
808    fn create_test_context() -> ToolContext {
809        ToolContext::new(PathBuf::from("/tmp"))
810            .with_session_id("test-session")
811            .with_user("test-user")
812    }
813
814    fn create_test_manager(temp_dir: &TempDir) -> TaskManager {
815        TaskManager::new()
816            .with_output_directory(temp_dir.path().to_path_buf())
817            .with_max_concurrent(5)
818            .with_max_runtime(Duration::from_secs(60))
819    }
820
821    // TaskStatus Tests
822
823    #[test]
824    fn test_task_status_is_terminal() {
825        assert!(!TaskStatus::Running.is_terminal());
826        assert!(TaskStatus::Completed.is_terminal());
827        assert!(TaskStatus::Failed.is_terminal());
828        assert!(TaskStatus::TimedOut.is_terminal());
829        assert!(TaskStatus::Killed.is_terminal());
830    }
831
832    #[test]
833    fn test_task_status_is_running() {
834        assert!(TaskStatus::Running.is_running());
835        assert!(!TaskStatus::Completed.is_running());
836        assert!(!TaskStatus::Failed.is_running());
837        assert!(!TaskStatus::TimedOut.is_running());
838        assert!(!TaskStatus::Killed.is_running());
839    }
840
841    #[test]
842    fn test_task_status_display() {
843        assert_eq!(TaskStatus::Running.to_string(), "running");
844        assert_eq!(TaskStatus::Completed.to_string(), "completed");
845        assert_eq!(TaskStatus::Failed.to_string(), "failed");
846        assert_eq!(TaskStatus::TimedOut.to_string(), "timed_out");
847        assert_eq!(TaskStatus::Killed.to_string(), "killed");
848    }
849
850    // TaskState Tests
851
852    #[test]
853    fn test_task_state_new() {
854        let state = TaskState::new(
855            "task-123".to_string(),
856            "echo hello".to_string(),
857            PathBuf::from("/tmp/task-123.log"),
858            PathBuf::from("/tmp"),
859            "session-1".to_string(),
860        );
861
862        assert_eq!(state.task_id, "task-123");
863        assert_eq!(state.command, "echo hello");
864        assert_eq!(state.status, TaskStatus::Running);
865        assert!(state.end_time.is_none());
866        assert!(state.exit_code.is_none());
867    }
868
869    #[test]
870    fn test_task_state_mark_completed() {
871        let mut state = TaskState::new(
872            "task-123".to_string(),
873            "echo hello".to_string(),
874            PathBuf::from("/tmp/task-123.log"),
875            PathBuf::from("/tmp"),
876            "session-1".to_string(),
877        );
878
879        state.mark_completed(0);
880        assert_eq!(state.status, TaskStatus::Completed);
881        assert!(state.end_time.is_some());
882        assert_eq!(state.exit_code, Some(0));
883
884        let mut state2 = TaskState::new(
885            "task-456".to_string(),
886            "exit 1".to_string(),
887            PathBuf::from("/tmp/task-456.log"),
888            PathBuf::from("/tmp"),
889            "session-1".to_string(),
890        );
891
892        state2.mark_completed(1);
893        assert_eq!(state2.status, TaskStatus::Failed);
894        assert_eq!(state2.exit_code, Some(1));
895    }
896
897    #[test]
898    fn test_task_state_mark_timed_out() {
899        let mut state = TaskState::new(
900            "task-123".to_string(),
901            "sleep 100".to_string(),
902            PathBuf::from("/tmp/task-123.log"),
903            PathBuf::from("/tmp"),
904            "session-1".to_string(),
905        );
906
907        state.mark_timed_out();
908        assert_eq!(state.status, TaskStatus::TimedOut);
909        assert!(state.end_time.is_some());
910    }
911
/// After `mark_killed`, the state must report `Killed` and carry an end time.
#[test]
fn test_task_state_mark_killed() {
    let log_file = PathBuf::from("/tmp/task-123.log");
    let working_dir = PathBuf::from("/tmp");

    let mut task = TaskState::new(
        String::from("task-123"),
        String::from("sleep 100"),
        log_file,
        working_dir,
        String::from("session-1"),
    );
    task.mark_killed();

    assert_eq!(task.status, TaskStatus::Killed);
    assert!(task.end_time.is_some());
}
926
927    // TaskManager Tests
928
/// A manager built with `TaskManager::new()` must expose the module-level
/// defaults for the concurrency limit and the maximum runtime.
#[test]
fn test_task_manager_default() {
    let expected_runtime = Duration::from_secs(DEFAULT_MAX_RUNTIME_SECS);
    let manager = TaskManager::new();

    assert_eq!(manager.max_concurrent(), DEFAULT_MAX_CONCURRENT);
    assert_eq!(manager.max_runtime(), expected_runtime);
}
938
/// Values passed through the `with_*` builder methods must be returned by the
/// matching getters. The output directory is set but not read back here.
#[test]
fn test_task_manager_builder() {
    let runtime_limit = Duration::from_secs(3600);
    let output_dir = PathBuf::from("/custom/output");

    let manager = TaskManager::new()
        .with_max_concurrent(20)
        .with_max_runtime(runtime_limit)
        .with_output_directory(output_dir);

    assert_eq!(manager.max_concurrent(), 20);
    assert_eq!(manager.max_runtime(), runtime_limit);
}
949
950    #[tokio::test]
951    async fn test_task_manager_running_count() {
952        let manager = TaskManager::new();
953        assert_eq!(manager.running_count().await, 0);
954    }
955
956    #[tokio::test]
957    async fn test_start_simple_task() {
958        let temp_dir = TempDir::new().unwrap();
959        let manager = create_test_manager(&temp_dir);
960        let context = create_test_context();
961
962        let command = "echo hello";
963
964        let result = manager.start(command, &context).await;
965        assert!(result.is_ok());
966
967        let task_id = result.unwrap();
968        assert!(!task_id.is_empty());
969
970        // Wait a bit for the task to complete
971        tokio::time::sleep(Duration::from_millis(500)).await;
972
973        // Check status
974        let status = manager.get_status(&task_id).await;
975        assert!(status.is_some());
976    }
977
978    #[tokio::test]
979    async fn test_start_task_concurrent_limit() {
980        let temp_dir = TempDir::new().unwrap();
981        let manager = TaskManager::new()
982            .with_output_directory(temp_dir.path().to_path_buf())
983            .with_max_concurrent(1)
984            .with_max_runtime(Duration::from_secs(60));
985        let context = create_test_context();
986
987        let command = if cfg!(target_os = "windows") {
988            "timeout /t 10"
989        } else {
990            "sleep 10"
991        };
992
993        // Start first task
994        let result1 = manager.start(command, &context).await;
995        assert!(result1.is_ok());
996
997        // Try to start second task - should fail due to limit
998        let result2 = manager.start(command, &context).await;
999        assert!(result2.is_err());
1000
1001        // Clean up
1002        let _ = manager.kill_all().await;
1003    }
1004
1005    #[tokio::test]
1006    async fn test_get_status_not_found() {
1007        let manager = TaskManager::new();
1008        let status = manager.get_status("nonexistent-task").await;
1009        assert!(status.is_none());
1010    }
1011
1012    #[tokio::test]
1013    async fn test_get_output_not_found() {
1014        let manager = TaskManager::new();
1015        let result = manager.get_output("nonexistent-task", None).await;
1016        assert!(result.is_err());
1017        assert!(matches!(result.unwrap_err(), ToolError::NotFound(_)));
1018    }
1019
1020    #[tokio::test]
1021    async fn test_get_output_with_lines() {
1022        let temp_dir = TempDir::new().unwrap();
1023        let manager = create_test_manager(&temp_dir);
1024        let context = create_test_context();
1025
1026        // Create a task that outputs multiple lines
1027        let command = if cfg!(target_os = "windows") {
1028            "echo line1 && echo line2 && echo line3"
1029        } else {
1030            "echo line1; echo line2; echo line3"
1031        };
1032
1033        let task_id = manager.start(command, &context).await.unwrap();
1034
1035        // Wait for completion
1036        tokio::time::sleep(Duration::from_millis(500)).await;
1037
1038        // Get last 2 lines
1039        let output = manager.get_output(&task_id, Some(2)).await;
1040        assert!(output.is_ok());
1041    }
1042
1043    #[tokio::test]
1044    async fn test_kill_task() {
1045        let temp_dir = TempDir::new().unwrap();
1046        let manager = create_test_manager(&temp_dir);
1047        let context = create_test_context();
1048
1049        let command = if cfg!(target_os = "windows") {
1050            "timeout /t 60"
1051        } else {
1052            "sleep 60"
1053        };
1054
1055        let task_id = manager.start(command, &context).await.unwrap();
1056
1057        // Kill the task
1058        let result = manager.kill(&task_id).await;
1059        assert!(result.is_ok());
1060
1061        // Check status
1062        let status = manager.get_status(&task_id).await;
1063        assert!(status.is_some());
1064        assert_eq!(status.unwrap().status, TaskStatus::Killed);
1065    }
1066
1067    #[tokio::test]
1068    async fn test_kill_nonexistent_task() {
1069        let manager = TaskManager::new();
1070        let result = manager.kill("nonexistent-task").await;
1071        assert!(result.is_err());
1072        assert!(matches!(result.unwrap_err(), ToolError::NotFound(_)));
1073    }
1074
1075    #[tokio::test]
1076    async fn test_list_tasks() {
1077        let temp_dir = TempDir::new().unwrap();
1078        let manager = create_test_manager(&temp_dir);
1079        let context = create_test_context();
1080
1081        // Start a task
1082        let command = "echo hello";
1083        let _ = manager.start(command, &context).await.unwrap();
1084
1085        // Wait a bit
1086        tokio::time::sleep(Duration::from_millis(100)).await;
1087
1088        // List tasks
1089        let tasks = manager.list_tasks().await;
1090        assert!(!tasks.is_empty());
1091    }
1092
1093    #[tokio::test]
1094    async fn test_task_exists() {
1095        let temp_dir = TempDir::new().unwrap();
1096        let manager = create_test_manager(&temp_dir);
1097        let context = create_test_context();
1098
1099        let task_id = manager.start("echo hello", &context).await.unwrap();
1100
1101        assert!(manager.task_exists(&task_id).await);
1102        assert!(!manager.task_exists("nonexistent").await);
1103    }
1104
1105    #[tokio::test]
1106    async fn test_kill_all() {
1107        let temp_dir = TempDir::new().unwrap();
1108        let manager = create_test_manager(&temp_dir);
1109        let context = create_test_context();
1110
1111        let command = if cfg!(target_os = "windows") {
1112            "timeout /t 60"
1113        } else {
1114            "sleep 60"
1115        };
1116
1117        // Start multiple tasks
1118        let _ = manager.start(command, &context).await.unwrap();
1119        let _ = manager.start(command, &context).await.unwrap();
1120
1121        // Kill all
1122        let killed = manager.kill_all().await;
1123        assert_eq!(killed, 2);
1124
1125        // Verify no running tasks
1126        assert_eq!(manager.running_count().await, 0);
1127    }
1128}