use std::error;
use std::io;
use std::io::{Cursor, Read, Write, SeekFrom, Error, ErrorKind};
use byteorder::{ReadBytesExt, LittleEndian};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::fmt::{Display, Formatter, Error as FmtError};
use std::mem::replace;
use crate::crc::vorbis_crc32_update;
use crate::Packet;
use std::io::Seek;
use std::sync::Arc;
/// Error that can be raised while decoding an Ogg transport.
#[derive(Debug)]
pub enum OggReadError {
/// No Ogg capture pattern (`OggS`) was found in the data that was read.
NoCapturePatternFound,
/// A page header carried a non-zero stream structure version; the payload is the version that was read.
InvalidStreamStructVer(u8),
/// The checksum stored in the page header (first value) differs from the one computed over the page (second value).
HashMismatch(u32, u32),
/// An I/O error occurred while reading.
ReadError(io::Error),
/// A page contradicted the state of its stream, e.g. unexpected "first page" or continuation flags.
InvalidData,
}
impl OggReadError {
fn description_str(&self) -> &str {
match *self {
OggReadError::NoCapturePatternFound => "No Ogg capture pattern found",
OggReadError::InvalidStreamStructVer(_) =>
"A non zero stream structure version was passed",
OggReadError::HashMismatch(_, _) => "CRC32 hash mismatch",
OggReadError::ReadError(_) => "I/O error",
OggReadError::InvalidData => "Constraint violated",
}
}
}
impl error::Error for OggReadError {
    /// Legacy short description; the same text that `Display` prints.
    fn description(&self) -> &str {
        self.description_str()
    }

    /// Returns the wrapped I/O error for `ReadError`, and `None` for every
    /// other variant.
    ///
    /// This is implemented as `source` rather than the deprecated `cause`:
    /// the provided `cause` method forwards to `source`, so callers of either
    /// accessor see the underlying error, and error-chain walkers that only
    /// consult `source` no longer lose it.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            OggReadError::ReadError(err) => Some(err),
            OggReadError::NoCapturePatternFound
            | OggReadError::InvalidStreamStructVer(_)
            | OggReadError::HashMismatch(_, _)
            | OggReadError::InvalidData => None,
        }
    }
}
impl Display for OggReadError {
    /// Writes the variant's fixed summary text; variant payloads are not printed.
    fn fmt(&self, fmt: &mut Formatter) -> Result<(), FmtError> {
        let summary = self.description_str();
        fmt.write_str(summary)
    }
}
impl From<io::Error> for OggReadError {
fn from(err :io::Error) -> OggReadError {
return OggReadError::ReadError(err);
}
}
/// Facts read from a page header, plus the packet layout derived from its segment table.
struct PageBaseInfo {
/// Header type flag bit `0x01`: the page begins with the continuation of a packet from an earlier page.
starts_with_continued :bool,
/// Header type flag bit `0x02`: first page of its logical stream.
first_page :bool,
/// Header type flag bit `0x04`: last page of its logical stream.
last_page :bool,
/// Absolute granule position read from the header.
absgp :u64,
/// Page sequence number read from the header.
sequence_num :u32,
/// Checksum read from the header.
checksum :u32,
/// `(offset, length)` inside the page body of every packet or packet fragment, in order of appearance.
packet_positions :Vec<(u16,u16)>,
/// Whether the last entry of `packet_positions` is a fragment that is not terminated on this page.
ends_with_continued :bool,
}
/// Per-stream reading state: the most recently pushed page and how far it has been consumed.
struct PageInfo {
/// Header information and packet layout of the stored page.
bi :PageBaseInfo,
/// Index into `bi.packet_positions` of the next packet to hand out.
packet_idx :u8,
/// Body bytes of the stored page.
page_body :Vec<u8>,
/// Fragments collected from earlier pages of a packet that continues onto the stored page,
/// in stream order; they are prepended to the first packet that is read.
last_overlap_pck :Vec<Vec<u8>>,
}
impl PageInfo {
    /// True while no packet of the stored page has been handed out yet.
    fn is_first_pck_in_pg(&self) -> bool {
        self.packet_idx == 0
    }

    /// True when the packet at `packet_idx` is the last one that ends on the
    /// stored page.
    fn is_last_pck_in_pg(&self) -> bool {
        // A trailing fragment that continues onto the next page has an entry
        // in `packet_positions` too, but it is not a packet of this page.
        let trailing_fragment = self.bi.ends_with_continued as u8;
        let entries_through_current = self.packet_idx + 1 + trailing_fragment;
        self.bi.packet_positions.len() == entries_through_current as usize
    }
}
/// A completely parsed Ogg page; wraps the [`PageParser`] that `PageParser::parse_packet_data` finished.
pub struct OggPage(PageParser);
impl OggPage {
    /// Whether at least one packet ends on this page.
    ///
    /// Written as a comparison so that a page without any position entries
    /// yields `false` instead of underflowing an unsigned subtraction.
    fn has_packet_end(&self) -> bool {
        let trailing_fragment = self.0.bi.ends_with_continued as usize;
        self.0.bi.packet_positions.len() > trailing_fragment
    }

