1use std::io;
2
3use crate::buffer::data_source::{IteratorDataSource, ReadDataSource};
4use crate::buffer::{
5 Buffer, DataSource, FixedSizeBuffer, InputBuf, RWDataSource, Stream, StreamError,
6};
7use crate::primitives::IntoInner;
8use crate::types::{Input, ParseResult};
9
10bitflags! {
11 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
12 struct ParserState: u64 {
13 const INCOMPLETE = 1;
16 const END_OF_INPUT = 2;
19 const AUTOMATIC_FILL = 4;
21 }
22}
23
24#[derive(Debug)]
27pub struct Source<S: DataSource, B: Buffer<S::Item>> {
28 source: S,
30 buffer: B,
32 request: usize,
35 state: ParserState,
37}
38
39impl<R: io::Read> Source<ReadDataSource<R>, FixedSizeBuffer<u8>> {
40 #[inline]
43 pub fn new(source: R) -> Self {
44 Self::with_buffer(ReadDataSource::new(source), FixedSizeBuffer::new())
45 }
46}
47
48impl<R: io::Read, B: Buffer<u8>> Source<ReadDataSource<R>, B> {
49 #[inline]
51 pub fn from_read(source: R, buffer: B) -> Self {
52 Self::with_buffer(ReadDataSource::new(source), buffer)
53 }
54}
55
56impl<RW: io::Read + io::Write> Source<RWDataSource<RW>, FixedSizeBuffer<u8>> {
57 #[inline]
60 pub fn new_rw(rwsource: RW) -> Self {
61 Self::with_buffer(RWDataSource::new(rwsource), FixedSizeBuffer::new())
62 }
63}
64
65impl<RW: io::Read + io::Write, B: Buffer<u8>> Source<RWDataSource<RW>, B> {
66 #[inline]
68 pub fn from_read_write(source: RW, buffer: B) -> Self {
69 Self::with_buffer(RWDataSource::new(source), buffer)
70 }
71}
72
73impl<I: Iterator, B: Buffer<I::Item>> Source<IteratorDataSource<I>, B>
74where
75 I::Item: Copy + PartialEq,
76{
77 #[inline]
79 pub fn from_iter(source: I, buffer: B) -> Self {
80 Self::with_buffer(IteratorDataSource::new(source), buffer)
81 }
82}
83
84impl<S: DataSource, B: Buffer<S::Item>> Source<S, B> {
85 #[inline]
87 pub fn with_buffer(source: S, buffer: B) -> Self {
88 Source {
89 source,
90 buffer,
91 request: 0,
92 state: ParserState::INCOMPLETE | ParserState::AUTOMATIC_FILL,
93 }
94 }
95
96 #[inline]
98 fn fill_requested(&mut self, request: usize) -> io::Result<usize> {
99 let mut read = 0;
100
101 let buffer = &mut self.buffer;
102 let source = &mut self.source;
103
104 if buffer.len() < request {
105 let diff = request - buffer.len();
106
107 buffer.request_space(diff);
108
109 while buffer.len() < request {
110 match buffer.fill(source)? {
111 0 => break,
112 n => read += n,
113 }
114 }
115 }
116
117 Ok(read)
118 }
119
120 #[inline]
122 pub fn fill(&mut self) -> io::Result<usize> {
123 let req = self.buffer.len() + 1;
124
125 self.fill_requested(req).map(|n| {
126 self.state.remove(ParserState::INCOMPLETE);
127
128 if n > 0 {
129 self.state.remove(ParserState::END_OF_INPUT);
130 } else {
131 self.state.insert(ParserState::END_OF_INPUT);
132 }
133
134 n
135 })
136 }
137
138 #[inline]
141 pub fn len(&self) -> usize {
142 self.buffer.len()
143 }
144
145 #[inline]
147 pub fn is_empty(&self) -> bool {
148 self.state.contains(ParserState::END_OF_INPUT) && self.len() == 0
149 }
150
151 #[inline]
155 pub fn capacity(&self) -> usize {
156 self.buffer.capacity()
157 }
158
159 #[inline]
161 pub fn buffer(&self) -> &[S::Item] {
162 &self.buffer
163 }
164
165 #[inline]
171 pub fn reset(&mut self) {
172 self.state = ParserState::empty();
173 }
174
175 #[inline]
180 pub fn set_autofill(&mut self, value: bool) {
181 if value {
182 self.state.insert(ParserState::AUTOMATIC_FILL)
183 } else {
184 self.state.remove(ParserState::AUTOMATIC_FILL)
185 }
186 }
187}
188
189impl<S: DataSource<Item = u8>, B: Buffer<u8>> io::Read for Source<S, B> {
190 #[inline]
191 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
192 if buf.len() > self.len() {
193 self.fill_requested(buf.len())?;
194 }
195
196 (&self.buffer[..]).read(buf).map(|n| {
197 self.buffer.consume(n);
198
199 n
200 })
201 }
202}
203
204impl<S: DataSource<Item = u8>, B: Buffer<u8>> io::BufRead for Source<S, B> {
205 #[inline]
206 fn fill_buf(&mut self) -> io::Result<&[u8]> {
207 let cap = self.buffer.capacity();
208
209 self.fill_requested(cap)?;
210
211 Ok(self.buffer())
212 }
213
214 #[inline]
215 fn consume(&mut self, num: usize) {
216 self.buffer.consume(num)
217 }
218}
219
220impl<RW: io::Read + io::Write, B: Buffer<u8>> io::Write for Source<RWDataSource<RW>, B> {
221 #[inline]
222 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
223 self.source.write(buf)
224 }
225
226 #[inline]
227 fn flush(&mut self) -> io::Result<()> {
228 self.source.flush()
229 }
230}
231
232impl<'a, S: DataSource, B: Buffer<S::Item>> Stream<'a, 'a> for Source<S, B>
233where
234 S::Item: 'a,
235{
236 type Input = InputBuf<'a, S::Item>;
237
238 #[inline]
239 fn parse<F, T, E>(
240 &'a mut self,
241 f: F,
242 ) -> Result<T, StreamError<<Self::Input as Input>::Buffer, E>>
243 where
244 F: FnOnce(Self::Input) -> ParseResult<Self::Input, T, E>,
245 T: 'a,
246 E: 'a,
247 {
248 use crate::primitives::Primitives;
249
250 if self
251 .state
252 .contains(ParserState::INCOMPLETE | ParserState::AUTOMATIC_FILL)
253 {
254 self.fill().map_err(StreamError::IoError)?;
255 }
256
257 if self.is_empty() {
258 return Err(StreamError::EndOfInput);
259 }
260
261 match f(InputBuf::new(&self.buffer)).into_inner() {
262 (remainder, Ok(data)) => {
263 if remainder.is_incomplete() && !self.state.contains(ParserState::END_OF_INPUT) {
264 self.state.insert(ParserState::INCOMPLETE);
266
267 Err(StreamError::Retry)
268 } else {
269 self.buffer.consume(self.buffer.len() - remainder.len());
271
272 Ok(data)
273 }
274 }
275 (mut remainder, Err(err)) => {
276 match (
277 remainder.is_incomplete(),
278 self.state.contains(ParserState::END_OF_INPUT),
279 ) {
280 (true, true) => Err(StreamError::Incomplete),
281 (true, false) => {
282 self.state.insert(ParserState::INCOMPLETE);
283
284 Err(StreamError::Retry)
285 }
286 _ => {
287 self.buffer.consume(self.buffer.len() - remainder.len());
290
291 Err(StreamError::ParseError(remainder.consume_remaining(), err))
292 }
293 }
294 }
295 }
296 }
297}
298
#[cfg(test)]
mod test {
    use std::io;

