1#![allow(clippy::print_stderr)]
12
13use std::net::{SocketAddr, TcpListener};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::thread::{self, JoinHandle};
17use std::time::Duration;
18
19use zerodds_monitor::{Counter, Gauge, Labels, Registry};
20use zerodds_observability_otlp::{OtlpConfig, OtlpExporter};
21
22use super::config::{DaemonConfig, TopicConfig};
23
/// Service name reported by this daemon: used as the `service` field of the
/// catalog snapshot and as the first line of the admin index page.
pub const SERVICE_NAME: &str = "zerodds-ws-bridged";

/// Crate version, captured at compile time from Cargo's `CARGO_PKG_VERSION`.
pub const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
29
30#[derive(Clone)]
40pub struct BridgeMetrics {
41 pub frames_in_total: Arc<Counter>,
43 pub frames_out_total: Arc<Counter>,
45 pub bytes_in_total: Arc<Counter>,
47 pub bytes_out_total: Arc<Counter>,
49 pub connections_active: Arc<Gauge>,
51 pub connections_total: Arc<Counter>,
53 pub dds_samples_in_total: Arc<Counter>,
55 pub dds_samples_out_total: Arc<Counter>,
57 pub errors_total: Arc<Counter>,
59}
60
61impl BridgeMetrics {
62 pub fn register(registry: &Registry) -> Self {
66 registry.set_help(
67 "zerodds_ws_frames_in_total",
68 "WebSocket frames received from peer",
69 );
70 registry.set_help(
71 "zerodds_ws_frames_out_total",
72 "WebSocket frames sent to peer",
73 );
74 registry.set_help("zerodds_ws_bytes_in_total", "WebSocket bytes received");
75 registry.set_help("zerodds_ws_bytes_out_total", "WebSocket bytes sent");
76 registry.set_help(
77 "zerodds_ws_connections_active",
78 "Currently open WebSocket connections",
79 );
80 registry.set_help(
81 "zerodds_ws_connections_total",
82 "Lifetime accepted WebSocket connections",
83 );
84 registry.set_help(
85 "zerodds_ws_dds_samples_in_total",
86 "DDS samples written into runtime via WS",
87 );
88 registry.set_help(
89 "zerodds_ws_dds_samples_out_total",
90 "DDS samples emitted to WS subscribers",
91 );
92 registry.set_help("zerodds_ws_errors_total", "Frame/codec/socket errors");
93
94 Self {
95 frames_in_total: registry.counter("zerodds_ws_frames_in_total", Labels::new()),
96 frames_out_total: registry.counter("zerodds_ws_frames_out_total", Labels::new()),
97 bytes_in_total: registry.counter("zerodds_ws_bytes_in_total", Labels::new()),
98 bytes_out_total: registry.counter("zerodds_ws_bytes_out_total", Labels::new()),
99 connections_active: registry.gauge("zerodds_ws_connections_active", Labels::new()),
100 connections_total: registry.counter("zerodds_ws_connections_total", Labels::new()),
101 dds_samples_in_total: registry
102 .counter("zerodds_ws_dds_samples_in_total", Labels::new()),
103 dds_samples_out_total: registry
104 .counter("zerodds_ws_dds_samples_out_total", Labels::new()),
105 errors_total: registry.counter("zerodds_ws_errors_total", Labels::new()),
106 }
107 }
108}
109
/// Lifecycle request kinds for the daemon.
///
/// NOTE(review): not referenced in this part of the file; presumably mirrors
/// the shutdown/reload flags driven by `install_signal_watcher` — confirm
/// against callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleSignal {
    /// Request to stop the daemon.
    Shutdown,
    /// Request to reload (presumably the configuration — confirm).
    Reload,
}
122
123#[cfg(unix)]
135pub fn install_signal_watcher(
136 shutdown_flag: Arc<AtomicBool>,
137 reload_flag: Arc<AtomicBool>,
138) -> std::io::Result<JoinHandle<()>> {
139 use signal_hook::consts::{SIGHUP, SIGINT, SIGTERM};
140 use signal_hook::iterator::Signals;
141
142 let mut signals = Signals::new([SIGTERM, SIGINT, SIGHUP])?;
143 let h = thread::Builder::new()
144 .name("zerodds-ws-signals".into())
145 .spawn(move || {
146 for sig in signals.forever() {
147 match sig {
148 SIGTERM | SIGINT => {
149 shutdown_flag.store(true, Ordering::SeqCst);
150 break;
151 }
152 SIGHUP => {
153 reload_flag.store(true, Ordering::SeqCst);
154 }
155 _ => {}
156 }
157 }
158 })?;
159 Ok(h)
160}
161
162#[derive(Clone)]
168pub struct CatalogSnapshot {
169 pub service: String,
171 pub version: String,
173 pub topics: Vec<TopicConfig>,
175}
176
177impl CatalogSnapshot {
178 #[must_use]
180 pub fn from_config(cfg: &DaemonConfig) -> Self {
181 Self {
182 service: SERVICE_NAME.into(),
183 version: SERVICE_VERSION.into(),
184 topics: cfg.topics.clone(),
185 }
186 }
187
188 #[must_use]
190 pub fn render_json(&self) -> String {
191 let mut out = String::with_capacity(256 + self.topics.len() * 128);
192 out.push_str("{\"service\":\"");
193 push_json_str(&mut out, &self.service);
194 out.push_str("\",\"version\":\"");
195 push_json_str(&mut out, &self.version);
196 out.push_str("\",\"topics\":[");
197 for (i, t) in self.topics.iter().enumerate() {
198 if i > 0 {
199 out.push(',');
200 }
201 out.push_str("{\"name\":\"");
202 push_json_str(&mut out, &t.name);
203 out.push_str("\",\"type\":\"");
204 push_json_str(&mut out, &t.type_name);
205 out.push_str("\",\"direction\":\"");
206 push_json_str(&mut out, &t.direction);
207 out.push_str("\",\"ws_path\":\"");
208 push_json_str(&mut out, &t.ws_path);
209 out.push_str("\",\"qos\":{\"reliability\":\"");
210 push_json_str(&mut out, &t.reliability);
211 out.push_str("\",\"durability\":\"");
212 push_json_str(&mut out, &t.durability);
213 out.push_str("\",\"history_depth\":");
214 out.push_str(&t.history_depth.to_string());
215 out.push_str("}}");
216 }
217 out.push_str("]}");
218 out
219 }
220}
221
/// Appends `s` to `out`, escaped for use inside a JSON string literal.
///
/// The surrounding quotes are not written. `"` and `\` are backslash-escaped,
/// newline / carriage return / tab use their short escapes, any other
/// character below U+0020 becomes `\u00xx`, and everything else is copied
/// unchanged.
fn push_json_str(out: &mut String, s: &str) {
    use std::fmt::Write as _;

    for ch in s.chars() {
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };

        if let Some(escaped) = short_escape {
            out.push_str(escaped);
        } else if u32::from(ch) < 0x20 {
            // Remaining control characters have no short form.
            let _ = write!(out, "\\u{:04x}", u32::from(ch));
        } else {
            out.push(ch);
        }
    }
}
238
239pub fn serve_admin_endpoints(
244 addr: SocketAddr,
245 catalog: Arc<CatalogSnapshot>,
246 registry: Arc<Registry>,
247 healthy: Arc<AtomicBool>,
248 stop: Arc<AtomicBool>,
249) -> std::io::Result<(JoinHandle<()>, SocketAddr)> {
250 let listener = TcpListener::bind(addr)?;
251 let bound = listener.local_addr()?;
252 listener.set_nonblocking(true)?;
253
254 let h = thread::Builder::new()
255 .name("zerodds-ws-admin".into())
256 .spawn(move || {
257 loop {
258 if stop.load(Ordering::SeqCst) {
259 break;
260 }
261 match listener.accept() {
262 Ok((mut s, _peer)) => {
263 let _ = s.set_nonblocking(false);
264 let _ = s.set_read_timeout(Some(Duration::from_secs(2)));
265 let _ = s.set_write_timeout(Some(Duration::from_secs(2)));
266 admin_handle(&mut s, &catalog, ®istry, &healthy);
267 }
268 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
269 thread::sleep(Duration::from_millis(50));
270 }
271 Err(_) => {
272 thread::sleep(Duration::from_millis(100));
273 }
274 }
275 }
276 })?;
277 Ok((h, bound))
278}
279
280fn admin_handle(
281 stream: &mut std::net::TcpStream,
282 catalog: &CatalogSnapshot,
283 registry: &Registry,
284 healthy: &AtomicBool,
285) {
286 use std::io::{Read, Write};
287 let mut buf = [0u8; 1024];
288 let n = match stream.read(&mut buf) {
289 Ok(n) => n,
290 Err(_) => return,
291 };
292 let req = String::from_utf8_lossy(&buf[..n]);
293 let first_line = req.lines().next().unwrap_or("");
294
295 let path = first_line
297 .split_whitespace()
298 .nth(1)
299 .unwrap_or("/")
300 .split('?')
301 .next()
302 .unwrap_or("/");
303
304 let (status, ctype, body) = match path {
305 "/catalog" => ("200 OK", "application/json", catalog.render_json()),
306 "/healthz" => {
307 if healthy.load(Ordering::SeqCst) {
308 ("200 OK", "text/plain; charset=utf-8", "OK\n".to_string())
309 } else {
310 (
311 "503 Service Unavailable",
312 "text/plain; charset=utf-8",
313 "DOWN\n".to_string(),
314 )
315 }
316 }
317 "/metrics" => (
318 "200 OK",
319 "text/plain; version=0.0.4; charset=utf-8",
320 registry.render_prometheus(),
321 ),
322 "/" => (
323 "200 OK",
324 "text/plain; charset=utf-8",
325 format!("{}\nendpoints: /catalog /healthz /metrics\n", SERVICE_NAME),
326 ),
327 _ => ("404 Not Found", "text/plain; charset=utf-8", String::new()),
328 };
329
330 let resp = format!(
331 "HTTP/1.1 {status}\r\n\
332 Content-Type: {ctype}\r\n\
333 Content-Length: {}\r\n\
334 Connection: close\r\n\r\n{body}",
335 body.len()
336 );
337 let _ = stream.write_all(resp.as_bytes());
338}
339
340#[must_use]
348pub fn otlp_config_from_endpoint(service_name: &str, raw: &str) -> OtlpConfig {
349 let trimmed = raw
350 .strip_prefix("http://")
351 .or_else(|| raw.strip_prefix("https://"))
352 .unwrap_or(raw)
353 .trim_end_matches('/');
354 let (host, port) = match trimmed.rsplit_once(':') {
355 Some((h, p)) => (h.to_string(), p.parse().unwrap_or(4318)),
356 None => (trimmed.to_string(), 4318),
357 };
358 OtlpConfig {
359 host,
360 port,
361 service_name: service_name.into(),
362 service_version: SERVICE_VERSION.into(),
363 timeout: Duration::from_secs(5),
364 }
365}
366
367#[must_use]
370pub fn otlp_config_from_env(service_name: &str) -> Option<OtlpConfig> {
371 let raw = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok()?;
372 Some(otlp_config_from_endpoint(service_name, &raw))
373}
374
375pub fn spawn_otlp_flush_loop(
377 exporter: Arc<OtlpExporter>,
378 stop: Arc<AtomicBool>,
379 interval: Duration,
380) -> std::io::Result<JoinHandle<()>> {
381 thread::Builder::new()
382 .name("zerodds-ws-otlp".into())
383 .spawn(move || {
384 while !stop.load(Ordering::SeqCst) {
385 thread::sleep(interval);
386 if stop.load(Ordering::SeqCst) {
387 break;
388 }
389 let _ = exporter.flush();
392 }
393 let _ = exporter.flush();
395 })
396}
397
// Unit tests for metric registration, catalog rendering, the admin HTTP
// endpoints and OTLP endpoint parsing. `expect`/`unwrap` are allowed here
// because a panic is how a test reports failure.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Sends a minimal `GET path` request to the admin endpoint at `addr`
    /// and returns the raw response (status line, headers and body).
    fn http_get(addr: &SocketAddr, path: &str) -> String {
        use std::io::{Read, Write};
        use std::net::TcpStream;

        let mut stream = TcpStream::connect_timeout(addr, Duration::from_secs(2))
            .expect("connect to admin endpoint");
        stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
        stream
            .write_all(format!("GET {path} HTTP/1.1\r\nHost: x\r\n\r\n").as_bytes())
            .unwrap();
        let mut response = String::new();
        // The server replies with `Connection: close`; read errors and
        // timeouts are tolerated and surface through the assertions instead.
        stream.read_to_string(&mut response).ok();
        response
    }

    #[test]
    fn metrics_register_is_idempotent() {
        let r = Registry::new();
        let m1 = BridgeMetrics::register(&r);
        let m2 = BridgeMetrics::register(&r);
        m1.frames_in_total.inc();
        // Both registrations must observe the same counter.
        assert_eq!(m2.frames_in_total.get(), 1);
    }

    #[test]
    fn metrics_counter_visible_in_prometheus_render() {
        let r = Registry::new();
        let m = BridgeMetrics::register(&r);
        m.frames_in_total.add(7);
        m.connections_active.set(2);
        let text = r.render_prometheus();
        assert!(
            text.contains("zerodds_ws_frames_in_total 7"),
            "expected counter in render, got:\n{text}"
        );
        assert!(
            text.contains("zerodds_ws_connections_active 2"),
            "expected gauge in render, got:\n{text}"
        );
    }

    #[test]
    fn catalog_render_json_contains_topic_fields() {
        let mut cfg = DaemonConfig::default_for_dev();
        cfg.topics.push(TopicConfig {
            name: "Chat::Message".into(),
            type_name: "Chat::Message".into(),
            direction: "bidir".into(),
            ws_path: "/topics/chat/message".into(),
            reliability: "reliable".into(),
            durability: "volatile".into(),
            history_depth: 10,
        });
        let snap = CatalogSnapshot::from_config(&cfg);
        let j = snap.render_json();
        assert!(j.contains("\"service\":\"zerodds-ws-bridged\""));
        assert!(j.contains("\"name\":\"Chat::Message\""));
        assert!(j.contains("\"ws_path\":\"/topics/chat/message\""));
        assert!(j.contains("\"reliability\":\"reliable\""));
    }

    #[test]
    fn admin_endpoint_serves_catalog_and_healthz_and_metrics() {
        let mut cfg = DaemonConfig::default_for_dev();
        cfg.topics.push(TopicConfig {
            name: "T".into(),
            type_name: "T".into(),
            direction: "out".into(),
            ws_path: "/topics/t".into(),
            reliability: "reliable".into(),
            durability: "volatile".into(),
            history_depth: 5,
        });
        let snap = Arc::new(CatalogSnapshot::from_config(&cfg));
        let reg = Arc::new(Registry::new());
        let metrics = BridgeMetrics::register(&reg);
        metrics.frames_in_total.add(42);

        let healthy = Arc::new(AtomicBool::new(true));
        let stop = Arc::new(AtomicBool::new(false));

        // Port 0 lets the OS pick a free port; `bound` is the real address.
        let (handle, bound) = serve_admin_endpoints(
            "127.0.0.1:0".parse().unwrap(),
            Arc::clone(&snap),
            Arc::clone(&reg),
            Arc::clone(&healthy),
            Arc::clone(&stop),
        )
        .expect("spawn admin");

        let body = http_get(&bound, "/catalog");
        assert!(body.contains("HTTP/1.1 200"));
        assert!(body.contains("\"name\":\"T\""), "got: {body}");

        let body = http_get(&bound, "/healthz");
        assert!(body.contains("HTTP/1.1 200"));
        assert!(body.contains("OK"));

        let body = http_get(&bound, "/metrics");
        assert!(
            body.contains("zerodds_ws_frames_in_total 42"),
            "got: {body}"
        );

        // Flipping the health flag must turn /healthz into a 503.
        healthy.store(false, Ordering::SeqCst);
        let body = http_get(&bound, "/healthz");
        assert!(body.contains("HTTP/1.1 503"));

        // Stop the server and wait for its thread so the test leaves nothing behind.
        stop.store(true, Ordering::SeqCst);
        handle.join().expect("admin thread exits cleanly");
    }

    #[test]
    fn otlp_config_from_endpoint_parses_http_url() {
        let c = otlp_config_from_endpoint("svc-1", "http://collector.local:4318");
        assert_eq!(c.host, "collector.local");
        assert_eq!(c.port, 4318);
        assert_eq!(c.service_name, "svc-1");
    }

    #[test]
    fn otlp_config_from_endpoint_parses_bare_host_port() {
        let c = otlp_config_from_endpoint("svc", "host:9999");
        assert_eq!(c.host, "host");
        assert_eq!(c.port, 9999);
    }

    #[test]
    fn otlp_config_from_endpoint_handles_https_and_trailing_slash() {
        let c = otlp_config_from_endpoint("svc", "https://otel.svc.local:4318/");
        assert_eq!(c.host, "otel.svc.local");
        assert_eq!(c.port, 4318);
    }

    #[test]
    fn otlp_config_from_endpoint_falls_back_to_default_port() {
        let c = otlp_config_from_endpoint("svc", "host-only");
        assert_eq!(c.host, "host-only");
        assert_eq!(c.port, 4318);
    }
}
563
/// Non-Unix stand-in for the signal watcher.
///
/// No signal handling is installed on these platforms: neither flag is ever
/// touched, and the returned handle belongs to a thread that exits
/// immediately. Callers therefore get the same return type as on Unix.
///
/// The thread is created through `thread::Builder` and carries the same name
/// as the Unix watcher, so a failed spawn is reported as an `Err` instead of
/// the panic `std::thread::spawn` would raise.
///
/// # Errors
///
/// Returns the I/O error from spawning the placeholder thread.
#[cfg(not(unix))]
pub fn install_signal_watcher(
    _shutdown_flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
    _reload_flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> std::io::Result<std::thread::JoinHandle<()>> {
    std::thread::Builder::new()
        .name("zerodds-ws-signals".into())
        .spawn(|| {})
}