spawn_wait/
processset.rs

1use std::{
2  collections::HashMap,
3  fmt::Debug,
4  hash::Hash,
5  io,
6  process::{Child, Command, ExitStatus},
7};
8
9use crate::{Error, SignalHandler};
10
11#[derive(Debug)]
12pub struct ProcessSet<K> {
13  concurrency_limit: Option<usize>,
14  queued_keys: HashMap<K, Command>,
15  running_keys: HashMap<K, Child>,
16  errored_keys: HashMap<K, Error>,
17}
18
19pub enum WaitAnyResult<K> {
20  Subprocess(K, Result<(Child, ExitStatus), Error>),
21  ReceivedTerminationSignal(i32),
22  NoProcessesRunning,
23}
24
25impl<K> ProcessSet<K> {
26  pub fn new() -> Self {
27    ProcessSet {
28      concurrency_limit: None,
29      queued_keys: HashMap::new(),
30      running_keys: HashMap::new(),
31      errored_keys: HashMap::new(),
32    }
33  }
34
35  pub fn with_concurrency_limit(limit: usize) -> Self {
36    let mut n = Self::default();
37    n.concurrency_limit = Some(limit);
38    n
39  }
40}
41
42fn take_one_from_hashmap<K: Eq + Hash + Clone, V>(hashmap: &mut HashMap<K, V>) -> Option<(K, V)> {
43  let key = hashmap.keys().next();
44  if let Some(key) = key {
45    let key = key.clone();
46    return hashmap.remove_entry(&key);
47  }
48  None
49}
50
51impl<K: Hash + Eq + Clone> ProcessSet<K> {
52  fn spawn_processes(&mut self) {
53    while !self.queued_keys.is_empty()
54      && self.running_keys.len() < self.concurrency_limit.unwrap_or(usize::max_value())
55    {
56      let (key, mut command) = take_one_from_hashmap(&mut self.queued_keys).unwrap();
57      let child_res = command.spawn();
58      if let Ok(child) = child_res {
59        self.running_keys.insert(key, child);
60      } else {
61        self
62          .errored_keys
63          .insert(key, Error::UnableToSpawnProcess(child_res.unwrap_err()));
64      }
65    }
66  }
67
68  pub fn add_command(&mut self, key: K, command: Command) {
69    if self.queued_keys.contains_key(&key)
70      || self.running_keys.contains_key(&key)
71      || self.errored_keys.contains_key(&key)
72    {
73      panic!("ProcessSet::add_command: key already exists");
74    }
75    self.queued_keys.insert(key, command);
76    self.spawn_processes();
77  }
78
79  /// Wait for any process to finish, and return the corrosponding key and resulting child (or error).
80  ///
81  /// Takes in a signal handler from outside which can be created with
82  /// [`SignalHandler::default()`](crate::SignalHandler). This ensures that
83  /// signals between waits are not missed.
84  ///
85  /// If multiple processes have finished, this will only return one of them.
86  /// Call this function in a loop to wait for all processes to finish.
87  ///
88  /// If no process has finished, this will pause the current thread.
89  ///
90  /// If there are no processes running, this will return NoProcessesRunning.
91  ///
92  /// If, during the middle of waiting, the current process gets a SIGINT or SIGTERM,
93  /// this will return ReceivedTerminationSignal(signal_number). More signals can be added
94  /// via [`SignalHandler::add_termination_signal`](crate::SignalHandler::add_termination_signal).
95  pub fn wait_any(&mut self, signal_handler: &mut SignalHandler) -> WaitAnyResult<K> {
96    if let Some(res) = self.try_wait_any() {
97      return res;
98    }
99    use signal_hook::consts::SIGCHLD;
100    loop {
101      let mut has_sigchld = false;
102      let mut has_term = None;
103      for sig in signal_handler.signals.wait() {
104        if sig == SIGCHLD {
105          has_sigchld = true;
106        } else if signal_handler.termination_signals.contains(&sig) {
107          has_term = Some(sig);
108        }
109      }
110      if let Some(sig) = has_term {
111        return WaitAnyResult::ReceivedTerminationSignal(sig);
112      }
113      if has_sigchld {
114        if let Some(res) = self.try_wait_any() {
115          return res;
116        }
117      }
118    }
119  }
120
121  /// Non-blocking version of wait_any. If no process has finished, this will
122  /// just return None.
123  ///
124  /// Will never return WaitAnyResult::ReceivedTerminationSignal.
125  pub fn try_wait_any(&mut self) -> Option<WaitAnyResult<K>> {
126    if let Some((k, e)) = take_one_from_hashmap(&mut self.errored_keys) {
127      return Some(WaitAnyResult::Subprocess(k, Err(e)));
128    }
129    if self.running_keys.is_empty() {
130      return Some(WaitAnyResult::NoProcessesRunning);
131    }
132    for (k, child) in self.running_keys.iter_mut() {
133      let wait_res = child.try_wait();
134      if let Err(e) = wait_res {
135        let k = k.clone();
136        let taken_k = self.running_keys.remove_entry(&k).unwrap().0;
137        self.spawn_processes();
138        return Some(WaitAnyResult::Subprocess(
139          taken_k,
140          Err(Error::WaitFailed(e)),
141        ));
142      }
143      let wait_res = wait_res.unwrap();
144      if let Some(wait_res) = wait_res {
145        let k = k.clone();
146        let (k, child) = self.running_keys.remove_entry(&k).unwrap();
147        self.spawn_processes();
148        return Some(WaitAnyResult::Subprocess(k, Ok((child, wait_res))));
149      }
150    }
151    None
152  }
153
154  /// Kills all subprocesses.
155  pub fn sigkill_all(&mut self) -> io::Result<()> {
156    for (_, child) in self.running_keys.iter_mut() {
157      child.kill()?;
158      child.wait()?;
159    }
160    self.running_keys.clear();
161    Ok(())
162  }
163
164  /// Send a SIGINT to all subprocesses and return immediately.
165  pub fn sigint_all(&mut self) -> io::Result<()> {
166    let mut k_to_remove = Vec::new();
167    for (k, child) in self.running_keys.iter_mut() {
168      if child.try_wait()?.is_none() {
169        let pid = child.id();
170        // Since we have tried to wait the child process and it is still running,
171        // the pid we got must be correct.
172        unsafe {
173          if libc::kill(pid.try_into().unwrap(), libc::SIGINT) != 0 {
174            return Err(io::Error::last_os_error());
175          }
176        };
177      } else {
178        k_to_remove.push(k.clone());
179      }
180    }
181    for k in k_to_remove.into_iter() {
182      self.running_keys.remove(&k);
183    }
184    Ok(())
185  }
186
187  /// Send a SIGINT to all subprocesses and wait for them to finish.
188  pub fn sigint_all_and_wait(&mut self, signal_handler: &mut SignalHandler) -> io::Result<()> {
189    self.sigint_all()?;
190    while !self.running_keys.is_empty() {
191      let wres = self.wait_any(signal_handler);
192      if let WaitAnyResult::ReceivedTerminationSignal(_) = wres {
193        self.sigint_all().unwrap();
194      }
195    }
196    Ok(())
197  }
198}
199
200impl<K> Default for ProcessSet<K> {
201  fn default() -> Self {
202    Self::new()
203  }
204}