1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
//! `workctl` provides a set of higher-level abstractions for controlling
//! concurrent/parallel programs. These abstractions are especially focused on
//! the "controller/worker" paradigm, in which one or a few "controller"
//! threads determine what work needs to be done and use `WorkQueue`s and
//! `SyncFlag`s to communicate that to many "worker" threads.
//!
//! `workctl` is lower level than crates like [rayon](https://crates.io/crates/rayon),
//! but provides a more abstract interface than the primatives available in the
//! standard library.
//!
//!
//! # Examples
//!
//! Here is a typical example using a `WorkQueue`, a `SyncFlag`, and a `std::sync::mpsc`.
//! This is somewhat more complex than is required for processing a list of numbers, but
//! it illustrates the principle. When looking at this example, imagine that you might
//!
//! * have a mechanism by which some of the worker threads can add new work or,
//! * that the control thread (or another thread) expects to produce work _forever_,
//! as in a server, for instance.
//!
//! The `SyncFlag` can then be used at any future time to
//! gracefully shut down all the worker threads, e.g. when the controller gets
//! `SIGTERM`.
//!
//! ```
//! use std::thread;
//! use workctl::{WorkQueue, new_syncflag};
//!
//! // Create a new work queue to schedule pieces of work; in this case, i32s.
//! // The type annotation is not strictly needed.
//! let mut queue: WorkQueue<i32> = WorkQueue::new();
//!
//! // Create a channel for the worker threads to send messages back on.
//! use std::sync::mpsc::channel;
//! let (results_tx, results_rx) = channel();
//!
//! // Create a SyncFlag to share whether or not the worker threads should
//! // keep waiting on jobs.
//! let (mut more_jobs_tx, more_jobs_rx) = new_syncflag(true);
//!
//! // This Vec is just for the controller to keep track of the worker threads.
//! let mut thread_handles = Vec::new();
//!
//! // Spawn 4 workers.
//! for _ in 0..4 {
//! // Create clones of the various control mechanisms for the new thread.
//! let mut t_queue = queue.clone();
//! let t_results_tx = results_tx.clone();
//! let t_more_jobs = more_jobs_rx.clone();
//!
//! let handle = thread::spawn(move || {
//! // Loop until the controller says to stop.
//! while let Some(work_input) = t_queue.wait(&t_more_jobs) {
//! // Do some work. Totally contrived in this case.
//! let result = work_input % 1024;
//! // Send the results of the work to the main thread.
//! t_results_tx.send((work_input, result)).unwrap();
//! }
//! });
//!
//! // Add the handle to the vec of handles
//! thread_handles.push(handle);
//! }
//!
//! // Put some work in the queue.
//! let mut total_work = 0;
//! for _ in 0..10 {
//! queue.push_work(1023);
//! total_work += 1;
//! }
//!
//! for _ in 0..10 {
//! queue.push_work(1024);
//! total_work += 1;
//! }
//!
//!
//! // Now, receive all the results.
//! let mut results = Vec::new();
//! while total_work > 0 {
//! // In reality, you'd do something with these results.
//! let r = results_rx.recv().unwrap();
//! total_work -= 1;
//! results.push(r);
//! }
//!
//!
//!
//! // All the work is done, so tell the workers to stop looking for work.
//! more_jobs_tx.set(false);
//!
//! // Join all the threads.
//! for thread_handle in thread_handles {
//! thread_handle.join().unwrap();
//! }
//!
//! assert_eq!(results.len(), 20);
//! ```
pub use WorkQueue;
pub use new_syncflag;