1use std::error;
14use std::io;
15use std::io::{Cursor, Read, Write, SeekFrom, Error, ErrorKind};
16use byteorder::{ReadBytesExt, LittleEndian};
17use std::collections::HashMap;
18use std::collections::hash_map::Entry;
19use std::fmt::{Display, Formatter, Error as FmtError};
20use std::mem::replace;
21use crate::crc::vorbis_crc32_update;
22use crate::Packet;
23use std::io::Seek;
24use std::sync::Arc;
25
/// Errors that can occur while reading Ogg data.
#[derive(Debug)]
pub enum OggReadError {
    /// No Ogg capture pattern was found.
    NoCapturePatternFound,
    /// The page header carried a non-zero stream structure version.
    /// The offending version value is attached.
    InvalidStreamStructVer(u8),
    /// CRC32 mismatch: the first value is the checksum stored in the
    /// page header, the second the checksum calculated over the page.
    HashMismatch(u32, u32),
    /// An I/O error occurred while reading.
    ReadError(io::Error),
    /// A constraint on the stream's data was violated.
    InvalidData,
}
42
43impl OggReadError {
44 fn description_str(&self) -> &str {
45 match *self {
46 OggReadError::NoCapturePatternFound => "No Ogg capture pattern found",
47 OggReadError::InvalidStreamStructVer(_) =>
48 "A non zero stream structure version was passed",
49 OggReadError::HashMismatch(_, _) => "CRC32 hash mismatch",
50 OggReadError::ReadError(_) => "I/O error",
51 OggReadError::InvalidData => "Constraint violated",
52 }
53 }
54}
55
56impl error::Error for OggReadError {
57 fn description(&self) -> &str {
58 self.description_str()
59 }
60
61 fn cause(&self) -> Option<&dyn error::Error> {
62 match *self {
63 OggReadError::ReadError(ref err) => Some(err as &dyn error::Error),
64 _ => None
65 }
66 }
67}
68
69impl Display for OggReadError {
70 fn fmt(&self, fmt :&mut Formatter) -> Result<(), FmtError> {
71 write!(fmt, "{}", Self::description_str(self))
72 }
73}
74
75impl From<io::Error> for OggReadError {
76 fn from(err :io::Error) -> OggReadError {
77 return OggReadError::ReadError(err);
78 }
79}
80
/// Information about a page that is independent of read progress.
struct PageBaseInfo {
    /// Bit 0x01 of the header type flag: the page starts with the
    /// continuation of a packet.
    starts_with_continued :bool,
    /// Bit 0x02 of the header type flag.
    first_page :bool,
    /// Bit 0x04 of the header type flag.
    last_page :bool,
    /// Absolute granule position as stored in the page header.
    absgp :u64,
    /// Page sequence number as stored in the page header.
    sequence_num :u32,
    /// Checksum as stored in the page header.
    checksum :u32,
    /// `(offset, length)` of each packet (or packet fragment) inside the
    /// page body. If the page ends with an unfinished packet, its
    /// fragment is the last entry.
    packet_positions :Vec<(u16,u16)>,
    /// True if the last lacing value of the page was 255, i.e. the last
    /// packet is not terminated within this page. For a page without any
    /// segments it mirrors `starts_with_continued`.
    ends_with_continued :bool,
}
105
/// Per-stream read state: the current page plus read progress inside it.
struct PageInfo {
    /// Basic information about the current page.
    bi :PageBaseInfo,
    /// Index into `bi.packet_positions` of the packet that is read next.
    packet_idx :u8,
    /// Body (packet data) of the current page.
    page_body :Vec<u8>,

    /// Fragments collected from earlier pages for a packet that continues
    /// into the current page. They get glued in front of the first packet
    /// of the page when it is read.
    last_overlap_pck :Vec<Vec<u8>>,
}
121
122impl PageInfo {
123 fn is_first_pck_in_pg(&self) -> bool {
126 return self.packet_idx == 0;
127 }
128 fn is_last_pck_in_pg(&self) -> bool {
133 return (self.packet_idx + 1 + (self.bi.ends_with_continued as u8)) as usize
134 == self.bi.packet_positions.len();
135 }
136}
137
138pub struct OggPage(PageParser);
140
141impl OggPage {
142 fn has_packet_end(&self) -> bool {
144 (self.0.bi.packet_positions.len() -
145 self.0.bi.ends_with_continued as usize) > 0
146 }
147 fn has_whole_packet(&self) -> bool {
150 self.0.bi.packet_positions.len().saturating_sub(
151 self.0.bi.ends_with_continued as usize +
152 self.0.bi.starts_with_continued as usize) > 0
153 }
154 fn has_packet_start(&self) -> bool {
156 (self.0.bi.packet_positions.len() -
157 self.0.bi.starts_with_continued as usize) > 0
158 }
159}
160
/// Options that influence how pages are parsed.
#[derive(Debug, Clone)]
#[non_exhaustive] pub struct PageParsingOptions {
    /// Whether the page checksum is verified when the packet data of a
    /// page is parsed. A mismatch is reported as `HashMismatch`.
    pub verify_checksum :bool,
}
171
172impl Default for PageParsingOptions {
173 fn default() -> Self {
174 Self {
175 verify_checksum: !cfg!(fuzzing),
179 }
180 }
181}
182
/// Helper that parses a page in three steps: header, segment table and
/// packet data.
pub struct PageParser {
    /// Basic page information, filled from the header and segment table.
    bi :PageBaseInfo,

