1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
//! Celery [`Beat`] is an app that can automatically produce tasks at scheduled times.
//!
//! ### Terminology
//!
//! This is the terminology used in this module (with references to the corresponding names
//! in the Python implementation):
//! - schedule: the strategy used to decide when a task must be executed (each scheduled
//!   task has its own schedule);
//! - scheduled task: a task together with its schedule (it more or less corresponds to
//!   a *schedule entry* in Python);
//! - scheduler: the component in charge of keeping track of tasks to execute;
//! - scheduler backend: the component that updates the internal state of the scheduler according to
//!   to an external source of truth (e.g., a database); there is no equivalent in Python,
//!   due to the fact that another pattern is used (see below);
//! - beat: the service that drives the execution, calling the appropriate
//!   methods of the scheduler in an infinite loop (called just *service* in Python).
//!
//! The main difference with the architecture used in Python is that in Python
//! there is a base scheduler class which contains the scheduling logic, then different
//! implementations use different strategies to synchronize the scheduler.
//! Here instead we have only one scheduler struct, and the different backends
//! correspond to the different scheduler implementations in Python.

use crate::broker::{
    broker_builder_from_url, build_and_connect, configure_task_routes, BrokerBuilder,
};
use crate::routing::{self, Rule};
use crate::{
    error::{BeatError, BrokerError},
    protocol::MessageContentType,
    task::{Signature, Task, TaskOptions},
};
use log::{debug, error, info};
use std::time::SystemTime;
use tokio::time::{self, Duration};

mod scheduler;
pub use scheduler::Scheduler;

mod backend;
pub use backend::{LocalSchedulerBackend, SchedulerBackend};

mod schedule;
pub use schedule::{CronSchedule, DeltaSchedule, Schedule};

mod scheduled_task;
pub use scheduled_task::ScheduledTask;

struct Config {
    name: String,
    broker_builder: Box<dyn BrokerBuilder>,
    broker_connection_timeout: u32,
    broker_connection_retry: bool,
    broker_connection_max_retries: u32,
    broker_connection_retry_delay: u32,
    default_queue: String,
    task_routes: Vec<(String, String)>,
    task_options: TaskOptions,
    max_sleep_duration: Option<Duration>,
}

/// Used to create a [`Beat`] app with a custom configuration.
pub struct BeatBuilder<Sb>
where
    Sb: SchedulerBackend,
{
    config: Config,
    scheduler_backend: Sb,
}

impl BeatBuilder<LocalSchedulerBackend> {
    /// Get a `BeatBuilder` for creating a `Beat` app with a default scheduler backend
    /// and a custom configuration.
    pub fn with_default_scheduler_backend(name: &str, broker_url: &str) -> Self {
        Self {
            config: Config {
                name: name.into(),
                broker_builder: broker_builder_from_url(broker_url),
                broker_connection_timeout: 2,
                broker_connection_retry: true,
                broker_connection_max_retries: 5,
                broker_connection_retry_delay: 5,
                default_queue: "celery".into(),
                task_routes: vec![],
                task_options: TaskOptions::default(),
                max_sleep_duration: None,
            },
            scheduler_backend: LocalSchedulerBackend::new(),
        }
    }
}

