1use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use futures::Stream;
7use log::error;
8use tokio::time::{self, Duration};
9
10use crate::error::BrokerError;
11use crate::{
12 protocol::{Message, TryDeserializeMessage},
13 routing::Rule,
14};
15
16mod amqp;
17mod redis;
18pub use self::redis::{RedisBroker, RedisBrokerBuilder};
19pub use amqp::{AMQPBroker, AMQPBrokerBuilder};
20
21#[cfg(test)]
22pub mod mock;
23#[cfg(test)]
24use std::any::Any;
25
26#[async_trait]
28pub trait Delivery: TryDeserializeMessage + Send + Sync + std::fmt::Debug {
29 async fn resend(
30 &self,
31 broker: &dyn Broker,
32 eta: Option<DateTime<Utc>>,
33 ) -> Result<(), BrokerError>;
34 async fn remove(&self) -> Result<(), BrokerError>;
35 async fn ack(&self) -> Result<(), BrokerError>;
36}
37
38pub trait DeliveryError: std::fmt::Display + Send + Sync {}
40
41pub trait DeliveryStream:
47 Stream<Item = Result<Box<dyn Delivery>, Box<dyn DeliveryError>>> + Unpin + Send
48{
49}
50
51#[async_trait]
53pub trait Broker: Send + Sync {
54 fn safe_url(&self) -> String;
57
58 async fn consume(
67 &self,
68 queue: &str,
69 error_handler: Box<dyn Fn(BrokerError) + Send + Sync + 'static>,
70 ) -> Result<(String, Box<dyn DeliveryStream>), BrokerError>;
71
72 async fn cancel(&self, consumer_tag: &str) -> Result<(), BrokerError>;
74
75 async fn ack(&self, delivery: &dyn Delivery) -> Result<(), BrokerError>;
77
78 async fn retry(
80 &self,
81 delivery: &dyn Delivery,
82 eta: Option<DateTime<Utc>>,
83 ) -> Result<(), BrokerError>;
84
85 async fn send(&self, message: &Message, queue: &str) -> Result<(), BrokerError>;
87
88 async fn increase_prefetch_count(&self) -> Result<(), BrokerError>;
91
92 async fn decrease_prefetch_count(&self) -> Result<(), BrokerError>;
95
96 async fn close(&self) -> Result<(), BrokerError>;
98
99 async fn reconnect(&self, connection_timeout: u32) -> Result<(), BrokerError>;
101
102 #[cfg(test)]
103 fn into_any(self: Box<Self>) -> Box<dyn Any>;
104}
105
106#[async_trait]
108pub trait BrokerBuilder: Send + Sync {
109 fn new(broker_url: &str) -> Self
111 where
112 Self: Sized;
113
114 fn prefetch_count(self: Box<Self>, prefetch_count: u16) -> Box<dyn BrokerBuilder>;
116
117 fn declare_queue(self: Box<Self>, name: &str) -> Box<dyn BrokerBuilder>;
119
120 fn heartbeat(self: Box<Self>, heartbeat: Option<u16>) -> Box<dyn BrokerBuilder>;
122
123 async fn build(&self, connection_timeout: u32) -> Result<Box<dyn Broker>, BrokerError>;
125}
126
127pub(crate) fn broker_builder_from_url(broker_url: &str) -> Box<dyn BrokerBuilder> {
128 match broker_url.split_once("://") {
129 Some(("amqp", _)) => Box::new(AMQPBrokerBuilder::new(broker_url)),
130 Some(("redis", _)) => Box::new(RedisBrokerBuilder::new(broker_url)),
131 #[cfg(test)]
132 Some(("mock", _)) => Box::new(mock::MockBrokerBuilder::new(broker_url)),
133 _ => panic!("Unsupported broker"),
134 }
135}
136
137pub(crate) fn configure_task_routes(
141 mut broker_builder: Box<dyn BrokerBuilder>,
142 task_routes: &[(String, String)],
143) -> Result<(Box<dyn BrokerBuilder>, Vec<Rule>), BrokerError> {
144 let mut rules: Vec<Rule> = Vec::with_capacity(task_routes.len());
145 for (pattern, queue) in task_routes {
146 let rule = Rule::new(pattern, queue)?;
147 rules.push(rule);
148 broker_builder = broker_builder.declare_queue(queue);
150 }
151
152 Ok((broker_builder, rules))
153}
154
155pub(crate) async fn build_and_connect(
158 broker_builder: Box<dyn BrokerBuilder>,
159 connection_timeout: u32,
160 connection_max_retries: u32,
161 connection_retry_delay: u32,
162) -> Result<Box<dyn Broker>, BrokerError> {
163 let mut broker: Option<Box<dyn Broker>> = None;
164
165 for _ in 0..connection_max_retries {
166 match broker_builder.build(connection_timeout).await {
167 Err(err) => {
168 if err.is_connection_error() {
169 error!("{}", err);
170 error!(
171 "Failed to establish connection with broker, trying again in {}s...",
172 connection_retry_delay
173 );
174 time::sleep(Duration::from_secs(connection_retry_delay as u64)).await;
175 continue;
176 }
177 return Err(err);
178 }
179 Ok(b) => {
180 broker = Some(b);
181 break;
182 }
183 };
184 }
185
186 broker.ok_or_else(|| {
187 error!("Failed to establish connection with broker");
188 BrokerError::NotConnected
189 })
190}