// tcrm_task/tasks/process/group/builder.rs

1use tokio::process::Command;
2
3use crate::tasks::process::group::error::ProcessGroupError;
4
5/// A cross-platform wrapper for managing process groups/jobs.
6///
7/// On Unix systems, this uses process groups with `setsid()`.
8/// On Windows, this uses Job Objects for full process tree termination.
9///
10/// # Platform Support
11/// - **Unix/Linux**: Full process group support using `setsid()` and `killpg()`
12/// - **Windows**: Full process tree support using Job Objects
13/// - **Other platforms**: No special handling
14#[derive(Debug)]
15pub struct ProcessGroup {
16    pub(crate) inner: ProcessGroupInner,
17}
18
/// Platform-specific state behind a `ProcessGroup`.
///
/// Exactly one field exists per target: the set of fields is selected at
/// compile time through `cfg` attributes.
#[derive(Debug)]
pub(crate) struct ProcessGroupInner {
    /// Unix: id of the process group being tracked, or `None` while no child
    /// has been assigned yet.
    #[cfg(unix)]
    pub(crate) process_group_id: Option<i32>,
    /// Windows: handle of the Job Object backing the group, or `None` while no
    /// job has been created yet.
    #[cfg(windows)]
    pub(crate) job_handle: Option<SendHandle>,
    /// Other targets: placeholder so the struct still has a field.
    #[cfg(not(any(unix, windows)))]
    _phantom: (),
}
28
/// Newtype around a raw Windows `HANDLE` so it can be stored in types that
/// must be `Send` and `Sync`.
///
/// The wrapper does not close the handle on drop; whoever stores it is
/// responsible for calling `CloseHandle`.
#[cfg(windows)]
#[derive(Debug)]
pub(crate) struct SendHandle(pub(crate) windows::Win32::Foundation::HANDLE);

// SAFETY: NOTE(review) — this relies on the wrapped value being a kernel object
// handle (in this file it is only ever a Job Object handle). Such handles are
// valid process-wide rather than tied to the creating thread, so moving the
// value to another thread is sound. Confirm no thread-affine handle is ever
// wrapped elsewhere in the crate.
#[cfg(windows)]
unsafe impl Send for SendHandle {}

