use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Strategy used to decide whether a task name is covered by a routing rule.
#[derive(Debug, Clone)]
pub enum PatternMatcher {
/// Matches only a task name that is equal to the stored string.
Exact(String),
/// Matches task names against a compiled glob pattern (see [`GlobPattern`]).
Glob(GlobPattern),
/// Matches task names against a compiled regular expression (see [`RegexPattern`]).
Regex(RegexPattern),
/// Matches every task name, including the empty string.
All,
}
impl PatternMatcher {
    /// Builds a matcher that accepts only a task name equal to `name`.
    #[must_use]
    pub fn exact(name: impl Into<String>) -> Self {
        let wanted: String = name.into();
        Self::Exact(wanted)
    }

    /// Builds a matcher from a glob pattern.
    ///
    /// # Panics
    ///
    /// Propagates the panic raised by [`GlobPattern::new`] when the translated
    /// pattern cannot be compiled.
    #[must_use]
    pub fn glob(pattern: impl Into<String>) -> Self {
        let compiled = GlobPattern::new(pattern);
        Self::Glob(compiled)
    }

    /// Builds a matcher from a regular expression.
    ///
    /// # Errors
    ///
    /// Returns the `regex::Error` produced when `pattern` fails to compile.
    pub fn regex(pattern: &str) -> Result<Self, regex::Error> {
        RegexPattern::new(pattern).map(Self::Regex)
    }

    /// Builds a matcher that accepts every task name.
    #[must_use]
    pub fn all() -> Self {
        Self::All
    }

    /// Returns `true` when `task_name` is accepted by this matcher.
    #[inline]
    #[must_use]
    pub fn matches(&self, task_name: &str) -> bool {
        match self {
            Self::All => true,
            Self::Exact(expected) => expected.as_str() == task_name,
            Self::Glob(inner) => inner.matches(task_name),
            Self::Regex(inner) => inner.matches(task_name),
        }
    }
}
/// A glob pattern together with the regular expression compiled from it.
#[derive(Debug, Clone)]
pub struct GlobPattern {
// Original glob text, returned unchanged by `pattern()`.
pattern: String,
// Compiled form of the glob, used by `matches()`.
regex: Regex,
}
impl GlobPattern {
    /// Compiles `pattern` into a matcher.
    ///
    /// The glob is first translated by `glob_to_regex` (which anchors it and
    /// escapes regex metacharacters) and the result is compiled once, so
    /// repeated calls to [`GlobPattern::matches`] do not recompile anything.
    ///
    /// # Panics
    ///
    /// Panics with the message `"Invalid glob pattern"` if the translated
    /// expression is rejected by `Regex::new`.
    #[must_use]
    pub fn new(pattern: impl Into<String>) -> Self {
        let pattern = pattern.into();
        let regex_str = glob_to_regex(&pattern);
        // Borrow the translated expression; the previous text here was a
        // mis-encoded `&regex_str` that did not compile.
        let regex = Regex::new(&regex_str).expect("Invalid glob pattern");
        Self { pattern, regex }
    }

    /// Returns `true` when `task_name` is matched by the compiled glob.
    #[inline]
    #[must_use]
    pub fn matches(&self, task_name: &str) -> bool {
        self.regex.is_match(task_name)
    }

    /// Returns the original glob text this matcher was built from.
    #[inline]
    #[must_use]
    pub fn pattern(&self) -> &str {
        &self.pattern
    }
}
/// A regular-expression source string together with its compiled form.
#[derive(Debug, Clone)]
pub struct RegexPattern {
// Original expression text, returned unchanged by `pattern()`.
pattern: String,
// Compiled expression, used by `matches()`.
regex: Regex,
}
impl RegexPattern {
    /// Compiles `pattern` and keeps a copy of its source text.
    ///
    /// # Errors
    ///
    /// Returns the `regex::Error` reported by `Regex::new` when `pattern`
    /// is not a valid expression.
    pub fn new(pattern: &str) -> Result<Self, regex::Error> {
        Regex::new(pattern).map(|regex| Self {
            pattern: pattern.to_owned(),
            regex,
        })
    }

    /// Returns the result of `Regex::is_match` for `task_name`.
    #[inline]
    #[must_use]
    pub fn matches(&self, task_name: &str) -> bool {
        let compiled = &self.regex;
        compiled.is_match(task_name)
    }

    /// Returns the expression text this matcher was compiled from.
    #[inline]
    #[must_use]
    pub fn pattern(&self) -> &str {
        self.pattern.as_str()
    }
}
/// Translates a glob into an anchored regular-expression string.
///
/// `*` becomes `.*`, `?` becomes `.`, the characters listed in `ESCAPED` are
/// prefixed with a backslash, and every other character is copied verbatim.
/// The result is wrapped in `^` … `$`.
fn glob_to_regex(glob: &str) -> String {
    // Characters that must be backslash-escaped to be taken literally.
    const ESCAPED: &str = ".+()[]{}^$|\\";

    let mut out = String::with_capacity(glob.len() * 2 + 2);
    out.push('^');
    for ch in glob.chars() {
        if ch == '*' {
            out.push_str(".*");
        } else if ch == '?' {
            out.push('.');
        } else {
            if ESCAPED.contains(ch) {
                out.push('\\');
            }
            out.push(ch);
        }
    }
    out.push('$');
    out
}
/// A single routing rule: which task names it applies to and where they go.
#[derive(Debug, Clone)]
pub struct RouteRule {
/// Decides which task names this rule applies to.
pub matcher: PatternMatcher,
/// Name of the destination queue.
pub queue: String,
/// Ordering weight; `Router::add_rule` keeps rules sorted so larger values are tried first.
pub priority: i32,
/// Optional routing key copied into the resulting `RouteResult`.
pub routing_key: Option<String>,
/// Optional exchange name copied into the resulting `RouteResult`.
pub exchange: Option<String>,
/// Optional extra condition on the task arguments; only consulted by `matches_with_args`.
pub argument_condition: Option<ArgumentCondition>,
}
impl RouteRule {
    /// Creates a rule with priority `0` and no routing key, exchange or
    /// argument condition.
    #[must_use]
    pub fn new(matcher: PatternMatcher, queue: impl Into<String>) -> Self {
        let queue = queue.into();
        Self {
            matcher,
            queue,
            priority: 0,
            routing_key: None,
            exchange: None,
            argument_condition: None,
        }
    }

