1use alloc::string::String;
23use alloc::vec::Vec;
24use core::fmt;
25use core::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Mutex;
27
28use crate::format::{Frame, Header, ParticipantEntry, SampleKind, TopicEntry};
29use crate::writer::{RecordWriter, WriteError};
30
/// Identifies a recorded topic by its name together with its type name.
///
/// Two keys are equal only when both the topic name and the type name match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicKey {
    /// Name of the topic.
    pub topic: String,
    /// Name of the type carried on the topic.
    pub type_name: String,
}
39
40#[derive(Clone, Debug)]
42pub struct SessionOptions {
43 pub time_base_unix_ns: i64,
46 pub participants: Vec<ParticipantEntry>,
48 pub topics: Vec<TopicKey>,
52}
53
54impl SessionOptions {
55 #[must_use]
57 pub fn new(time_base_unix_ns: i64) -> Self {
58 Self {
59 time_base_unix_ns,
60 participants: Vec::new(),
61 topics: Vec::new(),
62 }
63 }
64
65 #[must_use]
67 pub fn with_participant(mut self, p: ParticipantEntry) -> Self {
68 self.participants.push(p);
69 self
70 }
71
72 #[must_use]
74 pub fn with_topic(mut self, t: TopicKey) -> Self {
75 self.topics.push(t);
76 self
77 }
78}
79
80#[derive(Debug)]
82pub enum SessionError {
83 Writer(WriteError),
85 Poisoned,
87}
88
89impl fmt::Display for SessionError {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 match self {
92 Self::Writer(e) => write!(f, "writer: {e}"),
93 Self::Poisoned => write!(f, "session mutex poisoned"),
94 }
95 }
96}
97
98impl std::error::Error for SessionError {}
99
100impl From<WriteError> for SessionError {
101 fn from(e: WriteError) -> Self {
102 Self::Writer(e)
103 }
104}
105
106pub struct RecordingSession<W: std::io::Write + Send> {
111 inner: Mutex<Inner<W>>,
112 samples_total: AtomicU64,
113 samples_dropped: AtomicU64,
114 bytes_total: AtomicU64,
115}
116
117struct Inner<W: std::io::Write> {
118 writer: RecordWriter<W>,
119 topic_index: Vec<(TopicKey, u32)>,
121 participant_index: Vec<([u8; 16], u32)>,
122 time_base_unix_ns: i64,
123 header_written: bool,
124 pending_header: Header,
127}
128
129impl<W: std::io::Write + Send> RecordingSession<W> {
130 pub fn new(sink: W, opts: SessionOptions) -> Self {
133 let mut topic_index = Vec::with_capacity(opts.topics.len());
134 for (i, t) in opts.topics.iter().enumerate() {
135 topic_index.push((t.clone(), i as u32));
136 }
137 let participant_index = opts
138 .participants
139 .iter()
140 .enumerate()
141 .map(|(i, p)| (p.guid, i as u32))
142 .collect();
143 let header = Header {
144 time_base_unix_ns: opts.time_base_unix_ns,
145 participants: opts.participants,
146 topics: opts
147 .topics
148 .into_iter()
149 .map(|t| TopicEntry {
150 name: t.topic,
151 type_name: t.type_name,
152 })
153 .collect(),
154 };
155 Self {
156 inner: Mutex::new(Inner {
157 writer: RecordWriter::new(sink),
158 topic_index,
159 participant_index,
160 time_base_unix_ns: opts.time_base_unix_ns,
161 header_written: false,
162 pending_header: header,
163 }),
164 samples_total: AtomicU64::new(0),
165 samples_dropped: AtomicU64::new(0),
166 bytes_total: AtomicU64::new(0),
167 }
168 }
169
170 pub fn record_sample(
176 &self,
177 now_unix_ns: i64,
178 participant_guid: [u8; 16],
179 topic: &TopicKey,
180 sample_kind: SampleKind,
181 payload: Vec<u8>,
182 ) -> Result<(), SessionError> {
183 let mut g = self.inner.lock().map_err(|_| SessionError::Poisoned)?;
184 if !g.header_written {
185 let header = g.pending_header.clone();
186 g.writer.write_header(&header)?;
187 g.header_written = true;
188 }
189 let Some(topic_idx) = g
190 .topic_index
191 .iter()
192 .find(|(k, _)| k == topic)
193 .map(|(_, i)| *i)
194 else {
195 self.samples_dropped.fetch_add(1, Ordering::Relaxed);
196 return Ok(());
197 };
198 let participant_idx = g
199 .participant_index
200 .iter()
201 .find(|(g_guid, _)| g_guid == &participant_guid)
202 .map(|(_, i)| *i)
203 .unwrap_or(0);
204 let frame = Frame {
205 timestamp_delta_ns: now_unix_ns - g.time_base_unix_ns,
206 participant_idx,
207 topic_idx,
208 sample_kind,
209 payload,
210 };
211 g.writer.write_frame(&frame)?;
212 self.samples_total.fetch_add(1, Ordering::Relaxed);
213 self.bytes_total
214 .fetch_add(g.writer.bytes_written(), Ordering::Relaxed);
215 Ok(())
216 }
217
218 #[must_use]
220 pub fn stats(&self) -> SessionStats {
221 SessionStats {
222 samples_total: self.samples_total.load(Ordering::Relaxed),
223 samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
224 bytes_total: self.bytes_total.load(Ordering::Relaxed),
225 }
226 }
227}
228
/// Point-in-time copy of a session's counters, as returned by
/// `RecordingSession::stats`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct SessionStats {
    /// Number of samples written as frames.
    pub samples_total: u64,
    /// Number of samples discarded because their topic was unknown.
    pub samples_dropped: u64,
    /// Byte count taken from the writer.
    pub bytes_total: u64,
}
239
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is simply a failed test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Participant entry whose GUID is `guid_byte` repeated 16 times.
    fn participant(name: &str, guid_byte: u8) -> ParticipantEntry {
        ParticipantEntry {
            guid: [guid_byte; 16],
            name: name.into(),
        }
    }

    /// Topic key built from string slices.
    fn topic_key(topic: &str, ty: &str) -> TopicKey {
        TopicKey {
            topic: topic.into(),
            type_name: ty.into(),
        }
    }

    #[test]
    fn session_writes_header_lazy_on_first_sample() {
        let opts = SessionOptions::new(1_700_000_000_000_000_000)
            .with_participant(participant("talker", 1))
            .with_topic(topic_key("/x", "T"));
        let session = RecordingSession::new(Vec::<u8>::new(), opts);
        assert_eq!(session.stats().samples_total, 0);

        let key = topic_key("/x", "T");
        session
            .record_sample(
                1_700_000_000_000_001_000,
                [1u8; 16],
                &key,
                SampleKind::Alive,
                vec![1, 2, 3],
            )
            .unwrap();
        assert_eq!(session.stats().samples_total, 1);
    }

    #[test]
    fn session_drops_unknown_topic() {
        let opts = SessionOptions::new(0)
            .with_participant(participant("p", 1))
            .with_topic(topic_key("/known", "T"));
        let session = RecordingSession::new(Vec::<u8>::new(), opts);

        let unknown = topic_key("/unknown", "U");
        session
            .record_sample(1, [1u8; 16], &unknown, SampleKind::Alive, vec![])
            .unwrap();

        let stats = session.stats();
        assert_eq!(stats.samples_total, 0);
        assert_eq!(stats.samples_dropped, 1);
    }

    #[test]
    fn session_thread_safe_record() {
        use std::sync::Arc;
        use std::thread;

        let opts = SessionOptions::new(0)
            .with_participant(participant("p0", 1))
            .with_participant(participant("p1", 2))
            .with_topic(topic_key("/a", "T"))
            .with_topic(topic_key("/b", "T"));
        let session = Arc::new(RecordingSession::new(Vec::<u8>::new(), opts));

        // Four writers: the first two act as participant 1, the others as 2.
        let workers: Vec<_> = (0..4)
            .map(|thread_id| {
                let session = Arc::clone(&session);
                thread::spawn(move || {
                    let guid_byte: u8 = if thread_id < 2 { 1 } else { 2 };
                    for i in 0u8..100 {
                        // Alternate between the two registered topics.
                        let name = if i % 2 == 0 { "/a" } else { "/b" };
                        let topic = topic_key(name, "T");
                        session
                            .record_sample(
                                i64::from(i),
                                [guid_byte; 16],
                                &topic,
                                SampleKind::Alive,
                                vec![i],
                            )
                            .unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        let stats = session.stats();
        assert_eq!(stats.samples_total, 400);
        assert_eq!(stats.samples_dropped, 0);
    }

    #[test]
    fn session_unknown_participant_falls_back_to_idx_zero() {
        let opts = SessionOptions::new(0)
            .with_participant(participant("p", 1))
            .with_topic(topic_key("/a", "T"));
        let session = RecordingSession::new(Vec::<u8>::new(), opts);

        // GUID 99 is not registered; the sample must still be recorded.
        let key = topic_key("/a", "T");
        session
            .record_sample(1, [99u8; 16], &key, SampleKind::Alive, vec![])
            .unwrap();
        assert_eq!(session.stats().samples_total, 1);
    }
}