use anyhow::Result;
use lapin::{
options::{BasicConsumeOptions, BasicPublishOptions, BasicQosOptions}, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties
};
use serde_json::Value;
use tracing::info;
/// Serializes `payload` to JSON bytes and publishes them to the `"events"`
/// exchange using `routing_key`.
///
/// The message is sent with default publish options and default message
/// properties.
///
/// # Errors
///
/// Returns the `lapin::Error` reported by `basic_publish` when the publish
/// call fails.
///
/// # Panics
///
/// Panics if `serde_json::to_vec` cannot serialize `payload`.
pub async fn publish_message(
    channel: &Channel,
    routing_key: &str,
    payload: Value,
) -> Result<(), lapin::Error> {
    const EXCHANGE: &str = "events";

    let body = serde_json::to_vec(&payload).unwrap();
    let publish = channel.basic_publish(
        EXCHANGE,
        routing_key,
        BasicPublishOptions::default(),
        &body,
        BasicProperties::default(),
    );

    // Only success or failure of the publish call is reported to the caller;
    // the value the call resolves to is dropped without being inspected.
    publish.await.map(drop)
}
/// Connects to RabbitMQ at `rabbitmq_url`, opens a channel on that connection
/// and sets the channel's QoS value to `1` with default QoS options.
///
/// The URL is logged with any `user:password@` part masked so that broker
/// credentials do not end up in log output.
///
/// NOTE(review): the `Connection` handle is not returned and goes out of scope
/// when this function returns — confirm that the returned `Channel` keeps the
/// underlying connection alive in the `lapin` version in use.
///
/// # Errors
///
/// Returns the `lapin::Error` from whichever step fails first: connecting,
/// creating the channel, or applying the QoS setting.
pub async fn create_rabbitmq_channel(rabbitmq_url: &str) -> Result<Channel, lapin::Error> {
    info!(
        "[RabbitMQ module] Attempting to connect to RabbitMQ at {}",
        redact_url_credentials(rabbitmq_url)
    );
    let conn = Connection::connect(rabbitmq_url, ConnectionProperties::default()).await?;
    info!("[RabbitMQ module] Successfully connected to RabbitMQ.");

    let channel = conn.create_channel().await?;
    info!("[RabbitMQ module] RabbitMQ channel created.");

    channel.basic_qos(1, BasicQosOptions::default()).await?;
    Ok(channel)
}

/// Returns `url` with the userinfo part of its authority (everything before
/// the last `@` preceding the path) replaced by `***`.
///
/// URLs without an `@` in the authority are returned unchanged.
fn redact_url_credentials(url: &str) -> String {
    // The authority follows "://" (or starts at 0 when no scheme is present)
    // and ends at the first path, query or fragment delimiter.
    let authority_start = url.find("://").map_or(0, |idx| idx + 3);
    let after_scheme = &url[authority_start..];
    let authority_end = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(after_scheme.len());

    // `rfind` so that an unescaped '@' inside the password is masked as well.
    match after_scheme[..authority_end].rfind('@') {
        Some(at) => format!("{}***{}", &url[..authority_start], &after_scheme[at..]),
        None => url.to_owned(),
    }
}
/// Registers a consumer on `queue_name` through `channel`.
///
/// When `consumer_tag` is `None`, an empty string is passed as the tag
/// (presumably letting the broker assign one — confirm against the `lapin`
/// documentation). Default consume options and a default `FieldTable` are
/// used.
///
/// # Errors
///
/// Returns the `lapin::Error` reported by `basic_consume`.
pub async fn create_consumer(
    channel: &Channel,
    queue_name: &str,
    consumer_tag: Option<&str>,
) -> Result<lapin::Consumer, lapin::Error> {
    info!(
        "[RabbitMQ module] Creating consumer for queue: {}",
        queue_name
    );

    // `&str::default()` is the empty string, matching the "no tag" case.
    let tag = consumer_tag.unwrap_or_default();

    channel
        .basic_consume(
            queue_name,
            tag,
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
}