Skip to main content

zerodds_recorder/
session.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
//! Live recording session — high-level API for an ongoing capture.
//!
//! Wraps a [`RecordWriter`] behind a topic-indexing layer: the consumer
//! calls `record_sample` with topic, type and payload, and the indexer
//! takes care of:
//!
//! * Writing the initial header lazily, when the first sample arrives.
//! * Mapping topics and participants to their header indices. Because
//!   the format is header-once, entries cannot be added after the header
//!   has been written — the caller must register the full set up front
//!   via [`SessionOptions`]; samples for unregistered topics are dropped
//!   and counted.
//! * Atomic counters (frames, bytes) for the dashboard.
//!
//! Wiring this up to the DcpsRuntime (hooks on the built-in topics
//! `DCPSPublication`/`DCPSSubscription`) is the job of the dcps crate
//! and of `tools/recorder-bridge` — `RecordingSession` only provides
//! the thread-safe write ingress for them.
21
22use alloc::string::String;
23use alloc::vec::Vec;
24use core::fmt;
25use core::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Mutex;
27
28use crate::format::{Frame, Header, ParticipantEntry, SampleKind, TopicEntry};
29use crate::writer::{RecordWriter, WriteError};
30
/// Identifies a recorded topic by the pair (topic name, type name).
///
/// Two keys compare equal only when both strings match exactly; the
/// derived `Hash`/`Eq` make the key usable in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicKey {
    /// DDS topic name (e.g. carrying an `rt/` prefix).
    pub topic: String,
    /// Name of the data type (e.g. `"std_msgs::msg::String"`).
    pub type_name: String,
}
39
40/// Setup-Optionen fuer eine Session.
41#[derive(Clone, Debug)]
42pub struct SessionOptions {
43    /// UNIX-Epoch-Anchor in Nanosekunden — Frame-Timestamps sind
44    /// Deltas dazu.
45    pub time_base_unix_ns: i64,
46    /// Vorab-bekannte Participants (GUID + Name).
47    pub participants: Vec<ParticipantEntry>,
48    /// Vorab-bekannte Topics. Falls ein Topic kommt das hier nicht
49    /// drin ist, ignoriert die Session den Sample (Counter
50    /// `samples_dropped_unknown_topic` wird inkrementiert).
51    pub topics: Vec<TopicKey>,
52}
53
54impl SessionOptions {
55    /// Konstruktor mit time_base_unix_ns und leeren Listen.
56    #[must_use]
57    pub fn new(time_base_unix_ns: i64) -> Self {
58        Self {
59            time_base_unix_ns,
60            participants: Vec::new(),
61            topics: Vec::new(),
62        }
63    }
64
65    /// Fuegt einen Participant hinzu (Builder-Form).
66    #[must_use]
67    pub fn with_participant(mut self, p: ParticipantEntry) -> Self {
68        self.participants.push(p);
69        self
70    }
71
72    /// Fuegt ein Topic hinzu (Builder-Form).
73    #[must_use]
74    pub fn with_topic(mut self, t: TopicKey) -> Self {
75        self.topics.push(t);
76        self
77    }
78}
79
80/// Session-Fehler.
81#[derive(Debug)]
82pub enum SessionError {
83    /// Underlying writer error.
84    Writer(WriteError),
85    /// Session-Mutex vergiftet.
86    Poisoned,
87}
88
89impl fmt::Display for SessionError {
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        match self {
92            Self::Writer(e) => write!(f, "writer: {e}"),
93            Self::Poisoned => write!(f, "session mutex poisoned"),
94        }
95    }
96}
97
98impl std::error::Error for SessionError {}
99
100impl From<WriteError> for SessionError {
101    fn from(e: WriteError) -> Self {
102        Self::Writer(e)
103    }
104}
105
106/// Live-Recording-Session.
107///
108/// Thread-safe: mehrere Threads koennen gleichzeitig
109/// `record_sample` callen.
110pub struct RecordingSession<W: std::io::Write + Send> {
111    inner: Mutex<Inner<W>>,
112    samples_total: AtomicU64,
113    samples_dropped: AtomicU64,
114    bytes_total: AtomicU64,
115}
116
117struct Inner<W: std::io::Write> {
118    writer: RecordWriter<W>,
119    /// (topic, type) → topic_idx im Header.
120    topic_index: Vec<(TopicKey, u32)>,
121    participant_index: Vec<([u8; 16], u32)>,
122    time_base_unix_ns: i64,
123    header_written: bool,
124    /// Vor-Allokierte Header-Daten — werden beim ersten Sample
125    /// geflushed.
126    pending_header: Header,
127}
128
129impl<W: std::io::Write + Send> RecordingSession<W> {
130    /// Erzeugt eine neue Session ueber `sink`. Der Header wird beim
131    /// ersten `record_sample` geschrieben.
132    pub fn new(sink: W, opts: SessionOptions) -> Self {
133        let mut topic_index = Vec::with_capacity(opts.topics.len());
134        for (i, t) in opts.topics.iter().enumerate() {
135            topic_index.push((t.clone(), i as u32));
136        }
137        let participant_index = opts
138            .participants
139            .iter()
140            .enumerate()
141            .map(|(i, p)| (p.guid, i as u32))
142            .collect();
143        let header = Header {
144            time_base_unix_ns: opts.time_base_unix_ns,
145            participants: opts.participants,
146            topics: opts
147                .topics
148                .into_iter()
149                .map(|t| TopicEntry {
150                    name: t.topic,
151                    type_name: t.type_name,
152                })
153                .collect(),
154        };
155        Self {
156            inner: Mutex::new(Inner {
157                writer: RecordWriter::new(sink),
158                topic_index,
159                participant_index,
160                time_base_unix_ns: opts.time_base_unix_ns,
161                header_written: false,
162                pending_header: header,
163            }),
164            samples_total: AtomicU64::new(0),
165            samples_dropped: AtomicU64::new(0),
166            bytes_total: AtomicU64::new(0),
167        }
168    }
169
170    /// Schreibt einen Sample. `now_unix_ns` muss die aktuelle
171    /// Wallclock-Zeit in Nanosekunden seit Epoch sein.
172    ///
173    /// # Errors
174    /// Siehe [`SessionError`].
175    pub fn record_sample(
176        &self,
177        now_unix_ns: i64,
178        participant_guid: [u8; 16],
179        topic: &TopicKey,
180        sample_kind: SampleKind,
181        payload: Vec<u8>,
182    ) -> Result<(), SessionError> {
183        let mut g = self.inner.lock().map_err(|_| SessionError::Poisoned)?;
184        if !g.header_written {
185            let header = g.pending_header.clone();
186            g.writer.write_header(&header)?;
187            g.header_written = true;
188        }
189        let Some(topic_idx) = g
190            .topic_index
191            .iter()
192            .find(|(k, _)| k == topic)
193            .map(|(_, i)| *i)
194        else {
195            self.samples_dropped.fetch_add(1, Ordering::Relaxed);
196            return Ok(());
197        };
198        let participant_idx = g
199            .participant_index
200            .iter()
201            .find(|(g_guid, _)| g_guid == &participant_guid)
202            .map(|(_, i)| *i)
203            .unwrap_or(0);
204        let frame = Frame {
205            timestamp_delta_ns: now_unix_ns - g.time_base_unix_ns,
206            participant_idx,
207            topic_idx,
208            sample_kind,
209            payload,
210        };
211        g.writer.write_frame(&frame)?;
212        self.samples_total.fetch_add(1, Ordering::Relaxed);
213        self.bytes_total
214            .fetch_add(g.writer.bytes_written(), Ordering::Relaxed);
215        Ok(())
216    }
217
218    /// Liefert die aktuellen Counter (Snapshot).
219    #[must_use]
220    pub fn stats(&self) -> SessionStats {
221        SessionStats {
222            samples_total: self.samples_total.load(Ordering::Relaxed),
223            samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
224            bytes_total: self.bytes_total.load(Ordering::Relaxed),
225        }
226    }
227}
228
/// Point-in-time copy of a session's counters.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct SessionStats {
    /// Number of samples that were written successfully.
    pub samples_total: u64,
    /// Number of samples dropped because their topic is not in the
    /// header.
    pub samples_dropped: u64,
    /// Total file bytes (header included).
    pub bytes_total: u64,
}
239
#[cfg(test)]
#[allow(clippy::unwrap_used)] // unwrap is acceptable inside tests.
mod tests {
    use super::*;