    /// Whether at least one packet both starts and ends on this page.
    fn has_whole_packet(&self) -> bool {
        let fragments = self.0.bi.ends_with_continued as usize
            + self.0.bi.starts_with_continued as usize;
        self.0.bi.packet_positions.len() > fragments
    }

    /// Whether at least one packet starts on this page.
    ///
    /// Underflow-free for the same reason as `has_packet_end`.
    fn has_packet_start(&self) -> bool {
        let leading_fragment = self.0.bi.starts_with_continued as usize;
        self.0.bi.packet_positions.len() > leading_fragment
    }
}
/// Options that control how pages are parsed.
#[derive(Debug, Clone)]
#[non_exhaustive] pub struct PageParsingOptions {
/// Whether `PageParser::parse_packet_data` compares the checksum stored in the page header
/// against one computed over the page, failing with `OggReadError::HashMismatch` on a difference.
pub verify_checksum :bool,
}
impl Default for PageParsingOptions {
fn default() -> Self {
Self {
verify_checksum: !cfg!(fuzzing),
}
}
}
/// Step-wise parser for a single Ogg page: header, then segment table, then page body.
pub struct PageParser {
/// Header flags and numbers, plus the packet layout once `parse_segments` has run.
bi :PageBaseInfo,
/// Serial number of the logical stream the page belongs to.
stream_serial :u32,
/// Checksum stored in the page header.
checksum :u32,
/// The raw 27 header bytes, kept for checksum verification.
header_buf: [u8; 27],
/// Number of packets that end on this page.
packet_count :u16, segments_or_packets_buf :Vec<u8>,
// `segments_or_packets_buf` holds the segment table after `parse_segments`
// and the page body after `parse_packet_data`.
/// Parsing options shared with the creator of this parser.
parse_opts :Arc<PageParsingOptions>,
}
impl PageParser {
/// Creates a page parser from the 27 header bytes of a page, using the default [`PageParsingOptions`].
///
/// Returns the parser together with the length of the segment table that follows the header;
/// delegates to [`PageParser::new_with_parse_opts`].
pub fn new(header_buf :[u8; 27]) -> Result<(PageParser, usize), OggReadError> {
Self::new_with_parse_opts(header_buf, PageParsingOptions::default())
}
pub fn new_with_parse_opts(header_buf :[u8; 27], parse_opts :impl Into<Arc<PageParsingOptions>>) -> Result<(PageParser, usize), OggReadError> {
let mut header_rdr = Cursor::new(header_buf);
header_rdr.set_position(4);
let stream_structure_version = tri!(header_rdr.read_u8());
if stream_structure_version != 0 {
tri!(Err(OggReadError::InvalidStreamStructVer(stream_structure_version)));
}
let header_type_flag = header_rdr.read_u8().unwrap();
let absgp = header_rdr.read_u64::<LittleEndian>().unwrap();
let stream_serial = header_rdr.read_u32::<LittleEndian>().unwrap();
let sequence_num = header_rdr.read_u32::<LittleEndian>().unwrap();
let checksum = header_rdr.read_u32::<LittleEndian>().unwrap();
Ok((PageParser {
bi : PageBaseInfo {
starts_with_continued : header_type_flag & 0x01u8 != 0,
first_page : header_type_flag & 0x02u8 != 0,
last_page : header_type_flag & 0x04u8 != 0,
absgp,
sequence_num,
checksum,
packet_positions : Vec::new(),
ends_with_continued : false,
},
stream_serial,
checksum,
header_buf,
packet_count : 0,
segments_or_packets_buf :Vec::new(),
parse_opts: parse_opts.into(),
},
header_rdr.read_u8().unwrap() as usize
))
}
/// Parses the segment table and returns the size of the page body in bytes.
///
/// Every table entry below 255 terminates a packet; an entry of exactly 255
/// means the packet goes on in the next segment. The resulting
/// `(offset, length)` pairs are stored in `bi.packet_positions`, including
/// a final pair for a fragment that is not terminated on this page.
pub fn parse_segments(&mut self, segments_buf: Vec<u8>) -> usize {
    // First pass: body size, number of packets ending here, and whether the
    // page ends in an unterminated fragment. With an empty table the page
    // ends the way it started.
    let mut body_len: u16 = 0;
    self.bi.ends_with_continued = self.bi.starts_with_continued;
    for &lacing in &segments_buf {
        let terminates_packet = lacing < 255;
        body_len += u16::from(lacing);
        self.packet_count += u16::from(terminates_packet);
        self.bi.ends_with_continued = !terminates_packet;
    }

    // Second pass: where each packet (or fragment) lives inside the body.
    let entry_count = self.packet_count as usize + self.bi.ends_with_continued as usize;
    let mut positions = Vec::with_capacity(entry_count);
    let mut start: u16 = 0;
    let mut run: u16 = 0;
    for &lacing in &segments_buf {
        run += u16::from(lacing);
        if lacing == 255 {
            continue;
        }
        positions.push((start, run));
        start += run;
        run = 0;
    }
    if self.bi.ends_with_continued {
        positions.push((start, run));
    }

    self.bi.packet_positions = positions;
    self.segments_or_packets_buf = segments_buf;
    body_len as usize
}
pub fn parse_packet_data(mut self, packet_data :Vec<u8>) ->
Result<OggPage, OggReadError> {
if self.parse_opts.verify_checksum {
self.header_buf[22] = 0;
self.header_buf[23] = 0;
self.header_buf[24] = 0;
self.header_buf[25] = 0;
let mut hash_calculated :u32;
hash_calculated = vorbis_crc32_update(0, &self.header_buf);
hash_calculated = vorbis_crc32_update(hash_calculated,
&self.segments_or_packets_buf);
hash_calculated = vorbis_crc32_update(hash_calculated, &packet_data);
if self.checksum != hash_calculated {
tri!(Err(OggReadError::HashMismatch(self.checksum, hash_calculated)));
}
}
self.segments_or_packets_buf = packet_data;
Ok(OggPage(self))
}
}
/// I/O-free core of packet reading: pages are pushed in with `push_page`, packets come out of `read_packet`.
pub struct BasePacketReader {
/// Reading state per logical stream, keyed by stream serial.
page_infos :HashMap<u32, PageInfo>,
/// Serial of the stream whose stored page still has packets to hand out, if any.
stream_with_stuff :Option<u32>,
/// Set by `update_after_seek`; relaxes the consistency checks in `push_page`.
has_seeked :bool,
}
impl BasePacketReader {
pub fn new() -> Self {
BasePacketReader { page_infos: HashMap::new(),
stream_with_stuff: None, has_seeked: false }
}
pub fn read_packet(&mut self) -> Option<Packet> {
if self.stream_with_stuff == None {
return None;
}
let str_serial :u32 = self.stream_with_stuff.unwrap();
let pg_info = self.page_infos.get_mut(&str_serial).unwrap();
let (offs, len) = pg_info.bi.packet_positions[pg_info.packet_idx as usize];
let need_to_glue = pg_info.packet_idx == 0 &&
pg_info.bi.starts_with_continued &&
!(pg_info.bi.ends_with_continued && pg_info.bi.packet_positions.len() == 1);
let packet_content :Vec<u8> = if need_to_glue {
let mut siz :usize = 0;
for pck in pg_info.last_overlap_pck.iter() {
siz += pck.len();
}
siz += len as usize;
let mut cont :Vec<u8> = Vec::with_capacity(siz);
for pck in pg_info.last_overlap_pck.iter() {
cont.write_all(pck).unwrap();
}
pg_info.last_overlap_pck = Vec::new();
cont.write_all(&pg_info.page_body[offs as usize .. (offs + len) as usize]).unwrap();
cont
} else {
let mut cont :Vec<u8> = Vec::with_capacity(len as usize);
let cont_slice :&[u8] = &pg_info.page_body[offs as usize .. (offs + len) as usize];
cont.write_all(cont_slice).unwrap();
cont
};
let first_pck_in_pg = pg_info.is_first_pck_in_pg();
let first_pck_overall = pg_info.bi.first_page && first_pck_in_pg;
let last_pck_in_pg = pg_info.is_last_pck_in_pg();
let last_pck_overall = pg_info.bi.last_page && last_pck_in_pg;
pg_info.packet_idx += 1;
if last_pck_in_pg {
self.stream_with_stuff = None;
}
return Some(Packet {
data: packet_content,
first_packet_pg: first_pck_in_pg,
first_packet_stream: first_pck_overall,
last_packet_pg: last_pck_in_pg,
last_packet_stream: last_pck_overall,
absgp_page: pg_info.bi.absgp,
checksum_page: pg_info.bi.checksum,
stream_serial: str_serial,
});
}
/// Feeds a parsed page into the reader.
///
/// The page replaces the page previously stored for its stream serial;
/// afterwards `read_packet` hands out the packets that end on it.
///
/// Returns `OggReadError::InvalidData` when the page's flags contradict the
/// state of its stream (see the comments in the body). Once
/// `update_after_seek` has been called, some of these checks are relaxed.
pub fn push_page(&mut self, page :OggPage) -> Result<(), OggReadError> {
let mut pg_prs = page.0;
match self.page_infos.entry(pg_prs.stream_serial) {
// A page of this stream has been pushed before.
Entry::Occupied(mut o) => {
let inf = o.get_mut();
// A stream that is already known must not present a "first page" again.
if pg_prs.bi.first_page {
tri!(Err(OggReadError::InvalidData));
}
// The continuation flag of the new page has to agree with the way the stored page ended.
if pg_prs.bi.starts_with_continued != inf.bi.ends_with_continued {
if !self.has_seeked {
tri!(Err(OggReadError::InvalidData));
} else {
// After a seek the mismatch is tolerated: forget the kept fragments and, if the
// page opens with the tail of a packet, drop that tail.
inf.last_overlap_pck.clear();
if pg_prs.bi.starts_with_continued {
pg_prs.bi.packet_positions.remove(0);
if pg_prs.packet_count != 0 {
pg_prs.packet_count -= 1;
} else {
// No packet ends on this page, so the dropped entry was also its trailing fragment.
pg_prs.bi.ends_with_continued = false;
}
}
}
} else if pg_prs.bi.starts_with_continued {
// Regular continuation: keep the entry at `packet_idx` of the stored page so that
// `read_packet` can prepend it to the first packet of the new page.
let (offs, len) = inf.bi.packet_positions[inf.packet_idx as usize];
if len as usize != inf.page_body.len() {
let mut tmp = Vec::with_capacity(len as usize);
tmp.write_all(&inf.page_body[offs as usize .. (offs + len) as usize]).unwrap();
inf.last_overlap_pck.push(tmp);
} else {
// The fragment spans the whole body: move the buffer instead of copying it.
inf.last_overlap_pck.push(replace(&mut inf.page_body, vec![0;0]));
}
}
// The new page becomes the stored page; reading restarts at its first entry.
inf.bi = pg_prs.bi;
inf.packet_idx = 0;
inf.page_body = pg_prs.segments_or_packets_buf;
},
// First page seen for this stream serial.
Entry::Vacant(v) => {
if !self.has_seeked {
// Without a seek, a stream has to begin with a first page that is not a continuation.
if !pg_prs.bi.first_page || pg_prs.bi.starts_with_continued {
tri!(Err(OggReadError::InvalidData));
}
} else {
if !pg_prs.bi.first_page {
// NOTE(review): empty branch; presumably a non-first page is simply accepted after a seek.
}
// The head of the continued packet is unknown, so its tail is dropped.
if pg_prs.bi.starts_with_continued {
pg_prs.bi.packet_positions.remove(0);
if pg_prs.packet_count != 0 {
pg_prs.packet_count -= 1;
} else {
pg_prs.bi.ends_with_continued = false;
}
pg_prs.bi.starts_with_continued = false;
}
}
v.insert(PageInfo {
bi : pg_prs.bi,
packet_idx: 0,
page_body: pg_prs.segments_or_packets_buf,
last_overlap_pck: Vec::new(),
});
},
}
// Only a page on which at least one packet ends gives `read_packet` something to return.
let pg_has_stuff :bool = pg_prs.packet_count > 0;
if pg_has_stuff {
self.stream_with_stuff = Some(pg_prs.stream_serial);
} else {
self.stream_with_stuff = None;
}
return Ok(());
}
/// Resets the reader after the underlying data source has been repositioned.
///
/// All per-stream state and pending packets are dropped, and the reader
/// remembers that a seek happened so that `push_page` relaxes its checks.
pub fn update_after_seek(&mut self) {
    self.has_seeked = true;
    self.page_infos = HashMap::new();
    self.stream_with_stuff = None;
}
}
/// Internal state of [`UntilPageHeaderReader`].
#[derive(Clone, Copy)]
enum UntilPageHeaderReaderMode {
/// Still scanning the input for the capture pattern.
Searching,
/// The capture pattern was found; the payload is the number of header bytes still missing.
FoundWithNeeded(u8),
/// The header is complete but more bytes than belong to it were read; the payload is the
/// relative offset `do_seek` passes to `SeekFrom::Current`.
SeekNeeded(i32),
/// The header is complete.
Found,
}
/// What the caller of [`UntilPageHeaderReader`] has to do next.
enum UntilPageHeaderResult {
/// The reader returned zero bytes.
Eof,
/// The header is complete and can be fetched with `into_header`.
Found,
/// `do_read` has to be called (again).
ReadNeeded,
/// `do_seek` has to be called.
SeekNeeded,
}
/// Searches a byte stream for the next page header and collects its 27 bytes.
struct UntilPageHeaderReader {
/// Current phase of the search.
mode :UntilPageHeaderReaderMode,
/// Number of capture pattern characters matched so far; carried across reads.
cpt_of :u8,
/// The header being assembled.
ret_buf :[u8; 27],
/// Total number of bytes read so far.
read_amount :usize,
}
impl UntilPageHeaderReader {
pub fn new() -> Self {
UntilPageHeaderReader {
mode : UntilPageHeaderReaderMode::Searching,
cpt_of : 0,
ret_buf : [0; 27],
read_amount : 0,
}
}
/// Scans `arr` for the end of the capture pattern `OggS`.
///
/// Returns the index of the terminating `S` inside `arr`. The match
/// progress is kept in `cpt_of`, so a pattern split across two calls is
/// still recognized.
fn check_arr(&mut self, arr: &[u8]) -> Option<usize> {
    for (idx, &byte) in arr.iter().enumerate() {
        self.cpt_of = match (byte, self.cpt_of) {
            // An 'O' always (re)starts a match attempt.
            (b'O', _) => 1,
            (b'g', 1) => 2,
            (b'g', 2) => 3,
            (b'S', 3) => return Some(idx),
            _ => 0,
        };
    }
    None
}
/// Reads one chunk from `rdr` and advances the search for a page header.
///
/// The return value tells the caller what to do next: read again, seek,
/// stop because the header is complete, or stop because `rdr` returned no
/// more bytes.
///
/// Fails with whatever `rdr.read` fails with, and with
/// `OggReadError::NoCapturePatternFound` once more than 150 KiB have been
/// read in total by this instance.
pub fn do_read<R :Read>(&mut self, mut rdr :R)
-> Result<UntilPageHeaderResult, OggReadError> {
use self::UntilPageHeaderReaderMode::*;
use self::UntilPageHeaderResult as Res;
let mut buf :[u8; 1024] = [0; 1024];
// Until 27 bytes have been read in total, never request more than what is missing to
// reach 27. Afterwards the request size depends on the mode; the last two modes return
// without reading at all.
let rd_len = tri!(rdr.read(if self.read_amount < 27 {
&mut buf[0 .. 27 - self.read_amount]
} else {
match self.mode {
Searching => &mut buf,
FoundWithNeeded(amount) => &mut buf[0 .. amount as usize],
SeekNeeded(_) => return Ok(Res::SeekNeeded),
Found => return Ok(Res::Found),
}
}));
if rd_len == 0 {
return Ok(Res::Eof);
}
self.read_amount += rd_len;
// Upper bound on the total number of bytes read before giving up.
let read_amount_max = 150 * 1024;
if self.read_amount > read_amount_max {
tri!(Err(OggReadError::NoCapturePatternFound));
}
let rd_buf = &buf[0 .. rd_len];
use std::cmp::min;
// `off`: where the header bytes start inside `rd_buf`; `needed`: how many are still missing.
let (off, needed) = match self.mode {
Searching => match self.check_arr(rd_buf) {
Some(off) => {
self.ret_buf[0] = b'O';
self.ret_buf[1] = b'g';
self.ret_buf[2] = b'g';
// `off` is the index of the pattern's final 'S', so 24 bytes are wanted from there:
// the 'S' itself (copied to index 3 again below) plus the remaining 23 header bytes.
self.ret_buf[3] = b'S'; (off, 24)
},
None => return Ok(Res::ReadNeeded),
},
FoundWithNeeded(needed) => {
(0, needed as usize)
},
// NOTE(review): `SeekNeeded` and `Found` return early above once 27 bytes have been
// read; this arm appears unreachable in practice.
_ => unimplemented!(),
};
let fnd_buf = &rd_buf[off..];
let copy_amount = min(needed, fnd_buf.len());
// The missing bytes are the last `needed` bytes of the 27-byte header.
let start_fill = 27 - needed;
(self.ret_buf[start_fill .. copy_amount + start_fill])
.copy_from_slice(&fnd_buf[0 .. copy_amount]);
#[allow(clippy::comparison_chain)]
if fnd_buf.len() == needed {
// Exactly the header was read.
self.mode = Found;
return Ok(Res::Found);
} else if fnd_buf.len() < needed {
// Part of the header is still missing.
let needed_new = needed - copy_amount;
self.mode = FoundWithNeeded(needed_new as u8);
return Ok(Res::ReadNeeded);
} else {
// More than the header was read: remember the (negative) distance to step back.
self.mode = SeekNeeded(needed as i32 - fnd_buf.len() as i32);
return Ok(Res::SeekNeeded);
}
}
pub fn do_seek<S :Seek>(&mut self, mut skr :S)
-> Result<UntilPageHeaderResult, OggReadError> {
use self::UntilPageHeaderReaderMode::*;
use self::UntilPageHeaderResult as Res;
match self.mode {
Searching | FoundWithNeeded(_) => Ok(Res::ReadNeeded),
SeekNeeded(offs) => {
tri!(skr.seek(SeekFrom::Current(offs as i64)));
self.mode = Found;
Ok(Res::Found)
},
Found => Ok(Res::Found),
}
}
pub fn into_header(self) -> [u8; 27] {
use self::UntilPageHeaderReaderMode::*;
match self.mode {
Found => self.ret_buf,
_ => panic!("wrong mode"),
}
}
}
/// Reads packets from an Ogg physical stream backed by a readable and seekable source.
pub struct PacketReader<T :io::Read + io::Seek> {
/// The underlying data source.
rdr :T,
/// Options handed to every page parser this reader creates.
pg_parse_opts :Arc<PageParsingOptions>,
/// Turns the pages that were read into packets.
base_pck_rdr :BasePacketReader,
/// Set once a page header has been found; lets `read_ogg_page` tell a regular end of
/// the stream apart from input in which no page was ever found.
read_some_pg :bool
}
impl<T :io::Read + io::Seek> PacketReader<T> {
/// Creates a packet reader over `rdr` that uses the default [`PageParsingOptions`].
pub fn new(rdr :T) -> PacketReader<T> {
Self::new_with_page_parse_opts(rdr, PageParsingOptions::default())
}
/// Creates a packet reader over `rdr` whose pages are parsed with the
/// given options.
///
/// No data is read from `rdr` by this constructor.
pub fn new_with_page_parse_opts(rdr: T, pg_parse_opts: impl Into<Arc<PageParsingOptions>>) -> PacketReader<T> {
    let pg_parse_opts: Arc<PageParsingOptions> = pg_parse_opts.into();
    let base_pck_rdr = BasePacketReader::new();
    PacketReader {
        rdr,
        pg_parse_opts,
        base_pck_rdr,
        read_some_pg: false,
    }
}
/// Consumes the packet reader and returns the underlying data source.
pub fn into_inner(self) -> T {
self.rdr
}
/// Returns a shared reference to the underlying data source.
pub const fn get_ref(&self) -> &T {
&self.rdr
}
/// Returns a mutable reference to the underlying data source.
///
/// NOTE(review): moving the source's position through this reference does not reset the
/// buffered packet state; `seek_bytes` and `delete_unread_packets` are the methods that do.
pub fn get_mut(&mut self) -> &mut T {
&mut self.rdr
}
/// Reads the next packet of the physical stream.
///
/// Pages are read from the data source on demand. Returns `Ok(None)` once
/// no further page can be read.
pub fn read_packet(&mut self) -> Result<Option<Packet>, OggReadError> {
    loop {
        // Serve from the page that is already buffered, if possible.
        let pending = self.base_pck_rdr.read_packet();
        if pending.is_some() {
            return Ok(pending);
        }
        let page = match tri!(self.read_ogg_page()) {
            Some(page) => page,
            None => return Ok(None),
        };
        tri!(self.base_pck_rdr.push_page(page));
    }
}
pub fn read_packet_expected(&mut self) -> Result<Packet, OggReadError> {
match tri!(self.read_packet()) {
Some(p) => Ok(p),
None => tri!(Err(Error::new(ErrorKind::UnexpectedEof,
"Expected ogg packet but found end of physical stream"))),
}
}
fn read_until_pg_header(&mut self) -> Result<Option<[u8; 27]>, OggReadError> {
let mut r = UntilPageHeaderReader::new();
use self::UntilPageHeaderResult::*;
let mut res = tri!(r.do_read(&mut self.rdr));
loop {
res = match res {
Eof => return Ok(None),
Found => {
self.read_some_pg = true;
break
},
ReadNeeded => tri!(r.do_read(&mut self.rdr)),
SeekNeeded => tri!(r.do_seek(&mut self.rdr))
}
}
Ok(Some(r.into_header()))
}
fn read_ogg_page(&mut self) -> Result<Option<OggPage>, OggReadError> {
let header_buf :[u8; 27] = match tri!(self.read_until_pg_header()) {
Some(s) => s,
None if self.read_some_pg => return Ok(None),
None => return Err(OggReadError::NoCapturePatternFound)
};
let (mut pg_prs, page_segments) = tri!(PageParser::new_with_parse_opts(header_buf, Arc::clone(&self.pg_parse_opts)));
let mut segments_buf = vec![0; page_segments]; tri!(self.rdr.read_exact(&mut segments_buf));
let page_siz = pg_prs.parse_segments(segments_buf);
let mut packet_data = vec![0; page_siz];
tri!(self.rdr.read_exact(&mut packet_data));
Ok(Some(tri!(pg_prs.parse_packet_data(packet_data))))
}
pub fn seek_bytes(&mut self, pos :SeekFrom) -> Result<u64, Error> {
let r = tri!(self.rdr.seek(pos));
self.base_pck_rdr.update_after_seek();
return Ok(r);
}
/// Seeks by absolute granule position.
///
/// `pos_goal` is the absolute granule position to look for. `stream_serial`
/// restricts the search to pages of that logical stream; with `None` every
/// page is considered.
///
/// The search reads a page at the start of the data and one near its end,
/// then bisects by byte position, and finally scans linearly once the
/// interval cannot be narrowed any further.
///
/// Returns `Ok(true)` after the data source has been positioned and the
/// buffered packet state has been reset. Returns `Ok(false)` when no further
/// page could be read during the search, or when the scan from near the end
/// reached the stream's last page without reaching `pos_goal`; in that case the
/// position of the data source is wherever the search stopped.
pub fn seek_absgp(&mut self, stream_serial :Option<u32>,
pos_goal :u64) -> Result<bool, OggReadError> {
// Seeks the data source to the absolute position `$pos`, resets the packet state and
// returns `Ok(true)` from the function.
macro_rules! found {
($pos:expr) => {{
tri!(self.rdr.seek(SeekFrom::Start($pos)));
self.base_pck_rdr.update_after_seek();
return Ok(true);
}};
}
// Unwraps a `Result<Option<_>, _>`: errors are propagated, `None` makes the function
// return `Ok(false)`.
macro_rules! bt {
($e:expr) => {{
match tri!($e) {
Some(s) => s,
None => return Ok(false),
}
}};
}
// Reads matching pages until one has a granule position above `$goal` and evaluates to
// its `(position, page)`. Succeeds right away on an exact hit and gives up with
// `Ok(false)` on a last page that is still below the goal.
macro_rules! pg_read_until_end_or_goal {
{$goal:expr} => {{
let mut pos;
let mut pg;
loop {
let (n_pos, n_pg) = pg_read_match_serial!();
pos = n_pos;
pg = n_pg;
if pg.0.bi.absgp == $goal {
found!(pos);
}
if pg.0.bi.absgp > $goal {
break;
}
if pg.0.bi.last_page {
return Ok(false)
}
}
(pos, pg)
}};
}
// Reads pages from the current position until one of the requested stream contains a
// whole packet, or until a packet that started on an earlier matching page ends.
// Evaluates to `(pos, pg)`: `pg` is the page read last, `pos` the byte position of the
// page on which the packet started.
macro_rules! pg_read_match_serial {
{} => {{
let mut pos;
let mut pg;
let mut continued_pck_start = None;
loop {
// Seeking by zero yields the current position, i.e. where the next page is searched from.
pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
pg = bt!(self.read_ogg_page());
match stream_serial {
// Page of another stream: skip it.
Some(s) if pg.0.stream_serial != s => (),
_ => match continued_pck_start {
None if pg.has_whole_packet() => break,
None if pg.has_packet_start() => {
continued_pck_start = Some(pos);
},
Some(s) if pg.has_packet_end() => {
pos = s;
break;
},
_ => (),
},
}
}
(pos, pg)
}};
}
let ab_of = |pg :&OggPage| { pg.0.bi.absgp };
let seq_of = |pg :&OggPage| { pg.0.bi.sequence_num };
// Lower bound of the search: the first matching page from the start of the data.
tri!(self.rdr.seek(SeekFrom::Start(0)));
let (mut begin_pos, mut begin_pg) = pg_read_match_serial!();
if pos_goal == 0 {
found!(begin_pos);
}
// Upper bound: start at most 200 KiB before the end and read forward past the goal.
tri!(seek_before_end(&mut self.rdr, 200 * 1024));
let (mut end_pos, mut end_pg) = pg_read_until_end_or_goal!(pos_goal);
// Bisection between `begin_pos` and `end_pos`.
loop {
// Both bounds are the same page or direct neighbours: the upper bound is the result.
// NOTE(review): unchecked `u32` subtraction; it underflows if the end page carries a
// smaller sequence number than the begin page.
if seq_of(&end_pg) - seq_of(&begin_pg) <= 1 {
found!(end_pos);
}
let pos_to_seek = begin_pos + (end_pos - begin_pos) / 2;
tri!(self.rdr.seek(SeekFrom::Start(pos_to_seek)));
let (pos, pg) = pg_read_match_serial!();
// The probe found one of the bounds again, so bisecting cannot narrow the interval any
// further. Scan linearly from the lower bound instead; the inner loop is only left
// through `found!` or `bt!`, i.e. by returning from the function.
if seq_of(&end_pg) == seq_of(&pg) ||
seq_of(&begin_pg) == seq_of(&pg) {
let mut pos;
let mut pg;
let mut last_packet_end_pos = begin_pos;
tri!(self.rdr.seek(SeekFrom::Start(begin_pos)));
loop {
pos = tri!(self.rdr.stream_position());
pg = bt!(self.read_ogg_page());
match stream_serial {
Some(s) if pg.0.stream_serial != s => (),
// Pages whose granule position has all bits set are skipped.
_ if ab_of(&pg) == -1i64 as u64 => (),
// Goal reached: position at the last earlier page on which a packet ended.
_ if ab_of(&pg) >= pos_goal => found!(last_packet_end_pos),
_ => if pg.has_packet_end() {
last_packet_end_pos = pos;
},
}
}
}
// Narrow the interval to the half that contains the goal.
if ab_of(&pg) >= pos_goal {
end_pos = pos;
end_pg = pg;
} else {
begin_pos = pos;
begin_pg = pg;
}
}
}
/// Discards all buffered page and packet state without touching the data source.
///
/// This calls `BasePacketReader::update_after_seek`, so the reader also treats the
/// following pages as if a seek had happened.
pub fn delete_unread_packets(&mut self) {
self.base_pck_rdr.update_after_seek();
}
}
/// Seeks `rdr` to `offs` bytes before its end, or to its start if the
/// data is shorter than `offs`.
///
/// Returns the resulting position measured from the start.
fn seek_before_end<T: io::Read + io::Seek>(mut rdr: T, offs: u64) -> Result<u64, OggReadError> {
    let stream_len = tri!(rdr.seek(SeekFrom::End(0)));
    // Never step back further than the data is long.
    let back = if offs < stream_len { offs } else { stream_len };
    let new_pos = tri!(rdr.seek(SeekFrom::End(-(back as i64))));
    Ok(new_pos)
}
#[cfg(feature = "async")]
pub mod async_api {
use std::pin::Pin;
use std::task::{Context, Poll};
use super::*;
use futures_core::{ready, Stream};
use futures_io::AsyncRead as FuturesAsyncRead;
use tokio::io::AsyncRead as TokioAsyncRead;
use bytes::BytesMut;
use pin_project::pin_project;
use tokio_util::codec::{Decoder, FramedRead};
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
/// Which part of a page the decoder is waiting for.
enum PageDecodeState {
/// Waiting for the 27-byte page header.
Head,
/// Header parsed; waiting for the segment table of the given length.
Segments(PageParser, usize),
/// Segment table parsed; waiting for the page body of the given length.
PacketData(PageParser, usize),
/// Placeholder that is stored while `decode` moves the real state out.
InUpdate,
}
impl PageDecodeState {
    /// Number of buffered bytes required before the current state can make
    /// progress.
    ///
    /// # Panics
    ///
    /// Panics with "invalid state" for the `InUpdate` placeholder.
    fn needed_size(&self) -> usize {
        match *self {
            PageDecodeState::Head => 27,
            PageDecodeState::Segments(_, len) | PageDecodeState::PacketData(_, len) => len,
            PageDecodeState::InUpdate => panic!("invalid state"),
        }
    }
}
/// `Decoder` implementation that turns a byte stream into [`OggPage`]s.
struct PageDecoder {
/// Which part of the current page is expected next.
state : PageDecodeState,
/// Options handed to every page parser the decoder creates.
parse_opts : Arc<PageParsingOptions>,
}
impl PageDecoder {
    /// Creates a decoder that expects a page header next.
    fn new(parse_opts: impl Into<Arc<PageParsingOptions>>) -> Self {
        let parse_opts: Arc<PageParsingOptions> = parse_opts.into();
        Self {
            parse_opts,
            state: PageDecodeState::Head,
        }
    }
}
impl Decoder for PageDecoder {
    type Item = OggPage;
    type Error = OggReadError;

