1use std::io::{Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
22use std::sync::{Arc, Mutex};
23use std::thread::{self, JoinHandle};
24use std::time::Duration;
25
26use crate::codec::{decode, encode};
27use crate::frame::{Frame, Opcode};
28use crate::handshake::{build_server_response, parse_client_request, render_server_response};
29
30use super::config::{DaemonConfig, TopicConfig};
31use super::router::{Router, RouterMsg};
32#[cfg(feature = "daemon")]
33use super::runtime_common::{
34 BridgeMetrics, CatalogSnapshot, SERVICE_NAME, install_signal_watcher, otlp_config_from_env,
35 serve_admin_endpoints, spawn_otlp_flush_loop,
36};
37#[cfg(feature = "daemon")]
38use super::security::{
39 AclOp, AuthSubject, SecurityCtx, authenticate_ws, authorize, ctx_from_daemon_config,
40 extract_authorization_header, serve_tls_handshake,
41};
42#[cfg(feature = "daemon")]
43use rustls::{ServerConnection, StreamOwned};
44#[cfg(feature = "daemon")]
45use zerodds_monitor::Registry;
46#[cfg(feature = "daemon")]
47use zerodds_observability_otlp::OtlpExporter;
48
49#[cfg(feature = "daemon")]
50use zerodds_dcps::runtime::{
51 DcpsRuntime, RuntimeConfig, UserReaderConfig, UserSample, UserWriterConfig,
52};
53#[cfg(feature = "daemon")]
54use zerodds_rtps::wire_types::{EntityId, GuidPrefix};
55
/// Failure categories reported while starting or running the bridge daemon.
///
/// Every variant carries a human-readable description of the underlying
/// cause; the `Display` output prefixes it with the category.
#[derive(Debug)]
pub enum ServerError {
    /// Listener or security setup could not be completed.
    Bind(String),
    /// The DDS runtime or one of its endpoints reported an error.
    Dds(String),
    /// A socket-level or handshake-level I/O failure.
    Io(String),
}

impl core::fmt::Display for ServerError {
    /// Renders the error as `"<category> error: <detail>"`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            ServerError::Bind(reason) => f.write_fmt(format_args!("bind error: {reason}")),
            ServerError::Dds(reason) => f.write_fmt(format_args!("dds error: {reason}")),
            ServerError::Io(reason) => f.write_fmt(format_args!("io error: {reason}")),
        }
    }
}

