use futures::StreamExt;
use lapin::BasicProperties;
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, Consumer};
use serde_json::Value;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Boxed message handler stored per queue name.
///
/// The handler is called with the raw payload bytes of a delivery. When it
/// returns `Some(value)` and the route was registered with a result exchange,
/// the value is serialized to JSON and published to that exchange.
type HandlerFn = Box<dyn Fn(Vec<u8>) -> Option<Value> + Send + Sync>;
/// Dispatches RabbitMQ deliveries to handlers registered per queue.
///
/// All fields are reference-counted so they can be handed to the background
/// consumer tasks spawned for each registered route.
pub struct RabbitRouter {
/// Connection to the broker, opened in `RabbitRouter::new`.
pub connection: Arc<Connection>,
/// Channel created from `connection`; used for declaring, consuming and publishing.
pub channel: Arc<lapin::Channel>,
/// Registered handlers, keyed by the name of the queue they consume from.
pub routes: Arc<Mutex<HashMap<String, HandlerFn>>>,
}
/// Publish target for handler results: an exchange name plus the routing key
/// to publish with.
#[derive(Debug, Clone)]
pub struct Exchange {
    /// Name of the exchange to publish to.
    pub name: String,
    /// Routing key attached to every published message.
    pub routing_key: String,
}

impl FromStr for Exchange {
    type Err = String;

