1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
//! `workctl` provides a set of higher-level abstractions for controlling //! concurrent/parallel programs. These abstractions are especially focused on //! the "controller/worker" paradigm, in which one or a few "controller" //! threads determine what work needs to be done and use `WorkQueue`s and //! `SyncFlag`s to communicate that to many "worker" threads. //! //! `workctl` is lower level than crates like [rayon](https://crates.io/crates/rayon), //! but provides a more abstract interface than the primatives available in the //! standard library. //! //! //! # Examples //! //! Here is a typical example using a `WorkQueue`, a `SyncFlag`, and a `std::sync::mpsc`. //! This is somewhat more complex than is required for processing a list of numbers, but //! it illustrates the principle. When looking at this example, imagine that you might //! //! * have a mechanism by which some of the worker threads can add new work or, //! * that the control thread (or another thread) expects to produce work _forever_, //! as in a server, for instance. //! //! The `SyncFlag` can then be used at any future time to //! gracefully shut down all the worker threads, e.g. when the controller gets //! `SIGTERM`. //! //! ``` //! use std::thread; //! use workctl::{WorkQueue, new_syncflag}; //! //! // Create a new work queue to schedule pieces of work; in this case, i32s. //! // The type annotation is not strictly needed. //! let mut queue: WorkQueue<i32> = WorkQueue::new(); //! //! // Create a channel for the worker threads to send messages back on. //! use std::sync::mpsc::channel; //! let (results_tx, results_rx) = channel(); //! //! // Create a SyncFlag to share whether or not the worker threads should //! // keep waiting on jobs. //! let (mut more_jobs_tx, more_jobs_rx) = new_syncflag(true); //! //! // This Vec is just for the controller to keep track of the worker threads. //! let mut thread_handles = Vec::new(); //! //! // Spawn 4 workers. //! for _ in 0..4 { //! // Create clones of the various control mechanisms for the new thread. //! let mut t_queue = queue.clone(); //! let t_results_tx = results_tx.clone(); //! let t_more_jobs = more_jobs_rx.clone(); //! //! let handle = thread::spawn(move || { //! // Loop until the controller says to stop. //! while let Some(work_input) = t_queue.wait(&t_more_jobs) { //! // Do some work. Totally contrived in this case. //! let result = work_input % 1024; //! // Send the results of the work to the main thread. //! t_results_tx.send((work_input, result)).unwrap(); //! } //! }); //! //! // Add the handle to the vec of handles //! thread_handles.push(handle); //! } //! //! // Put some work in the queue. //! let mut total_work = 0; //! for _ in 0..10 { //! queue.push_work(1023); //! total_work += 1; //! } //! //! for _ in 0..10 { //! queue.push_work(1024); //! total_work += 1; //! } //! //! //! // Now, receive all the results. //! let mut results = Vec::new(); //! while total_work > 0 { //! // In reality, you'd do something with these results. //! let r = results_rx.recv().unwrap(); //! total_work -= 1; //! results.push(r); //! } //! //! //! //! // All the work is done, so tell the workers to stop looking for work. //! more_jobs_tx.set(false); //! //! // Join all the threads. //! for thread_handle in thread_handles { //! thread_handle.join().unwrap(); //! } //! //! assert_eq!(results.len(), 20); //! ``` pub mod work_queue; pub use work_queue::WorkQueue; pub mod sync_flag; pub use sync_flag::new_syncflag;