Trait Executor

Source
pub trait Executor {
    type Data: Serialize + DeserializeOwned;
    type Metadata: Serialize + DeserializeOwned;

    const NAME: &'static str;
    const MAX_ATTEMPTS: u16 = 5u16;
    const MAX_CONCURRENCY: Option<usize> = None;
    const BLOCKING: bool = false;
    const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = None;

    // Required method
    fn execute<'async_trait>(
        job: Job<Self::Data, Self::Metadata>,
    ) -> Pin<Box<dyn Future<Output = ExecutionResult> + Send + 'async_trait>>
       where Self: 'async_trait;

    // Provided methods
    fn backoff(job: &Job<Self::Data, Self::Metadata>) -> TimeDelta { ... }
    fn timeout(_job: &Job<Self::Data, Self::Metadata>) -> Option<Duration> { ... }
    fn builder<'a>() -> JobBuilder<'a, Self>
       where Self: Sized,
             Self::Data: Serialize + DeserializeOwned,
             Self::Metadata: Serialize + DeserializeOwned { ... }
    fn cancel_job<'async_trait>(
        job_id: JobId,
        cancellation_reason: Box<dyn CancellationReason>,
    ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
       where Self: Send + 'async_trait { ... }
    fn cancel_job_on_backend<'life0, 'async_trait, B>(
        job_id: JobId,
        cancellation_reason: Box<dyn CancellationReason>,
        backend: &'life0 B,
    ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
       where B: Backend + ?Sized + Sync + 'async_trait,
             'life0: 'async_trait { ... }
    fn rerun_job<'async_trait>(
        job_id: JobId,
    ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
       where Self: Send + 'async_trait { ... }
    fn rerun_job_on_backend<'life0, 'async_trait, B>(
        job_id: JobId,
        backend: &'life0 B,
    ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
       where B: Backend + ?Sized + Sync + 'async_trait,
             'life0: 'async_trait { ... }
    fn query_jobs<'a, 'async_trait>(
        query: Where<'a, Self::Data, Self::Metadata>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
       where Self::Data: 'static + Send + Serialize + DeserializeOwned + Sync,
             Self::Metadata: 'static + Send + Serialize + DeserializeOwned + Sync,
             Self: Send + 'async_trait,
             'a: 'async_trait { ... }
    fn query_jobs_on_backend<'a, 'life0, 'async_trait, B>(
        query: Where<'a, Self::Data, Self::Metadata>,
        backend: &'life0 B,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
       where Self::Data: 'static + Send + Serialize + DeserializeOwned + Sync,
             Self::Metadata: 'static + Send + Serialize + DeserializeOwned + Sync,
             B: Backend + ?Sized + Sync + 'async_trait,
             Self: Send + 'async_trait,
             'a: 'async_trait,
             'life0: 'async_trait { ... }
    fn update_job<'async_trait>(
        job: Job<Self::Data, Self::Metadata>,
    ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
       where Self::Data: 'static + Send + Serialize + Sync,
             Self::Metadata: 'static + Send + Serialize + Sync,
             Self: Send + 'async_trait { ... }
    fn update_job_on_backend<'life0, 'async_trait, B>(
        job: Job<Self::Data, Self::Metadata>,
        backend: &'life0 B,
    ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
       where B: Backend + ?Sized + Sync + 'async_trait,
             Self::Data: 'static + Serialize + Send + Sync,
             Self::Metadata: 'static + Serialize + Send + Sync,
             Self: Send + 'async_trait,
             'life0: 'async_trait { ... }
}
Expand description

An enqueuable execution unit.

Jobs are defined by creating a struct/enum and implementing Executor for it.

§Example

You can define and enqueue a job as follows:

struct EmailJob;

#[async_trait::async_trait]
impl Executor for EmailJob {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_job";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {}", Self::NAME, job.data);
        /// Do something important with an email
        ExecutionResult::Done
    }
}

let _ = EmailJob::builder()
    .with_data("bob.shuruncle@example.com".to_owned())
    .schedule_in(TimeDelta::hours(3))
    .enqueue()
    .await;

assert_enqueued!(
    with_data: "bob.shuruncle@example.com".to_owned(),
    scheduled_after: Utc::now() + TimeDelta::minutes(170),
    scheduled_before: Utc::now() + TimeDelta::minutes(190),
    for_executor: EmailJob
);

§Unique jobs

It is possible to ensure uniqueness of jobs based on certain criteria. This can be defined as part of the implementation of Executor via Executor::UNIQUENESS_CRITERIA or when inserting the job via JobBuilder::unique.

For example to ensure that only one unique job is ran every five minutes it is possible to use the following uniqueness criteria.

struct UniqueJob;

#[async_trait::async_trait]
impl Executor for UniqueJob {
    type Data = ();
    type Metadata = ();
    const NAME: &'static str = "unique_job";
    const MAX_ATTEMPTS: u16 = 1;
    const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
        UniquenessCriteria::by_executor()
            .and_within(TimeDelta::seconds(300))
            .on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
    );
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {:?}", Self::NAME, job.data);
        // Do something important
        ExecutionResult::Done
    }
}

