Skip to main content

zerodds_websocket_bridge/daemon/
server.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
//! WebSocket server + DDS pump for the `zerodds-ws-bridged` daemon.
//!
//! Spec: `zerodds-ws-bridge-1.0.md` §4 + §9.
//!
//! `eprintln!` logging in the daemon path: spec §8.1 leaves structured
//! logging to the daemon. Until a workspace-wide tracing stack is wired
//! up, `eprintln` is the sink; it is marked locally as a clippy allow on
//! the affected functions.
//!
//! Synchronous, blocking I/O on `std::net`. Each connection gets one
//! reader thread (TCP → WS frames → router) plus one writer thread
//! (router channel → WS frames → TCP). One DDS pump thread per
//! registered reader consumes that reader's `mpsc::Receiver<UserSample>`
//! and dispatches through the `Router`.
18
19use std::io::{Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
22use std::sync::{Arc, Mutex};
23use std::thread::{self, JoinHandle};
24use std::time::Duration;
25
26use crate::codec::{decode, encode};
27use crate::frame::{Frame, Opcode};
28use crate::handshake::{build_server_response, parse_client_request, render_server_response};
29
30use super::config::{DaemonConfig, TopicConfig};
31use super::router::{Router, RouterMsg};
32#[cfg(feature = "daemon")]
33use super::runtime_common::{
34    BridgeMetrics, CatalogSnapshot, SERVICE_NAME, install_signal_watcher, otlp_config_from_env,
35    serve_admin_endpoints, spawn_otlp_flush_loop,
36};
37#[cfg(feature = "daemon")]
38use super::security::{
39    AclOp, AuthSubject, SecurityCtx, authenticate_ws, authorize, ctx_from_daemon_config,
40    extract_authorization_header, serve_tls_handshake,
41};
42#[cfg(feature = "daemon")]
43use rustls::{ServerConnection, StreamOwned};
44#[cfg(feature = "daemon")]
45use zerodds_monitor::Registry;
46#[cfg(feature = "daemon")]
47use zerodds_observability_otlp::OtlpExporter;
48
49#[cfg(feature = "daemon")]
50use zerodds_dcps::runtime::{
51    DcpsRuntime, RuntimeConfig, UserReaderConfig, UserSample, UserWriterConfig,
52};
53#[cfg(feature = "daemon")]
54use zerodds_rtps::wire_types::{EntityId, GuidPrefix};
55
/// Top-level error type of the daemon.
///
/// Each variant carries a human-readable detail message; the variant docs
/// note the process exit code the spec assigns to that failure class.
#[derive(Debug)]
pub enum ServerError {
    /// Listener bind failure (exit code 2).
    Bind(String),
    /// DCPS initialisation failure (exit code 3).
    Dds(String),
    /// I/O failure during operation.
    Io(String),
}

impl core::fmt::Display for ServerError {
    /// Renders the error as `"<class> error: <detail>"`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match *self {
            ServerError::Bind(ref m) => write!(f, "bind error: {m}"),
            ServerError::Dds(ref m) => write!(f, "dds error: {m}"),
            ServerError::Io(ref m) => write!(f, "io error: {m}"),
        }
    }
}

