1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//! xstream library
//!
//! Provides a function to take a `BufRead` and split it as input among several processes.
//! There's currently no async support, and therefore no real way to interact with the processes
//! afterwards. Even collecting their output would require effort / synchronization, so currently
//! the processes' output is just passed through to the inherited standard streams.

mod pool;
use pool::Pool;
use std::borrow::{Borrow, BorrowMut};
use std::io::{BufRead, Error, ErrorKind, Result, Write};
use std::process::{Command, Stdio};

/// stream one reader into several independent processes
///
/// `ihandle` will be split on `delim`, and each section will be piped as stdin to a new
/// invocation of `command`. Up to `max_parallel` processes will exist at any one time; however,
/// if the commands are io intensive or the section piped to each command is long, then there
/// won't be much parallelization, as this must finish sending a section to a process before it
/// will spin up another. This prevents excess memory consumption, but will be slower than
/// maximal parallelization.
///
/// Set `max_parallel` to 0 to enable full parallelization.
///
/// Delimiters are detected even when they straddle the reader's internal buffer boundaries.
/// Each child's stdin is closed as soon as its section has been written. Empty input still
/// spawns a single process with empty stdin, and input that ends with `delim` spawns a final
/// process with empty stdin.
///
/// # Errors
///
/// Returns an error of kind [`ErrorKind::InvalidInput`] if `delim` is empty. Otherwise
/// propagates any error from spawning a process, capturing its stdin, reading `ihandle`,
/// writing to a child, or joining the pool.
pub fn xstream(
    mut command: impl BorrowMut<Command>,
    ihandle: &mut impl BufRead,
    delim: impl Borrow<[u8]>,
    max_parallel: usize,
) -> Result<()> {
    let delim: &[u8] = delim.borrow();
    if delim.is_empty() {
        // An empty delimiter would match everywhere without ever consuming input.
        return Err(Error::new(
            ErrorKind::InvalidInput,
            "delimiter must not be empty",
        ));
    }
    let table = kmp_table(delim);

    let mut pool = Pool::new(max_parallel);
    let command = command
        .borrow_mut()
        .stdin(Stdio::piped())
        .stdout(Stdio::inherit());

    loop {
        let proc = pool.spawn(command)?;
        // Take ownership of the pipe so that dropping it below closes the child's stdin.
        let mut ohandle = proc.stdin.take().ok_or_else(|| {
            Error::new(ErrorKind::Other, "failed to capture child process stdin")
        })?;
        let hit_delim = copy_section(ihandle, &mut ohandle, delim, &table)?;
        // Signal EOF to the child now rather than whenever the pool next waits on it.
        drop(ohandle);
        if !hit_delim {
            break;
        }
    }

    pool.join()
}

/// Build the Knuth-Morris-Pratt failure table for `delim`.
///
/// `table[i]` is the length of the longest proper prefix of `delim[..=i]` that is also a
/// suffix of it, which lets the scanner fall back after a mismatch without re-reading input.
fn kmp_table(delim: &[u8]) -> Vec<usize> {
    let mut table = vec![0; delim.len()];
    let mut len = 0;
    for i in 1..delim.len() {
        while len > 0 && delim[i] != delim[len] {
            len = table[len - 1];
        }
        if delim[i] == delim[len] {
            len += 1;
        }
        table[i] = len;
    }
    table
}

/// Copy bytes from `input` to `output` up to the next occurrence of `delim`.
///
/// The delimiter is consumed from `input` but not written. Returns `Ok(true)` if a delimiter
/// was found and `Ok(false)` if `input` hit end of file first. `delim` must be non-empty and
/// `table` must be `kmp_table(delim)`.
fn copy_section(
    input: &mut impl BufRead,
    output: &mut impl Write,
    delim: &[u8],
    table: &[usize],
) -> Result<bool> {
    // Number of bytes already consumed from `input` but not yet written because they match
    // the start of `delim`. Those bytes are exactly `delim[..held]`, so they need no storage.
    let mut held = 0;
    loop {
        let buf = input.fill_buf()?;
        if buf.is_empty() {
            // EOF: the held-back bytes turned out not to be a delimiter after all.
            output.write_all(&delim[..held])?;
            return Ok(false);
        }

        // Streaming KMP scan; `matched` is the longest prefix of `delim` that is a suffix of
        // everything seen so far in this section.
        let mut matched = held;
        let end = buf.iter().position(|&byte| {
            while matched > 0 && delim[matched] != byte {
                matched = table[matched - 1];
            }
            if delim[matched] == byte {
                matched += 1;
            }
            matched == delim.len()
        });

        match end {
            Some(pos) => {
                // `delim[..held] ++ buf[..=pos]` ends with the delimiter; write what precedes it.
                write_prefix(output, &delim[..held], buf, held + pos + 1 - delim.len())?;
                input.consume(pos + 1);
                return Ok(true);
            }
            None => {
                // Only the trailing partial match might still become a delimiter; hold it back.
                let len = buf.len();
                write_prefix(output, &delim[..held], buf, held + len - matched)?;
                input.consume(len);
                held = matched;
            }
        }
    }
}

/// Write the first `len` bytes of the concatenation `head ++ tail` to `output`.
fn write_prefix(output: &mut impl Write, head: &[u8], tail: &[u8], len: usize) -> Result<()> {
    let from_head = len.min(head.len());
    output.write_all(&head[..from_head])?;
    output.write_all(&tail[..len - from_head])
}