    /// Returns the rule with its priority replaced.
    #[must_use]
    pub fn with_priority(self, priority: i32) -> Self {
        Self { priority, ..self }
    }

    /// Returns the rule with its routing key set.
    #[must_use]
    pub fn with_routing_key(self, routing_key: impl Into<String>) -> Self {
        Self {
            routing_key: Some(routing_key.into()),
            ..self
        }
    }

    /// Returns the rule with its exchange set.
    #[must_use]
    pub fn with_exchange(self, exchange: impl Into<String>) -> Self {
        Self {
            exchange: Some(exchange.into()),
            ..self
        }
    }

    /// Returns the rule with an argument condition attached.
    #[must_use]
    pub fn with_argument_condition(self, condition: ArgumentCondition) -> Self {
        Self {
            argument_condition: Some(condition),
            ..self
        }
    }

    /// Name-only check: the argument condition, if any, is not consulted.
    #[inline]
    #[must_use]
    pub fn matches(&self, task_name: &str) -> bool {
        let matcher = &self.matcher;
        matcher.matches(task_name)
    }

    /// Checks the task name first and, when it matches, the argument
    /// condition. A rule without a condition accepts any arguments.
    #[must_use]
    pub fn matches_with_args(
        &self,
        task_name: &str,
        args: &[serde_json::Value],
        kwargs: &serde_json::Map<String, serde_json::Value>,
    ) -> bool {
        self.matcher.matches(task_name)
            && self
                .argument_condition
                .as_ref()
                .map_or(true, |condition| condition.evaluate(args, kwargs))
    }
}
/// Predicate over a task's positional (`args`) and keyword (`kwargs`) arguments.
///
/// NOTE(review): the enum is internally tagged (`tag = "type"`), and serde
/// documents that representation as supporting struct variants, unit variants
/// and newtype variants wrapping structs/maps. `And`/`Or` wrap a sequence and
/// `Not` wraps another internally tagged value, so these three variants
/// presumably do not round-trip through serde — confirm with a serialization test.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ArgumentCondition {
/// True when `args[index]` exists and equals `value`.
ArgEquals {
index: usize,
value: serde_json::Value,
},
/// True when `args` has an element at `index`.
ArgExists {
index: usize,
},
/// True when `kwargs[key]` exists and equals `value`.
KwargEquals {
key: String,
value: serde_json::Value,
},
/// True when `kwargs` contains `key`.
KwargExists {
key: String,
},
/// True when `kwargs[key]` is a string matched by the regular expression `pattern`.
KwargMatches {
key: String,
pattern: String,
},
/// True when `args[index]` is numeric and strictly greater than `threshold`.
ArgGreaterThan {
index: usize,
threshold: f64,
},
/// True when `args[index]` is numeric and strictly less than `threshold`.
ArgLessThan {
index: usize,
threshold: f64,
},
/// True when `kwargs[key]` is numeric and strictly greater than `threshold`.
KwargGreaterThan {
key: String,
threshold: f64,
},
/// True when `kwargs[key]` is numeric and strictly less than `threshold`.
KwargLessThan {
key: String,
threshold: f64,
},
/// True when `kwargs[key]` is a string containing the string `value`,
/// or an array containing an element equal to `value`.
KwargContains {
key: String,
value: serde_json::Value,
},
/// True when every inner condition is true (vacuously true when empty).
And(Vec<ArgumentCondition>),
/// True when at least one inner condition is true (false when empty).
Or(Vec<ArgumentCondition>),
/// Logical negation of the inner condition.
Not(Box<ArgumentCondition>),
/// Always true.
Always,
}
impl ArgumentCondition {
    /// Condition: positional argument `index` equals `value`.
    #[must_use]
    pub fn arg_equals(index: usize, value: serde_json::Value) -> Self {
        Self::ArgEquals { index, value }
    }

    /// Condition: positional argument `index` is present.
    #[must_use]
    pub fn arg_exists(index: usize) -> Self {
        Self::ArgExists { index }
    }

    /// Condition: keyword argument `key` equals `value`.
    #[must_use]
    pub fn kwarg_equals(key: impl Into<String>, value: serde_json::Value) -> Self {
        let key = key.into();
        Self::KwargEquals { key, value }
    }

    /// Condition: keyword argument `key` is present.
    #[must_use]
    pub fn kwarg_exists(key: impl Into<String>) -> Self {
        let key = key.into();
        Self::KwargExists { key }
    }

