// celers_core/router.rs

//! Task Routing
//!
//! This module provides task routing capabilities for directing tasks to appropriate
//! queues based on various criteria:
//!
//! - **Name Patterns**: Route tasks based on glob or regex patterns
//! - **Queue Routing**: Route tasks to specific queues based on task type
//! - **Priority Routing**: Route tasks based on priority levels
//! - **Custom Strategies**: Implement custom routing logic
//!
//! # Example
//!
//! ```rust
//! use celers_core::router::{Router, RouteRule, PatternMatcher};
//!
//! // Create a router with rules
//! let mut router = Router::new();
//!
//! // Route all tasks starting with "email." to the "email" queue
//! router.add_rule(RouteRule::new(
//!     PatternMatcher::glob("email.*"),
//!     "email"
//! ));
//!
//! // Route tasks starting with "urgent." to the "high_priority" queue,
//! // evaluating this rule before rules with a lower priority
//! router.add_rule(RouteRule::new(
//!     PatternMatcher::glob("urgent.*"),
//!     "high_priority"
//! ).with_priority(10));
//!
//! // Get the queue for a task
//! assert_eq!(router.route("email.send_newsletter"), Some("email".to_string()));
//! ```

use regex::{Regex, RegexBuilder};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

39/// Pattern matching strategy for task routing
40#[derive(Debug, Clone)]
41pub enum PatternMatcher {
42    /// Exact name match
43    Exact(String),
44    /// Glob pattern (supports * and ?)
45    Glob(GlobPattern),
46    /// Regular expression pattern
47    Regex(RegexPattern),
48    /// Match all tasks
49    All,
50}
51
52impl PatternMatcher {
53    /// Create an exact match pattern
54    #[must_use]
55    pub fn exact(name: impl Into<String>) -> Self {
56        Self::Exact(name.into())
57    }
58
59    /// Create a glob pattern matcher
60    ///
61    /// Supports:
62    /// - `*` matches any sequence of characters
63    /// - `?` matches any single character
64    ///
65    /// # Example
66    ///
67    /// ```rust
68    /// use celers_core::router::PatternMatcher;
69    ///
70    /// let matcher = PatternMatcher::glob("tasks.*");
71    /// assert!(matcher.matches("tasks.add"));
72    /// assert!(matcher.matches("tasks.multiply"));
73    /// assert!(!matcher.matches("other.task"));
74    /// ```
75    #[must_use]
76    pub fn glob(pattern: impl Into<String>) -> Self {
77        Self::Glob(GlobPattern::new(pattern))
78    }
79
80    /// Create a regex pattern matcher
81    ///
82    /// # Example
83    ///
84    /// ```rust
85    /// use celers_core::router::PatternMatcher;
86    ///
87    /// let matcher = PatternMatcher::regex(r"tasks\.[a-z]+").unwrap();
88    /// assert!(matcher.matches("tasks.add"));
89    /// assert!(!matcher.matches("tasks.Add"));
90    /// ```
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the regex pattern is invalid.
95    pub fn regex(pattern: &str) -> Result<Self, regex::Error> {
96        Ok(Self::Regex(RegexPattern::new(pattern)?))
97    }
98
99    /// Create a matcher that matches all tasks
100    #[must_use]
101    pub fn all() -> Self {
102        Self::All
103    }
104
105    /// Check if a task name matches this pattern
106    #[inline]
107    #[must_use]
108    pub fn matches(&self, task_name: &str) -> bool {
109        match self {
110            Self::Exact(name) => task_name == name,
111            Self::Glob(glob) => glob.matches(task_name),
112            Self::Regex(regex) => regex.matches(task_name),
113            Self::All => true,
114        }
115    }
116}
117
118/// Glob pattern for task name matching
119#[derive(Debug, Clone)]
120pub struct GlobPattern {
121    pattern: String,
122    regex: Regex,
123}
124
125impl GlobPattern {
126    /// Create a new glob pattern
127    ///
128    /// # Panics
129    ///
130    /// Panics if the glob pattern cannot be converted to a valid regex.
131    #[must_use]
132    pub fn new(pattern: impl Into<String>) -> Self {
133        let pattern = pattern.into();
134        let regex_str = glob_to_regex(&pattern);
135        let regex = Regex::new(&regex_str).expect("Invalid glob pattern");
136        Self { pattern, regex }
137    }
138
139    /// Check if a task name matches this glob pattern
140    #[inline]
141    #[must_use]
142    pub fn matches(&self, task_name: &str) -> bool {
143        self.regex.is_match(task_name)
144    }
145
146    /// Get the original pattern string
147    #[inline]
148    #[must_use]
149    pub fn pattern(&self) -> &str {
150        &self.pattern
151    }
152}
153
154/// Regular expression pattern for task name matching
155#[derive(Debug, Clone)]
156pub struct RegexPattern {
157    pattern: String,
158    regex: Regex,
159}
160
161impl RegexPattern {
162    /// Create a new regex pattern
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if the regex pattern is invalid.
167    pub fn new(pattern: &str) -> Result<Self, regex::Error> {
168        let regex = Regex::new(pattern)?;
169        Ok(Self {
170            pattern: pattern.to_string(),
171            regex,
172        })
173    }
174
175    /// Check if a task name matches this regex pattern
176    #[inline]
177    #[must_use]
178    pub fn matches(&self, task_name: &str) -> bool {
179        self.regex.is_match(task_name)
180    }
181
182    /// Get the original pattern string
183    #[inline]
184    #[must_use]
185    pub fn pattern(&self) -> &str {
186        &self.pattern
187    }
188}
189
/// Convert a glob pattern to a regex pattern.
///
/// The result is anchored with `^` and `$`. `*` becomes `.*`, `?` becomes
/// `.`, regex metacharacters are backslash-escaped, and every other character
/// is copied through unchanged.
fn glob_to_regex(glob: &str) -> String {
    // Characters that would otherwise be interpreted by the regex engine.
    const METACHARACTERS: [char; 12] = [
        '.', '+', '(', ')', '[', ']', '{', '}', '^', '$', '|', '\\',
    ];

    // Every input byte expands to at most two output bytes, plus two anchors.
    let mut translated = String::with_capacity(glob.len() * 2 + 2);
    translated.push('^');

    for c in glob.chars() {
        match c {
            '*' => translated.push_str(".*"),
            '?' => translated.push('.'),
            meta if METACHARACTERS.contains(&meta) => {
                translated.push('\\');
                translated.push(meta);
            }
            literal => translated.push(literal),
        }
    }

    translated.push('$');
    translated
}

