xstream_util/
lib.rs

1//! xstream library
2//!
3//! Provides [`xstream`] to take a `BufRead` and split it as input among several processes.
4//! There's currently no async support, and therefore no real way to interact with the processes
5//! afterwards. Even collecting the output would require effort / synchronization, so currently
6//! they're all just piped to the standard inherited buffers.
7//!
8//! # Usage
9//!
10//! ```
11//! use std::process::Command;
12//! use xstream_util::Limiting;
13//! # use std::io::BufReader;
14//!
15//! let mut input = // ...
16//! # BufReader::new(&[0_u8; 0][..]);
17//! // Spawn up to two `cat` processes, could also use `Rotating`
18//! let mut pool = Limiting::new(Command::new("cat"), 2);
19//! xstream_util::xstream(&mut pool, &mut input, &b"\n", &None::<&[u8]>).unwrap();
20//! ```
21#![warn(missing_docs)]
22#![warn(clippy::pedantic)]
23
24mod limit;
25mod pool;
26mod rot;
27
28pub use limit::Limiting;
29pub use pool::{Error, Pool};
30pub use rot::Rotating;
31use std::io::{BufRead, Write};
32
33/// Stream one reader into several independent processes
34///
35/// `in_handle` will be delimited by `delim`, each section will be piped as stdin to a command spawned from `pool`.
36///
37/// # Notes
38///
39/// If the commands are io intensive or the buffer being piped to each command is long than there
40/// won't be much parallelization as this must finish sending a section to a process before it will
41/// spin up another.
42///
43/// # Errors
44///
45/// If there are problems spawning processes, the processes themselves fail, or there are problems
46/// reading or writing to the available readers / writers.
47pub fn xstream(
48    pool: &mut impl Pool,
49    in_handle: &mut impl BufRead,
50    delim: impl AsRef<[u8]>,
51    write_delim: &Option<impl AsRef<[u8]>>,
52) -> Result<(), Error> {
53    let delim = delim.as_ref();
54
55    while !in_handle.fill_buf().map_err(Error::Input)?.is_empty() {
56        let proc = pool.get()?;
57        let out_handle = proc.stdin.as_mut().ok_or(Error::StdinNotPiped)?;
58
59        while {
60            let buf = in_handle.fill_buf().map_err(Error::Input)?;
61            let (consume, hit_delim) = match buf.windows(delim.len()).position(|w| w == delim) {
62                // no match
63                None => (
64                    if buf.len() < delim.len() {
65                        // buffer can never contain the match, so dump the rest
66                        buf.len()
67                    } else {
68                        // write we can to guarantee we didn't write part of a match
69                        buf.len() - delim.len() + 1
70                    },
71                    false,
72                ),
73                // matched write up to match, consume the match
74                Some(pos) => (pos + delim.len(), true),
75            };
76            if let (Some(wdel), true) = (write_delim, hit_delim) {
77                out_handle
78                    .write_all(&buf[..consume - delim.len()])
79                    .map_err(Error::Output)?;
80                out_handle.write_all(wdel.as_ref()).map_err(Error::Output)?;
81            } else {
82                out_handle
83                    .write_all(&buf[..consume])
84                    .map_err(Error::Output)?;
85            }
86            in_handle.consume(consume);
87            !hit_delim && consume > 0
88        } {}
89    }
90
91    pool.join()
92}