Trait background_jobs::Processor
source · pub trait Processor: Clone {
type Job: Job;
fn name() -> &'static str;
fn queue() -> &'static str;
fn max_retries() -> MaxRetries;
fn backoff_strategy() -> Backoff;
fn new_job(job: Self::Job) -> Result<JobInfo, Error> { ... }
fn process(
&self,
args: Value
) -> Box<dyn Future<Error = JobError, Item = ()> + Send + 'static, Global> { ... }
}
Expand description
The Processor trait
Processors define the logic spawning jobs such as
- The job’s name
- The job’s default queue
- The job’s default maximum number of retries
- The job’s backoff strategy
Processors also provide the default mechanism for running a job, and the only mechanism for creating a JobInfo, which is the type required for queuing jobs to be executed.
Example
use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
use failure::Error;
use futures::future::{Future, IntoFuture};
use log::info;
use serde_derive::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct MyJob {
count: i32,
}
impl Job for MyJob {
fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> {
info!("Processing {}", self.count);
Box::new(Ok(()).into_future())
}
}
#[derive(Clone)]
struct MyProcessor;
impl Processor for MyProcessor {
type Job = MyJob;
fn name() -> &'static str {
"IncrementProcessor"
}
fn queue() -> &'static str {
"default"
}
fn max_retries() -> MaxRetries {
MaxRetries::Count(1)
}
fn backoff_strategy() -> Backoff {
Backoff::Exponential(2)
}
}
fn main() -> Result<(), Error> {
let job = MyProcessor::new_job(MyJob { count: 1234 })?;
Ok(())
}
Required Associated Types§
Required Methods§
sourcefn name() -> &'static str
fn name() -> &'static str
The name of the processor
This name must be unique!!! It is used to look up which processor should handle a job
sourcefn queue() -> &'static str
fn queue() -> &'static str
The name of the default queue for jobs created with this processor
This can be overridden on an individual-job level, but if a non-existant queue is supplied, the job will never be processed.
sourcefn max_retries() -> MaxRetries
fn max_retries() -> MaxRetries
Define the default number of retries for a given processor
Jobs can override
sourcefn backoff_strategy() -> Backoff
fn backoff_strategy() -> Backoff
Define the default backoff strategy for a given processor
Jobs can override
Provided Methods§
sourcefn new_job(job: Self::Job) -> Result<JobInfo, Error>
fn new_job(job: Self::Job) -> Result<JobInfo, Error>
A provided method to create a new JobInfo from provided arguments
This is required for spawning jobs, since it enforces the relationship between the job and the Processor that should handle it.
sourcefn process(
&self,
args: Value
) -> Box<dyn Future<Error = JobError, Item = ()> + Send + 'static, Global>
fn process(
&self,
args: Value
) -> Box<dyn Future<Error = JobError, Item = ()> + Send + 'static, Global>
A provided method to coerce arguments into the expected type and run the job
Advanced users may want to override this method in order to provide their own custom before/after logic for certain job processors
fn process(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
let res = serde_json::from_value::<Self::Job>(args);
let fut = match res {
Ok(job) => {
// Perform some custom pre-job logic
Either::A(job.run().map_err(JobError::Processing))
},
Err(_) => Either::B(Err(JobError::Json).into_future()),
};
Box::new(fut.and_then(|_| {
// Perform some custom post-job logic
}))
}
Patterns like this could be useful if you want to use the same job type for multiple
scenarios. Defining the process
method for multiple Processor
s with different
before/after logic for the same
Job
type is
supported.