workerpool 1.0.2

A thread pool for running a number of jobs on a fixed set of stateful worker threads.

A worker thread pool used to execute a number of jobs atop stateful workers in parallel. It spawns a specified number of worker threads and replenishes the pool if any worker thread panics.
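One common way to replenish a pool after a panic is a "sentinel" value whose destructor runs while the panicking thread unwinds and spawns a replacement. The sketch below shows that idea with the standard library only. It is an assumption about how such replenishment can be done, not code taken from this crate, and `Sentinel`, `spawn_worker`, `failing_job` and `survives_panic` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send>;
type Queue = Arc<Mutex<Receiver<Job>>>;

/// Lives on a worker thread's stack. If the thread unwinds because a job
/// panicked, `drop` runs and starts a replacement thread.
struct Sentinel {
    queue: Queue,
}

impl Drop for Sentinel {
    fn drop(&mut self) {
        if thread::panicking() {
            spawn_worker(Arc::clone(&self.queue));
        }
    }
}

/// Starts one detached worker thread that runs jobs until the queue closes.
fn spawn_worker(queue: Queue) {
    thread::spawn(move || {
        let _sentinel = Sentinel { queue: Arc::clone(&queue) };
        loop {
            // The lock is released at the end of this statement, before the
            // job runs, so a panicking job does not poison the mutex.
            let job = queue.lock().unwrap().recv();
            match job {
                Ok(job) => job(),
                Err(_) => break, // all senders dropped: shut down
            }
        }
    });
}

/// A job that always panics.
fn failing_job() {
    panic!("job failed");
}

/// Runs a panicking job followed by a normal one on a single-thread pool
/// and returns the value produced by the second job.
fn survives_panic() -> i32 {
    let (job_tx, job_rx) = channel::<Job>();
    spawn_worker(Arc::new(Mutex::new(job_rx)));

    let (out_tx, out_rx) = channel();
    job_tx.send(Box::new(failing_job)).unwrap();
    job_tx.send(Box::new(move || out_tx.send(7).unwrap())).unwrap();
    out_rx.recv().unwrap()
}
```

Calling `survives_panic()` prints the panic message from the first job to stderr, and the second job still completes on the replacement thread.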

Each Worker runs in its own thread and is implemented according to the following trait:

pub trait Worker {
    type Input: Send;
    type Output: Send;

    fn new() -> Self;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}
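As a minimal sketch of what an implementation might look like, the block below defines a hypothetical `RunningTotal` worker whose state is a sum that persists across calls to `execute`. The trait is copied locally (with the parameter named `inp`) so the example compiles without the crate. With the crate you would import `workerpool::Worker` instead. `RunningTotal` and `totals_for` are illustrative names, not part of the crate.

```rust
// Local copy of the trait shape shown above, so this sketch is standalone.
pub trait Worker {
    type Input: Send;
    type Output: Send;

    fn new() -> Self;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

/// Hypothetical worker that keeps a running total across jobs.
struct RunningTotal {
    total: u64,
}

impl Worker for RunningTotal {
    type Input = u64;
    type Output = u64;

    // Builds the initial state for one worker.
    fn new() -> Self {
        RunningTotal { total: 0 }
    }

    // Adds the input to this worker's total and returns the new total.
    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        self.total += inp;
        self.total
    }
}

/// Feeds the inputs to a single worker on the current thread and collects
/// the output of each call.
fn totals_for(inputs: &[u64]) -> Vec<u64> {
    let mut worker = RunningTotal::new();
    inputs.iter().map(|&x| worker.execute(x)).collect()
}
```

Because the state lives in the worker, `totals_for(&[1, 2, 3])` yields `[1, 3, 6]`. In a pool each thread would hold its own total, so results depend on which worker receives which job.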

Usage

[dependencies]
workerpool = "1.0"

This crate provides Pool<W> where W: Worker. With a pool, there are four primary functions of interest:

  • Pool::<MyWorker>::new(n_threads) creates a new pool for a particular Worker.
  • pool.execute(inp) runs the worker on inp without blocking and ignores the return value.
  • pool.execute_to(tx, inp) runs the worker on inp without blocking and sends the return value to the given Sender.
  • pool.join() blocks until all tasks (from execute and execute_to) have completed.
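To illustrate the semantics of these calls, here is a rough standard-library sketch of the same pattern: a fixed set of threads, each owning its own state, pulls jobs from a shared queue and sends each result to a `Sender` supplied with the job. It is not the crate's implementation. `Counter` and `double_all` are hypothetical, and unlike `pool.join()` this sketch waits by closing the queue and joining the threads.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical per-thread state: counts how many jobs this thread has run.
struct Counter {
    jobs_run: usize,
}

impl Counter {
    fn execute(&mut self, inp: u32) -> u32 {
        self.jobs_run += 1;
        inp * 2
    }
}

/// Doubles every input on `n_threads` stateful threads and returns the sum
/// of the outputs.
fn double_all(n_threads: usize, inputs: Vec<u32>) -> u32 {
    assert!(n_threads > 0, "need at least one worker thread");

    // Each job is an input plus the Sender its output should go to,
    // which mirrors the (tx, inp) pair passed to execute_to.
    let (job_tx, job_rx) = channel::<(u32, Sender<u32>)>();
    let job_rx = Arc::new(Mutex::new(job_rx));

    let handles: Vec<_> = (0..n_threads)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            thread::spawn(move || {
                let mut worker = Counter { jobs_run: 0 };
                loop {
                    // Hold the lock only while taking the next job.
                    let job = job_rx.lock().unwrap().recv();
                    match job {
                        Ok((inp, out_tx)) => {
                            let _ = out_tx.send(worker.execute(inp));
                        }
                        Err(_) => break, // queue closed
                    }
                }
                worker.jobs_run
            })
        })
        .collect();

    // Submitting a job does not block on the job itself.
    let (out_tx, out_rx) = channel();
    let n_jobs = inputs.len();
    for inp in inputs {
        job_tx.send((inp, out_tx.clone())).unwrap();
    }

    // Close the queue, then wait for every thread to finish its jobs.
    drop(job_tx);
    let total_run: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
    assert_eq!(total_run, n_jobs);

    out_rx.iter().take(n_jobs).sum()
}
```

For example, `double_all(4, vec![1, 2, 3, 4])` returns 20 regardless of which thread handles which input.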

One worker is provided in workerpool::thunk: the stateless ThunkWorker<T>. It executes inputs of type Thunk<T>, which are effectively argumentless functions that are Sized + Send. These thunks are created by wrapping functions that return T with Thunk::of.

extern crate workerpool;

use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<ThunkWorker<i32>>::new(n_workers);
    
    let (tx, rx) = channel();
    for _ in 0..n_jobs {
        pool.execute_to(tx.clone(), Thunk::of(|| 1i32));
    }
    
    assert_eq!(8, rx.iter().take(n_jobs).fold(0, |a, b| a + b));
}
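To make the idea of a thunk concrete, the following sketch defines a hypothetical `MyThunk<T>` as a boxed, argumentless closure that can be sent to another thread and called once. It only mirrors the shape described above and is not the crate's `Thunk` type. For brevity it spawns one thread per thunk rather than using a pool.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Hypothetical stand-in for a thunk: a boxed closure that takes no
/// arguments, returns `T`, and can be moved to another thread.
struct MyThunk<T>(Box<dyn FnOnce() -> T + Send>);

impl<T> MyThunk<T> {
    /// Wraps any sendable, argumentless function that returns `T`.
    fn of<F>(f: F) -> Self
    where
        F: FnOnce() -> T + Send + 'static,
    {
        MyThunk(Box::new(f))
    }

    /// Consumes the thunk and runs the wrapped function.
    fn call(self) -> T {
        (self.0)()
    }
}

/// Builds `n_jobs` thunks returning 1, 2, ..., n_jobs, runs each on its own
/// thread, and sums the results received over a channel.
fn sum_of_thunks(n_jobs: usize) -> i32 {
    let (tx, rx) = channel();
    let handles: Vec<_> = (0..n_jobs)
        .map(|i| {
            let tx = tx.clone();
            let thunk = MyThunk::of(move || i as i32 + 1);
            thread::spawn(move || {
                tx.send(thunk.call()).unwrap();
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    rx.iter().take(n_jobs).sum()
}
```

Here `sum_of_thunks(4)` evaluates to 10. A stateless worker in this style simply calls whatever thunk it receives, so all per-job data travels inside the closure.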

For stateful workers, you have to implement Worker yourself.

Suppose there is a line-delimited process, such as cat or tr, that you would like to run on many threads and use as a pool. You can create a worker that keeps the process's stdin and stdout as its state, as follows:

extern crate workerpool;

use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;

struct LineDelimitedProcess {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}
impl Worker for LineDelimitedProcess {
    type Input = Box<[u8]>;
    type Output = io::Result<String>;

    fn new() -> Self {
        let child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        Self {
            stdin: child.stdin.unwrap(),
            stdout: BufReader::new(child.stdout.unwrap()),
        }
    }
    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        self.stdin.write_all(&*inp)?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }
}

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<LineDelimitedProcess>::new(n_workers);
    
    let (tx, rx) = channel();
    for i in 0..n_jobs {
        let inp = Box::new([97 + i]);
        pool.execute_to(tx.clone(), inp);
    }
    
    // output is a permutation of "abcdefgh"
    let mut output = rx.iter()
        .take(n_jobs as usize)
        .fold(String::new(), |mut a, b| {
            a.push_str(&b.unwrap());
            a
        })
        .into_bytes();
    output.sort();
    assert_eq!(output, b"abcdefgh");
}

License

This work is derivative of threadpool.

Licensed under either of the Apache License, Version 2.0, or the MIT license, at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.