ulog_rs/
data_message.rs

1use std::io::Read;
2
3use byteorder::{LittleEndian, ReadBytesExt};
4
5use crate::{
6    format_message::FormatMessage, MessageHeader, ULogError, ULogParser, ULogType, ULogValue,
7};
8
9/// A struct representing the result of parsing a nested message in a ULog data message.
10/// The `data` field contains the parsed ULog values, and `bytes_read` indicates the number of bytes consumed.
11struct NestedMessageResult {
12    data: Vec<ULogValue>,
13    bytes_read: usize,
14}
15
16#[derive(Debug)]
17/// A struct representing a data message in a ULog file.
18/// The `msg_id` field contains the message ID, `time_us` contains the timestamp in microseconds,
19/// and `data` contains the parsed ULog values for the message.
20pub struct DataMessage {
21    msg_id: u16,
22    time_us: u64,
23    data: Vec<ULogValue>,
24}
25
26impl<R: Read> ULogParser<R> {
27    /// Reads a data message from the ULog file and returns a `DataMessage` struct containing the parsed data.
28    ///
29    /// This function takes the message ID, message size, and the format of the message as input. It then reads the message data, parsing the values according to the specified format, and returns a `DataMessage` struct containing the parsed data.
30    ///
31    /// The function handles padding fields, nested messages, and any remaining bytes in the message. It also updates the current timestamp if a new timestamp value is present in the message data.
32    ///
33    /// # Arguments
34    /// * `msg_id` - The ID of the message to be read.
35    /// * `msg_size` - The size of the message in bytes.
36    /// * `format` - The format of the message, as a `FormatMessage` struct.
37    ///
38    /// # Returns
39    /// A `Result` containing a `DataMessage` struct with the parsed data, or a `ULogError` if there was an error parsing the message.
40    pub fn read_data_message(
41        &mut self,
42        msg_id: u16,
43        msg_size: u16,
44        format: &FormatMessage,
45    ) -> Result<DataMessage, ULogError> {
46        let mut data: Vec<ULogValue> = Vec::new();
47        let mut bytes_read = 2; // Account for msg_id that was already read
48
49        let has_trailing_padding = format
50            .fields
51            .last()
52            .map(|f| f.field_name.starts_with("_padding"))
53            .unwrap_or(false);
54
55        for (i, field) in format.fields.iter().enumerate() {
56            // Skip trailing padding field
57            if has_trailing_padding
58                && i == format.fields.len() - 1
59                && field.field_name.starts_with("_padding")
60            {
61                continue;
62            }
63            // Handle padding fields
64            if field.field_name.starts_with("_padding") {
65                let padding_size = if let Some(size) = field.array_size {
66                    size
67                } else {
68                    match field.field_type.as_str() {
69                        "uint8_t" => 1,
70                        "uint16_t" => 2,
71                        "uint32_t" => 4,
72                        "uint64_t" => 8,
73                        _ => 1,
74                    }
75                };
76                let mut padding = vec![0u8; padding_size];
77                self.reader.read_exact(&mut padding)?;
78                bytes_read += padding_size;
79                continue;
80            }
81            let (mut type_info, array_size) = Self::parse_type_string(&field.field_type)?;
82            // type_info.array_size = field.array_size;
83
84            let (value, field_bytes) = match type_info.clone() {
85                ULogType::Basic(value_type) => {
86                    let value = self.read_typed_value(&value_type, field.array_size)?;
87                    let bytes = match &value {
88                        ULogValue::BoolArray(v) => v.len(),
89                        ULogValue::CharArray(s) => s.len(),
90                        ULogValue::DoubleArray(v) => v.len() * 8,
91                        ULogValue::FloatArray(v) => v.len() * 4,
92                        ULogValue::Int16(_) | ULogValue::UInt16(_) => 2,
93                        ULogValue::Int16Array(v) => v.len() * 2,
94                        ULogValue::Int32(_) | ULogValue::UInt32(_) | ULogValue::Float(_) => 4,
95                        ULogValue::Int32Array(v) => v.len() * 4,
96                        ULogValue::Int64(_) | ULogValue::UInt64(_) | ULogValue::Double(_) => 8,
97                        ULogValue::Int64Array(v) => v.len() * 8,
98                        ULogValue::Int8(_)
99                        | ULogValue::UInt8(_)
100                        | ULogValue::Bool(_)
101                        | ULogValue::Char(_) => 1,
102                        ULogValue::Int8Array(v) => v.len(),
103                        ULogValue::UInt8Array(v) => v.len(),
104                        ULogValue::UInt16Array(v) => v.len() * 2,
105                        ULogValue::UInt32Array(v) => v.len() * 4,
106                        ULogValue::UInt64Array(v) => v.len() * 8,
107                        _ => 0,
108                    };
109                    (value, bytes)
110                }
111                ULogType::Message(msg_type) => {
112                    let nested_format = self
113                        .formats
114                        .get(&msg_type)
115                        .ok_or_else(|| {
116                            ULogError::ParseError(format!("Unknown message type: {}", msg_type))
117                        })?
118                        .clone();
119
120                    if let Some(size) = array_size {
121                        let mut array_values = Vec::with_capacity(size);
122                        let mut total_bytes = 0;
123                        for _ in 0..size {
124                            let result = self.read_nested_message(&nested_format)?;
125                            total_bytes += result.bytes_read;
126                            array_values.push(result.data);
127                        }
128                        (ULogValue::MessageArray(array_values), total_bytes)
129                    } else {
130                        let result = self.read_nested_message(&nested_format)?;
131                        (ULogValue::Message(result.data), result.bytes_read)
132                    }
133                }
134            };
135
136            bytes_read += field_bytes;
137            data.push(value);
138        }
139
140        // Handle any remaining bytes in the message
141        if bytes_read < msg_size as usize {
142            let remaining = msg_size as usize - bytes_read;
143            let mut remaining_bytes = vec![0u8; remaining];
144            self.reader.read_exact(&mut remaining_bytes)?;
145        } else if bytes_read > msg_size as usize {
146            return Err(ULogError::ParseError(format!(
147                "Read too many bytes: {} > {} for message {}",
148                bytes_read, msg_size, format.name
149            )));
150        }
151        let timestamp = if let Some(ULogValue::UInt64(ts)) = data.first() {
152            *ts
153        } else {
154            0
155        };
156        // Don't update the timestamp if there wasn't a new timestamp value
157        if timestamp > 0 {
158            self._current_timestamp = timestamp
159        }
160        Ok(DataMessage {
161            msg_id,
162            time_us: timestamp,
163            data,
164        })
165    }
166
167    pub fn handle_data_message(&mut self, header: &MessageHeader) -> Result<(), ULogError> {
168        let msg_id = self.reader.read_u16::<LittleEndian>()?;
169        let format_name = self
170            .subscriptions
171            .get(&msg_id)
172            .ok_or_else(|| ULogError::ParseError(format!("Unknown msg_id: {}", msg_id)))?
173            .message_name
174            .clone();
175        let format = self
176            .formats
177            .get(&format_name)
178            .ok_or_else(|| ULogError::ParseError(format!("Unknown format: {}", format_name)))?
179            .clone();
180        let data = self.read_data_message(msg_id, header.msg_size, &format)?;
181
182        self.subscriptions
183            .get_mut(&msg_id)
184            .unwrap()
185            .insert_data(data.data);
186        Ok(())
187    }
188
189    /// Reads a nested message from the ULog data stream.
190    ///
191    /// This function reads a nested message from the ULog data stream, based on the provided `FormatMessage` structure.
192    /// It recursively reads the nested fields, handling both basic types and nested message types.
193    /// The function returns a `NestedMessageResult` containing the read data and the total number of bytes read.
194    fn read_nested_message(
195        &mut self,
196        format: &FormatMessage,
197    ) -> Result<NestedMessageResult, ULogError> {
198        let mut nested_data = Vec::new();
199        let mut total_bytes_read = 0;
200
201        for field in &format.fields {
202            // Skip padding fields in nested messages
203            if field.field_name.starts_with("_padding") {
204                continue;
205            }
206
207            let field_array_size = field.array_size.unwrap_or(1);
208
209            let (type_info, array_size) = Self::parse_type_string(&field.field_type)?;
210
211            let (value, bytes) = match type_info {
212                ULogType::Basic(value_type) => {
213                    let value = self.read_typed_value(&value_type, field.array_size)?;
214                    let bytes = match &value {
215                        ULogValue::BoolArray(_) => field_array_size,
216                        ULogValue::CharArray(s) => s.len(),
217                        ULogValue::DoubleArray(_) => field_array_size * 8,
218                        ULogValue::FloatArray(_) => field_array_size * 4,
219                        ULogValue::Int16(_) | ULogValue::UInt16(_) => 2,
220                        ULogValue::Int16Array(_) => field_array_size * 2,
221                        ULogValue::Int32(_) | ULogValue::UInt32(_) | ULogValue::Float(_) => 4,
222                        ULogValue::Int32Array(_) => field_array_size * 4,
223                        ULogValue::Int64(_) | ULogValue::UInt64(_) | ULogValue::Double(_) => 8,
224                        ULogValue::Int64Array(_) => field_array_size * 8,
225                        ULogValue::Int8(_)
226                        | ULogValue::UInt8(_)
227                        | ULogValue::Bool(_)
228                        | ULogValue::Char(_) => 1,
229                        ULogValue::Int8Array(_) => field_array_size,
230                        ULogValue::UInt8Array(_) => field_array_size,
231                        ULogValue::UInt16Array(_) => field_array_size * 2,
232                        ULogValue::UInt32Array(_) => field_array_size * 4,
233                        ULogValue::UInt64Array(_) => field_array_size * 8,
234                        _ => 0, // Should never happen for basic types
235                    };
236                    (value, bytes)
237                }
238                ULogType::Message(msg_type) => {
239                    let nested_format = self
240                        .formats
241                        .get(&msg_type)
242                        .ok_or_else(|| {
243                            ULogError::ParseError(format!("Unknown message type: {}", msg_type))
244                        })?
245                        .clone();
246
247                    if let Some(size) = field.array_size {
248                        let mut array_values = Vec::with_capacity(size);
249                        let mut array_bytes = 0;
250                        for _ in 0..size {
251                            let result = self.read_nested_message(&nested_format)?;
252                            array_bytes += result.bytes_read;
253                            array_values.push(result.data);
254                        }
255                        (ULogValue::MessageArray(array_values), array_bytes)
256                    } else {
257                        let result = self.read_nested_message(&nested_format)?;
258                        (ULogValue::Message(result.data), result.bytes_read)
259                    }
260                }
261            };
262            total_bytes_read += bytes;
263            nested_data.push(value);
264        }
265        Ok(NestedMessageResult {
266            data: nested_data,
267            bytes_read: total_bytes_read,
268        })
269    }
270}