1use std::{
2 collections::HashMap,
3 fmt::Debug,
4 hash::Hash,
5 io,
6 process::{Child, Command, ExitStatus},
7};
8
9use crate::{Error, SignalHandler};
10
/// A keyed collection of subprocesses with an optional cap on how many may run
/// at the same time.
///
/// Every key lives in at most one of the three maps below at any moment.
#[derive(Debug)]
pub struct ProcessSet<K> {
    /// Maximum number of simultaneously running children; `None` means no limit.
    concurrency_limit: Option<usize>,
    /// Commands that have been added but not spawned yet.
    queued_keys: HashMap<K, Command>,
    /// Children that were spawned and have not been reported as finished.
    running_keys: HashMap<K, Child>,
    /// Errors (such as spawn failures) waiting to be handed to the caller.
    errored_keys: HashMap<K, Error>,
}
18
19pub enum WaitAnyResult<K> {
20 Subprocess(K, Result<(Child, ExitStatus), Error>),
21 ReceivedTerminationSignal(i32),
22 NoProcessesRunning,
23}
24
25impl<K> ProcessSet<K> {
26 pub fn new() -> Self {
27 ProcessSet {
28 concurrency_limit: None,
29 queued_keys: HashMap::new(),
30 running_keys: HashMap::new(),
31 errored_keys: HashMap::new(),
32 }
33 }
34
35 pub fn with_concurrency_limit(limit: usize) -> Self {
36 let mut n = Self::default();
37 n.concurrency_limit = Some(limit);
38 n
39 }
40}
41
/// Removes one entry from `hashmap` and returns it, or `None` if the map is empty.
///
/// The entry taken is whichever key the map's iterator yields first, so callers
/// must not rely on any particular order.
fn take_one_from_hashmap<K: Eq + Hash + Clone, V>(hashmap: &mut HashMap<K, V>) -> Option<(K, V)> {
    // Clone the key so the shared borrow taken by `keys()` ends before the
    // map is mutated.
    let picked = hashmap.keys().next().cloned()?;
    hashmap.remove_entry(&picked)
}
50
51impl<K: Hash + Eq + Clone> ProcessSet<K> {
52 fn spawn_processes(&mut self) {
53 while !self.queued_keys.is_empty()
54 && self.running_keys.len() < self.concurrency_limit.unwrap_or(usize::max_value())
55 {
56 let (key, mut command) = take_one_from_hashmap(&mut self.queued_keys).unwrap();
57 let child_res = command.spawn();
58 if let Ok(child) = child_res {
59 self.running_keys.insert(key, child);
60 } else {
61 self
62 .errored_keys
63 .insert(key, Error::UnableToSpawnProcess(child_res.unwrap_err()));
64 }
65 }
66 }
67
68 pub fn add_command(&mut self, key: K, command: Command) {
69 if self.queued_keys.contains_key(&key)
70 || self.running_keys.contains_key(&key)
71 || self.errored_keys.contains_key(&key)
72 {
73 panic!("ProcessSet::add_command: key already exists");
74 }
75 self.queued_keys.insert(key, command);
76 self.spawn_processes();
77 }
78
79 pub fn wait_any(&mut self, signal_handler: &mut SignalHandler) -> WaitAnyResult<K> {
96 if let Some(res) = self.try_wait_any() {
97 return res;
98 }
99 use signal_hook::consts::SIGCHLD;
100 loop {
101 let mut has_sigchld = false;
102 let mut has_term = None;
103 for sig in signal_handler.signals.wait() {
104 if sig == SIGCHLD {
105 has_sigchld = true;
106 } else if signal_handler.termination_signals.contains(&sig) {
107 has_term = Some(sig);
108 }
109 }
110 if let Some(sig) = has_term {
111 return WaitAnyResult::ReceivedTerminationSignal(sig);
112 }
113 if has_sigchld {
114 if let Some(res) = self.try_wait_any() {
115 return res;
116 }
117 }
118 }
119 }
120
121 pub fn try_wait_any(&mut self) -> Option<WaitAnyResult<K>> {
126 if let Some((k, e)) = take_one_from_hashmap(&mut self.errored_keys) {
127 return Some(WaitAnyResult::Subprocess(k, Err(e)));
128 }
129 if self.running_keys.is_empty() {
130 return Some(WaitAnyResult::NoProcessesRunning);
131 }
132 for (k, child) in self.running_keys.iter_mut() {
133 let wait_res = child.try_wait();
134 if let Err(e) = wait_res {
135 let k = k.clone();
136 let taken_k = self.running_keys.remove_entry(&k).unwrap().0;
137 self.spawn_processes();
138 return Some(WaitAnyResult::Subprocess(
139 taken_k,
140 Err(Error::WaitFailed(e)),
141 ));
142 }
143 let wait_res = wait_res.unwrap();
144 if let Some(wait_res) = wait_res {
145 let k = k.clone();
146 let (k, child) = self.running_keys.remove_entry(&k).unwrap();
147 self.spawn_processes();
148 return Some(WaitAnyResult::Subprocess(k, Ok((child, wait_res))));
149 }
150 }
151 None
152 }
153
154 pub fn sigkill_all(&mut self) -> io::Result<()> {
156 for (_, child) in self.running_keys.iter_mut() {
157 child.kill()?;
158 child.wait()?;
159 }
160 self.running_keys.clear();
161 Ok(())
162 }
163
164 pub fn sigint_all(&mut self) -> io::Result<()> {
166 let mut k_to_remove = Vec::new();
167 for (k, child) in self.running_keys.iter_mut() {
168 if child.try_wait()?.is_none() {
169 let pid = child.id();
170 unsafe {
173 if libc::kill(pid.try_into().unwrap(), libc::SIGINT) != 0 {
174 return Err(io::Error::last_os_error());
175 }
176 };
177 } else {
178 k_to_remove.push(k.clone());
179 }
180 }
181 for k in k_to_remove.into_iter() {
182 self.running_keys.remove(&k);
183 }
184 Ok(())
185 }
186
187 pub fn sigint_all_and_wait(&mut self, signal_handler: &mut SignalHandler) -> io::Result<()> {
189 self.sigint_all()?;
190 while !self.running_keys.is_empty() {
191 let wres = self.wait_any(signal_handler);
192 if let WaitAnyResult::ReceivedTerminationSignal(_) = wres {
193 self.sigint_all().unwrap();
194 }
195 }
196 Ok(())
197 }
198}
199
200impl<K> Default for ProcessSet<K> {
201 fn default() -> Self {
202 Self::new()
203 }
204}