celery/broker/
mod.rs

1//! The broker is an integral part of a [`Celery`](crate::Celery) app. It provides the transport for messages that
2//! encode tasks.
3
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use futures::Stream;
7use log::error;
8use tokio::time::{self, Duration};
9
10use crate::error::BrokerError;
11use crate::{
12    protocol::{Message, TryDeserializeMessage},
13    routing::Rule,
14};
15
16mod amqp;
17mod redis;
18pub use self::redis::{RedisBroker, RedisBrokerBuilder};
19pub use amqp::{AMQPBroker, AMQPBrokerBuilder};
20
21#[cfg(test)]
22pub mod mock;
23#[cfg(test)]
24use std::any::Any;
25
/// The type representing a successful delivery.
///
/// Implementors must also implement [`TryDeserializeMessage`] and [`std::fmt::Debug`], and
/// must be `Send + Sync` so that a delivery can be shared across threads.
#[async_trait]
pub trait Delivery: TryDeserializeMessage + Send + Sync + std::fmt::Debug {
    /// Resend this delivery using the given `broker`, optionally with an `eta`.
    ///
    /// NOTE(review): how `eta` is applied is up to each implementation; presumably it is the
    /// earliest time at which the resent task should run — confirm against the implementors.
    async fn resend(
        &self,
        broker: &dyn Broker,
        eta: Option<DateTime<Utc>>,
    ) -> Result<(), BrokerError>;

    /// Remove this delivery.
    ///
    /// NOTE(review): the exact semantics (e.g. what it is removed from) are defined by each
    /// implementation and are not visible from this trait.
    async fn remove(&self) -> Result<(), BrokerError>;

    /// Acknowledge this delivery.
    async fn ack(&self) -> Result<(), BrokerError>;
}
37
/// The error type of an unsuccessful delivery.
///
/// This is a marker trait with no methods of its own: an implementor only needs to be
/// displayable ([`std::fmt::Display`]) and `Send + Sync`.
pub trait DeliveryError: std::fmt::Display + Send + Sync {}
40
/// The stream type that the [`Celery`](crate::Celery) app will consume deliveries from.
///
/// Each item is either a boxed [`Delivery`] or a boxed [`DeliveryError`]. The stream itself
/// must be `Unpin + Send`; note that `Sync` is deliberately *not* part of the bound.
///
/// AMQP broker supports Send + Sync with lapin 3.6.0+, but Redis broker still has limitations
/// due to underlying redis-rs and deadpool-redis clients not being fully Sync.
/// TODO: Consider alternative Redis client that supports Sync or use different async patterns
pub trait DeliveryStream:
    Stream<Item = Result<Box<dyn Delivery>, Box<dyn DeliveryError>>> + Unpin + Send
{
}
50
/// A message [`Broker`] is used as the transport for producing or consuming tasks.
///
/// Implementors must be `Send + Sync`; the trait is used as a trait object
/// (`Box<dyn Broker>` / `&dyn Broker`) elsewhere in this module.
#[async_trait]
pub trait Broker: Send + Sync {
    /// Return a string representation of the broker URL with any sensitive information
    /// redacted.
    fn safe_url(&self) -> String;

    /// Consume messages from a queue.
    ///
    /// If the connection is successful, this should return a unique consumer tag and a
    /// corresponding stream of `Result`s where an `Ok`
    /// value is a boxed [`Delivery`]
    /// that can be coerced into a [`Message`]
    /// and an `Err` value is a
    /// boxed [`DeliveryError`].
    ///
    /// `error_handler` is a callback that receives a [`BrokerError`].
    /// NOTE(review): when exactly implementations invoke it is not visible here — confirm
    /// against the concrete brokers.
    async fn consume(
        &self,
        queue: &str,
        error_handler: Box<dyn Fn(BrokerError) + Send + Sync + 'static>,
    ) -> Result<(String, Box<dyn DeliveryStream>), BrokerError>;

    /// Cancel the consumer with the given `consumer_tag`.
    async fn cancel(&self, consumer_tag: &str) -> Result<(), BrokerError>;

    /// Acknowledge a [`Delivery`] for deletion.
    async fn ack(&self, delivery: &dyn Delivery) -> Result<(), BrokerError>;

    /// Retry a delivery, optionally with an `eta`.
    async fn retry(
        &self,
        delivery: &dyn Delivery,
        eta: Option<DateTime<Utc>>,
    ) -> Result<(), BrokerError>;

    /// Send a [`Message`] into a queue.
    async fn send(&self, message: &Message, queue: &str) -> Result<(), BrokerError>;

    /// Increase the `prefetch_count`. This has to be done when a task with a future
    /// ETA is consumed.
    async fn increase_prefetch_count(&self) -> Result<(), BrokerError>;