    /// Builds a participant whose GUID is 16 copies of `guid_byte`.
    fn participant(name: &str, guid_byte: u8) -> ParticipantEntry {
        ParticipantEntry {
            guid: [guid_byte; 16],
            name: name.into(),
        }
    }

    /// Builds a topic key from a topic name and a type name.
    fn key(topic: &str, ty: &str) -> TopicKey {
        TopicKey {
            topic: topic.into(),
            type_name: ty.into(),
        }
    }

    /// Creates a session that records into an in-memory buffer.
    fn in_memory(opts: SessionOptions) -> RecordingSession<Vec<u8>> {
        RecordingSession::new(Vec::new(), opts)
    }

    #[test]
    fn session_writes_header_lazy_on_first_sample() {
        let session = in_memory(
            SessionOptions::new(1_700_000_000_000_000_000)
                .with_participant(participant("talker", 1))
                .with_topic(key("/x", "T")),
        );
        assert_eq!(session.stats().samples_total, 0);

        let outcome = session.record_sample(
            1_700_000_000_000_001_000,
            [1u8; 16],
            &key("/x", "T"),
            SampleKind::Alive,
            vec![1, 2, 3],
        );
        outcome.unwrap();
        assert_eq!(session.stats().samples_total, 1);
    }

    #[test]
    fn session_drops_unknown_topic() {
        let session = in_memory(
            SessionOptions::new(0)
                .with_participant(participant("p", 1))
                .with_topic(key("/known", "T")),
        );
        let unknown = key("/unknown", "U");
        session
            .record_sample(1, [1u8; 16], &unknown, SampleKind::Alive, vec![])
            .unwrap();

        let stats = session.stats();
        assert_eq!(stats.samples_total, 0);
        assert_eq!(stats.samples_dropped, 1);
    }

