Skip to main content

zerodds_foundation/
observability.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Observability — strukturierte DDS-Events fuer Tracing und Metriken.
4//!
5//! Liefert eine ZeroDDS-spezifische Event-Sprache plus einen
6//! schlanken `Sink`-Trait, ueber den Konsumenten Events abgreifen
7//! koennen.
8//!
9//! ## Design-Ziele
10//!
11//! 1. **Zero-Overhead by Default**: ohne Sink wird gar kein Event-Objekt
12//!    konstruiert (`with_sink(...)` ist die Opt-In-Stelle).
13//! 2. **Sync, allocation-light**: `Sink::record(&Event)` nimmt Events per
14//!    `&`, jeder Sink entscheidet selber ob er klont/serialisiert.
15//! 3. **Production-tauglich**: der mitgelieferte [`StderrJsonSink`]
16//!    schreibt JSON-Lines auf stderr — direkt verarbeitbar von
17//!    Vector/fluentd/Datadog/Loki/journald.
18//! 4. **OTel-Bruecke spaeter**: ein eigener Crate `dds-otel` (oder ein
19//!    `tracing-opentelemetry`-Adapter im Konsumenten) kann diesen
20//!    Sink-Trait implementieren und Events als OTLP-Spans schicken.
21//!
22//! ## Event-Modell
23//!
24//! Events sind grobgranular: ein Event pro Endpoint-Lifecycle-Aktion
25//! oder pro Sample-Pfad-Phase. Im Hot-Path (z.B. pro-Sample-Latency)
26//! benutzen wir **keine** Events — stattdessen die atomaren Stats
27//! aus D.4 Phase A. Events sind fuer Coarse-Grained-Telemetry, nicht
28//! fuer p99-Latenz-Sampling.
29
30#[cfg(feature = "alloc")]
31use alloc::string::String;
32#[cfg(feature = "alloc")]
33use alloc::sync::Arc;
34#[cfg(feature = "alloc")]
35use alloc::vec::Vec;
36
37#[cfg(feature = "std")]
38use std::io::{self, Write};
39#[cfg(feature = "std")]
40use std::sync::Mutex;
41
/// Severity of an event, modelled after the OTel/syslog levels.
///
/// Variants are declared in ascending severity, so the derived ordering
/// yields `Info < Warn < Error`. This lets a sink apply a threshold filter
/// such as `level >= Level::Warn` without a hand-written `match`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Level {
    /// Regular lifecycle event (endpoint create/destroy, match).
    Info,
    /// Abnormal but non-fatal situation (discovery timeout, single drop).
    Warn,
    /// An operation failed functionally.
    Error,
}

impl Level {
    /// Returns the lower-case label used in JSON output and log files.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Info => "info",
            Self::Warn => "warn",
            Self::Error => "error",
        }
    }
}
65
/// Origin of an event; names the layer that emitted it.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Component {
    /// DCPS / domain-participant path.
    Dcps,
    /// SPDP/SEDP discovery.
    Discovery,
    /// RTPS wire / reader / writer.
    Rtps,
    /// Security plugins.
    Security,
    /// Transport layer (UDP/TCP/SHM).
    Transport,
    /// User-defined subsystem (bridges, tools).
    User,
}

impl Component {
    /// Returns the machine-readable label of this component.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Component::Dcps => "dcps",
            Component::Discovery => "discovery",
            Component::Rtps => "rtps",
            Component::Security => "security",
            Component::Transport => "transport",
            Component::User => "user",
        }
    }
}
97
/// Structured key/value attribute attached to an [`Event`].
///
/// The value is kept as a `String`; each sink decides whether and how to
/// serialize it in a typed form.
#[cfg(feature = "alloc")]
#[derive(Clone, Debug)]
pub struct Attribute {
    /// Stable key (kebab-case recommended).
    pub key: &'static str,
    /// Value in string form.
    pub value: String,
}
108
/// A single event record, produced by the DCPS runtime and by plugins.
#[cfg(feature = "alloc")]
#[derive(Clone, Debug)]
pub struct Event {
    /// Severity of the event.
    pub level: Level,
    /// Component that caused the event.
    pub component: Component,
    /// Stable event name in `domain.event` form (e.g.
    /// `dcps.user_writer.created`, `discovery.peer.matched`).
    pub name: &'static str,
    /// Optional structured attributes.
    pub attrs: Vec<Attribute>,
}
123
#[cfg(feature = "alloc")]
impl Event {
    /// Creates an event that carries no attributes yet.
    #[must_use]
    pub fn new(level: Level, component: Component, name: &'static str) -> Self {
        let attrs = Vec::new();
        Self {
            level,
            component,
            name,
            attrs,
        }
    }