    use super::*;
    use crate::buffer::data_source::ReadDataSource;
    use crate::buffer::{FixedSizeBuffer, Stream, StreamError};
    use crate::parsers::{any, take, take_while, Error};
    use crate::types::Input;

    // Runs one `parse` call on `$src` and checks its outcome.
    //
    // `$calls` is incremented every time the parser closure is entered and
    // `$hits` every time the parser's `inspect` callback fires. After the
    // call, the result and both counters are compared with the expected
    // values.
    macro_rules! step {
        (
            $src:ident, $calls:ident, $hits:ident,
            |$input:ident| $parser:expr => $expected:expr, $want_calls:expr, $want_hits:expr
        ) => {
            assert_eq!(
                $src.parse(|$input| {
                    $calls += 1;
                    $parser.inspect(|_| $hits += 1)
                }),
                $expected
            );
            assert_eq!($calls, $want_calls);
            assert_eq!($hits, $want_hits);
        };
    }

    /// Builds a `Source` reading `source` through a fixed buffer holding
    /// `buffer_length` bytes.
    fn buf(
        source: &[u8],
        buffer_length: usize,
    ) -> Source<ReadDataSource<io::Cursor<&[u8]>>, FixedSizeBuffer<u8>> {
        let reader = ReadDataSource::new(io::Cursor::new(source));
        let storage = FixedSizeBuffer::with_size(buffer_length);

        Source::with_buffer(reader, storage)
    }