    #[test]
    fn session_thread_safe_record() {
        use std::sync::Arc;
        use std::thread;

        let session = Arc::new(in_memory(
            SessionOptions::new(0)
                .with_participant(participant("p0", 1))
                .with_participant(participant("p1", 2))
                .with_topic(key("/a", "T"))
                .with_topic(key("/b", "T")),
        ));

        // Four writers, 100 samples each, alternating between topics.
        let workers: Vec<_> = (0..4)
            .map(|thread_id| {
                let session = Arc::clone(&session);
                thread::spawn(move || {
                    let guid_byte = if thread_id < 2 { 1 } else { 2 };
                    for i in 0..100 {
                        let topic = if i % 2 == 0 {
                            key("/a", "T")
                        } else {
                            key("/b", "T")
                        };
                        session
                            .record_sample(
                                i as i64,
                                [guid_byte; 16],
                                &topic,
                                SampleKind::Alive,
                                vec![i as u8],
                            )
                            .unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(session.stats().samples_total, 400);
        assert_eq!(session.stats().samples_dropped, 0);
    }

    #[test]
    fn session_unknown_participant_falls_back_to_idx_zero() {
        let session = in_memory(
            SessionOptions::new(0)
                .with_participant(participant("p", 1))
                .with_topic(key("/a", "T")),
        );
        // This GUID is not registered, so the session falls back to
        // participant index 0 and still records the sample.
        let unregistered_guid = [99u8; 16];
        session
            .record_sample(
                1,
                unregistered_guid,
                &key("/a", "T"),
                SampleKind::Alive,
                vec![],
            )
            .unwrap();
        assert_eq!(session.stats().samples_total, 1);
    }
}