//! Prioritised, parallel job scheduler with concurrent exclusion, job merging, recurring jobs and load limiting for lower priorities.
//!
//! A job scheduler executes tasks on it's own thread or thread pool. This job scheduler is particularly designed to consider heavier weight or more expensive jobs, which likely have side effects. In this case it can be valuable to prioritise the jobs and merge alike jobs in the queue.
//!
//! __Features__
//!
//! * Recurring jobs: jobs which will be re-enqueued at some interval
//! * Job queue: send jobs from various threads using the cloneable [`JobRunner`]
//! * Future Jobs: (Optionally) create `Future`s to get results from the jobs
//! * Job prioritisation: provide a priority for jobs and all the jobs will be executed in that order
//! * Job merging: merge identical / similar jobs in the queue to reduce workload
//! * Parallel execution: run jobs on multiple threads and lock jobs which should be run exclusively, they remain in the queue and don't occupy other resources
//! * Concurrent exclusion: key-based locking to avoid jobs running concurrently which shouldn't
//! * Priority throttling: in order to have idle threads ready to pick up higher-priority jobs, throttle lower priority jobs by restricting them to a lower number of threads
//!
//! __Limitations__
//!
//! * some of the tests are very dependent on timing and will fail if run slowly
//!
//! ## Example
//!
//! See `/examples/full.rs` for a full example, or below for examples focusing on particular capabilities.
//!
//! ## Capabilities
//!
//! Below, the examples show minimal usages of each of the capabilities for clarity, but if you're using this you probably want most or all of these.
//!
//! ### Recurring jobs
//!
//! Recurring jobs are configured on the runner when it is built, the runner then clones the job and enquese it if a matching job is not enqueued within the interval.
//!
//! You need to call [`Builder::set_recurring`] and you need to implement [`RecurrableJob`].
//!
//! ```
//! use gaffer::{Job, JobRunner, NoExclusion, RecurrableJob};
//! use std::time::Duration;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let _runner = JobRunner::builder()
//! .set_recurring(
//! Duration::from_secs(2),
//! std::time::Instant::now(),
//! MyJob("recurring"),
//! )
//! .build(1);
//!
//! std::thread::sleep(Duration::from_secs(7));
//! Ok(())
//! }
//!
//! #[derive(Debug, Clone)]
//! struct MyJob(&'static str);
//!
//! impl Job for MyJob {
//! type Exclusion = NoExclusion;
//!
//! fn exclusion(&self) -> Self::Exclusion {
//! NoExclusion
//! }
//!
//! fn execute(self) {
//! println!("Completed job {:?}", self);
//! }
//!
//! type Priority = ();
//!
//! fn priority(&self) -> Self::Priority {}
//! }
//!
//! impl RecurrableJob for MyJob {
//! fn matches(&self, other: &Self) -> bool {
//! self.0 == other.0
//! }
//! }
//!
//! ```
//!
//! ### Job queue
//!
//! Call [`JobRunner::send`] to add jobs onto the queue, they will be executed in the order that they are enqueued
//!
//! ```
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let runner = gaffer::JobRunner::builder().build(1);
//!
//! for i in 1..=5 {
//! let name = format!("Job {}", i);
//! runner.send(move || {
//! std::thread::sleep(std::time::Duration::from_secs(1));
//! println!("Completed job {:?}", name);
//! })?;
//! }
//!
//! println!("Jobs enqueued");
//! std::thread::sleep(std::time::Duration::from_secs(7));
//! Ok(())
//! }
//! ```
//!
//! ### Job prioritisation
//!
//! Return a value from [`Job::priority`] and jobs from the queue will be executed in priority order
//!
//! ```
//! use gaffer::{Job, JobRunner, NoExclusion};
//! use std::time::Duration;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let runner = JobRunner::builder().build(1);
//!
//! for (i, priority) in (1..=5).zip([1, 2].iter().cycle()) {
//! runner.send(PrioritisedJob(format!("Job {}", i), *priority))?;
//! }
//!
//! println!("Jobs enqueued");
//! std::thread::sleep(Duration::from_secs(7));
//! Ok(())
//! }
//!
//! #[derive(Debug)]
//! struct PrioritisedJob(String, u8);
//!
//! impl Job for PrioritisedJob {
//! type Exclusion = NoExclusion;
//!
//! fn exclusion(&self) -> Self::Exclusion {
//! NoExclusion
//! }
//!
//! type Priority = u8;
//!
//! /// This Job is prioritied
//! fn priority(&self) -> Self::Priority {
//! self.1
//! }
//!
//! fn execute(self) {
//! std::thread::sleep(Duration::from_secs(1));
//! println!("Completed job {:?}", self);
//! }
//! }
//!
//! ```
//!
//! ### Job merging
//!
//! Gracefully handle spikes in duplicate or overlapping jobs by automatically merging those jobs in the queue.
//! Call [`Builder::enable_merge`].
//!
//! ```
//! use gaffer::{Job, JobRunner, MergeResult, NoExclusion};
//! use std::time::Duration;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let runner = JobRunner::builder().enable_merge(merge_jobs).build(1);
//!
//! for i in 10..=50 {
//! runner.send(MergeJob(format!("Job {}", i)))?;
//! }
//!
//! println!("Jobs enqueued");
//! std::thread::sleep(Duration::from_secs(7));
//! Ok(())
//! }
//!
//! #[derive(Debug)]
//! struct MergeJob(String);
//!
//! impl Job for MergeJob {
//! type Exclusion = NoExclusion;
//!
//! fn exclusion(&self) -> Self::Exclusion {
//! NoExclusion
//! }
//!
//! type Priority = ();
//!
//! fn priority(&self) -> Self::Priority {}
//!
//! fn execute(self) {
//! std::thread::sleep(Duration::from_secs(1));
//! println!("Completed job {:?}", self);
//! }
//! }
//!
//! fn merge_jobs(this: MergeJob, that: &mut MergeJob) -> MergeResult<MergeJob> {
//! if this.0[..this.0.len() - 1] == that.0[..that.0.len() - 1] {
//! that.0 = format!("{}x", &that.0[..that.0.len() - 1]);
//! MergeResult::Success
//! } else {
//! MergeResult::NotMerged(this)
//! }
//! }
//!
//! ```
//!
//! ### Parallel execution
//!
//! Jobs can be run over multiple threads, just provide the number of threads to [`Builder::build`]
//!
//! ```
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let runner = gaffer::JobRunner::builder().build(10);
//!
//! for i in 1..=50 {
//! let name = format!("WaitJob {}", i);
//! runner.send(move || {
//! std::thread::sleep(std::time::Duration::from_secs(1));
//! println!("Completed job {:?}", name);
//! })?;
//! }
//!
//! println!("Jobs enqueued");
//! std::thread::sleep(std::time::Duration::from_secs(7));
//! Ok(())
//! }
//! ```
//!
//! ### Concurrent exclusion
//!
//! Exclusion keys can be provided to show which jobs need to be run exclusively
//!
//! ```
//! use gaffer::{ExclusionOption, Job, JobRunner};
//! use std::time::Duration;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let runner = JobRunner::builder().build(2);
//!
//! for (i, exclusion) in (1..=10).zip([ExclusionOption::Some(1), ExclusionOption::Some(2), ExclusionOption::None].iter().cycle()) {
//! runner.send(ExcludedJob(format!("Job {}", i), *exclusion))?;
//! }
//!
//! println!("Jobs enqueued");
//! std::thread::sleep(Duration::from_secs(7));
//! Ok(())
//! }
//!
//! #[derive(Debug)]
//! struct ExcludedJob(String, ExclusionOption<u8>);
//!
//! impl Job for ExcludedJob {
//! type Exclusion = ExclusionOption<u8>;
//!
//! fn exclusion(&self) -> Self::Exclusion {
//! self.1
//! }
//!
//! type Priority = ();
//!
//! fn priority(&self) -> Self::Priority {}
//!
//! fn execute(self) {
//! std::thread::sleep(Duration::from_secs(1));
//! println!("Completed job {:?}", self);
//! }
//! }
//!
//! ```
//!
//! ### Priority throttling
//!
//! Lower priority jobs can be restricted to less threads to reduce the load on system resources and encourage merging (if using).
//!
//! Use [`Builder::limit_concurrency`].
//!
//! ```
//! use gaffer::{Job, JobRunner, NoExclusion};
//! use std::time::Duration;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let runner = JobRunner::builder()
//! .limit_concurrency(|priority| (priority == 1).then(|| 1))
//! .build(4);
//!
//! for (i, priority) in (1..=10).zip([1, 2].iter().cycle()) {
//! runner.send(PrioritisedJob(format!("Job {}", i), *priority))?;
//! }
//!
//! println!("Jobs enqueued");
//! std::thread::sleep(Duration::from_secs(7));
//! Ok(())
//! }
//!
//! #[derive(Debug)]
//! struct PrioritisedJob(String, u8);
//!
//! impl Job for PrioritisedJob {
//! type Exclusion = NoExclusion;
//!
//! fn exclusion(&self) -> Self::Exclusion {
//! NoExclusion
//! }
//!
//! type Priority = u8;
//!
//! /// This Job is prioritied
//! fn priority(&self) -> Self::Priority {
//! self.1
//! }
//!
//! fn execute(self) {
//! std::thread::sleep(Duration::from_secs(1));
//! println!("Completed job {:?}", self);
//! }
//! }
//!
//! ```
//!
//! ### Future jobs
//!
//! Use a [`future::Promise`] in the job to allow `await`ing job results in async code. When combined with merging, all the futures of the merged jobs will complete with clones of the single job which actually ran
//!
//! ```
//! use gaffer::{
//! future::{Promise, PromiseFuture},
//! Job, JobRunner, MergeResult, NoExclusion,
//! };
//! use std::time::Duration;
//!
//! use futures::{executor::block_on, FutureExt, StreamExt};
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let runner = JobRunner::builder()
//! .enable_merge(|this: ProcessString, that: &mut ProcessString| {
//! if this.0[..this.0.len() - 1] == that.0[..that.0.len() - 1] {
//! that.0 = format!("{}x", &that.0[..that.0.len() - 1]);
//! that.1.merge(this.1);
//! MergeResult::Success
//! } else {
//! MergeResult::NotMerged(this)
//! }
//! })
//! .build(1);
//!
//! let mut futures: futures::stream::SelectAll<_> = (10..=50)
//! .filter_map(|i| {
//! ProcessString::new(format!("Job {}", i), &runner)
//! .ok()
//! .map(|f| f.into_stream())
//! })
//! .collect();
//! println!("Jobs enqueued");
//!
//! block_on(async {
//! while let Some(result) = futures.next().await {
//! let processed_string = result.unwrap();
//! println!(">> {}", processed_string);
//! }
//! });
//! Ok(())
//! }
//!
//! struct ProcessString(String, Promise<String>);
//!
//! impl ProcessString {
//! fn new(
//! name: String,
//! runner: &JobRunner<ProcessString>,
//! ) -> Result<PromiseFuture<String>, crossbeam_channel::SendError<ProcessString>> {
//! let (promise, future) = Promise::new();
//! runner.send(ProcessString(name, promise))?;
//! Ok(future)
//! }
//! }
//!
//! impl Job for ProcessString {
//! type Exclusion = NoExclusion;
//!
//! fn exclusion(&self) -> Self::Exclusion {
//! NoExclusion
//! }
//!
//! type Priority = ();
//!
//! fn priority(&self) -> Self::Priority {}
//!
//! fn execute(self) {
//! println!("Processing job {}", self.0);
//! std::thread::sleep(Duration::from_secs(1));
//! self.1.fulfill(format!("Processed : [{}]", self.0));
//! }
//! }
//! ```
use Mutex;
use ;
use ConcurrencyLimitFn;
pub use RecurrableJob;
use ;
/// Top level structure of the crate. Currently, recurring jobs would keep being scheduled once this is dropped, but that will probably change.
///
/// See crate level docs
/// Builder of [`JobRunner`]
/// A job which can be executed by the runner, with features to synchronise jobs that would interfere with each other and reduce the parallelisation of low priority jobs
/// A type that can be put in a priority queue, tells the queue which order the items should come out in, whether / how to merge them, and checking whether item's match
/// Result of an attempted merge, see [`Builder::enable_merge`]
/// Allows any jobs to run at the same time
;
/// Allows some jobs to be run at the same time, others to acquire a keyed exclusive lock, and others to acquire a global exclusive lock