Skip to main content

celery/beat/
backend.rs

//! This module contains the definition of application-provided scheduler backends.
2use super::scheduled_task::ScheduledTask;
3use crate::error::BeatError;
4use std::{collections::BinaryHeap, future::Future, pin::Pin, time::Duration};
5
6mod redis;
7pub use redis::{RedisBackendConfig, RedisSchedulerBackend};
8
/// A `SchedulerBackend` is in charge of keeping track of the internal state of the scheduler
/// according to some source of truth, such as a database.
///
/// The default scheduler backend, [`LocalSchedulerBackend`], doesn't do any external
/// synchronization, so the source of truth is just the locally defined schedules.
///
/// Implementors must be [`Send`], as required by the trait bound.
pub trait SchedulerBackend: Send {
    /// Check whether the internal state of the scheduler should be synchronized.
    ///
    /// If this method returns `true`, then [`sync`](SchedulerBackend::sync) will be called
    /// as soon as possible.
    fn should_sync(&self) -> bool;

    /// Synchronize the internal state of the scheduler.
    ///
    /// `scheduled_tasks` is the scheduler's heap of tasks, handed over mutably so that the
    /// backend can update it in place.
    ///
    /// This method is called in the pauses between scheduled tasks. Synchronization should
    /// be as quick as possible, as it may otherwise delay the execution of due tasks.
    /// If synchronization is slow, it should be done incrementally (i.e., it should span
    /// multiple calls to `sync`).
    ///
    /// This method will not be called if
    /// [`should_sync`](SchedulerBackend::should_sync) returns `false`.
    ///
    /// # Errors
    ///
    /// Returns a [`BeatError`] if the backend fails to synchronize.
    fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError>;

    /// Return a mutable reference to the distributed capability of this backend, if any.
    ///
    /// The default implementation returns `None`. Backends that do not support distributed
    /// coordination can leave the default implementation untouched, and the beat loop will
    /// fall back to single-instance behaviour. Backends that do support it should return
    /// `Some` with their [`DistributedScheduler`] implementation.
    fn as_distributed(&mut self) -> Option<&mut dyn DistributedScheduler> {
        None
    }

    // TODO: consider adding methods that inform the backend when a task has been executed.
    // It is unclear exactly what the Python implementation does here, but it at least keeps
    // a counter with the number of executed tasks, and the backend has access to that.
}
43
/// The value produced by [`DistributedScheduler::before_tick`].
///
/// NOTE(review): the code that consumes a `TickDecision` is not part of this module, so the
/// field descriptions below are inferred from the field and constructor names — confirm
/// against the beat loop.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TickDecision {
    /// Whether tasks should be executed on this tick (presumably: whether the beat loop
    /// should dispatch the tasks that are currently due).
    pub execute_tasks: bool,
    /// Optional sleep duration suggested by the backend; `None` means no suggestion.
    pub sleep_hint: Option<Duration>,
}

impl TickDecision {
    /// Build a decision with `execute_tasks` set to `true` and no sleep hint.
    pub fn execute() -> Self {
        Self {
            execute_tasks: true,
            sleep_hint: None,
        }
    }

    /// Build a decision with `execute_tasks` set to `true` that also carries `sleep_hint`.
    pub fn execute_with_hint(sleep_hint: Duration) -> Self {
        Self {
            execute_tasks: true,
            sleep_hint: Some(sleep_hint),
        }
    }

    /// Build a decision with `execute_tasks` set to `false` that carries `sleep_hint`.
    pub fn skip(sleep_hint: Duration) -> Self {
        Self {
            execute_tasks: false,
            sleep_hint: Some(sleep_hint),
        }
    }
}
71
/// Optional distributed-coordination capability of a [`SchedulerBackend`], obtained through
/// [`SchedulerBackend::as_distributed`].
///
/// The methods return boxed futures rather than being `async fn`s, which keeps the trait
/// usable as a trait object (`dyn DistributedScheduler`). Every returned future is `Send`
/// and borrows `self` (and any arguments) for the lifetime `'a`.
///
/// NOTE(review): the call sites are not part of this module; when each hook runs is
/// inferred from the method names — confirm against the beat loop.
pub trait DistributedScheduler: Send {
    /// Hook presumably awaited before each scheduler tick.
    ///
    /// Resolves to a [`TickDecision`], or to a [`BeatError`] on failure.
    fn before_tick<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<TickDecision, BeatError>> + Send + 'a>>;

    /// Hook presumably awaited after each scheduler tick.
    ///
    /// Receives mutable access to the scheduler's heap of tasks. Resolves to `Ok(())`, or
    /// to a [`BeatError`] on failure.
    fn after_tick<'a>(
        &'a mut self,
        scheduled_tasks: &'a mut BinaryHeap<ScheduledTask>,
    ) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + Send + 'a>>;

    /// Hook presumably awaited when the scheduler shuts down.
    ///
    /// The default implementation does nothing and resolves to `Ok(())`.
    fn shutdown<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + Send + 'a>> {
        Box::pin(async { Ok(()) })
    }
}
88
/// The default [`SchedulerBackend`].
///
/// It carries no state: as a `SchedulerBackend` it never asks for synchronization, so the
/// locally defined schedules are the only source of truth.
#[derive(Debug, Default)]
pub struct LocalSchedulerBackend {}

impl LocalSchedulerBackend {
    /// Create a new local backend.
    ///
    /// Equivalent to `LocalSchedulerBackend::default()`.
    pub fn new() -> Self {
        Self {}
    }
}
98
99impl SchedulerBackend for LocalSchedulerBackend {
100    fn should_sync(&self) -> bool {
101        false
102    }
103
104    #[allow(unused_variables)]
105    fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
106        unimplemented!()
107    }
108}