use tokio::io::{AsyncRead, AsyncReadExt};
pub use crate::sans_io::linear_reader::LinearReaderOptions;
use crate::sans_io::{LinearReadEvent, LinearReader as SansIoReader};
use crate::McapResult;
pub struct LinearReader<R> {
source: R,
reader: SansIoReader,
}
impl<R> LinearReader<R>
where
R: AsyncRead + std::marker::Unpin,
{
pub fn new(reader: R) -> Self {
Self::new_with_options(reader, &LinearReaderOptions::default())
}
pub fn new_with_options(source: R, options: &LinearReaderOptions) -> Self {
Self {
reader: SansIoReader::new_with_options(options.clone()),
source,
}
}
pub fn into_inner(self) -> McapResult<R> {
Ok(self.source)
}
pub async fn next_record(&mut self, data: &mut Vec<u8>) -> Option<McapResult<u8>> {
while let Some(event) = self.reader.next_event() {
match event {
Ok(LinearReadEvent::ReadRequest(n)) => {
let written = match self.source.read(self.reader.insert(n)).await {
Ok(n) => n,
Err(err) => return Some(Err(err.into())),
};
self.reader.notify_read(written);
}
Ok(LinearReadEvent::Record {
data: content,
opcode,
}) => {
data.clear();
data.extend_from_slice(content);
return Some(Ok(opcode));
}
Err(err) => {
return Some(Err(err));
}
}
}
None
}
}