    /// Consumes as many complete page parts (header, segment table, body)
    /// from `buf` as are available.
    ///
    /// Returns `Ok(Some(page))` as soon as a page is complete and
    /// `Ok(None)` when more bytes are needed.
    ///
    /// # Errors
    ///
    /// Header and checksum errors of the page parser are passed on. The
    /// bytes of the offending part have been consumed at that point and the
    /// decoder expects a fresh page header next, so a later call does not
    /// panic on a half-updated state.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<OggPage>, OggReadError> {
        use self::PageDecodeState::*;
        loop {
            let needed_size = self.state.needed_size();
            if buf.len() < needed_size {
                return Ok(None);
            }
            let consumed_buf = buf.split_to(needed_size).to_vec();
            // `Head` serves as the placeholder while the state is moved out:
            // if one of the `tri!`s below returns early, the decoder is left
            // in a valid state instead of one for which `needed_size` panics.
            self.state = match ::std::mem::replace(&mut self.state, Head) {
                Head => {
                    let mut hdr_buf = [0; 27];
                    hdr_buf.copy_from_slice(&consumed_buf);
                    let (pg_prs, segments_len) = tri!(PageParser::new_with_parse_opts(
                        hdr_buf,
                        Arc::clone(&self.parse_opts)
                    ));
                    Segments(pg_prs, segments_len)
                }
                Segments(mut pg_prs, _) => {
                    let body_len = pg_prs.parse_segments(consumed_buf);
                    PacketData(pg_prs, body_len)
                }
                PacketData(pg_prs, _) => {
                    // The state already is `Head`, ready for the next page.
                    let page = tri!(pg_prs.parse_packet_data(consumed_buf));
                    return Ok(Some(page));
                }
                InUpdate => panic!("invalid state"),
            };
        }
    }

