Skip to main content

zerodds_recorder/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
//! `RecordReader` — parses `.zddsrec` streams into a `Header` plus a
//! sequence of [`crate::format::Frame`]s.
//!
//! Reads from a `&[u8]` buffer (everything is held in memory). Streaming via
//! `std::io::Read` is planned as an additive extension for major version 2.0.
9
10use alloc::string::String;
11use alloc::vec::Vec;
12use core::fmt;
13
14use crate::format::{
15    FRAME_MAGIC, Frame, FrameView, Header, ParticipantEntry, SampleKind, TopicEntry, ZDDSREC_MAGIC,
16    ZDDSREC_VERSION,
17};
18
/// Errors produced while decoding a `.zddsrec` buffer.
///
/// The type is cheap to clone and comparable, so callers and tests can use
/// `assert_eq!` / `==` instead of pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// The file magic does not match — this is not a `.zddsrec`.
    BadMagic,
    /// The format version is above the supported range; carries the version
    /// found in the file.
    UnsupportedVersion(u32),
    /// The stream is too short for the field that was expected next.
    Truncated {
        /// Which field was being read.
        what: &'static str,
        /// How many bytes that field needs.
        need: usize,
        /// How many bytes were left in the buffer.
        have: usize,
    },
    /// A string field does not contain valid UTF-8.
    InvalidUtf8 {
        /// Which field was being read.
        what: &'static str,
    },
    /// The sample-kind byte is outside the valid set {0, 1, 2}; carries the
    /// offending byte.
    BadSampleKind(u8),
    /// The frame magic does not match; carries the byte that was found.
    BadFrameMagic(u8),
}

impl fmt::Display for ReadError {
    /// Writes a short, single-line English description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::BadMagic => f.write_str("bad file magic (not a zddsrec)"),
            Self::UnsupportedVersion(v) => write!(f, "unsupported zddsrec version: {v}"),
            Self::Truncated { what, need, have } => {
                write!(f, "truncated at {what}: need {need} bytes, have {have}")
            }
            Self::InvalidUtf8 { what } => write!(f, "invalid utf-8 in {what}"),
            Self::BadSampleKind(b) => write!(f, "unknown sample-kind byte {b}"),
            Self::BadFrameMagic(b) => write!(f, "expected frame-magic 'F', got 0x{b:02x}"),
        }
    }
}

