Crate rexecutor

A robust job processing library for Rust.

§Setting up Rexecutor

To create an instance of Rexecutor you will need an implementation of Backend. The only one provided in this crate is backend::memory::InMemoryBackend, which is primarily intended for testing. For anything else, use a backend from one of the implementations provided by other crates.

§Creating executors

Jobs are defined by creating a struct/enum and implementing Executor for it.

§Example defining an executor

You can define and enqueue a job as follows:

use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();
struct EmailJob;

#[async_trait::async_trait]
impl Executor for EmailJob {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_job";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {}", Self::NAME, job.data);
        // Do something important with an email
        ExecutionResult::Done
    }
}

let _ = EmailJob::builder()
    .with_data("bob.shuruncle@example.com".to_owned())
    .schedule_in(TimeDelta::hours(3))
    .enqueue()
    .await;

assert_enqueued!(
    with_data: "bob.shuruncle@example.com".to_owned(),
    scheduled_after: Utc::now() + TimeDelta::minutes(170),
    scheduled_before: Utc::now() + TimeDelta::minutes(190),
    for_executor: EmailJob
);

§Unique jobs

It is possible to ensure uniqueness of jobs based on certain criteria. This can be defined as part of the implementation of Executor via Executor::UNIQUENESS_CRITERIA or when inserting the job via job::builder::JobBuilder::unique.

For example, to ensure that only one unique job is run every five minutes, it is possible to use the following uniqueness criteria:

struct UniqueJob;

#[async_trait::async_trait]
impl Executor for UniqueJob {
    type Data = ();
    type Metadata = ();
    const NAME: &'static str = "unique_job";
    const MAX_ATTEMPTS: u16 = 1;
    const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
        UniquenessCriteria::by_executor()
            .and_within(TimeDelta::seconds(300))
            .on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
    );
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {:?}", Self::NAME, job.data);
        // Do something important
        ExecutionResult::Done
    }
}

let _ = UniqueJob::builder().enqueue().await;
let _ = UniqueJob::builder().enqueue().await;

// Only one of the jobs was enqueued
assert_enqueued!(
    1 job,
    scheduled_before: Utc::now(),
    for_executor: UniqueJob
);

Additionally, it is possible to specify what action should be taken when there is a conflicting job. In the example above, the priority of the conflicting job is replaced. For more details on how to use uniqueness, see job::uniqueness_criteria::UniquenessCriteria.
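
To make the idea of a time-windowed uniqueness check concrete, here is a minimal std-only sketch. It is not how rexecutor implements UniquenessCriteria: the Queue type, the enqueue_unique method, and the integer timestamps are hypothetical names invented for this illustration, and the sketch only reports a conflict rather than replacing anything on the existing job.

```rust
use std::time::Duration;

/// Hypothetical stored job: the executor name plus the enqueue time,
/// expressed as seconds since an arbitrary epoch.
#[derive(Debug, Clone, PartialEq)]
struct StoredJob {
    executor: &'static str,
    enqueued_at_secs: u64,
}

/// Outcome of an attempted insert.
#[derive(Debug, PartialEq)]
enum Enqueued {
    Inserted,
    Conflict,
}

/// Hypothetical in-memory queue (not a rexecutor type).
#[derive(Default)]
struct Queue {
    jobs: Vec<StoredJob>,
}

impl Queue {
    /// Inserts a job unless one for the same executor was enqueued
    /// less than `window` ago.
    fn enqueue_unique(
        &mut self,
        executor: &'static str,
        now_secs: u64,
        window: Duration,
    ) -> Enqueued {
        let conflict = self.jobs.iter().any(|job| {
            job.executor == executor
                && now_secs.saturating_sub(job.enqueued_at_secs) < window.as_secs()
        });
        if conflict {
            return Enqueued::Conflict;
        }
        self.jobs.push(StoredJob { executor, enqueued_at_secs: now_secs });
        Enqueued::Inserted
    }
}
```

With a 300 second window, a second insert 100 seconds after the first is reported as a conflict, while one arriving 300 seconds later is accepted.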

§Overriding Executor default values

When defining an Executor you specify the maximum number of attempts via Executor::MAX_ATTEMPTS. However, when inserting a job it is possible to override this value by calling job::builder::JobBuilder::with_max_attempts (if not called the max attempts will be equal to Executor::MAX_ATTEMPTS).

Similarly, the executor can define a job uniqueness criteria via Executor::UNIQUENESS_CRITERIA. However, using job::builder::JobBuilder::unique it is possible to override this value for a specific job.
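
The relationship between a compile-time default and a per-job override can be sketched with the standard library alone. The trait HasDefaults, the Builder type, and ReportJob below are hypothetical stand-ins, not rexecutor types; the sketch only shows the general pattern of an optional override falling back to an associated constant.

```rust
use std::marker::PhantomData;

/// Hypothetical stand-in for an executor trait with a compile-time default.
trait HasDefaults {
    const MAX_ATTEMPTS: u16;
}

/// Hypothetical job type used only for this illustration.
struct ReportJob;

impl HasDefaults for ReportJob {
    const MAX_ATTEMPTS: u16 = 2;
}

/// Hypothetical builder: `None` means "use the executor's default".
struct Builder<E: HasDefaults> {
    max_attempts: Option<u16>,
    _executor: PhantomData<E>,
}

impl<E: HasDefaults> Builder<E> {
    fn new() -> Self {
        Self { max_attempts: None, _executor: PhantomData }
    }

    /// Records a per-job override.
    fn with_max_attempts(mut self, max_attempts: u16) -> Self {
        self.max_attempts = Some(max_attempts);
        self
    }

    /// Resolves the effective value: the override if set, else the default.
    fn effective_max_attempts(&self) -> u16 {
        self.max_attempts.unwrap_or(E::MAX_ATTEMPTS)
    }
}
```

Keeping the override as an Option makes "not called" distinguishable from "called with the default value", so the fallback only happens when no override was given.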

§Setting up executors to run

For each executor you would like to run, Rexecutor::with_executor should be called. Being explicit about this opens up the possibility of having specific nodes in a cluster act as worker nodes for certain enqueued jobs while other nodes are not responsible for their execution.

§Example setting up executors

let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>();

§Enqueuing jobs

Generally jobs will be enqueued using the job::builder::JobBuilder returned by Executor::builder.

When enqueuing jobs, the data and metadata of the job can be specified. Additionally, the default values of the Executor can be overridden.

§Example enqueuing a job

let backend = Arc::new(InMemoryBackend::new().paused());
Rexecutor::new(backend.clone()).set_global_backend().unwrap();

ExampleExecutor::builder()
    .with_max_attempts(2)
    .with_tags(vec!["initial_job", "delayed"])
    .with_data("First job".into())
    .schedule_in(TimeDelta::hours(2))
    .enqueue_to_backend(&backend)
    .await
    .unwrap();

assert_enqueued!(
    to: backend,
    with_data: "First job".to_owned(),
    tagged_with: ["initial_job", "delayed"],
    scheduled_after: Utc::now() + TimeDelta::minutes(110),
    scheduled_before: Utc::now() + TimeDelta::minutes(130),
    for_executor: ExampleExecutor
);

§Compile time scheduling of cron jobs

It can be useful to have jobs that run on a given schedule. Jobs like this can be set up using either Rexecutor::with_cron_executor or Rexecutor::with_cron_executor_for_timezone. The latter is used to specify the timezone in which the jobs should be scheduled to run.

§Example setting up a UTC cron job

To set up a cron job to run every day at midnight, you can use the following code.

struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}
let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();

let backend = InMemoryBackend::new();
Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
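
The schedule string above uses a six-field format with seconds first, which is why midnight is written as "0 0 0 * * *". As a reading aid, the following std-only sketch pairs each field with its name. The label_fields function is a hypothetical helper written for this illustration; it does not validate the expression and is not part of the cron crate or rexecutor.

```rust
/// Field names in the order used by the six-field format, seconds first.
const FIELD_NAMES: [&str; 6] = [
    "second",
    "minute",
    "hour",
    "day of month",
    "month",
    "day of week",
];

/// Pairs each whitespace-separated field of a six-field expression with its name.
/// Returns `None` when the expression does not have exactly six fields.
fn label_fields(expression: &str) -> Option<Vec<(&'static str, &str)>> {
    let fields: Vec<&str> = expression.split_whitespace().collect();
    if fields.len() != FIELD_NAMES.len() {
        return None;
    }
    Some(FIELD_NAMES.iter().copied().zip(fields).collect())
}
```

Read this way, "0 0 0 * * *" means second 0, minute 0, hour 0 of every day, whereas "0 0 * * * *" leaves the hour unrestricted and so matches the start of every hour.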

§Pruning jobs

After jobs have been completed, cancelled, or discarded, it is useful to be able to clean them up. To set up the job pruner, call Rexecutor::with_job_pruner, passing in a PrunerConfig.

Given the different ways in which jobs can finish it is often useful to be able to have fine grained control over how old jobs should be cleaned up. PrunerConfig enables such control.

When constructing PrunerConfig a cron::Schedule is provided to specify when the pruner should run.

Depending on the load/throughput of the system the pruner can be scheduled to run anywhere from once a year through to multiple times per hour.

§Example configuring the job pruner

let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    .with_max_concurrency(Some(2))
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );

let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .with_job_pruner(config);
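
The difference between an age-based and a length-based pruning rule can be sketched on a plain vector. This is only an illustration of the two policies, not rexecutor's pruner: the Finished struct and both functions are hypothetical, and the sketch assumes that a length-based rule keeps the most recent jobs.

```rust
/// Hypothetical finished job record: an id and its age in days.
#[derive(Debug, Clone, PartialEq)]
struct Finished {
    id: u32,
    age_days: u32,
}

/// Age-based rule: keep only jobs that are at most `max_age_days` old.
fn prune_by_max_age(jobs: &mut Vec<Finished>, max_age_days: u32) {
    jobs.retain(|job| job.age_days <= max_age_days);
}

/// Length-based rule: keep only the `max_length` youngest jobs.
/// (Assumption for this sketch: the newest jobs are the ones kept.)
fn prune_by_max_length(jobs: &mut Vec<Finished>, max_length: usize) {
    jobs.sort_by_key(|job| job.age_days);
    jobs.truncate(max_length);
}
```

An age-based rule bounds how long history is kept, while a length-based rule bounds how much of it is kept regardless of age, which is why the two are often combined per job status.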

§Shutting rexecutor down

To avoid jobs getting killed midway through their execution, it is important to make use of graceful shutdown. This can either be called explicitly using Rexecutor::graceful_shutdown, or triggered via the DropGuard obtained from Rexecutor::drop_guard.

Using Rexecutor::graceful_shutdown or Rexecutor::drop_guard will ensure that all currently executing jobs will be allowed time to complete before shutting rexecutor down.

§Example using the DropGuard

let backend = InMemoryBackend::new();
// Note this must be given a name to ensure it is dropped at the end of the scope.
// See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
let _guard = Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .drop_guard();
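
The note about naming the guard reflects a general Rust rule: a value bound to `_` is dropped immediately, while a value bound to a name such as `_guard` lives until the end of the scope. The following std-only sketch demonstrates this with a hypothetical ShutdownGuard that sets a flag when dropped; it is not rexecutor's DropGuard.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical guard that flips a shared flag when it is dropped.
struct ShutdownGuard {
    shut_down: Arc<AtomicBool>,
}

impl Drop for ShutdownGuard {
    fn drop(&mut self) {
        self.shut_down.store(true, Ordering::SeqCst);
    }
}

/// Binds the guard to `_`, which drops it at the end of the statement.
/// Returns whether shutdown had already happened inside the scope.
fn with_underscore(flag: &Arc<AtomicBool>) -> bool {
    let _ = ShutdownGuard { shut_down: Arc::clone(flag) };
    // The guard is already gone here.
    flag.load(Ordering::SeqCst)
}

/// Binds the guard to a named variable, which lives until the end of the scope.
/// Returns whether shutdown had already happened inside the scope.
fn with_named_binding(flag: &Arc<AtomicBool>) -> bool {
    let _guard = ShutdownGuard { shut_down: Arc::clone(flag) };
    // The guard is still alive here.
    flag.load(Ordering::SeqCst)
}
```

With the `_` binding the flag is already set inside the function body, which for a real shutdown guard would mean shutting down before any work has run.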

§Global backend

Rexecutor can be run either with or without a global backend. Using a global backend enables the convenience method job::builder::JobBuilder::enqueue, which does not require a reference to the backend to be passed down to the code that needs to enqueue a job.

The global backend can be set using Rexecutor::set_global_backend. This should only be called once; otherwise it will return an error.

In fact, for a single Rexecutor instance it is impossible to call this twice. The following does not compile:

let backend = rexecutor::backend::memory::InMemoryBackend::new();
Rexecutor::new(backend).set_global_backend().set_global_backend();

Note that using a global backend has many of the same drawbacks as any global variable; in particular, it can make unit testing more difficult.
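
The "set once, then error" behaviour of a global can be illustrated with std::sync::OnceLock. This is a sketch of the general pattern only; whether rexecutor uses OnceLock internally is not stated here, and the GLOBAL_BACKEND static and both functions are hypothetical names.

```rust
use std::sync::OnceLock;

/// Hypothetical global slot holding a backend name.
static GLOBAL_BACKEND: OnceLock<String> = OnceLock::new();

/// Sets the global once; a second call gets the rejected value back as an error.
fn set_global_backend(name: &str) -> Result<(), String> {
    GLOBAL_BACKEND.set(name.to_owned())
}

/// Reads the global without needing a reference to be passed in.
fn global_backend() -> Option<&'static str> {
    GLOBAL_BACKEND.get().map(String::as_str)
}
```

Because the slot is process-wide, every test in the same process shares it, which is one concrete way a global backend makes unit testing harder.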

Modules§

backend
The definition of the rexecutor backend.
backoff
Common backoff strategies for use as part of crate::Executor::backoff.
executor
Definition of the main trait Executor for defining enqueuable execution units (jobs).
job
Structs and definitions for jobs.
prelude
The purpose of this module is to alleviate the need to import many of the crate types.
pruner
The API for configuring the job pruner.
testing
Helpers for testing.

Macros§

assert_enqueued
A macro for making assertions about what jobs should have been enqueued.
test_suite
Create test suite for rexecutor backend.

Structs§

DropGuard
To enable automatic clean up, this guard will shut down rexecutor and all associated tasks when dropped.
Rexecutor
The entry point for setting up rexecutor.

Enums§

RexecutorError
Errors that can occur when working with rexecutor.