    /// Stream serial number from the page header.
    stream_serial :u32,
    /// Checksum from the page header.
    checksum :u32,
    /// Raw 27 byte page header; kept because it is part of the checksummed data.
    header_buf: [u8; 27],
    /// Number of packets that end inside this page.
    packet_count :u16,
    /// Holds the segment table after `parse_segments`, and the page body
    /// after `parse_packet_data`.
    segments_or_packets_buf :Vec<u8>,

    /// Parsing options shared with whoever created the parser.
    parse_opts :Arc<PageParsingOptions>,
}
207
208impl PageParser {
209 pub fn new(header_buf :[u8; 27]) -> Result<(PageParser, usize), OggReadError> {
221 Self::new_with_parse_opts(header_buf, PageParsingOptions::default())
222 }
223
224 pub fn new_with_parse_opts(header_buf :[u8; 27], parse_opts :impl Into<Arc<PageParsingOptions>>) -> Result<(PageParser, usize), OggReadError> {
238 let mut header_rdr = Cursor::new(header_buf);
239 header_rdr.set_position(4);
240 let stream_structure_version = tri!(header_rdr.read_u8());
241 if stream_structure_version != 0 {
242 tri!(Err(OggReadError::InvalidStreamStructVer(stream_structure_version)));
243 }
244 let header_type_flag = header_rdr.read_u8().unwrap();
245 let absgp = header_rdr.read_u64::<LittleEndian>().unwrap();
246 let stream_serial = header_rdr.read_u32::<LittleEndian>().unwrap();
247 let sequence_num = header_rdr.read_u32::<LittleEndian>().unwrap();
248 let checksum = header_rdr.read_u32::<LittleEndian>().unwrap();
249
250 Ok((PageParser {
251 bi : PageBaseInfo {
252 starts_with_continued : header_type_flag & 0x01u8 != 0,
253 first_page : header_type_flag & 0x02u8 != 0,
254 last_page : header_type_flag & 0x04u8 != 0,
255 absgp,
256 sequence_num,
257 checksum,
258 packet_positions : Vec::new(),
259 ends_with_continued : false,
260 },
261 stream_serial,
262 checksum,
263 header_buf,
264 packet_count : 0,
265 segments_or_packets_buf :Vec::new(),
266 parse_opts: parse_opts.into(),
267 },
268 header_rdr.read_u8().unwrap() as usize
270 ))
271 }
272
273 pub fn parse_segments(&mut self, segments_buf :Vec<u8>) -> usize {
279 let mut page_siz :u16 = 0; self.bi.ends_with_continued = self.bi.starts_with_continued;
282
283 for val in &segments_buf {
287 page_siz += *val as u16;
288 self.packet_count += (*val < 255) as u16;
290 self.bi.ends_with_continued = !(*val < 255);
291 }
292
293 let mut packets = Vec::with_capacity(self.packet_count as usize
294 + self.bi.ends_with_continued as usize);
295 let mut cur_packet_siz :u16 = 0;
296 let mut cur_packet_offs :u16 = 0;
297
298 for val in &segments_buf {
301 cur_packet_siz += *val as u16;
302 if *val < 255 {
303 packets.push((cur_packet_offs, cur_packet_siz));
304 cur_packet_offs += cur_packet_siz;
305 cur_packet_siz = 0;
306 }
307 }
308 if self.bi.ends_with_continued {
309 packets.push((cur_packet_offs, cur_packet_siz));
310 }
311
312 self.bi.packet_positions = packets;
313 self.segments_or_packets_buf = segments_buf;
314 page_siz as usize
315 }
316
317 pub fn parse_packet_data(mut self, packet_data :Vec<u8>) ->
321 Result<OggPage, OggReadError> {
322 if self.parse_opts.verify_checksum {
323 self.header_buf[22] = 0;
326 self.header_buf[23] = 0;
327 self.header_buf[24] = 0;
328 self.header_buf[25] = 0;
329
330 let mut hash_calculated :u32;
332 hash_calculated = vorbis_crc32_update(0, &self.header_buf);
333 hash_calculated = vorbis_crc32_update(hash_calculated,
334 &self.segments_or_packets_buf);
335 hash_calculated = vorbis_crc32_update(hash_calculated, &packet_data);
336
337 if self.checksum != hash_calculated {
339 tri!(Err(OggReadError::HashMismatch(self.checksum, hash_calculated)));
340 }
341 }
342 self.segments_or_packets_buf = packet_data;
343 Ok(OggPage(self))
344 }
345}
346
/// Low level packet reader that contains no I/O: pages are pushed in with
/// `push_page` and packets are pulled out with `read_packet`.
pub struct BasePacketReader {
    /// Per-stream state, keyed by stream serial.
    page_infos :HashMap<u32, PageInfo>,