    /// Parses an `Exchange` from text of the form `name:routing_key`.
    ///
    /// The input must contain exactly one `:`; either side may be empty.
    ///
    /// # Errors
    ///
    /// Returns a description of the expected format when the input has no
    /// `:` or more than one.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut fields = s.split(':');
        // Exactly two fields: a third `next()` must come back empty.
        match (fields.next(), fields.next(), fields.next()) {
            (Some(name), Some(routing_key), None) => Ok(Self {
                name: name.to_owned(),
                routing_key: routing_key.to_owned(),
            }),
            _ => Err(String::from(
                "Exchange must be in the format name:routing_key",
            )),
        }
    }
}
impl RabbitRouter {
pub async fn new(uri: &str) -> Self {
let connection = Connection::connect(uri, ConnectionProperties::default())
.await
.expect("Failed to connect to RabbitMQ");
let channel = connection
.create_channel()
.await
.expect("Failed to create channel");
RabbitRouter {
connection: Arc::new(connection),
channel: Arc::new(channel),
routes: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn add_route_exchange<F>(
&self,
exchange: &str,
routing_key: &str,
result_exchange: Option<Exchange>,
handler: F,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Fn(Vec<u8>) -> Option<Value> + Send + Sync + 'static,
{
let exchange = exchange.to_string();
let routing_key = routing_key.to_string();
let handler = Box::new(handler) as HandlerFn;
let channel = self.channel.clone();
channel
.exchange_declare(
&exchange,
lapin::ExchangeKind::Topic,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
let mut options = QueueDeclareOptions::default();
options.exclusive = true;
options.durable = true;
let result = channel
.queue_declare("", options, FieldTable::default())
.await?;
let queue_name = result.name().as_str().to_string();
let mut routes = self.routes.lock().await;
routes.insert(queue_name.clone(), handler);
let routes = self.routes.clone();
channel
.queue_bind(
&queue_name,
&exchange,
&routing_key,
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
channel
.exchange_declare(
&exchange,
lapin::ExchangeKind::Topic,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
tokio::spawn(async move {
let consumer = channel
.basic_consume(
&queue_name,
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("Failed to start consumer");
Self::consume_messages(queue_name, consumer,channel,result_exchange, routes).await;
});
Ok(())
}
pub async fn add_route_queue<F>(
&self,
queue_name: &str,
result_exchange: Option<Exchange>,
handler: F,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Fn(Vec<u8>) -> Option<Value> + Send + Sync + 'static,
{
let queue_name = queue_name.to_string();
let handler = Box::new(handler) as HandlerFn;
let mut routes = self.routes.lock().await;
routes.insert(queue_name.clone(), handler);
let routes = self.routes.clone();
let channel = self.channel.clone();
let _queue = channel
.queue_declare(
&queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("Failed to declare queue");
tokio::spawn(async move {
let consumer = channel
.basic_consume(
&queue_name,
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("Failed to start consumer");
Self::consume_messages(queue_name, consumer, channel,result_exchange,routes).await;
});
Ok(())
}
async fn consume_messages(
queue_name: String,
mut consumer: Consumer,
channel: Arc<lapin::Channel>,
result_exchange: Option<Exchange>,
routes: Arc<Mutex<HashMap<String, HandlerFn>>>,
) {
while let Some(delivery) = consumer.next().await {
match delivery {
Ok(delivery) => {
let data = delivery.data.clone();
let routes = routes.lock().await;
if let Some(handler) = routes.get(&queue_name) {
let result= handler(data);
if let Some(result) = result {
if let Some(result_exchange) = &result_exchange {
let payload = serde_json::to_vec(&result).unwrap();
channel
.basic_publish(
&result_exchange.name,
&result_exchange.routing_key,
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
)
.await
.expect("Failed to publish message");
}
}
}
delivery
.ack(BasicAckOptions::default())
.await
.expect("Failed to ack message");
}
Err(error) => eprintln!("Error receiving message: {:?}", error),
}
}
}
}
#[cfg(test)]
mod tests {
    //! Integration tests; they require a RabbitMQ broker listening on
    //! `127.0.0.1:5672`.
    use super::*;
    use lapin::BasicProperties;
    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
    use tokio::time::{timeout, Duration};

    /// Upper bound on how long a test waits for a handler to see its message.
    const RECEIVE_TIMEOUT: Duration = Duration::from_secs(2);

    /// Builds a handler that forwards every payload to the returned receiver.
    ///
    /// Handlers run on a detached task, where a failed assertion could not
    /// fail the test; forwarding lets the test body do the asserting.
    fn forwarding_handler() -> (
        impl Fn(Vec<u8>) -> Option<Value> + Send + Sync + 'static,
        UnboundedReceiver<Vec<u8>>,
    ) {
        let (tx, rx) = unbounded_channel();
        let handler = move |data: Vec<u8>| -> Option<Value> {
            // A send error only means the test has already finished.
            let _ = tx.send(data);
            None
        };
        (handler, rx)
    }

    /// Waits for the next forwarded payload and returns it as text.
    async fn next_message(received: &mut UnboundedReceiver<Vec<u8>>) -> String {
        let bytes = timeout(RECEIVE_TIMEOUT, received.recv())
            .await
            .expect("Timed out waiting for the handler to receive a message")
            .expect("Handler was dropped before receiving a message");
        String::from_utf8_lossy(&bytes).into_owned()
    }

    /// Publishes `payload` through the router's channel.
    async fn publish(router: &RabbitRouter, exchange: &str, routing_key: &str, payload: &[u8]) {
        router
            .channel
            .basic_publish(
                exchange,
                routing_key,
                BasicPublishOptions::default(),
                payload,
                BasicProperties::default(),
            )
            .await
            .unwrap_or_else(|error| {
                panic!("Failed to publish message to {}: {:?}", routing_key, error)
            });
    }

    #[tokio::test]
    async fn test_rabbit_router() {
        let router = RabbitRouter::new("amqp://127.0.0.1:5672/%2f").await;
        let (handler, mut received) = forwarding_handler();
        router
            .add_route_queue("test_queue", None, handler)
            .await
            .expect("Failed to register route for test_queue");

        publish(&router, "", "test_queue", b"Test message").await;

        assert_eq!(next_message(&mut received).await, "Test message");
    }

    #[tokio::test]
    async fn test_multiple_routes() {
        let router = RabbitRouter::new("amqp://127.0.0.1:5672/%2f").await;
        let (handler_1, mut received_1) = forwarding_handler();
        let (handler_2, mut received_2) = forwarding_handler();
        router
            .add_route_queue("queue_1", None, handler_1)
            .await
            .expect("Failed to register route for queue_1");
        router
            .add_route_queue("queue_2", None, handler_2)
            .await
            .expect("Failed to register route for queue_2");

        publish(&router, "", "queue_1", b"Message for queue_1").await;
        publish(&router, "", "queue_2", b"Message for queue_2").await;

        // Each handler must see only the message sent to its own queue.
        assert_eq!(next_message(&mut received_1).await, "Message for queue_1");
        assert_eq!(next_message(&mut received_2).await, "Message for queue_2");
    }

    #[tokio::test]
    async fn test_exchange_route() {
        let router = RabbitRouter::new("amqp://127.0.0.1:5672").await;
        router
            .channel
            .exchange_declare(
                "test_exchange",
                lapin::ExchangeKind::Topic,
                ExchangeDeclareOptions::default(),
                FieldTable::default(),
            )
            .await
            .expect("Failed to declare exchange");
        let (handler, mut received) = forwarding_handler();
        router
            .add_route_exchange("test_exchange", "test.routing.key", None, handler)
            .await
            .expect("Failed to register route for test_exchange");

        publish(
            &router,
            "test_exchange",
            "test.routing.key",
            b"Test exchange message",
        )
        .await;

        assert_eq!(next_message(&mut received).await, "Test exchange message");
    }
}