    /// Builder step: appends one attribute and hands the event back.
    ///
    /// `value` is converted into an owned `String` before it is stored.
    #[must_use]
    pub fn with_attr(mut self, key: &'static str, value: impl Into<String>) -> Self {
        let value = value.into();
        self.attrs.push(Attribute { key, value });
        self
    }
}
147
/// Consumer side of the event stream.
///
/// Implementors provide `record` and decide where an event ends up
/// (stderr, OTLP, Prometheus, /dev/null).
#[cfg(feature = "alloc")]
pub trait Sink: Send + Sync {
    /// Handles one event, synchronously.
    ///
    /// A sink may write, buffer, or drop the event. The caller waits for
    /// this call to return, so implementations should not block (for
    /// example, no synchronous HTTP POST from the hot path).
    fn record(&self, event: &Event);
}
157
/// Sink that discards every event.
///
/// Default choice when no telemetry is configured; `record` returns
/// immediately.
#[cfg(feature = "alloc")]
#[derive(Clone, Copy, Debug)]
pub struct NullSink;

#[cfg(feature = "alloc")]
impl Sink for NullSink {
    /// Ignores the event.
    fn record(&self, _: &Event) {}
}
168
/// Sink that writes every event to stderr as one JSON line.
///
/// Suited for Docker/k8s/journald pipelines with downstream log shippers
/// (Vector/fluentd/Datadog agent/Loki).
///
/// Format of a line:
///
/// ```json
/// {"level":"info","component":"dcps","name":"user_writer.created","attrs":{"topic":"Foo","reliable":"true"}}
/// ```
///
/// Writes synchronously through `std::io::stderr()`. A mutex serializes the
/// threads that record through this sink.
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct StderrJsonSink {
    /// Stderr handle, guarded so only one `record` call writes at a time.
    out: Mutex<io::Stderr>,
}
186
#[cfg(feature = "std")]
impl Default for StderrJsonSink {
    /// Builds a sink bound to the handle returned by `std::io::stderr()`.
    fn default() -> Self {
        let stderr = io::stderr();
        Self {
            out: Mutex::new(stderr),
        }
    }
}

#[cfg(feature = "std")]
impl StderrJsonSink {
    /// Creates a sink; equivalent to [`Default::default`].
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
204
#[cfg(feature = "std")]
impl Sink for StderrJsonSink {
    /// Serializes `event` and writes it to stderr as one JSON line.
    ///
    /// Best effort: write errors are ignored so that a closed stderr cannot
    /// take the application path down with it.
    fn record(&self, event: &Event) {
        // Terminate the line before writing so the whole record goes out in a
        // single `write_all`. Written as two calls, output from code that
        // uses stderr directly (e.g. `eprintln!`) could land between the JSON
        // text and its newline and break the JSON-Lines framing.
        let mut line = serialize_json_line(event);
        line.push('\n');

        // A poisoned mutex only means another thread panicked while holding
        // the guard; the stderr handle itself is still usable. Recover the
        // guard instead of silently dropping every later event.
        let mut out = self
            .out
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let _ = out.write_all(line.as_bytes());
        let _ = out.flush();
    }
}
218
/// In-memory sink for tests; collects events in a `Mutex<Vec<Event>>`.
#[cfg(feature = "std")]
#[derive(Default, Debug)]
pub struct VecSink {
    /// Events collected so far, guarded for use from several threads.
    events: Mutex<Vec<Event>>,
}
225
#[cfg(feature = "std")]
impl VecSink {
    /// Creates an empty sink.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns a copy of the events collected so far.
    ///
    /// Yields an empty vector if the internal mutex is poisoned.
    #[must_use]
    pub fn snapshot(&self) -> Vec<Event> {
        self.events
            .lock()
            .map_or_else(|_| Vec::new(), |guard| guard.clone())
    }

    /// Returns the number of events collected so far.
    ///
    /// Yields `0` if the internal mutex is poisoned.
    #[must_use]
    pub fn len(&self) -> usize {
        self.events.lock().map_or(0, |guard| guard.len())
    }

