Skip to main content

zerodds_recorder/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! `RecordWriter` — schreibt einen `.zddsrec`-Stream in einen
5//! `std::io::Write`-Sink.
6
7use alloc::string::String;
8use alloc::vec::Vec;
9use core::fmt;
10
11use crate::format::{Frame, Header, ParticipantEntry, TopicEntry};
12
/// Errors that [`RecordWriter`] can report while emitting a stream.
#[derive(Debug)]
pub enum WriteError {
    /// The underlying sink returned an I/O error.
    Io(std::io::Error),
    /// The header has already been written; from here on only frames may
    /// follow.
    HeaderAlreadyWritten,
    /// A frame was submitted before any header had been written.
    HeaderMissing,
    /// A frame carried an index that exceeds the range offered by the
    /// header.
    OutOfRangeIdx {
        /// The index the frame referenced.
        idx: u32,
        /// How many entries the header provides for that field.
        len: u32,
        /// Which field was affected (`"participant"` or `"topic"`).
        field: &'static str,
    },
}
32
33impl fmt::Display for WriteError {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        match self {
36            Self::Io(e) => write!(f, "io: {e}"),
37            Self::HeaderAlreadyWritten => write!(f, "header already written"),
38            Self::HeaderMissing => write!(f, "frame written before header"),
39            Self::OutOfRangeIdx { idx, len, field } => {
40                write!(f, "{field}_idx {idx} >= {field}_count {len}")
41            }
42        }
43    }
44}
45
46impl std::error::Error for WriteError {}
47
48impl From<std::io::Error> for WriteError {
49    fn from(e: std::io::Error) -> Self {
50        Self::Io(e)
51    }
52}
53
/// Streaming writer for `.zddsrec` files.
pub struct RecordWriter<W>
where
    W: std::io::Write,
{
    /// Destination that receives the encoded bytes.
    sink: W,
    /// `true` once a header has been written successfully.
    header_written: bool,
    /// Number of participant entries of the written header.
    participants_count: u32,
    /// Number of topic entries of the written header.
    topics_count: u32,
    /// Count of frames written so far.
    frames_written: u64,
    /// Count of bytes handed to the sink so far.
    bytes_written: u64,
}
63
64impl<W: std::io::Write> RecordWriter<W> {
65    /// Erzeugt einen Writer ueber `sink`. Caller muss `write_header`
66    /// vor dem ersten Frame aufrufen.
67    pub fn new(sink: W) -> Self {
68        Self {
69            sink,
70            header_written: false,
71            participants_count: 0,
72            topics_count: 0,
73            frames_written: 0,
74            bytes_written: 0,
75        }
76    }
77
78    /// Schreibt den Header. Kann nur einmal aufgerufen werden.
79    ///
80    /// # Errors
81    /// Returnt [`WriteError::HeaderAlreadyWritten`] wenn schon
82    /// gerufen, sonst beliebige IO-Fehler.
83    pub fn write_header(&mut self, header: &Header) -> Result<(), WriteError> {
84        if self.header_written {
85            return Err(WriteError::HeaderAlreadyWritten);
86        }
87        let mut buf =
88            Vec::with_capacity(64 + header.participants.len() * 32 + header.topics.len() * 32);
89        header.write(&mut buf);
90        self.sink.write_all(&buf)?;
91        self.bytes_written = self.bytes_written.saturating_add(buf.len() as u64);
92        self.participants_count = u32::try_from(header.participants.len()).unwrap_or(u32::MAX);
93        self.topics_count = u32::try_from(header.topics.len()).unwrap_or(u32::MAX);
94        self.header_written = true;
95        Ok(())
96    }
97
98    /// Schreibt einen Frame. Header muss zuvor gesetzt sein.
99    ///
100    /// # Errors
101    /// Returnt [`WriteError::HeaderMissing`] wenn `write_header`
102    /// noch nicht aufgerufen, oder [`WriteError::OutOfRangeIdx`] wenn
103    /// `participant_idx`/`topic_idx` aus dem Header-Range fallen.
104    pub fn write_frame(&mut self, frame: &Frame) -> Result<(), WriteError> {
105        if !self.header_written {
106            return Err(WriteError::HeaderMissing);
107        }
108        if frame.participant_idx >= self.participants_count {
109            return Err(WriteError::OutOfRangeIdx {
110                idx: frame.participant_idx,
111                len: self.participants_count,
112                field: "participant",
113            });
114        }
115        if frame.topic_idx >= self.topics_count {
116            return Err(WriteError::OutOfRangeIdx {
117                idx: frame.topic_idx,
118                len: self.topics_count,
119                field: "topic",
120            });
121        }
122        let mut buf = Vec::with_capacity(32 + frame.payload.len());
123        frame.write(&mut buf);
124        self.sink.write_all(&buf)?;
125        self.bytes_written = self.bytes_written.saturating_add(buf.len() as u64);
126        self.frames_written = self.frames_written.saturating_add(1);
127        Ok(())
128    }
129
130    /// Anzahl bisher geschriebener Frames.
131    #[must_use]
132    pub fn frames_written(&self) -> u64 {
133        self.frames_written
134    }
135
136    /// Anzahl Bytes auf dem Sink.
137    #[must_use]
138    pub fn bytes_written(&self) -> u64 {
139        self.bytes_written
140    }
141
142    /// Konsumiert den Writer und gibt den darunter liegenden Sink
143    /// zurueck (relevant fuer Cursor-Tests etc.).
144    pub fn into_inner(self) -> W {
145        self.sink
146    }
147}
148
149/// Convenience: schreibt Header + alle Frames in einem Schwung.
150///
151/// # Errors
152/// Wie [`RecordWriter::write_header`] / [`RecordWriter::write_frame`].
153pub fn write_all<W: std::io::Write>(
154    sink: W,
155    header: &Header,
156    frames: impl IntoIterator<Item = Frame>,
157) -> Result<RecordWriter<W>, WriteError> {
158    let mut w = RecordWriter::new(sink);
159    w.write_header(header)?;
160    for f in frames {
161        w.write_frame(&f)?;
162    }
163    Ok(w)
164}
165
166/// Bequemer Header-Builder fuer Tests.
167#[must_use]
168pub fn header_with(
169    time_base_unix_ns: i64,
170    participants: Vec<(String, [u8; 16])>,
171    topics: Vec<(String, String)>,
172) -> Header {
173    Header {
174        time_base_unix_ns,
175        participants: participants
176            .into_iter()
177            .map(|(name, guid)| ParticipantEntry { guid, name })
178            .collect(),
179        topics: topics
180            .into_iter()
181            .map(|(name, type_name)| TopicEntry { name, type_name })
182            .collect(),
183    }
184}
185
#[cfg(test)]
#[allow(clippy::unwrap_used)] // tests are allowed to use unwrap.
mod tests {
    use super::*;
    use crate::format::SampleKind;