// SAFETY: NOTE(review) — shared references only allow copying the handle value;
// the Win32 calls made with it in this file (`AssignProcessToJobObject`,
// `CloseHandle`) are presumed safe to issue from any thread. Confirm against
// the other users of this type.
#[cfg(windows)]
unsafe impl Sync for SendHandle {}
38
39impl ProcessGroup {
40    /// Create a new, inactive process group
41    ///
42    /// # Returns
43    ///
44    /// A new `ProcessGroup` instance that is not yet active
45    ///
46    /// # Examples
47    ///
48    /// ```rust
49    /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
50    ///
51    /// let group = ProcessGroup::new();
52    /// assert!(!group.is_active());
53    /// ```
54    pub fn new() -> Self {
55        Self {
56            inner: ProcessGroupInner {
57                #[cfg(unix)]
58                process_group_id: None,
59                #[cfg(windows)]
60                job_handle: None,
61                #[cfg(not(any(unix, windows)))]
62                _phantom: (),
63            },
64        }
65    }
66
67    /// Check if the process group is active
68    ///
69    /// # Returns
70    ///
71    /// `true` if the process group has been created and is active, `false` otherwise
72    ///
73    /// # Examples
74    ///
75    /// ```rust
76    /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
77    ///
78    /// let group = ProcessGroup::new();
79    /// assert!(!group.is_active());
80    /// ```
81    pub fn is_active(&self) -> bool {
82        #[cfg(unix)]
83        {
84            self.inner.process_group_id.is_some()
85        }
86        #[cfg(windows)]
87        {
88            self.inner.job_handle.is_some()
89        }
90        #[cfg(not(any(unix, windows)))]
91        {
92            false
93        }
94    }
95    /// Creates a new process group and configures the command to use it.
96    ///
97    /// This method prepares a Command to run as part of this process group. On Unix systems,
98    /// it configures the command to create a new session and process group using setsid().
99    /// On Windows, it configures the command to run in a new job object with appropriate
100    /// creation flags and enables CREATE_SUSPENDED to avoid race conditions.
101    ///
102    /// # Arguments
103    ///
104    /// * `command` - The Command to configure for process group execution
105    ///
106    /// # Returns
107    ///
108    /// * `Ok(Command)` - The configured command ready for execution
109    /// * `Err(ProcessGroupError)` - If process group configuration fails
110    ///
111    /// # Platform-Specific Behavior
112    ///
113    /// ## Windows Race Condition Mitigation
114    /// On Windows, the process is configured to start in a suspended state (CREATE_SUSPENDED).
115    /// After spawning, you must call `assign_child()` and then manually resume the process
116    /// to avoid the race condition where child processes can escape the job before assignment.
117    ///
118    /// ## Unix Behavior  
119    /// On Unix, the process starts normally in its own process group via setsid().
120    ///
121    /// # Example
122    ///
123    /// ```rust,no_run
124    /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
125    /// use tokio::process::Command;
126    ///
127    /// let mut group = ProcessGroup::new();
128    /// let mut cmd = Command::new("echo");
129    /// cmd.arg("hello");
130    ///
131    /// let configured_cmd = group.create_with_command(cmd).unwrap();
132    /// // Command is now configured to run in the process group
133    /// ```
134    pub fn create_with_command(
135        &mut self,
136        #[allow(unused_mut)] mut command: Command,
137    ) -> Result<Command, ProcessGroupError> {
138        #[cfg(unix)]
139        {
140            // Configure the command to create a new session and process group
141            unsafe {
142                command.pre_exec(|| {
143                    use nix::unistd::setsid;
144                    if setsid().is_err() {
145                        return Err(std::io::Error::last_os_error());
146                    }
147                    Ok(())
148                });
149            }
150            Ok(command)
151        }
152        #[cfg(windows)]
153        {
154            use windows::Win32::System::JobObjects::JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
155            use windows::Win32::System::JobObjects::{
156                CreateJobObjectW, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
157                JobObjectExtendedLimitInformation, SetInformationJobObject,
158            };
159            use windows::Win32::System::Threading::CREATE_SUSPENDED;
160            use windows::core::PCWSTR;
161
162            // Create a Job Object for the process group
163            let job_handle = unsafe { CreateJobObjectW(None, PCWSTR::null()) }.map_err(|e| {
164                ProcessGroupError::CreationFailed(format!("Failed to create Job Object: {}", e))
165            })?;
166
167            // Configure the job to kill all processes when the job handle is closed
168            let mut job_info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION::default();
169            job_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
170
171            let set_info_result = unsafe {
172                SetInformationJobObject(
173                    job_handle,
174                    JobObjectExtendedLimitInformation,
175                    &job_info as *const _ as *const std::ffi::c_void,
176                    std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
177                )
178            };
179
180            if let Err(e) = set_info_result {
181                unsafe {
182                    let _ = windows::Win32::Foundation::CloseHandle(job_handle);
183                }
184                return Err(ProcessGroupError::CreationFailed(format!(
185                    "Failed to configure Job Object: {}",
186                    e
187                )));
188            }
189
190            self.inner.job_handle = Some(SendHandle(job_handle));
191
192            // Configure the command to start suspended to avoid race conditions
193            // This is essential to prevent child processes from escaping the job
194            command.creation_flags(CREATE_SUSPENDED.0);
195
196            Ok(command)
197        }
198        #[cfg(not(any(unix, windows)))]
199        {
200            Err(ProcessGroupError::UnsupportedPlatform(
201                "Process group management not available on this platform".to_string(),
202            ))
203        }
204    }
205
206    /// Assigns a spawned child process to this process group/job.
207    ///
208    /// On Unix systems, this stores the process group ID.
209    ///
210    /// On Windows, this assigns the process to the job object.
211    ///
212    /// After assignment, all future children of the process will be contained in the job, unless the process has
213    /// breakaway privileges (which are not enabled by default in this implementation).
214    ///
215    /// # Arguments
216    ///
217    /// * `child_id` - The process ID of the child to assign to this group
218    ///
219    /// # Returns
220    ///
221    /// * `Ok(())` - If the assignment was successful
222    /// * `Err(ProcessGroupError)` - If assignment fails or the platform is unsupported
223    ///
224    /// # Example
225    ///
226    /// ```rust
227    /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
228    /// use std::process::Command;
229    ///
230    /// let mut group = ProcessGroup::new();
231    ///
232    /// // After spawning a process, assign it to the group
233    /// // let child = Command::new("echo").spawn()?;
234    /// // group.assign_child(child.id())?;
235    /// # Ok::<(), Box<dyn std::error::Error>>(())
236    /// ```
237    ///
238    /// # Windows Race Condition Note
239    /// On Windows, there is a well-known race condition: if a spawned process creates child processes
240    /// before it is assigned to the job object, those children will not be part of the job
241    /// and can escape containment.
242    ///
243    /// See: <https://devblogs.microsoft.com/oldnewthing/20130405-00/?p=4743>
244    ///
245    /// To avoid this issue, the process needs to be spawned in a suspended state,
246    /// assigned to the job object, and only then resuming it. This ensures that no
247    /// child processes can escape the job before assignment.
248    pub fn assign_child(&mut self, child_id: u32) -> Result<(), ProcessGroupError> {
249        #[cfg(unix)]
250        {
251            self.inner.process_group_id = Some(child_id as i32);
252            Ok(())
253        }
254        #[cfg(windows)]
255        {
256            use windows::Win32::Foundation::CloseHandle;
257            use windows::Win32::System::JobObjects::AssignProcessToJobObject;
258            use windows::Win32::System::Threading::{
259                OpenProcess, PROCESS_SET_INFORMATION, PROCESS_SET_QUOTA, PROCESS_TERMINATE,
260            };
261
262            let process_handle = unsafe {
263                OpenProcess(
264                    PROCESS_SET_QUOTA | PROCESS_TERMINATE | PROCESS_SET_INFORMATION,
265                    false,
266                    child_id,
267                )
268            }
269            .map_err(|e| {
270                ProcessGroupError::AssignmentFailed(format!("Failed to open process handle: {}", e))
271            })?;
272
273            let result = if let Some(SendHandle(job_handle)) = &self.inner.job_handle {
274                unsafe { AssignProcessToJobObject(*job_handle, process_handle) }
275            } else {
276                unsafe {
277                    let _ = CloseHandle(process_handle);
278                }
279                return Err(ProcessGroupError::AssignmentFailed(
280                    "No Job Object handle available".to_string(),
281                ));
282            };
283
284            unsafe {
285                let _ = CloseHandle(process_handle);
286            }
287
288            result.map_err(|e| {
289                ProcessGroupError::AssignmentFailed(format!(
290                    "Failed to assign process to Job Object: {}",
291                    e
292                ))
293            })?;
294            Ok(())
295        }
296        #[cfg(not(any(unix, windows)))]
297        {
298            let _ = child_id;
299            Err(ProcessGroupError::UnsupportedPlatform(
300                "Process group assignment not available on this platform".to_string(),
301            ))
302        }
303    }
304
305    /// Resumes a suspended process (Windows only).
306    ///
307    /// This method should be called after `assign_child()` when using processes
308    /// spawned with CREATE_SUSPENDED to complete the race-condition-safe setup.
309    ///
310    /// # Arguments
311    ///
312    /// * `child_id` - The process ID of the child to resume
313    ///
314    /// # Returns
315    ///
316    /// * `Ok(())` - If the process was resumed successfully
317    /// * `Err(ProcessGroupError)` - If resuming fails or the platform is unsupported
318    ///
319    /// # Example
320    ///
321    /// ```rust,no_run
322    /// use tcrm_task::tasks::process::group::builder::ProcessGroup;
323    /// use tokio::process::Command;
324    ///
325    /// let mut group = ProcessGroup::new();
326    /// let mut cmd = group.create_with_command(Command::new("echo")).unwrap();
327    /// let child = cmd.spawn().unwrap();
328    /// let pid = child.id().expect("Failed to get process ID");
329    /// group.assign_child(pid).unwrap();
330    /// group.resume_process(pid).unwrap(); // Windows only
331    /// ```
332    #[cfg(windows)]
333    pub fn resume_process(&self, child_id: u32) -> Result<(), ProcessGroupError> {
334        use windows::Win32::Foundation::CloseHandle;
335        use windows::Win32::System::Diagnostics::ToolHelp::{
336            CreateToolhelp32Snapshot, TH32CS_SNAPTHREAD, THREADENTRY32, Thread32First, Thread32Next,
337        };
338        use windows::Win32::System::Threading::{OpenThread, ResumeThread, THREAD_SUSPEND_RESUME};
339
340        unsafe {
341            // Take a snapshot of all threads in the system
342            let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0).map_err(|e| {
343                ProcessGroupError::SignalFailed(format!("Failed to create thread snapshot: {}", e))
344            })?;
345
346            let mut thread_entry = THREADENTRY32 {
347                dwSize: std::mem::size_of::<THREADENTRY32>() as u32,
348                ..Default::default()
349            };
350
351            let mut resumed_count = 0;
352
353            // Iterate through all threads and resume those belonging to the process
354            if Thread32First(snapshot, &mut thread_entry).is_ok() {
355                loop {
356                    if thread_entry.th32OwnerProcessID == child_id {
357                        let thread_handle =
358                            OpenThread(THREAD_SUSPEND_RESUME, false, thread_entry.th32ThreadID);
359                        if let Ok(handle) = thread_handle {
360                            ResumeThread(handle);
361                            let _ = CloseHandle(handle);
362                            resumed_count += 1;
363                        }
364                    }
365
366                    if Thread32Next(snapshot, &mut thread_entry).is_err() {
367                        break;
368                    }
369                }
370            }
371
372            let _ = CloseHandle(snapshot);
373
374            if resumed_count == 0 {
375                Err(ProcessGroupError::SignalFailed(format!(
376                    "No threads found to resume for process with PID {}",
377                    child_id
378                )))
379            } else {
380                Ok(())
381            }
382        }
383    }
384
385    /// No-op on non-Windows platforms.
386    #[cfg(not(windows))]
387    pub fn resume_process(&self, _child_id: u32) -> Result<(), ProcessGroupError> {
388        Ok(()) // No-op on Unix systems
389    }
390}
391
392impl Drop for ProcessGroupInner {
393    fn drop(&mut self) {
394        #[cfg(windows)]
395        {
396            if let Some(SendHandle(job_handle)) = self.job_handle.take() {
397                unsafe {
398                    let _ = windows::Win32::Foundation::CloseHandle(job_handle);
399                }
400            }
401        }
402    }
403}