# tokio-cron-scheduler
Use cron-like scheduling in an async tokio environment.
Also schedule tasks at an instant or repeat them at a fixed duration.
Task's data can optionally be persisted using PostgreSQL or Nats.
Inspired by https://github.com/lholden/job_scheduler
[![](https://docs.rs/tokio_cron_scheduler/badge.svg)](https://docs.rs/tokio_cron_scheduler) [![](https://img.shields.io/crates/v/tokio_cron_scheduler.svg)](https://crates.io/crates/tokio_cron_scheduler) [![](https://travis-ci.org/mvniekerk/tokio_cron_scheduler.svg?branch=master)](https://travis-ci.org/mvniekerk/tokio_cron_scheduler)
## Usage
Please see the [Documentation](https://docs.rs/tokio_cron_scheduler/) for more details.
Be sure to add the job_scheduler crate to your `Cargo.toml`:
```toml
[dependencies]
tokio-cron-scheduler = "*"
```
Creating a schedule for a job is done using the `FromStr` impl for the
`Schedule` type of the [cron](https://github.com/zslayton/cron) library.
The scheduling format is as follows:
```text
sec min hour day of month month day of week year
* * * * * * *
```
Time is specified for `UTC` and not your local timezone. Note that the year may
be omitted.
Comma separated values such as `5,8,10` represent more than one time value. So
for example, a schedule of `0 2,14,26 * * * *` would execute on the 2nd, 14th,
and 26th minute of every hour.
Ranges can be specified with a dash. A schedule of `0 0 * 5-10 * *` would
execute once per hour but only on day 5 through 10 of the month.
Day of the week can be specified as an abbreviation or the full name. A
schedule of `0 0 6 * * Sun,Sat` would execute at 6am on Sunday and Saturday.
Per job you can be notified when the jobs were started, stopped and removed. Because these notifications
are scheduled using tokio::spawn, the order of these are not guaranteed if the task finishes quickly.
A simple usage example:
```rust
use tokio_cron_scheduler::{JobScheduler, JobToRun, Job};
#[tokio::main]
async fn main() {
let mut sched = JobScheduler::new().await;
sched.add(Job::new("1/10 * * * * *", |uuid, l| {
println!("I run every 10 seconds");
}).await.unwrap());
sched.add(Job::new_async("1/7 * * * * *", |uuid, mut l| Box::pin( async {
println!("I run async every 7 seconds");
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info!("Next time for 7s is {:?}", ts),
_ => warn!("Could not get next tick for 7s job"),
}
})).await.unwrap());
sched.add(Job::new("1/30 * * * * *", |uuid, l| {
println!("I run every 30 seconds");
}).await.unwrap());
sched.add(
Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
println!("{:?} I'm only run once", chrono::Utc::now());
}).unwrap()
).await;
let mut jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
println!("{:?} I'm repeated every 8 seconds", chrono::Utc::now());
}).unwrap();
jj.on_start_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("Job {:?} was started, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
})
})).await;
jj.on_stop_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("Job {:?} was completed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
})
})).await;
jj.on_removed_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("Job {:?} was removed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
})
})).await;
sched.add(jj).await;
let five_s_job = Job::new("1/5 * * * * *", |_uuid, _l| {
println!("{:?} I run every 5 seconds", chrono::Utc::now());
})
.unwrap();
sched.add(five_s_job).await;
let four_s_job_async = Job::new_async("1/4 * * * * *", |_uuid, _l| Box::pin(async move {
println!("{:?} I run async every 4 seconds", chrono::Utc::now());
})).unwrap();
sched.add(four_s_job_async).await;
sched.add(
Job::new("1/30 * * * * *", |_uuid, _l| {
println!("{:?} I run every 30 seconds", chrono::Utc::now());
})
.unwrap(),
).await;
sched.add(
Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
println!("{:?} I'm only run once", chrono::Utc::now());
}).unwrap()
).await;
sched.add(
Job::new_one_shot_async(Duration::from_secs(16), |_uuid, _l| Box::pin( async move {
println!("{:?} I'm only run once async", chrono::Utc::now());
})).unwrap()
).await;
let jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
println!("{:?} I'm repeated every 8 seconds", chrono::Utc::now());
}).unwrap();
sched.add(jj).await;
let jja = Job::new_repeated_async(Duration::from_secs(7), |_uuid, _l| Box::pin(async move {
println!("{:?} I'm repeated async every 7 seconds", chrono::Utc::now());
})).unwrap();
sched.add(jja).await;
#[cfg(feature = "signal")]
sched.shutdown_on_ctrl_c();
sched.set_shutdown_handler(Box::new(|| {
Box::pin(async move {
println!("Shut down done");
})
}));
sched.start().await;
// Wait a while so that the jobs actually run
tokio::time::sleep(core::time::Duration::from_secs(100)).await;
}
```
## Similar Libraries
* [job_scheduler](https://github.com/lholden/job_scheduler) The crate that inspired this one
* [cron](https://github.com/zslayton/cron) the cron expression parser we use.
* [schedule-rs](https://github.com/mehcode/schedule-rs) is a similar rust library that implements it's own cron expression parser.
## License
TokioCronScheduler is licensed under either of
* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or
http://opensource.org/licenses/MIT)
## Custom storage
The MetadataStore and NotificationStore traits can be implemented and be used in the JobScheduler.
A default volatile hashmap based version is provided with the SimpleMetadataStore and SimpleNotificationStore. A persistent version using Nats is provided with NatsMetadataStore and NatsNotificationStore.
## Contributing
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall
be dual licensed as above, without any additional terms or conditions.
Please see the [CONTRIBUTING](CONTRIBUTING.md) file for more information.
## Features
### has_bytes
Since 0.7
Enables Prost-generated data structures to be used by stores that need to get the bytes
of the data structs. The Nats and Postgres stores depend on this feature being enabled.
### postgres_storage
Since 0.6
Adds the Postgres metadata store, notification store (PostgresMetadataStore, PostgresNotificationStore). Use a Postgres database to store the metadata and notifications data.
See [PostgreSQL docs](./postgres.md)
### postgres_native_tls
Since 0.6
Uses postgres-native-tls crate as the TLS provider for the PostgreSQL connection.
### postgres_openssl
Since 0.6
Uses the postgres-openssl crate as the TLS provider for the PostgreSQL connection.
### nats_storage
Since 0.6
Adds the Nats metadata store, notification store (NatsMetadataStore, NatsNotificationStore). Use a Nats system as a way to store the metadata and notifications.
See [Nats docs](./nats.md)
### signal
Since 0.5
Adds `shutdown_on_signal` and `shutdown_on_ctrl_c` to the scheduler.
Both shuts the system down (stops the scheduler, removes all the tasks) when a signal
was received.
## Writing tests
When doing a tokio::test, remember to have it run in a multi-threaded context otherwise the test
will hang on `scheduler.add()`.
For example:
```rust
#[cfg(test)]
mod test {
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;
// Needs multi_thread to test, otherwise it hangs on scheduler.add()
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// #[tokio::test]
async fn test_schedule() {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("Setting default subscriber failed");
info!("Create scheduler");
let scheduler = JobScheduler::new().await.unwrap();
info!("Add job");
scheduler
.add(
Job::new_async("*/1 * * * * *", |_, _| {
Box::pin(async {
info!("Run every seconds");
})
})
.unwrap(),
)
.await
.expect("Should be able to add a job");
scheduler.start().await.unwrap();
tokio::time::sleep(core::time::Duration::from_secs(20)).await;
}
}
```
## Examples
### simple
Runs the in-memory hashmap based storage
```shell
cargo run --example simple --features="tracing-subscriber"
```
### postgres
Needs a running PostgreSQL instance first:
```shell
docker run --rm -it -p 5432:5432 -e POSTGRES_USER="postgres" -e POSTGRES_PASSWORD="" -e POSTGRES_HOST_AUTH_METHOD="trust" postgres:14.1
```
Then run the example:
```shell
POSTGRES_INIT_METADATA=true POSTGRES_INIT_NOTIFICATIONS=true cargo run --example postgres --features="postgres_storage tracing-subscriber"
```
### nats
Needs a running Nats instance first with Jetstream enabled:
```shell
docker run --rm -it -p 4222:4222 -p 6222:6222 -p 7222:7222 -p 8222:8222 nats -js -DV
```
Then run the example:
```shell
cargo run --example nats --features="nats_storage tracing-subscriber"
```
## Design
### Job activity
![Job activity](./doc/job_activity.svg)
### Create job
![Create job](./doc/create_job.svg)
### Create notification
![Create notification](./doc/create_notification.svg)
### Delete job
![Delete job](./doc/delete_job.svg)
### Delete notification
![Delete notification](./doc/delete_notification.svg)