Skip to main content

sbf_tools/
reader.rs

1//! SBF stream/file reader
2//!
3//! Provides `SbfReader` for reading SBF blocks from any `Read` source.
4
5use std::io::Read;
6
7use crate::blocks::SbfBlock;
8use crate::crc::validate_block;
9use crate::error::{SbfError, SbfResult};
10use crate::header::{SbfHeader, SBF_SYNC};
11
12/// Default buffer capacity (64KB)
13const DEFAULT_BUFFER_CAPACITY: usize = 65536;
14
15/// Maximum buffer size before trimming (128KB)
16const MAX_BUFFER_SIZE: usize = 131072;
17
18/// SBF block reader
19///
20/// Reads SBF blocks from any source implementing `Read`.
21///
22/// # Example
23///
24/// ```no_run
25/// use std::fs::File;
26/// use sbf_tools::SbfReader;
27///
28/// let file = File::open("data.sbf").unwrap();
29/// let mut reader = SbfReader::new(file);
30///
31/// while let Some(result) = reader.next() {
32///     match result {
33///         Ok(block) => println!("Got block: {}", block.name()),
34///         Err(e) => eprintln!("Error: {}", e),
35///     }
36/// }
37/// ```
38pub struct SbfReader<R: Read> {
39    inner: R,
40    buffer: Vec<u8>,
41    /// Number of valid bytes in buffer
42    valid_bytes: usize,
43    /// Whether to validate CRC
44    validate_crc: bool,
45    /// Statistics
46    stats: ReaderStats,
47}
48
49/// Reader statistics
50#[derive(Debug, Clone, Default)]
51pub struct ReaderStats {
52    /// Total bytes read from source
53    pub bytes_read: u64,
54    /// Number of blocks successfully parsed
55    pub blocks_parsed: u64,
56    /// Number of CRC errors
57    pub crc_errors: u64,
58    /// Number of parse errors
59    pub parse_errors: u64,
60    /// Bytes skipped looking for sync
61    pub bytes_skipped: u64,
62}
63
64impl<R: Read> SbfReader<R> {
65    /// Create a new SBF reader
66    pub fn new(reader: R) -> Self {
67        Self {
68            inner: reader,
69            buffer: Vec::with_capacity(DEFAULT_BUFFER_CAPACITY),
70            valid_bytes: 0,
71            validate_crc: true,
72            stats: ReaderStats::default(),
73        }
74    }
75
76    /// Create reader with specific buffer capacity
77    pub fn with_capacity(reader: R, capacity: usize) -> Self {
78        Self {
79            inner: reader,
80            buffer: Vec::with_capacity(capacity),
81            valid_bytes: 0,
82            validate_crc: true,
83            stats: ReaderStats::default(),
84        }
85    }
86
87    /// Enable or disable CRC validation (default: enabled)
88    pub fn validate_crc(mut self, validate: bool) -> Self {
89        self.validate_crc = validate;
90        self
91    }
92
93    /// Get reader statistics
94    pub fn stats(&self) -> &ReaderStats {
95        &self.stats
96    }
97
98    /// Reset statistics
99    pub fn reset_stats(&mut self) {
100        self.stats = ReaderStats::default();
101    }
102
103    /// Read the next SBF block
104    ///
105    /// Returns `Ok(Some(block))` if a block was read successfully,
106    /// `Ok(None)` if end of stream was reached,
107    /// or `Err(e)` if an error occurred.
108    pub fn read_block(&mut self) -> SbfResult<Option<SbfBlock>> {
109        loop {
110            // Try to find sync bytes in buffer
111            if let Some(sync_pos) = self.find_sync() {
112                // Remove any bytes before sync
113                if sync_pos > 0 {
114                    self.stats.bytes_skipped += sync_pos as u64;
115                    self.buffer.drain(0..sync_pos);
116                    self.valid_bytes -= sync_pos;
117                }
118
119                // Try to parse block
120                match self.try_parse_block() {
121                    Ok(Some((block, consumed))) => {
122                        // Remove consumed bytes
123                        self.buffer.drain(0..consumed);
124                        self.valid_bytes -= consumed;
125                        self.stats.blocks_parsed += 1;
126                        return Ok(Some(block));
127                    }
128                    Ok(None) => {
129                        // Need more data
130                        if !self.fill_buffer()? {
131                            // EOF reached
132                            if self.valid_bytes > 0 {
133                                // Partial data at end
134                                return Err(SbfError::IncompleteBlock {
135                                    needed: 8,
136                                    have: self.valid_bytes,
137                                });
138                            }
139                            return Ok(None);
140                        }
141                    }
142                    Err(SbfError::InvalidSync) => {
143                        // Skip one byte and try again
144                        self.buffer.remove(0);
145                        self.valid_bytes -= 1;
146                        self.stats.bytes_skipped += 1;
147                    }
148                    Err(SbfError::CrcMismatch { .. }) => {
149                        // CRC error - skip sync and continue
150                        self.buffer.remove(0);
151                        self.valid_bytes -= 1;
152                        self.stats.crc_errors += 1;
153                        self.stats.bytes_skipped += 1;
154                    }
155                    Err(_e) => {
156                        // Other parse error - skip sync and continue
157                        self.buffer.remove(0);
158                        self.valid_bytes -= 1;
159                        self.stats.parse_errors += 1;
160                        self.stats.bytes_skipped += 1;
161                        // Continue to next potential sync
162                    }
163                }
164            } else {
165                // No sync found - need more data
166                if !self.fill_buffer()? {
167                    return Ok(None);
168                }
169            }
170
171            // Prevent buffer from growing too large
172            self.trim_buffer();
173        }
174    }
175
176    /// Find sync bytes in buffer
177    fn find_sync(&self) -> Option<usize> {
178        if self.valid_bytes < 2 {
179            return None;
180        }
181
182        (0..(self.valid_bytes - 1))
183            .find(|&i| self.buffer[i] == SBF_SYNC[0] && self.buffer[i + 1] == SBF_SYNC[1])
184    }
185
186    /// Try to parse a block from the current buffer position
187    fn try_parse_block(&mut self) -> SbfResult<Option<(SbfBlock, usize)>> {
188        if self.valid_bytes < 8 {
189            return Ok(None);
190        }
191
192        // Verify sync
193        if self.buffer[0] != SBF_SYNC[0] || self.buffer[1] != SBF_SYNC[1] {
194            return Err(SbfError::InvalidSync);
195        }
196
197        // Parse header
198        let header = SbfHeader::parse(&self.buffer[2..])?;
199        let total_len = header.length as usize;
200
201        if self.valid_bytes < total_len {
202            return Ok(None);
203        }
204
205        // Validate CRC if enabled
206        if self.validate_crc && !validate_block(&self.buffer[..total_len]) {
207            // Get stored and calculated CRC for error message
208            let stored_crc = u16::from_le_bytes([self.buffer[2], self.buffer[3]]);
209            return Err(SbfError::CrcMismatch {
210                expected: stored_crc,
211                actual: 0, // We don't recalculate here
212            });
213        }
214
215        // Parse block
216        let (block, consumed) = SbfBlock::parse(&self.buffer[..total_len])?;
217
218        Ok(Some((block, consumed)))
219    }
220
221    /// Fill buffer from source
222    fn fill_buffer(&mut self) -> SbfResult<bool> {
223        // Ensure we have room
224        if self.buffer.len() < self.valid_bytes + 4096 {
225            self.buffer.resize(self.valid_bytes + 4096, 0);
226        }
227
228        let mut temp = [0u8; 4096];
229        match self.inner.read(&mut temp) {
230            Ok(0) => Ok(false), // EOF
231            Ok(n) => {
232                // Append to buffer
233                if self.buffer.len() < self.valid_bytes + n {
234                    self.buffer.resize(self.valid_bytes + n, 0);
235                }
236                self.buffer[self.valid_bytes..self.valid_bytes + n].copy_from_slice(&temp[..n]);
237                self.valid_bytes += n;
238                self.stats.bytes_read += n as u64;
239                Ok(true)
240            }
241            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
242            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => self.fill_buffer(),
243            Err(e) => Err(SbfError::Io(e)),
244        }
245    }
246
247    /// Trim buffer if too large
248    fn trim_buffer(&mut self) {
249        if self.buffer.len() > MAX_BUFFER_SIZE && self.valid_bytes < MAX_BUFFER_SIZE / 2 {
250            self.buffer.truncate(self.valid_bytes);
251            self.buffer.shrink_to_fit();
252        }
253    }
254}
255
256/// Iterator implementation for SbfReader
257impl<R: Read> Iterator for SbfReader<R> {
258    type Item = SbfResult<SbfBlock>;
259
260    fn next(&mut self) -> Option<Self::Item> {
261        match self.read_block() {
262            Ok(Some(block)) => Some(Ok(block)),
263            Ok(None) => None,
264            Err(e) => Some(Err(e)),
265        }
266    }
267}
268
269/// Extension trait for creating SbfReader from Read types
270pub trait SbfReadExt: Read + Sized {
271    /// Create an SbfReader from this Read source
272    fn sbf_blocks(self) -> SbfReader<Self> {
273        SbfReader::new(self)
274    }
275}
276
277impl<R: Read> SbfReadExt for R {}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282    use std::io::Cursor;
283
284    #[test]
285    fn test_reader_empty() {
286        let data: &[u8] = &[];
287        let mut reader = SbfReader::new(Cursor::new(data));
288
289        assert!(reader.read_block().unwrap().is_none());
290    }
291
292    #[test]
293    fn test_reader_no_sync() {
294        let data = [0x00, 0x00, 0x00, 0x00];
295        let mut reader = SbfReader::new(Cursor::new(&data[..]));
296
297        assert!(reader.read_block().unwrap().is_none());
298    }
299
300    #[test]
301    fn test_reader_stats() {
302        let data: &[u8] = &[0x00, 0x00];
303        let mut reader = SbfReader::new(Cursor::new(data));
304
305        let _ = reader.read_block();
306
307        assert_eq!(reader.stats().bytes_read, 2);
308    }
309
310    #[test]
311    fn test_sbf_read_ext() {
312        let data: &[u8] = &[];
313        let reader = Cursor::new(data).sbf_blocks();
314
315        assert!(reader.validate_crc);
316    }
317}