ralph_workflow/executor/
bfs.rs1use crate::executor::ps::child_info_from_descendant_pids;
6use crate::executor::ps::parse_pgrep_output;
7use crate::executor::{ChildProcessInfo, ProcessExecutor};
8use std::collections::{HashSet, VecDeque};
9
10fn pgrep_children<E: ProcessExecutor + ?Sized>(executor: &E, current_pid: u32) -> Option<Vec<u32>> {
11 let output = executor
12 .execute("pgrep", &["-P", ¤t_pid.to_string()], &[], None)
13 .ok()?;
14 if output.succeeded() {
15 parse_pgrep_output(&output.stdout)
16 } else if output.exit_code() == 1 {
17 Some(Vec::new())
18 } else {
19 None
20 }
21}
22
23fn bfs_step(
24 current: u32,
25 visited: &mut HashSet<u32>,
26 get_children: impl Fn(u32) -> Option<Vec<u32>>,
27) -> Option<(Vec<u32>, Vec<u32>)> {
28 let child_pids = get_children(current)?;
29 let new_pids: Vec<u32> = child_pids
30 .into_iter()
31 .filter(|&pid| visited.insert(pid))
32 .collect();
33 Some((new_pids.clone(), new_pids))
34}
35
36fn apply_bfs_step(
37 current: u32,
38 queue: &mut VecDeque<u32>,
39 visited: &mut HashSet<u32>,
40 descendants: &mut Vec<u32>,
41 get_children: &impl Fn(u32) -> Option<Vec<u32>>,
42) {
43 if let Some((new_queue_items, new_descendants)) = bfs_step(current, visited, get_children) {
44 descendants.extend(new_descendants);
45 queue.extend(new_queue_items);
46 }
47}
48
49fn bfs_collect(
50 queue: &mut VecDeque<u32>,
51 visited: &mut HashSet<u32>,
52 get_children: &impl Fn(u32) -> Option<Vec<u32>>,
53) -> Vec<u32> {
54 let mut descendants = Vec::new();
55 while let Some(current) = queue.pop_front() {
56 apply_bfs_step(current, queue, visited, &mut descendants, get_children);
57 }
58 descendants
59}
60
61fn bfs_traverse(start: u32, get_children: impl Fn(u32) -> Option<Vec<u32>>) -> Vec<u32> {
62 let mut queue = VecDeque::from([start]);
63 let mut visited = HashSet::new();
64 let _ = visited.insert(start);
65
66 let mut descendants = bfs_collect(&mut queue, &mut visited, &get_children);
67 descendants.sort_unstable();
68 descendants
69}
70
71pub fn collect_descendants<E: ProcessExecutor + ?Sized>(executor: &E, parent_pid: u32) -> Vec<u32> {
73 bfs_traverse(parent_pid, |pid| pgrep_children(executor, pid))
74}
75
76pub fn compute_from_descendants(_parent_pid: u32, descendants: &[u32]) -> ChildProcessInfo {
78 if descendants.is_empty() {
79 return ChildProcessInfo::NONE;
80 }
81
82 child_info_from_descendant_pids(descendants)
83}