Skip to main content

sbf_tools/
reader.rs

1//! SBF stream/file reader
2//!
3//! Provides `SbfReader` for reading SBF blocks from any `Read` source.
4
5use std::collections::VecDeque;
6use std::io::Read;
7
8use crate::blocks::SbfBlock;
9use crate::crc::validate_block;
10use crate::error::{SbfError, SbfResult};
11use crate::header::{SbfHeader, SBF_SYNC};
12
13/// Default buffer capacity (64KB)
14const DEFAULT_BUFFER_CAPACITY: usize = 65536;
15
16/// Maximum buffer size before trimming (128KB)
17const MAX_BUFFER_SIZE: usize = 131072;
18
19/// SBF block reader
20///
21/// Reads SBF blocks from any source implementing `Read`.
22///
23/// # Example
24///
25/// ```no_run
26/// use std::fs::File;
27/// use sbf_tools::SbfReader;
28///
29/// let file = File::open("data.sbf").unwrap();
30/// let mut reader = SbfReader::new(file);
31///
32/// while let Some(result) = reader.next() {
33///     match result {
34///         Ok(block) => println!("Got block: {}", block.name()),
35///         Err(e) => eprintln!("Error: {}", e),
36///     }
37/// }
38/// ```
39pub struct SbfReader<R: Read> {
40    inner: R,
41    buffer: VecDeque<u8>,
42    /// Whether to validate CRC
43    validate_crc: bool,
44    /// Statistics
45    stats: ReaderStats,
46}
47
48/// Reader statistics
49#[derive(Debug, Clone, Default)]
50pub struct ReaderStats {
51    /// Total bytes read from source
52    pub bytes_read: u64,
53    /// Number of blocks successfully parsed
54    pub blocks_parsed: u64,
55    /// Number of CRC errors
56    pub crc_errors: u64,
57    /// Number of parse errors
58    pub parse_errors: u64,
59    /// Bytes skipped looking for sync
60    pub bytes_skipped: u64,
61}
62
63impl<R: Read> SbfReader<R> {
64    /// Create a new SBF reader
65    pub fn new(reader: R) -> Self {
66        Self {
67            inner: reader,
68            buffer: VecDeque::with_capacity(DEFAULT_BUFFER_CAPACITY),
69            validate_crc: true,
70            stats: ReaderStats::default(),
71        }
72    }
73
74    /// Create reader with specific buffer capacity
75    pub fn with_capacity(reader: R, capacity: usize) -> Self {
76        Self {
77            inner: reader,
78            buffer: VecDeque::with_capacity(capacity),
79            validate_crc: true,
80            stats: ReaderStats::default(),
81        }
82    }
83
84    /// Enable or disable CRC validation (default: enabled)
85    pub fn validate_crc(mut self, validate: bool) -> Self {
86        self.validate_crc = validate;
87        self
88    }
89
90    /// Get reader statistics
91    pub fn stats(&self) -> &ReaderStats {
92        &self.stats
93    }
94
95    /// Reset statistics
96    pub fn reset_stats(&mut self) {
97        self.stats = ReaderStats::default();
98    }
99
100    /// Read the next SBF block
101    ///
102    /// Returns `Ok(Some(block))` if a block was read successfully,
103    /// `Ok(None)` if end of stream was reached,
104    /// or `Err(e)` if an error occurred.
105    pub fn read_block(&mut self) -> SbfResult<Option<SbfBlock>> {
106        loop {
107            // Try to find sync bytes in buffer
108            if let Some(sync_pos) = self.find_sync() {
109                // Remove any bytes before sync
110                if sync_pos > 0 {
111                    self.stats.bytes_skipped += sync_pos as u64;
112                    self.buffer.drain(0..sync_pos);
113                }
114
115                // Try to parse block
116                match self.try_parse_block() {
117                    Ok(Some((block, consumed))) => {
118                        // Remove consumed bytes
119                        self.buffer.drain(0..consumed);
120                        self.stats.blocks_parsed += 1;
121                        return Ok(Some(block));
122                    }
123                    Ok(None) => {
124                        // Need more data
125                        if !self.fill_buffer()? {
126                            // EOF reached
127                            if self.buffer.len() > 0 {
128                                // Partial data at end
129                                return Err(SbfError::IncompleteBlock {
130                                    needed: 8,
131                                    have: self.buffer.len(),
132                                });
133                            }
134                            return Ok(None);
135                        }
136                    }
137                    Err(SbfError::InvalidSync) => {
138                        // Skip one byte and try again
139                        self.buffer.remove(0);
140                        self.stats.bytes_skipped += 1;
141                    }
142                    Err(SbfError::CrcMismatch { .. }) => {
143                        // CRC error - skip sync and continue
144                        self.buffer.remove(0);
145                        self.stats.crc_errors += 1;
146                        self.stats.bytes_skipped += 1;
147                    }
148                    Err(_e) => {
149                        // Other parse error - skip sync and continue
150                        self.buffer.remove(0);
151                        self.stats.parse_errors += 1;
152                        self.stats.bytes_skipped += 1;
153                        // Continue to next potential sync
154                    }
155                }
156            } else {
157                // No sync found - need more data
158                if !self.fill_buffer()? {
159                    return Ok(None);
160                }
161            }
162
163            // Prevent buffer from growing too large
164            self.trim_buffer();
165        }
166    }
167
168    /// Find sync bytes in buffer
169    fn find_sync(&self) -> Option<usize> {
170        if self.buffer.len() < 2 {
171            return None;
172        }
173
174        (0..(self.buffer.len() - 1))
175            .find(|&i| self.buffer[i] == SBF_SYNC[0] && self.buffer[i + 1] == SBF_SYNC[1])
176    }
177
178    /// Try to parse a block from the current buffer position
179    fn try_parse_block(&mut self) -> SbfResult<Option<(SbfBlock, usize)>> {
180        if self.buffer.len() < 8 {
181            return Ok(None);
182        }
183
184        let buffer = self.buffer.make_contiguous();
185
186        // Parse header
187        let header = SbfHeader::parse(&buffer[2..])?;
188        let total_len = header.length as usize;
189
190        if buffer.len() < total_len {
191            return Ok(None);
192        }
193
194        // Validate CRC if enabled
195        if self.validate_crc && !validate_block(&buffer[..total_len]) {
196            // Get stored and calculated CRC for error message
197            let stored_crc = u16::from_le_bytes([self.buffer[2], self.buffer[3]]);
198            return Err(SbfError::CrcMismatch {
199                expected: stored_crc,
200                actual: 0, // We don't recalculate here
201            });
202        }
203
204        // Parse block
205        let (block, consumed) = SbfBlock::parse(&buffer[..total_len])?;
206
207        Ok(Some((block, consumed)))
208    }
209
210    /// Fill buffer from source
211    fn fill_buffer(&mut self) -> SbfResult<bool> {
212        let mut temp = [0u8; 4096];
213        match self.inner.read(&mut temp) {
214            Ok(0) => Ok(false), // EOF
215            Ok(n) => {
216                self.buffer.extend(&temp[..n]);
217                self.stats.bytes_read += n as u64;
218                Ok(true)
219            }
220            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
221            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => self.fill_buffer(),
222            Err(e) => Err(SbfError::Io(e)),
223        }
224    }
225
226    /// Trim buffer if too large
227    fn trim_buffer(&mut self) {
228        if self.buffer.capacity() > MAX_BUFFER_SIZE && self.buffer.len() < MAX_BUFFER_SIZE / 2 {
229            self.buffer.shrink_to_fit();
230        }
231    }
232}
233
234/// Iterator implementation for SbfReader
235impl<R: Read> Iterator for SbfReader<R> {
236    type Item = SbfResult<SbfBlock>;
237
238    fn next(&mut self) -> Option<Self::Item> {
239        match self.read_block() {
240            Ok(Some(block)) => Some(Ok(block)),
241            Ok(None) => None,
242            Err(e) => Some(Err(e)),
243        }
244    }
245}
246
247/// Extension trait for creating SbfReader from Read types
248pub trait SbfReadExt: Read + Sized {
249    /// Create an SbfReader from this Read source
250    fn sbf_blocks(self) -> SbfReader<Self> {
251        SbfReader::new(self)
252    }
253}
254
255impl<R: Read> SbfReadExt for R {}
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260    use std::io::Cursor;
261
262    #[test]
263    fn test_reader_empty() {
264        let data: &[u8] = &[];
265        let mut reader = SbfReader::new(Cursor::new(data));
266
267        assert!(reader.read_block().unwrap().is_none());
268    }
269
270    #[test]
271    fn test_reader_no_sync() {
272        let data = [0x00, 0x00, 0x00, 0x00];
273        let mut reader = SbfReader::new(Cursor::new(&data[..]));
274
275        assert!(reader.read_block().unwrap().is_none());
276    }
277
278    #[test]
279    fn test_reader_stats() {
280        let data: &[u8] = &[0x00, 0x00];
281        let mut reader = SbfReader::new(Cursor::new(data));
282
283        let _ = reader.read_block();
284
285        assert_eq!(reader.stats().bytes_read, 2);
286    }
287
288    #[test]
289    fn test_sbf_read_ext() {
290        let data: &[u8] = &[];
291        let reader = Cursor::new(data).sbf_blocks();
292
293        assert!(reader.validate_crc);
294    }
295}