1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
//! xstream library
//!
//! Provides [`xstream`] to take a `BufRead` and split it as input among several processes.
//! There's currently no async support, and therefore no real way to interact with the processes
//! afterwards. Even collecting the output would require effort / synchronization, so currently
//! they're all just piped to the standard inherited buffers.
//!
//! # Usage
//!
//! ```
//! use std::process::Command;
//! use xstream_util::Limiting;
//! # use std::io::BufReader;
//!
//! let mut input = // ...
//! # BufReader::new(&[0_u8; 0][..]);
//! // Spawn up to two `cat` processes, could also use `Rotating`
//! let mut pool = Limiting::new(Command::new("cat"), 2);
//! xstream_util::xstream(&mut pool, &mut input, &b"\n", &None::<&[u8]>).unwrap();
//! ```
#![warn(missing_docs)]
#![warn(clippy::pedantic)]
mod limit;
mod pool;
mod rot;
pub use limit::Limiting;
pub use pool::{Error, Pool};
pub use rot::Rotating;
use std::io::{BufRead, Write};
/// Stream one reader into several independent processes
///
/// `in_handle` will be delimited by `delim`, each section will be piped as stdin to a command spawned from `pool`.
///
/// # Notes
///
/// If the commands are io intensive or the buffer being piped to each command is long than there
/// won't be much parallelization as this must finish sending a section to a process before it will
/// spin up another.
///
/// # Errors
///
/// If there are problems spawning processes, the processes themselves fail, or there are problems
/// reading or writing to the available readers / writers.
pub fn xstream(
pool: &mut impl Pool,
in_handle: &mut impl BufRead,
delim: impl AsRef<[u8]>,
write_delim: &Option<impl AsRef<[u8]>>,
) -> Result<(), Error> {
let delim = delim.as_ref();
while !in_handle.fill_buf().map_err(Error::Input)?.is_empty() {
let proc = pool.get()?;
let out_handle = proc.stdin.as_mut().ok_or(Error::StdinNotPiped)?;
while {
let buf = in_handle.fill_buf().map_err(Error::Input)?;
let (consume, hit_delim) = match buf.windows(delim.len()).position(|w| w == delim) {
// no match
None => (
if buf.len() < delim.len() {
// buffer can never contain the match, so dump the rest
buf.len()
} else {
// write we can to guarantee we didn't write part of a match
buf.len() - delim.len() + 1
},
false,
),
// matched write up to match, consume the match
Some(pos) => (pos + delim.len(), true),
};
if let (Some(wdel), true) = (write_delim, hit_delim) {
out_handle
.write_all(&buf[..consume - delim.len()])
.map_err(Error::Output)?;
out_handle.write_all(wdel.as_ref()).map_err(Error::Output)?;
} else {
out_handle
.write_all(&buf[..consume])
.map_err(Error::Output)?;
}
in_handle.consume(consume);
!hit_delim && consume > 0
} {}
}
pool.join()
}