amqpsy 0.1.0

Extremely opinionated AMQP PubSub library
Documentation
#![doc = include_str!("../README.md")]

use std::sync::OnceLock;

use lapin::Channel;

#[cfg(feature = "chaos")]
mod chaos;
#[cfg(feature = "consumer")]
pub mod consumers;
mod ext;
mod protogen;
#[cfg(feature = "publisher")]
pub mod publishers;
mod telemetry;

#[derive(Debug, Clone)]
pub struct AmqpConfig {
    pub connection_string: String,
}

static POOL: OnceLock<deadpool_lapin::Pool> = OnceLock::new();

pub(crate) async fn new_ampq_channel(config: &AmqpConfig) -> anyhow::Result<Channel> {
    let pool = POOL.get_or_init(|| {
        tracing::debug!("Creating new pool for {}", config.connection_string);
        let pool_config = deadpool_lapin::Config {
            url: Some(config.connection_string.clone()),
            ..Default::default()
        };
        pool_config
            .create_pool(Some(deadpool_lapin::Runtime::Tokio1))
            .expect("create rabbitmq pool")
    });

    tracing::debug!("New channel for {}", config.connection_string);
    let connection = pool.get().await?;
    let channel = connection.create_channel().await?;

    Ok(channel)
}