// regex_chunker/base.rs
/*!
The base ByteChunker types.
*/
4use std::{
5 fmt::{Debug, Formatter},
6 hint::spin_loop,
7 io::{ErrorKind, Read},
8};
9
10use regex::bytes::Regex;
11
12use crate::{ctrl::*, CustomChunker, RcErr, SimpleCustomChunker};
13
/// Length in bytes of the `read_buff` a freshly constructed chunker starts
/// with: 1 KiB (`1 << 10` == 1024).
const DEFAULT_BUFFER_SIZE: usize = 1 << 10;
16
17/**
18The `ByteChunker` takes a
19[`bytes::Regex`](https://docs.rs/regex/latest/regex/bytes/struct.Regex.html),
20wraps a byte source (that is, a type that implements [`std::io::Read`])
21and iterates over chunks of bytes from that source that are delimited by
22the regular expression. It operates very much like
23[`bytes::Regex::split`](https://docs.rs/regex/latest/regex/bytes/struct.Regex.html#method.split),
24except that it works on an incoming stream of bytes instead of a
25necessarily-already-in-memory slice.
26
27```
28use regex_chunker::ByteChunker;
29use std::io::Cursor;
30
31# fn main() -> Result<(), regex_chunker::RcErr> {
32let text = b"One, two, three, four. Can I have a little more?";
33let c = Cursor::new(text);
34
35let chunks: Vec<String> = ByteChunker::new(c, "[ .,?]+")?
36 .map(|res| {
37 let v = res.unwrap();
38 String::from_utf8(v).unwrap()
39 }).collect();
40
41assert_eq!(
42 &chunks,
43 &["One", "two", "three", "four",
44 "Can", "I", "have", "a", "little", "more"].clone()
45);
46# Ok(())
47# }
48```
49
50It's also slightly more flexible, in that the the matched bytes can be
51optionally added to the beginnings or endings of the returned chunks.
52(By default they are just dropped.)
53
54```
55use regex_chunker::{ByteChunker, MatchDisposition};
56use std::io::Cursor;
57
58# fn main() -> Result<(), regex_chunker::RcErr> {
59let text = b"One, two, three, four. Can I have a little more?";
60let c = Cursor::new(text);
61
62let chunks: Vec<String> = ByteChunker::new(c, "[ .,?]+")?
63 .with_match(MatchDisposition::Append)
64 .map(|res| {
65 let v = res.unwrap();
66 String::from_utf8(v).unwrap()
67 }).collect();
68
69assert_eq!(
70 &chunks,
71 &["One, ", "two, ", "three, ", "four. ",
72 "Can ", "I ", "have ", "a ", "little ", "more?"].clone()
73);
74
75# Ok(())
76# }
77
78*/
79pub struct ByteChunker<R> {
80 source: R,
81 fence: Regex,
82 read_buff: Vec<u8>,
83 search_buff: Vec<u8>,
84 error_status: ErrorStatus,
85 match_dispo: MatchDisposition,
86 /* Whether the last search of the search buffer found a match. If it did,
87 then the next call to `.next()` should start by searching the search
88 buffer again; otherwise we should start by trying to pull more bytes
89 from our source. */
90 last_scan_matched: bool,
91 /* If the MatchDisposition is Prepend, we need to keep the match in the
92 scan buffer so we can return it with the next chunk. This means we need
93 to start our next scan of the buffer from _after_ the match, or we'll
94 just match the very beginning of the scan buffer again. */
95 scan_start_offset: usize,
96}
97
98impl<R> ByteChunker<R> {
99 /**
100 Return a new [`ByteChunker`] wrapping the given writer that will chunk its
101 output by delimiting it with the supplied regex pattern.
102 */
103 pub fn new(source: R, delimiter: &str) -> Result<Self, RcErr> {
104 let fence = Regex::new(delimiter)?;
105 Ok(Self {
106 source,
107 fence,
108 read_buff: vec![0u8; DEFAULT_BUFFER_SIZE],
109 search_buff: Vec::new(),
110 error_status: ErrorStatus::Ok,
111 match_dispo: MatchDisposition::default(),
112 last_scan_matched: false,
113 scan_start_offset: 0,
114 })
115 }
116
117 /**
118 Builder-pattern method for setting the read buffer size.
119 Default size is 1024 bytes.
120 */
121 pub fn with_buffer_size(mut self, size: usize) -> Self {
122 self.read_buff.resize(size, 0);
123 self.read_buff.shrink_to_fit();
124 self
125 }
126
127 /**
128 Builder-pattern method for controlling how the chunker behaves when
129 encountering an error in the course of its operation. Default value
130 is [`ErrorResponse::Halt`].
131 */
132 pub fn on_error(mut self, response: ErrorResponse) -> Self {
133 self.error_status = match response {
134 ErrorResponse::Halt => {
135 if self.error_status != ErrorStatus::Errored {
136 ErrorStatus::Ok
137 } else {
138 ErrorStatus::Errored
139 }
140 }
141 ErrorResponse::Continue => ErrorStatus::Continue,
142 ErrorResponse::Ignore => ErrorStatus::Ignore,
143 };
144 self
145 }
146
147 /**
148 Builder-pattern method for controlling what the chunker does with the
149 matched text. Default value is [`MatchDisposition::Drop`].
150 */
151 pub fn with_match(mut self, behavior: MatchDisposition) -> Self {
152 self.match_dispo = behavior;
153 if matches!(behavior, MatchDisposition::Drop | MatchDisposition::Append) {
154 // If we swtich to one of these two dispositions, we
155 // need to be sure we reset the scan_start_offset, or
156 // else we'll never scan the beginning of our buffer.
157 self.scan_start_offset = 0;
158 }
159 self
160 }
161
162 /**
163 Consumes the [`ByteChunker`] and returns its wrapped `Read`er.
164 The `ByteChunker` may have read some data from its source that may not
165 yet have been returned or successfully matched; this data may be lost.
166 To retrieve that data, see [`ByteChunker::into_innards`].
167 */
168 pub fn into_inner(self) -> R {
169 self.source
170 }
171
172 /**
173 Consumes the [`ByteChunker`] and returns its wrapped `Read`er, as well
174 as any not-yet-processed data that has been read. If this unprocessed
175 data is unimportant, and you just want the reader back, use the more
176 traditional [`ByteChunker::into_inner`].
177 */
178 pub fn into_innards(self) -> (R, Vec<u8>) {
179 (self.source, self.search_buff)
180 }
181
182 /**
183 Creates a [`CustomChunker`] by combining this `ByteChunker` with an
184 `Adapter` type.
185 */
186 pub fn with_adapter<A>(self, adapter: A) -> CustomChunker<R, A> {
187 (self, adapter).into()
188 }
189
190 pub fn with_simple_adapter<A>(self, adapter: A) -> SimpleCustomChunker<R, A>
191 {
192 (self, adapter).into()
193 }
194
195 /*
196 Search the search_buffer for a match; if found, return the next chunk
197 of bytes to be returned from ]`Iterator::next`].
198 */
199 fn scan_buffer(&mut self) -> Option<Vec<u8>> {
200 let (start, end) = match self
201 .fence
202 .find_at(&self.search_buff, self.scan_start_offset)
203 {
204 Some(m) => {
205 self.last_scan_matched = true;
206 (m.start(), m.end())
207 }
208 None => {
209 self.last_scan_matched = false;
210 return None;
211 }
212 };
213
214 let mut new_buff;
215 match self.match_dispo {
216 MatchDisposition::Drop => {
217 new_buff = self.search_buff.split_off(end);
218 self.search_buff.resize(start, 0);
219 }
220 MatchDisposition::Append => {
221 new_buff = self.search_buff.split_off(end);
222 }
223 MatchDisposition::Prepend => {
224 new_buff = self.search_buff.split_off(start);
225 self.scan_start_offset = end - start;
226 }
227 }
228
229 std::mem::swap(&mut new_buff, &mut self.search_buff);
230 Some(new_buff)
231 }
232
233 // Function for wrapping types that need this information.
234 #[allow(dead_code)]
235 #[inline(always)]
236 fn buff_size(&self) -> usize {
237 return self.read_buff.len();
238 }
239}
240
241impl<R> Debug for ByteChunker<R> {
242 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243 f.debug_struct("ByteChunker")
244 .field("source", &std::any::type_name::<R>())
245 .field("fence", &self.fence)
246 .field("read_buff", &String::from_utf8_lossy(&self.read_buff))
247 .field("search_buff", &String::from_utf8_lossy(&self.search_buff))
248 .field("error_status", &self.error_status)
249 .field("match_dispo", &self.match_dispo)
250 .field("last_scan_matched", &self.last_scan_matched)
251 .field("scan_start_offset", &self.scan_start_offset)
252 .finish()
253 }
254}
255
256/**
257The [`ByteChunker`] specifically doesn't supply an implementation of
258[`Iterator::size_hint`] because, in general, it's impossible to tell
259how much data is left in a reader.
260*/
261impl<R: Read> Iterator for ByteChunker<R> {
262 type Item = Result<Vec<u8>, RcErr>;
263
264 fn next(&mut self) -> Option<Self::Item> {
265 if self.error_status == ErrorStatus::Errored {
266 return None;
267 }
268
269 loop {
270 if !self.last_scan_matched {
271 match self.source.read(&mut self.read_buff) {
272 Err(e) => match e.kind() {
273 ErrorKind::WouldBlock | ErrorKind::Interrupted => {
274 spin_loop();
275 continue;
276 }
277 _ => match self.error_status {
278 ErrorStatus::Ok | ErrorStatus::Errored => {
279 self.error_status = ErrorStatus::Errored;
280 return Some(Err(e.into()));
281 }
282 ErrorStatus::Continue => {
283 return Some(Err(e.into()));
284 }
285 ErrorStatus::Ignore => {
286 continue;
287 }
288 },
289 },
290 Ok(0) => {
291 if self.search_buff.is_empty() {
292 return None;
293 } else {
294 let mut new_buff: Vec<u8> = Vec::new();
295 std::mem::swap(&mut self.search_buff, &mut new_buff);
296 return Some(Ok(new_buff));
297 }
298 }
299 Ok(n) => {
300 self.search_buff.extend_from_slice(&self.read_buff[..n]);
301 match self.scan_buffer() {
302 Some(v) => return Some(Ok(v)),
303 None => {
304 spin_loop();
305 continue;
306 }
307 }
308 }
309 }
310 } else {
311 match self.scan_buffer() {
312 Some(v) => return Some(Ok(v)),
313 None => {
314 spin_loop();
315 continue;
316 }
317 }
318 }
319 }
320 }
321}