use std::io::Read;
use crate::blocks::SbfBlock;
use crate::crc::validate_block;
use crate::error::{SbfError, SbfResult};
use crate::header::{SbfHeader, SBF_SYNC};
const DEFAULT_BUFFER_CAPACITY: usize = 65536;
const MAX_BUFFER_SIZE: usize = 131072;
pub struct SbfReader<R: Read> {
inner: R,
buffer: Vec<u8>,
valid_bytes: usize,
validate_crc: bool,
stats: ReaderStats,
}
#[derive(Debug, Clone, Default)]
pub struct ReaderStats {
pub bytes_read: u64,
pub blocks_parsed: u64,
pub crc_errors: u64,
pub parse_errors: u64,
pub bytes_skipped: u64,
}
impl<R: Read> SbfReader<R> {
pub fn new(reader: R) -> Self {
Self {
inner: reader,
buffer: Vec::with_capacity(DEFAULT_BUFFER_CAPACITY),
valid_bytes: 0,
validate_crc: true,
stats: ReaderStats::default(),
}
}
pub fn with_capacity(reader: R, capacity: usize) -> Self {
Self {
inner: reader,
buffer: Vec::with_capacity(capacity),
valid_bytes: 0,
validate_crc: true,
stats: ReaderStats::default(),
}
}
pub fn validate_crc(mut self, validate: bool) -> Self {
self.validate_crc = validate;
self
}
pub fn stats(&self) -> &ReaderStats {
&self.stats
}
pub fn reset_stats(&mut self) {
self.stats = ReaderStats::default();
}
pub fn read_block(&mut self) -> SbfResult<Option<SbfBlock>> {
loop {
if let Some(sync_pos) = self.find_sync() {
if sync_pos > 0 {
self.stats.bytes_skipped += sync_pos as u64;
self.buffer.drain(0..sync_pos);
self.valid_bytes -= sync_pos;
}
match self.try_parse_block() {
Ok(Some((block, consumed))) => {
self.buffer.drain(0..consumed);
self.valid_bytes -= consumed;
self.stats.blocks_parsed += 1;
return Ok(Some(block));
}
Ok(None) => {
if !self.fill_buffer()? {
if self.valid_bytes > 0 {
return Err(SbfError::IncompleteBlock {
needed: 8,
have: self.valid_bytes,
});
}
return Ok(None);
}
}
Err(SbfError::InvalidSync) => {
self.buffer.remove(0);
self.valid_bytes -= 1;
self.stats.bytes_skipped += 1;
}
Err(SbfError::CrcMismatch { .. }) => {
self.buffer.remove(0);
self.valid_bytes -= 1;
self.stats.crc_errors += 1;
self.stats.bytes_skipped += 1;
}
Err(_e) => {
self.buffer.remove(0);
self.valid_bytes -= 1;
self.stats.parse_errors += 1;
self.stats.bytes_skipped += 1;
}
}
} else {
if !self.fill_buffer()? {
return Ok(None);
}
}
self.trim_buffer();
}
}
fn find_sync(&self) -> Option<usize> {
if self.valid_bytes < 2 {
return None;
}
(0..(self.valid_bytes - 1))
.find(|&i| self.buffer[i] == SBF_SYNC[0] && self.buffer[i + 1] == SBF_SYNC[1])
}
fn try_parse_block(&mut self) -> SbfResult<Option<(SbfBlock, usize)>> {
if self.valid_bytes < 8 {
return Ok(None);
}
if self.buffer[0] != SBF_SYNC[0] || self.buffer[1] != SBF_SYNC[1] {
return Err(SbfError::InvalidSync);
}
let header = SbfHeader::parse(&self.buffer[2..])?;
let total_len = header.length as usize;
if self.valid_bytes < total_len {
return Ok(None);
}
if self.validate_crc && !validate_block(&self.buffer[..total_len]) {
let stored_crc = u16::from_le_bytes([self.buffer[2], self.buffer[3]]);
return Err(SbfError::CrcMismatch {
expected: stored_crc,
actual: 0, });
}
let (block, consumed) = SbfBlock::parse(&self.buffer[..total_len])?;
Ok(Some((block, consumed)))
}
fn fill_buffer(&mut self) -> SbfResult<bool> {
if self.buffer.len() < self.valid_bytes + 4096 {
self.buffer.resize(self.valid_bytes + 4096, 0);
}
let mut temp = [0u8; 4096];
match self.inner.read(&mut temp) {
Ok(0) => Ok(false), Ok(n) => {
if self.buffer.len() < self.valid_bytes + n {
self.buffer.resize(self.valid_bytes + n, 0);
}
self.buffer[self.valid_bytes..self.valid_bytes + n].copy_from_slice(&temp[..n]);
self.valid_bytes += n;
self.stats.bytes_read += n as u64;
Ok(true)
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => self.fill_buffer(),
Err(e) => Err(SbfError::Io(e)),
}
}
fn trim_buffer(&mut self) {
if self.buffer.len() > MAX_BUFFER_SIZE && self.valid_bytes < MAX_BUFFER_SIZE / 2 {
self.buffer.truncate(self.valid_bytes);
self.buffer.shrink_to_fit();
}
}
}
impl<R: Read> Iterator for SbfReader<R> {
type Item = SbfResult<SbfBlock>;
fn next(&mut self) -> Option<Self::Item> {
match self.read_block() {
Ok(Some(block)) => Some(Ok(block)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
pub trait SbfReadExt: Read + Sized {
fn sbf_blocks(self) -> SbfReader<Self> {
SbfReader::new(self)
}
}
impl<R: Read> SbfReadExt for R {}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn test_reader_empty() {
let data: &[u8] = &[];
let mut reader = SbfReader::new(Cursor::new(data));
assert!(reader.read_block().unwrap().is_none());
}
#[test]
fn test_reader_no_sync() {
let data = [0x00, 0x00, 0x00, 0x00];
let mut reader = SbfReader::new(Cursor::new(&data[..]));
assert!(reader.read_block().unwrap().is_none());
}
#[test]
fn test_reader_stats() {
let data: &[u8] = &[0x00, 0x00];
let mut reader = SbfReader::new(Cursor::new(data));
let _ = reader.read_block();
assert_eq!(reader.stats().bytes_read, 2);
}
#[test]
fn test_sbf_read_ext() {
let data: &[u8] = &[];
let reader = Cursor::new(data).sbf_blocks();
assert!(reader.validate_crc);
}
}