211/// A routing rule that maps task names to queues
212#[derive(Debug, Clone)]
213pub struct RouteRule {
214    /// Pattern matcher for task names
215    pub matcher: PatternMatcher,
216    /// Target queue name
217    pub queue: String,
218    /// Rule priority (higher = evaluated first)
219    pub priority: i32,
220    /// Optional routing key (for AMQP exchanges)
221    pub routing_key: Option<String>,
222    /// Optional exchange name (for AMQP)
223    pub exchange: Option<String>,
224    /// Optional argument condition for argument-based routing
225    pub argument_condition: Option<ArgumentCondition>,
226}
227
228impl RouteRule {
229    /// Create a new routing rule
230    #[must_use]
231    pub fn new(matcher: PatternMatcher, queue: impl Into<String>) -> Self {
232        Self {
233            matcher,
234            queue: queue.into(),
235            priority: 0,
236            routing_key: None,
237            exchange: None,
238            argument_condition: None,
239        }
240    }
241
242    /// Set the rule priority
243    #[must_use]
244    pub fn with_priority(mut self, priority: i32) -> Self {
245        self.priority = priority;
246        self
247    }
248
249    /// Set the routing key (for AMQP)
250    #[must_use]
251    pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
252        self.routing_key = Some(routing_key.into());
253        self
254    }
255
256    /// Set the exchange name (for AMQP)
257    #[must_use]
258    pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
259        self.exchange = Some(exchange.into());
260        self
261    }
262
263    /// Set the argument condition for argument-based routing
264    #[must_use]
265    pub fn with_argument_condition(mut self, condition: ArgumentCondition) -> Self {
266        self.argument_condition = Some(condition);
267        self
268    }
269
270    /// Check if this rule matches a task name
271    #[inline]
272    #[must_use]
273    pub fn matches(&self, task_name: &str) -> bool {
274        self.matcher.matches(task_name)
275    }
276
277    /// Check if this rule matches a task name and arguments
278    ///
279    /// Returns true if:
280    /// - The task name matches the pattern matcher
281    /// - AND (if `argument_condition` is set) the arguments match the condition
282    #[must_use]
283    pub fn matches_with_args(
284        &self,
285        task_name: &str,
286        args: &[serde_json::Value],
287        kwargs: &serde_json::Map<String, serde_json::Value>,
288    ) -> bool {
289        if !self.matcher.matches(task_name) {
290            return false;
291        }
292
293        match &self.argument_condition {
294            Some(condition) => condition.evaluate(args, kwargs),
295            None => true,
296        }
297    }
298}
299
// ============================================================================
// Argument-Based Routing
// ============================================================================

