tcrm_task/tasks/tokio/process.rs
1use tokio::{
2 io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines},
3 process::{Child, ChildStderr, ChildStdout, Command},
4 sync::mpsc,
5};
6
7use crate::tasks::{
8 control::TaskStatusInfo,
9 error::TaskError,
10 event::{TaskEvent, TaskStopReason},
11 state::TaskState,
12 tokio::executor::TaskExecutor,
13};
14
15impl TaskExecutor {
16 /// Tries to store the process ID from a spawned child process.
17 ///
18 /// Validates that a process ID was successfully obtained and stores it
19 /// in the shared context for later use in process management.
20 ///
21 /// # Arguments
22 ///
23 /// * `pid` - Optional process ID from the spawned child
24 ///
25 /// # Returns
26 ///
27 /// * `Ok(u32)` - The validated process ID
28 /// * `Err(TaskError)` - If no process ID was provided
29 ///
30 /// # Errors
31 ///
32 /// Returns [`TaskError::Handle`] if the process ID is None
33 pub(crate) fn try_store_process_id(&self, pid: Option<u32>) -> Result<u32, TaskError> {
34 let Some(pid) = pid else {
35 let msg = "Failed to get process id";
36
37 #[cfg(feature = "tracing")]
38 tracing::error!(msg);
39
40 let error = TaskError::Handle(msg.to_string());
41 return Err(error);
42 };
43 self.shared_context.set_process_id(pid);
44 Ok(pid)
45 }
46 /// Spawns a child process and handles the result.
47 ///
48 /// Attempts to spawn the configured command and stores the process ID.
49 /// If spawning fails, appropriate error events are sent.
50 ///
51 /// On Windows with process groups enabled, spawns the process in a suspended state,
52 /// assigns it to the job object, then resumes it to avoid the race condition where
53 /// child processes could escape the job object.
54 ///
55 /// # Arguments
56 ///
57 /// * `cmd` - The configured command to spawn
58 /// * `event_tx` - Channel for sending task events
59 ///
60 /// # Returns
61 ///
62 /// * `Ok(Child)` - The spawned child process
63 /// * `Err(TaskError)` - If spawning fails
64 ///
65 /// # Errors
66 ///
67 /// Returns [`TaskError::IO`] if process spawning fails
68 pub(crate) async fn spawn_child(
69 &mut self,
70 mut cmd: Command,
71 event_tx: &mpsc::Sender<TaskEvent>,
72 ) -> Result<Child, TaskError> {
73 let use_pg = self
74 .shared_context
75 .config
76 .use_process_group
77 .unwrap_or_default();
78
79 match cmd.spawn() {
80 Ok(mut child) => {
81 let pid = match self.try_store_process_id(child.id()) {
82 Ok(pid) => pid,
83 Err(e) => {
84 let _ = child.kill().await;
85
86 self.send_error_event_and_stop(e.clone(), event_tx).await;
87 return Err(e);
88 }
89 };
90
91 // Assign the child process to the process group if enabled
92 #[cfg(feature = "process-group")]
93 if use_pg {
94 let result = self.shared_context.group.lock().await.assign_child(pid);
95 if let Err(e) = result {
96 let msg = format!("Failed to add process to group: {}", e);
97
98 #[cfg(feature = "tracing")]
99 tracing::error!(error=%e, "Failed to add process to group");
100
101 let _ = child.kill().await;
102
103 let error = TaskError::Handle(msg);
104 self.send_error_event_and_stop(error.clone(), event_tx)
105 .await;
106 return Err(error);
107 }
108 // Resume the process on Windows if it was suspended
109 #[cfg(all(windows, feature = "process-group"))]
110 {
111 let mut error = None;
112 {
113 let group = self.shared_context.group.lock().await;
114 let result = group.resume_process(pid);
115 if let Err(e) = result {
116 let msg = format!("Failed to resume process: {}", e);
117
118 #[cfg(feature = "tracing")]
119 tracing::error!(error=%e, "Failed to resume process");
120
121 let _ = child.kill().await;
122
123 error = Some(TaskError::Handle(msg));
124 }
125 }
126
127 if let Some(error) = error {
128 self.send_error_event_and_stop(error.clone(), event_tx)
129 .await;
130 return Err(error);
131 }
132 }
133 }
134
135 let time = Self::update_state(&self.shared_context, TaskState::Running);
136 Self::send_event(
137 event_tx,
138 TaskEvent::Started {
139 process_id: pid,
140 created_at: self.get_create_at(),
141 running_at: time,
142 },
143 )
144 .await;
145 Ok(child)
146 }
147 Err(e) => {
148 let msg = format!("Failed to spawn child process: {}", e);
149 #[cfg(feature = "tracing")]
150 tracing::error!(error = %e, "Failed to spawn child process");
151 let error = TaskError::IO(msg);
152 let time = Self::update_state(&self.shared_context, TaskState::Finished);
153 let error_event = TaskEvent::Error {
154 error: error.clone(),
155 };
156
157 Self::send_event(event_tx, error_event).await;
158
159 let finish_event = TaskEvent::Stopped {
160 exit_code: None,
161 finished_at: time,
162 reason: TaskStopReason::Error(error.clone()),
163 #[cfg(unix)]
164 signal: None,
165 };
166 Self::send_event(event_tx, finish_event).await;
167
168 Err(error)
169 }
170 }
171 }
172 /// Takes stdout and stderr readers from a child process.
173 ///
174 /// Extracts the stdout and stderr streams from the child process and
175 /// converts them into line readers for processing output.
176 ///
177 /// # Arguments
178 ///
179 /// * `child` - The child process to extract streams from
180 /// * `event_tx` - Channel for sending error events if extraction fails
181 ///
182 /// # Returns
183 ///
184 /// * `Ok((Lines<BufReader<ChildStdout>>, Lines<BufReader<ChildStderr>>))` - The stdout and stderr line readers
185 /// * `Err(TaskError)` - If stream extraction fails
186 ///
187 /// # Errors
188 ///
189 /// Returns [`TaskError::Handle`] if stdout or stderr streams cannot be taken
190 pub(crate) async fn take_std_output_reader(
191 &mut self,
192 child: &mut Child,
193 event_tx: &mpsc::Sender<TaskEvent>,
194 ) -> Result<(Lines<BufReader<ChildStdout>>, Lines<BufReader<ChildStderr>>), TaskError> {
195 let stdout = match child.stdout.take() {
196 Some(out) => BufReader::new(out).lines(),
197 None => {
198 let msg = "Failed to take stdout of child process";
199 #[cfg(feature = "tracing")]
200 tracing::error!(msg);
201
202 let error = TaskError::IO(msg.to_string());
203 self.send_error_event_and_stop(error.clone(), event_tx)
204 .await;
205
206 return Err(TaskError::IO(msg.to_string()));
207 }
208 };
209
210 let stderr = match child.stderr.take() {
211 Some(err) => BufReader::new(err).lines(),
212 None => {
213 let msg = "Failed to take stderr of child process";
214 #[cfg(feature = "tracing")]
215 tracing::error!(msg);
216
217 let error = TaskError::IO(msg.to_string());
218 self.send_error_event_and_stop(error.clone(), event_tx)
219 .await;
220
221 return Err(TaskError::IO(msg.to_string()));
222 }
223 };
224
225 Ok((stdout, stderr))
226 }
227
228 /// Stores the stdin handle from a child process for later use.
229 ///
230 /// Extracts and stores the stdin stream from the child process if stdin
231 /// is enabled in the task configuration. This allows sending input to
232 /// the process later via the send_stdin method.
233 ///
234 /// # Arguments
235 ///
236 /// * `child` - The child process to extract stdin from
237 /// * `event_tx` - Channel for sending error events if extraction fails
238 ///
239 /// # Returns
240 ///
241 /// * `Ok(())` - Stdin stored successfully or not required
242 /// * `Err(TaskError)` - If stdin extraction fails when required
243 ///
244 /// # Errors
245 ///
246 /// Returns [`TaskError::Handle`] if stdin cannot be taken from the child process
247 /// when stdin is enabled in the configuration
248 pub(crate) async fn store_stdin(
249 &mut self,
250 child: &mut Child,
251 event_tx: &mpsc::Sender<TaskEvent>,
252 ) -> Result<(), TaskError> {
253 if !self.shared_context.config.enable_stdin.unwrap_or_default() {
254 return Ok(());
255 }
256
257 if let Some(stdin) = child.stdin.take() {
258 self.stdin = Some(stdin);
259 Ok(())
260 } else {
261 let msg = "Failed to take stdin out of child process";
262 #[cfg(feature = "tracing")]
263 tracing::error!(msg);
264
265 let error = TaskError::IO(msg.to_string());
266 self.send_error_event_and_stop(error.clone(), event_tx)
267 .await;
268
269 Err(TaskError::IO(msg.to_string()))
270 }
271 }
272 /// Sends input to the process's stdin.
273 ///
274 /// Writes the provided input to the process's stdin stream. The input
275 /// will be automatically terminated with a newline if it doesn't already end with one.
276 ///
277 /// # Arguments
278 ///
279 /// * `input` - The input string to send to stdin
280 ///
281 /// # Returns
282 ///
283 /// * `Ok(())` - If the input was sent successfully
284 /// * `Err(TaskError)` - If sending fails or the task is not running
285 ///
286 /// # Errors
287 ///
288 /// Returns [`TaskError::Control`] if the task is not in a running state,
289 /// or [`TaskError::IO`] if writing to stdin fails
290 pub async fn send_stdin(&mut self, input: impl Into<String>) -> Result<(), TaskError> {
291 let state = self.get_task_state();
292 if !matches!(state, TaskState::Running | TaskState::Ready) {
293 return Err(TaskError::Control(
294 "Cannot send stdin, task is not running".to_string(),
295 ));
296 }
297 let mut input: String = input.into();
298 if !input.ends_with('\n') {
299 input.push('\n');
300 }
301 if let Some(stdin) = &mut self.stdin.as_mut() {
302 #[allow(clippy::used_underscore_binding)]
303 if let Err(_e) = stdin.write_all(input.as_bytes()).await {
304 let msg = "Failed to write to child stdin";
305 #[cfg(feature = "tracing")]
306 tracing::warn!(error=%_e, msg);
307 return Err(TaskError::Control(msg.to_string()));
308 }
309 } else {
310 let msg = "Stdin is not available";
311 #[cfg(feature = "tracing")]
312 tracing::warn!(msg);
313 return Err(TaskError::Control(msg.to_string()));
314 }
315
316 Ok(())
317 }
318
/// Prepares a command for execution inside the task's process group.
///
/// When process groups are not enabled in the task configuration the
/// command is handed back untouched. Otherwise the shared group is locked
/// and asked to create itself around the command via `create_with_command`.
///
/// # Arguments
///
/// * `cmd` - The command to configure for process group execution
///
/// # Returns
///
/// * `Ok(Command)` - The command to spawn (configured, or unchanged when disabled)
/// * `Err(TaskError)` - If process group setup fails
///
/// # Errors
///
/// Returns [`TaskError::Control`] if process group creation fails
#[cfg(feature = "process-group")]
pub(crate) async fn setup_process_group(&self, cmd: Command) -> Result<Command, TaskError> {
    let group_enabled = self
        .shared_context
        .config
        .use_process_group
        .unwrap_or_default();
    if !group_enabled {
        return Ok(cmd);
    }

    let mut group = self.shared_context.group.lock().await;
    match group.create_with_command(cmd) {
        Ok(configured) => Ok(configured),
        Err(e) => {
            let msg = format!("Failed to create process group: {}", e);
            #[cfg(feature = "tracing")]
            tracing::error!(error=%e, "{}", msg);

            Err(TaskError::Control(msg))
        }
    }
}
357}