let _ = UniqueJob::builder().enqueue().await;
let _ = UniqueJob::builder().enqueue().await;

// Only one of jobs was enqueued
assert_enqueued!(
    1 job,
    scheduled_before: Utc::now(),
    for_executor: UniqueJob
);

Additionally it is possible to specify what action should be taken when there is a conflicting job. In the example above the priority is override. For more details of how to use uniqueness see UniquenessCriteria.

§Overriding Executor default values

When defining an Executor you specify the maximum number of attempts via Executor::MAX_ATTEMPTS. However, when inserting a job it is possible to override this value by calling JobBuilder::with_max_attempts (if not called the max attempts will be equal to Executor::MAX_ATTEMPTS.

Similarly, the executor can define a job uniqueness criteria via Executor::UNIQUENESS_CRITERIA. However, using JobBuilder::unique it is possible to override this value for a specific job.

Required Associated Constants§

Source

const NAME: &'static str

The name of the executor.

This is used to associate the jobs stored in the backend with this particular executor.

This should be set to a unique value for the backend to ensure no clashes with other executors running on the same backend.

The motivation for using a static string here is to enable developers to rename their rust types for their executor without breaking the integration with the backend.

Provided Associated Constants§

Source

const MAX_ATTEMPTS: u16 = 5u16

The maximum number of attempts to try this job before it is discarded.

When enqueuing any given job this can be overridden via JobBuilder::with_max_attempts.

Source

const MAX_CONCURRENCY: Option<usize> = None

The maximum number of concurrent jobs to be running. If set to None there will be no concurrency limit and an arbitrary number can be ran simultaneously.

Source

const BLOCKING: bool = false

This flag should be set to true if the job is computationally expensive.

This is to prevent a computationally expensive job locking up the asynchronous runtime.

Under the covers this results in the job executor being ran via tokio::task::spawn_blocking. See the docs for more details about blocking futures.

Source

const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = None

This can be used to ensure that only unique jobs matching the UniquenessCriteria are inserted.

If there is already a job inserted matching the given constraints there is the option to either update the current job or do nothing. See the docs of UniquenessCriteria for more details.

It is possible to override when inserting a specific job via JobBuilder::unique.

Required Associated Types§

Source

type Data: Serialize + DeserializeOwned

The type representing the executors arguments/data.

If this is not needed it can be set to unit ().

Source

type Metadata: Serialize + DeserializeOwned

The type for storing the executors metadata this is always optional.

If this is not needed it can be set to unit ().

Required Methods§

Source

fn execute<'async_trait>( job: Job<Self::Data, Self::Metadata>, ) -> Pin<Box<dyn Future<Output = ExecutionResult> + Send + 'async_trait>>
where Self: 'async_trait,

The work this a job should do when executing.

Provided Methods§

Source

fn backoff(job: &Job<Self::Data, Self::Metadata>) -> TimeDelta

Defines the backoff after a failed job.

The default backoff strategy is

  • exponential backoff with initial backoff of 4 seconds, and
  • a max backoff of seven days,
  • with a 10% jitter margin.

For some standard backoff strategies see the crate::backoff.

Note: this method is called preemptively before attempting execution.

Source

fn timeout(_job: &Job<Self::Data, Self::Metadata>) -> Option<Duration>

The timeout when executing a specific job.

If this returns None (default behaviour) then the job will be ran without a timeout.

Source

fn builder<'a>() -> JobBuilder<'a, Self>

The builder for inserting jobs.

For details of the API see JobBuilder.

Source

fn cancel_job<'async_trait>( job_id: JobId, cancellation_reason: Box<dyn CancellationReason>, ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where Self: Send + 'async_trait,

Cancel a job with the given reason.

To make use this API and the global backend, crate::Rexecutor::set_global_backend should be called. If this hasn’t been called, then a RexecutorError::GlobalBackend will be returned.

§Example
let reason = Box::new("No longer needed");
let result = SimpleExecutor::cancel_job(job_id, reason).await;
Source

fn cancel_job_on_backend<'life0, 'async_trait, B>( job_id: JobId, cancellation_reason: Box<dyn CancellationReason>, backend: &'life0 B, ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where B: Backend + ?Sized + Sync + 'async_trait, 'life0: 'async_trait,

Cancel a job with the given reason on the provided backend.

Source

fn rerun_job<'async_trait>( job_id: JobId, ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where Self: Send + 'async_trait,

Rerun a completed or discarded job.

To make use this API and the global backend, crate::Rexecutor::set_global_backend should be called. If this hasn’t been called, then a RexecutorError::GlobalBackend will be returned.

§Example
let result = SimpleExecutor::rerun_job(job_id).await;
Source

fn rerun_job_on_backend<'life0, 'async_trait, B>( job_id: JobId, backend: &'life0 B, ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where B: Backend + ?Sized + Sync + 'async_trait, 'life0: 'async_trait,

Rerun a completed or discarded job on the provided backend.

Source

fn query_jobs<'a, 'async_trait>( query: Where<'a, Self::Data, Self::Metadata>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
where Self::Data: 'static + Send + Serialize + DeserializeOwned + Sync, Self::Metadata: 'static + Send + Serialize + DeserializeOwned + Sync, Self: Send + 'async_trait, 'a: 'async_trait,

Query the jobs for this executor using the provided query.

For details of the query API see Where.

To make use this API and the global backend, crate::Rexecutor::set_global_backend should be called. If this hasn’t been called, then a RexecutorError::GlobalBackend will be returned.

§Example

To query all completed jobs for the SimpleExecutor:

let result = SimpleExecutor::query_jobs(Where::status_equals(JobStatus::Complete)).await;
Source

fn query_jobs_on_backend<'a, 'life0, 'async_trait, B>( query: Where<'a, Self::Data, Self::Metadata>, backend: &'life0 B, ) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
where Self::Data: 'static + Send + Serialize + DeserializeOwned + Sync, Self::Metadata: 'static + Send + Serialize + DeserializeOwned + Sync, B: Backend + ?Sized + Sync + 'async_trait, Self: Send + 'async_trait, 'a: 'async_trait, 'life0: 'async_trait,

Query the jobs for this executor using the provided query on the provided backend.

For details of the query API see Where.

Source

fn update_job<'async_trait>( job: Job<Self::Data, Self::Metadata>, ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where Self::Data: 'static + Send + Serialize + Sync, Self::Metadata: 'static + Send + Serialize + Sync, Self: Send + 'async_trait,

Update the given job

To make use this API and the global backend, crate::Rexecutor::set_global_backend should be called. If this hasn’t been called, then a RexecutorError::GlobalBackend will be returned.

§Example

To query all completed jobs for the SimpleExecutor:

job.max_attempts = 5;
let result = SimpleExecutor::update_job(job).await;
Source

fn update_job_on_backend<'life0, 'async_trait, B>( job: Job<Self::Data, Self::Metadata>, backend: &'life0 B, ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where B: Backend + ?Sized + Sync + 'async_trait, Self::Data: 'static + Serialize + Send + Sync, Self::Metadata: 'static + Serialize + Send + Sync, Self: Send + 'async_trait, 'life0: 'async_trait,

Update the given job

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§