304/// Condition for matching task arguments
305///
306/// Allows routing based on the content of task arguments or keyword arguments.
307#[derive(Debug, Clone, Serialize, Deserialize)]
308#[serde(tag = "type")]
309pub enum ArgumentCondition {
310    /// Check if a positional argument at index equals a value
311    ArgEquals {
312        /// Argument index (0-based)
313        index: usize,
314        /// Expected value
315        value: serde_json::Value,
316    },
317
318    /// Check if a positional argument at index exists
319    ArgExists {
320        /// Argument index (0-based)
321        index: usize,
322    },
323
324    /// Check if a keyword argument equals a value
325    KwargEquals {
326        /// Keyword argument name
327        key: String,
328        /// Expected value
329        value: serde_json::Value,
330    },
331
332    /// Check if a keyword argument exists
333    KwargExists {
334        /// Keyword argument name
335        key: String,
336    },
337
338    /// Check if a keyword argument matches a pattern
339    KwargMatches {
340        /// Keyword argument name
341        key: String,
342        /// Regex pattern
343        pattern: String,
344    },
345
346    /// Check if a positional argument is greater than a threshold
347    ArgGreaterThan {
348        /// Argument index (0-based)
349        index: usize,
350        /// Threshold value
351        threshold: f64,
352    },
353
354    /// Check if a positional argument is less than a threshold
355    ArgLessThan {
356        /// Argument index (0-based)
357        index: usize,
358        /// Threshold value
359        threshold: f64,
360    },
361
362    /// Check if a keyword argument is greater than a threshold
363    KwargGreaterThan {
364        /// Keyword argument name
365        key: String,
366        /// Threshold value
367        threshold: f64,
368    },
369
370    /// Check if a keyword argument is less than a threshold
371    KwargLessThan {
372        /// Keyword argument name
373        key: String,
374        /// Threshold value
375        threshold: f64,
376    },
377
378    /// Check if a keyword argument contains a value (for strings/arrays)
379    KwargContains {
380        /// Keyword argument name
381        key: String,
382        /// Value to search for
383        value: serde_json::Value,
384    },
385
386    /// Logical AND of multiple conditions
387    And(Vec<ArgumentCondition>),
388
389    /// Logical OR of multiple conditions
390    Or(Vec<ArgumentCondition>),
391
392    /// Logical NOT of a condition
393    Not(Box<ArgumentCondition>),
394
395    /// Always true (no argument condition)
396    Always,
397}
398
399impl ArgumentCondition {
400    /// Create a condition that checks if arg\[index\] == value
401    #[must_use]
402    pub fn arg_equals(index: usize, value: serde_json::Value) -> Self {
403        Self::ArgEquals { index, value }
404    }
405
406    /// Create a condition that checks if arg\[index\] exists
407    #[must_use]
408    pub fn arg_exists(index: usize) -> Self {
409        Self::ArgExists { index }
410    }
411
412    /// Create a condition that checks if kwargs\[key\] == value
413    #[must_use]
414    pub fn kwarg_equals(key: impl Into<String>, value: serde_json::Value) -> Self {
415        Self::KwargEquals {
416            key: key.into(),
417            value,
418        }
419    }
420
421    /// Create a condition that checks if kwargs\[key\] exists
422    #[must_use]
423    pub fn kwarg_exists(key: impl Into<String>) -> Self {
424        Self::KwargExists { key: key.into() }
425    }
426
427    /// Create a condition that checks if kwargs\[key\] matches a regex pattern
428    #[must_use]
429    pub fn kwarg_matches(key: impl Into<String>, pattern: impl Into<String>) -> Self {
430        Self::KwargMatches {
431            key: key.into(),
432            pattern: pattern.into(),
433        }
434    }
435
436    /// Create a condition that checks if arg\[index\] > threshold
437    #[must_use]
438    pub fn arg_greater_than(index: usize, threshold: f64) -> Self {
439        Self::ArgGreaterThan { index, threshold }
440    }
441
442    /// Create a condition that checks if arg\[index\] < threshold
443    #[must_use]
444    pub fn arg_less_than(index: usize, threshold: f64) -> Self {
445        Self::ArgLessThan { index, threshold }
446    }
447
448    /// Create a condition that checks if kwargs\[key\] > threshold
449    #[must_use]
450    pub fn kwarg_greater_than(key: impl Into<String>, threshold: f64) -> Self {
451        Self::KwargGreaterThan {
452            key: key.into(),
453            threshold,
454        }
455    }
456
457    /// Create a condition that checks if kwargs\[key\] < threshold
458    #[must_use]
459    pub fn kwarg_less_than(key: impl Into<String>, threshold: f64) -> Self {
460        Self::KwargLessThan {
461            key: key.into(),
462            threshold,
463        }
464    }
465
466    /// Create a condition that checks if kwargs\[key\] contains value
467    #[must_use]
468    pub fn kwarg_contains(key: impl Into<String>, value: serde_json::Value) -> Self {
469        Self::KwargContains {
470            key: key.into(),
471            value,
472        }
473    }
474
475    /// Create an always-true condition
476    #[must_use]
477    pub fn always() -> Self {
478        Self::Always
479    }
480
481    /// Combine with AND
482    #[must_use]
483    pub fn and(self, other: ArgumentCondition) -> Self {
484        match self {
485            Self::And(mut conditions) => {
486                conditions.push(other);
487                Self::And(conditions)
488            }
489            _ => Self::And(vec![self, other]),
490        }
491    }
492
493    /// Combine with OR
494    #[must_use]
495    pub fn or(self, other: ArgumentCondition) -> Self {
496        match self {
497            Self::Or(mut conditions) => {
498                conditions.push(other);
499                Self::Or(conditions)
500            }
501            _ => Self::Or(vec![self, other]),
502        }
503    }
504
505    /// Negate the condition
506    #[must_use]
507    pub fn negate(self) -> Self {
508        Self::Not(Box::new(self))
509    }
510
511    /// Evaluate the condition against task arguments
512    ///
513    /// # Arguments
514    /// * `args` - Positional arguments as JSON values
515    /// * `kwargs` - Keyword arguments as a JSON object
516    #[must_use]
517    pub fn evaluate(
518        &self,
519        args: &[serde_json::Value],
520        kwargs: &serde_json::Map<String, serde_json::Value>,
521    ) -> bool {
522        match self {
523            Self::Always => true,
524
525            Self::ArgEquals { index, value } => args.get(*index).is_some_and(|v| v == value),
526
527            Self::ArgExists { index } => args.len() > *index,
528
529            Self::KwargEquals { key, value } => kwargs.get(key).is_some_and(|v| v == value),
530
531            Self::KwargExists { key } => kwargs.contains_key(key),
532
533            Self::KwargMatches { key, pattern } => {
534                if let Some(serde_json::Value::String(s)) = kwargs.get(key) {
535                    Regex::new(pattern)
536                        .map(|re| re.is_match(s))
537                        .unwrap_or(false)
538                } else {
539                    false
540                }
541            }
542
543            Self::ArgGreaterThan { index, threshold } => args
544                .get(*index)
545                .and_then(serde_json::Value::as_f64)
546                .is_some_and(|v| v > *threshold),
547
548            Self::ArgLessThan { index, threshold } => args
549                .get(*index)
550                .and_then(serde_json::Value::as_f64)
551                .is_some_and(|v| v < *threshold),
552
553            Self::KwargGreaterThan { key, threshold } => kwargs
554                .get(key)
555                .and_then(serde_json::Value::as_f64)
556                .is_some_and(|v| v > *threshold),
557
558            Self::KwargLessThan { key, threshold } => kwargs
559                .get(key)
560                .and_then(serde_json::Value::as_f64)
561                .is_some_and(|v| v < *threshold),
562
563            Self::KwargContains { key, value } => {
564                if let Some(v) = kwargs.get(key) {
565                    match v {
566                        serde_json::Value::String(s) => {
567                            if let Some(needle) = value.as_str() {
568                                s.contains(needle)
569                            } else {
570                                false
571                            }
572                        }
573                        serde_json::Value::Array(arr) => arr.contains(value),
574                        _ => false,
575                    }
576                } else {
577                    false
578                }
579            }
580
581            Self::And(conditions) => conditions.iter().all(|c| c.evaluate(args, kwargs)),
582
583            Self::Or(conditions) => conditions.iter().any(|c| c.evaluate(args, kwargs)),
584
585            Self::Not(condition) => !condition.evaluate(args, kwargs),
586        }
587    }
588}
589
590impl std::fmt::Display for ArgumentCondition {
591    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592        match self {
593            Self::Always => write!(f, "always"),
594            Self::ArgEquals { index, value } => write!(f, "args[{index}] == {value}"),
595            Self::ArgExists { index } => write!(f, "args[{index}] exists"),
596            Self::KwargEquals { key, value } => write!(f, "kwargs[{key}] == {value}"),
597            Self::KwargExists { key } => write!(f, "kwargs[{key}] exists"),
598            Self::KwargMatches { key, pattern } => {
599                write!(f, "kwargs[{key}] matches /{pattern}/")
600            }
601            Self::ArgGreaterThan { index, threshold } => {
602                write!(f, "args[{index}] > {threshold}")
603            }
604            Self::ArgLessThan { index, threshold } => write!(f, "args[{index}] < {threshold}"),
605            Self::KwargGreaterThan { key, threshold } => {
606                write!(f, "kwargs[{key}] > {threshold}")
607            }
608            Self::KwargLessThan { key, threshold } => write!(f, "kwargs[{key}] < {threshold}"),
609            Self::KwargContains { key, value } => {
610                write!(f, "kwargs[{key}] contains {value}")
611            }
612            Self::And(conditions) => {
613                let parts: Vec<String> = conditions.iter().map(|c| format!("{c}")).collect();
614                write!(f, "({})", parts.join(" AND "))
615            }
616            Self::Or(conditions) => {
617                let parts: Vec<String> = conditions.iter().map(|c| format!("{c}")).collect();
618                write!(f, "({})", parts.join(" OR "))
619            }
620            Self::Not(condition) => write!(f, "NOT ({condition})"),
621        }
622    }
623}
624
625/// Routing result containing queue and optional AMQP settings
626#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
627pub struct RouteResult {
628    /// Target queue name
629    pub queue: String,
630    /// Optional routing key (for AMQP)
631    pub routing_key: Option<String>,
632    /// Optional exchange name (for AMQP)
633    pub exchange: Option<String>,
634}
635
636impl RouteResult {
637    /// Create a new route result
638    #[must_use]
639    pub fn new(queue: impl Into<String>) -> Self {
640        Self {
641            queue: queue.into(),
642            routing_key: None,
643            exchange: None,
644        }
645    }
646
647    /// Create from a route rule
648    #[must_use]
649    pub fn from_rule(rule: &RouteRule) -> Self {
650        Self {
651            queue: rule.queue.clone(),
652            routing_key: rule.routing_key.clone(),
653            exchange: rule.exchange.clone(),
654        }
655    }
656}
657
658/// Task router for directing tasks to appropriate queues
659#[derive(Debug, Default)]
660pub struct Router {
661    /// Routing rules (sorted by priority)
662    rules: Vec<RouteRule>,
663    /// Default queue for unmatched tasks
664    default_queue: Option<String>,
665    /// Direct task-to-queue mappings
666    direct_routes: HashMap<String, RouteResult>,
667}
668
669impl Router {
670    /// Create a new empty router
671    #[must_use]
672    pub fn new() -> Self {
673        Self::default()
674    }
675
676    /// Create a router with a default queue
677    #[must_use]
678    pub fn with_default_queue(queue: impl Into<String>) -> Self {
679        Self {
680            rules: Vec::new(),
681            default_queue: Some(queue.into()),
682            direct_routes: HashMap::new(),
683        }
684    }
685
686    /// Add a routing rule
687    ///
688    /// Rules are sorted by priority (higher priority rules are evaluated first)
689    pub fn add_rule(&mut self, rule: RouteRule) {
690        self.rules.push(rule);
691        // Sort by priority (descending)
692        self.rules.sort_by(|a, b| b.priority.cmp(&a.priority));
693    }
694
695    /// Add a direct route for a specific task
696    ///
697    /// Direct routes take precedence over pattern-based rules
698    pub fn add_direct_route(&mut self, task_name: impl Into<String>, result: RouteResult) {
699        self.direct_routes.insert(task_name.into(), result);
700    }
701
702    /// Set the default queue for unmatched tasks
703    pub fn set_default_queue(&mut self, queue: impl Into<String>) {
704        self.default_queue = Some(queue.into());
705    }
706
707    /// Route a task to a queue
708    ///
709    /// Returns `None` if no matching rule and no default queue
710    #[must_use]
711    pub fn route(&self, task_name: &str) -> Option<String> {
712        self.route_full(task_name).map(|r| r.queue)
713    }
714
715    /// Route a task and get full routing information
716    ///
717    /// Returns `None` if no matching rule and no default queue
718    #[must_use]
719    pub fn route_full(&self, task_name: &str) -> Option<RouteResult> {
720        // Check direct routes first
721        if let Some(result) = self.direct_routes.get(task_name) {
722            return Some(result.clone());
723        }
724
725        // Check pattern-based rules
726        for rule in &self.rules {
727            if rule.matches(task_name) {
728                return Some(RouteResult::from_rule(rule));
729            }
730        }
731
732        // Fall back to default queue
733        self.default_queue.as_ref().map(RouteResult::new)
734    }
735
736    /// Route a task with arguments to a queue
737    ///
738    /// This method considers both task name patterns and argument conditions.
739    /// Returns `None` if no matching rule and no default queue.
740    #[must_use]
741    pub fn route_with_args(
742        &self,
743        task_name: &str,
744        args: &[serde_json::Value],
745        kwargs: &serde_json::Map<String, serde_json::Value>,
746    ) -> Option<String> {
747        self.route_full_with_args(task_name, args, kwargs)
748            .map(|r| r.queue)
749    }
750
751    /// Route a task with arguments and get full routing information
752    ///
753    /// This method considers both task name patterns and argument conditions.
754    /// Returns `None` if no matching rule and no default queue.
755    #[must_use]
756    pub fn route_full_with_args(
757        &self,
758        task_name: &str,
759        args: &[serde_json::Value],
760        kwargs: &serde_json::Map<String, serde_json::Value>,
761    ) -> Option<RouteResult> {
762        // Check direct routes first (direct routes don't have argument conditions)
763        if let Some(result) = self.direct_routes.get(task_name) {
764            return Some(result.clone());
765        }
766
767        // Check pattern-based rules with argument conditions
768        for rule in &self.rules {
769            if rule.matches_with_args(task_name, args, kwargs) {
770                return Some(RouteResult::from_rule(rule));
771            }
772        }
773
774        // Fall back to default queue
775        self.default_queue.as_ref().map(RouteResult::new)
776    }
777
778    /// Check if a task has any matching route
779    #[inline]
780    #[must_use]
781    pub fn has_route(&self, task_name: &str) -> bool {
782        self.direct_routes.contains_key(task_name)
783            || self.rules.iter().any(|r| r.matches(task_name))
784            || self.default_queue.is_some()
785    }
786
787    /// Get all registered rules
788    #[inline]
789    #[must_use]
790    pub fn rules(&self) -> &[RouteRule] {
791        &self.rules
792    }
793
794    /// Remove all rules matching a pattern
795    pub fn remove_rules_by_queue(&mut self, queue: &str) {
796        self.rules.retain(|r| r.queue != queue);
797    }
798
799    /// Clear all rules
800    pub fn clear(&mut self) {
801        self.rules.clear();
802        self.direct_routes.clear();
803    }
804}
805
806/// Builder for creating routers with fluent API
807#[derive(Debug, Default)]
808pub struct RouterBuilder {
809    router: Router,
810}
811
812impl RouterBuilder {
813    /// Create a new router builder
814    #[must_use]
815    pub fn new() -> Self {
816        Self::default()
817    }
818
819    /// Add a rule that routes tasks matching a glob pattern to a queue
820    #[must_use]
821    pub fn route_glob(mut self, pattern: &str, queue: &str) -> Self {
822        self.router
823            .add_rule(RouteRule::new(PatternMatcher::glob(pattern), queue));
824        self
825    }
826
827    /// Add a rule that routes tasks matching a regex pattern to a queue
828    ///
829    /// # Errors
830    ///
831    /// Returns an error if the regex pattern is invalid.
832    pub fn route_regex(mut self, pattern: &str, queue: &str) -> Result<Self, regex::Error> {
833        self.router
834            .add_rule(RouteRule::new(PatternMatcher::regex(pattern)?, queue));
835        Ok(self)
836    }
837
838    /// Add a rule that routes a specific task to a queue
839    #[must_use]
840    pub fn route_exact(mut self, task_name: &str, queue: &str) -> Self {
841        self.router
842            .add_rule(RouteRule::new(PatternMatcher::exact(task_name), queue));
843        self
844    }
845
846    /// Add a direct route for a specific task
847    #[must_use]
848    pub fn direct_route(mut self, task_name: &str, queue: &str) -> Self {
849        self.router
850            .add_direct_route(task_name, RouteResult::new(queue));
851        self
852    }
853
854    /// Add a rule that routes tasks based on argument conditions
855    ///
856    /// # Example
857    /// ```
858    /// use celers_core::router::{RouterBuilder, PatternMatcher, ArgumentCondition};
859    ///
860    /// let router = RouterBuilder::new()
861    ///     .route_with_args(
862    ///         PatternMatcher::glob("process.*"),
863    ///         "high_priority",
864    ///         ArgumentCondition::kwarg_equals("priority", serde_json::json!("high")),
865    ///     )
866    ///     .route_with_args(
867    ///         PatternMatcher::glob("process.*"),
868    ///         "low_priority",
869    ///         ArgumentCondition::kwarg_equals("priority", serde_json::json!("low")),
870    ///     )
871    ///     .default_queue("default")
872    ///     .build();
873    /// ```
874    #[must_use]
875    pub fn route_with_args(
876        mut self,
877        matcher: PatternMatcher,
878        queue: &str,
879        condition: ArgumentCondition,
880    ) -> Self {
881        self.router
882            .add_rule(RouteRule::new(matcher, queue).with_argument_condition(condition));
883        self
884    }
885
886    /// Add a rule with both priority and argument condition
887    #[must_use]
888    pub fn route_with_args_priority(
889        mut self,
890        matcher: PatternMatcher,
891        queue: &str,
892        condition: ArgumentCondition,
893        priority: i32,
894    ) -> Self {
895        self.router.add_rule(
896            RouteRule::new(matcher, queue)
897                .with_argument_condition(condition)
898                .with_priority(priority),
899        );
900        self
901    }
902
903    /// Set the default queue for unmatched tasks
904    #[must_use]
905    pub fn default_queue(mut self, queue: &str) -> Self {
906        self.router.set_default_queue(queue);
907        self
908    }
909
910    /// Build the router
911    #[must_use]
912    pub fn build(self) -> Router {
913        self.router
914    }
915}
916
917/// Serializable routing configuration
918#[derive(Debug, Clone, Serialize, Deserialize)]
919pub struct RoutingConfig {
920    /// Default queue for unmatched tasks
921    #[serde(default)]
922    pub default_queue: Option<String>,
923    /// Routing rules (glob pattern -> queue)
924    #[serde(default)]
925    pub routes: HashMap<String, String>,
926    /// Direct task-to-queue mappings
927    #[serde(default)]
928    pub task_routes: HashMap<String, String>,
929}
930
931impl RoutingConfig {
932    /// Create a new empty routing configuration
933    #[must_use]
934    pub fn new() -> Self {
935        Self {
936            default_queue: None,
937            routes: HashMap::new(),
938            task_routes: HashMap::new(),
939        }
940    }
941
942    /// Create a router from this configuration
943    #[must_use]
944    pub fn into_router(self) -> Router {
945        let mut router = match self.default_queue {
946            Some(queue) => Router::with_default_queue(queue),
947            None => Router::new(),
948        };
949
950        // Add glob pattern routes
951        for (pattern, queue) in self.routes {
952            router.add_rule(RouteRule::new(PatternMatcher::glob(&pattern), queue));
953        }
954
955        // Add direct task routes
956        for (task_name, queue) in self.task_routes {
957            router.add_direct_route(task_name, RouteResult::new(queue));
958        }
959
960        router
961    }
962}
963
964impl Default for RoutingConfig {
965    fn default() -> Self {
966        Self::new()
967    }
968}
969
// ============================================================================
// Topic-Based Routing (AMQP Topic Exchange Pattern)
// ============================================================================

