1use super::{scheduled_task::ScheduledTask, Schedule};
2use crate::{broker::Broker, error::BeatError, protocol::TryCreateMessage};
3use log::{debug, info};
4use std::collections::BinaryHeap;
5use std::time::{Duration, SystemTime};
6
/// Interval a new `Scheduler` uses as its `default_sleep_interval`, i.e. how far
/// in the future the next wake-up is placed when no task is queued.
const DEFAULT_SLEEP_INTERVAL: Duration = Duration::from_millis(500);
8
9pub struct Scheduler {
18 heap: BinaryHeap<ScheduledTask>,
19 default_sleep_interval: Duration,
20 pub broker: Box<dyn Broker>,
21}
22
23impl Scheduler {
24 pub fn new(broker: Box<dyn Broker>) -> Scheduler {
26 Scheduler {
27 heap: BinaryHeap::new(),
28 default_sleep_interval: DEFAULT_SLEEP_INTERVAL,
29 broker,
30 }
31 }
32
33 pub fn schedule_task<S>(
35 &mut self,
36 name: String,
37 message_factory: Box<dyn TryCreateMessage>,
38 queue: String,
39 schedule: S,
40 ) where
41 S: Schedule + 'static,
42 {
43 match schedule.next_call_at(None) {
44 Some(next_call_at) => self.heap.push(ScheduledTask::new(
45 name,
46 message_factory,
47 queue,
48 schedule,
49 next_call_at,
50 )),
51 None => debug!(
52 "The schedule of task {} never scheduled the task to run, so it has been dropped.",
53 name
54 ),
55 }
56 }
57
58 pub fn get_scheduled_tasks(&mut self) -> &mut BinaryHeap<ScheduledTask> {
60 &mut self.heap
61 }
62
63 fn next_task_time(&self, now: SystemTime) -> SystemTime {
65 if let Some(scheduled_task) = self.heap.peek() {
66 debug!(
67 "Next scheduled task is at {:?}",
68 scheduled_task.next_call_at
69 );
70 scheduled_task.next_call_at
71 } else {
72 debug!(
73 "No scheduled tasks, sleeping for {:?}",
74 self.default_sleep_interval
75 );
76 now + self.default_sleep_interval
77 }
78 }
79
80 pub async fn tick(&mut self) -> Result<SystemTime, BeatError> {
84 let now = SystemTime::now();
85 let next_task_time = self.next_task_time(now);
86
87 if next_task_time <= now {
88 let mut scheduled_task = self
89 .heap
90 .pop()
91 .expect("No scheduled tasks found even though there should be");
92 let result = self.send_scheduled_task(&mut scheduled_task).await;
93
94 if let Some(rescheduled_task) = scheduled_task.reschedule_task() {
98 self.heap.push(rescheduled_task);
99 } else {
100 debug!("A task is not scheduled to run anymore and will be dropped");
101 }
102
103 result?;
104 Ok(self.next_task_time(now))
105 } else {
106 Ok(next_task_time)
107 }
108 }
109
110 async fn send_scheduled_task(
112 &self,
113 scheduled_task: &mut ScheduledTask,
114 ) -> Result<(), BeatError> {
115 let queue = &scheduled_task.queue;
116
117 let message = scheduled_task.message_factory.try_create_message()?;
118
119 info!(
120 "Sending task {}[{}] to {} queue",
121 scheduled_task.name,
122 message.task_id(),
123 queue
124 );
125 self.broker.send(&message, queue).await?;
126 scheduled_task.last_run_at.replace(SystemTime::now());
127 scheduled_task.total_run_count += 1;
128 Ok(())
129 }
130}