    /// Condition: keyword argument `key` is a string matching the regular
    /// expression `pattern`. The pattern is stored as text and compiled on
    /// each evaluation.
    #[must_use]
    pub fn kwarg_matches(key: impl Into<String>, pattern: impl Into<String>) -> Self {
        let key = key.into();
        let pattern = pattern.into();
        Self::KwargMatches { key, pattern }
    }

    /// Condition: positional argument `index` is a number above `threshold`.
    #[must_use]
    pub fn arg_greater_than(index: usize, threshold: f64) -> Self {
        Self::ArgGreaterThan { index, threshold }
    }

    /// Condition: positional argument `index` is a number below `threshold`.
    #[must_use]
    pub fn arg_less_than(index: usize, threshold: f64) -> Self {
        Self::ArgLessThan { index, threshold }
    }

    /// Condition: keyword argument `key` is a number above `threshold`.
    #[must_use]
    pub fn kwarg_greater_than(key: impl Into<String>, threshold: f64) -> Self {
        let key = key.into();
        Self::KwargGreaterThan { key, threshold }
    }

    /// Condition: keyword argument `key` is a number below `threshold`.
    #[must_use]
    pub fn kwarg_less_than(key: impl Into<String>, threshold: f64) -> Self {
        let key = key.into();
        Self::KwargLessThan { key, threshold }
    }

    /// Condition: keyword argument `key` contains `value` (substring for
    /// strings, element for arrays).
    #[must_use]
    pub fn kwarg_contains(key: impl Into<String>, value: serde_json::Value) -> Self {
        let key = key.into();
        Self::KwargContains { key, value }
    }

    /// Condition that is always satisfied.
    #[must_use]
    pub fn always() -> Self {
        Self::Always
    }

    /// Combines `self` and `other` with logical AND. When `self` is already
    /// an `And`, `other` is appended to it instead of nesting.
    #[must_use]
    pub fn and(self, other: ArgumentCondition) -> Self {
        match self {
            Self::And(mut members) => {
                members.push(other);
                Self::And(members)
            }
            single => Self::And(vec![single, other]),
        }
    }

    /// Combines `self` and `other` with logical OR. When `self` is already
    /// an `Or`, `other` is appended to it instead of nesting.
    #[must_use]
    pub fn or(self, other: ArgumentCondition) -> Self {
        match self {
            Self::Or(mut members) => {
                members.push(other);
                Self::Or(members)
            }
            single => Self::Or(vec![single, other]),
        }
    }

    /// Wraps `self` in a logical NOT.
    #[must_use]
    pub fn negate(self) -> Self {
        let inner = Box::new(self);
        Self::Not(inner)
    }

