Skip to main content

zerodds_observability_otlp/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! OTLP/HTTP/JSON-Exporter fuer ZeroDDS-Telemetrie.
5//!
6//! Crate `zerodds-observability-otlp`. Safety classification: **STANDARD**.
7//!
8//! Spec: `docs/specs/zerodds-observability-otlp-1.0.md`.
9//! Schichten-Position: Layer 4 — Core Services (Konsumenten-Pfad fuer
10//! `foundation::tracing::{Span, Histogram}` + `foundation::observability::Event`).
11//!
12//! # Was wird exportiert?
13//!
14//! Drei OTLP-Endpunkte werden bedient:
15//!
16//! * **`/v1/traces`** — Spans aus [`foundation::tracing::Span`].
17//! * **`/v1/metrics`** — Histograms aus [`foundation::tracing::Histogram`].
18//! * **`/v1/logs`** — Events aus [`foundation::observability::Event`].
19//!
//! We use the **OTLP/HTTP/JSON** format as defined by the OpenTelemetry
//! protocol specification v1.4:
//! <https://github.com/open-telemetry/opentelemetry-proto/blob/v1.4.0/docs/specification.md#otlphttp>.
//! JSON is officially supported and needs no Protobuf codegen pipeline,
//! which makes it a good fit for pure Rust without `prost`/`tonic`.
25//!
//! On each tick (one `flush` call) all buffered spans, histograms and events
//! are POSTed in batches: one request per signal type (traces, metrics, logs).
28//!
29//! # Default-Endpoints
30//!
31//! Standardmaessig laeuft ein lokaler OTel-Collector auf
32//! `http://127.0.0.1:4318/v1/traces` etc. — siehe
33//! `examples/otel/jaeger-compose.yml` fuer einen Local-Stack.
34
35#![warn(missing_docs)]
36
37use std::io::{Read, Write};
38use std::net::TcpStream;
39use std::sync::Mutex;
40use std::time::Duration;
41
42use zerodds_foundation::observability::{Component, Event, Level};
43use zerodds_foundation::tracing::{Histogram, Span, SpanKind, SpanStatus};
44
/// Default collector host: an OTel collector on localhost (Jaeger compose setup).
pub const DEFAULT_OTLP_HOST: &str = "127.0.0.1";
/// Default OTLP/HTTP port (per the OTel spec).
pub const DEFAULT_OTLP_PORT: u16 = 4318;
49
/// Configuration for the exporter.
#[derive(Clone, Debug)]
pub struct OtlpConfig {
    /// Host of the OTel collector.
    pub host: String,
    /// Port of the OTel collector (HTTP).
    pub port: u16,
    /// Service name reported in the resource attributes of every request.
    pub service_name: String,
    /// Service version (e.g. the Cargo package version).
    pub service_version: String,
    /// Connect/write timeout.
    pub timeout: Duration,
}
64
65impl Default for OtlpConfig {
66    fn default() -> Self {
67        Self {
68            host: DEFAULT_OTLP_HOST.into(),
69            port: DEFAULT_OTLP_PORT,
70            service_name: "zerodds".into(),
71            service_version: env!("CARGO_PKG_VERSION").into(),
72            timeout: Duration::from_secs(5),
73        }
74    }
75}
76
/// Exporter handle. Buffers spans/histograms/events and flushes them
/// via POST to the OTel collector.
pub struct OtlpExporter {
    // Collector address, service identity and timeout used for every request.
    cfg: OtlpConfig,
    // Pending telemetry, guarded by a mutex because all methods take `&self`.
    buf: Mutex<ExporterBuffers>,
}
83
/// Pending telemetry that has been added but not yet flushed,
/// one queue per signal type.
#[derive(Default)]
struct ExporterBuffers {
    // Spans waiting to be sent to `/v1/traces`.
    spans: Vec<Span>,
    // Histogram snapshots waiting to be sent to `/v1/metrics`.
    histograms: Vec<Histogram>,
    // Events waiting to be sent to `/v1/logs`.
    events: Vec<Event>,
}
90
/// Errors that can occur during an export.
#[derive(Debug)]
pub enum ExportError {
    /// Connect/write/read failed.
    Io(std::io::Error),
    /// HTTP status was not 2xx.
    HttpStatus {
        /// Status code (0 if no status line could be parsed).
        code: u16,
        /// Snippet of the response body (at most 256 characters).
        body_snippet: String,
    },
    /// The buffer mutex was poisoned.
    Poisoned,
}
106
107impl std::fmt::Display for ExportError {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        match self {
110            Self::Io(e) => write!(f, "io: {e}"),
111            Self::HttpStatus { code, body_snippet } => {
112                write!(f, "http {code}: {body_snippet}")
113            }
114            Self::Poisoned => write!(f, "exporter mutex poisoned"),
115        }
116    }
117}
118
// Marker impl so `ExportError` can be used as a `dyn std::error::Error`;
// all trait methods keep their default implementations.
impl std::error::Error for ExportError {}
120
121impl OtlpExporter {
122    /// Neuer Exporter mit der gegebenen Config.
123    #[must_use]
124    pub fn new(cfg: OtlpConfig) -> Self {
125        Self {
126            cfg,
127            buf: Mutex::new(ExporterBuffers::default()),
128        }
129    }
130
131    /// Fuegt einen Span zur Pending-Queue hinzu.
132    pub fn add_span(&self, span: Span) {
133        if let Ok(mut b) = self.buf.lock() {
134            b.spans.push(span);
135        }
136    }
137
138    /// Fuegt ein Histogram (Snapshot) hinzu.
139    pub fn add_histogram(&self, h: Histogram) {
140        if let Ok(mut b) = self.buf.lock() {
141            b.histograms.push(h);
142        }
143    }
144
145    /// Fuegt ein Event hinzu.
146    pub fn add_event(&self, e: Event) {
147        if let Ok(mut b) = self.buf.lock() {
148            b.events.push(e);
149        }
150    }
151
152    /// Flush + POST. Blockiert bis Collector geantwortet hat oder
153    /// Timeout erreicht ist. Leert die Buffer auch bei Fehler.
154    ///
155    /// # Errors
156    /// IO/HTTP-Fehler aus dem POST-Request.
157    pub fn flush(&self) -> Result<(), ExportError> {
158        let (spans, histograms, events) = {
159            let mut b = self.buf.lock().map_err(|_| ExportError::Poisoned)?;
160            (
161                std::mem::take(&mut b.spans),
162                std::mem::take(&mut b.histograms),
163                std::mem::take(&mut b.events),
164            )
165        };
166        if !spans.is_empty() {
167            let body = build_traces_json(&self.cfg, &spans);
168            self.post("/v1/traces", &body)?;
169        }
170        if !histograms.is_empty() {
171            let body = build_metrics_json(&self.cfg, &histograms);
172            self.post("/v1/metrics", &body)?;
173        }
174        if !events.is_empty() {
175            let body = build_logs_json(&self.cfg, &events);
176            self.post("/v1/logs", &body)?;
177        }
178        Ok(())
179    }
180
181    /// Direct HTTP-POST nach `path`. JSON content-type.
182    fn post(&self, path: &str, body: &str) -> Result<(), ExportError> {
183        let addr = format!("{}:{}", self.cfg.host, self.cfg.port);
184        let mut stream = TcpStream::connect(&addr).map_err(ExportError::Io)?;
185        stream.set_write_timeout(Some(self.cfg.timeout)).ok();
186        stream.set_read_timeout(Some(self.cfg.timeout)).ok();
187        let req = format!(
188            "POST {} HTTP/1.1\r\n\
189             Host: {}\r\n\
190             User-Agent: zerodds-otlp/0.1\r\n\
191             Content-Type: application/json\r\n\
192             Content-Length: {}\r\n\
193             Connection: close\r\n\r\n{}",
194            path,
195            self.cfg.host,
196            body.len(),
197            body
198        );
199        stream.write_all(req.as_bytes()).map_err(ExportError::Io)?;
200        let mut resp = Vec::new();
201        let _ = stream.read_to_end(&mut resp);
202        let resp_str = String::from_utf8_lossy(&resp);
203        // Parse status-line.
204        let (code, body_start) = parse_http_status(&resp_str);
205        if !(200..300).contains(&code) {
206            let snippet: String = resp_str[body_start.min(resp_str.len())..]
207                .chars()
208                .take(256)
209                .collect();
210            return Err(ExportError::HttpStatus {
211                code,
212                body_snippet: snippet,
213            });
214        }
215        Ok(())
216    }
217}
218
/// Extracts the status code and the body offset from a raw HTTP response.
///
/// Returns `(code, body_start)`. `code` is the second whitespace-separated
/// token of the first line parsed as `u16`, or 0 if it does not parse.
/// `body_start` is the byte offset just past the first `\r\n\r\n`, or
/// `resp.len()` if there is none. A response without a newline, or whose
/// first line has fewer than two tokens, yields `(0, resp.len())`.
fn parse_http_status(resp: &str) -> (u16, usize) {
    let code_token = resp
        .split_once('\n')
        .and_then(|(status_line, _)| status_line.split_whitespace().nth(1));
    match code_token {
        None => (0, resp.len()),
        Some(token) => {
            let code = token.parse::<u16>().unwrap_or(0);
            let body_start = match resp.find("\r\n\r\n") {
                Some(separator) => separator + 4,
                None => resp.len(),
            };
            (code, body_start)
        }
    }
}
231
232// ============================================================================
233// JSON-Builder — minimaler Serializer nach OTLP/HTTP/JSON-Spec.
234// ============================================================================
235
236fn build_traces_json(cfg: &OtlpConfig, spans: &[Span]) -> String {
237    // Top-level: { resourceSpans: [{ resource, scopeSpans: [{ scope, spans }] }] }
238    let mut out = String::with_capacity(512 + spans.len() * 256);
239    out.push_str(r#"{"resourceSpans":[{"resource":"#);
240    push_resource(&mut out, cfg);
241    out.push_str(r#","scopeSpans":[{"scope":{"name":"zerodds","version":""#);
242    push_str_escaped(&mut out, &cfg.service_version);
243    out.push_str(r#""},"spans":["#);
244    for (i, s) in spans.iter().enumerate() {
245        if i > 0 {
246            out.push(',');
247        }
248        push_span(&mut out, s);
249    }
250    out.push_str("]}]}]}");
251    out
252}
253
254fn build_metrics_json(cfg: &OtlpConfig, hists: &[Histogram]) -> String {
255    let mut out = String::with_capacity(512 + hists.len() * 256);
256    out.push_str(r#"{"resourceMetrics":[{"resource":"#);
257    push_resource(&mut out, cfg);
258    out.push_str(r#","scopeMetrics":[{"scope":{"name":"zerodds","version":""#);
259    push_str_escaped(&mut out, &cfg.service_version);
260    out.push_str(r#""},"metrics":["#);
261    for (i, h) in hists.iter().enumerate() {
262        if i > 0 {
263            out.push(',');
264        }
265        push_histogram(&mut out, h);
266    }
267    out.push_str("]}]}]}");
268    out
269}
270
271fn build_logs_json(cfg: &OtlpConfig, events: &[Event]) -> String {
272    let mut out = String::with_capacity(512 + events.len() * 200);
273    out.push_str(r#"{"resourceLogs":[{"resource":"#);
274    push_resource(&mut out, cfg);
275    out.push_str(r#","scopeLogs":[{"scope":{"name":"zerodds","version":""#);
276    push_str_escaped(&mut out, &cfg.service_version);
277    out.push_str(r#""},"logRecords":["#);
278    for (i, e) in events.iter().enumerate() {
279        if i > 0 {
280            out.push(',');
281        }
282        push_event(&mut out, e);
283    }
284    out.push_str("]}]}]}");
285    out
286}
287
288fn push_resource(out: &mut String, cfg: &OtlpConfig) {
289    out.push_str(r#"{"attributes":[{"key":"service.name","value":{"stringValue":""#);
290    push_str_escaped(out, &cfg.service_name);
291    out.push_str(r#""}},{"key":"service.version","value":{"stringValue":""#);
292    push_str_escaped(out, &cfg.service_version);
293    out.push_str(r#""}}]}"#);
294}
295
296fn push_span(out: &mut String, s: &Span) {
297    out.push_str(r#"{"traceId":""#);
298    out.push_str(&s.context.trace_id.to_hex());
299    out.push_str(r#"","spanId":""#);
300    out.push_str(&s.context.span_id.to_hex());
301    out.push_str(r#"","name":""#);
302    push_str_escaped(out, &s.name);
303    out.push_str(r#"","kind":"#);
304    out.push_str(match s.kind {
305        SpanKind::Internal => "1",
306        SpanKind::Server => "2",
307        SpanKind::Client => "3",
308    });
309    out.push_str(r#","startTimeUnixNano":""#);
310    push_u64(out, s.start_unix_ns);
311    out.push_str(r#"","endTimeUnixNano":""#);
312    push_u64(out, s.end_unix_ns);
313    out.push('"');
314    if let Some(p) = s.context.parent_span_id {
315        out.push_str(r#","parentSpanId":""#);
316        out.push_str(&p.to_hex());
317        out.push('"');
318    }
319    if let Some(d) = &s.status_description {
320        out.push_str(r#","status":{"code":"#);
321        out.push_str(match s.status {
322            SpanStatus::Unset => "0",
323            SpanStatus::Ok => "1",
324            SpanStatus::Error => "2",
325        });
326        out.push_str(r#","message":""#);
327        push_str_escaped(out, d);
328        out.push_str(r#""}"#);
329    } else {
330        out.push_str(r#","status":{"code":"#);
331        out.push_str(match s.status {
332            SpanStatus::Unset => "0",
333            SpanStatus::Ok => "1",
334            SpanStatus::Error => "2",
335        });
336        out.push('}');
337    }
338    if !s.attributes.is_empty() {
339        out.push_str(r#","attributes":["#);
340        for (i, a) in s.attributes.iter().enumerate() {
341            if i > 0 {
342                out.push(',');
343            }
344            out.push_str(r#"{"key":""#);
345            push_str_escaped(out, a.key);
346            out.push_str(r#"","value":{"stringValue":""#);
347            push_str_escaped(out, &a.value);
348            out.push_str(r#""}}"#);
349        }
350        out.push(']');
351    }
352    out.push('}');
353}
354
355fn push_histogram(out: &mut String, h: &Histogram) {
356    out.push_str(r#"{"name":""#);
357    push_str_escaped(out, &h.name);
358    out.push_str(r#"","unit":"ns","histogram":{"aggregationTemporality":2,"dataPoints":[{"#);
359    out.push_str(r#""startTimeUnixNano":"0","timeUnixNano":"0","count":""#);
360    push_u64(out, h.count);
361    out.push_str(r#"","sum":"#);
362    push_u64(out, h.sum_ns);
363    out.push_str(r#","min":"#);
364    push_u64(out, if h.count == 0 { 0 } else { h.min_ns });
365    out.push_str(r#","max":"#);
366    push_u64(out, h.max_ns);
367    out.push_str(r#","explicitBounds":["#);
368    let bounds = Histogram::bucket_bounds();
369    // OTLP wants strictly-increasing bucket boundaries; use ALL but
370    // last (overflow bucket has no upper bound).
371    for (i, b) in bounds.iter().take(bounds.len() - 1).enumerate() {
372        if i > 0 {
373            out.push(',');
374        }
375        push_u64(out, *b);
376    }
377    out.push_str(r#"],"bucketCounts":["#);
378    for (i, c) in h.buckets.iter().enumerate() {
379        if i > 0 {
380            out.push(',');
381        }
382        out.push('"');
383        push_u64(out, *c);
384        out.push('"');
385    }
386    out.push_str("]}]}}");
387}
388
389fn push_event(out: &mut String, e: &Event) {
390    out.push_str(r#"{"timeUnixNano":"0","severityNumber":"#);
391    out.push_str(match e.level {
392        Level::Info => "9",
393        Level::Warn => "13",
394        Level::Error => "17",
395    });
396    out.push_str(r#","severityText":""#);
397    out.push_str(match e.level {
398        Level::Info => "INFO",
399        Level::Warn => "WARN",
400        Level::Error => "ERROR",
401    });
402    out.push_str(r#"","body":{"stringValue":""#);
403    push_str_escaped(out, e.name);
404    out.push_str(r#""},"attributes":[{"key":"component","value":{"stringValue":""#);
405    out.push_str(component_str(e.component));
406    out.push_str(r#""}}"#);
407    for a in &e.attrs {
408        out.push_str(r#",{"key":""#);
409        push_str_escaped(out, a.key);
410        out.push_str(r#"","value":{"stringValue":""#);
411        push_str_escaped(out, &a.value);
412        out.push_str(r#""}}"#);
413    }
414    out.push_str("]}");
415}
416
/// Returns the static string form of a component by delegating to
/// `Component::as_str`.
fn component_str(c: Component) -> &'static str {
    c.as_str()
}
420
/// Appends the decimal representation of `v` to `out`.
fn push_u64(out: &mut String, v: u64) {
    use std::fmt::Write;
    // Formatting into a `String` cannot fail, so the result is ignored.
    let _ = out.write_fmt(format_args!("{}", v));
}
425
/// Appends `s` to `out` with JSON string escaping applied.
///
/// `"` and `\` are backslash-escaped, newline/carriage return/tab use
/// their short escapes, every other character below U+0020 is written as
/// `\u00xx`, and everything else is copied unchanged. No surrounding
/// quotes are added.
fn push_str_escaped(out: &mut String, s: &str) {
    use std::fmt::Write as _;

    // Start of the current run of characters that need no escaping.
    let mut run_start = 0;
    for (idx, ch) in s.char_indices() {
        let short_escape = match ch {
            '"' => Some(r#"\""#),
            '\\' => Some(r"\\"),
            '\n' => Some(r"\n"),
            '\r' => Some(r"\r"),
            '\t' => Some(r"\t"),
            _ => None,
        };
        let code_point = ch as u32;
        if short_escape.is_none() && code_point >= 0x20 {
            continue;
        }
        // Copy the pending unescaped run, then emit the escape sequence.
        out.push_str(&s[run_start..idx]);
        match short_escape {
            Some(escape) => out.push_str(escape),
            None => {
                let _ = write!(out, "\\u{:04x}", code_point);
            }
        }
        run_start = idx + ch.len_utf8();
    }
    out.push_str(&s[run_start..]);
}
442
#[cfg(test)]
mod tests {
    use super::*;
    use zerodds_foundation::observability::Event;
    use zerodds_foundation::tracing::{Span, SpanContext, SpanId, TraceId};

    // The default configuration targets the local collector.
    #[test]
    fn config_default_points_to_localhost() {
        let c = OtlpConfig::default();
        assert_eq!(c.host, "127.0.0.1");
        assert_eq!(c.port, 4318);
        assert_eq!(c.service_name, "zerodds");
    }

    // A single span serializes with hex IDs, name, kind and resource attributes.
    #[test]
    fn traces_json_roundtrip_shape() {
        let span = Span {
            context: SpanContext::new_root(TraceId([1u8; 16]), SpanId([2u8; 8])),
            name: "dcps.write".into(),
            kind: SpanKind::Client,
            start_unix_ns: 1_700_000_000_000_000_000,
            end_unix_ns: 1_700_000_000_001_500_000,
            status: SpanStatus::Ok,
            status_description: None,
            attributes: Vec::new(),
        };
        let cfg = OtlpConfig::default();
        let body = build_traces_json(&cfg, &[span]);
        assert!(body.contains(r#""traceId":"01010101010101010101010101010101""#));
        assert!(body.contains(r#""spanId":"0202020202020202""#));
        assert!(body.contains(r#""name":"dcps.write""#));
        assert!(body.contains(r#""kind":3"#)); // Client
        assert!(body.contains(r#""service.name""#));
    }

    // A histogram serializes with name, unit, count and the explicit bounds.
    #[test]
    fn metrics_json_roundtrip_shape() {
        let mut h = Histogram::new("dds.write.latency");
        h.record_ns(500);
        h.record_ns(50_000);
        let cfg = OtlpConfig::default();
        let body = build_metrics_json(&cfg, &[h]);
        assert!(body.contains(r#""name":"dds.write.latency""#));
        assert!(body.contains(r#""unit":"ns""#));
        assert!(body.contains(r#""count":"2""#));
        assert!(body.contains(r#""explicitBounds":[1,10,100,1000,10000,100000,"#));
    }

    // An event serializes with severity, body and its custom attributes.
    #[test]
    fn logs_json_roundtrip_shape() {
        let e = Event::new(Level::Info, Component::Dcps, "writer.created").with_attr("topic", "/x");
        let cfg = OtlpConfig::default();
        let body = build_logs_json(&cfg, &[e]);
        assert!(body.contains(r#""severityNumber":9"#));
        assert!(body.contains(r#""body":{"stringValue":"writer.created"}"#));
        assert!(body.contains(r#""key":"topic""#));
    }

    // Quotes, newlines and backslashes are escaped.
    #[test]
    fn json_escape_handles_quotes_and_newlines() {
        let mut s = String::new();
        push_str_escaped(&mut s, "a\"b\nc\\d");
        assert_eq!(s, r#"a\"b\nc\\d"#);
    }

    // The status code and the body offset are extracted from a 200 response.
    #[test]
    fn parse_http_status_extracts_code() {
        let r = "HTTP/1.1 200 OK\r\nServer: x\r\n\r\n{}";
        let (code, body_start) = parse_http_status(r);
        assert_eq!(code, 200);
        assert_eq!(&r[body_start..], "{}");
    }

    // A multi-word reason phrase does not disturb status parsing.
    #[test]
    fn parse_http_status_handles_500() {
        let r = "HTTP/1.1 500 Internal Server Error\r\n\r\nboom";
        let (code, _) = parse_http_status(r);
        assert_eq!(code, 500);
    }

    // A failed flush still empties the buffers, so the next flush is a no-op.
    #[test]
    fn flush_drains_buffers_even_with_no_collector() {
        let cfg = OtlpConfig {
            host: "127.0.0.1".into(),
            port: 1, // deliberately unreachable
            timeout: Duration::from_millis(50),
            ..OtlpConfig::default()
        };
        let exp = OtlpExporter::new(cfg);
        exp.add_span(Span {
            context: SpanContext::new_root(TraceId([1u8; 16]), SpanId([2u8; 8])),
            name: "x".into(),
            kind: SpanKind::Internal,
            start_unix_ns: 0,
            end_unix_ns: 1,
            status: SpanStatus::Unset,
            status_description: None,
            attributes: Vec::new(),
        });
        let r = exp.flush();
        // Expected: I/O error on connect.
        assert!(r.is_err());
        // The buffers were drained nevertheless.
        let r2 = exp.flush();
        assert!(r2.is_ok(), "second flush with empty buffers should be ok");
    }
}
549}