use std::io;
use crate::error::ReadRecordError;
use crate::frame::{FrameReader, FrameWriter, ReadFrameError};
use crate::recordlog::RecordWriter;
use crate::rolling::{RollingReader, RollingWriter};
use crate::{BlockRead, Serializable};
pub struct RecordReader<R> {
frame_reader: FrameReader<R>,
record_buffer: Vec<u8>,
within_record: bool,
}
impl<R: BlockRead + Unpin> RecordReader<R> {
pub fn open(reader: R) -> Self {
let frame_reader = FrameReader::open(reader);
RecordReader {
frame_reader,
record_buffer: Vec::with_capacity(10_000),
within_record: false,
}
}
pub fn read(&self) -> &R {
self.frame_reader.read()
}
pub fn record<'a, S: Serializable<'a>>(&'a self) -> Option<S> {
S::deserialize(&self.record_buffer)
}
pub async fn read_record<'a, S: Serializable<'a>>(
&'a mut self,
) -> Result<Option<S>, ReadRecordError> {
let has_record = self.go_next().await?;
if has_record {
let record = self.record().ok_or(ReadRecordError::Corruption)?;
Ok(Some(record))
} else {
Ok(None)
}
}
pub async fn go_next(&mut self) -> Result<bool, ReadRecordError> {
loop {
let frame = self.frame_reader.read_frame().await;
match frame {
Ok((frame_type, frame_payload)) => {
if frame_type.is_first_frame_of_record() {
self.within_record = true;
self.record_buffer.clear();
}
if self.within_record {
self.record_buffer.extend_from_slice(frame_payload);
if frame_type.is_last_frame_of_record() {
self.within_record = false;
return Ok(true);
}
}
}
Err(ReadFrameError::Corruption) => {
self.within_record = false;
return Err(ReadRecordError::Corruption);
}
Err(ReadFrameError::IoError(io_err)) => {
self.within_record = false;
return Err(ReadRecordError::IoError(io_err));
}
Err(ReadFrameError::NotAvailable) => {
return Ok(false);
}
}
}
}
}
impl RecordReader<RollingReader> {
pub async fn into_writer(self) -> io::Result<RecordWriter<RollingWriter>> {
let frame_writer: FrameWriter<RollingWriter> = self.frame_reader.into_writer().await?;
Ok(RecordWriter::from(frame_writer))
}
}