Crate workerpool

A worker threadpool used to execute stateful functions in parallel. It spawns a specified number of worker threads and replenishes the pool if any worker threads panic.
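To make the idea concrete, here is a minimal, std-only sketch of a fixed set of threads that each keep their own state while pulling jobs from a shared queue. It is not this crate's implementation: the function name `square_in_pool`, the per-thread `seen` counter, and the channel layout are all assumptions chosen for illustration, and the sketch does not replace threads that panic.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: squares each input on one of `n_workers` threads and
/// returns `(jobs_seen_by_that_thread, square)` pairs in completion order.
fn square_in_pool(n_workers: usize, inputs: Vec<u64>) -> Vec<(u64, u64)> {
    // Shared job queue: one sender, many receivers behind a mutex.
    let (job_tx, job_rx) = channel::<u64>();
    let job_rx: Arc<Mutex<Receiver<u64>>> = Arc::new(Mutex::new(job_rx));
    // Result channel: many senders (one per thread), one receiver.
    let (out_tx, out_rx) = channel::<(u64, u64)>();

    let handles: Vec<_> = (0..n_workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let out_tx = out_tx.clone();
            thread::spawn(move || {
                // State owned by this thread and kept across jobs.
                let mut seen = 0u64;
                loop {
                    // The lock is released at the end of this statement.
                    let job = job_rx.lock().unwrap().recv();
                    match job {
                        Ok(x) => {
                            seen += 1;
                            out_tx.send((seen, x * x)).unwrap();
                        }
                        // The job sender was dropped: no more work.
                        Err(_) => break,
                    }
                }
            })
        })
        .collect();
    // Drop the original sender so only the threads hold result senders.
    drop(out_tx);

    for x in inputs {
        job_tx.send(x).unwrap();
    }
    // Closing the job queue lets the threads exit their loops.
    drop(job_tx);

    for handle in handles {
        handle.join().unwrap();
    }
    out_rx.iter().collect()
}

fn main() {
    let mut squares: Vec<u64> = square_in_pool(4, (1..=8).collect())
        .into_iter()
        .map(|(_, square)| square)
        .collect();
    squares.sort();
    println!("{:?}", squares); // prints [1, 4, 9, 16, 25, 36, 49, 64]
}
```

Results arrive in completion order, not submission order, which is why the usage above sorts them before printing.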

Each Worker runs in its own thread and must implement the following trait:

pub trait Worker {
    type Input: Send;
    type Output: Send;

    fn new() -> Self;
    fn execute(&mut self, input: Self::Input) -> Self::Output;
}
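As a small illustration of what "stateful" means here, the sketch below implements the trait for a worker that keeps a running total between calls. To stay compilable without the crate it declares a local copy of the trait; `RunningSum` and `run_on_one_worker` are hypothetical names, and the worker is driven directly on one thread, not through a Pool.

```rust
// Local copy of the trait shown above, so the sketch compiles without the crate.
pub trait Worker {
    type Input: Send;
    type Output: Send;

    fn new() -> Self;
    fn execute(&mut self, input: Self::Input) -> Self::Output;
}

/// Hypothetical worker that keeps a running total across calls.
struct RunningSum {
    total: i64,
}

impl Worker for RunningSum {
    type Input = i64;
    type Output = i64;

    fn new() -> Self {
        RunningSum { total: 0 }
    }

    fn execute(&mut self, input: Self::Input) -> Self::Output {
        // The state survives between calls, so each output depends on
        // every input this worker has seen so far.
        self.total += input;
        self.total
    }
}

/// Feeds every input to one freshly created worker and collects the outputs.
fn run_on_one_worker<W: Worker>(inputs: Vec<W::Input>) -> Vec<W::Output> {
    let mut worker = W::new();
    inputs
        .into_iter()
        .map(|input| worker.execute(input))
        .collect()
}

fn main() {
    let outputs = run_on_one_worker::<RunningSum>(vec![1, 2, 3]);
    println!("{:?}", outputs); // prints [1, 3, 6]
}
```

In a pool, each thread would own its own worker instance, so a running total like this one would be per worker, not global across the pool.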

Examples

A ready-made worker is provided in workerpool::thunk: the stateless ThunkWorker<T>. It takes inputs of type Thunk<T>, which are effectively argumentless functions that are Sized + Send. Thunks are created by wrapping functions that return T with Thunk::of.

use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<ThunkWorker<i32>>::new(n_workers);

    let (tx, rx) = channel();
    for _ in 0..n_jobs {
        pool.execute_to(tx.clone(), Thunk::of(|| 1i32));
    }

    assert_eq!(8, rx.iter().take(n_jobs).fold(0, |a, b| a + b));
}

For stateful workers, you have to implement Worker yourself.

Suppose there's a line-delimited process, such as cat or tr, that you'd like to run on many threads and use as a pool. You can create a worker that keeps the process's stdin and stdout as its state, and use it as follows:

use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;

struct LineDelimitedProcess {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}
impl Worker for LineDelimitedProcess {
    type Input = Box<[u8]>;
    type Output = io::Result<String>;

    fn new() -> Self {
        let child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        Self {
            stdin: child.stdin.unwrap(),
            stdout: BufReader::new(child.stdout.unwrap()),
        }
    }
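    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        // Send one line to the child process and make sure it is delivered.
        self.stdin.write_all(&*inp)?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        // Read back exactly one line of the child's output.
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }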
}

let n_workers = 4;
let n_jobs = 8;
let pool = Pool::<LineDelimitedProcess>::new(n_workers);

let (tx, rx) = channel();
for i in 0..n_jobs {
    let inp = Box::new([97 + i]);
    pool.execute_to(tx.clone(), inp);
}

// output is a permutation of "abcdefgh"
let mut output = rx.iter()
    .take(n_jobs as usize)
    .fold(String::new(), |mut a, b| {
        a.push_str(&b.unwrap());
        a
    })
    .into_bytes();
output.sort();
assert_eq!(output, b"abcdefgh");

Modules

thunk

Provides a Worker for simple stateless functions that have no arguments.

Structs

Builder

Pool factory, which can be used to configure the properties of the Pool.

Pool

Abstraction of a thread pool for basic parallelism.

Traits

Worker

Abstraction of a worker which executes tasks in its own thread.