    /// Evaluates the condition against the given arguments.
    ///
    /// Missing arguments, non-numeric values in numeric comparisons,
    /// non-string values in `KwargMatches`, and patterns that fail to compile
    /// all evaluate to `false`.
    #[must_use]
    pub fn evaluate(
        &self,
        args: &[serde_json::Value],
        kwargs: &serde_json::Map<String, serde_json::Value>,
    ) -> bool {
        // Numeric view of a positional / keyword argument, when present and numeric.
        let arg_number = |position: usize| args.get(position).and_then(serde_json::Value::as_f64);
        let kwarg_number = |name: &str| kwargs.get(name).and_then(serde_json::Value::as_f64);

        match self {
            Self::Always => true,
            Self::ArgExists { index } => *index < args.len(),
            Self::ArgEquals { index, value } => args.get(*index) == Some(value),
            Self::KwargExists { key } => kwargs.contains_key(key),
            Self::KwargEquals { key, value } => kwargs.get(key) == Some(value),
            Self::KwargMatches { key, pattern } => match kwargs.get(key) {
                Some(serde_json::Value::String(text)) => {
                    Regex::new(pattern).map_or(false, |re| re.is_match(text))
                }
                _ => false,
            },
            Self::ArgGreaterThan { index, threshold } => {
                arg_number(*index).is_some_and(|number| number > *threshold)
            }
            Self::ArgLessThan { index, threshold } => {
                arg_number(*index).is_some_and(|number| number < *threshold)
            }
            Self::KwargGreaterThan { key, threshold } => {
                kwarg_number(key.as_str()).is_some_and(|number| number > *threshold)
            }
            Self::KwargLessThan { key, threshold } => {
                kwarg_number(key.as_str()).is_some_and(|number| number < *threshold)
            }
            Self::KwargContains { key, value } => match kwargs.get(key) {
                Some(serde_json::Value::String(haystack)) => value
                    .as_str()
                    .is_some_and(|needle| haystack.contains(needle)),
                Some(serde_json::Value::Array(items)) => items.contains(value),
                _ => false,
            },
            Self::And(members) => members.iter().all(|member| member.evaluate(args, kwargs)),
            Self::Or(members) => members.iter().any(|member| member.evaluate(args, kwargs)),
            Self::Not(inner) => !inner.evaluate(args, kwargs),
        }
    }
}
/// Human-readable rendering of a condition, e.g. `args[0] == 1` or
/// `(kwargs[a] exists AND NOT (always))`.
impl std::fmt::Display for ArgumentCondition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Renders every member and glues the pieces together with `separator`.
        fn join_all(members: &[ArgumentCondition], separator: &str) -> String {
            members
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<String>>()
                .join(separator)
        }

        match self {
            Self::Always => f.write_str("always"),
            Self::ArgExists { index } => write!(f, "args[{index}] exists"),
            Self::ArgEquals { index, value } => write!(f, "args[{index}] == {value}"),
            Self::ArgGreaterThan { index, threshold } => {
                write!(f, "args[{index}] > {threshold}")
            }
            Self::ArgLessThan { index, threshold } => {
                write!(f, "args[{index}] < {threshold}")
            }
            Self::KwargExists { key } => write!(f, "kwargs[{key}] exists"),
            Self::KwargEquals { key, value } => write!(f, "kwargs[{key}] == {value}"),
            Self::KwargMatches { key, pattern } => {
                write!(f, "kwargs[{key}] matches /{pattern}/")
            }
            Self::KwargGreaterThan { key, threshold } => {
                write!(f, "kwargs[{key}] > {threshold}")
            }
            Self::KwargLessThan { key, threshold } => {
                write!(f, "kwargs[{key}] < {threshold}")
            }
            Self::KwargContains { key, value } => {
                write!(f, "kwargs[{key}] contains {value}")
            }
            Self::And(members) => write!(f, "({})", join_all(members.as_slice(), " AND ")),
            Self::Or(members) => write!(f, "({})", join_all(members.as_slice(), " OR ")),
            Self::Not(inner) => write!(f, "NOT ({inner})"),
        }
    }
}
/// Outcome of a routing decision: the destination queue plus optional
/// routing key and exchange.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RouteResult {
/// Name of the destination queue.
pub queue: String,
/// Routing key, when the matching rule supplied one.
pub routing_key: Option<String>,
/// Exchange name, when the matching rule supplied one.
pub exchange: Option<String>,
}
impl RouteResult {
#[must_use]
pub fn new(queue: impl Into<String>) -> Self {
Self {
queue: queue.into(),
routing_key: None,
exchange: None,
}
}
#[must_use]
pub fn from_rule(rule: &RouteRule) -> Self {
Self {
queue: rule.queue.clone(),
routing_key: rule.routing_key.clone(),
exchange: rule.exchange.clone(),
}
}
}
/// Resolves task names to queues using direct routes, prioritized rules and
/// an optional default queue, consulted in that order.
#[derive(Debug, Default)]
pub struct Router {
// Pattern rules, kept sorted by descending priority by `add_rule`.
rules: Vec<RouteRule>,
// Fallback queue used when neither a direct route nor a rule matches.
default_queue: Option<String>,
// Exact task-name routes; checked before any rule.
direct_routes: HashMap<String, RouteResult>,
}
impl Router {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_default_queue(queue: impl Into<String>) -> Self {
Self {
rules: Vec::new(),
default_queue: Some(queue.into()),
direct_routes: HashMap::new(),
}
}
pub fn add_rule(&mut self, rule: RouteRule) {
self.rules.push(rule);
self.rules.sort_by_key(|b| std::cmp::Reverse(b.priority));
}
pub fn add_direct_route(&mut self, task_name: impl Into<String>, result: RouteResult) {
self.direct_routes.insert(task_name.into(), result);
}
pub fn set_default_queue(&mut self, queue: impl Into<String>) {
self.default_queue = Some(queue.into());
}
#[must_use]
pub fn route(&self, task_name: &str) -> Option<String> {
self.route_full(task_name).map(|r| r.queue)
}
#[must_use]
pub fn route_full(&self, task_name: &str) -> Option<RouteResult> {
if let Some(result) = self.direct_routes.get(task_name) {
return Some(result.clone());
}
for rule in &self.rules {
if rule.matches(task_name) {
return Some(RouteResult::from_rule(rule));
}
}
self.default_queue.as_ref().map(RouteResult::new)
}
#[must_use]
pub fn route_with_args(
&self,
task_name: &str,
args: &[serde_json::Value],
kwargs: &serde_json::Map<String, serde_json::Value>,
) -> Option<String> {
self.route_full_with_args(task_name, args, kwargs)
.map(|r| r.queue)
}
#[must_use]
pub fn route_full_with_args(
&self,
task_name: &str,
args: &[serde_json::Value],
kwargs: &serde_json::Map<String, serde_json::Value>,
) -> Option<RouteResult> {
if let Some(result) = self.direct_routes.get(task_name) {
return Some(result.clone());
}
for rule in &self.rules {
if rule.matches_with_args(task_name, args, kwargs) {
return Some(RouteResult::from_rule(rule));
}
}
self.default_queue.as_ref().map(RouteResult::new)
}
#[inline]
#[must_use]
pub fn has_route(&self, task_name: &str) -> bool {
self.direct_routes.contains_key(task_name)
|| self.rules.iter().any(|r| r.matches(task_name))
|| self.default_queue.is_some()
}
#[inline]
#[must_use]
pub fn rules(&self) -> &[RouteRule] {
&self.rules
}
pub fn remove_rules_by_queue(&mut self, queue: &str) {
self.rules.retain(|r| r.queue != queue);
}
pub fn clear(&mut self) {
self.rules.clear();
self.direct_routes.clear();
}
}
/// Chainable builder that accumulates configuration into a [`Router`].
#[derive(Debug, Default)]
pub struct RouterBuilder {
// Router under construction; handed out by `build()`.
router: Router,
}
impl RouterBuilder {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn route_glob(mut self, pattern: &str, queue: &str) -> Self {
self.router
.add_rule(RouteRule::new(PatternMatcher::glob(pattern), queue));
self
}
pub fn route_regex(mut self, pattern: &str, queue: &str) -> Result<Self, regex::Error> {
self.router
.add_rule(RouteRule::new(PatternMatcher::regex(pattern)?, queue));
Ok(self)
}
#[must_use]
pub fn route_exact(mut self, task_name: &str, queue: &str) -> Self {
self.router
.add_rule(RouteRule::new(PatternMatcher::exact(task_name), queue));
self
}
#[must_use]
pub fn direct_route(mut self, task_name: &str, queue: &str) -> Self {
self.router
.add_direct_route(task_name, RouteResult::new(queue));
self
}
#[must_use]
pub fn route_with_args(
mut self,
matcher: PatternMatcher,
queue: &str,
condition: ArgumentCondition,
) -> Self {
self.router
.add_rule(RouteRule::new(matcher, queue).with_argument_condition(condition));
self
}
#[must_use]
pub fn route_with_args_priority(
mut self,
matcher: PatternMatcher,
queue: &str,
condition: ArgumentCondition,
priority: i32,
) -> Self {
self.router.add_rule(
RouteRule::new(matcher, queue)
.with_argument_condition(condition)
.with_priority(priority),
);
self
}
#[must_use]
pub fn default_queue(mut self, queue: &str) -> Self {
self.router.set_default_queue(queue);
self
}
#[must_use]
pub fn build(self) -> Router {
self.router
}
}
/// Serializable routing configuration that can be turned into a [`Router`].
/// Every field falls back to its default when absent from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingConfig {
/// Fallback queue for tasks no route matches.
#[serde(default)]
pub default_queue: Option<String>,
/// Glob pattern -> queue name; each entry becomes a glob rule.
#[serde(default)]
pub routes: HashMap<String, String>,
/// Exact task name -> queue name; each entry becomes a direct route.
#[serde(default)]
pub task_routes: HashMap<String, String>,
}
impl RoutingConfig {
    /// Creates an empty configuration: no default queue and no routes.
    #[must_use]
    pub fn new() -> Self {
        Self {
            default_queue: None,
            routes: HashMap::new(),
            task_routes: HashMap::new(),
        }
    }

