gaffer/
lib.rs

1//! Prioritised, parallel job scheduler with concurrent exclusion, job merging, recurring jobs and load limiting for lower priorities.
2//!
//! A job scheduler executes tasks on its own thread or thread pool. This job scheduler is particularly designed for heavier-weight or more expensive jobs, which likely have side effects. In this case it can be valuable to prioritise the jobs and merge similar jobs in the queue.
4//!
5//! __Features__
6//!
7//! * Recurring jobs: jobs which will be re-enqueued at some interval
8//! * Job queue: send jobs from various threads using the cloneable [`JobRunner`]
9//! * Future Jobs: (Optionally) create `Future`s to get results from the jobs
10//! * Job prioritisation: provide a priority for jobs and all the jobs will be executed in that order
11//! * Job merging: merge identical / similar jobs in the queue to reduce workload
//! * Parallel execution: run jobs on multiple threads and lock jobs which should be run exclusively; locked jobs remain in the queue and don't occupy other resources
13//! * Concurrent exclusion: key-based locking to avoid jobs running concurrently which shouldn't
14//! * Priority throttling: in order to have idle threads ready to pick up higher-priority jobs, throttle lower priority jobs by restricting them to a lower number of threads
15//!
16//! __Limitations__
17//!
18//! * some of the tests are very dependent on timing and will fail if run slowly
19//!
20//! ## Example
21//!
22//! See `/examples/full.rs` for a full example, or below for examples focusing on particular capabilities.
23//!
24//! ## Capabilities
25//!
26//! Below, the examples show minimal usages of each of the capabilities for clarity, but if you're using this you probably want most or all of these.
27//!
28//! ### Recurring jobs
29//!
//! Recurring jobs are configured on the runner when it is built; the runner then clones the job and enqueues it if a matching job is not enqueued within the interval.
31//!
32//! You need to call [`Builder::set_recurring`] and you need to implement [`RecurrableJob`].
33//!
34//! ```
35//! use gaffer::{Job, JobRunner, NoExclusion, RecurrableJob};
36//! use std::time::Duration;
37//!
38//! fn main() -> Result<(), Box<dyn std::error::Error>> {
39//!     let _runner = JobRunner::builder()
40//!         .set_recurring(
41//!             Duration::from_secs(2),
42//!             std::time::Instant::now(),
43//!             MyJob("recurring"),
44//!         )
45//!         .build(1);
46//!
47//!     std::thread::sleep(Duration::from_secs(7));
48//!     Ok(())
49//! }
50//!
51//! #[derive(Debug, Clone)]
52//! struct MyJob(&'static str);
53//!
54//! impl Job for MyJob {
55//!     type Exclusion = NoExclusion;
56//!
57//!     fn exclusion(&self) -> Self::Exclusion {
58//!         NoExclusion
59//!     }
60//!
61//!     fn execute(self) {
62//!         println!("Completed job {:?}", self);
63//!     }
64//!
65//!     type Priority = ();
66//!
67//!     fn priority(&self) -> Self::Priority {}
68//! }
69//!
70//! impl RecurrableJob for MyJob {
71//!     fn matches(&self, other: &Self) -> bool {
72//!         self.0 == other.0
73//!     }
74//! }
75//!
76//! ```
77//!
78//! ### Job queue
79//!
//! Call [`JobRunner::send`] to add jobs onto the queue; they will be executed in the order that they are enqueued.
81//!
82//! ```
83//! fn main() -> Result<(), Box<dyn std::error::Error>> {
84//!     let runner = gaffer::JobRunner::builder().build(1);
85//!
86//!     for i in 1..=5 {
87//!         let name = format!("Job {}", i);
88//!         runner.send(move || {
89//!             std::thread::sleep(std::time::Duration::from_secs(1));
90//!             println!("Completed job {:?}", name);
91//!         })?;
92//!     }
93//!
94//!     println!("Jobs enqueued");
95//!     std::thread::sleep(std::time::Duration::from_secs(7));
96//!     Ok(())
97//! }
98//! ```
99//!
100//! ### Job prioritisation
101//!
//! Return a value from [`Job::priority`] and jobs from the queue will be executed in priority order.
103//!
104//! ```
105//! use gaffer::{Job, JobRunner, NoExclusion};
106//! use std::time::Duration;
107//!
108//! fn main() -> Result<(), Box<dyn std::error::Error>> {
109//!     let runner = JobRunner::builder().build(1);
110//!
111//!     for (i, priority) in (1..=5).zip([1, 2].iter().cycle()) {
112//!         runner.send(PrioritisedJob(format!("Job {}", i), *priority))?;
113//!     }
114//!
115//!     println!("Jobs enqueued");
116//!     std::thread::sleep(Duration::from_secs(7));
117//!     Ok(())
118//! }
119//!
120//! #[derive(Debug)]
121//! struct PrioritisedJob(String, u8);
122//!
123//! impl Job for PrioritisedJob {
124//!     type Exclusion = NoExclusion;
125//!
126//!     fn exclusion(&self) -> Self::Exclusion {
127//!         NoExclusion
128//!     }
129//!
130//!     type Priority = u8;
131//!
//!     /// This Job is prioritised
133//!     fn priority(&self) -> Self::Priority {
134//!         self.1
135//!     }
136//!
137//!     fn execute(self) {
138//!         std::thread::sleep(Duration::from_secs(1));
139//!         println!("Completed job {:?}", self);
140//!     }
141//! }
142//!
143//! ```
144//!
145//! ### Job merging
146//!
147//! Gracefully handle spikes in duplicate or overlapping jobs by automatically merging those jobs in the queue.
148//! Call [`Builder::enable_merge`].
149//!
150//! ```
151//! use gaffer::{Job, JobRunner, MergeResult, NoExclusion};
152//! use std::time::Duration;
153//!
154//! fn main() -> Result<(), Box<dyn std::error::Error>> {
155//!     let runner = JobRunner::builder().enable_merge(merge_jobs).build(1);
156//!
157//!     for i in 10..=50 {
158//!         runner.send(MergeJob(format!("Job {}", i)))?;
159//!     }
160//!
161//!     println!("Jobs enqueued");
162//!     std::thread::sleep(Duration::from_secs(7));
163//!     Ok(())
164//! }
165//!
166//! #[derive(Debug)]
167//! struct MergeJob(String);
168//!
169//! impl Job for MergeJob {
170//!     type Exclusion = NoExclusion;
171//!
172//!     fn exclusion(&self) -> Self::Exclusion {
173//!         NoExclusion
174//!     }
175//!
176//!     type Priority = ();
177//!
178//!     fn priority(&self) -> Self::Priority {}
179//!
180//!     fn execute(self) {
181//!         std::thread::sleep(Duration::from_secs(1));
182//!         println!("Completed job {:?}", self);
183//!     }
184//! }
185//!
186//! fn merge_jobs(this: MergeJob, that: &mut MergeJob) -> MergeResult<MergeJob> {
187//!     if this.0[..this.0.len() - 1] == that.0[..that.0.len() - 1] {
188//!         that.0 = format!("{}x", &that.0[..that.0.len() - 1]);
189//!         MergeResult::Success
190//!     } else {
191//!         MergeResult::NotMerged(this)
192//!     }
193//! }
194//!
195//! ```
196//!
197//! ### Parallel execution
198//!
//! Jobs can be run over multiple threads; just provide the number of threads to [`Builder::build`].
200//!
201//! ```
202//! fn main() -> Result<(), Box<dyn std::error::Error>> {
203//!     let runner = gaffer::JobRunner::builder().build(10);
204//!
205//!     for i in 1..=50 {
206//!         let name = format!("WaitJob {}", i);
207//!         runner.send(move || {
208//!             std::thread::sleep(std::time::Duration::from_secs(1));
209//!             println!("Completed job {:?}", name);
210//!         })?;
211//!     }
212//!
213//!     println!("Jobs enqueued");
214//!     std::thread::sleep(std::time::Duration::from_secs(7));
215//!     Ok(())
216//! }
217//! ```
218//!
219//! ### Concurrent exclusion
220//!
//! Exclusion keys can be provided to show which jobs need to be run exclusively.
222//!
223//! ```
224//! use gaffer::{ExclusionOption, Job, JobRunner};
225//! use std::time::Duration;
226//!
227//! fn main() -> Result<(), Box<dyn std::error::Error>> {
228//!     let runner = JobRunner::builder().build(2);
229//!
230//!     for (i, exclusion) in (1..=10).zip([ExclusionOption::Some(1), ExclusionOption::Some(2), ExclusionOption::None].iter().cycle()) {
231//!         runner.send(ExcludedJob(format!("Job {}", i), *exclusion))?;
232//!     }
233//!
234//!     println!("Jobs enqueued");
235//!     std::thread::sleep(Duration::from_secs(7));
236//!     Ok(())
237//! }
238//!
239//! #[derive(Debug)]
240//! struct ExcludedJob(String, ExclusionOption<u8>);
241//!
242//! impl Job for ExcludedJob {
243//!     type Exclusion = ExclusionOption<u8>;
244//!
245//!     fn exclusion(&self) -> Self::Exclusion {
246//!         self.1
247//!     }
248//!
249//!     type Priority = ();
250//!
251//!     fn priority(&self) -> Self::Priority {}
252//!
253//!     fn execute(self) {
254//!         std::thread::sleep(Duration::from_secs(1));
255//!         println!("Completed job {:?}", self);
256//!     }
257//! }
258//!
259//! ```
260//!
261//! ### Priority throttling
262//!
//! Lower-priority jobs can be restricted to fewer threads to reduce the load on system resources and encourage merging (if enabled).
264//!
265//! Use [`Builder::limit_concurrency`].
266//!
267//! ```
268//! use gaffer::{Job, JobRunner, NoExclusion};
269//! use std::time::Duration;
270//!
271//! fn main() -> Result<(), Box<dyn std::error::Error>> {
272//!     let runner = JobRunner::builder()
273//!         .limit_concurrency(|priority| (priority == 1).then(|| 1))
274//!         .build(4);
275//!
276//!     for (i, priority) in (1..=10).zip([1, 2].iter().cycle()) {
277//!         runner.send(PrioritisedJob(format!("Job {}", i), *priority))?;
278//!     }
279//!
280//!     println!("Jobs enqueued");
281//!     std::thread::sleep(Duration::from_secs(7));
282//!     Ok(())
283//! }
284//!
285//! #[derive(Debug)]
286//! struct PrioritisedJob(String, u8);
287//!
288//! impl Job for PrioritisedJob {
289//!     type Exclusion = NoExclusion;
290//!
291//!     fn exclusion(&self) -> Self::Exclusion {
292//!         NoExclusion
293//!     }
294//!
295//!     type Priority = u8;
296//!
//!     /// This Job is prioritised
298//!     fn priority(&self) -> Self::Priority {
299//!         self.1
300//!     }
301//!
302//!     fn execute(self) {
303//!         std::thread::sleep(Duration::from_secs(1));
304//!         println!("Completed job {:?}", self);
305//!     }
306//! }
307//!
308//! ```
309//!
310//! ### Future jobs
311//!
//! Use a [`future::Promise`] in the job to allow `await`ing job results in async code. When combined with merging, all the futures of the merged jobs will complete with clones of the result of the single job which actually ran.
313//!
314//! ```
315//! use gaffer::{
316//!     future::{Promise, PromiseFuture},
317//!     Job, JobRunner, MergeResult, NoExclusion,
318//! };
319//! use std::time::Duration;
320//!
321//! use futures::{executor::block_on, FutureExt, StreamExt};
322//!
323//! fn main() -> Result<(), Box<dyn std::error::Error>> {
324//!     let runner = JobRunner::builder()
325//!         .enable_merge(|this: ProcessString, that: &mut ProcessString| {
326//!             if this.0[..this.0.len() - 1] == that.0[..that.0.len() - 1] {
327//!                 that.0 = format!("{}x", &that.0[..that.0.len() - 1]);
328//!                 that.1.merge(this.1);
329//!                 MergeResult::Success
330//!             } else {
331//!                 MergeResult::NotMerged(this)
332//!             }
333//!         })
334//!         .build(1);
335//!
336//!     let mut futures: futures::stream::SelectAll<_> = (10..=50)
337//!         .filter_map(|i| {
338//!             ProcessString::new(format!("Job {}", i), &runner)
339//!                 .ok()
340//!                 .map(|f| f.into_stream())
341//!         })
342//!         .collect();
343//!     println!("Jobs enqueued");
344//!
345//!     block_on(async {
346//!         while let Some(result) = futures.next().await {
347//!             let processed_string = result.unwrap();
348//!             println!(">> {}", processed_string);
349//!         }
350//!     });
351//!     Ok(())
352//! }
353//!
354//! struct ProcessString(String, Promise<String>);
355//!
356//! impl ProcessString {
357//!     fn new(
358//!         name: String,
359//!         runner: &JobRunner<ProcessString>,
360//!     ) -> Result<PromiseFuture<String>, crossbeam_channel::SendError<ProcessString>> {
361//!         let (promise, future) = Promise::new();
362//!         runner.send(ProcessString(name, promise))?;
363//!         Ok(future)
364//!     }
365//! }
366//!
367//! impl Job for ProcessString {
368//!     type Exclusion = NoExclusion;
369//!
370//!     fn exclusion(&self) -> Self::Exclusion {
371//!         NoExclusion
372//!     }
373//!
374//!     type Priority = ();
375//!
376//!     fn priority(&self) -> Self::Priority {}
377//!
378//!     fn execute(self) {
379//!         println!("Processing job {}", self.0);
380//!         std::thread::sleep(Duration::from_secs(1));
381//!         self.1.fulfill(format!("Processed : [{}]", self.0));
382//!     }
383//! }
384//! ```
385
386#![warn(missing_docs)]
387#![warn(rust_2018_idioms)]
388
389use parking_lot::Mutex;
390
391use std::{
392    fmt,
393    sync::Arc,
394    time::{Duration, Instant},
395};
396
397use runner::ConcurrencyLimitFn;
398pub use source::RecurrableJob;
399use source::{IntervalRecurringJob, RecurringJob, SourceManager};
400
401pub mod future;
402mod runner;
403mod source;
404
405/// Top level structure of the crate. Currently, recurring jobs would keep being scheduled once this is dropped, but that will probably change.
406///
407/// See crate level docs
408pub struct JobRunner<J> {
409    sender: crossbeam_channel::Sender<J>,
410}
411
412impl<J: Job + 'static> JobRunner<J> {
413    /// Create a Builder to start building a [`JobRunner`]
414    pub fn builder() -> Builder<J> {
415        Builder::new()
416    }
417
418    /// Send a job to the queue
419    pub fn send(&self, job: J) -> Result<(), crossbeam_channel::SendError<J>> {
420        self.sender.send(job)
421    }
422}
423
424impl<J> Clone for JobRunner<J> {
425    fn clone(&self) -> Self {
426        Self {
427            sender: self.sender.clone(),
428        }
429    }
430}
431
432/// Builder of [`JobRunner`]
433pub struct Builder<J: Job + 'static> {
434    concurrency_limit: Box<ConcurrencyLimitFn<J>>,
435    recurring: Vec<Box<dyn RecurringJob<J> + Send>>,
436    /// optional function to allow merging of jobs
437    merge_fn: Option<fn(J, &mut J) -> MergeResult<J>>,
438}
439
440impl<J: Job + Send + 'static> Builder<J> {
441    /// Start building a [`JobRunner`]
442    fn new() -> Self {
443        Builder {
444            concurrency_limit: Box::new(|_: <J as Prioritised>::Priority| None as Option<u8>),
445            recurring: vec![],
446            merge_fn: None,
447        }
448    }
449
450    /// Enable merging of Jobs in the queue, if a merge function is provided here, it will be tried with each job added to the queue against each job already in the queue
451    pub fn enable_merge(mut self, f: fn(J, &mut J) -> MergeResult<J>) -> Self {
452        self.merge_fn = Some(f);
453        self
454    }
455}
456
457impl<J: Job + Send + RecurrableJob + 'static> Builder<J> {
458    /// Set a job as recurring, the job will be enqueued every time `interval` passes since the `last_enqueue` of a matching job
459    pub fn set_recurring(mut self, interval: Duration, last_enqueue: Instant, job: J) -> Self {
460        self.recurring.push(Box::new(IntervalRecurringJob {
461            last_enqueue,
462            interval,
463            job,
464        }));
465        self
466    }
467}
468
469impl<J: Job + Send + 'static> Default for Builder<J> {
470    fn default() -> Self {
471        Self::new()
472    }
473}
474
475impl<J: Job + Send + 'static> Builder<J> {
476    /// Function determining, for each priority, how many threads can be allocated to jobs of this priority, any remaining threads will be left idle to service higher-priority jobs. `None` means parallelism won't be limited
477    pub fn limit_concurrency(
478        mut self,
479        concurrency_limit: impl Fn(<J as Job>::Priority) -> Option<u8> + Send + Sync + 'static,
480    ) -> Self {
481        self.concurrency_limit = Box::new(concurrency_limit);
482        self
483    }
484
485    /// Build the [`JobRunner`], spawning `thread_num` threads as workers
486    pub fn build(self, thread_num: usize) -> JobRunner<J> {
487        let (sender, sources) =
488            SourceManager::<J, Box<dyn RecurringJob<J> + Send>>::new_with_recurring(
489                self.recurring,
490                self.merge_fn,
491            );
492        let jobs = Arc::new(Mutex::new(sources));
493        let _threads = runner::spawn(thread_num, jobs, self.concurrency_limit);
494        JobRunner { sender }
495    }
496}
497
/// A job which can be executed by the runner, with features to synchronise jobs that would
/// interfere with each other and reduce the parallelisation of low priority jobs.
///
/// Any `FnOnce() + Send` closure is also a `Job`, with [`NoExclusion`] and the unit priority `()`.
pub trait Job: Send {
    /// Type used to check which jobs should not be allowed to run concurrently, see
    /// [`Job::exclusion()`]. Use [`NoExclusion`] for jobs which can always be run at the same time,
    /// see also [`ExclusionOption`].
    type Exclusion: PartialEq + Copy + fmt::Debug + Send;

