zerodds_websocket_bridge/
dds_bridge.rs1use alloc::collections::BTreeMap;
29use alloc::string::{String, ToString};
30use alloc::vec::Vec;
31
/// A client request decoded from a text frame by [`parse_op`].
///
/// Each variant corresponds to one accepted value of the frame's `"op"` key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BridgeOp {
    /// Frame whose `"op"` is `"subscribe"`.
    Subscribe {
        /// Value of the frame's `"topic"` key (never empty when produced by
        /// [`parse_op`]).
        topic: String,
        /// Value of the frame's `"id"` key, if the frame carried one.
        id: Option<String>,
    },
    /// Frame whose `"op"` is `"unsubscribe"`.
    Unsubscribe {
        /// Value of the frame's `"topic"` key.
        topic: String,
        /// Value of the frame's `"id"` key, if the frame carried one.
        id: Option<String>,
    },
    /// Frame whose `"op"` is `"publish"`.
    Publish {
        /// Value of the frame's `"topic"` key.
        topic: String,
        /// Value of the frame's `"data"` key, kept as text. Objects and
        /// arrays are stored as the raw source slice; string values are
        /// stored without their surrounding quotes.
        data: String,
    },
}
57
/// An outbound notification, turned into a JSON frame by
/// [`render_notification`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Notification {
    /// Topic name; emitted as a quoted JSON string.
    pub topic: String,
    /// Payload text. It is inserted into the frame verbatim (unquoted), so it
    /// has to be serialized JSON already.
    pub data: String,
    /// Optional subscription id; the `"subscription_id"` member is only
    /// emitted when this is `Some`.
    pub subscription_id: Option<String>,
}
68
/// Reasons a text frame can be rejected by [`parse_op`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BridgeError {
    /// The frame is not a well-formed top-level object; the payload is a
    /// short description of what the parser expected.
    BadJson(String),
    /// The `"op"` value is not recognised. The payload is the offending
    /// value, or `"(missing)"` when the key is absent.
    UnknownOp(String),
    /// The `"topic"` key is absent or empty.
    MissingTopic,
    /// A publish frame has no `"data"` key.
    MissingData,
}

impl core::fmt::Display for BridgeError {
    /// Writes a short lowercase description, followed by the payload for the
    /// variants that carry one.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            BridgeError::MissingTopic => f.write_str("missing topic"),
            BridgeError::MissingData => f.write_str("missing data"),
            BridgeError::BadJson(s) => write!(f, "bad json: {s}"),
            BridgeError::UnknownOp(s) => write!(f, "unknown op: {s}"),
        }
    }
}