    /// Builds a [`Router`] from this configuration.
    ///
    /// `task_routes` become direct routes and `routes` become glob rules, all
    /// at priority `0`. Because every rule shares the same priority, the
    /// order in which they are added decides which one wins when several
    /// globs match the same task. `HashMap` iteration order is unspecified,
    /// so the rules are added in a fixed order instead: patterns with more
    /// literal (non-`*`, non-`?`) characters first, ties broken by the
    /// pattern text. A catch-all such as `*` therefore never shadows a more
    /// specific pattern such as `email.*`.
    ///
    /// # Panics
    ///
    /// Propagates the panic from [`PatternMatcher::glob`] if a pattern cannot
    /// be compiled.
    #[must_use]
    pub fn into_router(self) -> Router {
        // Number of characters in a glob that must match literally.
        fn literal_len(pattern: &str) -> usize {
            pattern.chars().filter(|c| !matches!(c, '*' | '?')).count()
        }

        let mut router = match self.default_queue {
            Some(queue) => Router::with_default_queue(queue),
            None => Router::new(),
        };

        let mut routes: Vec<(String, String)> = self.routes.into_iter().collect();
        routes.sort_by(|left, right| {
            literal_len(&right.0)
                .cmp(&literal_len(&left.0))
                .then_with(|| left.0.cmp(&right.0))
        });
        for (pattern, queue) in routes {
            router.add_rule(RouteRule::new(PatternMatcher::glob(pattern), queue));
        }

        // Direct routes are keyed by exact name, so their order is irrelevant.
        for (task_name, queue) in self.task_routes {
            router.add_direct_route(task_name, RouteResult::new(queue));
        }
        router
    }
}

impl Default for RoutingConfig {
    /// Same as [`RoutingConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// A dot-separated topic binding pattern, pre-split into segments.
///
/// A segment that is exactly `*` matches one word, a segment that is exactly
/// `#` matches zero or more words, and any other segment must equal the
/// corresponding word of the routing key.
#[derive(Debug, Clone)]
pub struct TopicPattern {
    // Original pattern text, returned unchanged by `pattern()`.
    pattern: String,
    // Parsed form of `pattern`, one entry per dot-separated segment.
    segments: Vec<TopicSegment>,
}

/// One dot-separated piece of a [`TopicPattern`].
#[derive(Debug, Clone, PartialEq)]
enum TopicSegment {
    /// Must equal the key word at the same position.
    Literal(String),
    /// `*`: matches exactly one word.
    Star,
    /// `#`: matches zero or more words.
    Hash,
}

impl TopicPattern {
    /// Parses `pattern` into segments. Parsing cannot fail: anything that is
    /// not exactly `*` or `#` is treated as a literal segment.
    pub fn new(pattern: impl Into<String>) -> Self {
        let pattern = pattern.into();
        let segments = Self::parse(&pattern);
        Self { pattern, segments }
    }

    /// Splits `pattern` on `.` and classifies each piece.
    fn parse(pattern: &str) -> Vec<TopicSegment> {
        pattern
            .split('.')
            .map(|s| match s {
                "*" => TopicSegment::Star,
                "#" => TopicSegment::Hash,
                literal => TopicSegment::Literal(literal.to_string()),
            })
            .collect()
    }

    /// Returns `true` when `routing_key` (split on `.`) is matched by this
    /// pattern. An empty routing key is treated as a single empty word.
    #[inline]
    #[must_use]
    pub fn matches(&self, routing_key: &str) -> bool {
        let key_parts: Vec<&str> = routing_key.split('.').collect();
        self.matches_parts(&key_parts)
    }

