Skip to main content

zerodds_dashboard/
state.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Daten-Modell des Dashboards. Konsumenten fuellen es aus DcpsRuntime,
5//! der Server serialisiert es zu JSON.
6
7use std::sync::Mutex;
8
9/// Live-State des Dashboards. Mutex-geschuetzt, der Server liest
10/// und Konsumenten schreiben.
11pub struct DashboardState {
12    inner: Mutex<Inner>,
13}
14
15#[derive(Default)]
16struct Inner {
17    topics: Vec<TopicInfo>,
18    participants: Vec<ParticipantInfo>,
19    histograms: Vec<HistogramSnapshot>,
20    edges: Vec<DiscoveryEdge>,
21    recording: RecordingStatus,
22}
23
24/// Eintrag in der Topic-Liste.
25#[derive(Clone, Debug, PartialEq)]
26pub struct TopicInfo {
27    /// Topic-Name (DDS-Wire-Form, ggf. mit `rt/`-Prefix).
28    pub name: String,
29    /// Type-Name.
30    pub type_name: String,
31    /// Anzahl matched Publisher.
32    pub publishers: u32,
33    /// Anzahl matched Subscriber.
34    pub subscribers: u32,
35    /// Letzte gemessene Sample-Rate (samples/s, EMA).
36    pub sample_rate_hz: f64,
37}
38
39/// Participant-Info.
40#[derive(Clone, Debug, PartialEq, Eq)]
41pub struct ParticipantInfo {
42    /// 16-Byte GUID-Prefix als Hex.
43    pub guid_prefix_hex: String,
44    /// Logischer Name.
45    pub name: String,
46    /// Domain-ID.
47    pub domain_id: u32,
48    /// Vendor-ID-Hex (z.B. "01.0F" fuer ZeroDDS).
49    pub vendor_id_hex: String,
50}
51
52/// Histogram-Snapshot fuer das Dashboard.
53#[derive(Clone, Debug)]
54pub struct HistogramSnapshot {
55    /// Logischer Name.
56    pub name: String,
57    /// Anzahl aller Records.
58    pub count: u64,
59    /// Mittelwert (ns).
60    pub mean_ns: u64,
61    /// Min/Max.
62    pub min_ns: u64,
63    /// Max-Wert (ns).
64    pub max_ns: u64,
65    /// p50/p99 — Aufrufer berechnet (z.B. aus hdrhistogram).
66    pub p50_ns: u64,
67    /// p99 als ns.
68    pub p99_ns: u64,
69}
70
71/// Edge im Discovery-Graph: Pub→Sub-Match auf einem Topic.
72#[derive(Clone, Debug, PartialEq, Eq)]
73pub struct DiscoveryEdge {
74    /// GUID-Hex des Pub-Endpoints.
75    pub from_guid: String,
76    /// GUID-Hex des Sub-Endpoints.
77    pub to_guid: String,
78    /// Topic-Name.
79    pub topic: String,
80}
81
82/// Knoten im Discovery-Graph (Endpoint).
83#[derive(Clone, Debug, PartialEq, Eq)]
84pub struct DiscoveryNode {
85    /// GUID-Hex.
86    pub guid: String,
87    /// Display-Name.
88    pub label: String,
89    /// "publisher" | "subscriber" | "participant".
90    pub kind: String,
91}
92
93/// Recording-Status.
94#[derive(Clone, Debug, Default, PartialEq, Eq)]
95pub struct RecordingStatus {
96    /// `true` wenn aktuell aufgezeichnet wird.
97    pub active: bool,
98    /// Pfad der Output-Datei (wenn aktiv).
99    pub output_path: Option<String>,
100    /// Anzahl bisher geschriebener Frames.
101    pub frames: u64,
102}
103
104impl DashboardState {
105    /// Leerer State.
106    #[must_use]
107    pub fn new() -> Self {
108        Self {
109            inner: Mutex::new(Inner::default()),
110        }
111    }
112
113    /// Setzt die komplette Topic-Liste (ueberschreibt).
114    pub fn set_topics(&self, topics: Vec<TopicInfo>) {
115        if let Ok(mut g) = self.inner.lock() {
116            g.topics = topics;
117        }
118    }
119
120    /// Setzt die Participant-Liste.
121    pub fn set_participants(&self, ps: Vec<ParticipantInfo>) {
122        if let Ok(mut g) = self.inner.lock() {
123            g.participants = ps;
124        }
125    }
126
127    /// Setzt die Histogramme.
128    pub fn set_histograms(&self, hs: Vec<HistogramSnapshot>) {
129        if let Ok(mut g) = self.inner.lock() {
130            g.histograms = hs;
131        }
132    }
133
134    /// Setzt die Discovery-Edges.
135    pub fn set_edges(&self, e: Vec<DiscoveryEdge>) {
136        if let Ok(mut g) = self.inner.lock() {
137            g.edges = e;
138        }
139    }
140
141    /// Setzt den Recording-Status.
142    pub fn set_recording(&self, r: RecordingStatus) {
143        if let Ok(mut g) = self.inner.lock() {
144            g.recording = r;
145        }
146    }
147
148    /// Phase-B: Inject ueber JSON-Body (POST /api/inject/topics).
149    /// Erwartet das gleiche Schema wie die GET-Antwort.
150    pub fn inject_topics_json(&self, body: &str) -> Result<usize, String> {
151        let v = parse_json(body)?;
152        let arr = v.as_array().ok_or("topics must be array")?;
153        let mut topics = Vec::with_capacity(arr.len());
154        for o in arr {
155            topics.push(TopicInfo {
156                name: o.get_str("name").unwrap_or("").into(),
157                type_name: o.get_str("type_name").unwrap_or("").into(),
158                publishers: o.get_u32("publishers").unwrap_or(0),
159                subscribers: o.get_u32("subscribers").unwrap_or(0),
160                sample_rate_hz: o.get_f64("sample_rate_hz").unwrap_or(0.0),
161            });
162        }
163        let n = topics.len();
164        self.set_topics(topics);
165        Ok(n)
166    }
167
168    /// Phase-B: Inject von Participants.
169    pub fn inject_participants_json(&self, body: &str) -> Result<usize, String> {
170        let v = parse_json(body)?;
171        let arr = v.as_array().ok_or("participants must be array")?;
172        let mut ps = Vec::with_capacity(arr.len());
173        for o in arr {
174            ps.push(ParticipantInfo {
175                guid_prefix_hex: o.get_str("guid_prefix_hex").unwrap_or("").into(),
176                name: o.get_str("name").unwrap_or("").into(),
177                domain_id: o.get_u32("domain_id").unwrap_or(0),
178                vendor_id_hex: o.get_str("vendor_id_hex").unwrap_or("").into(),
179            });
180        }
181        let n = ps.len();
182        self.set_participants(ps);
183        Ok(n)
184    }
185
186    /// Phase-B: Inject von Histograms.
187    pub fn inject_histograms_json(&self, body: &str) -> Result<usize, String> {
188        let v = parse_json(body)?;
189        let arr = v.as_array().ok_or("histograms must be array")?;
190        let mut hs = Vec::with_capacity(arr.len());
191        for o in arr {
192            hs.push(HistogramSnapshot {
193                name: o.get_str("name").unwrap_or("").into(),
194                count: o.get_u64("count").unwrap_or(0),
195                mean_ns: o.get_u64("mean_ns").unwrap_or(0),
196                min_ns: o.get_u64("min_ns").unwrap_or(0),
197                max_ns: o.get_u64("max_ns").unwrap_or(0),
198                p50_ns: o.get_u64("p50_ns").unwrap_or(0),
199                p99_ns: o.get_u64("p99_ns").unwrap_or(0),
200            });
201        }
202        let n = hs.len();
203        self.set_histograms(hs);
204        Ok(n)
205    }
206
207    /// Liest topics-JSON.
208    #[must_use]
209    pub fn topics_json(&self) -> String {
210        let Ok(g) = self.inner.lock() else {
211            return "[]".into();
212        };
213        let mut out = String::from("[");
214        for (i, t) in g.topics.iter().enumerate() {
215            if i > 0 {
216                out.push(',');
217            }
218            out.push_str(r#"{"name":""#);
219            json_escape(&mut out, &t.name);
220            out.push_str(r#"","type_name":""#);
221            json_escape(&mut out, &t.type_name);
222            out.push_str(&format!(
223                r#"","publishers":{},"subscribers":{},"sample_rate_hz":{:.3}}}"#,
224                t.publishers, t.subscribers, t.sample_rate_hz
225            ));
226        }
227        out.push(']');
228        out
229    }
230
231    /// Liest participants-JSON.
232    #[must_use]
233    pub fn participants_json(&self) -> String {
234        let Ok(g) = self.inner.lock() else {
235            return "[]".into();
236        };
237        let mut out = String::from("[");
238        for (i, p) in g.participants.iter().enumerate() {
239            if i > 0 {
240                out.push(',');
241            }
242            out.push_str(r#"{"guid_prefix_hex":""#);
243            json_escape(&mut out, &p.guid_prefix_hex);
244            out.push_str(r#"","name":""#);
245            json_escape(&mut out, &p.name);
246            out.push_str(r#"","vendor_id_hex":""#);
247            json_escape(&mut out, &p.vendor_id_hex);
248            out.push_str(&format!(r#"","domain_id":{}}}"#, p.domain_id));
249        }
250        out.push(']');
251        out
252    }
253
254    /// Liest histograms-JSON.
255    #[must_use]
256    pub fn histograms_json(&self) -> String {
257        let Ok(g) = self.inner.lock() else {
258            return "[]".into();
259        };
260        let mut out = String::from("[");
261        for (i, h) in g.histograms.iter().enumerate() {
262            if i > 0 {
263                out.push(',');
264            }
265            out.push_str(r#"{"name":""#);
266            json_escape(&mut out, &h.name);
267            out.push_str(&format!(
268                r#"","count":{},"mean_ns":{},"min_ns":{},"max_ns":{},"p50_ns":{},"p99_ns":{}}}"#,
269                h.count, h.mean_ns, h.min_ns, h.max_ns, h.p50_ns, h.p99_ns
270            ));
271        }
272        out.push(']');
273        out
274    }
275
276    /// Liest discovery-graph-JSON.
277    #[must_use]
278    pub fn graph_json(&self) -> String {
279        let Ok(g) = self.inner.lock() else {
280            return r#"{"nodes":[],"edges":[]}"#.into();
281        };
282        // Nodes ergeben sich aus Participants (immer).
283        let mut nodes = String::from(r#"{"nodes":["#);
284        for (i, p) in g.participants.iter().enumerate() {
285            if i > 0 {
286                nodes.push(',');
287            }
288            nodes.push_str(r#"{"guid":""#);
289            json_escape(&mut nodes, &p.guid_prefix_hex);
290            nodes.push_str(r#"","label":""#);
291            json_escape(&mut nodes, &p.name);
292            nodes.push_str(r#"","kind":"participant"}"#);
293        }
294        nodes.push_str(r#"],"edges":["#);
295        for (i, e) in g.edges.iter().enumerate() {
296            if i > 0 {
297                nodes.push(',');
298            }
299            nodes.push_str(r#"{"from_guid":""#);
300            json_escape(&mut nodes, &e.from_guid);
301            nodes.push_str(r#"","to_guid":""#);
302            json_escape(&mut nodes, &e.to_guid);
303            nodes.push_str(r#"","topic":""#);
304            json_escape(&mut nodes, &e.topic);
305            nodes.push_str(r#""}"#);
306        }
307        nodes.push_str("]}");
308        nodes
309    }
310
311    /// Liest recording-JSON.
312    #[must_use]
313    pub fn recording_json(&self) -> String {
314        let Ok(g) = self.inner.lock() else {
315            return r#"{"active":false,"frames":0}"#.into();
316        };
317        let path = match &g.recording.output_path {
318            Some(p) => format!(r#""{}""#, escape_value(p)),
319            None => "null".into(),
320        };
321        format!(
322            r#"{{"active":{},"output_path":{},"frames":{}}}"#,
323            g.recording.active, path, g.recording.frames
324        )
325    }
326}
327
328impl Default for DashboardState {
329    fn default() -> Self {
330        Self::new()
331    }
332}
333
334// ---- Mini-JSON-Parser (kein serde-Dep) ----
335//
336// Reicht fuer das Inject-Schema: Array of Objects mit String/Num-
337// Feldern. Identisch zur Logik in tools/interop-matrix/src/parser.rs,
338// hier inline weil wir einen separaten Top-Level-Use-Case haben.
339
340#[derive(Clone, Debug)]
341#[allow(dead_code)] // Bool/Null werden parsed aber nicht weiter inspiziert.
342enum Val {
343    Str(String),
344    Num(String),
345    Bool(bool),
346    Null,
347    Array(Vec<Val>),
348    Object(Vec<(String, Val)>),
349}
350
351impl Val {
352    fn as_array(&self) -> Option<&[Val]> {
353        if let Self::Array(a) = self {
354            Some(a)
355        } else {
356            None
357        }
358    }
359    fn as_object(&self) -> Option<&[(String, Val)]> {
360        if let Self::Object(o) = self {
361            Some(o)
362        } else {
363            None
364        }
365    }
366    fn get_str(&self, k: &str) -> Option<&str> {
367        let kv = self.as_object()?.iter().find(|(n, _)| n == k)?;
368        if let Val::Str(s) = &kv.1 {
369            Some(s)
370        } else {
371            None
372        }
373    }
374    fn get_u32(&self, k: &str) -> Option<u32> {
375        let kv = self.as_object()?.iter().find(|(n, _)| n == k)?;
376        if let Val::Num(s) = &kv.1 {
377            s.parse().ok()
378        } else {
379            None
380        }
381    }
382    fn get_u64(&self, k: &str) -> Option<u64> {
383        let kv = self.as_object()?.iter().find(|(n, _)| n == k)?;
384        if let Val::Num(s) = &kv.1 {
385            s.parse().ok()
386        } else {
387            None
388        }
389    }
390    fn get_f64(&self, k: &str) -> Option<f64> {
391        let kv = self.as_object()?.iter().find(|(n, _)| n == k)?;
392        if let Val::Num(s) = &kv.1 {
393            s.parse().ok()
394        } else {
395            None
396        }
397    }
398}
399
400fn parse_json(body: &str) -> Result<Val, String> {
401    let mut p = JsonParser {
402        s: body.as_bytes(),
403        i: 0,
404    };
405    p.skip_ws();
406    p.parse_value()
407}
408
409struct JsonParser<'a> {
410    s: &'a [u8],
411    i: usize,
412}
413
414impl JsonParser<'_> {
415    fn skip_ws(&mut self) {
416        while self.i < self.s.len() && self.s[self.i].is_ascii_whitespace() {
417            self.i += 1;
418        }
419    }
420    fn parse_value(&mut self) -> Result<Val, String> {
421        self.skip_ws();
422        if self.i >= self.s.len() {
423            return Err("unexpected eof".into());
424        }
425        match self.s[self.i] {
426            b'{' => self.parse_object(),
427            b'[' => self.parse_array(),
428            b'"' => self.parse_string().map(Val::Str),
429            b't' | b'f' => self.parse_bool(),
430            b'n' => self.parse_null(),
431            b'-' | b'0'..=b'9' => self.parse_num(),
432            other => Err(format!("unexpected byte 0x{other:02x} at {}", self.i)),
433        }
434    }
435    fn parse_object(&mut self) -> Result<Val, String> {
436        self.i += 1; // {
437        self.skip_ws();
438        let mut out = Vec::new();
439        if self.i < self.s.len() && self.s[self.i] == b'}' {
440            self.i += 1;
441            return Ok(Val::Object(out));
442        }
443        loop {
444            self.skip_ws();
445            let key = self.parse_string()?;
446            self.skip_ws();
447            if self.i >= self.s.len() || self.s[self.i] != b':' {
448                return Err("expected ':'".into());
449            }
450            self.i += 1;
451            let val = self.parse_value()?;
452            out.push((key, val));
453            self.skip_ws();
454            if self.i < self.s.len() && self.s[self.i] == b',' {
455                self.i += 1;
456                continue;
457            }
458            if self.i < self.s.len() && self.s[self.i] == b'}' {
459                self.i += 1;
460                return Ok(Val::Object(out));
461            }
462            return Err("expected ',' or '}'".into());
463        }
464    }
465    fn parse_array(&mut self) -> Result<Val, String> {
466        self.i += 1;
467        self.skip_ws();
468        let mut out = Vec::new();
469        if self.i < self.s.len() && self.s[self.i] == b']' {
470            self.i += 1;
471            return Ok(Val::Array(out));
472        }
473        loop {
474            out.push(self.parse_value()?);
475            self.skip_ws();
476            if self.i < self.s.len() && self.s[self.i] == b',' {
477                self.i += 1;
478                continue;
479            }
480            if self.i < self.s.len() && self.s[self.i] == b']' {
481                self.i += 1;
482                return Ok(Val::Array(out));
483            }
484            return Err("expected ',' or ']'".into());
485        }
486    }
487    fn parse_string(&mut self) -> Result<String, String> {
488        if self.s[self.i] != b'"' {
489            return Err("expected string".into());
490        }
491        self.i += 1;
492        let mut out = String::new();
493        while self.i < self.s.len() {
494            let c = self.s[self.i];
495            if c == b'"' {
496                self.i += 1;
497                return Ok(out);
498            }
499            if c == b'\\' && self.i + 1 < self.s.len() {
500                let esc = self.s[self.i + 1];
501                self.i += 2;
502                match esc {
503                    b'"' => out.push('"'),
504                    b'\\' => out.push('\\'),
505                    b'/' => out.push('/'),
506                    b'n' => out.push('\n'),
507                    b't' => out.push('\t'),
508                    b'r' => out.push('\r'),
509                    other => out.push(other as char),
510                }
511            } else {
512                out.push(c as char);
513                self.i += 1;
514            }
515        }
516        Err("unterminated string".into())
517    }
518    fn parse_bool(&mut self) -> Result<Val, String> {
519        if self.s[self.i..].starts_with(b"true") {
520            self.i += 4;
521            Ok(Val::Bool(true))
522        } else if self.s[self.i..].starts_with(b"false") {
523            self.i += 5;
524            Ok(Val::Bool(false))
525        } else {
526            Err("expected bool".into())
527        }
528    }
529    fn parse_null(&mut self) -> Result<Val, String> {
530        if self.s[self.i..].starts_with(b"null") {
531            self.i += 4;
532            Ok(Val::Null)
533        } else {
534            Err("expected null".into())
535        }
536    }
537    fn parse_num(&mut self) -> Result<Val, String> {
538        let start = self.i;
539        if self.s[self.i] == b'-' {
540            self.i += 1;
541        }
542        while self.i < self.s.len() {
543            let c = self.s[self.i];
544            if c.is_ascii_digit() || c == b'.' || c == b'e' || c == b'E' || c == b'+' || c == b'-' {
545                self.i += 1;
546            } else {
547                break;
548            }
549        }
550        Ok(Val::Num(
551            String::from_utf8_lossy(&self.s[start..self.i]).into_owned(),
552        ))
553    }
554}
555
556fn json_escape(out: &mut String, s: &str) {
557    for c in s.chars() {
558        match c {
559            '"' => out.push_str(r#"\""#),
560            '\\' => out.push_str(r"\\"),
561            '\n' => out.push_str(r"\n"),
562            '\r' => out.push_str(r"\r"),
563            '\t' => out.push_str(r"\t"),
564            c if (c as u32) < 0x20 => {
565                out.push_str(&format!("\\u{:04x}", c as u32));
566            }
567            c => out.push(c),
568        }
569    }
570}
571
572fn escape_value(s: &str) -> String {
573    let mut out = String::new();
574    json_escape(&mut out, s);
575    out
576}
577
578#[cfg(test)]
579#[allow(clippy::unwrap_used)] // tests duerfen unwrap nutzen.
580mod tests {
581    use super::*;
582
583    #[test]
584    fn empty_state_topics_is_empty_array() {
585        let s = DashboardState::new();
586        assert_eq!(s.topics_json(), "[]");
587        assert_eq!(s.participants_json(), "[]");
588        assert_eq!(s.histograms_json(), "[]");
589        assert_eq!(s.graph_json(), r#"{"nodes":[],"edges":[]}"#);
590    }
591
592    #[test]
593    fn topics_serializes_fields() {
594        let s = DashboardState::new();
595        s.set_topics(vec![TopicInfo {
596            name: "/chatter".into(),
597            type_name: "std_msgs::msg::String".into(),
598            publishers: 1,
599            subscribers: 2,
600            sample_rate_hz: 10.5,
601        }]);
602        let j = s.topics_json();
603        assert!(j.contains(r#""name":"/chatter""#));
604        assert!(j.contains(r#""publishers":1"#));
605        assert!(j.contains(r#""subscribers":2"#));
606        assert!(j.contains(r#""sample_rate_hz":10.500"#));
607    }
608
609    #[test]
610    fn participants_serializes_guid() {
611        let s = DashboardState::new();
612        s.set_participants(vec![ParticipantInfo {
613            guid_prefix_hex: "010203040506070809".into(),
614            name: "talker".into(),
615            domain_id: 0,
616            vendor_id_hex: "01.0F".into(),
617        }]);
618        let j = s.participants_json();
619        assert!(j.contains(r#""guid_prefix_hex":"010203040506070809""#));
620        assert!(j.contains(r#""domain_id":0"#));
621    }
622
623    #[test]
624    fn graph_combines_nodes_and_edges() {
625        let s = DashboardState::new();
626        s.set_participants(vec![
627            ParticipantInfo {
628                guid_prefix_hex: "a".into(),
629                name: "talker".into(),
630                domain_id: 0,
631                vendor_id_hex: "x".into(),
632            },
633            ParticipantInfo {
634                guid_prefix_hex: "b".into(),
635                name: "listener".into(),
636                domain_id: 0,
637                vendor_id_hex: "x".into(),
638            },
639        ]);
640        s.set_edges(vec![DiscoveryEdge {
641            from_guid: "a".into(),
642            to_guid: "b".into(),
643            topic: "/chatter".into(),
644        }]);
645        let j = s.graph_json();
646        assert!(j.contains(r#""label":"talker""#));
647        assert!(j.contains(r#""kind":"participant""#));
648        assert!(j.contains(r#""topic":"/chatter""#));
649    }
650
651    #[test]
652    fn json_escape_handles_special_chars() {
653        let mut s = String::new();
654        json_escape(&mut s, "a\"b\nc");
655        assert_eq!(s, r#"a\"b\nc"#);
656    }
657
658    #[test]
659    fn recording_status_json() {
660        let s = DashboardState::new();
661        s.set_recording(RecordingStatus {
662            active: true,
663            output_path: Some("/tmp/x.zddsrec".into()),
664            frames: 42,
665        });
666        let j = s.recording_json();
667        assert!(j.contains(r#""active":true"#));
668        assert!(j.contains(r#""output_path":"/tmp/x.zddsrec""#));
669        assert!(j.contains(r#""frames":42"#));
670    }
671}