1use crate::{
2 MAX_TOPIC_LEN_BYTES, MULTI_LEVEL_WILDCARD, MULTI_LEVEL_WILDCARD_STR,
3 SHARED_SUBSCRIPTION_PREFIX, SINGLE_LEVEL_WILDCARD, SINGLE_LEVEL_WILDCARD_STR, TOPIC_SEPARATOR,
4};
5use std::str::FromStr;
6
/// A parsed topic filter, i.e. the pattern a subscription is expressed with.
///
/// The variant records two things the `FromStr` implementation detects while parsing:
/// whether any level of the filter is a wildcard, and whether the filter was written with
/// the shared-subscription prefix. In every variant `level_count` is the number of
/// separator-delimited levels in `filter`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicFilter {
    /// A filter with no wildcard levels.
    Concrete {
        /// The filter text exactly as given.
        filter: String,
        /// Number of levels in `filter`.
        level_count: u32,
    },
    /// A filter in which at least one level is a wildcard.
    Wildcard {
        /// The filter text exactly as given.
        filter: String,
        /// Number of levels in `filter`.
        level_count: u32,
    },
    /// A shared-subscription filter with no wildcard levels.
    SharedConcrete {
        /// The share group, taken from between the prefix and the following separator.
        group_name: String,
        /// The filter text that follows the group name (prefix and group removed).
        filter: String,
        /// Number of levels in `filter`.
        level_count: u32,
    },
    /// A shared-subscription filter in which at least one level is a wildcard.
    SharedWildcard {
        /// The share group, taken from between the prefix and the following separator.
        group_name: String,
        /// The filter text that follows the group name (prefix and group removed).
        filter: String,
        /// Number of levels in `filter`.
        level_count: u32,
    },
}

impl TopicFilter {
    /// Wraps `filter` in a [`TopicFilter::Concrete`] without parsing or validating it.
    ///
    /// NOTE(review): `level_count` is always 1 here, even when `filter` contains level
    /// separators, whereas parsing via `FromStr` counts every level. Confirm that callers
    /// only pass single-level filters, or that they do not rely on `level_count`.
    pub fn new_concrete(filter: String) -> Self {
        Self::Concrete { filter, level_count: 1 }
    }
}
23
/// A topic name together with the number of levels it contains.
///
/// Values are normally produced by parsing a string (see the `FromStr` implementation),
/// which validates the name. The derived `Default` yields an empty name with a level
/// count of zero.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Topic {
    /// The complete topic name as it was supplied.
    topic_name: String,
    /// Number of separator-delimited levels in `topic_name`.
    level_count: u32,
}

impl Topic {
    /// Returns the complete topic name as a string slice.
    pub fn topic_name(&self) -> &str {
        self.topic_name.as_str()
    }
}
44
/// One level of a topic name or topic filter, as yielded when iterating over its levels.
#[derive(Debug, PartialEq, Eq)]
pub enum TopicLevel<'a> {
    /// A literal level; the slice borrows from the topic or filter being iterated and may
    /// be empty.
    Concrete(&'a str),
    /// A level consisting solely of the single-level wildcard.
    SingleLevelWildcard,
    /// A level consisting solely of the multi-level wildcard.
    MultiLevelWildcard,
}

impl<'a> TopicLevel<'a> {
    /// Reports whether this is a literal level whose text starts with `$`.
    ///
    /// Wildcard levels never have a leading dollar sign, so they always yield `false`.
    pub fn has_leading_dollar(&self) -> bool {
        matches!(self, TopicLevel::Concrete(text) if text.starts_with('$'))
    }
}
60
/// Reasons a string can be rejected when it is parsed as a [`Topic`] or [`TopicFilter`].
#[derive(Debug, PartialEq, Eq)]
pub enum TopicParseError {
    /// The input was empty, or a shared subscription had no filter after its group name.
    EmptyTopic,
    /// The input is longer than `MAX_TOPIC_LEN_BYTES` bytes.
    TopicTooLong,
    /// A multi-level wildcard appears somewhere other than the very end of a filter.
    MultilevelWildcardNotAtEnd,
    /// A level of a filter contains a wildcard together with other characters.
    InvalidWildcardLevel,
    /// The group name of a shared subscription contains a wildcard character.
    InvalidSharedGroupName,
    /// The group name of a shared subscription is empty.
    EmptySharedGroupName,
    /// A topic name contains a wildcard or a null character, or a filter contains a null
    /// character.
    WildcardOrNullInTopic,
    /// A topic name starts with `$`.
    ReservedTopicBeginning,
}

