1#![doc = include_str!("../README.md")]
2
3pub mod data_message;
4pub mod dropout_message;
5pub mod format_message;
6pub mod info_message;
7pub mod logged_message;
8pub mod multi_message;
9pub mod parameter_message;
10pub mod subscription_message;
11pub mod tagged_logged_message;
12pub mod ulog;
13
14use byteorder::{LittleEndian, ReadBytesExt};
15use dropout_message::DropoutStats;
16use format_message::FormatMessage;
17use info_message::*;
18use logged_message::LoggedMessage;
19use multi_message::MultiMessage;
20use parameter_message::{DefaultParameterMessage, ParameterMessage};
21use serde_derive::Serialize;
22use std::collections::HashMap;
23use std::io::{self, Read};
24use subscription_message::SubscriptionMessage;
25use tagged_logged_message::*;
26use thiserror::Error;
27
/// Upper bound that `parse_data` compares a message header's `msg_size` against.
///
/// The value is the largest number a `u16` can hold, so a `u16` size can
/// never actually exceed it.
const MAX_MESSAGE_SIZE: u16 = 65535;
30
/// Type of a field as classified by `ULogParser::parse_type_string`.
#[derive(Debug, Clone, PartialEq)]
pub enum ULogType {
    /// One of the built-in primitive types.
    Basic(ULogValueType),
    /// Any type name that is not a recognised primitive; carries the name as written.
    Message(String),
}
41
/// The primitive types recognised by `ULogParser::parse_type_string`.
///
/// Each variant corresponds to one textual type name (shown next to it).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum ULogValueType {
    /// `int8_t`
    Int8,
    /// `uint8_t`
    UInt8,
    /// `int16_t`
    Int16,
    /// `uint16_t`
    UInt16,
    /// `int32_t`
    Int32,
    /// `uint32_t`
    UInt32,
    /// `int64_t`
    Int64,
    /// `uint64_t`
    UInt64,
    /// `float`
    Float,
    /// `double`
    Double,
    /// `bool`
    Bool,
    /// `char`
    Char,
}
60
/// A decoded value: a scalar, an array of scalars, or a nested message.
#[derive(Debug, Clone, Serialize)]
pub enum ULogValue {
    // Scalars.
    Int8(i8),
    UInt8(u8),
    Int16(i16),
    UInt16(u16),
    Int32(i32),
    UInt32(u32),
    Int64(i64),
    UInt64(u64),
    Float(f32),
    Double(f64),
    Bool(bool),
    /// A single character; `read_typed_value` builds it from one byte.
    Char(char),
    // Arrays of scalars.
    Int8Array(Vec<i8>),
    UInt8Array(Vec<u8>),
    Int16Array(Vec<i16>),
    UInt16Array(Vec<u16>),
    Int32Array(Vec<i32>),
    UInt32Array(Vec<u32>),
    Int64Array(Vec<i64>),
    UInt64Array(Vec<u64>),
    FloatArray(Vec<f32>),
    DoubleArray(Vec<f64>),
    BoolArray(Vec<bool>),
    /// A character array held as a string; `read_typed_value` decodes it
    /// lossily as UTF-8 and strips leading and trailing NUL characters.
    CharArray(String),
    /// A nested message, stored as the list of its values.
    Message(Vec<ULogValue>),
    /// An array of nested messages, one inner list per element.
    MessageArray(Vec<Vec<ULogValue>>),
}
93
94impl ULogValue {
95 pub fn calculate_size(&self) -> usize {
96 match self {
97 ULogValue::BoolArray(v) => v.len(),
98 ULogValue::CharArray(s) => s.len(),
99 ULogValue::DoubleArray(v) => v.len() * 8,
100 ULogValue::FloatArray(v) => v.len() * 4,
101 ULogValue::Int16(_) | ULogValue::UInt16(_) => 2,
102 ULogValue::Int16Array(v) => v.len() * 2,
103 ULogValue::Int32(_) | ULogValue::UInt32(_) | ULogValue::Float(_) => 4,
104 ULogValue::Int32Array(v) => v.len() * 4,
105 ULogValue::Int64(_) | ULogValue::UInt64(_) | ULogValue::Double(_) => 8,
106 ULogValue::Int64Array(v) => v.len() * 8,
107 ULogValue::Int8(_) | ULogValue::UInt8(_) | ULogValue::Bool(_) | ULogValue::Char(_) => 1,
108 ULogValue::Int8Array(v) => v.len(),
109 ULogValue::UInt8Array(v) => v.len(),
110 ULogValue::Message(map) => map.iter().map(|v| v.calculate_size()).sum(),
111 ULogValue::MessageArray(arr) => arr
112 .iter()
113 .map(|v| v.iter().map(|inner| inner.calculate_size()).sum::<usize>())
114 .sum(),
115 ULogValue::UInt16Array(vec) => vec.len() * 2,
116 ULogValue::UInt32Array(vec) => vec.len() * 4,
117 ULogValue::UInt64Array(vec) => vec.len() * 8,
118 }
119 }
120}
121
/// Errors produced while reading and parsing a ULog stream.
#[derive(Debug, Error)]
pub enum ULogError {
    /// An underlying read failed; converted automatically from `io::Error`.
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
    /// The first seven bytes of the stream did not match the expected magic.
    #[error("Invalid magic bytes")]
    InvalidMagic,
    /// The header carried a version greater than the parser accepts.
    #[error("Unsupported version: {0}")]
    UnsupportedVersion(u8),
    /// A message of an unexpected type was encountered; carries the type byte.
    #[error("Invalid message type: {0}")]
    InvalidMessageType(u8),
    /// Bytes that should form a string were not valid UTF-8.
    #[error("Invalid string data")]
    InvalidString,
    /// A type name could not be accepted; carries the offending name.
    #[error("Invalid type name: {0}")]
    InvalidTypeName(String),
    /// A generic parse failure with a human-readable description.
    #[error("Parse error: {0}")]
    ParseError(String),
    /// The flag bits message had at least one non-zero incompatible flag byte;
    /// carries the incompatible flag bytes.
    #[error("IncompatibleFlags: {:?}", .0)]
    IncompatibleFlags(Vec<u8>),
}
/// The fixed file header fields read by `ULogParser::new` after the magic bytes.
#[derive(Debug)]
pub struct ULogHeader {
    /// File format version (a single byte).
    pub version: u8,
    /// Timestamp stored in the header, read as a little-endian `u64`.
    /// NOTE(review): the unit is not visible here — presumably microseconds; confirm.
    pub timestamp: u64,
}
153
/// The three-byte header that precedes every message.
#[derive(Debug)]

