mqtt_v5_fork/
topic.rs

1use crate::{
2    MAX_TOPIC_LEN_BYTES, MULTI_LEVEL_WILDCARD, MULTI_LEVEL_WILDCARD_STR,
3    SHARED_SUBSCRIPTION_PREFIX, SINGLE_LEVEL_WILDCARD, SINGLE_LEVEL_WILDCARD_STR, TOPIC_SEPARATOR,
4};
5use std::str::FromStr;
6
/// A subscription filter describing which topics a subscriber wants to
/// receive messages from. Unlike a [`Topic`], a filter may contain wildcards.
///
/// Every variant stores the filter text together with the number of
/// separator-delimited levels in that text. The shared variants additionally
/// carry the group name of a shared subscription; for those, `filter` holds
/// only the part after the group name.
///
/// Shared topic filter example: `$share/group_name_a/home/kitchen/temperature`
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicFilter {
    /// A filter without any wildcard levels.
    Concrete {
        filter: String,
        level_count: u32,
    },
    /// A filter with at least one wildcard level.
    Wildcard {
        filter: String,
        level_count: u32,
    },
    /// A shared-subscription filter without any wildcard levels.
    SharedConcrete {
        group_name: String,
        filter: String,
        level_count: u32,
    },
    /// A shared-subscription filter with at least one wildcard level.
    SharedWildcard {
        group_name: String,
        filter: String,
        level_count: u32,
    },
}
17
18impl TopicFilter {
19    pub fn new_concrete(filter: String) -> Self {
20        TopicFilter::Concrete { filter, level_count: 1 }
21    }
22}
23
/// The name of a topic that publishers address MQTT messages to.
///
/// Parsing a `Topic` from a string rejects wildcard characters, so a
/// successfully parsed topic name never contains them.
///
/// # Example
/// ```rust
/// use std::str::FromStr;
/// use mqtt_v5_fork::topic::Topic;
/// let topic = Topic::from_str("my_topic").unwrap();
/// ```
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Topic {
    /// The full topic name exactly as it was supplied.
    topic_name: String,
    /// How many separator-delimited levels `topic_name` consists of.
    level_count: u32,
}
38
39impl Topic {
40    pub fn topic_name(&self) -> &str {
41        &self.topic_name
42    }
43}
44
/// One separator-delimited level of a topic name or topic filter.
#[derive(Debug, PartialEq, Eq)]
pub enum TopicLevel<'a> {
    /// A literal level; borrows the level's text (which may be empty).
    Concrete(&'a str),
    /// A level consisting solely of the single-level wildcard.
    SingleLevelWildcard,
    /// A level consisting solely of the multi-level wildcard.
    MultiLevelWildcard,
}
51
52impl<'a> TopicLevel<'a> {
53    pub fn has_leading_dollar(&self) -> bool {
54        match self {
55            TopicLevel::Concrete(level_str) => level_str.starts_with('$'),
56            TopicLevel::SingleLevelWildcard | TopicLevel::MultiLevelWildcard => false,
57        }
58    }
59}
60
/// Reasons why a string is rejected when parsed as a topic name or a topic
/// filter.
#[derive(Debug, PartialEq, Eq)]
pub enum TopicParseError {
    /// The input was empty, or a shared subscription had no filter after its
    /// group name.
    EmptyTopic,
    /// The input is longer, in bytes, than the permitted maximum.
    TopicTooLong,
    /// A multi-level wildcard appears somewhere other than at the very end.
    MultilevelWildcardNotAtEnd,
    /// A level mixes a wildcard character with other characters.
    InvalidWildcardLevel,
    /// The group name of a shared subscription contains a wildcard character.
    InvalidSharedGroupName,
    /// The group name of a shared subscription is empty.
    EmptySharedGroupName,
    /// A topic name contains a wildcard or null character, or a topic filter
    /// contains a null character.
    WildcardOrNullInTopic,
    /// A topic name starts with `$`.
    ReservedTopicBeginning,
}
72
73impl std::fmt::Display for Topic {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(f, "{}", self.topic_name)
76    }
77}
78
79impl std::fmt::Display for TopicFilter {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        match self {
82            TopicFilter::Concrete { filter, .. } | TopicFilter::Wildcard { filter, .. } => {
83                write!(f, "{}", filter)
84            },
85            TopicFilter::SharedConcrete { group_name, filter, .. }
86            | TopicFilter::SharedWildcard { group_name, filter, .. } => {
87                write!(f, "{}{}/{}", SHARED_SUBSCRIPTION_PREFIX, group_name, filter)
88            },
89        }
90    }
91}
92
93/// If Ok, returns (level_count, contains_wildcards).
94fn process_filter(filter: &str) -> Result<(u32, bool), TopicParseError> {
95    let mut level_count = 0;
96    let mut contains_wildcards = false;
97    for level in filter.split(TOPIC_SEPARATOR) {
98        let level_contains_wildcard =
99            level.contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD);
100        if level_contains_wildcard {
101            // Any wildcards on a particular level must be specified on their own
102            if level.len() > 1 {
103                return Err(TopicParseError::InvalidWildcardLevel);
104            }
105
106            contains_wildcards = true;
107        }
108
109        level_count += 1;
110    }
111
112    Ok((level_count, contains_wildcards))
113}
114
115impl FromStr for TopicFilter {
116    type Err = TopicParseError;
117
118    fn from_str(filter: &str) -> Result<Self, Self::Err> {
119        // Filters and topics cannot be empty
120        if filter.is_empty() {
121            return Err(TopicParseError::EmptyTopic);
122        }
123
124        // Assert no null character U+0000
125        if filter.contains('\0') {
126            return Err(TopicParseError::WildcardOrNullInTopic);
127        }
128
129        // Filters cannot exceed the byte length in the MQTT spec
130        if filter.len() > MAX_TOPIC_LEN_BYTES {
131            return Err(TopicParseError::TopicTooLong);
132        }
133
134        // Multi-level wildcards can only be at the end of the topic
135        if let Some(pos) = filter.rfind(MULTI_LEVEL_WILDCARD) {
136            if pos != filter.len() - 1 {
137                return Err(TopicParseError::MultilevelWildcardNotAtEnd);
138            }
139        }
140
141        let mut shared_group = None;
142
143        if let Some(filter_rest) = filter.strip_prefix(SHARED_SUBSCRIPTION_PREFIX) {
144            if filter_rest.is_empty() {
145                return Err(TopicParseError::EmptySharedGroupName);
146            }
147
148            if let Some(slash_pos) = filter_rest.find(TOPIC_SEPARATOR) {
149                let shared_name = &filter_rest[0..slash_pos];
150
151                // slash_pos+1 is safe here, we've already validated the string
152                // has a nonzero length.
153                let shared_filter = &filter_rest[(slash_pos + 1)..];
154
155                if shared_name.is_empty() {
156                    return Err(TopicParseError::EmptySharedGroupName);
157                }
158
159                if shared_name
160                    .contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD)
161                {
162                    return Err(TopicParseError::InvalidSharedGroupName);
163                }
164
165                if shared_filter.is_empty() {
166                    return Err(TopicParseError::EmptyTopic);
167                }
168
169                shared_group = Some((shared_name, shared_filter))
170            } else {
171                return Err(TopicParseError::EmptyTopic);
172            }
173        }
174
175        let topic_filter = if let Some((group_name, shared_filter)) = shared_group {
176            let (level_count, contains_wildcards) = process_filter(shared_filter)?;
177
178            if contains_wildcards {
179                TopicFilter::SharedWildcard {
180                    group_name: group_name.to_string(),
181                    filter: shared_filter.to_string(),
182                    level_count,
183                }
184            } else {
185                TopicFilter::SharedConcrete {
186                    group_name: group_name.to_string(),
187                    filter: shared_filter.to_string(),
188                    level_count,
189                }
190            }
191        } else {
192            let (level_count, contains_wildcards) = process_filter(filter)?;
193
194            if contains_wildcards {
195                TopicFilter::Wildcard { filter: filter.to_string(), level_count }
196            } else {
197                TopicFilter::Concrete { filter: filter.to_string(), level_count }
198            }
199        };
200
201        Ok(topic_filter)
202    }
203}
204
205impl FromStr for Topic {
206    type Err = TopicParseError;
207
208    fn from_str(topic: &str) -> Result<Self, Self::Err> {
209        // Topics cannot be empty
210        if topic.is_empty() {
211            return Err(TopicParseError::EmptyTopic);
212        }
213        if topic.starts_with('$') {
214            return Err(TopicParseError::ReservedTopicBeginning);
215        }
216
217        // Topics cannot exceed the byte length in the MQTT spec
218        if topic.len() > MAX_TOPIC_LEN_BYTES {
219            return Err(TopicParseError::TopicTooLong);
220        }
221
222        // Topics cannot contain wildcards or null characters
223        if topic.contains(|x: char| {
224            x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD || x == '\0'
225        }) {
226            return Err(TopicParseError::WildcardOrNullInTopic);
227        }
228
229        let level_count = topic.split(TOPIC_SEPARATOR).count() as u32;
230
231        let topic = Topic { topic_name: topic.to_string(), level_count };
232
233        Ok(topic)
234    }
235}
236
/// Iterator over the separator-delimited levels of a topic name or topic
/// filter.
///
/// It wraps a `str::Split` over the underlying text and borrows that text for
/// `'a`. `Debug` and `Clone` are derived so the public type can be logged and
/// duplicated like the standard library's string iterators.
#[derive(Debug, Clone)]
pub struct TopicLevels<'a> {
    /// The raw, not yet classified, level strings still to be yielded.
    levels_iter: std::str::Split<'a, char>,
}
240
241impl<'a> TopicFilter {
242    fn filter(&'a self) -> &'a str {
243        match self {
244            TopicFilter::Concrete { filter, .. } => filter,
245            TopicFilter::Wildcard { filter, .. } => filter,
246            TopicFilter::SharedConcrete { filter, .. } => filter,
247            TopicFilter::SharedWildcard { filter, .. } => filter,
248        }
249    }
250
251    pub fn levels(&'a self) -> TopicLevels<'a> {
252        TopicLevels { levels_iter: self.filter().split(TOPIC_SEPARATOR) }
253    }
254}
255
256impl<'a> Topic {
257    pub fn levels(&'a self) -> TopicLevels<'a> {
258        TopicLevels { levels_iter: self.topic_name.split(TOPIC_SEPARATOR) }
259    }
260}
261
262impl<'a> Iterator for TopicLevels<'a> {
263    type Item = TopicLevel<'a>;
264
265    fn next(&mut self) -> Option<Self::Item> {
266        match self.levels_iter.next() {
267            Some(MULTI_LEVEL_WILDCARD_STR) => Some(TopicLevel::MultiLevelWildcard),
268            Some(SINGLE_LEVEL_WILDCARD_STR) => Some(TopicLevel::SingleLevelWildcard),
269            Some(level) => Some(TopicLevel::Concrete(level)),
270            None => None,
271        }
272    }
273}
274
#[cfg(test)]
mod tests {
    use crate::topic::{Topic, TopicFilter, TopicLevel, TopicParseError, MAX_TOPIC_LEN_BYTES};

    // ---- Helpers -----------------------------------------------------------

    /// Expected value for a non-shared filter without wildcards.
    fn concrete(filter: &str, level_count: u32) -> TopicFilter {
        TopicFilter::Concrete { filter: filter.to_string(), level_count }
    }

    /// Expected value for a non-shared filter with wildcards.
    fn wildcard(filter: &str, level_count: u32) -> TopicFilter {
        TopicFilter::Wildcard { filter: filter.to_string(), level_count }
    }

    /// Expected value for a shared filter without wildcards.
    fn shared_concrete(group_name: &str, filter: &str, level_count: u32) -> TopicFilter {
        TopicFilter::SharedConcrete {
            group_name: group_name.to_string(),
            filter: filter.to_string(),
            level_count,
        }
    }

    /// Expected value for a shared filter with wildcards.
    fn shared_wildcard(group_name: &str, filter: &str, level_count: u32) -> TopicFilter {
        TopicFilter::SharedWildcard {
            group_name: group_name.to_string(),
            filter: filter.to_string(),
            level_count,
        }
    }

    /// Parses `input` as a filter and returns the error; panics if it parses.
    fn filter_error(input: &str) -> TopicParseError {
        input.parse::<TopicFilter>().unwrap_err()
    }

    /// Parses `input` as a topic name and returns the error; panics if it parses.
    fn topic_error(input: &str) -> TopicParseError {
        input.parse::<Topic>().unwrap_err()
    }

    /// Parses `input` as a filter and checks the complete sequence of levels
    /// produced by its iterator.
    fn assert_filter_levels(input: &str, expected: &[TopicLevel<'_>]) {
        let filter: TopicFilter = input.parse().unwrap();
        let levels: Vec<TopicLevel<'_>> = filter.levels().collect();

        assert_eq!(levels.as_slice(), expected, "unexpected levels for {:?}", input);
    }

    // ---- TopicFilter parsing -----------------------------------------------

    #[test]
    fn test_topic_filter_parse_empty_topic() {
        assert_eq!(filter_error(""), TopicParseError::EmptyTopic);
    }

    #[test]
    fn test_topic_filter_parse_null_character() {
        assert_eq!(filter_error("\0"), TopicParseError::WildcardOrNullInTopic);
        assert_eq!(filter_error("home/\0/kitchen"), TopicParseError::WildcardOrNullInTopic);
    }

    #[test]
    fn test_topic_filter_parse_length() {
        let just_right_topic = "a".repeat(MAX_TOPIC_LEN_BYTES);
        assert!(just_right_topic.parse::<TopicFilter>().is_ok());

        let too_long_topic = "a".repeat(MAX_TOPIC_LEN_BYTES + 1);
        assert_eq!(filter_error(&too_long_topic), TopicParseError::TopicTooLong);
    }

    #[test]
    fn test_topic_filter_parse_concrete() {
        let cases = [
            ("/", 2),
            ("a", 1),
            ("home/kitchen", 2),
            ("home/kitchen/temperature", 3),
            ("home/kitchen/temperature/celsius", 4),
        ];

        for &(input, level_count) in cases.iter() {
            assert_eq!(input.parse::<TopicFilter>().unwrap(), concrete(input, level_count));
        }
    }

    #[test]
    fn test_topic_filter_parse_single_level_wildcard() {
        let cases = [("+", 1), ("+/", 2), ("sport/+", 2), ("/+", 2)];

        for &(input, level_count) in cases.iter() {
            assert_eq!(input.parse::<TopicFilter>().unwrap(), wildcard(input, level_count));
        }
    }

    #[test]
    fn test_topic_filter_parse_multi_level_wildcard() {
        let cases = [("#", 1), ("/#", 2), ("sport/#", 2), ("home/kitchen/temperature/#", 4)];

        for &(input, level_count) in cases.iter() {
            assert_eq!(input.parse::<TopicFilter>().unwrap(), wildcard(input, level_count));
        }

        assert_eq!(filter_error("#/"), TopicParseError::MultilevelWildcardNotAtEnd);
    }

    #[test]
    fn test_topic_filter_parse_shared_subscription_concrete() {
        let cases = [
            ("$share/group_a/home", "group_a", "home", 1),
            (
                "$share/group_a/home/kitchen/temperature",
                "group_a",
                "home/kitchen/temperature",
                3,
            ),
            ("$share/group_a//", "group_a", "/", 2),
        ];

        for &(input, group_name, filter, level_count) in cases.iter() {
            assert_eq!(
                input.parse::<TopicFilter>().unwrap(),
                shared_concrete(group_name, filter, level_count)
            );
        }
    }

    #[test]
    fn test_topic_filter_parse_shared_subscription_wildcard() {
        let cases = [
            ("$share/group_b/#", "group_b", "#", 1),
            ("$share/group_b/+", "group_b", "+", 1),
            ("$share/group_b/+/temperature", "group_b", "+/temperature", 2),
            ("$share/group_c/+/temperature/+/meta", "group_c", "+/temperature/+/meta", 4),
        ];

        for &(input, group_name, filter, level_count) in cases.iter() {
            assert_eq!(
                input.parse::<TopicFilter>().unwrap(),
                shared_wildcard(group_name, filter, level_count)
            );
        }
    }

    #[test]
    fn test_topic_filter_parse_invalid_shared_subscription() {
        let cases = [
            ("$share/", TopicParseError::EmptySharedGroupName),
            ("$share/a", TopicParseError::EmptyTopic),
            ("$share/a/", TopicParseError::EmptyTopic),
            ("$share//", TopicParseError::EmptySharedGroupName),
            ("$share///", TopicParseError::EmptySharedGroupName),
            ("$share/invalid_group#/#", TopicParseError::InvalidSharedGroupName),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(&filter_error(input), expected, "input: {:?}", input);
        }
    }

    #[test]
    fn test_topic_filter_parse_sys_prefix() {
        // $SYS topics can be subscribed to, but can't be published
        assert_eq!("$SYS/stats".parse::<TopicFilter>().unwrap(), concrete("$SYS/stats", 2));
        assert_eq!("/$SYS/stats".parse::<TopicFilter>().unwrap(), concrete("/$SYS/stats", 3));
        assert_eq!("$SYS/+".parse::<TopicFilter>().unwrap(), wildcard("$SYS/+", 2));
        assert_eq!("$SYS/#".parse::<TopicFilter>().unwrap(), wildcard("$SYS/#", 2));
    }

    #[test]
    fn test_topic_filter_parse_invalid_filters() {
        let cases = [
            ("sport/#/stats", TopicParseError::MultilevelWildcardNotAtEnd),
            ("sport/#/stats#", TopicParseError::InvalidWildcardLevel),
            ("sport#/stats#", TopicParseError::InvalidWildcardLevel),
            ("sport/tennis#", TopicParseError::InvalidWildcardLevel),
            ("sport/++", TopicParseError::InvalidWildcardLevel),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(&filter_error(input), expected, "input: {:?}", input);
        }
    }

    #[test]
    fn test_topic_filter_new_concrete_single_level() {
        assert_eq!(TopicFilter::new_concrete("home".to_string()), concrete("home", 1));
    }

    // ---- Topic parsing -----------------------------------------------------

    #[test]
    fn test_topic_name_success() {
        let cases = [
            ("/", 2),
            ("Accounts payable", 1),
            ("home/kitchen", 2),
            ("home/kitchen/temperature", 3),
        ];

        for &(input, level_count) in cases.iter() {
            assert_eq!(
                input.parse::<Topic>().unwrap(),
                Topic { topic_name: input.to_string(), level_count }
            );
        }
    }

    #[test]
    fn test_topic_name_failure() {
        let inputs = ["#", "+", "\0", "/multi/level/#", "/single/level/+", "/null/byte/\0"];

        for input in inputs.iter() {
            assert_eq!(
                topic_error(input),
                TopicParseError::WildcardOrNullInTopic,
                "input: {:?}",
                input
            );
        }
    }

    #[test]
    fn test_topic_name_empty() {
        assert_eq!(topic_error(""), TopicParseError::EmptyTopic);
    }

    #[test]
    fn test_topic_name_reserved_prefix() {
        assert_eq!(topic_error("$"), TopicParseError::ReservedTopicBeginning);
        assert_eq!(topic_error("$SYS/stats"), TopicParseError::ReservedTopicBeginning);
    }

    #[test]
    fn test_topic_name_length() {
        let just_right_topic = "a".repeat(MAX_TOPIC_LEN_BYTES);
        assert!(just_right_topic.parse::<Topic>().is_ok());

        let too_long_topic = "a".repeat(MAX_TOPIC_LEN_BYTES + 1);
        assert_eq!(topic_error(&too_long_topic), TopicParseError::TopicTooLong);
    }

    // ---- Display -----------------------------------------------------------

    #[test]
    fn test_display_round_trip() {
        let topic: Topic = "home/kitchen".parse().unwrap();
        assert_eq!(topic.to_string(), "home/kitchen");
        assert_eq!(topic.topic_name(), "home/kitchen");

        let filters =
            ["home/kitchen", "sport/+", "$share/group_a/home", "$share/group_b/+/temperature"];
        for input in filters.iter() {
            let filter: TopicFilter = input.parse().unwrap();
            assert_eq!(filter.to_string(), *input);
        }
    }

    // ---- Level iteration ---------------------------------------------------

    #[test]
    fn test_topic_filter_level_iterator_simple() {
        assert_filter_levels("/", &[TopicLevel::Concrete(""), TopicLevel::Concrete("")]);
    }

    #[test]
    fn test_topic_filter_level_iterator_concrete() {
        assert_filter_levels(
            "home/kitchen/temperature",
            &[
                TopicLevel::Concrete("home"),
                TopicLevel::Concrete("kitchen"),
                TopicLevel::Concrete("temperature"),
            ],
        );
    }

    #[test]
    fn test_topic_filter_level_iterator_single_level_wildcard_1() {
        assert_filter_levels(
            "home/+/+/temperature/+",
            &[
                TopicLevel::Concrete("home"),
                TopicLevel::SingleLevelWildcard,
                TopicLevel::SingleLevelWildcard,
                TopicLevel::Concrete("temperature"),
                TopicLevel::SingleLevelWildcard,
            ],
        );
    }

    #[test]
    fn test_topic_filter_level_iterator_single_level_wildcard_2() {
        assert_filter_levels("+", &[TopicLevel::SingleLevelWildcard]);
    }

    #[test]
    fn test_topic_filter_level_iterator_multi_level_wildcard_1() {
        assert_filter_levels(
            "home/kitchen/#",
            &[
                TopicLevel::Concrete("home"),
                TopicLevel::Concrete("kitchen"),
                TopicLevel::MultiLevelWildcard,
            ],
        );
    }

    #[test]
    fn test_topic_filter_level_iterator_multi_level_wildcard_2() {
        assert_filter_levels("#", &[TopicLevel::MultiLevelWildcard]);
    }

    #[test]
    fn test_topic_level_iterator() {
        let topic: Topic = "home/kitchen".parse().unwrap();
        let levels: Vec<TopicLevel<'_>> = topic.levels().collect();

        assert_eq!(levels, vec![TopicLevel::Concrete("home"), TopicLevel::Concrete("kitchen")]);
    }

    #[test]
    fn test_topic_level_has_leading_dollar() {
        assert!(TopicLevel::Concrete("$SYS").has_leading_dollar());
        assert!(!TopicLevel::Concrete("stats").has_leading_dollar());
        assert!(!TopicLevel::SingleLevelWildcard.has_leading_dollar());
        assert!(!TopicLevel::MultiLevelWildcard.has_leading_dollar());
    }
}
639}