A robust job processing library for Rust.
§Setting up Rexecutor
To create an instance of Rexecutor you need an implementation of Backend. The only one provided in this crate is backend::memory::InMemoryBackend, which is primarily intended for testing purposes. Outside of tests, use one of the backend implementations provided by other crates instead.
§Creating executors
Jobs are defined by creating a struct/enum and implementing Executor for it.
§Example defining an executor
You can define and enqueue a job as follows:
```rust
use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;

// The `.await` calls below must run inside an async context (e.g. an async test or main).
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();

struct EmailJob;

#[async_trait::async_trait]
impl Executor for EmailJob {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_job";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {}", Self::NAME, job.data);
        // Do something important with an email
        ExecutionResult::Done
    }
}

let _ = EmailJob::builder()
    .with_data("bob.shuruncle@example.com".to_owned())
    .schedule_in(TimeDelta::hours(3))
    .enqueue()
    .await;

// The job should be scheduled roughly three hours from now.
assert_enqueued!(
    with_data: "bob.shuruncle@example.com".to_owned(),
    scheduled_after: Utc::now() + TimeDelta::minutes(170),
    scheduled_before: Utc::now() + TimeDelta::minutes(190),
    for_executor: EmailJob
);
```
§Unique jobs
It is possible to ensure uniqueness of jobs based on certain criteria. This can be defined as part of the implementation of Executor via Executor::UNIQUENESS_CRITERIA, or when inserting the job via job::builder::JobBuilder::unique.
For example, to ensure that at most one job of this type is run every five minutes, you can use the following uniqueness criteria.
```rust
// Assumes the same imports and global `InMemoryBackend` setup as the previous example.
struct UniqueJob;

#[async_trait::async_trait]
impl Executor for UniqueJob {
    type Data = ();
    type Metadata = ();
    const NAME: &'static str = "unique_job";
    const MAX_ATTEMPTS: u16 = 1;
    const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
        UniquenessCriteria::by_executor()
            .and_within(TimeDelta::seconds(300))
            .on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
    );
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {:?}", Self::NAME, job.data);
        // Do something important
        ExecutionResult::Done
    }
}

let _ = UniqueJob::builder().enqueue().await;
let _ = UniqueJob::builder().enqueue().await;

// Only one of the two jobs was enqueued
assert_enqueued!(
    1 job,
    scheduled_before: Utc::now(),
    for_executor: UniqueJob
);
```
Additionally, it is possible to specify what action should be taken when there is a conflicting job. In the example above, the priority is overridden. For more details on how to use uniqueness, see job::uniqueness_criteria::UniquenessCriteria.
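To make the window-based criteria concrete, here is a minimal std-only model of what `UniquenessCriteria::by_executor().and_within(TimeDelta::seconds(300))` combined with a priority-replacing conflict action could mean. The `Queue`, `QueuedJob` and `Insert` types are hypothetical; the real check is carried out by the backend. The sketch assumes that `Replace::priority()` means "keep the existing job and give it the new job's priority", and it leaves out the `for_statuses` filter.

```rust
/// Hypothetical stand-in for a queued job (not rexecutor's `Job` type).
#[derive(Debug, Clone, PartialEq)]
struct QueuedJob {
    executor: &'static str,
    enqueued_at: u64, // seconds
    priority: u16,
}

/// Outcome of an insert under the sketched uniqueness rule.
#[derive(Debug, PartialEq)]
enum Insert {
    Inserted,
    ReplacedPriority,
}

/// Models "unique by executor within `window` seconds; replace priority on conflict".
struct Queue {
    window: u64,
    jobs: Vec<QueuedJob>,
}

impl Queue {
    fn insert(&mut self, job: QueuedJob) -> Insert {
        let window = self.window;
        // A conflict is an existing job for the same executor enqueued within the window.
        let conflict = self.jobs.iter_mut().find(|existing| {
            existing.executor == job.executor
                && job.enqueued_at.saturating_sub(existing.enqueued_at) < window
        });
        match conflict {
            Some(existing) => {
                // Keep the existing job, but let it take the new job's priority.
                existing.priority = job.priority;
                Insert::ReplacedPriority
            }
            None => {
                self.jobs.push(job);
                Insert::Inserted
            }
        }
    }
}

fn main() {
    let mut queue = Queue { window: 300, jobs: Vec::new() };
    let a = queue.insert(QueuedJob { executor: "unique_job", enqueued_at: 0, priority: 0 });
    let b = queue.insert(QueuedJob { executor: "unique_job", enqueued_at: 10, priority: 5 });
    // Prints: Inserted ReplacedPriority jobs=1
    println!("{a:?} {b:?} jobs={}", queue.jobs.len());
}
```

A job for the same executor enqueued after the 300 second window has passed would be inserted as a new job.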
§Overriding Executor default values
When defining an Executor, you specify the maximum number of attempts via Executor::MAX_ATTEMPTS. However, when inserting a job it is possible to override this value by calling job::builder::JobBuilder::with_max_attempts (if it is not called, the max attempts will be equal to Executor::MAX_ATTEMPTS).
Similarly, the executor can define job uniqueness criteria via Executor::UNIQUENESS_CRITERIA. However, using job::builder::JobBuilder::unique, it is possible to override this value for a specific job.
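The fallback rule for the maximum number of attempts can be sketched in plain Rust. `ExecutorDefaults` and `JobOptions` are hypothetical, trimmed-down stand-ins rather than rexecutor's Executor trait and JobBuilder; they only show that a per-job value, when given, takes precedence over the executor-level constant.

```rust
use std::marker::PhantomData;

/// Hypothetical, trimmed-down trait holding only the default relevant here.
trait ExecutorDefaults {
    const MAX_ATTEMPTS: u16;
}

/// Hypothetical executor mirroring `EmailJob` from the earlier example.
struct EmailJob;

impl ExecutorDefaults for EmailJob {
    const MAX_ATTEMPTS: u16 = 2;
}

/// Hypothetical per-job options collected by a builder.
struct JobOptions<E: ExecutorDefaults> {
    max_attempts: Option<u16>,
    _executor: PhantomData<E>,
}

impl<E: ExecutorDefaults> JobOptions<E> {
    fn new() -> Self {
        Self { max_attempts: None, _executor: PhantomData }
    }

    /// Record a per-job override.
    fn with_max_attempts(mut self, max_attempts: u16) -> Self {
        self.max_attempts = Some(max_attempts);
        self
    }

    /// The per-job override wins; otherwise fall back to the executor's constant.
    fn effective_max_attempts(&self) -> u16 {
        self.max_attempts.unwrap_or(E::MAX_ATTEMPTS)
    }
}

fn main() {
    let default = JobOptions::<EmailJob>::new().effective_max_attempts();
    let overridden = JobOptions::<EmailJob>::new()
        .with_max_attempts(5)
        .effective_max_attempts();
    println!("default = {default}, overridden = {overridden}"); // default = 2, overridden = 5
}
```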
§Setting up executors to run
For each executor you would like to run, Rexecutor::with_executor should be called. Being explicit about this makes it possible to have specific nodes in a cluster act as worker nodes for certain enqueued jobs, while other nodes are not responsible for their execution.
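As a rough illustration of why registering executors explicitly matters, the following std-only sketch models a node that only claims jobs for the executors it registered and leaves everything else for other nodes. `Node` and `claim` are hypothetical, and the executor names are made up; how rexecutor and its backends actually hand out jobs is not shown here.

```rust
use std::collections::HashSet;

/// Hypothetical worker node that only runs executors it has registered.
struct Node {
    registered: HashSet<String>,
}

impl Node {
    fn new() -> Self {
        Self { registered: HashSet::new() }
    }

    /// Loose analogue of `with_executor::<E>()`: register the executor's name.
    fn with_executor(mut self, name: &str) -> Self {
        self.registered.insert(name.to_string());
        self
    }

    /// Split pending jobs (identified by executor name) into those this node
    /// would run and those left for other nodes.
    fn claim<'a>(&self, pending: &[&'a str]) -> (Vec<&'a str>, Vec<&'a str>) {
        pending
            .iter()
            .copied()
            .partition(|name| self.registered.contains(*name))
    }
}

fn main() {
    let node = Node::new()
        .with_executor("refresh_worker")
        .with_executor("email_scheduler");
    let pending = ["refresh_worker", "registration_worker", "email_scheduler"];
    let (mine, others) = node.claim(&pending);
    // Prints: this node runs ["refresh_worker", "email_scheduler"]; left for others: ["registration_worker"]
    println!("this node runs {mine:?}; left for others: {others:?}");
}
```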
§Example setting up executors
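```rust
use rexecutor::prelude::*;
use rexecutor::backend::memory::InMemoryBackend;

// RefreshWorker, EmailScheduler and RegistrationWorker are Executor
// implementations assumed to be defined elsewhere.
let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>();
```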
§Enqueuing jobs
Generally, jobs will be enqueued using the job::builder::JobBuilder returned by Executor::builder.
When enqueuing jobs, the data and metadata of the job can be specified. Additionally, the default values of the Executor can be overridden.
§Example enqueuing a job
```rust
use std::sync::Arc;
use chrono::{Utc, TimeDelta};
use rexecutor::prelude::*;
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;

// ExampleExecutor is assumed to be an Executor with `Data = String`, defined elsewhere.
let backend = Arc::new(InMemoryBackend::new().paused());
Rexecutor::new(backend.clone()).set_global_backend().unwrap();

ExampleExecutor::builder()
    .with_max_attempts(2)
    .with_tags(vec!["initial_job", "delayed"])
    .with_data("First job".into())
    .schedule_in(TimeDelta::hours(2))
    .enqueue_to_backend(&backend)
    .await
    .unwrap();

assert_enqueued!(
    to: backend,
    with_data: "First job".to_owned(),
    tagged_with: ["initial_job", "delayed"],
    scheduled_after: Utc::now() + TimeDelta::minutes(110),
    scheduled_before: Utc::now() + TimeDelta::minutes(130),
    for_executor: ExampleExecutor
);
```
§Compile time scheduling of cron jobs
It can be useful to have jobs that run on a given schedule. Jobs like this can be set up using either Rexecutor::with_cron_executor or Rexecutor::with_cron_executor_for_timezone. The latter is used to specify the timezone in which the jobs should be scheduled to run.
§Example setting up a UTC cron job
To set up a cron job that runs every day at midnight, you can use the following code.
```rust
use rexecutor::prelude::*;
use rexecutor::backend::memory::InMemoryBackend;

struct CronJob;

#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}

// Six-field cron expression (sec min hour day-of-month month day-of-week):
// every day at 00:00:00.
let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
let backend = InMemoryBackend::new();
Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
```
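The schedule above uses the cron crate's format, whose first field is seconds, so "0 0 0 * * *" selects 00:00:00 every day. The arithmetic for the next run can be sketched on Unix timestamps as below. `next_midnight` is a hypothetical helper, not how rexecutor or the cron crate compute upcoming times, and its fixed `offset_secs` only loosely approximates Rexecutor::with_cron_executor_for_timezone, since real timezones can shift with daylight saving time.

```rust
const SECONDS_PER_DAY: i64 = 86_400;

/// Next instant strictly after `now` (Unix seconds) that is 00:00:00 local time,
/// where local time is UTC shifted by a fixed `offset_secs`. With `offset_secs = 0`
/// this is the instant "0 0 0 * * *" selects in UTC.
fn next_midnight(now: i64, offset_secs: i64) -> i64 {
    let local = now + offset_secs;
    // Start of the next local day, converted back to UTC.
    (local.div_euclid(SECONDS_PER_DAY) + 1) * SECONDS_PER_DAY - offset_secs
}

fn main() {
    // 2024-01-01T12:30:00Z
    let now = 1_704_112_200;
    // Next UTC midnight is 2024-01-02T00:00:00Z.
    println!("{}", next_midnight(now, 0)); // 1704153600
    // Midnight in UTC+1 falls at 2024-01-01T23:00:00Z.
    println!("{}", next_midnight(now, 3_600)); // 1704150000
}
```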
§Pruning jobs
After jobs have completed, been cancelled, or been discarded, it is useful to be able to clean them up.
To set up the job pruner, call Rexecutor::with_job_pruner, passing in a PrunerConfig.
Given the different ways in which jobs can finish, it is often useful to have fine-grained control over how old jobs are cleaned up. PrunerConfig enables such control.
When constructing PrunerConfig, a cron::Schedule is provided to specify when the pruner should run.
Depending on the load/throughput of the system, the pruner can be scheduled to run anywhere from once a year to multiple times per hour.
§Example configuring the job pruner
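```rust
use std::str::FromStr; // needed for `cron::Schedule::from_str`
use chrono::TimeDelta;
use rexecutor::prelude::*;
use rexecutor::backend::memory::InMemoryBackend;

// `Pruner` and `PrunerConfig` are assumed to be in scope (see the `pruner` module);
// RefreshWorker, EmailScheduler and RegistrationWorker are defined elsewhere.
// "0 0 * * * *" runs the pruner at second 0, minute 0 of every hour.
let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    .with_max_concurrency(Some(2))
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );

let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .with_job_pruner(config);
```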
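To clarify what the two pruners configured above select, here is a std-only model of the rules. The `Rule`, `Kind`, `Scope` and `prune` names are hypothetical, and the real pruning is done by the backend. The sketch assumes that `max_age` removes covered jobs older than the given age and that `max_length` keeps only the newest N covered jobs.

```rust
/// Hypothetical job statuses (only the ones the pruners above care about).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Complete,
    Discarded,
}

#[derive(Debug, Clone, PartialEq)]
struct FinishedJob {
    executor: &'static str,
    status: Status,
    finished_at: u64, // seconds
}

/// Which executors a rule applies to (loosely mirroring `only`/`except`).
enum Scope {
    All,
    Only(&'static [&'static str]),
    Except(&'static [&'static str]),
}

impl Scope {
    fn includes(&self, executor: &str) -> bool {
        match self {
            Scope::All => true,
            Scope::Only(names) => names.iter().any(|n| *n == executor),
            Scope::Except(names) => !names.iter().any(|n| *n == executor),
        }
    }
}

enum Kind {
    MaxAge(u64),      // delete covered jobs older than this many seconds
    MaxLength(usize), // keep only this many of the newest covered jobs
}

struct Rule {
    kind: Kind,
    status: Status,
    scope: Scope,
}

fn prune(jobs: &mut Vec<FinishedJob>, rule: &Rule, now: u64) {
    // Jobs a rule does not cover are always kept.
    let covered = |j: &FinishedJob| j.status == rule.status && rule.scope.includes(j.executor);
    match rule.kind {
        Kind::MaxAge(max_age) => {
            jobs.retain(|j| !covered(j) || now.saturating_sub(j.finished_at) <= max_age);
        }
        Kind::MaxLength(max_len) => {
            // Newest first, so the first `max_len` covered jobs are the ones kept.
            jobs.sort_by(|a, b| b.finished_at.cmp(&a.finished_at));
            let mut kept = 0;
            jobs.retain(|j| {
                if !covered(j) {
                    return true;
                }
                kept += 1;
                kept <= max_len
            });
        }
    }
}

fn main() {
    const DAY: u64 = 86_400;
    let now = 100 * DAY;
    let mut jobs = vec![
        FinishedJob { executor: "refresh_worker", status: Status::Complete, finished_at: now - 40 * DAY },
        FinishedJob { executor: "refresh_worker", status: Status::Complete, finished_at: now - 2 * DAY },
        FinishedJob { executor: "registration_worker", status: Status::Complete, finished_at: now - 40 * DAY },
    ];
    // Roughly `Pruner::max_age(31 days, Complete).only::<RefreshWorker>().and::<EmailScheduler>()`.
    let rule = Rule {
        kind: Kind::MaxAge(31 * DAY),
        status: Status::Complete,
        scope: Scope::Only(&["refresh_worker", "email_scheduler"]),
    };
    prune(&mut jobs, &rule, now);
    // The 40-day-old refresh_worker job is pruned; registration_worker is out of scope.
    println!("{} jobs remain", jobs.len()); // 2 jobs remain
}
```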
§Shutting rexecutor down
To avoid jobs getting killed midway through their execution, it is important to make use of graceful shutdown. This can either be triggered explicitly by calling Rexecutor::graceful_shutdown, or via the DropGuard obtained from Rexecutor::drop_guard.
Using Rexecutor::graceful_shutdown or Rexecutor::drop_guard ensures that all currently executing jobs are allowed time to complete before rexecutor shuts down.
§Example using the DropGuard
```rust
use rexecutor::prelude::*;
use rexecutor::backend::memory::InMemoryBackend;

let backend = InMemoryBackend::new();
// Note this must be given a name to ensure it is dropped at the end of the scope.
// See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
let _guard = Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .drop_guard();
```
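The guard relies on Rust's RAII pattern: shutdown happens in `Drop`, which is why it has to be bound to a named variable such as `_guard`. Below is a std-thread analogue of that pattern. `ShutdownGuard` and `Stats` are hypothetical; rexecutor's DropGuard manages async tasks, and this is not its implementation. The workers check for shutdown only between jobs, so dropping the guard never abandons a job half-way.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Counters used to check that no job is abandoned half-way.
#[derive(Default)]
struct Stats {
    started: AtomicUsize,
    completed: AtomicUsize,
}

/// Hypothetical guard: dropping it requests shutdown, then waits for workers.
struct ShutdownGuard {
    stop: Arc<AtomicBool>,
    workers: Vec<JoinHandle<()>>,
}

impl ShutdownGuard {
    fn spawn(n_workers: usize, stats: Arc<Stats>) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let workers: Vec<JoinHandle<()>> = (0..n_workers)
            .map(|_| {
                let stop = Arc::clone(&stop);
                let stats = Arc::clone(&stats);
                thread::spawn(move || {
                    // Shutdown is only checked between jobs, never during one.
                    while !stop.load(Ordering::SeqCst) {
                        stats.started.fetch_add(1, Ordering::SeqCst);
                        thread::sleep(Duration::from_millis(5)); // simulated job
                        stats.completed.fetch_add(1, Ordering::SeqCst);
                    }
                })
            })
            .collect();
        Self { stop, workers }
    }
}

impl Drop for ShutdownGuard {
    fn drop(&mut self) {
        // Stop picking up new jobs...
        self.stop.store(true, Ordering::SeqCst);
        // ...then wait for the jobs already running to finish.
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}

fn main() {
    let stats = Arc::new(Stats::default());
    {
        let _guard = ShutdownGuard::spawn(2, Arc::clone(&stats));
        thread::sleep(Duration::from_millis(20));
    } // `_guard` is dropped here, which blocks until both workers have exited.
    let started = stats.started.load(Ordering::SeqCst);
    let completed = stats.completed.load(Ordering::SeqCst);
    println!("started = {started}, completed = {completed}");
}
```

Binding with `let _ = ShutdownGuard::spawn(...)` instead would drop the guard at the end of that statement, shutting the workers down immediately.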
§Global backend
Rexecutor can optionally be run with a global backend. This enables the use of the convenience method job::builder::JobBuilder::enqueue, which does not require a reference to the backend to be passed down to the code that needs to enqueue a job.
The global backend can be set using Rexecutor::set_global_backend. This should only be called once; otherwise it will return an error.
In fact, for a single Rexecutor instance it is impossible to call it twice, as the following does not compile:
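```rust,compile_fail
use rexecutor::Rexecutor;

let backend = rexecutor::backend::memory::InMemoryBackend::new();
// Intentionally does not compile: set_global_backend cannot be called twice.
Rexecutor::new(backend).set_global_backend().set_global_backend();
```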
Note that using a global backend has many of the same drawbacks as any global variable; in particular, it can make unit testing more difficult.
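A set-once global can be modelled with std::sync::OnceLock, whose `set` returns `Err` carrying the rejected value if the slot is already filled. This is a hypothetical sketch of the "only once" behaviour described above, not rexecutor's implementation; `Backend`, `set_global_backend` and `global_backend` here are stand-ins.

```rust
use std::sync::OnceLock;

/// Hypothetical stand-in for whatever backend type you configure.
struct Backend {
    name: &'static str,
}

/// Process-wide slot that can be filled exactly once.
static GLOBAL_BACKEND: OnceLock<Backend> = OnceLock::new();

/// Set the global backend; a second call fails and returns the rejected backend.
fn set_global_backend(backend: Backend) -> Result<(), Backend> {
    GLOBAL_BACKEND.set(backend)
}

/// What a convenience `enqueue` could look up instead of taking a backend argument.
fn global_backend() -> Option<&'static Backend> {
    GLOBAL_BACKEND.get()
}

fn main() {
    let first = set_global_backend(Backend { name: "in_memory" });
    let second = set_global_backend(Backend { name: "other" });
    println!("first ok: {}, second ok: {}", first.is_ok(), second.is_ok()); // first ok: true, second ok: false
    println!("global backend: {:?}", global_backend().map(|b| b.name)); // global backend: Some("in_memory")
}
```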
Modules§
- backend - The definition of the rexecutor backend.
- backoff - Common backoff strategies for use as part of crate::Executor::backoff.
- executor - Definition of the main trait Executor for defining enqueuable execution units (jobs).
- job - Structs and definitions for jobs.
- prelude - The purpose of this module is to alleviate the need to import many of the crate types.
- pruner - The API for configuring the job pruner.
- testing - Helpers for testing.
Macros§
- assert_enqueued - A macro for making assertions about what jobs should have been enqueued.
- test_suite - Creates a test suite for a rexecutor backend.
Structs§
- DropGuard - To enable automatic clean up, this guard will shut down rexecutor and all associated tasks when dropped.
- Rexecutor - The entry point for setting up rexecutor.
Enums§
- RexecutorError - Errors that can occur when working with rexecutor.