celery/beat/
scheduled_task.rs

1use super::Schedule;
2use crate::protocol::TryCreateMessage;
3use std::{cmp::Ordering, time::SystemTime};
4
5/// A task which is scheduled for execution. It contains the task to execute,
6/// the queue where to send it and the schedule which determines when to do it.
7pub struct ScheduledTask {
8    pub name: String,
9    pub message_factory: Box<dyn TryCreateMessage>,
10    pub queue: String,
11    pub schedule: Box<dyn Schedule>,
12    pub total_run_count: u32,
13    pub last_run_at: Option<SystemTime>,
14    pub next_call_at: SystemTime,
15}
16
17impl ScheduledTask {
18    /// Create a new scheduled task.
19    pub fn new<S>(
20        name: String,
21        message_factory: Box<dyn TryCreateMessage>,
22        queue: String,
23        schedule: S,
24        next_call_at: SystemTime,
25    ) -> ScheduledTask
26    where
27        S: Schedule + 'static,
28    {
29        ScheduledTask {
30            name,
31            message_factory,
32            queue,
33            schedule: Box::new(schedule),
34            total_run_count: 0,
35            last_run_at: None,
36            next_call_at,
37        }
38    }
39
40    /// Update the `next_call_at` field of the task.
41    /// If the task is not scheduled to run again, this method
42    /// will return `None`.
43    pub(super) fn reschedule_task(mut self) -> Option<ScheduledTask> {
44        match self.schedule.next_call_at(self.last_run_at) {
45            Some(next_call_at) => {
46                self.next_call_at = next_call_at;
47                Some(self)
48            }
49            None => None,
50        }
51    }
52}
53
54// We implement PartialEq, Eq, PartialOrd and Ord for ScheduledTask
55// because we want to use it in a BinaryHeap.
56
57impl Ord for ScheduledTask {
58    fn cmp(&self, other: &Self) -> Ordering {
59        // We only care about next_call_at when we are comparing different tasks.
60        // The comparison order is important (other is compared to self):
61        // BinaryHeap is a max-heap by default, but we want a min-heap.
62        other.next_call_at.cmp(&self.next_call_at)
63    }
64}
65
66impl PartialOrd for ScheduledTask {
67    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
68        Some(self.cmp(other))
69    }
70}
71
72impl PartialEq for ScheduledTask {
73    fn eq(&self, other: &Self) -> bool {
74        other.cmp(self) == Ordering::Equal
75    }
76}
77
78impl Eq for ScheduledTask {}