1#![doc = include_str!("../README.md")]
2#![warn(clippy::unwrap_used, clippy::panicking_overflow_checks)]
3#[cfg(feature = "async-scheduler")]
4mod async_scheduler;
5#[cfg(feature = "async-scheduler")]
6pub use async_scheduler::*;
7pub mod prelude;
9use std::{
10 collections::{BinaryHeap, HashMap},
11 hash::Hash,
12};
13
14use handle_manager::HandleManager;
15use runtime::Runtime;
16use schedule::Schedule;
/// Shorthand for a UTC timestamp, i.e. `chrono::DateTime<chrono::Utc>`.
pub type Dtu = chrono::DateTime<chrono::Utc>;
19pub mod handle_manager;
21pub mod runtime;
23pub mod schedule;
/// Identifier of a task, backed by a raw 128-bit integer.
///
/// Comparison, ordering and hashing are all derived from the wrapped value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct TaskUid(pub(crate) u128);

impl TaskUid {
    /// Builds an identifier from a newly generated version-4 UUID.
    ///
    /// Only available when the `uuid` feature is enabled.
    #[cfg(feature = "uuid")]
    pub fn uuid() -> Self {
        let raw = uuid::Uuid::new_v4().as_u128();
        TaskUid(raw)
    }

    /// Unwraps the identifier into its raw 128-bit value.
    pub fn into_inner(self) -> u128 {
        let TaskUid(raw) = self;
        raw
    }

    /// Wraps a raw 128-bit value as an identifier.
    pub fn new(inner: u128) -> Self {
        TaskUid(inner)
    }
}

/// Formats the identifier as 32 lowercase hexadecimal digits, zero-padded on the left.
impl std::fmt::Display for TaskUid {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let TaskUid(raw) = self;
        write!(f, "{:032x}", raw)
    }
}
49
50pub type RunTaskFn<R> = dyn Fn(&mut R, &TaskRun) -> <R as Runtime>::Handle + Send;
51
52pub struct Task<R: Runtime> {
59 pub schedule: Box<dyn Schedule + Send>,
60 pub run: Box<RunTaskFn<R>>,
61}
62
63impl<R: Runtime> std::fmt::Debug for Task<R> {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("Task").finish_non_exhaustive()
66 }
67}
68
69#[derive(Debug)]
94pub struct Scheduler<R: Runtime, H = ()> {
95 pub(crate) next_up_heap: BinaryHeap<TaskRun>,
96 pub(crate) task_map: HashMap<TaskUid, Task<R>>,
97 pub(crate) runtime: R,
98 pub handle_manager: H,
99}
100
101impl<R, H> Default for Scheduler<R, H>
102where
103 R: Runtime + Default,
104 H: Default,
105{
106 fn default() -> Self {
107 Self {
108 next_up_heap: BinaryHeap::new(),
109 task_map: HashMap::new(),
110 runtime: R::default(),
111 handle_manager: H::default(),
112 }
113 }
114}
115#[derive(Debug, Clone)]
117pub struct TaskRun {
118 key: TaskUid,
119 time: chrono::DateTime<chrono::Utc>,
120}
121
122impl std::fmt::Display for TaskRun {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 write!(
125 f,
126 "{}-[{:032x}]-[{}]",
127 env!("CARGO_CRATE_NAME"),
128 self.key.0,
129 self.time
130 )
131 }
132}
133
134impl PartialEq for TaskRun {
135 fn eq(&self, other: &Self) -> bool {
136 self.time.eq(&other.time)
137 }
138}
139
140impl Eq for TaskRun {}
141
142impl PartialOrd for TaskRun {
143 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
144 Some(self.cmp(other))
145 }
146}
147impl Ord for TaskRun {
148 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
149 self.time.cmp(&other.time).reverse()
151 }
152}
153
154impl TaskRun {
155 pub fn key(&self) -> TaskUid {
156 self.key
157 }
158 pub fn time(&self) -> Dtu {
159 self.time
160 }
161}
162
163impl<R: Runtime> Scheduler<R, ()> {
164 pub fn new(runtime: R) -> Self {
165 Self {
166 next_up_heap: BinaryHeap::new(),
167 task_map: HashMap::new(),
168 runtime,
169 handle_manager: (),
170 }
171 }
172}
173
174impl<R: Runtime, H> Scheduler<R, H> {
175 pub fn runtime(&self) -> &R {
176 &self.runtime
177 }
178 pub fn with_handle_manager<H2>(self, handle_manager: H2) -> Scheduler<R, H2> {
187 Scheduler {
188 next_up_heap: self.next_up_heap,
189 task_map: self.task_map,
190 runtime: self.runtime,
191 handle_manager,
192 }
193 }
194}
195
196impl<R: Runtime, H: HandleManager<R::Handle>> Scheduler<R, H> {
197 pub fn add_task(&mut self, key: TaskUid, mut task: Task<R>) {
199 if let Some(next) = task.schedule.next() {
200 let next_up = TaskRun { key, time: next };
201 self.task_map.insert(key, task);
202 self.next_up_heap.push(next_up);
203 }
204 }
205 pub fn delete_task(&mut self, key: TaskUid) -> Option<Task<R>> {
207 self.task_map.remove(&key)
208 }
209 #[inline]
211 pub fn execute_by_now(&mut self) {
212 self.execute(chrono::Utc::now())
213 }
214 pub fn execute(&mut self, base_time: Dtu) {
216 let now = base_time;
217 while let Some(peek) = self.next_up_heap.peek() {
218 if peek.time > now {
219 break;
220 } else {
221 let mut next_up = self.next_up_heap.pop().expect("should has peek");
222 let Some(task) = self.task_map.get_mut(&next_up.key) else {
223 continue;
225 };
226 let handle = (task.run)(&mut self.runtime, &next_up);
227 self.handle_manager.manage(&next_up, handle);
228 if let Some(next_call) = task.schedule.next() {
229 next_up.time = next_call;
230 self.next_up_heap.push(next_up);
231 } else {
232 self.task_map.remove(&next_up.key);
233 }
234 }
235 }
236 }
237}
238
239#[inline]
240pub fn now() -> Dtu {
242 chrono::Utc::now()
243}