strev 0.5.0

Event-driven pub/sub messaging library with compile-time ack safety
Documentation
use std::sync::Arc;

use async_trait::async_trait;
use tokio_stream::StreamExt;

use crate::error::{CloseError, PublishError, SubscribeError};
use crate::message::{Message, Pending};
use crate::outcome::Outcome;
use crate::publisher::Publisher;
use crate::stream::MessageStream;
use crate::subscriber::Subscriber;
use crate::topic::Topic;

type MessageTransformFn = Arc<dyn Fn(&mut Message<Pending>) + Send + Sync>;

pub trait PublisherDecorator: Send + Sync {
    fn decorate(&self, publisher: Box<dyn Publisher>) -> Box<dyn Publisher>;
}

pub trait SubscriberDecorator: Send + Sync {
    fn decorate(&self, subscriber: Box<dyn Subscriber>) -> Box<dyn Subscriber>;
}

pub struct MessageTransformPublisherDecorator {
    pub transform: MessageTransformFn,
}

impl PublisherDecorator for MessageTransformPublisherDecorator {
    fn decorate(&self, publisher: Box<dyn Publisher>) -> Box<dyn Publisher> {
        Box::new(TransformedPublisher {
            inner: publisher,
            transform: self.transform.clone(),
        })
    }
}

struct TransformedPublisher {
    inner: Box<dyn Publisher>,
    transform: MessageTransformFn,
}

#[async_trait]
impl Publisher for TransformedPublisher {
    async fn publish(
        &self,
        topic: &Topic,
        messages: Vec<Message<Pending>>,
    ) -> Result<Vec<Outcome>, PublishError> {
        let transformed = messages
            .into_iter()
            .map(|mut m| {
                (self.transform)(&mut m);
                m
            })
            .collect();
        self.inner.publish(topic, transformed).await
    }

    async fn close(&mut self) -> Result<(), CloseError> {
        self.inner.close().await
    }
}

pub struct MessageTransformSubscriberDecorator {
    pub transform: MessageTransformFn,
}

impl SubscriberDecorator for MessageTransformSubscriberDecorator {
    fn decorate(&self, subscriber: Box<dyn Subscriber>) -> Box<dyn Subscriber> {
        Box::new(TransformedSubscriber {
            inner: subscriber,
            transform: self.transform.clone(),
        })
    }
}

struct TransformedSubscriber {
    inner: Box<dyn Subscriber>,
    transform: MessageTransformFn,
}

#[async_trait]
impl Subscriber for TransformedSubscriber {
    async fn subscribe(&self, topic: &Topic) -> Result<MessageStream, SubscribeError> {
        let mut inner_stream = self.inner.subscribe(topic).await?;
        let (tx, stream) = MessageStream::channel(256);
        let transform = self.transform.clone();

        tokio::spawn(async move {
            while let Some(mut msg) = inner_stream.next().await {
                transform(&mut msg);
                if tx.send(msg).await.is_err() {
                    break;
                }
            }
        });

        Ok(stream)
    }

    async fn close(&mut self) -> Result<(), CloseError> {
        self.inner.close().await
    }
}