later

A distributed background job manager and runner for Rust. This is currently in PoC stage.

Set up

1. Import later and required dependencies

later = { version = "0.0.7", features = ["redis", "postgres"] }
serde = "1.0"

2. Define the types to use as payloads for the background jobs

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)] // <- Required derives
pub struct SendEmail {
    pub address: String,
    pub body: String,
}

// ... more as required

3. Generate the stub

later::background_job! {
    struct Jobs {
        send_email: SendEmail,
    }
}

This generates two types:

  • JobsBuilder - used to bootstrap the background job server, which can then be used to enqueue jobs
  • JobsContext<T> - used to pass the application context (T) to the handlers and to enqueue jobs from within them
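
To give a feel for how a macro can tie job names to payload types and reject unlisted types at compile time, here is a minimal sketch in plain Rust. It is not how later::background_job! is implemented; the register_jobs! macro, the JobPayload trait, the job_name function, and the CreateAccount fields are all hypothetical names invented for illustration.

```rust
/// Hypothetical marker trait: only registered payload types implement it.
pub trait JobPayload {
    /// Name the job is registered under (taken from the field name).
    const NAME: &'static str;
}

/// Hypothetical stand-in for a job-declaring macro: for every
/// `name: Type` pair it implements `JobPayload` for `Type`.
macro_rules! register_jobs {
    ($($name:ident : $payload:ty),* $(,)?) => {
        $(
            impl JobPayload for $payload {
                const NAME: &'static str = stringify!($name);
            }
        )*
    };
}

pub struct SendEmail {
    pub address: String,
    pub body: String,
}

pub struct CreateAccount {
    pub id: String,
}

register_jobs! {
    send_email: SendEmail,
    create_account: CreateAccount,
}

/// Accepts only registered payloads; passing any other type fails to compile.
pub fn job_name<T: JobPayload>(_payload: &T) -> &'static str {
    T::NAME
}
```

With this sketch, job_name(&SendEmail { .. }) compiles, while job_name(&42) does not, because i32 was never listed in the macro invocation.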

4. Use the generated code to bootstrap the background job server

For struct Jobs, a type named JobsBuilder is generated. Use it to bootstrap the server.

use later::{storage::redis::Redis, BackgroundJobServer, mq::amqp, Config};

// bootstrap the server
let ctx = MyContext{ /*..*/ };                  // Any context to pass onto the handlers
let storage = Redis::new("redis://127.0.0.1/")  // Redis storage; see the storage module for others
    .await
    .expect("connect to redis");
let mq = amqp::RabbitMq::new("amqp://guest:guest@localhost:5672".into()); // RabbitMq instance
let ctx = JobsBuilder::new(
    later::Config::builder()
        .name("fnf-example".into())             // Unique name for this app
        .context(ctx)                       // Pass the context here
        .storage(Box::new(storage))             // Configure storage
        .message_queue_client(Box::new(mq))     // Configure mq
        // ...
        .build()
    )
    // for each payload defined in the `struct Jobs` above
    // the generated fn name uses the pattern "with_[name]_handler"
    .with_send_email_handler(handle_send_email)     // Pass the handler function
    // ..
    .build()
    .await
    .expect("start BG Jobs server");

// Use ctx.enqueue(SendEmail{ ... }) to enqueue jobs,
// or ctx.enqueue_continue(parent_job_id, SendEmail{ ... }) to chain jobs.
// Both only accept types defined inside the macro above.

// define the handler
async fn handle_send_email(
        ctx: JobsContext<MyContext>, // JobsContext is the generated wrapper
        payload: SendEmail,
    ) -> anyhow::Result<()> {
    // handle `payload`

    // ctx.app -> Access the MyContext passed during bootstrapping
    // ctx.enqueue(_).await to enqueue more jobs
    // ctx.enqueue_continue(parent_job_id, _).await to chain jobs

    Ok(()) // or Err(_) to retry this message
}

This example uses Redis storage. More storage backends are available in the storage module.
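
The handler above returns Err(_) to have the message retried. This page does not describe the retry policy (attempt limits, backoff), so the following is only a sketch of a bounded retry loop in plain, synchronous Rust. The names run_with_retry and max_attempts are hypothetical and not part of later.

```rust
/// Calls `handler` until it returns `Ok` or `max_attempts` is reached.
/// Always makes at least one attempt. On success, returns the number of
/// the attempt that succeeded; on failure, returns the last error.
pub fn run_with_retry<E>(
    max_attempts: u32,
    mut handler: impl FnMut(u32) -> Result<(), E>,
) -> Result<u32, E> {
    let mut attempt = 1;
    loop {
        match handler(attempt) {
            Ok(()) => return Ok(attempt),
            // Out of attempts: give up and surface the last error.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Otherwise try again.
            Err(_) => attempt += 1,
        }
    }
}
```

A real job runner would normally add a delay between attempts and persist the attempt count; both are left out here to keep the sketch short.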


Fire and forget jobs

Fire-and-forget jobs are executed only once, almost immediately, by an available worker.

ctx.enqueue(SendEmail{
    address: "hello@rust-lang.org".to_string(),
    body: "You rock!".to_string()
}).await?;
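
As a rough mental model of "executed only once by an available worker", the sketch below uses a shared in-process queue from the standard library instead of the message queue and storage that later relies on. The function run_once_each is a hypothetical name; it hands each job to exactly one of several worker threads and returns the names of the jobs that ran.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Sends every job through a shared queue to `workers` threads and
/// returns the sorted names of the jobs that were processed.
pub fn run_once_each(jobs: Vec<String>, workers: usize) -> Vec<String> {
    let (job_tx, job_rx) = mpsc::channel::<String>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (done_tx, done_rx) = mpsc::channel::<String>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let done_tx = done_tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while taking one job off the queue,
                // so a job can never be handed to two workers.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok(job) => done_tx.send(job).unwrap(),
                    Err(_) => break, // queue closed and drained
                }
            })
        })
        .collect();

    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // close the queue so idle workers stop
    drop(done_tx); // only the workers' clones remain

    for handle in handles {
        handle.join().unwrap();
    }
    let mut done: Vec<String> = done_rx.iter().collect();
    done.sort();
    done
}
```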

Continuations

One or more jobs are chained together to create a workflow. A child job is executed only after its parent job has finished.

let email_welcome = ctx.enqueue(SendEmail{
    address: "customer@example.com".to_string(),
    body: "Creating your account!".to_string()
}).await?;

let create_account = ctx.enqueue_continue(email_welcome, CreateAccount { id: "account-1".to_string() }).await?;

let email_confirmation = ctx.enqueue_continue(create_account, SendEmail{
    address: "customer@example.com".to_string(),
    body: "Your account has been created!".to_string()
}).await?;
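
The ordering rule for continuations can be illustrated with a toy in-memory queue. This is a sketch of the parent-before-child rule only, not of how later stores or dispatches jobs; ToyQueue, JobId, and run_all are hypothetical names, and jobs are represented by plain strings.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

pub type JobId = u64;

/// Toy queue: a continuation becomes runnable only once its parent finished.
#[derive(Default)]
pub struct ToyQueue {
    next_id: JobId,
    ready: VecDeque<(JobId, String)>,
    waiting: HashMap<JobId, Vec<(JobId, String)>>, // parent id -> children
    finished: HashSet<JobId>,
}

impl ToyQueue {
    /// Enqueues a job that can run right away.
    pub fn enqueue(&mut self, name: &str) -> JobId {
        let id = self.fresh_id();
        self.ready.push_back((id, name.to_string()));
        id
    }

    /// Enqueues a job that must wait for `parent` to finish.
    pub fn enqueue_continue(&mut self, parent: JobId, name: &str) -> JobId {
        let id = self.fresh_id();
        let job = (id, name.to_string());
        if self.finished.contains(&parent) {
            self.ready.push_back(job);
        } else {
            self.waiting.entry(parent).or_default().push(job);
        }
        id
    }

    /// Runs jobs until none are ready; returns names in execution order.
    pub fn run_all(&mut self) -> Vec<String> {
        let mut order = Vec::new();
        while let Some((id, name)) = self.ready.pop_front() {
            order.push(name);
            self.finished.insert(id);
            // Finishing a parent releases its children.
            if let Some(children) = self.waiting.remove(&id) {
                self.ready.extend(children);
            }
        }
        order
    }

    fn fresh_id(&mut self) -> JobId {
        self.next_id += 1;
        self.next_id
    }
}
```

In this model an unrelated job enqueued later may still run before a waiting child, because the child is not eligible until its parent has finished.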

Delayed jobs

Like fire-and-forget jobs, except that they start after a given delay or at a specific time.

// delay
ctx.enqueue_delayed(SendEmail{
    address: "hello@rust-lang.org".to_string(),
    body: "You rock!".to_string()
}, std::time::Duration::from_secs(60)).await?;

// specific time
let run_job_at : chrono::DateTime<chrono::Utc> = todo!();
ctx.enqueue_delayed_at(SendEmail{
    address: "hello@rust-lang.org".to_string(),
    body: "You rock!".to_string()
}, run_job_at).await?;
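
The two forms above are related: a target time can be turned into a delay by subtracting the current time. The sketch below shows that conversion using std::time::SystemTime rather than chrono, to stay dependency-free. The helper delay_until is a hypothetical name, and treating a past target time as "run immediately" is an assumption, not documented behaviour of later.

```rust
use std::time::{Duration, SystemTime};

/// Delay from `now` until `run_at`; zero if `run_at` is already in the past.
pub fn delay_until(now: SystemTime, run_at: SystemTime) -> Duration {
    run_at.duration_since(now).unwrap_or(Duration::ZERO)
}
```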

Recurring jobs

Run a recurring job based on a cron schedule.

ctx.enqueue_recurring("send-newsletter-1".to_string(),
    SendNewsletter{
        address: "hello@rust-lang.org".to_string(),
    },
    // 6am, 1st day of every month
    // (assuming six fields: sec min hour day-of-month month day-of-week)
    "0 0 6 1 * *".to_string()
).await?;
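
Cron dialects differ in how many fields they use and in what order, and this page does not state which dialect later parses. The sketch below assumes a six-field, seconds-first layout (second, minute, hour, day of month, month, day of week) and simply labels each field, which can help when checking an expression by hand. FIELD_NAMES and label_fields are hypothetical names; nothing here validates field values.

```rust
/// Assumed field order for a six-field, seconds-first cron expression.
pub const FIELD_NAMES: [&str; 6] = [
    "second",
    "minute",
    "hour",
    "day_of_month",
    "month",
    "day_of_week",
];

/// Pairs each whitespace-separated field with its assumed name.
/// Returns `None` if the expression does not have exactly six fields.
pub fn label_fields(expr: &str) -> Option<Vec<(&'static str, &str)>> {
    let parts: Vec<&str> = expr.split_whitespace().collect();
    if parts.len() != FIELD_NAMES.len() {
        return None;
    }
    Some(FIELD_NAMES.iter().copied().zip(parts).collect())
}
```

Under this assumed layout, an expression such as 0 0 6 1 * * reads as second 0, minute 0, hour 6, on day 1 of every month.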

Storage

  • redis: later::storage::Redis::new("redis://127.0.0.1/").await
  • postgres: later::storage::Postgres::new("postgres://test:test@localhost/later_test").await (Requires feature postgres)

Re-exports

pub use anyhow;
pub use async_trait;
pub use futures;

Modules

Attach a span to a std::future::Future.

Attribute Macros

Instruments a function to create and enter a tracing span every time the function is called.