/// Topic pattern matcher for AMQP-style topic routing
///
/// A pattern is a list of words separated by dots (`.`). Two words act as
/// wildcards:
/// - `*` (star) matches exactly one word
/// - `#` (hash) matches zero or more words
///
/// Every other word is a literal and must equal the routing key's word at the
/// corresponding position. A wildcard character only counts when it is the
/// whole word, so `user*` is a literal.
///
/// # Examples
///
/// ```rust
/// use celers_core::router::TopicPattern;
///
/// let pattern = TopicPattern::new("user.*.created");
/// assert!(pattern.matches("user.email.created"));
/// assert!(pattern.matches("user.profile.created"));
/// assert!(!pattern.matches("user.created"));  // No middle word
/// assert!(!pattern.matches("user.email.verified.created"));  // Too many words
///
/// let pattern = TopicPattern::new("user.#");
/// assert!(pattern.matches("user.email"));
/// assert!(pattern.matches("user.email.created"));
/// assert!(pattern.matches("user.email.verified.sent"));
/// assert!(!pattern.matches("admin.email"));  // Doesn't start with "user"
/// ```
#[derive(Debug, Clone)]
pub struct TopicPattern {
    /// Pattern text exactly as passed to [`TopicPattern::new`]
    pattern: String,
    /// `pattern` split on `.` and classified, in order
    segments: Vec<TopicSegment>,
}

