1use std::collections::HashMap;
2use std::io::{BufReader, Read};
3use std::sync::Arc;
4
5use crate::entry::Entry;
6use crate::error::BinlogError;
7use crate::format::{parse_fmt_payload, MessageFormat};
8use crate::{FMT_TYPE, HEADER_MAGIC};
9
10const MAX_CONSECUTIVE_ERRORS: u32 = 256;
11
12pub struct Reader<R: Read> {
29 reader: BufReader<R>,
30 formats: HashMap<u8, Arc<MessageFormat>>,
31 consecutive_errors: u32,
32}
33
34impl<R: Read> Reader<R> {
35 pub fn new(reader: R) -> Self {
37 let mut formats = HashMap::new();
38 formats.insert(
40 FMT_TYPE,
41 Arc::new(MessageFormat {
42 msg_type: FMT_TYPE,
43 msg_len: 89,
44 name: "FMT".into(),
45 format: "BBnNZ".into(),
46 labels: Arc::from([
47 "Type".into(),
48 "Length".into(),
49 "Name".into(),
50 "Format".into(),
51 "Labels".into(),
52 ]),
53 }),
54 );
55
56 Reader {
57 reader: BufReader::new(reader),
58 formats,
59 consecutive_errors: 0,
60 }
61 }
62
63 fn next_inner(&mut self) -> Result<Option<Entry>, BinlogError> {
64 if self.consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
65 return Ok(None);
66 }
67
68 let mut header = [0u8; 3];
70 match self.read_exact_or_eof(&mut header) {
71 Ok(true) => {}
72 Ok(false) => return Ok(None), Err(_) => return Ok(None),
74 }
75
76 if header[0] != HEADER_MAGIC[0] || header[1] != HEADER_MAGIC[1] {
78 self.consecutive_errors += 1;
79 return self.recover_and_retry();
80 }
81
82 let msg_type = header[2];
83 self.parse_message(msg_type)
84 }
85
86 #[must_use]
88 pub fn formats(&self) -> &HashMap<u8, Arc<MessageFormat>> {
89 &self.formats
90 }
91
92 fn parse_message(&mut self, msg_type: u8) -> Result<Option<Entry>, BinlogError> {
94 let format = match self.formats.get(&msg_type) {
96 Some(f) => Arc::clone(f),
97 None => {
98 self.consecutive_errors += 1;
99 return self.recover_and_retry();
100 }
101 };
102
103 let payload = match self.read_payload(&format) {
105 Ok(p) => p,
106 Err(_) => {
107 self.consecutive_errors += 1;
108 return self.recover_and_retry();
109 }
110 };
111
112 let result = if msg_type == FMT_TYPE {
113 build_fmt_entry(&format, &payload)
114 } else {
115 build_data_entry(&format, msg_type, &payload)
116 };
117
118 match result {
119 Ok((entry, new_fmt)) => {
120 if let Some(new_fmt) = new_fmt {
121 self.formats.insert(new_fmt.msg_type, Arc::new(new_fmt));
122 }
123 self.consecutive_errors = 0;
124 Ok(Some(entry))
125 }
126 Err(_) => {
127 self.consecutive_errors += 1;
128 self.recover_and_retry()
129 }
130 }
131 }
132
133 fn read_payload(&mut self, format: &MessageFormat) -> Result<Vec<u8>, BinlogError> {
135 let payload_len = format.msg_len as usize - 3;
136 let mut payload = vec![0u8; payload_len];
137 match self.read_exact_or_eof(&mut payload) {
138 Ok(true) => Ok(payload),
139 Ok(false) => Err(BinlogError::UnexpectedEof),
140 Err(e) => Err(e),
141 }
142 }
143
144 fn recover_and_retry(&mut self) -> Result<Option<Entry>, BinlogError> {
146 if self.consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
147 return Ok(None);
148 }
149 match self.scan_for_header()? {
150 Some(msg_type) => self.parse_message(msg_type),
151 None => Ok(None),
152 }
153 }
154
155 fn scan_for_header(&mut self) -> Result<Option<u8>, BinlogError> {
158 let mut prev = 0u8;
159 loop {
160 let mut byte = [0u8; 1];
161 match self.reader.read(&mut byte) {
162 Ok(0) => return Ok(None), Ok(_) => {
164 if prev == HEADER_MAGIC[0] && byte[0] == HEADER_MAGIC[1] {
165 let mut msg_type = [0u8; 1];
167 match self.reader.read(&mut msg_type) {
168 Ok(0) => return Ok(None),
169 Ok(_) => return Ok(Some(msg_type[0])),
170 Err(_) => return Ok(None),
171 }
172 }
173 prev = byte[0];
174 }
175 Err(_) => return Ok(None),
176 }
177 }
178 }
179
180 fn read_exact_or_eof(&mut self, buf: &mut [u8]) -> Result<bool, BinlogError> {
183 let mut total = 0;
184 while total < buf.len() {
185 match self.reader.read(&mut buf[total..]) {
186 Ok(0) => {
187 if total == 0 {
188 return Ok(false); }
190 return Err(BinlogError::UnexpectedEof);
191 }
192 Ok(n) => total += n,
193 Err(e) => return Err(BinlogError::Io(e)),
194 }
195 }
196 Ok(true)
197 }
198}
199
200fn build_fmt_entry(
201 format: &MessageFormat,
202 payload: &[u8],
203) -> Result<(Entry, Option<MessageFormat>), BinlogError> {
204 let new_fmt = parse_fmt_payload(payload)?;
205 let values = format.decode_fields(payload)?;
206 let entry = Entry {
207 name: "FMT".into(),
208 msg_type: FMT_TYPE,
209 timestamp_usec: None,
210 labels: format.labels.clone(),
211 values,
212 };
213 Ok((entry, Some(new_fmt)))
214}
215
216fn build_data_entry(
217 format: &MessageFormat,
218 msg_type: u8,
219 payload: &[u8],
220) -> Result<(Entry, Option<MessageFormat>), BinlogError> {
221 let values = format.decode_fields(payload)?;
222 let timestamp_usec = format.extract_timestamp(payload);
223 let entry = Entry {
224 name: format.name.clone(),
225 msg_type,
226 timestamp_usec,
227 labels: format.labels.clone(),
228 values,
229 };
230 Ok((entry, None))
231}
232
233impl<R: Read> Iterator for Reader<R> {
234 type Item = Result<Entry, BinlogError>;
235
236 fn next(&mut self) -> Option<Self::Item> {
237 self.next_inner().transpose()
238 }
239}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244 use crate::value::FieldValue;
245
246 fn build_fmt_bootstrap() -> Vec<u8> {
248 let mut msg = Vec::new();
249 msg.extend_from_slice(&HEADER_MAGIC);
250 msg.push(FMT_TYPE);
251 let mut payload = [0u8; 86];
253 payload[0] = FMT_TYPE; payload[1] = 89; payload[2..6].copy_from_slice(b"FMT\0"); payload[6..11].copy_from_slice(b"BBnNZ"); let labels = b"Type,Length,Name,Format,Labels";
258 payload[22..22 + labels.len()].copy_from_slice(labels);
259 msg.extend_from_slice(&payload);
260 msg
261 }
262
263 fn build_fmt_for_type(
265 msg_type: u8,
266 msg_len: u8,
267 name: &[u8; 4],
268 format: &str,
269 labels: &str,
270 ) -> Vec<u8> {
271 let mut msg = Vec::new();
272 msg.extend_from_slice(&HEADER_MAGIC);
273 msg.push(FMT_TYPE);
274 let mut payload = [0u8; 86];
275 payload[0] = msg_type;
276 payload[1] = msg_len;
277 payload[2..6].copy_from_slice(name);
278 let fmt_bytes = format.as_bytes();
279 payload[6..6 + fmt_bytes.len()].copy_from_slice(fmt_bytes);
280 let lbl_bytes = labels.as_bytes();
281 payload[22..22 + lbl_bytes.len()].copy_from_slice(lbl_bytes);
282 msg.extend_from_slice(&payload);
283 msg
284 }
285
286 fn build_data_message(msg_type: u8, payload: &[u8]) -> Vec<u8> {
288 let mut msg = Vec::new();
289 msg.extend_from_slice(&HEADER_MAGIC);
290 msg.push(msg_type);
291 msg.extend_from_slice(payload);
292 msg
293 }
294
295 #[test]
296 fn parse_empty_input() {
297 let reader = Reader::new(std::io::Cursor::new(Vec::new()));
298 let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
299 assert!(entries.is_empty());
300 }
301
302 #[test]
303 fn parse_fmt_bootstrap_only() {
304 let data = build_fmt_bootstrap();
305 let reader = Reader::new(std::io::Cursor::new(data));
306 let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
307 assert_eq!(entries.len(), 1);
308 assert_eq!(entries[0].name, "FMT");
309 assert_eq!(entries[0].msg_type, FMT_TYPE);
310 assert!(entries[0].timestamp_usec.is_none());
311 }
312
313 #[test]
314 fn parse_data_message() {
315 let mut data = Vec::new();
316 data.extend(build_fmt_bootstrap());
318 data.extend(build_fmt_for_type(
320 0x81,
321 15,
322 b"ATT\0",
323 "Qhh",
324 "TimeUS,Roll,Pitch",
325 ));
326 let mut payload = Vec::new();
327 payload.extend_from_slice(&1_000_000u64.to_le_bytes());
328 payload.extend_from_slice(&4500i16.to_le_bytes()); payload.extend_from_slice(&(-200i16).to_le_bytes()); data.extend(build_data_message(0x81, &payload));
331
332 let reader = Reader::new(std::io::Cursor::new(data));
333 let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
334 assert_eq!(entries.len(), 3); let att = &entries[2];
337 assert_eq!(att.name, "ATT");
338 assert_eq!(att.msg_type, 0x81);
339 assert_eq!(att.timestamp_usec, Some(1_000_000));
340 assert_eq!(att.get("Roll"), Some(&FieldValue::Int(4500)));
341 assert_eq!(att.get("Pitch"), Some(&FieldValue::Int(-200)));
342 }
343
344 #[test]
345 fn error_recovery_with_garbage() {
346 let mut data = Vec::new();
347 data.extend(build_fmt_bootstrap());
348 data.extend(build_fmt_for_type(
350 0x81, 11, b"TST\0", "Q", "TimeUS",
352 ));
353 data.extend(build_data_message(0x81, &100u64.to_le_bytes()));
355 data.extend_from_slice(&[0xFF; 50]);
357 data.extend(build_data_message(0x81, &200u64.to_le_bytes()));
359
360 let reader = Reader::new(std::io::Cursor::new(data));
361 let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
362
363 let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
365 assert_eq!(tst_entries.len(), 2);
366 assert_eq!(tst_entries[0].timestamp_usec, Some(100));
367 assert_eq!(tst_entries[1].timestamp_usec, Some(200));
368 }
369
370 #[test]
371 fn truncated_final_message() {
372 let mut data = Vec::new();
373 data.extend(build_fmt_bootstrap());
374 data.extend(build_fmt_for_type(0x81, 11, b"TST\0", "Q", "TimeUS"));
375 data.extend(build_data_message(0x81, &100u64.to_le_bytes()));
377 data.extend_from_slice(&HEADER_MAGIC);
379 data.push(0x81);
380 data.extend_from_slice(&[0; 3]); let reader = Reader::new(std::io::Cursor::new(data));
383 let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
384
385 let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
386 assert_eq!(tst_entries.len(), 1);
387 assert_eq!(tst_entries[0].timestamp_usec, Some(100));
388 }
389
390 #[test]
391 fn unknown_type_recovery() {
392 let mut data = Vec::new();
393 data.extend(build_fmt_bootstrap());
394 data.extend(build_fmt_for_type(0x81, 11, b"TST\0", "Q", "TimeUS"));
395 data.extend_from_slice(&HEADER_MAGIC);
397 data.push(0x99);
398 data.extend_from_slice(&[0; 20]); data.extend(build_data_message(0x81, &300u64.to_le_bytes()));
401
402 let reader = Reader::new(std::io::Cursor::new(data));
403 let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
404
405 let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
406 assert_eq!(tst_entries.len(), 1);
407 assert_eq!(tst_entries[0].timestamp_usec, Some(300));
408 }
409
410 #[test]
411 fn max_consecutive_errors_boundary() {
412 let mut data = Vec::new();
413 data.extend(build_fmt_bootstrap());
414 data.extend(build_fmt_for_type(0x81, 11, b"TST\0", "Q", "TimeUS"));
415
416 let error_count = MAX_CONSECUTIVE_ERRORS + 10;
419 for _ in 0..error_count {
420 data.extend_from_slice(&HEADER_MAGIC);
421 data.push(0x99);
422 }
423
424 data.extend(build_data_message(0x81, &999u64.to_le_bytes()));
425
426 let reader = Reader::new(std::io::Cursor::new(data));
427 let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
428
429 let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
430 assert!(
431 tst_entries.is_empty(),
432 "reader should stop before reaching the valid message after {} errors",
433 MAX_CONSECUTIVE_ERRORS
434 );
435 }
436
437 #[test]
438 fn recovery_just_below_max_errors() {
439 let mut data = Vec::new();
440 data.extend(build_fmt_bootstrap());
441 data.extend(build_fmt_for_type(0x81, 11, b"TST\0", "Q", "TimeUS"));
442
443 for _ in 0..(MAX_CONSECUTIVE_ERRORS - 1) {
446 data.extend_from_slice(&HEADER_MAGIC);
447 data.push(0x99);
448 }
449
450 data.extend(build_data_message(0x81, &777u64.to_le_bytes()));
451
452 let reader = Reader::new(std::io::Cursor::new(data));
453 let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
454
455 let tst_entries: Vec<_> = entries.iter().filter(|e| e.name == "TST").collect();
456 assert_eq!(
457 tst_entries.len(),
458 1,
459 "recovery should still work at {} consecutive errors",
460 MAX_CONSECUTIVE_ERRORS - 1
461 );
462 assert_eq!(tst_entries[0].timestamp_usec, Some(777));
463 }
464
465 #[test]
466 fn formats_accessible() {
467 let data = build_fmt_bootstrap();
468 let mut reader = Reader::new(std::io::Cursor::new(data));
469 let _ = reader.next(); assert!(reader.formats().contains_key(&FMT_TYPE));
471 assert_eq!(reader.formats().get(&FMT_TYPE).unwrap().name, "FMT");
472 }
473}