1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
/*
* This file is part of Background Jobs.
*
* Copyright © 2018 Riley Trautman
*
* Background Jobs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use failure::Error;
use futures::{
future::{Either, IntoFuture},
Future,
};
use serde_json::Value;
use crate::{Backoff, Job, JobError, JobInfo, MaxRetries};
/// ## The Processor trait
///
/// Processors define the logic for spawning jobs, such as
///  - The job's name
///  - The job's default queue
///  - The job's default maximum number of retries
///  - The job's [backoff
///    strategy](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.Backoff)
///
/// Processors also provide the default mechanism for running a job, and the only mechanism for
/// creating a [JobInfo](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.JobInfo),
/// which is the type required for queuing jobs to be executed.
///
/// ### Example
///
/// ```rust
/// use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
/// use failure::Error;
/// use futures::future::{Future, IntoFuture};
/// use log::info;
/// use serde_derive::{Deserialize, Serialize};
///
/// #[derive(Deserialize, Serialize)]
/// struct MyJob {
///     count: i32,
/// }
///
/// impl Job for MyJob {
///     fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> {
///         info!("Processing {}", self.count);
///
///         Box::new(Ok(()).into_future())
///     }
/// }
///
/// #[derive(Clone)]
/// struct MyProcessor;
///
/// impl Processor for MyProcessor {
///     type Job = MyJob;
///
///     fn name() -> &'static str {
///         "IncrementProcessor"
///     }
///
///     fn queue() -> &'static str {
///         "default"
///     }
///
///     fn max_retries() -> MaxRetries {
///         MaxRetries::Count(1)
///     }
///
///     fn backoff_strategy() -> Backoff {
///         Backoff::Exponential(2)
///     }
/// }
///
/// fn main() -> Result<(), Error> {
///     let job = MyProcessor::new_job(MyJob { count: 1234 })?;
///
///     Ok(())
/// }
/// ```
pub trait Processor: Clone {
    /// The job type this processor serializes in `new_job` and deserializes and runs in
    /// `process`
    type Job: Job;

    /// The name of the processor
    ///
    /// This name must be unique!!! It is used to look up which processor should handle a job
    fn name() -> &'static str;

    /// The name of the default queue for jobs created with this processor
    ///
    /// This can be overridden on an individual-job level, but if a non-existent queue is
    /// supplied, the job will never be processed.
    fn queue() -> &'static str;

    /// Define the default number of retries for a given processor
    ///
    /// Individual jobs can override this value; the default is only used when the job does not
    /// supply its own.
    fn max_retries() -> MaxRetries;

    /// Define the default backoff strategy for a given processor
    ///
    /// Individual jobs can override this value; the default is only used when the job does not
    /// supply its own.
    fn backoff_strategy() -> Backoff;

    /// A provided method to create a new JobInfo from provided arguments
    ///
    /// This is required for spawning jobs, since it enforces the relationship between the job and
    /// the Processor that should handle it.
    ///
    /// The queue, maximum retries and backoff strategy are taken from the job when it provides
    /// them, and fall back to this processor's defaults otherwise. The defaults are only
    /// computed when they are actually needed.
    ///
    /// # Errors
    ///
    /// Returns an error if the job cannot be serialized into a JSON value.
    fn new_job(job: Self::Job) -> Result<JobInfo, Error> {
        // A closure is used here instead of the bare `Self::queue` path so the `'static` default
        // can be unified with an override that may borrow from `job`.
        let queue = job.queue().unwrap_or_else(|| Self::queue()).to_owned();
        let max_retries = job.max_retries().unwrap_or_else(Self::max_retries);
        let backoff_strategy = job
            .backoff_strategy()
            .unwrap_or_else(Self::backoff_strategy);

        // `job` is consumed by serialization, so every value derived from it is read above.
        let job = JobInfo::new(
            Self::name().to_owned(),
            queue,
            serde_json::to_value(job)?,
            max_retries,
            backoff_strategy,
        );

        Ok(job)
    }

    /// A provided method to coerce arguments into the expected type and run the job
    ///
    /// Advanced users may want to override this method in order to provide their own custom
    /// before/after logic for certain job processors
    ///
    /// ```rust,ignore
    /// fn process(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
    ///     let res = serde_json::from_value::<Self::Job>(args);
    ///
    ///     let fut = match res {
    ///         Ok(job) => {
    ///             // Perform some custom pre-job logic
    ///             Either::A(job.run().map_err(JobError::Processing))
    ///         },
    ///         Err(_) => Either::B(Err(JobError::Json).into_future()),
    ///     };
    ///
    ///     Box::new(fut.and_then(|_| {
    ///         // Perform some custom post-job logic
    ///     }))
    /// }
    /// ```
    ///
    /// Patterns like this could be useful if you want to use the same job type for multiple
    /// scenarios. Defining the `process` method for multiple `Processor`s with different
    /// before/after logic for the same
    /// [`Job`](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.Job) type is
    /// supported.
    ///
    /// # Errors
    ///
    /// The returned future resolves to `JobError::Json` when `args` cannot be deserialized into
    /// `Self::Job`, and to `JobError::Processing` wrapping the job's own error when running the
    /// job fails.
    fn process(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
        let res = serde_json::from_value::<Self::Job>(args);

        // Both arms are wrapped in `Either` so the match yields a single future type.
        let fut = match res {
            Ok(job) => Either::A(job.run().map_err(JobError::Processing)),
            // The underlying deserialization error is discarded; only `JobError::Json` is
            // reported.
            Err(_) => Either::B(Err(JobError::Json).into_future()),
        };

        Box::new(fut)
    }
}