workctl/lib.rs
1//! `workctl` provides a set of higher-level abstractions for controlling
2//! concurrent/parallel programs. These abstractions are especially focused on
3//! the "controller/worker" paradigm, in which one or a few "controller"
4//! threads determine what work needs to be done and use `WorkQueue`s and
5//! `SyncFlag`s to communicate that to many "worker" threads.
6//!
7//! `workctl` is lower level than crates like [rayon](https://crates.io/crates/rayon),
8//! but provides a more abstract interface than the primatives available in the
9//! standard library.
10//!
11//!
12//! # Examples
13//!
14//! Here is a typical example using a `WorkQueue`, a `SyncFlag`, and a `std::sync::mpsc`.
15//! This is somewhat more complex than is required for processing a list of numbers, but
16//! it illustrates the principle. When looking at this example, imagine that you might
17//!
18//! * have a mechanism by which some of the worker threads can add new work or,
19//! * that the control thread (or another thread) expects to produce work _forever_,
20//! as in a server, for instance.
21//!
22//! The `SyncFlag` can then be used at any future time to
23//! gracefully shut down all the worker threads, e.g. when the controller gets
24//! `SIGTERM`.
25//!
26//! ```
27//! use std::thread;
28//! use workctl::{WorkQueue, new_syncflag};
29//!
30//! // Create a new work queue to schedule pieces of work; in this case, i32s.
31//! // The type annotation is not strictly needed.
32//! let mut queue: WorkQueue<i32> = WorkQueue::new();
33//!
34//! // Create a channel for the worker threads to send messages back on.
35//! use std::sync::mpsc::channel;
36//! let (results_tx, results_rx) = channel();
37//!
38//! // Create a SyncFlag to share whether or not the worker threads should
39//! // keep waiting on jobs.
40//! let (mut more_jobs_tx, more_jobs_rx) = new_syncflag(true);
41//!
42//! // This Vec is just for the controller to keep track of the worker threads.
43//! let mut thread_handles = Vec::new();
44//!
45//! // Spawn 4 workers.
46//! for _ in 0..4 {
47//! // Create clones of the various control mechanisms for the new thread.
48//! let mut t_queue = queue.clone();
49//! let t_results_tx = results_tx.clone();
50//! let t_more_jobs = more_jobs_rx.clone();
51//!
52//! let handle = thread::spawn(move || {
53//! // Loop until the controller says to stop.
54//! while let Some(work_input) = t_queue.wait(&t_more_jobs) {
55//! // Do some work. Totally contrived in this case.
56//! let result = work_input % 1024;
57//! // Send the results of the work to the main thread.
58//! t_results_tx.send((work_input, result)).unwrap();
59//! }
60//! });
61//!
62//! // Add the handle to the vec of handles
63//! thread_handles.push(handle);
64//! }
65//!
66//! // Put some work in the queue.
67//! let mut total_work = 0;
68//! for _ in 0..10 {
69//! queue.push_work(1023);
70//! total_work += 1;
71//! }
72//!
73//! for _ in 0..10 {
74//! queue.push_work(1024);
75//! total_work += 1;
76//! }
77//!
78//!
79//! // Now, receive all the results.
80//! let mut results = Vec::new();
81//! while total_work > 0 {
82//! // In reality, you'd do something with these results.
83//! let r = results_rx.recv().unwrap();
84//! total_work -= 1;
85//! results.push(r);
86//! }
87//!
88//!
89//!
90//! // All the work is done, so tell the workers to stop looking for work.
91//! more_jobs_tx.set(false);
92//!
93//! // Join all the threads.
94//! for thread_handle in thread_handles {
95//! thread_handle.join().unwrap();
96//! }
97//!
98//! assert_eq!(results.len(), 20);
99//! ```
100pub mod work_queue;
101pub use work_queue::WorkQueue;
102
103pub mod sync_flag;
104pub use sync_flag::new_syncflag;