1use alloc::string::String;
8use alloc::vec::Vec;
9use core::fmt;
10
11use crate::format::{Frame, Header, ParticipantEntry, TopicEntry};
12
/// Errors reported while writing a record stream.
#[derive(Debug)]
pub enum WriteError {
    /// The underlying sink failed; the original I/O error is preserved and
    /// also exposed through [`std::error::Error::source`].
    Io(std::io::Error),
    /// A header was submitted to a writer that had already written one.
    HeaderAlreadyWritten,
    /// A frame was submitted before any header had been written.
    HeaderMissing,
    /// A frame carried an index that is not below the matching table size.
    OutOfRangeIdx {
        /// The offending index taken from the frame.
        idx: u32,
        /// Number of entries in the table the index was checked against.
        len: u32,
        /// Table name used as the message prefix (e.g. `"participant"` or `"topic"`).
        field: &'static str,
    },
}

impl fmt::Display for WriteError {
    /// Renders a short, single-line, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(e) => write!(f, "io: {e}"),
            Self::HeaderAlreadyWritten => f.write_str("header already written"),
            Self::HeaderMissing => f.write_str("frame written before header"),
            Self::OutOfRangeIdx { idx, len, field } => {
                write!(f, "{field}_idx {idx} >= {field}_count {len}")
            }
        }
    }
}

impl std::error::Error for WriteError {
    /// Returns the wrapped I/O error for [`WriteError::Io`] and `None` for
    /// every other variant.
    ///
    /// Without this, generic error handling (chain walking, `downcast_ref`)
    /// cannot reach the underlying `std::io::Error`.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Exhaustive on purpose: a new variant must decide whether it has a cause.
        match self {
            Self::Io(e) => Some(e),
            Self::HeaderAlreadyWritten | Self::HeaderMissing | Self::OutOfRangeIdx { .. } => None,
        }
    }
}