// No source error is chained; the message string is the whole story.
impl std::error::Error for ServerError {}
78
79pub struct DaemonHandle {
81 stop: Arc<AtomicBool>,
82 accept_thread: Option<JoinHandle<()>>,
83 pump_threads: Vec<JoinHandle<()>>,
84 #[cfg(feature = "daemon")]
85 admin_thread: Option<JoinHandle<()>>,
86 #[cfg(feature = "daemon")]
87 otlp_thread: Option<JoinHandle<()>>,
88 router: Arc<Mutex<Router>>,
89 pub local_addr: String,
91 #[cfg(feature = "daemon")]
93 pub admin_addr: Option<String>,
94 #[cfg(feature = "daemon")]
96 pub reload_flag: Arc<AtomicBool>,
97 #[cfg(feature = "daemon")]
99 pub healthy: Arc<AtomicBool>,
100 #[cfg(feature = "daemon")]
102 pub metrics: Option<BridgeMetrics>,
103}
104
105impl DaemonHandle {
106 pub fn shutdown(&mut self) {
108 self.stop.store(true, Ordering::SeqCst);
109 #[cfg(feature = "daemon")]
110 {
111 self.healthy.store(false, Ordering::SeqCst);
112 }
113 if let Ok(addr) = self.local_addr.parse::<std::net::SocketAddr>() {
115 let _ = TcpStream::connect_timeout(&addr, Duration::from_millis(200));
117 }
118 #[cfg(feature = "daemon")]
120 if let Some(admin) = self.admin_addr.as_deref() {
121 if let Ok(addr) = admin.parse::<std::net::SocketAddr>() {
122 let _ = TcpStream::connect_timeout(&addr, Duration::from_millis(200));
123 }
124 }
125 if let Ok(r) = self.router.lock() {
126 r.broadcast_shutdown();
127 }
128 if let Some(j) = self.accept_thread.take() {
129 let _ = j.join();
130 }
131 for j in self.pump_threads.drain(..) {
132 let _ = j.join();
133 }
134 #[cfg(feature = "daemon")]
135 {
136 if let Some(j) = self.admin_thread.take() {
137 let _ = j.join();
138 }
139 if let Some(j) = self.otlp_thread.take() {
140 let _ = j.join();
141 }
142 }
143 }
144}
145
146impl Drop for DaemonHandle {
147 fn drop(&mut self) {
148 self.shutdown();
149 }
150}
/// Boots the bridge daemon described by `cfg` and returns the handle that
/// owns its threads.
///
/// In order, this:
/// 1. registers metrics and builds the security context (optionally TLS),
/// 2. starts the DDS runtime and registers a reader and/or writer per topic,
/// 3. binds the WebSocket listener,
/// 4. spawns one pump thread per DDS reader that forwards samples to the
///    router, and an accept thread that spawns one thread per connection,
/// 5. optionally starts the admin endpoint, the signal watcher, the TLS
///    reload thread and the OTLP flush loop. Failures in this step are
///    logged and do not abort the start.
///
/// # Errors
///
/// * [`ServerError::Bind`] when the security context cannot be built or the
///   listener cannot be bound.
/// * [`ServerError::Dds`] when the DDS runtime or a topic endpoint fails.
/// * [`ServerError::Io`] when the listener cannot be put in blocking mode.
#[cfg(feature = "daemon")]
#[allow(clippy::too_many_lines)]
pub fn start(cfg: DaemonConfig) -> Result<DaemonHandle, ServerError> {
    eprintln!(
        "[zerodds-ws-bridged] starting on {} domain={} topics={}",
        cfg.listen,
        cfg.domain,
        cfg.topics.len()
    );

    // Metric registry shared with the admin endpoint further down.
    let registry = Arc::new(Registry::new());
    let metrics = BridgeMetrics::register(&registry);

    let (security_ctx, rotating_tls) = ctx_from_daemon_config(&cfg)
        .map_err(|e| ServerError::Bind(alloc_format(format_args!("security: {e}"))))?;
    let security_ctx = Arc::new(security_ctx);
    let rotating_tls = rotating_tls.map(Arc::new);
    if rotating_tls.is_some() {
        eprintln!(
            "[zerodds-ws-bridged] TLS active (cert={}, mtls={})",
            cfg.tls_cert_file,
            !cfg.tls_client_ca_file.is_empty(),
        );
    }
    eprintln!(
        "[zerodds-ws-bridged] auth-mode={} acl-entries={}",
        cfg.auth_mode,
        cfg.topic_acl.len()
    );

    // The participant GUID prefix is derived from the listen address.
    let prefix = stable_prefix_for(&cfg.listen);
    let runtime = DcpsRuntime::start(cfg.domain, prefix, RuntimeConfig::default())
        .map_err(|e| ServerError::Dds(alloc_format(format_args!("{e:?}"))))?;
    let healthy = Arc::new(AtomicBool::new(true));

    // Topic name -> writer entity id, and (topic name, sample channel) per reader.
    let mut writers: std::collections::BTreeMap<String, EntityId> =
        std::collections::BTreeMap::new();
    let mut readers: Vec<(String, std::sync::mpsc::Receiver<UserSample>)> = Vec::new();
    for topic in &cfg.topics {
        let (reader_eid, writer_eid) = register_topic_endpoints(&runtime, topic)?;
        if let Some((_reader_id, rx)) = reader_eid {
            readers.push((topic.name.clone(), rx));
        }
        if let Some(eid) = writer_eid {
            writers.insert(topic.name.clone(), eid);
        }
    }

    let router = Arc::new(Mutex::new(Router::new()));
    let listener = TcpListener::bind(&cfg.listen)
        .map_err(|e| ServerError::Bind(alloc_format(format_args!("{e}"))))?;
    let local_addr = listener
        .local_addr()
        .map(|a| a.to_string())
        .unwrap_or_else(|_| cfg.listen.clone());
    listener
        .set_nonblocking(false)
        .map_err(|e| ServerError::Io(alloc_format(format_args!("{e}"))))?;

    eprintln!("[zerodds-ws-bridged] bound on {local_addr}");

    let stop = Arc::new(AtomicBool::new(false));
    let reload_flag = Arc::new(AtomicBool::new(false));

    // One pump thread per DDS reader: forwards alive samples to the router.
    let mut pump_threads = Vec::new();
    for (topic_name, rx) in readers {
        let router_c = Arc::clone(&router);
        let stop_c = Arc::clone(&stop);
        let dds_out = Arc::clone(&metrics.dds_samples_out_total);
        let h = thread::spawn(move || {
            while !stop_c.load(Ordering::SeqCst) {
                // The timeout bounds how long a stop request goes unnoticed.
                match rx.recv_timeout(Duration::from_millis(200)) {
                    Ok(UserSample::Alive { payload, .. }) => {
                        dds_out.inc();
                        if let Ok(mut r) = router_c.lock() {
                            r.dispatch(&topic_name, payload);
                        }
                    }
                    // Lifecycle notifications are not forwarded to clients.
                    Ok(UserSample::Lifecycle { .. }) => {}
                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
                }
            }
        });
        pump_threads.push(h);
    }

    let next_conn_id = Arc::new(AtomicU64::new(1));
    let stop_acc = Arc::clone(&stop);
    let router_acc = Arc::clone(&router);
    let writers_arc = Arc::new(writers);
    let runtime_acc = Arc::clone(&runtime);
    let topics_arc = Arc::new(cfg.topics.clone());
    let metrics_acc = metrics.clone();
    let security_acc = Arc::clone(&security_ctx);
    let rotating_acc = rotating_tls.clone();

    let accept_thread = thread::spawn(move || {
        for incoming in listener.incoming() {
            // `DaemonHandle::shutdown` makes a dummy connection after raising
            // the stop flag, which is what gets us here during shutdown.
            if stop_acc.load(Ordering::SeqCst) {
                break;
            }
            match incoming {
                Ok(tcp) => {
                    let conn_id = next_conn_id.fetch_add(1, Ordering::SeqCst);
                    let router_h = Arc::clone(&router_acc);
                    let writers_h = Arc::clone(&writers_arc);
                    let runtime_h = Arc::clone(&runtime_acc);
                    let stop_h = Arc::clone(&stop_acc);
                    let topics_h = Arc::clone(&topics_arc);
                    let metrics_h = metrics_acc.clone();
                    let security_h = Arc::clone(&security_acc);
                    let rot_h = rotating_acc.clone();
                    // Each connection is served on its own detached thread.
                    thread::spawn(move || {
                        let (stream, mtls_subj) = if let Some(rot) = rot_h.as_ref() {
                            // Use whatever TLS config is current at accept time.
                            let tls_cfg = rot.current();
                            match serve_tls_handshake(tls_cfg, tcp, Duration::from_secs(5)) {
                                Ok((tcp, conn, subj)) => {
                                    (WsStream::Tls(Box::new(StreamOwned::new(conn, tcp))), subj)
                                }
                                Err(e) => {
                                    metrics_h.errors_total.inc();
                                    eprintln!(
                                        "[zerodds-ws-bridged] tls handshake err conn={conn_id}: {e}"
                                    );
                                    return;
                                }
                            }
                        } else {
                            (WsStream::Plain(tcp), None)
                        };
                        let _ = serve_connection(
                            conn_id, stream, mtls_subj, router_h, writers_h, runtime_h, stop_h,
                            topics_h, metrics_h, security_h,
                        );
                    });
                }
                Err(e) => {
                    eprintln!("[zerodds-ws-bridged] accept error: {e}");
                    // Persistent failures (e.g. file-descriptor exhaustion)
                    // would otherwise make this loop spin and flood the log.
                    thread::sleep(Duration::from_millis(50));
                }
            }
        }
    });

    let mut admin_thread: Option<JoinHandle<()>> = None;
    let mut admin_addr: Option<String> = None;
    if cfg.metrics_enabled || !cfg.metrics_addr.is_empty() {
        let bind_str = if cfg.metrics_addr.is_empty() {
            "127.0.0.1:9090".to_string()
        } else {
            cfg.metrics_addr.clone()
        };
        match bind_str.parse::<std::net::SocketAddr>() {
            Ok(sock) => {
                let snap = Arc::new(CatalogSnapshot::from_config(&cfg));
                match serve_admin_endpoints(
                    sock,
                    snap,
                    Arc::clone(&registry),
                    Arc::clone(&healthy),
                    Arc::clone(&stop),
                ) {
                    Ok((h, bound)) => {
                        eprintln!(
                            "[{SERVICE_NAME}] admin endpoint on {bound} (/metrics /catalog /healthz)"
                        );
                        admin_addr = Some(bound.to_string());
                        admin_thread = Some(h);
                    }
                    Err(e) => {
                        eprintln!("[{SERVICE_NAME}] admin bind error: {e}");
                    }
                }
            }
            Err(e) => {
                eprintln!("[{SERVICE_NAME}] admin addr parse error: {e}");
            }
        }
    }

    if let Err(e) = install_signal_watcher(Arc::clone(&stop), Arc::clone(&reload_flag)) {
        eprintln!("[{SERVICE_NAME}] signal watcher init failed: {e}");
    }

    if let Some(rot) = rotating_tls.clone() {
        // Polls the reload flag and swaps in freshly loaded certificates.
        let stop_r = Arc::clone(&stop);
        let reload_r = Arc::clone(&reload_flag);
        let spawned = thread::Builder::new()
            .name("zerodds-ws-tls-reload".into())
            .spawn(move || {
                while !stop_r.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(250));
                    if reload_r.swap(false, Ordering::SeqCst) {
                        match rot.reload() {
                            Ok(()) => eprintln!(
                                "[{SERVICE_NAME}] SIGHUP TLS-cert reloaded"
                            ),
                            Err(e) => eprintln!(
                                "[{SERVICE_NAME}] SIGHUP TLS-cert reload FAILED: {e} (keeping old cert)"
                            ),
                        }
                    }
                }
            });
        // The daemon still works without the reload thread, but the operator
        // should know that certificate rotation is not active.
        if let Err(e) = spawned {
            eprintln!("[{SERVICE_NAME}] TLS reload thread spawn failed: {e}");
        }
    }

    let otlp_thread = if let Some(otlp_cfg) = otlp_config_from_env(SERVICE_NAME) {
        let exporter = Arc::new(OtlpExporter::new(otlp_cfg));
        match spawn_otlp_flush_loop(exporter, Arc::clone(&stop), Duration::from_secs(5)) {
            Ok(h) => {
                eprintln!("[{SERVICE_NAME}] OTLP exporter active");
                Some(h)
            }
            Err(e) => {
                eprintln!("[{SERVICE_NAME}] OTLP spawn failed: {e}");
                None
            }
        }
    } else {
        None
    };

    Ok(DaemonHandle {
        stop,
        accept_thread: Some(accept_thread),
        pump_threads,
        admin_thread,
        otlp_thread,
        router,
        local_addr,
        admin_addr,
        reload_flag,
        healthy,
        metrics: Some(metrics),
    })
}
417
/// A registered DDS reader: its entity id together with the channel on which
/// its samples are received.
#[cfg(feature = "daemon")]
type ReaderEndpoint = (EntityId, std::sync::mpsc::Receiver<UserSample>);
/// Result of registering one topic: the optional reader endpoint and the
/// optional writer entity id (either may be absent depending on direction).
#[cfg(feature = "daemon")]
type TopicEndpoints = (Option<ReaderEndpoint>, Option<EntityId>);
422
/// Registers the DDS endpoints a configured topic asks for.
///
/// The topic's `direction` selects which endpoints are created: `"in"` gives
/// a reader, `"out"` a writer, `"bidir"` or an empty string both, and any
/// other value neither. Unknown durability strings fall back to volatile,
/// and every reliability string other than `"best_effort"` means reliable.
/// When the topic has no explicit type name, the topic name is used instead.
///
/// # Errors
///
/// Returns [`ServerError::Dds`] if the runtime rejects the reader or the
/// writer. The reader is registered first, so a reader failure means no
/// writer registration is attempted.
#[cfg(feature = "daemon")]
fn register_topic_endpoints(
    rt: &Arc<DcpsRuntime>,
    topic: &TopicConfig,
) -> Result<TopicEndpoints, ServerError> {
    use zerodds_qos::{
        DeadlineQosPolicy, DurabilityKind, LifespanQosPolicy, LivelinessQosPolicy, OwnershipKind,
    };

    let durability = match topic.durability.as_str() {
        "transient_local" => DurabilityKind::TransientLocal,
        "transient" => DurabilityKind::Transient,
        "persistent" => DurabilityKind::Persistent,
        _ => DurabilityKind::Volatile,
    };
    let reliable = topic.reliability.as_str() != "best_effort";
    let (want_reader, want_writer) = match topic.direction.as_str() {
        "" | "bidir" => (true, true),
        "in" => (true, false),
        "out" => (false, true),
        _ => (false, false),
    };
    // Type name to announce: the explicit one, or the topic name as fallback.
    let effective_type_name = || {
        if topic.type_name.is_empty() {
            topic.name.clone()
        } else {
            topic.type_name.clone()
        }
    };

    let mut reader = None;
    if want_reader {
        let reader_cfg = UserReaderConfig {
            topic_name: topic.name.clone(),
            type_name: effective_type_name(),
            reliable,
            durability,
            deadline: DeadlineQosPolicy::default(),
            liveliness: LivelinessQosPolicy::default(),
            ownership: OwnershipKind::Shared,
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_identifier: zerodds_types::TypeIdentifier::None,
            type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
            data_representation_offer: None,
        };
        let endpoint = rt
            .register_user_reader(reader_cfg)
            .map_err(|e| ServerError::Dds(alloc_format(format_args!("reader: {e:?}"))))?;
        reader = Some(endpoint);
    }

    let mut writer = None;
    if want_writer {
        let writer_cfg = UserWriterConfig {
            topic_name: topic.name.clone(),
            type_name: effective_type_name(),
            reliable,
            durability,
            deadline: DeadlineQosPolicy::default(),
            lifespan: LifespanQosPolicy::default(),
            liveliness: LivelinessQosPolicy::default(),
            ownership: OwnershipKind::Shared,
            ownership_strength: 0,
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_identifier: zerodds_types::TypeIdentifier::None,
            data_representation_offer: None,
        };
        let writer_id = rt
            .register_user_writer(writer_cfg)
            .map_err(|e| ServerError::Dds(alloc_format(format_args!("writer: {e:?}"))))?;
        writer = Some(writer_id);
    }

    Ok((reader, writer))
}
502
/// Serves one client connection from the HTTP upgrade to disconnect.
///
/// Steps:
/// 1. read the HTTP upgrade request (bounded in size and time),
/// 2. authenticate the client and, if the request path maps to a configured
///    topic, check read access to that topic,
/// 3. answer the WebSocket handshake and register the connection with the
///    router (auto-subscribing to the path's topic, if any),
/// 4. run a writer thread that turns router messages into text frames while
///    this thread decodes inbound frames,
/// 5. deregister the connection and join the writer thread.
///
/// The active-connection gauge is incremented on entry and decremented on
/// every exit path through the `ConnectionLifetime` guard.
///
/// # Errors
///
/// Returns [`ServerError::Io`] when the handshake cannot be completed
/// (EOF, oversize or overdue request, parse failure, authentication or ACL
/// rejection, write failure). A connection that ends after a successful
/// handshake returns `Ok(())`.
#[cfg(feature = "daemon")]
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
fn serve_connection(
    conn_id: u64,
    mut stream: WsStream,
    mtls_subject: Option<AuthSubject>,
    router: Arc<Mutex<Router>>,
    writers: Arc<std::collections::BTreeMap<String, EntityId>>,
    runtime: Arc<DcpsRuntime>,
    stop: Arc<AtomicBool>,
    topics: Arc<Vec<TopicConfig>>,
    metrics: BridgeMetrics,
    security: Arc<SecurityCtx>,
) -> Result<(), ServerError> {
    /// True for read errors that only mean "nothing arrived yet". A read
    /// timeout surfaces as `WouldBlock` on some platforms and `TimedOut` on
    /// others; `Interrupted` is the usual retry-on-signal case.
    fn is_transient(kind: std::io::ErrorKind) -> bool {
        matches!(
            kind,
            std::io::ErrorKind::WouldBlock
                | std::io::ErrorKind::TimedOut
                | std::io::ErrorKind::Interrupted
        )
    }

    /// Locks the shared stream, recovering the guard if a peer thread
    /// panicked while holding it.
    fn lock_stream(stream: &Mutex<WsStream>) -> std::sync::MutexGuard<'_, WsStream> {
        stream
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    metrics.connections_total.inc();
    metrics.connections_active.inc();
    let conn_guard = ConnectionLifetime {
        active: Arc::clone(&metrics.connections_active),
    };
    // Short read timeout so both loops below can poll the stop flag.
    stream
        .set_read_timeout(Some(Duration::from_millis(500)))
        .ok();

    // A client that never finishes its upgrade request must not pin this
    // thread forever.
    let handshake_deadline = std::time::Instant::now() + Duration::from_secs(10);
    let mut buf = [0u8; 4096];
    let mut accumulated = Vec::new();
    let req_str = loop {
        if stop.load(Ordering::SeqCst) {
            return Err(ServerError::Io("shutdown during handshake".to_string()));
        }
        if std::time::Instant::now() >= handshake_deadline {
            return Err(ServerError::Io("handshake timeout".to_string()));
        }
        match stream.read(&mut buf) {
            Ok(0) => return Err(ServerError::Io("eof during handshake".to_string())),
            Ok(n) => {
                accumulated.extend_from_slice(&buf[..n]);
                // The blank line terminates the HTTP header block.
                if accumulated.windows(4).any(|w| w == b"\r\n\r\n") {
                    break String::from_utf8_lossy(&accumulated).into_owned();
                }
                if accumulated.len() > 64 * 1024 {
                    return Err(ServerError::Io("handshake too large".to_string()));
                }
            }
            Err(ref e) if is_transient(e.kind()) => continue,
            Err(e) => return Err(ServerError::Io(e.to_string())),
        }
    };

    let req = match parse_client_request(&req_str) {
        Ok(r) => r,
        Err(e) => {
            let _ = stream.write_all(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n");
            return Err(ServerError::Io(alloc_format(format_args!(
                "handshake parse: {e:?}"
            ))));
        }
    };

    let auth_header = extract_authorization_header(&req_str);
    let auth_headers: Vec<(String, String)> = if let Some(v) = auth_header {
        vec![("authorization".to_string(), v)]
    } else {
        Vec::new()
    };
    let subject = match authenticate_ws(&security.auth, &auth_headers, mtls_subject) {
        Ok(s) => s,
        Err(e) => {
            metrics.errors_total.inc();
            let body = b"unauthorized";
            let resp = format!(
                "HTTP/1.1 401 Unauthorized\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nWWW-Authenticate: Bearer realm=\"zerodds-ws\"\r\nConnection: close\r\n\r\nunauthorized",
                body.len()
            );
            let _ = stream.write_all(resp.as_bytes());
            eprintln!("[zerodds-ws-bridged] auth reject conn={conn_id} reason={e}");
            return Err(ServerError::Io(alloc_format(format_args!(
                "auth reject: {e}"
            ))));
        }
    };

    // A request path that matches a topic's WebSocket path (explicit or
    // default) subscribes the connection to that topic automatically.
    let auto_topic: Option<String> = topics
        .iter()
        .find(|t| t.ws_path == req.path || super::config::default_ws_path(&t.name) == req.path)
        .map(|t| t.name.clone());

    if let Some(topic) = &auto_topic {
        if !authorize(&security.acl, &subject, AclOp::Read, topic) {
            metrics.errors_total.inc();
            let body = format!("forbidden: read on {topic}");
            let resp = format!(
                "HTTP/1.1 403 Forbidden\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n{body}",
                body.len()
            );
            let _ = stream.write_all(resp.as_bytes());
            eprintln!(
                "[zerodds-ws-bridged] acl reject conn={conn_id} subject={} topic={topic}",
                subject.name
            );
            return Err(ServerError::Io(alloc_format(format_args!(
                "acl reject: {topic}"
            ))));
        }
    }

    let resp = build_server_response(&req);
    let resp_bytes = render_server_response(&resp);
    stream
        .write_all(resp_bytes.as_bytes())
        .map_err(|e| ServerError::Io(e.to_string()))?;

    let (tx, rx) = std::sync::mpsc::channel::<RouterMsg>();
    if let Ok(mut r) = router.lock() {
        r.register_connection(conn_id, tx);
        if let Some(topic) = &auto_topic {
            r.subscribe(conn_id, topic.clone());
        }
    }

    // From here on the stream is shared between this (reading) thread and
    // the writer thread.
    let stream = Arc::new(Mutex::new(stream));
    // Raised when the read loop ends, so the writer thread terminates even
    // if the router could not be told to drop this connection's sender.
    let conn_done = Arc::new(AtomicBool::new(false));
    let stop_w = Arc::clone(&stop);
    let done_w = Arc::clone(&conn_done);
    let frames_out = Arc::clone(&metrics.frames_out_total);
    let bytes_out = Arc::clone(&metrics.bytes_out_total);
    let errors_out = Arc::clone(&metrics.errors_total);
    let stream_w = Arc::clone(&stream);
    let security_w = Arc::clone(&security);
    let subject_w = subject.clone();
    let writer_thread = thread::spawn(move || {
        while !stop_w.load(Ordering::SeqCst) && !done_w.load(Ordering::SeqCst) {
            match rx.recv_timeout(Duration::from_millis(200)) {
                Ok(RouterMsg::Sample { topic, payload }) => {
                    // Re-check read access for every sample before sending.
                    if !authorize(&security_w.acl, &subject_w, AclOp::Read, &topic) {
                        continue;
                    }
                    let json = render_notify_json(&topic, &payload);
                    let frame = Frame::text(json);
                    if let Ok(bytes) = encode(&frame) {
                        bytes_out.add(bytes.len() as u64);
                        let mut guard = lock_stream(&stream_w);
                        if guard.write_all(&bytes).is_err() {
                            errors_out.inc();
                            break;
                        }
                        frames_out.inc();
                    } else {
                        errors_out.inc();
                    }
                }
                Ok(RouterMsg::Shutdown) => {
                    let close = Frame::close(1001, "going away");
                    if let Ok(b) = encode(&close) {
                        let mut guard = lock_stream(&stream_w);
                        let _ = guard.write_all(&b);
                    }
                    break;
                }
                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
            }
        }
    });

    // NOTE(review): `frame_buf` has no size cap; if `decode` keeps failing on
    // malformed data the buffer grows with every read. Confirm against the
    // codec's error semantics before adding a limit.
    let mut frame_buf: Vec<u8> = Vec::new();
    'reader: loop {
        if stop.load(Ordering::SeqCst) {
            break;
        }
        // The guard is released before frames are handled so the writer
        // thread can interleave its own writes.
        let read_result = {
            let mut guard = lock_stream(&stream);
            guard.read(&mut buf)
        };
        match read_result {
            Ok(0) => break,
            Ok(n) => {
                frame_buf.extend_from_slice(&buf[..n]);
                while let Ok((frame, used)) = decode(&frame_buf) {
                    frame_buf.drain(..used);
                    match frame.opcode {
                        Opcode::Text | Opcode::Binary => {
                            let payload = frame.payload;
                            metrics.frames_in_total.inc();
                            metrics.bytes_in_total.add(payload.len() as u64);
                            let result = handle_inbound_frame(
                                &payload,
                                conn_id,
                                &router,
                                &writers,
                                &runtime,
                                auto_topic.as_deref(),
                                &metrics,
                                &security,
                                &subject,
                                &stream,
                            );
                            if let Err(e) = result {
                                metrics.errors_total.inc();
                                eprintln!("[zerodds-ws-bridged] inbound err conn={conn_id}: {e}");
                            }
                        }
                        Opcode::Ping => {
                            let pong = Frame::pong(frame.payload);
                            if let Ok(b) = encode(&pong) {
                                let mut guard = lock_stream(&stream);
                                let _ = guard.write_all(&b);
                            }
                        }
                        Opcode::Pong => {}
                        Opcode::Close => {
                            // The closing handshake requires a Close frame
                            // in response before the connection is dropped.
                            let reply = Frame::close(1000, "normal closure");
                            if let Ok(b) = encode(&reply) {
                                let mut guard = lock_stream(&stream);
                                let _ = guard.write_all(&b);
                            }
                            break 'reader;
                        }
                        _ => {}
                    }
                }
            }
            Err(ref e) if is_transient(e.kind()) => continue,
            Err(_) => break,
        }
    }

    conn_done.store(true, Ordering::SeqCst);
    if let Ok(mut r) = router.lock() {
        r.deregister_connection(conn_id);
    }
    let _ = writer_thread.join();
    drop(conn_guard);
    Ok(())
}
755
/// Transport underneath a WebSocket connection: either a bare TCP socket or
/// a TLS session layered on top of one.
#[cfg(feature = "daemon")]
enum WsStream {
    /// Unencrypted TCP connection.
    Plain(TcpStream),
    /// TLS session together with the TCP socket it runs over.
    Tls(Box<StreamOwned<ServerConnection, TcpStream>>),
}

