//! Simple command process pool
//!
//! This is a simple pool to manage limiting the number of spawned processes, and to manage cleanup
//! so there are no zombie processes. Cleanup only happens when the pool is dropped, so a panic
//! while using it may result in zombie processes.
use std::collections::VecDeque;
use std::io::{Error, ErrorKind, Result};
use std::process::{Child, Command, ExitStatus};
/// A pool to manage spawning a limited number of processes
///
/// This design is simple and will wait for the old scheduled process to complete before scheduling
/// a new one. If you schedule a long running process, then a bunch of short ones, it won't
/// schedule more short ones beyond the buffer until the long one has finished.
#[derive(Debug)]
pub struct Pool {
    /// children spawned by this pool, oldest first; running or not yet reaped
    procs: VecDeque<Child>,
    /// maximum number of concurrent children; 0 means unbounded parallelism
    max_procs: usize,
}
/// Convert a child's exit status into a `Result`.
///
/// A zero exit code maps to `Ok(())`; a nonzero code or a signal-terminated
/// child (no code available) maps to an `ErrorKind::Other` error.
fn parse_code(status: ExitStatus) -> Result<()> {
    if let Some(code) = status.code() {
        if code == 0 {
            return Ok(());
        }
        Err(Error::new(
            ErrorKind::Other,
            format!("child process finished with nonzero exit code: {}", code),
        ))
    } else {
        // on Unix, `code()` is `None` when the child was terminated by a signal
        Err(Error::new(
            ErrorKind::Other,
            "child process was killed by a signal",
        ))
    }
}
/// internal helper: block until `proc` exits, then translate its exit status
/// into a `Result` (nonzero exit code or signal death becomes an error)
fn wait_proc(proc: &mut Child) -> Result<()> {
    let status = proc.wait()?;
    parse_code(status)
}
impl Pool {
    /// Create a new empty pool with a limited number of total processes
    ///
    /// Set max_procs to 0 to enable unbounded parallelism.
    pub fn new(max_procs: usize) -> Pool {
        Pool {
            procs: VecDeque::new(),
            max_procs,
        }
    }

    /// Spawn a new process with command and return a mutable reference to the process
    ///
    /// This command will block until it can schedule the process under the constraints. It can
    /// fail for any reason, including an earlier process failed, and never actually spawn the
    /// process in question. If it does successfully spawn the process, it will be recorded so that
    /// it will be cleaned up if the pool is dropped.
    pub fn spawn(&mut self, command: &mut Command) -> Result<&mut Child> {
        // check if early processes have finished and clean them up
        while let Some(proc) = self.procs.front_mut() {
            match proc.try_wait()? {
                Some(status) => {
                    // Remove the child *before* inspecting its status: it has already
                    // been reaped, so leaving it in the queue on error would make every
                    // later spawn/join re-report the same stale failure, and Drop would
                    // redundantly kill/wait an already-reaped child.
                    self.procs.pop_front();
                    parse_code(status)?;
                }
                None => break,
            }
        }
        // next wait for oldest proc to finish if we're full
        // (max_procs == 0 never matches a non-empty queue, giving unbounded parallelism)
        if self.procs.len() == self.max_procs {
            if let Some(mut proc) = self.procs.pop_front() {
                wait_proc(&mut proc)?
            }
        }
        // now schedule new process
        self.procs.push_back(command.spawn()?);
        Ok(self.procs.back_mut().unwrap()) // just pushed
    }

    /// wait for all processes to finish
    ///
    /// Errors will terminate early and not wait for remaining processes to finish. To continue
    /// waiting for them anyway you can continue to call join until you get a success, this will
    /// indicate that there are no more running processes under management by the pool.
    pub fn join(&mut self) -> Result<()> {
        while let Some(mut proc) = self.procs.pop_front() {
            wait_proc(&mut proc)?
        }
        Ok(())
    }
}
impl Drop for Pool {
    fn drop(&mut self) {
        // First pass: signal every remaining child so none keep running.
        self.procs.iter_mut().for_each(|child| {
            let _ = child.kill();
        });
        // Second pass: reap each child so no zombies are left behind.
        // All errors are ignored — this is best-effort teardown.
        while let Some(mut child) = self.procs.pop_front() {
            let _ = wait_proc(&mut child);
        }
    }
}