chomp1/buffer/
stateful.rs

1use std::io;
2
3use crate::buffer::data_source::{IteratorDataSource, ReadDataSource};
4use crate::buffer::{
5    Buffer, DataSource, FixedSizeBuffer, InputBuf, RWDataSource, Stream, StreamError,
6};
7use crate::primitives::IntoInner;
8use crate::types::{Input, ParseResult};
9
10bitflags! {
11    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
12    struct ParserState: u64 {
13        /// The parser which was last run on the buffer did not manage to complete with the data
14        /// available in the buffer.
15        const INCOMPLETE     = 1;
16        /// The buffer did not manage to read any more data from the underlying `Read`
17        /// implementation.
18        const END_OF_INPUT   = 2;
19        /// `parse()` should attempt to read more data whenever the `INCOMPLETE` flag is set.
20        const AUTOMATIC_FILL = 4;
21    }
22}
23
24/// Manages a buffer and data source pair, enabling efficient parsing from a
25/// streaming source.
26#[derive(Debug)]
27pub struct Source<S: DataSource, B: Buffer<S::Item>> {
28    /// Source reader
29    source: S,
30    /// Temporary source
31    buffer: B,
32    /// The requested amount of bytes to be available for reading from the
33    /// buffer
34    request: usize,
35    /// Input state, if end has been reached
36    state: ParserState,
37}
38
39impl<R: io::Read> Source<ReadDataSource<R>, FixedSizeBuffer<u8>> {
40    /// Creates a new `Source` from a `Read` instance with the default
41    /// `FixedSizeBuffer` settings.
42    #[inline]
43    pub fn new(source: R) -> Self {
44        Self::with_buffer(ReadDataSource::new(source), FixedSizeBuffer::new())
45    }
46}
47
48impl<R: io::Read, B: Buffer<u8>> Source<ReadDataSource<R>, B> {
49    /// Creates a new `Source` from `Read` and buffer instances.
50    #[inline]
51    pub fn from_read(source: R, buffer: B) -> Self {
52        Self::with_buffer(ReadDataSource::new(source), buffer)
53    }
54}
55
56impl<RW: io::Read + io::Write> Source<RWDataSource<RW>, FixedSizeBuffer<u8>> {
57    /// Creates a new `Source` from `Read`+`Write` with the default
58    /// `FixedSizeBuffer` settings.
59    #[inline]
60    pub fn new_rw(rwsource: RW) -> Self {
61        Self::with_buffer(RWDataSource::new(rwsource), FixedSizeBuffer::new())
62    }
63}
64
65impl<RW: io::Read + io::Write, B: Buffer<u8>> Source<RWDataSource<RW>, B> {
66    /// Creates a new `Source` from `Read`+`Write` and buffer instances.
67    #[inline]
68    pub fn from_read_write(source: RW, buffer: B) -> Self {
69        Self::with_buffer(RWDataSource::new(source), buffer)
70    }
71}
72
73impl<I: Iterator, B: Buffer<I::Item>> Source<IteratorDataSource<I>, B>
74where
75    I::Item: Copy + PartialEq,
76{
77    /// Creates a new `Source` from `Iterator` and `Buffer` instances.
78    #[inline]
79    pub fn from_iter(source: I, buffer: B) -> Self {
80        Self::with_buffer(IteratorDataSource::new(source), buffer)
81    }
82}
83
84impl<S: DataSource, B: Buffer<S::Item>> Source<S, B> {
85    /// Creates a new `Source` from `DataSource` and `Buffer` instances.
86    #[inline]
87    pub fn with_buffer(source: S, buffer: B) -> Self {
88        Source {
89            source,
90            buffer,
91            request: 0,
92            state: ParserState::INCOMPLETE | ParserState::AUTOMATIC_FILL,
93        }
94    }
95
96    /// Attempts to fill this source so it contains at least ``request`` bytes.
97    #[inline]
98    fn fill_requested(&mut self, request: usize) -> io::Result<usize> {
99        let mut read = 0;
100
101        let buffer = &mut self.buffer;
102        let source = &mut self.source;
103
104        if buffer.len() < request {
105            let diff = request - buffer.len();
106
107            buffer.request_space(diff);
108
109            while buffer.len() < request {
110                match buffer.fill(source)? {
111                    0 => break,
112                    n => read += n,
113                }
114            }
115        }
116
117        Ok(read)
118    }
119
120    /// Attempts to fill the buffer to satisfy the last call to `parse()`.
121    #[inline]
122    pub fn fill(&mut self) -> io::Result<usize> {
123        let req = self.buffer.len() + 1;
124
125        self.fill_requested(req).map(|n| {
126            self.state.remove(ParserState::INCOMPLETE);
127
128            if n > 0 {
129                self.state.remove(ParserState::END_OF_INPUT);
130            } else {
131                self.state.insert(ParserState::END_OF_INPUT);
132            }
133
134            n
135        })
136    }
137
138    /// Returns the number of bytes left in the buffer which have not yet been
139    /// parsed.
140    #[inline]
141    pub fn len(&self) -> usize {
142        self.buffer.len()
143    }
144
145    /// If the buffer is empty and the reader has reached the end.
146    #[inline]
147    pub fn is_empty(&self) -> bool {
148        self.state.contains(ParserState::END_OF_INPUT) && self.len() == 0
149    }
150
151    /// Returns the capacity of the underlying buffer.
152    ///
153    /// This is the maximum number of input items the buffer can store.
154    #[inline]
155    pub fn capacity(&self) -> usize {
156        self.buffer.capacity()
157    }
158
159    /// Borrows the remainder of the buffer.
160    #[inline]
161    pub fn buffer(&self) -> &[S::Item] {
162        &self.buffer
163    }
164
165    /// Resets the buffer state, keeping the current buffer contents and cursor
166    /// position.
167    ///
168    /// This is useful when streaming data and more data has been made available
169    /// on a socket/stream.
170    #[inline]
171    pub fn reset(&mut self) {
172        self.state = ParserState::empty();
173    }
174
175    /// Changes the setting automatic fill feature, `true` will make the buffer
176    /// automatically call `fill()` on the next call to `parse()` after a
177    /// `Retry` was encountered.
178    // TODO: Make a part of the constructor/builder
179    #[inline]
180    pub fn set_autofill(&mut self, value: bool) {
181        if value {
182            self.state.insert(ParserState::AUTOMATIC_FILL)
183        } else {
184            self.state.remove(ParserState::AUTOMATIC_FILL)
185        }
186    }
187}
188
189impl<S: DataSource<Item = u8>, B: Buffer<u8>> io::Read for Source<S, B> {
190    #[inline]
191    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
192        if buf.len() > self.len() {
193            self.fill_requested(buf.len())?;
194        }
195
196        (&self.buffer[..]).read(buf).map(|n| {
197            self.buffer.consume(n);
198
199            n
200        })
201    }
202}
203
204impl<S: DataSource<Item = u8>, B: Buffer<u8>> io::BufRead for Source<S, B> {
205    #[inline]
206    fn fill_buf(&mut self) -> io::Result<&[u8]> {
207        let cap = self.buffer.capacity();
208
209        self.fill_requested(cap)?;
210
211        Ok(self.buffer())
212    }
213
214    #[inline]
215    fn consume(&mut self, num: usize) {
216        self.buffer.consume(num)
217    }
218}
219
220impl<RW: io::Read + io::Write, B: Buffer<u8>> io::Write for Source<RWDataSource<RW>, B> {
221    #[inline]
222    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
223        self.source.write(buf)
224    }
225
226    #[inline]
227    fn flush(&mut self) -> io::Result<()> {
228        self.source.flush()
229    }
230}
231
232impl<'a, S: DataSource, B: Buffer<S::Item>> Stream<'a, 'a> for Source<S, B>
233where
234    S::Item: 'a,
235{
236    type Input = InputBuf<'a, S::Item>;
237
238    #[inline]
239    fn parse<F, T, E>(
240        &'a mut self,
241        f: F,
242    ) -> Result<T, StreamError<<Self::Input as Input>::Buffer, E>>
243    where
244        F: FnOnce(Self::Input) -> ParseResult<Self::Input, T, E>,
245        T: 'a,
246        E: 'a,
247    {
248        use crate::primitives::Primitives;
249
250        if self
251            .state
252            .contains(ParserState::INCOMPLETE | ParserState::AUTOMATIC_FILL)
253        {
254            self.fill().map_err(StreamError::IoError)?;
255        }
256
257        if self.is_empty() {
258            return Err(StreamError::EndOfInput);
259        }
260
261        match f(InputBuf::new(&self.buffer)).into_inner() {
262            (remainder, Ok(data)) => {
263                if remainder.is_incomplete() && !self.state.contains(ParserState::END_OF_INPUT) {
264                    // We can't accept this since we might have hit a premature end
265                    self.state.insert(ParserState::INCOMPLETE);
266
267                    Err(StreamError::Retry)
268                } else {
269                    // TODO: Do something neater with the remainder
270                    self.buffer.consume(self.buffer.len() - remainder.len());
271
272                    Ok(data)
273                }
274            }
275            (mut remainder, Err(err)) => {
276                match (
277                    remainder.is_incomplete(),
278                    self.state.contains(ParserState::END_OF_INPUT),
279                ) {
280                    (true, true) => Err(StreamError::Incomplete),
281                    (true, false) => {
282                        self.state.insert(ParserState::INCOMPLETE);
283
284                        Err(StreamError::Retry)
285                    }
286                    _ => {
287                        // TODO: Do something neater with the remainder
288                        // TODO: Detail this behaviour, maybe make it configurable
289                        self.buffer.consume(self.buffer.len() - remainder.len());
290
291                        Err(StreamError::ParseError(remainder.consume_remaining(), err))
292                    }
293                }
294            }
295        }
296    }
297}
298
299#[cfg(test)]
300mod test {
301    use std::io;
302
303    use super::*;
304    use crate::buffer::data_source::ReadDataSource;
305    use crate::buffer::{FixedSizeBuffer, Stream, StreamError};
306    use crate::parsers::{any, take, take_while, Error};
307    use crate::types::Input;
308
309    fn buf(
310        source: &[u8],
311        buffer_length: usize,
312    ) -> Source<ReadDataSource<io::Cursor<&[u8]>>, FixedSizeBuffer<u8>> {
313        Source::with_buffer(
314            ReadDataSource::new(io::Cursor::new(source)),
315            FixedSizeBuffer::with_size(buffer_length),
316        )
317    }
318
319    #[test]
320    #[should_panic]
321    fn bufsize_zero() {
322        let _ = buf(&b"this is a test"[..], 0);
323    }
324
325    #[test]
326    fn default_bufsize() {
327        let b: Source<_, FixedSizeBuffer<_>> = Source::new(io::Cursor::new(&b"test"[..]));
328
329        assert!(b.capacity() > 0);
330    }
331
332    #[test]
333    fn empty_buf() {
334        let mut n = 0;
335        let mut b = Source::new(io::Cursor::new(&b""[..]));
336
337        let r = b.parse(|i| {
338            n += 1;
339
340            take(i, 1).bind(|i, _| i.ret::<_, Error<_>>(true))
341        });
342
343        assert_eq!(r, Err(StreamError::EndOfInput));
344        assert_eq!(n, 0);
345    }
346
347    #[test]
348    fn fill() {
349        let mut n = 0; // Times it has entered the parsing function
350        let mut m = 0; // Times it has managed to get past the request for data
351        let mut b = buf(&b"test"[..], 1);
352
353        assert_eq!(
354            b.parse(|i| {
355                n += 1;
356                any(i).inspect(|_| m += 1)
357            }),
358            Ok(b't')
359        );
360        assert_eq!(n, 1);
361        assert_eq!(m, 1);
362        assert_eq!(
363            b.parse(|i| {
364                n += 1;
365                any(i).inspect(|_| m += 1)
366            }),
367            Err(StreamError::Retry)
368        );
369        assert_eq!(n, 2);
370        assert_eq!(m, 1);
371        assert_eq!(
372            b.parse(|i| {
373                n += 1;
374                any(i).inspect(|_| m += 1)
375            }),
376            Ok(b'e')
377        );
378        assert_eq!(n, 3);
379        assert_eq!(m, 2);
380        assert_eq!(
381            b.parse(|i| {
382                n += 1;
383                any(i).inspect(|_| m += 1)
384            }),
385            Err(StreamError::Retry)
386        );
387        assert_eq!(n, 4);
388        assert_eq!(m, 2);
389        assert_eq!(
390            b.parse(|i| {
391                n += 1;
392                any(i).inspect(|_| m += 1)
393            }),
394            Ok(b's')
395        );
396        assert_eq!(n, 5);
397        assert_eq!(m, 3);
398        assert_eq!(
399            b.parse(|i| {
400                n += 1;
401                any(i).inspect(|_| m += 1)
402            }),
403            Err(StreamError::Retry)
404        );
405        assert_eq!(n, 6);
406        assert_eq!(m, 3);
407        assert_eq!(
408            b.parse(|i| {
409                n += 1;
410                any(i).inspect(|_| m += 1)
411            }),
412            Ok(b't')
413        );
414        assert_eq!(n, 7);
415        assert_eq!(m, 4);
416        assert_eq!(
417            b.parse(|i| {
418                n += 1;
419                any(i).inspect(|_| m += 1)
420            }),
421            Err(StreamError::Retry)
422        );
423        assert_eq!(n, 8);
424        assert_eq!(m, 4);
425        assert_eq!(
426            b.parse(|i| {
427                n += 1;
428                any(i).inspect(|_| m += 1)
429            }),
430            Err(StreamError::EndOfInput)
431        );
432        assert_eq!(n, 8);
433        assert_eq!(m, 4);
434        assert_eq!(
435            b.parse(|i| {
436                n += 1;
437                any(i).inspect(|_| m += 1)
438            }),
439            Err(StreamError::EndOfInput)
440        );
441        assert_eq!(n, 8);
442        assert_eq!(m, 4);
443    }
444
445    #[test]
446    fn fill2() {
447        let mut n = 0; // Times it has entered the parsing function
448        let mut m = 0; // Times it has managed to get past the request for data
449        let mut b = buf(&b"test"[..], 2);
450
451        assert_eq!(
452            b.parse(|i| {
453                n += 1;
454                any(i).inspect(|_| m += 1)
455            }),
456            Ok(b't')
457        );
458        assert_eq!(n, 1);
459        assert_eq!(m, 1);
460        assert_eq!(
461            b.parse(|i| {
462                n += 1;
463                any(i).inspect(|_| m += 1)
464            }),
465            Ok(b'e')
466        );
467        assert_eq!(n, 2);
468        assert_eq!(m, 2);
469        assert_eq!(
470            b.parse(|i| {
471                n += 1;
472                any(i).inspect(|_| m += 1)
473            }),
474            Err(StreamError::Retry)
475        );
476        assert_eq!(n, 3);
477        assert_eq!(m, 2);
478        assert_eq!(
479            b.parse(|i| {
480                n += 1;
481                any(i).inspect(|_| m += 1)
482            }),
483            Ok(b's')
484        );
485        assert_eq!(n, 4);
486        assert_eq!(m, 3);
487        assert_eq!(
488            b.parse(|i| {
489                n += 1;
490                any(i).inspect(|_| m += 1)
491            }),
492            Ok(b't')
493        );
494        assert_eq!(n, 5);
495        assert_eq!(m, 4);
496        assert_eq!(
497            b.parse(|i| {
498                n += 1;
499                any(i).inspect(|_| m += 1)
500            }),
501            Err(StreamError::Retry)
502        );
503        assert_eq!(n, 6);
504        assert_eq!(m, 4);
505        assert_eq!(
506            b.parse(|i| {
507                n += 1;
508                any(i).inspect(|_| m += 1)
509            }),
510            Err(StreamError::EndOfInput)
511        );
512        assert_eq!(n, 6);
513        assert_eq!(m, 4);
514        assert_eq!(
515            b.parse(|i| {
516                n += 1;
517                any(i).inspect(|_| m += 1)
518            }),
519            Err(StreamError::EndOfInput)
520        );
521        assert_eq!(n, 6);
522        assert_eq!(m, 4);
523    }
524
525    #[test]
526    fn fill3() {
527        let mut n = 0; // Times it has entered the parsing function
528        let mut m = 0; // Times it has managed to get past the request for data
529        let mut b = buf(&b"test"[..], 3);
530
531        assert_eq!(
532            b.parse(|i| {
533                n += 1;
534                take(i, 2).inspect(|_| m += 1)
535            }),
536            Ok(&b"te"[..])
537        );
538        assert_eq!(n, 1);
539        assert_eq!(m, 1);
540        assert_eq!(
541            b.parse(|i| {
542                n += 1;
543                take(i, 2).inspect(|_| m += 1)
544            }),
545            Err(StreamError::Retry)
546        );
547        assert_eq!(n, 2);
548        assert_eq!(m, 1);
549        assert_eq!(
550            b.parse(|i| {
551                n += 1;
552                take(i, 2).inspect(|_| m += 1)
553            }),
554            Ok(&b"st"[..])
555        );
556        assert_eq!(n, 3);
557        assert_eq!(m, 2);
558        assert_eq!(
559            b.parse(|i| {
560                n += 1;
561                take(i, 2).inspect(|_| m += 1)
562            }),
563            Err(StreamError::Retry)
564        );
565        assert_eq!(n, 4);
566        assert_eq!(m, 2);
567        assert_eq!(
568            b.parse(|i| {
569                n += 1;
570                take(i, 2).inspect(|_| m += 1)
571            }),
572            Err(StreamError::EndOfInput)
573        );
574        assert_eq!(n, 4);
575        assert_eq!(m, 2);
576        assert_eq!(
577            b.parse(|i| {
578                n += 1;
579                take(i, 2).inspect(|_| m += 1)
580            }),
581            Err(StreamError::EndOfInput)
582        );
583        assert_eq!(n, 4);
584        assert_eq!(m, 2);
585    }
586
587    #[test]
588    fn incomplete() {
589        let mut n = 0; // Times it has entered the parsing function
590        let mut m = 0; // Times it has managed to get past the request for data
591        let mut b = buf(&b"tes"[..], 2);
592
593        assert_eq!(
594            b.parse(|i| {
595                n += 1;
596                take(i, 2).inspect(|_| m += 1)
597            }),
598            Ok(&b"te"[..])
599        );
600        assert_eq!(n, 1);
601        assert_eq!(m, 1);
602        assert_eq!(
603            b.parse(|i| {
604                n += 1;
605                take(i, 2).inspect(|_| m += 1)
606            }),
607            Err(StreamError::Retry)
608        );
609        assert_eq!(n, 2);
610        assert_eq!(m, 1);
611        assert_eq!(
612            b.parse(|i| {
613                n += 1;
614                take(i, 2).inspect(|_| m += 1)
615            }),
616            Err(StreamError::Retry)
617        );
618        assert_eq!(n, 3);
619        assert_eq!(m, 1);
620        assert_eq!(
621            b.parse(|i| {
622                n += 1;
623                take(i, 2).inspect(|_| m += 1)
624            }),
625            Err(StreamError::Incomplete)
626        );
627        assert_eq!(n, 4);
628        assert_eq!(m, 1);
629        assert_eq!(
630            b.parse(|i| {
631                n += 1;
632                take(i, 2).inspect(|_| m += 1)
633            }),
634            Err(StreamError::Incomplete)
635        );
636        assert_eq!(n, 5);
637        assert_eq!(m, 1);
638    }
639
640    #[test]
641    fn incomplete2() {
642        let mut o = 0;
643        let mut n = 0; // Times it has entered the parsing function
644        let mut m = 0; // Times it has managed to get past the request for data
645        let mut b = buf(&b"tes"[..], 2);
646
647        assert_eq!(
648            b.parse(|i| {
649                n += 1;
650                take_while(i, |_| {
651                    o += 1;
652                    o < 2
653                })
654                .inspect(|_| m += 1)
655            }),
656            Ok(&b"t"[..])
657        );
658        assert_eq!(n, 1);
659        assert_eq!(m, 1);
660        o = 0;
661        assert_eq!(
662            b.parse(|i| {
663                n += 1;
664                take_while(i, |_| {
665                    o += 1;
666                    o < 2
667                })
668                .inspect(|_| m += 1)
669            }),
670            Err(StreamError::Retry)
671        );
672        assert_eq!(n, 2);
673        assert_eq!(m, 2);
674        o = 0;
675        assert_eq!(
676            b.parse(|i| {
677                n += 1;
678                take_while(i, |_| {
679                    o += 1;
680                    o < 2
681                })
682                .inspect(|_| m += 1)
683            }),
684            Ok(&b"e"[..])
685        );
686        assert_eq!(n, 3);
687        assert_eq!(m, 3);
688        o = 0;
689        assert_eq!(
690            b.parse(|i| {
691                n += 1;
692                take_while(i, |_| {
693                    o += 1;
694                    o < 2
695                })
696                .inspect(|_| m += 1)
697            }),
698            Err(StreamError::Retry)
699        );
700        assert_eq!(n, 4);
701        assert_eq!(m, 4);
702        o = 0;
703        assert_eq!(
704            b.parse(|i| {
705                n += 1;
706                take_while(i, |_| {
707                    o += 1;
708                    o < 2
709                })
710                .inspect(|_| m += 1)
711            }),
712            Ok(&b"s"[..])
713        );
714        assert_eq!(n, 5);
715        assert_eq!(m, 5);
716        o = 0;
717        assert_eq!(
718            b.parse(|i| {
719                n += 1;
720                take_while(i, |_| {
721                    o += 1;
722                    o < 2
723                })
724                .inspect(|_| m += 1)
725            }),
726            Err(StreamError::EndOfInput)
727        );
728        assert_eq!(n, 5);
729        assert_eq!(m, 5);
730    }
731
732    #[test]
733    fn no_autofill() {
734        let mut n = 0; // Times it has entered the parsing function
735        let mut m = 0; // Times it has managed to get past the request for data
736        let mut b = buf(&b"test"[..], 2);
737
738        b.set_autofill(false);
739
740        assert_eq!(
741            b.parse(|i| {
742                n += 1;
743                take(i, 2).inspect(|_| m += 1)
744            }),
745            Err(StreamError::Retry)
746        );
747        assert_eq!(n, 1);
748        assert_eq!(m, 0);
749
750        assert_eq!(b.fill().unwrap(), 2);
751
752        assert_eq!(
753            b.parse(|i| {
754                n += 1;
755                take(i, 2).inspect(|_| m += 1)
756            }),
757            Ok(&b"te"[..])
758        );
759        assert_eq!(n, 2);
760        assert_eq!(m, 1);
761        assert_eq!(
762            b.parse(|i| {
763                n += 1;
764                take(i, 2).inspect(|_| m += 1)
765            }),
766            Err(StreamError::Retry)
767        );
768        assert_eq!(n, 3);
769        assert_eq!(m, 1);
770
771        assert_eq!(b.fill().unwrap(), 2);
772
773        assert_eq!(
774            b.parse(|i| {
775                n += 1;
776                take(i, 2).inspect(|_| m += 1)
777            }),
778            Ok(&b"st"[..])
779        );
780        assert_eq!(n, 4);
781        assert_eq!(m, 2);
782        assert_eq!(
783            b.parse(|i| {
784                n += 1;
785                take(i, 2).inspect(|_| m += 1)
786            }),
787            Err(StreamError::Retry)
788        );
789        assert_eq!(n, 5);
790        assert_eq!(m, 2);
791
792        assert_eq!(b.fill().unwrap(), 0);
793
794        assert_eq!(
795            b.parse(|i| {
796                n += 1;
797                take(i, 2).inspect(|_| m += 1)
798            }),
799            Err(StreamError::EndOfInput)
800        );
801        assert_eq!(n, 5);
802        assert_eq!(m, 2);
803    }
804
805    #[test]
806    fn no_autofill_first() {
807        let mut n = 0; // Times it has entered the parsing function
808        let mut m = 0; // Times it has managed to get past the request for data
809        let mut b = buf(&b"ab"[..], 1);
810
811        b.set_autofill(false);
812
813        assert_eq!(b.fill().unwrap(), 1);
814
815        assert_eq!(
816            b.parse(|i| {
817                n += 1;
818                any(i).inspect(|_| m += 1)
819            }),
820            Ok(b'a')
821        );
822        assert_eq!(n, 1);
823        assert_eq!(m, 1);
824        assert_eq!(
825            b.parse(|i| {
826                n += 1;
827                any(i).inspect(|_| m += 1)
828            }),
829            Err(StreamError::Retry)
830        );
831        assert_eq!(n, 2);
832        assert_eq!(m, 1);
833
834        assert_eq!(b.fill().unwrap(), 1);
835
836        assert_eq!(
837            b.parse(|i| {
838                n += 1;
839                any(i).inspect(|_| m += 1)
840            }),
841            Ok(b'b')
842        );
843        assert_eq!(n, 3);
844        assert_eq!(m, 2);
845        assert_eq!(
846            b.parse(|i| {
847                n += 1;
848                any(i).inspect(|_| m += 1)
849            }),
850            Err(StreamError::Retry)
851        );
852        assert_eq!(n, 4);
853        assert_eq!(m, 2);
854
855        assert_eq!(b.fill().unwrap(), 0);
856
857        assert_eq!(
858            b.parse(|i| {
859                n += 1;
860                any(i).inspect(|_| m += 1)
861            }),
862            Err(StreamError::EndOfInput)
863        );
864        assert_eq!(n, 4);
865        assert_eq!(m, 2);
866    }
867}