async_periodic_job/lib.rs
1//! [](https://crates.io/crates/async-periodic-job)
2//! [](https://docs.rs/async-periodic-job)
3//!
4//! A simple async periodic job scheduler library base on tokio-util.
5//!
6//! ## Features
7//!
8//! - **Periodic Job Execution**: Define jobs that run at regular intervals
9//! - **Asynchronous Execution**: Based on Tokio, schedule jobs in a non-blocking manner
10//! - **Truncated Run Time**: Jobs can be configured to run at truncated time intervals, ensuring precise timing
11//! - **Graceful Shutdown**: Gracefully stop the scheduler using either a `Ctrl+C` signal or a cancellation token
12//!
13//! ## Quick Started
14//!
15//! ### Installation
16//!
17//! ```toml
18//! [dependencies]
19//! async-periodic-job = "0.1.3"
20//! ```
21//!
22//! ### Usage
23//!
24//! #### Base Usage
25//!
26//! ```rust
27//! use async_periodic_job::{Job, Scheduler};
28//! use std::time::Duration;
29//!
30//! // Define a job without options
31//! struct JobImplA;
32//! impl Job for JobImplA {
33//! async fn run(&mut self) {
34//! // ...
35//! }
36//! }
37//!
38//! // Define a job with customized period and time truncation
39//! struct JobImplB;
40//! impl Job for JobImplB {
41//! // Job repeat period, default: 1s
42//! fn period(&self) -> Duration {
43//! Duration::from_secs(2)
44//! }
45//!
46//! // If run job with truncate time, default: true
47//! fn with_truncate_time(&self) -> bool {
48//! false
49//! }
50//!
51//! // Job run
52//! async fn run(&mut self) {
53//! // ...
54//! }
55//! }
56//!
57//! #[tokio::main]
58//! async fn main() {
59//! // Spawn job instances of JobA and JobB and wait scheduler stop
60//! // When received `CTRL+C` signal, scheduler will exit (after all running jobs exit)
61//! Scheduler::new()
62//! .spawn(JobImplA)
63//! .spawn(JobImplB)
64//! .wait()
65//! .await;
66//! }
67//!
68//! ```
69//!
70//! #### Scheduler cancellation
71//!
72//! ```rust
73//! use async_periodic_job::{Job, Scheduler, Token};
74//! use std::time::Duration;
75//! use tokio::time::sleep;
76//!
77//! // Define a job with cancel
78//! struct JobImpl;
79//! impl Job for JobImpl {
80//! async fn run(&mut self) {
81//! // ...
82//! }
83//! }
84//!
85//! #[tokio::main]
86//! async fn main() {
87//! // Create a cancellation token and spawn a future to cancel the token after 5 secs
88//! let token = Token::new();
89//! let token_copy = token.clone();
90//! tokio::spawn(async move {
91//! sleep(Duration::from_secs(5)).await;
92//! token_copy.cancel();
93//! });
94//!
95//! // Spawn instance of JobImpl and wait scheduler stop
96//! // When token cancelled, scheduler will exit (after all running jobs exit)
97//! Scheduler::new()
98//! .spawn(JobImpl)
99//! .wait_cancel(token)
100//! .await;
101//! }
102//! ```
103//!
104//! #### Spawn job with cancel
105//!
106//! ```rust
107//! use async_periodic_job::{Job, Scheduler, Token};
108//!
109//! // Define a job with cancel
110//! struct JobImpl;
111//! impl Job for JobImpl {
112//! // If run job with cancel, default: false
113//! // When this method returned true, job method `run_with_cancel` will be executed instead of `run`
114//! fn with_with_cancel(&self) -> bool {
115//! true
116//! }
117//!
118//! // Job run: instead implementing `run`, implement `run_with_cancel`
119//! async fn run_with_cancel(&mut self, token: Token) {
120//! loop {
121//! if token.is_cancelled() {
122//! return;
123//! }
124//! // ...
125//! }
126//! }
127//! }
128//!
129//! #[tokio::main]
130//! async fn main() {
131//! Scheduler::new()
132//! .spawn(JobImpl)
133//! .wait()
134//! .await;
135//! }
136//! ```
137//!
138//! ## License
139//!
140//! MIT
141//!
142//! Contributions and suggestions are welcome!
143
144use std::time::{Duration, SystemTime};
145use tokio::time::sleep;
146use tokio::{select, signal};
147use tokio_util::sync::CancellationToken;
148use tokio_util::task::TaskTracker;
149
150pub type Token = CancellationToken;
151
152pub trait Job: Send + 'static {
153 fn period(&self) -> Duration {
154 Duration::from_secs(1)
155 }
156
157 fn with_truncate_time(&self) -> bool {
158 true
159 }
160
161 fn run(&mut self) -> impl Future<Output = ()> + Send {
162 async {}
163 }
164
165 fn with_cancel(&self) -> bool {
166 false
167 }
168
169 fn run_with_cancel(&mut self, _token: Token) -> impl Future<Output = ()> + Send {
170 async {}
171 }
172}
173
174pub struct Scheduler {
175 tracker: TaskTracker,
176 token: Token,
177}
178
179impl Scheduler {
180 pub fn new() -> Self {
181 Self {
182 tracker: TaskTracker::new(),
183 token: Token::new(),
184 }
185 }
186
187 pub fn spawn(self, mut job: impl Job) -> Self {
188 let period = job.period();
189 let token = self.token.clone();
190 self.tracker.spawn(async move {
191 loop {
192 let period = if job.with_truncate_time() {
193 Self::truncate_period(period)
194 } else {
195 period
196 };
197 select! {
198 _ = token.cancelled() => break,
199 _ = sleep(period) => {
200 if job.with_cancel() {
201 job.run_with_cancel(token.child_token()).await;
202 } else {
203 job.run().await
204 }
205 }
206 };
207 }
208 });
209 self
210 }
211
212 pub async fn stop(self) {
213 self.tracker.close();
214 self.token.cancel();
215 self.tracker.wait().await;
216 }
217
218 pub async fn wait(self) {
219 signal::ctrl_c().await.unwrap();
220 self.stop().await;
221 }
222
223 pub async fn wait_cancel(self, token: CancellationToken) {
224 token.cancelled().await;
225 self.stop().await;
226 }
227
228 fn truncate_period(period: Duration) -> Duration {
229 let period = period.as_nanos();
230 let epoch = SystemTime::now()
231 .duration_since(SystemTime::UNIX_EPOCH)
232 .unwrap()
233 .as_nanos();
234 let nanos = period - epoch % period;
235 Duration::from_nanos(nanos as u64)
236 }
237}