// No wrapped source error: the default `Error` trait methods suffice.
impl std::error::Error for ServerError {}
78
/// Handle to a running daemon, as returned by `start`.
///
/// Dropping the handle calls [`DaemonHandle::shutdown`].
pub struct DaemonHandle {
    /// Shared stop flag; set by `shutdown` and handed to the worker threads
    /// spawned in `start`.
    stop: Arc<AtomicBool>,
    /// Accept-loop thread; taken (and joined) by `shutdown`.
    accept_thread: Option<JoinHandle<()>>,
    /// One DDS → router pump thread per registered DDS reader.
    pump_threads: Vec<JoinHandle<()>>,
    /// Admin endpoint thread, present only if the admin server was started.
    #[cfg(feature = "daemon")]
    admin_thread: Option<JoinHandle<()>>,
    /// OTLP flush-loop thread, present only if OTLP export was configured.
    #[cfg(feature = "daemon")]
    otlp_thread: Option<JoinHandle<()>>,
    /// Router shared with the pump, accept and per-connection threads.
    router: Arc<Mutex<Router>>,
    /// Actually bound address (may differ from the configured `listen`
    /// value when port 0 was requested).
    pub local_addr: String,
    /// Bound admin address (Prometheus + catalog + healthz), if started.
    #[cfg(feature = "daemon")]
    pub admin_addr: Option<String>,
    /// Lifecycle: SIGHUP sets this flag; the server side can react to it
    /// (the TLS reload thread polls it).
    #[cfg(feature = "daemon")]
    pub reload_flag: Arc<AtomicBool>,
    /// Healthz flag — `true` while the DCPS runtime is up; cleared by
    /// `shutdown`.
    #[cfg(feature = "daemon")]
    pub healthy: Arc<AtomicBool>,
    /// Metric set for the §8.2 wire-up; exposed so tests can read counters.
    #[cfg(feature = "daemon")]
    pub metrics: Option<BridgeMetrics>,
}
104
105impl DaemonHandle {
106    /// Initiiert graceful Shutdown.
107    pub fn shutdown(&mut self) {
108        self.stop.store(true, Ordering::SeqCst);
109        #[cfg(feature = "daemon")]
110        {
111            self.healthy.store(false, Ordering::SeqCst);
112        }
113        // Unblock accept().
114        if let Ok(addr) = self.local_addr.parse::<std::net::SocketAddr>() {
115            // Self-connect to wake accept().
116            let _ = TcpStream::connect_timeout(&addr, Duration::from_millis(200));
117        }
118        // Self-connect Admin-Server (so der accept-loop blockt).
119        #[cfg(feature = "daemon")]
120        if let Some(admin) = self.admin_addr.as_deref() {
121            if let Ok(addr) = admin.parse::<std::net::SocketAddr>() {
122                let _ = TcpStream::connect_timeout(&addr, Duration::from_millis(200));
123            }
124        }
125        if let Ok(r) = self.router.lock() {
126            r.broadcast_shutdown();
127        }
128        if let Some(j) = self.accept_thread.take() {
129            let _ = j.join();
130        }
131        for j in self.pump_threads.drain(..) {
132            let _ = j.join();
133        }
134        #[cfg(feature = "daemon")]
135        {
136            if let Some(j) = self.admin_thread.take() {
137                let _ = j.join();
138            }
139            if let Some(j) = self.otlp_thread.take() {
140                let _ = j.join();
141            }
142        }
143    }
144}
145
146impl Drop for DaemonHandle {
147    fn drop(&mut self) {
148        self.shutdown();
149    }
150}
/// zerodds-lint: recursion-depth 64 (start bounded by AST depth)
/// Starts the daemon with the given config. Does NOT block — returns a
/// `DaemonHandle` through which the caller (the binary or an E2E test)
/// controls the lifecycle.
///
/// Startup order:
/// 0. metrics registry + standard counter set (§8.2); bridge security
///    context and optional rotating TLS config (§7.1 – §7.3);
/// 1. DCPS runtime;
/// 2. DDS reader and/or writer per configured topic;
/// 3. router + TCP listener;
/// 4. one pump thread per DDS reader (DDS samples → router);
/// 5. accept thread, serving each connection on its own detached thread;
/// 6. optional admin endpoint (`/metrics`, `/catalog`, `/healthz`);
/// 7. signal watcher and, when TLS is active, a SIGHUP cert-reload thread;
/// 8. optional OTLP flush loop (configured via environment).
///
/// Failures in steps 6-8 are logged and do not abort startup.
///
/// # Errors
/// `Bind` if the security config cannot be built or the TCP listener cannot
/// bind (spec exit code 2).
/// `Dds` if the `DcpsRuntime` or a topic endpoint cannot be started (spec
/// exit code 3).
/// `Io` if the listener cannot be put into blocking mode.
#[cfg(feature = "daemon")]
#[allow(clippy::too_many_lines)]
pub fn start(cfg: DaemonConfig) -> Result<DaemonHandle, ServerError> {
    eprintln!(
        "[zerodds-ws-bridged] starting on {} domain={} topics={}",
        cfg.listen,
        cfg.domain,
        cfg.topics.len()
    );

    // 0. Metrics registry + standard counter set (§8.2).
    let registry = Arc::new(Registry::new());
    let metrics = BridgeMetrics::register(&registry);

    // 0b. Bridge security: security context plus, if TLS is configured, a
    //     RotatingTlsConfig for SIGHUP hot reload (§7.1 TLS / §7.2 auth /
    //     §7.3 ACL).
    let (security_ctx, rotating_tls) = ctx_from_daemon_config(&cfg)
        .map_err(|e| ServerError::Bind(alloc_format(format_args!("security: {e}"))))?;
    let security_ctx = Arc::new(security_ctx);
    let rotating_tls = rotating_tls.map(Arc::new);
    if rotating_tls.is_some() {
        eprintln!(
            "[zerodds-ws-bridged] TLS active (cert={}, mtls={})",
            cfg.tls_cert_file,
            !cfg.tls_client_ca_file.is_empty(),
        );
    }
    eprintln!(
        "[zerodds-ws-bridged] auth-mode={} acl-entries={}",
        cfg.auth_mode,
        cfg.topic_acl.len()
    );

    // 1. Bring up the DCPS runtime.
    let prefix = stable_prefix_for(&cfg.listen);
    let runtime = DcpsRuntime::start(cfg.domain, prefix, RuntimeConfig::default())
        .map_err(|e| ServerError::Dds(alloc_format(format_args!("{e:?}"))))?;
    let healthy = Arc::new(AtomicBool::new(true));

    // 2. Register reader/writer per topic.
    let mut writers: std::collections::BTreeMap<String, EntityId> =
        std::collections::BTreeMap::new();
    let mut readers: Vec<(String, std::sync::mpsc::Receiver<UserSample>)> = Vec::new();
    for topic in &cfg.topics {
        let (reader, writer_eid) = register_topic_endpoints(&runtime, topic)?;
        // Only the sample channel is needed here; the reader's EntityId is not.
        if let Some((_reader_eid, rx)) = reader {
            readers.push((topic.name.clone(), rx));
        }
        if let Some(eid) = writer_eid {
            writers.insert(topic.name.clone(), eid);
        }
    }

    // 3. Router + TCP listener.
    let router = Arc::new(Mutex::new(Router::new()));
    let listener = TcpListener::bind(&cfg.listen)
        .map_err(|e| ServerError::Bind(alloc_format(format_args!("{e}"))))?;
    let local_addr = listener
        .local_addr()
        .map(|a| a.to_string())
        .unwrap_or_else(|_| cfg.listen.clone());
    listener
        .set_nonblocking(false)
        .map_err(|e| ServerError::Io(alloc_format(format_args!("{e}"))))?;

    eprintln!("[zerodds-ws-bridged] bound on {local_addr}");

    let stop = Arc::new(AtomicBool::new(false));
    let reload_flag = Arc::new(AtomicBool::new(false));

    // 4. One pump thread per DDS reader: DDS samples → router.
    let mut pump_threads = Vec::new();
    for (topic_name, rx) in readers {
        let router_c = Arc::clone(&router);
        let stop_c = Arc::clone(&stop);
        let dds_out = Arc::clone(&metrics.dds_samples_out_total);
        let h = thread::spawn(move || {
            while !stop_c.load(Ordering::SeqCst) {
                match rx.recv_timeout(Duration::from_millis(200)) {
                    Ok(UserSample::Alive { payload, .. }) => {
                        dds_out.inc();
                        if let Ok(mut r) = router_c.lock() {
                            r.dispatch(&topic_name, payload);
                        }
                    }
                    Ok(UserSample::Lifecycle { .. }) => {
                        // Lifecycle events could be forwarded as dispose
                        // frames; the L1-L4 wire-up only needs the Alive path.
                    }
                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
                }
            }
        });
        pump_threads.push(h);
    }

    // 5. Accept thread; every connection is served on its own detached thread.
    let next_conn_id = Arc::new(AtomicU64::new(1));
    let stop_acc = Arc::clone(&stop);
    let router_acc = Arc::clone(&router);
    let writers_arc = Arc::new(writers);
    let runtime_acc = Arc::clone(&runtime);
    let topics_arc = Arc::new(cfg.topics.clone());
    let metrics_acc = metrics.clone();
    let security_acc = Arc::clone(&security_ctx);
    let rotating_acc = rotating_tls.clone();

    let accept_thread = thread::spawn(move || {
        for incoming in listener.incoming() {
            if stop_acc.load(Ordering::SeqCst) {
                break;
            }
            match incoming {
                Ok(tcp) => {
                    let conn_id = next_conn_id.fetch_add(1, Ordering::SeqCst);
                    let router_h = Arc::clone(&router_acc);
                    let writers_h = Arc::clone(&writers_arc);
                    let runtime_h = Arc::clone(&runtime_acc);
                    let stop_h = Arc::clone(&stop_acc);
                    let topics_h = Arc::clone(&topics_arc);
                    let metrics_h = metrics_acc.clone();
                    let security_h = Arc::clone(&security_acc);
                    let rot_h = rotating_acc.clone();
                    thread::spawn(move || {
                        // With TLS configured: rustls handshake first, else plain TCP.
                        let (stream, mtls_subj) = if let Some(rot) = rot_h.as_ref() {
                            let tls_cfg = rot.current();
                            match serve_tls_handshake(tls_cfg, tcp, Duration::from_secs(5)) {
                                Ok((tcp, conn, subj)) => {
                                    (WsStream::Tls(Box::new(StreamOwned::new(conn, tcp))), subj)
                                }
                                Err(e) => {
                                    metrics_h.errors_total.inc();
                                    eprintln!(
                                        "[zerodds-ws-bridged] tls handshake err conn={conn_id}: {e}"
                                    );
                                    return;
                                }
                            }
                        } else {
                            (WsStream::Plain(tcp), None)
                        };
                        let _ = serve_connection(
                            conn_id, stream, mtls_subj, router_h, writers_h, runtime_h, stop_h,
                            topics_h, metrics_h, security_h,
                        );
                    });
                }
                Err(e) => {
                    eprintln!("[zerodds-ws-bridged] accept error: {e}");
                    // Back off briefly. Persistent accept errors (e.g.
                    // file-descriptor exhaustion) would otherwise turn this
                    // loop into a busy spin.
                    thread::sleep(Duration::from_millis(50));
                }
            }
        }
    });

    // 6. Admin endpoint (§8.2 Prometheus + §5.2 catalog/healthz).
    let mut admin_thread: Option<JoinHandle<()>> = None;
    let mut admin_addr: Option<String> = None;
    if cfg.metrics_enabled || !cfg.metrics_addr.is_empty() {
        let bind_str = if cfg.metrics_addr.is_empty() {
            "127.0.0.1:9090".to_string()
        } else {
            cfg.metrics_addr.clone()
        };
        match bind_str.parse::<std::net::SocketAddr>() {
            Ok(sock) => {
                let snap = Arc::new(CatalogSnapshot::from_config(&cfg));
                match serve_admin_endpoints(
                    sock,
                    snap,
                    Arc::clone(&registry),
                    Arc::clone(&healthy),
                    Arc::clone(&stop),
                ) {
                    Ok((h, bound)) => {
                        eprintln!(
                            "[{SERVICE_NAME}] admin endpoint on {bound} (/metrics /catalog /healthz)"
                        );
                        admin_addr = Some(bound.to_string());
                        admin_thread = Some(h);
                    }
                    Err(e) => {
                        eprintln!("[{SERVICE_NAME}] admin bind error: {e}");
                    }
                }
            }
            Err(e) => {
                eprintln!("[{SERVICE_NAME}] admin addr parse error: {e}");
            }
        }
    }

    // 7. Signal watcher (§9.2 graceful shutdown).
    if let Err(e) = install_signal_watcher(Arc::clone(&stop), Arc::clone(&reload_flag)) {
        eprintln!("[{SERVICE_NAME}] signal watcher init failed: {e}");
    }

    // 7b. Bridge security: SIGHUP hook for TLS cert hot reload. Polls the
    //     reload_flag and calls RotatingTlsConfig::reload(). The thread is
    //     detached; it exits once `stop` is set.
    if let Some(rot) = rotating_tls.clone() {
        let stop_r = Arc::clone(&stop);
        let reload_r = Arc::clone(&reload_flag);
        let spawned = thread::Builder::new()
            .name("zerodds-ws-tls-reload".into())
            .spawn(move || {
                while !stop_r.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(250));
                    if reload_r.swap(false, Ordering::SeqCst) {
                        match rot.reload() {
                            Ok(()) => eprintln!(
                                "[{SERVICE_NAME}] SIGHUP TLS-cert reloaded"
                            ),
                            Err(e) => eprintln!(
                                "[{SERVICE_NAME}] SIGHUP TLS-cert reload FAILED: {e} (keeping old cert)"
                            ),
                        }
                    }
                }
            });
        if let Err(e) = spawned {
            eprintln!("[{SERVICE_NAME}] TLS reload thread spawn failed: {e}");
        }
    }

    // 8. OTLP span exporter (§8.3) when configured via environment.
    let otlp_thread = if let Some(otlp_cfg) = otlp_config_from_env(SERVICE_NAME) {
        let exporter = Arc::new(OtlpExporter::new(otlp_cfg));
        match spawn_otlp_flush_loop(exporter, Arc::clone(&stop), Duration::from_secs(5)) {
            Ok(h) => {
                eprintln!("[{SERVICE_NAME}] OTLP exporter active");
                Some(h)
            }
            Err(e) => {
                eprintln!("[{SERVICE_NAME}] OTLP spawn failed: {e}");
                None
            }
        }
    } else {
        None
    };

    Ok(DaemonHandle {
        stop,
        accept_thread: Some(accept_thread),
        pump_threads,
        admin_thread,
        otlp_thread,
        router,
        local_addr,
        admin_addr,
        reload_flag,
        healthy,
        metrics: Some(metrics),
    })
}
417
/// DDS reader endpoint of a topic: the `EntityId` and sample receiver
/// returned by `register_user_reader`.
#[cfg(feature = "daemon")]
type ReaderEndpoint = (EntityId, std::sync::mpsc::Receiver<UserSample>);
/// Result of `register_topic_endpoints`: optional reader endpoint and
/// optional writer `EntityId`, depending on the topic's direction.
#[cfg(feature = "daemon")]
type TopicEndpoints = (Option<ReaderEndpoint>, Option<EntityId>);
422
/// Registers the DDS endpoints backing one configured bridge topic.
///
/// Mapping from `TopicConfig`:
/// - `direction`: `"in"` → reader only, `"out"` → writer only, `"bidir"`
///   or empty → both; any other value registers neither endpoint.
/// - `durability`: `"transient_local"` / `"transient"` / `"persistent"`;
///   every other value falls back to volatile.
/// - `reliability`: `"best_effort"` → best effort, anything else → reliable.
/// - DDS type name: `type_name`, or the topic name when `type_name` is empty.
///
/// All other QoS fields use defaults. Ownership is shared; partition and
/// user/topic/group data are empty. There is no type identifier and no
/// data-representation offer. The reader is registered before the writer.
///
/// # Errors
/// `ServerError::Dds` when the runtime rejects the reader or the writer.
#[cfg(feature = "daemon")]
fn register_topic_endpoints(
    rt: &Arc<DcpsRuntime>,
    topic: &TopicConfig,
) -> Result<TopicEndpoints, ServerError> {
    use zerodds_qos::{
        DeadlineQosPolicy, DurabilityKind, LifespanQosPolicy, LivelinessQosPolicy, OwnershipKind,
    };

    let direction = topic.direction.as_str();
    let want_reader = matches!(direction, "" | "in" | "bidir");
    let want_writer = matches!(direction, "" | "out" | "bidir");
    let reliable = topic.reliability.as_str() != "best_effort";
    let durability = match topic.durability.as_str() {
        "transient_local" => DurabilityKind::TransientLocal,
        "transient" => DurabilityKind::Transient,
        "persistent" => DurabilityKind::Persistent,
        _ => DurabilityKind::Volatile,
    };
    // Without an explicit DDS type name the topic name doubles as type name.
    let type_name = if topic.type_name.is_empty() {
        &topic.name
    } else {
        &topic.type_name
    };

    let reader = want_reader
        .then(|| {
            rt.register_user_reader(UserReaderConfig {
                topic_name: topic.name.clone(),
                type_name: type_name.clone(),
                reliable,
                durability,
                deadline: DeadlineQosPolicy::default(),
                liveliness: LivelinessQosPolicy::default(),
                ownership: OwnershipKind::Shared,
                partition: Vec::new(),
                user_data: Vec::new(),
                topic_data: Vec::new(),
                group_data: Vec::new(),
                type_identifier: zerodds_types::TypeIdentifier::None,
                type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
                data_representation_offer: None,
            })
            .map_err(|e| ServerError::Dds(alloc_format(format_args!("reader: {e:?}"))))
        })
        .transpose()?;

    let writer = want_writer
        .then(|| {
            rt.register_user_writer(UserWriterConfig {
                topic_name: topic.name.clone(),
                type_name: type_name.clone(),
                reliable,
                durability,
                deadline: DeadlineQosPolicy::default(),
                lifespan: LifespanQosPolicy::default(),
                liveliness: LivelinessQosPolicy::default(),
                ownership: OwnershipKind::Shared,
                ownership_strength: 0,
                partition: Vec::new(),
                user_data: Vec::new(),
                topic_data: Vec::new(),
                group_data: Vec::new(),
                type_identifier: zerodds_types::TypeIdentifier::None,
                data_representation_offer: None,
            })
            .map_err(|e| ServerError::Dds(alloc_format(format_args!("writer: {e:?}"))))
        })
        .transpose()?;

    Ok((reader, writer))
}
502
/// Serves one accepted connection (plain or TLS) until it closes or the
/// daemon stops.
///
/// 1. Reads the HTTP upgrade request head. It fails once more than 64 KiB
///    are buffered without a terminating blank line. Bytes that follow the
///    `\r\n\r\n` in the same read already belong to the WebSocket frame
///    stream. They are handed to the reader loop instead of being discarded.
/// 2. Authenticates the request (§7.2; HTTP 401 on reject). If the request
///    path matches a configured topic's `ws_path` or default path, it checks
///    the Read-ACL for that topic (§7.3; HTTP 403 on reject) and
///    auto-subscribes to it.
/// 3. Sends the upgrade response, registers the connection with the router
///    and spawns a writer thread (router channel → WS frames). That thread
///    re-checks the Read-ACL for every sample.
/// 4. Reader loop:
///    - Text/Binary frames go to `handle_inbound_frame`.
///    - Ping is answered with Pong.
///    - Close is answered with a Close (RFC 6455 §5.5.1) unless one was
///      already sent; then the loop ends.
/// 5. Deregisters the connection and joins the writer thread.
///
/// `mtls_subject` is the client-certificate subject from the TLS handshake,
/// if any; it is passed to the authenticator.
///
/// # Errors
/// `ServerError::Io` for:
/// - handshake read/parse failures and shutdown during the handshake;
/// - auth/ACL rejects;
/// - a failed upgrade-response write.
///
/// Failures after the upgrade only end the connection (`Ok(())`).
#[cfg(feature = "daemon")]
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
fn serve_connection(
    conn_id: u64,
    mut stream: WsStream,
    mtls_subject: Option<AuthSubject>,
    router: Arc<Mutex<Router>>,
    writers: Arc<std::collections::BTreeMap<String, EntityId>>,
    runtime: Arc<DcpsRuntime>,
    stop: Arc<AtomicBool>,
    topics: Arc<Vec<TopicConfig>>,
    metrics: BridgeMetrics,
    security: Arc<SecurityCtx>,
) -> Result<(), ServerError> {
    /// Upper bound for the buffered HTTP upgrade request head.
    const MAX_HANDSHAKE_BYTES: usize = 64 * 1024;

    metrics.connections_total.inc();
    metrics.connections_active.inc();
    // Decrements `connections_active` on every exit path, including errors.
    let conn_guard = ConnectionLifetime {
        active: Arc::clone(&metrics.connections_active),
    };
    stream
        .set_read_timeout(Some(Duration::from_millis(500)))
        .ok();

    // 1. Read the HTTP upgrade request head.
    let mut buf = [0u8; 4096];
    let mut accumulated = Vec::new();
    let (req_str, leftover) = loop {
        match stream.read(&mut buf) {
            Ok(0) => return Err(ServerError::Io("eof during handshake".to_string())),
            Ok(n) => {
                accumulated.extend_from_slice(&buf[..n]);
                if let Some(head_len) = handshake_head_len(&accumulated) {
                    // Everything after the blank line is already WS frame
                    // data (pipelined by the client); keep it.
                    let leftover = accumulated.split_off(head_len);
                    break (String::from_utf8_lossy(&accumulated).into_owned(), leftover);
                }
                if accumulated.len() > MAX_HANDSHAKE_BYTES {
                    return Err(ServerError::Io("handshake too large".to_string()));
                }
            }
            Err(ref e) if is_read_timeout(e) => {
                if stop.load(Ordering::SeqCst) {
                    return Err(ServerError::Io("shutdown during handshake".to_string()));
                }
            }
            Err(e) => return Err(ServerError::Io(e.to_string())),
        }
    };

    let req = match parse_client_request(&req_str) {
        Ok(r) => r,
        Err(e) => {
            let _ = stream.write_all(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n");
            return Err(ServerError::Io(alloc_format(format_args!(
                "handshake parse: {e:?}"
            ))));
        }
    };

    // §7.2 — Authentication. On reject: HTTP 401 + close.
    let auth_header = extract_authorization_header(&req_str);
    let auth_headers: Vec<(String, String)> = if let Some(v) = auth_header {
        vec![("authorization".to_string(), v)]
    } else {
        Vec::new()
    };
    let subject = match authenticate_ws(&security.auth, &auth_headers, mtls_subject.clone()) {
        Ok(s) => s,
        Err(e) => {
            metrics.errors_total.inc();
            let body = b"unauthorized";
            let resp = format!(
                "HTTP/1.1 401 Unauthorized\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nWWW-Authenticate: Bearer realm=\"zerodds-ws\"\r\nConnection: close\r\n\r\nunauthorized",
                body.len()
            );
            let _ = stream.write_all(resp.as_bytes());
            eprintln!("[zerodds-ws-bridged] auth reject conn={conn_id} reason={e}");
            return Err(ServerError::Io(alloc_format(format_args!(
                "auth reject: {e}"
            ))));
        }
    };

    // Auto-subscribe when the path maps to a topic (spec §4.2).
    let mut auto_topic: Option<String> = None;
    for t in topics.iter() {
        if t.ws_path == req.path || super::config::default_ws_path(&t.name) == req.path {
            auto_topic = Some(t.name.clone());
            break;
        }
    }

    // §7.3 — Read-ACL check for the auto-subscribe topic.
    if let Some(topic) = &auto_topic {
        if !authorize(&security.acl, &subject, AclOp::Read, topic) {
            metrics.errors_total.inc();
            let body = format!("forbidden: read on {topic}");
            let resp = format!(
                "HTTP/1.1 403 Forbidden\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n{body}",
                body.len()
            );
            let _ = stream.write_all(resp.as_bytes());
            eprintln!(
                "[zerodds-ws-bridged] acl reject conn={conn_id} subject={} topic={topic}",
                subject.name
            );
            return Err(ServerError::Io(alloc_format(format_args!(
                "acl reject: {topic}"
            ))));
        }
    }

    let resp = build_server_response(&req);
    let resp_bytes = render_server_response(&resp);
    stream
        .write_all(resp_bytes.as_bytes())
        .map_err(|e| ServerError::Io(e.to_string()))?;

    // 2. Register the connection with the router.
    let (tx, rx) = std::sync::mpsc::channel::<RouterMsg>();
    if let Ok(mut r) = router.lock() {
        r.register_connection(conn_id, tx);
        if let Some(topic) = &auto_topic {
            r.subscribe(conn_id, topic.clone());
        }
    }

    // 3. Writer thread (router channel → WS frames). The stream is shared
    //    as Arc<Mutex<>> between the reader loop and the writer thread,
    //    because TLS streams cannot be duplicated via `try_clone` (rustls
    //    session state). Plain TCP could use try_clone, but both paths use
    //    the mutex uniformly.
    let stream = Arc::new(Mutex::new(stream));
    // Set once a Close frame was written, so each side sends at most one
    // (RFC 6455 §5.5.1).
    let close_sent = Arc::new(AtomicBool::new(false));
    let stop_w = Arc::clone(&stop);
    let frames_out = Arc::clone(&metrics.frames_out_total);
    let bytes_out = Arc::clone(&metrics.bytes_out_total);
    let errors_out = Arc::clone(&metrics.errors_total);
    let stream_w = Arc::clone(&stream);
    let close_sent_w = Arc::clone(&close_sent);
    // The writer re-runs the Read-ACL check per sample before sending.
    let security_w = Arc::clone(&security);
    let subject_w = subject.clone();
    let writer_thread = thread::spawn(move || {
        while !stop_w.load(Ordering::SeqCst) {
            match rx.recv_timeout(Duration::from_millis(200)) {
                Ok(RouterMsg::Sample { topic, payload }) => {
                    // §7.3 — Read-ACL (post-subscribe gate): if the ACL denies
                    // this subject + topic, drop the sample (no disclosure).
                    if !authorize(&security_w.acl, &subject_w, AclOp::Read, &topic) {
                        continue;
                    }
                    let json = render_notify_json(&topic, &payload);
                    let frame = Frame::text(json);
                    if let Ok(bytes) = encode(&frame) {
                        bytes_out.add(bytes.len() as u64);
                        if write_ws_locked(&stream_w, &bytes).is_err() {
                            errors_out.inc();
                            break;
                        }
                        frames_out.inc();
                    } else {
                        errors_out.inc();
                    }
                }
                Ok(RouterMsg::Shutdown) => {
                    if !close_sent_w.swap(true, Ordering::SeqCst) {
                        let close = Frame::close(1001, "going away");
                        if let Ok(b) = encode(&close) {
                            let _ = write_ws_locked(&stream_w, &b);
                        }
                    }
                    break;
                }
                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
            }
        }
    });

    // 4. Reader loop (TCP/TLS → WS frames → router / DDS writer). Buffered
    //    bytes are decoded before every read, so data left over from the
    //    handshake is handled without waiting for more input.
    // NOTE(review): the reader holds the stream mutex for the whole blocking
    // read (up to the 500 ms read timeout), which can delay frames from the
    // writer thread on idle connections.
    let mut frame_buf: Vec<u8> = leftover;
    'reader: loop {
        while let Ok((frame, used)) = decode(&frame_buf) {
            frame_buf.drain(..used);
            match frame.opcode {
                Opcode::Text | Opcode::Binary => {
                    let payload = frame.payload;
                    metrics.frames_in_total.inc();
                    metrics.bytes_in_total.add(payload.len() as u64);
                    let result = handle_inbound_frame(
                        &payload,
                        conn_id,
                        &router,
                        &writers,
                        &runtime,
                        auto_topic.as_deref(),
                        &metrics,
                        &security,
                        &subject,
                        &stream,
                    );
                    if let Err(e) = result {
                        metrics.errors_total.inc();
                        eprintln!("[zerodds-ws-bridged] inbound err conn={conn_id}: {e}");
                    }
                }
                Opcode::Ping => {
                    let pong = Frame::pong(frame.payload);
                    if let Ok(b) = encode(&pong) {
                        let _ = write_ws_locked(&stream, &b);
                    }
                }
                Opcode::Pong => {}
                Opcode::Close => {
                    // RFC 6455 §5.5.1: answer a Close we did not initiate.
                    if !close_sent.swap(true, Ordering::SeqCst) {
                        if let Ok(b) = encode(&Frame::close(1000, "")) {
                            let _ = write_ws_locked(&stream, &b);
                        }
                    }
                    break 'reader;
                }
                _ => {}
            }
        }
        if stop.load(Ordering::SeqCst) {
            break;
        }
        let read_result = {
            let mut guard = lock_ws_stream(&stream);
            guard.read(&mut buf)
        };
        match read_result {
            Ok(0) => break,
            Ok(n) => frame_buf.extend_from_slice(&buf[..n]),
            Err(ref e) if is_read_timeout(e) => {}
            Err(_) => break,
        }
    }

    // 5. Cleanup.
    if let Ok(mut r) = router.lock() {
        r.deregister_connection(conn_id);
    }
    let _ = writer_thread.join();
    drop(conn_guard);
    Ok(())
}