impl<Sb> BeatBuilder<Sb>
where
    Sb: SchedulerBackend,
{
    /// Get a `BeatBuilder` for creating a `Beat` app with a custom scheduler backend and
    /// a custom configuration.
    pub fn with_custom_scheduler_backend(
        name: &str,
        broker_url: &str,
        scheduler_backend: Sb,
    ) -> Self {
        Self {
            config: Config {
                name: name.into(),
                broker_builder: broker_builder_from_url(broker_url),
                broker_connection_timeout: 2,
                broker_connection_retry: true,
                broker_connection_max_retries: 5,
                broker_connection_retry_delay: 5,
                default_queue: "celery".into(),
                task_routes: vec![],
                task_options: TaskOptions::default(),
                max_sleep_duration: None,
            },
            scheduler_backend,
        }
    }

    /// Set the name of the default queue to something other than "celery".
    pub fn default_queue(mut self, queue_name: &str) -> Self {
        self.config.default_queue = queue_name.into();
        self
    }

    /// Set the broker heartbeat. The default value depends on the broker implementation.
    pub fn heartbeat(mut self, heartbeat: Option<u16>) -> Self {
        self.config.broker_builder = self.config.broker_builder.heartbeat(heartbeat);
        self
    }

    /// Add a routing rule.
    pub fn task_route(mut self, pattern: &str, queue: &str) -> Self {
        self.config.task_routes.push((pattern.into(), queue.into()));
        self
    }

    /// Set a timeout in seconds before giving up establishing a connection to a broker.
    pub fn broker_connection_timeout(mut self, timeout: u32) -> Self {
        self.config.broker_connection_timeout = timeout;
        self
    }

    /// Set whether or not to automatically try to re-establish connection to the AMQP broker.
    pub fn broker_connection_retry(mut self, retry: bool) -> Self {
        self.config.broker_connection_retry = retry;
        self
    }

    /// Set the maximum number of retries before we give up trying to re-establish connection
    /// to the AMQP broker.
    pub fn broker_connection_max_retries(mut self, max_retries: u32) -> Self {
        self.config.broker_connection_max_retries = max_retries;
        self
    }

    /// Set the number of seconds to wait before re-trying the connection with the broker.
    pub fn broker_connection_retry_delay(mut self, retry_delay: u32) -> Self {
        self.config.broker_connection_retry_delay = retry_delay;
        self
    }

    /// Set a default content type of the message body serialization.
    pub fn task_content_type(mut self, content_type: MessageContentType) -> Self {
        self.config.task_options.content_type = Some(content_type);
        self
    }

    /// Set a maximum sleep duration, which limits the amount of time that
    /// can pass between ticks. This is useful to ensure that the scheduler backend
    /// implementation is called regularly.
    pub fn max_sleep_duration(mut self, max_sleep_duration: Duration) -> Self {
        self.config.max_sleep_duration = Some(max_sleep_duration);
        self
    }

    /// Construct a `Beat` app with the current configuration.
    pub async fn build(self) -> Result<Beat<Sb>, BeatError> {
        // Declare default queue to broker.
        let broker_builder = self
            .config
            .broker_builder
            .declare_queue(&self.config.default_queue);

        let (broker_builder, task_routes) =
            configure_task_routes(broker_builder, &self.config.task_routes)?;

        let broker = build_and_connect(
            broker_builder,
            self.config.broker_connection_timeout,
            if self.config.broker_connection_retry {
                self.config.broker_connection_max_retries
            } else {
                0
            },
            self.config.broker_connection_retry_delay,
        )
        .await?;

        let scheduler = Scheduler::new(broker);

        Ok(Beat {
            name: self.config.name,
            scheduler,
            scheduler_backend: self.scheduler_backend,
            task_routes,
            default_queue: self.config.default_queue,
            task_options: self.config.task_options,
            broker_connection_timeout: self.config.broker_connection_timeout,
            broker_connection_retry: self.config.broker_connection_retry,
            broker_connection_max_retries: self.config.broker_connection_max_retries,
            broker_connection_retry_delay: self.config.broker_connection_retry_delay,
            max_sleep_duration: self.config.max_sleep_duration,
        })
    }
}

/// A [`Beat`] app is used to send out scheduled tasks. This is the struct that is
/// created with the [`beat!`](crate::beat!) macro.
///
/// It drives execution by making the internal scheduler "tick", and updates the list of scheduled
/// tasks through a customizable scheduler backend.
pub struct Beat<Sb: SchedulerBackend> {
    pub name: String,
    pub scheduler: Scheduler,
    pub scheduler_backend: Sb,

    task_routes: Vec<Rule>,
    default_queue: String,
    task_options: TaskOptions,

    broker_connection_timeout: u32,
    broker_connection_retry: bool,
    broker_connection_max_retries: u32,
    broker_connection_retry_delay: u32,

    max_sleep_duration: Option<Duration>,
}

