xstream_util/lib.rs
1//! xstream library
2//!
3//! Provides [`xstream`] to take a `BufRead` and split it as input among several processes.
4//! There's currently no async support, and therefore no real way to interact with the processes
5//! afterwards. Even collecting the output would require effort / synchronization, so currently
6//! they're all just piped to the standard inherited buffers.
7//!
8//! # Usage
9//!
10//! ```
11//! use std::process::Command;
12//! use xstream_util::Limiting;
13//! # use std::io::BufReader;
14//!
15//! let mut input = // ...
16//! # BufReader::new(&[0_u8; 0][..]);
17//! // Spawn up to two `cat` processes, could also use `Rotating`
18//! let mut pool = Limiting::new(Command::new("cat"), 2);
19//! xstream_util::xstream(&mut pool, &mut input, &b"\n", &None::<&[u8]>).unwrap();
20//! ```
21#![warn(missing_docs)]
22#![warn(clippy::pedantic)]
23
24mod limit;
25mod pool;
26mod rot;
27
28pub use limit::Limiting;
29pub use pool::{Error, Pool};
30pub use rot::Rotating;
31use std::io::{BufRead, Write};
32
33/// Stream one reader into several independent processes
34///
35/// `in_handle` will be delimited by `delim`, each section will be piped as stdin to a command spawned from `pool`.
36///
37/// # Notes
38///
39/// If the commands are io intensive or the buffer being piped to each command is long than there
40/// won't be much parallelization as this must finish sending a section to a process before it will
41/// spin up another.
42///
43/// # Errors
44///
45/// If there are problems spawning processes, the processes themselves fail, or there are problems
46/// reading or writing to the available readers / writers.
47pub fn xstream(
48 pool: &mut impl Pool,
49 in_handle: &mut impl BufRead,
50 delim: impl AsRef<[u8]>,
51 write_delim: &Option<impl AsRef<[u8]>>,
52) -> Result<(), Error> {
53 let delim = delim.as_ref();
54
55 while !in_handle.fill_buf().map_err(Error::Input)?.is_empty() {
56 let proc = pool.get()?;
57 let out_handle = proc.stdin.as_mut().ok_or(Error::StdinNotPiped)?;
58
59 while {
60 let buf = in_handle.fill_buf().map_err(Error::Input)?;
61 let (consume, hit_delim) = match buf.windows(delim.len()).position(|w| w == delim) {
62 // no match
63 None => (
64 if buf.len() < delim.len() {
65 // buffer can never contain the match, so dump the rest
66 buf.len()
67 } else {
68 // write we can to guarantee we didn't write part of a match
69 buf.len() - delim.len() + 1
70 },
71 false,
72 ),
73 // matched write up to match, consume the match
74 Some(pos) => (pos + delim.len(), true),
75 };
76 if let (Some(wdel), true) = (write_delim, hit_delim) {
77 out_handle
78 .write_all(&buf[..consume - delim.len()])
79 .map_err(Error::Output)?;
80 out_handle.write_all(wdel.as_ref()).map_err(Error::Output)?;
81 } else {
82 out_handle
83 .write_all(&buf[..consume])
84 .map_err(Error::Output)?;
85 }
86 in_handle.consume(consume);
87 !hit_delim && consume > 0
88 } {}
89 }
90
91 pool.join()
92}