    /// Serial of the stream whose current page still has packets to hand
    /// out, if any.
    stream_with_stuff :Option<u32>,

    /// Set by `update_after_seek`; makes `push_page` more tolerant.
    has_seeked :bool,
}
381
382impl BasePacketReader {
383 pub fn new() -> Self {
388 BasePacketReader { page_infos: HashMap::new(),
389 stream_with_stuff: None, has_seeked: false }
390 }
391 pub fn read_packet(&mut self) -> Option<Packet> {
397 if self.stream_with_stuff == None {
398 return None;
399 }
400 let str_serial :u32 = self.stream_with_stuff.unwrap();
401 let pg_info = self.page_infos.get_mut(&str_serial).unwrap();
402 let (offs, len) = pg_info.bi.packet_positions[pg_info.packet_idx as usize];
403 let need_to_glue = pg_info.packet_idx == 0 &&
406 pg_info.bi.starts_with_continued &&
407 !(pg_info.bi.ends_with_continued && pg_info.bi.packet_positions.len() == 1);
408 let packet_content :Vec<u8> = if need_to_glue {
409 let mut siz :usize = 0;
411 for pck in pg_info.last_overlap_pck.iter() {
412 siz += pck.len();
413 }
414 siz += len as usize;
415 let mut cont :Vec<u8> = Vec::with_capacity(siz);
416
417 for pck in pg_info.last_overlap_pck.iter() {
419 cont.write_all(pck).unwrap();
420 }
421 pg_info.last_overlap_pck = Vec::new();
423 cont.write_all(&pg_info.page_body[offs as usize .. (offs + len) as usize]).unwrap();
424
425 cont
426 } else {
427 let mut cont :Vec<u8> = Vec::with_capacity(len as usize);
428 let cont_slice :&[u8] = &pg_info.page_body[offs as usize .. (offs + len) as usize];
432 cont.write_all(cont_slice).unwrap();
433 cont
434 };
435
436 let first_pck_in_pg = pg_info.is_first_pck_in_pg();
437 let first_pck_overall = pg_info.bi.first_page && first_pck_in_pg;
438
439 let last_pck_in_pg = pg_info.is_last_pck_in_pg();
440 let last_pck_overall = pg_info.bi.last_page && last_pck_in_pg;
441
442 pg_info.packet_idx += 1;
444 if last_pck_in_pg {
447 self.stream_with_stuff = None;
448 }
449
450 return Some(Packet {
451 data: packet_content,
452 first_packet_pg: first_pck_in_pg,
453 first_packet_stream: first_pck_overall,
454 last_packet_pg: last_pck_in_pg,
455 last_packet_stream: last_pck_overall,
456 absgp_page: pg_info.bi.absgp,
457 checksum_page: pg_info.bi.checksum,
458 stream_serial: str_serial,
459 });
460 }
461
    /// Feeds a parsed page to the reader.
    ///
    /// The page replaces the current page of its stream. If it contains at
    /// least one packet end, `read_packet` will hand out its packets next.
    ///
    /// # Errors
    ///
    /// `InvalidData` if
    /// * a page flagged as first page arrives for an already known stream,
    /// * the page's "continued" flag does not match how the previous page
    ///   of the stream ended (only if no seek has happened), or
    /// * the first page seen of a stream is not flagged as first page or
    ///   starts with a continued packet (only if no seek has happened).
    pub fn push_page(&mut self, page :OggPage) -> Result<(), OggReadError> {
        let mut pg_prs = page.0;
        match self.page_infos.entry(pg_prs.stream_serial) {
            // A stream we have already seen pages of.
            Entry::Occupied(mut o) => {
                let inf = o.get_mut();
                if pg_prs.bi.first_page {
                    tri!(Err(OggReadError::InvalidData));
                }
                if pg_prs.bi.starts_with_continued != inf.bi.ends_with_continued {
                    if !self.has_seeked {
                        tri!(Err(OggReadError::InvalidData));
                    } else {
                        // After a seek the mismatch is tolerated: the
                        // fragments collected so far are dropped, and so
                        // is the leading continued fragment of this page.
                        inf.last_overlap_pck.clear();
                        if pg_prs.bi.starts_with_continued {
                            pg_prs.bi.packet_positions.remove(0);
                            if pg_prs.packet_count != 0 {
                                pg_prs.packet_count -= 1;
                            } else {
                                // No packet ends in this page, so the
                                // removed fragment was also the trailing
                                // one.
                                pg_prs.bi.ends_with_continued = false;
                            }
                        }
                    }
                } else if pg_prs.bi.starts_with_continued {
                    // Regular continuation: remember the unfinished
                    // fragment of the previous page for later gluing.
                    let (offs, len) = inf.bi.packet_positions[inf.packet_idx as usize];
                    if len as usize != inf.page_body.len() {
                        let mut tmp = Vec::with_capacity(len as usize);
                        tmp.write_all(&inf.page_body[offs as usize .. (offs + len) as usize]).unwrap();
                        inf.last_overlap_pck.push(tmp);
                    } else {
                        // The fragment spans the whole body: move the body
                        // instead of copying it.
                        inf.last_overlap_pck.push(replace(&mut inf.page_body, vec![0;0]));
                    }

                }
                inf.bi = pg_prs.bi;
                inf.packet_idx = 0;
                inf.page_body = pg_prs.segments_or_packets_buf;
            },
            // First page we see of this stream.
            Entry::Vacant(v) => {
                if !self.has_seeked {
                    if !pg_prs.bi.first_page || pg_prs.bi.starts_with_continued {
                        tri!(Err(OggReadError::InvalidData));
                    }
                } else {
                    // NOTE(review): empty branch; presumably a non-first
                    // page after a seek is deliberately accepted.
                    if !pg_prs.bi.first_page {
                    }
                    if pg_prs.bi.starts_with_continued {
                        // The beginning of the continued packet is not
                        // available, so its fragment is dropped.
                        pg_prs.bi.packet_positions.remove(0);
                        if pg_prs.packet_count != 0 {
                            pg_prs.packet_count -= 1;
                        } else {
                            // No packet ends in this page, so the removed
                            // fragment was also the trailing one.
                            pg_prs.bi.ends_with_continued = false;
                        }
                        pg_prs.bi.starts_with_continued = false;
                    }
                }
                v.insert(PageInfo {
                    bi : pg_prs.bi,
                    packet_idx: 0,
                    page_body: pg_prs.segments_or_packets_buf,
                    last_overlap_pck: Vec::new(),
                });
            },
        }
        // Only pages in which at least one packet ends can hand out packets.
        let pg_has_stuff :bool = pg_prs.packet_count > 0;

        if pg_has_stuff {
            self.stream_with_stuff = Some(pg_prs.stream_serial);
        } else {
            self.stream_with_stuff = None;
        }

        return Ok(());
    }
562
563 pub fn update_after_seek(&mut self) {
569 self.stream_with_stuff = None;
570 self.page_infos = HashMap::new();
571 self.has_seeked = true;
572 }
573}
574
/// Internal state of `UntilPageHeaderReader`.
#[derive(Clone, Copy)]
enum UntilPageHeaderReaderMode {
    /// The capture pattern has not been found yet.
    Searching,
    /// The capture pattern was found, but the given number of header
    /// bytes still has to be read.
    FoundWithNeeded(u8),
    /// The header is complete, but more bytes than needed were read: the
    /// reader has to seek by the given (negative) relative offset.
    SeekNeeded(i32),
    /// The header is complete and the reader is positioned right after it.
    Found,
}
582
/// Outcome of a single `do_read` / `do_seek` step, telling the caller
/// what to do next.
enum UntilPageHeaderResult {
    /// The read returned zero bytes.
    Eof,
    /// The header is complete; `into_header` may be called.
    Found,
    /// `do_read` has to be called (again).
    ReadNeeded,
    /// `do_seek` has to be called.
    SeekNeeded,
}
589
/// State machine that searches for the capture pattern and collects the
/// 27 byte page header that begins with it.
struct UntilPageHeaderReader {
    /// Current state of the search.
    mode :UntilPageHeaderReaderMode,
    /// Progress inside the capture pattern: number of pattern bytes
    /// matched so far (see `check_arr`).
    cpt_of :u8,
    /// Buffer the page header is assembled in.
    ret_buf :[u8; 27],
    /// Total number of bytes read so far.
    read_amount :usize,
}
600
601impl UntilPageHeaderReader {
602 pub fn new() -> Self {
603 UntilPageHeaderReader {
604 mode : UntilPageHeaderReaderMode::Searching,
605 cpt_of : 0,
606 ret_buf : [0; 27],
607 read_amount : 0,
608 }
609 }
610 fn check_arr(&mut self, arr :&[u8]) -> Option<usize> {
616 for (i, ch) in arr.iter().enumerate() {
617 match *ch {
618 b'O' => self.cpt_of = 1,
619 b'g' if self.cpt_of == 1 || self.cpt_of == 2 => self.cpt_of += 1,
620 b'S' if self.cpt_of == 3 => return Some(i),
621 _ => self.cpt_of = 0,
622 }
623 }
624 return None;
625 }
    /// Performs one read from `rdr` and advances the search.
    ///
    /// Returns what the caller has to do next: `ReadNeeded` to call
    /// `do_read` again, `SeekNeeded` to call `do_seek`, `Found` once the
    /// header is complete, or `Eof` if the read returned zero bytes.
    ///
    /// # Errors
    ///
    /// I/O errors of the read, or `NoCapturePatternFound` once more than
    /// 150 KiB have been read in total.
    pub fn do_read<R :Read>(&mut self, mut rdr :R)
            -> Result<UntilPageHeaderResult, OggReadError> {
        use self::UntilPageHeaderReaderMode::*;
        use self::UntilPageHeaderResult as Res;
        let mut buf :[u8; 1024] = [0; 1024];

        // While fewer than 27 bytes have been read in total, never read
        // past the 27th byte: if the header starts right at the initial
        // position, nothing beyond it gets consumed.
        let rd_len = tri!(rdr.read(if self.read_amount < 27 {
            &mut buf[0 .. 27 - self.read_amount]
        } else {
            match self.mode {
                Searching => &mut buf,
                FoundWithNeeded(amount) => &mut buf[0 .. amount as usize],
                SeekNeeded(_) => return Ok(Res::SeekNeeded),
                Found => return Ok(Res::Found),
            }
        }));

        if rd_len == 0 {
            return Ok(Res::Eof);
        }
        self.read_amount += rd_len;

        // Upper bound on how much data is searched for a capture pattern.
        let read_amount_max = 150 * 1024;
        if self.read_amount > read_amount_max {
            tri!(Err(OggReadError::NoCapturePatternFound));
        }

        let rd_buf = &buf[0 .. rd_len];

        use std::cmp::min;
        // off: where in rd_buf the header bytes to copy begin.
        // needed: how many header bytes are still missing.
        let (off, needed) = match self.mode {
            Searching => match self.check_arr(rd_buf) {
                Some(off) => {
                    self.ret_buf[0] = b'O';
                    self.ret_buf[1] = b'g';
                    self.ret_buf[2] = b'g';
                    // `off` points at the 'S', which is copied again
                    // below, so 24 bytes (index 3 to 26) are needed.
                    self.ret_buf[3] = b'S';
                    (off, 24)
                },
                None => return Ok(Res::ReadNeeded),
            },
            FoundWithNeeded(needed) => {
                (0, needed as usize)
            },
            // SeekNeeded and Found are not expected here; they return
            // early above once at least 27 bytes have been read.
            _ => unimplemented!(),
        };

        let fnd_buf = &rd_buf[off..];

        // The missing bytes always form the tail of the header buffer.
        let copy_amount = min(needed, fnd_buf.len());
        let start_fill = 27 - needed;
        (self.ret_buf[start_fill .. copy_amount + start_fill])
                .copy_from_slice(&fnd_buf[0 .. copy_amount]);
        #[allow(clippy::comparison_chain)]
        if fnd_buf.len() == needed {
            // Exactly the header was read.
            self.mode = Found;
            return Ok(Res::Found);
        } else if fnd_buf.len() < needed {
            // Header incomplete; remember how many bytes are missing.
            let needed_new = needed - copy_amount;
            self.mode = FoundWithNeeded(needed_new as u8);
            return Ok(Res::ReadNeeded);
        } else {
            // Read past the header; the surplus (a negative offset) has
            // to be undone by seeking.
            self.mode = SeekNeeded(needed as i32 - fnd_buf.len() as i32);
            return Ok(Res::SeekNeeded);
        }
    }
729 pub fn do_seek<S :Seek>(&mut self, mut skr :S)
730 -> Result<UntilPageHeaderResult, OggReadError> {
731 use self::UntilPageHeaderReaderMode::*;
732 use self::UntilPageHeaderResult as Res;
733 match self.mode {
734 Searching | FoundWithNeeded(_) => Ok(Res::ReadNeeded),
735 SeekNeeded(offs) => {
736 tri!(skr.seek(SeekFrom::Current(offs as i64)));
737 self.mode = Found;
738 Ok(Res::Found)
739 },
740 Found => Ok(Res::Found),
741 }
742 }
743 pub fn into_header(self) -> [u8; 27] {
744 use self::UntilPageHeaderReaderMode::*;
745 match self.mode {
746 Found => self.ret_buf,
747 _ => panic!("wrong mode"),
748 }
749 }
750}
751
/// Reader for packets from an Ogg stream, on top of a reader that
/// supports both reading and seeking.
pub struct PacketReader<T :io::Read + io::Seek> {
    /// The wrapped reader.
    rdr :T,
    /// Options passed to the page parser for every page.
    pg_parse_opts :Arc<PageParsingOptions>,

