Crate bonsaimq

Bonsaimq

crates.io page docs.rs page license: MIT

Simple database message queue based on bonsaidb.

The project is highly influenced by sqlxmq.

Warning: This project is in early alpha and should not be used in production!

Usage

Import the project using:

cargo add bonsaimq

or

# adjust the version to the latest version:
bonsaimq = "0.3.0"
# or
bonsaimq = { git = "https://github.com/FlixCoder/bonsaimq" }

Then you can use the message/job queue as follows:

  • You need job handlers, which are async functions that take one argument of type CurrentJob and, as in the example below, return a Result. CurrentJob lets the handler interact with the job, for example to retrieve its input or to complete it.
  • The job_registry! macro must be used to create a job registry, which maps message names/types to the job handlers and allows spawning new jobs.
  • A job runner needs to be created and run on a bonsaidb database. It runs in the background for as long as its handle is in scope, executes jobs according to the incoming messages, and looks up their handlers in the job registry.
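
To make the role of the registry more concrete, here is a minimal, std-only sketch of the idea: an enum with one variant per job type, a message name per variant, and a lookup from name to handler. It is a conceptual stand-in, not what job_registry! actually expands to. The Registry, Handler, from_name and dispatch names are hypothetical, and the handlers are synchronous and take a plain string instead of a CurrentJob.

```rust
// Conceptual stand-in only: hypothetical types, not bonsaimq's generated code.

/// Hypothetical handler signature. Real bonsaimq handlers are async functions
/// that receive a `CurrentJob`.
type Handler = fn(&str) -> String;

fn greet(payload: &str) -> String {
    format!("Hello {}!", payload)
}

fn part(payload: &str) -> String {
    format!("Goodbye {}!", payload)
}

/// One variant per job type, similar in spirit to what a registry macro
/// could generate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Registry {
    Greetings,
    Farewells,
}

impl Registry {
    /// Message name stored in the queue for this job type.
    fn name(self) -> &'static str {
        match self {
            Registry::Greetings => "greet",
            Registry::Farewells => "part",
        }
    }

    /// Look up the job type for an incoming message name.
    fn from_name(name: &str) -> Option<Self> {
        [Registry::Greetings, Registry::Farewells]
            .iter()
            .copied()
            .find(|entry| entry.name() == name)
    }

    /// Handler that a runner would execute for this job type.
    fn handler(self) -> Handler {
        match self {
            Registry::Greetings => greet,
            Registry::Farewells => part,
        }
    }
}

/// Resolve a message name and run its handler, if one is registered.
fn dispatch(name: &str, payload: &str) -> Option<String> {
    Registry::from_name(name).map(|entry| (entry.handler())(payload))
}
```

In this sketch, dispatch("greet", "cats") resolves the name to Registry::Greetings and runs greet, while an unregistered name yields None. A runner built on such a registry only needs the message name to find the function to execute.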

Example

Besides the following simple example, see the examples in the examples folder and take a look at the tests.

use bonsaidb::local::{
    config::{Builder, StorageConfiguration},
    AsyncDatabase,
};
use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema};
use color_eyre::Result;

/// Example job function. It receives a handle to the current job, which gives
/// the ability to get the input payload, complete the job and more.
async fn greet(mut job: CurrentJob) -> color_eyre::Result<()> {
    // Load the JSON payload and make sure it is there.
    let name: String = job.payload_json().expect("input should be given")?;
    println!("Hello {name}!");
    job.complete().await?;
    Ok(())
}

// The JobRegistry provides a way to spawn new jobs and provides the interface
// for the JobRunner to find the functions to execute for the jobs.
job_registry!(JobRegistry, {
    Greetings: "greet" => greet,
});

#[tokio::main]
async fn main() -> Result<()> {
    // Open a local database for this example.
    let db_path = "simple-doc-example.bonsaidb";
    let db = AsyncDatabase::open::<MessageQueueSchema>(StorageConfiguration::new(db_path)).await?;

    // Start the job runner to execute jobs from the messages in the queue in the
    // database.
    let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();

    // Spawn new jobs via a message on the database queue.
    let job_id = JobRegistry::Greetings.builder().payload_json("cats")?.spawn(&db).await?;

    // Wait for job to finish execution, polling every 100 ms.
    bonsaimq::await_job(job_id, 100, &db).await?;

    // Clean up.
    job_runner.abort(); // Is done automatically on drop.
    tokio::fs::remove_dir_all(db_path).await?;
    Ok(())
}

Lints

This project uses a number of clippy lints for higher code quality and a consistent style.

Install cargo-lints using cargo install --git https://github.com/FlixCoder/cargo-lints. The lints are defined in lints.toml and can be checked by running cargo lints clippy --all-targets --workspace.

Macros

  • Creates a job registry with the given name as first parameter. The second parameter is a named map of message names to functions, which are executed for the message.

Structs

  • JoinHandle that is aborted on Drop.
  • Handle for the job handlers. Allows retrieving input data, setting checkpoints and completing the job. The job is kept alive as long as this object lives.
  • Builder for spawning a job. By default, ordered mode is off and infinite retries with capped exponential backoff are used (1 second initially, maximum 1 hour between tries).
  • Job Runner. This is the job execution system to be run in the background. It runs on the specified database with a specific job registry. It also allows setting a callback for errors that occur in jobs.
  • Database schema for the message queue.
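
The default retry timing described for the job builder (capped exponential backoff, 1 second initially, at most 1 hour between tries) can be sketched with the standard library alone. This is an illustration under assumptions, not the crate's implementation: the function name retry_delay is hypothetical, and the doubling factor is assumed, since the text above only states the initial delay and the cap.

```rust
use std::time::Duration;

/// Initial delay and upper bound, taken from the builder description.
const INITIAL: Duration = Duration::from_secs(1);
const MAX: Duration = Duration::from_secs(60 * 60);

/// Delay before retry number `attempt` (0-based).
/// ASSUMPTION: the delay doubles on every retry; bonsaimq may use a
/// different growth factor.
fn retry_delay(attempt: u32) -> Duration {
    // 2^attempt, saturating instead of overflowing for large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // Scale the initial delay, then clamp it to the one-hour cap.
    INITIAL.checked_mul(factor).unwrap_or(MAX).min(MAX)
}
```

Under the doubling assumption the delays run 1 s, 2 s, 4 s and so on, reaching the one-hour cap at the thirteenth retry (attempt 12, where 4096 s would exceed 3600 s) and staying there for all later retries.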

Enums

Traits

Functions

  • Wait for a job to finish using a fixed interval to check if it exists.
  • Check whether the job with the given ID exists. Can also be used to check if a job has finished already.
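
The two functions combine into a simple pattern: a job counts as finished once it no longer exists, so waiting means re-checking existence at a fixed interval. The following is a synchronous, std-only sketch of that pattern. The name wait_until_gone and the closure-based existence check are hypothetical; the crate's own function is async and queries the database.

```rust
use std::thread;
use std::time::Duration;

/// Block until `exists` reports `false`, checking once per `interval`.
/// Returns how many checks were made. Hypothetical helper, not bonsaimq API.
fn wait_until_gone<F: FnMut() -> bool>(mut exists: F, interval: Duration) -> u32 {
    let mut checks = 1;
    // Each loop iteration performs one existence check; sleep only while
    // the job is still present.
    while exists() {
        thread::sleep(interval);
        checks += 1;
    }
    checks
}
```

A fixed interval keeps the logic trivial at the cost of up to one interval of extra latency after the job finishes, which is the trade-off to keep in mind when picking the polling interval.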

Type Aliases