Skip to main content

oxirs_stream/
stream_router.rs

1//! Stream message routing engine.
2//!
3//! Provides content-based, topic-based, and header-based message routing with
4//! round-robin distribution, priority evaluation, regex pattern matching, and
5//! dead-letter routing for unroutable messages.
6
7use std::collections::{HashMap, VecDeque};
8
9// ──────────────────────────────────────────────────────────────────────────────
10// Types
11// ──────────────────────────────────────────────────────────────────────────────
12
/// How a routing rule decides whether a message matches.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutingStrategy {
    /// Matches when the payload field `field` is present and equal to `value`.
    ContentBased { field: String, value: String },
    /// Matches when the message topic is exactly this string.
    TopicExact(String),
    /// Matches when the message topic satisfies this pattern.
    TopicRegex(String),
    /// Matches when the header `key` is present and equal to `value`.
    HeaderBased { key: String, value: String },
    /// Hands messages to the listed destinations in rotation.
    RoundRobin(Vec<String>),
}
27
28/// A single routing rule with priority and target destination.
29#[derive(Debug, Clone)]
30pub struct RoutingRule {
31    /// Unique identifier for the rule.
32    pub id: String,
33    /// Evaluation priority (higher is evaluated first).
34    pub priority: u32,
35    /// The matching strategy.
36    pub strategy: RoutingStrategy,
37    /// Target destination name (topic, queue, etc.).
38    pub destination: String,
39    /// Whether the rule is currently active.
40    pub enabled: bool,
41}
42
/// A message handed to the router for a routing decision.
#[derive(Debug, Clone)]
pub struct RoutableMessage {
    /// Identifier of this message.
    pub id: String,
    /// Topic the message was published on.
    pub topic: String,
    /// Header key/value pairs carried with the message.
    pub headers: HashMap<String, String>,
    /// Flattened key/value view of the payload used for routing decisions.
    pub payload_fields: HashMap<String, String>,
    /// The payload itself, as raw bytes.
    pub payload: Vec<u8>,
}
57
/// Result of pushing one message through the rule chain.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutingOutcome {
    /// A rule matched; the message goes to `destination` on behalf of `rule_id`.
    Routed {
        destination: String,
        rule_id: String,
    },
    /// No rule matched, so the message was treated as a dead letter.
    DeadLettered,
}
69
/// Counters accumulated by the router as it processes messages.
#[derive(Debug, Clone, Default)]
pub struct RoutingStats {
    /// Number of messages passed through the rule chain.
    pub total_evaluated: u64,
    /// Number of messages that matched a rule and were routed.
    pub total_routed: u64,
    /// Number of messages that matched no rule.
    pub total_dead_lettered: u64,
    /// Routed-message count, keyed by destination name.
    pub per_destination: HashMap<String, u64>,
    /// Match count, keyed by rule ID.
    pub per_rule: HashMap<String, u64>,
}
84
/// Tunables for a [`StreamRouter`].
#[derive(Debug, Clone)]
pub struct RouterConfig {
    /// Upper bound on how many dead-lettered messages are kept.
    pub dlq_capacity: usize,
    /// When `false`, unroutable messages are not stored in the dead-letter queue.
    pub enable_dlq: bool,
}

impl Default for RouterConfig {
    /// Dead-letter queue switched on, holding up to 10 000 messages.
    fn default() -> Self {
        RouterConfig {
            enable_dlq: true,
            dlq_capacity: 10_000,
        }
    }
}
102
/// Failures reported by the router's rule-management operations.
#[derive(Debug)]
pub enum RouterError {
    /// Another rule already uses this ID.
    DuplicateRuleId(String),
    /// No rule with this ID is registered.
    RuleNotFound(String),
    /// The dead-letter queue has no room left.
    DlqFull,
    /// The topic pattern was rejected; the payload describes why.
    InvalidRegex(String),
}

impl std::fmt::Display for RouterError {
    /// Writes a short lowercase description, followed by the offending ID or
    /// reason for the variants that carry one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DlqFull => f.write_str("dead letter queue is full"),
            Self::DuplicateRuleId(rule_id) => write!(f, "duplicate rule id: {}", rule_id),
            Self::RuleNotFound(rule_id) => write!(f, "rule not found: {}", rule_id),
            Self::InvalidRegex(reason) => write!(f, "invalid regex pattern: {}", reason),
        }
    }
}

