Crate workctl

workctl provides a set of higher-level abstractions for controlling concurrent/parallel programs. These abstractions are especially focused on the “controller/worker” paradigm, in which one or a few “controller” threads determine what work needs to be done and use WorkQueues and SyncFlags to communicate that to many “worker” threads.

workctl is lower level than crates like rayon, but provides a more abstract interface than the primitives available in the standard library.

Examples

Here is a typical example using a WorkQueue, a SyncFlag, and a std::sync::mpsc channel. This is somewhat more complex than is required for processing a list of numbers, but it illustrates the principle. When looking at this example, imagine that you might

  • have a mechanism by which some of the worker threads can add new work, or
  • expect the controller thread (or another thread) to produce work forever, as in a server.

The SyncFlag can then be used at any future time to gracefully shut down all the worker threads, e.g. when the controller receives SIGTERM (a sketch of that wiring follows the example below).

use std::thread;
use workctl::{WorkQueue, new_syncflag};

// Create a new work queue to schedule pieces of work; in this case, i32s.
// The type annotation is not strictly needed.
let mut queue: WorkQueue<i32> = WorkQueue::new();

// Create a channel for the worker threads to send messages back on.
use std::sync::mpsc::channel;
let (results_tx, results_rx) = channel();

// Create a SyncFlag to share whether or not the worker threads should
// keep waiting on jobs.
let (mut more_jobs_tx, more_jobs_rx) = new_syncflag(true);

// This Vec is just for the controller to keep track of the worker threads.
let mut thread_handles = Vec::new();

// Spawn 4 workers.
for _ in 0..4 {
    // Create clones of the various control mechanisms for the new thread.
    let mut t_queue = queue.clone();
    let t_results_tx = results_tx.clone();
    let t_more_jobs = more_jobs_rx.clone();

    let handle = thread::spawn(move || {
        // Loop until the controller says to stop.
        while let Some(work_input) = t_queue.wait(&t_more_jobs) {
            // Do some work. Totally contrived in this case.
            let result = work_input % 1024;
            // Send the results of the work to the main thread.
            t_results_tx.send((work_input, result)).unwrap();
        }
    });

    // Add the handle to the vec of handles
    thread_handles.push(handle);
}

// Put some work in the queue.
let mut total_work = 0;
for _ in 0..10 {
    queue.push_work(1023);
    total_work += 1;
}

for _ in 0..10 {
    queue.push_work(1024);
    total_work += 1;
}

// Now, receive all the results.
let mut results = Vec::new();
while total_work > 0 {
    // In reality, you'd do something with these results.
    let r = results_rx.recv().unwrap();
    total_work -= 1;
    results.push(r);
}

// All the work is done, so tell the workers to stop looking for work.
more_jobs_tx.set(false);

// Join all the threads.
for thread_handle in thread_handles {
    thread_handle.join().unwrap();
}

assert_eq!(results.len(), 20);
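
The graceful shutdown mentioned above can be wired to a signal handler. The following is only a sketch: it assumes the external ctrlc crate (not part of workctl), which handles Ctrl-C/SIGINT by default and covers SIGTERM only when its termination feature is enabled.

use workctl::new_syncflag;

// Sketch only: `ctrlc` is an assumed external dependency, not part of workctl.
let (mut more_jobs_tx, more_jobs_rx) = new_syncflag(true);

// When a signal arrives, lower the flag. Workers blocked in
// `WorkQueue::wait(&more_jobs_rx)` will get None and fall out of their
// loops, exactly as in the example above.
ctrlc::set_handler(move || {
    more_jobs_tx.set(false);
})
.expect("failed to install signal handler");

// From here, spawn workers as in the example above, handing each one a
// clone of `more_jobs_rx`.
let _worker_flag = more_jobs_rx.clone();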

Re-exports

pub use work_queue::WorkQueue;
pub use sync_flag::new_syncflag;

Modules

sync_flag

SyncFlags, or synchronization flags, are one-way Boolean values that can be shared across threads, sending messages from a single producer to any number of consumers.
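
As a minimal sketch of that topology, reusing only the calls shown in the crate example above (see the module itself for the full SyncFlagTx and SyncFlagRx API), a single producer can shut down several consumers at once:

use std::thread;
use workctl::{WorkQueue, new_syncflag};

// One producer handle (tx) and one consumer handle (rx); the consumer
// side is Clone, so any number of threads can watch the same flag.
let (mut running_tx, running_rx) = new_syncflag(true);
let queue: WorkQueue<()> = WorkQueue::new();

let mut workers = Vec::new();
for _ in 0..3 {
    let mut worker_queue = queue.clone();
    let worker_running = running_rx.clone();
    workers.push(thread::spawn(move || {
        // Each consumer blocks here until the producer lowers the flag.
        while worker_queue.wait(&worker_running).is_some() {}
    }));
}

// The single producer flips the flag once; every consumer observes it.
running_tx.set(false);

for worker in workers {
    worker.join().unwrap();
}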

work_queue

WorkQueue is a purely safe work queue, suitable for fairly distributing work to any number of worker threads from any number of controllers.
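
Because WorkQueue is Clone, work can be pushed from several threads at once. The following is a sketch of one such arrangement, using only the calls from the example above: one worker is fed by three controllers (two spawned threads plus the main thread), and every result is collected before the flag is lowered, matching the pattern in the crate example.

use std::sync::mpsc::channel;
use std::thread;
use workctl::{WorkQueue, new_syncflag};

let mut queue: WorkQueue<u32> = WorkQueue::new();
let (mut more_work_tx, more_work_rx) = new_syncflag(true);
let (done_tx, done_rx) = channel();

// A single worker pulls from the shared queue and reports each result.
let mut worker_queue = queue.clone();
let worker_flag = more_work_rx.clone();
let worker = thread::spawn(move || {
    while let Some(n) = worker_queue.wait(&worker_flag) {
        done_tx.send(n * 2).unwrap();
    }
});

// Two additional controllers: clones of the queue can push work from any thread.
let mut controllers = Vec::new();
for base in [0u32, 100] {
    let mut controller_queue = queue.clone();
    controllers.push(thread::spawn(move || {
        for i in 0..5 {
            controller_queue.push_work(base + i);
        }
    }));
}

// The original handle is a controller too.
queue.push_work(1000);

for controller in controllers {
    controller.join().unwrap();
}

// Collect all 11 results before telling the worker to stop.
for _ in 0..11 {
    let _ = done_rx.recv().unwrap();
}
more_work_tx.set(false);
worker.join().unwrap();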