use crate::bindings::Binding;
use crate::messages::Message;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExchangeType {
Direct,
Fanout,
Topic,
Headers,
}
#[derive(Debug, Clone)]
pub struct Exchange {
pub name: String,
pub exchange_type: ExchangeType,
pub durable: bool,
pub auto_delete: bool,
pub arguments: HashMap<String, String>,
pub bindings: Vec<Binding>,
}
impl Exchange {
pub fn route_message(&self, message: &Message, routing_key: &str) -> Vec<String> {
match self.exchange_type {
ExchangeType::Direct => self.route_direct(routing_key),
ExchangeType::Fanout => self.route_fanout(),
ExchangeType::Topic => self.route_topic(routing_key),
ExchangeType::Headers => self.route_headers(message),
}
}
fn route_direct(&self, routing_key: &str) -> Vec<String> {
self.bindings
.iter()
.filter(|b| b.routing_key == routing_key)
.map(|b| b.queue.clone())
.collect()
}
fn route_fanout(&self) -> Vec<String> {
self.bindings.iter().map(|b| b.queue.clone()).collect()
}
fn route_topic(&self, routing_key: &str) -> Vec<String> {
let routing_parts: Vec<&str> = routing_key.split('.').collect();
self.bindings
.iter()
.filter(|binding| {
let pattern_parts: Vec<&str> = binding.routing_key.split('.').collect();
Self::matches_topic_pattern(&routing_parts, &pattern_parts)
})
.map(|binding| binding.queue.clone())
.collect()
}
pub fn matches_topic_pattern(routing_parts: &[&str], pattern_parts: &[&str]) -> bool {
if routing_parts.len() > pattern_parts.len() && !pattern_parts.contains(&"#") {
return false;
}
let mut routing_iter = routing_parts.iter();
let mut pattern_iter = pattern_parts.iter();
while let (Some(&routing), Some(&pattern)) = (routing_iter.next(), pattern_iter.next()) {
match pattern {
"*" => {
continue;
}
"#" => {
if pattern_iter.next().is_none() {
return true;
}
return true;
}
word if word == routing => {
continue;
}
_ => return false,
}
}
routing_iter.next().is_none()
}
fn route_headers(&self, message: &Message) -> Vec<String> {
self.bindings
.iter()
.filter(|binding| {
Self::matches_headers(&message.properties.headers, &binding.arguments)
})
.map(|binding| binding.queue.clone())
.collect()
}
fn matches_headers(
message_headers: &HashMap<String, String>,
binding_args: &HashMap<String, String>,
) -> bool {
let x_match = binding_args.get("x-match").map(|s| s.as_str()).unwrap_or("all");
let headers_to_match: HashMap<_, _> =
binding_args.iter().filter(|(k, _)| *k != "x-match").collect();
if headers_to_match.is_empty() {
return true; }
if x_match == "any" {
headers_to_match.iter().any(|(key, value)| {
message_headers.get(key.as_str()).map(|v| v == *value).unwrap_or(false)
})
} else {
headers_to_match.iter().all(|(key, value)| {
message_headers.get(key.as_str()).map(|v| v == *value).unwrap_or(false)
})
}
}
}
pub struct ExchangeManager {
exchanges: HashMap<String, Exchange>,
}
impl Default for ExchangeManager {
fn default() -> Self {
Self::new()
}
}
impl ExchangeManager {
pub fn new() -> Self {
Self {
exchanges: HashMap::new(),
}
}
pub fn declare_exchange(
&mut self,
name: String,
exchange_type: ExchangeType,
durable: bool,
auto_delete: bool,
) {
let exchange = Exchange {
name: name.clone(),
exchange_type,
durable,
auto_delete,
arguments: HashMap::new(),
bindings: Vec::new(),
};
self.exchanges.insert(name, exchange);
}
pub fn get_exchange(&self, name: &str) -> Option<&Exchange> {
self.exchanges.get(name)
}
pub fn list_exchanges(&self) -> Vec<&Exchange> {
self.exchanges.values().collect()
}
pub fn delete_exchange(&mut self, name: &str) -> bool {
self.exchanges.remove(name).is_some()
}
}