xstream_util/rot.rs
1//! Rotating command process pool
2//!
3//! This pool will spawn up to a set number of processes, and then start returning old processes in
4//! a round robin fashion. To effectively manage cleanup, this needs to be dropped, so panics
5//! while using this may result in zombie processes.
6use super::pool;
7use super::pool::{Error, Pool};
8use std::borrow::BorrowMut;
9use std::process::{Child, Command, Stdio};
10
11/// A pool to manage spawning a limited number of processes
12///
13/// This pool will return new processes up to the limit, and then start returning old processes in
14/// a round-robin order. This type of pool is more effective if the process handles each task by
15/// delimiters as well, allowing for better utilization of resources for embarrassingly parallel
16/// tasks.
17#[derive(Debug)]
18pub struct Rotating<C> {
 // Children spawned so far that are still tracked by the pool.
19 procs: Vec<Child>,
 // Upper bound on the number of tracked children; 0 means a new child is spawned on every request.
20 max_procs: usize,
 // Command (or a mutable borrow of one) used to spawn each child.
21 command: C,
 // Index into `procs` of the child handed out by the next bounded request.
22 ind: usize,
23}
24
25impl<C: BorrowMut<Command>> Rotating<C> {
26 /// Create a new empty pool with a limited number of total processes
27 ///
28 /// Set `max_procs` to 0 to enable unbounded parallelism.
29 pub fn new(mut command: C, max_procs: usize) -> Self {
30 command.borrow_mut().stdin(Stdio::piped());
31 Self {
32 procs: Vec::with_capacity(max_procs),
33 max_procs,
34 command,
35 ind: 0,
36 }
37 }
38
39 /// Spawn a new process
40 fn spawn(&mut self) -> Result<Child, Error> {
41 self.command.borrow_mut().spawn().map_err(Error::Spawn)
42 }
43}
44
45impl<C: BorrowMut<Command>> Pool for Rotating<C> {
46 /// Get a process from the pool
47 ///
48 /// If fewer than `max_procs` have been spawned, this will spawn a new process, otherwise it
49 /// will return one that was already spawned.
50 fn get(&mut self) -> Result<&mut Child, Error> {
51 if self.max_procs == 0 {
52 let proc = self.spawn()?;
53 self.procs.push(proc);
54 Ok(self.procs.last_mut().unwrap())
55 } else {
56 if self.procs.len() < self.max_procs {
57 let proc = self.spawn()?;
58 self.procs.push(proc);
59 }
60 let child = &mut self.procs[self.ind];
61 self.ind += 1;
62 self.ind %= self.max_procs;
63 Ok(child)
64 }
65 }
66
67 /// Wait for all processes to finish successfully
68 ///
69 /// Errors will terminate early and not wait for reamining processes to finish. To continue
70 /// waiting for them anyway you can continue to call join until you get a success, this will
71 /// indicate that there are no more running processes under management by the pool.
72 fn join(&mut self) -> Result<(), Error> {
73 // NOTE we do this instead of drain so that errors don't drop the rest of our processes
74 // creating zombies
75 while let Some(proc) = self.procs.pop() {
76 pool::wait_proc(proc)?;
77 }
78 Ok(())
79 }
80}
81
82impl<C> Drop for Rotating<C> {
83 fn drop(&mut self) {
84 // kill any children left in self
85 for proc in &mut self.procs {
86 let _ = proc.kill();
87 }
88 // wait for them to be cleaned up
89 for proc in &mut self.procs {
90 let _ = proc.wait();
91 }
92 }
93}