Skip to main content

ralph_workflow/executor/
ps.rs

//! Parsing of `ps` and `pgrep` output for child-process detection.
2
3use super::ChildProcessInfo;
4
/// One row of `ps` output, reduced to the fields needed for descendant tracking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ProcessEntry {
    /// Process ID of this row.
    pid: u32,
    /// Parent process ID of this row; used to build the parent -> children lookup.
    parent_pid: u32,
    /// Accumulated CPU time in milliseconds (0 when the time column could not be parsed).
    cpu_time_ms: u64,
    /// Whether this entry may be counted. When `false`, the entry and its whole
    /// subtree are skipped during traversal.
    in_scope: bool,
    /// Whether the process state suggests the process is doing work right now.
    currently_active: bool,
}
13
/// Parses a seconds field such as `"05"`, `"05.3"`, or `"05.30"`.
///
/// Returns `(whole_seconds, fractional_milliseconds)`. The part after the dot
/// is a decimal fraction of a second:
/// - one digit means tenths;
/// - two digits mean hundredths;
/// - three digits mean thousandths;
/// - digits beyond millisecond precision are truncated.
///
/// Returns `None` if the whole part is not a valid `u64`, or if the fraction
/// is empty or contains a non-digit.
fn parse_seconds_frac(seconds_str: &str) -> Option<(u64, u64)> {
    let Some((whole, frac)) = seconds_str.split_once('.') else {
        return Some((seconds_str.parse().ok()?, 0));
    };
    let secs: u64 = whole.parse().ok()?;
    if frac.is_empty() || !frac.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Right-pad with zeros to exactly three digits so "3" -> 300 ms and
    // "30" -> 300 ms. The old code always multiplied by 10, which is only
    // correct for two-digit fractions.
    let frac_ms = frac
        .bytes()
        .chain(std::iter::repeat(b'0'))
        .take(3)
        .fold(0u64, |acc, digit| acc * 10 + u64::from(digit - b'0'));
    Some((secs, frac_ms))
}
23
/// Parses the hours component of a `ps` CPU-time field.
///
/// Accepts plain hours (`"HH"`) or a day-prefixed form (`"DD-HH"`), which is
/// folded into a total hour count. Returns `None` on malformed input or when
/// the day-to-hour conversion overflows `u64`.
fn parse_hours_field(field: &str) -> Option<u64> {
    match field.split_once('-') {
        None => field.parse().ok(),
        Some((day_text, hour_text)) => {
            let day_count: u64 = day_text.parse().ok()?;
            let hour_count: u64 = hour_text.parse().ok()?;
            day_count.checked_mul(24)?.checked_add(hour_count)
        }
    }
}
33
34pub fn parse_cputime_ms(s: &str) -> Option<u64> {
35    let parts: Vec<&str> = s.split(':').collect();
36    match parts.len() {
37        3 => {
38            let hours = parse_hours_field(parts[0])?;
39            let minutes: u64 = parts[1].parse().ok()?;
40            let (secs, frac_ms) = parse_seconds_frac(parts[2])?;
41            Some((hours * 3600 + minutes * 60 + secs) * 1000 + frac_ms)
42        }
43        2 => {
44            let minutes: u64 = parts[0].parse().ok()?;
45            let (secs, frac_ms) = parse_seconds_frac(parts[1])?;
46            Some((minutes * 60 + secs) * 1000 + frac_ms)
47        }
48        _ => None,
49    }
50}
51
/// Reports whether a `ps` state string describes a process that should be counted.
///
/// Only the leading state letter is inspected. An empty state never qualifies,
/// and neither do `Z`, `X`, `T`, or `I` (conventionally zombie, dead, stopped,
/// and idle in `ps` output). Every other letter qualifies.
fn qualifies_process_state(state: &str) -> bool {
    !matches!(state.chars().next(), None | Some('Z' | 'X' | 'T' | 'I'))
}
58
/// Heuristic for "is this process doing work right now?" based on its `ps` state.
///
/// Returns `true` in two cases:
/// - the leading state letter is `D` or `U`;
/// - the leading letter is `R` and some CPU time has already been consumed.
///
/// Any other state, including an empty one, returns `false`.
fn state_indicates_current_activity(state: &str, cpu_time_ms: u64) -> bool {
    let Some(lead) = state.chars().next() else {
        return false;
    };
    lead == 'D' || lead == 'U' || (lead == 'R' && cpu_time_ms > 0)
}
66
/// Hashes a PID list with 64-bit FNV-1a, feeding each PID's little-endian bytes in order.
///
/// The result depends on element order; callers that need an order-insensitive
/// signature must sort the list first. An empty list yields the FNV offset basis.
fn module_level_descendant_pid_signature(descendants: &[u32]) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let mut hash = FNV_OFFSET;
    for byte in descendants.iter().flat_map(|pid| pid.to_le_bytes()) {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    hash
}
77
78fn build_children_lookup(
79    entries: &[ProcessEntry],
80) -> std::collections::HashMap<u32, Vec<ProcessEntry>> {
81    entries
82        .iter()
83        .fold(std::collections::HashMap::new(), |mut lookup, entry| {
84            lookup.entry(entry.parent_pid).or_default().push(*entry);
85            lookup
86        })
87}
88
/// Running totals collected while walking the descendants of one parent PID.
#[derive(Debug, Default)]
struct ChildTally {
    /// Number of in-scope descendants visited.
    child_count: u32,
    /// Number of visited descendants flagged as currently active.
    active_child_count: u32,
    /// Sum of CPU time in milliseconds across visited descendants.
    total_cpu_ms: u64,
    /// PID of every visited descendant, in the order they were visited.
    descendant_pids: Vec<u32>,
}
95
96fn tally_one_child(
97    child: &ProcessEntry,
98    tally: &mut ChildTally,
99    visited: &mut std::collections::HashSet<u32>,
100    queue: &mut std::collections::VecDeque<u32>,
101) {
102    if !child.in_scope || !visited.insert(child.pid) {
103        return;
104    }
105    tally.child_count = tally.child_count.saturating_add(1);
106    if child.currently_active {
107        tally.active_child_count = tally.active_child_count.saturating_add(1);
108    }
109    tally.total_cpu_ms += child.cpu_time_ms;
110    tally.descendant_pids.push(child.pid);
111    queue.push_back(child.pid);
112}
113
114fn tally_children_of(
115    current: u32,
116    children_of: &std::collections::HashMap<u32, Vec<ProcessEntry>>,
117    tally: &mut ChildTally,
118    visited: &mut std::collections::HashSet<u32>,
119    queue: &mut std::collections::VecDeque<u32>,
120) {
121    if let Some(kids) = children_of.get(&current) {
122        kids.iter().for_each(|child| {
123            debug_assert_eq!(child.parent_pid, current);
124            tally_one_child(child, tally, visited, queue);
125        });
126    }
127}
128
129fn new_child_tally() -> ChildTally {
130    ChildTally {
131        child_count: 0,
132        active_child_count: 0,
133        total_cpu_ms: 0,
134        descendant_pids: Vec::new(),
135    }
136}
137
138fn drain_tally_queue(
139    queue: &mut std::collections::VecDeque<u32>,
140    children_of: &std::collections::HashMap<u32, Vec<ProcessEntry>>,
141    tally: &mut ChildTally,
142    visited: &mut std::collections::HashSet<u32>,
143) {
144    while let Some(current) = queue.pop_front() {
145        tally_children_of(current, children_of, tally, visited, queue);
146    }
147}
148
149fn accumulate_children(
150    children_of: &std::collections::HashMap<u32, Vec<ProcessEntry>>,
151    parent_pid: u32,
152) -> ChildTally {
153    let mut tally = new_child_tally();
154    let mut visited = std::collections::HashSet::new();
155    let mut queue = std::collections::VecDeque::from([parent_pid]);
156    drain_tally_queue(&mut queue, children_of, &mut tally, &mut visited);
157    tally
158}
159
160fn compute_child_process_info(
161    entries: Vec<ProcessEntry>,
162    parent_pid: u32,
163) -> Option<ChildProcessInfo> {
164    let children_of = build_children_lookup(&entries);
165    let mut tally = accumulate_children(&children_of, parent_pid);
166    tally.descendant_pids.sort_unstable();
167
168    if tally.child_count == 0 {
169        return Some(ChildProcessInfo::NONE);
170    }
171
172    Some(ChildProcessInfo {
173        child_count: tally.child_count,
174        active_child_count: tally.active_child_count,
175        cpu_time_ms: tally.total_cpu_ms,
176        descendant_pid_signature: module_level_descendant_pid_signature(&tally.descendant_pids),
177    })
178}
179
180pub fn parse_ps_output(stdout: &str, parent_pid: u32) -> Option<ChildProcessInfo> {
181    let entries = parse_ps_entries(stdout, parent_pid);
182    compute_child_process_info(entries, parent_pid)
183}
184
185fn parse_ps_entries(stdout: &str, parent_pid: u32) -> Vec<ProcessEntry> {
186    stdout
187        .lines()
188        .filter_map(|line| parse_ps_line(line, parent_pid))
189        .collect()
190}
191
192fn parse_ps_line_extended<'a>(parts: &[&'a str], parent_pid: u32) -> (bool, bool, &'a str) {
193    let pgid_matches_parent = parts[2]
194        .parse::<u32>()
195        .ok()
196        .is_some_and(|pgid| pgid == parent_pid);
197    let state_qualifies = qualifies_process_state(parts[3]);
198    let cpu_ms = parse_cputime_ms(parts[4]).unwrap_or(0);
199    (
200        pgid_matches_parent && state_qualifies,
201        state_indicates_current_activity(parts[3], cpu_ms),
202        parts[4],
203    )
204}
205
/// Parses the leading `PID PPID` columns of a whitespace-split `ps` line.
///
/// Returns `None` if fewer than two columns are present or if either column
/// is not a valid `u32` (for example a header row).
fn parse_pid_pair(parts: &[&str]) -> Option<(u32, u32)> {
    // Use a slice pattern instead of `parts[0]`/`parts[1]`: a short slice
    // yields `None` rather than panicking.
    let [pid_text, ppid_text, ..] = parts else {
        return None;
    };
    let entry_pid = pid_text.parse::<u32>().ok()?;
    let parent_of_entry = ppid_text.parse::<u32>().ok()?;
    Some((entry_pid, parent_of_entry))
}
211
212fn parse_ps_line(line: &str, parent_pid: u32) -> Option<ProcessEntry> {
213    let parts: Vec<&str> = line.split_whitespace().collect();
214    if parts.len() < 3 {
215        return None;
216    }
217
218    let (entry_pid, parent_of_entry) = parse_pid_pair(&parts)?;
219
220    let (in_scope, currently_active, cputime_text) = if parts.len() >= 5 {
221        parse_ps_line_extended(&parts, parent_pid)
222    } else {
223        (true, false, parts[2])
224    };
225
226    let cpu_ms = parse_cputime_ms(cputime_text).unwrap_or(0);
227    Some(ProcessEntry {
228        pid: entry_pid,
229        parent_pid: parent_of_entry,
230        cpu_time_ms: cpu_ms,
231        in_scope,
232        currently_active,
233    })
234}
235
/// Parses `pgrep`-style output (one PID per line) into a list of PIDs.
///
/// Each line is trimmed. Blank lines and lines that are not a valid `u32` are
/// skipped. Always returns `Some`.
pub fn parse_pgrep_output(stdout: &str) -> Option<Vec<u32>> {
    let mut pids = Vec::new();
    for raw in stdout.lines() {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            continue;
        }
        if let Ok(pid) = trimmed.parse::<u32>() {
            pids.push(pid);
        }
    }
    Some(pids)
}
250
251fn canonical_descendant_signature(descendants: &[u32]) -> u64 {
252    let mut sorted = descendants.to_vec();
253    sorted.sort_unstable();
254    module_level_descendant_pid_signature(&sorted)
255}
256
257pub fn child_info_from_descendant_pids(descendants: &[u32]) -> ChildProcessInfo {
258    if descendants.is_empty() {
259        return ChildProcessInfo::NONE;
260    }
261
262    let child_count = u32::try_from(descendants.len()).unwrap_or(u32::MAX);
263    ChildProcessInfo {
264        child_count,
265        active_child_count: 0,
266        cpu_time_ms: 0,
267        descendant_pid_signature: canonical_descendant_signature(descendants),
268    }
269}
270
/// Hook that, judging by its name, is meant to warn that child-process
/// detection is running in a conservative mode.
///
/// NOTE(review): currently a no-op; confirm whether callers expect output.
pub fn warn_child_process_detection_conservative() {
    // Intentionally empty.
}
272
/// Hook that, judging by its name, is meant to warn that child-process
/// detection is degraded.
///
/// NOTE(review): currently a no-op; confirm whether callers expect output.
pub fn warn_child_process_detection_degraded() {
    // Intentionally empty.
}