async_periodic_job/
lib.rs

1//! [![Crates.io](https://img.shields.io/crates/v/async-periodic-job)](https://crates.io/crates/async-periodic-job)
2//! [![docs](https://img.shields.io/crates/v/async-periodic-job?color=orange&label=docs)](https://docs.rs/async-periodic-job)
3//!
4//! A simple async periodic job scheduler library base on tokio-util.
5//!
6//! ## Features
7//!
8//! - **Periodic Job Execution**: Define jobs that run at regular intervals
9//! - **Asynchronous Execution**: Based on Tokio, schedule jobs in a non-blocking manner
10//! - **Truncated Run Time**: Jobs can be configured to run at truncated time intervals, ensuring precise timing
11//! - **Graceful Shutdown**: Gracefully stop the scheduler using either a `Ctrl+C` signal or a cancellation token
12//!
13//! ## Quick Started
14//!
15//! ### Installation
16//!
17//! ```toml
18//! [dependencies]
19//! async-periodic-job = "0.1.3"
20//! ```
21//!
22//! ### Usage
23//!
24//! #### Base Usage
25//!
26//! ```rust
27//! use async_periodic_job::{Job, Scheduler};
28//! use std::time::Duration;
29//!
30//! // Define a job without options
31//! struct JobImplA;
32//! impl Job for JobImplA {
33//!     async fn run(&mut self) {
34//!         // ...
35//!     }
36//! }
37//!
38//! // Define a job with customized period and time truncation
39//! struct JobImplB;
40//! impl Job for JobImplB {
41//!     // Job repeat period, default: 1s
42//!     fn period(&self) -> Duration {
43//!         Duration::from_secs(2)
44//!     }
45//!
46//!     // If run job with truncate time, default: true
47//!     fn with_truncate_time(&self) -> bool {
48//!         false
49//!     }
50//!
51//!     // Job run
52//!     async fn run(&mut self) {
53//!         // ...
54//!     }
55//! }
56//!
57//! #[tokio::main]
58//! async fn main() {
59//!     // Spawn job instances of JobA and JobB and wait scheduler stop
60//!     // When received `CTRL+C` signal, scheduler will exit (after all running jobs exit)
61//!     Scheduler::new()
62//!         .spawn(JobImplA)
63//!         .spawn(JobImplB)
64//!         .wait()
65//!         .await;
66//! }
67//!
68//! ```
69//!
70//! #### Scheduler cancellation
71//!
72//! ```rust
73//! use async_periodic_job::{Job, Scheduler, Token};
74//! use std::time::Duration;
75//! use tokio::time::sleep;
76//!
77//! // Define a job with cancel
78//! struct JobImpl;
79//! impl Job for JobImpl {
80//!     async fn run(&mut self) {
81//!         // ...
82//!     }
83//! }
84//!
85//! #[tokio::main]
86//! async fn main() {
87//!     // Create a cancellation token and spawn a future to cancel the token after 5 secs
88//!     let token = Token::new();
89//!     let token_copy = token.clone();
90//!     tokio::spawn(async move {
91//!         sleep(Duration::from_secs(5)).await;
92//!         token_copy.cancel();
93//!     });
94//!
95//!     // Spawn instance of JobImpl and wait scheduler stop
96//!     // When token cancelled, scheduler will exit (after all running jobs exit)
97//!     Scheduler::new()
98//!         .spawn(JobImpl)
99//!         .wait_cancel(token)
100//!         .await;
101//! }
102//! ```
103//!
104//! #### Spawn job with cancel
105//!
106//! ```rust
107//! use async_periodic_job::{Job, Scheduler, Token};
108//!
109//! // Define a job with cancel
110//! struct JobImpl;
111//! impl Job for JobImpl {
112//!     // If run job with cancel, default: false
113//!     // When this method returned true, job method `run_with_cancel` will be executed instead of `run`
114//!     fn with_with_cancel(&self) -> bool {
115//!         true
116//!     }
117//!
118//!     // Job run: instead implementing `run`, implement `run_with_cancel`
119//!     async fn run_with_cancel(&mut self, token: Token) {
120//!         loop {
121//!             if token.is_cancelled() {
122//!                 return;
123//!             }
124//!             // ...
125//!         }
126//!     }
127//! }
128//!
129//! #[tokio::main]
130//! async fn main() {
131//!     Scheduler::new()
132//!         .spawn(JobImpl)
133//!         .wait()
134//!         .await;
135//! }
136//! ```
137//!
138//! ## License
139//!
140//! MIT
141//!
142//! Contributions and suggestions are welcome!
143
144use std::time::{Duration, SystemTime};
145use tokio::time::sleep;
146use tokio::{select, signal};
147use tokio_util::sync::CancellationToken;
148use tokio_util::task::TaskTracker;
149
150pub type Token = CancellationToken;
151
152pub trait Job: Send + 'static {
153    fn period(&self) -> Duration {
154        Duration::from_secs(1)
155    }
156
157    fn with_truncate_time(&self) -> bool {
158        true
159    }
160
161    fn run(&mut self) -> impl Future<Output = ()> + Send {
162        async {}
163    }
164
165    fn with_cancel(&self) -> bool {
166        false
167    }
168
169    fn run_with_cancel(&mut self, _token: Token) -> impl Future<Output = ()> + Send {
170        async {}
171    }
172}
173
174pub struct Scheduler {
175    tracker: TaskTracker,
176    token: Token,
177}
178
179impl Scheduler {
180    pub fn new() -> Self {
181        Self {
182            tracker: TaskTracker::new(),
183            token: Token::new(),
184        }
185    }
186
187    pub fn spawn(self, mut job: impl Job) -> Self {
188        let period = job.period();
189        let token = self.token.clone();
190        self.tracker.spawn(async move {
191            loop {
192                let period = if job.with_truncate_time() {
193                    Self::truncate_period(period)
194                } else {
195                    period
196                };
197                select! {
198                    _ = token.cancelled() => break,
199                    _ = sleep(period) =>  {
200                        if job.with_cancel() {
201                            job.run_with_cancel(token.child_token()).await;
202                        } else {
203                            job.run().await
204                        }
205                    }
206                };
207            }
208        });
209        self
210    }
211
212    pub async fn stop(self) {
213        self.tracker.close();
214        self.token.cancel();
215        self.tracker.wait().await;
216    }
217
218    pub async fn wait(self) {
219        signal::ctrl_c().await.unwrap();
220        self.stop().await;
221    }
222
223    pub async fn wait_cancel(self, token: CancellationToken) {
224        token.cancelled().await;
225        self.stop().await;
226    }
227
228    fn truncate_period(period: Duration) -> Duration {
229        let period = period.as_nanos();
230        let epoch = SystemTime::now()
231            .duration_since(SystemTime::UNIX_EPOCH)
232            .unwrap()
233            .as_nanos();
234        let nanos = period - epoch % period;
235        Duration::from_nanos(nanos as u64)
236    }
237}