Skip to main content

zerodds_websocket_bridge/
dds_bridge.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! WebSocket↔DDS-Topic-Bridge.
5//!
6//! Wire-Format JSON (Text-Frames):
7//!
8//! Subscribe (Client → Server):
9//! ```json
10//! {"op": "subscribe", "topic": "Trade", "id": "sub-1"}
11//! ```
12//!
13//! Unsubscribe:
14//! ```json
15//! {"op": "unsubscribe", "topic": "Trade", "id": "sub-1"}
16//! ```
17//!
18//! Publish (Client → Server):
19//! ```json
20//! {"op": "publish", "topic": "Trade", "data": {"symbol": "AAPL", "price": 200.5}}
21//! ```
22//!
23//! Notification (Server → Client):
24//! ```json
25//! {"op": "notify", "topic": "Trade", "data": {...}, "subscription_id": "sub-1"}
26//! ```
27
28use alloc::collections::BTreeMap;
29use alloc::string::{String, ToString};
30use alloc::vec::Vec;
31
/// Bridge operation decoded from a JSON text frame.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BridgeOp {
    /// Subscribe to a topic.
    Subscribe {
        /// Topic name.
        topic: String,
        /// Optional client-side subscription id (echoed in notifications).
        id: Option<String>,
    },
    /// Unsubscribe from a topic.
    Unsubscribe {
        /// Topic name.
        topic: String,
        /// Optional subscription id.
        id: Option<String>,
    },
    /// Publish a sample.
    Publish {
        /// Topic name.
        topic: String,
        /// Raw JSON-encoded sample (the caller layer maps it to the IDL type).
        data: String,
    },
}
57
/// Notification delivered to a subscriber.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Notification {
    /// Topic name.
    pub topic: String,
    /// Sample as JSON text.
    pub data: String,
    /// Optional subscription id (echo).
    pub subscription_id: Option<String>,
}
68
/// Bridge error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BridgeError {
    /// The JSON could not be parsed; the payload is a short reason.
    BadJson(String),
    /// The `op` field is missing or unknown.
    UnknownOp(String),
    /// The `topic` field is missing or empty.
    MissingTopic,
    /// The data field is missing for `publish`.
    MissingData,
}
81
82impl core::fmt::Display for BridgeError {
83    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
84        match self {
85            Self::BadJson(s) => write!(f, "bad json: {s}"),
86            Self::UnknownOp(s) => write!(f, "unknown op: {s}"),
87            Self::MissingTopic => f.write_str("missing topic"),
88            Self::MissingData => f.write_str("missing data"),
89        }
90    }
91}
92
// Compiled only when the `std` feature is enabled: marks `BridgeError` as a
// standard error type. The impl is empty, so every trait method keeps its
// default implementation.
#[cfg(feature = "std")]
impl std::error::Error for BridgeError {}
95
96/// Minimal-JSON-Parser fuer das Bridge-Wire-Format. Erkennt Top-Level-
97/// Objekt mit String/Object-Werten — keinen vollen JSON-Stack
98/// (Nested-Objekte landen als Roh-String im `data`-Feld).
99fn parse_top_level_object(input: &str) -> Result<BTreeMap<String, String>, BridgeError> {
100    let s = input.trim();
101    let s = s
102        .strip_prefix('{')
103        .ok_or_else(|| BridgeError::BadJson("expected `{`".into()))?;
104    let s = s
105        .strip_suffix('}')
106        .ok_or_else(|| BridgeError::BadJson("expected `}`".into()))?;
107    let mut out = BTreeMap::new();
108    let mut chars = s.char_indices().peekable();
109    while let Some((_, c)) = chars.peek().copied() {
110        if c.is_whitespace() || c == ',' {
111            chars.next();
112            continue;
113        }
114        let key = parse_json_string(&mut chars, s)?;
115        // expect ':'
116        skip_ws_to(&mut chars);
117        match chars.next() {
118            Some((_, ':')) => {}
119            _ => return Err(BridgeError::BadJson("expected `:`".into())),
120        }
121        skip_ws_to(&mut chars);
122        let value = parse_json_value(&mut chars, s)?;
123        out.insert(key, value);
124    }
125    Ok(out)
126}
127
/// Advances `chars` past any leading whitespace.
///
/// Stops in front of the first character for which `char::is_whitespace` is
/// false, or at the end of input; that character is left unconsumed.
fn skip_ws_to(chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>) {
    while chars.next_if(|&(_, c)| c.is_whitespace()).is_some() {}
}
137
138fn parse_json_string(
139    chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
140    src: &str,
141) -> Result<String, BridgeError> {
142    skip_ws_to(chars);
143    match chars.next() {
144        Some((_, '"')) => {}
145        _ => return Err(BridgeError::BadJson("expected `\"`".into())),
146    }
147    let start = chars
148        .peek()
149        .map(|(i, _)| *i)
150        .ok_or_else(|| BridgeError::BadJson("eof".into()))?;
151    let mut end = start;
152    while let Some((i, c)) = chars.next() {
153        if c == '"' {
154            return Ok(src[start..i].to_string());
155        }
156        if c == '\\' {
157            chars.next(); // skip escaped char
158        }
159        end = i + c.len_utf8();
160    }
161    let _ = end;
162    Err(BridgeError::BadJson("unterminated string".into()))
163}
164
165fn parse_json_value(
166    chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
167    src: &str,
168) -> Result<String, BridgeError> {
169    skip_ws_to(chars);
170    match chars.peek().map(|(_, c)| *c) {
171        Some('"') => parse_json_string(chars, src),
172        Some('{') => parse_json_object_to_string(chars, src),
173        Some('[') => parse_json_array_to_string(chars, src),
174        _ => parse_json_scalar(chars, src),
175    }
176}
177
178fn parse_json_object_to_string(
179    chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
180    src: &str,
181) -> Result<String, BridgeError> {
182    let start = chars
183        .peek()
184        .map(|(i, _)| *i)
185        .ok_or_else(|| BridgeError::BadJson("eof".into()))?;
186    let mut depth = 0i32;
187    while let Some((i, c)) = chars.next() {
188        match c {
189            '{' => depth += 1,
190            '}' => {
191                depth -= 1;
192                if depth == 0 {
193                    return Ok(src[start..i + 1].to_string());
194                }
195            }
196            '"' => {
197                // skip whole string
198                while let Some((_, sc)) = chars.next() {
199                    if sc == '"' {
200                        break;
201                    }
202                    if sc == '\\' {
203                        chars.next();
204                    }
205                }
206            }
207            _ => {}
208        }
209    }
210    Err(BridgeError::BadJson("unterminated object".into()))
211}
212
213fn parse_json_array_to_string(
214    chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
215    src: &str,
216) -> Result<String, BridgeError> {
217    let start = chars
218        .peek()
219        .map(|(i, _)| *i)
220        .ok_or_else(|| BridgeError::BadJson("eof".into()))?;
221    let mut depth = 0i32;
222    while let Some((i, c)) = chars.next() {
223        match c {
224            '[' => depth += 1,
225            ']' => {
226                depth -= 1;
227                if depth == 0 {
228                    return Ok(src[start..i + 1].to_string());
229                }
230            }
231            '"' => {
232                while let Some((_, sc)) = chars.next() {
233                    if sc == '"' {
234                        break;
235                    }
236                    if sc == '\\' {
237                        chars.next();
238                    }
239                }
240            }
241            _ => {}
242        }
243    }
244    Err(BridgeError::BadJson("unterminated array".into()))
245}
246
247fn parse_json_scalar(
248    chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
249    src: &str,
250) -> Result<String, BridgeError> {
251    let start = chars
252        .peek()
253        .map(|(i, _)| *i)
254        .ok_or_else(|| BridgeError::BadJson("eof".into()))?;
255    let mut end = start;
256    while let Some((i, c)) = chars.peek().copied() {
257        if c == ',' || c == '}' || c.is_whitespace() {
258            break;
259        }
260        end = i + c.len_utf8();
261        chars.next();
262    }
263    Ok(src[start..end].to_string())
264}
265
266/// Parst ein WebSocket-Text-Frame in eine Bridge-Operation.
267///
268/// # Errors
269/// Siehe [`BridgeError`].
270pub fn parse_op(text: &str) -> Result<BridgeOp, BridgeError> {
271    let map = parse_top_level_object(text)?;
272    let op = map
273        .get("op")
274        .ok_or_else(|| BridgeError::UnknownOp("(missing)".into()))?;
275    let topic = map
276        .get("topic")
277        .filter(|s| !s.is_empty())
278        .ok_or(BridgeError::MissingTopic)?
279        .clone();
280    let id = map.get("id").cloned();
281    match op.as_str() {
282        "subscribe" => Ok(BridgeOp::Subscribe { topic, id }),
283        "unsubscribe" => Ok(BridgeOp::Unsubscribe { topic, id }),
284        "publish" => {
285            let data = map.get("data").ok_or(BridgeError::MissingData)?.clone();
286            Ok(BridgeOp::Publish { topic, data })
287        }
288        other => Err(BridgeError::UnknownOp(other.to_string())),
289    }
290}
291
292/// Render eine Notification zu einem JSON-Text-Frame.
293#[must_use]
294pub fn render_notification(n: &Notification) -> String {
295    let mut s = alloc::format!(
296        "{{\"op\":\"notify\",\"topic\":\"{}\",\"data\":{}",
297        json_escape(&n.topic),
298        n.data
299    );
300    if let Some(id) = &n.subscription_id {
301        s.push_str(&alloc::format!(
302            ",\"subscription_id\":\"{}\"",
303            json_escape(id)
304        ));
305    }
306    s.push('}');
307    s
308}
309
/// Escapes `s` for use inside a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped. Control characters U+0000–U+001F,
/// which JSON forbids in raw form inside strings, are written with their
/// short escapes (`\b`, `\f`, `\n`, `\r`, `\t`) or as `\u00XX`. All other
/// characters are copied unchanged.
fn json_escape(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            '\u{08}' => out.push_str("\\b"),
            '\u{0C}' => out.push_str("\\f"),
            c if (c as u32) < 0x20 => {
                // Remaining control characters have no short form.
                let code = c as u32;
                out.push_str("\\u00");
                out.push(char::from(HEX[(code >> 4) as usize]));
                out.push(char::from(HEX[(code & 0xF) as usize]));
            }
            c => out.push(c),
        }
    }
    out
}
313
/// Subscription registry: for every WebSocket connection, the set of topics
/// it has subscribed to.
///
/// Each connection id maps to its topics, and each topic to the optional
/// subscription id supplied at subscribe time.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SubscriptionRegistry {
    by_connection: BTreeMap<u64, BTreeMap<String, Option<String>>>,
}

