celery/beat/backend.rs
//! This module contains the definition of application-provided scheduler backends.
2use super::scheduled_task::ScheduledTask;
3use crate::error::BeatError;
4use std::{collections::BinaryHeap, future::Future, pin::Pin, time::Duration};
5
6mod redis;
7pub use redis::{RedisBackendConfig, RedisSchedulerBackend};
8
9/// A `SchedulerBackend` is in charge of keeping track of the internal state of the scheduler
10/// according to some source of truth, such as a database.
11///
12/// The default scheduler backend, [`LocalSchedulerBackend`](struct.LocalSchedulerBackend.html),
13/// doesn't do any external synchronization, so the source of truth is just the locally defined
14/// schedules.
15pub trait SchedulerBackend {
16 /// Check whether the internal state of the scheduler should be synchronized.
17 /// If this method returns `true`, then `sync` will be called as soon as possible.
18 fn should_sync(&self) -> bool;
19
20 /// Synchronize the internal state of the scheduler.
21 ///
22 /// This method is called in the pauses between scheduled tasks. Synchronization should
23 /// be as quick as possible, as it may otherwise delay the execution of due tasks.
24 /// If synchronization is slow, it should be done incrementally (i.e., it should span
25 /// multiple calls to `sync`).
26 ///
27 /// This method will not be called if `should_sync` returns `false`.
28 fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError>;
29
30 /// Return a mutable reference to the distributed capability of this backend, if any.
31 ///
32 /// Backends that do not support distributed coordination can leave the default
33 /// implementation untouched, and the beat loop will fall back to single-instance
34 /// behaviour.
35 fn as_distributed(&mut self) -> Option<&mut dyn DistributedScheduler> {
36 None
37 }
38
39 // Maybe we should consider some methods to inform the backend that a task has been executed.
40 // Not sure about what Python does, but at least it keeps a counter with the number of executed tasks,
41 // and the backend has access to that.
42}
43
/// The value a [`DistributedScheduler::before_tick`] future resolves to on success.
///
/// NOTE(review): the code that consumes these fields is not part of this module, so the
/// field descriptions below are inferred from the field names; confirm them against
/// the beat loop.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TickDecision {
    /// Whether tasks should be executed on this tick.
    pub execute_tasks: bool,
    /// Optional sleep duration suggested alongside the decision; `None` means no
    /// suggestion is made.
    pub sleep_hint: Option<Duration>,
}

impl TickDecision {
    /// Build a decision with `execute_tasks` set to `true` and no sleep hint.
    #[must_use]
    pub fn execute() -> Self {
        Self {
            execute_tasks: true,
            sleep_hint: None,
        }
    }

    /// Build a decision with `execute_tasks` set to `true` that carries `sleep_hint`.
    #[must_use]
    pub fn execute_with_hint(sleep_hint: Duration) -> Self {
        Self {
            execute_tasks: true,
            sleep_hint: Some(sleep_hint),
        }
    }

    /// Build a decision with `execute_tasks` set to `false` that carries `sleep_hint`.
    #[must_use]
    pub fn skip(sleep_hint: Duration) -> Self {
        Self {
            execute_tasks: false,
            sleep_hint: Some(sleep_hint),
        }
    }
}
71
72pub trait DistributedScheduler {
73 fn before_tick<'a>(
74 &'a mut self,
75 ) -> Pin<Box<dyn Future<Output = Result<TickDecision, BeatError>> + 'a>>;
76
77 fn after_tick<'a>(
78 &'a mut self,
79 scheduled_tasks: &'a mut BinaryHeap<ScheduledTask>,
80 ) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + 'a>>;
81
82 fn shutdown<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + 'a>> {
83 Box::pin(async { Ok(()) })
84 }
85}
86
/// The default [`SchedulerBackend`].
///
/// The type has no fields, so it carries no state of its own.
#[derive(Debug, Default)]
pub struct LocalSchedulerBackend {}

impl LocalSchedulerBackend {
    /// Create a local backend; equivalent to [`LocalSchedulerBackend::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
96
97impl SchedulerBackend for LocalSchedulerBackend {
98 fn should_sync(&self) -> bool {
99 false
100 }
101
102 #[allow(unused_variables)]
103 fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
104 unimplemented!()
105 }
106}