nova-boot-messaging 0.1.1

Messaging abstraction and adapters (NATS/Kafka/RabbitMQ) for Nova
use crate::{envelope::EventEnvelope, error::MessagingError, traits::MessageBroker};
use async_trait::async_trait;

pub struct RabbitMqBroker {
    pub amqp_url: String,
}

impl RabbitMqBroker {
    pub fn new(amqp_url: impl Into<String>) -> Self {
        Self {
            amqp_url: amqp_url.into(),
        }
    }

    fn dlq_queue(source_topic: &str) -> String {
        format!("{source_topic}.dlq")
    }

    async fn connect(&self) -> Result<lapin::Connection, MessagingError> {
        lapin::Connection::connect(&self.amqp_url, lapin::ConnectionProperties::default())
            .await
            .map_err(|e| MessagingError::Backend(e.to_string()))
    }

    async fn ensure_queue(channel: &lapin::Channel, topic: &str) -> Result<(), MessagingError> {
        channel
            .queue_declare(
                topic,
                lapin::options::QueueDeclareOptions {
                    durable: true,
                    ..Default::default()
                },
                lapin::types::FieldTable::default(),
            )
            .await
            .map_err(|e| MessagingError::Backend(e.to_string()))?;
        Ok(())
    }
}

#[async_trait]
impl MessageBroker for RabbitMqBroker {
    async fn publish(&self, envelope: EventEnvelope) -> Result<(), MessagingError> {
        let conn = self.connect().await?;
        let channel = conn
            .create_channel()
            .await
            .map_err(|e| MessagingError::Backend(e.to_string()))?;

        Self::ensure_queue(&channel, &envelope.topic).await?;

        let bytes = serde_json::to_vec(&envelope)
            .map_err(|e| MessagingError::Serialization(e.to_string()))?;

        channel
            .basic_publish(
                "",
                &envelope.topic,
                lapin::options::BasicPublishOptions::default(),
                &bytes,
                lapin::BasicProperties::default(),
            )
            .await
            .map_err(|e| MessagingError::Backend(e.to_string()))?;

        Ok(())
    }

    async fn poll(
        &self,
        topic: &str,
        max_messages: usize,
    ) -> Result<Vec<EventEnvelope>, MessagingError> {
        let conn = self.connect().await?;
        let channel = conn
            .create_channel()
            .await
            .map_err(|e| MessagingError::Backend(e.to_string()))?;

        Self::ensure_queue(&channel, topic).await?;

        let mut out = Vec::new();
        for _ in 0..max_messages {
            let result = channel
                .basic_get(topic, lapin::options::BasicGetOptions { no_ack: true })
                .await
                .map_err(|e| MessagingError::Backend(e.to_string()))?;

            match result {
                Some(reply) => {
                    let env = serde_json::from_slice::<EventEnvelope>(&reply.delivery.data)
                        .map_err(|e| MessagingError::Serialization(e.to_string()))?;
                    out.push(env);
                }
                None => break,
            }
        }

        Ok(out)
    }

    async fn publish_dlq(
        &self,
        source_topic: &str,
        mut envelope: EventEnvelope,
        reason: &str,
    ) -> Result<(), MessagingError> {
        envelope.attempts = envelope.attempts.saturating_add(1);
        envelope.topic = Self::dlq_queue(source_topic);
        envelope
            .headers
            .insert("x-dlq-reason".to_string(), reason.to_string());
        envelope
            .headers
            .insert("x-source-topic".to_string(), source_topic.to_string());
        self.publish(envelope).await
    }

    async fn poll_dlq(
        &self,
        source_topic: &str,
        max_messages: usize,
    ) -> Result<Vec<EventEnvelope>, MessagingError> {
        self.poll(&Self::dlq_queue(source_topic), max_messages)
            .await
    }
}