    #[test]
    #[should_panic]
    fn bufsize_zero() {
        let _ = buf(&b"this is a test"[..], 0);
    }

    #[test]
    fn default_bufsize() {
        let src: Source<_, FixedSizeBuffer<_>> = Source::new(io::Cursor::new(&b"test"[..]));

        assert!(src.capacity() > 0);
    }

    #[test]
    fn empty_buf() {
        let mut calls = 0;
        let mut src = Source::new(io::Cursor::new(&b""[..]));

        let outcome = src.parse(|i| {
            calls += 1;

            take(i, 1).bind(|i, _| i.ret::<_, Error<_>>(true))
        });

        assert_eq!(outcome, Err(StreamError::EndOfInput));
        assert_eq!(calls, 0);
    }

    #[test]
    fn fill() {
        let (mut n, mut m) = (0, 0);
        let mut b = buf(&b"test"[..], 1);

        step!(b, n, m, |i| any(i) => Ok(b't'), 1, 1);
        step!(b, n, m, |i| any(i) => Err(StreamError::Retry), 2, 1);
        step!(b, n, m, |i| any(i) => Ok(b'e'), 3, 2);
        step!(b, n, m, |i| any(i) => Err(StreamError::Retry), 4, 2);
        step!(b, n, m, |i| any(i) => Ok(b's'), 5, 3);
        step!(b, n, m, |i| any(i) => Err(StreamError::Retry), 6, 3);
        step!(b, n, m, |i| any(i) => Ok(b't'), 7, 4);
        step!(b, n, m, |i| any(i) => Err(StreamError::Retry), 8, 4);
        step!(b, n, m, |i| any(i) => Err(StreamError::EndOfInput), 8, 4);
        step!(b, n, m, |i| any(i) => Err(StreamError::EndOfInput), 8, 4);
    }

    #[test]
    fn fill2() {
        let (mut n, mut m) = (0, 0);
        let mut b = buf(&b"test"[..], 2);

        step!(b, n, m, |i| any(i) => Ok(b't'), 1, 1);
        step!(b, n, m, |i| any(i) => Ok(b'e'), 2, 2);
        step!(b, n, m, |i| any(i) => Err(StreamError::Retry), 3, 2);
        step!(b, n, m, |i| any(i) => Ok(b's'), 4, 3);
        step!(b, n, m, |i| any(i) => Ok(b't'), 5, 4);
        step!(b, n, m, |i| any(i) => Err(StreamError::Retry), 6, 4);
        step!(b, n, m, |i| any(i) => Err(StreamError::EndOfInput), 6, 4);
        step!(b, n, m, |i| any(i) => Err(StreamError::EndOfInput), 6, 4);
    }

