tcrm_task/tasks/async_tokio/
spawner.rs

1use std::sync::Arc;
2use std::time::Duration;
3use tokio::sync::{Mutex, RwLock, mpsc, oneshot};
4use tokio::task::JoinHandle;
5use tokio::time::{Instant, timeout};
6
7use crate::tasks::error::TaskError;
8use crate::tasks::event::TaskTerminateReason;
9use crate::tasks::{config::TaskConfig, state::TaskState};
10
11/// Information about a running or completed task
12///
13/// Provides metadata about the task execution including timing, state, and lifecycle information.
14///
15/// # Examples
16///
17/// ```rust
18/// use tcrm_task::tasks::async_tokio::spawner::{TaskSpawner, TaskInfo};
19/// use tcrm_task::tasks::config::TaskConfig;
20///
21/// #[tokio::main]
22/// async fn main() {
23///     let config = TaskConfig::new("cmd").args(["/C", "echo", "hello"]);
24///     let spawner = TaskSpawner::new("test".to_string(), config);
25///     
26///     let info: TaskInfo = spawner.get_task_info().await;
27///     println!("Task {} is in state {:?}", info.name, info.state);
28/// }
29/// ```
30#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
31#[derive(Debug, Clone)]
32pub struct TaskInfo {
33    /// Name of the task
34    pub name: String,
35    /// Current execution state
36    pub state: TaskState,
37    /// How long the task has been running
38    pub uptime: Duration,
39    /// When the task was created
40    #[cfg_attr(feature = "serde", serde(skip, default = "default_instant"))]
41    pub created_at: Instant,
42    /// When the task finished (if completed)
43    #[cfg_attr(feature = "serde", serde(skip, default))]
44    pub finished_at: Option<Instant>,
45}
46
47#[cfg(feature = "serde")]
48/// Returns the current instant for serde default value.
49fn default_instant() -> Instant {
50    Instant::now()
51}
52
53/// Spawns and manages the lifecycle of a task
54///
55/// `TaskSpawner` handles the execution of system processes with comprehensive
56/// monitoring, state management, and event emission. It provides both
57/// synchronous and asynchronous interfaces for process management.
58///
59/// # Features
60///
61/// - **State Management**: Track task execution through Pending, Running, Ready, and Finished states
62/// - **Event Emission**: Real-time events for output, state changes, and lifecycle events
63/// - **Timeout Handling**: Automatic termination when tasks exceed configured timeouts
64/// - **Stdin Support**: Send input to running processes when enabled
65/// - **Ready Detection**: Automatic detection when long-running processes are ready
66/// - **Process Control**: Start, stop, and terminate processes with proper cleanup
67///
68/// # Examples
69///
70/// ## Simple Command Execution
71/// ```rust
72/// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
73/// use tokio::sync::mpsc;
74///
75/// #[tokio::main]
76/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
77///     let config = TaskConfig::new("cmd")
78///         .args(["/C", "echo", "Hello World"]);
79///
80///     let (tx, mut rx) = mpsc::channel(100);
81///     let mut spawner = TaskSpawner::new("hello".to_string(), config);
82///     
83///     spawner.start_direct(tx).await?;
84///
85///     // Process events
86///     while let Some(event) = rx.recv().await {
87///         println!("Event: {:?}", event);
88///         if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
89///             break;
90///         }
91///     }
92///
93///     Ok(())
94/// }
95/// ```
96///
97/// ## Long-running Process with Ready Detection
98/// ```rust
99/// use tcrm_task::tasks::{config::{TaskConfig, StreamSource}, async_tokio::spawner::TaskSpawner};
100/// use tokio::sync::mpsc;
101///
102/// #[tokio::main]
103/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
104///     let config = TaskConfig::new("cmd")
105///         .args(["/C", "echo", "Server listening"])
106///         .ready_indicator("Server listening")
107///         .ready_indicator_source(StreamSource::Stdout)
108///         .timeout_ms(30000);
109///
110///     let (tx, mut rx) = mpsc::channel(100);
111///     let mut spawner = TaskSpawner::new("server".to_string(), config);
112///     
113///     spawner.start_direct(tx).await?;
114///
115///     // Wait for ready event
116///     while let Some(event) = rx.recv().await {
117///         if matches!(event, tcrm_task::tasks::event::TaskEvent::Ready { .. }) {
118///             println!("Server is ready to accept requests!");
119///             // Server is now ready, can start sending requests
120///             break;
121///         }
122///     }
123///
124///     Ok(())
125/// }
126/// ```
127///
128/// ## Interactive Process with Stdin
129/// ```rust
130/// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
131/// use tokio::sync::mpsc;
132///
133/// #[tokio::main]
134/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
135///     let config = TaskConfig::new("cmd")
136///         .args(["/C", "echo", "Hello"])
137///         .enable_stdin(true);
138///
139///     let (tx, mut rx) = mpsc::channel(100);
140///     let (stdin_tx, stdin_rx) = mpsc::channel(10);
141///     let mut spawner = TaskSpawner::new("cmd".to_string(), config);
142///     
143///     // Set up stdin channel - note: set_stdin consumes self and returns Self
144///     spawner = spawner.set_stdin(stdin_rx);
145///     
146///     spawner.start_direct(tx).await?;
147///
148///     // Send input to the process
149///     stdin_tx.send("print('Hello from stdin!')".to_string()).await?;
150///     stdin_tx.send("exit()".to_string()).await?;
151///
152///     // Process events
153///     while let Some(event) = rx.recv().await {
154///         match event {
155///             tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
156///                 println!("Output: {}", line);
157///             }
158///             tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
159///             _ => {}
160///         }
161///     }
162///
163///     Ok(())
164/// }
165/// ```
166#[derive(Debug)]
167pub struct TaskSpawner {
168    pub(crate) config: TaskConfig,
169    pub(crate) task_name: String,
170    pub(crate) state: Arc<RwLock<TaskState>>,
171    pub(crate) terminate_tx: Arc<Mutex<Option<oneshot::Sender<TaskTerminateReason>>>>,
172    pub(crate) process_id: Arc<RwLock<Option<u32>>>,
173    pub(crate) created_at: Instant,
174    pub(crate) finished_at: Arc<RwLock<Option<Instant>>>,
175    pub(crate) stdin_rx: Option<mpsc::Receiver<String>>,
176}
177
178impl TaskSpawner {
179    /// Create a new task spawner for the given task name and configuration
180    ///
181    /// Creates a new `TaskSpawner` instance in the Pending state. The configuration
182    /// is not validated until `start_direct` is called.
183    ///
184    /// # Arguments
185    ///
186    /// * `task_name` - Unique identifier for this task instance
187    /// * `config` - Task configuration defining command, arguments, environment, etc.
188    ///
189    /// # Examples
190    /// ```rust
191    /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
192    ///
193    /// let config = TaskConfig::new("echo").args(["hello"]);
194    /// let spawner = TaskSpawner::new("my-task".to_string(), config);
195    /// ```
196    #[must_use]
197    pub fn new(task_name: String, config: TaskConfig) -> Self {
198        Self {
199            task_name,
200            config,
201            state: Arc::new(RwLock::new(TaskState::Pending)),
202            terminate_tx: Arc::new(Mutex::new(None)),
203            process_id: Arc::new(RwLock::new(None)),
204            created_at: Instant::now(),
205            finished_at: Arc::new(RwLock::new(None)),
206            stdin_rx: None,
207        }
208    }
209
210    /// Set the stdin receiver for the task, enabling asynchronous input
211    ///
212    /// Configures a channel for sending input to the process stdin. This method
213    /// has no effect if `enable_stdin` is false in the task configuration.
214    ///
215    /// # Arguments
216    ///
217    /// * `stdin_rx` - Receiver channel for stdin input strings
218    ///
219    /// # Examples
220    /// ```rust
221    /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
222    /// use tokio::sync::mpsc;
223    ///
224    /// let config = TaskConfig::new("python")
225    ///     .args(["-i"])
226    ///     .enable_stdin(true);
227    ///
228    /// let (stdin_tx, stdin_rx) = mpsc::channel(10);
229    /// let spawner = TaskSpawner::new("interactive".to_string(), config)
230    ///     .set_stdin(stdin_rx);
231    /// ```
232    #[must_use]
233    pub fn set_stdin(mut self, stdin_rx: mpsc::Receiver<String>) -> Self {
234        if self.config.enable_stdin.unwrap_or_default() {
235            self.stdin_rx = Some(stdin_rx);
236        }
237        self
238    }
239
240    /// Get the current state of the task
241    ///
242    /// Returns the current execution state of the task. States progress through:
243    /// Pending → Initiating → Running → (Ready) → Finished
244    ///
245    /// # Examples
246    /// ```rust
247    /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner, state::TaskState};
248    ///
249    /// #[tokio::main]
250    /// async fn main() {
251    ///     let config = TaskConfig::new("echo");
252    ///     let spawner = TaskSpawner::new("test".to_string(), config);
253    ///     
254    ///     assert_eq!(spawner.get_state().await, TaskState::Pending);
255    /// }
256    /// ```
257    pub async fn get_state(&self) -> TaskState {
258        self.state.read().await.clone()
259    }
260
261    /// Check if the task is currently running
262    ///
263    /// Returns true if the task state is Running, false otherwise.
264    ///
265    /// # Examples
266    /// ```rust
267    /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
268    ///
269    /// #[tokio::main]
270    /// async fn main() {
271    ///     let config = TaskConfig::new("echo");
272    ///     let spawner = TaskSpawner::new("test".to_string(), config);
273    ///     
274    ///     assert!(!spawner.is_running().await); // Not running initially
275    /// }
276    /// ```
277    pub async fn is_running(&self) -> bool {
278        let state = self.state.read().await.clone();
279        state == TaskState::Running
280    }
281
282    /// Check if the task is currently ready
283    ///
284    /// Returns true if the task state is Ready, false otherwise.
285    /// The Ready state indicates a long-running process has signaled it's
286    /// ready to accept requests (via the ready indicator).
287    ///
288    /// # Examples
289    /// ```rust
290    /// use tcrm_task::tasks::{config::{TaskConfig, StreamSource}, async_tokio::spawner::TaskSpawner};
291    ///
292    /// #[tokio::main]
293    /// async fn main() {
294    ///     let config = TaskConfig::new("my-server")
295    ///         .ready_indicator("Server ready")
296    ///         .ready_indicator_source(StreamSource::Stdout);
297    ///     let spawner = TaskSpawner::new("server".to_string(), config);
298    ///     
299    ///     assert!(!spawner.is_ready().await); // Not ready initially
300    /// }
301    /// ```
302    pub async fn is_ready(&self) -> bool {
303        let state = self.state.read().await.clone();
304        state == TaskState::Ready
305    }
306
307    /// Get the uptime of the task since creation
308    ///
309    /// Returns the duration since the `TaskSpawner` was created, regardless
310    /// of the current execution state.
311    ///
312    /// # Examples
313    /// ```rust
314    /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
315    /// use std::time::Duration;
316    ///
317    /// #[tokio::main]
318    /// async fn main() {
319    ///     let config = TaskConfig::new("echo");
320    ///     let spawner = TaskSpawner::new("test".to_string(), config);
321    ///     
322    ///     let uptime = spawner.uptime();
323    ///     assert!(uptime < Duration::from_secs(1)); // Just created
324    /// }
325    /// ```
326    #[must_use]
327    pub fn uptime(&self) -> Duration {
328        self.created_at.elapsed()
329    }
330
331    /// Get comprehensive information about the task
332    ///
333    /// Returns a `TaskInfo` struct containing the task name, current state,
334    /// uptime, creation time, and completion time (if finished).
335    ///
336    /// # Examples
337    /// ```rust
338    /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
339    ///
340    /// #[tokio::main]
341    /// async fn main() {
342    ///     let config = TaskConfig::new("echo").args(["hello"]);
343    ///     let spawner = TaskSpawner::new("info-test".to_string(), config);
344    ///     
345    ///     let info = spawner.get_task_info().await;
346    ///     println!("Task '{}' has been running for {:?}", info.name, info.uptime);
347    /// }
348    /// ```
349    pub async fn get_task_info(&self) -> TaskInfo {
350        TaskInfo {
351            name: self.task_name.clone(),
352            state: self.get_state().await,
353            uptime: self.uptime(),
354            created_at: self.created_at,
355            finished_at: *self.finished_at.read().await,
356        }
357    }
358
359    /// Get the process ID of the running task (if any)
360    ///
361    /// Returns the system process ID if the task is currently running,
362    /// or None if the task hasn't started or has finished.
363    ///
364    /// # Examples
365    /// ```rust
366    /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
367    /// use tokio::sync::mpsc;
368    ///
369    /// #[tokio::main]
370    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
371    ///     let config = TaskConfig::new("cmd").args(["/C", "ping", "127.0.0.1", "-n", "2"]);
372    ///     let mut spawner = TaskSpawner::new("pid-test".to_string(), config);
373    ///     
374    ///     assert_eq!(spawner.get_process_id().await, None); // Not started yet
375    ///     
376    ///     let (tx, _rx) = mpsc::channel(100);
377    ///     spawner.start_direct(tx).await?;
378    ///     
379    ///     // Now should have a process ID
380    ///     let pid = spawner.get_process_id().await;
381    ///     assert!(pid.is_some());
382    ///     
383    ///     Ok(())
384    /// }
385    /// ```
386    pub async fn get_process_id(&self) -> Option<u32> {
387        *self.process_id.read().await
388    }
389
390    /// Update the state of the task
391    ///
392    /// Internal method used by the spawner to update task state during execution.
393    pub(crate) async fn update_state(&self, new_state: TaskState) {
394        let mut state = self.state.write().await;
395        *state = new_state;
396    }
397
398    /// Send a termination signal to the running task
399    ///
400    /// Requests graceful termination of the running process with the specified reason.
401    /// The process may take some time to respond to the termination signal.
402    ///
403    /// # Arguments
404    ///
405    /// * `reason` - The reason for termination (Timeout, Cleanup, etc.)
406    ///
407    /// # Returns
408    ///
409    /// - `Ok(())` if the termination signal was sent successfully
410    /// - `Err(TaskError::Channel)` if the signal could not be sent
411    ///
412    /// # Errors
413    ///
414    /// Returns a [`TaskError::Channel`] if the internal termination channel
415    /// has been closed and the signal cannot be delivered to the task.
416    ///
417    /// # Examples
418    /// ```rust
419    /// use tcrm_task::tasks::{
420    ///     config::TaskConfig,
421    ///     async_tokio::spawner::TaskSpawner,
422    ///     event::TaskTerminateReason
423    /// };
424    /// use tokio::sync::mpsc;
425    ///
426    /// #[tokio::main]
427    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
428    ///     let config = TaskConfig::new("cmd").args(["/C", "ping", "127.0.0.1", "-n", "10"]); // Long-running task
429    ///     let mut spawner = TaskSpawner::new("terminate-test".to_string(), config);
430    ///     
431    ///     let (tx, mut rx) = mpsc::channel(100);
432    ///     spawner.start_direct(tx).await?;
433    ///     
434    ///     // Wait a bit, then terminate
435    ///     tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
436    ///     spawner.send_terminate_signal(TaskTerminateReason::Cleanup).await?;
437    ///     
438    ///     // Process events until stopped
439    ///     while let Some(event) = rx.recv().await {
440    ///         if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
441    ///             break;
442    ///         }
443    ///     }
444    ///     
445    ///     Ok(())
446    /// }
447    /// ```
448    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
449    pub async fn send_terminate_signal(
450        &self,
451        reason: TaskTerminateReason,
452    ) -> Result<(), TaskError> {
453        if let Some(tx) = self.terminate_tx.lock().await.take() {
454            if tx.send(reason.clone()).is_err() {
455                let msg = "Terminate channel closed while sending signal";
456                #[cfg(feature = "tracing")]
457                tracing::warn!(terminate_reason=?reason, msg);
458                return Err(TaskError::Channel(msg.to_string()));
459            }
460        } else {
461            let msg = "Terminate signal already sent or channel missing";
462            #[cfg(feature = "tracing")]
463            tracing::warn!(msg);
464            return Err(TaskError::Channel(msg.to_string()));
465        }
466
467        Ok(())
468    }
469}
470
471/// Waits for all spawned task handles to complete, with a timeout
472///
473/// Returns an error if any handle fails or times out
474pub(crate) async fn join_all_handles(
475    task_handles: &mut Vec<JoinHandle<()>>,
476) -> Result<(), TaskError> {
477    if task_handles.is_empty() {
478        return Ok(());
479    }
480
481    let handles = std::mem::take(task_handles);
482    let mut errors = Vec::new();
483
484    for mut handle in handles {
485        match timeout(Duration::from_secs(5), &mut handle).await {
486            Ok(Ok(())) => {}
487            Ok(Err(join_err)) => {
488                let err_msg = format!("Handle [{}] join failed: {:?}", handle.id(), join_err);
489
490                errors.push(err_msg);
491            }
492            Err(_) => {
493                let err_msg = format!("Handle [{}] join timeout, aborting", handle.id());
494                handle.abort(); // ensure it’s killed
495                errors.push(err_msg);
496            }
497        }
498    }
499
500    if !errors.is_empty() {
501        return Err(TaskError::Handle(format!(
502            "Multiple task handles join failures: {}",
503            errors.join("; ")
504        )));
505    }
506
507    Ok(())
508}
509#[cfg(test)]
510mod tests {
511    use std::time::Duration;
512    use tokio::sync::mpsc;
513    use tokio::time::sleep;
514
515    use crate::tasks::{
516        async_tokio::spawner::{TaskInfo, TaskSpawner},
517        config::TaskConfig,
518        error::TaskError,
519        event::TaskTerminateReason,
520        state::TaskState,
521    };
522
523    #[tokio::test]
524    async fn task_spawner_is_running_returns_true_when_state_running() {
525        let config = TaskConfig::new("echo");
526        let spawner = TaskSpawner::new("running_task".to_string(), config);
527        assert!(
528            !spawner.is_running().await,
529            "Should not be running initially"
530        );
531        spawner.update_state(TaskState::Running).await;
532        assert!(spawner.is_running().await, "Should be running after update");
533    }
534
535    #[tokio::test]
536    async fn task_spawner_is_ready_returns_true_when_state_ready() {
537        let config = TaskConfig::new("echo");
538        let spawner = TaskSpawner::new("ready_task".to_string(), config);
539        assert!(!spawner.is_ready().await, "Should not be ready initially");
540        spawner.update_state(TaskState::Ready).await;
541        assert!(spawner.is_ready().await, "Should be ready after update");
542    }
543
544    #[tokio::test]
545    async fn task_spawner_initial_state_is_pending() {
546        let config = TaskConfig::new("echo");
547        let spawner = TaskSpawner::new("pending_task".to_string(), config);
548        let state = spawner.get_state().await;
549        assert_eq!(state, TaskState::Pending, "Initial state should be Pending");
550    }
551
552    #[tokio::test]
553    async fn task_spawner_update_state_changes_state() {
554        let config = TaskConfig::new("echo");
555        let spawner = TaskSpawner::new("update_task".to_string(), config);
556        spawner.update_state(TaskState::Running).await;
557        let state = spawner.get_state().await;
558        assert_eq!(
559            state,
560            TaskState::Running,
561            "State should be Running after update"
562        );
563    }
564
565    #[tokio::test]
566    async fn task_spawner_uptime_increases_over_time() {
567        let config = TaskConfig::new("echo");
568        let spawner = TaskSpawner::new("uptime_task".to_string(), config);
569        let uptime1 = spawner.uptime();
570        sleep(Duration::from_millis(20)).await;
571        let uptime2 = spawner.uptime();
572        assert!(uptime2 > uptime1, "Uptime should increase after sleep");
573    }
574
575    #[tokio::test]
576    async fn task_spawner_get_task_info_returns_correct_info() {
577        let config = TaskConfig::new("echo");
578        let spawner = TaskSpawner::new("info_task".to_string(), config);
579        let info: TaskInfo = spawner.get_task_info().await;
580        assert_eq!(info.name, "info_task");
581        assert_eq!(info.state, TaskState::Pending);
582        assert!(info.uptime >= Duration::ZERO);
583    }
584
585    #[tokio::test]
586    async fn task_spawner_process_id_initially_none() {
587        let config = TaskConfig::new("echo");
588        let spawner = TaskSpawner::new("process_id_task".to_string(), config);
589        assert_eq!(spawner.get_process_id().await, None);
590    }
591
592    #[tokio::test]
593    async fn task_spawner_stdin_disabled_ignores_channel() {
594        let config = TaskConfig::new("echo").enable_stdin(false);
595        let (_, rx) = mpsc::channel(100);
596
597        let spawner = TaskSpawner::new("no_stdin".to_string(), config).set_stdin(rx);
598        assert!(spawner.stdin_rx.is_none());
599    }
600
601    #[tokio::test]
602    async fn task_spawner_send_terminate_signal_no_channel() {
603        let config = TaskConfig::new("echo");
604        let spawner = TaskSpawner::new("no_channel".to_string(), config);
605
606        let result = spawner
607            .send_terminate_signal(TaskTerminateReason::Cleanup)
608            .await;
609        assert!(result.is_err());
610        if let Err(TaskError::Channel(msg)) = result {
611            assert_eq!(msg, "Terminate signal already sent or channel missing");
612        } else {
613            panic!("Expected Channel error");
614        }
615    }
616
617    // TODO: Consider adding serde support for TaskInfo, not skipping Instant fields
618    #[cfg(feature = "serde")]
619    #[tokio::test]
620    async fn task_info_serde() {
621        use serde_json;
622
623        let config = TaskConfig::new("echo");
624        let spawner = TaskSpawner::new("serde_task".to_string(), config);
625        let info = spawner.get_task_info().await;
626
627        // This should work even with Instant fields skipped
628        let serialized = serde_json::to_string(&info).unwrap();
629        println!("Serialized TaskInfo: {}", serialized);
630        assert!(serialized.contains("serde_task"));
631        assert!(serialized.contains("Pending"));
632    }
633}