tcrm_task/tasks/tokio/process.rs
1use tokio::{
2 io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines},
3 process::{Child, ChildStderr, ChildStdout, Command},
4};
5
6use crate::tasks::{
7 control::TaskStatusInfo,
8 error::TaskError,
9 event::{TaskEvent, TaskStopReason},
10 state::TaskState,
11 tokio::executor::TaskExecutor,
12};
13
14impl TaskExecutor {
15 /// Tries to store the process ID from a spawned child process.
16 ///
17 /// Validates that a process ID was successfully obtained and stores it
18 /// in the shared context for later use in process management.
19 ///
20 /// # Arguments
21 ///
22 /// * `pid` - Optional process ID from the spawned child
23 ///
24 /// # Returns
25 ///
26 /// * `Ok(u32)` - The validated process ID
27 /// * `Err(TaskError)` - If no process ID was provided
28 ///
29 /// # Errors
30 ///
31 /// Returns [`TaskError::Handle`] if the process ID is None
32 pub(crate) fn try_store_process_id(&self, pid: Option<u32>) -> Result<u32, TaskError> {
33 let Some(pid) = pid else {
34 let msg = "Failed to get process id";
35
36 #[cfg(feature = "tracing")]
37 tracing::error!(msg);
38
39 let error = TaskError::Handle(msg.to_string());
40 return Err(error);
41 };
42 self.shared_context.set_process_id(pid);
43 Ok(pid)
44 }
45 /// Spawns a child process and handles the result.
46 ///
47 /// Attempts to spawn the configured command and stores the process ID.
48 /// If spawning fails, appropriate error events are sent.
49 ///
50 /// On Windows with process groups enabled, spawns the process in a suspended state,
51 /// assigns it to the job object, then resumes it to avoid the race condition where
52 /// child processes could escape the job object.
53 ///
54 /// # Arguments
55 ///
56 /// * `cmd` - The configured command to spawn
57 /// * `event_tx` - Channel for sending task events
58 ///
59 /// # Returns
60 ///
61 /// * `Ok(Child)` - The spawned child process
62 /// * `Err(TaskError)` - If spawning fails
63 ///
64 /// # Errors
65 ///
66 /// Returns [`TaskError::IO`] if process spawning fails
67 pub(crate) async fn spawn_child(&mut self, mut cmd: Command) -> Result<Child, TaskError> {
68 let use_pg = self
69 .shared_context
70 .config
71 .use_process_group
72 .unwrap_or_default();
73
74 match cmd.spawn() {
75 Ok(mut child) => {
76 let pid = match self.try_store_process_id(child.id()) {
77 Ok(pid) => pid,
78 Err(e) => {
79 let _ = child.kill().await;
80
81 self.send_error_event_and_stop(e.clone()).await;
82 return Err(e);
83 }
84 };
85
86 // Assign the child process to the process group if enabled
87 #[cfg(feature = "process-group")]
88 if use_pg {
89 let result = self.shared_context.group.lock().await.assign_child(pid);
90 if let Err(e) = result {
91 let msg = format!("Failed to add process to group: {}", e);
92
93 #[cfg(feature = "tracing")]
94 tracing::error!(error=%e, "Failed to add process to group");
95
96 let _ = child.kill().await;
97
98 let error = TaskError::Handle(msg);
99 self.send_error_event_and_stop(error.clone()).await;
100 return Err(error);
101 }
102 // Resume the process on Windows if it was suspended
103 #[cfg(all(windows, feature = "process-group"))]
104 {
105 let mut error = None;
106 {
107 let group = self.shared_context.group.lock().await;
108 let result = group.resume_process(pid);
109 if let Err(e) = result {
110 let msg = format!("Failed to resume process: {}", e);
111
112 #[cfg(feature = "tracing")]
113 tracing::error!(error=%e, "Failed to resume process");
114
115 let _ = child.kill().await;
116
117 error = Some(TaskError::Handle(msg));
118 }
119 }
120
121 if let Some(error) = error {
122 self.send_error_event_and_stop(error.clone()).await;
123 return Err(error);
124 }
125 }
126 }
127
128 let time = Self::update_state(&self.shared_context, TaskState::Running);
129 let _ = self
130 .shared_context
131 .send_event(TaskEvent::Started {
132 process_id: pid,
133 created_at: self.get_create_at(),
134 running_at: time,
135 })
136 .await;
137 Ok(child)
138 }
139 Err(e) => {
140 let msg = format!("Failed to spawn child process: {}", e);
141 #[cfg(feature = "tracing")]
142 tracing::error!(error = %e, "Failed to spawn child process");
143 let error = TaskError::IO(msg);
144 let time = Self::update_state(&self.shared_context, TaskState::Finished);
145 let error_event = TaskEvent::Error {
146 error: error.clone(),
147 };
148
149 let _ = self.shared_context.send_event(error_event).await;
150
151 let finish_event = TaskEvent::Stopped {
152 exit_code: None,
153 finished_at: time,
154 reason: TaskStopReason::Error(error.clone()),
155 #[cfg(unix)]
156 signal: None,
157 };
158 let _ = self.shared_context.send_event(finish_event).await;
159
160 Err(error)
161 }
162 }
163 }
164 /// Takes stdout and stderr readers from a child process.
165 ///
166 /// Extracts the stdout and stderr streams from the child process and
167 /// converts them into line readers for processing output.
168 ///
169 /// # Arguments
170 ///
171 /// * `child` - The child process to extract streams from
172 /// * `event_tx` - Channel for sending error events if extraction fails
173 ///
174 /// # Returns
175 ///
176 /// * `Ok((Lines<BufReader<ChildStdout>>, Lines<BufReader<ChildStderr>>))` - The stdout and stderr line readers
177 /// * `Err(TaskError)` - If stream extraction fails
178 ///
179 /// # Errors
180 ///
181 /// Returns [`TaskError::Handle`] if stdout or stderr streams cannot be taken
182 pub(crate) async fn take_std_output_reader(
183 &mut self,
184 child: &mut Child,
185 ) -> Result<(Lines<BufReader<ChildStdout>>, Lines<BufReader<ChildStderr>>), TaskError> {
186 let stdout = match child.stdout.take() {
187 Some(out) => BufReader::new(out).lines(),
188 None => {
189 let msg = "Failed to take stdout of child process";
190 #[cfg(feature = "tracing")]
191 tracing::error!(msg);
192
193 let error = TaskError::IO(msg.to_string());
194 self.send_error_event_and_stop(error.clone()).await;
195
196 return Err(TaskError::IO(msg.to_string()));
197 }
198 };
199
200 let stderr = match child.stderr.take() {
201 Some(err) => BufReader::new(err).lines(),
202 None => {
203 let msg = "Failed to take stderr of child process";
204 #[cfg(feature = "tracing")]
205 tracing::error!(msg);
206
207 let error = TaskError::IO(msg.to_string());
208 self.send_error_event_and_stop(error.clone()).await;
209
210 return Err(TaskError::IO(msg.to_string()));
211 }
212 };
213
214 Ok((stdout, stderr))
215 }
216
217 /// Stores the stdin handle from a child process for later use.
218 ///
219 /// Extracts and stores the stdin stream from the child process if stdin
220 /// is enabled in the task configuration. This allows sending input to
221 /// the process later via the send_stdin method.
222 ///
223 /// # Arguments
224 ///
225 /// * `child` - The child process to extract stdin from
226 /// * `event_tx` - Channel for sending error events if extraction fails
227 ///
228 /// # Returns
229 ///
230 /// * `Ok(())` - Stdin stored successfully or not required
231 /// * `Err(TaskError)` - If stdin extraction fails when required
232 ///
233 /// # Errors
234 ///
235 /// Returns [`TaskError::Handle`] if stdin cannot be taken from the child process
236 /// when stdin is enabled in the configuration
237 pub(crate) async fn store_stdin(&mut self, child: &mut Child) -> Result<(), TaskError> {
238 if !self.shared_context.config.enable_stdin.unwrap_or_default() {
239 return Ok(());
240 }
241
242 if let Some(stdin) = child.stdin.take() {
243 self.stdin = Some(stdin);
244 Ok(())
245 } else {
246 let msg = "Failed to take stdin out of child process";
247 #[cfg(feature = "tracing")]
248 tracing::error!(msg);
249
250 let error = TaskError::IO(msg.to_string());
251 self.send_error_event_and_stop(error.clone()).await;
252
253 Err(TaskError::IO(msg.to_string()))
254 }
255 }
256 /// Sends input to the process's stdin.
257 ///
258 /// Writes the provided input to the process's stdin stream. The input
259 /// will be automatically terminated with a newline if it doesn't already end with one.
260 ///
261 /// # Arguments
262 ///
263 /// * `input` - The input string to send to stdin
264 ///
265 /// # Returns
266 ///
267 /// * `Ok(())` - If the input was sent successfully
268 /// * `Err(TaskError)` - If sending fails or the task is not running
269 ///
270 /// # Errors
271 ///
272 /// Returns [`TaskError::Control`] if the task is not in a running state,
273 /// or [`TaskError::IO`] if writing to stdin fails
274 pub async fn send_stdin(&mut self, input: impl Into<String>) -> Result<(), TaskError> {
275 let state = self.get_task_state();
276 if !matches!(state, TaskState::Running | TaskState::Ready) {
277 return Err(TaskError::Control(
278 "Cannot send stdin, task is not running".to_string(),
279 ));
280 }
281 let mut input: String = input.into();
282 if !input.ends_with('\n') {
283 input.push('\n');
284 }
285 if let Some(stdin) = &mut self.stdin.as_mut() {
286 #[allow(clippy::used_underscore_binding)]
287 if let Err(_e) = stdin.write_all(input.as_bytes()).await {
288 let msg = "Failed to write to child stdin";
289 #[cfg(feature = "tracing")]
290 tracing::warn!(error=%_e, msg);
291 return Err(TaskError::Control(msg.to_string()));
292 }
293 } else {
294 let msg = "Stdin is not available";
295 #[cfg(feature = "tracing")]
296 tracing::warn!(msg);
297 return Err(TaskError::Control(msg.to_string()));
298 }
299
300 Ok(())
301 }
302
/// Prepares a command for execution inside the shared process group.
///
/// When `use_process_group` is not enabled in the task configuration the
/// command is handed back untouched. Otherwise the shared group is locked
/// and asked to create itself around the command via `create_with_command`.
///
/// # Arguments
///
/// * `cmd` - The command to configure for process group execution
///
/// # Returns
///
/// * `Ok(Command)` - The command to spawn (unchanged if process groups are disabled)
/// * `Err(TaskError)` - If process group setup fails
///
/// # Errors
///
/// Returns [`TaskError::Control`] if process group creation fails
#[cfg(feature = "process-group")]
pub(crate) async fn setup_process_group(&self, cmd: Command) -> Result<Command, TaskError> {
    let enabled = self
        .shared_context
        .config
        .use_process_group
        .unwrap_or_default();
    if !enabled {
        return Ok(cmd);
    }

    let mut group = self.shared_context.group.lock().await;
    let outcome = group.create_with_command(cmd);
    match outcome {
        Ok(configured) => Ok(configured),
        Err(e) => {
            let msg = format!("Failed to create process group: {}", e);
            #[cfg(feature = "tracing")]
            tracing::error!(error=%e, "{}", msg);

            Err(TaskError::Control(msg))
        }
    }
}
341}