tcrm_task/tasks/process/group/builder.rs
1use tokio::process::Command;
2
3use crate::tasks::process::group::error::ProcessGroupError;
4
/// A cross-platform wrapper for managing process groups/jobs.
///
/// On Unix systems, this uses process groups with `setsid()`.
/// On Windows, this uses Job Objects for full process tree termination.
///
/// # Platform Support
/// - **Unix/Linux**: the spawned command is placed in its own session and process group via
///   `setsid()`; the group ID is recorded by `assign_child()`. (Termination presumably goes
///   through `killpg()` elsewhere in the crate — that code is not part of this module.)
/// - **Windows**: Full process tree support using Job Objects configured with
///   `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`
/// - **Other platforms**: not supported — `create_with_command()` and `assign_child()` return
///   `ProcessGroupError::UnsupportedPlatform`, and `is_active()` is always `false`
#[derive(Debug)]
pub struct ProcessGroup {
    /// Platform-specific state: the process group ID on Unix, the Job Object handle on Windows.
    pub(crate) inner: ProcessGroupInner,
}
18
/// Platform-specific state backing a `ProcessGroup`.
///
/// Exactly one of the fields below is compiled in for any given target; which one is selected
/// by the `cfg` attribute on the field.
#[derive(Debug)]
pub(crate) struct ProcessGroupInner {
    /// Unix: the ID stored by `assign_child()`; `None` until a child has been assigned.
    #[cfg(unix)]
    pub(crate) process_group_id: Option<i32>,
    /// Windows: the Job Object handle created by `create_with_command()`; `None` until then.
    /// The handle is closed when this value is dropped.
    #[cfg(windows)]
    pub(crate) job_handle: Option<SendHandle>,
    /// Placeholder so the struct has a field on targets that are neither Unix nor Windows.
    #[cfg(not(any(unix, windows)))]
    _phantom: (),
}
28
/// Newtype around a raw Win32 `HANDLE` that opts the handle into `Send` and `Sync`.
///
/// In this module the wrapped value is always the Job Object handle produced by
/// `CreateJobObjectW` in `create_with_command()`. The wrapper does not close the handle
/// itself; that is done by the `Drop` impl of `ProcessGroupInner`.
#[cfg(windows)]
#[derive(Debug)]
pub(crate) struct SendHandle(pub(crate) windows::Win32::Foundation::HANDLE);

// SAFETY: NOTE(review) — this relies on Win32 kernel-object handles being process-wide values
// that are not tied to the thread that created them; confirm against the Win32 handle
// documentation. The wrapper exposes no operation that mutates the handle value.
#[cfg(windows)]
unsafe impl Send for SendHandle {}