// Only available with the `std` feature, because `std::error::Error` is
// named through `std`.
#[cfg(feature = "std")]
impl std::error::Error for BridgeError {}
95
96fn parse_top_level_object(input: &str) -> Result<BTreeMap<String, String>, BridgeError> {
100 let s = input.trim();
101 let s = s
102 .strip_prefix('{')
103 .ok_or_else(|| BridgeError::BadJson("expected `{`".into()))?;
104 let s = s
105 .strip_suffix('}')
106 .ok_or_else(|| BridgeError::BadJson("expected `}`".into()))?;
107 let mut out = BTreeMap::new();
108 let mut chars = s.char_indices().peekable();
109 while let Some((_, c)) = chars.peek().copied() {
110 if c.is_whitespace() || c == ',' {
111 chars.next();
112 continue;
113 }
114 let key = parse_json_string(&mut chars, s)?;
115 skip_ws_to(&mut chars);
117 match chars.next() {
118 Some((_, ':')) => {}
119 _ => return Err(BridgeError::BadJson("expected `:`".into())),
120 }
121 skip_ws_to(&mut chars);
122 let value = parse_json_value(&mut chars, s)?;
123 out.insert(key, value);
124 }
125 Ok(out)
126}
127
/// Advances `chars` past any run of whitespace, stopping at the first
/// non-whitespace character (which is left unconsumed) or at end of input.
fn skip_ws_to(chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>) {
    // `next_if` only consumes the item when the predicate holds, so the loop
    // ends with the first non-whitespace character still peekable.
    while chars.next_if(|&(_, ch)| ch.is_whitespace()).is_some() {}
}
137
138fn parse_json_string(
139 chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
140 src: &str,
141) -> Result<String, BridgeError> {
142 skip_ws_to(chars);
143 match chars.next() {
144 Some((_, '"')) => {}
145 _ => return Err(BridgeError::BadJson("expected `\"`".into())),
146 }
147 let start = chars
148 .peek()
149 .map(|(i, _)| *i)
150 .ok_or_else(|| BridgeError::BadJson("eof".into()))?;
151 let mut end = start;
152 while let Some((i, c)) = chars.next() {
153 if c == '"' {
154 return Ok(src[start..i].to_string());
155 }
156 if c == '\\' {
157 chars.next(); }
159 end = i + c.len_utf8();
160 }
161 let _ = end;
162 Err(BridgeError::BadJson("unterminated string".into()))
163}
164
165fn parse_json_value(
166 chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
167 src: &str,
168) -> Result<String, BridgeError> {
169 skip_ws_to(chars);
170 match chars.peek().map(|(_, c)| *c) {
171 Some('"') => parse_json_string(chars, src),
172 Some('{') => parse_json_object_to_string(chars, src),
173 Some('[') => parse_json_array_to_string(chars, src),
174 _ => parse_json_scalar(chars, src),
175 }
176}
177
178fn parse_json_object_to_string(
179 chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
180 src: &str,
181) -> Result<String, BridgeError> {
182 let start = chars
183 .peek()
184 .map(|(i, _)| *i)
185 .ok_or_else(|| BridgeError::BadJson("eof".into()))?;
186 let mut depth = 0i32;
187 while let Some((i, c)) = chars.next() {
188 match c {
189 '{' => depth += 1,
190 '}' => {
191 depth -= 1;
192 if depth == 0 {
193 return Ok(src[start..i + 1].to_string());
194 }
195 }
196 '"' => {
197 while let Some((_, sc)) = chars.next() {
199 if sc == '"' {
200 break;
201 }
202 if sc == '\\' {
203 chars.next();
204 }
205 }
206 }
207 _ => {}
208 }
209 }
210 Err(BridgeError::BadJson("unterminated object".into()))
211}
212
213fn parse_json_array_to_string(
214 chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
215 src: &str,
216) -> Result<String, BridgeError> {
217 let start = chars
218 .peek()
219 .map(|(i, _)| *i)
220 .ok_or_else(|| BridgeError::BadJson("eof".into()))?;
221 let mut depth = 0i32;
222 while let Some((i, c)) = chars.next() {
223 match c {
224 '[' => depth += 1,
225 ']' => {
226 depth -= 1;
227 if depth == 0 {
228 return Ok(src[start..i + 1].to_string());
229 }
230 }
231 '"' => {
232 while let Some((_, sc)) = chars.next() {
233 if sc == '"' {
234 break;
235 }
236 if sc == '\\' {
237 chars.next();
238 }
239 }
240 }
241 _ => {}
242 }
243 }
244 Err(BridgeError::BadJson("unterminated array".into()))
245}
246
247fn parse_json_scalar(
248 chars: &mut core::iter::Peekable<core::str::CharIndices<'_>>,
249 src: &str,
250) -> Result<String, BridgeError> {
251 let start = chars
252 .peek()
253 .map(|(i, _)| *i)
254 .ok_or_else(|| BridgeError::BadJson("eof".into()))?;
255 let mut end = start;
256 while let Some((i, c)) = chars.peek().copied() {
257 if c == ',' || c == '}' || c.is_whitespace() {
258 break;
259 }
260 end = i + c.len_utf8();
261 chars.next();
262 }
263 Ok(src[start..end].to_string())
264}
265
266pub fn parse_op(text: &str) -> Result<BridgeOp, BridgeError> {
271 let map = parse_top_level_object(text)?;
272 let op = map
273 .get("op")
274 .ok_or_else(|| BridgeError::UnknownOp("(missing)".into()))?;
275 let topic = map
276 .get("topic")
277 .filter(|s| !s.is_empty())
278 .ok_or(BridgeError::MissingTopic)?
279 .clone();
280 let id = map.get("id").cloned();
281 match op.as_str() {
282 "subscribe" => Ok(BridgeOp::Subscribe { topic, id }),
283 "unsubscribe" => Ok(BridgeOp::Unsubscribe { topic, id }),
284 "publish" => {
285 let data = map.get("data").ok_or(BridgeError::MissingData)?.clone();
286 Ok(BridgeOp::Publish { topic, data })
287 }
288 other => Err(BridgeError::UnknownOp(other.to_string())),
289 }
290}
291
292#[must_use]
294pub fn render_notification(n: &Notification) -> String {
295 let mut s = alloc::format!(
296 "{{\"op\":\"notify\",\"topic\":\"{}\",\"data\":{}",
297 json_escape(&n.topic),
298 n.data
299 );
300 if let Some(id) = &n.subscription_id {
301 s.push_str(&alloc::format!(
302 ",\"subscription_id\":\"{}\"",
303 json_escape(id)
304 ));
305 }
306 s.push('}');
307 s
308}
309
/// Escapes `s` so it can be placed between double quotes in a JSON document.
///
/// `"` and `\` are backslash-escaped. Control characters below U+0020, which
/// JSON forbids inside strings, are written with their short escape (`\b`,
/// `\t`, `\n`, `\f`, `\r`) where one exists and as `\u00XX` otherwise. All
/// other characters are copied unchanged.
fn json_escape(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\u{08}' => out.push_str("\\b"),
            '\t' => out.push_str("\\t"),
            '\n' => out.push_str("\\n"),
            '\u{0C}' => out.push_str("\\f"),
            '\r' => out.push_str("\\r"),
            c if u32::from(c) < 0x20 => {
                // The code point is below 0x20, so two hex digits suffice and
                // the high byte of the `\uXXXX` form is always `00`.
                let code = u32::from(c);
                out.push_str("\\u00");
                out.push(char::from(HEX[(code >> 4) as usize]));
                out.push(char::from(HEX[(code & 0xF) as usize]));
            }
            c => out.push(c),
        }
    }
    out
}
313
/// Tracks which topics each connection is subscribed to.
///
/// For every connection id the registry stores a map from topic name to the
/// optional subscription id supplied when subscribing.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SubscriptionRegistry {
    /// connection id -> (topic -> optional subscription id)
    by_connection: BTreeMap<u64, BTreeMap<String, Option<String>>>,
}

