1use std::str::Split;
2
3use crate::mkmf::{Formula, MapKeysMatchFormula as _};
4#[allow(clippy::wildcard_imports)]
5use crate::types::*;
6
7#[derive(Debug, Clone)]
8struct RegexSubscription<C> {
9 regex: regex::Regex,
10 sub: Subscription<C>,
11}
12
13#[derive(Debug, Clone)]
14struct Subscription<C> {
15 subscribers: Set<C>,
16 subtopics: Map<String, Subscription<C>>,
17 subtopics_by_formula: Map<Formula, Subscription<C>>,
18 subtopics_by_regex: Vec<RegexSubscription<C>>,
19 subtopics_any: Option<Box<Subscription<C>>>, sub_any: Set<C>, }
22
23impl<C> Default for Subscription<C> {
24 fn default() -> Self {
25 Self {
26 subscribers: <_>::default(),
27 subtopics: <_>::default(),
28 subtopics_by_formula: <_>::default(),
29 subtopics_by_regex: <_>::default(),
30 subtopics_any: None,
31 sub_any: <_>::default(),
32 }
33 }
34}
35
36impl<C> Subscription<C> {
37 #[inline]
38 fn is_empty(&self) -> bool {
39 self.subscribers.is_empty()
40 && self.subtopics.is_empty()
41 && self.subtopics_by_formula.is_empty()
42 && self.subtopics_by_regex.is_empty()
43 && self.subtopics_any.is_none()
44 && self.sub_any.is_empty()
45 }
46}
47
48#[derive(Debug, Clone)]
49pub struct SubMap<C> {
50 subscriptions: Subscription<C>,
51 subscribed_topics: Map<C, Set<String>>,
52 subscription_count: usize,
53 separator: char,
54 formula_prefix: Option<String>,
55 regex_prefix: Option<String>,
56 match_any: Set<String>,
57 wildcard: Set<String>,
58}
59
60impl<C> Default for SubMap<C> {
61 fn default() -> Self {
62 Self {
63 subscriptions: <_>::default(),
64 subscribed_topics: <_>::default(),
65 subscription_count: 0,
66 separator: '/',
67 formula_prefix: None,
68 regex_prefix: None,
69 match_any: vec!["?".to_owned()].into_iter().collect(),
70 wildcard: vec!["*".to_owned()].into_iter().collect(),
71 }
72 }
73}
74
75impl<C> SubMap<C>
76where
77 C: Client,
78{
79 #[inline]
80 pub fn new() -> Self {
81 Self::default()
82 }
83 #[inline]
84 pub fn separator(mut self, separator: char) -> Self {
85 self.separator = separator;
86 self
87 }
88 #[inline]
89 pub fn formula_prefix(mut self, prefix: &str) -> Self {
90 self.formula_prefix = Some(prefix.to_owned());
91 self
92 }
93 #[inline]
94 pub fn regex_prefix(mut self, prefix: &str) -> Self {
95 self.regex_prefix = Some(prefix.to_owned());
96 self
97 }
98 #[inline]
99 pub fn wildcard(mut self, wildcard: &str) -> Self {
100 self.wildcard = vec![wildcard.to_owned()].into_iter().collect();
101 self
102 }
103 #[inline]
104 pub fn match_any(mut self, match_any: &str) -> Self {
105 self.match_any = vec![match_any.to_owned()].into_iter().collect();
106 self
107 }
108 #[inline]
109 pub fn wildcard_multiple(mut self, wildcard_multiple: &[&str]) -> Self {
110 self.wildcard = wildcard_multiple.iter().map(|&v| v.to_owned()).collect();
111 self
112 }
113 #[inline]
114 pub fn match_any_multiple(mut self, match_any_multiple: &[&str]) -> Self {
115 self.match_any = match_any_multiple.iter().map(|&v| v.to_owned()).collect();
116 self
117 }
118 #[inline]
119 pub fn list_clients(&self) -> Vec<C> {
120 self.subscribed_topics.keys().cloned().collect()
121 }
122 #[inline]
123 pub fn list_topics(&self, client: &C) -> Vec<&str> {
124 if let Some(topics) = self.subscribed_topics.get(client) {
125 topics.iter().map(String::as_str).collect()
126 } else {
127 Vec::new()
128 }
129 }
130 #[inline]
131 pub fn is_empty(&self) -> bool {
132 self.subscribed_topics.is_empty()
133 }
134 pub fn register_client(&mut self, client: &C) -> bool {
135 if self.subscribed_topics.contains_key(client) {
136 false
137 } else {
138 self.subscribed_topics.insert(client.clone(), Set::new());
139 true
140 }
141 }
142 pub fn unregister_client(&mut self, client: &C) -> bool {
143 if let Some(client_topics) = self.subscribed_topics.remove(client) {
144 for topic in client_topics {
145 unsubscribe_rec(
146 &mut self.subscriptions,
147 topic.split(self.separator),
148 client,
149 &self.wildcard,
150 &self.match_any,
151 self.formula_prefix.as_deref(),
152 self.regex_prefix.as_deref(),
153 );
154 self.subscription_count -= 1;
155 }
156 true
157 } else {
158 false
159 }
160 }
161 pub fn subscribe(&mut self, topic: &str, client: &C) -> bool {
162 self.subscribed_topics
163 .get_mut(client)
164 .map_or(false, |client_topics| {
165 if !client_topics.contains(topic) {
166 subscribe_rec(
167 &mut self.subscriptions,
168 topic.split(self.separator),
169 client,
170 &self.wildcard,
171 &self.match_any,
172 self.formula_prefix.as_deref(),
173 self.regex_prefix.as_deref(),
174 );
175 client_topics.insert(topic.to_owned());
176 self.subscription_count += 1;
177 }
178 true
179 })
180 }
181 pub fn unsubscribe(&mut self, topic: &str, client: &C) -> bool {
182 self.subscribed_topics
183 .get_mut(client)
184 .map_or(false, |client_topics| {
185 if client_topics.contains(topic) {
186 unsubscribe_rec(
187 &mut self.subscriptions,
188 topic.split(self.separator),
189 client,
190 &self.wildcard,
191 &self.match_any,
192 self.formula_prefix.as_deref(),
193 self.regex_prefix.as_deref(),
194 );
195 client_topics.remove(topic);
196 self.subscription_count -= 1;
197 }
198 true
199 })
200 }
201 pub fn unsubscribe_all(&mut self, client: &C) -> bool {
202 if let Some(client_topics) = self.subscribed_topics.get_mut(client) {
203 for topic in &*client_topics {
204 unsubscribe_rec(
205 &mut self.subscriptions,
206 topic.split(self.separator),
207 client,
208 &self.wildcard,
209 &self.match_any,
210 self.formula_prefix.as_deref(),
211 self.regex_prefix.as_deref(),
212 );
213 self.subscription_count -= 1;
214 }
215 client_topics.clear();
216 true
217 } else {
218 false
219 }
220 }
221 #[inline]
222 pub fn get_subscribers(&self, topic: &str) -> Set<C> {
223 let mut result = Set::new();
224 get_subscribers_rec(
225 &self.subscriptions,
226 topic.split(self.separator),
227 self.formula_prefix.as_deref(),
228 self.regex_prefix.as_deref(),
229 &mut result,
230 );
231 result
232 }
233 #[inline]
234 pub fn is_subscribed(&self, topic: &str) -> bool {
235 is_subscribed_rec(
236 &self.subscriptions,
237 self.formula_prefix.as_deref(),
238 self.regex_prefix.as_deref(),
239 topic.split(self.separator),
240 )
241 }
242 #[inline]
243 pub fn subscription_count(&self) -> usize {
244 self.subscription_count
245 }
246 #[inline]
247 pub fn client_count(&self) -> usize {
248 self.subscribed_topics.len()
249 }
250}
251
252#[allow(clippy::too_many_lines)]
253fn subscribe_rec<C>(
254 subscription: &mut Subscription<C>,
255 mut sp: Split<char>,
256 client: &C,
257 wildcard: &Set<String>,
258 match_any: &Set<String>,
259 formula_prefix: Option<&str>,
260 regex_prefix: Option<&str>,
261) where
262 C: Client,
263{
264 if let Some(topic) = sp.next() {
265 if wildcard.contains(topic) {
266 subscription.sub_any.insert(client.clone());
267 } else if match_any.contains(topic) {
268 if let Some(ref mut sub) = subscription.subtopics_any {
269 subscribe_rec(
270 sub,
271 sp,
272 client,
273 wildcard,
274 match_any,
275 formula_prefix,
276 regex_prefix,
277 );
278 } else {
279 let mut sub = Subscription::default();
280 subscribe_rec(
281 &mut sub,
282 sp,
283 client,
284 wildcard,
285 match_any,
286 formula_prefix,
287 regex_prefix,
288 );
289 subscription.subtopics_any = Some(Box::new(sub));
290 }
291 } else if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
292 let Ok(formula_parsed) = formula.parse::<Formula>() else {
293 return;
294 };
295 if let Some(sub) = subscription.subtopics_by_formula.get_mut(&formula_parsed) {
296 subscribe_rec(
297 sub,
298 sp,
299 client,
300 wildcard,
301 match_any,
302 formula_prefix,
303 regex_prefix,
304 );
305 } else {
306 let mut sub = Subscription::default();
307 subscribe_rec(
308 &mut sub,
309 sp,
310 client,
311 wildcard,
312 match_any,
313 formula_prefix,
314 regex_prefix,
315 );
316 subscription
317 .subtopics_by_formula
318 .insert(formula_parsed, sub);
319 }
320 } else if let Some(regex) = regex_prefix.and_then(|p| topic.strip_prefix(p)) {
321 if let Ok(regex) = regex::Regex::new(regex) {
322 let pos = subscription
323 .subtopics_by_regex
324 .iter()
325 .position(|rs| rs.regex.as_str() == regex.as_str());
326 if let Some(pos) = pos {
327 subscribe_rec(
328 &mut subscription.subtopics_by_regex[pos].sub,
329 sp,
330 client,
331 wildcard,
332 match_any,
333 formula_prefix,
334 regex_prefix,
335 );
336 } else {
337 let mut sub = Subscription::default();
338 subscribe_rec(
339 &mut sub,
340 sp,
341 client,
342 wildcard,
343 match_any,
344 formula_prefix,
345 regex_prefix,
346 );
347 subscription
348 .subtopics_by_regex
349 .push(RegexSubscription { regex, sub });
350 }
351 }
352 } else if let Some(sub) = subscription.subtopics.get_mut(topic) {
353 subscribe_rec(
354 sub,
355 sp,
356 client,
357 wildcard,
358 match_any,
359 formula_prefix,
360 regex_prefix,
361 );
362 } else {
363 let mut sub = Subscription::default();
364 subscribe_rec(
365 &mut sub,
366 sp,
367 client,
368 wildcard,
369 match_any,
370 formula_prefix,
371 regex_prefix,
372 );
373 subscription.subtopics.insert(topic.to_owned(), sub);
374 }
375 } else {
376 subscription.subscribers.insert(client.clone());
377 }
378}
379
380fn unsubscribe_rec<C>(
381 subscription: &mut Subscription<C>,
382 mut sp: Split<char>,
383 client: &C,
384 wildcard: &Set<String>,
385 match_any: &Set<String>,
386 formula_prefix: Option<&str>,
387 regex_prefix: Option<&str>,
388) where
389 C: Client,
390{
391 if let Some(topic) = sp.next() {
392 if wildcard.contains(topic) {
393 subscription.sub_any.remove(client);
394 } else if match_any.contains(topic) {
395 if let Some(ref mut sub) = subscription.subtopics_any {
396 unsubscribe_rec(
397 sub,
398 sp,
399 client,
400 wildcard,
401 match_any,
402 formula_prefix,
403 regex_prefix,
404 );
405 if sub.is_empty() {
406 subscription.subtopics_any = None;
407 }
408 }
409 } else if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
410 let Ok(formula_parsed) = formula.parse::<Formula>() else {
411 return;
412 };
413 if let Some(sub) = subscription.subtopics_by_formula.get_mut(&formula_parsed) {
414 unsubscribe_rec(
415 sub,
416 sp,
417 client,
418 wildcard,
419 match_any,
420 formula_prefix,
421 regex_prefix,
422 );
423 if sub.is_empty() {
424 subscription.subtopics_by_formula.remove(&formula_parsed);
425 }
426 }
427 } else if let Some(regex) = regex_prefix.and_then(|p| topic.strip_prefix(p)) {
428 if let Ok(regex) = regex::Regex::new(regex) {
429 let pos = subscription
430 .subtopics_by_regex
431 .iter()
432 .position(|rs| rs.regex.as_str() == regex.as_str());
433 if let Some(pos) = pos {
434 let sub = &mut subscription.subtopics_by_regex[pos].sub;
435 unsubscribe_rec(
436 sub,
437 sp,
438 client,
439 wildcard,
440 match_any,
441 formula_prefix,
442 regex_prefix,
443 );
444 if sub.is_empty() {
445 subscription.subtopics_by_regex.remove(pos);
446 }
447 }
448 }
449 } else if let Some(sub) = subscription.subtopics.get_mut(topic) {
450 unsubscribe_rec(
451 sub,
452 sp,
453 client,
454 wildcard,
455 match_any,
456 formula_prefix,
457 regex_prefix,
458 );
459 if sub.is_empty() {
460 subscription.subtopics.remove(topic);
461 }
462 }
463 } else {
464 subscription.subscribers.remove(client);
465 }
466}
467
468fn get_subscribers_rec<C>(
469 subscription: &Subscription<C>,
470 mut sp: Split<char>,
471 formula_prefix: Option<&str>,
472 regex_prefix: Option<&str>,
473 result: &mut Set<C>,
474) where
475 C: Client,
476{
477 if let Some(topic) = sp.next() {
478 result.extend(subscription.sub_any.clone());
479 if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
480 for sub in subscription.subtopics.values_match_key_formula(formula) {
481 get_subscribers_rec(sub, sp.clone(), formula_prefix, regex_prefix, result);
482 }
483 } else if let Some(regex) = regex_prefix.and_then(|p| topic.strip_prefix(p)) {
484 if let Ok(regex) = regex::Regex::new(regex) {
485 for (name, sub) in &subscription.subtopics {
486 if regex.is_match(name) {
487 get_subscribers_rec(sub, sp.clone(), formula_prefix, regex_prefix, result);
488 }
489 }
490 }
491 } else if let Some(sub) = subscription.subtopics.get(topic) {
492 get_subscribers_rec(sub, sp.clone(), formula_prefix, regex_prefix, result);
493 }
494 if !subscription.subtopics_by_formula.is_empty() {
495 for (formula, sub) in &subscription.subtopics_by_formula {
496 if formula.matches(topic) {
497 get_subscribers_rec(sub, sp.clone(), formula_prefix, regex_prefix, result);
498 }
499 }
500 }
501 if !subscription.subtopics_by_regex.is_empty() {
502 for rs in &subscription.subtopics_by_regex {
503 if rs.regex.is_match(topic) {
504 get_subscribers_rec(&rs.sub, sp.clone(), formula_prefix, regex_prefix, result);
505 }
506 }
507 }
508 if let Some(ref sub) = subscription.subtopics_any {
509 get_subscribers_rec(sub, sp, formula_prefix, regex_prefix, result);
510 }
511 } else {
512 result.extend(subscription.subscribers.clone());
513 }
514}
515
516fn is_subscribed_rec<C>(
517 subscription: &Subscription<C>,
518 formula_prefix: Option<&str>,
519 regex_prefix: Option<&str>,
520 mut sp: Split<char>,
521) -> bool
522where
523 C: Ord + Eq + Clone,
524{
525 if let Some(topic) = sp.next() {
526 if !subscription.sub_any.is_empty() {
527 return true;
528 }
529 if let Some(formula) = formula_prefix.and_then(|p| topic.strip_prefix(p)) {
530 for sub in subscription.subtopics.values_match_key_formula(formula) {
531 if is_subscribed_rec(sub, formula_prefix, regex_prefix, sp.clone()) {
532 return true;
533 }
534 }
535 } else if let Some(regex) = regex_prefix.and_then(|p| topic.strip_prefix(p)) {
536 if let Ok(regex) = regex::Regex::new(regex) {
537 for (name, sub) in &subscription.subtopics {
538 if regex.is_match(name)
539 && is_subscribed_rec(sub, formula_prefix, regex_prefix, sp.clone())
540 {
541 return true;
542 }
543 }
544 }
545 } else if let Some(sub) = subscription.subtopics.get(topic) {
546 if is_subscribed_rec(sub, formula_prefix, regex_prefix, sp.clone()) {
547 return true;
548 }
549 }
550 if !subscription.subtopics_by_formula.is_empty() {
551 for (formula, sub) in &subscription.subtopics_by_formula {
552 if formula.matches(topic)
553 && is_subscribed_rec(sub, formula_prefix, regex_prefix, sp.clone())
554 {
555 return true;
556 }
557 }
558 }
559 if !subscription.subtopics_by_regex.is_empty() {
560 for rs in &subscription.subtopics_by_regex {
561 if rs.regex.is_match(topic)
562 && is_subscribed_rec(&rs.sub, formula_prefix, regex_prefix, sp.clone())
563 {
564 return true;
565 }
566 }
567 }
568 if let Some(ref sub) = subscription.subtopics_any {
569 if is_subscribed_rec(sub, formula_prefix, regex_prefix, sp) {
570 return true;
571 }
572 }
573 } else if !subscription.subscribers.is_empty() {
574 return true;
575 }
576 false
577}
578
#[cfg(test)]
mod test {
    use super::SubMap;
    /// End-to-end scenario covering registration, literal topics, "match any" (`+`) and
    /// wildcard (`#`) subscriptions, and pruning of the tree.
    #[test]
    fn test_sub() {
        let mut smap: SubMap<String> = SubMap::new().match_any("+").wildcard("#");
        // client1: three literal subscriptions, all dropped by unregistering the client
        let client1 = "test1".to_owned();
        assert!(smap.register_client(&client1));
        assert!(smap.subscribe("unit/tests/test1", &client1));
        assert!(smap.subscribe("unit/tests/test2", &client1));
        assert!(smap.subscribe("unit/tests/test3", &client1));
        assert!(smap.unregister_client(&client1));
        // client2: keeps only the "match any" subscription
        let client2 = "test2".to_owned();
        assert!(smap.register_client(&client2));
        assert!(smap.subscribe("unit/+/test2", &client2));
        assert!(smap.subscribe("unit/zzz/test2", &client2));
        assert!(smap.unsubscribe("unit/zzz/test2", &client2));
        // client3: subscribes and unsubscribes again, ends with no subscriptions
        let client3 = "test3".to_owned();
        assert!(smap.register_client(&client3));
        assert!(smap.subscribe("unit/+/+/+", &client3));
        assert!(smap.unsubscribe("unit/+/+/+", &client3));
        // client4: wildcard below "unit"
        let client4 = "test4".to_owned();
        assert!(smap.register_client(&client4));
        assert!(smap.subscribe("unit/#", &client4));
        // matched by client2 ("unit/+/test2") and client4 ("unit/#")
        let subs = smap.get_subscribers("unit/tests/test2");
        assert_eq!(subs.len(), 2);
        assert!(subs.contains(&client2));
        assert!(subs.contains(&client4));
        let subs = smap.get_subscribers("unit/tests");
        assert_eq!(subs.len(), 1);
        // a top-level wildcard also matches the single-segment topic "unit"...
        assert!(smap.subscribe("#", &client4));
        let subs = smap.get_subscribers("unit");
        assert_eq!(subs.len(), 1);
        assert!(subs.contains(&client4));
        // ...while "unit/#" alone does not
        assert!(smap.unsubscribe("#", &client4));
        let subs = smap.get_subscribers("unit");
        assert_eq!(subs.len(), 0);
        // start over (client1 is already unregistered, the result is ignored)
        smap.unregister_client(&client1);
        smap.unregister_client(&client2);
        smap.unregister_client(&client3);
        smap.unregister_client(&client4);
        assert!(smap.register_client(&client1));
        assert!(smap.register_client(&client2));
        assert!(smap.subscribe("unit/tests/#", &client1));
        assert!(smap.subscribe("unit/+/#", &client2));
        // a wildcard needs at least one more segment below its own level
        let subs = smap.get_subscribers("unit");
        assert_eq!(subs.len(), 0);
        let subs = smap.get_subscribers("unit/tests");
        assert_eq!(subs.len(), 0);
        let subs = smap.get_subscribers("unit/tests/xxx");
        assert!(subs.contains(&client1));
        assert!(subs.contains(&client2));
        assert_eq!(subs.len(), 2);
        let subs = smap.get_subscribers("unit/tests/xxx/yyy");
        assert_eq!(subs.len(), 2);
        assert!(subs.contains(&client1));
        assert!(subs.contains(&client2));
        assert!(smap.subscribe("unit/#", &client1));
        let subs = smap.get_subscribers("unit");
        assert_eq!(subs.len(), 0);
        let subs = smap.get_subscribers("unit/tests");
        assert_eq!(subs.len(), 1);
        assert!(smap.subscribe("#", &client1));
        let subs = smap.get_subscribers("unit");
        assert_eq!(subs.len(), 1);
        // client1 matches through two subscriptions but is reported once
        let subs = smap.get_subscribers("unit/tests");
        assert_eq!(subs.len(), 1);
        // once every client is gone, the tree must be fully pruned
        smap.unregister_client(&client1);
        smap.unregister_client(&client2);
        assert!(smap.subscriptions.is_empty());
    }
    /// "Match any" segments in the first and in an inner position of a topic.
    #[test]
    fn test_match_any() {
        let mut smap: SubMap<String> = SubMap::new().match_any("+").wildcard("#");
        let client1 = "client1".to_owned();
        smap.register_client(&client1);
        assert_eq!(smap.get_subscribers("abc/xxx").len(), 0);
        smap.subscribe("+/xxx", &client1);
        assert_eq!(smap.get_subscribers("abc/xxx").len(), 1);
        assert_eq!(smap.get_subscribers("unix/zzz/xxx/222").len(), 0);
        smap.subscribe("+/zzz/+/222", &client1);
        assert_eq!(smap.get_subscribers("unix/zzz/xxx/222").len(), 1);
    }
    /// Formula segments (prefix `!`): `ge(N)` subscriptions are matched by segments 2 and 3
    /// for `ge(2)` and by 96 and 97 for `ge(96)`, but not by 1 or 95.
    #[test]
    fn test_match_formula() {
        let mut smap: SubMap<String> = SubMap::new()
            .match_any("+")
            .wildcard("#")
            .formula_prefix("!");
        let client1 = "client1".to_owned();
        smap.register_client(&client1);
        assert_eq!(smap.get_subscribers("1/xxx").len(), 0);
        smap.subscribe("!ge(2)/xxx", &client1);
        assert_eq!(smap.get_subscribers("1/xxx").len(), 0);
        assert_eq!(smap.get_subscribers("2/xxx").len(), 1);
        assert_eq!(smap.get_subscribers("3/xxx").len(), 1);
        assert_eq!(smap.get_subscribers("unix/zzz/95/222").len(), 0);
        assert_eq!(smap.get_subscribers("unix/zzz/96/222").len(), 0);
        assert_eq!(smap.get_subscribers("unix/zzz/97/222").len(), 0);
        // formula combined with a "match any" segment
        smap.subscribe("+/zzz/!ge(96)/222", &client1);
        assert_eq!(smap.get_subscribers("unix/zzz/95/222").len(), 0);
        assert_eq!(smap.get_subscribers("unix/zzz/96/222").len(), 1);
        assert_eq!(smap.get_subscribers("unix/zzz/97/222").len(), 1);
    }
    /// Regex segments (prefix `~`): two clients share one pattern and unsubscribe one
    /// after the other.
    #[test]
    fn test_match_regex() {
        let mut smap: SubMap<String> = SubMap::new().match_any("+").wildcard("#").regex_prefix("~");
        let client1 = "client1".to_owned();
        smap.register_client(&client1);
        assert_eq!(smap.get_subscribers("test1/xxx").len(), 0);
        smap.subscribe("~^test\\d+$/xxx", &client1);
        assert_eq!(smap.get_subscribers("test1/xxx").len(), 1);
        assert_eq!(smap.get_subscribers("test2/xxx").len(), 1);
        assert_eq!(smap.get_subscribers("test3333/xxx").len(), 1);
        // the pattern is anchored, so a trailing letter must not match
        assert_eq!(smap.get_subscribers("test3333a/xxx").len(), 0);
        let client2 = "client2".to_owned();
        smap.register_client(&client2);
        smap.subscribe("~^test\\d+$/xxx", &client2);
        assert_eq!(smap.get_subscribers("test1/xxx").len(), 2);
        assert_eq!(smap.get_subscribers("test2/xxx").len(), 2);
        assert_eq!(smap.get_subscribers("test3333/xxx").len(), 2);
        assert_eq!(smap.get_subscribers("test3333a/xxx").len(), 0);
        smap.unsubscribe("~^test\\d+$/xxx", &client1);
        assert_eq!(smap.get_subscribers("test1/xxx").len(), 1);
        smap.unsubscribe("~^test\\d+$/xxx", &client2);
        assert_eq!(smap.get_subscribers("test1/xxx").len(), 0);
    }
}