    /// I/O free part of the packet reading logic.
    base_pck_rdr :BasePacketReader,

    /// Whether at least one page header has been found so far. Decides
    /// whether hitting the end of input is a regular end of stream.
    read_some_pg :bool
}
770
771impl<T :io::Read + io::Seek> PacketReader<T> {
772 pub fn new(rdr :T) -> PacketReader<T> {
774 Self::new_with_page_parse_opts(rdr, PageParsingOptions::default())
775 }
776 pub fn new_with_page_parse_opts(rdr :T, pg_parse_opts : impl Into<Arc<PageParsingOptions>>) -> PacketReader<T> {
778 PacketReader { rdr, pg_parse_opts: pg_parse_opts.into(), base_pck_rdr : BasePacketReader::new(), read_some_pg : false }
779 }
    /// Consumes the packet reader and returns the wrapped reader.
    pub fn into_inner(self) -> T {
        self.rdr
    }
    /// Returns a shared reference to the wrapped reader.
    pub const fn get_ref(&self) -> &T {
        &self.rdr
    }
    /// Returns a mutable reference to the wrapped reader.
    ///
    /// NOTE(review): changing the reader's position through this reference
    /// does not reset the packet reader's internal state; `seek_bytes`
    /// does that explicitly.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.rdr
    }