#[cfg(feature = "daemon")]
impl WsStream {
    /// Applies `dur` as the read timeout of the underlying TCP socket,
    /// whichever variant wraps it.
    fn set_read_timeout(&mut self, dur: Option<Duration>) -> std::io::Result<()> {
        let socket: &TcpStream = match self {
            WsStream::Plain(tcp) => &*tcp,
            WsStream::Tls(tls) => &tls.sock,
        };
        socket.set_read_timeout(dur)
    }
}

#[cfg(feature = "daemon")]
impl Read for WsStream {
    /// Reads from whichever transport is active.
    fn read(&mut self, b: &mut [u8]) -> std::io::Result<usize> {
        let source: &mut dyn Read = match self {
            WsStream::Plain(tcp) => tcp,
            WsStream::Tls(tls) => &mut **tls,
        };
        source.read(b)
    }
}

#[cfg(feature = "daemon")]
impl Write for WsStream {
    /// Writes to whichever transport is active.
    fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
        let sink: &mut dyn Write = match self {
            WsStream::Plain(tcp) => tcp,
            WsStream::Tls(tls) => &mut **tls,
        };
        sink.write(b)
    }

    /// Flushes whichever transport is active.
    fn flush(&mut self) -> std::io::Result<()> {
        let sink: &mut dyn Write = match self {
            WsStream::Plain(tcp) => tcp,
            WsStream::Tls(tls) => &mut **tls,
        };
        sink.flush()
    }
}
802
/// Scope guard for the active-connections gauge: the gauge is decremented
/// when the guard is dropped, whichever way the owning scope is left.
#[cfg(feature = "daemon")]
struct ConnectionLifetime {
    /// Gauge to decrement on drop.
    active: Arc<zerodds_monitor::Gauge>,
}