/// Length of the HTTP request head including the terminating `\r\n\r\n`,
/// or `None` while the terminator has not been received.
#[cfg(feature = "daemon")]
fn handshake_head_len(bytes: &[u8]) -> Option<usize> {
    bytes
        .windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|pos| pos + 4)
}

/// `true` for read errors that only mean "no data yet" and should be
/// retried. Unix reports an expired read timeout as `WouldBlock`, Windows
/// as `TimedOut` (see `TcpStream::set_read_timeout`); `Interrupted` is a
/// signal interruption.
#[cfg(feature = "daemon")]
fn is_read_timeout(e: &std::io::Error) -> bool {
    matches!(
        e.kind(),
        std::io::ErrorKind::WouldBlock
            | std::io::ErrorKind::TimedOut
            | std::io::ErrorKind::Interrupted
    )
}

/// Locks the shared stream, recovering the guard if a previous holder
/// panicked while holding it.
#[cfg(feature = "daemon")]
fn lock_ws_stream(stream: &Mutex<WsStream>) -> std::sync::MutexGuard<'_, WsStream> {
    stream.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
}

/// Writes all of `bytes` to the shared stream while holding its lock.
#[cfg(feature = "daemon")]
fn write_ws_locked(stream: &Mutex<WsStream>, bytes: &[u8]) -> std::io::Result<()> {
    lock_ws_stream(stream).write_all(bytes)
}
755
/// A connection is either plain TCP or TLS-wrapped.
/// Read/write operations go through the same `Read`/`Write` traits, so the
/// WS reader/writer loops use the same logic for both transports.
#[cfg(feature = "daemon")]
enum WsStream {
    /// Plain `TcpStream` — `tls_enabled=false`.
    Plain(TcpStream),
    /// Server-side rustls stream on an accepted connection (boxed).
    Tls(Box<StreamOwned<ServerConnection, TcpStream>>),
}
766
#[cfg(feature = "daemon")]
impl WsStream {
    /// Sets (or clears, with `None`) the read timeout of the raw TCP socket.
    ///
    /// For TLS connections this is the socket wrapped by the rustls
    /// `StreamOwned` (its `sock` field).
    fn set_read_timeout(&mut self, dur: Option<Duration>) -> std::io::Result<()> {
        let socket: &TcpStream = match self {
            Self::Plain(tcp) => &*tcp,
            Self::Tls(tls) => &tls.sock,
        };
        socket.set_read_timeout(dur)
    }
}
776
#[cfg(feature = "daemon")]
impl Read for WsStream {
    /// Reads from whichever transport backs this connection. In the TLS case
    /// this delegates to rustls' `StreamOwned`, which yields decrypted bytes.
    fn read(&mut self, b: &mut [u8]) -> std::io::Result<usize> {
        let source: &mut dyn Read = match self {
            WsStream::Plain(tcp) => tcp,
            WsStream::Tls(tls) => &mut **tls,
        };
        source.read(b)
    }
}
786
#[cfg(feature = "daemon")]
impl Write for WsStream {
    /// Writes to whichever transport backs this connection. In the TLS case
    /// this delegates to rustls' `StreamOwned`, which encrypts the bytes.
    fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
        let sink: &mut dyn Write = match self {
            WsStream::Plain(tcp) => tcp,
            WsStream::Tls(tls) => &mut **tls,
        };
        sink.write(b)
    }

    /// Flushes the backing transport (delegating to `StreamOwned` for TLS).
    fn flush(&mut self) -> std::io::Result<()> {
        let sink: &mut dyn Write = match self {
            WsStream::Plain(tcp) => tcp,
            WsStream::Tls(tls) => &mut **tls,
        };
        sink.flush()
    }
}
802
/// RAII guard that decrements the `connections_active` gauge when dropped.
///
/// NOTE(review): the matching increment is presumably performed by whoever
/// constructs this guard — confirm at the construction site.
#[cfg(feature = "daemon")]
struct ConnectionLifetime {
    /// Gauge decremented exactly once, when this guard is dropped.
    active: Arc<zerodds_monitor::Gauge>,
}

