1use std::collections::VecDeque;
6use std::io::Read;
7
8use crate::blocks::SbfBlock;
9use crate::crc::validate_block;
10use crate::error::{SbfError, SbfResult};
11use crate::header::{SbfHeader, SBF_SYNC};
12
/// Initial capacity, in bytes, of the internal buffer created by `SbfReader::new`.
const DEFAULT_BUFFER_CAPACITY: usize = 64 * 1024;

/// Capacity threshold, in bytes, above which the internal buffer becomes
/// eligible for shrinking once it is less than half this full.
const MAX_BUFFER_SIZE: usize = 128 * 1024;
18
19pub struct SbfReader<R: Read> {
40 inner: R,
41 buffer: VecDeque<u8>,
42 validate_crc: bool,
44 stats: ReaderStats,
46}
47
/// Counters accumulated by an `SbfReader` while it processes a stream.
///
/// All counters start at zero (see the `Default` implementation).
#[derive(Clone, Debug, Default)]
pub struct ReaderStats {
    /// Total number of bytes obtained from the underlying reader.
    pub bytes_read: u64,
    /// Number of blocks successfully parsed and returned to the caller.
    pub blocks_parsed: u64,
    /// Number of candidate blocks rejected because their CRC did not match.
    pub crc_errors: u64,
    /// Number of candidate blocks rejected for any other parse failure.
    pub parse_errors: u64,
    /// Number of bytes discarded while looking for a valid block.
    pub bytes_skipped: u64,
}
62
63impl<R: Read> SbfReader<R> {
64 pub fn new(reader: R) -> Self {
66 Self {
67 inner: reader,
68 buffer: VecDeque::with_capacity(DEFAULT_BUFFER_CAPACITY),
69 validate_crc: true,
70 stats: ReaderStats::default(),
71 }
72 }
73
74 pub fn with_capacity(reader: R, capacity: usize) -> Self {
76 Self {
77 inner: reader,
78 buffer: VecDeque::with_capacity(capacity),
79 validate_crc: true,
80 stats: ReaderStats::default(),
81 }
82 }
83
84 pub fn validate_crc(mut self, validate: bool) -> Self {
86 self.validate_crc = validate;
87 self
88 }
89
90 pub fn stats(&self) -> &ReaderStats {
92 &self.stats
93 }
94
95 pub fn reset_stats(&mut self) {
97 self.stats = ReaderStats::default();
98 }
99
100 pub fn read_block(&mut self) -> SbfResult<Option<SbfBlock>> {
106 loop {
107 if let Some(sync_pos) = self.find_sync() {
109 if sync_pos > 0 {
111 self.stats.bytes_skipped += sync_pos as u64;
112 self.buffer.drain(0..sync_pos);
113 }
114
115 match self.try_parse_block() {
117 Ok(Some((block, consumed))) => {
118 self.buffer.drain(0..consumed);
120 self.stats.blocks_parsed += 1;
121 return Ok(Some(block));
122 }
123 Ok(None) => {
124 if !self.fill_buffer()? {
126 if self.buffer.len() > 0 {
128 return Err(SbfError::IncompleteBlock {
130 needed: 8,
131 have: self.buffer.len(),
132 });
133 }
134 return Ok(None);
135 }
136 }
137 Err(SbfError::InvalidSync) => {
138 self.buffer.remove(0);
140 self.stats.bytes_skipped += 1;
141 }
142 Err(SbfError::CrcMismatch { .. }) => {
143 self.buffer.remove(0);
145 self.stats.crc_errors += 1;
146 self.stats.bytes_skipped += 1;
147 }
148 Err(_e) => {
149 self.buffer.remove(0);
151 self.stats.parse_errors += 1;
152 self.stats.bytes_skipped += 1;
153 }
155 }
156 } else {
157 if !self.fill_buffer()? {
159 return Ok(None);
160 }
161 }
162
163 self.trim_buffer();
165 }
166 }
167
168 fn find_sync(&self) -> Option<usize> {
170 if self.buffer.len() < 2 {
171 return None;
172 }
173
174 (0..(self.buffer.len() - 1))
175 .find(|&i| self.buffer[i] == SBF_SYNC[0] && self.buffer[i + 1] == SBF_SYNC[1])
176 }
177
178 fn try_parse_block(&mut self) -> SbfResult<Option<(SbfBlock, usize)>> {
180 if self.buffer.len() < 8 {
181 return Ok(None);
182 }
183
184 let buffer = self.buffer.make_contiguous();
185
186 let header = SbfHeader::parse(&buffer[2..])?;
188 let total_len = header.length as usize;
189
190 if buffer.len() < total_len {
191 return Ok(None);
192 }
193
194 if self.validate_crc && !validate_block(&buffer[..total_len]) {
196 let stored_crc = u16::from_le_bytes([self.buffer[2], self.buffer[3]]);
198 return Err(SbfError::CrcMismatch {
199 expected: stored_crc,
200 actual: 0, });
202 }
203
204 let (block, consumed) = SbfBlock::parse(&buffer[..total_len])?;
206
207 Ok(Some((block, consumed)))
208 }
209
210 fn fill_buffer(&mut self) -> SbfResult<bool> {
212 let mut temp = [0u8; 4096];
213 match self.inner.read(&mut temp) {
214 Ok(0) => Ok(false), Ok(n) => {
216 self.buffer.extend(&temp[..n]);
217 self.stats.bytes_read += n as u64;
218 Ok(true)
219 }
220 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
221 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => self.fill_buffer(),
222 Err(e) => Err(SbfError::Io(e)),
223 }
224 }
225
226 fn trim_buffer(&mut self) {
228 if self.buffer.capacity() > MAX_BUFFER_SIZE && self.buffer.len() < MAX_BUFFER_SIZE / 2 {
229 self.buffer.shrink_to_fit();
230 }
231 }
232}
233
234impl<R: Read> Iterator for SbfReader<R> {
236 type Item = SbfResult<SbfBlock>;
237
238 fn next(&mut self) -> Option<Self::Item> {
239 match self.read_block() {
240 Ok(Some(block)) => Some(Ok(block)),
241 Ok(None) => None,
242 Err(e) => Some(Err(e)),
243 }
244 }
245}
246
247pub trait SbfReadExt: Read + Sized {
249 fn sbf_blocks(self) -> SbfReader<Self> {
251 SbfReader::new(self)
252 }
253}
254
255impl<R: Read> SbfReadExt for R {}
256
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds a reader over an in-memory byte slice.
    fn reader_over(bytes: &[u8]) -> SbfReader<Cursor<&[u8]>> {
        SbfReader::new(Cursor::new(bytes))
    }

    /// An empty source yields no block and no error.
    #[test]
    fn test_reader_empty() {
        let mut reader = reader_over(&[]);

        assert!(matches!(reader.read_block(), Ok(None)));
    }

    /// Input without any sync marker yields no block and no error.
    #[test]
    fn test_reader_no_sync() {
        let garbage = [0x00, 0x00, 0x00, 0x00];
        let mut reader = reader_over(&garbage);

        assert!(matches!(reader.read_block(), Ok(None)));
    }

    /// Every byte pulled from the source is counted in `bytes_read`.
    #[test]
    fn test_reader_stats() {
        let mut reader = reader_over(&[0x00, 0x00]);

        // Only the statistics matter here, not the outcome of the read.
        let _ = reader.read_block();

        assert_eq!(reader.stats().bytes_read, 2);
    }

    /// The extension method builds a reader with CRC validation enabled.
    #[test]
    fn test_sbf_read_ext() {
        let empty: &[u8] = &[];
        let reader = Cursor::new(empty).sbf_blocks();

        assert!(reader.validate_crc);
    }
}