#[cfg(feature = "daemon")]
impl Drop for ConnectionLifetime {
    /// Decrements the gauge exactly once, when the guard goes away.
    fn drop(&mut self) {
        let Self { active } = self;
        active.dec();
    }
}
815
/// Handles one inbound text or binary frame from a client.
///
/// The payload must be UTF-8. If it parses as a bridge operation it is
/// executed:
/// * subscribe: checked against the read ACL, then registered in the router;
/// * unsubscribe: removed from the router;
/// * publish: checked against the write ACL, then written to the topic's DDS
///   writer (silently ignored when the topic has no writer).
///
/// A payload that is not a bridge operation is published verbatim to the
/// connection's auto-subscribed topic, if it has one and write access is
/// granted. ACL denials are reported to the client as a JSON error frame and
/// are not treated as an error by this function.
///
/// # Errors
///
/// Returns a description when the payload is not valid UTF-8 or the DDS
/// write fails.
#[cfg(feature = "daemon")]
#[allow(clippy::too_many_arguments)]
fn handle_inbound_frame(
    payload: &[u8],
    conn_id: u64,
    router: &Arc<Mutex<Router>>,
    writers: &Arc<std::collections::BTreeMap<String, EntityId>>,
    runtime: &Arc<DcpsRuntime>,
    auto_topic: Option<&str>,
    metrics: &BridgeMetrics,
    security: &Arc<SecurityCtx>,
    subject: &AuthSubject,
    stream: &Arc<Mutex<WsStream>>,
) -> Result<(), String> {
    use crate::dds_bridge::{BridgeOp, parse_op};

    /// Escapes `text` for use inside a JSON string literal (quotes excluded).
    fn json_escape(text: &str) -> String {
        use core::fmt::Write as _;
        let mut out = String::with_capacity(text.len());
        for c in text.chars() {
            match c {
                '"' => out.push_str("\\\""),
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                c if (c as u32) < 0x20 => {
                    let _ = write!(out, "\\u{:04x}", c as u32);
                }
                c => out.push(c),
            }
        }
        out
    }

    /// Builds the JSON error frame sent to the client on an ACL denial. The
    /// topic name comes from the client, so it is escaped before embedding.
    fn acl_error_json(topic: &str, reason: &str) -> String {
        format!(
            "{{\"op\":\"error\",\"code\":403,\"topic\":\"{}\",\"reason\":\"{reason}\"}}",
            json_escape(topic)
        )
    }

    let text =
        core::str::from_utf8(payload).map_err(|e| alloc_format(format_args!("utf8: {e}")))?;
    if let Ok(op) = parse_op(text) {
        match op {
            BridgeOp::Subscribe { topic, .. } => {
                if !authorize(&security.acl, subject, AclOp::Read, &topic) {
                    metrics.errors_total.inc();
                    send_text_frame(stream, &acl_error_json(&topic, "acl-deny-read"));
                    eprintln!(
                        "[zerodds-ws-bridged] acl-deny conn={conn_id} subject={} read {topic}",
                        subject.name
                    );
                    return Ok(());
                }
                if let Ok(mut r) = router.lock() {
                    r.subscribe(conn_id, topic);
                }
                return Ok(());
            }
            BridgeOp::Unsubscribe { topic, .. } => {
                if let Ok(mut r) = router.lock() {
                    r.unsubscribe(conn_id, &topic);
                }
                return Ok(());
            }
            BridgeOp::Publish { topic, data } => {
                if !authorize(&security.acl, subject, AclOp::Write, &topic) {
                    metrics.errors_total.inc();
                    send_text_frame(stream, &acl_error_json(&topic, "acl-deny-write"));
                    eprintln!(
                        "[zerodds-ws-bridged] acl-deny conn={conn_id} subject={} write {topic}",
                        subject.name
                    );
                    return Ok(());
                }
                // NOTE(review): publishing to a topic without a writer is
                // dropped without feedback to the client.
                if let Some(eid) = writers.get(&topic) {
                    runtime
                        .write_user_sample(*eid, data.into_bytes())
                        .map_err(|e| alloc_format(format_args!("dds-write: {e:?}")))?;
                    metrics.dds_samples_in_total.inc();
                }
                return Ok(());
            }
        }
    }
    // Not a bridge operation: treat the payload as a raw sample for the
    // topic this connection was opened on, if any.
    if let Some(topic) = auto_topic {
        if !authorize(&security.acl, subject, AclOp::Write, topic) {
            metrics.errors_total.inc();
            send_text_frame(stream, &acl_error_json(topic, "acl-deny-write"));
            return Ok(());
        }
        if let Some(eid) = writers.get(topic) {
            runtime
                .write_user_sample(*eid, payload.to_vec())
                .map_err(|e| alloc_format(format_args!("dds-write: {e:?}")))?;
            metrics.dds_samples_in_total.inc();
            return Ok(());
        }
    }
    Ok(())
}
904
/// Sends `text` to the client as a single text frame, best effort.
///
/// Encoding and write failures are ignored. A poisoned stream lock is
/// recovered rather than treated as an error.
#[cfg(feature = "daemon")]
fn send_text_frame(stream: &Arc<Mutex<WsStream>>, text: &str) {
    let frame = Frame::text(text.to_string());
    let encoded = match encode(&frame) {
        Ok(bytes) => bytes,
        Err(_) => return,
    };
    let mut sink = stream
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let _ = sink.write_all(&encoded);
}
916
/// Renders the JSON notification sent to subscribers for one DDS sample:
/// `{"op":"notify","topic":<topic>,"data":<payload>}`.
///
/// The topic is always emitted as an escaped JSON string. The payload is
/// rendered as follows:
/// * UTF-8 text starting with `{` or `[` is embedded verbatim (it is assumed
///   to already be JSON and is not validated);
/// * any other UTF-8 text becomes an escaped JSON string;
/// * non-UTF-8 payloads become a JSON array of byte values.
fn render_notify_json(topic: &str, payload: &[u8]) -> String {
    /// Appends `text` to `out` as a quoted, escaped JSON string.
    fn push_json_string(out: &mut String, text: &str) {
        use core::fmt::Write as _;
        out.push('"');
        for c in text.chars() {
            match c {
                '"' => out.push_str("\\\""),
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                // Remaining control characters must use the \uXXXX form.
                c if (c as u32) < 0x20 => {
                    let _ = write!(out, "\\u{:04x}", c as u32);
                }
                c => out.push(c),
            }
        }
        out.push('"');
    }

    let mut out = String::with_capacity(topic.len() + payload.len() + 32);
    out.push_str("{\"op\":\"notify\",\"topic\":");
    push_json_string(&mut out, topic);
    out.push_str(",\"data\":");
    match core::str::from_utf8(payload) {
        Ok(text) if text.starts_with('{') || text.starts_with('[') => out.push_str(text),
        Ok(text) => push_json_string(&mut out, text),
        Err(_) => out.push_str(&format_bytes_array(payload)),
    }
    out.push('}');
    out
}