    /// Decodes at the end of the input exactly like `decode`; bytes of an
    /// incomplete trailing page are left in `buf` and no error is raised
    /// for them.
    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<OggPage>, OggReadError> {
        self.decode(buf)
    }
}
/// Asynchronous packet reader: a `Stream` of packets read from a tokio `AsyncRead`.
#[pin_project]
pub struct PacketReader<T> where T :TokioAsyncRead {
/// Turns decoded pages into packets.
base_pck_rdr :BasePacketReader,
/// Framed reader that decodes the byte stream into pages.
#[pin]
pg_rd :FramedRead<T, PageDecoder>,
}
impl<T: TokioAsyncRead> PacketReader<T> {
    /// Creates a packet reader over `inner` that uses the default
    /// [`PageParsingOptions`].
    pub fn new(inner: T) -> Self {
        let default_opts = PageParsingOptions::default();
        Self::new_with_page_parse_opts(inner, default_opts)
    }

    /// Creates a packet reader over `inner` whose pages are parsed with the
    /// given options.
    pub fn new_with_page_parse_opts(inner: T, pg_parse_opts: impl Into<Arc<PageParsingOptions>>) -> Self {
        let decoder = PageDecoder::new(pg_parse_opts);
        Self {
            pg_rd: FramedRead::new(inner, decoder),
            base_pck_rdr: BasePacketReader::new(),
        }
    }
}
impl<T: FuturesAsyncRead> PacketReader<Compat<T>> {
    /// Creates a packet reader over a `futures` `AsyncRead`, using the
    /// default [`PageParsingOptions`].
    pub fn new_compat(inner: T) -> Self {
        let default_opts = PageParsingOptions::default();
        Self::new_compat_with_page_parse_opts(inner, default_opts)
    }

