1use crate::{
2 MAX_TOPIC_LEN_BYTES, MULTI_LEVEL_WILDCARD, MULTI_LEVEL_WILDCARD_STR,
3 SHARED_SUBSCRIPTION_PREFIX, SINGLE_LEVEL_WILDCARD, SINGLE_LEVEL_WILDCARD_STR, TOPIC_SEPARATOR,
4};
5use std::str::FromStr;
6
7#[derive(Debug, Clone, PartialEq)]
11pub enum TopicFilter {
12 Concrete { filter: String, level_count: u32 },
13 Wildcard { filter: String, level_count: u32 },
14 SharedConcrete { group_name: String, filter: String, level_count: u32 },
15 SharedWildcard { group_name: String, filter: String, level_count: u32 },
16}
17
18#[derive(Debug, Clone, PartialEq)]
21pub struct Topic {
22 topic_name: String,
23 level_count: u32,
24}
25
26impl Topic {
27 pub fn topic_name(&self) -> &str {
28 &self.topic_name
29 }
30}
31
32#[derive(Debug, PartialEq)]
33pub enum TopicLevel<'a> {
34 Concrete(&'a str),
35 SingleLevelWildcard,
36 MultiLevelWildcard,
37}
38
39#[derive(Debug, PartialEq)]
40pub enum TopicParseError {
41 EmptyTopic,
42 TopicTooLong,
43 MultilevelWildcardNotAtEnd,
44 InvalidWildcardLevel,
45 InvalidSharedGroupName,
46 EmptySharedGroupName,
47 WildcardOrNullInTopic,
48}
49
50impl std::fmt::Display for Topic {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 write!(f, "{}", self.topic_name)
53 }
54}
55
56impl std::fmt::Display for TopicFilter {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 match self {
59 TopicFilter::Concrete { filter, .. } | TopicFilter::Wildcard { filter, .. } => {
60 write!(f, "{}", filter)
61 },
62 TopicFilter::SharedConcrete { group_name, filter, .. }
63 | TopicFilter::SharedWildcard { group_name, filter, .. } => {
64 write!(f, "{}{}/{}", SHARED_SUBSCRIPTION_PREFIX, group_name, filter)
65 },
66 }
67 }
68}
69
70fn process_filter(filter: &str) -> Result<(u32, bool), TopicParseError> {
72 let mut level_count = 0;
73 let mut contains_wildcards = false;
74 for level in filter.split(TOPIC_SEPARATOR) {
75 let level_contains_wildcard =
76 level.contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD);
77 if level_contains_wildcard {
78 if level.len() > 1 {
80 return Err(TopicParseError::InvalidWildcardLevel);
81 }
82
83 contains_wildcards = true;
84 }
85
86 level_count += 1;
87 }
88
89 Ok((level_count, contains_wildcards))
90}
91
92impl FromStr for TopicFilter {
93 type Err = TopicParseError;
94
95 fn from_str(filter: &str) -> Result<Self, Self::Err> {
96 if filter.is_empty() {
98 return Err(TopicParseError::EmptyTopic);
99 }
100
101 if filter.contains('\0') {
103 return Err(TopicParseError::WildcardOrNullInTopic);
104 }
105
106 if filter.len() > MAX_TOPIC_LEN_BYTES {
108 return Err(TopicParseError::TopicTooLong);
109 }
110
111 if let Some(pos) = filter.rfind(MULTI_LEVEL_WILDCARD) {
113 if pos != filter.len() - 1 {
114 return Err(TopicParseError::MultilevelWildcardNotAtEnd);
115 }
116 }
117
118 let mut shared_group = None;
119
120 if filter.starts_with(SHARED_SUBSCRIPTION_PREFIX) {
121 let filter_rest = &filter[SHARED_SUBSCRIPTION_PREFIX.len()..];
122
123 if filter_rest.is_empty() {
124 return Err(TopicParseError::EmptySharedGroupName);
125 }
126
127 if let Some(slash_pos) = filter_rest.find(TOPIC_SEPARATOR) {
128 let shared_name = &filter_rest[0..slash_pos];
129
130 let shared_filter = &filter_rest[(slash_pos + 1)..];
133
134 if shared_name.is_empty() {
135 return Err(TopicParseError::EmptySharedGroupName);
136 }
137
138 if shared_name
139 .contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD)
140 {
141 return Err(TopicParseError::InvalidSharedGroupName);
142 }
143
144 if shared_filter.is_empty() {
145 return Err(TopicParseError::EmptyTopic);
146 }
147
148 shared_group = Some((shared_name, shared_filter))
149 } else {
150 return Err(TopicParseError::EmptyTopic);
151 }
152 }
153
154 let topic_filter = if let Some((group_name, shared_filter)) = shared_group {
155 let (level_count, contains_wildcards) = process_filter(shared_filter)?;
156
157 if contains_wildcards {
158 TopicFilter::SharedWildcard {
159 group_name: group_name.to_string(),
160 filter: shared_filter.to_string(),
161 level_count,
162 }
163 } else {
164 TopicFilter::SharedConcrete {
165 group_name: group_name.to_string(),
166 filter: shared_filter.to_string(),
167 level_count,
168 }
169 }
170 } else {
171 let (level_count, contains_wildcards) = process_filter(filter)?;
172
173 if contains_wildcards {
174 TopicFilter::Wildcard { filter: filter.to_string(), level_count }
175 } else {
176 TopicFilter::Concrete { filter: filter.to_string(), level_count }
177 }
178 };
179
180 Ok(topic_filter)
181 }
182}
183
184impl FromStr for Topic {
185 type Err = TopicParseError;
186
187 fn from_str(topic: &str) -> Result<Self, Self::Err> {
188 if topic.is_empty() {
192 return Err(TopicParseError::EmptyTopic);
193 }
194
195 if topic.len() > MAX_TOPIC_LEN_BYTES {
197 return Err(TopicParseError::TopicTooLong);
198 }
199
200 let topic_contains_wildcards = topic.contains(|x: char| {
202 x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD || x == '\0'
203 });
204
205 if topic_contains_wildcards {
206 return Err(TopicParseError::WildcardOrNullInTopic);
207 }
208
209 let level_count = topic.split(TOPIC_SEPARATOR).count() as u32;
210
211 let topic = Topic { topic_name: topic.to_string(), level_count };
212
213 Ok(topic)
214 }
215}
216
217pub struct TopicLevels<'a> {
218 levels_iter: std::str::Split<'a, char>,
219}
220
221impl<'a> TopicFilter {
222 fn filter(&'a self) -> &'a str {
223 match self {
224 TopicFilter::Concrete { filter, .. } => filter,
225 TopicFilter::Wildcard { filter, .. } => filter,
226 TopicFilter::SharedConcrete { filter, .. } => filter,
227 TopicFilter::SharedWildcard { filter, .. } => filter,
228 }
229 }
230
231 pub fn levels(&'a self) -> TopicLevels<'a> {
232 TopicLevels { levels_iter: self.filter().split(TOPIC_SEPARATOR) }
233 }
234}
235
236impl<'a> Topic {
237 pub fn levels(&'a self) -> TopicLevels<'a> {
238 TopicLevels { levels_iter: self.topic_name.split(TOPIC_SEPARATOR) }
239 }
240}
241
242impl<'a> Iterator for TopicLevels<'a> {
243 type Item = TopicLevel<'a>;
244
245 fn next(&mut self) -> Option<Self::Item> {
246 match self.levels_iter.next() {
247 Some(MULTI_LEVEL_WILDCARD_STR) => Some(TopicLevel::MultiLevelWildcard),
248 Some(SINGLE_LEVEL_WILDCARD_STR) => Some(TopicLevel::SingleLevelWildcard),
249 Some(level) => Some(TopicLevel::Concrete(level)),
250 None => None,
251 }
252 }
253}
254
255#[cfg(test)]
256mod tests {
257 use crate::topic::{Topic, TopicFilter, TopicLevel, TopicParseError, MAX_TOPIC_LEN_BYTES};
258
259 #[test]
260 fn test_topic_filter_parse_empty_topic() {
261 assert_eq!("".parse::<TopicFilter>().unwrap_err(), TopicParseError::EmptyTopic);
262 }
263
264 #[test]
265 fn test_topic_filter_parse_length() {
266 let just_right_topic = "a".repeat(MAX_TOPIC_LEN_BYTES);
267 assert!(just_right_topic.parse::<TopicFilter>().is_ok());
268
269 let too_long_topic = "a".repeat(MAX_TOPIC_LEN_BYTES + 1);
270 assert_eq!(
271 too_long_topic.parse::<TopicFilter>().unwrap_err(),
272 TopicParseError::TopicTooLong
273 );
274 }
275
276 #[test]
277 fn test_topic_filter_parse_concrete() {
278 assert_eq!(
279 "/".parse::<TopicFilter>().unwrap(),
280 TopicFilter::Concrete { filter: "/".to_string(), level_count: 2 }
281 );
282
283 assert_eq!(
284 "a".parse::<TopicFilter>().unwrap(),
285 TopicFilter::Concrete { filter: "a".to_string(), level_count: 1 }
286 );
287
288 assert_eq!(
290 "home/kitchen".parse::<TopicFilter>().unwrap(),
291 TopicFilter::Concrete { filter: "home/kitchen".to_string(), level_count: 2 }
292 );
293
294 assert_eq!(
295 "home/kitchen/temperature".parse::<TopicFilter>().unwrap(),
296 TopicFilter::Concrete {
297 filter: "home/kitchen/temperature".to_string(),
298 level_count: 3,
299 }
300 );
301
302 assert_eq!(
303 "home/kitchen/temperature/celsius".parse::<TopicFilter>().unwrap(),
304 TopicFilter::Concrete {
305 filter: "home/kitchen/temperature/celsius".to_string(),
306 level_count: 4,
307 }
308 );
309 }
310
311 #[test]
312 fn test_topic_filter_parse_single_level_wildcard() {
313 assert_eq!(
314 "+".parse::<TopicFilter>().unwrap(),
315 TopicFilter::Wildcard { filter: "+".to_string(), level_count: 1 }
316 );
317
318 assert_eq!(
319 "+/".parse::<TopicFilter>().unwrap(),
320 TopicFilter::Wildcard { filter: "+/".to_string(), level_count: 2 }
321 );
322
323 assert_eq!(
324 "sport/+".parse::<TopicFilter>().unwrap(),
325 TopicFilter::Wildcard { filter: "sport/+".to_string(), level_count: 2 }
326 );
327
328 assert_eq!(
329 "/+".parse::<TopicFilter>().unwrap(),
330 TopicFilter::Wildcard { filter: "/+".to_string(), level_count: 2 }
331 );
332 }
333
334 #[test]
335 fn test_topic_filter_parse_multi_level_wildcard() {
336 assert_eq!(
337 "#".parse::<TopicFilter>().unwrap(),
338 TopicFilter::Wildcard { filter: "#".to_string(), level_count: 1 }
339 );
340
341 assert_eq!(
342 "#/".parse::<TopicFilter>().unwrap_err(),
343 TopicParseError::MultilevelWildcardNotAtEnd
344 );
345
346 assert_eq!(
347 "/#".parse::<TopicFilter>().unwrap(),
348 TopicFilter::Wildcard { filter: "/#".to_string(), level_count: 2 }
349 );
350
351 assert_eq!(
352 "sport/#".parse::<TopicFilter>().unwrap(),
353 TopicFilter::Wildcard { filter: "sport/#".to_string(), level_count: 2 }
354 );
355
356 assert_eq!(
357 "home/kitchen/temperature/#".parse::<TopicFilter>().unwrap(),
358 TopicFilter::Wildcard {
359 filter: "home/kitchen/temperature/#".to_string(),
360 level_count: 4,
361 }
362 );
363 }
364
365 #[test]
366 fn test_topic_filter_parse_shared_subscription_concrete() {
367 assert_eq!(
368 "$share/group_a/home".parse::<TopicFilter>().unwrap(),
369 TopicFilter::SharedConcrete {
370 group_name: "group_a".to_string(),
371 filter: "home".to_string(),
372 level_count: 1,
373 }
374 );
375
376 assert_eq!(
377 "$share/group_a/home/kitchen/temperature".parse::<TopicFilter>().unwrap(),
378 TopicFilter::SharedConcrete {
379 group_name: "group_a".to_string(),
380 filter: "home/kitchen/temperature".to_string(),
381 level_count: 3,
382 }
383 );
384
385 assert_eq!(
386 "$share/group_a//".parse::<TopicFilter>().unwrap(),
387 TopicFilter::SharedConcrete {
388 group_name: "group_a".to_string(),
389 filter: "/".to_string(),
390 level_count: 2,
391 }
392 );
393 }
394
395 #[test]
396 fn test_topic_filter_parse_shared_subscription_wildcard() {
397 assert_eq!(
398 "$share/group_b/#".parse::<TopicFilter>().unwrap(),
399 TopicFilter::SharedWildcard {
400 group_name: "group_b".to_string(),
401 filter: "#".to_string(),
402 level_count: 1,
403 }
404 );
405
406 assert_eq!(
407 "$share/group_b/+".parse::<TopicFilter>().unwrap(),
408 TopicFilter::SharedWildcard {
409 group_name: "group_b".to_string(),
410 filter: "+".to_string(),
411 level_count: 1,
412 }
413 );
414
415 assert_eq!(
416 "$share/group_b/+/temperature".parse::<TopicFilter>().unwrap(),
417 TopicFilter::SharedWildcard {
418 group_name: "group_b".to_string(),
419 filter: "+/temperature".to_string(),
420 level_count: 2,
421 }
422 );
423
424 assert_eq!(
425 "$share/group_c/+/temperature/+/meta".parse::<TopicFilter>().unwrap(),
426 TopicFilter::SharedWildcard {
427 group_name: "group_c".to_string(),
428 filter: "+/temperature/+/meta".to_string(),
429 level_count: 4,
430 }
431 );
432 }
433
434 #[test]
435 fn test_topic_filter_parse_invalid_shared_subscription() {
436 assert_eq!(
437 "$share/".parse::<TopicFilter>().unwrap_err(),
438 TopicParseError::EmptySharedGroupName
439 );
440 assert_eq!("$share/a".parse::<TopicFilter>().unwrap_err(), TopicParseError::EmptyTopic);
441 assert_eq!("$share/a/".parse::<TopicFilter>().unwrap_err(), TopicParseError::EmptyTopic);
442 assert_eq!(
443 "$share//".parse::<TopicFilter>().unwrap_err(),
444 TopicParseError::EmptySharedGroupName
445 );
446 assert_eq!(
447 "$share///".parse::<TopicFilter>().unwrap_err(),
448 TopicParseError::EmptySharedGroupName
449 );
450
451 assert_eq!(
452 "$share/invalid_group#/#".parse::<TopicFilter>().unwrap_err(),
453 TopicParseError::InvalidSharedGroupName
454 );
455 }
456
457 #[test]
458 fn test_topic_filter_parse_sys_prefix() {
459 assert_eq!(
460 "$SYS/stats".parse::<TopicFilter>().unwrap(),
461 TopicFilter::Concrete { filter: "$SYS/stats".to_string(), level_count: 2 }
462 );
463
464 assert_eq!(
465 "/$SYS/stats".parse::<TopicFilter>().unwrap(),
466 TopicFilter::Concrete { filter: "/$SYS/stats".to_string(), level_count: 3 }
467 );
468
469 assert_eq!(
470 "$SYS/+".parse::<TopicFilter>().unwrap(),
471 TopicFilter::Wildcard { filter: "$SYS/+".to_string(), level_count: 2 }
472 );
473
474 assert_eq!(
475 "$SYS/#".parse::<TopicFilter>().unwrap(),
476 TopicFilter::Wildcard { filter: "$SYS/#".to_string(), level_count: 2 }
477 );
478 }
479
480 #[test]
481 fn test_topic_filter_parse_invalid_filters() {
482 assert_eq!(
483 "sport/#/stats".parse::<TopicFilter>().unwrap_err(),
484 TopicParseError::MultilevelWildcardNotAtEnd
485 );
486 assert_eq!(
487 "sport/#/stats#".parse::<TopicFilter>().unwrap_err(),
488 TopicParseError::InvalidWildcardLevel
489 );
490 assert_eq!(
491 "sport#/stats#".parse::<TopicFilter>().unwrap_err(),
492 TopicParseError::InvalidWildcardLevel
493 );
494 assert_eq!(
495 "sport/tennis#".parse::<TopicFilter>().unwrap_err(),
496 TopicParseError::InvalidWildcardLevel
497 );
498 assert_eq!(
499 "sport/++".parse::<TopicFilter>().unwrap_err(),
500 TopicParseError::InvalidWildcardLevel
501 );
502 }
503
504 #[test]
505 fn test_topic_name_success() {
506 assert_eq!(
507 "/".parse::<Topic>().unwrap(),
508 Topic { topic_name: "/".to_string(), level_count: 2 }
509 );
510
511 assert_eq!(
512 "Accounts payable".parse::<Topic>().unwrap(),
513 Topic { topic_name: "Accounts payable".to_string(), level_count: 1 }
514 );
515
516 assert_eq!(
517 "home/kitchen".parse::<Topic>().unwrap(),
518 Topic { topic_name: "home/kitchen".to_string(), level_count: 2 }
519 );
520
521 assert_eq!(
522 "home/kitchen/temperature".parse::<Topic>().unwrap(),
523 Topic { topic_name: "home/kitchen/temperature".to_string(), level_count: 3 }
524 );
525 }
526
527 #[test]
528 fn test_topic_name_failure() {
529 assert_eq!("#".parse::<Topic>().unwrap_err(), TopicParseError::WildcardOrNullInTopic,);
530
531 assert_eq!("+".parse::<Topic>().unwrap_err(), TopicParseError::WildcardOrNullInTopic,);
532
533 assert_eq!("\0".parse::<Topic>().unwrap_err(), TopicParseError::WildcardOrNullInTopic,);
534
535 assert_eq!(
536 "/multi/level/#".parse::<Topic>().unwrap_err(),
537 TopicParseError::WildcardOrNullInTopic,
538 );
539
540 assert_eq!(
541 "/single/level/+".parse::<Topic>().unwrap_err(),
542 TopicParseError::WildcardOrNullInTopic,
543 );
544
545 assert_eq!(
546 "/null/byte/\0".parse::<Topic>().unwrap_err(),
547 TopicParseError::WildcardOrNullInTopic,
548 );
549 }
550
551 #[test]
552 fn test_topic_filter_level_iterator_simple() {
553 let filter: TopicFilter = "/".parse().unwrap();
554
555 let mut levels = filter.levels();
556
557 assert_eq!(levels.next(), Some(TopicLevel::Concrete("")));
558 assert_eq!(levels.next(), Some(TopicLevel::Concrete("")));
559 assert_eq!(levels.next(), None);
560 }
561
562 #[test]
563 fn test_topic_filter_level_iterator_concrete() {
564 let filter: TopicFilter = "home/kitchen/temperature".parse().unwrap();
565
566 let mut levels = filter.levels();
567
568 assert_eq!(levels.next(), Some(TopicLevel::Concrete("home")));
569 assert_eq!(levels.next(), Some(TopicLevel::Concrete("kitchen")));
570 assert_eq!(levels.next(), Some(TopicLevel::Concrete("temperature")));
571 assert_eq!(levels.next(), None);
572 }
573
574 #[test]
575 fn test_topic_filter_level_iterator_single_level_wildcard_1() {
576 let filter: TopicFilter = "home/+/+/temperature/+".parse().unwrap();
577
578 let mut levels = filter.levels();
579
580 assert_eq!(levels.next(), Some(TopicLevel::Concrete("home")));
581 assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard));
582 assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard));
583 assert_eq!(levels.next(), Some(TopicLevel::Concrete("temperature")));
584 assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard));
585 assert_eq!(levels.next(), None);
586 }
587
588 #[test]
589 fn test_topic_filter_level_iterator_single_level_wildcard_2() {
590 let filter: TopicFilter = "+".parse().unwrap();
591
592 let mut levels = filter.levels();
593
594 assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard));
595 assert_eq!(levels.next(), None);
596 }
597
598 #[test]
599 fn test_topic_filter_level_iterator_mutli_level_wildcard_1() {
600 let filter: TopicFilter = "home/kitchen/#".parse().unwrap();
601
602 let mut levels = filter.levels();
603
604 assert_eq!(levels.next(), Some(TopicLevel::Concrete("home")));
605 assert_eq!(levels.next(), Some(TopicLevel::Concrete("kitchen")));
606 assert_eq!(levels.next(), Some(TopicLevel::MultiLevelWildcard));
607 assert_eq!(levels.next(), None);
608 }
609
610 #[test]
611 fn test_topic_filter_level_iterator_mutli_level_wildcard_2() {
612 let filter: TopicFilter = "#".parse().unwrap();
613
614 let mut levels = filter.levels();
615
616 assert_eq!(levels.next(), Some(TopicLevel::MultiLevelWildcard));
617 assert_eq!(levels.next(), None);
618 }
619}