impl SubscriptionRegistry {
    /// Creates an empty registry.
    #[must_use]
    pub fn new() -> Self {
        Self {
            by_connection: BTreeMap::new(),
        }
    }

    /// Registers `topic` for connection `conn_id`.
    ///
    /// Subscribing to an already-subscribed topic replaces the stored
    /// subscription id.
    pub fn subscribe(&mut self, conn_id: u64, topic: String, sub_id: Option<String>) {
        let topics = self.by_connection.entry(conn_id).or_default();
        topics.insert(topic, sub_id);
    }

    /// Removes `topic` from connection `conn_id`.
    ///
    /// Returns `true` if the subscription existed. The connection entry
    /// itself is kept even when its last topic is removed.
    pub fn unsubscribe(&mut self, conn_id: u64, topic: &str) -> bool {
        match self.by_connection.get_mut(&conn_id) {
            Some(topics) => topics.remove(topic).is_some(),
            None => false,
        }
    }

    /// Closes a connection: removes it together with all its subscriptions.
    pub fn drop_connection(&mut self, conn_id: u64) {
        self.by_connection.remove(&conn_id);
    }

    /// Lists every connection subscribed to `topic`.
    ///
    /// Yields `(conn_id, optional sub_id)` tuples in ascending `conn_id`
    /// order.
    #[must_use]
    pub fn subscribers_of(&self, topic: &str) -> Vec<(u64, Option<String>)> {
        self.by_connection
            .iter()
            .filter_map(|(&conn_id, topics)| {
                topics.get(topic).map(|sub_id| (conn_id, sub_id.clone()))
            })
            .collect()
    }

