#![allow(clippy::expect_used, unused_qualifications, clippy::unused_async, clippy::print_stdout)]
mod common;
use bonsaidb::local::{
config::{Builder, StorageConfiguration},
AsyncDatabase,
};
use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema};
use color_eyre::Result;
async fn checkpoints(mut job: CurrentJob) -> Result<()> {
loop {
match job.payload_json::<usize>().transpose()? {
None => {
println!("Beginning new job without input, without checkpoint.");
job.checkpoint().payload_json(1_usize)?.set().await?;
}
Some(mut count) => {
println!("Continuing job from checkpoint with count {count}");
count += 1;
job.checkpoint().payload_json(count)?.set().await?;
if count >= 5 {
job.complete().await?;
println!("Job completed!");
return Ok(());
}
}
}
}
}
job_registry!(JobRegistry, {
Checkpoints: "checkpoints" => checkpoints,
});
#[tokio::main]
async fn main() -> Result<()> {
common::init();
let db_path = "checkpoints-example.bonsaidb";
let db = AsyncDatabase::open::<MessageQueueSchema>(StorageConfiguration::new(db_path)).await?;
let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();
let job_id = JobRegistry::Checkpoints.builder().spawn(&db).await?;
bonsaimq::await_job(job_id, 100, &db).await?;
job_runner.abort(); tokio::fs::remove_dir_all(db_path).await?;
Ok(())
}
#[test]
#[ntest::timeout(10000)]
fn example_checkpoints() {
main().expect("running main");
}