tcrm_task/tasks/tokio/coordinate/
start.rs

1use tokio::sync::oneshot;
2
3use crate::tasks::{
4    config::StreamSource, error::TaskError, event::TaskTerminateReason, state::TaskState,
5    tokio::executor::TaskExecutor,
6};
7
8impl TaskExecutor {
9    /// Start execution in a coordinated async event loop.  
10    ///
11    /// This is the main execution method that spawns the process, sets up event monitoring,
12    /// and manages the complete process lifecycle. It handles stdout/stderr streaming,
13    /// timeout management, termination signals, and process cleanup in a coordinated
14    /// async event loop using `tokio::select!``.
15    ///
16    /// # Arguments
17    ///
18    /// * `event_tx` - Channel sender for emitting [`TaskEvent`]s during process execution
19    ///
20    /// # Returns
21    ///
22    /// * `Ok(())` - Process coordination started successfully
23    /// * `Err(TaskError)` - Configuration validation or process spawning failed
24    ///
25    /// # Errors
26    ///
27    /// Returns [`TaskError`] for:
28    /// - [`TaskError::InvalidConfiguration`] - Configuration validation failed
29    /// - [`TaskError::IO`] - Process spawning failed
30    /// - [`TaskError::Handle`] - Process handle or watcher setup failed
31    ///
32    /// # Events Emitted
33    ///
34    /// During execution, the following events are sent via `event_tx`:
35    /// - [`TaskEvent::Started`] - Process spawned successfully
36    /// - [`TaskEvent::Output`] - Lines from stdout/stderr
37    /// - [`TaskEvent::Ready`] - Ready indicator detected (if configured)
38    /// - [`TaskEvent::Stopped`] - Process completed with exit code
39    /// - [`TaskEvent::Error`] - Errors during execution
40    ///
41    /// # Examples
42    ///
43    /// ## Basic Usage
44    /// ```rust
45    /// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
46    /// use tokio::sync::mpsc;
47    ///
48    /// #[tokio::main]
49    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
50    ///     #[cfg(windows)]
51    ///     let config = TaskConfig::new("cmd").args(["/C", "echo", "test"]);
52    ///     #[cfg(unix)]
53    ///     let config = TaskConfig::new("echo").args(["test"]);
54    ///     
55    ///     config.validate()?;
56    ///     
57    ///     let (tx, mut rx) = mpsc::channel(100);
58    ///     let mut executor = TaskExecutor::new(config, tx);
59    ///     
60    ///     // Start coordination - returns immediately, process runs in background
61    ///     executor.coordinate_start().await?;
62    ///     
63    ///     // Process events until completion
64    ///     while let Some(event) = rx.recv().await {
65    ///         println!("Event: {:?}", event);
66    ///         if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
67    ///             break;
68    ///         }
69    ///     }
70    ///     
71    ///     Ok(())
72    /// }
73    /// ```
74    ///
75    /// ## With Ready Indicator
76    /// ```rust
77    /// use tcrm_task::tasks::{
78    ///     config::{TaskConfig, StreamSource},
79    ///     tokio::executor::TaskExecutor,
80    ///     event::TaskEvent
81    /// };
82    /// use tokio::sync::mpsc;
83    ///
84    /// #[tokio::main]
85    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
86    ///     #[cfg(windows)]
87    ///     let config = TaskConfig::new("cmd")
88    ///         .args(["/C", "echo", "Server ready"])
89    ///         .ready_indicator("Server ready")
90    ///         .ready_indicator_source(StreamSource::Stdout);
91    ///     
92    ///     #[cfg(unix)]
93    ///     let config = TaskConfig::new("echo")
94    ///         .args(["Server ready"])
95    ///         .ready_indicator("Server ready")
96    ///         .ready_indicator_source(StreamSource::Stdout);
97    ///     
98    ///     let (tx, mut rx) = mpsc::channel(100);
99    ///     let mut executor = TaskExecutor::new(config, tx);
100    ///     
101    ///     executor.coordinate_start().await?;
102    ///     
103    ///     while let Some(event) = rx.recv().await {
104    ///         match event {
105    ///             TaskEvent::Ready => {
106    ///                 println!("Process is ready!");
107    ///                 // Can now interact with the running process
108    ///             }
109    ///             TaskEvent::Stopped { .. } => break,
110    ///             _ => {}
111    ///         }
112    ///     }
113    ///     
114    ///     Ok(())
115    /// }
116    /// ```
117    pub async fn coordinate_start(&mut self) -> Result<(), TaskError> {
118        let event_tx = match self.shared_context.get_event_tx().await {
119            Some(tx) => tx,
120            None => {
121                let msg = "Event channel sender not available".to_string();
122                #[cfg(feature = "tracing")]
123                tracing::error!("{}", msg);
124                return Err(TaskError::Channel(msg));
125            }
126        };
127
128        Self::update_state(&self.shared_context, TaskState::Initiating);
129
130        self.validate_config(&event_tx).await?;
131
132        let cmd = self.setup_command();
133
134        #[cfg(feature = "process-group")]
135        let cmd = self.setup_process_group(cmd).await?;
136
137        let mut child = self.spawn_child(cmd, &event_tx).await?;
138        self.store_stdin(&mut child, &event_tx).await?;
139
140        let (mut stdout, mut stderr) = self.take_std_output_reader(&mut child, &event_tx).await?;
141        let (terminate_tx, mut terminate_rx) = oneshot::channel::<TaskTerminateReason>();
142        self.terminate_tx = Some(terminate_tx);
143
144        let (internal_terminate_tx, mut internal_terminate_rx) =
145            oneshot::channel::<TaskTerminateReason>();
146        self.shared_context
147            .set_internal_terminate_tx(internal_terminate_tx)
148            .await;
149
150        let shared_context = self.shared_context.clone();
151
152        tokio::spawn(async move {
153            let mut process_exited = false;
154            let mut termination_requested = false;
155            let mut stdout_eof = false;
156            let mut stderr_eof = false;
157
158            // Create timeout future once outside the loop to prevent reset on each iteration.
159            // This fixes a bug where the timeout would never trigger if other select!
160            // branches (stdout/stderr) were frequently selected, causing the timeout future
161            // to be dropped and recreated on each iteration.
162            let mut timeout_future = Self::create_timeout_future(shared_context.clone());
163            let mut timeout_triggered = false;
164
165            loop {
166                // Exit conditions
167                if process_exited && stdout_eof && stderr_eof {
168                    break;
169                }
170
171                // Force exit if termination was requested and streams are taking too long
172                if termination_requested && stdout_eof && stderr_eof {
173                    break;
174                }
175                tokio::select! {
176                    line = stdout.next_line(), if !stdout_eof =>
177                        stdout_eof = Self::handle_output(shared_context.clone(), line, &event_tx, StreamSource::Stdout).await,
178                    line = stderr.next_line(), if !stderr_eof =>
179                        stderr_eof = Self::handle_output(shared_context.clone(), line, &event_tx, StreamSource::Stderr).await,
180
181                    _ = &mut timeout_future, if !timeout_triggered => {
182                        timeout_triggered = true;
183                        Self::handle_timeout(shared_context.clone()).await;
184                    },
185
186                    reason = Self::await_oneshot(&mut terminate_rx, termination_requested) =>
187                        Self::handle_terminate(shared_context.clone(), &mut child, reason, &mut termination_requested, &event_tx).await,
188                    reason = Self::await_oneshot(&mut internal_terminate_rx, termination_requested) =>
189                        Self::handle_terminate(shared_context.clone(), &mut child, reason, &mut termination_requested, &event_tx).await,
190
191                    result = child.wait() => Self::handle_wait_result(shared_context.clone(), result,&mut process_exited).await,
192                }
193            }
194            Self::handle_result(shared_context.clone(), &event_tx).await;
195        });
196        Ok(())
197    }
198    async fn await_oneshot<T>(
199        rx: &mut oneshot::Receiver<T>,
200        termination_requested: bool,
201    ) -> Result<T, oneshot::error::RecvError> {
202        if termination_requested {
203            std::future::pending().await
204        } else {
205            rx.await
206        }
207    }
208}