1use std::future::Future;
2use std::io;
3
/// Delimiter used by [`DelimiterReader::new`] and [`DelimiterWriter::new`]
/// when the caller does not supply one: the three bytes `</>`.
pub const DEFAULT_DELIMITER: &[u8] = b"</>";
5
/// Sizes captured by `DelimiterReader::pre_read` and handed on to
/// `DelimiterReader::post_read`, so both halves of a read agree on them.
#[derive(Copy, Clone)]
struct PreReadResult {
    /// Length of the reader's internal buffer when `pre_read` ran.
    buf_len: usize,
    /// Length of the reader's delimiter when `pre_read` ran.
    delimiter_len: usize,
}
11
/// Splits a byte stream into messages that are terminated by a delimiter.
///
/// Incoming bytes are accumulated in an internal buffer until a complete
/// delimiter is seen; the bytes in front of it are then handed to the caller.
#[derive(Debug)]
pub struct DelimiterReader {
    /// Internal accumulation buffer. The constructors size it to
    /// `max_data_size + delimiter.len()` bytes.
    buf: Box<[u8]>,

    /// Index in `buf` at which the next search for the delimiter starts.
    buf_pos: usize,

    /// Number of bytes at the front of `buf` that currently hold data.
    buf_filled: usize,

    /// Byte sequence that terminates each message.
    ///
    /// NOTE(review): the buffer is sized from the delimiter length at
    /// construction time; changing the length afterwards looks unsupported.
    pub delimiter: Vec<u8>,
}
27
28impl DelimiterReader {
29 pub fn new_with_delimiter(max_data_size: usize, delimiter: &[u8]) -> Self {
30 Self {
31 buf: vec![0; max_data_size + delimiter.len()].into_boxed_slice(),
32 buf_pos: 0,
33 buf_filled: 0,
34 delimiter: delimiter.to_vec(),
35 }
36 }
37
38 pub fn new(max_data_size: usize) -> Self {
39 Self::new_with_delimiter(max_data_size, DEFAULT_DELIMITER)
40 }
41
42 fn find_partial_delimiter(&self) -> (usize, usize) {
48 let b_len = self.buf.len();
49 let delimiter_len = self.delimiter.len();
50 let mut size = 0;
51 let mut pos = 0;
52
53 for i in (b_len - delimiter_len)..b_len {
54 let l = b_len - i;
55 if self.buf[i..] == self.delimiter[..l] {
56 size = l;
57 pos = i;
58 break;
59 }
60 }
61
62 (pos, size)
63 }
64
65 fn pre_read(&mut self) -> PreReadResult {
68 let buf_len = self.buf.len();
69 let delimiter_len = self.delimiter.len();
70
71 if self.buf_pos > (buf_len - delimiter_len) {
76 let (pd_pos, pdelimiter_len) = self.find_partial_delimiter();
77 let shift_len = delimiter_len - pdelimiter_len;
78 self.buf.rotate_left(shift_len);
79 self.buf_filled -= shift_len;
80 self.buf_pos = pd_pos - shift_len;
81 };
82
83 PreReadResult {
84 buf_len,
85 delimiter_len,
86 }
87 }
88
89 fn post_read(
92 &mut self,
93 data: &mut [u8],
94 bytes_read: Option<&usize>,
95 preread_result: PreReadResult,
96 ) -> usize {
97 let PreReadResult {
98 buf_len,
99 delimiter_len,
100 } = preread_result;
101
102 if let Some(bytes_read) = bytes_read {
104 self.buf_filled += bytes_read;
105 }
106
107 let mut size = 0;
109 if self.buf_filled > 0 {
110 for i in self.buf_pos..=(buf_len - delimiter_len) {
111 if self.buf[i..i + delimiter_len] == self.delimiter[..] {
114 data[..i].copy_from_slice(&self.buf[..i]);
115 for j in &mut self.buf[..i + delimiter_len] {
116 *j = 0;
117 }
118 self.buf.rotate_left(i + delimiter_len);
119 self.buf_filled -= i + delimiter_len;
120 self.buf_pos = 0;
121 size = i;
122 break;
123 }
124
125 self.buf_pos = i + 1;
127 }
128 }
129
130 size
131 }
132
133 pub fn read<F>(&mut self, data: &mut [u8], f: F) -> io::Result<usize>
135 where
136 F: FnOnce(&mut [u8]) -> io::Result<usize>,
137 {
138 let preread_result = self.pre_read();
139
140 let read_result = f(&mut self.buf[self.buf_filled..]);
148
149 let size =
150 self.post_read(data, read_result.as_ref().ok(), preread_result);
151
152 if size == 0 && read_result.is_err() {
155 read_result
156 } else {
157 Ok(size)
158 }
159 }
160
161 pub async fn async_read<R, F>(
163 &mut self,
164 data: &mut [u8],
165 r: R,
166 ) -> io::Result<usize>
167 where
168 R: FnOnce(&mut [u8]) -> F,
169 F: Future<Output = io::Result<usize>>,
170 {
171 let preread_result = self.pre_read();
172
173 let read_result = r(&mut self.buf[self.buf_filled..]).await;
181
182 let size =
183 self.post_read(data, read_result.as_ref().ok(), preread_result);
184
185 if size == 0 && read_result.is_err() {
188 read_result
189 } else {
190 Ok(size)
191 }
192 }
193}
194
/// Writes messages followed by a delimiter, the counterpart of
/// `DelimiterReader`.
#[derive(Debug, Clone)]
pub struct DelimiterWriter {
    /// Byte sequence appended after every message.
    pub delimiter: Vec<u8>,
}
200
201impl DelimiterWriter {
202 pub fn new_with_delimiter(delimiter: &[u8]) -> Self {
203 Self {
204 delimiter: delimiter.to_vec(),
205 }
206 }
207
208 pub fn new() -> Self {
209 Self::new_with_delimiter(DEFAULT_DELIMITER)
210 }
211
212 pub fn write<F>(&mut self, data: &[u8], mut f: F) -> io::Result<usize>
213 where
214 F: FnMut(&[u8]) -> io::Result<usize>,
215 {
216 f(data)?;
218
219 f(&self.delimiter)?;
221
222 Ok(data.len())
223 }
224}
225
226impl Default for DelimiterWriter {
227 fn default() -> Self {
228 Self::new()
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Cursor, Read, Write};

    /// Delimiter shared by the reader and writer tests.
    const TEST_DELIMITER: &[u8] = b"</test>";

    /// Read callback that behaves like a non-blocking source with nothing
    /// pending.
    fn empty_nonblocking_read() -> impl FnOnce(&mut [u8]) -> io::Result<usize> {
        |_| Err(io::ErrorKind::WouldBlock.into())
    }

    /// Read callback that pulls bytes from `c`.
    fn read_from_cursor<'a>(
        c: &'a mut Cursor<Vec<u8>>,
    ) -> impl FnOnce(&mut [u8]) -> io::Result<usize> + 'a {
        move |dst| c.read(dst)
    }

    /// Write callback that appends to `v`.
    fn write_from_vec<'a>(
        v: &'a mut Vec<u8>,
    ) -> impl FnMut(&[u8]) -> io::Result<usize> + 'a {
        move |src| v.write(src)
    }

    /// Builds a cursor over `chunks`, each followed by `TEST_DELIMITER`.
    fn framed_cursor(chunks: &[Vec<u8>]) -> Cursor<Vec<u8>> {
        let mut bytes = Vec::new();
        for chunk in chunks {
            bytes.extend_from_slice(chunk);
            bytes.extend_from_slice(TEST_DELIMITER);
        }
        Cursor::new(bytes)
    }

    /// Feeds three delimited chunks to a reader with the given capacity and
    /// checks that they come back one per read, each into a fresh buffer.
    fn assert_three_chunks_are_read_in_order(max_data_size: usize) {
        let mut cursor = framed_cursor(&[vec![1], vec![4, 5, 6], vec![2, 3]]);
        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(max_data_size, TEST_DELIMITER);

        let expected: [&[u8]; 3] = [&[1], &[4, 5, 6], &[2, 3]];
        for want in expected.iter() {
            let mut buf = vec![0; max_data_size];
            let size = delimiter_reader
                .read(&mut buf, read_from_cursor(&mut cursor))
                .unwrap();
            assert_eq!(&buf[..size], *want);
        }
    }

    #[test]
    fn delimiter_reader_find_partial_delimiter_if_it_exists() {
        let delimiter = b"-+!";
        let max_data_size = 3;

        // (buffer contents, expected (position, length), failure message)
        let cases: [(&[u8; 6], (usize, usize), Option<&str>); 8] = [
            (b"000000", (0, 0), None),
            (b"-+!000", (0, 0), None),
            (b"000-+0", (0, 0), None),
            (
                b"00000-",
                (5, 1),
                Some("Failed to find first byte of delimiter"),
            ),
            (
                b"0000-+",
                (4, 2),
                Some("Failed to find multiple bytes of delimiter"),
            ),
            (b"000-+!", (3, 3), Some("Failed to find entire delimiter")),
            (
                b"-+!-+!",
                (3, 3),
                Some("Failed to find last entire delimiter"),
            ),
            (
                b"0-+!-+",
                (4, 2),
                Some("Failed to find last partial delimiter"),
            ),
        ];

        for (contents, expected, message) in cases.iter() {
            let mut r =
                DelimiterReader::new_with_delimiter(max_data_size, delimiter);
            r.buf.copy_from_slice(&contents[..]);
            let found = r.find_partial_delimiter();
            match message {
                Some(message) => assert_eq!(found, *expected, "{}", message),
                None => assert_eq!(found, *expected),
            }
        }
    }

    #[test]
    fn delimiter_reader_should_fill_provided_buffer_if_found_delimiter() {
        let data: Vec<u8> = (1..=10).collect();
        let mut cursor = framed_cursor(&[data.clone()]);
        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(data.len(), TEST_DELIMITER);

        let mut buf = vec![0; data.len()];
        let size = delimiter_reader
            .read(&mut buf, read_from_cursor(&mut cursor))
            .unwrap();

        assert_eq!(size, data.len());
        assert_eq!(buf, data);
    }

    #[test]
    fn delimiter_reader_should_support_data_less_than_max_size() {
        let max_data_size = 10;
        let mut cursor = framed_cursor(&[vec![1]]);
        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(max_data_size, TEST_DELIMITER);

        let mut buf = vec![0; max_data_size];
        let size = delimiter_reader
            .read(&mut buf, read_from_cursor(&mut cursor))
            .unwrap();

        let (message, rest) = buf.split_at(size);
        assert_eq!(message, &[1u8][..]);
        // Everything after the message must still be untouched zeroes.
        assert_eq!(rest, &vec![0u8; max_data_size - size][..]);
    }

    #[test]
    fn delimiter_reader_should_support_data_more_than_max_size_by_truncating_earlier_data(
    ) {
        let max_data_size = 3;
        let mut cursor = framed_cursor(&[vec![1, 2, 3, 4, 5]]);
        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(max_data_size, TEST_DELIMITER);
        let mut buf = vec![0; max_data_size];

        // The first read fills the buffer without seeing the whole delimiter.
        let first = delimiter_reader
            .read(&mut buf, read_from_cursor(&mut cursor))
            .unwrap();
        assert_eq!(first, 0);

        // The second read completes the delimiter; only the newest bytes
        // remain.
        let second = delimiter_reader
            .read(&mut buf, read_from_cursor(&mut cursor))
            .unwrap();
        assert_eq!(&buf[..second], &[3u8, 4, 5][..]);
    }

    #[test]
    fn delimiter_reader_should_support_multiple_delimiters_being_encountered() {
        // Capacity of the largest chunk: several reads are needed.
        assert_three_chunks_are_read_in_order(3);
    }

    #[test]
    fn delimiter_reader_should_support_multiple_delimited_chunks_in_one_read() {
        // Capacity large enough for the first read to buffer everything.
        assert_three_chunks_are_read_in_order(100);
    }

    #[test]
    fn delimiter_reader_should_continue_using_internal_buffer_even_if_internal_reader_fails(
    ) {
        let max_data_size = 9;

        let mut delimiter_reader =
            DelimiterReader::new_with_delimiter(max_data_size, TEST_DELIMITER);
        // Preload two complete messages so no successful read is needed.
        delimiter_reader.buf.copy_from_slice(b"0</test>1</test>");
        delimiter_reader.buf_pos = 0;
        delimiter_reader.buf_filled = delimiter_reader.buf.len();

        let mut buf = vec![0; max_data_size];

        for expected in [b"0", b"1"].iter() {
            let size = delimiter_reader
                .read(&mut buf, empty_nonblocking_read())
                .unwrap();
            assert_eq!(&buf[..size], &expected[..]);
        }

        // With the buffer drained, the callback's error surfaces.
        let result = delimiter_reader.read(&mut buf, empty_nonblocking_read());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::WouldBlock);
    }

    #[test]
    fn delimiter_writer_should_send_all_bytes_and_append_the_delimiter() {
        let payload: Vec<u8> = (1..=10).collect();
        let mut sink: Vec<u8> = Vec::new();
        let mut delimiter_writer =
            DelimiterWriter::new_with_delimiter(TEST_DELIMITER);

        let written = delimiter_writer
            .write(&payload, write_from_vec(&mut sink))
            .unwrap();
        assert_eq!(written, payload.len());

        let expected = [&payload[..], TEST_DELIMITER].concat();
        assert_eq!(&sink, &expected);
    }
}