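//! ollie 0.1.0
//!
//! A small RabbitMQ router: it attaches handler functions to queues (or to a topic
//! exchange via a temporary queue) and can publish each handler's JSON result to
//! another exchange.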
use futures::StreamExt;
use lapin::BasicProperties;
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, Consumer};
use serde_json::Value;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;

/// Boxed, thread-safe message handler: takes the raw delivery payload and
/// optionally returns a JSON value.
type HandlerFn = Box<dyn Fn(Vec<u8>) -> Option<Value> + Send + Sync>;

/// Holds one RabbitMQ connection, one channel on it, and the table of
/// registered message handlers.
pub struct RabbitRouter {
    /// Shared handle to the AMQP connection.
    pub connection: Arc<Connection>,
    /// Shared handle to the channel used for declaring, binding, consuming and publishing.
    pub channel: Arc<lapin::Channel>,
    /// Registered handlers, keyed by the name of the queue they consume from.
    pub routes: Arc<Mutex<HashMap<String, HandlerFn>>>,
}

impl FromStr for Exchange {
    type Err = String;

    /// Parses an `Exchange` from text of the form `name:routing_key`.
    ///
    /// The input must contain exactly one `:`; either side may be empty.
    /// Any other shape yields an `Err` carrying a description of the
    /// expected format.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.split_once(':') {
            // A second ':' in the remainder means more than two fields.
            Some((name, routing_key)) if !routing_key.contains(':') => Ok(Self {
                name: name.to_owned(),
                routing_key: routing_key.to_owned(),
            }),
            _ => Err("Exchange must be in the format name:routing_key".to_string()),
        }
    }
}

/// An exchange name paired with a routing key.
///
/// Can be parsed from a string of the form `name:routing_key`.
#[derive(Debug, Clone)]
pub struct Exchange {
    /// Name of the exchange.
    pub name: String,
    /// Routing key used together with `name`.
    pub routing_key: String,
}

impl RabbitRouter {
    // Create a new router with a RabbitMQ connection
    pub async fn new(uri: &str) -> Self {
        let connection = Connection::connect(uri, ConnectionProperties::default())
            .await
            .expect("Failed to connect to RabbitMQ");
        let channel = connection
            .create_channel()
            .await
            .expect("Failed to create channel");

        RabbitRouter {
            connection: Arc::new(connection),
            channel: Arc::new(channel),
            routes: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    // Connect a temporary queue to the given exchange and then listen on that queue\
    pub async fn add_route_exchange<F>(
        &self,
        exchange: &str,
        routing_key: &str,
        result_exchange: Option<Exchange>,
        handler: F,
    ) -> Result<(), Box<dyn std::error::Error>>
    where
        F: Fn(Vec<u8>) -> Option<Value> + Send + Sync + 'static,
    {
        let exchange = exchange.to_string();

        let routing_key = routing_key.to_string();
        let handler = Box::new(handler) as HandlerFn;

        let channel = self.channel.clone();
        // Declare the exchange
        channel
            .exchange_declare(
                &exchange,
                lapin::ExchangeKind::Topic,
                ExchangeDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;
        let mut options = QueueDeclareOptions::default();
        options.exclusive = true;
        options.durable = true;

        let result = channel
            .queue_declare("", options, FieldTable::default())
            .await?;
        let queue_name = result.name().as_str().to_string();

        let mut routes = self.routes.lock().await;
        routes.insert(queue_name.clone(), handler);
        let routes = self.routes.clone();
        channel
            .queue_bind(
                &queue_name,
                &exchange,
                &routing_key,
                QueueBindOptions::default(),
                FieldTable::default(),
            )
            .await?;

        // Declare the exchange
        channel
            .exchange_declare(
                &exchange,
                lapin::ExchangeKind::Topic,
                ExchangeDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        tokio::spawn(async move {
            let consumer = channel
                .basic_consume(
                    &queue_name,
                    "",
                    BasicConsumeOptions::default(),
                    FieldTable::default(),
                )
                .await
                .expect("Failed to start consumer");

            Self::consume_messages(queue_name, consumer,channel,result_exchange, routes).await;
        });
        Ok(())
    }

    // Add a route with a queue_name and handler
    pub async fn add_route_queue<F>(
        &self,
        queue_name: &str,
        result_exchange: Option<Exchange>,
        handler: F,
    ) -> Result<(), Box<dyn std::error::Error>>
    where
        F: Fn(Vec<u8>) -> Option<Value> + Send + Sync + 'static,
    {
        let queue_name = queue_name.to_string();
        let handler = Box::new(handler) as HandlerFn;

        let mut routes = self.routes.lock().await;
        routes.insert(queue_name.clone(), handler);
        let routes = self.routes.clone();

        // Declare the queue and start consuming messages
        let channel = self.channel.clone();
        let _queue = channel
            .queue_declare(
                &queue_name,
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await
            .expect("Failed to declare queue");
        tokio::spawn(async move {
            let consumer = channel
                .basic_consume(
                    &queue_name,
                    "",
                    BasicConsumeOptions::default(),
                    FieldTable::default(),
                )
                .await
                .expect("Failed to start consumer");

            Self::consume_messages(queue_name, consumer, channel,result_exchange,routes).await;
        });
        Ok(())
    }

    // Internal method to handle consuming messages
    async fn consume_messages(
        queue_name: String,
        mut consumer: Consumer,
        channel: Arc<lapin::Channel>,
        result_exchange: Option<Exchange>,
        routes: Arc<Mutex<HashMap<String, HandlerFn>>>,
    ) {
        while let Some(delivery) = consumer.next().await {
            match delivery {
                Ok(delivery) => {
                    let data = delivery.data.clone();

                    // Find the appropriate handler and call it
                    let routes = routes.lock().await;
                    if let Some(handler) = routes.get(&queue_name) {
                       let result= handler(data);
                        if let Some(result) = result {
                            if let Some(result_exchange) = &result_exchange {
                               
                                let payload = serde_json::to_vec(&result).unwrap();
                                channel
                                    .basic_publish(
                                        &result_exchange.name,
                                        &result_exchange.routing_key,
                                        BasicPublishOptions::default(),
                                        &payload,
                                        BasicProperties::default(),
                                    )
                                    .await
                                    .expect("Failed to publish message");
                            }
                        }
                    }

                    delivery
                        .ack(BasicAckOptions::default())
                        .await
                        .expect("Failed to ack message");
                }
                Err(error) => eprintln!("Error receiving message: {:?}", error),
            }
        }
    }
}

#[cfg(test)]
mod tests {
    //! Integration tests; they need a RabbitMQ broker listening on 127.0.0.1:5672.
    //!
    //! Handlers run inside a spawned consumer task, where a failed assertion
    //! would not fail the test. Each handler therefore forwards the payload
    //! over a channel and the assertion is made in the test body.
    use super::*;
    use lapin::BasicProperties;
    use tokio::sync::mpsc;
    use tokio::time::{timeout, Duration};

    /// Upper bound on how long a test waits for a handler to see its message.
    const RECEIVE_TIMEOUT: Duration = Duration::from_secs(2);

    /// Publishes `payload` through the router's channel.
    async fn publish(router: &RabbitRouter, exchange: &str, routing_key: &str, payload: &[u8]) {
        router
            .channel
            .basic_publish(
                exchange,
                routing_key,
                BasicPublishOptions::default(),
                payload,
                BasicProperties::default(),
            )
            .await
            .expect("Failed to publish message");
    }

    /// Waits for the next payload forwarded by a handler and checks it.
    async fn assert_received(inbox: &mut mpsc::UnboundedReceiver<Vec<u8>>, expected: &[u8]) {
        let payload = timeout(RECEIVE_TIMEOUT, inbox.recv())
            .await
            .expect("Timed out waiting for the handler to run")
            .expect("Handler was dropped before receiving a message");
        assert_eq!(payload.as_slice(), expected);
    }

    #[tokio::test]
    async fn test_rabbit_router() {
        let router = RabbitRouter::new("amqp://127.0.0.1:5672/%2f").await;

        let (seen, mut inbox) = mpsc::unbounded_channel();
        router
            .add_route_queue("test_queue", None, move |data| {
                println!(
                    "Test handler received: {:?}",
                    String::from_utf8_lossy(&data)
                );
                let _ = seen.send(data);
                None
            })
            .await
            .expect("Failed to add route for test_queue");

        // The default exchange routes by queue name.
        publish(&router, "", "test_queue", b"Test message").await;

        assert_received(&mut inbox, b"Test message").await;
    }

    #[tokio::test]
    async fn test_multiple_routes() {
        let router = RabbitRouter::new("amqp://127.0.0.1:5672/%2f").await;

        let (seen_1, mut inbox_1) = mpsc::unbounded_channel();
        router
            .add_route_queue("queue_1", None, move |data| {
                println!(
                    "Handler for queue_1 received: {:?}",
                    String::from_utf8_lossy(&data)
                );
                let _ = seen_1.send(data);
                None
            })
            .await
            .expect("Failed to add route for queue_1");

        let (seen_2, mut inbox_2) = mpsc::unbounded_channel();
        router
            .add_route_queue("queue_2", None, move |data| {
                println!(
                    "Handler for queue_2 received: {:?}",
                    String::from_utf8_lossy(&data)
                );
                let _ = seen_2.send(data);
                None
            })
            .await
            .expect("Failed to add route for queue_2");

        publish(&router, "", "queue_1", b"Message for queue_1").await;
        publish(&router, "", "queue_2", b"Message for queue_2").await;

        // Each handler must see only the message addressed to its own queue.
        assert_received(&mut inbox_1, b"Message for queue_1").await;
        assert_received(&mut inbox_2, b"Message for queue_2").await;
    }

    #[tokio::test]
    async fn test_exchange_route() {
        let router = RabbitRouter::new("amqp://127.0.0.1:5672").await;

        router
            .channel
            .exchange_declare(
                "test_exchange",
                lapin::ExchangeKind::Topic,
                ExchangeDeclareOptions::default(),
                FieldTable::default(),
            )
            .await
            .expect("Failed to declare exchange");

        let (seen, mut inbox) = mpsc::unbounded_channel();
        router
            .add_route_exchange("test_exchange", "test.routing.key", None, move |data| {
                println!(
                    "Exchange handler received: {:?}",
                    String::from_utf8_lossy(&data)
                );
                let _ = seen.send(data);
                None
            })
            .await
            .expect("Failed to add route for test_exchange");

        publish(
            &router,
            "test_exchange",
            "test.routing.key",
            b"Test exchange message",
        )
        .await;
        println!("Message published to exchange");

        assert_received(&mut inbox, b"Test exchange message").await;
    }
}