1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
/*
* This file is part of Background Jobs.
*
* Copyright © 2018 Riley Trautman
*
* Background Jobs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use std::marker::PhantomData;
use background_jobs_core::{Backoff, Job, MaxRetries};
use failure::Error;
use futures::{future::poll_fn, Future};
use serde::{de::DeserializeOwned, ser::Serialize};
use serde_derive::{Deserialize, Serialize};
use tokio_threadpool::blocking;
mod server;
mod spawner;
mod worker;
pub use crate::{server::ServerConfig, spawner::SpawnerConfig, worker::WorkerConfig};
fn coerce<T, F>(res: Result<Result<T, Error>, F>) -> Result<T, Error>
where
F: Into<Error>,
{
match res {
Ok(res) => res,
Err(e) => Err(e.into()),
}
}
/// The SyncJob trait defines parameters pertaining to a synchronous instance of background job
///
/// This trait should be implemented sparingly, but is provided so that synchronous tasks may be
/// executed. If you have the ability to implement the
/// [`Job`](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Job.html) trait directly,
/// you should.
///
/// ### Example
///
/// ```rust
/// use background_jobs_server::SyncJob;
/// use failure::Error;
/// use log::info;
/// use serde_derive::{Deserialize, Serialize};
///
/// #[derive(Clone, Deserialize, Serialize)]
/// struct MyJob {
/// count: i32,
/// }
///
/// impl SyncJob for MyJob {
/// fn run(self, _state: ()) -> Result<(), Error> {
/// info!("Processing {}", self.count);
///
/// // Perform some synchronous operation, like a DB action with r2d2 and diesel
///
/// Ok(())
/// }
/// }
///
/// fn main() {
/// let sync_job = MyJob { count: 0 };
/// let job = sync_job.to_job();
/// }
/// ```
pub trait SyncJob<S = ()>: Clone {
/// Users of this library must define what it means to run a job.
///
/// This should contain all the logic needed to complete a job. If that means queuing more
/// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy
/// processes, that logic should all be called from inside this method.
///
/// The state passed into this job is initialized at the start of the application. The state
/// argument could be useful for containing a hook into something like r2d2, or the address of
/// an actor in an actix-based system.
fn run(self, state: S) -> Result<(), Error>;
/// If this job should not use the default queue for its processor, this can be overridden in
/// user-code.
///
/// Jobs will only be processed by processors that are registered, and if a queue is supplied
/// here that is not associated with a valid processor for this job, it will never be
/// processed.
fn queue(&self) -> Option<&str> {
None
}
/// If this job should not use the default maximum retry count for its processor, this can be
/// overridden in user-code.
fn max_retries(&self) -> Option<MaxRetries> {
None
}
/// If this job should not use the default backoff strategy for its processor, this can be
/// overridden in user-code.
fn backoff_strategy(&self) -> Option<Backoff> {
None
}
/// Wrap this type in a SyncJobWrapper so it implements Job
fn to_job(self) -> SyncJobWrapper<Self, S> {
SyncJobWrapper {
inner: self,
phantom: PhantomData,
}
}
}
/// A wrapper around synchronous jobs
#[derive(Clone, Deserialize, Serialize)]
pub struct SyncJobWrapper<J, S = ()>
where
J: SyncJob<S>,
{
inner: J,
phantom: PhantomData<S>,
}
impl<J, S> Job<S> for SyncJobWrapper<J, S>
where
J: SyncJob<S> + Serialize + DeserializeOwned + Send + 'static,
S: Clone + Send + Sync + 'static,
{
fn queue(&self) -> Option<&str> {
self.inner.queue()
}
fn max_retries(&self) -> Option<MaxRetries> {
self.inner.max_retries()
}
fn backoff_strategy(&self) -> Option<Backoff> {
self.inner.backoff_strategy()
}
fn run(self, state: S) -> Box<dyn Future<Item = (), Error = Error> + Send> {
let fut = poll_fn(move || {
let job = self.inner.clone();
let state = state.clone();
blocking(move || job.run(state.clone()))
})
.then(coerce);
Box::new(fut)
}
}