1use std::io::Read;
6
7use crate::blocks::SbfBlock;
8use crate::crc::validate_block;
9use crate::error::{SbfError, SbfResult};
10use crate::header::{SbfHeader, SBF_SYNC};
11
12const DEFAULT_BUFFER_CAPACITY: usize = 65536;
14
15const MAX_BUFFER_SIZE: usize = 131072;
17
18pub struct SbfReader<R: Read> {
39 inner: R,
40 buffer: Vec<u8>,
41 valid_bytes: usize,
43 validate_crc: bool,
45 stats: ReaderStats,
47}
48
49#[derive(Debug, Clone, Default)]
51pub struct ReaderStats {
52 pub bytes_read: u64,
54 pub blocks_parsed: u64,
56 pub crc_errors: u64,
58 pub parse_errors: u64,
60 pub bytes_skipped: u64,
62}
63
64impl<R: Read> SbfReader<R> {
65 pub fn new(reader: R) -> Self {
67 Self {
68 inner: reader,
69 buffer: Vec::with_capacity(DEFAULT_BUFFER_CAPACITY),
70 valid_bytes: 0,
71 validate_crc: true,
72 stats: ReaderStats::default(),
73 }
74 }
75
76 pub fn with_capacity(reader: R, capacity: usize) -> Self {
78 Self {
79 inner: reader,
80 buffer: Vec::with_capacity(capacity),
81 valid_bytes: 0,
82 validate_crc: true,
83 stats: ReaderStats::default(),
84 }
85 }
86
87 pub fn validate_crc(mut self, validate: bool) -> Self {
89 self.validate_crc = validate;
90 self
91 }
92
93 pub fn stats(&self) -> &ReaderStats {
95 &self.stats
96 }
97
98 pub fn reset_stats(&mut self) {
100 self.stats = ReaderStats::default();
101 }
102
103 pub fn read_block(&mut self) -> SbfResult<Option<SbfBlock>> {
109 loop {
110 if let Some(sync_pos) = self.find_sync() {
112 if sync_pos > 0 {
114 self.stats.bytes_skipped += sync_pos as u64;
115 self.buffer.drain(0..sync_pos);
116 self.valid_bytes -= sync_pos;
117 }
118
119 match self.try_parse_block() {
121 Ok(Some((block, consumed))) => {
122 self.buffer.drain(0..consumed);
124 self.valid_bytes -= consumed;
125 self.stats.blocks_parsed += 1;
126 return Ok(Some(block));
127 }
128 Ok(None) => {
129 if !self.fill_buffer()? {
131 if self.valid_bytes > 0 {
133 return Err(SbfError::IncompleteBlock {
135 needed: 8,
136 have: self.valid_bytes,
137 });
138 }
139 return Ok(None);
140 }
141 }
142 Err(SbfError::InvalidSync) => {
143 self.buffer.remove(0);
145 self.valid_bytes -= 1;
146 self.stats.bytes_skipped += 1;
147 }
148 Err(SbfError::CrcMismatch { .. }) => {
149 self.buffer.remove(0);
151 self.valid_bytes -= 1;
152 self.stats.crc_errors += 1;
153 self.stats.bytes_skipped += 1;
154 }
155 Err(_e) => {
156 self.buffer.remove(0);
158 self.valid_bytes -= 1;
159 self.stats.parse_errors += 1;
160 self.stats.bytes_skipped += 1;
161 }
163 }
164 } else {
165 if !self.fill_buffer()? {
167 return Ok(None);
168 }
169 }
170
171 self.trim_buffer();
173 }
174 }
175
176 fn find_sync(&self) -> Option<usize> {
178 if self.valid_bytes < 2 {
179 return None;
180 }
181
182 (0..(self.valid_bytes - 1))
183 .find(|&i| self.buffer[i] == SBF_SYNC[0] && self.buffer[i + 1] == SBF_SYNC[1])
184 }
185
186 fn try_parse_block(&mut self) -> SbfResult<Option<(SbfBlock, usize)>> {
188 if self.valid_bytes < 8 {
189 return Ok(None);
190 }
191
192 if self.buffer[0] != SBF_SYNC[0] || self.buffer[1] != SBF_SYNC[1] {
194 return Err(SbfError::InvalidSync);
195 }
196
197 let header = SbfHeader::parse(&self.buffer[2..])?;
199 let total_len = header.length as usize;
200
201 if self.valid_bytes < total_len {
202 return Ok(None);
203 }
204
205 if self.validate_crc && !validate_block(&self.buffer[..total_len]) {
207 let stored_crc = u16::from_le_bytes([self.buffer[2], self.buffer[3]]);
209 return Err(SbfError::CrcMismatch {
210 expected: stored_crc,
211 actual: 0, });
213 }
214
215 let (block, consumed) = SbfBlock::parse(&self.buffer[..total_len])?;
217
218 Ok(Some((block, consumed)))
219 }
220
221 fn fill_buffer(&mut self) -> SbfResult<bool> {
223 if self.buffer.len() < self.valid_bytes + 4096 {
225 self.buffer.resize(self.valid_bytes + 4096, 0);
226 }
227
228 let mut temp = [0u8; 4096];
229 match self.inner.read(&mut temp) {
230 Ok(0) => Ok(false), Ok(n) => {
232 if self.buffer.len() < self.valid_bytes + n {
234 self.buffer.resize(self.valid_bytes + n, 0);
235 }
236 self.buffer[self.valid_bytes..self.valid_bytes + n].copy_from_slice(&temp[..n]);
237 self.valid_bytes += n;
238 self.stats.bytes_read += n as u64;
239 Ok(true)
240 }
241 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
242 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => self.fill_buffer(),
243 Err(e) => Err(SbfError::Io(e)),
244 }
245 }
246
247 fn trim_buffer(&mut self) {
249 if self.buffer.len() > MAX_BUFFER_SIZE && self.valid_bytes < MAX_BUFFER_SIZE / 2 {
250 self.buffer.truncate(self.valid_bytes);
251 self.buffer.shrink_to_fit();
252 }
253 }
254}
255
256impl<R: Read> Iterator for SbfReader<R> {
258 type Item = SbfResult<SbfBlock>;
259
260 fn next(&mut self) -> Option<Self::Item> {
261 match self.read_block() {
262 Ok(Some(block)) => Some(Ok(block)),
263 Ok(None) => None,
264 Err(e) => Some(Err(e)),
265 }
266 }
267}
268
269pub trait SbfReadExt: Read + Sized {
271 fn sbf_blocks(self) -> SbfReader<Self> {
273 SbfReader::new(self)
274 }
275}
276
277impl<R: Read> SbfReadExt for R {}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282 use std::io::Cursor;
283
284 #[test]
285 fn test_reader_empty() {
286 let data: &[u8] = &[];
287 let mut reader = SbfReader::new(Cursor::new(data));
288
289 assert!(reader.read_block().unwrap().is_none());
290 }
291
292 #[test]
293 fn test_reader_no_sync() {
294 let data = [0x00, 0x00, 0x00, 0x00];
295 let mut reader = SbfReader::new(Cursor::new(&data[..]));
296
297 assert!(reader.read_block().unwrap().is_none());
298 }
299
300 #[test]
301 fn test_reader_stats() {
302 let data: &[u8] = &[0x00, 0x00];
303 let mut reader = SbfReader::new(Cursor::new(data));
304
305 let _ = reader.read_block();
306
307 assert_eq!(reader.stats().bytes_read, 2);
308 }
309
310 #[test]
311 fn test_sbf_read_ext() {
312 let data: &[u8] = &[];
313 let reader = Cursor::new(data).sbf_blocks();
314
315 assert!(reader.validate_crc);
316 }
317}