798 pub fn read_packet(&mut self) -> Result<Option<Packet>, OggReadError> {
802 loop {
806 if let Some(pck) = self.base_pck_rdr.read_packet() {
807 return Ok(Some(pck));
808 }
809 let page = tri!(self.read_ogg_page());
810 match page {
811 Some(page) => tri!(self.base_pck_rdr.push_page(page)),
812 None => return Ok(None),
813 }
814 }
815 }
816 pub fn read_packet_expected(&mut self) -> Result<Packet, OggReadError> {
822 match tri!(self.read_packet()) {
823 Some(p) => Ok(p),
824 None => tri!(Err(Error::new(ErrorKind::UnexpectedEof,
825 "Expected ogg packet but found end of physical stream"))),
826 }
827 }
828
829 fn read_until_pg_header(&mut self) -> Result<Option<[u8; 27]>, OggReadError> {
838 let mut r = UntilPageHeaderReader::new();
839 use self::UntilPageHeaderResult::*;
840 let mut res = tri!(r.do_read(&mut self.rdr));
841 loop {
842 res = match res {
843 Eof => return Ok(None),
844 Found => {
845 self.read_some_pg = true;
848 break
849 },
850 ReadNeeded => tri!(r.do_read(&mut self.rdr)),
851 SeekNeeded => tri!(r.do_seek(&mut self.rdr))
852 }
853 }
854 Ok(Some(r.into_header()))
855 }
856
857 fn read_ogg_page(&mut self) -> Result<Option<OggPage>, OggReadError> {
863 let header_buf :[u8; 27] = match tri!(self.read_until_pg_header()) {
864 Some(s) => s,
865 None if self.read_some_pg => return Ok(None),
866 None => return Err(OggReadError::NoCapturePatternFound)
867 };
868 let (mut pg_prs, page_segments) = tri!(PageParser::new_with_parse_opts(header_buf, Arc::clone(&self.pg_parse_opts)));
869
870 let mut segments_buf = vec![0; page_segments]; tri!(self.rdr.read_exact(&mut segments_buf));
872
873 let page_siz = pg_prs.parse_segments(segments_buf);
874
875 let mut packet_data = vec![0; page_siz];
876 tri!(self.rdr.read_exact(&mut packet_data));
877
878 Ok(Some(tri!(pg_prs.parse_packet_data(packet_data))))
879 }
880
881 pub fn seek_bytes(&mut self, pos :SeekFrom) -> Result<u64, Error> {
888 let r = tri!(self.rdr.seek(pos));
889 self.base_pck_rdr.update_after_seek();
891 return Ok(r);
892 }
893
    /// Seeks the reader by absolute granule position.
    ///
    /// `stream_serial` restricts the search to pages of one logical
    /// stream; with `None` pages of every stream are considered.
    /// `pos_goal` is the absolute granule position to seek to.
    ///
    /// Returns `Ok(true)` if the reader has been positioned; the packet
    /// reading state is reset in that case. Returns `Ok(false)` if a page
    /// read hit the end of the input, or if the last page was reached
    /// before the goal; the reader position is unspecified then.
    pub fn seek_absgp(&mut self, stream_serial :Option<u32>,
            pos_goal :u64) -> Result<bool, OggReadError> {
        // Seeks to the given byte position, resets the packet reader and
        // returns from seek_absgp with success.
        macro_rules! found {
            ($pos:expr) => {{
                tri!(self.rdr.seek(SeekFrom::Start($pos)));
                self.base_pck_rdr.update_after_seek();
                return Ok(true);
            }};
        }
        // Unwraps a Result<Option<_>, _>: errors are propagated, None
        // makes seek_absgp return Ok(false).
        macro_rules! bt {
            ($e:expr) => {{
                match tri!($e) {
                    Some(s) => s,
                    None => return Ok(false),
                }
            }};
        }
        // Reads matching pages until one has an absgp beyond the goal.
        // Hitting the goal exactly returns via found!; reaching the last
        // page first returns Ok(false).
        macro_rules! pg_read_until_end_or_goal {
            {$goal:expr} => {{
                let mut pos;
                let mut pg;
                loop {
                    let (n_pos, n_pg) = pg_read_match_serial!();
                    pos = n_pos;
                    pg = n_pg;
                    if pg.0.bi.absgp == $goal {
                        found!(pos);
                    }
                    if pg.0.bi.absgp > $goal {
                        break;
                    }
                    if pg.0.bi.last_page {
                        return Ok(false)
                    }
                }
                (pos, pg)
            }};
        }
        // Reads pages until one of the wanted stream contains a whole
        // packet, or completes a packet that started on an earlier
        // matching page. Yields (byte position, page), where the position
        // is that of the page on which the packet started.
        macro_rules! pg_read_match_serial {
            {} => {{
                let mut pos;
                let mut pg;
                let mut continued_pck_start = None;
                loop {
                    pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
                    pg = bt!(self.read_ogg_page());
                    match stream_serial {
                        // Page of another stream: skip it.
                        Some(s) if pg.0.stream_serial != s => (),
                        _ => match continued_pck_start {
                            None if pg.has_whole_packet() => break,
                            None if pg.has_packet_start() => {
                                continued_pck_start = Some(pos);
                            },
                            Some(s) if pg.has_packet_end() => {
                                pos = s;
                                break;
                            },
                            _ => (),
                        },
                    }
                }
                (pos, pg)
            }};
        }

        let ab_of = |pg :&OggPage| { pg.0.bi.absgp };
        let seq_of = |pg :&OggPage| { pg.0.bi.sequence_num };

        // Lower bound of the search: first matching page of the file.
        tri!(self.rdr.seek(SeekFrom::Start(0)));
        let (mut begin_pos, mut begin_pg) = pg_read_match_serial!();

        if pos_goal == 0 {
            found!(begin_pos);
        }

        // Upper bound: search within the last 200 KiB of the input.
        tri!(seek_before_end(&mut self.rdr, 200 * 1024));
        let (mut end_pos, mut end_pg) = pg_read_until_end_or_goal!(pos_goal);

        // Bisection between begin and end.
        loop {
            // NOTE(review): unchecked u32 subtraction; would overflow if
            // the end page had a smaller sequence number than the begin
            // page -- confirm this cannot happen for the inputs supported.
            if seq_of(&end_pg) - seq_of(&begin_pg) <= 1 {
                found!(end_pos);
            }
            let pos_to_seek = begin_pos + (end_pos - begin_pos) / 2;
            tri!(self.rdr.seek(SeekFrom::Start(pos_to_seek)));
            let (pos, pg) = pg_read_match_serial!();
            if seq_of(&end_pg) == seq_of(&pg) ||
                    seq_of(&begin_pg) == seq_of(&pg) {
                // Bisection makes no progress any more: scan linearly
                // from the begin position instead.
                let mut pos;
                let mut pg;
                let mut last_packet_end_pos = begin_pos;
                tri!(self.rdr.seek(SeekFrom::Start(begin_pos)));
                loop {
                    pos = tri!(self.rdr.stream_position());
                    pg = bt!(self.read_ogg_page());
                    match stream_serial {
                        // Page of another stream: skip it.
                        Some(s) if pg.0.stream_serial != s => (),
                        // Pages with an absgp of -1 (all bits set) are
                        // skipped; presumably these are pages on which no
                        // packet finishes -- verify against the Ogg spec.
                        _ if ab_of(&pg) == -1i64 as u64 => (),
                        _ if ab_of(&pg) >= pos_goal => found!(last_packet_end_pos),
                        _ => if pg.has_packet_end() {
                            last_packet_end_pos = pos;
                        },
                    }
                }
            }
            // Narrow the interval.
            if ab_of(&pg) >= pos_goal {
                end_pos = pos;
                end_pg = pg;
            } else {
                begin_pos = pos;
                begin_pg = pg;
            }
        }
    }
    /// Discards all packets of already read pages that have not been
    /// handed out yet.
    ///
    /// Implemented through the same reset that is done after a seek, so
    /// all per-stream state is dropped and the reader behaves as if a seek
    /// had happened.
    pub fn delete_unread_packets(&mut self) {
        self.base_pck_rdr.update_after_seek();
    }
