regex_chunker/
base.rs

1/*!
2The base ByteChunker types.
3*/
4use std::{
5    fmt::{Debug, Formatter},
6    hint::spin_loop,
7    io::{ErrorKind, Read},
8};
9
10use regex::bytes::Regex;
11
12use crate::{ctrl::*, CustomChunker, RcErr, SimpleCustomChunker};
13
// Default length, in bytes, of the `read_buff` that `ByteChunker::new`
// allocates (1 KiB). `ByteChunker::with_buffer_size` overrides it.
const DEFAULT_BUFFER_SIZE: usize = 1024;
16
17/**
18The `ByteChunker` takes a
19[`bytes::Regex`](https://docs.rs/regex/latest/regex/bytes/struct.Regex.html),
20wraps a byte source (that is, a type that implements [`std::io::Read`])
21and iterates over chunks of bytes from that source that are delimited by
22the regular expression. It operates very much like
23[`bytes::Regex::split`](https://docs.rs/regex/latest/regex/bytes/struct.Regex.html#method.split),
24except that it works on an incoming stream of bytes instead of a
25necessarily-already-in-memory slice.
26
27```
28use regex_chunker::ByteChunker;
29use std::io::Cursor;
30
31# fn main() -> Result<(), regex_chunker::RcErr> {
32let text = b"One, two, three, four. Can I have a little more?";
33let c = Cursor::new(text);
34
35let chunks: Vec<String> = ByteChunker::new(c, "[ .,?]+")?
36    .map(|res| {
37        let v = res.unwrap();
38        String::from_utf8(v).unwrap()
39    }).collect();
40
41assert_eq!(
42    &chunks,
43    &["One", "two", "three", "four",
44    "Can", "I", "have", "a", "little", "more"].clone()
45);
46# Ok(())
47# }
48```
49
It's also slightly more flexible, in that the matched bytes can be
optionally added to the beginnings or endings of the returned chunks.
52(By default they are just dropped.)
53
54```
55use regex_chunker::{ByteChunker, MatchDisposition};
56use std::io::Cursor;
57
58# fn main() -> Result<(), regex_chunker::RcErr> {
59let text = b"One, two, three, four. Can I have a little more?";
60let c = Cursor::new(text);
61
62let chunks: Vec<String> = ByteChunker::new(c, "[ .,?]+")?
63    .with_match(MatchDisposition::Append)
64    .map(|res| {
65        let v = res.unwrap();
66        String::from_utf8(v).unwrap()
67    }).collect();
68
69assert_eq!(
70    &chunks,
71    &["One, ", "two, ", "three, ", "four. ",
72    "Can ", "I ", "have ", "a ", "little ", "more?"].clone()
73);
74
75# Ok(())
# }
```
78*/
pub struct ByteChunker<R> {
    /* The wrapped byte source; `Iterator::next` pulls from it by calling
    `read` on it. */
    source: R,
    /* The compiled delimiter pattern that separates chunks. */
    fence: Regex,
    /* Scratch space handed to each `read` call on the source. Its length
    is the configured read buffer size. */
    read_buff: Vec<u8>,
    /* Bytes that have been read from the source but not yet returned in a
    chunk. This is the buffer that `fence` is matched against. */
    search_buff: Vec<u8>,
    /* Tracks both the configured response to read errors and whether an
    error has already halted iteration. */
    error_status: ErrorStatus,
    /* What to do with the bytes matched by `fence`: drop them, append them
    to the preceding chunk, or prepend them to the following chunk. */
    match_dispo: MatchDisposition,
    /* Whether the last search of the search buffer found a match. If it did,
    then the next call to `.next()` should start by searching the search
    buffer again; otherwise we should start by trying to pull more bytes
    from our source. */
    last_scan_matched: bool,
    /* If the MatchDisposition is Prepend, we need to keep the match in the
    search buffer so we can return it with the next chunk. This means we need
    to start our next scan of the buffer from _after_ the match, or we'll
    just match the very beginning of the search buffer again. */
    scan_start_offset: usize,
}
97
98impl<R> ByteChunker<R> {
99    /**
100    Return a new [`ByteChunker`] wrapping the given writer that will chunk its
101    output by delimiting it with the supplied regex pattern.
102    */
103    pub fn new(source: R, delimiter: &str) -> Result<Self, RcErr> {
104        let fence = Regex::new(delimiter)?;
105        Ok(Self {
106            source,
107            fence,
108            read_buff: vec![0u8; DEFAULT_BUFFER_SIZE],
109            search_buff: Vec::new(),
110            error_status: ErrorStatus::Ok,
111            match_dispo: MatchDisposition::default(),
112            last_scan_matched: false,
113            scan_start_offset: 0,
114        })
115    }
116
117    /**
118    Builder-pattern method for setting the read buffer size.
119    Default size is 1024 bytes.
120     */
121    pub fn with_buffer_size(mut self, size: usize) -> Self {
122        self.read_buff.resize(size, 0);
123        self.read_buff.shrink_to_fit();
124        self
125    }
126
127    /**
128    Builder-pattern method for controlling how the chunker behaves when
129    encountering an error in the course of its operation. Default value
130    is [`ErrorResponse::Halt`].
131     */
132    pub fn on_error(mut self, response: ErrorResponse) -> Self {
133        self.error_status = match response {
134            ErrorResponse::Halt => {
135                if self.error_status != ErrorStatus::Errored {
136                    ErrorStatus::Ok
137                } else {
138                    ErrorStatus::Errored
139                }
140            }
141            ErrorResponse::Continue => ErrorStatus::Continue,
142            ErrorResponse::Ignore => ErrorStatus::Ignore,
143        };
144        self
145    }
146
147    /**
148    Builder-pattern method for controlling what the chunker does with the
149    matched text. Default value is [`MatchDisposition::Drop`].
150     */
151    pub fn with_match(mut self, behavior: MatchDisposition) -> Self {
152        self.match_dispo = behavior;
153        if matches!(behavior, MatchDisposition::Drop | MatchDisposition::Append) {
154            // If we swtich to one of these two dispositions, we
155            // need to be sure we reset the scan_start_offset, or
156            // else we'll never scan the beginning of our buffer.
157            self.scan_start_offset = 0;
158        }
159        self
160    }
161
162    /**
163    Consumes the [`ByteChunker`] and returns its wrapped `Read`er.
164    The `ByteChunker` may have read some data from its source that may not
165    yet have been returned or successfully matched; this data may be lost.
166    To retrieve that data, see [`ByteChunker::into_innards`].
167    */
168    pub fn into_inner(self) -> R {
169        self.source
170    }
171
172    /**
173    Consumes the [`ByteChunker`] and returns its wrapped `Read`er, as well
174    as any not-yet-processed data that has been read. If this unprocessed
175    data is unimportant, and you just want the reader back, use the more
176    traditional [`ByteChunker::into_inner`].
177    */
178    pub fn into_innards(self) -> (R, Vec<u8>) {
179        (self.source, self.search_buff)
180    }
181
182    /**
183    Creates a [`CustomChunker`] by combining this `ByteChunker` with an
184    `Adapter` type.
185    */
186    pub fn with_adapter<A>(self, adapter: A) -> CustomChunker<R, A> {
187        (self, adapter).into()
188    }
189
190    pub fn with_simple_adapter<A>(self, adapter: A) -> SimpleCustomChunker<R, A>
191    {
192        (self, adapter).into()
193    }
194
195    /*
196    Search the search_buffer for a match; if found, return the next chunk
197    of bytes to be returned from ]`Iterator::next`].
198    */
199    fn scan_buffer(&mut self) -> Option<Vec<u8>> {
200        let (start, end) = match self
201            .fence
202            .find_at(&self.search_buff, self.scan_start_offset)
203        {
204            Some(m) => {
205                self.last_scan_matched = true;
206                (m.start(), m.end())
207            }
208            None => {
209                self.last_scan_matched = false;
210                return None;
211            }
212        };
213
214        let mut new_buff;
215        match self.match_dispo {
216            MatchDisposition::Drop => {
217                new_buff = self.search_buff.split_off(end);
218                self.search_buff.resize(start, 0);
219            }
220            MatchDisposition::Append => {
221                new_buff = self.search_buff.split_off(end);
222            }
223            MatchDisposition::Prepend => {
224                new_buff = self.search_buff.split_off(start);
225                self.scan_start_offset = end - start;
226            }
227        }
228
229        std::mem::swap(&mut new_buff, &mut self.search_buff);
230        Some(new_buff)
231    }
232
233    // Function for wrapping types that need this information.
234    #[allow(dead_code)]
235    #[inline(always)]
236    fn buff_size(&self) -> usize {
237        return self.read_buff.len();
238    }
239}
240
241impl<R> Debug for ByteChunker<R> {
242    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243        f.debug_struct("ByteChunker")
244            .field("source", &std::any::type_name::<R>())
245            .field("fence", &self.fence)
246            .field("read_buff", &String::from_utf8_lossy(&self.read_buff))
247            .field("search_buff", &String::from_utf8_lossy(&self.search_buff))
248            .field("error_status", &self.error_status)
249            .field("match_dispo", &self.match_dispo)
250            .field("last_scan_matched", &self.last_scan_matched)
251            .field("scan_start_offset", &self.scan_start_offset)
252            .finish()
253    }
254}
255
256/**
257The [`ByteChunker`] specifically doesn't supply an implementation of
258[`Iterator::size_hint`] because, in general, it's impossible to tell
259how much data is left in a reader.
260*/
261impl<R: Read> Iterator for ByteChunker<R> {
262    type Item = Result<Vec<u8>, RcErr>;
263
264    fn next(&mut self) -> Option<Self::Item> {
265        if self.error_status == ErrorStatus::Errored {
266            return None;
267        }
268
269        loop {
270            if !self.last_scan_matched {
271                match self.source.read(&mut self.read_buff) {
272                    Err(e) => match e.kind() {
273                        ErrorKind::WouldBlock | ErrorKind::Interrupted => {
274                            spin_loop();
275                            continue;
276                        }
277                        _ => match self.error_status {
278                            ErrorStatus::Ok | ErrorStatus::Errored => {
279                                self.error_status = ErrorStatus::Errored;
280                                return Some(Err(e.into()));
281                            }
282                            ErrorStatus::Continue => {
283                                return Some(Err(e.into()));
284                            }
285                            ErrorStatus::Ignore => {
286                                continue;
287                            }
288                        },
289                    },
290                    Ok(0) => {
291                        if self.search_buff.is_empty() {
292                            return None;
293                        } else {
294                            let mut new_buff: Vec<u8> = Vec::new();
295                            std::mem::swap(&mut self.search_buff, &mut new_buff);
296                            return Some(Ok(new_buff));
297                        }
298                    }
299                    Ok(n) => {
300                        self.search_buff.extend_from_slice(&self.read_buff[..n]);
301                        match self.scan_buffer() {
302                            Some(v) => return Some(Ok(v)),
303                            None => {
304                                spin_loop();
305                                continue;
306                            }
307                        }
308                    }
309                }
310            } else {
311                match self.scan_buffer() {
312                    Some(v) => return Some(Ok(v)),
313                    None => {
314                        spin_loop();
315                        continue;
316                    }
317                }
318            }
319        }
320    }
321}