use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::Stream;
use log::error;
use tokio::time::{self, Duration};
use crate::error::BrokerError;
use crate::{
protocol::{Message, TryDeserializeMessage},
routing::Rule,
};
mod amqp;
mod redis;
pub use self::redis::{RedisBroker, RedisBrokerBuilder};
pub use amqp::{AMQPBroker, AMQPBrokerBuilder};
#[cfg(test)]
pub mod mock;
#[cfg(test)]
use std::any::Any;
#[async_trait]
pub trait Delivery: TryDeserializeMessage + Send + Sync + std::fmt::Debug {
async fn resend(
&self,
broker: &dyn Broker,
eta: Option<DateTime<Utc>>,
) -> Result<(), BrokerError>;
async fn remove(&self) -> Result<(), BrokerError>;
async fn ack(&self) -> Result<(), BrokerError>;
}
pub trait DeliveryError: std::fmt::Display + Send + Sync {}
pub trait DeliveryStream:
Stream<Item = Result<Box<dyn Delivery>, Box<dyn DeliveryError>>> + Unpin
{
}
#[async_trait]
pub trait Broker: Send + Sync {
fn safe_url(&self) -> String;
async fn consume(
&self,
queue: &str,
error_handler: Box<dyn Fn(BrokerError) + Send + Sync + 'static>,
) -> Result<(String, Box<dyn DeliveryStream>), BrokerError>;
async fn cancel(&self, consumer_tag: &str) -> Result<(), BrokerError>;
async fn ack(&self, delivery: &dyn Delivery) -> Result<(), BrokerError>;
async fn retry(
&self,
delivery: &dyn Delivery,
eta: Option<DateTime<Utc>>,
) -> Result<(), BrokerError>;
async fn send(&self, message: &Message, queue: &str) -> Result<(), BrokerError>;
async fn increase_prefetch_count(&self) -> Result<(), BrokerError>;
async fn decrease_prefetch_count(&self) -> Result<(), BrokerError>;
async fn close(&self) -> Result<(), BrokerError>;
async fn reconnect(&self, connection_timeout: u32) -> Result<(), BrokerError>;
#[cfg(test)]
fn into_any(self: Box<Self>) -> Box<dyn Any>;
}
#[async_trait]
pub trait BrokerBuilder: Send + Sync {
fn new(broker_url: &str) -> Self
where
Self: Sized;
fn prefetch_count(self: Box<Self>, prefetch_count: u16) -> Box<dyn BrokerBuilder>;
fn declare_queue(self: Box<Self>, name: &str) -> Box<dyn BrokerBuilder>;
fn heartbeat(self: Box<Self>, heartbeat: Option<u16>) -> Box<dyn BrokerBuilder>;
async fn build(&self, connection_timeout: u32) -> Result<Box<dyn Broker>, BrokerError>;
}
pub(crate) fn broker_builder_from_url(broker_url: &str) -> Box<dyn BrokerBuilder> {
match broker_url.split_once("://") {
Some(("amqp", _)) => Box::new(AMQPBrokerBuilder::new(broker_url)),
Some(("redis", _)) => Box::new(RedisBrokerBuilder::new(broker_url)),
#[cfg(test)]
Some(("mock", _)) => Box::new(mock::MockBrokerBuilder::new(broker_url)),
_ => panic!("Unsupported broker"),
}
}
pub(crate) fn configure_task_routes(
mut broker_builder: Box<dyn BrokerBuilder>,
task_routes: &[(String, String)],
) -> Result<(Box<dyn BrokerBuilder>, Vec<Rule>), BrokerError> {
let mut rules: Vec<Rule> = Vec::with_capacity(task_routes.len());
for (pattern, queue) in task_routes {
let rule = Rule::new(pattern, queue)?;
rules.push(rule);
broker_builder = broker_builder.declare_queue(queue);
}
Ok((broker_builder, rules))
}
pub(crate) async fn build_and_connect(
broker_builder: Box<dyn BrokerBuilder>,
connection_timeout: u32,
connection_max_retries: u32,
connection_retry_delay: u32,
) -> Result<Box<dyn Broker>, BrokerError> {
let mut broker: Option<Box<dyn Broker>> = None;
for _ in 0..connection_max_retries {
match broker_builder.build(connection_timeout).await {
Err(err) => {
if err.is_connection_error() {
error!("{}", err);
error!(
"Failed to establish connection with broker, trying again in {}s...",
connection_retry_delay
);
time::sleep(Duration::from_secs(connection_retry_delay as u64)).await;
continue;
}
return Err(err);
}
Ok(b) => {
broker = Some(b);
break;
}
};
}
broker.ok_or_else(|| {
error!("Failed to establish connection with broker");
BrokerError::NotConnected
})
}