#[cfg(feature = "daemon")]
impl Drop for ConnectionLifetime {
    fn drop(&mut self) {
        // Runs on every scope exit of the owner (normal end, early return,
        // unwinding) or on an explicit `drop(..)`.
        self.active.dec();
    }
}
815
/// Dispatches one inbound WebSocket data frame received on connection `conn_id`.
///
/// The payload is first interpreted as a UTF-8 JSON bridge operation
/// (`subscribe` / `unsubscribe` / `publish`, parsed by
/// `crate::dds_bridge::parse_op`). `subscribe` is checked against the read
/// ACL and `publish` against the write ACL. A denial is answered with a `403`
/// error frame on `stream`, counted in `metrics.errors_total`, logged to
/// stderr, and reported to the caller as `Ok(())`.
///
/// If the payload is not a recognised op and the connection is bound to a
/// single topic (`auto_topic`), the whole payload is published verbatim to
/// that topic's DDS writer, subject to the same write ACL. This includes
/// non-UTF-8 binary data.
///
/// Publishes to topics without an entry in `writers` are dropped silently.
///
/// # Errors
///
/// Returns `Err` when the payload is not UTF-8 and there is no `auto_topic`
/// fallback, or when the DDS write fails.
#[cfg(feature = "daemon")]
#[allow(clippy::too_many_arguments)]
fn handle_inbound_frame(
    payload: &[u8],
    conn_id: u64,
    router: &Arc<Mutex<Router>>,
    writers: &Arc<std::collections::BTreeMap<String, EntityId>>,
    runtime: &Arc<DcpsRuntime>,
    auto_topic: Option<&str>,
    metrics: &BridgeMetrics,
    security: &Arc<SecurityCtx>,
    subject: &AuthSubject,
    stream: &Arc<Mutex<WsStream>>,
) -> Result<(), String> {
    use crate::dds_bridge::{BridgeOp, parse_op};

    let op = match core::str::from_utf8(payload) {
        Ok(text) => parse_op(text).ok(),
        // Not UTF-8, so it cannot be a JSON op. Only a topic-bound connection
        // can still accept it, as an opaque payload below.
        Err(e) if auto_topic.is_none() => {
            return Err(alloc_format(format_args!("utf8: {e}")));
        }
        Err(_) => None,
    };

    if let Some(op) = op {
        match op {
            BridgeOp::Subscribe { topic, .. } => {
                if authorize(&security.acl, subject, AclOp::Read, &topic) {
                    if let Ok(mut r) = router.lock() {
                        r.subscribe(conn_id, topic);
                    }
                } else {
                    report_acl_deny(stream, metrics, conn_id, subject, &topic, "read");
                }
            }
            BridgeOp::Unsubscribe { topic, .. } => {
                if let Ok(mut r) = router.lock() {
                    r.unsubscribe(conn_id, &topic);
                }
            }
            BridgeOp::Publish { topic, data } => {
                if authorize(&security.acl, subject, AclOp::Write, &topic) {
                    if let Some(eid) = writers.get(&topic) {
                        runtime
                            .write_user_sample(*eid, data.into_bytes())
                            .map_err(|e| alloc_format(format_args!("dds-write: {e:?}")))?;
                        metrics.dds_samples_in_total.inc();
                    }
                } else {
                    report_acl_deny(stream, metrics, conn_id, subject, &topic, "write");
                }
            }
        }
        return Ok(());
    }

    // Fallback: a connection bound to a single topic path treats the whole
    // frame as an opaque payload to publish.
    let Some(topic) = auto_topic else {
        return Ok(());
    };
    if !authorize(&security.acl, subject, AclOp::Write, topic) {
        report_acl_deny(stream, metrics, conn_id, subject, topic, "write");
        return Ok(());
    }
    if let Some(eid) = writers.get(topic) {
        runtime
            .write_user_sample(*eid, payload.to_vec())
            .map_err(|e| alloc_format(format_args!("dds-write: {e:?}")))?;
        metrics.dds_samples_in_total.inc();
    }
    Ok(())
}