pub struct MessageHeader {
    /// Size field, read as a little-endian `u16`; `skip_message` discards
    /// exactly this many bytes following the header.
    pub msg_size: u16,
    /// Message type byte (compared against ASCII letters such as `b'A'`).
    pub msg_type: u8,
}
166
/// Payload of the flag bits message as read by `ULogParser::read_flag_bits`.
#[derive(Debug)]
pub struct FlagBitsMessage {
    /// Eight compatible-flag bytes.
    pub compat_flags: [u8; 8],
    /// Eight incompatible-flag bytes; `read_flag_bits` rejects the message
    /// if any of them is non-zero.
    pub incompat_flags: [u8; 8],
    /// Three little-endian `u64` offsets.
    pub appended_offsets: [u64; 3],
}
177
/// Streaming parser over any `Read` source, accumulating what it has parsed.
///
/// All collections start empty in `new`. They are not filled in this file;
/// NOTE(review): presumably the `handle_*` methods in the sibling modules
/// populate them — confirm there.
#[derive(Debug)]
pub struct ULogParser<R: Read> {
    /// The underlying byte source.
    reader: R,
    /// Format version taken from the file header.
    _version: u8,
    /// Initialised to the header timestamp; returned by `last_timestamp`.
    _current_timestamp: u64,
    /// Dropout statistics, starting at zero drops.
    dropout_details: DropoutStats,
    /// The parsed file header.
    header: ULogHeader,
    /// Format messages, keyed by a string (presumably the message name — confirm).
    formats: HashMap<String, FormatMessage>,
    /// Subscriptions, keyed by a `u16` (presumably the message id — confirm).
    subscriptions: HashMap<u16, SubscriptionMessage>,
    /// Logged messages in the order they were stored.
    logged_messages: Vec<LoggedMessage>,
    /// Tagged logged messages, grouped by a `u16` key (presumably the tag — confirm).
    logged_messages_tagged: HashMap<u16, Vec<TaggedLoggedMessage>>,
    /// Info messages, keyed by a string.
    info_messages: HashMap<String, InfoMessage>,
    /// Parameters, keyed by a string.
    initial_params: HashMap<String, ParameterMessage>,
    /// Multi messages, grouped under a string key.
    multi_messages: HashMap<String, Vec<MultiMessage>>,
    /// Default parameters, keyed by a string.
    default_params: HashMap<String, DefaultParameterMessage>,
    /// Parameter messages grouped under a string key.
    changed_params: HashMap<String, Vec<ParameterMessage>>,
}
203
204impl<R: Read> ULogParser<R> {
205 pub fn new(mut reader: R) -> Result<Self, ULogError> {
206 let mut magic = [0u8; 7];
208 reader.read_exact(&mut magic)?;
209 if magic != [0x55, 0x4C, 0x6F, 0x67, 0x01, 0x12, 0x35] {
210 return Err(ULogError::InvalidMagic);
211 }
212 log::info!("Magic bytes: {:?}", magic);
213 let version = reader.read_u8()?;
215 if version > 1 {
216 return Err(ULogError::UnsupportedVersion(version));
217 }
218 log::info!("ULog version: {}", version);
219 let timestamp = reader.read_u64::<LittleEndian>()?;
221
222 let header = ULogHeader { version, timestamp };
223
224 Ok(ULogParser {
225 reader,
226 _current_timestamp: timestamp,
227 _version: version,
228 dropout_details: DropoutStats {
229 total_drops: 0,
230 total_duration_ms: 0,
231 dropouts: Vec::new(),
232 },
233 header,
234 formats: HashMap::new(),
235 subscriptions: HashMap::new(),
236 logged_messages: Vec::new(),
237 logged_messages_tagged: HashMap::new(),
238 info_messages: HashMap::new(),
239 initial_params: HashMap::new(),
240 multi_messages: HashMap::new(),
241 default_params: HashMap::new(),
242 changed_params: HashMap::new(),
243 })
244 }
245
/// Returns the file header (version and timestamp) that was read in `new`.
pub fn header(&self) -> &ULogHeader {
    &self.header
}
249
250 fn _dump_next_bytes(&mut self, count: usize) -> Result<(), ULogError> {
251 let mut buf = vec![0u8; count];
252 self.reader.read_exact(&mut buf)?;
253 log::debug!("Next {} bytes: {:?}", count, buf);
254 Ok(())
255 }
256
257 pub fn read_message_header(&mut self) -> Result<MessageHeader, ULogError> {
258 let msg_size = self.reader.read_u16::<LittleEndian>()?;
259 let msg_type = self.reader.read_u8()?;
260 Ok(MessageHeader { msg_size, msg_type })
261 }
262
263 pub fn read_flag_bits(&mut self) -> Result<FlagBitsMessage, ULogError> {
264 let mut compat_flags = [0u8; 8];
265 let mut incompat_flags = [0u8; 8];
266 let mut appended_offsets = [0u64; 3];
267
268 self.reader.read_exact(&mut compat_flags)?;
269 self.reader.read_exact(&mut incompat_flags)?;
270
271 for offset in &mut appended_offsets {
272 *offset = self.reader.read_u64::<LittleEndian>()?;
273 }
274
275 if incompat_flags.iter().any(|&x| x != 0) {
277 return Err(ULogError::IncompatibleFlags(incompat_flags.to_vec()));
278 }
279
280 Ok(FlagBitsMessage {
281 compat_flags,
282 incompat_flags,
283 appended_offsets,
284 })
285 }
286
287 fn parse_type_string(type_str: &str) -> Result<(ULogType, Option<usize>), ULogError> {
288 let mut parts = type_str.split('[');
289 let base_type = parts.next().unwrap_or("");
290
291 let array_size = if let Some(size_str) = parts.next() {
292 Some(
294 size_str
295 .trim_end_matches(']')
296 .parse::<usize>()
297 .map_err(|_| ULogError::ParseError("Invalid array size".to_string()))?,
298 )
299 } else {
300 None
301 };
302
303 let value_type = match base_type {
304 "int8_t" => ULogType::Basic(ULogValueType::Int8),
306 "uint8_t" => ULogType::Basic(ULogValueType::UInt8),
307 "int16_t" => ULogType::Basic(ULogValueType::Int16),
308 "uint16_t" => ULogType::Basic(ULogValueType::UInt16),
309 "int32_t" => ULogType::Basic(ULogValueType::Int32),
310 "uint32_t" => ULogType::Basic(ULogValueType::UInt32),
311 "int64_t" => ULogType::Basic(ULogValueType::Int64),
312 "uint64_t" => ULogType::Basic(ULogValueType::UInt64),
313 "float" => ULogType::Basic(ULogValueType::Float),
314 "double" => ULogType::Basic(ULogValueType::Double),
315 "bool" => ULogType::Basic(ULogValueType::Bool),
316 "char" => ULogType::Basic(ULogValueType::Char),
317 _ => ULogType::Message(base_type.to_string()),
319 };
320
321 Ok((value_type, array_size))
322 }
323
324 fn read_typed_value(
326 &mut self,
327 value_type: &ULogValueType,
328 array_size: Option<usize>,
329 ) -> Result<ULogValue, ULogError> {
330 match (value_type, array_size) {
331 (ULogValueType::Int8, None) => Ok(ULogValue::Int8(self.reader.read_i8()?)),
333 (ULogValueType::UInt8, None) => Ok(ULogValue::UInt8(self.reader.read_u8()?)),
334 (ULogValueType::Int16, None) => {
335 Ok(ULogValue::Int16(self.reader.read_i16::<LittleEndian>()?))
336 }
337 (ULogValueType::UInt16, None) => {
338 Ok(ULogValue::UInt16(self.reader.read_u16::<LittleEndian>()?))
339 }
340 (ULogValueType::Int32, None) => {
341 Ok(ULogValue::Int32(self.reader.read_i32::<LittleEndian>()?))
342 }
343 (ULogValueType::UInt32, None) => {
344 Ok(ULogValue::UInt32(self.reader.read_u32::<LittleEndian>()?))
345 }
346 (ULogValueType::Int64, None) => {
347 Ok(ULogValue::Int64(self.reader.read_i64::<LittleEndian>()?))
348 }
349 (ULogValueType::UInt64, None) => {
350 Ok(ULogValue::UInt64(self.reader.read_u64::<LittleEndian>()?))
351 }
352 (ULogValueType::Float, None) => {
353 Ok(ULogValue::Float(self.reader.read_f32::<LittleEndian>()?))
354 }
355 (ULogValueType::Double, None) => {
356 Ok(ULogValue::Double(self.reader.read_f64::<LittleEndian>()?))
357 }
358 (ULogValueType::Bool, None) => Ok(ULogValue::Bool(self.reader.read_u8()? != 0)),
359 (ULogValueType::Char, None) => {
360 let c = self.reader.read_u8()? as char;
361 Ok(ULogValue::Char(c))
362 }
363
364 (ULogValueType::Bool, Some(size)) => {
366 let mut values = vec![0u8; size];
367 self.reader.read_exact(&mut values)?;
368 Ok(ULogValue::BoolArray(
369 values.iter().map(|&x| x != 0).collect(),
370 ))
371 }
372 (ULogValueType::UInt16, Some(size)) => {
373 let mut values = vec![0u16; size];
374 self.reader.read_u16_into::<LittleEndian>(&mut values)?;
375 Ok(ULogValue::UInt16Array(values))
376 }
377 (ULogValueType::UInt32, Some(size)) => {
378 let mut values = vec![0u32; size];
379 self.reader.read_u32_into::<LittleEndian>(&mut values)?;
380 Ok(ULogValue::UInt32Array(values))
381 }
382 (ULogValueType::Int8, Some(size)) => {
383 let mut values = vec![0i8; size];
384 self.reader.read_i8_into(&mut values)?;
385 Ok(ULogValue::Int8Array(values))
386 }
387 (ULogValueType::UInt8, Some(size)) => {
388 let mut values = vec![0u8; size];
389 self.reader.read_exact(&mut values)?;
390 Ok(ULogValue::UInt8Array(values))
391 }
392 (ULogValueType::Float, Some(size)) => {
393 let mut values = vec![0.0f32; size];
394 self.reader.read_f32_into::<LittleEndian>(&mut values)?;
395 Ok(ULogValue::FloatArray(values))
396 }
397 (ULogValueType::Char, Some(size)) => {
399 let mut bytes = vec![0u8; size];
400 self.reader.read_exact(&mut bytes)?;
401 let s = String::from_utf8_lossy(&bytes)
403 .trim_matches('\0')
404 .to_string();
405 Ok(ULogValue::CharArray(s))
406 }
407 ulog_value_type => {
408 log::error!("Unsupported type/size combination");
409 Err(ULogError::ParseError(format!(
410 "Invalid type/size combination: {:?}",
411 ulog_value_type
412 )))
413 }
414 }
415 }
416 fn read_string(&mut self, len: usize) -> Result<String, ULogError> {
417 let mut buf = vec![0u8; len];
418 self.reader.read_exact(&mut buf)?;
419 String::from_utf8(buf).map_err(|_| ULogError::InvalidString)
420 }
421
422 fn is_valid_message_type(msg_type: u8) -> bool {
424 let is_valid = matches!(
425 msg_type,
426 b'A' | b'R' | b'D' | b'I' | b'M' | b'P' | b'Q' | b'L' | b'C' | b'S' | b'O' );
438 if !is_valid {
439 log::warn!("Invalid message type: {}", msg_type);
440 }
441 is_valid
442 }
443
444 pub fn parse_definitions(&mut self) -> Result<(), ULogError> {
450 log::info!("Parsing definitions section");
451
452 if self._version > 0 {
454 let header = self.read_message_header()?;
455 log::debug!(
456 "Flag bits header: msg_size={}, msg_type={}",
457 header.msg_size,
458 header.msg_type as char
459 );
460 if header.msg_type != b'B' {
461 return Err(ULogError::InvalidMessageType(header.msg_type));
462 }
463 let _flag_bits = self.read_flag_bits()?;
464 }
465
466 loop {
468 let header = self.read_message_header()?;
469 log::debug!(
471 "Message header: size={}, type={}({:#x})",
472 header.msg_size,
473 header.msg_type as char,
474 header.msg_type
475 );
476
477 match header.msg_type {
478 b'I' => {
479 log::debug!("Handling Info message");
480 self.handle_info_message(&header)?
481 }
482 b'F' => {
483 log::debug!("Handling Format message");
484 self.handle_format_message(&header)?
485 }
486 b'P' => {
487 log::debug!("Handling Parameter message");
488 self.handle_initial_param(&header)?
489 }
490 b'A' => {
491 log::debug!("Handling Subscription message");
492 self.handle_subscription_message(&header)?;
493 break;
494 }
495 b'Q' => {
496 self.handle_default_parameter()?;
497 }
498 _ => {
499 log::debug!("Unknown message type: {}", header.msg_type as char);
500 let mut peek_buf = vec![0u8; std::cmp::min(16, header.msg_size as usize)];
502 self.reader.read_exact(&mut peek_buf)?;
503 log::debug!("Next {} bytes: {:?}", peek_buf.len(), peek_buf);
504
505 if header.msg_size > 16 {
506 let mut remainder = vec![0u8; header.msg_size as usize - 16];
507 self.reader.read_exact(&mut remainder)?;
508 }
509 }
510 }
511 }
512 Ok(())
513 }
514
515 pub fn parse_data(&mut self) -> Result<(), ULogError> {
522 loop {
523 match self.read_message_header() {
524 Ok(header) => {
525 if !Self::is_valid_message_type(header.msg_type) {
526 return Ok(());
527 }
528 if header.msg_size > MAX_MESSAGE_SIZE {
529 return Ok(());
530 }
531
532 match header.msg_type {
533 b'A' => self.handle_subscription_message(&header)?,
534 b'I' => self.handle_info_message(&header)?,
535 b'M' => self.handle_multi_message(&header)?,
536 b'L' => self.handle_logged_message(&header)?,
537 b'C' => self.handle_tagged_logged_message(&header)?,
538 b'D' => self.handle_data_message(&header)?,
539 b'O' => self.handle_dropout(&header)?,
540 b'P' => self.handle_parameter_change(&header)?,
541 b'R' => self.skip_message(&header)?,
543 b'S' => self.skip_message(&header)?,
545 b'Q' => {
546 self.handle_default_parameter()?;
547 }
548 _ => self.skip_message(&header)?,
549 }
550 }
551 Err(ULogError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
552 log::info!("Reached end of file");
553 break;
554 }
555 Err(e) => return Err(e),
556 }
557 }
558 Ok(())
559 }
560
561 fn skip_message(&mut self, header: &MessageHeader) -> Result<(), ULogError> {
562 let mut buf = vec![0u8; header.msg_size as usize];
563 self.reader.read_exact(&mut buf)?;
564 Ok(())
565 }
566
567 #[allow(dead_code)]
568 pub fn parse_reader(reader: R) -> Result<ULogParser<R>, ULogError> {
574 let mut parser = ULogParser::new(reader)?;
575 parser.parse_definitions()?;
576 parser.parse_data()?;
577 Ok(parser)
578 }
579
/// Returns the parser's current timestamp value.
///
/// Within this file the value is only assigned in `new`, where it is set to
/// the header timestamp. NOTE(review): presumably the data-message handlers
/// in the sibling modules advance it — confirm there.
pub fn last_timestamp(&self) -> u64 {
    self._current_timestamp
}
583}
584
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A minimal header (magic, version 1, little-endian timestamp) is
    /// accepted by `ULogParser::new` and exposed through the header fields.
    #[test]
    fn test_parse_header() {
        let magic = [0x55u8, 0x4C, 0x6F, 0x67, 0x01, 0x12, 0x35];
        let version = [1u8];
        // Little-endian encoding yields the bytes 0x00, 0x11, ..., 0x77.
        let timestamp = 0x7766554433221100u64.to_le_bytes();
        let bytes = [&magic[..], &version[..], &timestamp[..]].concat();

        let parser = ULogParser::new(Cursor::new(bytes)).unwrap();
        assert_eq!(parser.header.version, 1);
        assert_eq!(parser.header.timestamp, 0x7766554433221100);
    }
}