impl Beat<LocalSchedulerBackend> {
    /// Get a `BeatBuilder` for creating a `Beat` app with a custom configuration and a
    /// default scheduler backend.
    pub fn default_builder(name: &str, broker_url: &str) -> BeatBuilder<LocalSchedulerBackend> {
        BeatBuilder::<LocalSchedulerBackend>::with_default_scheduler_backend(name, broker_url)
    }
}

impl<Sb> Beat<Sb>
where
    Sb: SchedulerBackend,
{
    /// Get a `BeatBuilder` for creating a `Beat` app with a custom configuration and
    /// a custom scheduler backend.
    pub fn custom_builder(name: &str, broker_url: &str, scheduler_backend: Sb) -> BeatBuilder<Sb> {
        BeatBuilder::<Sb>::with_custom_scheduler_backend(name, broker_url, scheduler_backend)
    }

    /// Schedule the execution of a task.
    pub fn schedule_task<T, S>(&mut self, signature: Signature<T>, schedule: S)
    where
        T: Task + Clone + 'static,
        S: Schedule + 'static,
    {
        self.schedule_named_task(Signature::<T>::task_name().to_string(), signature, schedule);
    }

    /// Schedule the execution of a task with the given `name`.
    pub fn schedule_named_task<T, S>(
        &mut self,
        name: String,
        mut signature: Signature<T>,
        schedule: S,
    ) where
        T: Task + Clone + 'static,
        S: Schedule + 'static,
    {
        signature.options.update(&self.task_options);
        let queue = match &signature.queue {
            Some(queue) => queue.to_string(),
            None => routing::route(T::NAME, &self.task_routes)
                .unwrap_or(&self.default_queue)
                .to_string(),
        };
        let message_factory = Box::new(signature);

        self.scheduler
            .schedule_task(name, message_factory, queue, schedule);
    }

    /// Start the *beat*.
    pub async fn start(&mut self) -> Result<(), BeatError> {
        info!("Starting beat service");
        loop {
            let result = self.beat_loop().await;
            if !self.broker_connection_retry {
                return result;
            }

            if let Err(err) = result {
                match err {
                    BeatError::BrokerError(broker_err) => {
                        if broker_err.is_connection_error() {
                            error!("Broker connection failed");
                        } else {
                            return Err(BeatError::BrokerError(broker_err));
                        }
                    }
                    _ => return Err(err),
                };
            } else {
                return result;
            }

            let mut reconnect_successful: bool = false;
            for _ in 0..self.broker_connection_max_retries {
                info!("Trying to re-establish connection with broker");
                time::sleep(Duration::from_secs(
                    self.broker_connection_retry_delay as u64,
                ))
                .await;

                match self
                    .scheduler
                    .broker
                    .reconnect(self.broker_connection_timeout)
                    .await
                {
                    Err(err) => {
                        if err.is_connection_error() {
                            continue;
                        }
                        return Err(BeatError::BrokerError(err));
                    }
                    Ok(_) => {
                        info!("Successfully reconnected with broker");
                        reconnect_successful = true;
                        break;
                    }
                };
            }

            if !reconnect_successful {
                return Err(BeatError::BrokerError(BrokerError::NotConnected));
            }
        }
    }

    async fn beat_loop(&mut self) -> Result<(), BeatError> {
        loop {
            let next_tick_at = self.scheduler.tick().await?;

            if self.scheduler_backend.should_sync() {
                self.scheduler_backend
                    .sync(self.scheduler.get_scheduled_tasks())?;
            }

            let now = SystemTime::now();
            if now < next_tick_at {
                let sleep_interval = next_tick_at.duration_since(now).expect(
                    "Unexpected error when unwrapping a SystemTime comparison that is not supposed to fail",
                );
                let sleep_interval = match &self.max_sleep_duration {
                    Some(max_sleep_duration) => std::cmp::min(sleep_interval, *max_sleep_duration),
                    None => sleep_interval,
                };
                debug!("Now sleeping for {:?}", sleep_interval);
                time::sleep(sleep_interval).await;
            }
        }
    }
}

#[cfg(test)]
mod tests;