use std::io;
use thiserror::Error;
use crate::frame::{FrameType, FrameWriter, Header, HEADER_LEN};
use crate::rolling::{RollingReader, RollingWriter};
use crate::{BlockRead, BLOCK_NUM_BYTES};
/// Reads frames out of the blocks exposed by an underlying reader `R`.
///
/// The reader keeps a byte offset into the current block and a flag that
/// records whether the current block was found to be corrupted.
pub struct FrameReader<R> {
// Source of blocks; accessed through `block()` and `next_block()`.
reader: R,
// Byte offset, within the current block, of the next unread byte.
// Reset to 0 whenever the reader moves on to the next block.
cursor: usize,
// Set when a header cannot be deserialized or when a frame's declared
// length would run past the end of the block. While set, the next read
// skips to the following block before attempting to read anything.
block_corrupted: bool,
}
/// Errors returned while reading frames with `FrameReader`.
#[derive(Error, Debug)]
pub enum ReadFrameError {
/// An I/O error reported by the underlying reader (converted from
/// `io::Error` through `#[from]`).
#[error("Io error: {0}")]
IoError(#[from] io::Error),
/// The frame could not be trusted: its header failed to deserialize, its
/// declared length ran past the end of the block, or the header's check of
/// the payload failed.
#[error("Corruption in frame")]
Corruption,
/// There is no frame to read at the moment: either the underlying reader
/// has no next block, or the header bytes at the cursor are all zero.
#[error("Next frame not available")]
NotAvailable,
}
impl<R: BlockRead + Unpin> FrameReader<R> {
    /// Wraps `reader` in a `FrameReader` whose cursor starts at offset 0 of
    /// the reader's current block, with the block not flagged as corrupted.
    pub fn open(reader: R) -> Self {
        Self {
            reader,
            cursor: 0,
            block_corrupted: false,
        }
    }

    /// Returns a shared reference to the underlying block reader.
    pub fn read(&self) -> &R {
        &self.reader
    }

    /// Number of bytes left between the cursor and the end of the block.
    fn num_bytes_to_end_of_block(&self) -> usize {
        BLOCK_NUM_BYTES - self.cursor
    }

    /// Moves on to the next block when the current one cannot hold another
    /// frame header, or when it has been flagged as corrupted.
    ///
    /// On a successful move the cursor is reset to 0 and the corruption flag
    /// is cleared.
    ///
    /// # Errors
    ///
    /// * `ReadFrameError::NotAvailable` if a move is needed but the underlying
    ///   reader reports that there is no next block. The cursor and the
    ///   corruption flag are left untouched in that case.
    /// * `ReadFrameError::IoError` if the underlying reader fails.
    async fn go_to_next_block_if_necessary(&mut self) -> Result<(), ReadFrameError> {
        let remaining = self.num_bytes_to_end_of_block();
        let can_stay_in_block = !self.block_corrupted && remaining >= HEADER_LEN;
        if can_stay_in_block {
            return Ok(());
        }
        let advanced = self.reader.next_block().await?;
        if !advanced {
            return Err(ReadFrameError::NotAvailable);
        }
        self.cursor = 0;
        self.block_corrupted = false;
        Ok(())
    }

    /// Decodes the frame header located at the cursor.
    ///
    /// The cursor is not moved by this method; the caller advances it.
    ///
    /// # Errors
    ///
    /// * `ReadFrameError::NotAvailable` if the `HEADER_LEN` bytes at the cursor
    ///   are all zero.
    /// * `ReadFrameError::Corruption` if the bytes do not deserialize into a
    ///   header. The current block is flagged as corrupted as a side effect.
    fn get_frame_header(&mut self) -> Result<Header, ReadFrameError> {
        let unread = &self.reader.block()[self.cursor..];
        let raw_header: &[u8] = &unread[..HEADER_LEN];
        // A header made only of zero bytes is treated as "nothing here yet".
        if raw_header.iter().all(|&byte| byte == 0) {
            return Err(ReadFrameError::NotAvailable);
        }
        if let Some(header) = Header::deserialize(raw_header) {
            return Ok(header);
        }
        self.block_corrupted = true;
        Err(ReadFrameError::Corruption)
    }

    /// Reads the next frame and returns its type together with its payload,
    /// borrowed from the current block.
    ///
    /// # Errors
    ///
    /// * `ReadFrameError::NotAvailable` if there is no frame to read yet.
    /// * `ReadFrameError::Corruption` if the header is invalid, if the payload
    ///   would extend past the end of the block (the block is then flagged as
    ///   corrupted), or if the header's check of the payload fails.
    /// * `ReadFrameError::IoError` if the underlying reader fails.
    pub async fn read_frame(&mut self) -> Result<(FrameType, &[u8]), ReadFrameError> {
        self.go_to_next_block_if_necessary().await?;
        let header = self.get_frame_header()?;
        let payload_len = header.len();

        // The header is consumed even if the frame later proves unusable.
        let payload_start = self.cursor + HEADER_LEN;
        self.cursor = payload_start;

        let payload_end = payload_start + payload_len;
        if payload_end > BLOCK_NUM_BYTES {
            // The declared length cannot be trusted, so nothing after this
            // point in the block can be located: give up on the whole block.
            self.block_corrupted = true;
            return Err(ReadFrameError::Corruption);
        }

        let frame_payload = &self.reader.block()[payload_start..][..payload_len];
        // The payload is consumed before validation: a failed check does not
        // flag the block, so the next call resumes right after this frame.
        self.cursor = payload_end;

        if header.check(frame_payload) {
            Ok((header.frame_type(), frame_payload))
        } else {
            Err(ReadFrameError::Corruption)
        }
    }
}
impl FrameReader<RollingReader> {
pub async fn into_writer(self) -> io::Result<FrameWriter<RollingWriter>> {
let mut rolling_writer: RollingWriter = self.reader.into_writer().await?;
rolling_writer.forward(self.cursor).await?;
Ok(FrameWriter::create(rolling_writer))
}
}