command_executor/lib.rs
1//! Thread pool implementation in Rust
2//!
3//! Command Executor is a multiple producer/multiple consumer thread pool implementation in Rust
4//! that provides an interface for concurrent command execution with backpressure. Main design
5//! goals of this implementation are:
6//! * control of memory consumption - achieved through use of a bounded blocking queue
7//! * indication of workload completion - achieved through a shutdown "after all work is complete"
8//! * maintain a predictable models of the execution - commands are submitted for execution in FIFO order with
9//! backpressure until all work is done. Note that the actual order of execution is depending on
10//! the system scheduler, however, you can assume that the `n + 1` command will be dequeued by an
11//! executing thread after 'n' command was dequeued.
12//!
13//! # Use cases
14//! The use case for this approach to parallelism is when handling very large data sets that cannot fit
15//! into memory, or when the memory consumption must be within specified bounds. When data set can
16//! fit into memory or the memory consumption is not an issue almost always better results would be
17//! achieved using [Rayon](https://crates.io/crates/rayon) or other data parallelism libraries that
18//! use non blocking queues.
19//! Example for such an use case was to convert a large and very dense Protobuf file - about
20//! 67 GB of sorted data - into about 1 TB of pg_dump files ready to load into Postgresql
21//! database. Using this library it was possible to create a pipeline that parallelized the
22//! Protobuf decoding and pg_dump encoding and finally wrote the merged result while maintaining
23//! the initial order and keeping the memory within bounds.
24//!
25//! # Issues
26//! Issues are welcome and appreciated. Please submit to https://github.com/navigatorsguild/command-executor/issues
27//!
28//! # Benchmarks
29//! [Benchmarks](https://github.com/navigatorsguild/command-executor/wiki/Benchmarks) generated by
30//! [benchmark-rs](https://crates.io/crates/benchmark-rs)
31//!
32//! 
33//!
34//! # Examples
35//!
36//! Submit commands for execution and wait for completion
37//! ```
38//!use std::thread;
39//!use std::time::Duration;
40//!use command_executor::command::Command;
41//!use command_executor::shutdown_mode::ShutdownMode;
42//!use command_executor::thread_pool_builder::ThreadPoolBuilder;
43//!
44//!struct ExampleCommand {
45//! payload: i32,
46//!}
47//!
48//!impl ExampleCommand {
49//! pub fn new(payload: i32) -> ExampleCommand {
50//! ExampleCommand {
51//! payload,
52//! }
53//! }
54//!}
55//!
56//!impl Command for ExampleCommand {
57//! fn execute(&self) -> Result<(), anyhow::Error> {
58//! println!("processing {} in {}", self.payload, thread::current().name().unwrap_or("unnamed"));
59//! thread::sleep(Duration::from_millis(10));
60//! Ok(())
61//! }
62//!}
63//!
64//!pub fn main() -> Result<(), anyhow::Error> {
65//! let mut thread_pool_builder = ThreadPoolBuilder::new();
66//! let mut tp = thread_pool_builder
67//! .with_name("example".to_string())
68//! .with_tasks(4)
69//! .with_queue_size(16)
70//! .with_shutdown_mode(ShutdownMode::CompletePending)
71//! .build()
72//! .unwrap();
73//!
74//! for i in 0..16 {
75//! tp.submit(Box::new(ExampleCommand::new(i)));
76//! }
77//!
78//! tp.shutdown();
79//! tp.join()
80//!}
81//! ```
82//!
83//! Install a thread local value in all threads of the thread pool
84//! ```
85//!use std::thread;
86//!use std::time::Duration;
87//!use std::cell::RefCell;
88//!use command_executor::command::Command;
89//!use command_executor::shutdown_mode::ShutdownMode;
90//!use command_executor::thread_pool_builder::ThreadPoolBuilder;
91//!
92//!#[derive(Clone)]
93//!struct Config {
94//! sleep_time: u64,
95//!}
96//!
97//!impl Default for Config {
98//! fn default() -> Self {
99//! Config {
100//! sleep_time: 1,
101//! }
102//! }
103//!}
104//!
105//!thread_local! {
106//! static THREAD_LOCAL_CONFIG: RefCell<Option<Config>> = RefCell::new(None);
107//!}
108//!
109//!struct ThreadLocalExampleCommand {
110//! payload: i32,
111//!}
112//!
113//!impl ThreadLocalExampleCommand {
114//! pub fn new(payload: i32) -> ThreadLocalExampleCommand {
115//! ThreadLocalExampleCommand { payload }
116//! }
117//!}
118//!
119//!impl Command for ThreadLocalExampleCommand {
120//! fn execute(&self) -> Result<(), anyhow::Error> {
121//! THREAD_LOCAL_CONFIG.with(
122//! |config| {
123//! let sleep_time = config.borrow().as_ref().unwrap().sleep_time;
124//! thread::sleep(Duration::from_millis(sleep_time));
125//! println!(
126//! "processing {} in {}",
127//! self.payload,
128//! thread::current().name().unwrap_or("unnamed")
129//! );
130//! }
131//! );
132//! Ok(())
133//! }
134//!}
135//!
136//!fn main() -> Result<(), anyhow::Error> {
137//! let mut thread_pool_builder = ThreadPoolBuilder::new();
138//! let mut tp = thread_pool_builder
139//! .with_name("example".to_string())
140//! .with_tasks(4)
141//! .with_queue_size(16)
142//! .with_shutdown_mode(ShutdownMode::CompletePending)
143//! .build()
144//! .unwrap();
145//!
146//! tp.set_thread_local(&THREAD_LOCAL_CONFIG, Some(Config::default()));
147//!
148//! for i in 0..16 {
149//! tp.submit(Box::new(ThreadLocalExampleCommand::new(i)));
150//! }
151//!
152//! tp.shutdown();
153//! tp.join()
154//!}
155//! ```
156//!
157
158pub mod blocking_queue;
159pub mod thread_pool;
160pub mod thread_pool_builder;
161pub mod shutdown_mode;
162pub mod command;
163pub mod queue_type;
164pub mod crossbeam_blocking_queue;
165pub mod blocking_queue_adapter;