scheduling/
lib.rs

1/*!
2A very simple job scheduler. Runs one job (one-time or recurring) on one spawned thread.
3
4# Usage
5
6```rust
7fn main() {
8    let _once_handle = scheduling::Scheduler::once(|| println!("ONCE")).start();
9
10    let recurring_handle = scheduling::Scheduler::delayed_recurring(
11        std::time::Duration::from_secs(1),
12        std::time::Duration::from_secs(1),
13        || println!("1 SEC ELAPSED"),
14    )
15    .start();
16
17    std::thread::sleep(std::time::Duration::from_secs(5));
18
19    recurring_handle.cancel();
20
21    std::thread::sleep(std::time::Duration::from_secs(5));
22}
23```
24*/
25
26use std::sync::atomic::{self, AtomicBool};
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29
/// Handle for a `Scheduler`, running or not.
///
/// The handle wraps the cancellation flag shared with the scheduler. Setting
/// the flag (via [`SchedulerHandle::cancel`] or by dropping the handle) asks
/// the scheduler to stop; it cannot be un-cancelled afterwards.
///
/// Because dropping the handle cancels the scheduler, the handle must be kept
/// alive for as long as the job should keep running.
#[derive(Debug)]
#[must_use = "dropping a `SchedulerHandle` cancels its scheduler"]
pub struct SchedulerHandle(Arc<AtomicBool>);

impl Drop for SchedulerHandle {
    /// Cancels the scheduler when the handle goes out of scope.
    fn drop(&mut self) {
        self.cancel()
    }
}

impl SchedulerHandle {
    /// Cancel the `Scheduler` (stop any running job).
    ///
    /// This only raises the shared cancellation flag; calling it more than
    /// once has no further effect.
    pub fn cancel(&self) {
        self.0.store(true, atomic::Ordering::SeqCst);
    }
}
45
/// Boxed callback that can be moved to, and called repeatedly on, another thread.
type JobFn = Box<dyn FnMut() + Send + 'static>;

/// What kind of work a job performs.
enum JobType {
    /// Callback that is invoked a single time.
    Once(JobFn),
    /// Callback that is invoked repeatedly.
    Recurring {
        /// The callback to invoke on every run.
        f: JobFn,
        /// Interval between two consecutive scheduled runs.
        rate: Duration,
    },
}
50
51struct Job {
52    type_: JobType,
53    time: Instant,
54}
55
56/// The `Scheduler` container
57#[derive(Default)]
58pub struct Scheduler {
59    job: Option<Job>,
60    cancelled: Arc<AtomicBool>,
61}
62
63impl Scheduler {
64    /// Get the `SchedulerHandle` for this scheduler. Can be used to setup tasks that cancel themselves on some condition.
65    pub fn handle(&self) -> SchedulerHandle {
66        SchedulerHandle(self.cancelled.clone())
67    }
68
69    /// Create a scheduler to run a one-time job in a background thread
70    pub fn once<F>(f: F) -> Self
71    where
72        F: FnMut() + Send + 'static,
73    {
74        let job = Job { type_: JobType::Once(Box::new(f)), time: Instant::now() };
75
76        Self { job: Some(job), cancelled: Arc::new(AtomicBool::new(false)) }
77    }
78
79    /// Create a scheduler to run a one-time job with an initial delay
80    pub fn delayed_once<F>(delay: Duration, f: F) -> Self
81    where
82        F: FnMut() + Send + 'static,
83    {
84        let job = Job { type_: JobType::Once(Box::new(f)), time: Instant::now() + delay };
85
86        Self { job: Some(job), cancelled: Arc::new(AtomicBool::new(false)) }
87    }
88
89    /// Create a scheduler to run a recurring job at a fixed rate
90    pub fn recurring<F>(rate: Duration, f: F) -> Self
91    where
92        F: FnMut() + Send + 'static,
93    {
94        let job = Job { type_: JobType::Recurring { f: Box::new(f), rate }, time: Instant::now() };
95
96        Self { job: Some(job), cancelled: Arc::new(AtomicBool::new(false)) }
97    }
98
99    /// Create a scheduler to run a recurring job at a fixed rate, with an initial delay
100    pub fn delayed_recurring<F>(delay: Duration, rate: Duration, f: F) -> Self
101    where
102        F: FnMut() + Send + 'static,
103    {
104        let job = Job {
105            type_: JobType::Recurring { f: Box::new(f), rate },
106            time: Instant::now() + delay,
107        };
108
109        Self { job: Some(job), cancelled: Arc::new(AtomicBool::new(false)) }
110    }
111
112    /// Start running the `Scheduler` and return its `handle`
113    pub fn start(self) -> SchedulerHandle {
114        let handle = self.handle();
115        std::thread::spawn(move || self.run());
116        handle
117    }
118
119    fn run(mut self) {
120        while let Some(job) = self.get_job() {
121            self.run_job(job);
122        }
123    }
124
125    fn get_job(&mut self) -> Option<Job> {
126        loop {
127            if self.cancelled.load(atomic::Ordering::SeqCst) {
128                return None;
129            }
130
131            let now = Instant::now();
132
133            match self.job.as_ref() {
134                None => {
135                    break;
136                }
137                Some(j) if j.time <= now => {
138                    break;
139                }
140                Some(j) => {
141                    std::thread::sleep(j.time - now);
142                }
143            };
144        }
145
146        self.job.take()
147    }
148
149    fn run_job(&mut self, job: Job) {
150        match job.type_ {
151            JobType::Once(mut f) => {
152                f();
153            }
154            JobType::Recurring { mut f, rate } => {
155                f();
156                let new_job = Job { type_: JobType::Recurring { f, rate }, time: job.time + rate };
157                self.job = Some(new_job);
158            }
159        }
160    }
161}
162
#[cfg(test)]
mod tests {
    /// Placeholder check confirming that the test harness builds and runs.
    #[test]
    fn it_works() {
        let sum = 2 + 2;
        assert_eq!(sum, 4);
    }
}