    /// Used to check which jobs should not be allowed to run concurrently: if
    /// `job1.exclusion() == job2.exclusion()` (i.e. `PartialEq::eq(&job1.exclusion(), &job2.exclusion())`),
    /// then `job1` and `job2` can't run at the same time.
    fn exclusion(&self) -> Self::Exclusion;

    /// Type of the priority; the higher priorities are those which are larger based on [`Ord::cmp`].
    type Priority: Ord + Copy + Send;

    /// Get the priority of this job
    fn priority(&self) -> Self::Priority;

    /// Execute and consume the job
    fn execute(self);
}
515
/// Anything that can be put in a priority queue: it tells the queue in which order items
/// should come out. Implemented for every [`Job`].
trait Prioritised: Sized {
    /// Report the priority of this item, used to order it against the rest of the queue
    fn priority(&self) -> Self::Priority;

    /// Ordering key of the queue; an item whose priority is larger according to [`Ord::cmp`]
    /// is the higher priority.
    type Priority: Ord + Copy + Send;
}
524
525impl<J: Job> Prioritised for J {
526    type Priority = <J as Job>::Priority;
527
528    fn priority(&self) -> Self::Priority {
529        <J as Job>::priority(self)
530    }
531}
532
533impl<T> Job for T
534where
535    T: FnOnce() + Send,
536{
537    type Exclusion = NoExclusion;
538
539    fn exclusion(&self) -> Self::Exclusion {
540        NoExclusion
541    }
542
543    type Priority = ();
544
545    fn priority(&self) -> Self::Priority {}
546
547    fn execute(self) {
548        (self)()
549    }
550}
551
/// Result of an attempted merge, see [`Builder::enable_merge`]
///
/// A merge function receives the incoming item by value and an item already in the queue by
/// mutable reference. It either folds the incoming item into the queued one and reports
/// [`MergeResult::Success`], or hands the incoming item back with [`MergeResult::NotMerged`].
#[derive(Debug)]
pub enum MergeResult<P> {
    /// merge was successful, e.g. either because the items are the same or one is a superset of
    /// the other; the incoming item has been consumed
    Success,
    /// the attempted items were not suitable for merging; the incoming item is handed back
    NotMerged(P),
}
559
/// Allows any jobs to run at the same time
///
/// Its [`PartialEq`] implementation reports every pair of values as unequal, even a value compared
/// with itself, so no two jobs using it are ever considered to conflict. Because that equality is
/// not reflexive, the type must not implement [`Eq`].
#[derive(Clone, Copy, Debug)]
pub struct NoExclusion;

