use crate::{
Buffer,
h3::{
H3Error,
frame::{Frame, FrameDecodeError},
},
};
use futures_lite::{AsyncRead, AsyncReadExt, io as async_io};
use std::io;
/// Decodes a sequence of frames from an async reader, using a borrowed
/// [`Buffer`] to hold bytes that have been read but not yet consumed.
///
/// Frames are handed out one at a time as [`ActiveFrame`] values by
/// [`FrameStream::next`].
#[derive(Debug)]
pub struct FrameStream<'a, R> {
/// Source of additional bytes once the buffer has been exhausted.
reader: &'a mut R,
/// Bytes read from `reader` that have not been consumed yet; frame headers
/// are decoded from the front of this buffer.
buf: &'a mut Buffer,
/// Payload bytes of the previously returned frame that were not in the
/// buffer when that frame was dropped; they are discarded at the start of
/// the following `next` call.
pending_skip: u64,
}
impl<'a, R: AsyncRead + Unpin> FrameStream<'a, R> {
    /// Creates a frame stream that decodes from `buf` and refills it from
    /// `reader` when more bytes are needed.
    ///
    /// Bytes already present in `buf` are decoded before anything is read
    /// from `reader`.
    pub fn new(reader: &'a mut R, buf: &'a mut Buffer) -> Self {
        Self {
            reader,
            buf,
            pending_skip: 0,
        }
    }

    /// Advances to the next known frame and returns a handle to it.
    ///
    /// Any payload left over from the previously returned [`ActiveFrame`] is
    /// discarded first. Frames that decode as [`Frame::Unknown`] are skipped
    /// entirely (header and payload) and never surfaced to the caller.
    ///
    /// Returns `Ok(None)` when the reader reports end-of-stream before a
    /// complete frame header could be decoded.
    ///
    /// # Errors
    ///
    /// * any I/O error from the underlying reader,
    /// * `UnexpectedEof` when the stream ends inside a payload being skipped,
    /// * the error code reported by [`Frame::decode`] for a malformed frame.
    ///
    /// # Cancellation
    ///
    /// NOTE(review): `pending_skip` is cleared before the skip is awaited, so
    /// dropping this future (or hitting an I/O error) part-way through a skip
    /// loses track of how many payload bytes are still outstanding.
    pub async fn next(&mut self) -> Result<Option<ActiveFrame<'_, 'a, R>>, H3Error> {
        if self.pending_skip > 0 {
            let skip = self.pending_skip;
            self.pending_skip = 0;
            self.skip_bytes(skip).await?;
        }

        loop {
            match Frame::decode(self.buf) {
                Ok((Frame::Unknown(len), consumed)) => {
                    log::trace!("skipping unknown frame, payload length {len}");
                    // The header is still at the front of the buffer here, so
                    // it is skipped together with the payload. Saturate so a
                    // hostile length can never wrap around to a small skip.
                    self.skip_bytes(len.saturating_add(consumed as u64))
                        .await?;
                    continue;
                }
                Ok((frame, consumed)) => {
                    // Drop the header; what follows in the buffer is payload.
                    self.buf.ignore_front(consumed);
                    // Only these frame kinds leave payload bytes on the wire
                    // after the decoded header.
                    let remaining = match &frame {
                        Frame::Data(len) | Frame::Headers(len) => *len,
                        Frame::PushPromise {
                            field_section_length,
                            ..
                        } => *field_section_length,
                        _ => 0,
                    };
                    return Ok(Some(ActiveFrame {
                        stream: self,
                        frame,
                        remaining,
                    }));
                }
                // Not enough bytes for a full header yet: read more below.
                Err(FrameDecodeError::Incomplete) => {}
                Err(FrameDecodeError::Error(code)) => return Err(code.into()),
            }

            // NOTE(review): end-of-stream with a partial header still in the
            // buffer is reported as a clean `None`, not as an error.
            if !self.read_more().await? {
                return Ok(None);
            }
        }
    }

    /// Reads more bytes from the reader onto the end of the buffer.
    ///
    /// Returns `Ok(false)` when the reader returned zero bytes (end of
    /// stream) and `Ok(true)` otherwise.
    ///
    /// The buffer is grown before reading so the reader has somewhere to
    /// write. A drop guard shrinks it back to the bytes that were actually
    /// read on every exit path: success, I/O error, and the future being
    /// dropped while the read is pending. Without it, a failed or cancelled
    /// read would leave never-read bytes in the buffer to be decoded later.
    async fn read_more(&mut self) -> io::Result<bool> {
        /// Truncates the buffer to `valid_len` when dropped.
        struct TruncateOnDrop<'g> {
            buf: &'g mut Buffer,
            valid_len: usize,
        }

