crabbyq 1.0.0

A declarative async Rust framework for message-driven microservices.
Documentation
use crate::brokers::base::{Broker, BrokerError, BrokerMessage, HeaderMap};
use async_trait::async_trait;
use futures_util::StreamExt;
use futures_util::stream::BoxStream;

#[derive(Clone)]
pub struct RedisBroker {
    client: redis::Client,
}

impl RedisBroker {
    pub fn new(client: redis::Client) -> Self {
        Self { client }
    }
}

#[async_trait]
impl Broker for RedisBroker {
    type MessageStream = BoxStream<'static, BrokerMessage>;

    async fn subscribe(&self, subjects: &[String]) -> Result<Self::MessageStream, BrokerError> {
        let mut pubsub = self.client.get_async_pubsub().await?;

        for subject in subjects {
            pubsub.subscribe(subject).await?;
        }

        let stream = pubsub.into_on_message().filter_map(|message| async move {
            Some(BrokerMessage {
                subject: message.get_channel_name().to_string(),
                payload: message.get_payload_bytes().to_vec(),
                headers: None,
                reply_to: None,
                acknowledger: None,
            })
        });

        Ok(Box::pin(stream))
    }

    async fn publish(
        &self,
        subject: &str,
        payload: &[u8],
        _headers: Option<&HeaderMap>,
    ) -> Result<(), BrokerError> {
        let mut connection = self.client.get_multiplexed_async_connection().await?;
        redis::cmd("PUBLISH")
            .arg(subject)
            .arg(payload)
            .query_async::<()>(&mut connection)
            .await?;
        Ok(())
    }

    async fn request(
        &self,
        _subject: &str,
        _payload: &[u8],
        _headers: Option<&HeaderMap>,
    ) -> Result<BrokerMessage, BrokerError> {
        Err(anyhow::anyhow!("Redis pub/sub does not support request-reply").into())
    }
}