// No `source()` override: none of the variants wraps another error.
impl std::error::Error for ReadError {}
62
63/// Reader fuer `.zddsrec`-Buffer.
64pub struct RecordReader<'a> {
65    bytes: &'a [u8],
66    cursor: usize,
67}
68
69impl<'a> RecordReader<'a> {
70    /// Erzeugt einen Reader. `parse_header` muss zuerst aufgerufen
71    /// werden, danach `next_frame` bis None.
72    pub fn new(bytes: &'a [u8]) -> Self {
73        Self { bytes, cursor: 0 }
74    }
75
76    fn need(&self, what: &'static str, n: usize) -> Result<&'a [u8], ReadError> {
77        if self.cursor + n > self.bytes.len() {
78            return Err(ReadError::Truncated {
79                what,
80                need: n,
81                have: self.bytes.len().saturating_sub(self.cursor),
82            });
83        }
84        Ok(&self.bytes[self.cursor..self.cursor + n])
85    }
86
87    fn read_u32(&mut self, what: &'static str) -> Result<u32, ReadError> {
88        let s = self.need(what, 4)?;
89        // `need(.., 4)` garantiert 4 Bytes; try_into ist deshalb
90        // unfehlbar — wir mappen den theoretischen Err-Pfad trotzdem
91        // explizit, statt expect/panic im Runtime-Pfad.
92        let arr: [u8; 4] = s.try_into().map_err(|_| ReadError::Truncated {
93            what,
94            need: 4,
95            have: s.len(),
96        })?;
97        let v = u32::from_le_bytes(arr);
98        self.cursor += 4;
99        Ok(v)
100    }
101
102    fn read_u64(&mut self, what: &'static str) -> Result<u64, ReadError> {
103        let s = self.need(what, 8)?;
104        let arr: [u8; 8] = s.try_into().map_err(|_| ReadError::Truncated {
105            what,
106            need: 8,
107            have: s.len(),
108        })?;
109        let v = u64::from_le_bytes(arr);
110        self.cursor += 8;
111        Ok(v)
112    }
113
114    fn read_i64(&mut self, what: &'static str) -> Result<i64, ReadError> {
115        Ok(self.read_u64(what)? as i64)
116    }
117
118    fn read_u8(&mut self, what: &'static str) -> Result<u8, ReadError> {
119        let s = self.need(what, 1)?;
120        let v = s[0];
121        self.cursor += 1;
122        Ok(v)
123    }
124
125    fn read_string(&mut self, what: &'static str) -> Result<String, ReadError> {
126        let len = self.read_u32(what)? as usize;
127        let s = self.need(what, len)?;
128        let owned = core::str::from_utf8(s)
129            .map_err(|_| ReadError::InvalidUtf8 { what })?
130            .to_string();
131        self.cursor += len;
132        Ok(owned)
133    }
134
135    fn read_bytes(&mut self, what: &'static str, n: usize) -> Result<&'a [u8], ReadError> {
136        let s = self.need(what, n)?;
137        self.cursor += n;
138        Ok(s)
139    }
140
141    /// Parsed den Header. Cursor steht danach am ersten Frame.
142    ///
143    /// # Errors
144    /// [`ReadError::BadMagic`], `UnsupportedVersion`, `Truncated`.
145    pub fn parse_header(&mut self) -> Result<Header, ReadError> {
146        let magic = self.read_bytes("magic", 4)?;
147        if magic != ZDDSREC_MAGIC {
148            return Err(ReadError::BadMagic);
149        }
150        let version = self.read_u32("version")?;
151        if version > ZDDSREC_VERSION {
152            return Err(ReadError::UnsupportedVersion(version));
153        }
154        let time_base = self.read_i64("time_base")?;
155        let pn = self.read_u32("participant_count")? as usize;
156        let tn = self.read_u32("topic_count")? as usize;
157        let mut participants = Vec::with_capacity(pn);
158        for _ in 0..pn {
159            let guid_slice = self.read_bytes("participant_guid", 16)?;
160            let mut guid = [0u8; 16];
161            guid.copy_from_slice(guid_slice);
162            let name = self.read_string("participant_name")?;
163            participants.push(ParticipantEntry { guid, name });
164        }
165        let mut topics = Vec::with_capacity(tn);
166        for _ in 0..tn {
167            let type_name = self.read_string("topic_type_name")?;
168            let name = self.read_string("topic_name")?;
169            topics.push(TopicEntry { name, type_name });
170        }
171        Ok(Header {
172            time_base_unix_ns: time_base,
173            participants,
174            topics,
175        })
176    }
177
178    /// Liest den naechsten Frame. Returnt `Ok(None)` wenn EOF erreicht.
179    ///
180    /// # Errors
181    /// `BadFrameMagic`, `Truncated`, `BadSampleKind`.
182    pub fn next_frame_view(&mut self) -> Result<Option<FrameView<'a>>, ReadError> {
183        if self.cursor >= self.bytes.len() {
184            return Ok(None);
185        }
186        let magic = self.read_u8("frame_magic")?;
187        if magic != FRAME_MAGIC {
188            return Err(ReadError::BadFrameMagic(magic));
189        }
190        let ts = self.read_i64("frame_timestamp")?;
191        let pidx = self.read_u32("frame_participant_idx")?;
192        let tidx = self.read_u32("frame_topic_idx")?;
193        let kind_byte = self.read_u8("frame_sample_kind")?;
194        let sample_kind =
195            SampleKind::from_u8(kind_byte).ok_or(ReadError::BadSampleKind(kind_byte))?;
196        let plen = self.read_u32("frame_payload_len")? as usize;
197        let payload = self.read_bytes("frame_payload", plen)?;
198        Ok(Some(FrameView {
199            timestamp_delta_ns: ts,
200            participant_idx: pidx,
201            topic_idx: tidx,
202            sample_kind,
203            payload,
204        }))
205    }
206
207    /// Bequem: gibt einen owned [`Frame`] zurueck.
208    ///
209    /// # Errors
210    /// Wie [`RecordReader::next_frame_view`].
211    pub fn next_frame(&mut self) -> Result<Option<Frame>, ReadError> {
212        Ok(self.next_frame_view()?.map(|v| v.to_owned()))
213    }
214}
215
#[cfg(test)]
#[allow(clippy::unwrap_used)] // Tests are allowed to unwrap.
mod tests {
    use super::*;
    use crate::writer::{header_with, write_all};

    /// A header written with no frames parses back to an equal header, and
    /// the reader then reports end of input.
    #[test]
    fn header_roundtrip() {
        let participants = vec![
            (String::from("talker"), [1u8; 16]),
            (String::from("listener"), [2u8; 16]),
        ];
        let topics = vec![(
            String::from("/chatter"),
            String::from("std_msgs::msg::String"),
        )];
        let original = header_with(1_700_000_000_000_000_000, participants, topics);

        let writer = write_all(Vec::<u8>::new(), &original, std::iter::empty()).unwrap();
        let encoded = writer.into_inner();

        let mut reader = RecordReader::new(&encoded);
        let decoded = reader.parse_header().unwrap();
        assert_eq!(decoded, original);
        assert!(reader.next_frame_view().unwrap().is_none());
    }

    /// Three frames written after a header are read back unchanged and in
    /// order.
    #[test]
    fn frame_roundtrip() {
        let header = header_with(
            0,
            vec![(String::from("p"), [1u8; 16])],
            vec![(String::from("/t"), String::from("T"))],
        );
        let mut expected: Vec<Frame> = Vec::new();
        for i in 0..3u8 {
            expected.push(Frame {
                timestamp_delta_ns: i64::from(i) * 1_000_000,
                participant_idx: 0,
                topic_idx: 0,
                sample_kind: SampleKind::Alive,
                payload: vec![i, i + 1, i + 2, 0xab],
            });
        }

        let writer = write_all(Vec::<u8>::new(), &header, expected.clone()).unwrap();
        let encoded = writer.into_inner();

        let mut reader = RecordReader::new(&encoded);
        let _ = reader.parse_header().unwrap();
        // Pull frames until the reader signals end of input; any error panics.
        let decoded: Vec<Frame> = std::iter::from_fn(|| reader.next_frame().unwrap()).collect();
        assert_eq!(decoded, expected);
    }

    /// A buffer of zeros does not start with the file magic.
    #[test]
    fn bad_magic_rejected() {
        let zeros = [0u8; 32];
        let mut reader = RecordReader::new(&zeros);
        let outcome = reader.parse_header();
        assert!(matches!(outcome, Err(ReadError::BadMagic)));
    }

    /// Three bytes cannot hold the four-byte magic.
    #[test]
    fn truncated_header_detected() {
        let short = b"ZDD".as_slice(); // 3 bytes, the magic needs 4
        let mut reader = RecordReader::new(short);
        let outcome = reader.parse_header();
        assert!(matches!(outcome, Err(ReadError::Truncated { .. })));
    }

    /// A frame that stops after the participant index is reported as
    /// truncated.
    #[test]
    fn truncated_frame_detected() {
        let header = header_with(
            0,
            vec![(String::from("p"), [1u8; 16])],
            vec![(String::from("/t"), String::from("T"))],
        );
        let mut encoded = Vec::new();
        header.write(&mut encoded);
        // Append only the first half of a frame.
        encoded.push(b'F');
        encoded.extend_from_slice(&[0u8; 8]); // timestamp (i64 zero)
        encoded.extend_from_slice(&[0u8; 4]); // participant index (u32 zero)
        // Topic index, sample kind and payload length are missing.

        let mut reader = RecordReader::new(&encoded);
        let _ = reader.parse_header().unwrap();
        let outcome = reader.next_frame_view();
        assert!(matches!(outcome, Err(ReadError::Truncated { .. })));
    }

    /// Version 999 is rejected and reported back in the error.
    #[test]
    fn unsupported_version_rejected() {
        let mut encoded = Vec::new();
        encoded.extend_from_slice(&ZDDSREC_MAGIC);
        encoded.extend_from_slice(&999u32.to_le_bytes());
        encoded.extend_from_slice(&[0u8; 8]); // time base (i64 zero)
        encoded.extend_from_slice(&[0u8; 4]); // participant count (u32 zero)
        encoded.extend_from_slice(&[0u8; 4]); // topic count (u32 zero)

        let mut reader = RecordReader::new(&encoded);
        let outcome = reader.parse_header();
        assert!(matches!(outcome, Err(ReadError::UnsupportedVersion(999))));
    }

    /// A sample-kind byte of 99 is rejected and reported back in the error.
    #[test]
    fn bad_sample_kind_rejected() {
        let header = header_with(
            0,
            vec![(String::from("p"), [0u8; 16])],
            vec![(String::from("/t"), String::from("T"))],
        );
        let mut encoded = Vec::new();
        header.write(&mut encoded);
        encoded.push(b'F');
        encoded.extend_from_slice(&[0u8; 8]); // timestamp
        encoded.extend_from_slice(&[0u8; 4]); // participant index
        encoded.extend_from_slice(&[0u8; 4]); // topic index
        encoded.push(99); // invalid sample kind
        encoded.extend_from_slice(&[0u8; 4]); // payload length

        let mut reader = RecordReader::new(&encoded);
        let _ = reader.parse_header().unwrap();
        let outcome = reader.next_frame_view();
        assert!(matches!(outcome, Err(ReadError::BadSampleKind(99))));
    }
}