tcrm_task/tasks/tokio/
process.rs

1use tokio::{
2    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines},
3    process::{Child, ChildStderr, ChildStdout, Command},
4};
5
6use crate::tasks::{
7    control::TaskStatusInfo,
8    error::TaskError,
9    event::{TaskEvent, TaskStopReason},
10    state::TaskState,
11    tokio::executor::TaskExecutor,
12};
13
14impl TaskExecutor {
15    /// Tries to store the process ID from a spawned child process.
16    ///
17    /// Validates that a process ID was successfully obtained and stores it
18    /// in the shared context for later use in process management.
19    ///
20    /// # Arguments
21    ///
22    /// * `pid` - Optional process ID from the spawned child
23    ///
24    /// # Returns
25    ///
26    /// * `Ok(u32)` - The validated process ID
27    /// * `Err(TaskError)` - If no process ID was provided
28    ///
29    /// # Errors
30    ///
31    /// Returns [`TaskError::Handle`] if the process ID is None
32    pub(crate) fn try_store_process_id(&self, pid: Option<u32>) -> Result<u32, TaskError> {
33        let Some(pid) = pid else {
34            let msg = "Failed to get process id";
35
36            #[cfg(feature = "tracing")]
37            tracing::error!(msg);
38
39            let error = TaskError::Handle(msg.to_string());
40            return Err(error);
41        };
42        self.shared_context.set_process_id(pid);
43        Ok(pid)
44    }
45    /// Spawns a child process and handles the result.
46    ///
47    /// Attempts to spawn the configured command and stores the process ID.
48    /// If spawning fails, appropriate error events are sent.
49    ///
50    /// On Windows with process groups enabled, spawns the process in a suspended state,
51    /// assigns it to the job object, then resumes it to avoid the race condition where
52    /// child processes could escape the job object.
53    ///
54    /// # Arguments
55    ///
56    /// * `cmd` - The configured command to spawn
57    /// * `event_tx` - Channel for sending task events
58    ///
59    /// # Returns
60    ///
61    /// * `Ok(Child)` - The spawned child process
62    /// * `Err(TaskError)` - If spawning fails
63    ///
64    /// # Errors
65    ///
66    /// Returns [`TaskError::IO`] if process spawning fails
67    pub(crate) async fn spawn_child(&mut self, mut cmd: Command) -> Result<Child, TaskError> {
68        let use_pg = self
69            .shared_context
70            .config
71            .use_process_group
72            .unwrap_or_default();
73
74        match cmd.spawn() {
75            Ok(mut child) => {
76                let pid = match self.try_store_process_id(child.id()) {
77                    Ok(pid) => pid,
78                    Err(e) => {
79                        let _ = child.kill().await;
80
81                        self.send_error_event_and_stop(e.clone()).await;
82                        return Err(e);
83                    }
84                };
85
86                // Assign the child process to the process group if enabled
87                #[cfg(feature = "process-group")]
88                if use_pg {
89                    let result = self.shared_context.group.lock().await.assign_child(pid);
90                    if let Err(e) = result {
91                        let msg = format!("Failed to add process to group: {}", e);
92
93                        #[cfg(feature = "tracing")]
94                        tracing::error!(error=%e, "Failed to add process to group");
95
96                        let _ = child.kill().await;
97
98                        let error = TaskError::Handle(msg);
99                        self.send_error_event_and_stop(error.clone()).await;
100                        return Err(error);
101                    }
102                    // Resume the process on Windows if it was suspended
103                    #[cfg(all(windows, feature = "process-group"))]
104                    {
105                        let mut error = None;
106                        {
107                            let group = self.shared_context.group.lock().await;
108                            let result = group.resume_process(pid);
109                            if let Err(e) = result {
110                                let msg = format!("Failed to resume process: {}", e);
111
112                                #[cfg(feature = "tracing")]
113                                tracing::error!(error=%e, "Failed to resume process");
114
115                                let _ = child.kill().await;
116
117                                error = Some(TaskError::Handle(msg));
118                            }
119                        }
120
121                        if let Some(error) = error {
122                            self.send_error_event_and_stop(error.clone()).await;
123                            return Err(error);
124                        }
125                    }
126                }
127
128                let time = Self::update_state(&self.shared_context, TaskState::Running);
129                let _ = self
130                    .shared_context
131                    .send_event(TaskEvent::Started {
132                        process_id: pid,
133                        created_at: self.get_create_at(),
134                        running_at: time,
135                    })
136                    .await;
137                Ok(child)
138            }
139            Err(e) => {
140                let msg = format!("Failed to spawn child process: {}", e);
141                #[cfg(feature = "tracing")]
142                tracing::error!(error = %e, "Failed to spawn child process");
143                let error = TaskError::IO(msg);
144                let time = Self::update_state(&self.shared_context, TaskState::Finished);
145                let error_event = TaskEvent::Error {
146                    error: error.clone(),
147                };
148
149                let _ = self.shared_context.send_event(error_event).await;
150
151                let finish_event = TaskEvent::Stopped {
152                    exit_code: None,
153                    finished_at: time,
154                    reason: TaskStopReason::Error(error.clone()),
155                    #[cfg(unix)]
156                    signal: None,
157                };
158                let _ = self.shared_context.send_event(finish_event).await;
159
160                Err(error)
161            }
162        }
163    }
164    /// Takes stdout and stderr readers from a child process.
165    ///
166    /// Extracts the stdout and stderr streams from the child process and
167    /// converts them into line readers for processing output.
168    ///
169    /// # Arguments
170    ///
171    /// * `child` - The child process to extract streams from
172    /// * `event_tx` - Channel for sending error events if extraction fails
173    ///
174    /// # Returns
175    ///
176    /// * `Ok((Lines<BufReader<ChildStdout>>, Lines<BufReader<ChildStderr>>))` - The stdout and stderr line readers
177    /// * `Err(TaskError)` - If stream extraction fails
178    ///
179    /// # Errors
180    ///
181    /// Returns [`TaskError::Handle`] if stdout or stderr streams cannot be taken
182    pub(crate) async fn take_std_output_reader(
183        &mut self,
184        child: &mut Child,
185    ) -> Result<(Lines<BufReader<ChildStdout>>, Lines<BufReader<ChildStderr>>), TaskError> {
186        let stdout = match child.stdout.take() {
187            Some(out) => BufReader::new(out).lines(),
188            None => {
189                let msg = "Failed to take stdout of child process";
190                #[cfg(feature = "tracing")]
191                tracing::error!(msg);
192
193                let error = TaskError::IO(msg.to_string());
194                self.send_error_event_and_stop(error.clone()).await;
195
196                return Err(TaskError::IO(msg.to_string()));
197            }
198        };
199
200        let stderr = match child.stderr.take() {
201            Some(err) => BufReader::new(err).lines(),
202            None => {
203                let msg = "Failed to take stderr of child process";
204                #[cfg(feature = "tracing")]
205                tracing::error!(msg);
206
207                let error = TaskError::IO(msg.to_string());
208                self.send_error_event_and_stop(error.clone()).await;
209
210                return Err(TaskError::IO(msg.to_string()));
211            }
212        };
213
214        Ok((stdout, stderr))
215    }
216
217    /// Stores the stdin handle from a child process for later use.
218    ///
219    /// Extracts and stores the stdin stream from the child process if stdin
220    /// is enabled in the task configuration. This allows sending input to
221    /// the process later via the send_stdin method.
222    ///
223    /// # Arguments
224    ///
225    /// * `child` - The child process to extract stdin from
226    /// * `event_tx` - Channel for sending error events if extraction fails
227    ///
228    /// # Returns
229    ///
230    /// * `Ok(())` - Stdin stored successfully or not required
231    /// * `Err(TaskError)` - If stdin extraction fails when required
232    ///
233    /// # Errors
234    ///
235    /// Returns [`TaskError::Handle`] if stdin cannot be taken from the child process
236    /// when stdin is enabled in the configuration
237    pub(crate) async fn store_stdin(&mut self, child: &mut Child) -> Result<(), TaskError> {
238        if !self.shared_context.config.enable_stdin.unwrap_or_default() {
239            return Ok(());
240        }
241
242        if let Some(stdin) = child.stdin.take() {
243            self.stdin = Some(stdin);
244            Ok(())
245        } else {
246            let msg = "Failed to take stdin out of child process";
247            #[cfg(feature = "tracing")]
248            tracing::error!(msg);
249
250            let error = TaskError::IO(msg.to_string());
251            self.send_error_event_and_stop(error.clone()).await;
252
253            Err(TaskError::IO(msg.to_string()))
254        }
255    }
256    /// Sends input to the process's stdin.
257    ///
258    /// Writes the provided input to the process's stdin stream. The input
259    /// will be automatically terminated with a newline if it doesn't already end with one.
260    ///
261    /// # Arguments
262    ///
263    /// * `input` - The input string to send to stdin
264    ///
265    /// # Returns
266    ///
267    /// * `Ok(())` - If the input was sent successfully
268    /// * `Err(TaskError)` - If sending fails or the task is not running
269    ///
270    /// # Errors
271    ///
272    /// Returns [`TaskError::Control`] if the task is not in a running state,
273    /// or [`TaskError::IO`] if writing to stdin fails
274    pub async fn send_stdin(&mut self, input: impl Into<String>) -> Result<(), TaskError> {
275        let state = self.get_task_state();
276        if !matches!(state, TaskState::Running | TaskState::Ready) {
277            return Err(TaskError::Control(
278                "Cannot send stdin, task is not running".to_string(),
279            ));
280        }
281        let mut input: String = input.into();
282        if !input.ends_with('\n') {
283            input.push('\n');
284        }
285        if let Some(stdin) = &mut self.stdin.as_mut() {
286            #[allow(clippy::used_underscore_binding)]
287            if let Err(_e) = stdin.write_all(input.as_bytes()).await {
288                let msg = "Failed to write to child stdin";
289                #[cfg(feature = "tracing")]
290                tracing::warn!(error=%_e, msg);
291                return Err(TaskError::Control(msg.to_string()));
292            }
293        } else {
294            let msg = "Stdin is not available";
295            #[cfg(feature = "tracing")]
296            tracing::warn!(msg);
297            return Err(TaskError::Control(msg.to_string()));
298        }
299
300        Ok(())
301    }
302
303    /// Sets up process group configuration for the command.
304    ///
305    /// Configures the command to run in a process group if process group
306    /// support is enabled in the task configuration. This allows for
307    /// coordinated termination of process trees.
308    ///
309    /// # Arguments
310    ///
311    /// * `cmd` - The command to configure for process group execution
312    ///
313    /// # Returns
314    ///
315    /// * `Ok(Command)` - The configured command ready for spawning
316    /// * `Err(TaskError)` - If process group setup fails
317    ///
318    /// # Errors
319    ///
320    /// Returns [`TaskError::Control`] if process group creation fails
321    #[cfg(feature = "process-group")]
322    pub(crate) async fn setup_process_group(&self, cmd: Command) -> Result<Command, TaskError> {
323        if !self
324            .shared_context
325            .config
326            .use_process_group
327            .unwrap_or_default()
328        {
329            return Ok(cmd);
330        }
331        let mut group = self.shared_context.group.lock().await;
332        let cmd = group.create_with_command(cmd).map_err(|e| {
333            let msg = format!("Failed to create process group: {}", e);
334            #[cfg(feature = "tracing")]
335            tracing::error!(error=%e, "{}", msg);
336
337            TaskError::Control(msg)
338        })?;
339        Ok(cmd)
340    }
341}