1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
//! The broker is an integral part of a [`Celery`](crate::Celery) app. It provides the transport for messages that
//! encode tasks.

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::Stream;
use log::error;
use tokio::time::{self, Duration};

use crate::error::BrokerError;
use crate::{
    protocol::{Message, TryDeserializeMessage},
    routing::Rule,
};

mod amqp;
mod redis;
pub use self::redis::{RedisBroker, RedisBrokerBuilder};
pub use amqp::{AMQPBroker, AMQPBrokerBuilder};

#[cfg(test)]
pub mod mock;
#[cfg(test)]
use std::any::Any;

/// The type representing a successful delivery.
///
/// Implementors must also implement [`TryDeserializeMessage`], which is how a delivery
/// is turned into a [`Message`]. All operations are asynchronous and report failures
/// as a [`BrokerError`].
#[async_trait]
pub trait Delivery: TryDeserializeMessage + Send + Sync + std::fmt::Debug {
    /// Resend this delivery through the given `broker`, optionally with an `eta`.
    ///
    /// NOTE(review): presumably this backs [`Broker::retry`], which takes the same
    /// `eta` argument — confirm against the broker implementations.
    async fn resend(
        &self,
        broker: &dyn Broker,
        eta: Option<DateTime<Utc>>,
    ) -> Result<(), BrokerError>;

    /// Remove this delivery.
    ///
    /// NOTE(review): the exact semantics (e.g. where it is removed from) are defined
    /// by each broker implementation and are not visible here.
    async fn remove(&self) -> Result<(), BrokerError>;

    /// Acknowledge this delivery.
    async fn ack(&self) -> Result<(), BrokerError>;
}

/// The error type of an unsuccessful delivery.
///
/// This is a marker trait without methods of its own: it only requires the implementing
/// type to be [`Display`](std::fmt::Display), `Send` and `Sync`.
pub trait DeliveryError: std::fmt::Display + Send + Sync {}

/// The stream type that the [`Celery`](crate::Celery) app will consume deliveries from.
///
/// Every item yielded by the stream is either a boxed [`Delivery`] (`Ok`) or a boxed
/// [`DeliveryError`] (`Err`). Implementors must also be `Unpin`.
pub trait DeliveryStream:
    Stream<Item = Result<Box<dyn Delivery>, Box<dyn DeliveryError>>> + Unpin
{
}

/// A message [`Broker`] is used as the transport for producing or consuming tasks.
///
/// Every fallible operation reports its failure as a [`BrokerError`].
#[async_trait]
pub trait Broker: Send + Sync {
    /// Return a string representation of the broker URL with any sensitive information
    /// redacted.
    fn safe_url(&self) -> String;

    /// Consume messages from a queue.
    ///
    /// If the connection is successful, this should return a unique consumer tag and a
    /// corresponding stream of `Result`s where an `Ok` value is a boxed [`Delivery`]
    /// that can be coerced into a [`Message`] and an `Err` value is a boxed
    /// [`DeliveryError`].
    ///
    /// NOTE(review): `error_handler` is presumably invoked with broker errors that
    /// cannot be reported through the returned stream — confirm against the
    /// broker implementations.
    async fn consume(
        &self,
        queue: &str,
        error_handler: Box<dyn Fn(BrokerError) + Send + Sync + 'static>,
    ) -> Result<(String, Box<dyn DeliveryStream>), BrokerError>;

    /// Cancel the consumer with the given `consumer_tag`.
    async fn cancel(&self, consumer_tag: &str) -> Result<(), BrokerError>;

    /// Acknowledge a [`Delivery`] for deletion.
    async fn ack(&self, delivery: &dyn Delivery) -> Result<(), BrokerError>;

    /// Retry a delivery, optionally with an `eta`.
    async fn retry(
        &self,
        delivery: &dyn Delivery,
        eta: Option<DateTime<Utc>>,
    ) -> Result<(), BrokerError>;

    /// Send a [`Message`] into a queue.
    async fn send(&self, message: &Message, queue: &str) -> Result<(), BrokerError>;

    /// Increase the `prefetch_count`. This has to be done when a task with a future
    /// ETA is consumed.
    async fn increase_prefetch_count(&self) -> Result<(), BrokerError>;

    /// Decrease the `prefetch_count`. This has to be done after a task with a future
    /// ETA is executed.
    async fn decrease_prefetch_count(&self) -> Result<(), BrokerError>;

    /// Close all channels and connection.
    async fn close(&self) -> Result<(), BrokerError>;

    /// Try reconnecting in the event of some sort of connection error.
    ///
    /// NOTE(review): the unit of `connection_timeout` is not visible in this module —
    /// TODO confirm against the broker implementations.
    async fn reconnect(&self, connection_timeout: u32) -> Result<(), BrokerError>;

    /// Convert the boxed broker into a `Box<dyn Any>`. Only compiled for tests.
    ///
    /// NOTE(review): presumably this lets tests downcast to the concrete broker
    /// type — confirm against the test code.
    #[cfg(test)]
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}

