shindo_coding_utils 0.4.2

A utils crates which will be used in various micro-services
Documentation
use anyhow::Result;
use lapin::{
    options::{BasicConsumeOptions, BasicPublishOptions, BasicQosOptions}, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties
};
use serde_json::Value;
use tracing::info;

pub async fn publish_message(
    channel: &Channel,
    routing_key: &str,
    payload: Value,
) -> Result<(), lapin::Error> {
    let payload_bytes = serde_json::to_vec(&payload).unwrap();

    channel
        .basic_publish(
            "events",
            routing_key,
            BasicPublishOptions::default(),
            &payload_bytes,
            BasicProperties::default(),
        )
        .await?;

    Ok(())
}

pub async fn create_rabbitmq_channel(rabbitmq_url: &str) -> Result<Channel, lapin::Error> {
    info!(
        "[RabbitMQ module] Attempting to connect to RabbitMQ at {}",
        rabbitmq_url
    );
    let conn = Connection::connect(rabbitmq_url, ConnectionProperties::default()).await?;
    info!("[RabbitMQ module] Successfully connected to RabbitMQ.");
    let channel = conn.create_channel().await?;
    info!("[RabbitMQ module] RabbitMQ channel created.");

    // Set the prefetch count
    channel
        .basic_qos(
            1, // Prefetch count
            BasicQosOptions::default(),
        )
        .await?;

    Ok(channel)
}

/// # Example: How to Write a RabbitMQ Consumer
///
/// This example demonstrates a typical consumer loop using Lapin and an application state.
/// The consumer pulls messages from RabbitMQ and processes them one at a time.
/// If the loop exits, it indicates a lost connection or cancellation.
///
/// ```rust
/// pub async fn consume(channel: lapin::Channel, app_state: AppState, consumer: lapin::Consumer) -> Result<(), anyhow::Error> {
///     let mut consumer = consumer.clone();
///     while let Some(delivery) = consumer.try_next().await? {
///         // Handle the received message
///         handle_message(delivery, &channel, &app_state).await;
///     }
///
///     // This block is reached if the consumer loop exits unexpectedly
///     warn!("RabbitMQ consumer loop exited. Connection may be lost.");
///     Err(anyhow::anyhow!("RabbitMQ consumer disconnected"))
/// }
/// ```
///
/// ## Notes
/// - Each `delivery` received in the loop represents a RabbitMQ message to be processed.
/// - The `handle_message` function should contain the logic for message handling and acknowledgment.
/// - If the connection to RabbitMQ is lost, proper error handling and reconnection logic should be implemented.
/// Creates a RabbitMQ consumer for the given queue.
///
/// # Arguments
/// * `channel` - A reference to an established RabbitMQ channel.
/// * `queue_name` - The name of the queue from which to consume messages.
/// * `consumer_tag` - An optional consumer tag; if not provided, RabbitMQ will generate one.
///
/// # Returns
/// Returns a [`lapin::Consumer`] instance on success, which can be used to receive messages asynchronously.
///
/// # Example
/// ```rust
/// let consumer = create_consumer(&channel, "my_queue", None).await?;
/// while let Some(delivery) = consumer.try_next().await? {
///     // Handle message
/// }
/// ```
///
/// # Notes
/// - Providing an empty string as `consumer_tag` allows RabbitMQ to generate a tag automatically.
/// - This function uses default options for consuming and field table. For custom behavior, adjust the options as needed.
/// - Logging is performed to trace consumer creation for observability.
pub async fn create_consumer(
    channel: &Channel,
    queue_name: &str,
    consumer_tag: Option<&str>,
) -> Result<lapin::Consumer, lapin::Error> {
    info!(
        "[RabbitMQ module] Creating consumer for queue: {}",
        queue_name
    );
    let consumer_tag = consumer_tag.unwrap_or(""); // Use empty string to let RabbitMQ generate a tag
    let consumer = channel
        .basic_consume(
            queue_name,
            consumer_tag,
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;

    Ok(consumer)
}