/// Formats `b` as a JSON array of decimal byte values, e.g. `[255,0,7]`.
/// An empty slice yields `[]`.
fn format_bytes_array(b: &[u8]) -> String {
    use core::fmt::Write as _;
    // Each byte needs at most three digits plus a separator.
    let mut out = String::with_capacity(b.len() * 4 + 2);
    out.push('[');
    for (i, byte) in b.iter().enumerate() {
        if i > 0 {
            out.push(',');
        }
        let _ = write!(out, "{byte}");
    }
    out.push(']');
    out
}
958
/// Derives a deterministic 12-byte GUID prefix from the listen address.
///
/// Every byte of `addr` is XOR-folded into the prefix, so addresses that
/// differ only after their twelfth byte (typically the port, e.g.
/// `192.168.1.10:8080` vs `192.168.1.10:8081`) no longer map to the same
/// prefix. For addresses of at most 12 bytes the result is unchanged: the
/// address bytes, zero-padded. Finally the first byte is XORed with `0x42`.
#[cfg(feature = "daemon")]
fn stable_prefix_for(addr: &str) -> GuidPrefix {
    let mut bytes = [0u8; 12];
    for (i, b) in addr.bytes().enumerate() {
        bytes[i % bytes.len()] ^= b;
    }
    bytes[0] ^= 0x42;
    GuidPrefix::from_bytes(bytes)
}
969
/// Renders pre-built format arguments into a freshly allocated `String`.
///
/// A formatting error is ignored; whatever was written before the error is
/// returned.
fn alloc_format(args: core::fmt::Arguments<'_>) -> String {
    let mut rendered = String::new();
    core::fmt::write(&mut rendered, args).ok();
    rendered
}
976
/// Unit tests for the JSON rendering helpers. They pin the exact output so
/// that any change to the wire format is caught.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn render_notify_json_with_text_payload() {
        let s = render_notify_json("Trade", b"hello");
        assert_eq!(s, r#"{"op":"notify","topic":"Trade","data":"hello"}"#);
    }

    #[test]
    fn render_notify_json_with_object_payload() {
        // Payloads that already look like JSON are embedded verbatim.
        let s = render_notify_json("X", b"{\"a\":1}");
        assert_eq!(s, r#"{"op":"notify","topic":"X","data":{"a":1}}"#);
    }

    #[test]
    fn render_notify_json_escapes_quotes() {
        let s = render_notify_json("X", b"a\"b");
        assert_eq!(s, r#"{"op":"notify","topic":"X","data":"a\"b"}"#);
    }

    #[test]
    fn render_notify_json_escapes_control_characters() {
        let s = render_notify_json("X", b"a\n\x01");
        assert_eq!(
            s,
            "{\"op\":\"notify\",\"topic\":\"X\",\"data\":\"a\\n\\u0001\"}"
        );
    }

    #[test]
    fn render_notify_json_with_non_utf8_payload() {
        // Invalid UTF-8 falls back to an array of byte values.
        let s = render_notify_json("X", &[0xff, 0x00, 7]);
        assert_eq!(s, r#"{"op":"notify","topic":"X","data":[255,0,7]}"#);
    }

    #[test]
    fn format_bytes_array_handles_empty_and_multiple() {
        assert_eq!(format_bytes_array(&[]), "[]");
        assert_eq!(format_bytes_array(&[1, 20, 255]), "[1,20,255]");
    }

    #[test]
    fn alloc_format_renders_arguments() {
        assert_eq!(alloc_format(format_args!("{}-{}", 1, "a")), "1-a");
    }
}