§later
A distributed background job manager and runner for Rust. This is currently at the proof-of-concept (PoC) stage.
§Set up
§1. Import later and required dependencies
later = { version = "0.0.7", features = ["redis", "postgres"] }
serde = "1.0"
§2. Define the types to use as payloads for the background jobs
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)] // <- Required derives
pub struct SendEmail {
    pub address: String,
    pub body: String,
}
// ... more as required
§3. Generate the stub
later::background_job! {
    struct Jobs {
        send_email: SendEmail,
    }
}
This generates two types:
- JobsBuilder - used to bootstrap the background job server, which can be used to enqueue jobs
- JobsContext<T> - used to pass application context (T) to the handler as well as to enqueue jobs
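To make the role of the generated code more concrete, here is a hand-written, std-only sketch of one way a macro could restrict enqueueing to the declared payload types. It is not the code that later generates: the JobPayload trait, the Queue struct and the encode method are hypothetical names used only for illustration, and the serde derives are omitted so the sketch has no dependencies.

```rust
/// Payload type from the example above (serde derives omitted to stay std-only).
pub struct SendEmail {
    pub address: String,
    pub body: String,
}

/// Hypothetical trait tying each declared payload to its field name in `struct Jobs`.
trait JobPayload {
    const NAME: &'static str;
    /// Stand-in for real serialization.
    fn encode(&self) -> String;
}

// A macro could emit one impl like this per field in `struct Jobs`.
impl JobPayload for SendEmail {
    const NAME: &'static str = "send_email";
    fn encode(&self) -> String {
        format!("{}|{}", self.address, self.body)
    }
}

/// Hypothetical queue: only types implementing `JobPayload` can be enqueued.
#[derive(Default)]
struct Queue {
    messages: Vec<(&'static str, String)>,
}

impl Queue {
    /// Records the job name and encoded payload; other types fail to compile.
    fn enqueue<P: JobPayload>(&mut self, payload: P) {
        self.messages.push((P::NAME, payload.encode()));
    }
}
```

With this shape, passing a type that was not declared alongside send_email is rejected at compile time instead of failing at runtime.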
§4. Use the generated code to bootstrap the background job server
For struct Jobs a type JobsBuilder will be generated. Use this to bootstrap the server.
use later::{storage::redis::Redis, BackgroundJobServer, mq::amqp, Config};
// bootstrap the server
let ctx = MyContext{ /*..*/ }; // Any context to pass on to the handlers
let storage = Redis::new("redis://127.0.0.1/") // Redis storage; see the storage module for other backends
    .await
    .expect("connect to redis");
let mq = amqp::RabbitMq::new("amqp://guest:guest@localhost:5672".into()); // RabbitMq instance
let ctx = JobsBuilder::new(
    later::Config::builder()
        .name("fnf-example".into()) // Unique name for this app
        .context(ctx) // Pass the context here
        .storage(Box::new(storage)) // Configure storage
        .message_queue_client(Box::new(mq)) // Configure mq
        // ...
        .build()
    )
    // for each payload defined in the `struct Jobs` above
    // the generated fn name uses the pattern "with_[name]_handler"
    .with_send_email_handler(handle_send_email) // Pass the handler function
    // ..
    .build()
    .await
    .expect("start BG Jobs server");
// use ctx.enqueue(SendEmail{ ... }) to enqueue jobs,
// or ctx.enqueue_continue(parent_job_id, SendEmail{ ... }) to chain jobs.
// this will only accept types defined inside the macro above
// define handler
async fn handle_send_email(
    ctx: JobsContext<MyContext>, // JobsContext is the generated wrapper
    payload: SendEmail,
) -> anyhow::Result<()> {
    // handle `payload`
    // ctx.app -> Access the MyContext passed during bootstrapping
    // ctx.enqueue(_).await to enqueue more jobs
    // ctx.enqueue_continue(_, _).await to chain jobs
    Ok(()) // or Err(_) to retry this message
}
This example uses Redis storage. More storage options are available in the storage module.
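The handler's return value drives retries: Ok(()) marks the job as handled, while Err(_) asks for the message to be retried. The sketch below illustrates that contract with a plain loop. It is not later's scheduler: run_with_retries and max_attempts are hypothetical names, and the source does not describe the actual retry limit or backoff.

```rust
/// Calls `handler` until it returns `Ok(())` or `max_attempts` is reached.
/// Returns the number of attempts used on success, or the last error.
/// `max_attempts` is a hypothetical limit, not a documented `later` setting.
fn run_with_retries<F>(mut handler: F, max_attempts: u32) -> Result<u32, String>
where
    F: FnMut() -> Result<(), String>,
{
    let mut last_err = String::from("handler was never called");
    for attempt in 1..=max_attempts {
        match handler() {
            Ok(()) => return Ok(attempt),
            // Err(_) means: deliver this message to the handler again
            Err(e) => last_err = e,
        }
    }
    Err(last_err)
}
```

Because a handler may run more than once for the same message, it is safest to keep handlers idempotent, for example by checking whether the email was already sent.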
§Fire and forget jobs
Fire and forget jobs run only once and are picked up by an available worker almost immediately.
ctx.enqueue(SendEmail{
    address: "hello@rust-lang.org".to_string(),
    body: "You rock!".to_string()
}).await?;
§Continuations
One or more jobs are chained together to create a workflow. A child job is executed only after its parent job has finished.
let email_welcome = ctx.enqueue(SendEmail{
    address: "customer@example.com".to_string(),
    body: "Creating your account!".to_string()
}).await?;
let create_account = ctx.enqueue_continue(email_welcome, CreateAccount { id: "account-1".to_string() }).await?;
let email_confirmation = ctx.enqueue_continue(create_account, SendEmail{
    address: "customer@example.com".to_string(),
    body: "Your account has been created!".to_string()
}).await?;
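The ordering rule for continuations (a child becomes runnable only once its parent has finished) can be modelled with a small in-memory structure. This is a std-only sketch of the idea, not later's implementation: the Chain type and its fields are hypothetical, and job ids are plain integers instead of whatever id type the crate returns.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical in-memory model of continuations.
#[derive(Default)]
struct Chain {
    next_id: u32,
    ready: Vec<u32>,                 // jobs a worker may pick up now
    waiting: HashMap<u32, Vec<u32>>, // parent id -> children held back
    finished: HashSet<u32>,          // jobs that have completed
}

impl Chain {
    fn fresh_id(&mut self) -> u32 {
        self.next_id += 1;
        self.next_id
    }

    /// Enqueue a job with no parent: it is ready immediately.
    fn enqueue(&mut self) -> u32 {
        let id = self.fresh_id();
        self.ready.push(id);
        id
    }

    /// Enqueue a job that must wait for `parent` to finish.
    fn enqueue_continue(&mut self, parent: u32) -> u32 {
        let id = self.fresh_id();
        if self.finished.contains(&parent) {
            // Parent already done, so there is nothing to wait for.
            self.ready.push(id);
        } else {
            self.waiting.entry(parent).or_default().push(id);
        }
        id
    }

    /// Mark `id` as finished and release its children.
    fn finish(&mut self, id: u32) {
        self.ready.retain(|job| *job != id);
        self.finished.insert(id);
        if let Some(children) = self.waiting.remove(&id) {
            self.ready.extend(children);
        }
    }
}
```

In the three-step workflow above, only the welcome email is ready at first; finishing it releases the account creation, which in turn releases the confirmation email.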
§Delayed jobs
Like fire and forget jobs, except that they start after a given delay or at a specific time.
// delay
ctx.enqueue_delayed(SendEmail{
    address: "hello@rust-lang.org".to_string(),
    body: "You rock!".to_string()
}, std::time::Duration::from_secs(60)).await?;
// specific time
let run_job_at: chrono::DateTime<chrono::Utc> = todo!();
ctx.enqueue_delayed_at(SendEmail{
    address: "hello@rust-lang.org".to_string(),
    body: "You rock!".to_string()
}, run_job_at).await?;
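Both calls above reduce to the same thing: a job paired with the time at which it becomes due. A minimal sketch of that idea, assuming an in-memory min-heap and timestamps expressed as whole seconds, is shown below. DelayQueue and pop_due are hypothetical names; the source does not say how later stores or polls delayed jobs.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Duration;

/// Hypothetical in-memory delay queue. Times are seconds since some epoch.
#[derive(Default)]
struct DelayQueue {
    // `Reverse` turns the max-heap into a min-heap: earliest due time first.
    heap: BinaryHeap<Reverse<(u64, String)>>,
}

impl DelayQueue {
    /// Schedule `job` to run `delay` after `now`.
    fn enqueue_delayed(&mut self, job: &str, delay: Duration, now: u64) {
        self.enqueue_delayed_at(job, now + delay.as_secs());
    }

    /// Schedule `job` to run at the absolute time `run_at`.
    fn enqueue_delayed_at(&mut self, job: &str, run_at: u64) {
        self.heap.push(Reverse((run_at, job.to_string())));
    }

    /// Remove and return every job whose due time is at or before `now`.
    fn pop_due(&mut self, now: u64) -> Vec<String> {
        let mut due = Vec::new();
        while let Some(Reverse((run_at, _))) = self.heap.peek() {
            if *run_at > now {
                break; // the earliest job is still in the future
            }
            if let Some(Reverse((_, job))) = self.heap.pop() {
                due.push(job);
            }
        }
        due
    }
}
```

Once a job is due it behaves like a fire and forget job: it is handed to an available worker and run.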
§Recurring jobs
Run a recurring job on a cron schedule.
ctx.enqueue_recurring("send-newsletter-1".to_string(),
    SendNewsletter{
        address: "hello@rust-lang.org".to_string(),
    },
    "0 6 1 * * *".to_string() // 6am, 1st day of every month
).await?;
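The cron dialect is not stated here, and six-field expressions are read differently by different parsers. The helper below is a std-only sketch that labels each field under one assumed layout: seconds first (second, minute, hour, day of month, month, day of week), which several Rust cron parsers use. FIELDS and label_cron_fields are hypothetical names. Under that assumption "0 6 1 * * *" reads as 01:06:00 every day, and 6am on the 1st of every month would be "0 0 6 1 * *"; if the parser uses a different field order, the comment in the example may well be correct, so it is worth checking before relying on it.

```rust
/// Field names for a six-field, seconds-first cron expression.
/// This layout is an assumption; check which dialect your parser expects.
const FIELDS: [&str; 6] = [
    "second",
    "minute",
    "hour",
    "day_of_month",
    "month",
    "day_of_week",
];

/// Pairs each whitespace-separated field of `expr` with its assumed name.
/// Returns `None` unless the expression has exactly six fields.
fn label_cron_fields(expr: &str) -> Option<Vec<(&'static str, String)>> {
    let parts: Vec<&str> = expr.split_whitespace().collect();
    if parts.len() != FIELDS.len() {
        return None;
    }
    Some(
        FIELDS
            .iter()
            .copied()
            .zip(parts.into_iter().map(String::from))
            .collect(),
    )
}
```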
§Storage
- redis: later::storage::Redis::new("redis://127.0.0.1/").await
- postgres: later::storage::Postgres::new("postgres://test:test@localhost/later_test").await (requires the postgres feature)
Re-exports§
pub use anyhow;
pub use async_trait;
pub use futures;
Modules§
- core
- encoder
- instrument - Attach a span to a std::future::Future.
- mq
- storage
Attribute Macros§
- instrument - Instruments a function to create and enter a tracing span every time the function is called.