/// A [`BrokerBuilder`] is used to create a type of broker with a custom configuration.
///
/// The configuration methods take `self: Box<Self>` and hand back a boxed builder, so
/// they can be chained on a `Box<dyn BrokerBuilder>` trait object.
#[async_trait]
pub trait BrokerBuilder: Send + Sync {
    /// Create a new `BrokerBuilder`.
    ///
    /// The `Self: Sized` bound keeps this constructor out of the trait object's
    /// method set, so it can only be called on a concrete builder type.
    fn new(broker_url: &str) -> Self
    where
        Self: Sized;

    /// Set the prefetch count.
    fn prefetch_count(self: Box<Self>, prefetch_count: u16) -> Box<dyn BrokerBuilder>;

    /// Declare a queue.
    fn declare_queue(self: Box<Self>, name: &str) -> Box<dyn BrokerBuilder>;

    /// Set the heartbeat.
    ///
    /// NOTE(review): the meaning of `None` and the unit of the value are defined by
    /// the implementations and are not visible here — TODO confirm.
    fn heartbeat(self: Box<Self>, heartbeat: Option<u16>) -> Box<dyn BrokerBuilder>;

    /// Construct the `Broker` with the given configuration.
    ///
    /// Takes the builder by reference, so the same builder can be used for more than
    /// one build attempt. Returns a [`BrokerError`] if the broker cannot be built.
    async fn build(&self, connection_timeout: u32) -> Result<Box<dyn Broker>, BrokerError>;
}

/// Pick the broker builder that matches the scheme of `broker_url`.
///
/// The scheme is whatever precedes the first `"://"` in the URL. `amqp` and `redis`
/// are supported; `mock` is additionally available in test builds.
///
/// # Panics
///
/// Panics if the URL contains no `"://"` separator or if its scheme is not one of
/// the supported ones.
pub(crate) fn broker_builder_from_url(broker_url: &str) -> Box<dyn BrokerBuilder> {
    let scheme = broker_url.split_once("://").map(|(scheme, _rest)| scheme);

    match scheme {
        Some("redis") => Box::new(RedisBrokerBuilder::new(broker_url)),
        Some("amqp") => Box::new(AMQPBrokerBuilder::new(broker_url)),
        #[cfg(test)]
        Some("mock") => Box::new(mock::MockBrokerBuilder::new(broker_url)),
        _ => panic!("Unsupported broker"),
    }
}

// TODO: the broker builder is taken by value and handed back to the caller, which makes
// for a clumsy API. Is there a more ergonomic alternative?
/// Turn `task_routes` into routing rules and declare every routed queue on the builder.
///
/// Each `(pattern, queue)` pair becomes one [`Rule`], in the same order as the input.
/// The queue of every pair is also declared on `broker_builder`.
///
/// Returns the updated builder together with the rules, or an error as soon as a
/// rule cannot be created.
pub(crate) fn configure_task_routes(
    mut broker_builder: Box<dyn BrokerBuilder>,
    task_routes: &[(String, String)],
) -> Result<(Box<dyn BrokerBuilder>, Vec<Rule>), BrokerError> {
    let mut routing_rules = Vec::with_capacity(task_routes.len());

    for route in task_routes {
        let (pattern, queue) = route;
        routing_rules.push(Rule::new(pattern, queue)?);
        // Every queue referenced by a route has to be known to the broker as well.
        broker_builder = broker_builder.declare_queue(queue);
    }

    Ok((broker_builder, routing_rules))
}

/// A utility function that can be used to build a broker
/// and initialize the connection.
///
/// Up to `connection_max_retries` build attempts are made, each one passing
/// `connection_timeout` through to [`BrokerBuilder::build`]. When an attempt fails with
/// a connection error and another attempt remains, the function waits
/// `connection_retry_delay` seconds before trying again. No wait happens after the
/// final attempt, since nothing would follow it.
///
/// # Errors
///
/// * Any build error that is not a connection error is returned immediately.
/// * [`BrokerError::NotConnected`] is returned when every attempt failed with a
///   connection error, or when `connection_max_retries` is `0` (no attempt is made).
pub(crate) async fn build_and_connect(
    broker_builder: Box<dyn BrokerBuilder>,
    connection_timeout: u32,
    connection_max_retries: u32,
    connection_retry_delay: u32,
) -> Result<Box<dyn Broker>, BrokerError> {
    for attempt in 1..=connection_max_retries {
        match broker_builder.build(connection_timeout).await {
            Ok(broker) => return Ok(broker),
            Err(err) if err.is_connection_error() => {
                error!("{}", err);
                // Only announce and wait for a retry when one will actually happen;
                // sleeping after the last attempt would just delay the error.
                if attempt < connection_max_retries {
                    error!(
                        "Failed to establish connection with broker, trying again in {}s...",
                        connection_retry_delay
                    );
                    time::sleep(Duration::from_secs(u64::from(connection_retry_delay))).await;
                }
            }
            // Anything other than a connection error will not be fixed by retrying.
            Err(err) => return Err(err),
        }
    }

    error!("Failed to establish connection with broker");
    Err(BrokerError::NotConnected)
}