Crate gaffer
Prioritised, parallel job scheduler with concurrent exclusion, job merging, recurring jobs and load limiting for lower priorities.
A job scheduler executes tasks on its own thread or thread pool. This job scheduler is particularly designed for heavier-weight or more expensive jobs, which likely have side effects. In this case it can be valuable to prioritise the jobs and merge similar jobs in the queue.
Features
- Recurring jobs: jobs which will be re-enqueued at some interval [2]
- Job queue: use a crossbeam_channel::Sender<YourJob> to send jobs
- Future jobs: (optionally) create Futures to get results from the jobs [2]
- Job prioritisation: provide a priority for jobs and all the jobs will be executed in that order
- Job merging: merge identical / similar jobs in the queue to reduce workload [2]
- Parallel execution: run jobs on multiple threads and lock jobs which should be run exclusively; locked jobs remain in the queue and don’t occupy other resources
- Priority throttling: in order to have idle threads ready to pick up higher-priority jobs, throttle lower-priority jobs by restricting them to a lower number of threads
Limitations
- [2] There are a few ergonomics issues to do with the job merging and recurring jobs APIs. For example, all jobs need to implement Clone (so they can be reproduced for recurring) and any results provided by a future need to implement Clone (so that they can be merged).
- Some of the tests are very dependent on timing and will fail if run slowly.
Example
See /examples/full.rs for a full example, or below for examples focusing on particular capabilities.
Capabilities
The capabilities can all be combined; see the full example.
Recurring jobs
Recurring jobs are configured on the runner when it is built. The runner then clones the job and enqueues it if a matching job is not enqueued within the interval.
You need to call Builder::set_recurring and you need to implement Prioritised::matches.
use std::time::Duration;
use gaffer::{Builder, Job, MergeResult, NoExclusion, Prioritised};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let _runner = Builder::new()
        .set_recurring(
            Duration::from_secs(2),
            std::time::Instant::now(),
            MyJob("recurring"),
        )
        .build(1);
    std::thread::sleep(Duration::from_secs(7));
    Ok(())
}

#[derive(Debug, Clone)]
struct MyJob(&'static str);

impl Job for MyJob {
    type Exclusion = NoExclusion;
    fn exclusion(&self) -> Self::Exclusion {
        NoExclusion
    }
    fn execute(self) {
        println!("Completed job {:?}", self);
    }
}

/// This Job isn't actually prioritised but this trait needs to be implemented for now
impl Prioritised for MyJob {
    type Priority = ();
    fn priority(&self) -> Self::Priority {}
    const ATTEMPT_MERGE_INTO: Option<fn(Self, &mut Self) -> MergeResult<Self>> = None;
    /// matches needs to be implemented for recurring jobs; it must return `true` for a `.clone()` of itself
    fn matches(&self, job: &Self) -> bool {
        self.0 == job.0
    }
}
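The re-enqueue rule described above can be modelled without the crate. The following is a minimal sketch using only the standard library; `RecurringSlot`, `note_enqueued` and `poll` are hypothetical names chosen for illustration and are not part of gaffer's API, and the timing rule is an assumption based on the description in this section.

```rust
use std::time::{Duration, Instant};

/// Hypothetical model of one recurring job (not gaffer's implementation).
struct RecurringSlot<J> {
    interval: Duration,
    last_enqueue: Instant,
    job: J,
}

impl<J: Clone> RecurringSlot<J> {
    fn new(interval: Duration, last_enqueue: Instant, job: J) -> Self {
        Self { interval, last_enqueue, job }
    }

    /// Record that `job` was enqueued at `now`. If it matches the recurring
    /// job, the interval restarts from `now`.
    fn note_enqueued(&mut self, job: &J, now: Instant, matches: impl Fn(&J, &J) -> bool) {
        if matches(&self.job, job) {
            self.last_enqueue = now;
        }
    }

    /// Return a clone of the recurring job if no matching job has been
    /// enqueued within the interval, otherwise `None`.
    fn poll(&mut self, now: Instant) -> Option<J> {
        if now.saturating_duration_since(self.last_enqueue) >= self.interval {
            self.last_enqueue = now;
            Some(self.job.clone())
        } else {
            None
        }
    }
}
```

This also shows why the job must implement Clone and why `matches` must return `true` for a clone of the job: otherwise the slot could not tell that its own copy had just been enqueued.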
Job queue
Call JobRunner::send to add jobs onto the queue; they will be executed in the order that they are enqueued.
use gaffer::{Builder, Job, MergeResult, NoExclusion, Prioritised};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runner = Builder::new().build(1);
    for i in 1..=5 {
        runner.send(WaitJob(format!("Job {}", i)))?;
    }
    println!("Jobs enqueued");
    std::thread::sleep(Duration::from_secs(7));
    Ok(())
}

#[derive(Debug, Clone)]
struct WaitJob(String);

impl Job for WaitJob {
    type Exclusion = NoExclusion;
    fn exclusion(&self) -> Self::Exclusion {
        NoExclusion
    }
    fn execute(self) {
        std::thread::sleep(Duration::from_secs(1));
        println!("Completed job {:?}", self);
    }
}

/// This Job isn't actually prioritised but this trait needs to be implemented for now
impl Prioritised for WaitJob {
    type Priority = ();
    fn priority(&self) -> Self::Priority {}
    const ATTEMPT_MERGE_INTO: Option<fn(Self, &mut Self) -> MergeResult<Self>> = None;
    fn matches(&self, _job: &Self) -> bool {
        false
    }
}
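For readers who want to see the queueing behaviour in isolation, here is a rough sketch of a single worker thread draining a channel in arrival order. It uses `std::sync::mpsc` rather than crossbeam_channel or gaffer, and `run_in_order` is a hypothetical helper, so treat it as an illustration of the idea rather than of how the crate is built.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: send every job to one worker thread and collect the
/// completion messages in the order the worker produced them.
fn run_in_order(jobs: Vec<String>) -> Vec<String> {
    let (job_tx, job_rx) = mpsc::channel::<String>();
    let (done_tx, done_rx) = mpsc::channel::<String>();

    // The worker executes jobs strictly in the order they were received.
    let worker = thread::spawn(move || {
        for job in job_rx {
            done_tx
                .send(format!("Completed job {:?}", job))
                .expect("result receiver should be alive");
        }
    });

    for job in jobs {
        job_tx.send(job).expect("worker should be alive");
    }
    // Dropping the sender closes the queue so the worker loop ends.
    drop(job_tx);
    worker.join().expect("worker panicked");

    done_rx.iter().collect()
}
```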
Job prioritisation
Return a value from Prioritised::priority and jobs from the queue will be executed in priority order.
use gaffer::{Builder, Job, MergeResult, NoExclusion, Prioritised};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runner = Builder::new().build(1);
    for (i, priority) in (1..=5).zip([1, 2].iter().cycle()) {
        runner.send(PrioritisedJob(format!("Job {}", i), *priority))?;
    }
    println!("Jobs enqueued");
    std::thread::sleep(Duration::from_secs(7));
    Ok(())
}

#[derive(Debug, Clone)]
struct PrioritisedJob(String, u8);

impl Job for PrioritisedJob {
    type Exclusion = NoExclusion;
    fn exclusion(&self) -> Self::Exclusion {
        NoExclusion
    }
    fn execute(self) {
        std::thread::sleep(Duration::from_secs(1));
        println!("Completed job {:?}", self);
    }
}

impl Prioritised for PrioritisedJob {
    type Priority = u8;
    /// This Job is prioritised
    fn priority(&self) -> Self::Priority {
        self.1
    }
    const ATTEMPT_MERGE_INTO: Option<fn(Self, &mut Self) -> MergeResult<Self>> = None;
    fn matches(&self, _job: &Self) -> bool {
        false
    }
}
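To make the ordering concrete, the sketch below models a priority queue with the standard library's `BinaryHeap`. It assumes that a larger value means a higher priority and that jobs of equal priority leave in arrival order; both are assumptions for illustration, and `PriorityQueue` is a hypothetical type, not gaffer's queue.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical queue: highest priority first, first-in-first-out among
/// jobs of equal priority.
struct PriorityQueue<T> {
    // `Reverse(seq)` makes earlier arrivals compare greater, so they pop first.
    // `T: Ord` is only needed to satisfy the tuple's `Ord` bound; `seq` is
    // unique, so the job itself is never the deciding comparison.
    heap: BinaryHeap<(u8, Reverse<u64>, T)>,
    next_seq: u64,
}

impl<T: Ord> PriorityQueue<T> {
    fn new() -> Self {
        Self { heap: BinaryHeap::new(), next_seq: 0 }
    }

    fn push(&mut self, priority: u8, job: T) {
        self.heap.push((priority, Reverse(self.next_seq), job));
        self.next_seq += 1;
    }

    fn pop(&mut self) -> Option<T> {
        self.heap.pop().map(|(_, _, job)| job)
    }
}
```

With the five jobs from the example above all enqueued before any is taken, this model releases the priority 2 jobs (2 and 4) before the priority 1 jobs (1, 3 and 5). In a real run the first job may already have started before the others arrive.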
Job merging
Gracefully handle spikes in duplicate or overlapping jobs by automatically merging those jobs in the queue.
Implement the Prioritised::ATTEMPT_MERGE_INTO function.
use gaffer::{Builder, Job, MergeResult, NoExclusion, Prioritised};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runner = Builder::new().build(1);
    for i in 10..=50 {
        runner.send(MergeJob(format!("Job {}", i)))?;
    }
    println!("Jobs enqueued");
    std::thread::sleep(Duration::from_secs(7));
    Ok(())
}

#[derive(Debug, Clone)]
struct MergeJob(String);

impl Job for MergeJob {
    type Exclusion = NoExclusion;
    fn exclusion(&self) -> Self::Exclusion {
        NoExclusion
    }
    fn execute(self) {
        std::thread::sleep(Duration::from_secs(1));
        println!("Completed job {:?}", self);
    }
}

/// This Job isn't actually prioritised but this trait implements the job merge
impl Prioritised for MergeJob {
    type Priority = ();
    fn priority(&self) -> Self::Priority {}
    const ATTEMPT_MERGE_INTO: Option<fn(Self, &mut Self) -> MergeResult<Self>> =
        Some(|this, that| {
            if this.matches(that) {
                that.0 = format!("{}x", &that.0[..that.0.len() - 1]);
                MergeResult::Success
            } else {
                MergeResult::NotMerged(this)
            }
        });
    fn matches(&self, that: &Self) -> bool {
        self.0[..self.0.len() - 1] == that.0[..that.0.len() - 1]
    }
}
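The effect of a merge function on a queue can be sketched on its own. The code below is a simplified model, not gaffer's internals: `MergeResult` here is a local stand-in with the same two variants used in the example above, and `enqueue_with_merge` and `merge_names` are hypothetical helpers. It assumes a new job is offered to each queued job in turn and is appended only if none absorbs it.

```rust
use std::collections::VecDeque;

/// Local stand-in mirroring the shape of the `MergeResult` used above.
enum MergeResult<J> {
    Success,
    NotMerged(J),
}

/// Offer `job` to each queued job in turn; append it if nothing absorbs it.
fn enqueue_with_merge<J>(
    queue: &mut VecDeque<J>,
    mut job: J,
    attempt_merge_into: fn(J, &mut J) -> MergeResult<J>,
) {
    for queued in queue.iter_mut() {
        match attempt_merge_into(job, queued) {
            MergeResult::Success => return,
            // The merge function hands the job back so it can be tried again.
            MergeResult::NotMerged(returned) => job = returned,
        }
    }
    queue.push_back(job);
}

/// Same rule as `MergeJob` above: names that differ only in their last
/// character merge, and the survivor's last character becomes `x`.
/// Like the original, this assumes non-empty ASCII names.
fn merge_names(this: String, that: &mut String) -> MergeResult<String> {
    if this[..this.len() - 1] == that[..that.len() - 1] {
        let merged = format!("{}x", &that[..that.len() - 1]);
        *that = merged;
        MergeResult::Success
    } else {
        MergeResult::NotMerged(this)
    }
}
```

Feeding the names "Job 10" to "Job 50" through this model leaves five entries in the queue: "Job 1x", "Job 2x", "Job 3x", "Job 4x" and "Job 50" (the last has nothing to merge with).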
Parallel execution
Jobs can be run over multiple threads; just provide the number of threads to Builder::build.
use gaffer::{Builder, Job, MergeResult, NoExclusion, Prioritised};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runner = Builder::new().build(10);
    for i in 1..=50 {
        runner.send(WaitJob(format!("Job {}", i)))?;
    }
    println!("Jobs enqueued");
    std::thread::sleep(Duration::from_secs(7));
    Ok(())
}

#[derive(Debug, Clone)]
struct WaitJob(String);

impl Job for WaitJob {
    type Exclusion = NoExclusion;
    fn exclusion(&self) -> Self::Exclusion {
        NoExclusion
    }
    fn execute(self) {
        std::thread::sleep(Duration::from_secs(1));
        println!("Completed job {:?}", self);
    }
}

/// This Job isn't actually prioritised but this trait needs to be implemented for now
impl Prioritised for WaitJob {
    type Priority = ();
    fn priority(&self) -> Self::Priority {}
    const ATTEMPT_MERGE_INTO: Option<fn(Self, &mut Self) -> MergeResult<Self>> = None;
    fn matches(&self, _job: &Self) -> bool {
        false
    }
}
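The example above uses NoExclusion, so any jobs may overlap. The crate overview also describes jobs that take a keyed exclusive lock or a global exclusive lock and stay in the queue until they can run. The sketch below is one possible model of that check; the `Exclusion` variants, the `Running` tracker and the exact locking semantics are all assumptions made for illustration and do not reproduce gaffer's types.

```rust
use std::collections::HashSet;

/// Hypothetical exclusion modes (illustrative names, not gaffer's).
#[derive(Clone, Copy, Debug)]
enum Exclusion {
    Unrestricted,
    Keyed(u32),
    Global,
}

/// Tracks which locks the currently running jobs hold.
#[derive(Default)]
struct Running {
    running: usize,
    held_keys: HashSet<u32>,
    global_held: bool,
}

impl Running {
    /// Returns true if the job may start now; false means it stays queued.
    fn try_start(&mut self, exclusion: Exclusion) -> bool {
        if self.global_held {
            return false;
        }
        match exclusion {
            Exclusion::Unrestricted => {}
            Exclusion::Keyed(key) => {
                // `insert` returns false when the key is already held.
                if !self.held_keys.insert(key) {
                    return false;
                }
            }
            Exclusion::Global => {
                // Assumed rule: a global lock needs every worker to be idle.
                if self.running > 0 {
                    return false;
                }
                self.global_held = true;
            }
        }
        self.running += 1;
        true
    }

    /// Release whatever the finished job was holding.
    fn finish(&mut self, exclusion: Exclusion) {
        match exclusion {
            Exclusion::Unrestricted => {}
            Exclusion::Keyed(key) => {
                self.held_keys.remove(&key);
            }
            Exclusion::Global => self.global_held = false,
        }
        self.running = self.running.saturating_sub(1);
    }
}
```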
Priority throttling
Lower-priority jobs can be restricted to fewer threads to reduce the load on system resources and encourage merging (if used).
Use Builder::limit_concurrency.
use gaffer::{Builder, Job, MergeResult, NoExclusion, Prioritised};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runner = Builder::new()
        .limit_concurrency(|priority| (priority == 1).then(|| 1))
        .build(4);
    for (i, priority) in (1..=10).zip([1, 2].iter().cycle()) {
        runner.send(PrioritisedJob(format!("Job {}", i), *priority))?;
    }
    println!("Jobs enqueued");
    std::thread::sleep(Duration::from_secs(7));
    Ok(())
}

#[derive(Debug, Clone)]
struct PrioritisedJob(String, u8);

impl Job for PrioritisedJob {
    type Exclusion = NoExclusion;
    fn exclusion(&self) -> Self::Exclusion {
        NoExclusion
    }
    fn execute(self) {
        std::thread::sleep(Duration::from_secs(1));
        println!("Completed job {:?}", self);
    }
}

impl Prioritised for PrioritisedJob {
    type Priority = u8;
    /// This Job is prioritised
    fn priority(&self) -> Self::Priority {
        self.1
    }
    const ATTEMPT_MERGE_INTO: Option<fn(Self, &mut Self) -> MergeResult<Self>> = None;
    fn matches(&self, _job: &Self) -> bool {
        false
    }
}
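One way to read the throttling rule is that a job may start only while the number of busy workers is below the cap for its priority. The sketch below models that check with the standard library only. `may_start` and its parameters are hypothetical, and how gaffer itself counts busy threads is an assumption here.

```rust
/// Hypothetical check: may a job with `priority` start right now?
/// `limit` returns `Some(cap)` for throttled priorities and `None` for
/// priorities that may use every worker.
fn may_start(
    priority: u8,
    busy_workers: usize,
    total_workers: usize,
    limit: impl Fn(u8) -> Option<usize>,
) -> bool {
    // A cap can never exceed the size of the pool.
    let cap = limit(priority).map_or(total_workers, |cap| cap.min(total_workers));
    busy_workers < cap
}
```

With the limit closure from the example above and four workers, a priority 1 job can start only when every worker is idle, while a priority 2 job can start as long as any worker is free. That keeps the remaining threads available for higher-priority work.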
Future jobs
Use a Promise in the job to allow awaiting job results in async code. When combined with merging, all the futures of the merged jobs will complete with clones of the result of the single job which actually ran.
use gaffer::{
    future::{Promise, PromiseFuture},
    Builder, Job, JobRunner, MergeResult, NoExclusion, Prioritised,
};
use std::time::Duration;
use futures::{executor::block_on, FutureExt, StreamExt};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runner = Builder::new().build(1);
    let mut futures: futures::stream::SelectAll<_> = (10..=50)
        .filter_map(|i| {
            ProcessString::new(format!("Job {}", i), &runner)
                .ok()
                .map(|f| f.into_stream())
        })
        .collect();
    println!("Jobs enqueued");
    block_on(async {
        while let Some(result) = futures.next().await {
            let processed_string = result.unwrap();
            println!(">> {}", processed_string);
        }
    });
    Ok(())
}

struct ProcessString(String, Promise<String>);

impl ProcessString {
    fn new(
        name: String,
        runner: &JobRunner<ProcessString>,
    ) -> Result<PromiseFuture<String>, crossbeam_channel::SendError<ProcessString>> {
        let (promise, future) = Promise::new();
        runner.send(ProcessString(name, promise))?;
        Ok(future)
    }
}

/// Clone is needed for recurring jobs, which doesn't make sense for promises.
/// It is a deficiency in the API that Clone needs to be implemented here; it won't be called.
impl Clone for ProcessString {
    fn clone(&self) -> Self {
        panic!()
    }
}

impl Job for ProcessString {
    type Exclusion = NoExclusion;
    fn exclusion(&self) -> Self::Exclusion {
        NoExclusion
    }
    fn execute(self) {
        println!("Processing job {}", self.0);
        std::thread::sleep(Duration::from_secs(1));
        self.1.fulfill(format!("Processed : [{}]", self.0));
    }
}

/// This Job isn't actually prioritised but this trait needs to be implemented for now
impl Prioritised for ProcessString {
    type Priority = ();
    fn priority(&self) -> Self::Priority {}
    const ATTEMPT_MERGE_INTO: Option<fn(Self, &mut Self) -> MergeResult<Self>> =
        Some(|this, that| {
            if this.matches(that) {
                that.0 = format!("{}x", &that.0[..that.0.len() - 1]);
                that.1.merge(this.1);
                MergeResult::Success
            } else {
                MergeResult::NotMerged(this)
            }
        });
    fn matches(&self, that: &Self) -> bool {
        self.0[..self.0.len() - 1] == that.0[..that.0.len() - 1]
    }
}
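The idea of a mergeable promise can be illustrated without async code. The sketch below is a deliberately simplified stand-in built on `std::sync::mpsc` channels: `SketchPromise` is a hypothetical type, its receivers block rather than implement Future, and it is not how gaffer's Promise is implemented. It does show why results need to implement Clone: one fulfilled value is cloned to every merged waiter.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a mergeable promise, using std channels
/// instead of futures.
struct SketchPromise<T> {
    waiters: Vec<Sender<T>>,
}

impl<T: Clone> SketchPromise<T> {
    /// Create a promise together with the receiving end that waits for it.
    fn new() -> (Self, Receiver<T>) {
        let (tx, rx) = channel();
        (Self { waiters: vec![tx] }, rx)
    }

    /// Absorb another promise so that its waiters complete with this one.
    fn merge(&mut self, other: Self) {
        self.waiters.extend(other.waiters);
    }

    /// Complete every waiter with a clone of `value`.
    fn fulfill(self, value: T) {
        for waiter in self.waiters {
            // A waiter that has gone away is not an error for the job,
            // so the send result is ignored.
            let _ = waiter.send(value.clone());
        }
    }
}
```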
Usage
[dependencies]
gaffer = { git = "ssh://git@github.com/survemobility/gaffer.git", branch = "pr-1" }
Modules
Promises which can be used by a job to complete a future from another thread. Promises can also be merged, a merged promise completes all the merged futures at the same time.
Structs
Top-level structure of the crate. Currently, recurring jobs would keep being scheduled once this is dropped, but that will probably change.
Allows any jobs to run at the same time
Enums
Allows some jobs to be run at the same time, others to acquire a keyed exclusive lock, and others to acquire a global exclusive lock
Result of an attempted merge, see Prioritised::ATTEMPT_MERGE_INTO
Traits
A job which can be executed by the runner, with features to synchronise jobs that would interfere with each other and reduce the parallelisation of low priority jobs
A type that can be put in a priority queue. It tells the queue which order the items should come out in, whether and how to merge them, and whether items match.