Skip to main content

ardupilot_binlog/
reader.rs

1use std::collections::HashMap;
2use std::io::{BufReader, Read};
3use std::sync::Arc;
4
5use crate::entry::Entry;
6use crate::error::BinlogError;
7use crate::format::{parse_fmt_payload, MessageFormat};
8use crate::{FMT_TYPE, HEADER_MAGIC};
9
10const MAX_CONSECUTIVE_ERRORS: u32 = 256;
11
12/// Streaming parser for DataFlash BIN data.
13///
14/// Reads from any `Read` source, discovers message schemas via FMT
15/// messages, and yields parsed [`Entry`] values via the [`Iterator`] trait.
16///
17/// ```
18/// use ardupilot_binlog::Reader;
19/// use std::io::Cursor;
20///
21/// let data: Vec<u8> = vec![];
22/// let reader = Reader::new(Cursor::new(data));
23/// for result in reader {
24///     let entry = result.unwrap();
25///     println!("{}: {} fields", entry.name, entry.len());
26/// }
27/// ```
28pub struct Reader<R: Read> {
29    reader: BufReader<R>,
30    formats: HashMap<u8, Arc<MessageFormat>>,
31    consecutive_errors: u32,
32}
33
34impl<R: Read> Reader<R> {
35    /// Create a new reader wrapping a byte source.
36    pub fn new(reader: R) -> Self {
37        let mut formats = HashMap::new();
38        // Bootstrap: hardcode the FMT definition so we can parse the first FMT message
39        formats.insert(
40            FMT_TYPE,
41            Arc::new(MessageFormat {
42                msg_type: FMT_TYPE,
43                msg_len: 89,
44                name: "FMT".into(),
45                format: "BBnNZ".into(),
46                labels: Arc::from([
47                    "Type".into(),
48                    "Length".into(),
49                    "Name".into(),
50                    "Format".into(),
51                    "Labels".into(),
52                ]),
53            }),
54        );
55
56        Reader {
57            reader: BufReader::new(reader),
58            formats,
59            consecutive_errors: 0,
60        }
61    }
62
63    fn next_inner(&mut self) -> Result<Option<Entry>, BinlogError> {
64        if self.consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
65            return Ok(None);
66        }
67
68        // Read the 3-byte header
69        let mut header = [0u8; 3];
70        match self.read_exact_or_eof(&mut header) {
71            Ok(true) => {}
72            Ok(false) => return Ok(None), // EOF
73            Err(_) => return Ok(None),
74        }
75
76        // Validate magic bytes
77        if header[0] != HEADER_MAGIC[0] || header[1] != HEADER_MAGIC[1] {
78            self.consecutive_errors += 1;
79            return self.recover_and_retry();
80        }
81
82        let msg_type = header[2];
83        self.parse_message(msg_type)
84    }
85
86    /// Return all message format definitions discovered so far.
87    #[must_use]
88    pub fn formats(&self) -> &HashMap<u8, Arc<MessageFormat>> {
89        &self.formats
90    }
91
92    /// Parse a message given its type byte (header already consumed).
93    fn parse_message(&mut self, msg_type: u8) -> Result<Option<Entry>, BinlogError> {
94        // Clone the Arc so we own a handle and don't borrow self.formats
95        let format = match self.formats.get(&msg_type) {
96            Some(f) => Arc::clone(f),
97            None => {
98                self.consecutive_errors += 1;
99                return self.recover_and_retry();
100            }
101        };
102
103        // Read payload (no borrow on self.formats)
104        let payload = match self.read_payload(&format) {
105            Ok(p) => p,
106            Err(_) => {
107                self.consecutive_errors += 1;
108                return self.recover_and_retry();
109            }
110        };
111
112        let result = if msg_type == FMT_TYPE {
113            build_fmt_entry(&format, &payload)
114        } else {
115            build_data_entry(&format, msg_type, &payload)
116        };
117
118        match result {
119            Ok((entry, new_fmt)) => {
120                if let Some(new_fmt) = new_fmt {
121                    self.formats.insert(new_fmt.msg_type, Arc::new(new_fmt));
122                }
123                self.consecutive_errors = 0;
124                Ok(Some(entry))
125            }
126            Err(_) => {
127                self.consecutive_errors += 1;
128                self.recover_and_retry()
129            }
130        }
131    }
132
133    /// Read the payload bytes for a message (everything after the 3-byte header).
134    fn read_payload(&mut self, format: &MessageFormat) -> Result<Vec<u8>, BinlogError> {
135        let payload_len = format.msg_len as usize - 3;
136        let mut payload = vec![0u8; payload_len];
137        match self.read_exact_or_eof(&mut payload) {
138            Ok(true) => Ok(payload),
139            Ok(false) => Err(BinlogError::UnexpectedEof),
140            Err(e) => Err(e),
141        }
142    }
143
144    /// Scan forward byte-by-byte looking for the magic header, then retry parsing.
145    fn recover_and_retry(&mut self) -> Result<Option<Entry>, BinlogError> {
146        if self.consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
147            return Ok(None);
148        }
149        match self.scan_for_header()? {
150            Some(msg_type) => self.parse_message(msg_type),
151            None => Ok(None),
152        }
153    }
154
155    /// Scan byte-by-byte for the next 0xA3 0x95 sequence.
156    /// Returns the msg_type byte that follows the magic, or None at EOF.
157    fn scan_for_header(&mut self) -> Result<Option<u8>, BinlogError> {
158        let mut prev = 0u8;
159        loop {
160            let mut byte = [0u8; 1];
161            match self.reader.read(&mut byte) {
162                Ok(0) => return Ok(None), // EOF
163                Ok(_) => {
164                    if prev == HEADER_MAGIC[0] && byte[0] == HEADER_MAGIC[1] {
165                        // Found magic, read msg_type
166                        let mut msg_type = [0u8; 1];
167                        match self.reader.read(&mut msg_type) {
168                            Ok(0) => return Ok(None),
169                            Ok(_) => return Ok(Some(msg_type[0])),
170                            Err(_) => return Ok(None),
171                        }
172                    }
173                    prev = byte[0];
174                }
175                Err(_) => return Ok(None),
176            }
177        }
178    }
179
180    /// Read exactly `buf.len()` bytes. Returns Ok(true) on success, Ok(false) on EOF
181    /// (zero bytes available), Err on partial read / IO error.
182    fn read_exact_or_eof(&mut self, buf: &mut [u8]) -> Result<bool, BinlogError> {
183        let mut total = 0;
184        while total < buf.len() {
185            match self.reader.read(&mut buf[total..]) {
186                Ok(0) => {
187                    if total == 0 {
188                        return Ok(false); // Clean EOF
189                    }
190                    return Err(BinlogError::UnexpectedEof);
191                }
192                Ok(n) => total += n,
193                Err(e) => return Err(BinlogError::Io(e)),
194            }
195        }
196        Ok(true)
197    }
198}
199
200fn build_fmt_entry(
201    format: &MessageFormat,
202    payload: &[u8],
203) -> Result<(Entry, Option<MessageFormat>), BinlogError> {
204    let new_fmt = parse_fmt_payload(payload)?;
205    let values = format.decode_fields(payload)?;
206    let entry = Entry {
207        name: "FMT".into(),
208        msg_type: FMT_TYPE,
209        timestamp_usec: None,
210        labels: format.labels.clone(),
211        values,
212    };
213    Ok((entry, Some(new_fmt)))
214}
215
216fn build_data_entry(
217    format: &MessageFormat,
218    msg_type: u8,
219    payload: &[u8],
220) -> Result<(Entry, Option<MessageFormat>), BinlogError> {
221    let values = format.decode_fields(payload)?;
222    let timestamp_usec = format.extract_timestamp(payload);
223    let entry = Entry {
224        name: format.name.clone(),
225        msg_type,
226        timestamp_usec,
227        labels: format.labels.clone(),
228        values,
229    };
230    Ok((entry, None))
231}
232
233impl<R: Read> Iterator for Reader<R> {
234    type Item = Result<Entry, BinlogError>;
235
236    fn next(&mut self) -> Option<Self::Item> {
237        self.next_inner().transpose()
238    }
239}
240
241#[cfg(test)]
242mod tests {
243    use super::*;
244    use crate::value::FieldValue;
245
246    /// Build a valid FMT message (89 bytes) that defines the FMT type itself.
247    fn build_fmt_bootstrap() -> Vec<u8> {
248        let mut msg = Vec::new();
249        msg.extend_from_slice(&HEADER_MAGIC);
250        msg.push(FMT_TYPE);
251        // 86-byte payload
252        let mut payload = [0u8; 86];
253        payload[0] = FMT_TYPE; // type
254        payload[1] = 89; // length
255        payload[2..6].copy_from_slice(b"FMT\0"); // name
256        payload[6..11].copy_from_slice(b"BBnNZ"); // format
257        let labels = b"Type,Length,Name,Format,Labels";
258        payload[22..22 + labels.len()].copy_from_slice(labels);
259        msg.extend_from_slice(&payload);
260        msg
261    }
262
263    /// Build a FMT message that defines a custom message type.
264    fn build_fmt_for_type(
265        msg_type: u8,
266        msg_len: u8,
267        name: &[u8; 4],
268        format: &str,
269        labels: &str,
270    ) -> Vec<u8> {
271        let mut msg = Vec::new();
272        msg.extend_from_slice(&HEADER_MAGIC);
273        msg.push(FMT_TYPE);
274        let mut payload = [0u8; 86];
275        payload[0] = msg_type;
276        payload[1] = msg_len;
277        payload[2..6].copy_from_slice(name);
278        let fmt_bytes = format.as_bytes();
279        payload[6..6 + fmt_bytes.len()].copy_from_slice(fmt_bytes);
280        let lbl_bytes = labels.as_bytes();
281        payload[22..22 + lbl_bytes.len()].copy_from_slice(lbl_bytes);
282        msg.extend_from_slice(&payload);
283        msg
284    }
285
286    /// Build a data message with a given type and raw payload.
287    fn build_data_message(msg_type: u8, payload: &[u8]) -> Vec<u8> {
288        let mut msg = Vec::new();
289        msg.extend_from_slice(&HEADER_MAGIC);
290        msg.push(msg_type);
291        msg.extend_from_slice(payload);
292        msg
293    }
294
295    #[test]
296    fn parse_empty_input() {
297        let reader = Reader::new(std::io::Cursor::new(Vec::new()));
298        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
299        assert!(entries.is_empty());
300    }
301
302    #[test]
303    fn parse_fmt_bootstrap_only() {
304        let data = build_fmt_bootstrap();
305        let reader = Reader::new(std::io::Cursor::new(data));
306        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
307        assert_eq!(entries.len(), 1);
308        assert_eq!(entries[0].name, "FMT");
309        assert_eq!(entries[0].msg_type, FMT_TYPE);
310        assert!(entries[0].timestamp_usec.is_none());
311    }
312
313    #[test]
314    fn parse_data_message() {
315        let mut data = Vec::new();
316        // Bootstrap FMT
317        data.extend(build_fmt_bootstrap());
318        // payload size: 8 + 2 + 2 = 12, total = 15
319        data.extend(build_fmt_for_type(
320            0x81,
321            15,
322            b"ATT\0",
323            "Qhh",
324            "TimeUS,Roll,Pitch",
325        ));
326        let mut payload = Vec::new();
327        payload.extend_from_slice(&1_000_000u64.to_le_bytes());
328        payload.extend_from_slice(&4500i16.to_le_bytes()); // Roll
329        payload.extend_from_slice(&(-200i16).to_le_bytes()); // Pitch
330        data.extend(build_data_message(0x81, &payload));
331
332        let reader = Reader::new(std::io::Cursor::new(data));
333        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
334        assert_eq!(entries.len(), 3); // FMT(FMT), FMT(ATT), ATT
335
336        let att = &entries[2];
337        assert_eq!(att.name, "ATT");
338        assert_eq!(att.msg_type, 0x81);
339        assert_eq!(att.timestamp_usec, Some(1_000_000));
340        assert_eq!(att.get("Roll"), Some(&FieldValue::Int(4500)));
341        assert_eq!(att.get("Pitch"), Some(&FieldValue::Int(-200)));
342    }
343
344    #[test]
345    fn error_recovery_with_garbage() {
346        let mut data = Vec::new();
347        data.extend(build_fmt_bootstrap());
348        // Define a simple type
349        data.extend(build_fmt_for_type(
350            0x81, 11, // 3 + 8
351            b"TST\0", "Q", "TimeUS",
352        ));
353        // First valid data message
354        data.extend(build_data_message(0x81, &100u64.to_le_bytes()));
355        // Garbage bytes
356        data.extend_from_slice(&[0xFF; 50]);
357        // Second valid data message
358        data.extend(build_data_message(0x81, &200u64.to_le_bytes()));
359
360        let reader = Reader::new(std::io::Cursor::new(data));
361        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
362
363        // Should have: FMT(FMT), FMT(TST), TST(100), TST(200)
364        let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
365        assert_eq!(tst_entries.len(), 2);
366        assert_eq!(tst_entries[0].timestamp_usec, Some(100));
367        assert_eq!(tst_entries[1].timestamp_usec, Some(200));
368    }
369
370    #[test]
371    fn truncated_final_message() {
372        let mut data = Vec::new();
373        data.extend(build_fmt_bootstrap());
374        data.extend(build_fmt_for_type(0x81, 11, b"TST\0", "Q", "TimeUS"));
375        // Valid message
376        data.extend(build_data_message(0x81, &100u64.to_le_bytes()));
377        // Truncated message: header + partial payload
378        data.extend_from_slice(&HEADER_MAGIC);
379        data.push(0x81);
380        data.extend_from_slice(&[0; 3]); // only 3 of 8 payload bytes
381
382        let reader = Reader::new(std::io::Cursor::new(data));
383        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
384
385        let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
386        assert_eq!(tst_entries.len(), 1);
387        assert_eq!(tst_entries[0].timestamp_usec, Some(100));
388    }
389
390    #[test]
391    fn unknown_type_recovery() {
392        let mut data = Vec::new();
393        data.extend(build_fmt_bootstrap());
394        data.extend(build_fmt_for_type(0x81, 11, b"TST\0", "Q", "TimeUS"));
395        // Message with unknown type 0x99
396        data.extend_from_slice(&HEADER_MAGIC);
397        data.push(0x99);
398        data.extend_from_slice(&[0; 20]); // some bytes
399                                          // Valid message after
400        data.extend(build_data_message(0x81, &300u64.to_le_bytes()));
401
402        let reader = Reader::new(std::io::Cursor::new(data));
403        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
404
405        let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
406        assert_eq!(tst_entries.len(), 1);
407        assert_eq!(tst_entries[0].timestamp_usec, Some(300));
408    }
409
410    #[test]
411    fn max_consecutive_errors_boundary() {
412        let mut data = Vec::new();
413        data.extend(build_fmt_bootstrap());
414        data.extend(build_fmt_for_type(0x81, 11, b"TST\0", "Q", "TimeUS"));
415
416        // Each unknown-type header chains: parse_message → recover_and_retry →
417        // scan_for_header → parse_message, incrementing consecutive_errors each time.
418        let error_count = MAX_CONSECUTIVE_ERRORS + 10;
419        for _ in 0..error_count {
420            data.extend_from_slice(&HEADER_MAGIC);
421            data.push(0x99);
422        }
423
424        data.extend(build_data_message(0x81, &999u64.to_le_bytes()));
425
426        let reader = Reader::new(std::io::Cursor::new(data));
427        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
428
429        let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
430        assert!(
431            tst_entries.is_empty(),
432            "reader should stop before reaching the valid message after {} errors",
433            MAX_CONSECUTIVE_ERRORS
434        );
435    }
436
437    #[test]
438    fn recovery_just_below_max_errors() {
439        let mut data = Vec::new();
440        data.extend(build_fmt_bootstrap());
441        data.extend(build_fmt_for_type(0x81, 11, b"TST\0", "Q", "TimeUS"));
442
443        // 255 unknown-type headers: one below the limit, so the final
444        // recover_and_retry still scans forward and finds the valid message.
445        for _ in 0..(MAX_CONSECUTIVE_ERRORS - 1) {
446            data.extend_from_slice(&HEADER_MAGIC);
447            data.push(0x99);
448        }
449
450        data.extend(build_data_message(0x81, &777u64.to_le_bytes()));
451
452        let reader = Reader::new(std::io::Cursor::new(data));
453        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
454
455        let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
456        assert_eq!(
457            tst_entries.len(),
458            1,
459            "recovery should still work at {} consecutive errors",
460            MAX_CONSECUTIVE_ERRORS - 1
461        );
462        assert_eq!(tst_entries[0].timestamp_usec, Some(777));
463    }
464
465    #[test]
466    fn formats_accessible() {
467        let data = build_fmt_bootstrap();
468        let mut reader = Reader::new(std::io::Cursor::new(data));
469        let _ = reader.next(); // parse the FMT
470        assert!(reader.formats().contains_key(&FMT_TYPE));
471        assert_eq!(reader.formats().get(&FMT_TYPE).unwrap().name, "FMT");
472    }
473}