1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
use crate::{Backoff, BoxError, JobError, MaxRetries, NewJobInfo};
use serde::{de::DeserializeOwned, ser::Serialize};
use serde_json::Value;
use std::{future::Future, pin::Pin, time::SystemTime};
use tracing::{Instrument, Span};
/// The Job trait defines parameters pertaining to an instance of background job
///
/// Jobs are defnitions of work to be executed.
///
/// The simplest implementation defines the job's State and Future types, NAME contant, and
/// run method.
///
/// ### Example
///
/// ```rust
/// use background_jobs_core::{Job, new_job, BoxError};
/// use tracing::info;
/// use std::future::{ready, Ready};
///
/// #[derive(serde::Deserialize, serde::Serialize)]
/// struct MyJob {
/// count: i64,
/// }
///
/// impl Job for MyJob {
/// type State = ();
/// type Error = BoxError;
/// type Future = Ready<Result<(), BoxError>>;
///
/// const NAME: &'static str = "MyJob";
///
/// fn run(self, _: Self::State) -> Self::Future {
/// info!("Processing {}", self.count);
///
/// ready(Ok(()))
/// }
/// }
///
/// fn main() -> Result<(), BoxError> {
/// let job = new_job(MyJob { count: 1234 })?;
///
/// Ok(())
/// }
/// ```
pub trait Job: Serialize + DeserializeOwned + 'static {
/// The application state provided to this job at runtime.
type State: Clone + 'static;
/// The error type this job returns
type Error: Into<BoxError>;
/// The future returned by this job
type Future: Future<Output = Result<(), Self::Error>> + Send;
/// The name of the job
///
/// This name must be unique!!!
const NAME: &'static str;
/// The name of the default queue for this job
///
/// This can be overridden on an individual-job level, but if a non-existant queue is supplied,
/// the job will never be processed.
const QUEUE: &'static str = "default";
/// Define the default number of retries for this job
///
/// Defaults to Count(5)
/// Jobs can override
const MAX_RETRIES: MaxRetries = MaxRetries::Count(5);
/// Define the default backoff strategy for this job
///
/// Defaults to Exponential(2)
/// Jobs can override
const BACKOFF: Backoff = Backoff::Exponential(2);
/// Define how often a job should update its heartbeat timestamp
///
/// This is important for allowing the job server to reap processes that were started but never
/// completed.
///
/// Defaults to 5 seconds
/// Jobs can override
const HEARTBEAT_INTERVAL: u64 = 5_000;
/// Users of this library must define what it means to run a job.
///
/// This should contain all the logic needed to complete a job. If that means queuing more
/// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy
/// processes, that logic should all be called from inside this method.
///
/// The state passed into this job is initialized at the start of the application. The state
/// argument could be useful for containing a hook into something like r2d2, or the address of
/// an actor in an actix-based system.
fn run(self, state: Self::State) -> Self::Future;
/// Generate a Span that the job will be processed within
fn span(&self) -> Option<Span> {
None
}
/// If this job should not use it's default queue, this can be overridden in
/// user-code.
fn queue(&self) -> &str {
Self::QUEUE
}
/// If this job should not use it's default maximum retry count, this can be
/// overridden in user-code.
fn max_retries(&self) -> MaxRetries {
Self::MAX_RETRIES
}
/// If this job should not use it's default backoff strategy, this can be
/// overridden in user-code.
fn backoff_strategy(&self) -> Backoff {
Self::BACKOFF
}
/// Define how often a job should update its heartbeat timestamp
///
/// This is important for allowing the job server to reap processes that were started but never
/// completed.
fn heartbeat_interval(&self) -> u64 {
Self::HEARTBEAT_INTERVAL
}
}
/// A provided method to create a new JobInfo from provided arguments
pub fn new_job<J>(job: J) -> Result<NewJobInfo, BoxError>
where
J: Job,
{
let job = NewJobInfo::new(
J::NAME.to_owned(),
job.queue().to_owned(),
job.max_retries(),
job.backoff_strategy(),
job.heartbeat_interval(),
serde_json::to_value(job).map_err(|_| ToJson)?,
);
Ok(job)
}
/// Create a NewJobInfo to schedule a job to be performed after a certain time
pub fn new_scheduled_job<J>(job: J, after: SystemTime) -> Result<NewJobInfo, BoxError>
where
J: Job,
{
let mut job = new_job(job)?;
job.schedule(after);
Ok(job)
}
/// A provided method to coerce arguments into the expected type and run the job
pub fn process<J>(
args: Value,
state: J::State,
) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>>
where
J: Job,
{
let res = serde_json::from_value::<J>(args).map(move |job| {
if let Some(span) = job.span() {
let fut = span.in_scope(move || job.run(state));
(fut, Some(span))
} else {
(job.run(state), None)
}
});
Box::pin(async move {
let (fut, span) = res?;
let res = if let Some(span) = span {
fut.instrument(span).await
} else {
fut.await
};
res.map_err(Into::into).map_err(JobError::Processing)
})
}
#[derive(Clone, Debug, thiserror::Error)]
#[error("Failed to to turn job into value")]
pub struct ToJson;