impl From<std::io::Error> for WriteError {
    /// Wraps an I/O error so `?` can be used directly on sink operations.
    fn from(e: std::io::Error) -> Self {
        Self::Io(e)
    }
}
53
/// Stateful writer that pushes an encoded header and frames into a byte sink.
///
/// Besides the sink it remembers whether a header has been written, the table
/// sizes taken from that header, and running totals of frames and bytes.
pub struct RecordWriter<W: std::io::Write> {
    /// Destination that receives every encoded byte.
    sink: W,
    /// Set once a header has been written successfully.
    header_written: bool,
    /// Participant table size recorded from the header; frame indices must be below it.
    participants_count: u32,
    /// Topic table size recorded from the header; frame indices must be below it.
    topics_count: u32,
    /// Number of frames written successfully so far.
    frames_written: u64,
    /// Number of bytes handed to the sink so far (header plus frames).
    bytes_written: u64,
}
63
64impl<W: std::io::Write> RecordWriter<W> {
65 pub fn new(sink: W) -> Self {
68 Self {
69 sink,
70 header_written: false,
71 participants_count: 0,
72 topics_count: 0,
73 frames_written: 0,
74 bytes_written: 0,
75 }
76 }
77
78 pub fn write_header(&mut self, header: &Header) -> Result<(), WriteError> {
84 if self.header_written {
85 return Err(WriteError::HeaderAlreadyWritten);
86 }
87 let mut buf =
88 Vec::with_capacity(64 + header.participants.len() * 32 + header.topics.len() * 32);
89 header.write(&mut buf);
90 self.sink.write_all(&buf)?;
91 self.bytes_written = self.bytes_written.saturating_add(buf.len() as u64);
92 self.participants_count = u32::try_from(header.participants.len()).unwrap_or(u32::MAX);
93 self.topics_count = u32::try_from(header.topics.len()).unwrap_or(u32::MAX);
94 self.header_written = true;
95 Ok(())
96 }
97
98 pub fn write_frame(&mut self, frame: &Frame) -> Result<(), WriteError> {
105 if !self.header_written {
106 return Err(WriteError::HeaderMissing);
107 }
108 if frame.participant_idx >= self.participants_count {
109 return Err(WriteError::OutOfRangeIdx {
110 idx: frame.participant_idx,
111 len: self.participants_count,
112 field: "participant",
113 });
114 }
115 if frame.topic_idx >= self.topics_count {
116 return Err(WriteError::OutOfRangeIdx {
117 idx: frame.topic_idx,
118 len: self.topics_count,
119 field: "topic",
120 });
121 }
122 let mut buf = Vec::with_capacity(32 + frame.payload.len());
123 frame.write(&mut buf);
124 self.sink.write_all(&buf)?;
125 self.bytes_written = self.bytes_written.saturating_add(buf.len() as u64);
126 self.frames_written = self.frames_written.saturating_add(1);
127 Ok(())
128 }
129
130 #[must_use]
132 pub fn frames_written(&self) -> u64 {
133 self.frames_written
134 }
135
136 #[must_use]
138 pub fn bytes_written(&self) -> u64 {
139 self.bytes_written
140 }
141
142 pub fn into_inner(self) -> W {
145 self.sink
146 }
147}
148
149pub fn write_all<W: std::io::Write>(
154 sink: W,
155 header: &Header,
156 frames: impl IntoIterator<Item = Frame>,
157) -> Result<RecordWriter<W>, WriteError> {
158 let mut w = RecordWriter::new(sink);
159 w.write_header(header)?;
160 for f in frames {
161 w.write_frame(&f)?;
162 }
163 Ok(w)
164}
165
166#[must_use]
168pub fn header_with(
169 time_base_unix_ns: i64,
170 participants: Vec<(String, [u8; 16])>,
171 topics: Vec<(String, String)>,
172) -> Header {
173 Header {
174 time_base_unix_ns,
175 participants: participants
176 .into_iter()
177 .map(|(name, guid)| ParticipantEntry { guid, name })
178 .collect(),
179 topics: topics
180 .into_iter()
181 .map(|(name, type_name)| TopicEntry { name, type_name })
182 .collect(),
183 }
184}
185
#[cfg(test)]
// Unwrapping is acceptable in tests: a failed unwrap is simply a failed test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::format::SampleKind;

    /// Builds an `Alive` frame from the given delta, indices and payload.
    fn alive_frame(
        timestamp_delta_ns: i64,
        participant_idx: u32,
        topic_idx: u32,
        payload: Vec<u8>,
    ) -> Frame {
        Frame {
            timestamp_delta_ns,
            participant_idx,
            topic_idx,
            sample_kind: SampleKind::Alive,
            payload,
        }
    }

    /// Fresh writer backed by an in-memory byte vector.
    fn memory_writer() -> RecordWriter<Vec<u8>> {
        RecordWriter::new(Vec::<u8>::new())
    }

    #[test]
    fn header_must_come_before_frames() {
        let mut writer = memory_writer();
        let frame = alive_frame(0, 0, 0, Vec::new());

        let outcome = writer.write_frame(&frame);

        assert!(matches!(outcome, Err(WriteError::HeaderMissing)));
    }

    #[test]
    fn header_only_once() {
        let mut writer = memory_writer();
        let empty = Header {
            time_base_unix_ns: 0,
            participants: Vec::new(),
            topics: Vec::new(),
        };

        writer.write_header(&empty).unwrap();
        let second = writer.write_header(&empty);

        assert!(matches!(second, Err(WriteError::HeaderAlreadyWritten)));
    }

    #[test]
    fn frame_idx_must_be_in_range() {
        let mut writer = memory_writer();
        let header = header_with(
            0,
            vec![(String::from("p0"), [0u8; 16])],
            vec![(String::from("/topic"), String::from("Type"))],
        );
        writer.write_header(&header).unwrap();

        // Only one participant exists, so index 1 is one past the end.
        let bad = alive_frame(1, 1, 0, Vec::new());
        let outcome = writer.write_frame(&bad);

        assert!(matches!(
            outcome,
            Err(WriteError::OutOfRangeIdx {
                field: "participant",
                ..
            })
        ));
    }

    #[test]
    fn write_all_helper() {
        let header = header_with(
            1_700_000_000_000_000_000,
            vec![(String::from("p"), [1u8; 16])],
            vec![(String::from("/t"), String::from("T"))],
        );
        let frames = (0..5).map(|i| alive_frame(i as i64 * 1000, 0, 0, vec![i as u8; 4]));

        let writer = write_all(Vec::<u8>::new(), &header, frames).unwrap();

        assert_eq!(writer.frames_written(), 5);
        assert!(writer.bytes_written() > 0);
    }
}