Crate later

§later

A distributed background job manager and runner for Rust. This crate is currently at the proof-of-concept (PoC) stage.

§Set up

§1. Import later and required dependencies

later = { version = "0.0.7", features = ["redis", "postgres"] }
serde = "1.0"

§2. Define some types to use as a payload to the background jobs

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)] // <- Required derives
pub struct SendEmail {
    pub address: String,
    pub body: String,
}

// ... more as required

§3. Generate the stub

later::background_job! {
    struct Jobs {
        send_email: SendEmail,
    }
}

This generates two types:

  • JobsBuilder - used to bootstrap the background job server, which can then be used to enqueue jobs.
  • JobsContext<T> - used to pass the application context (T) to the handler and to enqueue jobs.
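
To make the shape of these two generated types more concrete, here is a hand-written, std-only sketch. It is not the macro's actual expansion: the names ToyJobsBuilder, ToyJobsContext, and run_pending, the synchronous in-memory queue, and the String error type are all assumptions made for illustration. The real types are async and backed by storage and a message queue.

```rust
use std::collections::VecDeque;

// Payload type mirroring `SendEmail` from step 2 (serde derives omitted here).
pub struct SendEmail {
    pub address: String,
    pub body: String,
}

// Hypothetical application context and handler, for illustration only.
pub struct MyContext {
    pub sender: String,
}

pub fn handle_send_email(app: &MyContext, mail: &SendEmail) -> Result<(), String> {
    if mail.address.is_empty() {
        return Err(format!("{}: missing recipient", app.sender));
    }
    Ok(())
}

// Hypothetical handler signature: application context plus payload.
type SendEmailHandler<C> = fn(&C, &SendEmail) -> Result<(), String>;

// Toy stand-in for the generated builder.
pub struct ToyJobsBuilder<C> {
    app: C,
    send_email: Option<SendEmailHandler<C>>,
}

impl<C> ToyJobsBuilder<C> {
    pub fn new(app: C) -> Self {
        Self { app, send_email: None }
    }

    // One `with_[name]_handler` method per field declared in the macro.
    pub fn with_send_email_handler(mut self, handler: SendEmailHandler<C>) -> Self {
        self.send_email = Some(handler);
        self
    }

    // Fails if a declared payload has no handler registered.
    pub fn build(self) -> Result<ToyJobsContext<C>, String> {
        let send_email = self.send_email.ok_or("missing send_email handler")?;
        Ok(ToyJobsContext { app: self.app, send_email, queue: VecDeque::new() })
    }
}

// Toy stand-in for the generated context: holds the app context and enqueues jobs.
pub struct ToyJobsContext<C> {
    pub app: C,
    send_email: SendEmailHandler<C>,
    queue: VecDeque<SendEmail>,
}

impl<C> ToyJobsContext<C> {
    pub fn enqueue(&mut self, payload: SendEmail) {
        self.queue.push_back(payload);
    }

    // Runs every queued job once and returns how many handlers returned Ok.
    pub fn run_pending(&mut self) -> usize {
        let mut succeeded = 0;
        while let Some(job) = self.queue.pop_front() {
            if (self.send_email)(&self.app, &job).is_ok() {
                succeeded += 1;
            }
        }
        succeeded
    }
}
```

The point of the sketch is the division of labor: the builder collects one handler per declared payload, and the context it produces only accepts payload types that were declared.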

§4. Use the generated code to bootstrap the background job server

For struct Jobs a type JobsBuilder will be generated. Use this to bootstrap the server.

use later::{storage::redis::Redis, BackgroundJobServer, mq::amqp, Config};

// bootstrap the server
let ctx = MyContext{ /*..*/ };                  // Any context to pass onto the handlers
let storage = Redis::new("redis://127.0.0.1/")  // Redis storage; see the storage module for other backends
    .await
    .expect("connect to redis");
let mq = amqp::RabbitMq::new("amqp://guest:guest@localhost:5672".into()); // RabbitMq instance
let ctx = JobsBuilder::new(
    later::Config::builder()
        .name("fnf-example".into())             // Unique name for this app
        .context(ctx)                           // Pass the context here
        .storage(Box::new(storage))             // Configure storage
        .message_queue_client(Box::new(mq))     // Configure mq
        // ...
        .build()
    )
    // for each payload defined in the `struct Jobs` above
    // the generated fn name uses the pattern "with_[name]_handler"
    .with_send_email_handler(handle_send_email)     // Pass the handler function
    // ..
    .build()
    .await
    .expect("start BG Jobs server");

// use ctx.enqueue(SendEmail{ ... }) to enqueue jobs,
// or ctx.enqueue_continue(parent_job_id, SendEmail{ ... }) to chain jobs.
// this will only accept types defined inside the macro above

// define handler
async fn handle_send_email(
    ctx: JobsContext<MyContext>, // JobsContext is the generated wrapper
    payload: SendEmail,
) -> anyhow::Result<()> {
    // handle `payload`

    // ctx.app -> Access the MyContext passed during bootstrapping
    // ctx.enqueue(_).await to enqueue more jobs
    // ctx.enqueue_continue(_, _).await to chain jobs

    Ok(()) // or Err(_) to retry this message
}

This example uses Redis storage. More storage backends are available in the storage module.
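
The handler above returns Ok(()) on success or Err(_) to have the message retried. A minimal std-only sketch of that contract follows. The function run_with_retry, the max_attempts limit, and the String error type are assumptions for illustration; the crate's actual retry policy (attempt limits, backoff) is not described here.

```rust
// Hypothetical retry loop: call `handler` until it returns Ok or until
// `max_attempts` calls have been made. On success, returns the number of
// attempts used; on failure, returns the last error seen.
pub fn run_with_retry<P, F>(payload: &P, max_attempts: u32, mut handler: F) -> Result<u32, String>
where
    F: FnMut(&P) -> Result<(), String>,
{
    let mut last_err = String::from("no attempts made");
    for attempt in 1..=max_attempts {
        match handler(payload) {
            Ok(()) => return Ok(attempt), // job done, stop retrying
            Err(e) => last_err = e,       // remember the error and try again
        }
    }
    Err(last_err)
}
```

Because a failed job may run more than once, handlers are easiest to reason about when repeating them is harmless, for example by checking whether the email was already sent.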


§Fire and forget jobs

Fire-and-forget jobs are executed exactly once, almost immediately, by the first available worker.

ctx.enqueue(SendEmail{
    address: "hello@rust-lang.org".to_string(),
    body: "You rock!".to_string()
}).await?;
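
The "picked up once by an available worker" behavior can be modeled with plain threads and channels. This is a sketch under stated assumptions, not how the crate dispatches jobs: run_fire_and_forget is a hypothetical function, and a shared std::sync::mpsc receiver stands in for the message queue.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Hypothetical worker pool: every enqueued job is received by exactly one worker.
// Returns the processed jobs, sorted so the result is deterministic.
pub fn run_fire_and_forget(jobs: Vec<String>, workers: usize) -> Vec<String> {
    let (job_tx, job_rx) = mpsc::channel::<String>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (done_tx, done_rx) = mpsc::channel::<String>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let done_tx = done_tx.clone();
            thread::spawn(move || loop {
                // The lock is held only for the duration of this `recv` call.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok(job) => done_tx.send(job).unwrap(), // "handle" the job
                    Err(_) => break,                       // queue closed, no more jobs
                }
            })
        })
        .collect();

    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // close the queue so idle workers exit
    drop(done_tx); // only the workers' clones keep the results channel open

    for handle in handles {
        handle.join().unwrap();
    }
    let mut done: Vec<String> = done_rx.iter().collect();
    done.sort();
    done
}
```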

§Continuations

One or more jobs can be chained together to create a workflow. A child job is executed only after its parent job has finished.

let email_welcome = ctx.enqueue(SendEmail{
    address: "customer@example.com".to_string(),
    body: "Creating your account!".to_string()
}).await?;

let create_account = ctx.enqueue_continue(email_welcome, CreateAccount { id: "account-1".to_string() }).await?;

let email_confirmation = ctx.enqueue_continue(create_account, SendEmail{
    address: "customer@example.com".to_string(),
    body: "Your account has been created!".to_string()
}).await?;
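
The ordering guarantee for continuations can be modeled in memory. The ToyChain type below is hypothetical and std-only; it tracks which job is the parent of which and only runs a child once its parent is marked finished. The u64 job ids and string job names are assumptions, and the real crate persists this state in its storage backend.

```rust
use std::collections::HashMap;

// Hypothetical in-memory model of continuations.
#[derive(Default)]
pub struct ToyChain {
    next_id: u64,
    parent_of: HashMap<u64, u64>, // child id -> parent id
    names: HashMap<u64, String>,  // job id -> job name
    finished: Vec<u64>,
}

impl ToyChain {
    // Enqueues a job with no parent and returns its id.
    pub fn enqueue(&mut self, name: &str) -> u64 {
        self.next_id += 1;
        self.names.insert(self.next_id, name.to_string());
        self.next_id
    }

    // Enqueues a job that may only run after `parent` has finished.
    pub fn enqueue_continue(&mut self, parent: u64, name: &str) -> u64 {
        let id = self.enqueue(name);
        self.parent_of.insert(id, parent);
        id
    }

    fn is_ready(&self, id: u64) -> bool {
        match self.parent_of.get(&id) {
            Some(parent) => self.finished.contains(parent),
            None => true,
        }
    }

    // Runs ready jobs in rounds until none remain; returns names in execution order.
    pub fn run_all(&mut self) -> Vec<String> {
        let mut order = Vec::new();
        loop {
            let mut ready: Vec<u64> = self
                .names
                .keys()
                .copied()
                .filter(|id| !self.finished.contains(id) && self.is_ready(*id))
                .collect();
            if ready.is_empty() {
                return order;
            }
            ready.sort(); // lowest id first within a round
            for id in ready {
                self.finished.push(id);
                order.push(self.names[&id].clone());
            }
        }
    }
}
```

In this model an unrelated job enqueued later can still run before a pending child, since only the parent-child edge constrains ordering.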

§Delayed jobs

Delayed jobs behave like fire-and-forget jobs, except that they start after a given delay or at a specific time.

// delay
ctx.enqueue_delayed(SendEmail{
    address: "hello@rust-lang.org".to_string(),
    body: "You rock!".to_string()
}, std::time::Duration::from_secs(60)).await?;

// specific time
let run_job_at : chrono::DateTime<chrono::Utc> = todo!();
ctx.enqueue_delayed_at(SendEmail{
    address: "hello@rust-lang.org".to_string(),
    body: "You rock!".to_string()
}, run_job_at).await?;
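
The two forms differ only in how the run time is given: a delay relative to now, or an absolute timestamp. The following std-only sketch shows one way to model that with a min-heap keyed by run time. ToyDelayedQueue and take_due are hypothetical names, and plain u64 seconds stand in for real timestamps so the example stays deterministic.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Duration;

// Hypothetical delayed queue ordered by absolute run time (in seconds).
#[derive(Default)]
pub struct ToyDelayedQueue {
    heap: BinaryHeap<Reverse<(u64, String)>>,
}

impl ToyDelayedQueue {
    // Relative form: the job becomes due `delay` after `now`.
    pub fn enqueue_delayed(&mut self, job: &str, now: u64, delay: Duration) {
        self.enqueue_delayed_at(job, now + delay.as_secs());
    }

    // Absolute form: the job becomes due at `run_at`.
    pub fn enqueue_delayed_at(&mut self, job: &str, run_at: u64) {
        self.heap.push(Reverse((run_at, job.to_string())));
    }

    // Removes and returns every job whose run time is at or before `now`,
    // earliest first. Jobs that are not yet due stay in the queue.
    pub fn take_due(&mut self, now: u64) -> Vec<String> {
        let mut due = Vec::new();
        while let Some(Reverse((run_at, _))) = self.heap.peek() {
            if *run_at > now {
                break;
            }
            if let Some(Reverse((_, job))) = self.heap.pop() {
                due.push(job);
            }
        }
        due
    }
}
```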

§Recurring jobs

Recurring jobs run repeatedly on a cron schedule.

ctx.enqueue_recurring("send-newsletter-1".to_string(),
    SendNewsletter{
        address: "hello@rust-lang.org".to_string(),
    },
    "0 6 1 * * *".to_string() // 6am, 1st day of every month
).await?;

§Storage

  • redis: later::storage::Redis::new("redis://127.0.0.1/").await
  • postgres: later::storage::Postgres::new("postgres://test:test@localhost/later_test").await (requires the postgres feature)

Re-exports§

pub use anyhow;
pub use async_trait;
pub use futures;

Modules§

core
encoder
instrument
Attach a span to a std::future::Future.
mq
storage

Macros§

background_job

Structs§

BackgroundJobServer
BackgroundJobServerPublisher
Config
JobId
RecurringJobId

Functions§

generate_id

Type Aliases§

UtcDateTime

Attribute Macros§

instrument
Instruments a function to create and enter a tracing span every time the function is called.