celery/beat/backend.rs
1/// This module contains the definition of application-provided scheduler backends.
2use super::scheduled_task::ScheduledTask;
3use crate::error::BeatError;
4use std::{collections::BinaryHeap, future::Future, pin::Pin, time::Duration};
5
6mod redis;
7pub use redis::{RedisBackendConfig, RedisSchedulerBackend};
8
9/// A `SchedulerBackend` is in charge of keeping track of the internal state of the scheduler
10/// according to some source of truth, such as a database.
11///
12/// The default scheduler backend, [`LocalSchedulerBackend`](struct.LocalSchedulerBackend.html),
13/// doesn't do any external synchronization, so the source of truth is just the locally defined
14/// schedules.
15pub trait SchedulerBackend: Send {
16 /// Check whether the internal state of the scheduler should be synchronized.
17 /// If this method returns `true`, then `sync` will be called as soon as possible.
18 fn should_sync(&self) -> bool;
19
20 /// Synchronize the internal state of the scheduler.
21 ///
22 /// This method is called in the pauses between scheduled tasks. Synchronization should
23 /// be as quick as possible, as it may otherwise delay the execution of due tasks.
24 /// If synchronization is slow, it should be done incrementally (i.e., it should span
25 /// multiple calls to `sync`).
26 ///
27 /// This method will not be called if `should_sync` returns `false`.
28 fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError>;
29
30 /// Return a mutable reference to the distributed capability of this backend, if any.
31 ///
32 /// Backends that do not support distributed coordination can leave the default
33 /// implementation untouched, and the beat loop will fall back to single-instance
34 /// behaviour.
35 fn as_distributed(&mut self) -> Option<&mut dyn DistributedScheduler> {
36 None
37 }
38
39 // Maybe we should consider some methods to inform the backend that a task has been executed.
40 // Not sure about what Python does, but at least it keeps a counter with the number of executed tasks,
41 // and the backend has access to that.
42}
43
44pub struct TickDecision {
45 pub execute_tasks: bool,
46 pub sleep_hint: Option<Duration>,
47}
48
49impl TickDecision {
50 pub fn execute() -> Self {
51 TickDecision {
52 execute_tasks: true,
53 sleep_hint: None,
54 }
55 }
56
57 pub fn execute_with_hint(sleep_hint: Duration) -> Self {
58 TickDecision {
59 execute_tasks: true,
60 sleep_hint: Some(sleep_hint),
61 }
62 }
63
64 pub fn skip(sleep_hint: Duration) -> Self {
65 TickDecision {
66 execute_tasks: false,
67 sleep_hint: Some(sleep_hint),
68 }
69 }
70}
71
72pub trait DistributedScheduler: Send {
73 fn before_tick<'a>(
74 &'a mut self,
75 ) -> Pin<Box<dyn Future<Output = Result<TickDecision, BeatError>> + Send + 'a>>;
76
77 fn after_tick<'a>(
78 &'a mut self,
79 scheduled_tasks: &'a mut BinaryHeap<ScheduledTask>,
80 ) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + Send + 'a>>;
81
82 fn shutdown<'a>(
83 &'a mut self,
84 ) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + Send + 'a>> {
85 Box::pin(async { Ok(()) })
86 }
87}
88
89/// The default [`SchedulerBackend`](trait.SchedulerBackend.html).
90pub struct LocalSchedulerBackend {}
91
92#[allow(clippy::new_without_default)]
93impl LocalSchedulerBackend {
94 pub fn new() -> Self {
95 Self {}
96 }
97}
98
99impl SchedulerBackend for LocalSchedulerBackend {
100 fn should_sync(&self) -> bool {
101 false
102 }
103
104 #[allow(unused_variables)]
105 fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
106 unimplemented!()
107 }
108}