1use regex::Regex;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38
/// Strategy used to decide whether a task name is selected by a routing rule.
#[derive(Debug, Clone)]
pub enum PatternMatcher {
    /// Matches only a task name equal to the stored string.
    Exact(String),
    /// Matches task names against a glob pattern (see [`GlobPattern`]).
    Glob(GlobPattern),
    /// Matches task names against a compiled regular expression (see [`RegexPattern`]).
    Regex(RegexPattern),
    /// Matches every task name, including the empty string.
    All,
}
51
52impl PatternMatcher {
53 #[must_use]
55 pub fn exact(name: impl Into<String>) -> Self {
56 Self::Exact(name.into())
57 }
58
59 #[must_use]
76 pub fn glob(pattern: impl Into<String>) -> Self {
77 Self::Glob(GlobPattern::new(pattern))
78 }
79
80 pub fn regex(pattern: &str) -> Result<Self, regex::Error> {
96 Ok(Self::Regex(RegexPattern::new(pattern)?))
97 }
98
99 #[must_use]
101 pub fn all() -> Self {
102 Self::All
103 }
104
105 #[inline]
107 #[must_use]
108 pub fn matches(&self, task_name: &str) -> bool {
109 match self {
110 Self::Exact(name) => task_name == name,
111 Self::Glob(glob) => glob.matches(task_name),
112 Self::Regex(regex) => regex.matches(task_name),
113 Self::All => true,
114 }
115 }
116}
117
/// A glob pattern together with the regular expression compiled from it.
#[derive(Debug, Clone)]
pub struct GlobPattern {
    /// The glob text exactly as supplied to [`GlobPattern::new`].
    pattern: String,
    /// Regex produced by translating `pattern` with `glob_to_regex`.
    regex: Regex,
}
124
125impl GlobPattern {
126 #[must_use]
132 pub fn new(pattern: impl Into<String>) -> Self {
133 let pattern = pattern.into();
134 let regex_str = glob_to_regex(&pattern);
135 let regex = Regex::new(®ex_str).expect("Invalid glob pattern");
136 Self { pattern, regex }
137 }
138
139 #[inline]
141 #[must_use]
142 pub fn matches(&self, task_name: &str) -> bool {
143 self.regex.is_match(task_name)
144 }
145
146 #[inline]
148 #[must_use]
149 pub fn pattern(&self) -> &str {
150 &self.pattern
151 }
152}
153
/// A regular expression kept alongside the text it was compiled from.
#[derive(Debug, Clone)]
pub struct RegexPattern {
    /// The expression text exactly as supplied to [`RegexPattern::new`].
    pattern: String,
    /// The compiled form of `pattern`.
    regex: Regex,
}
160
161impl RegexPattern {
162 pub fn new(pattern: &str) -> Result<Self, regex::Error> {
168 let regex = Regex::new(pattern)?;
169 Ok(Self {
170 pattern: pattern.to_string(),
171 regex,
172 })
173 }
174
175 #[inline]
177 #[must_use]
178 pub fn matches(&self, task_name: &str) -> bool {
179 self.regex.is_match(task_name)
180 }
181
182 #[inline]
184 #[must_use]
185 pub fn pattern(&self) -> &str {
186 &self.pattern
187 }
188}
189
/// Translates a glob into the text of an anchored regular expression.
///
/// * `*` becomes `.*` (any run of characters, possibly empty).
/// * `?` becomes `.` (exactly one character).
/// * Regex metacharacters are backslash-escaped so they match literally.
/// * Every other character is copied through unchanged.
///
/// The result starts with `(?s)` so that the wildcards also match a newline;
/// without that flag `.` skips `\n` and `*` / `?` would not mean
/// "any character". The expression is anchored with `^` and `$`, so the whole
/// input has to match.
fn glob_to_regex(glob: &str) -> String {
    const PREFIX: &str = "(?s)^";

    // Worst case every character is escaped (two bytes out per byte in),
    // plus the prefix and the closing `$`.
    let mut regex = String::with_capacity(glob.len() * 2 + PREFIX.len() + 1);
    regex.push_str(PREFIX);

    for c in glob.chars() {
        match c {
            '*' => regex.push_str(".*"),
            '?' => regex.push('.'),
            '.' | '+' | '(' | ')' | '[' | ']' | '{' | '}' | '^' | '$' | '|' | '\\' => {
                regex.push('\\');
                regex.push(c);
            }
            _ => regex.push(c),
        }
    }

    regex.push('$');
    regex
}
210
/// A single routing rule: which task names it selects and where they go.
#[derive(Debug, Clone)]
pub struct RouteRule {
    /// Decides which task names this rule applies to.
    pub matcher: PatternMatcher,
    /// Name of the queue selected when the rule matches.
    pub queue: String,
    /// Ordering key; `Router::add_rule` keeps rules sorted so that higher
    /// values are tried first.
    pub priority: i32,
    /// Optional routing key copied into the `RouteResult`.
    pub routing_key: Option<String>,
    /// Optional exchange name copied into the `RouteResult`.
    pub exchange: Option<String>,
    /// Optional extra condition on the task arguments; only consulted by
    /// `matches_with_args`.
    pub argument_condition: Option<ArgumentCondition>,
}
227
228impl RouteRule {
229 #[must_use]
231 pub fn new(matcher: PatternMatcher, queue: impl Into<String>) -> Self {
232 Self {
233 matcher,
234 queue: queue.into(),
235 priority: 0,
236 routing_key: None,
237 exchange: None,
238 argument_condition: None,
239 }
240 }
241
242 #[must_use]
244 pub fn with_priority(mut self, priority: i32) -> Self {
245 self.priority = priority;
246 self
247 }
248
249 #[must_use]
251 pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
252 self.routing_key = Some(routing_key.into());
253 self
254 }
255
256 #[must_use]
258 pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
259 self.exchange = Some(exchange.into());
260 self
261 }
262
263 #[must_use]
265 pub fn with_argument_condition(mut self, condition: ArgumentCondition) -> Self {
266 self.argument_condition = Some(condition);
267 self
268 }
269
270 #[inline]
272 #[must_use]
273 pub fn matches(&self, task_name: &str) -> bool {
274 self.matcher.matches(task_name)
275 }
276
277 #[must_use]
283 pub fn matches_with_args(
284 &self,
285 task_name: &str,
286 args: &[serde_json::Value],
287 kwargs: &serde_json::Map<String, serde_json::Value>,
288 ) -> bool {
289 if !self.matcher.matches(task_name) {
290 return false;
291 }
292
293 match &self.argument_condition {
294 Some(condition) => condition.evaluate(args, kwargs),
295 None => true,
296 }
297 }
298}
299
/// A predicate over a task's positional (`args`) and keyword (`kwargs`)
/// arguments.
///
/// Serialized with an internal `"type"` tag naming the variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ArgumentCondition {
    /// Holds when `args[index]` exists and equals `value`.
    ArgEquals {
        /// Position in `args`.
        index: usize,
        /// JSON value to compare against.
        value: serde_json::Value,
    },

    /// Holds when `args` has an element at `index`.
    ArgExists {
        /// Position in `args`.
        index: usize,
    },

    /// Holds when `kwargs[key]` exists and equals `value`.
    KwargEquals {
        /// Key in `kwargs`.
        key: String,
        /// JSON value to compare against.
        value: serde_json::Value,
    },

    /// Holds when `kwargs` contains `key`.
    KwargExists {
        /// Key in `kwargs`.
        key: String,
    },

    /// Holds when `kwargs[key]` is a string matched by the regex `pattern`.
    KwargMatches {
        /// Key in `kwargs`.
        key: String,
        /// Regular expression text; compiled when the condition is evaluated.
        pattern: String,
    },

    /// Holds when `args[index]` is numeric and strictly greater than `threshold`.
    ArgGreaterThan {
        /// Position in `args`.
        index: usize,
        /// Exclusive lower bound.
        threshold: f64,
    },

    /// Holds when `args[index]` is numeric and strictly less than `threshold`.
    ArgLessThan {
        /// Position in `args`.
        index: usize,
        /// Exclusive upper bound.
        threshold: f64,
    },

    /// Holds when `kwargs[key]` is numeric and strictly greater than `threshold`.
    KwargGreaterThan {
        /// Key in `kwargs`.
        key: String,
        /// Exclusive lower bound.
        threshold: f64,
    },

    /// Holds when `kwargs[key]` is numeric and strictly less than `threshold`.
    KwargLessThan {
        /// Key in `kwargs`.
        key: String,
        /// Exclusive upper bound.
        threshold: f64,
    },

    /// Holds when `kwargs[key]` is a string containing `value` (itself a
    /// string) or an array containing an element equal to `value`.
    KwargContains {
        /// Key in `kwargs`.
        key: String,
        /// Substring or array element to look for.
        value: serde_json::Value,
    },

    /// Holds when every inner condition holds (true for an empty list).
    And(Vec<ArgumentCondition>),

    /// Holds when at least one inner condition holds (false for an empty list).
    Or(Vec<ArgumentCondition>),

    /// Holds when the inner condition does not.
    Not(Box<ArgumentCondition>),

    /// Always holds.
    Always,
}
398
399impl ArgumentCondition {
400 #[must_use]
402 pub fn arg_equals(index: usize, value: serde_json::Value) -> Self {
403 Self::ArgEquals { index, value }
404 }
405
406 #[must_use]
408 pub fn arg_exists(index: usize) -> Self {
409 Self::ArgExists { index }
410 }
411
412 #[must_use]
414 pub fn kwarg_equals(key: impl Into<String>, value: serde_json::Value) -> Self {
415 Self::KwargEquals {
416 key: key.into(),
417 value,
418 }
419 }
420
421 #[must_use]
423 pub fn kwarg_exists(key: impl Into<String>) -> Self {
424 Self::KwargExists { key: key.into() }
425 }
426
427 #[must_use]
429 pub fn kwarg_matches(key: impl Into<String>, pattern: impl Into<String>) -> Self {
430 Self::KwargMatches {
431 key: key.into(),
432 pattern: pattern.into(),
433 }
434 }
435
436 #[must_use]
438 pub fn arg_greater_than(index: usize, threshold: f64) -> Self {
439 Self::ArgGreaterThan { index, threshold }
440 }
441
442 #[must_use]
444 pub fn arg_less_than(index: usize, threshold: f64) -> Self {
445 Self::ArgLessThan { index, threshold }
446 }
447
448 #[must_use]
450 pub fn kwarg_greater_than(key: impl Into<String>, threshold: f64) -> Self {
451 Self::KwargGreaterThan {
452 key: key.into(),
453 threshold,
454 }
455 }
456
457 #[must_use]
459 pub fn kwarg_less_than(key: impl Into<String>, threshold: f64) -> Self {
460 Self::KwargLessThan {
461 key: key.into(),
462 threshold,
463 }
464 }
465
466 #[must_use]
468 pub fn kwarg_contains(key: impl Into<String>, value: serde_json::Value) -> Self {
469 Self::KwargContains {
470 key: key.into(),
471 value,
472 }
473 }
474
475 #[must_use]
477 pub fn always() -> Self {
478 Self::Always
479 }
480
481 #[must_use]
483 pub fn and(self, other: ArgumentCondition) -> Self {
484 match self {
485 Self::And(mut conditions) => {
486 conditions.push(other);
487 Self::And(conditions)
488 }
489 _ => Self::And(vec![self, other]),
490 }
491 }
492
493 #[must_use]
495 pub fn or(self, other: ArgumentCondition) -> Self {
496 match self {
497 Self::Or(mut conditions) => {
498 conditions.push(other);
499 Self::Or(conditions)
500 }
501 _ => Self::Or(vec![self, other]),
502 }
503 }
504
505 #[must_use]
507 pub fn negate(self) -> Self {
508 Self::Not(Box::new(self))
509 }
510
511 #[must_use]
517 pub fn evaluate(
518 &self,
519 args: &[serde_json::Value],
520 kwargs: &serde_json::Map<String, serde_json::Value>,
521 ) -> bool {
522 match self {
523 Self::Always => true,
524
525 Self::ArgEquals { index, value } => args.get(*index).is_some_and(|v| v == value),
526
527 Self::ArgExists { index } => args.len() > *index,
528
529 Self::KwargEquals { key, value } => kwargs.get(key).is_some_and(|v| v == value),
530
531 Self::KwargExists { key } => kwargs.contains_key(key),
532
533 Self::KwargMatches { key, pattern } => {
534 if let Some(serde_json::Value::String(s)) = kwargs.get(key) {
535 Regex::new(pattern)
536 .map(|re| re.is_match(s))
537 .unwrap_or(false)
538 } else {
539 false
540 }
541 }
542
543 Self::ArgGreaterThan { index, threshold } => args
544 .get(*index)
545 .and_then(serde_json::Value::as_f64)
546 .is_some_and(|v| v > *threshold),
547
548 Self::ArgLessThan { index, threshold } => args
549 .get(*index)
550 .and_then(serde_json::Value::as_f64)
551 .is_some_and(|v| v < *threshold),
552
553 Self::KwargGreaterThan { key, threshold } => kwargs
554 .get(key)
555 .and_then(serde_json::Value::as_f64)
556 .is_some_and(|v| v > *threshold),
557
558 Self::KwargLessThan { key, threshold } => kwargs
559 .get(key)
560 .and_then(serde_json::Value::as_f64)
561 .is_some_and(|v| v < *threshold),
562
563 Self::KwargContains { key, value } => {
564 if let Some(v) = kwargs.get(key) {
565 match v {
566 serde_json::Value::String(s) => {
567 if let Some(needle) = value.as_str() {
568 s.contains(needle)
569 } else {
570 false
571 }
572 }
573 serde_json::Value::Array(arr) => arr.contains(value),
574 _ => false,
575 }
576 } else {
577 false
578 }
579 }
580
581 Self::And(conditions) => conditions.iter().all(|c| c.evaluate(args, kwargs)),
582
583 Self::Or(conditions) => conditions.iter().any(|c| c.evaluate(args, kwargs)),
584
585 Self::Not(condition) => !condition.evaluate(args, kwargs),
586 }
587 }
588}
589
590impl std::fmt::Display for ArgumentCondition {
591 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592 match self {
593 Self::Always => write!(f, "always"),
594 Self::ArgEquals { index, value } => write!(f, "args[{index}] == {value}"),
595 Self::ArgExists { index } => write!(f, "args[{index}] exists"),
596 Self::KwargEquals { key, value } => write!(f, "kwargs[{key}] == {value}"),
597 Self::KwargExists { key } => write!(f, "kwargs[{key}] exists"),
598 Self::KwargMatches { key, pattern } => {
599 write!(f, "kwargs[{key}] matches /{pattern}/")
600 }
601 Self::ArgGreaterThan { index, threshold } => {
602 write!(f, "args[{index}] > {threshold}")
603 }
604 Self::ArgLessThan { index, threshold } => write!(f, "args[{index}] < {threshold}"),
605 Self::KwargGreaterThan { key, threshold } => {
606 write!(f, "kwargs[{key}] > {threshold}")
607 }
608 Self::KwargLessThan { key, threshold } => write!(f, "kwargs[{key}] < {threshold}"),
609 Self::KwargContains { key, value } => {
610 write!(f, "kwargs[{key}] contains {value}")
611 }
612 Self::And(conditions) => {
613 let parts: Vec<String> = conditions.iter().map(|c| format!("{c}")).collect();
614 write!(f, "({})", parts.join(" AND "))
615 }
616 Self::Or(conditions) => {
617 let parts: Vec<String> = conditions.iter().map(|c| format!("{c}")).collect();
618 write!(f, "({})", parts.join(" OR "))
619 }
620 Self::Not(condition) => write!(f, "NOT ({condition})"),
621 }
622 }
623}
624
/// The outcome of routing a task: a queue plus optional routing details.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RouteResult {
    /// Name of the selected queue.
    pub queue: String,
    /// Routing key taken from the matching rule, if it had one.
    pub routing_key: Option<String>,
    /// Exchange name taken from the matching rule, if it had one.
    pub exchange: Option<String>,
}
635
636impl RouteResult {
637 #[must_use]
639 pub fn new(queue: impl Into<String>) -> Self {
640 Self {
641 queue: queue.into(),
642 routing_key: None,
643 exchange: None,
644 }
645 }
646
647 #[must_use]
649 pub fn from_rule(rule: &RouteRule) -> Self {
650 Self {
651 queue: rule.queue.clone(),
652 routing_key: rule.routing_key.clone(),
653 exchange: rule.exchange.clone(),
654 }
655 }
656}
657
/// Routes task names to queues using direct routes, prioritized rules, and an
/// optional default queue, consulted in that order.
#[derive(Debug, Default)]
pub struct Router {
    /// Pattern rules, kept sorted by descending priority by `add_rule`.
    rules: Vec<RouteRule>,
    /// Queue used when neither a direct route nor a rule applies.
    default_queue: Option<String>,
    /// Exact task-name lookups; checked before any rule.
    direct_routes: HashMap<String, RouteResult>,
}
668
669impl Router {
670 #[must_use]
672 pub fn new() -> Self {
673 Self::default()
674 }
675
676 #[must_use]
678 pub fn with_default_queue(queue: impl Into<String>) -> Self {
679 Self {
680 rules: Vec::new(),
681 default_queue: Some(queue.into()),
682 direct_routes: HashMap::new(),
683 }
684 }
685
686 pub fn add_rule(&mut self, rule: RouteRule) {
690 self.rules.push(rule);
691 self.rules.sort_by(|a, b| b.priority.cmp(&a.priority));
693 }
694
695 pub fn add_direct_route(&mut self, task_name: impl Into<String>, result: RouteResult) {
699 self.direct_routes.insert(task_name.into(), result);
700 }
701
702 pub fn set_default_queue(&mut self, queue: impl Into<String>) {
704 self.default_queue = Some(queue.into());
705 }
706
707 #[must_use]
711 pub fn route(&self, task_name: &str) -> Option<String> {
712 self.route_full(task_name).map(|r| r.queue)
713 }
714
715 #[must_use]
719 pub fn route_full(&self, task_name: &str) -> Option<RouteResult> {
720 if let Some(result) = self.direct_routes.get(task_name) {
722 return Some(result.clone());
723 }
724
725 for rule in &self.rules {
727 if rule.matches(task_name) {
728 return Some(RouteResult::from_rule(rule));
729 }
730 }
731
732 self.default_queue.as_ref().map(RouteResult::new)
734 }
735
736 #[must_use]
741 pub fn route_with_args(
742 &self,
743 task_name: &str,
744 args: &[serde_json::Value],
745 kwargs: &serde_json::Map<String, serde_json::Value>,
746 ) -> Option<String> {
747 self.route_full_with_args(task_name, args, kwargs)
748 .map(|r| r.queue)
749 }
750
751 #[must_use]
756 pub fn route_full_with_args(
757 &self,
758 task_name: &str,
759 args: &[serde_json::Value],
760 kwargs: &serde_json::Map<String, serde_json::Value>,
761 ) -> Option<RouteResult> {
762 if let Some(result) = self.direct_routes.get(task_name) {
764 return Some(result.clone());
765 }
766
767 for rule in &self.rules {
769 if rule.matches_with_args(task_name, args, kwargs) {
770 return Some(RouteResult::from_rule(rule));
771 }
772 }
773
774 self.default_queue.as_ref().map(RouteResult::new)
776 }
777
778 #[inline]
780 #[must_use]
781 pub fn has_route(&self, task_name: &str) -> bool {
782 self.direct_routes.contains_key(task_name)
783 || self.rules.iter().any(|r| r.matches(task_name))
784 || self.default_queue.is_some()
785 }
786
787 #[inline]
789 #[must_use]
790 pub fn rules(&self) -> &[RouteRule] {
791 &self.rules
792 }
793
794 pub fn remove_rules_by_queue(&mut self, queue: &str) {
796 self.rules.retain(|r| r.queue != queue);
797 }
798
799 pub fn clear(&mut self) {
801 self.rules.clear();
802 self.direct_routes.clear();
803 }
804}
805
/// Chainable builder that assembles a [`Router`].
#[derive(Debug, Default)]
pub struct RouterBuilder {
    /// The router being configured; handed out by `build`.
    router: Router,
}
811
812impl RouterBuilder {
813 #[must_use]
815 pub fn new() -> Self {
816 Self::default()
817 }
818
819 #[must_use]
821 pub fn route_glob(mut self, pattern: &str, queue: &str) -> Self {
822 self.router
823 .add_rule(RouteRule::new(PatternMatcher::glob(pattern), queue));
824 self
825 }
826
827 pub fn route_regex(mut self, pattern: &str, queue: &str) -> Result<Self, regex::Error> {
833 self.router
834 .add_rule(RouteRule::new(PatternMatcher::regex(pattern)?, queue));
835 Ok(self)
836 }
837
838 #[must_use]
840 pub fn route_exact(mut self, task_name: &str, queue: &str) -> Self {
841 self.router
842 .add_rule(RouteRule::new(PatternMatcher::exact(task_name), queue));
843 self
844 }
845
846 #[must_use]
848 pub fn direct_route(mut self, task_name: &str, queue: &str) -> Self {
849 self.router
850 .add_direct_route(task_name, RouteResult::new(queue));
851 self
852 }
853
854 #[must_use]
875 pub fn route_with_args(
876 mut self,
877 matcher: PatternMatcher,
878 queue: &str,
879 condition: ArgumentCondition,
880 ) -> Self {
881 self.router
882 .add_rule(RouteRule::new(matcher, queue).with_argument_condition(condition));
883 self
884 }
885
886 #[must_use]
888 pub fn route_with_args_priority(
889 mut self,
890 matcher: PatternMatcher,
891 queue: &str,
892 condition: ArgumentCondition,
893 priority: i32,
894 ) -> Self {
895 self.router.add_rule(
896 RouteRule::new(matcher, queue)
897 .with_argument_condition(condition)
898 .with_priority(priority),
899 );
900 self
901 }
902
903 #[must_use]
905 pub fn default_queue(mut self, queue: &str) -> Self {
906 self.router.set_default_queue(queue);
907 self
908 }
909
910 #[must_use]
912 pub fn build(self) -> Router {
913 self.router
914 }
915}
916
/// Serializable routing configuration that can be turned into a [`Router`].
///
/// Every field falls back to its default when absent from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingConfig {
    /// Fallback queue for tasks no route applies to.
    #[serde(default)]
    pub default_queue: Option<String>,
    /// Glob pattern -> queue name; each entry becomes a glob rule.
    #[serde(default)]
    pub routes: HashMap<String, String>,
    /// Exact task name -> queue name; each entry becomes a direct route.
    #[serde(default)]
    pub task_routes: HashMap<String, String>,
}
930
931impl RoutingConfig {
932 #[must_use]
934 pub fn new() -> Self {
935 Self {
936 default_queue: None,
937 routes: HashMap::new(),
938 task_routes: HashMap::new(),
939 }
940 }
941
942 #[must_use]
944 pub fn into_router(self) -> Router {
945 let mut router = match self.default_queue {
946 Some(queue) => Router::with_default_queue(queue),
947 None => Router::new(),
948 };
949
950 for (pattern, queue) in self.routes {
952 router.add_rule(RouteRule::new(PatternMatcher::glob(&pattern), queue));
953 }
954
955 for (task_name, queue) in self.task_routes {
957 router.add_direct_route(task_name, RouteResult::new(queue));
958 }
959
960 router
961 }
962}
963
964impl Default for RoutingConfig {
965 fn default() -> Self {
966 Self::new()
967 }
968}
969
/// A dot-separated topic pattern in which `*` stands for exactly one word and
/// `#` for zero or more words.
#[derive(Debug, Clone)]
pub struct TopicPattern {
    /// The pattern text exactly as supplied to `TopicPattern::new`.
    pattern: String,
    /// `pattern` split on `.` and classified word by word.
    segments: Vec<TopicSegment>,
}
1003
/// One dot-separated word of a [`TopicPattern`].
#[derive(Debug, Clone, PartialEq)]
enum TopicSegment {
    /// A word that must appear verbatim in the routing key.
    Literal(String),
    /// `*`: matches exactly one word.
    Star,
    /// `#`: matches zero or more words.
    Hash,
}
1013
1014impl TopicPattern {
1015 pub fn new(pattern: impl Into<String>) -> Self {
1017 let pattern = pattern.into();
1018 let segments = Self::parse(&pattern);
1019 Self { pattern, segments }
1020 }
1021
1022 fn parse(pattern: &str) -> Vec<TopicSegment> {
1023 pattern
1024 .split('.')
1025 .map(|s| match s {
1026 "*" => TopicSegment::Star,
1027 "#" => TopicSegment::Hash,
1028 literal => TopicSegment::Literal(literal.to_string()),
1029 })
1030 .collect()
1031 }
1032
1033 #[inline]
1035 #[must_use]
1036 pub fn matches(&self, routing_key: &str) -> bool {
1037 let key_parts: Vec<&str> = routing_key.split('.').collect();
1038 self.matches_parts(&key_parts, 0, 0)
1039 }
1040
1041 fn matches_parts(&self, key_parts: &[&str], key_idx: usize, pattern_idx: usize) -> bool {
1042 if pattern_idx >= self.segments.len() {
1044 return key_idx >= key_parts.len();
1045 }
1046
1047 if key_idx >= key_parts.len() {
1048 return self.segments[pattern_idx..]
1050 .iter()
1051 .all(|seg| matches!(seg, TopicSegment::Hash));
1052 }
1053
1054 match &self.segments[pattern_idx] {
1055 TopicSegment::Literal(literal) => {
1056 if key_parts[key_idx] == literal {
1057 self.matches_parts(key_parts, key_idx + 1, pattern_idx + 1)
1058 } else {
1059 false
1060 }
1061 }
1062 TopicSegment::Star => {
1063 self.matches_parts(key_parts, key_idx + 1, pattern_idx + 1)
1065 }
1066 TopicSegment::Hash => {
1067 if self.matches_parts(key_parts, key_idx, pattern_idx + 1) {
1070 return true;
1071 }
1072 for i in key_idx..key_parts.len() {
1074 if self.matches_parts(key_parts, i + 1, pattern_idx + 1) {
1075 return true;
1076 }
1077 }
1078 pattern_idx + 1 >= self.segments.len()
1080 }
1081 }
1082 }
1083
1084 #[inline]
1086 #[must_use]
1087 pub fn pattern(&self) -> &str {
1088 &self.pattern
1089 }
1090
1091 #[inline]
1093 #[must_use]
1094 pub const fn complexity(&self) -> usize {
1095 self.segments.len()
1096 }
1097
1098 #[inline]
1100 #[must_use]
1101 pub fn has_wildcards(&self) -> bool {
1102 self.segments
1103 .iter()
1104 .any(|s| matches!(s, TopicSegment::Star | TopicSegment::Hash))
1105 }
1106
1107 #[inline]
1109 #[must_use]
1110 pub fn is_exact(&self) -> bool {
1111 !self.has_wildcards()
1112 }
1113}
1114
/// Routes dot-separated routing keys to queues through topic-pattern bindings.
#[derive(Debug, Clone)]
pub struct TopicRouter {
    /// `(pattern, queue)` pairs in the order they were bound; `route` returns
    /// the queue of the first pair whose pattern matches.
    bindings: Vec<(TopicPattern, String)>,

