Skip to main content

zerodds_websocket_bridge/daemon/
runtime_common.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Cross-Cutting Daemon-Runtime: §8.2 Prometheus-Metrics, §8.3 OTLP-Spans,
5//! §9.2 Graceful Shutdown, §5.2 Catalog-/Healthz-Endpoint, §6 QoS-Mapping.
6//!
7//! Wiederverwendbar fuer den `zerodds-ws-bridged`-Daemon — die Logik ist
8//! library-side gehalten, damit Tests den `BridgeMetrics`-Set ohne
9//! Subprocess-Boot durchspielen koennen.
10
11#![allow(clippy::print_stderr)]
12
13use std::net::{SocketAddr, TcpListener};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::thread::{self, JoinHandle};
17use std::time::Duration;
18
19use zerodds_monitor::{Counter, Gauge, Labels, Registry};
20use zerodds_observability_otlp::{OtlpConfig, OtlpExporter};
21
22use super::config::{DaemonConfig, TopicConfig};
23
/// Service name of the daemon. Within this file it is used for the catalog
/// snapshot and the admin root page; per the original note it is also meant
/// to identify the OTel resource.
pub const SERVICE_NAME: &'static str = "zerodds-ws-bridged";
26
27/// Versions-String — Cargo-Crate-Version (Single-Source).
28pub const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
29
30// ============================================================================
31// A2 — Prometheus-Metrics-Set (§8.2).
32// ============================================================================
33
34/// Standard-Metric-Set fuer alle Bridges.
35///
36/// Die Metric-Namen folgen dem Bridge-Spec-Schema
37/// `zerodds_<bridge>_<thing>_total`. Counter sind monotonically-increasing,
38/// Gauges sind reset-bar.
39#[derive(Clone)]
40pub struct BridgeMetrics {
41    /// Anzahl eingehender WS-Frames (Text+Binary).
42    pub frames_in_total: Arc<Counter>,
43    /// Anzahl ausgehender WS-Frames.
44    pub frames_out_total: Arc<Counter>,
45    /// Bytes in (Frame-Payload).
46    pub bytes_in_total: Arc<Counter>,
47    /// Bytes out.
48    pub bytes_out_total: Arc<Counter>,
49    /// Aktuell offene Connections.
50    pub connections_active: Arc<Gauge>,
51    /// Lifetime-Connections-akzeptiert.
52    pub connections_total: Arc<Counter>,
53    /// DDS-Samples published into runtime.
54    pub dds_samples_in_total: Arc<Counter>,
55    /// DDS-Samples received from runtime.
56    pub dds_samples_out_total: Arc<Counter>,
57    /// Wire-Errors (decode/encode/socket).
58    pub errors_total: Arc<Counter>,
59}
60
61impl BridgeMetrics {
62    /// Registriert das Standard-Metric-Set in der gegebenen Registry.
63    /// Idempotent — mehrfacher Aufruf liefert dieselben Counter-/Gauge-
64    /// Instanzen zurueck.
65    pub fn register(registry: &Registry) -> Self {
66        registry.set_help(
67            "zerodds_ws_frames_in_total",
68            "WebSocket frames received from peer",
69        );
70        registry.set_help(
71            "zerodds_ws_frames_out_total",
72            "WebSocket frames sent to peer",
73        );
74        registry.set_help("zerodds_ws_bytes_in_total", "WebSocket bytes received");
75        registry.set_help("zerodds_ws_bytes_out_total", "WebSocket bytes sent");
76        registry.set_help(
77            "zerodds_ws_connections_active",
78            "Currently open WebSocket connections",
79        );
80        registry.set_help(
81            "zerodds_ws_connections_total",
82            "Lifetime accepted WebSocket connections",
83        );
84        registry.set_help(
85            "zerodds_ws_dds_samples_in_total",
86            "DDS samples written into runtime via WS",
87        );
88        registry.set_help(
89            "zerodds_ws_dds_samples_out_total",
90            "DDS samples emitted to WS subscribers",
91        );
92        registry.set_help("zerodds_ws_errors_total", "Frame/codec/socket errors");
93
94        Self {
95            frames_in_total: registry.counter("zerodds_ws_frames_in_total", Labels::new()),
96            frames_out_total: registry.counter("zerodds_ws_frames_out_total", Labels::new()),
97            bytes_in_total: registry.counter("zerodds_ws_bytes_in_total", Labels::new()),
98            bytes_out_total: registry.counter("zerodds_ws_bytes_out_total", Labels::new()),
99            connections_active: registry.gauge("zerodds_ws_connections_active", Labels::new()),
100            connections_total: registry.counter("zerodds_ws_connections_total", Labels::new()),
101            dds_samples_in_total: registry
102                .counter("zerodds_ws_dds_samples_in_total", Labels::new()),
103            dds_samples_out_total: registry
104                .counter("zerodds_ws_dds_samples_out_total", Labels::new()),
105            errors_total: registry.counter("zerodds_ws_errors_total", Labels::new()),
106        }
107    }
108}
109
110// ============================================================================
111// A1 — Graceful-Shutdown (§9.2).
112// ============================================================================
113
/// Lifecycle signals the daemon can receive.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LifecycleSignal {
    /// SIGTERM/SIGINT: start shutting down.
    Shutdown,
    /// SIGHUP: reload the configuration (per the original note: hot reload
    /// of the TLS certificate and the ACL).
    Reload,
}
122
123/// Installiert einen Signal-Watcher der `SIGTERM`/`SIGINT`/`SIGHUP`
124/// abfaengt und das `shutdown_flag` setzt bzw. den `reload_flag`-
125/// Hook anstoesst.
126///
127/// Der Worker laeuft in einem dedizierten Thread und beendet sich
128/// sobald das `shutdown_flag` gesetzt ist (Self-Notification durch
129/// SIGTERM-Path).
130///
131/// Die Funktion gibt einen Join-Handle zurueck; im Daemon-`Drop`-Pfad
132/// soll dieser nicht aktiv gejoint werden — der Worker-Thread ist
133/// detached gegen das Process-Lifecycle.
134#[cfg(unix)]
135pub fn install_signal_watcher(
136    shutdown_flag: Arc<AtomicBool>,
137    reload_flag: Arc<AtomicBool>,
138) -> std::io::Result<JoinHandle<()>> {
139    use signal_hook::consts::{SIGHUP, SIGINT, SIGTERM};
140    use signal_hook::iterator::Signals;
141
142    let mut signals = Signals::new([SIGTERM, SIGINT, SIGHUP])?;
143    let h = thread::Builder::new()
144        .name("zerodds-ws-signals".into())
145        .spawn(move || {
146            for sig in signals.forever() {
147                match sig {
148                    SIGTERM | SIGINT => {
149                        shutdown_flag.store(true, Ordering::SeqCst);
150                        break;
151                    }
152                    SIGHUP => {
153                        reload_flag.store(true, Ordering::SeqCst);
154                    }
155                    _ => {}
156                }
157            }
158        })?;
159    Ok(h)
160}
161
162// ============================================================================
163// A5 — Catalog/Healthz-Endpoint (§5.2).
164// ============================================================================
165
166/// Catalog-Snapshot fuer `/catalog`-Endpoint.
167#[derive(Clone)]
168pub struct CatalogSnapshot {
169    /// Service-Name.
170    pub service: String,
171    /// Version.
172    pub version: String,
173    /// Bridge-Topics (DDS-Name + Direction + WS-Pfad + QoS).
174    pub topics: Vec<TopicConfig>,
175}
176
177impl CatalogSnapshot {
178    /// Aus Daemon-Config bauen.
179    #[must_use]
180    pub fn from_config(cfg: &DaemonConfig) -> Self {
181        Self {
182            service: SERVICE_NAME.into(),
183            version: SERVICE_VERSION.into(),
184            topics: cfg.topics.clone(),
185        }
186    }
187
188    /// Render als JSON (Spec §5.2).
189    #[must_use]
190    pub fn render_json(&self) -> String {
191        let mut out = String::with_capacity(256 + self.topics.len() * 128);
192        out.push_str("{\"service\":\"");
193        push_json_str(&mut out, &self.service);
194        out.push_str("\",\"version\":\"");
195        push_json_str(&mut out, &self.version);
196        out.push_str("\",\"topics\":[");
197        for (i, t) in self.topics.iter().enumerate() {
198            if i > 0 {
199                out.push(',');
200            }
201            out.push_str("{\"name\":\"");
202            push_json_str(&mut out, &t.name);
203            out.push_str("\",\"type\":\"");
204            push_json_str(&mut out, &t.type_name);
205            out.push_str("\",\"direction\":\"");
206            push_json_str(&mut out, &t.direction);
207            out.push_str("\",\"ws_path\":\"");
208            push_json_str(&mut out, &t.ws_path);
209            out.push_str("\",\"qos\":{\"reliability\":\"");
210            push_json_str(&mut out, &t.reliability);
211            out.push_str("\",\"durability\":\"");
212            push_json_str(&mut out, &t.durability);
213            out.push_str("\",\"history_depth\":");
214            out.push_str(&t.history_depth.to_string());
215            out.push_str("}}");
216        }
217        out.push_str("]}");
218        out
219    }
220}
221
/// Appends `s` to `out`, escaped for use inside a JSON string literal.
///
/// `"`, `\`, newline, carriage return and tab get their two-character
/// escapes; every other character below U+0020 is written as `\u00XX`.
/// All remaining characters are copied unchanged. The surrounding quotes are
/// not written.
fn push_json_str(out: &mut String, s: &str) {
    use std::fmt::Write as _;

    for ch in s.chars() {
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };

        if let Some(escape) = short_escape {
            out.push_str(escape);
        } else if u32::from(ch) < 0x20 {
            // Writing into a `String` cannot fail, so the result is ignored.
            let _ = write!(out, "\\u{:04x}", u32::from(ch));
        } else {
            out.push(ch);
        }
    }
}
238
239/// Mini-HTTP-Worker der `/catalog`, `/healthz` und `/metrics` bedient.
240///
241/// Returns `JoinHandle` + bound `SocketAddr`. Beim Drop des stop-flags
242/// laeuft die accept-Loop noch eine Iteration durch (dank Self-Connect).
243pub fn serve_admin_endpoints(
244    addr: SocketAddr,
245    catalog: Arc<CatalogSnapshot>,
246    registry: Arc<Registry>,
247    healthy: Arc<AtomicBool>,
248    stop: Arc<AtomicBool>,
249) -> std::io::Result<(JoinHandle<()>, SocketAddr)> {
250    let listener = TcpListener::bind(addr)?;
251    let bound = listener.local_addr()?;
252    listener.set_nonblocking(true)?;
253
254    let h = thread::Builder::new()
255        .name("zerodds-ws-admin".into())
256        .spawn(move || {
257            loop {
258                if stop.load(Ordering::SeqCst) {
259                    break;
260                }
261                match listener.accept() {
262                    Ok((mut s, _peer)) => {
263                        let _ = s.set_nonblocking(false);
264                        let _ = s.set_read_timeout(Some(Duration::from_secs(2)));
265                        let _ = s.set_write_timeout(Some(Duration::from_secs(2)));
266                        admin_handle(&mut s, &catalog, &registry, &healthy);
267                    }
268                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
269                        thread::sleep(Duration::from_millis(50));
270                    }
271                    Err(_) => {
272                        thread::sleep(Duration::from_millis(100));
273                    }
274                }
275            }
276        })?;
277    Ok((h, bound))
278}
279
280fn admin_handle(
281    stream: &mut std::net::TcpStream,
282    catalog: &CatalogSnapshot,
283    registry: &Registry,
284    healthy: &AtomicBool,
285) {
286    use std::io::{Read, Write};
287    let mut buf = [0u8; 1024];
288    let n = match stream.read(&mut buf) {
289        Ok(n) => n,
290        Err(_) => return,
291    };
292    let req = String::from_utf8_lossy(&buf[..n]);
293    let first_line = req.lines().next().unwrap_or("");
294
295    // Path extrahieren.
296    let path = first_line
297        .split_whitespace()
298        .nth(1)
299        .unwrap_or("/")
300        .split('?')
301        .next()
302        .unwrap_or("/");
303
304    let (status, ctype, body) = match path {
305        "/catalog" => ("200 OK", "application/json", catalog.render_json()),
306        "/healthz" => {
307            if healthy.load(Ordering::SeqCst) {
308                ("200 OK", "text/plain; charset=utf-8", "OK\n".to_string())
309            } else {
310                (
311                    "503 Service Unavailable",
312                    "text/plain; charset=utf-8",
313                    "DOWN\n".to_string(),
314                )
315            }
316        }
317        "/metrics" => (
318            "200 OK",
319            "text/plain; version=0.0.4; charset=utf-8",
320            registry.render_prometheus(),
321        ),
322        "/" => (
323            "200 OK",
324            "text/plain; charset=utf-8",
325            format!("{}\nendpoints: /catalog /healthz /metrics\n", SERVICE_NAME),
326        ),
327        _ => ("404 Not Found", "text/plain; charset=utf-8", String::new()),
328    };
329
330    let resp = format!(
331        "HTTP/1.1 {status}\r\n\
332         Content-Type: {ctype}\r\n\
333         Content-Length: {}\r\n\
334         Connection: close\r\n\r\n{body}",
335        body.len()
336    );
337    let _ = stream.write_all(resp.as_bytes());
338}
339
340// ============================================================================
341// A3 — OTLP-Span-Exporter (§8.3).
342// ============================================================================
343
344/// Parst `OTEL_EXPORTER_OTLP_ENDPOINT`-aehnlichen String in eine
345/// `OtlpConfig`. Akzeptiert `http://host:port`, `host:port`, oder
346/// nur `host` (default Port 4318).
347#[must_use]
348pub fn otlp_config_from_endpoint(service_name: &str, raw: &str) -> OtlpConfig {
349    let trimmed = raw
350        .strip_prefix("http://")
351        .or_else(|| raw.strip_prefix("https://"))
352        .unwrap_or(raw)
353        .trim_end_matches('/');
354    let (host, port) = match trimmed.rsplit_once(':') {
355        Some((h, p)) => (h.to_string(), p.parse().unwrap_or(4318)),
356        None => (trimmed.to_string(), 4318),
357    };
358    OtlpConfig {
359        host,
360        port,
361        service_name: service_name.into(),
362        service_version: SERVICE_VERSION.into(),
363        timeout: Duration::from_secs(5),
364    }
365}
366
367/// Holt `OTEL_EXPORTER_OTLP_ENDPOINT` aus der Umgebung und parst
368/// `host:port`. Liefert `None` wenn die ENV nicht gesetzt ist.
369#[must_use]
370pub fn otlp_config_from_env(service_name: &str) -> Option<OtlpConfig> {
371    let raw = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok()?;
372    Some(otlp_config_from_endpoint(service_name, &raw))
373}
374
375/// Spawn-t einen periodischen `OtlpExporter::flush()`-Thread.
376pub fn spawn_otlp_flush_loop(
377    exporter: Arc<OtlpExporter>,
378    stop: Arc<AtomicBool>,
379    interval: Duration,
380) -> std::io::Result<JoinHandle<()>> {
381    thread::Builder::new()
382        .name("zerodds-ws-otlp".into())
383        .spawn(move || {
384            while !stop.load(Ordering::SeqCst) {
385                thread::sleep(interval);
386                if stop.load(Ordering::SeqCst) {
387                    break;
388                }
389                // Fehler werden geschluckt — der Collector kann offline
390                // sein; das ist fuer den Daemon-Pfad nicht fatal.
391                let _ = exporter.flush();
392            }
393            // Final flush.
394            let _ = exporter.flush();
395        })
396}
397
398// ============================================================================
399// Tests
400// ============================================================================
401
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    use std::io::{Read, Write};
    use std::net::TcpStream;

    /// Dev configuration with `topic` appended to its topic list.
    fn dev_config_with(topic: TopicConfig) -> DaemonConfig {
        let mut cfg = DaemonConfig::default_for_dev();
        cfg.topics.push(topic);
        cfg
    }

    /// Sends `GET <path>` to the admin endpoint and returns everything the
    /// server wrote back. `connect_msg` is the panic message used when the
    /// connection cannot be established. Read errors are ignored; whatever
    /// arrived until then is returned.
    fn http_get(addr: &SocketAddr, path: &str, connect_msg: &str) -> String {
        let mut conn =
            TcpStream::connect_timeout(addr, Duration::from_secs(2)).expect(connect_msg);
        conn.set_read_timeout(Some(Duration::from_secs(2))).ok();
        conn.write_all(format!("GET {path} HTTP/1.1\r\nHost: x\r\n\r\n").as_bytes())
            .unwrap();
        let mut response = String::new();
        conn.read_to_string(&mut response).ok();
        response
    }

    /// Parses `endpoint` and returns the config for host/port assertions.
    fn parsed(endpoint: &str) -> OtlpConfig {
        otlp_config_from_endpoint("svc", endpoint)
    }

    #[test]
    fn metrics_register_is_idempotent() {
        let registry = Registry::new();
        let first = BridgeMetrics::register(&registry);
        let second = BridgeMetrics::register(&registry);
        first.frames_in_total.inc();
        // Same registry key => same counter instance => shared state.
        assert_eq!(second.frames_in_total.get(), 1);
    }

    #[test]
    fn metrics_counter_visible_in_prometheus_render() {
        let registry = Registry::new();
        let metrics = BridgeMetrics::register(&registry);
        metrics.frames_in_total.add(7);
        metrics.connections_active.set(2);
        let text = registry.render_prometheus();
        assert!(
            text.contains("zerodds_ws_frames_in_total 7"),
            "expected counter in render, got:\n{text}"
        );
        assert!(
            text.contains("zerodds_ws_connections_active 2"),
            "expected gauge in render, got:\n{text}"
        );
    }

    #[test]
    fn catalog_render_json_contains_topic_fields() {
        let cfg = dev_config_with(TopicConfig {
            name: "Chat::Message".into(),
            type_name: "Chat::Message".into(),
            direction: "bidir".into(),
            ws_path: "/topics/chat/message".into(),
            reliability: "reliable".into(),
            durability: "volatile".into(),
            history_depth: 10,
        });
        let json = CatalogSnapshot::from_config(&cfg).render_json();
        assert!(json.contains("\"service\":\"zerodds-ws-bridged\""));
        assert!(json.contains("\"name\":\"Chat::Message\""));
        assert!(json.contains("\"ws_path\":\"/topics/chat/message\""));
        assert!(json.contains("\"reliability\":\"reliable\""));
    }

    #[test]
    fn admin_endpoint_serves_catalog_and_healthz_and_metrics() {
        let cfg = dev_config_with(TopicConfig {
            name: "T".into(),
            type_name: "T".into(),
            direction: "out".into(),
            ws_path: "/topics/t".into(),
            reliability: "reliable".into(),
            durability: "volatile".into(),
            history_depth: 5,
        });
        let catalog = Arc::new(CatalogSnapshot::from_config(&cfg));
        let registry = Arc::new(Registry::new());
        let metrics = BridgeMetrics::register(&registry);
        metrics.frames_in_total.add(42);

        let healthy = Arc::new(AtomicBool::new(true));
        let stop = Arc::new(AtomicBool::new(false));

        let (_worker, bound) = serve_admin_endpoints(
            "127.0.0.1:0".parse().unwrap(),
            Arc::clone(&catalog),
            Arc::clone(&registry),
            Arc::clone(&healthy),
            Arc::clone(&stop),
        )
        .expect("spawn admin");

        // /catalog
        let body = http_get(&bound, "/catalog", "connect catalog");
        assert!(body.contains("HTTP/1.1 200"));
        assert!(body.contains("\"name\":\"T\""), "got: {body}");

        // /healthz while healthy
        let body = http_get(&bound, "/healthz", "connect health");
        assert!(body.contains("HTTP/1.1 200"));
        assert!(body.contains("OK"));

        // /metrics
        let body = http_get(&bound, "/metrics", "connect metrics");
        assert!(
            body.contains("zerodds_ws_frames_in_total 42"),
            "got: {body}"
        );

        // /healthz after the health flag was cleared
        healthy.store(false, Ordering::SeqCst);
        let body = http_get(&bound, "/healthz", "connect health2");
        assert!(body.contains("HTTP/1.1 503"));

        stop.store(true, Ordering::SeqCst);
    }

    #[test]
    fn otlp_config_from_endpoint_parses_http_url() {
        let cfg = otlp_config_from_endpoint("svc-1", "http://collector.local:4318");
        assert_eq!(cfg.host, "collector.local");
        assert_eq!(cfg.port, 4318);
        assert_eq!(cfg.service_name, "svc-1");
    }

    #[test]
    fn otlp_config_from_endpoint_parses_bare_host_port() {
        let cfg = parsed("host:9999");
        assert_eq!(cfg.host, "host");
        assert_eq!(cfg.port, 9999);
    }

    #[test]
    fn otlp_config_from_endpoint_handles_https_and_trailing_slash() {
        let cfg = parsed("https://otel.svc.local:4318/");
        assert_eq!(cfg.host, "otel.svc.local");
        assert_eq!(cfg.port, 4318);
    }

    #[test]
    fn otlp_config_from_endpoint_falls_back_to_default_port() {
        let cfg = parsed("host-only");
        assert_eq!(cfg.host, "host-only");
        assert_eq!(cfg.port, 4318);
    }
}
563
/// Non-Unix stand-in for the signal watcher.
///
/// `signal_hook::iterator` is POSIX-only, so no signal handling is installed
/// here and both flags are left untouched. The returned handle belongs to a
/// thread that does nothing and ends immediately; shutdown is expected to
/// happen through the normal socket-close paths.
#[cfg(not(unix))]
pub fn install_signal_watcher(
    _shutdown_flag: Arc<AtomicBool>,
    _reload_flag: Arc<AtomicBool>,
) -> std::io::Result<JoinHandle<()>> {
    let noop = thread::spawn(|| {});
    Ok(noop)
}