    /// Creates a packet reader over a `futures` `AsyncRead` whose pages are
    /// parsed with the given options.
    ///
    /// `inner` is wrapped in the tokio compatibility adapter first.
    pub fn new_compat_with_page_parse_opts(inner: T, pg_parse_opts: impl Into<Arc<PageParsingOptions>>) -> Self {
        let tokio_compatible = inner.compat();
        Self::new_with_page_parse_opts(tokio_compatible, pg_parse_opts)
    }
}
impl<T: TokioAsyncRead> Stream for PacketReader<T> {
    type Item = Result<Packet, OggReadError>;

    /// Yields the next packet, pulling further pages from the framed reader
    /// whenever the buffered page is exhausted.
    ///
    /// The stream ends when the framed reader ends; errors from page
    /// decoding or from pushing a page are yielded as items.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            // Serve from the page that is already buffered, if possible.
            if let Some(pck) = this.base_pck_rdr.read_packet() {
                return Poll::Ready(Some(Ok(pck)));
            }
            let page = match ready!(this.pg_rd.as_mut().poll_next(cx)) {
                None => return Poll::Ready(None),
                Some(decoded) => match decoded {
                    Ok(page) => page,
                    Err(err) => return Poll::Ready(Some(Err(err))),
                },
            };
            if let Err(err) = this.base_pck_rdr.push_page(page) {
                return Poll::Ready(Some(Err(err)));
            }
        }
    }
}
}