celery/beat/
backend.rs

1/// This module contains the definition of application-provided scheduler backends.
2use super::scheduled_task::ScheduledTask;
3use crate::error::BeatError;
4use std::{collections::BinaryHeap, future::Future, pin::Pin, time::Duration};
5
6mod redis;
7pub use redis::{RedisBackendConfig, RedisSchedulerBackend};
8
9/// A `SchedulerBackend` is in charge of keeping track of the internal state of the scheduler
10/// according to some source of truth, such as a database.
11///
12/// The default scheduler backend, [`LocalSchedulerBackend`](struct.LocalSchedulerBackend.html),
13/// doesn't do any external synchronization, so the source of truth is just the locally defined
14/// schedules.
15pub trait SchedulerBackend {
16    /// Check whether the internal state of the scheduler should be synchronized.
17    /// If this method returns `true`, then `sync` will be called as soon as possible.
18    fn should_sync(&self) -> bool;
19
20    /// Synchronize the internal state of the scheduler.
21    ///
22    /// This method is called in the pauses between scheduled tasks. Synchronization should
23    /// be as quick as possible, as it may otherwise delay the execution of due tasks.
24    /// If synchronization is slow, it should be done incrementally (i.e., it should span
25    /// multiple calls to `sync`).
26    ///
27    /// This method will not be called if `should_sync` returns `false`.
28    fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError>;
29
30    /// Return a mutable reference to the distributed capability of this backend, if any.
31    ///
32    /// Backends that do not support distributed coordination can leave the default
33    /// implementation untouched, and the beat loop will fall back to single-instance
34    /// behaviour.
35    fn as_distributed(&mut self) -> Option<&mut dyn DistributedScheduler> {
36        None
37    }
38
39    // Maybe we should consider some methods to inform the backend that a task has been executed.
40    // Not sure about what Python does, but at least it keeps a counter with the number of executed tasks,
41    // and the backend has access to that.
42}
43
44pub struct TickDecision {
45    pub execute_tasks: bool,
46    pub sleep_hint: Option<Duration>,
47}
48
49impl TickDecision {
50    pub fn execute() -> Self {
51        TickDecision {
52            execute_tasks: true,
53            sleep_hint: None,
54        }
55    }
56
57    pub fn execute_with_hint(sleep_hint: Duration) -> Self {
58        TickDecision {
59            execute_tasks: true,
60            sleep_hint: Some(sleep_hint),
61        }
62    }
63
64    pub fn skip(sleep_hint: Duration) -> Self {
65        TickDecision {
66            execute_tasks: false,
67            sleep_hint: Some(sleep_hint),
68        }
69    }
70}
71
72pub trait DistributedScheduler {
73    fn before_tick<'a>(
74        &'a mut self,
75    ) -> Pin<Box<dyn Future<Output = Result<TickDecision, BeatError>> + 'a>>;
76
77    fn after_tick<'a>(
78        &'a mut self,
79        scheduled_tasks: &'a mut BinaryHeap<ScheduledTask>,
80    ) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + 'a>>;
81
82    fn shutdown<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + 'a>> {
83        Box::pin(async { Ok(()) })
84    }
85}
86
87/// The default [`SchedulerBackend`](trait.SchedulerBackend.html).
88pub struct LocalSchedulerBackend {}
89
90#[allow(clippy::new_without_default)]
91impl LocalSchedulerBackend {
92    pub fn new() -> Self {
93        Self {}
94    }
95}
96
97impl SchedulerBackend for LocalSchedulerBackend {
98    fn should_sync(&self) -> bool {
99        false
100    }
101
102    #[allow(unused_variables)]
103    fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
104        unimplemented!()
105    }
106}