pub trait Processor<S = ()>: Clone
where
    S: Clone + Send + Sync + 'static,
{
    type Job: Job<S> + 'static;

    const NAME: &'static str;
    const QUEUE: &'static str;
    const MAX_RETRIES: MaxRetries;
    const BACKOFF_STRATEGY: Backoff;

    fn new_job(job: Self::Job) -> Result<NewJobInfo, Error> { ... }
    fn new_scheduled_job(
        job: Self::Job,
        after: DateTime<Utc>
    ) -> Result<NewJobInfo, Error> { ... }
    fn process(
        &self,
        args: Value,
        state: S
    ) -> Box<dyn Future<Item = (), Error = JobError> + Send> { ... }
}

The Processor trait

Processors define the details needed to spawn jobs, such as:

  • The job’s name
  • The job’s default queue
  • The job’s default maximum number of retries
  • The job’s backoff strategy

Processors also provide the default mechanism for running a job, and the only mechanism for creating a JobInfo, which is the type required for queuing jobs to be executed.
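The pattern described above, where associated constants supply defaults and a provided method stamps them onto every new job, can be sketched with the standard library alone. All names here (`SketchProcessor`, `SketchJobInfo`, `EmailProcessor`) are hypothetical stand-ins, not the crate's types; the real `new_job` takes a `Self::Job` and returns a `Result<NewJobInfo, Error>`.

```rust
/// Hypothetical, simplified record of what a queued job carries.
#[derive(Debug, PartialEq)]
struct SketchJobInfo {
    processor: &'static str,
    queue: &'static str,
    max_retries: u32,
    args: String,
}

/// Hypothetical stand-in for the trait: the constants are defaults,
/// and the provided method ties each job to its processor.
trait SketchProcessor {
    const NAME: &'static str;
    const QUEUE: &'static str;
    const MAX_RETRIES: u32;

    /// Builds the job record from the implementor's constants.
    fn new_job(args: &str) -> SketchJobInfo {
        SketchJobInfo {
            processor: Self::NAME,
            queue: Self::QUEUE,
            max_retries: Self::MAX_RETRIES,
            args: args.to_owned(),
        }
    }
}

struct EmailProcessor;

impl SketchProcessor for EmailProcessor {
    const NAME: &'static str = "EmailProcessor";
    const QUEUE: &'static str = "default";
    const MAX_RETRIES: u32 = 3;
}
```

Because the only constructor lives on the trait, a job record in this sketch cannot exist without the name of the processor that will handle it.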

Example

use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
use failure::Error;
use futures::future::{Future, IntoFuture};
use log::info;
use serde_derive::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct MyJob {
    count: i32,
}

impl Job<()> for MyJob {
    fn run(self, _state: ()) -> Box<dyn Future<Item = (), Error = Error> + Send> {
        info!("Processing {}", self.count);

        Box::new(Ok(()).into_future())
    }
}

#[derive(Clone)]
struct MyProcessor;

impl Processor<()> for MyProcessor {
    type Job = MyJob;

    const NAME: &'static str = "IncrementProcessor";
    const QUEUE: &'static str = "default";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
    const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
}

fn main() -> Result<(), Error> {
    let job = MyProcessor::new_job(MyJob { count: 1234 })?;

    Ok(())
}

Required Associated Types

type Job: Job<S> + 'static

Required Associated Constants

const NAME: &'static str

The name of the processor

This name must be unique. It is used to look up which processor should handle a job.
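To see why uniqueness matters, consider a name-keyed lookup. The following is a minimal sketch, assuming a hypothetical `Registry` that maps processor names to handler functions. It is not the crate's implementation, and rejecting duplicates is a choice made for this sketch only.

```rust
use std::collections::HashMap;

/// Hypothetical handler type: takes serialized arguments, returns a status string.
type Handler = fn(&str) -> String;

/// Hypothetical registry keyed by processor name.
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    fn new() -> Self {
        Registry { handlers: HashMap::new() }
    }

    /// Registers a handler, refusing to overwrite an existing name.
    fn register(&mut self, name: &'static str, handler: Handler) -> Result<(), String> {
        if self.handlers.contains_key(name) {
            return Err(format!("duplicate processor name: {}", name));
        }
        self.handlers.insert(name, handler);
        Ok(())
    }

    /// Finds the handler by the name stored with the job and runs it.
    fn dispatch(&self, name: &str, args: &str) -> Option<String> {
        self.handlers.get(name).map(|handler| handler(args))
    }
}

fn resize(args: &str) -> String {
    format!("resized {}", args)
}

fn email(args: &str) -> String {
    format!("emailed {}", args)
}
```

If two processors shared a name, a map like this could hold only one of them, so jobs meant for the other would be routed to the wrong handler.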

const QUEUE: &'static str

The name of the default queue for jobs created with this processor

This can be overridden on an individual-job level, but if a nonexistent queue is supplied, the job will never be processed.
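The failure mode above can be illustrated with a small in-memory model. This is a sketch under stated assumptions: `Store` and its methods are hypothetical, and the real storage and worker machinery are not shown in this page.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical in-memory store: one FIFO of job names per queue name.
struct Store {
    queues: HashMap<String, VecDeque<String>>,
}

impl Store {
    fn new() -> Self {
        Store { queues: HashMap::new() }
    }

    /// Enqueues a job on the named queue, creating the queue if needed.
    fn push(&mut self, queue: &str, job: &str) {
        self.queues
            .entry(queue.to_owned())
            .or_default()
            .push_back(job.to_owned());
    }

    /// Workers pull only from the queues they were configured with.
    fn drain(&mut self, worker_queues: &[&str]) -> Vec<String> {
        let mut done = Vec::new();
        for name in worker_queues {
            if let Some(queue) = self.queues.get_mut(*name) {
                done.extend(queue.drain(..));
            }
        }
        done
    }

    /// Number of jobs still waiting across all queues.
    fn pending(&self) -> usize {
        self.queues.values().map(|queue| queue.len()).sum()
    }
}
```

In this model, a job pushed to a misspelled queue such as `"defualt"` is stored without error but stays pending forever, because no worker ever asks for that queue.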

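const MAX_RETRIES: MaxRetries

Define the default number of retries for a given processor

Individual jobs can override this value.

const BACKOFF_STRATEGY: Backoff

Define the default backoff strategy for a given processor

Individual jobs can override this value.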
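As a rough illustration of how values like `MaxRetries::Count(1)` and `Backoff::Exponential(2)` from the example above might drive retries, here is a sketch using hypothetical stand-in enums. The `Linear` and `Infinite` variants and the exact delay formula (`base^retry_count` seconds) are assumptions made for this sketch; check the `Backoff` and `MaxRetries` documentation for the actual semantics.

```rust
/// Hypothetical stand-in for a backoff strategy.
#[derive(Clone, Copy)]
enum SketchBackoff {
    /// Assumed: wait the same number of seconds before every retry.
    Linear(u64),
    /// Assumed: wait `base^retry_count` seconds before the next retry.
    Exponential(u64),
}

/// Hypothetical stand-in for a retry limit.
#[derive(Clone, Copy)]
enum SketchMaxRetries {
    /// Assumed: never give up.
    Infinite,
    /// Give up after this many retries.
    Count(u32),
}

/// Seconds to wait before retry number `retry_count` (1-based).
fn delay_secs(strategy: SketchBackoff, retry_count: u32) -> u64 {
    match strategy {
        SketchBackoff::Linear(secs) => secs,
        // saturating_pow avoids overflow for large retry counts
        SketchBackoff::Exponential(base) => base.saturating_pow(retry_count),
    }
}

/// Whether another retry is allowed after `retries_so_far` retries.
fn should_retry(max: SketchMaxRetries, retries_so_far: u32) -> bool {
    match max {
        SketchMaxRetries::Infinite => true,
        SketchMaxRetries::Count(limit) => retries_so_far < limit,
    }
}
```

Under these assumptions, an exponential base of 2 spaces retries at 2, 4, 8 seconds and so on, while a count of 1 permits a single retry after the first failure.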

Provided Methods

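fn new_job(job: Self::Job) -> Result<NewJobInfo, Error>

A provided method to create a new JobInfo from provided arguments

This is required for spawning jobs, since it enforces the relationship between the job and the Processor that should handle it.

fn new_scheduled_job(job: Self::Job, after: DateTime<Utc>) -> Result<NewJobInfo, Error>

Create a JobInfo to schedule a job to be performed after a certain time

fn process(&self, args: Value, state: S) -> Box<dyn Future<Item = (), Error = JobError> + Send>

A provided method to coerce arguments into the expected type and run the job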

Advanced users may want to override this method in order to provide their own custom before/after logic for certain job processors

The state passed into this method is initialized at the start of the application. The state argument could be useful for containing a hook into something like r2d2, or the address of an actor in an actix-based system.

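// Imports assumed at module level:
// use background_jobs_core::JobError;
// use futures::future::{Either, Future, IntoFuture};
// use serde_json::Value;

fn process(
    &self,
    args: Value,
    state: S
) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
    // Coerce the stored JSON arguments into the concrete job type
    let res = serde_json::from_value::<Self::Job>(args);

    let fut = match res {
        Ok(job) => {
            // Perform some custom pre-job logic
            Either::A(job.run(state).map_err(JobError::Processing))
        },
        Err(_) => Either::B(Err(JobError::Json).into_future()),
    };

    Box::new(fut.and_then(|_| {
        // Perform some custom post-job logic
        // The closure must return something convertible into a future
        Ok(())
    }))
}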

Patterns like this could be useful if you want to use the same job type for multiple scenarios. Defining the process method for multiple Processors with different before/after logic for the same Job type is supported.
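The idea of one job type served by several processors can be sketched synchronously with the standard library. Everything here (`Report`, `ReportProcessor`, `PlainReports`, `AuditedReports`) is hypothetical, and the real `process` returns a boxed future rather than running inline.

```rust
/// Hypothetical job type shared by both processors.
struct Report {
    id: u32,
}

impl Report {
    /// Records that the job ran; stands in for real work.
    fn run(&self, log: &mut Vec<String>) {
        log.push(format!("run report {}", self.id));
    }
}

/// Hypothetical synchronous stand-in for the trait.
trait ReportProcessor {
    const NAME: &'static str;

    /// Default behavior: just run the job.
    fn process(&self, job: &Report, log: &mut Vec<String>) {
        job.run(log);
    }
}

/// Uses the default `process`.
struct PlainReports;

impl ReportProcessor for PlainReports {
    const NAME: &'static str = "PlainReports";
}

/// Overrides `process` to wrap the same job in before/after steps.
struct AuditedReports;

impl ReportProcessor for AuditedReports {
    const NAME: &'static str = "AuditedReports";

    fn process(&self, job: &Report, log: &mut Vec<String>) {
        log.push(format!("{}: before", Self::NAME));
        job.run(log);
        log.push(format!("{}: after", Self::NAME));
    }
}
```

Each processor keeps its own unique `NAME`, so the same `Report` can be routed to either one depending on which processor created the job.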

Implementors