1086}
1087
1088fn seek_before_end<T :io::Read + io::Seek>(mut rdr :T,
1090 offs :u64) -> Result<u64, OggReadError> {
1091 let end_pos = tri!(rdr.seek(SeekFrom::End(0)));
1092 let end_pos_to_seek = ::std::cmp::min(end_pos, offs);
1093 return Ok(tri!(rdr.seek(SeekFrom::End(-(end_pos_to_seek as i64)))));
1094}
1095
1096#[cfg(feature = "async")]
1097pub mod async_api {
1101 use std::pin::Pin;
1102 use std::task::{Context, Poll};
1103
1104 use super::*;
1105 use futures_core::{ready, Stream};
1106 use futures_io::AsyncRead as FuturesAsyncRead;
1107 use tokio::io::AsyncRead as TokioAsyncRead;
1108 use bytes::BytesMut;
1109 use pin_project::pin_project;
1110 use tokio_util::codec::{Decoder, FramedRead};
1111 use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
1112
    /// What the page decoder expects to read next.
    enum PageDecodeState {
        /// The 27 byte page header.
        Head,
        /// The segment table, with its size in bytes.
        Segments(PageParser, usize),
        /// The page body, with its size in bytes.
        PacketData(PageParser, usize),
        /// Placeholder while the state is being replaced.
        InUpdate,
    }
1119
1120 impl PageDecodeState {
1121 fn needed_size(&self) -> usize {
1122 match self {
1123 PageDecodeState::Head => 27,
1124 PageDecodeState::Segments(_, s) => *s,
1125 PageDecodeState::PacketData(_, s) => *s,
1126 PageDecodeState::InUpdate => panic!("invalid state"),
1127 }
1128 }
1129 }
1130
    /// Decoder that turns a byte stream into Ogg pages.
    struct PageDecoder {
        /// Which part of a page is expected next.
        state : PageDecodeState,
        /// Options passed to the page parser for every page.
        parse_opts : Arc<PageParsingOptions>,
    }
1138
1139 impl PageDecoder {
1140 fn new(parse_opts : impl Into<Arc<PageParsingOptions>>) -> Self {
1141 PageDecoder {
1142 state : PageDecodeState::Head,
1143 parse_opts : parse_opts.into(),
1144 }
1145 }
1146 }
1147
1148 impl Decoder for PageDecoder {
1149 type Item = OggPage;
1150 type Error = OggReadError;
1151
1152 fn decode(&mut self, buf :&mut BytesMut) ->
1153 Result<Option<OggPage>, OggReadError> {
1154 use self::PageDecodeState::*;
1155 loop {
1156 let needed_size = self.state.needed_size();
1157 if buf.len() < needed_size {
1158 return Ok(None);
1159 }
1160 let mut ret = None;
1161 let consumed_buf = buf.split_to(needed_size).to_vec();
1162
1163 self.state = match ::std::mem::replace(&mut self.state, InUpdate) {
1164 Head => {
1165 let mut hdr_buf = [0; 27];
1166 hdr_buf.copy_from_slice(&consumed_buf);
1169 let tup = tri!(PageParser::new_with_parse_opts(hdr_buf, Arc::clone(&self.parse_opts)));
1170 Segments(tup.0, tup.1)
1171 },
1172 Segments(mut pg_prs, _) => {
1173 let new_needed_len = pg_prs.parse_segments(consumed_buf);
1174 PacketData(pg_prs, new_needed_len)
1175 },
1176 PacketData(pg_prs, _) => {
1177 ret = Some(tri!(pg_prs.parse_packet_data(consumed_buf)));
1178 Head
1179 },
1180 InUpdate => panic!("invalid state"),
1181 };
1182 if ret.is_some() {
1183 return Ok(ret);
1184 }
1185 }
1186 }
1187
1188 fn decode_eof(&mut self, buf :&mut BytesMut) ->
1189 Result<Option<OggPage>, OggReadError> {
1190 return self.decode(buf);
1192 }
1193 }
1194
    /// Async packet reader: a `Stream` of packets on top of an async reader.
    #[pin_project]
    pub struct PacketReader<T> where T :TokioAsyncRead {
        /// I/O free part of the packet reading logic.
        base_pck_rdr :BasePacketReader,
        /// Source of parsed pages.
        #[pin]
        pg_rd :FramedRead<T, PageDecoder>,
    }
1204
1205 impl<T :TokioAsyncRead> PacketReader<T> {
1206 pub fn new(inner :T) -> Self {
1212 Self::new_with_page_parse_opts(inner, PageParsingOptions::default())
1213 }
1214
1215 pub fn new_with_page_parse_opts(inner :T, pg_parse_opts :impl Into<Arc<PageParsingOptions>>) -> Self {
1221 PacketReader {
1222 base_pck_rdr : BasePacketReader::new(),
1223 pg_rd : FramedRead::new(inner, PageDecoder::new(pg_parse_opts)),
1224 }
1225 }
1226 }
1227
1228 impl<T :FuturesAsyncRead> PacketReader<Compat<T>> {
1229 pub fn new_compat(inner :T) -> Self {
1238 Self::new_compat_with_page_parse_opts(inner, PageParsingOptions::default())
1239 }
1240
1241 pub fn new_compat_with_page_parse_opts(inner :T, pg_parse_opts :impl Into<Arc<PageParsingOptions>>) -> Self {
1250 Self::new_with_page_parse_opts(inner.compat(), pg_parse_opts)
1251 }
1252 }
1253
1254 impl<T :TokioAsyncRead> Stream for PacketReader<T> {
1255 type Item = Result<Packet, OggReadError>;
1256
1257 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1258 let mut this = self.project();
1259 loop {
1263 if let Some(pck) = this.base_pck_rdr.read_packet() {
1264 return Poll::Ready(Some(Ok(pck)));
1265 }
1266 let page = match ready!(this.pg_rd.as_mut().poll_next(cx)) {
1267 Some(Ok(page)) => page,
1268 Some(Err(err)) => return Poll::Ready(Some(Err(err))),
1269 None => return Poll::Ready(None),
1270 };
1271 match this.base_pck_rdr.push_page(page) {
1272 Ok(_) => {},
1273 Err(err) => return Poll::Ready(Some(Err(err))),
1274 };
1275 }
1276 }
1277 }
1278
1279}