impl SubscriptionRegistry {
    /// Creates an empty registry.
    #[must_use]
    pub fn new() -> Self {
        Self {
            by_connection: BTreeMap::new(),
        }
    }

    /// Records that `conn_id` is subscribed to `topic`.
    ///
    /// Subscribing again to the same topic on the same connection replaces
    /// the previously stored `sub_id`.
    pub fn subscribe(&mut self, conn_id: u64, topic: String, sub_id: Option<String>) {
        let topics = self.by_connection.entry(conn_id).or_default();
        topics.insert(topic, sub_id);
    }

    /// Removes the subscription of `conn_id` to `topic`.
    ///
    /// Returns `true` when such a subscription existed. The connection's
    /// entry is kept even if this was its last topic.
    pub fn unsubscribe(&mut self, conn_id: u64, topic: &str) -> bool {
        match self.by_connection.get_mut(&conn_id) {
            Some(topics) => topics.remove(topic).is_some(),
            None => false,
        }
    }

    /// Forgets `conn_id` together with all of its subscriptions.
    pub fn drop_connection(&mut self, conn_id: u64) {
        self.by_connection.remove(&conn_id);
    }

    /// Lists every `(connection id, subscription id)` pair subscribed to
    /// `topic`, ordered by ascending connection id.
    #[must_use]
    pub fn subscribers_of(&self, topic: &str) -> Vec<(u64, Option<String>)> {
        self.by_connection
            .iter()
            .filter_map(|(&conn, topics)| topics.get(topic).map(|sub| (conn, sub.clone())))
            .collect()
    }

    /// Number of connection ids that have an entry, including connections
    /// whose topic map has been emptied through [`Self::unsubscribe`].
    #[must_use]
    pub fn connection_count(&self) -> usize {
        self.by_connection.len()
    }