    /// Table-driven matcher, O(words × segments).
    ///
    /// Segments are processed from last to first. After a segment at
    /// position `p` has been processed, `reachable[k]` is `true` exactly when
    /// `key_parts[k..]` is matched by `segments[p..]`. This yields the same
    /// answers as trying every possible word count for each `#`, without the
    /// exponential blow-up that approach has for patterns with several `#`.
    fn matches_parts(&self, key_parts: &[&str]) -> bool {
        let word_count = key_parts.len();
        // Base case (no segments left): only the empty key suffix matches.
        let mut reachable = vec![false; word_count + 1];
        reachable[word_count] = true;

        for segment in self.segments.iter().rev() {
            match segment {
                TopicSegment::Hash => {
                    // `#` absorbs nothing (old value at k) or absorbs word k
                    // and stays active (already-updated value at k + 1).
                    for k in (0..word_count).rev() {
                        reachable[k] = reachable[k] || reachable[k + 1];
                    }
                }
                TopicSegment::Star => {
                    // Consumes exactly one word; k + 1 still holds the old value.
                    for k in 0..word_count {
                        reachable[k] = reachable[k + 1];
                    }
                    reachable[word_count] = false;
                }
                TopicSegment::Literal(literal) => {
                    for k in 0..word_count {
                        reachable[k] = key_parts[k] == literal.as_str() && reachable[k + 1];
                    }
                    reachable[word_count] = false;
                }
            }
        }
        reachable[0]
    }

    /// Returns the original pattern text.
    #[inline]
    #[must_use]
    pub fn pattern(&self) -> &str {
        &self.pattern
    }

    /// Returns the number of segments in the pattern.
    #[inline]
    #[must_use]
    pub const fn complexity(&self) -> usize {
        self.segments.len()
    }

    /// Returns `true` when the pattern contains at least one `*` or `#` segment.
    #[inline]
    #[must_use]
    pub fn has_wildcards(&self) -> bool {
        self.segments
            .iter()
            .any(|s| matches!(s, TopicSegment::Star | TopicSegment::Hash))
    }

    /// Returns `true` when the pattern consists only of literal segments.
    #[inline]
    #[must_use]
    pub fn is_exact(&self) -> bool {
        !self.has_wildcards()
    }
}
/// Routes routing keys to queues through an ordered list of topic bindings.
#[derive(Debug, Clone)]
pub struct TopicRouter {
// (pattern, queue) pairs in the order they were bound; `route` returns the first match.
bindings: Vec<(TopicPattern, String)>,
// Fallback queue used by `route` when no binding matches.
default_queue: Option<String>,
}
impl TopicRouter {
    /// Creates a router with no bindings and no default queue.
    #[must_use]
    pub fn new() -> Self {
        let bindings = Vec::new();
        Self {
            bindings,
            default_queue: None,
        }
    }

    /// Appends a binding from `pattern` to `queue`. Bindings are consulted
    /// in the order they were added.
    pub fn bind(&mut self, pattern: impl Into<String>, queue: impl Into<String>) {
        let entry = (TopicPattern::new(pattern), queue.into());
        self.bindings.push(entry);
    }

    /// Binds every pattern in `patterns` to the same `queue`.
    pub fn bind_many(&mut self, patterns: Vec<String>, queue: impl Into<String>) {
        let target: String = queue.into();
        patterns
            .into_iter()
            .for_each(|pattern| self.bind(pattern, target.clone()));
    }

    /// Sets (or replaces) the fallback queue.
    pub fn set_default_queue(&mut self, queue: impl Into<String>) {
        let fallback = queue.into();
        self.default_queue = Some(fallback);
    }

    /// Returns the queue of the first binding that matches `routing_key`,
    /// or the default queue when none does.
    #[must_use]
    pub fn route(&self, routing_key: &str) -> Option<String> {
        self.bindings
            .iter()
            .find(|(pattern, _)| pattern.matches(routing_key))
            .map(|(_, queue)| queue.clone())
            .or_else(|| self.default_queue.clone())
    }

    /// Returns the queues of all bindings that match `routing_key`, in
    /// binding order. The default queue is never included.
    #[must_use]
    pub fn route_all(&self, routing_key: &str) -> Vec<String> {
        let mut queues = Vec::new();
        for (pattern, queue) in &self.bindings {
            if pattern.matches(routing_key) {
                queues.push(queue.clone());
            }
        }
        queues
    }

    /// Removes every binding that targets `queue` and returns how many were removed.
    pub fn unbind_queue(&mut self, queue: &str) -> usize {
        let before = self.bindings.len();
        self.bindings
            .retain(|(_, bound_queue)| bound_queue.as_str() != queue);
        let after = self.bindings.len();
        before - after
    }

    /// Removes every binding whose pattern text equals `pattern`; returns
    /// `true` when at least one binding was removed.
    pub fn unbind_pattern(&mut self, pattern: &str) -> bool {
        let before = self.bindings.len();
        self.bindings
            .retain(|(bound_pattern, _)| bound_pattern.pattern() != pattern);
        self.bindings.len() != before
    }

    /// Returns `(pattern text, queue)` copies of all bindings, in binding order.
    #[must_use]
    pub fn bindings(&self) -> Vec<(String, String)> {
        let mut listed = Vec::with_capacity(self.bindings.len());
        for (pattern, queue) in &self.bindings {
            listed.push((pattern.pattern().to_string(), queue.clone()));
        }
        listed
    }

