celery/beat/
mod.rs

1//! Celery [`Beat`] is an app that can automatically produce tasks at scheduled times.
2//!
3//! ### Terminology
4//!
5//! This is the terminology used in this module (with references to the corresponding names
6//! in the Python implementation):
7//! - schedule: the strategy used to decide when a task must be executed (each scheduled
8//!   task has its own schedule);
9//! - scheduled task: a task together with its schedule (it more or less corresponds to
10//!   a *schedule entry* in Python);
11//! - scheduler: the component in charge of keeping track of tasks to execute;
//! - scheduler backend: the component that updates the internal state of the scheduler according
//!   to an external source of truth (e.g., a database); there is no equivalent in Python,
14//!   due to the fact that another pattern is used (see below);
15//! - beat: the service that drives the execution, calling the appropriate
16//!   methods of the scheduler in an infinite loop (called just *service* in Python).
17//!
18//! The main difference with the architecture used in Python is that in Python
19//! there is a base scheduler class which contains the scheduling logic, then different
20//! implementations use different strategies to synchronize the scheduler.
21//! Here instead we have only one scheduler struct, and the different backends
22//! correspond to the different scheduler implementations in Python.
23
24use crate::broker::{
25    broker_builder_from_url, build_and_connect, configure_task_routes, BrokerBuilder,
26};
27use crate::routing::{self, Rule};
28use crate::{
29    error::{BeatError, BrokerError},
30    protocol::MessageContentType,
31    task::{Signature, Task, TaskOptions},
32};
33use log::{debug, error, info};
34use std::time::SystemTime;
35use tokio::time::{self, Duration};
36
37mod scheduler;
38pub use scheduler::Scheduler;
39
40mod backend;
41pub use backend::{LocalSchedulerBackend, SchedulerBackend};
42
43mod schedule;
44pub use schedule::{CronSchedule, DeltaSchedule, Schedule};
45
46mod scheduled_task;
47pub use scheduled_task::ScheduledTask;
48
/// Settings accumulated by a [`BeatBuilder`] and consumed when the [`Beat`] app is built.
struct Config {
    /// Name of the app; it becomes [`Beat::name`] on build.
    name: String,
    /// Broker builder created from the broker URL. The heartbeat setting and the
    /// queue declarations are applied to it before connecting.
    broker_builder: Box<dyn BrokerBuilder>,
    /// Timeout in seconds before giving up establishing a connection to the broker.
    broker_connection_timeout: u32,
    /// Whether to automatically try to re-establish the connection to the broker.
    broker_connection_retry: bool,
    /// Maximum number of retries before giving up re-establishing the connection.
    /// When retrying is disabled, `build` passes `0` to the connection helper instead.
    broker_connection_max_retries: u32,
    /// Number of seconds to wait before re-trying the connection with the broker.
    broker_connection_retry_delay: u32,
    /// Name of the default queue, declared on the broker builder in `build`.
    default_queue: String,
    /// Routing rules as `(pattern, queue)` pairs, in the order they were added.
    task_routes: Vec<(String, String)>,
    /// Default task options, applied to each signature when it is scheduled.
    task_options: TaskOptions,
    /// Optional upper bound on the time that can pass between two ticks.
    max_sleep_duration: Option<Duration>,
}
61
62/// Used to create a [`Beat`] app with a custom configuration.
63pub struct BeatBuilder<Sb>
64where
65    Sb: SchedulerBackend,
66{
67    config: Config,
68    scheduler_backend: Sb,
69}
70
71impl BeatBuilder<LocalSchedulerBackend> {
72    /// Get a `BeatBuilder` for creating a `Beat` app with a default scheduler backend
73    /// and a custom configuration.
74    pub fn with_default_scheduler_backend(name: &str, broker_url: &str) -> Self {
75        Self {
76            config: Config {
77                name: name.into(),
78                broker_builder: broker_builder_from_url(broker_url),
79                broker_connection_timeout: 2,
80                broker_connection_retry: true,
81                broker_connection_max_retries: 5,
82                broker_connection_retry_delay: 5,
83                default_queue: "celery".into(),
84                task_routes: vec![],
85                task_options: TaskOptions::default(),
86                max_sleep_duration: None,
87            },
88            scheduler_backend: LocalSchedulerBackend::new(),
89        }
90    }
91}
92
93impl<Sb> BeatBuilder<Sb>
94where
95    Sb: SchedulerBackend,
96{
97    /// Get a `BeatBuilder` for creating a `Beat` app with a custom scheduler backend and
98    /// a custom configuration.
99    pub fn with_custom_scheduler_backend(
100        name: &str,
101        broker_url: &str,
102        scheduler_backend: Sb,
103    ) -> Self {
104        Self {
105            config: Config {
106                name: name.into(),
107                broker_builder: broker_builder_from_url(broker_url),
108                broker_connection_timeout: 2,
109                broker_connection_retry: true,
110                broker_connection_max_retries: 5,
111                broker_connection_retry_delay: 5,
112                default_queue: "celery".into(),
113                task_routes: vec![],
114                task_options: TaskOptions::default(),
115                max_sleep_duration: None,
116            },
117            scheduler_backend,
118        }
119    }
120
121    /// Set the name of the default queue to something other than "celery".
122    pub fn default_queue(mut self, queue_name: &str) -> Self {
123        self.config.default_queue = queue_name.into();
124        self
125    }
126
127    /// Set the broker heartbeat. The default value depends on the broker implementation.
128    pub fn heartbeat(mut self, heartbeat: Option<u16>) -> Self {
129        self.config.broker_builder = self.config.broker_builder.heartbeat(heartbeat);
130        self
131    }
132
133    /// Add a routing rule.
134    pub fn task_route(mut self, pattern: &str, queue: &str) -> Self {
135        self.config.task_routes.push((pattern.into(), queue.into()));
136        self
137    }
138
139    /// Set a timeout in seconds before giving up establishing a connection to a broker.
140    pub fn broker_connection_timeout(mut self, timeout: u32) -> Self {
141        self.config.broker_connection_timeout = timeout;
142        self
143    }
144
145    /// Set whether or not to automatically try to re-establish connection to the AMQP broker.
146    pub fn broker_connection_retry(mut self, retry: bool) -> Self {
147        self.config.broker_connection_retry = retry;
148        self
149    }
150
151    /// Set the maximum number of retries before we give up trying to re-establish connection
152    /// to the AMQP broker.
153    pub fn broker_connection_max_retries(mut self, max_retries: u32) -> Self {
154        self.config.broker_connection_max_retries = max_retries;
155        self
156    }
157
158    /// Set the number of seconds to wait before re-trying the connection with the broker.
159    pub fn broker_connection_retry_delay(mut self, retry_delay: u32) -> Self {
160        self.config.broker_connection_retry_delay = retry_delay;
161        self
162    }
163
164    /// Set a default content type of the message body serialization.
165    pub fn task_content_type(mut self, content_type: MessageContentType) -> Self {
166        self.config.task_options.content_type = Some(content_type);
167        self
168    }
169
170    /// Set a maximum sleep duration, which limits the amount of time that
171    /// can pass between ticks. This is useful to ensure that the scheduler backend
172    /// implementation is called regularly.
173    pub fn max_sleep_duration(mut self, max_sleep_duration: Duration) -> Self {
174        self.config.max_sleep_duration = Some(max_sleep_duration);
175        self
176    }
177
178    /// Construct a `Beat` app with the current configuration.
179    pub async fn build(self) -> Result<Beat<Sb>, BeatError> {
180        // Declare default queue to broker.
181        let broker_builder = self
182            .config
183            .broker_builder
184            .declare_queue(&self.config.default_queue);
185
186        let (broker_builder, task_routes) =
187            configure_task_routes(broker_builder, &self.config.task_routes)?;
188
189        let broker = build_and_connect(
190            broker_builder,
191            self.config.broker_connection_timeout,
192            if self.config.broker_connection_retry {
193                self.config.broker_connection_max_retries
194            } else {
195                0
196            },
197            self.config.broker_connection_retry_delay,
198        )
199        .await?;
200
201        let scheduler = Scheduler::new(broker);
202
203        Ok(Beat {
204            name: self.config.name,
205            scheduler,
206            scheduler_backend: self.scheduler_backend,
207            task_routes,
208            default_queue: self.config.default_queue,
209            task_options: self.config.task_options,
210            broker_connection_timeout: self.config.broker_connection_timeout,
211            broker_connection_retry: self.config.broker_connection_retry,
212            broker_connection_max_retries: self.config.broker_connection_max_retries,
213            broker_connection_retry_delay: self.config.broker_connection_retry_delay,
214            max_sleep_duration: self.config.max_sleep_duration,
215        })
216    }
217}
218
219/// A [`Beat`] app is used to send out scheduled tasks. This is the struct that is
220/// created with the [`beat!`](crate::beat!) macro.
221///
222/// It drives execution by making the internal scheduler "tick", and updates the list of scheduled
223/// tasks through a customizable scheduler backend.
224pub struct Beat<Sb: SchedulerBackend> {
225    pub name: String,
226    pub scheduler: Scheduler,
227    pub scheduler_backend: Sb,
228
229    task_routes: Vec<Rule>,
230    default_queue: String,
231    task_options: TaskOptions,
232
233    broker_connection_timeout: u32,
234    broker_connection_retry: bool,
235    broker_connection_max_retries: u32,
236    broker_connection_retry_delay: u32,
237
238    max_sleep_duration: Option<Duration>,
239}
240
241impl Beat<LocalSchedulerBackend> {
242    /// Get a `BeatBuilder` for creating a `Beat` app with a custom configuration and a
243    /// default scheduler backend.
244    pub fn default_builder(name: &str, broker_url: &str) -> BeatBuilder<LocalSchedulerBackend> {
245        BeatBuilder::<LocalSchedulerBackend>::with_default_scheduler_backend(name, broker_url)
246    }
247}
248
249impl<Sb> Beat<Sb>
250where
251    Sb: SchedulerBackend,
252{
253    /// Get a `BeatBuilder` for creating a `Beat` app with a custom configuration and
254    /// a custom scheduler backend.
255    pub fn custom_builder(name: &str, broker_url: &str, scheduler_backend: Sb) -> BeatBuilder<Sb> {
256        BeatBuilder::<Sb>::with_custom_scheduler_backend(name, broker_url, scheduler_backend)
257    }
258
259    /// Schedule the execution of a task.
260    pub fn schedule_task<T, S>(&mut self, signature: Signature<T>, schedule: S)
261    where
262        T: Task + Clone + 'static,
263        S: Schedule + 'static,
264    {
265        self.schedule_named_task(Signature::<T>::task_name().to_string(), signature, schedule);
266    }
267
268    /// Schedule the execution of a task with the given `name`.
269    pub fn schedule_named_task<T, S>(
270        &mut self,
271        name: String,
272        mut signature: Signature<T>,
273        schedule: S,
274    ) where
275        T: Task + Clone + 'static,
276        S: Schedule + 'static,
277    {
278        signature.options.update(&self.task_options);
279        let queue = match &signature.queue {
280            Some(queue) => queue.to_string(),
281            None => routing::route(T::NAME, &self.task_routes)
282                .unwrap_or(&self.default_queue)
283                .to_string(),
284        };
285        let message_factory = Box::new(signature);
286
287        self.scheduler
288            .schedule_task(name, message_factory, queue, schedule);
289    }
290
291    /// Start the *beat*.
292    pub async fn start(&mut self) -> Result<(), BeatError> {
293        info!("Starting beat service");
294        loop {
295            let result = self.beat_loop().await;
296            if !self.broker_connection_retry {
297                return result;
298            }
299
300            if let Err(err) = result {
301                match err {
302                    BeatError::BrokerError(broker_err) => {
303                        if broker_err.is_connection_error() {
304                            error!("Broker connection failed");
305                        } else {
306                            return Err(BeatError::BrokerError(broker_err));
307                        }
308                    }
309                    _ => return Err(err),
310                };
311            } else {
312                return result;
313            }
314
315            let mut reconnect_successful: bool = false;
316            for _ in 0..self.broker_connection_max_retries {
317                info!("Trying to re-establish connection with broker");
318                time::sleep(Duration::from_secs(
319                    self.broker_connection_retry_delay as u64,
320                ))
321                .await;
322
323                match self
324                    .scheduler
325                    .broker
326                    .reconnect(self.broker_connection_timeout)
327                    .await
328                {
329                    Err(err) => {
330                        if err.is_connection_error() {
331                            continue;
332                        }
333                        return Err(BeatError::BrokerError(err));
334                    }
335                    Ok(_) => {
336                        info!("Successfully reconnected with broker");
337                        reconnect_successful = true;
338                        break;
339                    }
340                };
341            }
342
343            if !reconnect_successful {
344                return Err(BeatError::BrokerError(BrokerError::NotConnected));
345            }
346        }
347    }
348
349    async fn beat_loop(&mut self) -> Result<(), BeatError> {
350        loop {
351            let next_tick_at = self.scheduler.tick().await?;
352
353            if self.scheduler_backend.should_sync() {
354                self.scheduler_backend
355                    .sync(self.scheduler.get_scheduled_tasks())?;
356            }
357
358            let now = SystemTime::now();
359            if now < next_tick_at {
360                let sleep_interval = next_tick_at.duration_since(now).expect(
361                    "Unexpected error when unwrapping a SystemTime comparison that is not supposed to fail",
362                );
363                let sleep_interval = match &self.max_sleep_duration {
364                    Some(max_sleep_duration) => std::cmp::min(sleep_interval, *max_sleep_duration),
365                    None => sleep_interval,
366                };
367                debug!("Now sleeping for {:?}", sleep_interval);
368                time::sleep(sleep_interval).await;
369            }
370        }
371    }
372}
373
374#[cfg(test)]
375mod tests;