Skip to main content

ralph_workflow/executor/
executor_trait.rs

1//! `ProcessExecutor` trait definition.
2//!
3//! This module defines the trait abstraction for process execution,
4//! enabling dependency injection for testing.
5
6use super::{AgentChildHandle, AgentSpawnConfig, ChildProcessInfo, ProcessOutput, RealAgentChild};
7use std::io;
8use std::path::Path;
9
/// Converts a byte count reported for a pid buffer into a number of
/// `libc::pid_t`-sized entries.
///
/// Returns `None` when `bytes_written` is negative. A byte count that is not a
/// multiple of the pid width is rounded down.
#[cfg(target_os = "macos")]
fn child_pid_entry_count(bytes_written: i32) -> Option<usize> {
    const PID_WIDTH: usize = std::mem::size_of::<libc::pid_t>();

    usize::try_from(bytes_written)
        .ok()
        .map(|byte_len| byte_len / PID_WIDTH)
}
16
/// Collects descendant-process statistics for `parent_pid` through libproc.
///
/// The descendant tree is walked breadth-first with `proc_listchildpids`. Each
/// descendant is then inspected with `proc_pidinfo`; it is counted only when its
/// process group id equals `parent_pid` and its status is not idle-creating,
/// stopped, or zombie.
///
/// # Returns
///
/// * `None` when a child listing fails or when no descendants were found at all.
/// * `Some(ChildProcessInfo::NONE)` when descendants exist but none qualify.
/// * `Some(info)` with the count, active count, summed CPU time, and a signature
///   over the sorted qualifying PIDs otherwise.
#[cfg(target_os = "macos")]
fn child_info_from_libproc(parent_pid: u32) -> Option<ChildProcessInfo> {
    use std::collections::{HashSet, VecDeque};
    use std::ffi::c_void;

    // Flavor selectors passed to `proc_pidinfo`.
    const PROC_PIDT_SHORTBSDINFO: libc::c_int = 13;
    const PROC_PIDTASKINFO: libc::c_int = 4;
    const MAXCOMLEN: usize = 16;
    // Process status values compared against `ProcBsdShortInfo::status`.
    const SIDL: u32 = 1;
    const SRUN: u32 = 2;
    const SSTOP: u32 = 4;
    const SZOMB: u32 = 5;
    // 64-bit FNV-1a parameters used for the descendant signature.
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    /// Buffer layout requested with the `PROC_PIDT_SHORTBSDINFO` flavor.
    #[repr(C)]
    #[derive(Default)]
    struct ProcBsdShortInfo {
        pid: u32,
        parent_pid: u32,
        process_group_id: u32,
        status: u32,
        command: [libc::c_char; MAXCOMLEN],
        flags: u32,
        uid: libc::uid_t,
        gid: libc::gid_t,
        real_uid: libc::uid_t,
        real_gid: libc::gid_t,
        saved_uid: libc::uid_t,
        saved_gid: libc::gid_t,
        reserved: u32,
    }

    /// Buffer layout requested with the `PROC_PIDTASKINFO` flavor.
    #[repr(C)]
    #[derive(Default)]
    struct ProcTaskInfo {
        virtual_size: u64,
        resident_size: u64,
        total_user_time: u64,
        total_system_time: u64,
        threads_user_time: u64,
        threads_system_time: u64,
        policy: i32,
        faults: i32,
        pageins: i32,
        cow_faults: i32,
        messages_sent: i32,
        messages_received: i32,
        mach_syscalls: i32,
        unix_syscalls: i32,
        context_switches: i32,
        thread_count: i32,
        running_thread_count: i32,
        priority: i32,
    }

    #[link(name = "proc")]
    unsafe extern "C" {
        fn proc_listchildpids(pid: libc::pid_t, buffer: *mut c_void, buffersize: i32) -> i32;
        fn proc_pidinfo(
            pid: libc::pid_t,
            flavor: libc::c_int,
            arg: u64,
            buffer: *mut c_void,
            buffersize: libc::c_int,
        ) -> libc::c_int;
    }

    /// FNV-1a hash over the little-endian bytes of each PID, in slice order.
    fn descendant_pid_signature(descendants: &[u32]) -> u64 {
        descendants
            .iter()
            .flat_map(|pid| pid.to_le_bytes())
            .fold(FNV_OFFSET, |signature, byte| {
                (signature ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
            })
    }

    /// A process qualifies unless it is being created, stopped, or a zombie.
    const fn qualifies_libproc_status(status: u32) -> bool {
        !matches!(status, SIDL | SSTOP | SZOMB)
    }

    /// A process counts as currently active only when it is runnable, has
    /// accumulated CPU time, and has at least one running thread.
    const fn libproc_state_indicates_current_activity(
        status: u32,
        cpu_time_ms: u64,
        num_running_threads: i32,
    ) -> bool {
        status == SRUN && cpu_time_ms > 0 && num_running_threads > 0
    }

    /// Lists the direct children of `parent_pid`, doubling the buffer until the
    /// reported entry count is strictly smaller than the buffer capacity.
    ///
    /// Returns `None` if the call reports an error or a size computation fails.
    fn list_child_pids(parent_pid: u32) -> Option<Vec<u32>> {
        let pid = libc::pid_t::try_from(parent_pid).ok()?;
        let mut capacity: usize = 32;

        loop {
            let byte_len = capacity.checked_mul(std::mem::size_of::<libc::pid_t>())?;
            let buffer_size = i32::try_from(byte_len).ok()?;
            let mut buffer = vec![libc::pid_t::default(); capacity];

            // SAFETY: `buffer` is valid for `buffer_size` bytes, and the kernel
            // writes at most that many bytes of child pid entries.
            let bytes_written = unsafe {
                proc_listchildpids(pid, buffer.as_mut_ptr().cast::<c_void>(), buffer_size)
            };
            // NOTE(review): the return value is treated as a byte count here.
            // Apple's libproc wrapper may already divide by `sizeof(int)` and
            // return an entry count instead — confirm against libproc.c for the
            // macOS versions this targets.
            if bytes_written < 0 {
                return None;
            }
            if bytes_written == 0 {
                return Some(Vec::new());
            }

            let count = child_pid_entry_count(bytes_written)?;
            if count < capacity {
                buffer.truncate(count);
                // Entries that do not fit in `u32` (negative pids) are dropped.
                let child_pids = buffer
                    .into_iter()
                    .filter_map(|child_pid| u32::try_from(child_pid).ok())
                    .collect();
                return Some(child_pids);
            }

            // The buffer may have been filled completely; retry with more room.
            capacity = capacity.checked_mul(2)?;
        }
    }

    /// Fetches the short BSD info record for `pid`; `None` unless the call
    /// reports exactly `size_of::<ProcBsdShortInfo>()` bytes.
    fn fetch_bsd_short_info(pid: u32) -> Option<ProcBsdShortInfo> {
        let mut info = ProcBsdShortInfo::default();
        let pid = libc::pid_t::try_from(pid).ok()?;
        let expected = i32::try_from(std::mem::size_of::<ProcBsdShortInfo>()).ok()?;
        // SAFETY: `info` points to writable memory sized exactly for
        // `ProcBsdShortInfo`, which matches the C ABI layout.
        let bytes = unsafe {
            proc_pidinfo(
                pid,
                PROC_PIDT_SHORTBSDINFO,
                0,
                (&raw mut info).cast::<c_void>(),
                expected,
            )
        };
        (bytes == expected).then_some(info)
    }

    /// Fetches the task info record for `pid`; `None` unless the call reports
    /// exactly `size_of::<ProcTaskInfo>()` bytes.
    fn fetch_task_info(pid: u32) -> Option<ProcTaskInfo> {
        let mut info = ProcTaskInfo::default();
        let pid = libc::pid_t::try_from(pid).ok()?;
        let expected = i32::try_from(std::mem::size_of::<ProcTaskInfo>()).ok()?;
        // SAFETY: `info` points to writable memory sized exactly for
        // `ProcTaskInfo`, which matches the C ABI layout.
        let bytes = unsafe {
            proc_pidinfo(
                pid,
                PROC_PIDTASKINFO,
                0,
                (&raw mut info).cast::<c_void>(),
                expected,
            )
        };
        (bytes == expected).then_some(info)
    }

    // Breadth-first walk of the process tree below `parent_pid`. Any failed
    // listing aborts the whole lookup so the caller can try another strategy.
    let mut descendants = Vec::new();
    let mut visited = HashSet::new();
    let mut queue = VecDeque::new();
    queue.push_back(parent_pid);

    while let Some(current_pid) = queue.pop_front() {
        for child_pid in list_child_pids(current_pid)? {
            if visited.insert(child_pid) {
                descendants.push(child_pid);
                queue.push_back(child_pid);
            }
        }
    }

    if descendants.is_empty() {
        return None;
    }

    // Sorted so the signature does not depend on discovery order.
    descendants.sort_unstable();

    let mut child_count: u32 = 0;
    let mut active_child_count: u32 = 0;
    let mut total_cpu_ms: u64 = 0;
    let mut qualifying_descendants = Vec::new();

    for descendant_pid in descendants {
        let Some(bsd_info) = fetch_bsd_short_info(descendant_pid) else {
            continue;
        };
        if bsd_info.process_group_id != parent_pid || !qualifies_libproc_status(bsd_info.status) {
            continue;
        }

        // Missing task info is treated as "no CPU time, no running threads".
        // NOTE(review): the task times are treated as nanoseconds. They may be
        // Mach absolute-time ticks, which differ from nanoseconds on Apple
        // Silicon — confirm whether scaling by `mach_timebase_info` is needed.
        let (cpu_time_ms, num_running_threads) =
            fetch_task_info(descendant_pid).map_or((0, 0), |task| {
                (
                    task.total_user_time.saturating_add(task.total_system_time) / 1_000_000,
                    task.running_thread_count,
                )
            });

        child_count = child_count.saturating_add(1);
        total_cpu_ms = total_cpu_ms.saturating_add(cpu_time_ms);
        if libproc_state_indicates_current_activity(
            bsd_info.status,
            cpu_time_ms,
            num_running_threads,
        ) {
            active_child_count += 1;
        }
        qualifying_descendants.push(descendant_pid);
    }

    if child_count == 0 {
        return Some(ChildProcessInfo::NONE);
    }

    Some(ChildProcessInfo {
        child_count,
        active_child_count,
        cpu_time_ms: total_cpu_ms,
        descendant_pid_signature: descendant_pid_signature(&qualifying_descendants),
    })
}
277
278/// Trait for executing external processes.
279///
280/// This trait abstracts process execution to allow dependency injection.
281/// Production code uses `RealProcessExecutor` which calls actual commands.
282/// Test code can use `MockProcessExecutor` to control process behavior.
283///
284/// Only external process execution is abstracted. Internal code logic is never mocked.
285pub trait ProcessExecutor: Send + Sync + std::fmt::Debug {
286    /// Execute a command with given arguments and return its output.
287    ///
288    /// # Arguments
289    ///
290    /// * `command` - The program to execute
291    /// * `args` - Command-line arguments to pass to the program
292    /// * `env` - Environment variables to set for the process (optional)
293    /// * `workdir` - Working directory for the process (optional)
294    ///
295    /// # Returns
296    ///
297    /// Returns a `ProcessOutput` containing exit status, stdout, and stderr.
298    ///
299    /// # Errors
300    ///
301    /// Returns an error if command cannot be spawned or if output capture fails.
302    fn execute(
303        &self,
304        command: &str,
305        args: &[&str],
306        env: &[(String, String)],
307        workdir: Option<&Path>,
308    ) -> io::Result<ProcessOutput>;
309
310    /// Spawn a process with stdin input and return the child handle.
311    ///
312    /// This method is used when you need to write to the process's stdin
313    /// or stream its output in real-time. Unlike `execute()`, this returns
314    /// a `Child` handle for direct interaction.
315    ///
316    /// # Arguments
317    ///
318    /// * `command` - The program to execute
319    /// * `args` - Command-line arguments to pass to the program
320    /// * `env` - Environment variables to set for the process (optional)
321    /// * `workdir` - Working directory for the process (optional)
322    ///
323    /// # Returns
324    ///
325    /// Returns a `Child` handle that can be used to interact with the process.
326    ///
327    /// # Errors
328    ///
329    /// Returns an error if command cannot be spawned.
330    fn spawn(
331        &self,
332        command: &str,
333        args: &[&str],
334        env: &[(String, String)],
335        workdir: Option<&Path>,
336    ) -> io::Result<std::process::Child> {
337        let mut cmd = std::process::Command::new(command);
338        cmd.args(args);
339
340        for (key, value) in env {
341            cmd.env(key, value);
342        }
343
344        if let Some(dir) = workdir {
345            cmd.current_dir(dir);
346        }
347
348        cmd.stdin(std::process::Stdio::piped())
349            .stdout(std::process::Stdio::piped())
350            .stderr(std::process::Stdio::piped())
351            .spawn()
352    }
353
354    /// Spawn an agent process with streaming output support.
355    ///
356    /// This method is specifically designed for spawning AI agent subprocesses
357    /// that need to output streaming JSON in real-time. Unlike `spawn()`, this
358    /// returns a handle with boxed stdout for trait object compatibility.
359    ///
360    /// # Arguments
361    ///
362    /// * `config` - Agent spawn configuration including command, args, env, prompt, etc.
363    ///
364    /// # Returns
365    ///
366    /// Returns an `AgentChildHandle` with stdout, stderr, and the child process.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if the agent cannot be spawned.
371    ///
372    /// # Default Implementation
373    ///
374    /// The default implementation uses the `spawn()` method with additional
375    /// configuration for agent-specific needs. Mock implementations should
376    /// override this to return mock results without spawning real processes.
377    fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
378        let mut cmd = std::process::Command::new(&config.command);
379        cmd.args(&config.args);
380
381        // Set environment variables
382        for (key, value) in &config.env {
383            cmd.env(key, value);
384        }
385
386        // Add the prompt as the final argument
387        cmd.arg(&config.prompt);
388
389        // Set buffering variables for real-time streaming
390        cmd.env("PYTHONUNBUFFERED", "1");
391        cmd.env("NODE_ENV", "production");
392
393        // Spawn the process with piped stdout/stderr
394        let mut child = cmd
395            .stdin(std::process::Stdio::null())
396            .stdout(std::process::Stdio::piped())
397            .stderr(std::process::Stdio::piped())
398            .spawn()?;
399
400        let stdout = child
401            .stdout
402            .take()
403            .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
404        let stderr = child
405            .stderr
406            .take()
407            .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
408
409        Ok(AgentChildHandle {
410            stdout: Box::new(stdout),
411            stderr: Box::new(stderr),
412            inner: Box::new(RealAgentChild(child)),
413        })
414    }
415
416    /// Check if a command exists and can be executed.
417    ///
418    /// This is a convenience method that executes a command with a
419    /// `--version` or similar flag to check if it's available.
420    ///
421    /// # Arguments
422    ///
423    /// * `command` - The program to check
424    ///
425    /// # Returns
426    ///
427    /// Returns `true` if command exists, `false` otherwise.
428    fn command_exists(&self, command: &str) -> bool {
429        match self.execute(command, &[], &[], None) {
430            Ok(output) => output.status.success(),
431            Err(_) => false,
432        }
433    }
434
435    /// Returns information about child processes of the given parent, including
436    /// their cumulative CPU time.
437    ///
438    /// Used by the idle-timeout monitor to determine whether child processes
439    /// are actively working (CPU time advancing between consecutive checks)
440    /// versus merely existing (stalled, zombie, or idle daemon).
441    ///
442    /// Default implementation: parses `ps` output for PID, PPID, and cputime
443    /// columns on Unix platforms. Returns `ChildProcessInfo::NONE` on non-Unix.
444    ///
445    /// Any execution error is treated as "no children" to avoid blocking the
446    /// timeout system. If `ps` is unavailable or fails unexpectedly, a one-time
447    /// warning is emitted to stderr so operators can diagnose reduced protection
448    /// against false-positive idle kills.
449    fn get_child_process_info(&self, parent_pid: u32) -> ChildProcessInfo {
450        #[cfg(unix)]
451        {
452            use std::sync::OnceLock;
453
454            #[derive(Clone, Copy)]
455            struct ProcessSnapshotEntry {
456                pid: u32,
457                parent_pid: u32,
458                cpu_time_ms: u64,
459                in_scope: bool,
460                currently_active: bool,
461            }
462
463            /// Parse "DD-HH:MM:SS", "HH:MM:SS", "MM:SS", or "MM:SS.ss" cputime format to milliseconds.
464            fn parse_cputime_ms(s: &str) -> Option<u64> {
465                let parts: Vec<&str> = s.split(':').collect();
466                match parts.len() {
467                    3 => {
468                        // DD-HH:MM:SS or HH:MM:SS (or HH:MM:SS.ss)
469                        let hours = if let Some((days, hours)) = parts[0].split_once('-') {
470                            let days: u64 = days.parse().ok()?;
471                            let hours: u64 = hours.parse().ok()?;
472                            days.checked_mul(24)?.checked_add(hours)?
473                        } else {
474                            parts[0].parse().ok()?
475                        };
476                        let minutes: u64 = parts[1].parse().ok()?;
477                        let seconds_str = parts[2];
478                        let (secs, frac_ms) = if let Some((s, f)) = seconds_str.split_once('.') {
479                            let secs: u64 = s.parse().ok()?;
480                            let frac: u64 = f.get(..2).unwrap_or(f).parse().ok()?;
481                            (secs, frac * 10)
482                        } else {
483                            (seconds_str.parse().ok()?, 0)
484                        };
485                        Some((hours * 3600 + minutes * 60 + secs) * 1000 + frac_ms)
486                    }
487                    2 => {
488                        // MM:SS or M:SS (or MM:SS.ss)
489                        let minutes: u64 = parts[0].parse().ok()?;
490                        let seconds_str = parts[1];
491                        let (secs, frac_ms) = if let Some((s, f)) = seconds_str.split_once('.') {
492                            let secs: u64 = s.parse().ok()?;
493                            let frac: u64 = f.get(..2).unwrap_or(f).parse().ok()?;
494                            (secs, frac * 10)
495                        } else {
496                            (seconds_str.parse().ok()?, 0)
497                        };
498                        Some((minutes * 60 + secs) * 1000 + frac_ms)
499                    }
500                    _ => None,
501                }
502            }
503
504            fn qualifies_process_state(state: &str) -> bool {
505                match state.chars().next() {
506                    Some('Z' | 'X' | 'T' | 'I') | None => false,
507                    Some(_) => true,
508                }
509            }
510
511            fn state_indicates_current_activity(state: &str, cpu_time_ms: u64) -> bool {
512                match state.chars().next() {
513                    Some('D' | 'U') => true,
514                    Some('R') => cpu_time_ms > 0,
515                    _ => false,
516                }
517            }
518
519            fn descendant_pid_signature(descendants: &[u32]) -> u64 {
520                const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
521                const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
522
523                let mut signature = FNV_OFFSET;
524                for pid in descendants {
525                    for byte in pid.to_le_bytes() {
526                        signature ^= u64::from(byte);
527                        signature = signature.wrapping_mul(FNV_PRIME);
528                    }
529                }
530                signature
531            }
532
533            fn child_info_from_descendant_pids(descendants: &[u32]) -> ChildProcessInfo {
534                if descendants.is_empty() {
535                    return ChildProcessInfo::NONE;
536                }
537
538                let child_count = u32::try_from(descendants.len()).unwrap_or(u32::MAX);
539                ChildProcessInfo {
540                    child_count,
541                    active_child_count: 0,
542                    cpu_time_ms: 0,
543                    descendant_pid_signature: descendant_pid_signature(descendants),
544                }
545            }
546
547            fn parse_ps_output(stdout: &str, parent_pid: u32) -> Option<ChildProcessInfo> {
548                use std::collections::{HashMap, HashSet, VecDeque};
549
550                // First pass: parse all descendant snapshot entries. Prefer richer
551                // 5-column output (pid ppid pgid stat cputime) when available and
552                // fall back to the legacy 3-column format (pid ppid cputime).
553                let mut children_of: HashMap<u32, Vec<ProcessSnapshotEntry>> = HashMap::new();
554                let mut saw_parseable = false;
555
556                for line in stdout.lines() {
557                    let parts: Vec<&str> = line.split_whitespace().collect();
558                    if parts.len() < 3 {
559                        continue;
560                    }
561
562                    let Ok(entry_pid) = parts[0].parse::<u32>() else {
563                        continue;
564                    };
565                    let Ok(parent_of_entry) = parts[1].parse::<u32>() else {
566                        continue;
567                    };
568                    saw_parseable = true;
569
570                    let (in_scope, currently_active, cputime_text) = if parts.len() >= 5 {
571                        let pgid_matches_parent = parts[2]
572                            .parse::<u32>()
573                            .ok()
574                            .is_some_and(|pgid| pgid == parent_pid);
575                        let state_qualifies = qualifies_process_state(parts[3]);
576                        let cpu_ms = parse_cputime_ms(parts[4]).unwrap_or(0);
577                        (
578                            pgid_matches_parent && state_qualifies,
579                            state_indicates_current_activity(parts[3], cpu_ms),
580                            parts[4],
581                        )
582                    } else {
583                        (true, false, parts[2])
584                    };
585
586                    let cpu_ms = parse_cputime_ms(cputime_text).unwrap_or(0);
587                    children_of
588                        .entry(parent_of_entry)
589                        .or_default()
590                        .push(ProcessSnapshotEntry {
591                            pid: entry_pid,
592                            parent_pid: parent_of_entry,
593                            cpu_time_ms: cpu_ms,
594                            in_scope,
595                            currently_active,
596                        });
597                }
598
599                if !saw_parseable {
600                    return None;
601                }
602
603                // BFS from parent_pid to find all descendants.
604                let mut child_count: u32 = 0;
605                let mut active_child_count: u32 = 0;
606                let mut total_cpu_ms: u64 = 0;
607                let mut descendant_pids = Vec::new();
608                let mut visited = HashSet::new();
609                let mut queue = VecDeque::new();
610                queue.push_back(parent_pid);
611
612                while let Some(current) = queue.pop_front() {
613                    if let Some(kids) = children_of.get(&current) {
614                        for child in kids {
615                            if !child.in_scope || !visited.insert(child.pid) {
616                                continue;
617                            }
618
619                            debug_assert_eq!(child.parent_pid, current);
620                            child_count += 1;
621                            if child.currently_active {
622                                active_child_count += 1;
623                            }
624                            total_cpu_ms += child.cpu_time_ms;
625                            descendant_pids.push(child.pid);
626                            queue.push_back(child.pid);
627                        }
628                    }
629                }
630
631                descendant_pids.sort_unstable();
632
633                if child_count == 0 {
634                    return Some(ChildProcessInfo::NONE);
635                }
636
637                Some(ChildProcessInfo {
638                    child_count,
639                    active_child_count,
640                    cpu_time_ms: total_cpu_ms,
641                    descendant_pid_signature: descendant_pid_signature(&descendant_pids),
642                })
643            }
644
645            fn parse_pgrep_output(stdout: &str) -> Option<Vec<u32>> {
646                let mut child_pids = Vec::new();
647                for line in stdout.lines() {
648                    let pid = line.trim();
649                    if pid.is_empty() {
650                        continue;
651                    }
652                    child_pids.push(pid.parse::<u32>().ok()?);
653                }
654                Some(child_pids)
655            }
656
657            fn warn_child_process_detection_degraded() {
658                static WARNED: OnceLock<()> = OnceLock::new();
659                if WARNED.set(()).is_ok() {
660                    eprintln!(
661                        "Warning: child-process detection degraded (ps unavailable or failing); \
662                         idle-timeout false-positive prevention may be reduced"
663                    );
664                }
665            }
666
667            fn warn_child_process_detection_conservative() {
668                static WARNED: OnceLock<()> = OnceLock::new();
669                if WARNED.set(()).is_ok() {
670                    eprintln!(
671                        "Warning: child-process detection is running in conservative fallback mode \
672                         (descendant PIDs found without state/CPU evidence); idle timeout will not \
673                         be suppressed by those descendants"
674                    );
675                }
676            }
677
678            let discover_descendants_with_pgrep = |parent_pid: u32| -> Option<Vec<u32>> {
679                use std::collections::{HashSet, VecDeque};
680
681                let mut descendants = Vec::new();
682                let mut visited = HashSet::new();
683                let mut queue = VecDeque::new();
684                queue.push_back(parent_pid);
685
686                while let Some(current_pid) = queue.pop_front() {
687                    let output = self
688                        .execute("pgrep", &["-P", &current_pid.to_string()], &[], None)
689                        .ok()?;
690
691                    let child_pids = if output.status.success() {
692                        parse_pgrep_output(&output.stdout)?
693                    } else if output.status.code() == Some(1) {
694                        Vec::new()
695                    } else {
696                        return None;
697                    };
698
699                    for child_pid in child_pids {
700                        if visited.insert(child_pid) {
701                            descendants.push(child_pid);
702                            queue.push_back(child_pid);
703                        }
704                    }
705                }
706
707                descendants.sort_unstable();
708                Some(descendants)
709            };
710
711            // Try richer ps invocations first so detached/stopped descendants can
712            // be filtered out, then fall back to the legacy shape for compatibility.
713            let ps_attempts: [&[&str]; 6] = [
714                &[
715                    "-ax", "-o", "pid=", "-o", "ppid=", "-o", "pgid=", "-o", "stat=", "-o",
716                    "cputime=", "-o", "comm=",
717                ],
718                &[
719                    "-e", "-o", "pid=", "-o", "ppid=", "-o", "pgid=", "-o", "stat=", "-o",
720                    "cputime=", "-o", "comm=",
721                ],
722                &[
723                    "-ax", "-o", "pid=", "-o", "ppid=", "-o", "pgid=", "-o", "stat=", "-o",
724                    "cputime=",
725                ],
726                &[
727                    "-e", "-o", "pid=", "-o", "ppid=", "-o", "pgid=", "-o", "stat=", "-o",
728                    "cputime=",
729                ],
730                &["-ax", "-o", "pid=", "-o", "ppid=", "-o", "cputime="],
731                &["-e", "-o", "pid=", "-o", "ppid=", "-o", "cputime="],
732            ];
733
734            for args in ps_attempts {
735                if let Ok(out) = self.execute("ps", args, &[], None) {
736                    if out.status.success() {
737                        if let Some(info) = parse_ps_output(&out.stdout, parent_pid) {
738                            return info;
739                        }
740                    }
741                }
742            }
743
744            #[cfg(target_os = "macos")]
745            if let Some(info) = child_info_from_libproc(parent_pid) {
746                return info;
747            }
748
749            if let Some(descendants) = discover_descendants_with_pgrep(parent_pid) {
750                if !descendants.is_empty() {
751                    warn_child_process_detection_conservative();
752                }
753                return child_info_from_descendant_pids(&descendants);
754            }
755
756            // Degraded: emit one-time warning, return no-children (conservative).
757            warn_child_process_detection_degraded();
758            ChildProcessInfo::NONE
759        }
760        #[cfg(not(unix))]
761        {
762            let _ = parent_pid;
763            ChildProcessInfo::NONE
764        }
765    }
766}
767
768#[cfg(test)]
769mod tests {
770    use super::*;
771    use std::collections::HashMap;
772
773    #[cfg(unix)]
774    fn ok_output(stdout: &str) -> ProcessOutput {
775        use std::os::unix::process::ExitStatusExt;
776
777        ProcessOutput {
778            status: std::process::ExitStatus::from_raw(0),
779            stdout: stdout.to_string(),
780            stderr: String::new(),
781        }
782    }
783
784    #[cfg(unix)]
785    type ResultMap = HashMap<(String, Vec<String>), ProcessOutput>;
786
787    #[cfg(unix)]
788    #[derive(Debug)]
789    struct TestExecutor {
790        results: ResultMap,
791    }
792
793    #[cfg(unix)]
794    impl TestExecutor {
795        fn new(results: ResultMap) -> Self {
796            Self { results }
797        }
798    }
799
800    #[cfg(unix)]
801    impl ProcessExecutor for TestExecutor {
802        fn execute(
803            &self,
804            command: &str,
805            args: &[&str],
806            _env: &[(String, String)],
807            _workdir: Option<&std::path::Path>,
808        ) -> std::io::Result<ProcessOutput> {
809            self.results
810                .get(&(
811                    command.to_string(),
812                    args.iter().map(ToString::to_string).collect(),
813                ))
814                .cloned()
815                .ok_or_else(|| std::io::Error::other("unexpected execute"))
816        }
817    }
818
819    #[cfg(unix)]
820    fn ps_key() -> (String, Vec<String>) {
821        (
822            "ps".to_string(),
823            vec![
824                "-ax".to_string(),
825                "-o".to_string(),
826                "pid=".to_string(),
827                "-o".to_string(),
828                "ppid=".to_string(),
829                "-o".to_string(),
830                "cputime=".to_string(),
831            ],
832        )
833    }
834
835    #[cfg(unix)]
836    fn ps_key_with_state_and_group() -> (String, Vec<String>) {
837        (
838            "ps".to_string(),
839            vec![
840                "-ax".to_string(),
841                "-o".to_string(),
842                "pid=".to_string(),
843                "-o".to_string(),
844                "ppid=".to_string(),
845                "-o".to_string(),
846                "pgid=".to_string(),
847                "-o".to_string(),
848                "stat=".to_string(),
849                "-o".to_string(),
850                "cputime=".to_string(),
851            ],
852        )
853    }
854
855    #[cfg(unix)]
856    fn ps_key_with_state_group_and_command() -> (String, Vec<String>) {
857        (
858            "ps".to_string(),
859            vec![
860                "-ax".to_string(),
861                "-o".to_string(),
862                "pid=".to_string(),
863                "-o".to_string(),
864                "ppid=".to_string(),
865                "-o".to_string(),
866                "pgid=".to_string(),
867                "-o".to_string(),
868                "stat=".to_string(),
869                "-o".to_string(),
870                "cputime=".to_string(),
871                "-o".to_string(),
872                "comm=".to_string(),
873            ],
874        )
875    }
876
877    #[cfg(unix)]
878    fn pgrep_key(parent_pid: u32) -> (String, Vec<String>) {
879        (
880            "pgrep".to_string(),
881            vec!["-P".to_string(), parent_pid.to_string()],
882        )
883    }
884
885    #[test]
886    #[cfg(unix)]
887    fn get_child_process_info_legacy_ps_output_is_conservative_about_current_activity() {
888        let pid = 4242;
889
890        let mut results: ResultMap = HashMap::new();
891        results.insert(
892            ps_key(),
893            ok_output("12345 4242 0:01.50\n12346 4242 0:03.00\n99999 1 0:10.00\n"),
894        );
895
896        let exec = TestExecutor::new(results);
897        let info = exec.get_child_process_info(pid);
898        assert_eq!(info.child_count, 2, "should find 2 children of pid 4242");
899        assert_eq!(
900            info.active_child_count, 0,
901            "legacy ps output without state or process-group columns must not report current activity"
902        );
903        assert_eq!(
904            info.cpu_time_ms,
905            1500 + 3000,
906            "should sum CPU times of both children"
907        );
908        assert!(info.has_children());
909    }
910
911    #[test]
912    #[cfg(unix)]
913    fn get_child_process_info_no_children_returns_zero() {
914        let pid = 4242;
915
916        let mut results: ResultMap = HashMap::new();
917        results.insert(ps_key(), ok_output("99999 1 0:10.00\n"));
918
919        let exec = TestExecutor::new(results);
920        let info = exec.get_child_process_info(pid);
921        assert_eq!(info.child_count, 0);
922        assert_eq!(info.active_child_count, 0);
923        assert_eq!(info.cpu_time_ms, 0);
924        assert!(!info.has_children());
925    }
926
927    #[test]
928    #[cfg(unix)]
929    fn parse_cputime_formats() {
930        let pid = 100;
931
932        let mut results: ResultMap = HashMap::new();
933        results.insert(ps_key(), ok_output("200 100 01:02:03\n"));
934
935        let exec = TestExecutor::new(results);
936        let info = exec.get_child_process_info(pid);
937        assert_eq!(
938            info.cpu_time_ms,
939            (3600 + 2 * 60 + 3) * 1000,
940            "HH:MM:SS should parse to correct ms"
941        );
942    }
943
944    #[test]
945    #[cfg(unix)]
946    fn parse_cputime_with_day_prefix() {
947        let pid = 100;
948
949        let mut results: ResultMap = HashMap::new();
950        results.insert(ps_key(), ok_output("200 100 1-02:03:04\n"));
951
952        let exec = TestExecutor::new(results);
953        let info = exec.get_child_process_info(pid);
954        assert_eq!(
955            info.cpu_time_ms,
956            ((24 + 2) * 3600 + 3 * 60 + 4) * 1000,
957            "DD-HH:MM:SS should parse to correct ms"
958        );
959    }
960
961    #[test]
962    #[cfg(unix)]
963    fn get_child_process_info_includes_grandchildren() {
964        let parent = 100;
965        // PID 200 is child of 100, PID 300 is child of 200 (grandchild of 100).
966        let ps_output = "200 100 0:01.00\n300 200 0:02.00\n999 1 0:05.00\n";
967
968        let mut results: ResultMap = HashMap::new();
969        results.insert(ps_key(), ok_output(ps_output));
970
971        let exec = TestExecutor::new(results);
972        let info = exec.get_child_process_info(parent);
973        assert_eq!(
974            info.child_count, 2,
975            "should count both child and grandchild"
976        );
977        assert_eq!(
978            info.cpu_time_ms,
979            1000 + 2000,
980            "should sum CPU of child and grandchild"
981        );
982    }
983
984    #[test]
985    #[cfg(unix)]
986    fn get_child_process_info_excludes_unrelated_processes() {
987        let parent = 100;
988        // PID 200 is child of 100. PID 300's ppid chain goes to 1, NOT to 100.
989        let ps_output = "200 100 0:01.00\n300 400 0:02.00\n400 1 0:03.00\n";
990
991        let mut results: ResultMap = HashMap::new();
992        results.insert(ps_key(), ok_output(ps_output));
993
994        let exec = TestExecutor::new(results);
995        let info = exec.get_child_process_info(parent);
996        assert_eq!(info.child_count, 1, "should only count PID 200");
997        assert_eq!(
998            info.active_child_count, 0,
999            "legacy ps output without state columns must remain conservative even for related descendants"
1000        );
1001        assert_eq!(info.cpu_time_ms, 1000, "should only sum CPU of PID 200");
1002    }
1003
1004    #[test]
1005    #[cfg(unix)]
1006    fn get_child_process_info_deep_tree() {
1007        let parent = 100;
1008        // 3+ levels of nesting: 100 → 200 → 300 → 400
1009        let ps_output = "200 100 0:01.00\n300 200 0:02.00\n400 300 0:03.00\n";
1010
1011        let mut results: ResultMap = HashMap::new();
1012        results.insert(ps_key(), ok_output(ps_output));
1013
1014        let exec = TestExecutor::new(results);
1015        let info = exec.get_child_process_info(parent);
1016        assert_eq!(
1017            info.child_count, 3,
1018            "should count all 3 levels of descendants"
1019        );
1020        assert_eq!(
1021            info.cpu_time_ms,
1022            1000 + 2000 + 3000,
1023            "should sum CPU across all descendants"
1024        );
1025    }
1026
1027    #[test]
1028    #[cfg(unix)]
1029    fn get_child_process_info_pgrep_fallback_does_not_report_active_children() {
1030        let parent = 100;
1031
1032        let mut results: ResultMap = HashMap::new();
1033        results.insert(pgrep_key(100), ok_output("200\n300\n"));
1034        results.insert(pgrep_key(200), ok_output("400\n"));
1035        results.insert(pgrep_key(300), ok_output(""));
1036        results.insert(pgrep_key(400), ok_output(""));
1037
1038        let exec = TestExecutor::new(results);
1039        let info = exec.get_child_process_info(parent);
1040
1041        assert_eq!(info.child_count, 3);
1042        assert_eq!(
1043            info.active_child_count, 0,
1044            "fallback without process state or cpu evidence must not report active children"
1045        );
1046        assert_eq!(info.cpu_time_ms, 0);
1047        assert_ne!(
1048            info.descendant_pid_signature, 0,
1049            "observable descendants should retain a stable signature even in fallback mode"
1050        );
1051    }
1052
1053    #[test]
1054    #[cfg(unix)]
1055    fn get_child_process_info_excludes_descendants_in_other_process_groups() {
1056        let parent = 100;
1057
1058        let mut results: ResultMap = HashMap::new();
1059        results.insert(
1060            ps_key_with_state_and_group(),
1061            ok_output(
1062                "200 100 100 S 0:01.00\n201 100 201 S 0:05.00\n300 200 100 S 0:02.00\n301 201 201 S 0:09.00\n",
1063            ),
1064        );
1065
1066        let exec = TestExecutor::new(results);
1067        let info = exec.get_child_process_info(parent);
1068
1069        assert_eq!(
1070            info.child_count, 2,
1071            "only descendants that remain in the agent process group should qualify"
1072        );
1073        assert_eq!(
1074            info.active_child_count, 0,
1075            "sleeping same-process-group descendants should remain observable without suppressing timeout"
1076        );
1077        assert_eq!(
1078            info.cpu_time_ms,
1079            1000 + 2000,
1080            "detached descendants in a different process group must be excluded"
1081        );
1082    }
1083
1084    #[test]
1085    #[cfg(unix)]
1086    fn get_child_process_info_counts_busy_shell_without_descendants_as_current_work() {
1087        let parent = 100;
1088
1089        let mut results: ResultMap = HashMap::new();
1090        results.insert(
1091            ps_key_with_state_group_and_command(),
1092            ok_output("200 100 100 R 0:01.00 sh\n"),
1093        );
1094
1095        let exec = TestExecutor::new(results);
1096        let info = exec.get_child_process_info(parent);
1097
1098        assert_eq!(info.child_count, 1);
1099        assert_eq!(
1100            info.active_child_count, 1,
1101            "a shell process that is itself running with accumulated CPU must count as current child work even without descendants"
1102        );
1103        assert_eq!(info.cpu_time_ms, 1000);
1104    }
1105
1106    #[test]
1107    #[cfg(unix)]
1108    fn get_child_process_info_keeps_non_wrapper_busy_processes_active() {
1109        let parent = 100;
1110
1111        let mut results: ResultMap = HashMap::new();
1112        results.insert(
1113            ps_key_with_state_group_and_command(),
1114            ok_output("200 100 100 R 0:01.00 python3\n"),
1115        );
1116
1117        let exec = TestExecutor::new(results);
1118        let info = exec.get_child_process_info(parent);
1119
1120        assert_eq!(info.child_count, 1);
1121        assert_eq!(
1122            info.active_child_count, 1,
1123            "real worker processes must still count as current child work when they are busy"
1124        );
1125        assert_eq!(info.cpu_time_ms, 1000);
1126    }
1127
1128    #[test]
1129    #[cfg(unix)]
1130    fn get_child_process_info_excludes_zombie_descendants() {
1131        let parent = 100;
1132
1133        let mut results: ResultMap = HashMap::new();
1134        results.insert(
1135            ps_key_with_state_and_group(),
1136            ok_output("200 100 100 S 0:01.00\n201 100 100 Z 0:05.00\n"),
1137        );
1138
1139        let exec = TestExecutor::new(results);
1140        let info = exec.get_child_process_info(parent);
1141
1142        assert_eq!(info.child_count, 1, "zombie descendants must not qualify");
1143        assert_eq!(info.active_child_count, 0);
1144        assert_eq!(info.cpu_time_ms, 1000, "zombie cpu time must be ignored");
1145    }
1146
1147    #[test]
1148    #[cfg(unix)]
1149    fn get_child_process_info_returns_none_when_only_non_qualifying_descendants_exist() {
1150        let parent = 100;
1151
1152        let mut results: ResultMap = HashMap::new();
1153        results.insert(
1154            ps_key_with_state_and_group(),
1155            ok_output("200 100 200 S 0:01.00\n300 200 200 S 0:02.00\n"),
1156        );
1157
1158        let exec = TestExecutor::new(results);
1159        let info = exec.get_child_process_info(parent);
1160
1161        assert_eq!(
1162            info,
1163            ChildProcessInfo::NONE,
1164            "an empty qualified descendant set must normalize to no active child work"
1165        );
1166    }
1167
1168    #[test]
1169    #[cfg(unix)]
1170    fn get_child_process_info_excludes_zero_cpu_descendants_without_activity_evidence() {
1171        let parent = 100;
1172
1173        let mut results: ResultMap = HashMap::new();
1174        results.insert(
1175            ps_key_with_state_and_group(),
1176            ok_output("200 100 100 S 0:00.00\n"),
1177        );
1178
1179        let exec = TestExecutor::new(results);
1180        let info = exec.get_child_process_info(parent);
1181
1182        assert_eq!(info.child_count, 1);
1183        assert_eq!(info.active_child_count, 0);
1184        assert_eq!(info.cpu_time_ms, 0);
1185    }
1186
1187    #[test]
1188    #[cfg(unix)]
1189    fn get_child_process_info_does_not_count_running_zero_cpu_descendants_as_currently_active() {
1190        let parent = 100;
1191
1192        let mut results: ResultMap = HashMap::new();
1193        results.insert(
1194            ps_key_with_state_and_group(),
1195            ok_output("200 100 100 R 0:00.00\n"),
1196        );
1197
1198        let exec = TestExecutor::new(results);
1199        let info = exec.get_child_process_info(parent);
1200
1201        assert_eq!(info.child_count, 1);
1202        assert_eq!(
1203            info.active_child_count, 0,
1204            "running descendants with zero accumulated CPU should not yet count as current work"
1205        );
1206        assert_eq!(info.cpu_time_ms, 0);
1207    }
1208
1209    #[test]
1210    #[cfg(unix)]
1211    fn get_child_process_info_excludes_sleeping_descendants_with_only_historical_cpu() {
1212        let parent = 100;
1213
1214        let mut results: ResultMap = HashMap::new();
1215        results.insert(
1216            ps_key_with_state_and_group(),
1217            ok_output("200 100 100 S 0:01.00\n300 200 100 S 0:02.00\n"),
1218        );
1219
1220        let exec = TestExecutor::new(results);
1221        let info = exec.get_child_process_info(parent);
1222
1223        assert_eq!(info.child_count, 2);
1224        assert_eq!(info.active_child_count, 0);
1225        assert_eq!(info.cpu_time_ms, 3000);
1226    }
1227
1228    #[test]
1229    #[cfg(unix)]
1230    fn get_child_process_info_pgrep_fallback_is_conservative() {
1231        let parent = 100;
1232
1233        let mut results: ResultMap = HashMap::new();
1234        results.insert(pgrep_key(100), ok_output("200\n300\n"));
1235        results.insert(pgrep_key(200), ok_output(""));
1236        results.insert(pgrep_key(300), ok_output(""));
1237
1238        let exec = TestExecutor::new(results);
1239        let info = exec.get_child_process_info(parent);
1240
1241        assert!(info.has_children());
1242        assert!(
1243            !info.has_currently_active_children(),
1244            "fallback without process-state or cpu evidence must not suppress idle timeout"
1245        );
1246        assert_eq!(info.cpu_time_ms, 0);
1247    }
1248
1249    #[test]
1250    #[cfg(target_os = "macos")]
1251    fn child_pid_entry_count_converts_libproc_bytes_to_pid_count() {
1252        let pid_width = i32::try_from(std::mem::size_of::<libc::pid_t>())
1253            .expect("pid_t size should fit in i32");
1254
1255        assert_eq!(child_pid_entry_count(pid_width * 3), Some(3));
1256        assert_eq!(child_pid_entry_count(pid_width), Some(1));
1257        assert_eq!(child_pid_entry_count(0), Some(0));
1258    }
1259}