    /// Returns `true` when [`len`](Self::len) reports zero events.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let count = self.len();
        count == 0
    }
}
252
#[cfg(feature = "std")]
impl Sink for VecSink {
    /// Appends a clone of `event` to the collected events.
    ///
    /// The event is dropped if the internal mutex is poisoned.
    fn record(&self, event: &Event) {
        // Clone before taking the lock so the guard is held only for the push.
        let copy = event.clone();
        if let Ok(mut collected) = self.events.lock() {
            collected.push(copy);
        }
    }
}
261
262// zerodds-lint: allow no_dyn_in_safe
263// SharedSink benoetigt `Arc<dyn Sink>` damit Konsumenten beliebige
264// Sink-Implementations injizieren koennen (StderrJsonSink, OTLP-Bridge,
265// custom Forwarder). Die Sinks selbst sind Send+Sync; trait-objects
266// hier sind ein Architektur-Vertrag, keine Speicher-Sicherheits-Frage.
267
/// Shared, type-erased handle to any [`Sink`] implementation.
#[cfg(feature = "alloc")]
pub type SharedSink = Arc<dyn Sink>;
271
/// Returns a [`SharedSink`] that discards every event; the default choice.
#[cfg(feature = "alloc")]
#[must_use]
pub fn null_sink() -> SharedSink {
    Arc::new(NullSink) as SharedSink
}
278
279// ============================================================================
// JSON serialization — minimal, without serde (foundation is meant to stay
// dependency-free). RFC 8259 subset: strings escape \" \\ \n \r \t and
// emit every other control character below U+0020 as \u00XX.
283// ============================================================================
284
/// Renders `event` as a single-line JSON object without a trailing newline.
///
/// The object always contains `level`, `component` and `name`; an `attrs`
/// object follows only when the event carries at least one attribute.
// Only `StderrJsonSink` (feature = "std") calls this, so an alloc-only build
// sees it as dead code. The allow is tidier than a cfg on every caller.
#[cfg(feature = "alloc")]
#[allow(dead_code)]
fn serialize_json_line(event: &Event) -> String {
    let mut json = String::from("{\"level\":");
    push_json_string(&mut json, event.level.as_str());
    json.push_str(",\"component\":");
    push_json_string(&mut json, event.component.as_str());
    json.push_str(",\"name\":");
    push_json_string(&mut json, event.name);

    if !event.attrs.is_empty() {
        json.push_str(",\"attrs\":");
        // The first pair is preceded by the opening brace, every later pair
        // by a comma.
        let mut lead = '{';
        for attr in &event.attrs {
            json.push(lead);
            lead = ',';
            push_json_string(&mut json, attr.key);
            json.push(':');
            push_json_string(&mut json, &attr.value);
        }
        json.push('}');
    }

    json.push('}');
    json
}
313
314#[cfg(feature = "alloc")]
315#[allow(dead_code)]
/// Appends `value` to `out` as a quoted JSON string.
///
/// `"` and `\` get a backslash escape; newline, carriage return and tab use
/// their short forms; every other character below U+0020 is written as
/// `\u00XX`. All remaining characters are copied through unchanged.
fn push_json_string(out: &mut String, value: &str) {
    use core::fmt::Write as _;

    out.push('"');
    for ch in value.chars() {
        match ch {
            '"' => out.push_str(r#"\""#),
            '\\' => out.push_str(r"\\"),
            '\n' => out.push_str(r"\n"),
            '\r' => out.push_str(r"\r"),
            '\t' => out.push_str(r"\t"),
            '\u{0}'..='\u{1f}' => {
                // Remaining control characters have no short form. Writing
                // into a `String` does not fail, so the result is ignored.
                let _ = write!(out, "\\u{:04x}", u32::from(ch));
            }
            plain => out.push(plain),
        }
    }
    out.push('"');
}
334
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    // Each test carries the feature gate of the items it touches, so this
    // module still compiles when the `alloc` or `std` feature is disabled.
    use super::*;

    /// Every level maps to its lower-case label.
    #[test]
    fn level_labels() {
        assert_eq!(Level::Info.as_str(), "info");
        assert_eq!(Level::Warn.as_str(), "warn");
        assert_eq!(Level::Error.as_str(), "error");
    }

    /// Every component maps to its machine-readable label.
    #[test]
    fn component_labels() {
        assert_eq!(Component::Dcps.as_str(), "dcps");
        assert_eq!(Component::Discovery.as_str(), "discovery");
        assert_eq!(Component::Rtps.as_str(), "rtps");
        assert_eq!(Component::Security.as_str(), "security");
        assert_eq!(Component::Transport.as_str(), "transport");
        assert_eq!(Component::User.as_str(), "user");
    }

    /// The builder appends attributes in call order.
    #[cfg(feature = "alloc")]
    #[test]
    fn event_builder_attrs() {
        let e = Event::new(Level::Info, Component::Dcps, "user_writer.created")
            .with_attr("topic", "Foo")
            .with_attr("reliable", "true");
        assert_eq!(e.attrs.len(), 2);
        assert_eq!(e.attrs[0].key, "topic");
        assert_eq!(e.attrs[0].value, "Foo");
    }

    /// Recording into the null sink neither panics nor mutates anything.
    #[cfg(feature = "alloc")]
    #[test]
    fn null_sink_is_no_op() {
        let s = NullSink;
        let e = Event::new(Level::Info, Component::Dcps, "x");
        s.record(&e);
    }

    /// The in-memory sink keeps events in recording order.
    #[cfg(feature = "std")]
    #[test]
    fn vec_sink_collects() {
        let s = VecSink::new();
        assert!(s.is_empty());
        s.record(&Event::new(Level::Info, Component::Dcps, "a"));
        s.record(&Event::new(Level::Warn, Component::Rtps, "b"));
        assert_eq!(s.len(), 2);
        assert!(!s.is_empty());
        let snap = s.snapshot();
        assert_eq!(snap[0].name, "a");
        assert_eq!(snap[1].level, Level::Warn);
    }

    /// An event without attributes serializes without an `attrs` object.
    #[cfg(feature = "alloc")]
    #[test]
    fn serialize_json_line_basic() {
        let e = Event::new(Level::Info, Component::Dcps, "user_writer.created");
        let s = serialize_json_line(&e);
        assert_eq!(
            s,
            r#"{"level":"info","component":"dcps","name":"user_writer.created"}"#
        );
    }

    /// Attributes are emitted as one JSON object, in insertion order.
    #[cfg(feature = "alloc")]
    #[test]
    fn serialize_json_line_with_attrs() {
        let e = Event::new(Level::Info, Component::Dcps, "writer.created")
            .with_attr("topic", "Foo")
            .with_attr("reliable", "true");
        let s = serialize_json_line(&e);
        assert!(s.contains(r#""attrs":{"topic":"Foo","reliable":"true"}"#));
    }

    /// Quote, backslash, newline and tab use their short escapes.
    #[cfg(feature = "alloc")]
    #[test]
    fn serialize_escapes_special_chars() {
        let e = Event::new(Level::Info, Component::User, "x").with_attr("k", "a\"b\\c\nd\te");
        let s = serialize_json_line(&e);
        assert!(s.contains(r#""k":"a\"b\\c\nd\te""#));
    }

    /// Other control characters are written as `\u00XX`.
    #[cfg(feature = "alloc")]
    #[test]
    fn serialize_escapes_control_chars() {
        let e = Event::new(Level::Info, Component::User, "x").with_attr("k", "\x01");
        let s = serialize_json_line(&e);
        assert!(
            s.contains("\\u0001"),
            "control-char must be \\uXXXX, got: {s}"
        );
    }

    /// `null_sink()` yields a usable `SharedSink` handle.
    #[cfg(feature = "alloc")]
    #[test]
    fn null_sink_handle_typed() {
        let h: SharedSink = null_sink();
        h.record(&Event::new(Level::Info, Component::Dcps, "x"));
    }

    /// Four threads recording 100 events each lose no event.
    #[cfg(feature = "std")]
    #[test]
    fn vec_sink_threadsafe_smoke() {
        use std::sync::Arc as StdArc;
        use std::thread;
        let s: StdArc<VecSink> = StdArc::new(VecSink::new());
        let mut handles = Vec::new();
        for i in 0..4 {
            let s = StdArc::clone(&s);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    s.record(&Event::new(
                        Level::Info,
                        Component::User,
                        if i % 2 == 0 { "even" } else { "odd" },
                    ));
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(s.len(), 400);
    }

    /// Smoke test: writing to stderr must never panic.
    #[cfg(feature = "std")]
    #[test]
    fn stderr_json_sink_does_not_panic() {
        let s = StderrJsonSink::new();
        s.record(&Event::new(Level::Info, Component::Dcps, "stderr.smoke"));
    }
}
453        // Smoke: schreiben auf stderr soll niemals panicen.
454        let s = StderrJsonSink::new();
455        s.record(&Event::new(Level::Info, Component::Dcps, "stderr.smoke"));
456    }
457}