impl std::fmt::Display for TopicParseError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            TopicParseError::EmptyTopic => "topic is empty",
            TopicParseError::TopicTooLong => "topic exceeds the maximum allowed length",
            TopicParseError::MultilevelWildcardNotAtEnd => {
                "multi-level wildcard is not the last character of the topic filter"
            },
            TopicParseError::InvalidWildcardLevel => {
                "wildcard does not occupy an entire topic level"
            },
            TopicParseError::InvalidSharedGroupName => {
                "shared subscription group name contains a wildcard"
            },
            TopicParseError::EmptySharedGroupName => "shared subscription group name is empty",
            TopicParseError::WildcardOrNullInTopic => {
                "topic contains a wildcard or null character"
            },
            TopicParseError::ReservedTopicBeginning => {
                "topic begins with the reserved '$' character"
            },
        };

        f.write_str(message)
    }
}

// Implementing `Error` lets callers propagate parse failures with `?` into
// `Box<dyn std::error::Error>` and similar error containers.
impl std::error::Error for TopicParseError {}
72
73impl std::fmt::Display for Topic {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 write!(f, "{}", self.topic_name)
76 }
77}
78
79impl std::fmt::Display for TopicFilter {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 match self {
82 TopicFilter::Concrete { filter, .. } | TopicFilter::Wildcard { filter, .. } => {
83 write!(f, "{}", filter)
84 },
85 TopicFilter::SharedConcrete { group_name, filter, .. }
86 | TopicFilter::SharedWildcard { group_name, filter, .. } => {
87 write!(f, "{}{}/{}", SHARED_SUBSCRIPTION_PREFIX, group_name, filter)
88 },
89 }
90 }
91}
92
93fn process_filter(filter: &str) -> Result<(u32, bool), TopicParseError> {
95 let mut level_count = 0;
96 let mut contains_wildcards = false;
97 for level in filter.split(TOPIC_SEPARATOR) {
98 let level_contains_wildcard =
99 level.contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD);
100 if level_contains_wildcard {
101 if level.len() > 1 {
103 return Err(TopicParseError::InvalidWildcardLevel);
104 }
105
106 contains_wildcards = true;
107 }
108
109 level_count += 1;
110 }
111
112 Ok((level_count, contains_wildcards))
113}
114
115impl FromStr for TopicFilter {
116 type Err = TopicParseError;
117
118 fn from_str(filter: &str) -> Result<Self, Self::Err> {
119 if filter.is_empty() {
121 return Err(TopicParseError::EmptyTopic);
122 }
123
124 if filter.contains('\0') {
126 return Err(TopicParseError::WildcardOrNullInTopic);
127 }
128
129 if filter.len() > MAX_TOPIC_LEN_BYTES {
131 return Err(TopicParseError::TopicTooLong);
132 }
133
134 if let Some(pos) = filter.rfind(MULTI_LEVEL_WILDCARD) {
136 if pos != filter.len() - 1 {
137 return Err(TopicParseError::MultilevelWildcardNotAtEnd);
138 }
139 }
140
141 let mut shared_group = None;
142
143 if let Some(filter_rest) = filter.strip_prefix(SHARED_SUBSCRIPTION_PREFIX) {
144 if filter_rest.is_empty() {
145 return Err(TopicParseError::EmptySharedGroupName);
146 }
147
148 if let Some(slash_pos) = filter_rest.find(TOPIC_SEPARATOR) {
149 let shared_name = &filter_rest[0..slash_pos];
150
151 let shared_filter = &filter_rest[(slash_pos + 1)..];
154
155 if shared_name.is_empty() {
156 return Err(TopicParseError::EmptySharedGroupName);
157 }
158
159 if shared_name
160 .contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD)
161 {
162 return Err(TopicParseError::InvalidSharedGroupName);
163 }
164
165 if shared_filter.is_empty() {
166 return Err(TopicParseError::EmptyTopic);
167 }
168
169 shared_group = Some((shared_name, shared_filter))
170 } else {
171 return Err(TopicParseError::EmptyTopic);
172 }
173 }
174
175 let topic_filter = if let Some((group_name, shared_filter)) = shared_group {
176 let (level_count, contains_wildcards) = process_filter(shared_filter)?;
177
178 if contains_wildcards {
179 TopicFilter::SharedWildcard {
180 group_name: group_name.to_string(),
181 filter: shared_filter.to_string(),
182 level_count,
183 }
184 } else {
185 TopicFilter::SharedConcrete {
186 group_name: group_name.to_string(),
187 filter: shared_filter.to_string(),
188 level_count,
189 }
190 }
191 } else {
192 let (level_count, contains_wildcards) = process_filter(filter)?;
193
194 if contains_wildcards {
195 TopicFilter::Wildcard { filter: filter.to_string(), level_count }
196 } else {
197 TopicFilter::Concrete { filter: filter.to_string(), level_count }
198 }
199 };
200
201 Ok(topic_filter)
202 }
203}
204
205impl FromStr for Topic {
206 type Err = TopicParseError;
207
208 fn from_str(topic: &str) -> Result<Self, Self::Err> {
209 if topic.is_empty() {
211 return Err(TopicParseError::EmptyTopic);
212 }
213 if topic.starts_with('$') {
214 return Err(TopicParseError::ReservedTopicBeginning);
215 }
216
217 if topic.len() > MAX_TOPIC_LEN_BYTES {
219 return Err(TopicParseError::TopicTooLong);
220 }
221
222 if topic.contains(|x: char| {
224 x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD || x == '\0'
225 }) {
226 return Err(TopicParseError::WildcardOrNullInTopic);
227 }
228
229 let level_count = topic.split(TOPIC_SEPARATOR).count() as u32;
230
231 let topic = Topic { topic_name: topic.to_string(), level_count };
232
233 Ok(topic)
234 }
235}
236
237pub struct TopicLevels<'a> {
238 levels_iter: std::str::Split<'a, char>,
239}
240
241impl<'a> TopicFilter {
242 fn filter(&'a self) -> &'a str {
243 match self {
244 TopicFilter::Concrete { filter, .. } => filter,
245 TopicFilter::Wildcard { filter, .. } => filter,
246 TopicFilter::SharedConcrete { filter, .. } => filter,
247 TopicFilter::SharedWildcard { filter, .. } => filter,
248 }
249 }
250
251 pub fn levels(&'a self) -> TopicLevels<'a> {
252 TopicLevels { levels_iter: self.filter().split(TOPIC_SEPARATOR) }
253 }
254}
255
256impl<'a> Topic {
257 pub fn levels(&'a self) -> TopicLevels<'a> {
258 TopicLevels { levels_iter: self.topic_name.split(TOPIC_SEPARATOR) }
259 }
260}
261
262impl<'a> Iterator for TopicLevels<'a> {
263 type Item = TopicLevel<'a>;
264
265 fn next(&mut self) -> Option<Self::Item> {
266 match self.levels_iter.next() {
267 Some(MULTI_LEVEL_WILDCARD_STR) => Some(TopicLevel::MultiLevelWildcard),
268 Some(SINGLE_LEVEL_WILDCARD_STR) => Some(TopicLevel::SingleLevelWildcard),
269 Some(level) => Some(TopicLevel::Concrete(level)),
270 None => None,
271 }
272 }
273}
274
#[cfg(test)]
mod tests {
    use crate::topic::{Topic, TopicFilter, TopicLevel, TopicParseError, MAX_TOPIC_LEN_BYTES};

    // --- helpers -------------------------------------------------------------------------

    /// Parses `input` as a topic filter, panicking if it is rejected.
    fn parse_filter(input: &str) -> TopicFilter {
        input.parse::<TopicFilter>().unwrap()
    }

    /// Parses `input` as a topic filter, panicking if it is accepted.
    fn filter_error(input: &str) -> TopicParseError {
        input.parse::<TopicFilter>().unwrap_err()
    }

    /// Parses `input` as a topic name, panicking if it is accepted.
    fn topic_error(input: &str) -> TopicParseError {
        input.parse::<Topic>().unwrap_err()
    }

    fn concrete(filter: &str, level_count: u32) -> TopicFilter {
        TopicFilter::Concrete { filter: filter.to_string(), level_count }
    }

    fn wildcard(filter: &str, level_count: u32) -> TopicFilter {
        TopicFilter::Wildcard { filter: filter.to_string(), level_count }
    }

    fn shared_concrete(group_name: &str, filter: &str, level_count: u32) -> TopicFilter {
        TopicFilter::SharedConcrete {
            group_name: group_name.to_string(),
            filter: filter.to_string(),
            level_count,
        }
    }

    fn shared_wildcard(group_name: &str, filter: &str, level_count: u32) -> TopicFilter {
        TopicFilter::SharedWildcard {
            group_name: group_name.to_string(),
            filter: filter.to_string(),
            level_count,
        }
    }

    // --- topic filter parsing ------------------------------------------------------------

    #[test]
    fn test_topic_filter_parse_empty_topic() {
        assert_eq!(filter_error(""), TopicParseError::EmptyTopic);
    }

    #[test]
    fn test_topic_filter_parse_length() {
        let just_right_topic = "a".repeat(MAX_TOPIC_LEN_BYTES);
        assert!(just_right_topic.parse::<TopicFilter>().is_ok());

        let too_long_topic = "a".repeat(MAX_TOPIC_LEN_BYTES + 1);
        assert_eq!(filter_error(&too_long_topic), TopicParseError::TopicTooLong);
    }

    #[test]
    fn test_topic_filter_parse_concrete() {
        let cases = [
            ("/", 2),
            ("a", 1),
            ("home/kitchen", 2),
            ("home/kitchen/temperature", 3),
            ("home/kitchen/temperature/celsius", 4),
        ];

        for &(input, level_count) in cases.iter() {
            assert_eq!(parse_filter(input), concrete(input, level_count));
        }
    }

    #[test]
    fn test_topic_filter_parse_single_level_wildcard() {
        let cases = [("+", 1), ("+/", 2), ("sport/+", 2), ("/+", 2)];

        for &(input, level_count) in cases.iter() {
            assert_eq!(parse_filter(input), wildcard(input, level_count));
        }
    }

    #[test]
    fn test_topic_filter_parse_multi_level_wildcard() {
        let accepted =
            [("#", 1), ("/#", 2), ("sport/#", 2), ("home/kitchen/temperature/#", 4)];

        for &(input, level_count) in accepted.iter() {
            assert_eq!(parse_filter(input), wildcard(input, level_count));
        }

        assert_eq!(filter_error("#/"), TopicParseError::MultilevelWildcardNotAtEnd);
    }

    #[test]
    fn test_topic_filter_parse_shared_subscription_concrete() {
        let cases = [
            ("$share/group_a/home", "group_a", "home", 1),
            (
                "$share/group_a/home/kitchen/temperature",
                "group_a",
                "home/kitchen/temperature",
                3,
            ),
            ("$share/group_a//", "group_a", "/", 2),
        ];

        for &(input, group_name, filter, level_count) in cases.iter() {
            assert_eq!(parse_filter(input), shared_concrete(group_name, filter, level_count));
        }
    }

    #[test]
    fn test_topic_filter_parse_shared_subscription_wildcard() {
        let cases = [
            ("$share/group_b/#", "group_b", "#", 1),
            ("$share/group_b/+", "group_b", "+", 1),
            ("$share/group_b/+/temperature", "group_b", "+/temperature", 2),
            ("$share/group_c/+/temperature/+/meta", "group_c", "+/temperature/+/meta", 4),
        ];

        for &(input, group_name, filter, level_count) in cases.iter() {
            assert_eq!(parse_filter(input), shared_wildcard(group_name, filter, level_count));
        }
    }

    #[test]
    fn test_topic_filter_parse_invalid_shared_subscription() {
        let cases = [
            ("$share/", TopicParseError::EmptySharedGroupName),
            ("$share/a", TopicParseError::EmptyTopic),
            ("$share/a/", TopicParseError::EmptyTopic),
            ("$share//", TopicParseError::EmptySharedGroupName),
            ("$share///", TopicParseError::EmptySharedGroupName),
            ("$share/invalid_group#/#", TopicParseError::InvalidSharedGroupName),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(&filter_error(input), expected);
        }
    }

    #[test]
    fn test_topic_filter_parse_sys_prefix() {
        assert_eq!(parse_filter("$SYS/stats"), concrete("$SYS/stats", 2));
        assert_eq!(parse_filter("/$SYS/stats"), concrete("/$SYS/stats", 3));
        assert_eq!(parse_filter("$SYS/+"), wildcard("$SYS/+", 2));
        assert_eq!(parse_filter("$SYS/#"), wildcard("$SYS/#", 2));
    }

    #[test]
    fn test_topic_filter_parse_invalid_filters() {
        let cases = [
            ("sport/#/stats", TopicParseError::MultilevelWildcardNotAtEnd),
            ("sport/#/stats#", TopicParseError::InvalidWildcardLevel),
            ("sport#/stats#", TopicParseError::InvalidWildcardLevel),
            ("sport/tennis#", TopicParseError::InvalidWildcardLevel),
            ("sport/++", TopicParseError::InvalidWildcardLevel),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(&filter_error(input), expected);
        }
    }

    // --- topic name parsing --------------------------------------------------------------

    #[test]
    fn test_topic_name_success() {
        let cases = [
            ("/", 2),
            ("Accounts payable", 1),
            ("home/kitchen", 2),
            ("home/kitchen/temperature", 3),
        ];

        for &(input, level_count) in cases.iter() {
            assert_eq!(
                input.parse::<Topic>().unwrap(),
                Topic { topic_name: input.to_string(), level_count }
            );
        }
    }

    #[test]
    fn test_topic_name_failure() {
        let rejected =
            ["#", "+", "\0", "/multi/level/#", "/single/level/+", "/null/byte/\0"];

        for input in rejected.iter() {
            assert_eq!(topic_error(input), TopicParseError::WildcardOrNullInTopic);
        }
    }

    // --- level iteration -----------------------------------------------------------------

    #[test]
    fn test_topic_filter_level_iterator_simple() {
        let filter = parse_filter("/");
        let levels: Vec<_> = filter.levels().collect();

        assert_eq!(levels, vec![TopicLevel::Concrete(""), TopicLevel::Concrete("")]);
    }

    #[test]
    fn test_topic_filter_level_iterator_concrete() {
        let filter = parse_filter("home/kitchen/temperature");
        let levels: Vec<_> = filter.levels().collect();

        assert_eq!(
            levels,
            vec![
                TopicLevel::Concrete("home"),
                TopicLevel::Concrete("kitchen"),
                TopicLevel::Concrete("temperature"),
            ]
        );
    }

    #[test]
    fn test_topic_filter_level_iterator_single_level_wildcard_1() {
        let filter = parse_filter("home/+/+/temperature/+");
        let levels: Vec<_> = filter.levels().collect();

        assert_eq!(
            levels,
            vec![
                TopicLevel::Concrete("home"),
                TopicLevel::SingleLevelWildcard,
                TopicLevel::SingleLevelWildcard,
                TopicLevel::Concrete("temperature"),
                TopicLevel::SingleLevelWildcard,
            ]
        );
    }

    #[test]
    fn test_topic_filter_level_iterator_single_level_wildcard_2() {
        let filter = parse_filter("+");
        let levels: Vec<_> = filter.levels().collect();

        assert_eq!(levels, vec![TopicLevel::SingleLevelWildcard]);
    }

    #[test]
    fn test_topic_filter_level_iterator_mutli_level_wildcard_1() {
        let filter = parse_filter("home/kitchen/#");
        let levels: Vec<_> = filter.levels().collect();

        assert_eq!(
            levels,
            vec![
                TopicLevel::Concrete("home"),
                TopicLevel::Concrete("kitchen"),
                TopicLevel::MultiLevelWildcard,
            ]
        );
    }

    #[test]
    fn test_topic_filter_level_iterator_mutli_level_wildcard_2() {
        let filter = parse_filter("#");
        let levels: Vec<_> = filter.levels().collect();

        assert_eq!(levels, vec![TopicLevel::MultiLevelWildcard]);
    }
}