impl std::error::Error for RouterError {}
128
129// ──────────────────────────────────────────────────────────────────────────────
130// StreamRouter
131// ──────────────────────────────────────────────────────────────────────────────
132
133/// Stream message router with content-, topic-, and header-based routing,
134/// round-robin distribution, priority ordering, and a dead-letter queue.
135pub struct StreamRouter {
136    config: RouterConfig,
137    /// Rules sorted by descending priority (highest first).
138    rules: Vec<RoutingRule>,
139    /// Dead-lettered messages.
140    dlq: VecDeque<RoutableMessage>,
141    /// Running statistics.
142    stats: RoutingStats,
143    /// Round-robin counters keyed by rule ID.
144    round_robin_counters: HashMap<String, usize>,
145}
146
147impl StreamRouter {
148    /// Create a new router with the given configuration.
149    pub fn new(config: RouterConfig) -> Self {
150        Self {
151            config,
152            rules: Vec::new(),
153            dlq: VecDeque::new(),
154            stats: RoutingStats::default(),
155            round_robin_counters: HashMap::new(),
156        }
157    }
158
159    /// Create a router with default configuration.
160    pub fn with_defaults() -> Self {
161        Self::new(RouterConfig::default())
162    }
163
164    /// Add a routing rule. Rules are kept sorted by descending priority.
165    pub fn add_rule(&mut self, rule: RoutingRule) -> Result<(), RouterError> {
166        if self.rules.iter().any(|r| r.id == rule.id) {
167            return Err(RouterError::DuplicateRuleId(rule.id));
168        }
169        // Validate regex patterns eagerly
170        if let RoutingStrategy::TopicRegex(ref pat) = rule.strategy {
171            Self::compile_regex(pat)?;
172        }
173        self.rules.push(rule);
174        self.rules.sort_by_key(|b| std::cmp::Reverse(b.priority));
175        Ok(())
176    }
177
178    /// Remove a routing rule by ID.
179    pub fn remove_rule(&mut self, rule_id: &str) -> Result<RoutingRule, RouterError> {
180        let idx = self
181            .rules
182            .iter()
183            .position(|r| r.id == rule_id)
184            .ok_or_else(|| RouterError::RuleNotFound(rule_id.to_string()))?;
185        let removed = self.rules.remove(idx);
186        self.round_robin_counters.remove(rule_id);
187        Ok(removed)
188    }
189
190    /// Update an existing rule in place.
191    pub fn update_rule(&mut self, rule: RoutingRule) -> Result<(), RouterError> {
192        let idx = self
193            .rules
194            .iter()
195            .position(|r| r.id == rule.id)
196            .ok_or_else(|| RouterError::RuleNotFound(rule.id.clone()))?;
197        // Validate regex patterns eagerly
198        if let RoutingStrategy::TopicRegex(ref pat) = rule.strategy {
199            Self::compile_regex(pat)?;
200        }
201        self.rules[idx] = rule;
202        self.rules.sort_by_key(|b| std::cmp::Reverse(b.priority));
203        Ok(())
204    }
205
206    /// Enable or disable a rule.
207    pub fn set_rule_enabled(&mut self, rule_id: &str, enabled: bool) -> Result<(), RouterError> {
208        let rule = self
209            .rules
210            .iter_mut()
211            .find(|r| r.id == rule_id)
212            .ok_or_else(|| RouterError::RuleNotFound(rule_id.to_string()))?;
213        rule.enabled = enabled;
214        Ok(())
215    }
216
217    /// Return a snapshot of current routing statistics.
218    pub fn stats(&self) -> &RoutingStats {
219        &self.stats
220    }
221
222    /// Return the number of active (enabled) rules.
223    pub fn active_rule_count(&self) -> usize {
224        self.rules.iter().filter(|r| r.enabled).count()
225    }
226
227    /// Return total number of rules.
228    pub fn rule_count(&self) -> usize {
229        self.rules.len()
230    }
231
232    /// Return the dead-letter queue contents.
233    pub fn dlq(&self) -> &VecDeque<RoutableMessage> {
234        &self.dlq
235    }
236
237    /// Pop the oldest dead-lettered message.
238    pub fn pop_dlq(&mut self) -> Option<RoutableMessage> {
239        self.dlq.pop_front()
240    }
241
242    /// Clear all dead-lettered messages.
243    pub fn clear_dlq(&mut self) {
244        self.dlq.clear();
245    }
246
247    /// Route a single message through the rule chain.
248    ///
249    /// The first matching enabled rule (by priority) wins. If no rule matches,
250    /// the message is dead-lettered (if DLQ is enabled).
251    pub fn route(&mut self, message: RoutableMessage) -> RoutingOutcome {
252        self.stats.total_evaluated += 1;
253
254        // Find the matching rule index first (immutable pass), then update
255        // counters and resolve destination (mutable pass) to satisfy the borrow checker.
256        let matched_idx = self
257            .rules
258            .iter()
259            .position(|rule| rule.enabled && Self::matches_rule_static(rule, &message));
260
261        if let Some(idx) = matched_idx {
262            let rule_id = self.rules[idx].id.clone();
263            let destination = self.resolve_destination_by_index(idx);
264            self.stats.total_routed += 1;
265            *self
266                .stats
267                .per_destination
268                .entry(destination.clone())
269                .or_insert(0) += 1;
270            *self.stats.per_rule.entry(rule_id.clone()).or_insert(0) += 1;
271            return RoutingOutcome::Routed {
272                destination,
273                rule_id,
274            };
275        }
276
277        // No rule matched — dead letter
278        self.stats.total_dead_lettered += 1;
279        if self.config.enable_dlq {
280            if self.dlq.len() >= self.config.dlq_capacity {
281                // Evict the oldest entry to make room
282                self.dlq.pop_front();
283            }
284            self.dlq.push_back(message);
285        }
286        RoutingOutcome::DeadLettered
287    }
288
289    /// Route a batch of messages. Returns one outcome per message.
290    pub fn route_batch(&mut self, messages: Vec<RoutableMessage>) -> Vec<RoutingOutcome> {
291        messages.into_iter().map(|m| self.route(m)).collect()
292    }
293
294    /// Reset statistics counters to zero.
295    pub fn reset_stats(&mut self) {
296        self.stats = RoutingStats::default();
297    }
298
299    /// Return all rule IDs in priority order (highest first).
300    pub fn rule_ids(&self) -> Vec<&str> {
301        self.rules.iter().map(|r| r.id.as_str()).collect()
302    }
303
304    /// Return a reference to a rule by ID, if it exists.
305    pub fn get_rule(&self, rule_id: &str) -> Option<&RoutingRule> {
306        self.rules.iter().find(|r| r.id == rule_id)
307    }
308
309    // ── Private helpers ──────────────────────────────────────────────────────
310
311    /// Test whether a rule matches a given message (static, no &self needed).
312    fn matches_rule_static(rule: &RoutingRule, message: &RoutableMessage) -> bool {
313        match &rule.strategy {
314            RoutingStrategy::ContentBased { field, value } => {
315                message.payload_fields.get(field) == Some(value)
316            }
317            RoutingStrategy::TopicExact(topic) => message.topic == *topic,
318            RoutingStrategy::TopicRegex(pattern) => Self::regex_matches(pattern, &message.topic),
319            RoutingStrategy::HeaderBased { key, value } => message.headers.get(key) == Some(value),
320            RoutingStrategy::RoundRobin(destinations) => !destinations.is_empty(),
321        }
322    }
323
324    /// Resolve the actual destination string by rule index. For round-robin, pick the next target.
325    fn resolve_destination_by_index(&mut self, idx: usize) -> String {
326        let rule = &self.rules[idx];
327        match &rule.strategy {
328            RoutingStrategy::RoundRobin(destinations) if !destinations.is_empty() => {
329                let rule_id = rule.id.clone();
330                let counter = self.round_robin_counters.entry(rule_id).or_insert(0);
331                let dest_idx = *counter % destinations.len();
332                *counter = counter.wrapping_add(1);
333                destinations[dest_idx].clone()
334            }
335            _ => rule.destination.clone(),
336        }
337    }
338
339    /// Compile a regex pattern, returning an error if invalid.
340    fn compile_regex(pattern: &str) -> Result<(), RouterError> {
341        // Simple regex-like matching: we support `*` as wildcard and `^`/`$` anchors.
342        // For full regex we validate the pattern manually.
343        if pattern.is_empty() {
344            return Err(RouterError::InvalidRegex("empty pattern".to_string()));
345        }
346        // Check for unbalanced parentheses
347        let mut depth: i32 = 0;
348        for ch in pattern.chars() {
349            match ch {
350                '(' => depth += 1,
351                ')' => depth -= 1,
352                _ => {}
353            }
354            if depth < 0 {
355                return Err(RouterError::InvalidRegex(
356                    "unbalanced parentheses".to_string(),
357                ));
358            }
359        }
360        if depth != 0 {
361            return Err(RouterError::InvalidRegex(
362                "unbalanced parentheses".to_string(),
363            ));
364        }
365        Ok(())
366    }
367
368    /// Simple regex-like topic matching.
369    ///
370    /// Supported syntax:
371    /// - `*` matches any sequence of characters (equivalent to `.*`)
372    /// - Literal characters match themselves
373    /// - `^` and `$` anchor to start/end
374    fn regex_matches(pattern: &str, input: &str) -> bool {
375        // Convert our simplified pattern to segment-based matching
376        let anchored_start = pattern.starts_with('^');
377        let anchored_end = pattern.ends_with('$');
378
379        let trimmed = pattern.trim_start_matches('^').trim_end_matches('$');
380
381        if trimmed.is_empty() {
382            return input.is_empty();
383        }
384
385        // Split on `*` to get literal segments
386        let segments: Vec<&str> = trimmed.split('*').collect();
387
388        if segments.len() == 1 {
389            // No wildcards
390            let seg = segments[0];
391            if anchored_start && anchored_end {
392                return input == seg;
393            }
394            if anchored_start {
395                return input.starts_with(seg);
396            }
397            if anchored_end {
398                return input.ends_with(seg);
399            }
400            return input.contains(seg);
401        }
402
403        // Multiple segments separated by wildcards
404        let mut pos = 0usize;
405
406        for (i, seg) in segments.iter().enumerate() {
407            if seg.is_empty() {
408                continue;
409            }
410            if i == 0 && anchored_start {
411                if !input.starts_with(seg) {
412                    return false;
413                }
414                pos = seg.len();
415                continue;
416            }
417
418            match input[pos..].find(seg) {
419                Some(found) => {
420                    pos += found + seg.len();
421                }
422                None => return false,
423            }
424        }
425
426        if anchored_end {
427            let last_seg = segments.last().unwrap_or(&"");
428            if last_seg.is_empty() {
429                return true; // pattern ends with `*`
430            }
431            return input.ends_with(last_seg);
432        }
433
434        true
435    }
436}
437
438// ──────────────────────────────────────────────────────────────────────────────
439// Tests
440// ──────────────────────────────────────────────────────────────────────────────
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445
446    fn make_message(id: &str, topic: &str) -> RoutableMessage {
447        RoutableMessage {
448            id: id.to_string(),
449            topic: topic.to_string(),
450            headers: HashMap::new(),
451            payload_fields: HashMap::new(),
452            payload: Vec::new(),
453        }
454    }
455
456    fn make_message_with_header(id: &str, topic: &str, key: &str, val: &str) -> RoutableMessage {
457        let mut msg = make_message(id, topic);
458        msg.headers.insert(key.to_string(), val.to_string());
459        msg
460    }
461
462    fn make_message_with_field(id: &str, topic: &str, key: &str, val: &str) -> RoutableMessage {
463        let mut msg = make_message(id, topic);
464        msg.payload_fields.insert(key.to_string(), val.to_string());
465        msg
466    }
467
468    // ── Basic construction ───────────────────────────────────────────────────
469
470    #[test]
471    fn test_router_creation_defaults() {
472        let router = StreamRouter::with_defaults();
473        assert_eq!(router.rule_count(), 0);
474        assert_eq!(router.active_rule_count(), 0);
475        assert_eq!(router.stats().total_evaluated, 0);
476    }
477
478    #[test]
479    fn test_router_creation_custom_config() {
480        let cfg = RouterConfig {
481            dlq_capacity: 500,
482            enable_dlq: false,
483        };
484        let router = StreamRouter::new(cfg);
485        assert_eq!(router.rule_count(), 0);
486        assert!(router.dlq().is_empty());
487    }
488
489    // ── Rule management ──────────────────────────────────────────────────────
490
491    #[test]
492    fn test_add_rule() {
493        let mut router = StreamRouter::with_defaults();
494        let rule = RoutingRule {
495            id: "r1".to_string(),
496            priority: 10,
497            strategy: RoutingStrategy::TopicExact("orders".to_string()),
498            destination: "order-queue".to_string(),
499            enabled: true,
500        };
501        assert!(router.add_rule(rule).is_ok());
502        assert_eq!(router.rule_count(), 1);
503    }
504
505    #[test]
506    fn test_add_duplicate_rule() {
507        let mut router = StreamRouter::with_defaults();
508        let rule = RoutingRule {
509            id: "r1".to_string(),
510            priority: 10,
511            strategy: RoutingStrategy::TopicExact("orders".to_string()),
512            destination: "q".to_string(),
513            enabled: true,
514        };
515        assert!(router.add_rule(rule.clone()).is_ok());
516        assert!(router.add_rule(rule).is_err());
517    }
518
519    #[test]
520    fn test_remove_rule() {
521        let mut router = StreamRouter::with_defaults();
522        let rule = RoutingRule {
523            id: "r1".to_string(),
524            priority: 10,
525            strategy: RoutingStrategy::TopicExact("x".to_string()),
526            destination: "y".to_string(),
527            enabled: true,
528        };
529        router.add_rule(rule).ok();
530        let removed = router.remove_rule("r1");
531        assert!(removed.is_ok());
532        assert_eq!(router.rule_count(), 0);
533    }
534
535    #[test]
536    fn test_remove_nonexistent_rule() {
537        let mut router = StreamRouter::with_defaults();
538        assert!(router.remove_rule("nope").is_err());
539    }
540
541    #[test]
542    fn test_update_rule() {
543        let mut router = StreamRouter::with_defaults();
544        let rule = RoutingRule {
545            id: "r1".to_string(),
546            priority: 5,
547            strategy: RoutingStrategy::TopicExact("a".to_string()),
548            destination: "dest1".to_string(),
549            enabled: true,
550        };
551        router.add_rule(rule).ok();
552
553        let updated = RoutingRule {
554            id: "r1".to_string(),
555            priority: 20,
556            strategy: RoutingStrategy::TopicExact("b".to_string()),
557            destination: "dest2".to_string(),
558            enabled: true,
559        };
560        assert!(router.update_rule(updated).is_ok());
561        assert_eq!(
562            router.get_rule("r1").map(|r| &r.destination),
563            Some(&"dest2".to_string())
564        );
565    }
566
567    #[test]
568    fn test_update_nonexistent_rule() {
569        let mut router = StreamRouter::with_defaults();
570        let rule = RoutingRule {
571            id: "nope".to_string(),
572            priority: 1,
573            strategy: RoutingStrategy::TopicExact("x".to_string()),
574            destination: "y".to_string(),
575            enabled: true,
576        };
577        assert!(router.update_rule(rule).is_err());
578    }
579
580    #[test]
581    fn test_enable_disable_rule() {
582        let mut router = StreamRouter::with_defaults();
583        let rule = RoutingRule {
584            id: "r1".to_string(),
585            priority: 1,
586            strategy: RoutingStrategy::TopicExact("x".to_string()),
587            destination: "y".to_string(),
588            enabled: true,
589        };
590        router.add_rule(rule).ok();
591        assert_eq!(router.active_rule_count(), 1);
592
593        router.set_rule_enabled("r1", false).ok();
594        assert_eq!(router.active_rule_count(), 0);
595
596        router.set_rule_enabled("r1", true).ok();
597        assert_eq!(router.active_rule_count(), 1);
598    }
599
600    #[test]
601    fn test_enable_nonexistent_rule() {
602        let mut router = StreamRouter::with_defaults();
603        assert!(router.set_rule_enabled("nope", true).is_err());
604    }
605
606    // ── Topic-exact routing ──────────────────────────────────────────────────
607
608    #[test]
609    fn test_topic_exact_match() {
610        let mut router = StreamRouter::with_defaults();
611        let rule = RoutingRule {
612            id: "r1".to_string(),
613            priority: 10,
614            strategy: RoutingStrategy::TopicExact("orders".to_string()),
615            destination: "order-queue".to_string(),
616            enabled: true,
617        };
618        router.add_rule(rule).ok();
619
620        let msg = make_message("m1", "orders");
621        let outcome = router.route(msg);
622        assert_eq!(
623            outcome,
624            RoutingOutcome::Routed {
625                destination: "order-queue".to_string(),
626                rule_id: "r1".to_string()
627            }
628        );
629    }
630
631    #[test]
632    fn test_topic_exact_no_match() {
633        let mut router = StreamRouter::with_defaults();
634        let rule = RoutingRule {
635            id: "r1".to_string(),
636            priority: 10,
637            strategy: RoutingStrategy::TopicExact("orders".to_string()),
638            destination: "order-queue".to_string(),
639            enabled: true,
640        };
641        router.add_rule(rule).ok();
642
643        let msg = make_message("m1", "payments");
644        let outcome = router.route(msg);
645        assert_eq!(outcome, RoutingOutcome::DeadLettered);
646    }
647
648    // ── Topic-regex routing ──────────────────────────────────────────────────
649
650    #[test]
651    fn test_topic_regex_wildcard() {
652        let mut router = StreamRouter::with_defaults();
653        let rule = RoutingRule {
654            id: "r1".to_string(),
655            priority: 10,
656            strategy: RoutingStrategy::TopicRegex("orders.*".to_string()),
657            destination: "all-orders".to_string(),
658            enabled: true,
659        };
660        router.add_rule(rule).ok();
661
662        let msg = make_message("m1", "orders.us");
663        assert!(matches!(router.route(msg), RoutingOutcome::Routed { .. }));
664    }
665
666    #[test]
667    fn test_topic_regex_anchored() {
668        let mut router = StreamRouter::with_defaults();
669        let rule = RoutingRule {
670            id: "r1".to_string(),
671            priority: 10,
672            strategy: RoutingStrategy::TopicRegex("^orders$".to_string()),
673            destination: "exact-orders".to_string(),
674            enabled: true,
675        };
676        router.add_rule(rule).ok();
677
678        let match_msg = make_message("m1", "orders");
679        assert!(matches!(
680            router.route(match_msg),
681            RoutingOutcome::Routed { .. }
682        ));
683
684        let no_match = make_message("m2", "orders.eu");
685        assert_eq!(router.route(no_match), RoutingOutcome::DeadLettered);
686    }
687
688    #[test]
689    fn test_topic_regex_contains() {
690        let mut router = StreamRouter::with_defaults();
691        let rule = RoutingRule {
692            id: "r1".to_string(),
693            priority: 10,
694            strategy: RoutingStrategy::TopicRegex("ship".to_string()),
695            destination: "shipping".to_string(),
696            enabled: true,
697        };
698        router.add_rule(rule).ok();
699
700        let msg = make_message("m1", "order-shipping-us");
701        assert!(matches!(router.route(msg), RoutingOutcome::Routed { .. }));
702    }
703
704    #[test]
705    fn test_invalid_regex_rejected() {
706        let mut router = StreamRouter::with_defaults();
707        let rule = RoutingRule {
708            id: "r1".to_string(),
709            priority: 10,
710            strategy: RoutingStrategy::TopicRegex("(".to_string()),
711            destination: "dest".to_string(),
712            enabled: true,
713        };
714        assert!(router.add_rule(rule).is_err());
715    }
716
717    #[test]
718    fn test_empty_regex_rejected() {
719        let mut router = StreamRouter::with_defaults();
720        let rule = RoutingRule {
721            id: "r1".to_string(),
722            priority: 10,
723            strategy: RoutingStrategy::TopicRegex(String::new()),
724            destination: "dest".to_string(),
725            enabled: true,
726        };
727        assert!(router.add_rule(rule).is_err());
728    }
729
730    // ── Content-based routing ────────────────────────────────────────────────
731
732    #[test]
733    fn test_content_based_match() {
734        let mut router = StreamRouter::with_defaults();
735        let rule = RoutingRule {
736            id: "r1".to_string(),
737            priority: 10,
738            strategy: RoutingStrategy::ContentBased {
739                field: "type".to_string(),
740                value: "order".to_string(),
741            },
742            destination: "order-queue".to_string(),
743            enabled: true,
744        };
745        router.add_rule(rule).ok();
746
747        let msg = make_message_with_field("m1", "events", "type", "order");
748        assert!(matches!(router.route(msg), RoutingOutcome::Routed { .. }));
749    }
750
751    #[test]
752    fn test_content_based_no_match() {
753        let mut router = StreamRouter::with_defaults();
754        let rule = RoutingRule {
755            id: "r1".to_string(),
756            priority: 10,
757            strategy: RoutingStrategy::ContentBased {
758                field: "type".to_string(),
759                value: "order".to_string(),
760            },
761            destination: "order-queue".to_string(),
762            enabled: true,
763        };
764        router.add_rule(rule).ok();
765
766        let msg = make_message_with_field("m1", "events", "type", "payment");
767        assert_eq!(router.route(msg), RoutingOutcome::DeadLettered);
768    }
769
770    #[test]
771    fn test_content_based_missing_field() {
772        let mut router = StreamRouter::with_defaults();
773        let rule = RoutingRule {
774            id: "r1".to_string(),
775            priority: 10,
776            strategy: RoutingStrategy::ContentBased {
777                field: "region".to_string(),
778                value: "us".to_string(),
779            },
780            destination: "us-queue".to_string(),
781            enabled: true,
782        };
783        router.add_rule(rule).ok();
784
785        let msg = make_message("m1", "events");
786        assert_eq!(router.route(msg), RoutingOutcome::DeadLettered);
787    }
788
789    // ── Header-based routing ─────────────────────────────────────────────────
790
791    #[test]
792    fn test_header_based_match() {
793        let mut router = StreamRouter::with_defaults();
794        let rule = RoutingRule {
795            id: "r1".to_string(),
796            priority: 10,
797            strategy: RoutingStrategy::HeaderBased {
798                key: "X-Priority".to_string(),
799                value: "high".to_string(),
800            },
801            destination: "priority-queue".to_string(),
802            enabled: true,
803        };
804        router.add_rule(rule).ok();
805
806        let msg = make_message_with_header("m1", "events", "X-Priority", "high");
807        assert!(matches!(router.route(msg), RoutingOutcome::Routed { .. }));
808    }
809
810    #[test]
811    fn test_header_based_no_match_wrong_value() {
812        let mut router = StreamRouter::with_defaults();
813        let rule = RoutingRule {
814            id: "r1".to_string(),
815            priority: 10,
816            strategy: RoutingStrategy::HeaderBased {
817                key: "X-Priority".to_string(),
818                value: "high".to_string(),
819            },
820            destination: "priority-queue".to_string(),
821            enabled: true,
822        };
823        router.add_rule(rule).ok();
824
825        let msg = make_message_with_header("m1", "events", "X-Priority", "low");
826        assert_eq!(router.route(msg), RoutingOutcome::DeadLettered);
827    }
828
829    #[test]
830    fn test_header_based_missing_header() {
831        let mut router = StreamRouter::with_defaults();
832        let rule = RoutingRule {
833            id: "r1".to_string(),
834            priority: 10,
835            strategy: RoutingStrategy::HeaderBased {
836                key: "X-Priority".to_string(),
837                value: "high".to_string(),
838            },
839            destination: "priority-queue".to_string(),
840            enabled: true,
841        };
842        router.add_rule(rule).ok();
843
844        let msg = make_message("m1", "events");
845        assert_eq!(router.route(msg), RoutingOutcome::DeadLettered);
846    }
847
848    // ── Round-robin routing ──────────────────────────────────────────────────
849
850    #[test]
851    fn test_round_robin_distribution() {
852        let mut router = StreamRouter::with_defaults();
853        let rule = RoutingRule {
854            id: "rr1".to_string(),
855            priority: 10,
856            strategy: RoutingStrategy::RoundRobin(vec![
857                "dest-a".to_string(),
858                "dest-b".to_string(),
859                "dest-c".to_string(),
860            ]),
861            destination: String::new(), // not used for RR
862            enabled: true,
863        };
864        router.add_rule(rule).ok();
865
866        let o1 = router.route(make_message("m1", "any"));
867        let o2 = router.route(make_message("m2", "any"));
868        let o3 = router.route(make_message("m3", "any"));
869        let o4 = router.route(make_message("m4", "any"));
870
871        assert_eq!(
872            o1,
873            RoutingOutcome::Routed {
874                destination: "dest-a".to_string(),
875                rule_id: "rr1".to_string()
876            }
877        );
878        assert_eq!(
879            o2,
880            RoutingOutcome::Routed {
881                destination: "dest-b".to_string(),
882                rule_id: "rr1".to_string()
883            }
884        );
885        assert_eq!(
886            o3,
887            RoutingOutcome::Routed {
888                destination: "dest-c".to_string(),
889                rule_id: "rr1".to_string()
890            }
891        );
892        assert_eq!(
893            o4,
894            RoutingOutcome::Routed {
895                destination: "dest-a".to_string(),
896                rule_id: "rr1".to_string()
897            }
898        );
899    }
900
901    #[test]
902    fn test_round_robin_empty_destinations() {
903        let mut router = StreamRouter::with_defaults();
904        let rule = RoutingRule {
905            id: "rr1".to_string(),
906            priority: 10,
907            strategy: RoutingStrategy::RoundRobin(vec![]),
908            destination: String::new(),
909            enabled: true,
910        };
911        router.add_rule(rule).ok();
912
913        let outcome = router.route(make_message("m1", "any"));
914        assert_eq!(outcome, RoutingOutcome::DeadLettered);
915    }
916
917    // ── Priority ordering ────────────────────────────────────────────────────
918
919    #[test]
920    fn test_priority_ordering() {
921        let mut router = StreamRouter::with_defaults();
922
923        // Low priority
924        let low = RoutingRule {
925            id: "low".to_string(),
926            priority: 1,
927            strategy: RoutingStrategy::TopicExact("orders".to_string()),
928            destination: "low-queue".to_string(),
929            enabled: true,
930        };
931        // High priority
932        let high = RoutingRule {
933            id: "high".to_string(),
934            priority: 100,
935            strategy: RoutingStrategy::TopicExact("orders".to_string()),
936            destination: "high-queue".to_string(),
937            enabled: true,
938        };
939
940        // Add low first, then high — high should still win
941        router.add_rule(low).ok();
942        router.add_rule(high).ok();
943
944        let outcome = router.route(make_message("m1", "orders"));
945        assert_eq!(
946            outcome,
947            RoutingOutcome::Routed {
948                destination: "high-queue".to_string(),
949                rule_id: "high".to_string()
950            }
951        );
952    }
953
954    #[test]
955    fn test_disabled_rule_skipped_in_priority() {
956        let mut router = StreamRouter::with_defaults();
957        let high = RoutingRule {
958            id: "high".to_string(),
959            priority: 100,
960            strategy: RoutingStrategy::TopicExact("orders".to_string()),
961            destination: "high-queue".to_string(),
962            enabled: false, // disabled
963        };
964        let low = RoutingRule {
965            id: "low".to_string(),
966            priority: 1,
967            strategy: RoutingStrategy::TopicExact("orders".to_string()),
968            destination: "low-queue".to_string(),
969            enabled: true,
970        };
971        router.add_rule(high).ok();
972        router.add_rule(low).ok();
973
974        let outcome = router.route(make_message("m1", "orders"));
975        assert_eq!(
976            outcome,
977            RoutingOutcome::Routed {
978                destination: "low-queue".to_string(),
979                rule_id: "low".to_string()
980            }
981        );
982    }
983
984    // ── Dead letter queue ────────────────────────────────────────────────────
985
986    #[test]
987    fn test_dlq_receives_unroutable() {
988        let mut router = StreamRouter::with_defaults();
989        let msg = make_message("m1", "unknown-topic");
990        let outcome = router.route(msg);
991        assert_eq!(outcome, RoutingOutcome::DeadLettered);
992        assert_eq!(router.dlq().len(), 1);
993    }
994
995    #[test]
996    fn test_dlq_disabled() {
997        let cfg = RouterConfig {
998            dlq_capacity: 100,
999            enable_dlq: false,
1000        };
1001        let mut router = StreamRouter::new(cfg);
1002        let msg = make_message("m1", "unknown");
1003        router.route(msg);
1004        assert!(router.dlq().is_empty());
1005    }
1006
1007    #[test]
1008    fn test_dlq_capacity_eviction() {
1009        let cfg = RouterConfig {
1010            dlq_capacity: 2,
1011            enable_dlq: true,
1012        };
1013        let mut router = StreamRouter::new(cfg);
1014        router.route(make_message("m1", "x"));
1015        router.route(make_message("m2", "x"));
1016        router.route(make_message("m3", "x"));
1017        assert_eq!(router.dlq().len(), 2);
1018        // m1 should have been evicted
1019        assert_eq!(router.dlq().front().map(|m| m.id.as_str()), Some("m2"));
1020    }
1021
1022    #[test]
1023    fn test_pop_dlq() {
1024        let mut router = StreamRouter::with_defaults();
1025        router.route(make_message("m1", "x"));
1026        router.route(make_message("m2", "x"));
1027        let popped = router.pop_dlq();
1028        assert_eq!(popped.map(|m| m.id), Some("m1".to_string()));
1029        assert_eq!(router.dlq().len(), 1);
1030    }
1031
1032    #[test]
1033    fn test_clear_dlq() {
1034        let mut router = StreamRouter::with_defaults();
1035        router.route(make_message("m1", "x"));
1036        router.route(make_message("m2", "x"));
1037        router.clear_dlq();
1038        assert!(router.dlq().is_empty());
1039    }
1040
1041    // ── Statistics ────────────────────────────────────────────────────────────
1042
1043    #[test]
1044    fn test_stats_tracking() {
1045        let mut router = StreamRouter::with_defaults();
1046        let rule = RoutingRule {
1047            id: "r1".to_string(),
1048            priority: 10,
1049            strategy: RoutingStrategy::TopicExact("orders".to_string()),
1050            destination: "q".to_string(),
1051            enabled: true,
1052        };
1053        router.add_rule(rule).ok();
1054
1055        router.route(make_message("m1", "orders"));
1056        router.route(make_message("m2", "orders"));
1057        router.route(make_message("m3", "unknown"));
1058
1059        assert_eq!(router.stats().total_evaluated, 3);
1060        assert_eq!(router.stats().total_routed, 2);
1061        assert_eq!(router.stats().total_dead_lettered, 1);
1062        assert_eq!(router.stats().per_destination.get("q"), Some(&2));
1063        assert_eq!(router.stats().per_rule.get("r1"), Some(&2));
1064    }
1065
1066    #[test]
1067    fn test_reset_stats() {
1068        let mut router = StreamRouter::with_defaults();
1069        router.route(make_message("m1", "x"));
1070        router.reset_stats();
1071        assert_eq!(router.stats().total_evaluated, 0);
1072        assert_eq!(router.stats().total_routed, 0);
1073        assert_eq!(router.stats().total_dead_lettered, 0);
1074    }
1075
1076    // ── Batch routing ────────────────────────────────────────────────────────
1077
1078    #[test]
1079    fn test_batch_routing() {
1080        let mut router = StreamRouter::with_defaults();
1081        let rule = RoutingRule {
1082            id: "r1".to_string(),
1083            priority: 10,
1084            strategy: RoutingStrategy::TopicExact("orders".to_string()),
1085            destination: "q".to_string(),
1086            enabled: true,
1087        };
1088        router.add_rule(rule).ok();
1089
1090        let messages = vec![
1091            make_message("m1", "orders"),
1092            make_message("m2", "payments"),
1093            make_message("m3", "orders"),
1094        ];
1095        let outcomes = router.route_batch(messages);
1096        assert_eq!(outcomes.len(), 3);
1097        assert!(matches!(outcomes[0], RoutingOutcome::Routed { .. }));
1098        assert_eq!(outcomes[1], RoutingOutcome::DeadLettered);
1099        assert!(matches!(outcomes[2], RoutingOutcome::Routed { .. }));
1100    }
1101
1102    // ── Rule accessors ───────────────────────────────────────────────────────
1103
1104    #[test]
1105    fn test_rule_ids_in_priority_order() {
1106        let mut router = StreamRouter::with_defaults();
1107        let r1 = RoutingRule {
1108            id: "low".to_string(),
1109            priority: 1,
1110            strategy: RoutingStrategy::TopicExact("a".to_string()),
1111            destination: "d".to_string(),
1112            enabled: true,
1113        };
1114        let r2 = RoutingRule {
1115            id: "high".to_string(),
1116            priority: 100,
1117            strategy: RoutingStrategy::TopicExact("b".to_string()),
1118            destination: "d".to_string(),
1119            enabled: true,
1120        };
1121        router.add_rule(r1).ok();
1122        router.add_rule(r2).ok();
1123        let ids = router.rule_ids();
1124        assert_eq!(ids, vec!["high", "low"]);
1125    }
1126
1127    #[test]
1128    fn test_get_rule() {
1129        let mut router = StreamRouter::with_defaults();
1130        let rule = RoutingRule {
1131            id: "r1".to_string(),
1132            priority: 5,
1133            strategy: RoutingStrategy::TopicExact("x".to_string()),
1134            destination: "y".to_string(),
1135            enabled: true,
1136        };
1137        router.add_rule(rule).ok();
1138        assert!(router.get_rule("r1").is_some());
1139        assert!(router.get_rule("nope").is_none());
1140    }
1141
1142    // ── Mixed strategy tests ─────────────────────────────────────────────────
1143
1144    #[test]
1145    fn test_multiple_strategies() {
1146        let mut router = StreamRouter::with_defaults();
1147        let topic_rule = RoutingRule {
1148            id: "topic".to_string(),
1149            priority: 5,
1150            strategy: RoutingStrategy::TopicExact("orders".to_string()),
1151            destination: "topic-dest".to_string(),
1152            enabled: true,
1153        };
1154        let header_rule = RoutingRule {
1155            id: "header".to_string(),
1156            priority: 10,
1157            strategy: RoutingStrategy::HeaderBased {
1158                key: "X-VIP".to_string(),
1159                value: "true".to_string(),
1160            },
1161            destination: "vip-dest".to_string(),
1162            enabled: true,
1163        };
1164        router.add_rule(topic_rule).ok();
1165        router.add_rule(header_rule).ok();
1166
1167        // Header rule has higher priority and matches
1168        let msg = make_message_with_header("m1", "orders", "X-VIP", "true");
1169        let outcome = router.route(msg);
1170        assert_eq!(
1171            outcome,
1172            RoutingOutcome::Routed {
1173                destination: "vip-dest".to_string(),
1174                rule_id: "header".to_string()
1175            }
1176        );
1177    }
1178
1179    #[test]
1180    fn test_fallthrough_to_lower_priority() {
1181        let mut router = StreamRouter::with_defaults();
1182        let header_rule = RoutingRule {
1183            id: "header".to_string(),
1184            priority: 10,
1185            strategy: RoutingStrategy::HeaderBased {
1186                key: "X-VIP".to_string(),
1187                value: "true".to_string(),
1188            },
1189            destination: "vip-dest".to_string(),
1190            enabled: true,
1191        };
1192        let topic_rule = RoutingRule {
1193            id: "topic".to_string(),
1194            priority: 1,
1195            strategy: RoutingStrategy::TopicExact("orders".to_string()),
1196            destination: "topic-dest".to_string(),
1197            enabled: true,
1198        };
1199        router.add_rule(header_rule).ok();
1200        router.add_rule(topic_rule).ok();
1201
1202        // No VIP header, falls through to topic match
1203        let msg = make_message("m1", "orders");
1204        let outcome = router.route(msg);
1205        assert_eq!(
1206            outcome,
1207            RoutingOutcome::Routed {
1208                destination: "topic-dest".to_string(),
1209                rule_id: "topic".to_string()
1210            }
1211        );
1212    }
1213
1214    // ── Regex edge cases ─────────────────────────────────────────────────────
1215
1216    #[test]
1217    fn test_regex_start_anchor() {
1218        let mut router = StreamRouter::with_defaults();
1219        let rule = RoutingRule {
1220            id: "r1".to_string(),
1221            priority: 10,
1222            strategy: RoutingStrategy::TopicRegex("^orders".to_string()),
1223            destination: "d".to_string(),
1224            enabled: true,
1225        };
1226        router.add_rule(rule).ok();
1227
1228        assert!(matches!(
1229            router.route(make_message("m", "orders.us")),
1230            RoutingOutcome::Routed { .. }
1231        ));
1232        assert_eq!(
1233            router.route(make_message("m", "pre-orders")),
1234            RoutingOutcome::DeadLettered
1235        );
1236    }
1237
1238    #[test]
1239    fn test_regex_end_anchor() {
1240        let mut router = StreamRouter::with_defaults();
1241        let rule = RoutingRule {
1242            id: "r1".to_string(),
1243            priority: 10,
1244            strategy: RoutingStrategy::TopicRegex("orders$".to_string()),
1245            destination: "d".to_string(),
1246            enabled: true,
1247        };
1248        router.add_rule(rule).ok();
1249
1250        assert!(matches!(
1251            router.route(make_message("m", "all-orders")),
1252            RoutingOutcome::Routed { .. }
1253        ));
1254        assert_eq!(
1255            router.route(make_message("m", "orders-new")),
1256            RoutingOutcome::DeadLettered
1257        );
1258    }
1259
1260    #[test]
1261    fn test_regex_wildcard_middle() {
1262        let mut router = StreamRouter::with_defaults();
1263        let rule = RoutingRule {
1264            id: "r1".to_string(),
1265            priority: 10,
1266            strategy: RoutingStrategy::TopicRegex("^order*done$".to_string()),
1267            destination: "d".to_string(),
1268            enabled: true,
1269        };
1270        router.add_rule(rule).ok();
1271
1272        assert!(matches!(
1273            router.route(make_message("m", "order-processing-done")),
1274            RoutingOutcome::Routed { .. }
1275        ));
1276        assert!(matches!(
1277            router.route(make_message("m", "orderdone")),
1278            RoutingOutcome::Routed { .. }
1279        ));
1280    }
1281
1282    // ── Router error display ─────────────────────────────────────────────────
1283
1284    #[test]
1285    fn test_router_error_display() {
1286        let e1 = RouterError::DuplicateRuleId("r1".to_string());
1287        assert!(format!("{}", e1).contains("r1"));
1288
1289        let e2 = RouterError::RuleNotFound("r2".to_string());
1290        assert!(format!("{}", e2).contains("r2"));
1291
1292        let e3 = RouterError::DlqFull;
1293        assert!(format!("{}", e3).contains("full"));
1294
1295        let e4 = RouterError::InvalidRegex("bad".to_string());
1296        assert!(format!("{}", e4).contains("bad"));
1297    }
1298
1299    #[test]
1300    fn test_router_error_is_error() {
1301        let e: Box<dyn std::error::Error> = Box::new(RouterError::DuplicateRuleId("x".to_string()));
1302        assert!(!e.to_string().is_empty());
1303    }
1304
1305    // ── Routing stats default ────────────────────────────────────────────────
1306
1307    #[test]
1308    fn test_routing_stats_default() {
1309        let stats = RoutingStats::default();
1310        assert_eq!(stats.total_evaluated, 0);
1311        assert_eq!(stats.total_routed, 0);
1312        assert_eq!(stats.total_dead_lettered, 0);
1313        assert!(stats.per_destination.is_empty());
1314        assert!(stats.per_rule.is_empty());
1315    }
1316
1317    // ── Config default ───────────────────────────────────────────────────────
1318
1319    #[test]
1320    fn test_router_config_default() {
1321        let cfg = RouterConfig::default();
1322        assert_eq!(cfg.dlq_capacity, 10_000);
1323        assert!(cfg.enable_dlq);
1324    }
1325
1326    // ── RoutingStrategy clone + eq ───────────────────────────────────────────
1327
1328    #[test]
1329    fn test_routing_strategy_clone_eq() {
1330        let s1 = RoutingStrategy::TopicExact("x".to_string());
1331        let s2 = s1.clone();
1332        assert_eq!(s1, s2);
1333
1334        let s3 = RoutingStrategy::ContentBased {
1335            field: "a".to_string(),
1336            value: "b".to_string(),
1337        };
1338        let s4 = s3.clone();
1339        assert_eq!(s3, s4);
1340
1341        let s5 = RoutingStrategy::HeaderBased {
1342            key: "k".to_string(),
1343            value: "v".to_string(),
1344        };
1345        assert_eq!(s5.clone(), s5);
1346
1347        let s6 = RoutingStrategy::RoundRobin(vec!["a".to_string()]);
1348        assert_eq!(s6.clone(), s6);
1349
1350        let s7 = RoutingStrategy::TopicRegex("pat".to_string());
1351        assert_eq!(s7.clone(), s7);
1352    }
1353
1354    // ── RoutingOutcome ───────────────────────────────────────────────────────
1355
1356    #[test]
1357    fn test_routing_outcome_dead_lettered_eq() {
1358        assert_eq!(RoutingOutcome::DeadLettered, RoutingOutcome::DeadLettered);
1359    }
1360
1361    #[test]
1362    fn test_routing_outcome_routed_eq() {
1363        let a = RoutingOutcome::Routed {
1364            destination: "x".to_string(),
1365            rule_id: "r".to_string(),
1366        };
1367        let b = a.clone();
1368        assert_eq!(a, b);
1369    }
1370}