    #[test]
    fn fill3() {
        let (mut n, mut m) = (0, 0);
        let mut b = buf(&b"test"[..], 3);

        step!(b, n, m, |i| take(i, 2) => Ok(&b"te"[..]), 1, 1);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::Retry), 2, 1);
        step!(b, n, m, |i| take(i, 2) => Ok(&b"st"[..]), 3, 2);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::Retry), 4, 2);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::EndOfInput), 4, 2);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::EndOfInput), 4, 2);
    }

    #[test]
    fn incomplete() {
        let (mut n, mut m) = (0, 0);
        let mut b = buf(&b"tes"[..], 2);

        step!(b, n, m, |i| take(i, 2) => Ok(&b"te"[..]), 1, 1);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::Retry), 2, 1);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::Retry), 3, 1);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::Incomplete), 4, 1);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::Incomplete), 5, 1);
    }

    #[test]
    fn incomplete2() {
        // `o` counts predicate invocations within a single parse call so that
        // `take_while` accepts exactly one item per run.
        let mut o = 0;
        let (mut n, mut m) = (0, 0);
        let mut b = buf(&b"tes"[..], 2);

        step!(b, n, m, |i| take_while(i, |_| { o += 1; o < 2 }) => Ok(&b"t"[..]), 1, 1);
        o = 0;
        step!(b, n, m, |i| take_while(i, |_| { o += 1; o < 2 }) => Err(StreamError::Retry), 2, 2);
        o = 0;
        step!(b, n, m, |i| take_while(i, |_| { o += 1; o < 2 }) => Ok(&b"e"[..]), 3, 3);
        o = 0;
        step!(b, n, m, |i| take_while(i, |_| { o += 1; o < 2 }) => Err(StreamError::Retry), 4, 4);
        o = 0;
        step!(b, n, m, |i| take_while(i, |_| { o += 1; o < 2 }) => Ok(&b"s"[..]), 5, 5);
        o = 0;
        step!(
            b, n, m,
            |i| take_while(i, |_| { o += 1; o < 2 }) => Err(StreamError::EndOfInput), 5, 5
        );
    }

    #[test]
    fn no_autofill() {
        let (mut n, mut m) = (0, 0);
        let mut b = buf(&b"test"[..], 2);

        b.set_autofill(false);

        step!(b, n, m, |i| take(i, 2) => Err(StreamError::Retry), 1, 0);

        assert_eq!(b.fill().unwrap(), 2);

        step!(b, n, m, |i| take(i, 2) => Ok(&b"te"[..]), 2, 1);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::Retry), 3, 1);

        assert_eq!(b.fill().unwrap(), 2);

        step!(b, n, m, |i| take(i, 2) => Ok(&b"st"[..]), 4, 2);
        step!(b, n, m, |i| take(i, 2) => Err(StreamError::Retry), 5, 2);

        assert_eq!(b.fill().unwrap(), 0);

        step!(b, n, m, |i| take(i, 2) => Err(StreamError::EndOfInput), 5, 2);
    }

    #[test]
    fn no_autofill_first() {
        let (mut n, mut m) = (0, 0);
        let mut b = buf(&b"ab"[..], 1);

        b.set_autofill(false);

        assert_eq!(b.fill().unwrap(), 1);

        step!(b, n, m, |i| any(i) => Ok(b'a'), 1, 1);
        step!(b, n, m, |i| any(i) => Err(StreamError::Retry), 2, 1);

        assert_eq!(b.fill().unwrap(), 1);

        step!(b, n, m, |i| any(i) => Ok(b'b'), 3, 2);
        step!(b, n, m, |i| any(i) => Err(StreamError::Retry), 4, 2);

        assert_eq!(b.fill().unwrap(), 0);

        step!(b, n, m, |i| any(i) => Err(StreamError::EndOfInput), 4, 2);
    }
}