easy-rmq-rs 1.0.2

Easy-to-use RabbitMQ library for Rust
Documentation
use crate::error::{AmqpError, Result};
use deadpool::managed::{Manager, Pool, RecycleError, RecycleResult};
use lapin::{Channel, Connection, ConnectionProperties};
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Mutex;

pub struct AmqpConnectionManager {
    uri: String,
    connection_name: String,
}

impl AmqpConnectionManager {
    pub fn new(uri: String, connection_name: String) -> Self {
        Self { uri, connection_name }
    }
}

impl Manager for AmqpConnectionManager {
    type Type = Connection;
    type Error = lapin::Error;

    fn create(&self) -> impl Future<Output = std::result::Result<Self::Type, Self::Error>> + Send {
        let uri = self.uri.clone();
        let connection_name = self.connection_name.clone();
        async move {
            let opts = ConnectionProperties::default()
                .with_connection_name(connection_name.clone().into());

            Connection::connect(&uri, opts).await
        }
    }

    #[allow(clippy::manual_async_fn)]
    fn recycle(&self, conn: &mut Self::Type, _metrics: &deadpool::managed::Metrics) -> impl Future<Output = RecycleResult<Self::Error>> + Send {
        async move {
            if conn.status().connected() {
                Ok(())
            } else {
                Err(RecycleError::Backend(
                    std::io::Error::new(std::io::ErrorKind::ConnectionReset, "Connection not connected").into(),
                ))
            }
        }
    }
}

pub type AmqpPool = Pool<AmqpConnectionManager>;

pub fn create_pool(uri: String, connection_name: String, max_size: usize) -> Result<AmqpPool> {
    let manager = AmqpConnectionManager::new(uri, connection_name);
    let pool = Pool::builder(manager)
        .max_size(max_size)
        .build()
        .map_err(|e| AmqpError::PoolError(e.to_string()))?;

    Ok(pool)
}

#[derive(Clone)]
pub struct ChannelPool {
    pool: Arc<AmqpPool>,
    channel: Arc<Mutex<Option<Channel>>>,
}

impl ChannelPool {
    pub fn new(pool: Arc<AmqpPool>) -> Self {
        Self {
            pool,
            channel: Arc::new(Mutex::new(None)),
        }
    }

    pub async fn get_channel(&self) -> Result<Channel> {
        let mut cached = self.channel.lock().await;

        if let Some(channel) = cached.as_ref()
            && channel.status().connected()
        {
            return Ok(channel.clone());
        }

        let conn = self
            .pool
            .get()
            .await
            .map_err(|e| AmqpError::PoolError(e.to_string()))?;

        let channel = conn
            .create_channel()
            .await
            .map_err(AmqpError::ConnectionError)?;
        *cached = Some(channel.clone());

        Ok(channel)
    }
}