    /// Removes all bindings and the default queue.
    pub fn clear(&mut self) {
        self.default_queue = None;
        self.bindings.clear();
    }

    /// Returns the number of bindings.
    #[must_use]
    pub const fn binding_count(&self) -> usize {
        self.bindings.len()
    }

    /// Returns `true` when `route` would yield a queue for `routing_key`
    /// (a configured default queue always counts).
    #[inline]
    #[must_use]
    pub fn has_match(&self, routing_key: &str) -> bool {
        let bound = self
            .bindings
            .iter()
            .any(|(pattern, _)| pattern.matches(routing_key));
        bound || self.default_queue.is_some()
    }
}

impl Default for TopicRouter {
    /// Same as [`TopicRouter::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Serializable description of a topic exchange and its bindings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicExchangeConfig {
/// Exchange name.
pub name: String,
/// Topic pattern -> queue name.
pub bindings: HashMap<String, String>,
/// Fallback queue; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub default_queue: Option<String>,
/// Durability flag; deserializes to `true` when absent.
#[serde(default = "default_true")]
pub durable: bool,
/// Auto-delete flag; deserializes to `false` when absent.
#[serde(default)]
pub auto_delete: bool,
}
/// Serde default provider that yields `true`; referenced by the
/// `#[serde(default = "default_true")]` attribute on `TopicExchangeConfig::durable`.
fn default_true() -> bool {
true
}
impl TopicExchangeConfig {
    /// Creates a configuration named `name` with no bindings, no default
    /// queue, `durable = true` and `auto_delete = false`.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            bindings: HashMap::new(),
            default_queue: None,
            durable: true,
            auto_delete: false,
        }
    }

    /// Adds (or replaces) the binding for `pattern`.
    #[must_use]
    pub fn with_binding(mut self, pattern: impl Into<String>, queue: impl Into<String>) -> Self {
        self.bindings.insert(pattern.into(), queue.into());
        self
    }

    /// Sets the fallback queue.
    #[must_use]
    pub fn with_default_queue(mut self, queue: impl Into<String>) -> Self {
        self.default_queue = Some(queue.into());
        self
    }

    /// Sets the durability flag.
    #[must_use]
    pub fn with_durable(mut self, durable: bool) -> Self {
        self.durable = durable;
        self
    }

    /// Builds a [`TopicRouter`] from the configured bindings and default queue.
    ///
    /// `TopicRouter::route` returns the first matching binding, and
    /// `HashMap` iteration order is unspecified, so binding straight from the
    /// map would make the winner among overlapping patterns vary between
    /// runs. Bindings are therefore added in a fixed order: fewer `#`
    /// segments first, then fewer `*` segments, then by pattern text. Exact
    /// patterns are tried before wildcard ones and a bare `#` catch-all
    /// comes last.
    #[must_use]
    pub fn build_router(&self) -> TopicRouter {
        // (number of `#` segments, number of `*` segments) for a pattern.
        fn wildcard_rank(pattern: &str) -> (usize, usize) {
            pattern
                .split('.')
                .fold((0, 0), |(hashes, stars), segment| match segment {
                    "#" => (hashes + 1, stars),
                    "*" => (hashes, stars + 1),
                    _ => (hashes, stars),
                })
        }

        let mut ordered: Vec<(&String, &String)> = self.bindings.iter().collect();
        ordered.sort_by(|left, right| {
            wildcard_rank(left.0.as_str())
                .cmp(&wildcard_rank(right.0.as_str()))
                .then_with(|| left.0.as_str().cmp(right.0.as_str()))
        });

        let mut router = TopicRouter::new();
        for (pattern, queue) in ordered {
            router.bind(pattern.clone(), queue.clone());
        }
        if let Some(ref default_queue) = self.default_queue {
            router.set_default_queue(default_queue.clone());
        }
        router
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `matcher` accepts every name in `accepted` and rejects
    /// every name in `rejected`.
    fn assert_matches(matcher: &PatternMatcher, accepted: &[&str], rejected: &[&str]) {
        for &name in accepted {
            assert!(matcher.matches(name));
        }
        for &name in rejected {
            assert!(!matcher.matches(name));
        }
    }

    /// Asserts that `router.route(task)` yields the expected queue for each pair.
    fn assert_routes(router: &Router, expectations: &[(&str, Option<&str>)]) {
        for &(task, queue) in expectations {
            assert_eq!(router.route(task), queue.map(str::to_string));
        }
    }

    /// Shorthand for a priority-0 glob rule.
    fn glob_rule(pattern: &str, queue: &str) -> RouteRule {
        RouteRule::new(PatternMatcher::glob(pattern), queue)
    }

    #[test]
    fn test_exact_pattern() {
        let matcher = PatternMatcher::exact("tasks.add");
        assert_matches(&matcher, &["tasks.add"], &["tasks.multiply", "tasks"]);
    }

    #[test]
    fn test_glob_pattern() {
        let matcher = PatternMatcher::glob("tasks.*");
        assert_matches(
            &matcher,
            &["tasks.add", "tasks.multiply"],
            &["other.task", "tasks"],
        );

        let matcher = PatternMatcher::glob("*.add");
        assert_matches(&matcher, &["tasks.add", "math.add"], &["tasks.multiply"]);

        let matcher = PatternMatcher::glob("task?");
        assert_matches(
            &matcher,
            &["task1", "taskA", "tasks"],
            &["task", "task12"],
        );
    }

    #[test]
    fn test_regex_pattern() {
        let matcher = PatternMatcher::regex(r"tasks\.[a-z]+").unwrap();
        assert_matches(
            &matcher,
            &["tasks.add", "tasks.multiply"],
            &["tasks.Add", "tasks.123"],
        );

        let matcher = PatternMatcher::regex(r"^(email|sms)\.").unwrap();
        assert_matches(&matcher, &["email.send", "sms.send"], &["push.send"]);
    }

    #[test]
    fn test_all_pattern() {
        let matcher = PatternMatcher::all();
        assert_matches(&matcher, &["anything", "", "complex.task.name"], &[]);
    }

    #[test]
    fn test_router_basic() {
        let mut router = Router::new();
        router.add_rule(glob_rule("email.*", "email"));
        router.add_rule(glob_rule("sms.*", "sms"));
        assert_routes(
            &router,
            &[
                ("email.send", Some("email")),
                ("sms.notify", Some("sms")),
                ("push.notify", None),
            ],
        );
    }

    #[test]
    fn test_router_with_default() {
        let mut router = Router::with_default_queue("default");
        router.add_rule(glob_rule("email.*", "email"));
        assert_routes(
            &router,
            &[
                ("email.send", Some("email")),
                ("other.task", Some("default")),
            ],
        );
    }

    #[test]
    fn test_router_priority() {
        let mut router = Router::new();
        router.add_rule(glob_rule("*", "default").with_priority(0));
        router.add_rule(glob_rule("urgent.*", "urgent").with_priority(10));
        assert_routes(
            &router,
            &[
                ("urgent.email", Some("urgent")),
                ("email.send", Some("default")),
            ],
        );
    }

    #[test]
    fn test_router_direct_route() {
        let mut router = Router::new();
        router.add_rule(glob_rule("tasks.*", "tasks"));
        router.add_direct_route("tasks.special", RouteResult::new("special"));
        assert_routes(
            &router,
            &[
                ("tasks.special", Some("special")),
                ("tasks.normal", Some("tasks")),
            ],
        );
    }

    #[test]
    fn test_route_result() {
        let mut router = Router::new();
        let rule = glob_rule("amqp.*", "amqp_queue")
            .with_routing_key("amqp.routing")
            .with_exchange("amqp_exchange");
        router.add_rule(rule);

        let result = router.route_full("amqp.task").unwrap();
        assert_eq!(result.queue, "amqp_queue");
        assert_eq!(result.routing_key, Some("amqp.routing".to_string()));
        assert_eq!(result.exchange, Some("amqp_exchange".to_string()));
    }

    #[test]
    fn test_router_builder() {
        let router = RouterBuilder::new()
            .route_glob("email.*", "email")
            .route_glob("sms.*", "sms")
            .direct_route("special.task", "special")
            .default_queue("default")
            .build();
        assert_routes(
            &router,
            &[
                ("email.send", Some("email")),
                ("sms.notify", Some("sms")),
                ("special.task", Some("special")),
                ("other.task", Some("default")),
            ],
        );
    }

    #[test]
    fn test_routing_config() {
        let mut config = RoutingConfig::new();
        config.default_queue = Some("default".to_string());
        config
            .routes
            .insert("email.*".to_string(), "email".to_string());
        config
            .task_routes
            .insert("special.task".to_string(), "special".to_string());

        let router = config.into_router();
        assert_routes(
            &router,
            &[
                ("email.send", Some("email")),
                ("special.task", Some("special")),
                ("other.task", Some("default")),
            ],
        );
    }

    #[test]
    fn test_routing_config_serialization() {
        let mut config = RoutingConfig::new();
        config.default_queue = Some("default".to_string());
        config
            .routes
            .insert("email.*".to_string(), "email".to_string());

        let json = serde_json::to_string(&config).unwrap();
        let parsed: RoutingConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.default_queue, Some("default".to_string()));
        assert_eq!(parsed.routes.get("email.*"), Some(&"email".to_string()));
    }

    #[test]
    fn test_glob_special_chars() {
        // Regex metacharacters inside a glob must be taken literally.
        let matcher = PatternMatcher::glob("tasks.v1.0");
        assert_matches(&matcher, &["tasks.v1.0"], &["tasks.v1x0"]);

        let matcher = PatternMatcher::glob("(test)");
        assert_matches(&matcher, &["(test)"], &["test"]);
    }

    #[test]
    fn test_has_route() {
        let mut router = Router::new();
        router.add_rule(glob_rule("email.*", "email"));
        assert!(router.has_route("email.send"));
        assert!(!router.has_route("sms.send"));

        // Once a default queue exists, every task has a route.
        router.set_default_queue("default");
        assert!(router.has_route("sms.send"));
    }

    #[test]
    fn test_remove_rules() {
        let mut router = Router::new();
        router.add_rule(glob_rule("email.*", "email"));
        router.add_rule(glob_rule("sms.*", "sms"));
        router.remove_rules_by_queue("email");
        assert_routes(
            &router,
            &[("email.send", None), ("sms.send", Some("sms"))],
        );
    }

    #[test]
    fn test_clear() {
        let mut router = Router::new();
        router.add_rule(glob_rule("email.*", "email"));
        router.add_direct_route("special", RouteResult::new("special"));
        router.clear();
        assert_routes(&router, &[("email.send", None), ("special", None)]);
    }
}