    /// Queue returned by `route` when no binding matches.
    default_queue: Option<String>,
}
1126
1127impl TopicRouter {
1128 #[must_use]
1130 pub fn new() -> Self {
1131 Self {
1132 bindings: Vec::new(),
1133 default_queue: None,
1134 }
1135 }
1136
1137 pub fn bind(&mut self, pattern: impl Into<String>, queue: impl Into<String>) {
1139 let pattern = TopicPattern::new(pattern);
1140 self.bindings.push((pattern, queue.into()));
1141 }
1142
1143 pub fn bind_many(&mut self, patterns: Vec<String>, queue: impl Into<String>) {
1145 let queue = queue.into();
1146 for pattern in patterns {
1147 self.bind(pattern, queue.clone());
1148 }
1149 }
1150
1151 pub fn set_default_queue(&mut self, queue: impl Into<String>) {
1153 self.default_queue = Some(queue.into());
1154 }
1155
1156 #[must_use]
1160 pub fn route(&self, routing_key: &str) -> Option<String> {
1161 for (pattern, queue) in &self.bindings {
1162 if pattern.matches(routing_key) {
1163 return Some(queue.clone());
1164 }
1165 }
1166
1167 self.default_queue.clone()
1168 }
1169
1170 #[must_use]
1172 pub fn route_all(&self, routing_key: &str) -> Vec<String> {
1173 self.bindings
1174 .iter()
1175 .filter(|(pattern, _)| pattern.matches(routing_key))
1176 .map(|(_, queue)| queue.clone())
1177 .collect()
1178 }
1179
1180 pub fn unbind_queue(&mut self, queue: &str) -> usize {
1182 let original_len = self.bindings.len();
1183 self.bindings.retain(|(_, q)| q != queue);
1184 original_len - self.bindings.len()
1185 }
1186
1187 pub fn unbind_pattern(&mut self, pattern: &str) -> bool {
1189 let original_len = self.bindings.len();
1190 self.bindings.retain(|(p, _)| p.pattern() != pattern);
1191 self.bindings.len() < original_len
1192 }
1193
1194 #[must_use]
1196 pub fn bindings(&self) -> Vec<(String, String)> {
1197 self.bindings
1198 .iter()
1199 .map(|(pattern, queue)| (pattern.pattern().to_string(), queue.clone()))
1200 .collect()
1201 }
1202
1203 pub fn clear(&mut self) {
1205 self.bindings.clear();
1206 self.default_queue = None;
1207 }
1208
1209 #[must_use]
1211 pub const fn binding_count(&self) -> usize {
1212 self.bindings.len()
1213 }
1214
1215 #[inline]
1217 #[must_use]
1218 pub fn has_match(&self, routing_key: &str) -> bool {
1219 self.bindings.iter().any(|(p, _)| p.matches(routing_key)) || self.default_queue.is_some()
1220 }
1221}
1222
1223impl Default for TopicRouter {
1224 fn default() -> Self {
1225 Self::new()
1226 }
1227}
1228
/// Serializable description of a topic exchange and its bindings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicExchangeConfig {
    /// Name of the exchange.
    pub name: String,

    /// Topic pattern -> queue name; each entry becomes a binding in
    /// `build_router`.
    pub bindings: HashMap<String, String>,

    /// Fallback queue; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_queue: Option<String>,

    /// Durability flag; deserializes to `true` when absent.
    // NOTE(review): only stored here — presumably passed on when the exchange
    // is declared on the broker; confirm against the caller.
    #[serde(default = "default_true")]
    pub durable: bool,

    /// Auto-delete flag; deserializes to `false` when absent.
    // NOTE(review): only stored here — presumably passed on when the exchange
    // is declared on the broker; confirm against the caller.
    #[serde(default)]
    pub auto_delete: bool,
}
1250
/// Serde default provider returning `true`; referenced by
/// `#[serde(default = "default_true")]`.
fn default_true() -> bool {
    true
}
1254
1255impl TopicExchangeConfig {
1256 pub fn new(name: impl Into<String>) -> Self {
1258 Self {
1259 name: name.into(),
1260 bindings: HashMap::new(),
1261 default_queue: None,
1262 durable: true,
1263 auto_delete: false,
1264 }
1265 }
1266
1267 #[must_use]
1269 pub fn with_binding(mut self, pattern: impl Into<String>, queue: impl Into<String>) -> Self {
1270 self.bindings.insert(pattern.into(), queue.into());
1271 self
1272 }
1273
1274 #[must_use]
1276 pub fn with_default_queue(mut self, queue: impl Into<String>) -> Self {
1277 self.default_queue = Some(queue.into());
1278 self
1279 }
1280
1281 #[must_use]
1283 pub fn with_durable(mut self, durable: bool) -> Self {
1284 self.durable = durable;
1285 self
1286 }
1287
1288 #[must_use]
1290 pub fn build_router(&self) -> TopicRouter {
1291 let mut router = TopicRouter::new();
1292
1293 for (pattern, queue) in &self.bindings {
1294 router.bind(pattern.clone(), queue.clone());
1295 }
1296
1297 if let Some(ref default_queue) = self.default_queue {
1298 router.set_default_queue(default_queue.clone());
1299 }
1300
1301 router
1302 }
1303}
1304
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the `Some(String)` values returned by `Router::route`.
    fn queue(name: &str) -> Option<String> {
        Some(name.to_string())
    }

    /// A rule sending task names that match `glob` to `target`.
    fn glob_rule(glob: &str, target: &str) -> RouteRule {
        RouteRule::new(PatternMatcher::glob(glob), target)
    }

    #[test]
    fn test_exact_pattern() {
        let matcher = PatternMatcher::exact("tasks.add");
        assert!(matcher.matches("tasks.add"));
        for other in ["tasks.multiply", "tasks"] {
            assert!(!matcher.matches(other));
        }
    }

    #[test]
    fn test_glob_pattern() {
        // Trailing wildcard.
        let trailing = PatternMatcher::glob("tasks.*");
        for accepted in ["tasks.add", "tasks.multiply"] {
            assert!(trailing.matches(accepted));
        }
        for rejected in ["other.task", "tasks"] {
            assert!(!trailing.matches(rejected));
        }

        // Leading wildcard.
        let leading = PatternMatcher::glob("*.add");
        for accepted in ["tasks.add", "math.add"] {
            assert!(leading.matches(accepted));
        }
        assert!(!leading.matches("tasks.multiply"));

        // `?` stands for exactly one character.
        let single = PatternMatcher::glob("task?");
        for accepted in ["task1", "taskA", "tasks"] {
            assert!(single.matches(accepted));
        }
        for rejected in ["task", "task12"] {
            assert!(!single.matches(rejected));
        }
    }

    #[test]
    fn test_regex_pattern() {
        let lowercase = PatternMatcher::regex(r"tasks\.[a-z]+").unwrap();
        for accepted in ["tasks.add", "tasks.multiply"] {
            assert!(lowercase.matches(accepted));
        }
        for rejected in ["tasks.Add", "tasks.123"] {
            assert!(!lowercase.matches(rejected));
        }

        let prefixed = PatternMatcher::regex(r"^(email|sms)\.").unwrap();
        for accepted in ["email.send", "sms.send"] {
            assert!(prefixed.matches(accepted));
        }
        assert!(!prefixed.matches("push.send"));
    }

    #[test]
    fn test_all_pattern() {
        let matcher = PatternMatcher::all();
        for name in ["anything", "", "complex.task.name"] {
            assert!(matcher.matches(name));
        }
    }

    #[test]
    fn test_router_basic() {
        let mut router = Router::new();
        router.add_rule(glob_rule("email.*", "email"));
        router.add_rule(glob_rule("sms.*", "sms"));

        assert_eq!(router.route("email.send"), queue("email"));
        assert_eq!(router.route("sms.notify"), queue("sms"));
        assert_eq!(router.route("push.notify"), None);
    }

    #[test]
    fn test_router_with_default() {
        let mut router = Router::with_default_queue("default");
        router.add_rule(glob_rule("email.*", "email"));

        assert_eq!(router.route("email.send"), queue("email"));
        assert_eq!(router.route("other.task"), queue("default"));
    }

    #[test]
    fn test_router_priority() {
        let mut router = Router::new();
        // The catch-all is added first but has the lower priority.
        router.add_rule(glob_rule("*", "default").with_priority(0));
        router.add_rule(glob_rule("urgent.*", "urgent").with_priority(10));

        assert_eq!(router.route("urgent.email"), queue("urgent"));
        assert_eq!(router.route("email.send"), queue("default"));
    }

    #[test]
    fn test_router_direct_route() {
        let mut router = Router::new();
        router.add_rule(glob_rule("tasks.*", "tasks"));
        router.add_direct_route("tasks.special", RouteResult::new("special"));

        // The direct route wins over the matching glob rule.
        assert_eq!(router.route("tasks.special"), queue("special"));
        assert_eq!(router.route("tasks.normal"), queue("tasks"));
    }

    #[test]
    fn test_route_result() {
        let rule = glob_rule("amqp.*", "amqp_queue")
            .with_routing_key("amqp.routing")
            .with_exchange("amqp_exchange");
        let mut router = Router::new();
        router.add_rule(rule);

        let result = router.route_full("amqp.task").unwrap();
        assert_eq!(result.queue, "amqp_queue");
        assert_eq!(result.routing_key, queue("amqp.routing"));
        assert_eq!(result.exchange, queue("amqp_exchange"));
    }

    #[test]
    fn test_router_builder() {
        let router = RouterBuilder::new()
            .route_glob("email.*", "email")
            .route_glob("sms.*", "sms")
            .direct_route("special.task", "special")
            .default_queue("default")
            .build();

        let expectations = [
            ("email.send", "email"),
            ("sms.notify", "sms"),
            ("special.task", "special"),
            ("other.task", "default"),
        ];
        for (task, expected) in expectations {
            assert_eq!(router.route(task), queue(expected));
        }
    }

    #[test]
    fn test_routing_config() {
        let mut config = RoutingConfig::new();
        config.default_queue = queue("default");
        config
            .routes
            .insert(String::from("email.*"), String::from("email"));
        config
            .task_routes
            .insert(String::from("special.task"), String::from("special"));

        let router = config.into_router();
        assert_eq!(router.route("email.send"), queue("email"));
        assert_eq!(router.route("special.task"), queue("special"));
        assert_eq!(router.route("other.task"), queue("default"));
    }

    #[test]
    fn test_routing_config_serialization() {
        let mut config = RoutingConfig::new();
        config.default_queue = queue("default");
        config
            .routes
            .insert(String::from("email.*"), String::from("email"));

        let json = serde_json::to_string(&config).unwrap();
        let parsed: RoutingConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.default_queue, queue("default"));
        assert_eq!(parsed.routes.get("email.*"), Some(&"email".to_string()));
    }

    #[test]
    fn test_glob_special_chars() {
        // `.` in a glob is a literal dot, not "any character".
        let dotted = PatternMatcher::glob("tasks.v1.0");
        assert!(dotted.matches("tasks.v1.0"));
        assert!(!dotted.matches("tasks.v1x0"));

        // Parentheses are literal as well.
        let parenthesized = PatternMatcher::glob("(test)");
        assert!(parenthesized.matches("(test)"));
        assert!(!parenthesized.matches("test"));
    }

    #[test]
    fn test_has_route() {
        let mut router = Router::new();
        router.add_rule(glob_rule("email.*", "email"));

        assert!(router.has_route("email.send"));
        assert!(!router.has_route("sms.send"));

        // With a default queue every task name has a route.
        router.set_default_queue("default");
        assert!(router.has_route("sms.send"));
    }

    #[test]
    fn test_remove_rules() {
        let mut router = Router::new();
        router.add_rule(glob_rule("email.*", "email"));
        router.add_rule(glob_rule("sms.*", "sms"));

        router.remove_rules_by_queue("email");
        assert_eq!(router.route("email.send"), None);
        assert_eq!(router.route("sms.send"), queue("sms"));
    }

    #[test]
    fn test_clear() {
        let mut router = Router::new();
        router.add_rule(glob_rule("email.*", "email"));
        router.add_direct_route("special", RouteResult::new("special"));

        router.clear();
        for task in ["email.send", "special"] {
            assert_eq!(router.route(task), None);
        }
    }
}