1use alloc::string::String;
11use alloc::vec::Vec;
12use core::fmt;
13
14use crate::format::{
15 FRAME_MAGIC, Frame, FrameView, Header, ParticipantEntry, SampleKind, TopicEntry, ZDDSREC_MAGIC,
16 ZDDSREC_VERSION,
17};
18
/// Errors produced while decoding a zddsrec byte stream.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so callers
/// (and tests) can compare errors directly with `assert_eq!` instead of
/// pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// The input does not start with the expected file magic.
    BadMagic,
    /// The header declares a version greater than the one this reader
    /// supports; the payload is the version that was read.
    UnsupportedVersion(u32),
    /// The input ended before a complete field could be read.
    Truncated {
        /// Name of the field that was being read.
        what: &'static str,
        /// Number of bytes the field requires.
        need: usize,
        /// Number of bytes that were actually left in the input.
        have: usize,
    },
    /// A length-prefixed string field did not contain valid UTF-8.
    InvalidUtf8 {
        /// Name of the field that was being read.
        what: &'static str,
    },
    /// The sample-kind byte of a frame is not a known kind; the payload is
    /// the offending byte.
    BadSampleKind(u8),
    /// A frame did not start with the frame magic byte; the payload is the
    /// byte that was found instead.
    BadFrameMagic(u8),
}
45
46impl fmt::Display for ReadError {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 match self {
49 Self::BadMagic => write!(f, "bad file magic (not a zddsrec)"),
50 Self::UnsupportedVersion(v) => write!(f, "unsupported zddsrec version: {v}"),
51 Self::Truncated { what, need, have } => {
52 write!(f, "truncated at {what}: need {need} bytes, have {have}")
53 }
54 Self::InvalidUtf8 { what } => write!(f, "invalid utf-8 in {what}"),
55 Self::BadSampleKind(b) => write!(f, "unknown sample-kind byte {b}"),
56 Self::BadFrameMagic(b) => write!(f, "expected frame-magic 'F', got 0x{b:02x}"),
57 }
58 }
59}
60
// Marker impl so `ReadError` can be used wherever a `std::error::Error` is
// expected. `source()` is left at its default: no variant wraps another error.
impl std::error::Error for ReadError {}
62
/// Cursor-based decoder over a borrowed byte slice.
///
/// The reader never copies the input; borrowed results (such as frame
/// payloads) point straight into `bytes` and live for `'a`.
///
/// `Debug` is derived so the reader can appear in diagnostics, and `Clone`
/// so a caller can snapshot the current position cheaply (both fields are
/// trivially copyable).
#[derive(Debug, Clone)]
pub struct RecordReader<'a> {
    /// The complete input being decoded.
    bytes: &'a [u8],
    /// Offset into `bytes` of the next unread byte.
    cursor: usize,
}
68
69impl<'a> RecordReader<'a> {
70 pub fn new(bytes: &'a [u8]) -> Self {
73 Self { bytes, cursor: 0 }
74 }
75
76 fn need(&self, what: &'static str, n: usize) -> Result<&'a [u8], ReadError> {
77 if self.cursor + n > self.bytes.len() {
78 return Err(ReadError::Truncated {
79 what,
80 need: n,
81 have: self.bytes.len().saturating_sub(self.cursor),
82 });
83 }
84 Ok(&self.bytes[self.cursor..self.cursor + n])
85 }
86
87 fn read_u32(&mut self, what: &'static str) -> Result<u32, ReadError> {
88 let s = self.need(what, 4)?;
89 let arr: [u8; 4] = s.try_into().map_err(|_| ReadError::Truncated {
93 what,
94 need: 4,
95 have: s.len(),
96 })?;
97 let v = u32::from_le_bytes(arr);
98 self.cursor += 4;
99 Ok(v)
100 }
101
102 fn read_u64(&mut self, what: &'static str) -> Result<u64, ReadError> {
103 let s = self.need(what, 8)?;
104 let arr: [u8; 8] = s.try_into().map_err(|_| ReadError::Truncated {
105 what,
106 need: 8,
107 have: s.len(),
108 })?;
109 let v = u64::from_le_bytes(arr);
110 self.cursor += 8;
111 Ok(v)
112 }
113
114 fn read_i64(&mut self, what: &'static str) -> Result<i64, ReadError> {
115 Ok(self.read_u64(what)? as i64)
116 }
117
118 fn read_u8(&mut self, what: &'static str) -> Result<u8, ReadError> {
119 let s = self.need(what, 1)?;
120 let v = s[0];
121 self.cursor += 1;
122 Ok(v)
123 }
124
125 fn read_string(&mut self, what: &'static str) -> Result<String, ReadError> {
126 let len = self.read_u32(what)? as usize;
127 let s = self.need(what, len)?;
128 let owned = core::str::from_utf8(s)
129 .map_err(|_| ReadError::InvalidUtf8 { what })?
130 .to_string();
131 self.cursor += len;
132 Ok(owned)
133 }
134
135 fn read_bytes(&mut self, what: &'static str, n: usize) -> Result<&'a [u8], ReadError> {
136 let s = self.need(what, n)?;
137 self.cursor += n;
138 Ok(s)
139 }
140
141 pub fn parse_header(&mut self) -> Result<Header, ReadError> {
146 let magic = self.read_bytes("magic", 4)?;
147 if magic != ZDDSREC_MAGIC {
148 return Err(ReadError::BadMagic);
149 }
150 let version = self.read_u32("version")?;
151 if version > ZDDSREC_VERSION {
152 return Err(ReadError::UnsupportedVersion(version));
153 }
154 let time_base = self.read_i64("time_base")?;
155 let pn = self.read_u32("participant_count")? as usize;
156 let tn = self.read_u32("topic_count")? as usize;
157 let mut participants = Vec::with_capacity(pn);
158 for _ in 0..pn {
159 let guid_slice = self.read_bytes("participant_guid", 16)?;
160 let mut guid = [0u8; 16];
161 guid.copy_from_slice(guid_slice);
162 let name = self.read_string("participant_name")?;
163 participants.push(ParticipantEntry { guid, name });
164 }
165 let mut topics = Vec::with_capacity(tn);
166 for _ in 0..tn {
167 let type_name = self.read_string("topic_type_name")?;
168 let name = self.read_string("topic_name")?;
169 topics.push(TopicEntry { name, type_name });
170 }
171 Ok(Header {
172 time_base_unix_ns: time_base,
173 participants,
174 topics,
175 })
176 }
177
178 pub fn next_frame_view(&mut self) -> Result<Option<FrameView<'a>>, ReadError> {
183 if self.cursor >= self.bytes.len() {
184 return Ok(None);
185 }
186 let magic = self.read_u8("frame_magic")?;
187 if magic != FRAME_MAGIC {
188 return Err(ReadError::BadFrameMagic(magic));
189 }
190 let ts = self.read_i64("frame_timestamp")?;
191 let pidx = self.read_u32("frame_participant_idx")?;
192 let tidx = self.read_u32("frame_topic_idx")?;
193 let kind_byte = self.read_u8("frame_sample_kind")?;
194 let sample_kind =
195 SampleKind::from_u8(kind_byte).ok_or(ReadError::BadSampleKind(kind_byte))?;
196 let plen = self.read_u32("frame_payload_len")? as usize;
197 let payload = self.read_bytes("frame_payload", plen)?;
198 Ok(Some(FrameView {
199 timestamp_delta_ns: ts,
200 participant_idx: pidx,
201 topic_idx: tidx,
202 sample_kind,
203 payload,
204 }))
205 }
206
207 pub fn next_frame(&mut self) -> Result<Option<Frame>, ReadError> {
212 Ok(self.next_frame_view()?.map(|v| v.to_owned()))
213 }
214}
215
#[cfg(test)]
// Unwrapping is fine here: a failed unwrap simply fails the test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::writer::{header_with, write_all};

    /// Appends `v` to `buf` as four little-endian bytes.
    fn put_u32(buf: &mut Vec<u8>, v: u32) {
        buf.extend_from_slice(&v.to_le_bytes());
    }

    /// Appends `v` to `buf` as eight little-endian bytes.
    fn put_i64(buf: &mut Vec<u8>, v: i64) {
        buf.extend_from_slice(&v.to_le_bytes());
    }

    /// A header with two participants and one topic survives a
    /// write/parse round trip, and no frames follow it.
    #[test]
    fn header_roundtrip() {
        let participants = vec![
            (String::from("talker"), [1u8; 16]),
            (String::from("listener"), [2u8; 16]),
        ];
        let topics = vec![(
            String::from("/chatter"),
            String::from("std_msgs::msg::String"),
        )];
        let h = header_with(1_700_000_000_000_000_000, participants, topics);

        let w = write_all(Vec::<u8>::new(), &h, std::iter::empty()).unwrap();
        let bytes = w.into_inner();

        let mut reader = RecordReader::new(&bytes);
        let decoded = reader.parse_header().unwrap();
        assert_eq!(decoded, h);
        assert!(reader.next_frame_view().unwrap().is_none());
    }

    /// Three frames written after a minimal header are read back unchanged.
    #[test]
    fn frame_roundtrip() {
        let h = header_with(
            0,
            vec![(String::from("p"), [1u8; 16])],
            vec![(String::from("/t"), String::from("T"))],
        );

        let mut frames: Vec<Frame> = Vec::new();
        for i in 0..3u8 {
            frames.push(Frame {
                timestamp_delta_ns: i64::from(i) * 1_000_000,
                participant_idx: 0,
                topic_idx: 0,
                sample_kind: SampleKind::Alive,
                payload: vec![i, i + 1, i + 2, 0xab],
            });
        }

        let w = write_all(Vec::<u8>::new(), &h, frames.clone()).unwrap();
        let bytes = w.into_inner();

        let mut reader = RecordReader::new(&bytes);
        let _ = reader.parse_header().unwrap();
        let mut decoded = Vec::new();
        while let Some(frame) = reader.next_frame().unwrap() {
            decoded.push(frame);
        }
        assert_eq!(decoded, frames);
    }

    /// All-zero input is rejected because the magic does not match.
    #[test]
    fn bad_magic_rejected() {
        let zeros = [0u8; 32];
        let mut reader = RecordReader::new(&zeros);
        let outcome = reader.parse_header();
        assert!(matches!(outcome, Err(ReadError::BadMagic)));
    }

    /// Input shorter than the magic is reported as truncated.
    #[test]
    fn truncated_header_detected() {
        let bytes = b"ZDD".as_slice();
        let mut reader = RecordReader::new(bytes);
        let outcome = reader.parse_header();
        assert!(matches!(outcome, Err(ReadError::Truncated { .. })));
    }

    /// A frame that stops part-way through its fixed fields is reported
    /// as truncated.
    #[test]
    fn truncated_frame_detected() {
        let h = header_with(
            0,
            vec![(String::from("p"), [1u8; 16])],
            vec![(String::from("/t"), String::from("T"))],
        );
        let mut buf: Vec<u8> = Vec::new();
        h.write(&mut buf);
        buf.push(b'F');
        put_i64(&mut buf, 0);
        put_u32(&mut buf, 0);

        let mut reader = RecordReader::new(&buf);
        let _ = reader.parse_header().unwrap();
        let outcome = reader.next_frame_view();
        assert!(matches!(outcome, Err(ReadError::Truncated { .. })));
    }

    /// A header carrying version 999 is rejected, and the error reports
    /// that version.
    #[test]
    fn unsupported_version_rejected() {
        let mut buf: Vec<u8> = Vec::new();
        buf.extend_from_slice(&ZDDSREC_MAGIC);
        put_u32(&mut buf, 999);
        put_i64(&mut buf, 0);
        put_u32(&mut buf, 0);
        put_u32(&mut buf, 0);

        let mut reader = RecordReader::new(&buf);
        let outcome = reader.parse_header();
        assert!(matches!(outcome, Err(ReadError::UnsupportedVersion(999))));
    }

    /// A frame whose sample-kind byte is 99 is rejected, and the error
    /// reports that byte.
    #[test]
    fn bad_sample_kind_rejected() {
        let h = header_with(
            0,
            vec![(String::from("p"), [0u8; 16])],
            vec![(String::from("/t"), String::from("T"))],
        );
        let mut buf: Vec<u8> = Vec::new();
        h.write(&mut buf);
        buf.push(b'F');
        put_i64(&mut buf, 0);
        put_u32(&mut buf, 0);
        put_u32(&mut buf, 0);
        buf.push(99);
        put_u32(&mut buf, 0);

        let mut reader = RecordReader::new(&buf);
        let _ = reader.parse_header().unwrap();
        let outcome = reader.next_frame_view();
        assert!(matches!(outcome, Err(ReadError::BadSampleKind(99))));
    }
}