mqtt_v5/
topic.rs

1use crate::{
2    MAX_TOPIC_LEN_BYTES, MULTI_LEVEL_WILDCARD, MULTI_LEVEL_WILDCARD_STR,
3    SHARED_SUBSCRIPTION_PREFIX, SINGLE_LEVEL_WILDCARD, SINGLE_LEVEL_WILDCARD_STR, TOPIC_SEPARATOR,
4};
5use std::str::FromStr;
6
7/// A filter for subscribers to indicate which topics they want
8/// to receive messages from. Can contain wildcards.
9/// Shared topic filter example: $share/group_name_a/home/kitchen/temperature
10#[derive(Debug, Clone, PartialEq)]
11pub enum TopicFilter {
12    Concrete { filter: String, level_count: u32 },
13    Wildcard { filter: String, level_count: u32 },
14    SharedConcrete { group_name: String, filter: String, level_count: u32 },
15    SharedWildcard { group_name: String, filter: String, level_count: u32 },
16}
17
18/// A topic name publishers use when sending MQTT messages.
19/// Cannot contain wildcards.
20#[derive(Debug, Clone, PartialEq)]
21pub struct Topic {
22    topic_name: String,
23    level_count: u32,
24}
25
26impl Topic {
27    pub fn topic_name(&self) -> &str {
28        &self.topic_name
29    }
30}
31
32#[derive(Debug, PartialEq)]
33pub enum TopicLevel<'a> {
34    Concrete(&'a str),
35    SingleLevelWildcard,
36    MultiLevelWildcard,
37}
38
39#[derive(Debug, PartialEq)]
40pub enum TopicParseError {
41    EmptyTopic,
42    TopicTooLong,
43    MultilevelWildcardNotAtEnd,
44    InvalidWildcardLevel,
45    InvalidSharedGroupName,
46    EmptySharedGroupName,
47    WildcardOrNullInTopic,
48}
49
50impl std::fmt::Display for Topic {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        write!(f, "{}", self.topic_name)
53    }
54}
55
56impl std::fmt::Display for TopicFilter {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        match self {
59            TopicFilter::Concrete { filter, .. } | TopicFilter::Wildcard { filter, .. } => {
60                write!(f, "{}", filter)
61            },
62            TopicFilter::SharedConcrete { group_name, filter, .. }
63            | TopicFilter::SharedWildcard { group_name, filter, .. } => {
64                write!(f, "{}{}/{}", SHARED_SUBSCRIPTION_PREFIX, group_name, filter)
65            },
66        }
67    }
68}
69
70/// If Ok, returns (level_count, contains_wildcards).
71fn process_filter(filter: &str) -> Result<(u32, bool), TopicParseError> {
72    let mut level_count = 0;
73    let mut contains_wildcards = false;
74    for level in filter.split(TOPIC_SEPARATOR) {
75        let level_contains_wildcard =
76            level.contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD);
77        if level_contains_wildcard {
78            // Any wildcards on a particular level must be specified on their own
79            if level.len() > 1 {
80                return Err(TopicParseError::InvalidWildcardLevel);
81            }
82
83            contains_wildcards = true;
84        }
85
86        level_count += 1;
87    }
88
89    Ok((level_count, contains_wildcards))
90}
91
92impl FromStr for TopicFilter {
93    type Err = TopicParseError;
94
95    fn from_str(filter: &str) -> Result<Self, Self::Err> {
96        // Filters and topics cannot be empty
97        if filter.is_empty() {
98            return Err(TopicParseError::EmptyTopic);
99        }
100
101        // TODO - assert no null character U+0000
102        if filter.contains('\0') {
103            return Err(TopicParseError::WildcardOrNullInTopic);
104        }
105
106        // Filters cannot exceed the byte length in the MQTT spec
107        if filter.len() > MAX_TOPIC_LEN_BYTES {
108            return Err(TopicParseError::TopicTooLong);
109        }
110
111        // Multi-level wildcards can only be at the end of the topic
112        if let Some(pos) = filter.rfind(MULTI_LEVEL_WILDCARD) {
113            if pos != filter.len() - 1 {
114                return Err(TopicParseError::MultilevelWildcardNotAtEnd);
115            }
116        }
117
118        let mut shared_group = None;
119
120        if filter.starts_with(SHARED_SUBSCRIPTION_PREFIX) {
121            let filter_rest = &filter[SHARED_SUBSCRIPTION_PREFIX.len()..];
122
123            if filter_rest.is_empty() {
124                return Err(TopicParseError::EmptySharedGroupName);
125            }
126
127            if let Some(slash_pos) = filter_rest.find(TOPIC_SEPARATOR) {
128                let shared_name = &filter_rest[0..slash_pos];
129
130                // slash_pos+1 is safe here, we've already validated the string
131                // has a nonzero length.
132                let shared_filter = &filter_rest[(slash_pos + 1)..];
133
134                if shared_name.is_empty() {
135                    return Err(TopicParseError::EmptySharedGroupName);
136                }
137
138                if shared_name
139                    .contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD)
140                {
141                    return Err(TopicParseError::InvalidSharedGroupName);
142                }
143
144                if shared_filter.is_empty() {
145                    return Err(TopicParseError::EmptyTopic);
146                }
147
148                shared_group = Some((shared_name, shared_filter))
149            } else {
150                return Err(TopicParseError::EmptyTopic);
151            }
152        }
153
154        let topic_filter = if let Some((group_name, shared_filter)) = shared_group {
155            let (level_count, contains_wildcards) = process_filter(shared_filter)?;
156
157            if contains_wildcards {
158                TopicFilter::SharedWildcard {
159                    group_name: group_name.to_string(),
160                    filter: shared_filter.to_string(),
161                    level_count,
162                }
163            } else {
164                TopicFilter::SharedConcrete {
165                    group_name: group_name.to_string(),
166                    filter: shared_filter.to_string(),
167                    level_count,
168                }
169            }
170        } else {
171            let (level_count, contains_wildcards) = process_filter(filter)?;
172
173            if contains_wildcards {
174                TopicFilter::Wildcard { filter: filter.to_string(), level_count }
175            } else {
176                TopicFilter::Concrete { filter: filter.to_string(), level_count }
177            }
178        };
179
180        Ok(topic_filter)
181    }
182}
183
184impl FromStr for Topic {
185    type Err = TopicParseError;
186
187    fn from_str(topic: &str) -> Result<Self, Self::Err> {
188        // TODO - Consider disallowing leading $ characters
189
190        // Topics cannot be empty
191        if topic.is_empty() {
192            return Err(TopicParseError::EmptyTopic);
193        }
194
195        // Topics cannot exceed the byte length in the MQTT spec
196        if topic.len() > MAX_TOPIC_LEN_BYTES {
197            return Err(TopicParseError::TopicTooLong);
198        }
199
200        // Topics cannot contain wildcards or null characters
201        let topic_contains_wildcards = topic.contains(|x: char| {
202            x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD || x == '\0'
203        });
204
205        if topic_contains_wildcards {
206            return Err(TopicParseError::WildcardOrNullInTopic);
207        }
208
209        let level_count = topic.split(TOPIC_SEPARATOR).count() as u32;
210
211        let topic = Topic { topic_name: topic.to_string(), level_count };
212
213        Ok(topic)
214    }
215}
216
217pub struct TopicLevels<'a> {
218    levels_iter: std::str::Split<'a, char>,
219}
220
221impl<'a> TopicFilter {
222    fn filter(&'a self) -> &'a str {
223        match self {
224            TopicFilter::Concrete { filter, .. } => filter,
225            TopicFilter::Wildcard { filter, .. } => filter,
226            TopicFilter::SharedConcrete { filter, .. } => filter,
227            TopicFilter::SharedWildcard { filter, .. } => filter,
228        }
229    }
230
231    pub fn levels(&'a self) -> TopicLevels<'a> {
232        TopicLevels { levels_iter: self.filter().split(TOPIC_SEPARATOR) }
233    }
234}
235
236impl<'a> Topic {
237    pub fn levels(&'a self) -> TopicLevels<'a> {
238        TopicLevels { levels_iter: self.topic_name.split(TOPIC_SEPARATOR) }
239    }
240}
241
242impl<'a> Iterator for TopicLevels<'a> {
243    type Item = TopicLevel<'a>;
244
245    fn next(&mut self) -> Option<Self::Item> {
246        match self.levels_iter.next() {
247            Some(MULTI_LEVEL_WILDCARD_STR) => Some(TopicLevel::MultiLevelWildcard),
248            Some(SINGLE_LEVEL_WILDCARD_STR) => Some(TopicLevel::SingleLevelWildcard),
249            Some(level) => Some(TopicLevel::Concrete(level)),
250            None => None,
251        }
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use crate::topic::{Topic, TopicFilter, TopicLevel, TopicParseError, MAX_TOPIC_LEN_BYTES};
258
259    #[test]
260    fn test_topic_filter_parse_empty_topic() {
261        assert_eq!("".parse::<TopicFilter>().unwrap_err(), TopicParseError::EmptyTopic);
262    }
263
264    #[test]
265    fn test_topic_filter_parse_length() {
266        let just_right_topic = "a".repeat(MAX_TOPIC_LEN_BYTES);
267        assert!(just_right_topic.parse::<TopicFilter>().is_ok());
268
269        let too_long_topic = "a".repeat(MAX_TOPIC_LEN_BYTES + 1);
270        assert_eq!(
271            too_long_topic.parse::<TopicFilter>().unwrap_err(),
272            TopicParseError::TopicTooLong
273        );
274    }
275
276    #[test]
277    fn test_topic_filter_parse_concrete() {
278        assert_eq!(
279            "/".parse::<TopicFilter>().unwrap(),
280            TopicFilter::Concrete { filter: "/".to_string(), level_count: 2 }
281        );
282
283        assert_eq!(
284            "a".parse::<TopicFilter>().unwrap(),
285            TopicFilter::Concrete { filter: "a".to_string(), level_count: 1 }
286        );
287
288        // $SYS topics can be subscribed to, but can't be published
289        assert_eq!(
290            "home/kitchen".parse::<TopicFilter>().unwrap(),
291            TopicFilter::Concrete { filter: "home/kitchen".to_string(), level_count: 2 }
292        );
293
294        assert_eq!(
295            "home/kitchen/temperature".parse::<TopicFilter>().unwrap(),
296            TopicFilter::Concrete {
297                filter: "home/kitchen/temperature".to_string(),
298                level_count: 3,
299            }
300        );
301
302        assert_eq!(
303            "home/kitchen/temperature/celsius".parse::<TopicFilter>().unwrap(),
304            TopicFilter::Concrete {
305                filter: "home/kitchen/temperature/celsius".to_string(),
306                level_count: 4,
307            }
308        );
309    }
310
311    #[test]
312    fn test_topic_filter_parse_single_level_wildcard() {
313        assert_eq!(
314            "+".parse::<TopicFilter>().unwrap(),
315            TopicFilter::Wildcard { filter: "+".to_string(), level_count: 1 }
316        );
317
318        assert_eq!(
319            "+/".parse::<TopicFilter>().unwrap(),
320            TopicFilter::Wildcard { filter: "+/".to_string(), level_count: 2 }
321        );
322
323        assert_eq!(
324            "sport/+".parse::<TopicFilter>().unwrap(),
325            TopicFilter::Wildcard { filter: "sport/+".to_string(), level_count: 2 }
326        );
327
328        assert_eq!(
329            "/+".parse::<TopicFilter>().unwrap(),
330            TopicFilter::Wildcard { filter: "/+".to_string(), level_count: 2 }
331        );
332    }
333
334    #[test]
335    fn test_topic_filter_parse_multi_level_wildcard() {
336        assert_eq!(
337            "#".parse::<TopicFilter>().unwrap(),
338            TopicFilter::Wildcard { filter: "#".to_string(), level_count: 1 }
339        );
340
341        assert_eq!(
342            "#/".parse::<TopicFilter>().unwrap_err(),
343            TopicParseError::MultilevelWildcardNotAtEnd
344        );
345
346        assert_eq!(
347            "/#".parse::<TopicFilter>().unwrap(),
348            TopicFilter::Wildcard { filter: "/#".to_string(), level_count: 2 }
349        );
350
351        assert_eq!(
352            "sport/#".parse::<TopicFilter>().unwrap(),
353            TopicFilter::Wildcard { filter: "sport/#".to_string(), level_count: 2 }
354        );
355
356        assert_eq!(
357            "home/kitchen/temperature/#".parse::<TopicFilter>().unwrap(),
358            TopicFilter::Wildcard {
359                filter: "home/kitchen/temperature/#".to_string(),
360                level_count: 4,
361            }
362        );
363    }
364
365    #[test]
366    fn test_topic_filter_parse_shared_subscription_concrete() {
367        assert_eq!(
368            "$share/group_a/home".parse::<TopicFilter>().unwrap(),
369            TopicFilter::SharedConcrete {
370                group_name: "group_a".to_string(),
371                filter: "home".to_string(),
372                level_count: 1,
373            }
374        );
375
376        assert_eq!(
377            "$share/group_a/home/kitchen/temperature".parse::<TopicFilter>().unwrap(),
378            TopicFilter::SharedConcrete {
379                group_name: "group_a".to_string(),
380                filter: "home/kitchen/temperature".to_string(),
381                level_count: 3,
382            }
383        );
384
385        assert_eq!(
386            "$share/group_a//".parse::<TopicFilter>().unwrap(),
387            TopicFilter::SharedConcrete {
388                group_name: "group_a".to_string(),
389                filter: "/".to_string(),
390                level_count: 2,
391            }
392        );
393    }
394
395    #[test]
396    fn test_topic_filter_parse_shared_subscription_wildcard() {
397        assert_eq!(
398            "$share/group_b/#".parse::<TopicFilter>().unwrap(),
399            TopicFilter::SharedWildcard {
400                group_name: "group_b".to_string(),
401                filter: "#".to_string(),
402                level_count: 1,
403            }
404        );
405
406        assert_eq!(
407            "$share/group_b/+".parse::<TopicFilter>().unwrap(),
408            TopicFilter::SharedWildcard {
409                group_name: "group_b".to_string(),
410                filter: "+".to_string(),
411                level_count: 1,
412            }
413        );
414
415        assert_eq!(
416            "$share/group_b/+/temperature".parse::<TopicFilter>().unwrap(),
417            TopicFilter::SharedWildcard {
418                group_name: "group_b".to_string(),
419                filter: "+/temperature".to_string(),
420                level_count: 2,
421            }
422        );
423
424        assert_eq!(
425            "$share/group_c/+/temperature/+/meta".parse::<TopicFilter>().unwrap(),
426            TopicFilter::SharedWildcard {
427                group_name: "group_c".to_string(),
428                filter: "+/temperature/+/meta".to_string(),
429                level_count: 4,
430            }
431        );
432    }
433
434    #[test]
435    fn test_topic_filter_parse_invalid_shared_subscription() {
436        assert_eq!(
437            "$share/".parse::<TopicFilter>().unwrap_err(),
438            TopicParseError::EmptySharedGroupName
439        );
440        assert_eq!("$share/a".parse::<TopicFilter>().unwrap_err(), TopicParseError::EmptyTopic);
441        assert_eq!("$share/a/".parse::<TopicFilter>().unwrap_err(), TopicParseError::EmptyTopic);
442        assert_eq!(
443            "$share//".parse::<TopicFilter>().unwrap_err(),
444            TopicParseError::EmptySharedGroupName
445        );
446        assert_eq!(
447            "$share///".parse::<TopicFilter>().unwrap_err(),
448            TopicParseError::EmptySharedGroupName
449        );
450
451        assert_eq!(
452            "$share/invalid_group#/#".parse::<TopicFilter>().unwrap_err(),
453            TopicParseError::InvalidSharedGroupName
454        );
455    }
456
457    #[test]
458    fn test_topic_filter_parse_sys_prefix() {
459        assert_eq!(
460            "$SYS/stats".parse::<TopicFilter>().unwrap(),
461            TopicFilter::Concrete { filter: "$SYS/stats".to_string(), level_count: 2 }
462        );
463
464        assert_eq!(
465            "/$SYS/stats".parse::<TopicFilter>().unwrap(),
466            TopicFilter::Concrete { filter: "/$SYS/stats".to_string(), level_count: 3 }
467        );
468
469        assert_eq!(
470            "$SYS/+".parse::<TopicFilter>().unwrap(),
471            TopicFilter::Wildcard { filter: "$SYS/+".to_string(), level_count: 2 }
472        );
473
474        assert_eq!(
475            "$SYS/#".parse::<TopicFilter>().unwrap(),
476            TopicFilter::Wildcard { filter: "$SYS/#".to_string(), level_count: 2 }
477        );
478    }
479
480    #[test]
481    fn test_topic_filter_parse_invalid_filters() {
482        assert_eq!(
483            "sport/#/stats".parse::<TopicFilter>().unwrap_err(),
484            TopicParseError::MultilevelWildcardNotAtEnd
485        );
486        assert_eq!(
487            "sport/#/stats#".parse::<TopicFilter>().unwrap_err(),
488            TopicParseError::InvalidWildcardLevel
489        );
490        assert_eq!(
491            "sport#/stats#".parse::<TopicFilter>().unwrap_err(),
492            TopicParseError::InvalidWildcardLevel
493        );
494        assert_eq!(
495            "sport/tennis#".parse::<TopicFilter>().unwrap_err(),
496            TopicParseError::InvalidWildcardLevel
497        );
498        assert_eq!(
499            "sport/++".parse::<TopicFilter>().unwrap_err(),
500            TopicParseError::InvalidWildcardLevel
501        );
502    }
503
504    #[test]
505    fn test_topic_name_success() {
506        assert_eq!(
507            "/".parse::<Topic>().unwrap(),
508            Topic { topic_name: "/".to_string(), level_count: 2 }
509        );
510
511        assert_eq!(
512            "Accounts payable".parse::<Topic>().unwrap(),
513            Topic { topic_name: "Accounts payable".to_string(), level_count: 1 }
514        );
515
516        assert_eq!(
517            "home/kitchen".parse::<Topic>().unwrap(),
518            Topic { topic_name: "home/kitchen".to_string(), level_count: 2 }
519        );
520
521        assert_eq!(
522            "home/kitchen/temperature".parse::<Topic>().unwrap(),
523            Topic { topic_name: "home/kitchen/temperature".to_string(), level_count: 3 }
524        );
525    }
526
527    #[test]
528    fn test_topic_name_failure() {
529        assert_eq!("#".parse::<Topic>().unwrap_err(), TopicParseError::WildcardOrNullInTopic,);
530
531        assert_eq!("+".parse::<Topic>().unwrap_err(), TopicParseError::WildcardOrNullInTopic,);
532
533        assert_eq!("\0".parse::<Topic>().unwrap_err(), TopicParseError::WildcardOrNullInTopic,);
534
535        assert_eq!(
536            "/multi/level/#".parse::<Topic>().unwrap_err(),
537            TopicParseError::WildcardOrNullInTopic,
538        );
539
540        assert_eq!(
541            "/single/level/+".parse::<Topic>().unwrap_err(),
542            TopicParseError::WildcardOrNullInTopic,
543        );
544
545        assert_eq!(
546            "/null/byte/\0".parse::<Topic>().unwrap_err(),
547            TopicParseError::WildcardOrNullInTopic,
548        );
549    }
550
551    #[test]
552    fn test_topic_filter_level_iterator_simple() {
553        let filter: TopicFilter = "/".parse().unwrap();
554
555        let mut levels = filter.levels();
556
557        assert_eq!(levels.next(), Some(TopicLevel::Concrete("")));
558        assert_eq!(levels.next(), Some(TopicLevel::Concrete("")));
559        assert_eq!(levels.next(), None);
560    }
561
562    #[test]
563    fn test_topic_filter_level_iterator_concrete() {
564        let filter: TopicFilter = "home/kitchen/temperature".parse().unwrap();
565
566        let mut levels = filter.levels();
567
568        assert_eq!(levels.next(), Some(TopicLevel::Concrete("home")));
569        assert_eq!(levels.next(), Some(TopicLevel::Concrete("kitchen")));
570        assert_eq!(levels.next(), Some(TopicLevel::Concrete("temperature")));
571        assert_eq!(levels.next(), None);
572    }
573
574    #[test]
575    fn test_topic_filter_level_iterator_single_level_wildcard_1() {
576        let filter: TopicFilter = "home/+/+/temperature/+".parse().unwrap();
577
578        let mut levels = filter.levels();
579
580        assert_eq!(levels.next(), Some(TopicLevel::Concrete("home")));
581        assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard));
582        assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard));
583        assert_eq!(levels.next(), Some(TopicLevel::Concrete("temperature")));
584        assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard));
585        assert_eq!(levels.next(), None);
586    }
587
588    #[test]
589    fn test_topic_filter_level_iterator_single_level_wildcard_2() {
590        let filter: TopicFilter = "+".parse().unwrap();
591
592        let mut levels = filter.levels();
593
594        assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard));
595        assert_eq!(levels.next(), None);
596    }
597
598    #[test]
599    fn test_topic_filter_level_iterator_mutli_level_wildcard_1() {
600        let filter: TopicFilter = "home/kitchen/#".parse().unwrap();
601
602        let mut levels = filter.levels();
603
604        assert_eq!(levels.next(), Some(TopicLevel::Concrete("home")));
605        assert_eq!(levels.next(), Some(TopicLevel::Concrete("kitchen")));
606        assert_eq!(levels.next(), Some(TopicLevel::MultiLevelWildcard));
607        assert_eq!(levels.next(), None);
608    }
609
610    #[test]
611    fn test_topic_filter_level_iterator_mutli_level_wildcard_2() {
612        let filter: TopicFilter = "#".parse().unwrap();
613
614        let mut levels = filter.levels();
615
616        assert_eq!(levels.next(), Some(TopicLevel::MultiLevelWildcard));
617        assert_eq!(levels.next(), None);
618    }
619}