/// One dot-separated word of a [`TopicPattern`]
#[derive(Debug, Clone, PartialEq)]
enum TopicSegment {
    /// Exact word match
    Literal(String),
    /// Wildcard: matches exactly one word (*)
    Star,
    /// Wildcard: matches zero or more words (#)
    Hash,
}

impl TopicPattern {
    /// Create a new topic pattern
    ///
    /// The pattern text is stored as given and split into segments once, so
    /// repeated calls to [`TopicPattern::matches`] do not re-parse it.
    #[must_use]
    pub fn new(pattern: impl Into<String>) -> Self {
        let pattern = pattern.into();
        let segments = Self::parse(&pattern);
        Self { pattern, segments }
    }

    /// Split `pattern` on `.` and classify each word.
    ///
    /// Only a word that is exactly `*` or `#` becomes a wildcard; anything
    /// else (including the empty word) is kept as a literal.
    fn parse(pattern: &str) -> Vec<TopicSegment> {
        pattern
            .split('.')
            .map(|word| match word {
                "*" => TopicSegment::Star,
                "#" => TopicSegment::Hash,
                literal => TopicSegment::Literal(literal.to_string()),
            })
            .collect()
    }

    /// Check if a routing key matches this topic pattern
    ///
    /// The key is split on `.`; an empty key therefore counts as one empty
    /// word. Returns `true` only when the whole key is consumed by the whole
    /// pattern.
    ///
    /// Matching is iterative and only ever backtracks to the most recent `#`,
    /// so the cost is bounded by (key words × pattern segments) even for
    /// patterns that contain many `#` wildcards.
    #[inline]
    #[must_use]
    pub fn matches(&self, routing_key: &str) -> bool {
        let words: Vec<&str> = routing_key.split('.').collect();
        let segments = self.segments.as_slice();

        let mut word_idx = 0;
        let mut seg_idx = 0;
        // Where to resume when the words after the most recent `#` stop
        // matching: (segment following that `#`, first word it has not absorbed).
        let mut resume: Option<(usize, usize)> = None;

        while word_idx < words.len() {
            match segments.get(seg_idx) {
                Some(TopicSegment::Hash) => {
                    // Start with `#` absorbing nothing; it only grows after a
                    // later mismatch. Earlier `#`s never need revisiting: any
                    // words they could take, this one can take instead.
                    resume = Some((seg_idx + 1, word_idx));
                    seg_idx += 1;
                }
                Some(TopicSegment::Star) => {
                    word_idx += 1;
                    seg_idx += 1;
                }
                Some(TopicSegment::Literal(literal)) if literal.as_str() == words[word_idx] => {
                    word_idx += 1;
                    seg_idx += 1;
                }
                // Literal mismatch, or the pattern ran out while words remain.
                _ => match resume {
                    Some((after_hash, next_word)) => {
                        // Let the most recent `#` absorb one more word and retry.
                        word_idx = next_word + 1;
                        seg_idx = after_hash;
                        resume = Some((after_hash, next_word + 1));
                    }
                    None => return false,
                },
            }
        }

        // Key exhausted: whatever is left of the pattern must be able to
        // match zero words, which only `#` can do.
        segments[seg_idx..]
            .iter()
            .all(|segment| matches!(segment, TopicSegment::Hash))
    }

