xstream_util/limit.rs

//! Limiting process pool
//!
//! This is a process pool that limits the number of spawned processes and manages their cleanup
//! so there are no zombie processes. When trying to spawn more than the limit, the oldest process
//! will be waited on before spawning a new one. Cleanup only happens when the pool is dropped, so
//! panics while using it may result in zombie processes.
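//!
//! A minimal usage sketch. The `xstream_util::limit` and `xstream_util::pool` paths are assumed
//! from this file's location, so the block is marked `ignore` rather than compiled as a doctest.
//!
//! ```ignore
//! use std::io::Write;
//! use std::process::Command;
//!
//! use xstream_util::limit::Limiting;
//! use xstream_util::pool::Pool; // trait providing `get` and `join`
//!
//! // run at most 4 copies of `cat` at a time
//! let mut pool = Limiting::new(Command::new("cat"), 4);
//! for chunk in ["one", "two", "three"] {
//!     // blocks on the oldest child if the pool is full, then spawns a new one
//!     let child = pool.get().expect("failed to spawn");
//!     let mut stdin = child.stdin.take().expect("stdin is piped");
//!     stdin.write_all(chunk.as_bytes()).expect("failed to write");
//!     // dropping `stdin` closes the pipe so the child can exit
//! }
//! // wait for every child that is still running
//! pool.join().expect("a child failed");
//! ```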
use super::pool;
use super::pool::{Error, Pool};
use std::borrow::BorrowMut;
use std::collections::VecDeque;
use std::process::{Child, Command, Stdio};

// TODO implement a better limited pool that pipes to the next completed one
/// A pool to manage spawning a limited number of processes
///
/// This will wait for the oldest scheduled process to complete before scheduling a new one. If you
/// schedule a long-running process, then a bunch of short ones, it won't schedule more short ones
/// beyond the buffer until the long one has finished.
#[derive(Debug)]
pub struct Limiting<C> {
    procs: VecDeque<Child>,
    max_procs: usize,
    command: C,
}

impl<C: BorrowMut<Command>> Limiting<C> {
    /// Create a new empty pool that limits the number of concurrently running processes
    ///
    /// Set `max_procs` to 0 to enable unbounded parallelism.
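    ///
    /// A brief sketch of both modes (marked `ignore` since the surrounding paths are assumed):
    ///
    /// ```ignore
    /// // bounded: at most 8 children alive at any one time
    /// let bounded = Limiting::new(Command::new("wc"), 8);
    /// // unbounded: never waits on an old child before spawning a new one
    /// let unbounded = Limiting::new(Command::new("wc"), 0);
    /// ```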
    pub fn new(mut command: C, max_procs: usize) -> Self {
        command.borrow_mut().stdin(Stdio::piped());
        Limiting {
            procs: VecDeque::with_capacity(max_procs),
            max_procs,
            command,
        }
    }
}

impl<C: BorrowMut<Command>> Pool for Limiting<C> {
    /// Spawn a new process from the command and return a mutable reference to it
    ///
    /// This call blocks until it can schedule the process under the pool's constraints. It can
    /// fail for several reasons, including an earlier process failing, and in that case it never
    /// actually spawns the process in question. If it does successfully spawn the process, the
    /// process is recorded so that it will be cleaned up when the pool is dropped.
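    ///
    /// A sketch of a typical call; it assumes the pool was created by `new`, which pipes stdin:
    ///
    /// ```ignore
    /// let child = pool.get()?;
    /// let mut stdin = child.stdin.take().expect("stdin is piped");
    /// stdin.write_all(b"some input\n")?;
    /// // dropping `stdin` closes the pipe so the child can finish
    /// ```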
    fn get(&mut self) -> Result<&mut Child, Error> {
        // wait for the oldest process if we're bounded
        if self.max_procs != 0 && self.procs.len() == self.max_procs {
            pool::wait_proc(self.procs.pop_front().unwrap())?;
        }

        // now schedule new process
        let proc = self.command.borrow_mut().spawn().map_err(Error::Spawn)?;
        self.procs.push_back(proc);
        Ok(self.procs.back_mut().unwrap()) // just pushed
    }

    /// Wait for all processes to finish
    ///
    /// An error terminates this call early without waiting for the remaining processes to finish.
    /// To keep waiting for them anyway, you can continue to call `join` until it succeeds; success
    /// indicates that there are no more running processes under management by the pool.
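    ///
    /// A sketch of draining the pool even when some children fail (assumes `Error: Debug`):
    ///
    /// ```ignore
    /// while let Err(err) = pool.join() {
    ///     eprintln!("a child failed: {:?}", err);
    /// }
    /// // nothing managed by the pool is still running here
    /// ```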
    fn join(&mut self) -> Result<(), Error> {
        // NOTE we do this instead of drain so that errors don't drop the rest of our processes
        // creating zombies
        while let Some(proc) = self.procs.pop_back() {
            pool::wait_proc(proc)?;
        }
        Ok(())
    }
}

impl<C> Drop for Limiting<C> {
    fn drop(&mut self) {
        // kill any children left in self
        for proc in &mut self.procs {
            let _ = proc.kill();
        }
        // wait for them to be cleaned up
        for proc in &mut self.procs {
            let _ = proc.wait();
        }
    }
}