Skip to main content

over_there/utils/
delimiter.rs

1use std::future::Future;
2use std::io;
3
4pub const DEFAULT_DELIMITER: &[u8] = b"</>";
5
6#[derive(Clone, Copy)]
7struct PreReadResult {
8    buf_len: usize,
9    delimiter_len: usize,
10}
11
12/// Reader that takes closure to perform actual read, supporting sync and async
13/// reads, buffering, and extracting data separated by a delimiter
14pub struct DelimiterReader {
15    /// Holds onto data that overflows past a delimiter
16    buf: Box<[u8]>,
17
18    /// Indicator of start of search for delimiter within buf
19    buf_pos: usize,
20
21    /// Indicator of how much of buffer is occupied with data
22    buf_filled: usize,
23
24    /// The delimiter to look for in read data
25    pub delimiter: Vec<u8>,
26}
27
28impl DelimiterReader {
29    pub fn new_with_delimiter(max_data_size: usize, delimiter: &[u8]) -> Self {
30        Self {
31            buf: vec![0; max_data_size + delimiter.len()].into_boxed_slice(),
32            buf_pos: 0,
33            buf_filled: 0,
34            delimiter: delimiter.to_vec(),
35        }
36    }
37
38    pub fn new(max_data_size: usize) -> Self {
39        Self::new_with_delimiter(max_data_size, DEFAULT_DELIMITER)
40    }
41
42    /// Looks for a delimiter in the internal buffer that is not complete by
43    /// searching backwards to find the largest partial delimiter; will
44    /// never return more than a full delimiter and no less than 0
45    ///
46    /// (start position, size)
47    fn find_partial_delimiter(&self) -> (usize, usize) {
48        let b_len = self.buf.len();
49        let delimiter_len = self.delimiter.len();
50        let mut size = 0;
51        let mut pos = 0;
52
53        for i in (b_len - delimiter_len)..b_len {
54            let l = b_len - i;
55            if self.buf[i..] == self.delimiter[..l] {
56                size = l;
57                pos = i;
58                break;
59            }
60        }
61
62        (pos, size)
63    }
64
65    /// Performs movement of internal buffer data based on maximum capacity
66    /// of buffer being reached
67    fn pre_read(&mut self) -> PreReadResult {
68        let buf_len = self.buf.len();
69        let delimiter_len = self.delimiter.len();
70
71        // If for some reason the buffer is completely full and we haven't
72        // found our delimiter, we will shift by up to (not including) the
73        // delimiter size and try again; this creates a sliding window where
74        // we keep the max data size specified at all times
75        if self.buf_pos > (buf_len - delimiter_len) {
76            let (pd_pos, pdelimiter_len) = self.find_partial_delimiter();
77            let shift_len = delimiter_len - pdelimiter_len;
78            self.buf.rotate_left(shift_len);
79            self.buf_filled -= shift_len;
80            self.buf_pos = pd_pos - shift_len;
81        };
82
83        PreReadResult {
84            buf_len,
85            delimiter_len,
86        }
87    }
88
89    /// Performs update to buffer based on read bytes, copying data to external
90    /// buffer if we have found a delimiter and making room for new data
91    fn post_read(
92        &mut self,
93        data: &mut [u8],
94        bytes_read: Option<&usize>,
95        preread_result: PreReadResult,
96    ) -> usize {
97        let PreReadResult {
98            buf_len,
99            delimiter_len,
100        } = preread_result;
101
102        // Mark where we will start reading and then update the filled count
103        if let Some(bytes_read) = bytes_read {
104            self.buf_filled += bytes_read;
105        }
106
107        // Scan for the delimiter starting from the last place searched
108        let mut size = 0;
109        if self.buf_filled > 0 {
110            for i in self.buf_pos..=(buf_len - delimiter_len) {
111                // If we have a match, we want to copy the contents (minus the delimiter) to the
112                // provided buffer, shift over any remaining data, and reset our buf filled count
113                if self.buf[i..i + delimiter_len] == self.delimiter[..] {
114                    data[..i].copy_from_slice(&self.buf[..i]);
115                    for j in &mut self.buf[..i + delimiter_len] {
116                        *j = 0;
117                    }
118                    self.buf.rotate_left(i + delimiter_len);
119                    self.buf_filled -= i + delimiter_len;
120                    self.buf_pos = 0;
121                    size = i;
122                    break;
123                }
124
125                // Move buffer position after what we just checked
126                self.buf_pos = i + 1;
127            }
128        }
129
130        size
131    }
132
133    /// Performs an synchronous read using the given synchronous closure
134    pub fn read<F>(&mut self, data: &mut [u8], f: F) -> io::Result<usize>
135    where
136        F: FnOnce(&mut [u8]) -> io::Result<usize>,
137    {
138        let preread_result = self.pre_read();
139
140        // Attempt to fill up as much of buffer as possible without spilling over
141        //
142        // NOTE: This causes problems because we could have bytes still remaining
143        //       in our buffer, but we will never get them because we exit
144        //       immediately due to being unavailable; so, we wait to fully
145        //       evaluate and return the error until the end in case we can
146        //       process our buffer from existing data instead
147        let read_result = f(&mut self.buf[self.buf_filled..]);
148
149        let size =
150            self.post_read(data, read_result.as_ref().ok(), preread_result);
151
152        // If we didn't find anything new in our internal buffer and the read
153        // result failed, we want to return the failure
154        if size == 0 && read_result.is_err() {
155            read_result
156        } else {
157            Ok(size)
158        }
159    }
160
161    /// Performs an asynchronous read using the given asynchronous closure
162    pub async fn async_read<R, F>(
163        &mut self,
164        data: &mut [u8],
165        r: R,
166    ) -> io::Result<usize>
167    where
168        R: FnOnce(&mut [u8]) -> F,
169        F: Future<Output = io::Result<usize>>,
170    {
171        let preread_result = self.pre_read();
172
173        // Attempt to fill up as much of buffer as possible without spilling over
174        //
175        // NOTE: This causes problems because we could have bytes still remaining
176        //       in our buffer, but we will never get them because we exit
177        //       immediately due to being unavailable; so, we wait to fully
178        //       evaluate and return the error until the end in case we can
179        //       process our buffer from existing data instead
180        let read_result = r(&mut self.buf[self.buf_filled..]).await;
181
182        let size =
183            self.post_read(data, read_result.as_ref().ok(), preread_result);
184
185        // If we didn't find anything new in our internal buffer and the read
186        // result failed, we want to return the failure
187        if size == 0 && read_result.is_err() {
188            read_result
189        } else {
190            Ok(size)
191        }
192    }
193}
194
195/// Writer that takes closure to perform actual write, supporting sync and async
196/// writes, tacking on a delimiter with each full write
197pub struct DelimiterWriter {
198    pub delimiter: Vec<u8>,
199}
200
201impl DelimiterWriter {
202    pub fn new_with_delimiter(delimiter: &[u8]) -> Self {
203        Self {
204            delimiter: delimiter.to_vec(),
205        }
206    }
207
208    pub fn new() -> Self {
209        Self::new_with_delimiter(DEFAULT_DELIMITER)
210    }
211
212    pub fn write<F>(&mut self, data: &[u8], mut f: F) -> io::Result<usize>
213    where
214        F: FnMut(&[u8]) -> io::Result<usize>,
215    {
216        // Send all of the requested data first
217        f(data)?;
218
219        // Then send our delimiter
220        f(&self.delimiter)?;
221
222        Ok(data.len())
223    }
224}
225
226impl Default for DelimiterWriter {
227    fn default() -> Self {
228        Self::new()
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Cursor, Read, Write};

    /// Delimiter shared by the stream-oriented tests
    const TEST_DELIMITER: &[u8] = b"</test>";

    /// Read closure that never produces data and always reports `WouldBlock`
    fn empty_nonblocking_read() -> impl FnOnce(&mut [u8]) -> io::Result<usize> {
        |_| Err(io::ErrorKind::WouldBlock.into())
    }

    /// Read closure that pulls the next bytes out of `c`
    fn read_from_cursor<'a>(
        c: &'a mut Cursor<Vec<u8>>,
    ) -> impl FnOnce(&mut [u8]) -> io::Result<usize> + 'a {
        move |data| c.read(data)
    }

    /// Write closure that appends everything it is given to `v`
    fn write_from_vec<'a>(
        v: &'a mut Vec<u8>,
    ) -> impl FnMut(&[u8]) -> io::Result<usize> + 'a {
        move |data| v.write(data)
    }

    /// Builds a cursor over the concatenation of `parts`
    fn cursor_over(parts: &[&[u8]]) -> Cursor<Vec<u8>> {
        Cursor::new(parts.concat())
    }

    /// Runs `find_partial_delimiter` on a reader with a 3-byte delimiter
    /// (`-+!`) and a max data size of 3 bytes, i.e. a 6-byte internal buffer
    /// preloaded with `contents`
    fn partial_delimiter_in(contents: &[u8; 6]) -> (usize, usize) {
        let mut reader = DelimiterReader::new_with_delimiter(3, b"-+!");
        reader.buf.copy_from_slice(contents);
        reader.find_partial_delimiter()
    }

    /// Feeds three delimited messages (`[1]`, `[4, 5, 6]`, `[2, 3]`) through a
    /// reader with the given max data size and expects one message per read
    fn assert_reads_three_messages(max_data_size: usize) {
        let first: &[u8] = &[1];
        let second: &[u8] = &[4, 5, 6];
        let third: &[u8] = &[2, 3];
        let mut cursor = cursor_over(&[
            first,
            TEST_DELIMITER,
            second,
            TEST_DELIMITER,
            third,
            TEST_DELIMITER,
        ]);

        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(max_data_size, TEST_DELIMITER);

        for expected in [first, second, third].iter() {
            // Each read gets a fresh output buffer
            let mut buf = vec![0; max_data_size];
            let size = delimiter_reader
                .read(&mut buf, read_from_cursor(&mut cursor))
                .unwrap();
            assert_eq!(&buf[..size], *expected);
        }
    }

    #[test]
    fn delimiter_reader_find_partial_delimiter_if_it_exists() {
        // No trace of the delimiter anywhere
        assert_eq!(partial_delimiter_in(b"000000"), (0, 0));

        // A delimiter at the beginning is ignored; only the end is inspected
        assert_eq!(partial_delimiter_in(b"-+!000"), (0, 0));

        // Part of a delimiter mid-way that does not reach the end is ignored
        assert_eq!(partial_delimiter_in(b"000-+0"), (0, 0));

        // A single trailing byte of the delimiter
        assert_eq!(
            partial_delimiter_in(b"00000-"),
            (5, 1),
            "Failed to find first byte of delimiter"
        );

        // Several trailing bytes of the delimiter
        assert_eq!(
            partial_delimiter_in(b"0000-+"),
            (4, 2),
            "Failed to find multiple bytes of delimiter"
        );

        // The complete delimiter at the end
        assert_eq!(
            partial_delimiter_in(b"000-+!"),
            (3, 3),
            "Failed to find entire delimiter"
        );

        // The final complete delimiter wins over an earlier one
        assert_eq!(
            partial_delimiter_in(b"-+!-+!"),
            (3, 3),
            "Failed to find last entire delimiter"
        );

        // The final partial delimiter wins over an earlier complete one
        assert_eq!(
            partial_delimiter_in(b"0-+!-+"),
            (4, 2),
            "Failed to find last partial delimiter"
        );
    }

    #[test]
    fn delimiter_reader_should_fill_provided_buffer_if_found_delimiter() {
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut cursor = cursor_over(&[&data[..], TEST_DELIMITER]);

        // The reader can hold exactly the payload, not counting the delimiter
        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(data.len(), TEST_DELIMITER);

        // A single read gathers the whole payload
        let mut buf = vec![0; data.len()];
        let size = delimiter_reader
            .read(&mut buf, read_from_cursor(&mut cursor))
            .unwrap();
        assert_eq!(size, data.len());
        assert_eq!(buf, data);
    }

    #[test]
    fn delimiter_reader_should_support_data_less_than_max_size() {
        let max_data_size = 10;
        let payload: &[u8] = &[1];
        let mut cursor = cursor_over(&[payload, TEST_DELIMITER]);

        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(max_data_size, TEST_DELIMITER);

        // Only the payload is copied; the rest of the output stays untouched
        let mut buf = vec![0; max_data_size];
        let size = delimiter_reader
            .read(&mut buf, read_from_cursor(&mut cursor))
            .unwrap();
        let (message, untouched) = buf.split_at(size);
        assert_eq!(message, payload);
        assert_eq!(untouched, &vec![0; max_data_size - size][..]);
    }

    #[test]
    fn delimiter_reader_should_support_data_more_than_max_size_by_truncating_earlier_data(
    ) {
        let max_data_size = 3;
        let payload: &[u8] = &[1, 2, 3, 4, 5];
        let mut cursor = cursor_over(&[payload, TEST_DELIMITER]);

        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(max_data_size, TEST_DELIMITER);
        let mut buf = vec![0; max_data_size];

        // The first read cannot take in everything, so the delimiter is not
        // seen yet and nothing is yielded
        let size = delimiter_reader
            .read(&mut buf, read_from_cursor(&mut cursor))
            .unwrap();
        assert_eq!(size, 0);

        // The second read picks up the rest, finds the delimiter, and yields
        // the most recent bytes of the payload
        let size = delimiter_reader
            .read(&mut buf, read_from_cursor(&mut cursor))
            .unwrap();
        let expected: &[u8] = &[3, 4, 5];
        assert_eq!(&buf[..size], expected);
    }

    #[test]
    fn delimiter_reader_should_support_multiple_delimiters_being_encountered() {
        // Small window: the messages arrive over several underlying reads
        assert_reads_three_messages(3);
    }

    #[test]
    fn delimiter_reader_should_support_multiple_delimited_chunks_in_one_read() {
        // Window much bigger than all data: everything arrives in one read
        assert_reads_three_messages(100);
    }

    #[test]
    fn delimiter_reader_should_continue_using_internal_buffer_even_if_internal_reader_fails(
    ) {
        // Two 1-byte messages, each followed by the 7-byte delimiter, fill
        // the 16-byte internal buffer exactly
        let max_data_size = 9;

        // Preload the reader so data is available internally while the
        // underlying reader always fails
        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(max_data_size, TEST_DELIMITER);
        delimiter_reader.buf.copy_from_slice(b"0</test>1</test>");
        delimiter_reader.buf_pos = 0;
        delimiter_reader.buf_filled = delimiter_reader.buf.len();

        let mut buf = vec![0; max_data_size];

        // Both buffered messages are still handed out
        for expected in [&b"0"[..], &b"1"[..]].iter() {
            let size = delimiter_reader
                .read(&mut buf, empty_nonblocking_read())
                .unwrap();
            assert_eq!(&buf[..size], *expected);
        }

        // With nothing left, the underlying failure is reported
        let result = delimiter_reader.read(&mut buf, empty_nonblocking_read());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::WouldBlock);
    }

    #[test]
    fn delimiter_writer_should_send_all_bytes_and_append_the_delimiter() {
        let mut writer: Vec<u8> = Vec::new();
        let mut delimiter_writer =
            DelimiterWriter::new_with_delimiter(TEST_DELIMITER);
        let mut data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        // The reported size covers the data only, not the delimiter
        let written = delimiter_writer
            .write(&data, write_from_vec(&mut writer))
            .unwrap();
        assert_eq!(written, data.len());

        // The sink holds the data with the delimiter appended
        data.extend_from_slice(TEST_DELIMITER);
        assert_eq!(&writer, &data);
    }
}