xstream_util/
rot.rs

1//! Rotating command process pool
2//!
3//! This pool will spawn up to a set number of processes, and then start returning old processes in
4//! a round robin fashion.  To effectively manage cleanup, this needs to be dropped, so panics
5//! while using this may result in zombie processes.
6use super::pool;
7use super::pool::{Error, Pool};
8use std::borrow::BorrowMut;
9use std::process::{Child, Command, Stdio};
10
/// A process pool that caps how many child processes it spawns
///
/// Fresh processes are handed out until the configured limit is reached; after that, previously
/// spawned processes are handed out again in round-robin order. This works best when each
/// process handles its tasks by delimiter, which allows better utilization of resources for
/// embarrassingly parallel workloads.
#[derive(Debug)]
pub struct Rotating<C> {
    // child processes currently managed by the pool
    procs: Vec<Child>,
    // upper bound on the number of managed processes; 0 means no bound
    max_procs: usize,
    // the command used to spawn every child
    command: C,
    // index into `procs` of the next process to hand out when rotating
    ind: usize,
}
24
25impl<C: BorrowMut<Command>> Rotating<C> {
26    /// Create a new empty pool with a limited number of total processes
27    ///
28    /// Set `max_procs` to 0 to enable unbounded parallelism.
29    pub fn new(mut command: C, max_procs: usize) -> Self {
30        command.borrow_mut().stdin(Stdio::piped());
31        Self {
32            procs: Vec::with_capacity(max_procs),
33            max_procs,
34            command,
35            ind: 0,
36        }
37    }
38
39    /// Spawn a new process
40    fn spawn(&mut self) -> Result<Child, Error> {
41        self.command.borrow_mut().spawn().map_err(Error::Spawn)
42    }
43}
44
45impl<C: BorrowMut<Command>> Pool for Rotating<C> {
46    /// Get a process from the pool
47    ///
48    /// If fewer than `max_procs` have been spawned, this will spawn a new process, otherwise it
49    /// will return one that was already spawned.
50    fn get(&mut self) -> Result<&mut Child, Error> {
51        if self.max_procs == 0 {
52            let proc = self.spawn()?;
53            self.procs.push(proc);
54            Ok(self.procs.last_mut().unwrap())
55        } else {
56            if self.procs.len() < self.max_procs {
57                let proc = self.spawn()?;
58                self.procs.push(proc);
59            }
60            let child = &mut self.procs[self.ind];
61            self.ind += 1;
62            self.ind %= self.max_procs;
63            Ok(child)
64        }
65    }
66
67    /// Wait for all processes to finish successfully
68    ///
69    /// Errors will terminate early and not wait for reamining processes to finish. To continue
70    /// waiting for them anyway you can continue to call join until you get a success, this will
71    /// indicate that there are no more running processes under management by the pool.
72    fn join(&mut self) -> Result<(), Error> {
73        // NOTE we do this instead of drain so that errors don't drop the rest of our processes
74        // creating zombies
75        while let Some(proc) = self.procs.pop() {
76            pool::wait_proc(proc)?;
77        }
78        Ok(())
79    }
80}
81
82impl<C> Drop for Rotating<C> {
83    fn drop(&mut self) {
84        // kill any children left in self
85        for proc in &mut self.procs {
86            let _ = proc.kill();
87        }
88        // wait for them to be cleaned up
89        for proc in &mut self.procs {
90            let _ = proc.wait();
91        }
92    }
93}