1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
//! Rotating command process pool
//!
//! This pool will spawn up to a set number of processes, and then start returning old processes in
//! a round robin fashion. To effectively manage cleanup, this needs to be dropped, so panics
//! while using this may result in zombie processes.
use super::pool;
use super::pool::{Error, Pool};
use std::borrow::BorrowMut;
use std::process::{Child, Command, Stdio};
/// A pool to manage spawning a limited number of processses
///
/// This pool will return new processes up to the limit, and then start returning old processes in
/// a round-robin order. This type of pool is more effective if the process handles each task by
/// delimiters as well, allowing for better utilization of resources for embarassingly parallel
/// tasks.
#[derive(Debug)]
pub struct Rotating<C> {
procs: Vec<Child>,
max_procs: usize,
command: C,
ind: usize,
}
impl<C: BorrowMut<Command>> Rotating<C> {
/// Create a new empty pool with a limited number of total processes
///
/// Set `max_procs` to 0 to enable unbounded parallelism.
pub fn new(mut command: C, max_procs: usize) -> Self {
command.borrow_mut().stdin(Stdio::piped());
Self {
procs: Vec::with_capacity(max_procs),
max_procs,
command,
ind: 0,
}
}
/// Spawn a new process
fn spawn(&mut self) -> Result<Child, Error> {
self.command.borrow_mut().spawn().map_err(Error::Spawn)
}
}
impl<C: BorrowMut<Command>> Pool for Rotating<C> {
/// Get a process from the pool
///
/// If fewer than `max_procs` have been spawned, this will spawn a new process, otherwise it
/// will return one that was already spawned.
fn get(&mut self) -> Result<&mut Child, Error> {
if self.max_procs == 0 {
let proc = self.spawn()?;
self.procs.push(proc);
Ok(self.procs.last_mut().unwrap())
} else {
if self.procs.len() < self.max_procs {
let proc = self.spawn()?;
self.procs.push(proc);
}
let child = &mut self.procs[self.ind];
self.ind += 1;
self.ind %= self.max_procs;
Ok(child)
}
}
/// Wait for all processes to finish successfully
///
/// Errors will terminate early and not wait for reamining processes to finish. To continue
/// waiting for them anyway you can continue to call join until you get a success, this will
/// indicate that there are no more running processes under management by the pool.
fn join(&mut self) -> Result<(), Error> {
// NOTE we do this instead of drain so that errors don't drop the rest of our processes
// creating zombies
while let Some(proc) = self.procs.pop() {
pool::wait_proc(proc)?;
}
Ok(())
}
}
impl<C> Drop for Rotating<C> {
fn drop(&mut self) {
// kill any children left in self
for proc in &mut self.procs {
let _ = proc.kill();
}
// wait for them to be cleaned up
for proc in &mut self.procs {
let _ = proc.wait();
}
}
}