nova-boot-messaging 0.1.1

Messaging abstraction and adapters (NATS/Kafka/RabbitMQ) for Nova
use crate::{envelope::EventEnvelope, error::MessagingError, traits::MessageBroker};
use async_trait::async_trait;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Default)]
pub struct InMemoryBroker {
    queues: Arc<RwLock<HashMap<String, VecDeque<EventEnvelope>>>>,
    dlq: Arc<RwLock<HashMap<String, VecDeque<EventEnvelope>>>>,
}

#[async_trait]
impl MessageBroker for InMemoryBroker {
    async fn publish(&self, envelope: EventEnvelope) -> Result<(), MessagingError> {
        let topic = envelope.topic.clone();
        let mut queues = self.queues.write().await;
        queues.entry(topic).or_default().push_back(envelope);
        Ok(())
    }

    async fn poll(
        &self,
        topic: &str,
        max_messages: usize,
    ) -> Result<Vec<EventEnvelope>, MessagingError> {
        let mut queues = self.queues.write().await;
        let queue = queues.entry(topic.to_string()).or_default();

        let mut out = Vec::new();
        for _ in 0..max_messages {
            if let Some(msg) = queue.pop_front() {
                out.push(msg);
            } else {
                break;
            }
        }
        Ok(out)
    }

    async fn publish_dlq(
        &self,
        source_topic: &str,
        mut envelope: EventEnvelope,
        reason: &str,
    ) -> Result<(), MessagingError> {
        envelope = envelope
            .with_header("x-dlq-reason", reason)
            .with_header("x-source-topic", source_topic);
        envelope.attempts = envelope.attempts.saturating_add(1);

        let mut dlq = self.dlq.write().await;
        dlq.entry(source_topic.to_string())
            .or_default()
            .push_back(envelope);
        Ok(())
    }

    async fn poll_dlq(
        &self,
        source_topic: &str,
        max_messages: usize,
    ) -> Result<Vec<EventEnvelope>, MessagingError> {
        let mut dlq = self.dlq.write().await;
        let queue = dlq.entry(source_topic.to_string()).or_default();

        let mut out = Vec::new();
        for _ in 0..max_messages {
            if let Some(msg) = queue.pop_front() {
                out.push(msg);
            } else {
                break;
            }
        }
        Ok(out)
    }
}