Trait background_jobs::Processor
pub trait Processor<S = ()>: Clone
where
    S: 'static + Clone + Send + Sync,
{
    type Job: 'static + Job<S>;
    const NAME: &'static str;
    const QUEUE: &'static str;
    const MAX_RETRIES: MaxRetries;
    const BACKOFF_STRATEGY: Backoff;
    fn new_job(job: Self::Job) -> Result<NewJobInfo, Error> { ... }
    fn new_scheduled_job(
        job: Self::Job,
        after: DateTime<Utc>
    ) -> Result<NewJobInfo, Error> { ... }
    fn process(
        &self,
        args: Value,
        state: S
    ) -> Box<dyn Future<Item = (), Error = JobError> + Send + 'static, Global> { ... }
}
The Processor trait
Processors define the logic for spawning jobs, such as:
- The job’s name
- The job’s default queue
- The job’s default maximum number of retries
- The job’s backoff strategy
Processors also provide the default mechanism for running a job, and the only mechanism for creating a JobInfo, which is the type required for queuing jobs to be executed.
Example
use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
use failure::Error;
use futures::future::{Future, IntoFuture};
use log::info;
use serde_derive::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct MyJob {
    count: i32,
}

impl Job<()> for MyJob {
    fn run(self, _state: ()) -> Box<dyn Future<Item = (), Error = Error> + Send> {
        info!("Processing {}", self.count);
        Box::new(Ok(()).into_future())
    }
}

#[derive(Clone)]
struct MyProcessor;

impl Processor<()> for MyProcessor {
    type Job = MyJob;
    const NAME: &'static str = "IncrementProcessor";
    const QUEUE: &'static str = "default";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
    const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
}

fn main() -> Result<(), Error> {
    let job = MyProcessor::new_job(MyJob { count: 1234 })?;
    Ok(())
}
Required Associated Types
Required Associated Constants
const NAME: &'static str
The name of the processor
This name must be unique. It is used to look up which processor should handle a job.
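To illustrate why uniqueness matters, here is a minimal, self-contained sketch of a name-keyed lookup. `Registry`, `Handler`, `register`, and `dispatch` are hypothetical names invented for this illustration; they are not part of the crate, and rejecting a duplicate name is an assumption of the sketch rather than documented behaviour.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a registered processor: a plain handler function.
type Handler = fn(&str) -> String;

/// Hypothetical registry keyed by processor name (not the crate's actual type).
#[derive(Default)]
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    /// Registers a handler under `name`. This sketch rejects a name that is
    /// already taken, because a second entry would make lookups ambiguous.
    fn register(&mut self, name: &'static str, handler: Handler) -> Result<(), String> {
        if self.handlers.contains_key(name) {
            return Err(format!("processor name {:?} is already registered", name));
        }
        self.handlers.insert(name, handler);
        Ok(())
    }

    /// Finds the handler for a stored job by its processor name and runs it.
    fn dispatch(&self, name: &str, args: &str) -> Option<String> {
        self.handlers.get(name).map(|handler| handler(args))
    }
}

fn increment(args: &str) -> String {
    format!("incremented {}", args)
}

fn decrement(args: &str) -> String {
    format!("decremented {}", args)
}
```

With two handlers competing for "IncrementProcessor", only one of them could ever be reached through `dispatch`, which is the ambiguity the uniqueness requirement avoids.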
const QUEUE: &'static str
The name of the default queue for jobs created with this processor
This can be overridden on an individual-job level, but if a nonexistent queue is supplied, the job will never be processed.
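The following sketch models why a job on a nonexistent queue is never processed. `Storage`, `QueuedJob`, `push`, `pop_for`, and `pending` are hypothetical names for this illustration only, and the assumption that workers poll a fixed list of queue names is a simplification, not a description of the crate's internals.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical queued job carrying only the fields this sketch needs.
#[derive(Debug, Clone, PartialEq)]
struct QueuedJob {
    processor: &'static str,
    queue: String,
}

/// Hypothetical storage that groups jobs by queue name.
#[derive(Default)]
struct Storage {
    queues: HashMap<String, VecDeque<QueuedJob>>,
}

impl Storage {
    /// Stores the job under whatever queue name it carries, valid or not.
    fn push(&mut self, job: QueuedJob) {
        self.queues.entry(job.queue.clone()).or_default().push_back(job);
    }

    /// Pops the next job from the first listed queue that has one waiting.
    fn pop_for(&mut self, worker_queues: &[&str]) -> Option<QueuedJob> {
        worker_queues
            .iter()
            .find_map(|name| self.queues.get_mut(*name)?.pop_front())
    }

    /// Counts the jobs still waiting across all queues.
    fn pending(&self) -> usize {
        self.queues.values().map(VecDeque::len).sum()
    }
}
```

A job pushed with a misspelled queue name is stored successfully, but no worker that listens only on "default" will ever pop it, so it stays pending indefinitely.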
const MAX_RETRIES: MaxRetries
Define the default number of retries for a given processor
Individual jobs can override this default.
const BACKOFF_STRATEGY: Backoff
Define the default backoff strategy for a given processor
Individual jobs can override this default.
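As a rough illustration of how a retry limit and a backoff strategy can interact, the sketch below uses local stand-in enums that mirror only the variants shown in the example above (`MaxRetries::Count` and `Backoff::Exponential`). The helper functions `should_retry` and `retry_delay` are hypothetical, and the delay rule (base raised to the attempt number, in seconds) is an assumption made for this sketch, not a statement of the crate's exact formula.

```rust
use std::time::Duration;

/// Local stand-in mirroring the `MaxRetries::Count` variant from the example.
/// This is NOT the crate's definition.
#[derive(Clone, Copy, Debug)]
enum MaxRetries {
    Count(u32),
}

/// Local stand-in mirroring the `Backoff::Exponential` variant from the example.
/// This is NOT the crate's definition.
#[derive(Clone, Copy, Debug)]
enum Backoff {
    Exponential(u64),
}

/// Assumed rule: retry while fewer than the allowed retries have been used.
fn should_retry(max: MaxRetries, retries_so_far: u32) -> bool {
    match max {
        MaxRetries::Count(limit) => retries_so_far < limit,
    }
}

/// Assumed rule: wait `base^attempt` seconds before the given retry attempt.
fn retry_delay(backoff: Backoff, attempt: u32) -> Duration {
    match backoff {
        // saturating_pow avoids overflow for very large attempt numbers.
        Backoff::Exponential(base) => Duration::from_secs(base.saturating_pow(attempt)),
    }
}
```

Under these assumptions, `MaxRetries::Count(1)` permits a single retry, and `Backoff::Exponential(2)` would space successive retries 2, 4, then 8 seconds apart.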
Provided Methods
fn new_job(job: Self::Job) -> Result<NewJobInfo, Error>
A provided method to create a new JobInfo from provided arguments
This is required for spawning jobs, since it enforces the relationship between the job and the Processor that should handle it.
fn new_scheduled_job(
job: Self::Job,
after: DateTime<Utc>
) -> Result<NewJobInfo, Error>
Create a JobInfo to schedule a job to be performed after a certain time
fn process(
&self,
args: Value,
state: S
) -> Box<dyn Future<Item = (), Error = JobError> + Send + 'static, Global>
A provided method to coerce arguments into the expected type and run the job
Advanced users may want to override this method to provide their own custom before/after logic for certain job processors.
The state passed into this method is initialized at the start of the application. The state argument could be useful for containing a hook into something like r2d2, or the address of an actor in an actix-based system.
// Assumes `futures::future::{Either, Future, IntoFuture}` and `serde_json` are in scope.
fn process(
    &self,
    args: Value,
    state: S
) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
    // Coerce the JSON arguments into the processor's job type
    let res = serde_json::from_value::<Self::Job>(args);
    let fut = match res {
        Ok(job) => {
            // Perform some custom pre-job logic
            Either::A(job.run(state).map_err(JobError::Processing))
        },
        Err(_) => Either::B(Err(JobError::Json).into_future()),
    };
    Box::new(fut.and_then(|_| {
        // Perform some custom post-job logic
        Ok(())
    }))
}
Patterns like this could be useful if you want to use the same job type for multiple scenarios. Defining the process method for multiple Processors with different before/after logic for the same Job type is supported.
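The idea of two processors sharing one job type can be sketched without the crate itself. The traits below are simplified, synchronous stand-ins (the real `Job` and `Processor` traits return boxed futures and deserialize JSON arguments), and `Log`, `PlainProcessor`, and `AuditedProcessor` are hypothetical names used only for this illustration. The shared `Log` also shows how a cloneable state value can be handed to each job.

```rust
use std::sync::{Arc, Mutex};

/// Simplified, synchronous stand-in for the crate's `Job` trait.
trait Job<S> {
    fn run(self, state: S) -> Result<(), String>;
}

/// Simplified stand-in for the crate's `Processor` trait.
trait Processor<S: Clone> {
    type Job: Job<S>;
    const NAME: &'static str;

    /// Default behaviour: just run the job with the given state.
    fn process(&self, job: Self::Job, state: S) -> Result<(), String> {
        job.run(state)
    }
}

/// Hypothetical shared state: a thread-safe log of what happened.
type Log = Arc<Mutex<Vec<String>>>;

struct MyJob {
    count: i32,
}

impl Job<Log> for MyJob {
    fn run(self, state: Log) -> Result<(), String> {
        state.lock().unwrap().push(format!("run {}", self.count));
        Ok(())
    }
}

/// Uses the provided `process` unchanged.
struct PlainProcessor;

impl Processor<Log> for PlainProcessor {
    type Job = MyJob;
    const NAME: &'static str = "PlainProcessor";
}

/// Overrides `process` to wrap the same job type in before/after logic.
struct AuditedProcessor;

impl Processor<Log> for AuditedProcessor {
    type Job = MyJob;
    const NAME: &'static str = "AuditedProcessor";

    fn process(&self, job: Self::Job, state: Log) -> Result<(), String> {
        state.lock().unwrap().push("before".to_string());
        let result = job.run(state.clone());
        state.lock().unwrap().push("after".to_string());
        result
    }
}
```

Each processor keeps its own unique `NAME`, so a stored job is routed to exactly one of them even though both accept `MyJob`.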