    /// Get the original pattern string
    #[inline]
    #[must_use]
    pub fn pattern(&self) -> &str {
        &self.pattern
    }

    /// Get pattern complexity (number of segments)
    #[inline]
    #[must_use]
    pub const fn complexity(&self) -> usize {
        self.segments.len()
    }

    /// Check if pattern contains wildcards (`*` or `#` segments)
    #[inline]
    #[must_use]
    pub fn has_wildcards(&self) -> bool {
        self.segments
            .iter()
            .any(|s| matches!(s, TopicSegment::Star | TopicSegment::Hash))
    }

    /// Check if pattern is exact (no wildcards)
    #[inline]
    #[must_use]
    pub fn is_exact(&self) -> bool {
        !self.has_wildcards()
    }
}
1114
1115/// Topic exchange router for AMQP-style topic routing
1116///
1117/// Routes messages based on topic patterns with wildcards.
1118#[derive(Debug, Clone)]
1119pub struct TopicRouter {
1120    /// Topic bindings: (pattern, queue)
1121    bindings: Vec<(TopicPattern, String)>,
1122
1123    /// Default queue for unmatched topics
1124    default_queue: Option<String>,
1125}
1126
1127impl TopicRouter {
1128    /// Create a new topic router
1129    #[must_use]
1130    pub fn new() -> Self {
1131        Self {
1132            bindings: Vec::new(),
1133            default_queue: None,
1134        }
1135    }
1136
1137    /// Bind a topic pattern to a queue
1138    pub fn bind(&mut self, pattern: impl Into<String>, queue: impl Into<String>) {
1139        let pattern = TopicPattern::new(pattern);
1140        self.bindings.push((pattern, queue.into()));
1141    }
1142
1143    /// Bind multiple patterns to a queue
1144    pub fn bind_many(&mut self, patterns: Vec<String>, queue: impl Into<String>) {
1145        let queue = queue.into();
1146        for pattern in patterns {
1147            self.bind(pattern, queue.clone());
1148        }
1149    }
1150
1151    /// Set default queue for unmatched routing keys
1152    pub fn set_default_queue(&mut self, queue: impl Into<String>) {
1153        self.default_queue = Some(queue.into());
1154    }
1155
1156    /// Route a message based on routing key
1157    ///
1158    /// Returns the first matching queue, or the default queue if no match.
1159    #[must_use]
1160    pub fn route(&self, routing_key: &str) -> Option<String> {
1161        for (pattern, queue) in &self.bindings {
1162            if pattern.matches(routing_key) {
1163                return Some(queue.clone());
1164            }
1165        }
1166
1167        self.default_queue.clone()
1168    }
1169
1170    /// Get all queues that match a routing key
1171    #[must_use]
1172    pub fn route_all(&self, routing_key: &str) -> Vec<String> {
1173        self.bindings
1174            .iter()
1175            .filter(|(pattern, _)| pattern.matches(routing_key))
1176            .map(|(_, queue)| queue.clone())
1177            .collect()
1178    }
1179
1180    /// Remove all bindings for a queue
1181    pub fn unbind_queue(&mut self, queue: &str) -> usize {
1182        let original_len = self.bindings.len();
1183        self.bindings.retain(|(_, q)| q != queue);
1184        original_len - self.bindings.len()
1185    }
1186
1187    /// Remove a specific binding
1188    pub fn unbind_pattern(&mut self, pattern: &str) -> bool {
1189        let original_len = self.bindings.len();
1190        self.bindings.retain(|(p, _)| p.pattern() != pattern);
1191        self.bindings.len() < original_len
1192    }
1193
1194    /// Get all bindings
1195    #[must_use]
1196    pub fn bindings(&self) -> Vec<(String, String)> {
1197        self.bindings
1198            .iter()
1199            .map(|(pattern, queue)| (pattern.pattern().to_string(), queue.clone()))
1200            .collect()
1201    }
1202
1203    /// Clear all bindings
1204    pub fn clear(&mut self) {
1205        self.bindings.clear();
1206        self.default_queue = None;
1207    }
1208
1209    /// Get number of bindings
1210    #[must_use]
1211    pub const fn binding_count(&self) -> usize {
1212        self.bindings.len()
1213    }
1214
1215    /// Check if a routing key has any matches
1216    #[inline]
1217    #[must_use]
1218    pub fn has_match(&self, routing_key: &str) -> bool {
1219        self.bindings.iter().any(|(p, _)| p.matches(routing_key)) || self.default_queue.is_some()
1220    }
1221}
1222
1223impl Default for TopicRouter {
1224    fn default() -> Self {
1225        Self::new()
1226    }
1227}
1228
/// Topic exchange configuration for declarative setup
///
/// A serializable description of a topic exchange: its name, its
/// pattern-to-queue bindings, an optional default queue and two flags.
/// Because `bindings` is a map, each pattern can appear at most once and the
/// entries carry no ordering of their own.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicExchangeConfig {
    /// Exchange name
    pub name: String,

    /// Topic bindings: pattern -> queue
    pub bindings: HashMap<String, String>,

    /// Default queue for unmatched topics
    ///
    /// Left out of the serialized form entirely when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_queue: Option<String>,

    /// Whether the exchange is durable
    ///
    /// Falls back to `true` when the field is absent on deserialization.
    #[serde(default = "default_true")]
    pub durable: bool,

    /// Whether to auto-delete when unused
    ///
    /// Falls back to `false` when the field is absent on deserialization.
    #[serde(default)]
    pub auto_delete: bool,
}
1250
/// Serde default helper: supplies `true` for boolean fields annotated with
/// `#[serde(default = "default_true")]` when the field is missing from the input.
fn default_true() -> bool {
    true
}
1254
1255impl TopicExchangeConfig {
1256    /// Create a new topic exchange configuration
1257    pub fn new(name: impl Into<String>) -> Self {
1258        Self {
1259            name: name.into(),
1260            bindings: HashMap::new(),
1261            default_queue: None,
1262            durable: true,
1263            auto_delete: false,
1264        }
1265    }
1266
1267    /// Add a topic binding
1268    #[must_use]
1269    pub fn with_binding(mut self, pattern: impl Into<String>, queue: impl Into<String>) -> Self {
1270        self.bindings.insert(pattern.into(), queue.into());
1271        self
1272    }
1273
1274    /// Set default queue
1275    #[must_use]
1276    pub fn with_default_queue(mut self, queue: impl Into<String>) -> Self {
1277        self.default_queue = Some(queue.into());
1278        self
1279    }
1280
1281    /// Set durable flag
1282    #[must_use]
1283    pub fn with_durable(mut self, durable: bool) -> Self {
1284        self.durable = durable;
1285        self
1286    }
1287
1288    /// Build a topic router from this configuration
1289    #[must_use]
1290    pub fn build_router(&self) -> TopicRouter {
1291        let mut router = TopicRouter::new();
1292
1293        for (pattern, queue) in &self.bindings {
1294            router.bind(pattern.clone(), queue.clone());
1295        }
1296
1297        if let Some(ref default_queue) = self.default_queue {
1298            router.set_default_queue(default_queue.clone());
1299        }
1300
1301        router
1302    }
1303}
1304
/// Unit tests for pattern matching, the rule-based `Router`, `RoutingConfig`
/// and the AMQP-style topic routing types.
#[cfg(test)]
mod tests {
    use super::*;