impl PartialEq for NoExclusion {
    /// Always `false`: jobs without exclusion never block each other
    fn eq(&self, _: &Self) -> bool {
        false
    }
}
569
/// Allows some jobs to be run at the same time, others to acquire a keyed exclusive lock, and
/// others to acquire a global exclusive lock
///
/// Equality is what decides whether two jobs conflict (see [`Job::exclusion`]), so it is not an
/// equivalence relation: [`ExclusionOption::All`] equals every value, [`ExclusionOption::None`]
/// equals nothing (not even itself), and two [`ExclusionOption::Some`] values are equal exactly
/// when their keys are.
#[derive(Debug, Copy, Clone)]
pub enum ExclusionOption<T> {
    /// This job excludes all others, it can only be run whilst all other workers are idle.
    /// NOTE: If the runner is busy this will have to wait until all jobs are finished
    All,
    /// This job excludes some other jobs which match `T`
    Some(T),
    /// This job excludes no other jobs and can run at any time
    None,
}

impl<T: PartialEq> PartialEq for ExclusionOption<T> {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            // a global lock conflicts with everything, including another global lock
            (ExclusionOption::All, _) | (_, ExclusionOption::All) => true,
            // keyed locks only conflict when their keys match
            (ExclusionOption::Some(lhs), ExclusionOption::Some(rhs)) => lhs == rhs,
            // no lock never conflicts with a keyed lock or with another lock-free job
            (ExclusionOption::None, _) | (_, ExclusionOption::None) => false,
        }
    }
}

impl<T> From<Option<T>> for ExclusionOption<T> {
    /// `Some(key)` becomes a keyed lock, `None` becomes no lock at all
    fn from(key: Option<T>) -> Self {
        key.map_or(ExclusionOption::None, ExclusionOption::Some)
    }
}

impl<T> From<T> for ExclusionOption<T> {
    /// A bare key becomes a keyed lock
    fn from(key: T) -> Self {
        Self::Some(key)
    }
}