pub trait Executor {
type Data: Serialize + DeserializeOwned;
type Metadata: Serialize + DeserializeOwned;
const NAME: &'static str;
const MAX_ATTEMPTS: u16 = 5u16;
const MAX_CONCURRENCY: Option<usize> = None;
const BLOCKING: bool = false;
const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = None;
// Required method
fn execute<'async_trait>(
job: Job<Self::Data, Self::Metadata>,
) -> Pin<Box<dyn Future<Output = ExecutionResult> + Send + 'async_trait>>
where Self: 'async_trait;
// Provided methods
fn backoff(job: &Job<Self::Data, Self::Metadata>) -> TimeDelta { ... }
fn timeout(_job: &Job<Self::Data, Self::Metadata>) -> Option<Duration> { ... }
fn builder<'a>() -> JobBuilder<'a, Self>
where Self: Sized,
Self::Data: Serialize + DeserializeOwned,
Self::Metadata: Serialize + DeserializeOwned { ... }
fn cancel_job<'async_trait>(
job_id: JobId,
cancellation_reason: Box<dyn CancellationReason>,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where Self: Send + 'async_trait { ... }
fn cancel_job_on_backend<'life0, 'async_trait, B>(
job_id: JobId,
cancellation_reason: Box<dyn CancellationReason>,
backend: &'life0 B,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where B: Backend + ?Sized + Sync + 'async_trait,
'life0: 'async_trait { ... }
fn rerun_job<'async_trait>(
job_id: JobId,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where Self: Send + 'async_trait { ... }
fn rerun_job_on_backend<'life0, 'async_trait, B>(
job_id: JobId,
backend: &'life0 B,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where B: Backend + ?Sized + Sync + 'async_trait,
'life0: 'async_trait { ... }
fn query_jobs<'a, 'async_trait>(
query: Where<'a, Self::Data, Self::Metadata>,
) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
where Self::Data: 'static + Send + Serialize + DeserializeOwned + Sync,
Self::Metadata: 'static + Send + Serialize + DeserializeOwned + Sync,
Self: Send + 'async_trait,
'a: 'async_trait { ... }
fn query_jobs_on_backend<'a, 'life0, 'async_trait, B>(
query: Where<'a, Self::Data, Self::Metadata>,
backend: &'life0 B,
) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
where Self::Data: 'static + Send + Serialize + DeserializeOwned + Sync,
Self::Metadata: 'static + Send + Serialize + DeserializeOwned + Sync,
B: Backend + ?Sized + Sync + 'async_trait,
Self: Send + 'async_trait,
'a: 'async_trait,
'life0: 'async_trait { ... }
fn update_job<'async_trait>(
job: Job<Self::Data, Self::Metadata>,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where Self::Data: 'static + Send + Serialize + Sync,
Self::Metadata: 'static + Send + Serialize + Sync,
Self: Send + 'async_trait { ... }
fn update_job_on_backend<'life0, 'async_trait, B>(
job: Job<Self::Data, Self::Metadata>,
backend: &'life0 B,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
where B: Backend + ?Sized + Sync + 'async_trait,
Self::Data: 'static + Serialize + Send + Sync,
Self::Metadata: 'static + Serialize + Send + Sync,
Self: Send + 'async_trait,
'life0: 'async_trait { ... }
}Expand description
An enqueuable execution unit.
Jobs are defined by creating a struct/enum and implementing Executor for it.
§Example
You can define and enqueue a job as follows:
struct EmailJob;
#[async_trait::async_trait]
impl Executor for EmailJob {
type Data = String;
type Metadata = String;
const NAME: &'static str = "email_job";
const MAX_ATTEMPTS: u16 = 2;
async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
println!("{} running, with args: {}", Self::NAME, job.data);
/// Do something important with an email
ExecutionResult::Done
}
}
let _ = EmailJob::builder()
.with_data("bob.shuruncle@example.com".to_owned())
.schedule_in(TimeDelta::hours(3))
.enqueue()
.await;
assert_enqueued!(
with_data: "bob.shuruncle@example.com".to_owned(),
scheduled_after: Utc::now() + TimeDelta::minutes(170),
scheduled_before: Utc::now() + TimeDelta::minutes(190),
for_executor: EmailJob
);§Unique jobs
It is possible to ensure uniqueness of jobs based on certain criteria. This can be defined as
part of the implementation of Executor via Executor::UNIQUENESS_CRITERIA or when
inserting the job via JobBuilder::unique.
For example to ensure that only one unique job is ran every five minutes it is possible to use the following uniqueness criteria.
struct UniqueJob;
#[async_trait::async_trait]
impl Executor for UniqueJob {
type Data = ();
type Metadata = ();
const NAME: &'static str = "unique_job";
const MAX_ATTEMPTS: u16 = 1;
const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
UniquenessCriteria::by_executor()
.and_within(TimeDelta::seconds(300))
.on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
);
async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
println!("{} running, with args: {:?}", Self::NAME, job.data);
// Do something important
ExecutionResult::Done
}
}
let _ = UniqueJob::builder().enqueue().await;
let _ = UniqueJob::builder().enqueue().await;
// Only one of jobs was enqueued
assert_enqueued!(
1 job,
scheduled_before: Utc::now(),
for_executor: UniqueJob
);Additionally it is possible to specify what action should be taken when there is a conflicting
job. In the example above the priority is override. For more details of how to use uniqueness
see UniquenessCriteria.
§Overriding Executor default values
When defining an Executor you specify the maximum number of attempts via
Executor::MAX_ATTEMPTS. However, when inserting a job it is possible to override this value
by calling JobBuilder::with_max_attempts (if not called the max attempts will be equal to
Executor::MAX_ATTEMPTS.
Similarly, the executor can define a job uniqueness criteria via
Executor::UNIQUENESS_CRITERIA. However, using JobBuilder::unique it is possible to
override this value for a specific job.
Required Associated Constants§
Sourceconst NAME: &'static str
const NAME: &'static str
The name of the executor.
This is used to associate the jobs stored in the backend with this particular executor.
This should be set to a unique value for the backend to ensure no clashes with other executors running on the same backend.
The motivation for using a static string here is to enable developers to rename their rust types for their executor without breaking the integration with the backend.
Provided Associated Constants§
Sourceconst MAX_ATTEMPTS: u16 = 5u16
const MAX_ATTEMPTS: u16 = 5u16
The maximum number of attempts to try this job before it is discarded.
When enqueuing any given job this can be overridden via JobBuilder::with_max_attempts.
Sourceconst MAX_CONCURRENCY: Option<usize> = None
const MAX_CONCURRENCY: Option<usize> = None
The maximum number of concurrent jobs to be running. If set to None there will be no
concurrency limit and an arbitrary number can be ran simultaneously.
Sourceconst BLOCKING: bool = false
const BLOCKING: bool = false
This flag should be set to true if the job is computationally expensive.
This is to prevent a computationally expensive job locking up the asynchronous runtime.
Under the covers this results in the job executor being ran via
tokio::task::spawn_blocking. See the docs for more details about blocking futures.
Sourceconst UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = None
const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = None
This can be used to ensure that only unique jobs matching the UniquenessCriteria are
inserted.
If there is already a job inserted matching the given constraints there is the option to
either update the current job or do nothing. See the docs of UniquenessCriteria for
more details.
It is possible to override when inserting a specific job via JobBuilder::unique.
Required Associated Types§
Sourcetype Data: Serialize + DeserializeOwned
type Data: Serialize + DeserializeOwned
The type representing the executors arguments/data.
If this is not needed it can be set to unit ().
Sourcetype Metadata: Serialize + DeserializeOwned
type Metadata: Serialize + DeserializeOwned
The type for storing the executors metadata this is always optional.
If this is not needed it can be set to unit ().
Required Methods§
Provided Methods§
Sourcefn backoff(job: &Job<Self::Data, Self::Metadata>) -> TimeDelta
fn backoff(job: &Job<Self::Data, Self::Metadata>) -> TimeDelta
Defines the backoff after a failed job.
The default backoff strategy is
- exponential backoff with initial backoff of 4 seconds, and
- a max backoff of seven days,
- with a 10% jitter margin.
For some standard backoff strategies see the crate::backoff.
Note: this method is called preemptively before attempting execution.
Sourcefn timeout(_job: &Job<Self::Data, Self::Metadata>) -> Option<Duration>
fn timeout(_job: &Job<Self::Data, Self::Metadata>) -> Option<Duration>
The timeout when executing a specific job.
If this returns None (default behaviour) then the job will be ran without a timeout.
Sourcefn builder<'a>() -> JobBuilder<'a, Self>where
Self: Sized,
Self::Data: Serialize + DeserializeOwned,
Self::Metadata: Serialize + DeserializeOwned,
fn builder<'a>() -> JobBuilder<'a, Self>where
Self: Sized,
Self::Data: Serialize + DeserializeOwned,
Self::Metadata: Serialize + DeserializeOwned,
The builder for inserting jobs.
For details of the API see JobBuilder.
Sourcefn cancel_job<'async_trait>(
job_id: JobId,
cancellation_reason: Box<dyn CancellationReason>,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
fn cancel_job<'async_trait>(
job_id: JobId,
cancellation_reason: Box<dyn CancellationReason>,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
Cancel a job with the given reason.
To make use this API and the global backend, crate::Rexecutor::set_global_backend
should be called. If this hasn’t been called, then a RexecutorError::GlobalBackend
will be returned.
§Example
let reason = Box::new("No longer needed");
let result = SimpleExecutor::cancel_job(job_id, reason).await;Sourcefn cancel_job_on_backend<'life0, 'async_trait, B>(
job_id: JobId,
cancellation_reason: Box<dyn CancellationReason>,
backend: &'life0 B,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
fn cancel_job_on_backend<'life0, 'async_trait, B>( job_id: JobId, cancellation_reason: Box<dyn CancellationReason>, backend: &'life0 B, ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
Cancel a job with the given reason on the provided backend.
Sourcefn rerun_job<'async_trait>(
job_id: JobId,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
fn rerun_job<'async_trait>(
job_id: JobId,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
Rerun a completed or discarded job.
To make use this API and the global backend, crate::Rexecutor::set_global_backend
should be called. If this hasn’t been called, then a RexecutorError::GlobalBackend
will be returned.
§Example
let result = SimpleExecutor::rerun_job(job_id).await;Sourcefn rerun_job_on_backend<'life0, 'async_trait, B>(
job_id: JobId,
backend: &'life0 B,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
fn rerun_job_on_backend<'life0, 'async_trait, B>( job_id: JobId, backend: &'life0 B, ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
Rerun a completed or discarded job on the provided backend.
Sourcefn query_jobs<'a, 'async_trait>(
query: Where<'a, Self::Data, Self::Metadata>,
) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
fn query_jobs<'a, 'async_trait>( query: Where<'a, Self::Data, Self::Metadata>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
Query the jobs for this executor using the provided query.
For details of the query API see Where.
To make use this API and the global backend, crate::Rexecutor::set_global_backend
should be called. If this hasn’t been called, then a RexecutorError::GlobalBackend
will be returned.
§Example
To query all completed jobs for the SimpleExecutor:
let result = SimpleExecutor::query_jobs(Where::status_equals(JobStatus::Complete)).await;Sourcefn query_jobs_on_backend<'a, 'life0, 'async_trait, B>(
query: Where<'a, Self::Data, Self::Metadata>,
backend: &'life0 B,
) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
fn query_jobs_on_backend<'a, 'life0, 'async_trait, B>( query: Where<'a, Self::Data, Self::Metadata>, backend: &'life0 B, ) -> Pin<Box<dyn Future<Output = Result<Vec<Job<Self::Data, Self::Metadata>>, RexecutorError>> + Send + 'async_trait>>
Query the jobs for this executor using the provided query on the provided backend.
For details of the query API see Where.
Sourcefn update_job<'async_trait>(
job: Job<Self::Data, Self::Metadata>,
) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
fn update_job<'async_trait>( job: Job<Self::Data, Self::Metadata>, ) -> Pin<Box<dyn Future<Output = Result<(), RexecutorError>> + Send + 'async_trait>>
Update the given job
To make use this API and the global backend, crate::Rexecutor::set_global_backend
should be called. If this hasn’t been called, then a RexecutorError::GlobalBackend
will be returned.
§Example
To query all completed jobs for the SimpleExecutor:
job.max_attempts = 5;
let result = SimpleExecutor::update_job(job).await;Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.