gaffer/lib.rs
1//! Prioritised, parallel job scheduler with concurrent exclusion, job merging, recurring jobs and load limiting for lower priorities.
2//!
//! A job scheduler executes tasks on its own thread or thread pool. This job scheduler is particularly designed for heavier-weight or more expensive jobs, which likely have side effects. In such cases it can be valuable to prioritise the jobs and merge similar jobs in the queue.
4//!
5//! __Features__
6//!
7//! * Recurring jobs: jobs which will be re-enqueued at some interval
8//! * Job queue: send jobs from various threads using the cloneable [`JobRunner`]
9//! * Future Jobs: (Optionally) create `Future`s to get results from the jobs
10//! * Job prioritisation: provide a priority for jobs and all the jobs will be executed in that order
11//! * Job merging: merge identical / similar jobs in the queue to reduce workload
//! * Parallel execution: run jobs on multiple threads, and lock jobs which should be run exclusively; locked jobs remain in the queue and don't occupy other resources
13//! * Concurrent exclusion: key-based locking to avoid jobs running concurrently which shouldn't
14//! * Priority throttling: in order to have idle threads ready to pick up higher-priority jobs, throttle lower priority jobs by restricting them to a lower number of threads
15//!
16//! __Limitations__
17//!
//! * Some of the tests are very dependent on timing and may fail if run slowly
19//!
20//! ## Example
21//!
22//! See `/examples/full.rs` for a full example, or below for examples focusing on particular capabilities.
23//!
24//! ## Capabilities
25//!
26//! Below, the examples show minimal usages of each of the capabilities for clarity, but if you're using this you probably want most or all of these.
27//!
28//! ### Recurring jobs
29//!
//! Recurring jobs are configured on the runner when it is built; the runner then clones the job and enqueues it if a matching job has not been enqueued within the interval.
31//!
32//! You need to call [`Builder::set_recurring`] and you need to implement [`RecurrableJob`].
33//!
34//! ```
35//! use gaffer::{Job, JobRunner, NoExclusion, RecurrableJob};
36//! use std::time::Duration;
37//!
38//! fn main() -> Result<(), Box<dyn std::error::Error>> {
39//! let _runner = JobRunner::builder()
40//! .set_recurring(
41//! Duration::from_secs(2),
42//! std::time::Instant::now(),
43//! MyJob("recurring"),
44//! )
45//! .build(1);
46//!
47//! std::thread::sleep(Duration::from_secs(7));
48//! Ok(())
49//! }
50//!
51//! #[derive(Debug, Clone)]
52//! struct MyJob(&'static str);
53//!
54//! impl Job for MyJob {
55//! type Exclusion = NoExclusion;
56//!
57//! fn exclusion(&self) -> Self::Exclusion {
58//! NoExclusion
59//! }
60//!
61//! fn execute(self) {
62//! println!("Completed job {:?}", self);
63//! }
64//!
65//! type Priority = ();
66//!
67//! fn priority(&self) -> Self::Priority {}
68//! }
69//!
70//! impl RecurrableJob for MyJob {
71//! fn matches(&self, other: &Self) -> bool {
72//! self.0 == other.0
73//! }
74//! }
75//!
76//! ```
77//!
78//! ### Job queue
79//!
//! Call [`JobRunner::send`] to add jobs onto the queue; they will be executed in the order that they are enqueued.
81//!
82//! ```
83//! fn main() -> Result<(), Box<dyn std::error::Error>> {
84//! let runner = gaffer::JobRunner::builder().build(1);
85//!
86//! for i in 1..=5 {
87//! let name = format!("Job {}", i);
88//! runner.send(move || {
89//! std::thread::sleep(std::time::Duration::from_secs(1));
90//! println!("Completed job {:?}", name);
91//! })?;
92//! }
93//!
94//! println!("Jobs enqueued");
95//! std::thread::sleep(std::time::Duration::from_secs(7));
96//! Ok(())
97//! }
98//! ```
99//!
100//! ### Job prioritisation
101//!
//! Return a value from [`Job::priority`] and jobs from the queue will be executed in priority order (highest first).
103//!
104//! ```
105//! use gaffer::{Job, JobRunner, NoExclusion};
106//! use std::time::Duration;
107//!
108//! fn main() -> Result<(), Box<dyn std::error::Error>> {
109//! let runner = JobRunner::builder().build(1);
110//!
111//! for (i, priority) in (1..=5).zip([1, 2].iter().cycle()) {
112//! runner.send(PrioritisedJob(format!("Job {}", i), *priority))?;
113//! }
114//!
115//! println!("Jobs enqueued");
116//! std::thread::sleep(Duration::from_secs(7));
117//! Ok(())
118//! }
119//!
120//! #[derive(Debug)]
121//! struct PrioritisedJob(String, u8);
122//!
123//! impl Job for PrioritisedJob {
124//! type Exclusion = NoExclusion;
125//!
126//! fn exclusion(&self) -> Self::Exclusion {
127//! NoExclusion
128//! }
129//!
130//! type Priority = u8;
131//!
//! /// This job is prioritised
133//! fn priority(&self) -> Self::Priority {
134//! self.1
135//! }
136//!
137//! fn execute(self) {
138//! std::thread::sleep(Duration::from_secs(1));
139//! println!("Completed job {:?}", self);
140//! }
141//! }
142//!
143//! ```
144//!
145//! ### Job merging
146//!
147//! Gracefully handle spikes in duplicate or overlapping jobs by automatically merging those jobs in the queue.
148//! Call [`Builder::enable_merge`].
149//!
150//! ```
151//! use gaffer::{Job, JobRunner, MergeResult, NoExclusion};
152//! use std::time::Duration;
153//!
154//! fn main() -> Result<(), Box<dyn std::error::Error>> {
155//! let runner = JobRunner::builder().enable_merge(merge_jobs).build(1);
156//!
157//! for i in 10..=50 {
158//! runner.send(MergeJob(format!("Job {}", i)))?;
159//! }
160//!
161//! println!("Jobs enqueued");
162//! std::thread::sleep(Duration::from_secs(7));
163//! Ok(())
164//! }
165//!
166//! #[derive(Debug)]
167//! struct MergeJob(String);
168//!
169//! impl Job for MergeJob {
170//! type Exclusion = NoExclusion;
171//!
172//! fn exclusion(&self) -> Self::Exclusion {
173//! NoExclusion
174//! }
175//!
176//! type Priority = ();
177//!
178//! fn priority(&self) -> Self::Priority {}
179//!
180//! fn execute(self) {
181//! std::thread::sleep(Duration::from_secs(1));
182//! println!("Completed job {:?}", self);
183//! }
184//! }
185//!
186//! fn merge_jobs(this: MergeJob, that: &mut MergeJob) -> MergeResult<MergeJob> {
187//! if this.0[..this.0.len() - 1] == that.0[..that.0.len() - 1] {
188//! that.0 = format!("{}x", &that.0[..that.0.len() - 1]);
189//! MergeResult::Success
190//! } else {
191//! MergeResult::NotMerged(this)
192//! }
193//! }
194//!
195//! ```
196//!
197//! ### Parallel execution
198//!
//! Jobs can be run over multiple threads: just provide the number of threads to [`Builder::build`].
200//!
201//! ```
202//! fn main() -> Result<(), Box<dyn std::error::Error>> {
203//! let runner = gaffer::JobRunner::builder().build(10);
204//!
205//! for i in 1..=50 {
206//! let name = format!("WaitJob {}", i);
207//! runner.send(move || {
208//! std::thread::sleep(std::time::Duration::from_secs(1));
209//! println!("Completed job {:?}", name);
210//! })?;
211//! }
212//!
213//! println!("Jobs enqueued");
214//! std::thread::sleep(std::time::Duration::from_secs(7));
215//! Ok(())
216//! }
217//! ```
218//!
219//! ### Concurrent exclusion
220//!
//! Exclusion keys can be provided to show which jobs need to be run exclusively.
222//!
223//! ```
224//! use gaffer::{ExclusionOption, Job, JobRunner};
225//! use std::time::Duration;
226//!
227//! fn main() -> Result<(), Box<dyn std::error::Error>> {
228//! let runner = JobRunner::builder().build(2);
229//!
230//! for (i, exclusion) in (1..=10).zip([ExclusionOption::Some(1), ExclusionOption::Some(2), ExclusionOption::None].iter().cycle()) {
231//! runner.send(ExcludedJob(format!("Job {}", i), *exclusion))?;
232//! }
233//!
234//! println!("Jobs enqueued");
235//! std::thread::sleep(Duration::from_secs(7));
236//! Ok(())
237//! }
238//!
239//! #[derive(Debug)]
240//! struct ExcludedJob(String, ExclusionOption<u8>);
241//!
242//! impl Job for ExcludedJob {
243//! type Exclusion = ExclusionOption<u8>;
244//!
245//! fn exclusion(&self) -> Self::Exclusion {
246//! self.1
247//! }
248//!
249//! type Priority = ();
250//!
251//! fn priority(&self) -> Self::Priority {}
252//!
253//! fn execute(self) {
254//! std::thread::sleep(Duration::from_secs(1));
255//! println!("Completed job {:?}", self);
256//! }
257//! }
258//!
259//! ```
260//!
261//! ### Priority throttling
262//!
//! Lower-priority jobs can be restricted to fewer threads to reduce the load on system resources and encourage merging (if merging is enabled).
264//!
265//! Use [`Builder::limit_concurrency`].
266//!
267//! ```
268//! use gaffer::{Job, JobRunner, NoExclusion};
269//! use std::time::Duration;
270//!
271//! fn main() -> Result<(), Box<dyn std::error::Error>> {
272//! let runner = JobRunner::builder()
273//! .limit_concurrency(|priority| (priority == 1).then(|| 1))
274//! .build(4);
275//!
276//! for (i, priority) in (1..=10).zip([1, 2].iter().cycle()) {
277//! runner.send(PrioritisedJob(format!("Job {}", i), *priority))?;
278//! }
279//!
280//! println!("Jobs enqueued");
281//! std::thread::sleep(Duration::from_secs(7));
282//! Ok(())
283//! }
284//!
285//! #[derive(Debug)]
286//! struct PrioritisedJob(String, u8);
287//!
288//! impl Job for PrioritisedJob {
289//! type Exclusion = NoExclusion;
290//!
291//! fn exclusion(&self) -> Self::Exclusion {
292//! NoExclusion
293//! }
294//!
295//! type Priority = u8;
296//!
//! /// This job is prioritised
298//! fn priority(&self) -> Self::Priority {
299//! self.1
300//! }
301//!
302//! fn execute(self) {
303//! std::thread::sleep(Duration::from_secs(1));
304//! println!("Completed job {:?}", self);
305//! }
306//! }
307//!
308//! ```
309//!
310//! ### Future jobs
311//!
//! Use a [`future::Promise`] in the job to allow `await`ing job results in async code. When combined with merging, all the futures of the merged jobs will complete with clones of the result of the single job which actually ran.
313//!
314//! ```
315//! use gaffer::{
316//! future::{Promise, PromiseFuture},
317//! Job, JobRunner, MergeResult, NoExclusion,
318//! };
319//! use std::time::Duration;
320//!
321//! use futures::{executor::block_on, FutureExt, StreamExt};
322//!
323//! fn main() -> Result<(), Box<dyn std::error::Error>> {
324//! let runner = JobRunner::builder()
325//! .enable_merge(|this: ProcessString, that: &mut ProcessString| {
326//! if this.0[..this.0.len() - 1] == that.0[..that.0.len() - 1] {
327//! that.0 = format!("{}x", &that.0[..that.0.len() - 1]);
328//! that.1.merge(this.1);
329//! MergeResult::Success
330//! } else {
331//! MergeResult::NotMerged(this)
332//! }
333//! })
334//! .build(1);
335//!
336//! let mut futures: futures::stream::SelectAll<_> = (10..=50)
337//! .filter_map(|i| {
338//! ProcessString::new(format!("Job {}", i), &runner)
339//! .ok()
340//! .map(|f| f.into_stream())
341//! })
342//! .collect();
343//! println!("Jobs enqueued");
344//!
345//! block_on(async {
346//! while let Some(result) = futures.next().await {
347//! let processed_string = result.unwrap();
348//! println!(">> {}", processed_string);
349//! }
350//! });
351//! Ok(())
352//! }
353//!
354//! struct ProcessString(String, Promise<String>);
355//!
356//! impl ProcessString {
357//! fn new(
358//! name: String,
359//! runner: &JobRunner<ProcessString>,
360//! ) -> Result<PromiseFuture<String>, crossbeam_channel::SendError<ProcessString>> {
361//! let (promise, future) = Promise::new();
362//! runner.send(ProcessString(name, promise))?;
363//! Ok(future)
364//! }
365//! }
366//!
367//! impl Job for ProcessString {
368//! type Exclusion = NoExclusion;
369//!
370//! fn exclusion(&self) -> Self::Exclusion {
371//! NoExclusion
372//! }
373//!
374//! type Priority = ();
375//!
376//! fn priority(&self) -> Self::Priority {}
377//!
378//! fn execute(self) {
379//! println!("Processing job {}", self.0);
380//! std::thread::sleep(Duration::from_secs(1));
381//! self.1.fulfill(format!("Processed : [{}]", self.0));
382//! }
383//! }
384//! ```
385
386#![warn(missing_docs)]
387#![warn(rust_2018_idioms)]
388
389use parking_lot::Mutex;
390
391use std::{
392 fmt,
393 sync::Arc,
394 time::{Duration, Instant},
395};
396
397use runner::ConcurrencyLimitFn;
398pub use source::RecurrableJob;
399use source::{IntervalRecurringJob, RecurringJob, SourceManager};
400
401pub mod future;
402mod runner;
403mod source;
404
405/// Top level structure of the crate. Currently, recurring jobs would keep being scheduled once this is dropped, but that will probably change.
406///
407/// See crate level docs
408pub struct JobRunner<J> {
409 sender: crossbeam_channel::Sender<J>,
410}
411
412impl<J: Job + 'static> JobRunner<J> {
413 /// Create a Builder to start building a [`JobRunner`]
414 pub fn builder() -> Builder<J> {
415 Builder::new()
416 }
417
418 /// Send a job to the queue
419 pub fn send(&self, job: J) -> Result<(), crossbeam_channel::SendError<J>> {
420 self.sender.send(job)
421 }
422}
423
424impl<J> Clone for JobRunner<J> {
425 fn clone(&self) -> Self {
426 Self {
427 sender: self.sender.clone(),
428 }
429 }
430}
431
432/// Builder of [`JobRunner`]
433pub struct Builder<J: Job + 'static> {
434 concurrency_limit: Box<ConcurrencyLimitFn<J>>,
435 recurring: Vec<Box<dyn RecurringJob<J> + Send>>,
436 /// optional function to allow merging of jobs
437 merge_fn: Option<fn(J, &mut J) -> MergeResult<J>>,
438}
439
440impl<J: Job + Send + 'static> Builder<J> {
441 /// Start building a [`JobRunner`]
442 fn new() -> Self {
443 Builder {
444 concurrency_limit: Box::new(|_: <J as Prioritised>::Priority| None as Option<u8>),
445 recurring: vec![],
446 merge_fn: None,
447 }
448 }
449
450 /// Enable merging of Jobs in the queue, if a merge function is provided here, it will be tried with each job added to the queue against each job already in the queue
451 pub fn enable_merge(mut self, f: fn(J, &mut J) -> MergeResult<J>) -> Self {
452 self.merge_fn = Some(f);
453 self
454 }
455}
456
457impl<J: Job + Send + RecurrableJob + 'static> Builder<J> {
458 /// Set a job as recurring, the job will be enqueued every time `interval` passes since the `last_enqueue` of a matching job
459 pub fn set_recurring(mut self, interval: Duration, last_enqueue: Instant, job: J) -> Self {
460 self.recurring.push(Box::new(IntervalRecurringJob {
461 last_enqueue,
462 interval,
463 job,
464 }));
465 self
466 }
467}
468
469impl<J: Job + Send + 'static> Default for Builder<J> {
470 fn default() -> Self {
471 Self::new()
472 }
473}
474
475impl<J: Job + Send + 'static> Builder<J> {
476 /// Function determining, for each priority, how many threads can be allocated to jobs of this priority, any remaining threads will be left idle to service higher-priority jobs. `None` means parallelism won't be limited
477 pub fn limit_concurrency(
478 mut self,
479 concurrency_limit: impl Fn(<J as Job>::Priority) -> Option<u8> + Send + Sync + 'static,
480 ) -> Self {
481 self.concurrency_limit = Box::new(concurrency_limit);
482 self
483 }
484
485 /// Build the [`JobRunner`], spawning `thread_num` threads as workers
486 pub fn build(self, thread_num: usize) -> JobRunner<J> {
487 let (sender, sources) =
488 SourceManager::<J, Box<dyn RecurringJob<J> + Send>>::new_with_recurring(
489 self.recurring,
490 self.merge_fn,
491 );
492 let jobs = Arc::new(Mutex::new(sources));
493 let _threads = runner::spawn(thread_num, jobs, self.concurrency_limit);
494 JobRunner { sender }
495 }
496}
497
498/// A job which can be executed by the runner, with features to synchronise jobs that would interfere with each other and reduce the parallelisation of low priority jobs
499pub trait Job: Send {
500 /// Type used to check which jobs should not be allowed to run concurrently, see [`Job::exclusion()`]. Use [`NoExclusion`] for jobs which can always be run at the same time, see also [`ExclusionOption`].
501 type Exclusion: PartialEq + Copy + fmt::Debug + Send;
502
503 /// Used to check which jobs should not be allowed to run concurrently, if `<Job::Exclusion as PartialEq>::eq(job1.exclusion(), job2.exclusion())`, then `job1` and `job2` can't run at the same time.
504 fn exclusion(&self) -> Self::Exclusion;
505
506 /// Type of the priority, the higher prioritys are those which are larger based on [`Ord::cmp`].
507 type Priority: Ord + Copy + Send;
508
509 /// Get the priority of this thing
510 fn priority(&self) -> Self::Priority;
511
512 /// Execute and consume the job
513 fn execute(self);
514}
515
516/// A type that can be put in a priority queue, tells the queue which order the items should come out in, whether / how to merge them, and checking whether item's match
517trait Prioritised: Sized {
518 /// Type of the priority, the higher prioritys are those which are larger based on [`Ord::cmp`].
519 type Priority: Ord + Copy + Send;
520
521 /// Get the priority of this thing
522 fn priority(&self) -> Self::Priority;
523}
524
525impl<J: Job> Prioritised for J {
526 type Priority = <J as Job>::Priority;
527
528 fn priority(&self) -> Self::Priority {
529 <J as Job>::priority(self)
530 }
531}
532
533impl<T> Job for T
534where
535 T: FnOnce() + Send,
536{
537 type Exclusion = NoExclusion;
538
539 fn exclusion(&self) -> Self::Exclusion {
540 NoExclusion
541 }
542
543 type Priority = ();
544
545 fn priority(&self) -> Self::Priority {}
546
547 fn execute(self) {
548 (self)()
549 }
550}
551
552/// Result of an attempted merge, see [`Builder::enable_merge`]
553pub enum MergeResult<P> {
554 /// merge was sucessful, eg. either because the items are the same or one is a superset of the other
555 Success,
556 /// the attempted items were not suitable for merging
557 NotMerged(P),
558}
559
560/// Allows any jobs to run at the same time
561#[derive(Debug, Copy, Clone)]
562pub struct NoExclusion;
563
564impl PartialEq for NoExclusion {
565 fn eq(&self, _other: &Self) -> bool {
566 false
567 }
568}
569
570/// Allows some jobs to be run at the same time, others to acquire a keyed exclusive lock, and others to acquire a global exclusive lock
571#[derive(Debug, Copy, Clone)]
572pub enum ExclusionOption<T> {
573 /// This job excludes all others, it can only be run whilst all other workers are idle. NOTE: If the runner is busy this will have to wait until all jobs are finished
574 All,
575 /// This job excludes some other jobs which match `T`
576 Some(T),
577 /// This job excludes no other jobs and can run at any time
578 None,
579}
580
581impl<T: PartialEq> PartialEq for ExclusionOption<T> {
582 fn eq(&self, other: &Self) -> bool {
583 match (self, other) {
584 (ExclusionOption::Some(me), ExclusionOption::Some(other)) => me == other,
585 (ExclusionOption::All, _) => true,
586 (_, ExclusionOption::All) => true,
587 _ => false,
588 }
589 }
590}
591
592impl<T> From<Option<T>> for ExclusionOption<T> {
593 fn from(val: Option<T>) -> Self {
594 if let Some(val) = val {
595 ExclusionOption::Some(val)
596 } else {
597 ExclusionOption::None
598 }
599 }
600}
601
602impl<T> From<T> for ExclusionOption<T> {
603 fn from(val: T) -> Self {
604 ExclusionOption::Some(val)
605 }
606}