    /// Number of connections with an entry in the registry.
    #[must_use]
    pub fn connection_count(&self) -> usize {
        self.by_connection.len()
    }

    /// Number of subscriptions across all connections.
    #[must_use]
    pub fn subscription_count(&self) -> usize {
        self.by_connection
            .values()
            .map(|topics| topics.len())
            .sum()
    }
}
373
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    // ---- frame parsing -------------------------------------------------

    /// A subscribe frame yields the topic and the client-supplied id.
    #[test]
    fn parse_subscribe_frame() {
        let frame = r#"{"op":"subscribe","topic":"T","id":"s1"}"#;
        let parsed = parse_op(frame).unwrap();
        assert_eq!(
            parsed,
            BridgeOp::Subscribe {
                topic: "T".into(),
                id: Some("s1".into()),
            }
        );
    }

    /// An unsubscribe frame without `id` is accepted.
    #[test]
    fn parse_unsubscribe_frame() {
        let frame = r#"{"op":"unsubscribe","topic":"T"}"#;
        let parsed = parse_op(frame).unwrap();
        assert!(matches!(parsed, BridgeOp::Unsubscribe { .. }));
    }

    /// Object payloads are carried through as raw JSON text.
    #[test]
    fn parse_publish_frame_with_object_data() {
        let frame = r#"{"op":"publish","topic":"T","data":{"x":1,"y":"z"}}"#;
        match parse_op(frame).unwrap() {
            BridgeOp::Publish { data, .. } => assert_eq!(data, r#"{"x":1,"y":"z"}"#),
            _ => panic!("expected publish"),
        }
    }

    /// Array payloads are carried through as raw JSON text.
    #[test]
    fn parse_publish_with_array_data() {
        let frame = r#"{"op":"publish","topic":"T","data":[1,2,3]}"#;
        match parse_op(frame).unwrap() {
            BridgeOp::Publish { data, .. } => assert_eq!(data, "[1,2,3]"),
            _ => panic!("expected publish"),
        }
    }

    // ---- error cases ---------------------------------------------------

    #[test]
    fn missing_topic_rejected() {
        let outcome = parse_op(r#"{"op":"subscribe"}"#);
        assert_eq!(outcome, Err(BridgeError::MissingTopic));
    }

    #[test]
    fn unknown_op_rejected() {
        let outcome = parse_op(r#"{"op":"explode","topic":"T"}"#);
        assert!(matches!(outcome, Err(BridgeError::UnknownOp(_))));
    }

    #[test]
    fn missing_data_in_publish_rejected() {
        let outcome = parse_op(r#"{"op":"publish","topic":"T"}"#);
        assert_eq!(outcome, Err(BridgeError::MissingData));
    }

    // ---- rendering -----------------------------------------------------

    /// The rendered frame contains every expected member.
    #[test]
    fn render_notification_round_trip() {
        let rendered = render_notification(&Notification {
            topic: "T".into(),
            data: r#"{"x":1}"#.into(),
            subscription_id: Some("s1".into()),
        });
        let expected_fragments = [
            r#""op":"notify""#,
            r#""topic":"T""#,
            r#""data":{"x":1}"#,
            r#""subscription_id":"s1""#,
        ];
        for fragment in expected_fragments.iter() {
            assert!(rendered.contains(*fragment));
        }
    }

    #[test]
    fn json_escape_handles_quote_and_backslash() {
        assert_eq!(json_escape(r#"a"b\c"#), r#"a\"b\\c"#);
    }

    // ---- subscription registry ----------------------------------------

    #[test]
    fn registry_subscribe_unsubscribe_round_trip() {
        let mut registry = SubscriptionRegistry::new();
        registry.subscribe(1, "Trade".into(), Some("s1".into()));
        registry.subscribe(2, "Trade".into(), None);
        registry.subscribe(1, "Quote".into(), None);
        assert_eq!(registry.subscription_count(), 3);

        let trade_subscribers = registry.subscribers_of("Trade");
        assert_eq!(trade_subscribers.len(), 2);

        assert!(registry.unsubscribe(1, "Trade"));
        assert_eq!(registry.subscribers_of("Trade").len(), 1);
    }

    #[test]
    fn drop_connection_removes_all_subs() {
        let mut registry = SubscriptionRegistry::new();
        for topic in ["A", "B"].iter() {
            registry.subscribe(1, (*topic).into(), None);
        }
        registry.drop_connection(1);
        assert_eq!(registry.subscription_count(), 0);
    }

    #[test]
    fn unsubscribe_unknown_returns_false() {
        let mut registry = SubscriptionRegistry::new();
        let removed = registry.unsubscribe(1, "X");
        assert!(!removed);
    }
}