    /// Builds an `Alive` frame with an empty payload that references the
    /// given participant and topic indices.
    fn frame(participant_idx: u32, topic_idx: u32) -> Frame {
        Frame {
            timestamp_delta_ns: 0,
            participant_idx,
            topic_idx,
            sample_kind: SampleKind::Alive,
            payload: Vec::new(),
        }
    }

    /// Header with exactly one participant and one topic, so index 0 is the
    /// only valid value for both frame indices.
    fn single_entry_header() -> Header {
        header_with(
            0,
            vec![(String::from("p0"), [0u8; 16])],
            vec![(String::from("/topic"), String::from("Type"))],
        )
    }

    #[test]
    fn header_must_come_before_frames() {
        let mut w = RecordWriter::new(Vec::<u8>::new());
        let r = w.write_frame(&frame(0, 0));
        assert!(matches!(r, Err(WriteError::HeaderMissing)));
    }

    #[test]
    fn header_only_once() {
        let mut w = RecordWriter::new(Vec::<u8>::new());
        let h = Header {
            time_base_unix_ns: 0,
            participants: Vec::new(),
            topics: Vec::new(),
        };
        w.write_header(&h).unwrap();
        let r = w.write_header(&h);
        assert!(matches!(r, Err(WriteError::HeaderAlreadyWritten)));
    }

    #[test]
    fn frame_idx_must_be_in_range() {
        let mut w = RecordWriter::new(Vec::<u8>::new());
        w.write_header(&single_entry_header()).unwrap();
        let r = w.write_frame(&frame(1, 0));
        assert!(matches!(
            r,
            Err(WriteError::OutOfRangeIdx {
                idx: 1,
                len: 1,
                field: "participant",
            })
        ));
    }

    #[test]
    fn topic_idx_must_be_in_range() {
        let mut w = RecordWriter::new(Vec::<u8>::new());
        w.write_header(&single_entry_header()).unwrap();
        let r = w.write_frame(&frame(0, 1));
        assert!(matches!(
            r,
            Err(WriteError::OutOfRangeIdx {
                idx: 1,
                len: 1,
                field: "topic",
            })
        ));
    }

    #[test]
    fn rejected_frame_leaves_counters_untouched() {
        let mut w = RecordWriter::new(Vec::<u8>::new());
        w.write_header(&single_entry_header()).unwrap();
        let bytes_after_header = w.bytes_written();
        assert!(w.write_frame(&frame(7, 0)).is_err());
        assert_eq!(w.frames_written(), 0);
        assert_eq!(w.bytes_written(), bytes_after_header);
    }

    #[test]
    fn write_all_helper() {
        let h = header_with(
            1_700_000_000_000_000_000,
            vec![(String::from("p"), [1u8; 16])],
            vec![(String::from("/t"), String::from("T"))],
        );
        // A typed `u8` range avoids lossy `as` casts for delta and payload.
        let frames = (0u8..5).map(|i| Frame {
            timestamp_delta_ns: i64::from(i) * 1000,
            participant_idx: 0,
            topic_idx: 0,
            sample_kind: SampleKind::Alive,
            payload: vec![i; 4],
        });
        let w = write_all(Vec::<u8>::new(), &h, frames).unwrap();
        assert_eq!(w.frames_written(), 5);
        let reported = w.bytes_written();
        assert!(reported > 0);
        // The byte counter must agree with what actually reached the sink.
        let sink = w.into_inner();
        assert_eq!(u64::try_from(sink.len()).unwrap(), reported);
    }
}