    /// Decrease the `prefetch_count`. This has to be done after a task with a future
    /// ETA is executed.
    async fn decrease_prefetch_count(&self) -> Result<(), BrokerError>;

    /// Close all channels and connection.
    async fn close(&self) -> Result<(), BrokerError>;

    /// Try reconnecting in the event of some sort of connection error.
    ///
    /// NOTE(review): the unit of `connection_timeout` is not established in this file
    /// (presumably seconds) — confirm against the implementations.
    async fn reconnect(&self, connection_timeout: u32) -> Result<(), BrokerError>;

    /// Convert the boxed broker into a `Box<dyn Any>`.
    ///
    /// Only compiled in test builds; presumably used so tests can downcast to a concrete
    /// broker type.
    #[cfg(test)]
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}
105
/// A [`BrokerBuilder`] is used to create a type of broker with a custom configuration.
///
/// The configuration methods consume a boxed builder and hand back a boxed builder, so they
/// can be chained on a `Box<dyn BrokerBuilder>` without knowing the concrete type.
#[async_trait]
pub trait BrokerBuilder: Send + Sync {
    /// Create a new `BrokerBuilder`.
    ///
    /// Requires `Self: Sized`, so it is only callable on a concrete builder type and not
    /// through a `dyn BrokerBuilder`.
    fn new(broker_url: &str) -> Self
    where
        Self: Sized;

    /// Set the prefetch count.
    fn prefetch_count(self: Box<Self>, prefetch_count: u16) -> Box<dyn BrokerBuilder>;

    /// Declare a queue.
    fn declare_queue(self: Box<Self>, name: &str) -> Box<dyn BrokerBuilder>;

    /// Set the heartbeat.
    ///
    /// NOTE(review): the meaning of `None` and the unit of the value are defined by the
    /// implementations — confirm there.
    fn heartbeat(self: Box<Self>, heartbeat: Option<u16>) -> Box<dyn BrokerBuilder>;

    /// Construct the `Broker` with the given configuration.
    ///
    /// Takes `&self`, so the same builder can be used for more than one build attempt.
    /// NOTE(review): the unit of `connection_timeout` is not established in this file
    /// (presumably seconds) — confirm against the implementations.
    async fn build(&self, connection_timeout: u32) -> Result<Box<dyn Broker>, BrokerError>;
}
126
127pub(crate) fn broker_builder_from_url(broker_url: &str) -> Box<dyn BrokerBuilder> {
128    match broker_url.split_once("://") {
129        Some(("amqp", _)) => Box::new(AMQPBrokerBuilder::new(broker_url)),
130        Some(("redis", _)) => Box::new(RedisBrokerBuilder::new(broker_url)),
131        #[cfg(test)]
132        Some(("mock", _)) => Box::new(mock::MockBrokerBuilder::new(broker_url)),
133        _ => panic!("Unsupported broker"),
134    }
135}
136
137// TODO: this function consumes the broker_builder, which results in a not so ergonomic API.
138// Can it be improved?
139/// A utility function to configure the task routes on a broker builder.
140pub(crate) fn configure_task_routes(
141    mut broker_builder: Box<dyn BrokerBuilder>,
142    task_routes: &[(String, String)],
143) -> Result<(Box<dyn BrokerBuilder>, Vec<Rule>), BrokerError> {
144    let mut rules: Vec<Rule> = Vec::with_capacity(task_routes.len());
145    for (pattern, queue) in task_routes {
146        let rule = Rule::new(pattern, queue)?;
147        rules.push(rule);
148        // Ensure all other queues mentioned in task_routes are declared to the broker.
149        broker_builder = broker_builder.declare_queue(queue);
150    }
151
152    Ok((broker_builder, rules))
153}
154
155/// A utility function that can be used to build a broker
156/// and initialize the connection.
157pub(crate) async fn build_and_connect(
158    broker_builder: Box<dyn BrokerBuilder>,
159    connection_timeout: u32,
160    connection_max_retries: u32,
161    connection_retry_delay: u32,
162) -> Result<Box<dyn Broker>, BrokerError> {
163    let mut broker: Option<Box<dyn Broker>> = None;
164
165    for _ in 0..connection_max_retries {
166        match broker_builder.build(connection_timeout).await {
167            Err(err) => {
168                if err.is_connection_error() {
169                    error!("{}", err);
170                    error!(
171                        "Failed to establish connection with broker, trying again in {}s...",
172                        connection_retry_delay
173                    );
174                    time::sleep(Duration::from_secs(connection_retry_delay as u64)).await;
175                    continue;
176                }
177                return Err(err);
178            }
179            Ok(b) => {
180                broker = Some(b);
181                break;
182            }
183        };
184    }
185
186    broker.ok_or_else(|| {
187        error!("Failed to establish connection with broker");
188        BrokerError::NotConnected
189    })
190}