easy-rmq-rs 1.0.2

Easy-to-use RabbitMQ library for Rust
Documentation
use crate::{ChannelPool, Result, Subscriber, default_exchange_for_kind, middleware::Middleware, registry::HandlerFn};
use lapin::ExchangeKind;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

pub struct Data<T>(pub Arc<T>);

impl<T> Data<T> {
    pub fn new(data: T) -> Self {
        Self(Arc::new(data))
    }
}

impl<T> Clone for Data<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> AsRef<T> for Data<T> {
    fn as_ref(&self) -> &T {
        &self.0
    }
}

impl<T> std::ops::Deref for Data<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

pub struct WorkerBuilderWithData<T> {
    builder: WorkerBuilder,
    data: Arc<T>,
}

impl<T> WorkerBuilderWithData<T>
where
    T: Send + Sync + 'static + Clone,
{
    pub fn pool(mut self, pool: Arc<ChannelPool>) -> Self {
        self.builder.channel_pool = Some(pool);
        self
    }

    pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
        self.builder.exchange = exchange.into();
        self
    }

    pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
        self.builder.routing_key = Some(routing_key.into());
        self
    }

    pub fn queue(mut self, queue: impl Into<String>) -> Self {
        self.builder.queue = Some(queue.into());
        self
    }

    pub fn retry(mut self, max_retries: u32, delay_ms: u64) -> Self {
        self.builder.retry_enabled = true;
        self.builder.max_retries = max_retries;
        self.builder.retry_delay = Duration::from_millis(delay_ms);
        self
    }

    pub fn prefetch(mut self, count: u16) -> Self {
        self.builder.prefetch = count;
        self
    }

    pub fn concurrency(mut self, count: u16) -> Self {
        self.builder.concurrency = Some(count);
        self
    }

    pub fn parallelize<F>(mut self, spawn_fn: F) -> Self
    where
        F: Fn(
                Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
            ) -> tokio::task::JoinHandle<Result<()>>
            + Send
            + Sync
            + 'static,
    {
        self.builder.spawn_fn = Some(Arc::new(spawn_fn));
        self
    }

    pub fn single_active_consumer(mut self, enabled: bool) -> Self {
        self.builder.single_active_consumer = enabled;
        self
    }

    pub fn middleware<M>(mut self, middleware: M) -> Self
    where
        M: Middleware + 'static,
    {
        self.builder.middlewares.push(Arc::new(middleware));
        self
    }

    pub fn build<F, Fut>(self, handler: F) -> BuiltWorker
    where
        F: Fn(Data<T>, Vec<u8>) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<()>> + Send + 'static,
    {
        let pool = self.builder.channel_pool.expect("Pool must be set with .pool()");

        let data = self.data.clone();
        let handler_arc = Arc::new(handler);

        let wrapped_handler = move |payload: Vec<u8>| -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
            let data = data.clone();
            let handler = handler_arc.clone();
            Box::pin(async move {
                handler(Data(data), payload).await
            })
        };

        let subscriber = Subscriber::new(pool, self.builder.exchange_kind.clone())
            .with_exchange(&self.builder.exchange)
            .with_single_active_consumer(self.builder.single_active_consumer);

        BuiltWorker {
            exchange_kind: self.builder.exchange_kind,
            subscriber,
            routing_key: self.builder.routing_key,
            queue: self.builder.queue.expect("queue required"),
            handler: Box::new(wrapped_handler),
            retry_max_retries: if self.builder.retry_enabled { Some(self.builder.max_retries) } else { None },
            retry_delay: if self.builder.retry_enabled { Some(self.builder.retry_delay) } else { None },
            prefetch: self.builder.prefetch,
            concurrency: self.builder.concurrency,
            spawn_fn: self.builder.spawn_fn,
            middlewares: self.builder.middlewares,
        }
    }
}

pub type SpawnFn = Arc<
    dyn Fn(
            Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
        ) -> tokio::task::JoinHandle<Result<()>>
        + Send
        + Sync,
>;

pub struct WorkerBuilder {
    exchange_kind: ExchangeKind,
    exchange: String,
    channel_pool: Option<Arc<ChannelPool>>,
    routing_key: Option<String>,
    queue: Option<String>,
    retry_enabled: bool,
    max_retries: u32,
    retry_delay: Duration,
    prefetch: u16,
    concurrency: Option<u16>,
    spawn_fn: Option<SpawnFn>,
    single_active_consumer: bool,
    middlewares: Vec<Arc<dyn Middleware>>,
}

