// crabbyq 1.0.0
//
// A declarative async Rust framework for message-driven microservices.
// Documentation
use crate::brokers::base::{Broker, BrokerError, BrokerMessage, HeaderMap};
use async_trait::async_trait;
use futures_util::stream::{self, BoxStream};
use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS};
use std::sync::Arc;
use tokio::sync::Mutex;

/// [`Broker`] implementation backed by a `rumqttc` asynchronous MQTT client.
///
/// Clones share the same event-loop slot (it lives behind an `Arc`), so the
/// event loop can be claimed by `subscribe` only once across all clones.
#[derive(Clone)]
pub struct MqttBroker {
    /// Client handle used to issue subscribe and publish requests.
    client: AsyncClient,
    /// Event loop created together with `client`. Holds `Some` until
    /// `subscribe` moves it into the returned message stream, `None` afterwards.
    eventloop: Arc<Mutex<Option<EventLoop>>>,
}

impl MqttBroker {
    /// Creates a broker from the given MQTT connection options.
    ///
    /// `options` and `inflight` are forwarded unchanged to `AsyncClient::new`.
    /// The event loop returned by that call is parked in a shared slot until
    /// `subscribe` claims it; nothing is polled before then.
    // NOTE(review): `inflight` is presumably the capacity of the client's
    // request channel — confirm against the rumqttc version in use.
    pub fn new(options: MqttOptions, inflight: usize) -> Self {
        let (client, event_loop) = AsyncClient::new(options, inflight);
        let eventloop = Arc::new(Mutex::new(Some(event_loop)));
        Self { client, eventloop }
    }
}

#[async_trait]
impl Broker for MqttBroker {
    type MessageStream = BoxStream<'static, BrokerMessage>;

    /// Subscribes to every topic in `subjects` and returns a stream of the
    /// messages published on them.
    ///
    /// The returned stream owns the MQTT event loop and drives it each time it
    /// is polled. Every incoming `Publish` packet is turned into a
    /// [`BrokerMessage`] carrying the topic and payload; headers, reply-to and
    /// acknowledger are always `None`. All other events are skipped.
    ///
    /// The stream ends after the first error returned by the event loop (the
    /// error is logged through `tracing`), and the event loop is dropped with it.
    ///
    /// # Errors
    ///
    /// * Returns an error if the event loop has already been claimed by an
    ///   earlier successful call on this broker or one of its clones. In that
    ///   case no subscribe request is issued.
    /// * Propagates any error from the client while queueing a subscribe
    ///   request; the event loop then stays available for a later call.
    async fn subscribe(&self, subjects: &[String]) -> Result<Self::MessageStream, BrokerError> {
        let eventloop = {
            // Hold the slot for the whole setup so the availability check, the
            // subscribe requests and the hand-over of the loop act as one step.
            let mut slot = self.eventloop.lock().await;

            // Fail fast *before* touching the client: otherwise a rejected call
            // would still add subscriptions whose messages end up in the stream
            // returned to the first caller.
            if slot.is_none() {
                return Err(anyhow::anyhow!("MQTT event loop is already running").into());
            }

            // NOTE(review): these requests are queued while nothing polls the
            // event loop yet; presumably they must all fit into the client's
            // request channel (sized at construction) — confirm for callers
            // that pass many subjects.
            for subject in subjects {
                self.client.subscribe(subject, QoS::AtLeastOnce).await?;
            }

            // The slot was verified above while holding the lock, so this only
            // fails if that invariant is ever broken.
            slot.take()
                .ok_or_else(|| anyhow::anyhow!("MQTT event loop is already running"))?
        };

        let stream = stream::unfold(eventloop, |mut eventloop| async move {
            loop {
                match eventloop.poll().await {
                    Ok(Event::Incoming(Incoming::Publish(publish))) => {
                        let message = BrokerMessage {
                            subject: publish.topic,
                            payload: publish.payload.to_vec(),
                            headers: None,
                            reply_to: None,
                            acknowledger: None,
                        };
                        // Hand the event loop back as the unfold state so the
                        // next poll of the stream keeps driving it.
                        return Some((message, eventloop));
                    }
                    // Acks, pings, outgoing notifications, ...: nothing to yield.
                    Ok(_) => continue,
                    Err(error) => {
                        tracing::error!("MQTT stream error: {}", error);
                        // Ends the stream; the event loop is dropped here.
                        return None;
                    }
                }
            }
        });

        Ok(Box::pin(stream))
    }

    /// Publishes `payload` on the topic `subject` with QoS "at least once" and
    /// the retain flag cleared.
    ///
    /// `_headers` is ignored: the message is sent with topic and payload only.
    ///
    /// # Errors
    ///
    /// Propagates any error from the client while queueing the publish request.
    async fn publish(
        &self,
        subject: &str,
        payload: &[u8],
        _headers: Option<&HeaderMap>,
    ) -> Result<(), BrokerError> {
        self.client
            .publish(subject, QoS::AtLeastOnce, false, payload.to_vec())
            .await?;
        Ok(())
    }

    /// Request-reply is not implemented for this broker.
    ///
    /// # Errors
    ///
    /// Always returns an error; all arguments are ignored.
    async fn request(
        &self,
        _subject: &str,
        _payload: &[u8],
        _headers: Option<&HeaderMap>,
    ) -> Result<BrokerMessage, BrokerError> {
        Err(anyhow::anyhow!("MQTT does not support request-reply in the core broker API").into())
    }
}