Trait background_jobs::Processor
The Processor trait
Processors define the logic for spawning jobs, such as:
- The job's name
- The job's default queue
- The job's default maximum number of retries
- The job's backoff strategy
Processors also provide the default mechanism for running a job, and the only mechanism for creating a JobInfo, which is the type required for queuing jobs to be executed.
Example
use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
use failure::Error;
use futures::future::Future;
use log::info;
use serde_derive::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct MyJob {
    count: i32,
}

impl Job for MyJob {
    type Processor = MyProcessor;
    type State = ();
    type Future = Result<(), Error>;

    fn run(self, _state: Self::State) -> Self::Future {
        info!("Processing {}", self.count);

        Ok(())
    }
}

#[derive(Clone)]
struct MyProcessor;

impl Processor for MyProcessor {
    type Job = MyJob;

    const NAME: &'static str = "IncrementProcessor";
    const QUEUE: &'static str = "default";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
    const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
}

fn main() -> Result<(), Error> {
    let job = MyProcessor::new_job(MyJob { count: 1234 })?;

    Ok(())
}
Associated Types
Associated Constants
const NAME: &'static str
The name of the processor
This name must be unique. It is used to look up which processor should handle a job.
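To illustrate why the name must be unique, here is a minimal sketch of a name-based lookup. The `Registry` type, the `Handler` alias, and the `increment` and `email` functions are hypothetical and are not part of background_jobs; the real crate's registration behavior may differ (for example, it may not reject duplicates at all).

```rust
use std::collections::HashMap;

// Hypothetical handler type: takes serialized job arguments, returns a status string.
type Handler = fn(&str) -> String;

/// Hypothetical registry mapping processor names to handlers.
#[derive(Default)]
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    /// Registers a handler under `name`; fails if the name is already taken.
    fn register(&mut self, name: &'static str, handler: Handler) -> Result<(), String> {
        if self.handlers.contains_key(name) {
            return Err(format!("duplicate processor name: {}", name));
        }
        self.handlers.insert(name, handler);
        Ok(())
    }

    /// Looks up the handler by name and runs it, if one is registered.
    fn dispatch(&self, name: &str, args: &str) -> Option<String> {
        self.handlers.get(name).map(|handler| handler(args))
    }
}

fn increment(args: &str) -> String {
    format!("increment({})", args)
}

fn email(args: &str) -> String {
    format!("email({})", args)
}

fn main() {
    let mut registry = Registry::default();
    registry.register("IncrementProcessor", increment).unwrap();
    registry.register("EmailProcessor", email).unwrap();

    // Reusing an existing name would make the lookup ambiguous, so this sketch rejects it.
    assert!(registry.register("IncrementProcessor", email).is_err());

    println!("{:?}", registry.dispatch("IncrementProcessor", "1234"));
}
```

If two processors shared a name, a stored job carrying only that name could not be routed to the right one.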
const QUEUE: &'static str
The name of the default queue for jobs created with this processor
This can be overridden for an individual job, but if a nonexistent queue is supplied, the job will never be processed.
const MAX_RETRIES: MaxRetries
Define the default number of retries for a given processor
Individual jobs can override this default.
const BACKOFF_STRATEGY: Backoff
Define the default backoff strategy for a given processor
Individual jobs can override this default.
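As a rough illustration of how a retry limit and a backoff strategy might interact, the sketch below uses local stand-in enums rather than the crate's own types. The semantics are assumptions: `Linear(n)` is taken to mean a fixed delay of `n` seconds, and `Exponential(base)` a delay of `base` raised to the retry count, in seconds. The helpers `retry_delay_secs` and `should_retry` are hypothetical.

```rust
/// Local stand-in for a backoff strategy; the semantics are assumed, not taken from the crate.
#[derive(Clone, Copy, Debug)]
enum Backoff {
    /// Assumed: wait a fixed number of seconds between attempts.
    Linear(u64),
    /// Assumed: wait `base.pow(retry_count)` seconds between attempts.
    Exponential(u64),
}

/// Local stand-in for a retry limit.
#[derive(Clone, Copy, Debug)]
enum MaxRetries {
    Infinite,
    Count(u32),
}

/// Hypothetical helper: delay in seconds before retry number `retry_count`.
fn retry_delay_secs(strategy: Backoff, retry_count: u32) -> u64 {
    match strategy {
        Backoff::Linear(secs) => secs,
        // saturating_pow avoids overflow for large retry counts
        Backoff::Exponential(base) => base.saturating_pow(retry_count),
    }
}

/// Hypothetical helper: whether a job that has already been retried
/// `retry_count` times may be retried again.
fn should_retry(max: MaxRetries, retry_count: u32) -> bool {
    match max {
        MaxRetries::Infinite => true,
        MaxRetries::Count(limit) => retry_count < limit,
    }
}

fn main() {
    let strategy = Backoff::Exponential(2);
    let max = MaxRetries::Count(3);

    let mut retry_count = 0;
    while should_retry(max, retry_count) {
        retry_count += 1;
        println!(
            "retry {} after {}s",
            retry_count,
            retry_delay_secs(strategy, retry_count)
        );
    }
}
```

Under these assumptions, exponential backoff spaces retries further apart with each failure, which gives a struggling dependency more time to recover.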
Provided methods
fn new_job(job: Self::Job) -> Result<NewJobInfo, Error>
A provided method to create a new JobInfo from provided arguments
This is required for spawning jobs, since it enforces the relationship between the job and the Processor that should handle it.
fn new_scheduled_job(
job: Self::Job,
after: DateTime<Utc>
) -> Result<NewJobInfo, Error>
Create a JobInfo to schedule a job to be performed after a certain time
fn process(
&self,
args: Value,
state: <Self::Job as Job>::State
) -> Box<dyn Future<Error = JobError, Item = ()> + 'static + Send> where
<<Self::Job as Job>::Future as IntoFuture>::Future: Send,
A provided method to coerce arguments into the expected type and run the job
Advanced users may want to override this method to provide their own custom before/after logic for certain job processors.
The state passed into this method is initialized at the start of the application. The state argument could be useful for containing a hook into something like r2d2, or the address of an actor in an actix-based system.
fn process(
    &self,
    args: Value,
    state: S
) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
    let res = serde_json::from_value::<Self::Job>(args);

    let fut = match res {
        Ok(job) => {
            // Perform some custom pre-job logic
            Either::A(job.run(state).map_err(JobError::Processing))
        },
        Err(_) => Either::B(Err(JobError::Json).into_future()),
    };

    Box::new(fut.and_then(|_| {
        // Perform some custom post-job logic
        // and_then needs a value convertible into a future, so return a Result
        Ok(())
    }))
}
Patterns like this could be useful if you want to use the same job type for multiple scenarios. Defining the process method for multiple Processors with different before/after logic for the same Job type is supported.
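The idea of several processors sharing one job type can be sketched in a simplified, synchronous form. Everything here is hypothetical: `SimpleProcessor` is a stand-in trait, not the crate's `Processor`, and the `log` parameter exists only to make the before/after hooks observable.

```rust
/// Hypothetical, synchronous stand-in for a job type.
struct MyJob {
    count: i32,
}

impl MyJob {
    fn run(self) -> Result<i32, String> {
        Ok(self.count + 1)
    }
}

/// Hypothetical simplified processor trait with a provided `process` method.
trait SimpleProcessor {
    const NAME: &'static str;

    /// Default behavior: run the job with no extra logic.
    fn process(&self, job: MyJob, _log: &mut Vec<String>) -> Result<i32, String> {
        job.run()
    }
}

/// Uses the provided `process` unchanged.
struct PlainProcessor;

impl SimpleProcessor for PlainProcessor {
    const NAME: &'static str = "PlainProcessor";
}

/// Overrides `process` to wrap the same job type in before/after logic.
struct AuditedProcessor;

impl SimpleProcessor for AuditedProcessor {
    const NAME: &'static str = "AuditedProcessor";

    fn process(&self, job: MyJob, log: &mut Vec<String>) -> Result<i32, String> {
        log.push(format!("{}: before job {}", Self::NAME, job.count));
        let result = job.run();
        log.push(format!("{}: after job", Self::NAME));
        result
    }
}

fn main() {
    let mut log = Vec::new();

    let plain = PlainProcessor.process(MyJob { count: 1 }, &mut log);
    let audited = AuditedProcessor.process(MyJob { count: 1 }, &mut log);

    println!("plain = {:?}, audited = {:?}, log = {:?}", plain, audited, log);
}
```

Both processors produce the same job result; only the surrounding logic differs, which is the point of overriding `process` per processor rather than per job.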