1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
//! xstream library
//!
//! Provides a command to take a BufRead and split it as input among several processes. There's
//! currently no async support, and therefore no real way to interact with the processes
//! afterwards. Even collecting the output would require effort / synchronization, so currently
//! the processes just write to the standard streams inherited from this process.
mod pool;
use pool::Pool;
use std::borrow::{Borrow, BorrowMut};
use std::io::{BufRead, Error, ErrorKind, Result, Write};
use std::process::{Command, Stdio};
/// stream one reader into several independent processes
///
/// `ihandle` is split on every occurrence of `delim`, and each section (with the delimiter
/// removed) is piped as stdin to a new invocation of `command`. Up to `max_parallel` processes
/// will exist at any one time; however, if the commands are io intensive or the section being
/// piped to each command is long, then there won't be much parallelization, as this must finish
/// sending a section to a process before it will spin up another. This prevents excess memory
/// consumption, but will be slower than max parallelization.
///
/// Each child's stdin pipe is closed as soon as its section has been written, so the child sees
/// EOF right away. A delimiter at the very end of the input terminates the final section instead
/// of starting an empty one, so empty input spawns no processes at all. Delimiters are recognized
/// even when they straddle the reader's internal buffer boundaries, and the scan is linear in the
/// input size regardless of the delimiter length.
///
/// Set max parallel to 0 to enable full parallelization.
///
/// # Errors
///
/// Returns an [`ErrorKind::InvalidInput`] error if `delim` is empty. Otherwise propagates any
/// error from reading `ihandle`, spawning a process, writing to a child's stdin, or
/// `Pool::join`.
pub fn xstream(
    mut command: impl BorrowMut<Command>,
    ihandle: &mut impl BufRead,
    delim: impl Borrow<[u8]>,
    max_parallel: usize,
) -> Result<()> {
    let delim: &[u8] = delim.borrow();
    if delim.is_empty() {
        // the matcher below needs at least one byte to make progress
        return Err(Error::new(
            ErrorKind::InvalidInput,
            "delimiter must not be empty",
        ));
    }
    let table = prefix_table(delim);
    let mut pool = Pool::new(max_parallel);
    let command: &mut Command = command.borrow_mut();
    command.stdin(Stdio::piped()).stdout(Stdio::inherit());
    // only spawn a process when there is at least one more byte of input to hand it
    while !ihandle.fill_buf()?.is_empty() {
        let child = pool.spawn(command)?;
        // take ownership of the pipe (instead of borrowing it from the `Child` the pool keeps) so
        // that dropping it below closes the child's stdin and it sees EOF after its section
        let mut ohandle = child.stdin.take().ok_or_else(|| {
            Error::new(ErrorKind::Other, "failed to capture child process stdin")
        })?;
        copy_section(ihandle, &mut ohandle, delim, &table)?;
        drop(ohandle);
    }
    pool.join()
}

/// Copy one section from `input` to `output`, stopping after the next `delim`.
///
/// The delimiter itself is consumed from `input` but not written. Returns `Ok(true)` if a
/// delimiter ended the section, or `Ok(false)` if `input` reached EOF first. `delim` must be
/// non-empty and `table` must be `prefix_table(delim)`.
///
/// This is a streaming Knuth-Morris-Pratt scan: bytes that could still be the start of a
/// delimiter are held back instead of written, and because they always equal
/// `delim[..matched]` they never need to be copied anywhere. This lets a delimiter split across
/// two `fill_buf` calls still be recognized; `fill_buf` is not required to return more data until
/// the current buffer is consumed (`BufReader` only refills an empty buffer).
fn copy_section(
    input: &mut impl BufRead,
    output: &mut impl Write,
    delim: &[u8],
    table: &[usize],
) -> Result<bool> {
    // number of leading delimiter bytes matched by the most recently scanned input
    let mut matched = 0;
    loop {
        let buf = input.fill_buf()?;
        if buf.is_empty() {
            // EOF: the held-back partial match was ordinary data after all
            output.write_all(&delim[..matched])?;
            return Ok(false);
        }
        // held-back bytes from earlier buffers, logically preceding `buf`
        let carry = &delim[..matched];
        let mut end = None;
        for (i, &byte) in buf.iter().enumerate() {
            while matched > 0 && delim[matched] != byte {
                matched = table[matched - 1];
            }
            if delim[matched] == byte {
                matched += 1;
            }
            if matched == delim.len() {
                end = Some(i + 1);
                break;
            }
        }
        let scanned = end.unwrap_or(buf.len());
        // everything scanned except the trailing `matched` bytes (the held-back partial match,
        // or the complete delimiter) belongs to this section and can be written now
        write_prefix(output, carry, &buf[..scanned], carry.len() + scanned - matched)?;
        input.consume(scanned);
        if end.is_some() {
            return Ok(true);
        }
    }
}

/// Write the first `len` bytes of the concatenation `head ++ tail` to `output`.
fn write_prefix(output: &mut impl Write, head: &[u8], tail: &[u8], len: usize) -> Result<()> {
    match len.checked_sub(head.len()) {
        None => output.write_all(&head[..len]),
        Some(rest) => {
            output.write_all(head)?;
            output.write_all(&tail[..rest])
        }
    }
}

/// Build the Knuth-Morris-Pratt failure table for `delim`.
///
/// `table[i]` is the length of the longest proper prefix of `delim[..=i]` that is also a suffix
/// of it, i.e. how much of a partial match survives when the next byte doesn't extend it.
fn prefix_table(delim: &[u8]) -> Vec<usize> {
    let mut table = vec![0; delim.len()];
    let mut len = 0;
    for (i, &byte) in delim.iter().enumerate().skip(1) {
        while len > 0 && byte != delim[len] {
            len = table[len - 1];
        }
        if byte == delim[len] {
            len += 1;
        }
        table[i] = len;
    }
    table
}