regex_chunker/
stream.rs

1/*!
2Asynchronous analogs to the base `*Chunker` types that wrap
3[Tokio](https://tokio.rs/)'s
4[`AsyncRead`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html)
5types and implement
6[`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html).
7*/
8
9use std::{
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use bytes::{Buf, BytesMut};
15use regex::bytes::Regex;
16use tokio::io::AsyncRead;
17use tokio_stream::Stream;
18use tokio_util::codec::{Decoder, FramedRead};
19
20use crate::{Adapter, MatchDisposition, RcErr};
21
/// Codec state used by [`ByteChunker`]'s `FramedRead`: it splits the buffered
/// byte stream into chunks wherever the `fence` pattern matches.
struct ByteDecoder {
    /// Compiled delimiter pattern searched for in the buffered input.
    fence: Regex,
    /// Decides what happens to the matched bytes: dropped, kept at the end
    /// of the chunk before them, or left at the head of the chunk after them.
    match_dispo: MatchDisposition,
    /// Offset into the buffer at which the next search starts. `decode`
    /// sets it to the length of the last match in `Prepend` mode, where
    /// that match stays at the front of the buffer and must not be found
    /// again; it is 0 on construction and whenever `with_match` selects
    /// `Drop` or `Append`.
    scan_offset: usize,
}
27
28impl Decoder for ByteDecoder {
29    type Item = Vec<u8>;
30    type Error = RcErr;
31
32    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
33        let (start, end) = match self.fence.find_at(src.as_ref(), self.scan_offset) {
34            Some(m) => (m.start(), m.end()),
35            None => return Ok(None),
36        };
37        let length = end - start;
38
39        let new_buff = match self.match_dispo {
40            MatchDisposition::Drop => {
41                let new_buff: Vec<u8> = src.split_to(start).into();
42                src.advance(length);
43                new_buff
44            }
45            MatchDisposition::Append => src.split_to(end).into(),
46            MatchDisposition::Prepend => {
47                self.scan_offset = length;
48                src.split_to(start).into()
49            }
50        };
51
52        Ok(Some(new_buff))
53    }
54
55    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
56        if let Some(v) = self.decode(src)? {
57            Ok(Some(v))
58        } else if src.is_empty() {
59            Ok(None)
60        } else {
61            Ok(Some(src.split().into()))
62        }
63    }
64}
65
/**
The `stream::ByteChunker` is the `async` analog to the base
[`ByteChunker`](crate::ByteChunker) type. It wraps an
[`AsyncRead`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html)er
and implements the
[`Stream`](https://docs.rs/futures-core/0.3.28/futures_core/stream/trait.Stream.html)
trait.

Each item the stream yields is a `Result<Vec<u8>, RcErr>` holding one chunk
of the wrapped reader's output.

This async version of the base `ByteChunker` is less flexible in how it
handles errors; you'll get errors when Tokio's underlying black magic
returns them.
*/
pub struct ByteChunker<R: AsyncRead> {
    // Framed reader pairing the wrapped source with the `ByteDecoder`
    // that does the actual chunking.
    freader: FramedRead<R, ByteDecoder>,
}
81
82impl<R: AsyncRead> ByteChunker<R> {
83    /// Return a new [`ByteChunker`] wrapping the given async reader that
84    /// will chunk its output be delimiting it with the given regular
85    /// expression pattern.
86    pub fn new(source: R, pattern: &str) -> Result<Self, RcErr> {
87        let fence = Regex::new(pattern)?;
88        let decoder = ByteDecoder {
89            fence,
90            //error_status: ErrorStatus::Ok,
91            match_dispo: MatchDisposition::default(),
92            scan_offset: 0,
93        };
94
95        let freader = FramedRead::new(source, decoder);
96        Ok(Self { freader })
97    }
98
99    pub fn with_adapter<A>(self, adapter: A) -> CustomChunker<R, A> {
100        CustomChunker {
101            chunker: self,
102            adapter,
103        }
104    }
105
106    /// Builder-pattern for controlling what the chunker does with the
107    /// matched text; default value is [`MatchDisposition::Drop`].
108    pub fn with_match(mut self, behavior: MatchDisposition) -> Self {
109        let d = self.freader.decoder_mut();
110        d.match_dispo = behavior;
111        if matches!(behavior, MatchDisposition::Drop | MatchDisposition::Append) {
112            d.scan_offset = 0;
113        }
114        self
115    }
116}
117
118impl<A: AsyncRead + Unpin> Stream for ByteChunker<A> {
119    type Item = Result<Vec<u8>, RcErr>;
120
121    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122        Pin::new(&mut self.freader).poll_next(cx)
123    }
124}
125
/**
The async analog to the base crate's
[`CustomChunker`](`crate::CustomChunker`).
It takes an [`Adapter`] and yields chunks based on the `Adapter`'s
transformation.

```rust
# use std::error::Error;
# #[tokio::main]
# async fn main() -> Result<(), Box<dyn Error>> {
    use regex_chunker::{
        stream::ByteChunker,
        StringAdapter,
    };
    use tokio_stream::StreamExt;
    use std::io::Cursor;

    let text = b"One, two, three four. Can I have a little more?";
    let c = Cursor::new(text);

    let chunks: Vec<_> = ByteChunker::new(c, "[ .,?]+")?
        .with_adapter(StringAdapter::default())
        .map(|res| res.unwrap())
        .collect().await;

    assert_eq!(
        &chunks,
        &[
            "One", "two", "three", "four",
            "Can", "I", "have", "a", "little", "more"
        ].clone()
    );
#   Ok(()) }
```
*/
pub struct CustomChunker<R: AsyncRead, A> {
    // The byte-level chunker whose output is fed to the adapter.
    chunker: ByteChunker<R>,
    // Transforms each item polled from `chunker` into the item this
    // stream yields.
    adapter: A,
}
164
165impl<R: AsyncRead, A> CustomChunker<R, A> {
166    /// Consumes the [`CustomChunker`] and returns the underlying
167    /// [`ByteChunker`] and [`Adapter`].
168    pub fn into_innards(self) -> (ByteChunker<R>, A) {
169        (self.chunker, self.adapter)
170    }
171
172    /// Get a reference to the underlying [`Adapter`].
173    pub fn get_adapter(&self) -> &A { &self.adapter }
174
175    /// Get a mutable reference to the underlying [`Adapter`].
176    pub fn get_adapter_mut(&mut self) -> &mut A { &mut self.adapter }
177}
178
// Mark `CustomChunker` as `Unpin` no matter what `R` and `A` are. Its
// `Stream::poll_next` reaches the `chunker` and `adapter` fields mutably
// through `Pin<&mut Self>`, which is only allowed for `Unpin` types.
impl<R: AsyncRead, A> Unpin for CustomChunker<R, A> {}
180
181impl<R, A> Stream for CustomChunker<R, A>
182where
183    R: AsyncRead + Unpin,
184    A: Adapter
185{
186    type Item = A::Item;
187
188    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
189        let p = Pin::new(&mut self.chunker).poll_next(cx);
190        match p {
191            Poll::Pending => Poll::Pending,
192            Poll::Ready(x) => Poll::Ready(self.adapter.adapt(x)),
193        }
194    }
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    #[allow(unused_imports)]
    use crate::tests::{
        chunk_vec, ref_slice_cmp, HTTP_PATT, HTTP_URL, PASSWD_PATH, PASSWD_PATT, TEST_PATH,
        TEST_PATT,
    };

    use std::process::Stdio;

    use tokio::{fs::File, process::Command};
    use tokio_stream::StreamExt;

    // Helper executable spawned by `slow_async`; the path implies it must
    // already have been built in the debug profile.
    static SOURCE: &str = "target/debug/slowsource";
    // NOTE(review): presumably the file to emit followed by timing
    // parameters -- confirm against the `slowsource` binary.
    static SOURCE_ARGS: &[&str] = &[TEST_PATH, "0.0", "0.1"];

    /// Chunking a file through the async chunker must agree with the
    /// reference chunking of the same bytes read synchronously.
    #[tokio::test]
    async fn basic_async() {
        // Reference result, computed from the whole file in memory.
        let reference_bytes = std::fs::read(TEST_PATH).unwrap();
        let fence = Regex::new(TEST_PATT).unwrap();
        let expected = chunk_vec(&fence, &reference_bytes, MatchDisposition::Drop);

        // Result under test, streamed from an async file handle.
        let file = File::open(TEST_PATH).await.unwrap();
        let stream = ByteChunker::new(file, TEST_PATT).unwrap();
        let actual: Vec<Vec<u8>> = stream.map(Result::unwrap).collect().await;

        ref_slice_cmp(&actual, &expected);
    }

    /// Same comparison as `basic_async`, but the bytes arrive through the
    /// piped stdout of a child process instead of a file.
    #[tokio::test]
    async fn slow_async() {
        // Reference result, computed from the whole file in memory.
        let reference_bytes = std::fs::read(TEST_PATH).unwrap();
        let fence = Regex::new(TEST_PATT).unwrap();
        let expected = chunk_vec(&fence, &reference_bytes, MatchDisposition::Drop);

        let mut producer = Command::new(SOURCE)
            .args(SOURCE_ARGS)
            .stdout(Stdio::piped())
            .spawn()
            .unwrap();
        let pipe = producer.stdout.take().unwrap();
        let stream = ByteChunker::new(pipe, TEST_PATT).unwrap();
        let actual: Vec<Vec<u8>> = stream.map(Result::unwrap).collect().await;
        // Reap the child only after its output has been fully consumed.
        producer.wait().await.unwrap();

        ref_slice_cmp(&actual, &expected);
    }
}
245}