    /// Total number of (connection, topic) subscriptions.
    #[must_use]
    pub fn subscription_count(&self) -> usize {
        self.by_connection.values().map(|topics| topics.len()).sum()
    }
}
373
// Unit tests for frame parsing, notification rendering and the registry.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// A subscribe frame yields its topic and id.
    #[test]
    fn parse_subscribe_frame() {
        let op = parse_op(r#"{"op":"subscribe","topic":"T","id":"s1"}"#).unwrap();
        assert_eq!(
            op,
            BridgeOp::Subscribe {
                topic: "T".into(),
                id: Some("s1".into()),
            }
        );
    }

    /// An unsubscribe frame without an id is accepted.
    #[test]
    fn parse_unsubscribe_frame() {
        let op = parse_op(r#"{"op":"unsubscribe","topic":"T"}"#).unwrap();
        assert!(matches!(op, BridgeOp::Unsubscribe { .. }));
    }

    /// Object payloads are kept as their raw source text.
    #[test]
    fn parse_publish_frame_with_object_data() {
        match parse_op(r#"{"op":"publish","topic":"T","data":{"x":1,"y":"z"}}"#).unwrap() {
            BridgeOp::Publish { data, .. } => assert_eq!(data, r#"{"x":1,"y":"z"}"#),
            _ => panic!("expected publish"),
        }
    }

    /// Array payloads are kept as their raw source text.
    #[test]
    fn parse_publish_with_array_data() {
        match parse_op(r#"{"op":"publish","topic":"T","data":[1,2,3]}"#).unwrap() {
            BridgeOp::Publish { data, .. } => assert_eq!(data, "[1,2,3]"),
            _ => panic!("expected publish"),
        }
    }

    #[test]
    fn missing_topic_rejected() {
        let outcome = parse_op(r#"{"op":"subscribe"}"#);
        assert_eq!(outcome, Err(BridgeError::MissingTopic));
    }

    #[test]
    fn unknown_op_rejected() {
        let outcome = parse_op(r#"{"op":"explode","topic":"T"}"#);
        assert!(matches!(outcome, Err(BridgeError::UnknownOp(_))));
    }

    #[test]
    fn missing_data_in_publish_rejected() {
        let outcome = parse_op(r#"{"op":"publish","topic":"T"}"#);
        assert_eq!(outcome, Err(BridgeError::MissingData));
    }

    /// Every member of the notification shows up in the rendered frame.
    #[test]
    fn render_notification_round_trip() {
        let rendered = render_notification(&Notification {
            topic: "T".into(),
            data: r#"{"x":1}"#.into(),
            subscription_id: Some("s1".into()),
        });
        for fragment in [
            r#""op":"notify""#,
            r#""topic":"T""#,
            r#""data":{"x":1}"#,
            r#""subscription_id":"s1""#,
        ] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn registry_subscribe_unsubscribe_round_trip() {
        let mut registry = SubscriptionRegistry::new();
        registry.subscribe(1, "Trade".into(), Some("s1".into()));
        registry.subscribe(2, "Trade".into(), None);
        registry.subscribe(1, "Quote".into(), None);
        assert_eq!(registry.subscription_count(), 3);
        assert_eq!(registry.subscribers_of("Trade").len(), 2);
        assert!(registry.unsubscribe(1, "Trade"));
        assert_eq!(registry.subscribers_of("Trade").len(), 1);
    }

    #[test]
    fn drop_connection_removes_all_subs() {
        let mut registry = SubscriptionRegistry::new();
        for topic in ["A", "B"] {
            registry.subscribe(1, topic.into(), None);
        }
        registry.drop_connection(1);
        assert_eq!(registry.subscription_count(), 0);
    }

    #[test]
    fn unsubscribe_unknown_returns_false() {
        let mut registry = SubscriptionRegistry::new();
        assert!(!registry.unsubscribe(1, "X"));
    }

    #[test]
    fn json_escape_handles_quote_and_backslash() {
        assert_eq!(json_escape(r#"a"b\c"#), r#"a\"b\\c"#);
    }
}