Crate workerpool
A worker threadpool used to execute stateful functions in parallel. It spawns a specified number of worker threads and replenishes the pool if any worker threads panic.
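One way a pool can replenish itself after a panic is a guard value that notices unwinding when it is dropped. The following is a minimal std-only sketch of that idea, not the crate's actual implementation; `Sentinel`, `spawn_worker`, and `survives_panic` are hypothetical names introduced for illustration.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

// A job is any sendable closure with no arguments (an assumption of this sketch).
type Job = Box<dyn FnOnce() + Send + 'static>;
type SharedJobs = Arc<Mutex<Receiver<Job>>>;

/// Hypothetical guard owned by each worker thread.
struct Sentinel {
    jobs: SharedJobs,
}

impl Drop for Sentinel {
    fn drop(&mut self) {
        // If the thread is unwinding from a panic, start a replacement worker.
        if thread::panicking() {
            spawn_worker(Arc::clone(&self.jobs));
        }
    }
}

fn spawn_worker(jobs: SharedJobs) {
    thread::spawn(move || {
        let _sentinel = Sentinel { jobs: Arc::clone(&jobs) };
        loop {
            // The lock is released at the end of this statement, before the
            // job runs, so a panicking job cannot poison the mutex.
            let next = jobs.lock().unwrap().recv();
            match next {
                Ok(job) => job(),
                Err(_) => break, // every sender is gone: shut down quietly
            }
        }
    });
}

/// Runs a panicking job followed by a normal one on a single worker.
fn survives_panic() -> i32 {
    let (job_tx, job_rx) = channel::<Job>();
    spawn_worker(Arc::new(Mutex::new(job_rx)));

    let (result_tx, result_rx) = channel();
    let failing: Job = Box::new(|| panic!("job failed"));
    let succeeding: Job = Box::new(move || result_tx.send(42).unwrap());
    job_tx.send(failing).unwrap();
    job_tx.send(succeeding).unwrap();

    // The second job is only answered because a replacement worker was spawned.
    result_rx.recv().unwrap()
}

fn main() {
    println!("{}", survives_panic());
}
```

The panic message from the first job still appears on stderr; the point of the sketch is only that the second job completes on a replacement thread.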
A single Worker runs in its own thread and is implemented according to the trait:
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}
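To make the trait's shape concrete, here is a small sketch of a stateful implementation. The trait is re-declared locally so the block compiles without the crate; `RunningTotal` and `run_all` are hypothetical names, and the worker is driven on the current thread rather than through a pool.

```rust
// Local copy of the trait shape shown above (so the sketch needs no dependency).
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

/// Hypothetical stateful worker: remembers the sum of every input it has seen.
#[derive(Default)]
struct RunningTotal {
    total: i64,
}

impl Worker for RunningTotal {
    type Input = i64;
    type Output = i64;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        // State persists between calls because `execute` takes `&mut self`.
        self.total += inp;
        self.total
    }
}

/// Feeds the inputs, in order, to one freshly constructed worker.
fn run_all<W: Worker>(inputs: Vec<W::Input>) -> Vec<W::Output> {
    let mut worker = W::default();
    inputs.into_iter().map(|inp| worker.execute(inp)).collect()
}

fn main() {
    // prints [1, 3, 6]
    println!("{:?}", run_all::<RunningTotal>(vec![1, 2, 3]));
}
```

The `Default` bound is what lets generic code such as `run_all` construct a worker without knowing its concrete type.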
Examples
A worker is provided in workerpool::thunk: the stateless ThunkWorker<T>. It executes inputs of type Thunk<T>, which are effectively argumentless functions that are Sized + Send. These thunks are created by wrapping functions that return T with Thunk::of.
use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<ThunkWorker<i32>>::new(n_workers);

    let (tx, rx) = channel();
    for i in 0..n_jobs {
        pool.execute_to(tx.clone(), Thunk::of(move || i * i));
    }

    assert_eq!(140, rx.iter().take(n_jobs as usize).sum());
}
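The relationship between a thunk and a stateless worker can be sketched with the standard library alone. This is an illustration under assumptions, not the crate's source: `MyThunk`, `MyThunkWorker`, and `sum_of_squares` are hypothetical stand-ins, and the trait is re-declared locally so the block is self-contained.

```rust
use std::marker::PhantomData;

// Local copy of the trait shape from the crate documentation.
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

/// Hypothetical stand-in for a thunk: a boxed, sendable, zero-argument closure.
struct MyThunk<T> {
    job: Box<dyn FnOnce() -> T + Send>,
}

impl<T> MyThunk<T> {
    fn of<F>(f: F) -> Self
    where
        F: FnOnce() -> T + Send + 'static,
    {
        MyThunk { job: Box::new(f) }
    }
}

/// Hypothetical stateless worker: it holds no data and simply calls the thunk.
struct MyThunkWorker<T> {
    _marker: PhantomData<T>,
}

impl<T> Default for MyThunkWorker<T> {
    fn default() -> Self {
        MyThunkWorker { _marker: PhantomData }
    }
}

impl<T: Send + 'static> Worker for MyThunkWorker<T> {
    type Input = MyThunk<T>;
    type Output = T;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        (inp.job)()
    }
}

/// Runs `n` squaring thunks on one worker and sums the results.
fn sum_of_squares(n: i32) -> i32 {
    let mut worker = MyThunkWorker::<i32>::default();
    (0..n)
        .map(|i| worker.execute(MyThunk::of(move || i * i)))
        .sum()
}

fn main() {
    // prints 140, matching the assertion in the pool example
    println!("{}", sum_of_squares(8));
}
```

Because all the work lives in the input closure, the worker itself needs no state, which is why a single generic worker type can serve any return type `T`.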
For stateful workers, you have to implement Worker yourself.
Suppose there's a line-delimited process, such as cat or tr, which you'd like to run on many threads and use in a pool-like manner. You may create and use a worker that maintains the process's stdin/stdout as its state, as follows:
use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;

struct LineDelimitedProcess {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}

impl Default for LineDelimitedProcess {
    fn default() -> Self {
        // Each worker owns one child process for its whole lifetime.
        let child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        Self {
            stdin: child.stdin.unwrap(),
            stdout: BufReader::new(child.stdout.unwrap()),
        }
    }
}

impl Worker for LineDelimitedProcess {
    type Input = Box<[u8]>;
    type Output = io::Result<String>;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        // Write one line to the child, then read one line back.
        self.stdin.write_all(&*inp)?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }
}

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<LineDelimitedProcess>::new(n_workers);

    let (tx, rx) = channel();
    for i in 0..n_jobs {
        let inp = Box::new([97 + i]);
        pool.execute_to(tx.clone(), inp);
    }

    // output is a permutation of "abcdefgh"
    let mut output = rx.iter()
        .take(n_jobs as usize)
        .fold(String::new(), |mut a, b| {
            a.push_str(&b.unwrap());
            a
        })
        .into_bytes();
    output.sort();
    assert_eq!(output, b"abcdefgh");
}
Modules
thunk | Provides a
Structs
Builder |
Pool | Abstraction of a thread pool for basic parallelism.
Traits
Worker | Abstraction of a worker which executes tasks in its own thread.