    // ------------------------------------------------------------------
    // PatternMatcher
    // ------------------------------------------------------------------

    #[test]
    fn test_exact_pattern() {
        let matcher = PatternMatcher::exact("tasks.add");
        assert!(matcher.matches("tasks.add"));
        assert!(!matcher.matches("tasks.multiply"));
        assert!(!matcher.matches("tasks"));
    }

    #[test]
    fn test_glob_pattern() {
        let matcher = PatternMatcher::glob("tasks.*");
        assert!(matcher.matches("tasks.add"));
        assert!(matcher.matches("tasks.multiply"));
        assert!(!matcher.matches("other.task"));
        assert!(!matcher.matches("tasks"));

        let matcher = PatternMatcher::glob("*.add");
        assert!(matcher.matches("tasks.add"));
        assert!(matcher.matches("math.add"));
        assert!(!matcher.matches("tasks.multiply"));

        let matcher = PatternMatcher::glob("task?");
        assert!(matcher.matches("task1"));
        assert!(matcher.matches("taskA"));
        assert!(matcher.matches("tasks")); // 's' is a single character, so it matches
        assert!(!matcher.matches("task")); // No character after "task"
        assert!(!matcher.matches("task12")); // Two characters after "task"
    }

    #[test]
    fn test_regex_pattern() {
        let matcher = PatternMatcher::regex(r"tasks\.[a-z]+").unwrap();
        assert!(matcher.matches("tasks.add"));
        assert!(matcher.matches("tasks.multiply"));
        assert!(!matcher.matches("tasks.Add"));
        assert!(!matcher.matches("tasks.123"));

        let matcher = PatternMatcher::regex(r"^(email|sms)\.").unwrap();
        assert!(matcher.matches("email.send"));
        assert!(matcher.matches("sms.send"));
        assert!(!matcher.matches("push.send"));
    }

    #[test]
    fn test_all_pattern() {
        let matcher = PatternMatcher::all();
        assert!(matcher.matches("anything"));
        assert!(matcher.matches(""));
        assert!(matcher.matches("complex.task.name"));
    }

    // ------------------------------------------------------------------
    // Router
    // ------------------------------------------------------------------

    #[test]
    fn test_router_basic() {
        let mut router = Router::new();
        router.add_rule(RouteRule::new(PatternMatcher::glob("email.*"), "email"));
        router.add_rule(RouteRule::new(PatternMatcher::glob("sms.*"), "sms"));

        assert_eq!(router.route("email.send"), Some("email".to_string()));
        assert_eq!(router.route("sms.notify"), Some("sms".to_string()));
        assert_eq!(router.route("push.notify"), None);
    }

    #[test]
    fn test_router_with_default() {
        let mut router = Router::with_default_queue("default");
        router.add_rule(RouteRule::new(PatternMatcher::glob("email.*"), "email"));

        assert_eq!(router.route("email.send"), Some("email".to_string()));
        assert_eq!(router.route("other.task"), Some("default".to_string()));
    }

    #[test]
    fn test_router_priority() {
        let mut router = Router::new();
        router.add_rule(RouteRule::new(PatternMatcher::glob("*"), "default").with_priority(0));
        router
            .add_rule(RouteRule::new(PatternMatcher::glob("urgent.*"), "urgent").with_priority(10));

        // Urgent tasks should go to urgent queue
        assert_eq!(router.route("urgent.email"), Some("urgent".to_string()));
        // Other tasks should go to default queue
        assert_eq!(router.route("email.send"), Some("default".to_string()));
    }

    #[test]
    fn test_router_direct_route() {
        let mut router = Router::new();
        router.add_rule(RouteRule::new(PatternMatcher::glob("tasks.*"), "tasks"));
        router.add_direct_route("tasks.special", RouteResult::new("special"));

        // Direct route takes precedence
        assert_eq!(router.route("tasks.special"), Some("special".to_string()));
        // Pattern-based route
        assert_eq!(router.route("tasks.normal"), Some("tasks".to_string()));
    }

    #[test]
    fn test_route_result() {
        let mut router = Router::new();
        router.add_rule(
            RouteRule::new(PatternMatcher::glob("amqp.*"), "amqp_queue")
                .with_routing_key("amqp.routing")
                .with_exchange("amqp_exchange"),
        );

        let result = router.route_full("amqp.task").unwrap();
        assert_eq!(result.queue, "amqp_queue");
        assert_eq!(result.routing_key, Some("amqp.routing".to_string()));
        assert_eq!(result.exchange, Some("amqp_exchange".to_string()));
    }

    #[test]
    fn test_router_builder() {
        let router = RouterBuilder::new()
            .route_glob("email.*", "email")
            .route_glob("sms.*", "sms")
            .direct_route("special.task", "special")
            .default_queue("default")
            .build();

        assert_eq!(router.route("email.send"), Some("email".to_string()));
        assert_eq!(router.route("sms.notify"), Some("sms".to_string()));
        assert_eq!(router.route("special.task"), Some("special".to_string()));
        assert_eq!(router.route("other.task"), Some("default".to_string()));
    }

    // ------------------------------------------------------------------
    // RoutingConfig
    // ------------------------------------------------------------------

    #[test]
    fn test_routing_config() {
        let mut config = RoutingConfig::new();
        config.default_queue = Some("default".to_string());
        config
            .routes
            .insert("email.*".to_string(), "email".to_string());
        config
            .task_routes
            .insert("special.task".to_string(), "special".to_string());

        let router = config.into_router();
        assert_eq!(router.route("email.send"), Some("email".to_string()));
        assert_eq!(router.route("special.task"), Some("special".to_string()));
        assert_eq!(router.route("other.task"), Some("default".to_string()));
    }

