Crate gaffer[][src]

Expand description

Prioritised, parallel job scheduler with concurrent exclusion, job merging, recurring jobs and load limiting for lower priorities.

A job scheduler executes tasks on it’s own thread or thread pool. This job scheduler is particularly designed to consider heavier weight or more expensive jobs, which likely have side effects. In this case it can be valuable to prioritise the jobs and merge alike jobs in the queue.

Features

  • Recurring jobs: jobs which will be re-enqueued at some interval 2
  • Job queue: use an crossbeam_channel::Sender<YourJob> to send jobs
  • Future Jobs: (Optionally) create Futures to get results from the jobs 2
  • Job prioritisation: provide a priority for jobs and all the jobs will be executed in that order
  • Job merging: merge identical / similar jobs in the queue to reduce workload 2
  • Parallel execution: run jobs on multiple threads and lock jobs which should be run exclusively, they remain in the queue and don’t occupy other resources
  • Priority throttling: in order to have idle threads ready to pick up higher-priority jobs, throttle lower priority jobs by restricting them to a lower number of threads

Limitations

  • 2 There are a few ergonomics issues to do with the job merging and recurring jobs apis. For example, all jobs need to implement Clone (so they can be reproduced for recurring) and any results provided by a future need to implement Clone (so that they can be merged).
  • some of the tests are very dependent on timing and will fail if run slowly

Example

See /examples/example.rs

use gaffer::*;
use std::{thread, time::Duration};

#[derive(Debug)]
struct WaitJob(Duration, u8, Option<char>);

impl Job for WaitJob {
    type Exclusion = ExclusionOption<char>;

    fn exclusion(&self) -> Self::Exclusion {
        self.2.into()
    }

    fn execute(self) {
        thread::sleep(self.0);
        println!("Completed job {:?}", self);
    }
}

impl Prioritised for WaitJob {
    type Priority = u8;

    fn priority(&self) -> Self::Priority {
        self.1.into()
    }

    const ATTEMPT_MERGE_INTO: Option<fn(Self, &mut Self) -> MergeResult<Self>> =
        Some(|me: Self, other: &mut Self| -> MergeResult<Self> {
            if me.1 == other.1 {
                other.0 += me.0;
                other.1 = other.1.max(me.1);
                MergeResult::Success
            } else {
                MergeResult::NotMerged(me)
            }
        });
}

Capabilities

Recurring jobs

Job queue

Future jobs

Job prioritisation

Job merging

Parallel execution

Priority throttling

Usage

[dependencies]
gaffer = { git = "ssh://git@github.com/survemobility/gaffer.git", branch = "pr-1" }

Modules

Promises which can be used by a job to complete a future from another thread. Promises can also be merged, a merged promise completes all the merged futures at the same time.

Structs

Builder of JobRunner

Top level structure of the crate. Currently, recirring jobs would keep being scheduled once this is dropped, but that will probably change.

Allows any jobs to run at the same time

Enums

Allows some jobs to be run at the same time, others to acquire a keyed exclusive lock, and others to acquire a global exclusive lock

Result of an attempted merge, see Prioritised::ATTEMPT_MERGE_INTO

Traits

A job which can be executed by the runner, with features to synchronise jobs that would interfere with each other and reduce the parallelisation of low priority jobs

A type that can be put in a priority queue, tells the queue which order the items should come out in, whether / how to merge them, and checking whether item’s match