/// Handles an ACL denial for `topic`.
///
/// Bumps `errors_total`, sends a `403` error frame whose reason is
/// `acl-deny-{action}`, and logs the denial to stderr. `action` is `"read"`
/// or `"write"`.
#[cfg(feature = "daemon")]
fn report_acl_deny(
    stream: &Arc<Mutex<WsStream>>,
    metrics: &BridgeMetrics,
    conn_id: u64,
    subject: &AuthSubject,
    topic: &str,
    action: &str,
) {
    metrics.errors_total.inc();
    send_text_frame(stream, &render_acl_deny_json(topic, action));
    eprintln!(
        "[zerodds-ws-bridged] acl-deny conn={conn_id} subject={} {action} {topic}",
        subject.name
    );
}

/// Renders the `403` error envelope for an ACL denial.
///
/// `topic` is client-chosen, so it is JSON-escaped (RFC 8259 §7). An
/// unescaped topic could otherwise break or inject fields into the JSON.
#[cfg(feature = "daemon")]
fn render_acl_deny_json(topic: &str, action: &str) -> String {
    let mut escaped = String::with_capacity(topic.len());
    for c in topic.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            c if u32::from(c) < 0x20 => {
                escaped.push_str(&alloc_format(format_args!("\\u{:04x}", u32::from(c))));
            }
            c => escaped.push(c),
        }
    }
    alloc_format(format_args!(
        "{{\"op\":\"error\",\"code\":403,\"topic\":\"{escaped}\",\"reason\":\"acl-deny-{action}\"}}"
    ))
}
904
/// Encodes `text` as one WebSocket text frame and writes it to `stream`.
///
/// Best-effort: encoding and write failures are ignored. A poisoned lock is
/// recovered instead of skipped, so replies still go out even if another
/// thread panicked while holding the stream.
#[cfg(feature = "daemon")]
fn send_text_frame(stream: &Arc<Mutex<WsStream>>, text: &str) {
    let Ok(bytes) = encode(&Frame::text(text.to_string())) else {
        return;
    };
    let mut conn = stream
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let _ = conn.write_all(&bytes);
}
916
/// Renders the `notify` envelope pushed to subscribers of `topic`.
///
/// Output shape: `{"op":"notify","topic":<topic>,"data":<data>}` where
/// - `topic` is always emitted as an escaped JSON string;
/// - a UTF-8 `payload` starting with `{` or `[` is assumed to already be JSON
///   and is embedded verbatim (it is NOT validated);
/// - any other UTF-8 `payload` is emitted as an escaped JSON string;
/// - a non-UTF-8 `payload` is emitted as a JSON array of byte values
///   (see [`format_bytes_array`]).
fn render_notify_json(topic: &str, payload: &[u8]) -> String {
    /// Appends `s` to `out` as a quoted JSON string literal.
    ///
    /// Escapes `"`, `\`, and all control characters below U+0020 (RFC 8259 §7).
    fn push_json_string(out: &mut String, s: &str) {
        use core::fmt::Write as _;
        out.push('"');
        for c in s.chars() {
            match c {
                '"' => out.push_str("\\\""),
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                c if u32::from(c) < 0x20 => {
                    // Writing into a `String` cannot fail.
                    let _ = write!(out, "\\u{:04x}", u32::from(c));
                }
                c => out.push(c),
            }
        }
        out.push('"');
    }

    // Borrow valid UTF-8 as-is; only non-UTF-8 payloads need a new buffer.
    let payload_text = match core::str::from_utf8(payload) {
        Ok(s) => std::borrow::Cow::Borrowed(s),
        Err(_) => std::borrow::Cow::Owned(format_bytes_array(payload)),
    };

    let mut out = String::with_capacity(topic.len() + payload_text.len() + 32);
    out.push_str("{\"op\":\"notify\",\"topic\":");
    push_json_string(&mut out, topic);
    out.push_str(",\"data\":");
    if payload_text.starts_with('{') || payload_text.starts_with('[') {
        out.push_str(&payload_text);
    } else {
        push_json_string(&mut out, &payload_text);
    }
    out.push('}');
    out
}