impl WorkerBuilder {
    pub fn new(exchange_kind: ExchangeKind) -> Self {
        let exchange = default_exchange_for_kind(&exchange_kind);

        Self {
            exchange_kind,
            exchange,
            channel_pool: None,
            routing_key: None,
            queue: None,
            retry_enabled: false,
            max_retries: 3,
            retry_delay: Duration::from_secs(60),
            prefetch: 1,
            concurrency: None,
            spawn_fn: None,
            single_active_consumer: false,
            middlewares: Vec::new(),
        }
    }

    pub fn pool(mut self, pool: Arc<ChannelPool>) -> Self {
        self.channel_pool = Some(pool);
        self
    }

    pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
        self.exchange = exchange.into();
        self
    }

    pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
        self.routing_key = Some(routing_key.into());
        self
    }

    pub fn queue(mut self, queue: impl Into<String>) -> Self {
        self.queue = Some(queue.into());
        self
    }

    pub fn retry(mut self, max_retries: u32, delay_ms: u64) -> Self {
        self.retry_enabled = true;
        self.max_retries = max_retries;
        self.retry_delay = Duration::from_millis(delay_ms);
        self
    }

    pub fn prefetch(mut self, count: u16) -> Self {
        self.prefetch = count;
        self
    }

    pub fn concurrency(mut self, count: u16) -> Self {
        self.concurrency = Some(count);
        self
    }

    pub fn parallelize<F>(mut self, spawn_fn: F) -> Self
    where
        F: Fn(
                Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
            ) -> tokio::task::JoinHandle<Result<()>>
            + Send
            + Sync
            + 'static,
    {
        self.spawn_fn = Some(Arc::new(spawn_fn));
        self
    }

    pub fn single_active_consumer(mut self, enabled: bool) -> Self {
        self.single_active_consumer = enabled;
        self
    }

    pub fn middleware<M>(mut self, middleware: M) -> Self
    where
        M: Middleware + 'static,
    {
        self.middlewares.push(Arc::new(middleware));
        self
    }

    pub fn data<T>(self, data: Data<T>) -> WorkerBuilderWithData<T>
    where
        T: Send + Sync + 'static + Clone,
    {
        WorkerBuilderWithData {
            builder: self,
            data: data.0,
        }
    }

    pub fn build<F, Fut>(self, handler: F) -> BuiltWorker
    where
        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<()>> + Send + 'static,
    {
        let pool = self.channel_pool.expect("Pool must be set with .pool()");

        let handler_arc = Arc::new(handler);
        let wrapped_handler = move |payload: Vec<u8>| -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
            let handler = handler_arc.clone();
            Box::pin(async move {
                handler(payload).await
            })
        };

        let subscriber = Subscriber::new(pool, self.exchange_kind.clone())
            .with_exchange(&self.exchange)
            .with_single_active_consumer(self.single_active_consumer);

        BuiltWorker {
            exchange_kind: self.exchange_kind,
            subscriber,
            routing_key: self.routing_key,
            queue: self.queue.expect("queue required"),
            handler: Box::new(wrapped_handler),
            retry_max_retries: if self.retry_enabled { Some(self.max_retries) } else { None },
            retry_delay: if self.retry_enabled { Some(self.retry_delay) } else { None },
            prefetch: self.prefetch,
            concurrency: self.concurrency,
            spawn_fn: self.spawn_fn,
            middlewares: self.middlewares,
        }
    }
}

pub struct BuiltWorker {
    exchange_kind: ExchangeKind,
    subscriber: Subscriber,
    routing_key: Option<String>,
    queue: String,
    handler: HandlerFn,
    retry_max_retries: Option<u32>,
    retry_delay: Option<Duration>,
    prefetch: u16,
    concurrency: Option<u16>,
    spawn_fn: Option<SpawnFn>,
    middlewares: Vec<Arc<dyn Middleware>>,
}

impl BuiltWorker {
    pub async fn run(self) -> Result<()> {
        let subscriber = if let (Some(max_retries), Some(delay)) = (self.retry_max_retries, self.retry_delay) {
            self.subscriber.with_retry(max_retries, delay)
        } else {
            self.subscriber
        };

        let subscriber = subscriber
            .with_prefetch(self.prefetch)
            .with_concurrency(self.concurrency)
            .with_spawn_fn(self.spawn_fn)
            .with_middlewares(self.middlewares);

        match self.exchange_kind {
            ExchangeKind::Direct => {
                subscriber.direct(&self.queue).build(self.handler).await
            }
            ExchangeKind::Topic => {
                let routing_key = self.routing_key.expect("routing_key required for Topic");
                subscriber.topic(&routing_key, &self.queue).build(self.handler).await
            }
            ExchangeKind::Fanout => {
                subscriber.fanout(&self.queue).build(self.handler).await
            }
            _ => panic!("Unsupported exchange kind"),
        }
    }
}