    #[test]
    fn test_routing_config_serialization() {
        let mut config = RoutingConfig::new();
        config.default_queue = Some("default".to_string());
        config
            .routes
            .insert("email.*".to_string(), "email".to_string());

        let json = serde_json::to_string(&config).unwrap();
        let parsed: RoutingConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.default_queue, Some("default".to_string()));
        assert_eq!(parsed.routes.get("email.*"), Some(&"email".to_string()));
    }

    #[test]
    fn test_glob_special_chars() {
        // Test escaping of regex special characters in glob patterns
        let matcher = PatternMatcher::glob("tasks.v1.0");
        assert!(matcher.matches("tasks.v1.0"));
        assert!(!matcher.matches("tasks.v1x0"));

        let matcher = PatternMatcher::glob("(test)");
        assert!(matcher.matches("(test)"));
        assert!(!matcher.matches("test"));
    }

    #[test]
    fn test_has_route() {
        let mut router = Router::new();
        router.add_rule(RouteRule::new(PatternMatcher::glob("email.*"), "email"));

        assert!(router.has_route("email.send"));
        assert!(!router.has_route("sms.send"));

        router.set_default_queue("default");
        assert!(router.has_route("sms.send"));
    }

    #[test]
    fn test_remove_rules() {
        let mut router = Router::new();
        router.add_rule(RouteRule::new(PatternMatcher::glob("email.*"), "email"));
        router.add_rule(RouteRule::new(PatternMatcher::glob("sms.*"), "sms"));

        router.remove_rules_by_queue("email");
        assert_eq!(router.route("email.send"), None);
        assert_eq!(router.route("sms.send"), Some("sms".to_string()));
    }

    #[test]
    fn test_clear() {
        let mut router = Router::new();
        router.add_rule(RouteRule::new(PatternMatcher::glob("email.*"), "email"));
        router.add_direct_route("special", RouteResult::new("special"));

        router.clear();
        assert_eq!(router.route("email.send"), None);
        assert_eq!(router.route("special"), None);
    }

    // ------------------------------------------------------------------
    // Topic routing
    // ------------------------------------------------------------------

    #[test]
    fn test_topic_pattern_star() {
        let pattern = TopicPattern::new("user.*.created");
        assert!(pattern.matches("user.email.created"));
        assert!(pattern.matches("user.profile.created"));
        assert!(!pattern.matches("user.created")); // `*` needs exactly one word
        assert!(!pattern.matches("user.email.verified.created"));
        assert!(pattern.has_wildcards());
        assert_eq!(pattern.complexity(), 3);
    }

    #[test]
    fn test_topic_pattern_hash() {
        let pattern = TopicPattern::new("user.#");
        assert!(pattern.matches("user")); // `#` may match zero words
        assert!(pattern.matches("user.email"));
        assert!(pattern.matches("user.email.verified.sent"));
        assert!(!pattern.matches("admin.email"));

        let pattern = TopicPattern::new("#.error");
        assert!(pattern.matches("error"));
        assert!(pattern.matches("db.query.error"));
        assert!(!pattern.matches("db.error.retry"));

        // `#` in the middle, more than once
        let pattern = TopicPattern::new("a.#.b.#.c");
        assert!(pattern.matches("a.b.c"));
        assert!(pattern.matches("a.x.b.y.z.c"));
        assert!(!pattern.matches("a.c"));
    }

    #[test]
    fn test_topic_pattern_exact() {
        let pattern = TopicPattern::new("user.created");
        assert!(pattern.is_exact());
        assert!(!pattern.has_wildcards());
        assert_eq!(pattern.pattern(), "user.created");
        assert!(pattern.matches("user.created"));
        assert!(!pattern.matches("user.deleted"));
        assert!(!pattern.matches("user.created.now"));
    }

    #[test]
    fn test_topic_router_route() {
        let mut router = TopicRouter::new();
        router.bind("user.*.created", "user_events");
        router.bind("user.#", "user_all");

        // First matching binding wins
        assert_eq!(
            router.route("user.email.created"),
            Some("user_events".to_string())
        );
        assert_eq!(router.route("user.login"), Some("user_all".to_string()));
        assert_eq!(router.route("admin.login"), None);

        // route_all reports every match in binding order
        assert_eq!(
            router.route_all("user.email.created"),
            vec!["user_events".to_string(), "user_all".to_string()]
        );
        assert!(router.route_all("admin.login").is_empty());

        router.set_default_queue("default");
        assert_eq!(router.route("admin.login"), Some("default".to_string()));
        // The default queue is not part of route_all
        assert!(router.route_all("admin.login").is_empty());
    }

    #[test]
    fn test_topic_router_unbind() {
        let mut router = TopicRouter::new();
        router.bind_many(vec!["a.*".to_string(), "b.#".to_string()], "shared");
        router.bind("c", "other");
        assert_eq!(router.binding_count(), 3);

        assert_eq!(router.unbind_queue("shared"), 2);
        assert_eq!(router.unbind_queue("shared"), 0);
        assert_eq!(
            router.bindings(),
            vec![("c".to_string(), "other".to_string())]
        );

        assert!(router.unbind_pattern("c"));
        assert!(!router.unbind_pattern("c"));
        assert_eq!(router.binding_count(), 0);
    }

    #[test]
    fn test_topic_router_has_match_and_clear() {
        let mut router = TopicRouter::new();
        router.bind("user.#", "users");

        assert!(router.has_match("user.created"));
        assert!(!router.has_match("order.created"));

        // A default queue makes every routing key routable
        router.set_default_queue("fallback");
        assert!(router.has_match("order.created"));

        // clear drops the bindings and the default queue
        router.clear();
        assert_eq!(router.binding_count(), 0);
        assert!(!router.has_match("user.created"));
        assert_eq!(router.route("user.created"), None);
    }

    #[test]
    fn test_topic_exchange_config_build_router() {
        let config = TopicExchangeConfig::new("events")
            .with_binding("user.#", "users")
            .with_default_queue("fallback")
            .with_durable(false);

        assert_eq!(config.name, "events");
        assert!(!config.durable);
        assert!(!config.auto_delete);

        let router = config.build_router();
        assert_eq!(router.binding_count(), 1);
        assert_eq!(router.route("user.created"), Some("users".to_string()));
        assert_eq!(router.route("order.created"), Some("fallback".to_string()));
    }

    #[test]
    fn test_topic_exchange_config_deserialize_defaults() {
        // Optional fields are omitted: durable defaults to true, the rest to empty/false
        let json = r#"{"name":"events","bindings":{"user.#":"users"}}"#;
        let config: TopicExchangeConfig = serde_json::from_str(json).unwrap();

        assert_eq!(config.name, "events");
        assert_eq!(config.bindings.get("user.#"), Some(&"users".to_string()));
        assert_eq!(config.default_queue, None);
        assert!(config.durable);
        assert!(!config.auto_delete);
    }
}