/// Renders `b` as a JSON array of decimal byte values, e.g. `[255,0,7]`.
/// An empty slice yields `[]`.
fn format_bytes_array(b: &[u8]) -> String {
    use core::fmt::Write as _;
    // At most three digits plus one comma per byte, plus the brackets.
    let mut out = String::with_capacity(b.len().saturating_mul(4).saturating_add(2));
    out.push('[');
    for (i, byte) in b.iter().enumerate() {
        if i > 0 {
            out.push(',');
        }
        // Writing into a `String` cannot fail.
        let _ = write!(out, "{byte}");
    }
    out.push(']');
    out
}
958
/// Derives a deterministic 12-byte `GuidPrefix` from an address string.
///
/// Every byte of `addr` is XOR-folded into the 12-byte buffer: byte `i` lands
/// in slot `i % 12`. An address of at most 12 bytes is therefore copied
/// verbatim. Longer addresses such as `127.0.0.1:8080` and `127.0.0.1:8081`
/// still map to different prefixes, rather than colliding on a truncated
/// 12-byte head.
///
/// The first byte is then XORed with `0x42`. If the result is still all
/// zeros (the RTPS "unknown" prefix), it is replaced, so the returned prefix
/// is never all zeros.
#[cfg(feature = "daemon")]
fn stable_prefix_for(addr: &str) -> GuidPrefix {
    let mut bytes = [0u8; 12];
    let slots = bytes.len();
    for (i, b) in addr.bytes().enumerate() {
        bytes[i % slots] ^= b;
    }
    bytes[0] ^= 0x42;
    if bytes.iter().all(|&b| b == 0) {
        bytes[0] = 0x42;
    }
    GuidPrefix::from_bytes(bytes)
}
969
/// Formats pre-captured `format_args!` into a freshly allocated `String`.
///
/// An error reported by a formatting impl is swallowed rather than panicking;
/// whatever was written before the error is returned.
fn alloc_format(args: core::fmt::Arguments<'_>) -> String {
    let mut rendered = String::new();
    // A `String` sink never fails by itself; only a misbehaving
    // `Display`/`Debug` impl can surface an error here, and it is ignored.
    let _ = core::fmt::write(&mut rendered, args);
    rendered
}
976
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn render_notify_json_with_text_payload() {
        let s = render_notify_json("Trade", b"hello");
        assert!(s.contains("\"op\":\"notify\""));
        assert!(s.contains("\"topic\":\"Trade\""));
        assert!(s.contains("\"hello\""));
    }

    #[test]
    fn render_notify_json_with_object_payload() {
        let s = render_notify_json("X", b"{\"a\":1}");
        assert!(s.contains("\"data\":{\"a\":1}"));
    }

    #[test]
    fn render_notify_json_escapes_quotes() {
        let s = render_notify_json("X", b"a\"b");
        assert!(s.contains("\\\""));
    }

    #[test]
    fn render_notify_json_passes_array_payload_through() {
        let s = render_notify_json("X", b"[1,2]");
        assert_eq!(s, "{\"op\":\"notify\",\"topic\":\"X\",\"data\":[1,2]}");
    }

    #[test]
    fn render_notify_json_non_utf8_payload_becomes_byte_array() {
        let s = render_notify_json("X", &[0xff, 0x00, 0x07]);
        assert_eq!(s, "{\"op\":\"notify\",\"topic\":\"X\",\"data\":[255,0,7]}");
    }

    #[test]
    fn render_notify_json_escapes_backslash_and_control_chars() {
        let s = render_notify_json("X", b"a\\b\n\x01");
        assert!(s.contains("\"data\":\"a\\\\b\\n\\u0001\""));
    }

    #[test]
    fn format_bytes_array_renders_decimal_list() {
        assert_eq!(format_bytes_array(&[]), "[]");
        assert_eq!(format_bytes_array(&[0, 10, 255]), "[0,10,255]");
    }
}