1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
//! xstream library
//!
//! Provides a command to take a BufRead and split it as input among several processes. There's
//! current not any async support, and therefore no real way to interact with the processes
//! afterwards. Event collecting the output would require effort / synchronization, so currently
//! they're all just piped to the standard inhereted buffers.
mod pool;
use pool::Pool;
use std::borrow::{Borrow, BorrowMut};
use std::io::{BufRead, Error, ErrorKind, Result, Write};
use std::process::{Command, Stdio};
/// stream one reader into several independent processes
///
/// ihandle will be delimited by delim, each section will be piped as stdin to a new command. Up to
/// max_parallel processes will exist at any one time, however if the commands are io intensive or
/// the buffer being piped to each command is long than there won't be much parallelization as this
/// must finish sending a section to a process before it will spin up another. This prevents excess
/// memory consumption, but will be slower than max parallelization.
///
/// Set max parallel to 0 to enable full parallelization.
pub fn xstream(
    mut command: impl BorrowMut<Command>,
    ihandle: &mut impl BufRead,
    delim: impl Borrow<[u8]>,
    max_parallel: usize,
) -> Result<()> {
    let mut pool = Pool::new(max_parallel);
    let command = command
        .borrow_mut()
        .stdin(Stdio::piped())
        .stdout(Stdio::inherit());
    let delim = delim.borrow();
    while {
        let proc = pool.spawn(command)?;
        let ohandle = proc
            .stdin
            .as_mut()
            .ok_or_else(|| Error::new(ErrorKind::Other, "failed to capture child process stdin"))?;
        let mut new_process;
        while {
            let buf = ihandle.fill_buf()?;
            // TODO this takes worst case |buf| * |delim| when it only needs to take |buf|, but I
            // couln't find a builtin method to do it
            let (to_write, hit_delim) = match buf.windows(delim.len()).position(|w| w == delim) {
                // impossible to match delim
                _ if buf.len() < delim.len() => (buf.len(), false),
                // no match, write we can to guarantee we didn't write part of a match
                None => (buf.len() - delim.len() + 1, false),
                // matched write up to match, consume the match
                Some(pos) => (pos, true),
            };
            new_process = hit_delim;
            ohandle.write_all(&buf[..to_write])?;
            ihandle.consume(if hit_delim {
                to_write + delim.len()
            } else {
                to_write
            });
            !hit_delim && to_write > 0
        } {}
        new_process
    } {}
    pool.join()
}