use crate::config::{BrokerType, ExchangeConfig, MessagingConfig, QueueConfig};
use crate::error::MessagingError;
use crate::message::{ConsumeOptions, PublishOptions, RawMessage};
use crate::traits::{BrokerChannel, BrokerConnection, MessageStream, Publisher};
use async_trait::async_trait;
use serde::Serialize;
use std::sync::Arc;
use tracing::{debug, info};
pub struct MessageBroker {
config: MessagingConfig,
connection: Arc<dyn BrokerConnection>,
}
impl MessageBroker {
pub async fn connect(config: MessagingConfig) -> Result<Self, MessagingError> {
info!(
broker = %config.broker_type,
"Connecting to message broker"
);
let connection: Arc<dyn BrokerConnection> = match config.broker_type {
#[cfg(feature = "rabbitmq")]
BrokerType::RabbitMQ => {
Arc::new(crate::rabbitmq::RabbitMQConnection::connect(&config).await?)
}
#[cfg(feature = "kafka")]
BrokerType::Kafka => {
Arc::new(crate::kafka::KafkaConnection::connect(&config).await?)
}
#[cfg(feature = "nats")]
BrokerType::Nats => {
Arc::new(crate::nats::NatsConnection::connect(&config).await?)
}
#[allow(unreachable_patterns)]
_ => {
return Err(MessagingError::BackendNotAvailable(format!(
"{} backend is not enabled",
config.broker_type
)));
}
};
Ok(Self { config, connection })
}
pub fn broker_type(&self) -> &BrokerType {
&self.config.broker_type
}
pub async fn is_healthy(&self) -> bool {
self.connection.is_healthy().await
}
pub async fn channel(&self) -> Result<BrokerChannelHandle, MessagingError> {
let channel = self.connection.channel().await?;
Ok(BrokerChannelHandle {
channel,
default_exchange: None,
})
}
pub async fn publish<T: Serialize + Send + Sync>(
&self,
exchange: &str,
routing_key: &str,
message: &T,
) -> Result<(), MessagingError> {
let channel = self.channel().await?;
let options = PublishOptions::new(routing_key).exchange(exchange);
channel.publish_with_options(&options, message).await
}
pub async fn close(&self) -> Result<(), MessagingError> {
info!(broker = %self.config.broker_type, "Closing message broker connection");
self.connection.close().await
}
}
pub struct BrokerChannelHandle {
channel: Box<dyn BrokerChannel>,
default_exchange: Option<String>,
}
impl BrokerChannelHandle {
pub fn with_default_exchange(mut self, exchange: &str) -> Self {
self.default_exchange = Some(exchange.to_string());
self
}
pub async fn declare_queue(&self, config: &QueueConfig) -> Result<(), MessagingError> {
debug!(queue = %config.name, "Declaring queue");
self.channel.declare_queue(config).await
}
pub async fn declare_exchange(&self, config: &ExchangeConfig) -> Result<(), MessagingError> {
debug!(exchange = %config.name, "Declaring exchange");
self.channel.declare_exchange(config).await
}
pub async fn bind_queue(
&self,
queue: &str,
exchange: &str,
routing_key: &str,
) -> Result<(), MessagingError> {
debug!(
queue = queue,
exchange = exchange,
routing_key = routing_key,
"Binding queue to exchange"
);
self.channel.bind_queue(queue, exchange, routing_key).await
}
pub async fn publish<T: Serialize + Send + Sync>(
&self,
routing_key: &str,
message: &T,
) -> Result<(), MessagingError> {
let mut options = PublishOptions::new(routing_key);
if let Some(exchange) = &self.default_exchange {
options = options.exchange(exchange);
}
self.publish_with_options(&options, message).await
}
pub async fn publish_with_options<T: Serialize + Send + Sync>(
&self,
options: &PublishOptions,
message: &T,
) -> Result<(), MessagingError> {
debug!(
routing_key = %options.routing_key,
exchange = ?options.exchange,
"Publishing message"
);
self.channel.publish(options, message).await
}
pub async fn consume(&self, options: ConsumeOptions) -> Result<MessageStream, MessagingError> {
debug!(queue = %options.queue, "Starting consumer");
self.channel.consume(options).await
}
pub async fn ack(&self, delivery_tag: u64) -> Result<(), MessagingError> {
self.channel.ack(delivery_tag).await
}
pub async fn reject(&self, delivery_tag: u64, requeue: bool) -> Result<(), MessagingError> {
self.channel.reject(delivery_tag, requeue).await
}
pub async fn close(&self) -> Result<(), MessagingError> {
self.channel.close().await
}
}
#[async_trait]
impl Publisher for BrokerChannelHandle {
async fn publish<T: Serialize + Send + Sync>(
&self,
routing_key: &str,
message: &T,
) -> Result<(), MessagingError> {
BrokerChannelHandle::publish(self, routing_key, message).await
}
async fn publish_with_options<T: Serialize + Send + Sync>(
&self,
options: &PublishOptions,
message: &T,
) -> Result<(), MessagingError> {
BrokerChannelHandle::publish_with_options(self, options, message).await
}
}
pub async fn setup_queue(
channel: &BrokerChannelHandle,
queue_name: &str,
exchange_name: &str,
exchange_type: crate::config::ExchangeType,
routing_keys: &[&str],
) -> Result<(), MessagingError> {
channel
.declare_exchange(&ExchangeConfig::new(exchange_name, exchange_type))
.await?;
channel.declare_queue(&QueueConfig::new(queue_name)).await?;
for routing_key in routing_keys {
channel
.bind_queue(queue_name, exchange_name, routing_key)
.await?;
}
Ok(())
}
pub async fn process_messages<T, F, Fut>(
channel: &BrokerChannelHandle,
stream: &mut MessageStream,
handler: F,
) -> Result<(), MessagingError>
where
T: serde::de::DeserializeOwned,
F: Fn(crate::message::Message<T>) -> Fut,
Fut: std::future::Future<Output = crate::message::AckResult>,
{
use futures::StreamExt;
while let Some(msg_result) = stream.next().await {
match msg_result {
Ok(raw_msg) => {
let delivery_tag = raw_msg.delivery_tag;
match raw_msg.deserialize::<T>() {
Ok(msg) => {
let result = handler(msg).await;
match result {
crate::message::AckResult::Acked => {
channel.ack(delivery_tag).await?;
}
crate::message::AckResult::Requeued => {
channel.reject(delivery_tag, true).await?;
}
crate::message::AckResult::Rejected => {
channel.reject(delivery_tag, false).await?;
}
}
}
Err(e) => {
tracing::error!(error = %e, "Failed to deserialize message");
channel.reject(delivery_tag, false).await?;
}
}
}
Err(e) => {
tracing::error!(error = %e, "Error receiving message");
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_channel_handle_default_exchange() {
}
}