// SAFETY: NOTE(review) — same assumption as for `Send`: shared references only ever copy the
// handle value out, and the Job Object APIs called on it in this module are presumed safe to
// call concurrently from several threads — confirm.
#[cfg(windows)]
unsafe impl Sync for SendHandle {}
38
39impl ProcessGroup {
40 /// Create a new, inactive process group
41 ///
42 /// # Returns
43 ///
44 /// A new `ProcessGroup` instance that is not yet active
45 ///
46 /// # Examples
47 ///
48 /// ```rust
49 /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
50 ///
51 /// let group = ProcessGroup::new();
52 /// assert!(!group.is_active());
53 /// ```
54 pub fn new() -> Self {
55 Self {
56 inner: ProcessGroupInner {
57 #[cfg(unix)]
58 process_group_id: None,
59 #[cfg(windows)]
60 job_handle: None,
61 #[cfg(not(any(unix, windows)))]
62 _phantom: (),
63 },
64 }
65 }
66
67 /// Check if the process group is active
68 ///
69 /// # Returns
70 ///
71 /// `true` if the process group has been created and is active, `false` otherwise
72 ///
73 /// # Examples
74 ///
75 /// ```rust
76 /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
77 ///
78 /// let group = ProcessGroup::new();
79 /// assert!(!group.is_active());
80 /// ```
81 pub fn is_active(&self) -> bool {
82 #[cfg(unix)]
83 {
84 self.inner.process_group_id.is_some()
85 }
86 #[cfg(windows)]
87 {
88 self.inner.job_handle.is_some()
89 }
90 #[cfg(not(any(unix, windows)))]
91 {
92 false
93 }
94 }
95 /// Creates a new process group and configures the command to use it.
96 ///
97 /// This method prepares a Command to run as part of this process group. On Unix systems,
98 /// it configures the command to create a new session and process group using setsid().
99 /// On Windows, it configures the command to run in a new job object with appropriate
100 /// creation flags and enables CREATE_SUSPENDED to avoid race conditions.
101 ///
102 /// # Arguments
103 ///
104 /// * `command` - The Command to configure for process group execution
105 ///
106 /// # Returns
107 ///
108 /// * `Ok(Command)` - The configured command ready for execution
109 /// * `Err(ProcessGroupError)` - If process group configuration fails
110 ///
111 /// # Platform-Specific Behavior
112 ///
113 /// ## Windows Race Condition Mitigation
114 /// On Windows, the process is configured to start in a suspended state (CREATE_SUSPENDED).
115 /// After spawning, you must call `assign_child()` and then manually resume the process
116 /// to avoid the race condition where child processes can escape the job before assignment.
117 ///
118 /// ## Unix Behavior
119 /// On Unix, the process starts normally in its own process group via setsid().
120 ///
121 /// # Example
122 ///
123 /// ```rust,no_run
124 /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
125 /// use tokio::process::Command;
126 ///
127 /// let mut group = ProcessGroup::new();
128 /// let mut cmd = Command::new("echo");
129 /// cmd.arg("hello");
130 ///
131 /// let configured_cmd = group.create_with_command(cmd).unwrap();
132 /// // Command is now configured to run in the process group
133 /// ```
134 pub fn create_with_command(
135 &mut self,
136 #[allow(unused_mut)] mut command: Command,
137 ) -> Result<Command, ProcessGroupError> {
138 #[cfg(unix)]
139 {
140 // Configure the command to create a new session and process group
141 unsafe {
142 command.pre_exec(|| {
143 use nix::unistd::setsid;
144 if setsid().is_err() {
145 return Err(std::io::Error::last_os_error());
146 }
147 Ok(())
148 });
149 }
150 Ok(command)
151 }
152 #[cfg(windows)]
153 {
154 use windows::Win32::System::JobObjects::JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
155 use windows::Win32::System::JobObjects::{
156 CreateJobObjectW, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
157 JobObjectExtendedLimitInformation, SetInformationJobObject,
158 };
159 use windows::Win32::System::Threading::CREATE_SUSPENDED;
160 use windows::core::PCWSTR;
161
162 // Create a Job Object for the process group
163 let job_handle = unsafe { CreateJobObjectW(None, PCWSTR::null()) }.map_err(|e| {
164 ProcessGroupError::CreationFailed(format!("Failed to create Job Object: {}", e))
165 })?;
166
167 // Configure the job to kill all processes when the job handle is closed
168 let mut job_info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION::default();
169 job_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
170
171 let set_info_result = unsafe {
172 SetInformationJobObject(
173 job_handle,
174 JobObjectExtendedLimitInformation,
175 &job_info as *const _ as *const std::ffi::c_void,
176 std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
177 )
178 };
179
180 if let Err(e) = set_info_result {
181 unsafe {
182 let _ = windows::Win32::Foundation::CloseHandle(job_handle);
183 }
184 return Err(ProcessGroupError::CreationFailed(format!(
185 "Failed to configure Job Object: {}",
186 e
187 )));
188 }
189
190 self.inner.job_handle = Some(SendHandle(job_handle));
191
192 // Configure the command to start suspended to avoid race conditions
193 // This is essential to prevent child processes from escaping the job
194 command.creation_flags(CREATE_SUSPENDED.0);
195
196 Ok(command)
197 }
198 #[cfg(not(any(unix, windows)))]
199 {
200 Err(ProcessGroupError::UnsupportedPlatform(
201 "Process group management not available on this platform".to_string(),
202 ))
203 }
204 }
205
206 /// Assigns a spawned child process to this process group/job.
207 ///
208 /// On Unix systems, this stores the process group ID.
209 ///
210 /// On Windows, this assigns the process to the job object.
211 ///
212 /// After assignment, all future children of the process will be contained in the job, unless the process has
213 /// breakaway privileges (which are not enabled by default in this implementation).
214 ///
215 /// # Arguments
216 ///
217 /// * `child_id` - The process ID of the child to assign to this group
218 ///
219 /// # Returns
220 ///
221 /// * `Ok(())` - If the assignment was successful
222 /// * `Err(ProcessGroupError)` - If assignment fails or the platform is unsupported
223 ///
224 /// # Example
225 ///
226 /// ```rust
227 /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
228 /// use std::process::Command;
229 ///
230 /// let mut group = ProcessGroup::new();
231 ///
232 /// // After spawning a process, assign it to the group
233 /// // let child = Command::new("echo").spawn()?;
234 /// // group.assign_child(child.id())?;
235 /// # Ok::<(), Box<dyn std::error::Error>>(())
236 /// ```
237 ///
238 /// # Windows Race Condition Note
239 /// On Windows, there is a well-known race condition: if a spawned process creates child processes
240 /// before it is assigned to the job object, those children will not be part of the job
241 /// and can escape containment.
242 ///
243 /// See: <https://devblogs.microsoft.com/oldnewthing/20130405-00/?p=4743>
244 ///
245 /// To avoid this issue, the process needs to be spawned in a suspended state,
246 /// assigned to the job object, and only then resuming it. This ensures that no
247 /// child processes can escape the job before assignment.
248 pub fn assign_child(&mut self, child_id: u32) -> Result<(), ProcessGroupError> {
249 #[cfg(unix)]
250 {
251 self.inner.process_group_id = Some(child_id as i32);
252 Ok(())
253 }
254 #[cfg(windows)]
255 {
256 use windows::Win32::Foundation::CloseHandle;
257 use windows::Win32::System::JobObjects::AssignProcessToJobObject;
258 use windows::Win32::System::Threading::{
259 OpenProcess, PROCESS_SET_INFORMATION, PROCESS_SET_QUOTA, PROCESS_TERMINATE,
260 };
261
262 let process_handle = unsafe {
263 OpenProcess(
264 PROCESS_SET_QUOTA | PROCESS_TERMINATE | PROCESS_SET_INFORMATION,
265 false,
266 child_id,
267 )
268 }
269 .map_err(|e| {
270 ProcessGroupError::AssignmentFailed(format!("Failed to open process handle: {}", e))
271 })?;
272
273 let result = if let Some(SendHandle(job_handle)) = &self.inner.job_handle {
274 unsafe { AssignProcessToJobObject(*job_handle, process_handle) }
275 } else {
276 unsafe {
277 let _ = CloseHandle(process_handle);
278 }
279 return Err(ProcessGroupError::AssignmentFailed(
280 "No Job Object handle available".to_string(),
281 ));
282 };
283
284 unsafe {
285 let _ = CloseHandle(process_handle);
286 }
287
288 result.map_err(|e| {
289 ProcessGroupError::AssignmentFailed(format!(
290 "Failed to assign process to Job Object: {}",
291 e
292 ))
293 })?;
294 Ok(())
295 }
296 #[cfg(not(any(unix, windows)))]
297 {
298 let _ = child_id;
299 Err(ProcessGroupError::UnsupportedPlatform(
300 "Process group assignment not available on this platform".to_string(),
301 ))
302 }
303 }
304
305 /// Resumes a suspended process (Windows only).
306 ///
307 /// This method should be called after `assign_child()` when using processes
308 /// spawned with CREATE_SUSPENDED to complete the race-condition-safe setup.
309 ///
310 /// # Arguments
311 ///
312 /// * `child_id` - The process ID of the child to resume
313 ///
314 /// # Returns
315 ///
316 /// * `Ok(())` - If the process was resumed successfully
317 /// * `Err(ProcessGroupError)` - If resuming fails or the platform is unsupported
318 ///
319 /// # Example
320 ///
321 /// ```rust,no_run
322 /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
323 /// use tokio::process::Command;
324 ///
325 /// let mut group = ProcessGroup::new();
326 /// let mut cmd = group.create_with_command(Command::new("echo")).unwrap();
327 /// let child = cmd.spawn().unwrap();
328 /// let pid = child.id().expect("Failed to get process ID");
329 /// group.assign_child(pid).unwrap();
330 /// group.resume_process(pid).unwrap(); // Windows only
331 /// ```
332 #[cfg(windows)]
333 pub fn resume_process(&self, child_id: u32) -> Result<(), ProcessGroupError> {
334 use windows::Win32::Foundation::CloseHandle;
335 use windows::Win32::System::Diagnostics::ToolHelp::{
336 CreateToolhelp32Snapshot, TH32CS_SNAPTHREAD, THREADENTRY32, Thread32First, Thread32Next,
337 };
338 use windows::Win32::System::Threading::{OpenThread, ResumeThread, THREAD_SUSPEND_RESUME};
339
340 unsafe {
341 // Take a snapshot of all threads in the system
342 let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0).map_err(|e| {
343 ProcessGroupError::SignalFailed(format!("Failed to create thread snapshot: {}", e))
344 })?;
345
346 let mut thread_entry = THREADENTRY32 {
347 dwSize: std::mem::size_of::<THREADENTRY32>() as u32,
348 ..Default::default()
349 };
350
351 let mut resumed_count = 0;
352
353 // Iterate through all threads and resume those belonging to the process
354 if Thread32First(snapshot, &mut thread_entry).is_ok() {
355 loop {
356 if thread_entry.th32OwnerProcessID == child_id {
357 let thread_handle =
358 OpenThread(THREAD_SUSPEND_RESUME, false, thread_entry.th32ThreadID);
359 if let Ok(handle) = thread_handle {
360 ResumeThread(handle);
361 let _ = CloseHandle(handle);
362 resumed_count += 1;
363 }
364 }
365
366 if Thread32Next(snapshot, &mut thread_entry).is_err() {
367 break;
368 }
369 }
370 }
371
372 let _ = CloseHandle(snapshot);
373
374 if resumed_count == 0 {
375 Err(ProcessGroupError::SignalFailed(format!(
376 "No threads found to resume for process with PID {}",
377 child_id
378 )))
379 } else {
380 Ok(())
381 }
382 }
383 }
384
    /// No-op on non-Windows platforms.
    ///
    /// This variant is compiled on every target that is not Windows, so callers can invoke
    /// `resume_process()` without their own `cfg` guards.
    ///
    /// # Arguments
    ///
    /// * `_child_id` - Ignored
    ///
    /// # Returns
    ///
    /// Always `Ok(())`.
    #[cfg(not(windows))]
    pub fn resume_process(&self, _child_id: u32) -> Result<(), ProcessGroupError> {
        Ok(()) // Nothing to do: this variant covers every non-Windows target, not only Unix
    }
390}
391
392impl Drop for ProcessGroupInner {
393 fn drop(&mut self) {
394 #[cfg(windows)]
395 {
396 if let Some(SendHandle(job_handle)) = self.job_handle.take() {
397 unsafe {
398 let _ = windows::Win32::Foundation::CloseHandle(job_handle);
399 }
400 }
401 }
402 }
403}