pub trait Processor: Clone {
    type Job: Job;

    fn name() -> &'static str;
    fn queue() -> &'static str;
    fn max_retries() -> MaxRetries;
    fn backoff_strategy() -> Backoff;

    fn new_job(job: Self::Job) -> Result<JobInfo, Error> { ... }
    fn process(
        &self,
        args: Value
    ) -> Box<dyn Future<Error = JobError, Item = ()> + Send + 'static, Global> { ... }
}

The Processor trait

Processors define the logic for spawning jobs, such as:

  • The job’s name
  • The job’s default queue
  • The job’s default maximum number of retries
  • The job’s backoff strategy

Processors also provide the default mechanism for running a job, and the only mechanism for creating a JobInfo, which is the type required for queuing jobs to be executed.

Example

use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
use failure::Error;
use futures::future::{Future, IntoFuture};
use log::info;
use serde_derive::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct MyJob {
    count: i32,
}

impl Job for MyJob {
    fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> {
        info!("Processing {}", self.count);

        Box::new(Ok(()).into_future())
    }
}

#[derive(Clone)]
struct MyProcessor;

impl Processor for MyProcessor {
    type Job = MyJob;

    fn name() -> &'static str {
        "IncrementProcessor"
    }

    fn queue() -> &'static str {
        "default"
    }

    fn max_retries() -> MaxRetries {
        MaxRetries::Count(1)
    }

    fn backoff_strategy() -> Backoff {
        Backoff::Exponential(2)
    }
}

fn main() -> Result<(), Error> {
    let job = MyProcessor::new_job(MyJob { count: 1234 })?;

    Ok(())
}

Required Associated Types

Required Methods

fn name() -> &'static str

The name of the processor

This name must be unique. It is used to look up which processor should handle a job.
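To illustrate why the name has to be unique, here is a minimal sketch of name-based dispatch. The `Registry` type, the `Handler` alias, and the `increment` and `email` functions are hypothetical stand-ins written with only the standard library. They are not the crate's internals, and they assume processors are stored in a map keyed by `name()`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a registered processor:
/// a function that receives the raw job arguments.
type Handler = fn(&str) -> String;

/// Hypothetical registry keyed by processor name
/// (an assumption for illustration, not part of background_jobs_core).
#[derive(Default)]
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    /// Register `handler` under `name`. Returns `false` and keeps the
    /// existing handler if the name is already taken.
    fn register(&mut self, name: &'static str, handler: Handler) -> bool {
        if self.handlers.contains_key(name) {
            return false;
        }
        self.handlers.insert(name, handler);
        true
    }

    /// Look up the handler by name and run it.
    /// Returns `None` if no processor was registered under that name.
    fn dispatch(&self, name: &str, args: &str) -> Option<String> {
        self.handlers.get(name).map(|handler| handler(args))
    }
}

// Two hypothetical handlers used to exercise the registry.
fn increment(args: &str) -> String {
    format!("increment({})", args)
}

fn email(args: &str) -> String {
    format!("email({})", args)
}
```

With a plain map, a second insert under the same name would silently replace the first handler, so jobs queued for one processor would be run by another. This sketch rejects the duplicate instead; how the real crate reacts to a name collision is not stated here.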

fn queue() -> &'static str

The name of the default queue for jobs created with this processor

This can be overridden on an individual-job level, but if a nonexistent queue is supplied, the job will never be processed.

fn max_retries() -> MaxRetries

Define the default number of retries for a given processor

Individual jobs can override this default.

fn backoff_strategy() -> Backoff

Define the default backoff strategy for a given processor

Individual jobs can override this default.
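How a retry limit and a backoff strategy might interact can be shown with a small standalone sketch. The enums below are local stand-ins that reuse the variant names from the example above (`MaxRetries::Count`, `Backoff::Exponential`). The `Infinite` and `Linear` variants, the delay formula, and both helper functions are assumptions made for illustration, not the crate's documented behavior.

```rust
/// Local stand-in for the crate's `MaxRetries`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum MaxRetries {
    /// Assumed variant: never give up.
    Infinite,
    /// Give up once this many retries have been attempted.
    Count(usize),
}

/// Local stand-in for the crate's `Backoff`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Backoff {
    /// Assumed variant: wait a fixed number of seconds before every retry.
    Linear(u64),
    /// Assumed meaning: wait `base^attempt` seconds before retry `attempt`.
    Exponential(u64),
}

/// Whether a job that has already been retried `retries_so_far` times
/// may be retried once more.
fn should_retry(max: MaxRetries, retries_so_far: usize) -> bool {
    match max {
        MaxRetries::Infinite => true,
        MaxRetries::Count(limit) => retries_so_far < limit,
    }
}

/// Assumed delay in seconds before retry number `attempt` (1-based).
/// `saturating_pow` caps the result at `u64::MAX` instead of overflowing.
fn delay_secs(backoff: Backoff, attempt: u32) -> u64 {
    match backoff {
        Backoff::Linear(secs) => secs,
        Backoff::Exponential(base) => base.saturating_pow(attempt),
    }
}
```

Under these assumptions, the combination used in the example, `MaxRetries::Count(1)` with `Backoff::Exponential(2)`, would allow a single retry about two seconds after the first failure.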

Provided Methods

fn new_job(job: Self::Job) -> Result<JobInfo, Error>

A provided method to create a new JobInfo from provided arguments

This is required for spawning jobs, since it enforces the relationship between the job and the Processor that should handle it.

fn process(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send>

A provided method to coerce arguments into the expected type and run the job

Advanced users may want to override this method to provide their own custom before/after logic for certain job processors.

fn process(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
    let res = serde_json::from_value::<Self::Job>(args);

    let fut = match res {
        Ok(job) => {
            // Perform some custom pre-job logic
            Either::A(job.run().map_err(JobError::Processing))
        },
        Err(_) => Either::B(Err(JobError::Json).into_future()),
    };

    Box::new(fut.and_then(|_| {
        // Perform some custom post-job logic
        Ok(())
    }))
}

Patterns like this could be useful if you want to use the same job type for multiple scenarios. Defining the process method for multiple Processors with different before/after logic for the same Job type is supported.
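The idea of sharing one job type between processors can be sketched without any dependencies. The traits below are simplified, synchronous stand-ins for `Job` and `Processor`: the real traits return boxed futures and deserialize their arguments from JSON, and the `log` parameter, `PlainProcessor`, and `AuditedProcessor` are hypothetical names introduced only for this illustration.

```rust
/// Simplified, synchronous stand-in for the crate's `Job` trait.
/// `log` is a hypothetical parameter that records what ran, in order.
trait Job {
    fn run(self, log: &mut Vec<String>) -> Result<(), String>;
}

/// Simplified stand-in for the crate's `Processor` trait.
trait Processor {
    type Job: Job;

    fn name() -> &'static str;

    /// Default mechanism: just run the job.
    fn process(&self, job: Self::Job, log: &mut Vec<String>) -> Result<(), String> {
        job.run(log)
    }
}

struct MyJob {
    count: i32,
}

impl Job for MyJob {
    fn run(self, log: &mut Vec<String>) -> Result<(), String> {
        log.push(format!("run {}", self.count));
        Ok(())
    }
}

/// Keeps the default `process`.
struct PlainProcessor;

impl Processor for PlainProcessor {
    type Job = MyJob;

    fn name() -> &'static str {
        "PlainProcessor"
    }
}

/// Overrides `process` to wrap the same job type in before/after logic.
struct AuditedProcessor;

impl Processor for AuditedProcessor {
    type Job = MyJob;

    fn name() -> &'static str {
        "AuditedProcessor"
    }

    fn process(&self, job: Self::Job, log: &mut Vec<String>) -> Result<(), String> {
        log.push("before".to_string()); // custom pre-job logic
        job.run(log)?;
        log.push("after".to_string()); // custom post-job logic, skipped on failure
        Ok(())
    }
}
```

Both processors accept `MyJob`, but each has its own unique name, so a queued job is routed to exactly one of them and gets that processor's before/after behavior.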

Implementors