xstream_util/limit.rs
1//! Limiting process pool
2//!
3//! This is a process pool to manage limiting the number of spawned processes, and manage cleanup
4//! so there are no zombie processes. When trying to spawn more than the limit, the old process
5//! will be waited on before spawning a new one. To effectively manage cleanup, this needs to be
6//! dropped, so panics while using this may result in zombie processes.
7use super::pool;
8use super::pool::{Error, Pool};
9use std::borrow::BorrowMut;
10use std::collections::VecDeque;
11use std::process::{Child, Command, Stdio};
12
13// TODO implement a better limited pool that pipes to the next completed one
14/// A pool to manage spawning a limited number of processses
15///
16/// This will wait for the oldest scheduled process to complete before scheduling a new one. If you
17/// schedule a long running process, then a bunch of short ones, it won't schedule more short ones
18/// beyond the buffer until the long one has finished.
19#[derive(Debug)]
20pub struct Limiting<C> {
21 procs: VecDeque<Child>,
22 max_procs: usize,
23 command: C,
24}
25
26impl<C: BorrowMut<Command>> Limiting<C> {
27 /// Create a new empty pool with a limited number of total processes
28 ///
29 /// Set `max_procs` to 0 to enable unbounded parallelism.
30 pub fn new(mut command: C, max_procs: usize) -> Self {
31 command.borrow_mut().stdin(Stdio::piped());
32 Limiting {
33 procs: VecDeque::with_capacity(max_procs),
34 max_procs,
35 command,
36 }
37 }
38}
39
40impl<C: BorrowMut<Command>> Pool for Limiting<C> {
41 /// Spawn a new process with command and return a mutable reference to the process
42 ///
43 /// This command will block until it can schedule the process under the constraints. It can
44 /// fail for any reason, including an earlier process failed, and never actually spawn the
45 /// process in question. If it does successfully spawn the process, it will be recorded so that
46 /// it will be cleaned up if the pool is dropped.
47 fn get(&mut self) -> Result<&mut Child, Error> {
48 // wait for the oldest process if we're bounded
49 if self.max_procs != 0 && self.procs.len() == self.max_procs {
50 pool::wait_proc(self.procs.pop_front().unwrap())?;
51 };
52
53 // now schedule new process
54 let proc = self.command.borrow_mut().spawn().map_err(Error::Spawn)?;
55 self.procs.push_back(proc);
56 Ok(self.procs.back_mut().unwrap()) // just pushed
57 }
58
59 /// Wait for all processes to finish
60 ///
61 /// Errors will terminate early and not wait for reamining processes to finish. To continue
62 /// waiting for them anyway you can continue to call join until you get a success, this will
63 /// indicate that there are no more running processes under management by the pool.
64 fn join(&mut self) -> Result<(), Error> {
65 // NOTE we do this instead of drain so that errors don't drop the rest of our processes
66 // creating zombies
67 while let Some(proc) = self.procs.pop_back() {
68 pool::wait_proc(proc)?;
69 }
70 Ok(())
71 }
72}
73
74impl<C> Drop for Limiting<C> {
75 fn drop(&mut self) {
76 // kill any children left in self
77 for proc in &mut self.procs {
78 let _ = proc.kill();
79 }
80 // wait for them to be cleaned up
81 for proc in &mut self.procs {
82 let _ = proc.wait();
83 }
84 }
85}