bonsaimq 0.2.0

Message/job queue based on bonsaidb, similar to sqlxmq.
//! Checkpoint example.
#![allow(clippy::expect_used, unused_qualifications, clippy::unused_async, clippy::print_stdout)]

mod common;

use bonsaidb::local::{
	config::{Builder, StorageConfiguration},
use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema};
use color_eyre::Result;

/// Example job function using checkpoints.
///
/// Each pass of the loop reads the job's payload through
/// `payload_json::<usize>()` and branches on whether a value is present:
///
/// * no payload: the job is treated as a fresh start and a message is printed;
/// * a payload: it is used as a counter, incremented by one, and once the
///   incremented value reaches 5 the function prints "Job completed!" and
///   returns `Ok(())`.
///
/// # Errors
///
/// An error produced while reading the payload is propagated to the caller
/// via `?`.
///
/// NOTE(review): this listing appears to be truncated. The closing braces and
/// whatever follows each branch (presumably the calls that store the next
/// checkpoint) are not visible here. Confirm against the full example.
async fn checkpoints(mut job: CurrentJob) -> Result<()> {
	loop {
		// `transpose` flips the optional result inside out so that `?` can surface
		// a read error first, leaving a plain `Option<usize>` to match on.
		match job.payload_json::<usize>().transpose()? {
			None => {
				println!("Beginning new job without input, without checkpoint.");
			Some(mut count) => {
				println!("Continuing job from checkpoint with count {count}");
				// Advance the counter carried over from the previous checkpoint.
				count += 1;
				// Five steps is this example's fixed stopping point.
				if count >= 5 {
					println!("Job completed!");
					return Ok(());

// The JobRegistry provides a way to spawn new jobs and provides the interface
// for the JobRunner to find the functions to execute for the jobs.
//
// This invocation declares `JobRegistry` with a single entry, `Checkpoints`,
// which associates the string "checkpoints" with the `checkpoints` function.
job_registry!(JobRegistry, {
	Checkpoints: "checkpoints" => checkpoints,

/// Runs the checkpoint example end to end: opens the local database, starts a
/// job runner, spawns one `Checkpoints` job, waits for it and then aborts the
/// runner.
///
/// # Errors
///
/// Propagates any error from opening the database, spawning the job or
/// waiting for it.
///
/// NOTE(review): no async runtime attribute (e.g. `#[tokio::main]`), final
/// `Ok(())` or closing brace is visible. This listing appears to be
/// truncated; confirm against the full example.
async fn main() -> Result<()> {

	// Open a local database for this example.
	let db_path = "checkpoints-example.bonsaidb";
	let db = AsyncDatabase::open::<MessageQueueSchema>(StorageConfiguration::new(db_path)).await?;

	// Start the job runner to execute jobs from the messages in the queue in the
	// database.
	let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();

	// Spawn new jobs via a message on the database queue.
	let job_id = JobRegistry::Checkpoints.builder().spawn(&db).await?;

	// Wait for job to finish execution.
	// NOTE(review): the meaning of `100` is not visible here (presumably a
	// polling interval). Confirm against `bonsaimq::await_job`.
	bonsaimq::await_job(job_id, 100, &db).await?;

	job_runner.abort(); // Is done automatically on drop.

/// Runs the example's `main` and panics with the message "running main" if it
/// returns an error.
///
/// NOTE(review): presumably a `#[test]` wrapper so the example is exercised by
/// the test harness. The attribute and the closing brace are not visible in
/// this listing; confirm against the full example.
fn example_checkpoints() {
	main().expect("running main");