celery/beat/
mod.rs

1//! Celery [`Beat`] is an app that can automatically produce tasks at scheduled times.
2//!
3//! ### Terminology
4//!
5//! This is the terminology used in this module (with references to the corresponding names
6//! in the Python implementation):
7//! - schedule: the strategy used to decide when a task must be executed (each scheduled
8//!   task has its own schedule);
9//! - scheduled task: a task together with its schedule (it more or less corresponds to
10//!   a *schedule entry* in Python);
11//! - scheduler: the component in charge of keeping track of tasks to execute;
12//! - scheduler backend: the component that updates the internal state of the scheduler according to
13//!   to an external source of truth (e.g., a database); there is no equivalent in Python,
14//!   due to the fact that another pattern is used (see below);
15//! - beat: the service that drives the execution, calling the appropriate
16//!   methods of the scheduler in an infinite loop (called just *service* in Python).
17//!
18//! The main difference with the architecture used in Python is that in Python
19//! there is a base scheduler class which contains the scheduling logic, then different
20//! implementations use different strategies to synchronize the scheduler.
21//! Here instead we have only one scheduler struct, and the different backends
22//! correspond to the different scheduler implementations in Python.
23
24use crate::broker::{
25    broker_builder_from_url, build_and_connect, configure_task_routes, BrokerBuilder,
26};
27use crate::routing::{self, Rule};
28use crate::{
29    error::{BeatError, BrokerError},
30    protocol::MessageContentType,
31    task::{Signature, Task, TaskOptions},
32};
33use log::{debug, error, info};
34use std::time::SystemTime;
35use tokio::time::{self, Duration};
36
37mod scheduler;
38pub use scheduler::Scheduler;
39
40mod backend;
41pub use backend::{
42    LocalSchedulerBackend, RedisBackendConfig, RedisSchedulerBackend, SchedulerBackend,
43};
44
45mod schedule;
46pub use schedule::{CronSchedule, DeltaSchedule, Schedule, ScheduleDescriptor};
47
48mod scheduled_task;
49pub use scheduled_task::ScheduledTask;
50
51struct Config {
52    name: String,
53    broker_builder: Box<dyn BrokerBuilder>,
54    broker_connection_timeout: u32,
55    broker_connection_retry: bool,
56    broker_connection_max_retries: u32,
57    broker_connection_retry_delay: u32,
58    default_queue: String,
59    task_routes: Vec<(String, String)>,
60    task_options: TaskOptions,
61    max_sleep_duration: Option<Duration>,
62}
63
64/// Used to create a [`Beat`] app with a custom configuration.
65///
66/// # Choosing a backend
67///
68/// - [`LocalSchedulerBackend`] keeps all scheduling state in memory and is intended for
69///   single-instance deployments.
70/// - [`RedisSchedulerBackend`] persists state in Redis and coordinates leadership across
71///   multiple beat instances using Redis locks. When using the Redis backend you can further
72///   tweak [`RedisBackendConfig`] to customise the lock key prefix, lock timeout, renewal
73///   interval, follower polling interval, etc.
74pub struct BeatBuilder<Sb>
75where
76    Sb: SchedulerBackend,
77{
78    config: Config,
79    scheduler_backend: Sb,
80}
81
82impl BeatBuilder<LocalSchedulerBackend> {
83    /// Get a `BeatBuilder` for creating a `Beat` app with a default scheduler backend
84    /// and a custom configuration.
85    pub fn with_default_scheduler_backend(name: &str, broker_url: &str) -> Self {
86        Self {
87            config: Config {
88                name: name.into(),
89                broker_builder: broker_builder_from_url(broker_url),
90                broker_connection_timeout: 2,
91                broker_connection_retry: true,
92                broker_connection_max_retries: 5,
93                broker_connection_retry_delay: 5,
94                default_queue: "celery".into(),
95                task_routes: vec![],
96                task_options: TaskOptions::default(),
97                max_sleep_duration: None,
98            },
99            scheduler_backend: LocalSchedulerBackend::new(),
100        }
101    }
102}
103
104impl<Sb> BeatBuilder<Sb>
105where
106    Sb: SchedulerBackend,
107{
108    /// Get a `BeatBuilder` for creating a `Beat` app with a custom scheduler backend and
109    /// a custom configuration.
110    pub fn with_custom_scheduler_backend(
111        name: &str,
112        broker_url: &str,
113        scheduler_backend: Sb,
114    ) -> Self {
115        Self {
116            config: Config {
117                name: name.into(),
118                broker_builder: broker_builder_from_url(broker_url),
119                broker_connection_timeout: 2,
120                broker_connection_retry: true,
121                broker_connection_max_retries: 5,
122                broker_connection_retry_delay: 5,
123                default_queue: "celery".into(),
124                task_routes: vec![],
125                task_options: TaskOptions::default(),
126                max_sleep_duration: None,
127            },
128            scheduler_backend,
129        }
130    }
131
132    /// Set the name of the default queue to something other than "celery".
133    pub fn default_queue(mut self, queue_name: &str) -> Self {
134        self.config.default_queue = queue_name.into();
135        self
136    }
137
138    /// Set the broker heartbeat. The default value depends on the broker implementation.
139    pub fn heartbeat(mut self, heartbeat: Option<u16>) -> Self {
140        self.config.broker_builder = self.config.broker_builder.heartbeat(heartbeat);
141        self
142    }
143
144    /// Add a routing rule.
145    pub fn task_route(mut self, pattern: &str, queue: &str) -> Self {
146        self.config.task_routes.push((pattern.into(), queue.into()));
147        self
148    }
149
150    /// Set a timeout in seconds before giving up establishing a connection to a broker.
151    pub fn broker_connection_timeout(mut self, timeout: u32) -> Self {
152        self.config.broker_connection_timeout = timeout;
153        self
154    }
155
156    /// Set whether or not to automatically try to re-establish connection to the AMQP broker.
157    pub fn broker_connection_retry(mut self, retry: bool) -> Self {
158        self.config.broker_connection_retry = retry;
159        self
160    }
161
162    /// Set the maximum number of retries before we give up trying to re-establish connection
163    /// to the AMQP broker.
164    pub fn broker_connection_max_retries(mut self, max_retries: u32) -> Self {
165        self.config.broker_connection_max_retries = max_retries;
166        self
167    }
168
169    /// Set the number of seconds to wait before re-trying the connection with the broker.
170    pub fn broker_connection_retry_delay(mut self, retry_delay: u32) -> Self {
171        self.config.broker_connection_retry_delay = retry_delay;
172        self
173    }
174
175    /// Set a default content type of the message body serialization.
176    pub fn task_content_type(mut self, content_type: MessageContentType) -> Self {
177        self.config.task_options.content_type = Some(content_type);
178        self
179    }
180
181    /// Set a maximum sleep duration, which limits the amount of time that
182    /// can pass between ticks. This is useful to ensure that the scheduler backend
183    /// implementation is called regularly.
184    pub fn max_sleep_duration(mut self, max_sleep_duration: Duration) -> Self {
185        self.config.max_sleep_duration = Some(max_sleep_duration);
186        self
187    }
188
189    /// Construct a `Beat` app with the current configuration.
190    pub async fn build(self) -> Result<Beat<Sb>, BeatError> {
191        // Declare default queue to broker.
192        let broker_builder = self
193            .config
194            .broker_builder
195            .declare_queue(&self.config.default_queue);
196
197        let (broker_builder, task_routes) =
198            configure_task_routes(broker_builder, &self.config.task_routes)?;
199
200        let broker = build_and_connect(
201            broker_builder,
202            self.config.broker_connection_timeout,
203            if self.config.broker_connection_retry {
204                self.config.broker_connection_max_retries
205            } else {
206                0
207            },
208            self.config.broker_connection_retry_delay,
209        )
210        .await?;
211
212        let scheduler = Scheduler::new(broker);
213
214        Ok(Beat {
215            name: self.config.name,
216            scheduler,
217            scheduler_backend: self.scheduler_backend,
218            task_routes,
219            default_queue: self.config.default_queue,
220            task_options: self.config.task_options,
221            broker_connection_timeout: self.config.broker_connection_timeout,
222            broker_connection_retry: self.config.broker_connection_retry,
223            broker_connection_max_retries: self.config.broker_connection_max_retries,
224            broker_connection_retry_delay: self.config.broker_connection_retry_delay,
225            max_sleep_duration: self.config.max_sleep_duration,
226        })
227    }
228}
229
230/// A [`Beat`] app is used to send out scheduled tasks. This is the struct that is
231/// created with the [`beat!`](crate::beat!) macro.
232///
233/// It drives execution by making the internal scheduler "tick", and updates the list of scheduled
234/// tasks through a customizable scheduler backend.
235pub struct Beat<Sb: SchedulerBackend> {
236    pub name: String,
237    pub scheduler: Scheduler,
238    pub scheduler_backend: Sb,
239
240    task_routes: Vec<Rule>,
241    default_queue: String,
242    task_options: TaskOptions,
243
244    broker_connection_timeout: u32,
245    broker_connection_retry: bool,
246    broker_connection_max_retries: u32,
247    broker_connection_retry_delay: u32,
248
249    max_sleep_duration: Option<Duration>,
250}
251
252impl Beat<LocalSchedulerBackend> {
253    /// Get a `BeatBuilder` for creating a `Beat` app with a custom configuration and a
254    /// default scheduler backend.
255    pub fn default_builder(name: &str, broker_url: &str) -> BeatBuilder<LocalSchedulerBackend> {
256        BeatBuilder::<LocalSchedulerBackend>::with_default_scheduler_backend(name, broker_url)
257    }
258}
259
260impl<Sb> Beat<Sb>
261where
262    Sb: SchedulerBackend,
263{
264    /// Get a `BeatBuilder` for creating a `Beat` app with a custom configuration and
265    /// a custom scheduler backend.
266    pub fn custom_builder(name: &str, broker_url: &str, scheduler_backend: Sb) -> BeatBuilder<Sb> {
267        BeatBuilder::<Sb>::with_custom_scheduler_backend(name, broker_url, scheduler_backend)
268    }
269
270    /// Schedule the execution of a task.
271    pub fn schedule_task<T, S>(&mut self, signature: Signature<T>, schedule: S)
272    where
273        T: Task + Clone + 'static,
274        S: Schedule + 'static,
275    {
276        self.schedule_named_task(Signature::<T>::task_name().to_string(), signature, schedule);
277    }
278
279    /// Schedule the execution of a task with the given `name`.
280    pub fn schedule_named_task<T, S>(
281        &mut self,
282        name: String,
283        mut signature: Signature<T>,
284        schedule: S,
285    ) where
286        T: Task + Clone + 'static,
287        S: Schedule + 'static,
288    {
289        signature.options.update(&self.task_options);
290        let queue = match &signature.queue {
291            Some(queue) => queue.to_string(),
292            None => routing::route(T::NAME, &self.task_routes)
293                .unwrap_or(&self.default_queue)
294                .to_string(),
295        };
296        let message_factory = Box::new(signature);
297
298        self.scheduler
299            .schedule_task(name, message_factory, queue, schedule);
300    }
301
302    /// Start the *beat*.
303    pub async fn start(&mut self) -> Result<(), BeatError> {
304        info!("Starting beat service");
305        loop {
306            let result = self.beat_loop().await;
307            if !self.broker_connection_retry {
308                return result;
309            }
310
311            if let Err(err) = result {
312                match err {
313                    BeatError::BrokerError(broker_err) => {
314                        if broker_err.is_connection_error() {
315                            error!("Broker connection failed");
316                        } else {
317                            return Err(BeatError::BrokerError(broker_err));
318                        }
319                    }
320                    _ => return Err(err),
321                };
322            } else {
323                return result;
324            }
325
326            let mut reconnect_successful: bool = false;
327            for _ in 0..self.broker_connection_max_retries {
328                info!("Trying to re-establish connection with broker");
329                time::sleep(Duration::from_secs(
330                    self.broker_connection_retry_delay as u64,
331                ))
332                .await;
333
334                match self
335                    .scheduler
336                    .broker
337                    .reconnect(self.broker_connection_timeout)
338                    .await
339                {
340                    Err(err) => {
341                        if err.is_connection_error() {
342                            continue;
343                        }
344                        return Err(BeatError::BrokerError(err));
345                    }
346                    Ok(_) => {
347                        info!("Successfully reconnected with broker");
348                        reconnect_successful = true;
349                        break;
350                    }
351                };
352            }
353
354            if !reconnect_successful {
355                return Err(BeatError::BrokerError(BrokerError::NotConnected));
356            }
357        }
358    }
359
360    async fn beat_loop(&mut self) -> Result<(), BeatError> {
361        loop {
362            let mut sleep_hint = None;
363            let execute_tasks = {
364                if let Some(distributed) = self.scheduler_backend.as_distributed() {
365                    let decision = distributed.before_tick().await?;
366                    sleep_hint = decision.sleep_hint;
367                    decision.execute_tasks
368                } else {
369                    true
370                }
371            };
372
373            let next_tick_at = if execute_tasks {
374                self.scheduler.tick().await?
375            } else {
376                let fallback = sleep_hint.unwrap_or_else(|| Duration::from_secs(1));
377                SystemTime::now() + fallback
378            };
379
380            let used_distributed =
381                if let Some(distributed) = self.scheduler_backend.as_distributed() {
382                    distributed
383                        .after_tick(self.scheduler.get_scheduled_tasks())
384                        .await?;
385                    true
386                } else {
387                    false
388                };
389
390            if !used_distributed && self.scheduler_backend.should_sync() {
391                self.scheduler_backend
392                    .sync(self.scheduler.get_scheduled_tasks())?;
393            }
394
395            let now = SystemTime::now();
396            let mut sleep_interval = if now < next_tick_at {
397                next_tick_at.duration_since(now).expect(
398                    "Unexpected error when unwrapping a SystemTime comparison that is not supposed to fail",
399                )
400            } else {
401                Duration::from_millis(0)
402            };
403
404            if !execute_tasks {
405                if let Some(hint) = sleep_hint {
406                    sleep_interval = hint;
407                }
408            } else if let Some(hint) = sleep_hint {
409                sleep_interval = std::cmp::min(sleep_interval, hint);
410            }
411
412            if let Some(max_sleep_duration) = &self.max_sleep_duration {
413                sleep_interval = std::cmp::min(sleep_interval, *max_sleep_duration);
414            }
415
416            if sleep_interval > Duration::from_millis(0) {
417                debug!("Now sleeping for {:?}", sleep_interval);
418                time::sleep(sleep_interval).await;
419            }
420        }
421    }
422}
423
424#[cfg(test)]
425mod tests;