celery/beat/
scheduler.rs

1use super::{scheduled_task::ScheduledTask, Schedule};
2use crate::{broker::Broker, error::BeatError, protocol::TryCreateMessage};
3use log::{debug, info};
4use std::collections::BinaryHeap;
5use std::time::{Duration, SystemTime};
6
/// Sleep interval that [`Scheduler::new`] stores in the scheduler.
///
/// When no task is scheduled, the scheduler asks to be ticked again this long
/// after the current time.
const DEFAULT_SLEEP_INTERVAL: Duration = Duration::from_millis(500);
8
/// A [`Scheduler`] is in charge of executing scheduled tasks when they are due.
///
/// It is somewhat similar to a future, in the sense that by itself it does nothing,
/// and execution is driven by an "executor" (the [`Beat`](super::Beat)) which
/// is in charge of calling the scheduler [`tick`](Scheduler::tick).
///
/// Internally it uses a min-heap to store tasks and efficiently retrieve the ones
/// that are due for execution.
pub struct Scheduler {
    /// Pending tasks. The task at the top of the heap is treated as the next one due.
    // NOTE(review): `std::collections::BinaryHeap` is a max-heap, so the min-heap
    // behaviour presumably comes from the `Ord` impl of `ScheduledTask`, which is
    // defined outside this file — confirm there.
    heap: BinaryHeap<ScheduledTask>,
    /// How far in the future the next tick is requested when no task is scheduled.
    default_sleep_interval: Duration,
    /// Broker that due tasks are sent to.
    pub broker: Box<dyn Broker>,
}
22
23impl Scheduler {
24    /// Create a new scheduler which uses the given `broker`.
25    pub fn new(broker: Box<dyn Broker>) -> Scheduler {
26        Scheduler {
27            heap: BinaryHeap::new(),
28            default_sleep_interval: DEFAULT_SLEEP_INTERVAL,
29            broker,
30        }
31    }
32
33    /// Schedule the execution of a task.
34    pub fn schedule_task<S>(
35        &mut self,
36        name: String,
37        message_factory: Box<dyn TryCreateMessage>,
38        queue: String,
39        schedule: S,
40    ) where
41        S: Schedule + 'static,
42    {
43        match schedule.next_call_at(None) {
44            Some(next_call_at) => self.heap.push(ScheduledTask::new(
45                name,
46                message_factory,
47                queue,
48                schedule,
49                next_call_at,
50            )),
51            None => debug!(
52                "The schedule of task {} never scheduled the task to run, so it has been dropped.",
53                name
54            ),
55        }
56    }
57
58    /// Get all scheduled tasks.
59    pub fn get_scheduled_tasks(&mut self) -> &mut BinaryHeap<ScheduledTask> {
60        &mut self.heap
61    }
62
63    /// Get the time when the next task should be executed.
64    fn next_task_time(&self, now: SystemTime) -> SystemTime {
65        if let Some(scheduled_task) = self.heap.peek() {
66            debug!(
67                "Next scheduled task is at {:?}",
68                scheduled_task.next_call_at
69            );
70            scheduled_task.next_call_at
71        } else {
72            debug!(
73                "No scheduled tasks, sleeping for {:?}",
74                self.default_sleep_interval
75            );
76            now + self.default_sleep_interval
77        }
78    }
79
80    /// Tick once. This method checks if there is a scheduled task which is due
81    /// for execution and, if so, sends it to the broker.
82    /// It returns the time by which `tick` should be called again.
83    pub async fn tick(&mut self) -> Result<SystemTime, BeatError> {
84        let now = SystemTime::now();
85        let next_task_time = self.next_task_time(now);
86
87        if next_task_time <= now {
88            let mut scheduled_task = self
89                .heap
90                .pop()
91                .expect("No scheduled tasks found even though there should be");
92            let result = self.send_scheduled_task(&mut scheduled_task).await;
93
94            // Reschedule the task before checking if the task execution was successful.
95            // TODO: we may have more fine-grained logic here and reschedule the task
96            // only after examining the type of error.
97            if let Some(rescheduled_task) = scheduled_task.reschedule_task() {
98                self.heap.push(rescheduled_task);
99            } else {
100                debug!("A task is not scheduled to run anymore and will be dropped");
101            }
102
103            result?;
104            Ok(self.next_task_time(now))
105        } else {
106            Ok(next_task_time)
107        }
108    }
109
110    /// Send a task to the broker.
111    async fn send_scheduled_task(
112        &self,
113        scheduled_task: &mut ScheduledTask,
114    ) -> Result<(), BeatError> {
115        let queue = &scheduled_task.queue;
116
117        let message = scheduled_task.message_factory.try_create_message()?;
118
119        info!(
120            "Sending task {}[{}] to {} queue",
121            scheduled_task.name,
122            message.task_id(),
123            queue
124        );
125        self.broker.send(&message, queue).await?;
126        scheduled_task.last_run_at.replace(SystemTime::now());
127        scheduled_task.total_run_count += 1;
128        Ok(())
129    }
130}