1use std::collections::{HashMap, VecDeque};
8
9#[derive(Debug, Clone, PartialEq, Eq)]
15pub enum RoutingStrategy {
16 ContentBased { field: String, value: String },
18 TopicExact(String),
20 TopicRegex(String),
22 HeaderBased { key: String, value: String },
24 RoundRobin(Vec<String>),
26}
27
28#[derive(Debug, Clone)]
30pub struct RoutingRule {
31 pub id: String,
33 pub priority: u32,
35 pub strategy: RoutingStrategy,
37 pub destination: String,
39 pub enabled: bool,
41}
42
43#[derive(Debug, Clone)]
45pub struct RoutableMessage {
46 pub id: String,
48 pub topic: String,
50 pub headers: HashMap<String, String>,
52 pub payload_fields: HashMap<String, String>,
54 pub payload: Vec<u8>,
56}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum RoutingOutcome {
61 Routed {
63 destination: String,
64 rule_id: String,
65 },
66 DeadLettered,
68}
69
70#[derive(Debug, Clone, Default)]
72pub struct RoutingStats {
73 pub total_evaluated: u64,
75 pub total_routed: u64,
77 pub total_dead_lettered: u64,
79 pub per_destination: HashMap<String, u64>,
81 pub per_rule: HashMap<String, u64>,
83}
84
85#[derive(Debug, Clone)]
87pub struct RouterConfig {
88 pub dlq_capacity: usize,
90 pub enable_dlq: bool,
92}
93
94impl Default for RouterConfig {
95 fn default() -> Self {
96 Self {
97 dlq_capacity: 10_000,
98 enable_dlq: true,
99 }
100 }
101}
102
103#[derive(Debug)]
105pub enum RouterError {
106 DuplicateRuleId(String),
108 RuleNotFound(String),
110 DlqFull,
112 InvalidRegex(String),
114}
115
116impl std::fmt::Display for RouterError {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 match self {
119 RouterError::DuplicateRuleId(id) => write!(f, "duplicate rule id: {}", id),
120 RouterError::RuleNotFound(id) => write!(f, "rule not found: {}", id),
121 RouterError::DlqFull => write!(f, "dead letter queue is full"),
122 RouterError::InvalidRegex(pat) => write!(f, "invalid regex pattern: {}", pat),
123 }
124 }
125}
126
127impl std::error::Error for RouterError {}
128
129pub struct StreamRouter {
136 config: RouterConfig,
137 rules: Vec<RoutingRule>,
139 dlq: VecDeque<RoutableMessage>,
141 stats: RoutingStats,
143 round_robin_counters: HashMap<String, usize>,
145}
146
147impl StreamRouter {
148 pub fn new(config: RouterConfig) -> Self {
150 Self {
151 config,
152 rules: Vec::new(),
153 dlq: VecDeque::new(),
154 stats: RoutingStats::default(),
155 round_robin_counters: HashMap::new(),
156 }
157 }
158
159 pub fn with_defaults() -> Self {
161 Self::new(RouterConfig::default())
162 }
163
164 pub fn add_rule(&mut self, rule: RoutingRule) -> Result<(), RouterError> {
166 if self.rules.iter().any(|r| r.id == rule.id) {
167 return Err(RouterError::DuplicateRuleId(rule.id));
168 }
169 if let RoutingStrategy::TopicRegex(ref pat) = rule.strategy {
171 Self::compile_regex(pat)?;
172 }
173 self.rules.push(rule);
174 self.rules.sort_by_key(|b| std::cmp::Reverse(b.priority));
175 Ok(())
176 }
177
178 pub fn remove_rule(&mut self, rule_id: &str) -> Result<RoutingRule, RouterError> {
180 let idx = self
181 .rules
182 .iter()
183 .position(|r| r.id == rule_id)
184 .ok_or_else(|| RouterError::RuleNotFound(rule_id.to_string()))?;
185 let removed = self.rules.remove(idx);
186 self.round_robin_counters.remove(rule_id);
187 Ok(removed)
188 }
189
190 pub fn update_rule(&mut self, rule: RoutingRule) -> Result<(), RouterError> {
192 let idx = self
193 .rules
194 .iter()
195 .position(|r| r.id == rule.id)
196 .ok_or_else(|| RouterError::RuleNotFound(rule.id.clone()))?;
197 if let RoutingStrategy::TopicRegex(ref pat) = rule.strategy {
199 Self::compile_regex(pat)?;
200 }
201 self.rules[idx] = rule;
202 self.rules.sort_by_key(|b| std::cmp::Reverse(b.priority));
203 Ok(())
204 }
205
206 pub fn set_rule_enabled(&mut self, rule_id: &str, enabled: bool) -> Result<(), RouterError> {
208 let rule = self
209 .rules
210 .iter_mut()
211 .find(|r| r.id == rule_id)
212 .ok_or_else(|| RouterError::RuleNotFound(rule_id.to_string()))?;
213 rule.enabled = enabled;
214 Ok(())
215 }
216
217 pub fn stats(&self) -> &RoutingStats {
219 &self.stats
220 }
221
222 pub fn active_rule_count(&self) -> usize {
224 self.rules.iter().filter(|r| r.enabled).count()
225 }
226
227 pub fn rule_count(&self) -> usize {
229 self.rules.len()
230 }
231
232 pub fn dlq(&self) -> &VecDeque<RoutableMessage> {
234 &self.dlq
235 }
236
237 pub fn pop_dlq(&mut self) -> Option<RoutableMessage> {
239 self.dlq.pop_front()
240 }
241
242 pub fn clear_dlq(&mut self) {
244 self.dlq.clear();
245 }
246
247 pub fn route(&mut self, message: RoutableMessage) -> RoutingOutcome {
252 self.stats.total_evaluated += 1;
253
254 let matched_idx = self
257 .rules
258 .iter()
259 .position(|rule| rule.enabled && Self::matches_rule_static(rule, &message));
260
261 if let Some(idx) = matched_idx {
262 let rule_id = self.rules[idx].id.clone();
263 let destination = self.resolve_destination_by_index(idx);
264 self.stats.total_routed += 1;
265 *self
266 .stats
267 .per_destination
268 .entry(destination.clone())
269 .or_insert(0) += 1;
270 *self.stats.per_rule.entry(rule_id.clone()).or_insert(0) += 1;
271 return RoutingOutcome::Routed {
272 destination,
273 rule_id,
274 };
275 }
276
277 self.stats.total_dead_lettered += 1;
279 if self.config.enable_dlq {
280 if self.dlq.len() >= self.config.dlq_capacity {
281 self.dlq.pop_front();
283 }
284 self.dlq.push_back(message);
285 }
286 RoutingOutcome::DeadLettered
287 }
288
289 pub fn route_batch(&mut self, messages: Vec<RoutableMessage>) -> Vec<RoutingOutcome> {
291 messages.into_iter().map(|m| self.route(m)).collect()
292 }
293
294 pub fn reset_stats(&mut self) {
296 self.stats = RoutingStats::default();
297 }
298
299 pub fn rule_ids(&self) -> Vec<&str> {
301 self.rules.iter().map(|r| r.id.as_str()).collect()
302 }
303
304 pub fn get_rule(&self, rule_id: &str) -> Option<&RoutingRule> {
306 self.rules.iter().find(|r| r.id == rule_id)
307 }
308
309 fn matches_rule_static(rule: &RoutingRule, message: &RoutableMessage) -> bool {
313 match &rule.strategy {
314 RoutingStrategy::ContentBased { field, value } => {
315 message.payload_fields.get(field) == Some(value)
316 }
317 RoutingStrategy::TopicExact(topic) => message.topic == *topic,
318 RoutingStrategy::TopicRegex(pattern) => Self::regex_matches(pattern, &message.topic),
319 RoutingStrategy::HeaderBased { key, value } => message.headers.get(key) == Some(value),
320 RoutingStrategy::RoundRobin(destinations) => !destinations.is_empty(),
321 }
322 }
323
324 fn resolve_destination_by_index(&mut self, idx: usize) -> String {
326 let rule = &self.rules[idx];
327 match &rule.strategy {
328 RoutingStrategy::RoundRobin(destinations) if !destinations.is_empty() => {
329 let rule_id = rule.id.clone();
330 let counter = self.round_robin_counters.entry(rule_id).or_insert(0);
331 let dest_idx = *counter % destinations.len();
332 *counter = counter.wrapping_add(1);
333 destinations[dest_idx].clone()
334 }
335 _ => rule.destination.clone(),
336 }
337 }
338
339 fn compile_regex(pattern: &str) -> Result<(), RouterError> {
341 if pattern.is_empty() {
344 return Err(RouterError::InvalidRegex("empty pattern".to_string()));
345 }
346 let mut depth: i32 = 0;
348 for ch in pattern.chars() {
349 match ch {
350 '(' => depth += 1,
351 ')' => depth -= 1,
352 _ => {}
353 }
354 if depth < 0 {
355 return Err(RouterError::InvalidRegex(
356 "unbalanced parentheses".to_string(),
357 ));
358 }
359 }
360 if depth != 0 {
361 return Err(RouterError::InvalidRegex(
362 "unbalanced parentheses".to_string(),
363 ));
364 }
365 Ok(())
366 }
367
368 fn regex_matches(pattern: &str, input: &str) -> bool {
375 let anchored_start = pattern.starts_with('^');
377 let anchored_end = pattern.ends_with('$');
378
379 let trimmed = pattern.trim_start_matches('^').trim_end_matches('$');
380
381 if trimmed.is_empty() {
382 return input.is_empty();
383 }
384
385 let segments: Vec<&str> = trimmed.split('*').collect();
387
388 if segments.len() == 1 {
389 let seg = segments[0];
391 if anchored_start && anchored_end {
392 return input == seg;
393 }
394 if anchored_start {
395 return input.starts_with(seg);
396 }
397 if anchored_end {
398 return input.ends_with(seg);
399 }
400 return input.contains(seg);
401 }
402
403 let mut pos = 0usize;
405
406 for (i, seg) in segments.iter().enumerate() {
407 if seg.is_empty() {
408 continue;
409 }
410 if i == 0 && anchored_start {
411 if !input.starts_with(seg) {
412 return false;
413 }
414 pos = seg.len();
415 continue;
416 }
417
418 match input[pos..].find(seg) {
419 Some(found) => {
420 pos += found + seg.len();
421 }
422 None => return false,
423 }
424 }
425
426 if anchored_end {
427 let last_seg = segments.last().unwrap_or(&"");
428 if last_seg.is_empty() {
429 return true; }
431 return input.ends_with(last_seg);
432 }
433
434 true
435 }
436}
437
438#[cfg(test)]
443mod tests {
444 use super::*;
445
446 fn make_message(id: &str, topic: &str) -> RoutableMessage {
447 RoutableMessage {
448 id: id.to_string(),
449 topic: topic.to_string(),
450 headers: HashMap::new(),
451 payload_fields: HashMap::new(),
452 payload: Vec::new(),
453 }
454 }
455
456 fn make_message_with_header(id: &str, topic: &str, key: &str, val: &str) -> RoutableMessage {
457 let mut msg = make_message(id, topic);
458 msg.headers.insert(key.to_string(), val.to_string());
459 msg
460 }
461
462 fn make_message_with_field(id: &str, topic: &str, key: &str, val: &str) -> RoutableMessage {
463 let mut msg = make_message(id, topic);
464 msg.payload_fields.insert(key.to_string(), val.to_string());
465 msg
466 }
467
468 #[test]
471 fn test_router_creation_defaults() {
472 let router = StreamRouter::with_defaults();
473 assert_eq!(router.rule_count(), 0);
474 assert_eq!(router.active_rule_count(), 0);
475 assert_eq!(router.stats().total_evaluated, 0);
476 }
477
478 #[test]
479 fn test_router_creation_custom_config() {
480 let cfg = RouterConfig {
481 dlq_capacity: 500,
482 enable_dlq: false,
483 };
484 let router = StreamRouter::new(cfg);
485 assert_eq!(router.rule_count(), 0);
486 assert!(router.dlq().is_empty());
487 }
488
489 #[test]
492 fn test_add_rule() {
493 let mut router = StreamRouter::with_defaults();
494 let rule = RoutingRule {
495 id: "r1".to_string(),
496 priority: 10,
497 strategy: RoutingStrategy::TopicExact("orders".to_string()),
498 destination: "order-queue".to_string(),
499 enabled: true,
500 };
501 assert!(router.add_rule(rule).is_ok());
502 assert_eq!(router.rule_count(), 1);
503 }
504
505 #[test]
506 fn test_add_duplicate_rule() {
507 let mut router = StreamRouter::with_defaults();
508 let rule = RoutingRule {
509 id: "r1".to_string(),
510 priority: 10,
511 strategy: RoutingStrategy::TopicExact("orders".to_string()),
512 destination: "q".to_string(),
513 enabled: true,
514 };
515 assert!(router.add_rule(rule.clone()).is_ok());
516 assert!(router.add_rule(rule).is_err());
517 }
518
519 #[test]
520 fn test_remove_rule() {
521 let mut router = StreamRouter::with_defaults();
522 let rule = RoutingRule {
523 id: "r1".to_string(),
524 priority: 10,
525 strategy: RoutingStrategy::TopicExact("x".to_string()),
526 destination: "y".to_string(),
527 enabled: true,
528 };
529 router.add_rule(rule).ok();
530 let removed = router.remove_rule("r1");
531 assert!(removed.is_ok());
532 assert_eq!(router.rule_count(), 0);
533 }
534
535 #[test]
536 fn test_remove_nonexistent_rule() {
537 let mut router = StreamRouter::with_defaults();
538 assert!(router.remove_rule("nope").is_err());
539 }
540
541 #[test]
542 fn test_update_rule() {
543 let mut router = StreamRouter::with_defaults();
544 let rule = RoutingRule {
545 id: "r1".to_string(),
546 priority: 5,
547 strategy: RoutingStrategy::TopicExact("a".to_string()),
548 destination: "dest1".to_string(),
549 enabled: true,
550 };
551 router.add_rule(rule).ok();
552
553 let updated = RoutingRule {
554 id: "r1".to_string(),
555 priority: 20,
556 strategy: RoutingStrategy::TopicExact("b".to_string()),
557 destination: "dest2".to_string(),
558 enabled: true,
559 };
560 assert!(router.update_rule(updated).is_ok());
561 assert_eq!(
562 router.get_rule("r1").map(|r| &r.destination),
563 Some(&"dest2".to_string())
564 );
565 }
566
567 #[test]
568 fn test_update_nonexistent_rule() {
569 let mut router = StreamRouter::with_defaults();
570 let rule = RoutingRule {
571 id: "nope".to_string(),
572 priority: 1,
573 strategy: RoutingStrategy::TopicExact("x".to_string()),
574 destination: "y".to_string(),
575 enabled: true,
576 };
577 assert!(router.update_rule(rule).is_err());
578 }
579
580 #[test]
581 fn test_enable_disable_rule() {
582 let mut router = StreamRouter::with_defaults();
583 let rule = RoutingRule {
584 id: "r1".to_string(),
585 priority: 1,
586 strategy: RoutingStrategy::TopicExact("x".to_string()),
587 destination: "y".to_string(),
588 enabled: true,
589 };
590 router.add_rule(rule).ok();
591 assert_eq!(router.active_rule_count(), 1);
592
593 router.set_rule_enabled("r1", false).ok();
594 assert_eq!(router.active_rule_count(), 0);
595
596 router.set_rule_enabled("r1", true).ok();
597 assert_eq!(router.active_rule_count(), 1);
598 }
599
600 #[test]
601 fn test_enable_nonexistent_rule() {
602 let mut router = StreamRouter::with_defaults();
603 assert!(router.set_rule_enabled("nope", true).is_err());
604 }
605
606 #[test]
609 fn test_topic_exact_match() {
610 let mut router = StreamRouter::with_defaults();
611 let rule = RoutingRule {
612 id: "r1".to_string(),
613 priority: 10,
614 strategy: RoutingStrategy::TopicExact("orders".to_string()),
615 destination: "order-queue".to_string(),
616 enabled: true,
617 };
618 router.add_rule(rule).ok();
619
620 let msg = make_message("m1", "orders");
621 let outcome = router.route(msg);
622 assert_eq!(
623 outcome,
624 RoutingOutcome::Routed {
625 destination: "order-queue".to_string(),
626 rule_id: "r1".to_string()
627 }
628 );
629 }
630
631 #[test]
632 fn test_topic_exact_no_match() {
633 let mut router = StreamRouter::with_defaults();
634 let rule = RoutingRule {
635 id: "r1".to_string(),
636 priority: 10,
637 strategy: RoutingStrategy::TopicExact("orders".to_string()),
638 destination: "order-queue".to_string(),
639 enabled: true,
640 };
641 router.add_rule(rule).ok();
642
643 let msg = make_message("m1", "payments");
644 let outcome = router.route(msg);
645 assert_eq!(outcome, RoutingOutcome::DeadLettered);
646 }
647
648 #[test]
651 fn test_topic_regex_wildcard() {
652 let mut router = StreamRouter::with_defaults();
653 let rule = RoutingRule {
654 id: "r1".to_string(),
655 priority: 10,
656 strategy: RoutingStrategy::TopicRegex("orders.*".to_string()),
657 destination: "all-orders".to_string(),
658 enabled: true,
659 };
660 router.add_rule(rule).ok();
661
662 let msg = make_message("m1", "orders.us");
663 assert!(matches!(router.route(msg), RoutingOutcome::Routed { .. }));
664 }
665
666 #[test]
667 fn test_topic_regex_anchored() {
668 let mut router = StreamRouter::with_defaults();
669 let rule = RoutingRule {
670 id: "r1".to_string(),
671 priority: 10,
672 strategy: RoutingStrategy::TopicRegex("^orders$".to_string()),
673 destination: "exact-orders".to_string(),
674 enabled: true,
675 };
676 router.add_rule(rule).ok();
677
678 let match_msg = make_message("m1", "orders");
679 assert!(matches!(
680 router.route(match_msg),
681 RoutingOutcome::Routed { .. }
682 ));
683
684 let no_match = make_message("m2", "orders.eu");
685 assert_eq!(router.route(no_match), RoutingOutcome::DeadLettered);
686 }
687
688 #[test]
689 fn test_topic_regex_contains() {
690 let mut router = StreamRouter::with_defaults();
691 let rule = RoutingRule {
692 id: "r1".to_string(),
693 priority: 10,
694 strategy: RoutingStrategy::TopicRegex("ship".to_string()),
695 destination: "shipping".to_string(),
696 enabled: true,
697 };
698 router.add_rule(rule).ok();
699
700 let msg = make_message("m1", "order-shipping-us");
701 assert!(matches!(router.route(msg), RoutingOutcome::Routed { .. }));
702 }
703
704 #[test]
705 fn test_invalid_regex_rejected() {
706 let mut router = StreamRouter::with_defaults();
707 let rule = RoutingRule {
708 id: "r1".to_string(),
709 priority: 10,
710 strategy: RoutingStrategy::TopicRegex("(".to_string()),
711 destination: "dest".to_string(),
712 enabled: true,
713 };
714 assert!(router.add_rule(rule).is_err());
715 }
716
717 #[test]
718 fn test_empty_regex_rejected() {
719 let mut router = StreamRouter::with_defaults();
720 let rule = RoutingRule {
721 id: "r1".to_string(),
722 priority: 10,
723 strategy: RoutingStrategy::TopicRegex(String::new()),
724 destination: "dest".to_string(),
725 enabled: true,
726 };
727 assert!(router.add_rule(rule).is_err());
728 }
729
730 #[test]
733 fn test_content_based_match() {
734 let mut router = StreamRouter::with_defaults();
735 let rule = RoutingRule {
736 id: "r1".to_string(),
737 priority: 10,
738 strategy: RoutingStrategy::ContentBased {
739 field: "type".to_string(),
740 value: "order".to_string(),
741 },
742 destination: "order-queue".to_string(),
743 enabled: true,
744 };
745 router.add_rule(rule).ok();
746
747 let msg = make_message_with_field("m1", "events", "type", "order");
748 assert!(matches!(router.route(msg), RoutingOutcome::Routed { .. }));
749 }
750
751 #[test]
752 fn test_content_based_no_match() {
753 let mut router = StreamRouter::with_defaults();
754 let rule = RoutingRule {
755 id: "r1".to_string(),
756 priority: 10,
757 strategy: RoutingStrategy::ContentBased {
758 field: "type".to_string(),
759 value: "order".to_string(),
760 },
761 destination: "order-queue".to_string(),
762 enabled: true,
763 };
764 router.add_rule(rule).ok();
765
766 let msg = make_message_with_field("m1", "events", "type", "payment");
767 assert_eq!(router.route(msg), RoutingOutcome::DeadLettered);
768 }
769
770 #[test]
771 fn test_content_based_missing_field() {
772 let mut router = StreamRouter::with_defaults();
773 let rule = RoutingRule {
774 id: "r1".to_string(),
775 priority: 10,
776 strategy: RoutingStrategy::ContentBased {
777 field: "region".to_string(),
778 value: "us".to_string(),
779 },
780 destination: "us-queue".to_string(),
781 enabled: true,
782 };
783 router.add_rule(rule).ok();
784
785 let msg = make_message("m1", "events");
786 assert_eq!(router.route(msg), RoutingOutcome::DeadLettered);
787 }
788
789 #[test]
792 fn test_header_based_match() {
793 let mut router = StreamRouter::with_defaults();
794 let rule = RoutingRule {
795 id: "r1".to_string(),
796 priority: 10,
797 strategy: RoutingStrategy::HeaderBased {
798 key: "X-Priority".to_string(),
799 value: "high".to_string(),
800 },
801 destination: "priority-queue".to_string(),
802 enabled: true,
803 };
804 router.add_rule(rule).ok();
805
806 let msg = make_message_with_header("m1", "events", "X-Priority", "high");
807 assert!(matches!(router.route(msg), RoutingOutcome::Routed { .. }));
808 }
809
810 #[test]
811 fn test_header_based_no_match_wrong_value() {
812 let mut router = StreamRouter::with_defaults();
813 let rule = RoutingRule {
814 id: "r1".to_string(),
815 priority: 10,
816 strategy: RoutingStrategy::HeaderBased {
817 key: "X-Priority".to_string(),
818 value: "high".to_string(),
819 },
820 destination: "priority-queue".to_string(),
821 enabled: true,
822 };
823 router.add_rule(rule).ok();
824
825 let msg = make_message_with_header("m1", "events", "X-Priority", "low");
826 assert_eq!(router.route(msg), RoutingOutcome::DeadLettered);
827 }
828
829 #[test]
830 fn test_header_based_missing_header() {
831 let mut router = StreamRouter::with_defaults();
832 let rule = RoutingRule {
833 id: "r1".to_string(),
834 priority: 10,
835 strategy: RoutingStrategy::HeaderBased {
836 key: "X-Priority".to_string(),
837 value: "high".to_string(),
838 },
839 destination: "priority-queue".to_string(),
840 enabled: true,
841 };
842 router.add_rule(rule).ok();
843
844 let msg = make_message("m1", "events");
845 assert_eq!(router.route(msg), RoutingOutcome::DeadLettered);
846 }
847
848 #[test]
851 fn test_round_robin_distribution() {
852 let mut router = StreamRouter::with_defaults();
853 let rule = RoutingRule {
854 id: "rr1".to_string(),
855 priority: 10,
856 strategy: RoutingStrategy::RoundRobin(vec![
857 "dest-a".to_string(),
858 "dest-b".to_string(),
859 "dest-c".to_string(),
860 ]),
861 destination: String::new(), enabled: true,
863 };
864 router.add_rule(rule).ok();
865
866 let o1 = router.route(make_message("m1", "any"));
867 let o2 = router.route(make_message("m2", "any"));
868 let o3 = router.route(make_message("m3", "any"));
869 let o4 = router.route(make_message("m4", "any"));
870
871 assert_eq!(
872 o1,
873 RoutingOutcome::Routed {
874 destination: "dest-a".to_string(),
875 rule_id: "rr1".to_string()
876 }
877 );
878 assert_eq!(
879 o2,
880 RoutingOutcome::Routed {
881 destination: "dest-b".to_string(),
882 rule_id: "rr1".to_string()
883 }
884 );
885 assert_eq!(
886 o3,
887 RoutingOutcome::Routed {
888 destination: "dest-c".to_string(),
889 rule_id: "rr1".to_string()
890 }
891 );
892 assert_eq!(
893 o4,
894 RoutingOutcome::Routed {
895 destination: "dest-a".to_string(),
896 rule_id: "rr1".to_string()
897 }
898 );
899 }
900
901 #[test]
902 fn test_round_robin_empty_destinations() {
903 let mut router = StreamRouter::with_defaults();
904 let rule = RoutingRule {
905 id: "rr1".to_string(),
906 priority: 10,
907 strategy: RoutingStrategy::RoundRobin(vec![]),
908 destination: String::new(),
909 enabled: true,
910 };
911 router.add_rule(rule).ok();
912
913 let outcome = router.route(make_message("m1", "any"));
914 assert_eq!(outcome, RoutingOutcome::DeadLettered);
915 }
916
917 #[test]
920 fn test_priority_ordering() {
921 let mut router = StreamRouter::with_defaults();
922
923 let low = RoutingRule {
925 id: "low".to_string(),
926 priority: 1,
927 strategy: RoutingStrategy::TopicExact("orders".to_string()),
928 destination: "low-queue".to_string(),
929 enabled: true,
930 };
931 let high = RoutingRule {
933 id: "high".to_string(),
934 priority: 100,
935 strategy: RoutingStrategy::TopicExact("orders".to_string()),
936 destination: "high-queue".to_string(),
937 enabled: true,
938 };
939
940 router.add_rule(low).ok();
942 router.add_rule(high).ok();
943
944 let outcome = router.route(make_message("m1", "orders"));
945 assert_eq!(
946 outcome,
947 RoutingOutcome::Routed {
948 destination: "high-queue".to_string(),
949 rule_id: "high".to_string()
950 }
951 );
952 }
953
954 #[test]
955 fn test_disabled_rule_skipped_in_priority() {
956 let mut router = StreamRouter::with_defaults();
957 let high = RoutingRule {
958 id: "high".to_string(),
959 priority: 100,
960 strategy: RoutingStrategy::TopicExact("orders".to_string()),
961 destination: "high-queue".to_string(),
962 enabled: false, };
964 let low = RoutingRule {
965 id: "low".to_string(),
966 priority: 1,
967 strategy: RoutingStrategy::TopicExact("orders".to_string()),
968 destination: "low-queue".to_string(),
969 enabled: true,
970 };
971 router.add_rule(high).ok();
972 router.add_rule(low).ok();
973
974 let outcome = router.route(make_message("m1", "orders"));
975 assert_eq!(
976 outcome,
977 RoutingOutcome::Routed {
978 destination: "low-queue".to_string(),
979 rule_id: "low".to_string()
980 }
981 );
982 }
983
984 #[test]
987 fn test_dlq_receives_unroutable() {
988 let mut router = StreamRouter::with_defaults();
989 let msg = make_message("m1", "unknown-topic");
990 let outcome = router.route(msg);
991 assert_eq!(outcome, RoutingOutcome::DeadLettered);
992 assert_eq!(router.dlq().len(), 1);
993 }
994
995 #[test]
996 fn test_dlq_disabled() {
997 let cfg = RouterConfig {
998 dlq_capacity: 100,
999 enable_dlq: false,
1000 };
1001 let mut router = StreamRouter::new(cfg);
1002 let msg = make_message("m1", "unknown");
1003 router.route(msg);
1004 assert!(router.dlq().is_empty());
1005 }
1006
1007 #[test]
1008 fn test_dlq_capacity_eviction() {
1009 let cfg = RouterConfig {
1010 dlq_capacity: 2,
1011 enable_dlq: true,
1012 };
1013 let mut router = StreamRouter::new(cfg);
1014 router.route(make_message("m1", "x"));
1015 router.route(make_message("m2", "x"));
1016 router.route(make_message("m3", "x"));
1017 assert_eq!(router.dlq().len(), 2);
1018 assert_eq!(router.dlq().front().map(|m| m.id.as_str()), Some("m2"));
1020 }
1021
1022 #[test]
1023 fn test_pop_dlq() {
1024 let mut router = StreamRouter::with_defaults();
1025 router.route(make_message("m1", "x"));
1026 router.route(make_message("m2", "x"));
1027 let popped = router.pop_dlq();
1028 assert_eq!(popped.map(|m| m.id), Some("m1".to_string()));
1029 assert_eq!(router.dlq().len(), 1);
1030 }
1031
1032 #[test]
1033 fn test_clear_dlq() {
1034 let mut router = StreamRouter::with_defaults();
1035 router.route(make_message("m1", "x"));
1036 router.route(make_message("m2", "x"));
1037 router.clear_dlq();
1038 assert!(router.dlq().is_empty());
1039 }
1040
1041 #[test]
1044 fn test_stats_tracking() {
1045 let mut router = StreamRouter::with_defaults();
1046 let rule = RoutingRule {
1047 id: "r1".to_string(),
1048 priority: 10,
1049 strategy: RoutingStrategy::TopicExact("orders".to_string()),
1050 destination: "q".to_string(),
1051 enabled: true,
1052 };
1053 router.add_rule(rule).ok();
1054
1055 router.route(make_message("m1", "orders"));
1056 router.route(make_message("m2", "orders"));
1057 router.route(make_message("m3", "unknown"));
1058
1059 assert_eq!(router.stats().total_evaluated, 3);
1060 assert_eq!(router.stats().total_routed, 2);
1061 assert_eq!(router.stats().total_dead_lettered, 1);
1062 assert_eq!(router.stats().per_destination.get("q"), Some(&2));
1063 assert_eq!(router.stats().per_rule.get("r1"), Some(&2));
1064 }
1065
1066 #[test]
1067 fn test_reset_stats() {
1068 let mut router = StreamRouter::with_defaults();
1069 router.route(make_message("m1", "x"));
1070 router.reset_stats();
1071 assert_eq!(router.stats().total_evaluated, 0);
1072 assert_eq!(router.stats().total_routed, 0);
1073 assert_eq!(router.stats().total_dead_lettered, 0);
1074 }
1075
1076 #[test]
1079 fn test_batch_routing() {
1080 let mut router = StreamRouter::with_defaults();
1081 let rule = RoutingRule {
1082 id: "r1".to_string(),
1083 priority: 10,
1084 strategy: RoutingStrategy::TopicExact("orders".to_string()),
1085 destination: "q".to_string(),
1086 enabled: true,
1087 };
1088 router.add_rule(rule).ok();
1089
1090 let messages = vec![
1091 make_message("m1", "orders"),
1092 make_message("m2", "payments"),
1093 make_message("m3", "orders"),
1094 ];
1095 let outcomes = router.route_batch(messages);
1096 assert_eq!(outcomes.len(), 3);
1097 assert!(matches!(outcomes[0], RoutingOutcome::Routed { .. }));
1098 assert_eq!(outcomes[1], RoutingOutcome::DeadLettered);
1099 assert!(matches!(outcomes[2], RoutingOutcome::Routed { .. }));
1100 }
1101
1102 #[test]
1105 fn test_rule_ids_in_priority_order() {
1106 let mut router = StreamRouter::with_defaults();
1107 let r1 = RoutingRule {
1108 id: "low".to_string(),
1109 priority: 1,
1110 strategy: RoutingStrategy::TopicExact("a".to_string()),
1111 destination: "d".to_string(),
1112 enabled: true,
1113 };
1114 let r2 = RoutingRule {
1115 id: "high".to_string(),
1116 priority: 100,
1117 strategy: RoutingStrategy::TopicExact("b".to_string()),
1118 destination: "d".to_string(),
1119 enabled: true,
1120 };
1121 router.add_rule(r1).ok();
1122 router.add_rule(r2).ok();
1123 let ids = router.rule_ids();
1124 assert_eq!(ids, vec!["high", "low"]);
1125 }
1126
1127 #[test]
1128 fn test_get_rule() {
1129 let mut router = StreamRouter::with_defaults();
1130 let rule = RoutingRule {
1131 id: "r1".to_string(),
1132 priority: 5,
1133 strategy: RoutingStrategy::TopicExact("x".to_string()),
1134 destination: "y".to_string(),
1135 enabled: true,
1136 };
1137 router.add_rule(rule).ok();
1138 assert!(router.get_rule("r1").is_some());
1139 assert!(router.get_rule("nope").is_none());
1140 }
1141
1142 #[test]
1145 fn test_multiple_strategies() {
1146 let mut router = StreamRouter::with_defaults();
1147 let topic_rule = RoutingRule {
1148 id: "topic".to_string(),
1149 priority: 5,
1150 strategy: RoutingStrategy::TopicExact("orders".to_string()),
1151 destination: "topic-dest".to_string(),
1152 enabled: true,
1153 };
1154 let header_rule = RoutingRule {
1155 id: "header".to_string(),
1156 priority: 10,
1157 strategy: RoutingStrategy::HeaderBased {
1158 key: "X-VIP".to_string(),
1159 value: "true".to_string(),
1160 },
1161 destination: "vip-dest".to_string(),
1162 enabled: true,
1163 };
1164 router.add_rule(topic_rule).ok();
1165 router.add_rule(header_rule).ok();
1166
1167 let msg = make_message_with_header("m1", "orders", "X-VIP", "true");
1169 let outcome = router.route(msg);
1170 assert_eq!(
1171 outcome,
1172 RoutingOutcome::Routed {
1173 destination: "vip-dest".to_string(),
1174 rule_id: "header".to_string()
1175 }
1176 );
1177 }
1178
1179 #[test]
1180 fn test_fallthrough_to_lower_priority() {
1181 let mut router = StreamRouter::with_defaults();
1182 let header_rule = RoutingRule {
1183 id: "header".to_string(),
1184 priority: 10,
1185 strategy: RoutingStrategy::HeaderBased {
1186 key: "X-VIP".to_string(),
1187 value: "true".to_string(),
1188 },
1189 destination: "vip-dest".to_string(),
1190 enabled: true,
1191 };
1192 let topic_rule = RoutingRule {
1193 id: "topic".to_string(),
1194 priority: 1,
1195 strategy: RoutingStrategy::TopicExact("orders".to_string()),
1196 destination: "topic-dest".to_string(),
1197 enabled: true,
1198 };
1199 router.add_rule(header_rule).ok();
1200 router.add_rule(topic_rule).ok();
1201
1202 let msg = make_message("m1", "orders");
1204 let outcome = router.route(msg);
1205 assert_eq!(
1206 outcome,
1207 RoutingOutcome::Routed {
1208 destination: "topic-dest".to_string(),
1209 rule_id: "topic".to_string()
1210 }
1211 );
1212 }
1213
1214 #[test]
1217 fn test_regex_start_anchor() {
1218 let mut router = StreamRouter::with_defaults();
1219 let rule = RoutingRule {
1220 id: "r1".to_string(),
1221 priority: 10,
1222 strategy: RoutingStrategy::TopicRegex("^orders".to_string()),
1223 destination: "d".to_string(),
1224 enabled: true,
1225 };
1226 router.add_rule(rule).ok();
1227
1228 assert!(matches!(
1229 router.route(make_message("m", "orders.us")),
1230 RoutingOutcome::Routed { .. }
1231 ));
1232 assert_eq!(
1233 router.route(make_message("m", "pre-orders")),
1234 RoutingOutcome::DeadLettered
1235 );
1236 }
1237
1238 #[test]
1239 fn test_regex_end_anchor() {
1240 let mut router = StreamRouter::with_defaults();
1241 let rule = RoutingRule {
1242 id: "r1".to_string(),
1243 priority: 10,
1244 strategy: RoutingStrategy::TopicRegex("orders$".to_string()),
1245 destination: "d".to_string(),
1246 enabled: true,
1247 };
1248 router.add_rule(rule).ok();
1249
1250 assert!(matches!(
1251 router.route(make_message("m", "all-orders")),
1252 RoutingOutcome::Routed { .. }
1253 ));
1254 assert_eq!(
1255 router.route(make_message("m", "orders-new")),
1256 RoutingOutcome::DeadLettered
1257 );
1258 }
1259
1260 #[test]
1261 fn test_regex_wildcard_middle() {
1262 let mut router = StreamRouter::with_defaults();
1263 let rule = RoutingRule {
1264 id: "r1".to_string(),
1265 priority: 10,
1266 strategy: RoutingStrategy::TopicRegex("^order*done$".to_string()),
1267 destination: "d".to_string(),
1268 enabled: true,
1269 };
1270 router.add_rule(rule).ok();
1271
1272 assert!(matches!(
1273 router.route(make_message("m", "order-processing-done")),
1274 RoutingOutcome::Routed { .. }
1275 ));
1276 assert!(matches!(
1277 router.route(make_message("m", "orderdone")),
1278 RoutingOutcome::Routed { .. }
1279 ));
1280 }
1281
1282 #[test]
1285 fn test_router_error_display() {
1286 let e1 = RouterError::DuplicateRuleId("r1".to_string());
1287 assert!(format!("{}", e1).contains("r1"));
1288
1289 let e2 = RouterError::RuleNotFound("r2".to_string());
1290 assert!(format!("{}", e2).contains("r2"));
1291
1292 let e3 = RouterError::DlqFull;
1293 assert!(format!("{}", e3).contains("full"));
1294
1295 let e4 = RouterError::InvalidRegex("bad".to_string());
1296 assert!(format!("{}", e4).contains("bad"));
1297 }
1298
1299 #[test]
1300 fn test_router_error_is_error() {
1301 let e: Box<dyn std::error::Error> = Box::new(RouterError::DuplicateRuleId("x".to_string()));
1302 assert!(!e.to_string().is_empty());
1303 }
1304
1305 #[test]
1308 fn test_routing_stats_default() {
1309 let stats = RoutingStats::default();
1310 assert_eq!(stats.total_evaluated, 0);
1311 assert_eq!(stats.total_routed, 0);
1312 assert_eq!(stats.total_dead_lettered, 0);
1313 assert!(stats.per_destination.is_empty());
1314 assert!(stats.per_rule.is_empty());
1315 }
1316
1317 #[test]
1320 fn test_router_config_default() {
1321 let cfg = RouterConfig::default();
1322 assert_eq!(cfg.dlq_capacity, 10_000);
1323 assert!(cfg.enable_dlq);
1324 }
1325
1326 #[test]
1329 fn test_routing_strategy_clone_eq() {
1330 let s1 = RoutingStrategy::TopicExact("x".to_string());
1331 let s2 = s1.clone();
1332 assert_eq!(s1, s2);
1333
1334 let s3 = RoutingStrategy::ContentBased {
1335 field: "a".to_string(),
1336 value: "b".to_string(),
1337 };
1338 let s4 = s3.clone();
1339 assert_eq!(s3, s4);
1340
1341 let s5 = RoutingStrategy::HeaderBased {
1342 key: "k".to_string(),
1343 value: "v".to_string(),
1344 };
1345 assert_eq!(s5.clone(), s5);
1346
1347 let s6 = RoutingStrategy::RoundRobin(vec!["a".to_string()]);
1348 assert_eq!(s6.clone(), s6);
1349
1350 let s7 = RoutingStrategy::TopicRegex("pat".to_string());
1351 assert_eq!(s7.clone(), s7);
1352 }
1353
1354 #[test]
1357 fn test_routing_outcome_dead_lettered_eq() {
1358 assert_eq!(RoutingOutcome::DeadLettered, RoutingOutcome::DeadLettered);
1359 }
1360
1361 #[test]
1362 fn test_routing_outcome_routed_eq() {
1363 let a = RoutingOutcome::Routed {
1364 destination: "x".to_string(),
1365 rule_id: "r".to_string(),
1366 };
1367 let b = a.clone();
1368 assert_eq!(a, b);
1369 }
1370}