tcrm_task/tasks/tokio/
process.rs

1use tokio::{
2    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines},
3    process::{Child, ChildStderr, ChildStdout, Command},
4    sync::mpsc,
5};
6
7use crate::tasks::{
8    control::TaskStatusInfo,
9    error::TaskError,
10    event::{TaskEvent, TaskStopReason},
11    state::TaskState,
12    tokio::executor::TaskExecutor,
13};
14
15impl TaskExecutor {
16    /// Tries to store the process ID from a spawned child process.
17    ///
18    /// Validates that a process ID was successfully obtained and stores it
19    /// in the shared context for later use in process management.
20    ///
21    /// # Arguments
22    ///
23    /// * `pid` - Optional process ID from the spawned child
24    ///
25    /// # Returns
26    ///
27    /// * `Ok(u32)` - The validated process ID
28    /// * `Err(TaskError)` - If no process ID was provided
29    ///
30    /// # Errors
31    ///
32    /// Returns [`TaskError::Handle`] if the process ID is None
33    pub(crate) fn try_store_process_id(&self, pid: Option<u32>) -> Result<u32, TaskError> {
34        let Some(pid) = pid else {
35            let msg = "Failed to get process id";
36
37            #[cfg(feature = "tracing")]
38            tracing::error!(msg);
39
40            let error = TaskError::Handle(msg.to_string());
41            return Err(error);
42        };
43        self.shared_context.set_process_id(pid);
44        Ok(pid)
45    }
46    /// Spawns a child process and handles the result.
47    ///
48    /// Attempts to spawn the configured command and stores the process ID.
49    /// If spawning fails, appropriate error events are sent.
50    ///
51    /// On Windows with process groups enabled, spawns the process in a suspended state,
52    /// assigns it to the job object, then resumes it to avoid the race condition where
53    /// child processes could escape the job object.
54    ///
55    /// # Arguments
56    ///
57    /// * `cmd` - The configured command to spawn
58    /// * `event_tx` - Channel for sending task events
59    ///
60    /// # Returns
61    ///
62    /// * `Ok(Child)` - The spawned child process
63    /// * `Err(TaskError)` - If spawning fails
64    ///
65    /// # Errors
66    ///
67    /// Returns [`TaskError::IO`] if process spawning fails
68    pub(crate) async fn spawn_child(
69        &mut self,
70        mut cmd: Command,
71        event_tx: &mpsc::Sender<TaskEvent>,
72    ) -> Result<Child, TaskError> {
73        let use_pg = self
74            .shared_context
75            .config
76            .use_process_group
77            .unwrap_or_default();
78
79        match cmd.spawn() {
80            Ok(mut child) => {
81                let pid = match self.try_store_process_id(child.id()) {
82                    Ok(pid) => pid,
83                    Err(e) => {
84                        let _ = child.kill().await;
85
86                        self.send_error_event_and_stop(e.clone(), event_tx).await;
87                        return Err(e);
88                    }
89                };
90
91                // Assign the child process to the process group if enabled
92                #[cfg(feature = "process-group")]
93                if use_pg {
94                    let result = self.shared_context.group.lock().await.assign_child(pid);
95                    if let Err(e) = result {
96                        let msg = format!("Failed to add process to group: {}", e);
97
98                        #[cfg(feature = "tracing")]
99                        tracing::error!(error=%e, "Failed to add process to group");
100
101                        let _ = child.kill().await;
102
103                        let error = TaskError::Handle(msg);
104                        self.send_error_event_and_stop(error.clone(), event_tx)
105                            .await;
106                        return Err(error);
107                    }
108                    // Resume the process on Windows if it was suspended
109                    #[cfg(all(windows, feature = "process-group"))]
110                    {
111                        let mut error = None;
112                        {
113                            let group = self.shared_context.group.lock().await;
114                            let result = group.resume_process(pid);
115                            if let Err(e) = result {
116                                let msg = format!("Failed to resume process: {}", e);
117
118                                #[cfg(feature = "tracing")]
119                                tracing::error!(error=%e, "Failed to resume process");
120
121                                let _ = child.kill().await;
122
123                                error = Some(TaskError::Handle(msg));
124                            }
125                        }
126
127                        if let Some(error) = error {
128                            self.send_error_event_and_stop(error.clone(), event_tx)
129                                .await;
130                            return Err(error);
131                        }
132                    }
133                }
134
135                let time = Self::update_state(&self.shared_context, TaskState::Running);
136                Self::send_event(
137                    event_tx,
138                    TaskEvent::Started {
139                        process_id: pid,
140                        created_at: self.get_create_at(),
141                        running_at: time,
142                    },
143                )
144                .await;
145                Ok(child)
146            }
147            Err(e) => {
148                let msg = format!("Failed to spawn child process: {}", e);
149                #[cfg(feature = "tracing")]
150                tracing::error!(error = %e, "Failed to spawn child process");
151                let error = TaskError::IO(msg);
152                let time = Self::update_state(&self.shared_context, TaskState::Finished);
153                let error_event = TaskEvent::Error {
154                    error: error.clone(),
155                };
156
157                Self::send_event(event_tx, error_event).await;
158
159                let finish_event = TaskEvent::Stopped {
160                    exit_code: None,
161                    finished_at: time,
162                    reason: TaskStopReason::Error(error.clone()),
163                    #[cfg(unix)]
164                    signal: None,
165                };
166                Self::send_event(event_tx, finish_event).await;
167
168                Err(error)
169            }
170        }
171    }
172    /// Takes stdout and stderr readers from a child process.
173    ///
174    /// Extracts the stdout and stderr streams from the child process and
175    /// converts them into line readers for processing output.
176    ///
177    /// # Arguments
178    ///
179    /// * `child` - The child process to extract streams from
180    /// * `event_tx` - Channel for sending error events if extraction fails
181    ///
182    /// # Returns
183    ///
184    /// * `Ok((Lines<BufReader<ChildStdout>>, Lines<BufReader<ChildStderr>>))` - The stdout and stderr line readers
185    /// * `Err(TaskError)` - If stream extraction fails
186    ///
187    /// # Errors
188    ///
189    /// Returns [`TaskError::Handle`] if stdout or stderr streams cannot be taken
190    pub(crate) async fn take_std_output_reader(
191        &mut self,
192        child: &mut Child,
193        event_tx: &mpsc::Sender<TaskEvent>,
194    ) -> Result<(Lines<BufReader<ChildStdout>>, Lines<BufReader<ChildStderr>>), TaskError> {
195        let stdout = match child.stdout.take() {
196            Some(out) => BufReader::new(out).lines(),
197            None => {
198                let msg = "Failed to take stdout of child process";
199                #[cfg(feature = "tracing")]
200                tracing::error!(msg);
201
202                let error = TaskError::IO(msg.to_string());
203                self.send_error_event_and_stop(error.clone(), event_tx)
204                    .await;
205
206                return Err(TaskError::IO(msg.to_string()));
207            }
208        };
209
210        let stderr = match child.stderr.take() {
211            Some(err) => BufReader::new(err).lines(),
212            None => {
213                let msg = "Failed to take stderr of child process";
214                #[cfg(feature = "tracing")]
215                tracing::error!(msg);
216
217                let error = TaskError::IO(msg.to_string());
218                self.send_error_event_and_stop(error.clone(), event_tx)
219                    .await;
220
221                return Err(TaskError::IO(msg.to_string()));
222            }
223        };
224
225        Ok((stdout, stderr))
226    }
227
228    /// Stores the stdin handle from a child process for later use.
229    ///
230    /// Extracts and stores the stdin stream from the child process if stdin
231    /// is enabled in the task configuration. This allows sending input to
232    /// the process later via the send_stdin method.
233    ///
234    /// # Arguments
235    ///
236    /// * `child` - The child process to extract stdin from
237    /// * `event_tx` - Channel for sending error events if extraction fails
238    ///
239    /// # Returns
240    ///
241    /// * `Ok(())` - Stdin stored successfully or not required
242    /// * `Err(TaskError)` - If stdin extraction fails when required
243    ///
244    /// # Errors
245    ///
246    /// Returns [`TaskError::Handle`] if stdin cannot be taken from the child process
247    /// when stdin is enabled in the configuration
248    pub(crate) async fn store_stdin(
249        &mut self,
250        child: &mut Child,
251        event_tx: &mpsc::Sender<TaskEvent>,
252    ) -> Result<(), TaskError> {
253        if !self.shared_context.config.enable_stdin.unwrap_or_default() {
254            return Ok(());
255        }
256
257        if let Some(stdin) = child.stdin.take() {
258            self.stdin = Some(stdin);
259            Ok(())
260        } else {
261            let msg = "Failed to take stdin out of child process";
262            #[cfg(feature = "tracing")]
263            tracing::error!(msg);
264
265            let error = TaskError::IO(msg.to_string());
266            self.send_error_event_and_stop(error.clone(), event_tx)
267                .await;
268
269            Err(TaskError::IO(msg.to_string()))
270        }
271    }
272    /// Sends input to the process's stdin.
273    ///
274    /// Writes the provided input to the process's stdin stream. The input
275    /// will be automatically terminated with a newline if it doesn't already end with one.
276    ///
277    /// # Arguments
278    ///
279    /// * `input` - The input string to send to stdin
280    ///
281    /// # Returns
282    ///
283    /// * `Ok(())` - If the input was sent successfully
284    /// * `Err(TaskError)` - If sending fails or the task is not running
285    ///
286    /// # Errors
287    ///
288    /// Returns [`TaskError::Control`] if the task is not in a running state,
289    /// or [`TaskError::IO`] if writing to stdin fails
290    pub async fn send_stdin(&mut self, input: impl Into<String>) -> Result<(), TaskError> {
291        let state = self.get_task_state();
292        if !matches!(state, TaskState::Running | TaskState::Ready) {
293            return Err(TaskError::Control(
294                "Cannot send stdin, task is not running".to_string(),
295            ));
296        }
297        let mut input: String = input.into();
298        if !input.ends_with('\n') {
299            input.push('\n');
300        }
301        if let Some(stdin) = &mut self.stdin.as_mut() {
302            #[allow(clippy::used_underscore_binding)]
303            if let Err(_e) = stdin.write_all(input.as_bytes()).await {
304                let msg = "Failed to write to child stdin";
305                #[cfg(feature = "tracing")]
306                tracing::warn!(error=%_e, msg);
307                return Err(TaskError::Control(msg.to_string()));
308            }
309        } else {
310            let msg = "Stdin is not available";
311            #[cfg(feature = "tracing")]
312            tracing::warn!(msg);
313            return Err(TaskError::Control(msg.to_string()));
314        }
315
316        Ok(())
317    }
318
319    /// Sets up process group configuration for the command.
320    ///
321    /// Configures the command to run in a process group if process group
322    /// support is enabled in the task configuration. This allows for
323    /// coordinated termination of process trees.
324    ///
325    /// # Arguments
326    ///
327    /// * `cmd` - The command to configure for process group execution
328    ///
329    /// # Returns
330    ///
331    /// * `Ok(Command)` - The configured command ready for spawning
332    /// * `Err(TaskError)` - If process group setup fails
333    ///
334    /// # Errors
335    ///
336    /// Returns [`TaskError::Control`] if process group creation fails
337    #[cfg(feature = "process-group")]
338    pub(crate) async fn setup_process_group(&self, cmd: Command) -> Result<Command, TaskError> {
339        if !self
340            .shared_context
341            .config
342            .use_process_group
343            .unwrap_or_default()
344        {
345            return Ok(cmd);
346        }
347        let mut group = self.shared_context.group.lock().await;
348        let cmd = group.create_with_command(cmd).map_err(|e| {
349            let msg = format!("Failed to create process group: {}", e);
350            #[cfg(feature = "tracing")]
351            tracing::error!(error=%e, "{}", msg);
352
353            TaskError::Control(msg)
354        })?;
355        Ok(cmd)
356    }
357}