tsuki_scheduler/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(clippy::unwrap_used, clippy::panicking_overflow_checks)]
3#[cfg(feature = "async-scheduler")]
4mod async_scheduler;
5#[cfg(feature = "async-scheduler")]
6pub use async_scheduler::*;
7/// prelude for tsuki_scheduler
8pub mod prelude;
9use std::{
10    collections::{BinaryHeap, HashMap},
11    hash::Hash,
12};
13
14use handle_manager::HandleManager;
15use runtime::Runtime;
16use schedule::Schedule;
17/// alias for [`chrono::DateTime`] in [`chrono::Utc`] timezone
18pub type Dtu = chrono::DateTime<chrono::Utc>;
19/// Process the handlers of the tasks
20pub mod handle_manager;
21/// Runtime to run the tasks
22pub mod runtime;
23/// Schedules and combinators
24pub mod schedule;
25/// unique identifier for a task
26///
27/// # Using uuid
28/// enable feature `uuid` and create a new task uid with [`TaskUid::uuid()`]
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
30pub struct TaskUid(pub(crate) u128);
31impl TaskUid {
32    #[cfg(feature = "uuid")]
33    pub fn uuid() -> Self {
34        Self(uuid::Uuid::new_v4().as_u128())
35    }
36    pub fn into_inner(self) -> u128 {
37        self.0
38    }
39    pub fn new(inner: u128) -> Self {
40        Self(inner)
41    }
42}
43
44impl std::fmt::Display for TaskUid {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        write!(f, "{:032x}", self.0)
47    }
48}
49
50pub type RunTaskFn<R> = dyn Fn(&mut R, &TaskRun) -> <R as Runtime>::Handle + Send;
51
52/// Task to be scheduled
53///
54/// # Fields
55/// - schedule: a [`Schedule`] trait object
56/// - run: function to create a new task in specific runtime `R`
57///
58pub struct Task<R: Runtime> {
59    pub schedule: Box<dyn Schedule + Send>,
60    pub run: Box<RunTaskFn<R>>,
61}
62
63impl<R: Runtime> std::fmt::Debug for Task<R> {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("Task").finish_non_exhaustive()
66    }
67}
68
69/// Scheduler to manage tasks
70///
71/// # Usage
72/// ```
73/// # use tsuki_scheduler::prelude::*;
74/// # use chrono::Utc;
75/// let mut scheduler = Scheduler::new(Thread::new());
76/// let id = TaskUid::uuid();
77/// // add a new task
78/// scheduler.add_task(TaskUid::uuid(), Task::thread(Utc::now(), || {
79///     println!("Hello, world!");
80/// }));
81/// // execute all tasks by now
82/// scheduler.execute_by_now();
83/// // delete task by id
84/// scheduler.delete_task(id);
85/// ```
86///
87/// # Manage handles
88/// The handle manager is used to manage the handles of the tasks.
89///
90/// The default one is `()`, which does nothing.
91///
92/// And you can implement your own handle manager to manage the handles, see [HandleManager](`crate::handle_manager::HandleManager`).
93#[derive(Debug)]
94pub struct Scheduler<R: Runtime, H = ()> {
95    pub(crate) next_up_heap: BinaryHeap<TaskRun>,
96    pub(crate) task_map: HashMap<TaskUid, Task<R>>,
97    pub(crate) runtime: R,
98    pub handle_manager: H,
99}
100
101impl<R, H> Default for Scheduler<R, H>
102where
103    R: Runtime + Default,
104    H: Default,
105{
106    fn default() -> Self {
107        Self {
108            next_up_heap: BinaryHeap::new(),
109            task_map: HashMap::new(),
110            runtime: R::default(),
111            handle_manager: H::default(),
112        }
113    }
114}
115/// A single task running schedule
116#[derive(Debug, Clone)]
117pub struct TaskRun {
118    key: TaskUid,
119    time: chrono::DateTime<chrono::Utc>,
120}
121
122impl std::fmt::Display for TaskRun {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        write!(
125            f,
126            "{}-[{:032x}]-[{}]",
127            env!("CARGO_CRATE_NAME"),
128            self.key.0,
129            self.time
130        )
131    }
132}
133
134impl PartialEq for TaskRun {
135    fn eq(&self, other: &Self) -> bool {
136        self.time.eq(&other.time)
137    }
138}
139
140impl Eq for TaskRun {}
141
142impl PartialOrd for TaskRun {
143    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
144        Some(self.cmp(other))
145    }
146}
147impl Ord for TaskRun {
148    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
149        // the task with earlier next time has higher priority
150        self.time.cmp(&other.time).reverse()
151    }
152}
153
154impl TaskRun {
155    pub fn key(&self) -> TaskUid {
156        self.key
157    }
158    pub fn time(&self) -> Dtu {
159        self.time
160    }
161}
162
163impl<R: Runtime> Scheduler<R, ()> {
164    pub fn new(runtime: R) -> Self {
165        Self {
166            next_up_heap: BinaryHeap::new(),
167            task_map: HashMap::new(),
168            runtime,
169            handle_manager: (),
170        }
171    }
172}
173
174impl<R: Runtime, H> Scheduler<R, H> {
175    pub fn runtime(&self) -> &R {
176        &self.runtime
177    }
178    /// set handle manager
179    /// # Example
180    /// ```
181    /// # use tsuki_scheduler::prelude::*;
182    /// // use a vector to collect handles
183    /// let handles: Vec<<Thread as Runtime>::Handle> = vec![];
184    /// let scheduler = Scheduler::new(Thread::new()).with_handle_manager(handles);
185    /// ```
186    pub fn with_handle_manager<H2>(self, handle_manager: H2) -> Scheduler<R, H2> {
187        Scheduler {
188            next_up_heap: self.next_up_heap,
189            task_map: self.task_map,
190            runtime: self.runtime,
191            handle_manager,
192        }
193    }
194}
195
196impl<R: Runtime, H: HandleManager<R::Handle>> Scheduler<R, H> {
197    /// add a new task
198    pub fn add_task(&mut self, key: TaskUid, mut task: Task<R>) {
199        if let Some(next) = task.schedule.next() {
200            let next_up = TaskRun { key, time: next };
201            self.task_map.insert(key, task);
202            self.next_up_heap.push(next_up);
203        }
204    }
205    /// delete a task by id
206    pub fn delete_task(&mut self, key: TaskUid) -> Option<Task<R>> {
207        self.task_map.remove(&key)
208    }
209    /// execute all tasks by now
210    #[inline]
211    pub fn execute_by_now(&mut self) {
212        self.execute(chrono::Utc::now())
213    }
214    /// execute all tasks by a specific time
215    pub fn execute(&mut self, base_time: Dtu) {
216        let now = base_time;
217        while let Some(peek) = self.next_up_heap.peek() {
218            if peek.time > now {
219                break;
220            } else {
221                let mut next_up = self.next_up_heap.pop().expect("should has peek");
222                let Some(task) = self.task_map.get_mut(&next_up.key) else {
223                    // has been deleted
224                    continue;
225                };
226                let handle = (task.run)(&mut self.runtime, &next_up);
227                self.handle_manager.manage(&next_up, handle);
228                if let Some(next_call) = task.schedule.next() {
229                    next_up.time = next_call;
230                    self.next_up_heap.push(next_up);
231                } else {
232                    self.task_map.remove(&next_up.key);
233                }
234            }
235        }
236    }
237}
238
239#[inline]
240/// a shortcut to call [`chrono::Utc::now()`]
241pub fn now() -> Dtu {
242    chrono::Utc::now()
243}