        impl Drop for TruncateOnDrop<'_> {
            fn drop(&mut self) {
                self.buf.truncate(self.valid_len);
            }
        }

        let before = self.buf.len();
        self.buf.expand();
        let mut guard = TruncateOnDrop {
            buf: &mut *self.buf,
            valid_len: before,
        };
        let n = self.reader.read(&mut guard.buf[before..]).await?;
        guard.valid_len = before + n;
        Ok(n > 0)
    }

    /// Discards the next `n` bytes of the stream.
    ///
    /// Bytes already buffered are dropped first; the rest is read from the
    /// reader and thrown away without being stored.
    ///
    /// # Errors
    ///
    /// Returns `UnexpectedEof` if the reader ends before `n` bytes were
    /// discarded, or any I/O error raised while reading.
    async fn skip_bytes(&mut self, n: u64) -> io::Result<()> {
        // Clamp to what is buffered; an `n` that does not fit in `usize` is
        // necessarily larger than the buffer.
        let from_buf = usize::try_from(n)
            .unwrap_or(self.buf.len())
            .min(self.buf.len());
        self.buf.ignore_front(from_buf);

        // Cannot underflow: `from_buf <= n` by construction.
        let remaining = n - from_buf as u64;
        if remaining > 0 {
            let copied =
                async_io::copy((&mut self.reader).take(remaining), async_io::sink()).await?;
            if copied < remaining {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "stream ended mid-frame payload",
                ));
            }
        }
        Ok(())
    }
}
/// A frame whose header has been decoded and whose payload, if any, has not
/// been consumed from the stream yet.
///
/// Dropping the value discards the payload so the owning [`FrameStream`] is
/// positioned at the next frame.
#[derive(Debug)]
pub struct ActiveFrame<'b, 'a, R> {
/// The stream this frame was decoded from; borrowed exclusively for as long
/// as the frame is alive.
stream: &'b mut FrameStream<'a, R>,
/// The decoded frame header.
frame: Frame,
/// Payload length in bytes still to be removed from the stream.
remaining: u64,
}
impl<R: AsyncRead + Unpin> ActiveFrame<'_, '_, R> {
    /// The decoded header of this frame.
    pub fn frame(&self) -> &Frame {
        &self.frame
    }

    /// Reads until the whole payload of this frame is buffered and returns it.
    ///
    /// The payload is not consumed by this call; it stays at the front of the
    /// buffer and is discarded when the `ActiveFrame` is dropped.
    ///
    /// # Errors
    ///
    /// * `InvalidData` if the payload length does not fit in `usize`,
    /// * `UnexpectedEof` if the stream ends before the payload is complete,
    /// * any I/O error from the underlying reader.
    pub async fn buffer_payload(&mut self) -> io::Result<&[u8]> {
        let wanted = match usize::try_from(self.remaining) {
            Ok(wanted) => wanted,
            Err(_) => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "frame payload too large to buffer",
                ));
            }
        };

        // Keep pulling from the reader until enough bytes are available.
        loop {
            if self.stream.buf.len() >= wanted {
                break;
            }
            let got_more = self.stream.read_more().await?;
            if !got_more {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "stream ended mid-frame payload",
                ));
            }
        }

        Ok(&self.stream.buf[..wanted])
    }
}
impl<R> Drop for ActiveFrame<'_, '_, R> {
    /// Discards this frame's payload so the stream lines up with the next
    /// frame header.
    ///
    /// Whatever part of the payload is already buffered is dropped right
    /// away. `drop` cannot await, so the rest is recorded in `pending_skip`
    /// for the stream to discard later.
    fn drop(&mut self) {
        let buffered = self.stream.buf.len();
        // A payload too large for `usize` necessarily exceeds the buffer.
        let discard = match usize::try_from(self.remaining) {
            Ok(unread) if unread < buffered => unread,
            _ => buffered,
        };
        self.stream.buf.ignore_front(discard);
        self.stream.pending_skip = self.remaining - discard as u64;
    }
}