command_executor/
lib.rs

1//! Thread pool implementation in Rust
2//!
3//! Command Executor is a multi-producer/multi-consumer thread pool implementation in Rust
4//! that provides an interface for concurrent command execution with backpressure. The main design
5//! goals of this implementation are:
6//! * control of memory consumption - achieved through use of a bounded blocking queue
7//! * indication of workload completion - achieved through a shutdown "after all work is complete"
8//! * maintain a predictable model of execution - commands are submitted for execution in FIFO order with
9//! backpressure until all work is done. Note that the actual order of execution depends on
10//! the system scheduler; however, you can assume that command `n + 1` will be dequeued by an
11//! executing thread after command `n` was dequeued.
12//!
13//! # Use cases
14//! The use case for this approach to parallelism is handling very large data sets that cannot fit
15//! into memory, or cases where memory consumption must stay within specified bounds. When the data set
16//! fits into memory, or memory consumption is not an issue, better results will almost always be
17//! achieved using [Rayon](https://crates.io/crates/rayon) or other data parallelism libraries that
18//! use non-blocking queues.
19//! An example of such a use case was converting a large and very dense Protobuf file - about
20//! 67 GB of sorted data - into about 1 TB of pg_dump files ready to load into a PostgreSQL
21//! database. Using this library it was possible to create a pipeline that parallelized the
22//! Protobuf decoding and pg_dump encoding and finally wrote the merged result while maintaining
23//! the initial order and keeping the memory within bounds.
24//!
25//! # Issues
26//! Issues are welcome and appreciated. Please submit them at <https://github.com/navigatorsguild/command-executor/issues>.
27//!
28//! # Benchmarks
29//! [Benchmarks](https://github.com/navigatorsguild/command-executor/wiki/Benchmarks) generated by
30//! [benchmark-rs](https://crates.io/crates/benchmark-rs)
31//!
32//! ![link](https://user-images.githubusercontent.com/122003456/235414598-727d804a-b8ad-4520-871b-5fd8be33bf44.png)
33//!
34//! # Examples
35//!
36//! Submit commands for execution and wait for completion
37//! ```
38//!use std::thread;
39//!use std::time::Duration;
40//!use command_executor::command::Command;
41//!use command_executor::shutdown_mode::ShutdownMode;
42//!use command_executor::thread_pool_builder::ThreadPoolBuilder;
43//!
44//!struct ExampleCommand {
45//!    payload: i32,
46//!}
47//!
48//!impl ExampleCommand {
49//!    pub fn new(payload: i32) -> ExampleCommand {
50//!        ExampleCommand {
51//!            payload,
52//!        }
53//!    }
54//!}
55//!
56//!impl Command for ExampleCommand {
57//!    fn execute(&self) -> Result<(), anyhow::Error> {
58//!        println!("processing {} in {}", self.payload, thread::current().name().unwrap_or("unnamed"));
59//!        thread::sleep(Duration::from_millis(10));
60//!        Ok(())
61//!    }
62//!}
63//!
64//!pub fn main() -> Result<(), anyhow::Error> {
65//!    let mut thread_pool_builder = ThreadPoolBuilder::new();
66//!    let mut tp = thread_pool_builder
67//!        .with_name("example".to_string())
68//!        .with_tasks(4)
69//!        .with_queue_size(16)
70//!        .with_shutdown_mode(ShutdownMode::CompletePending)
71//!        .build()
72//!        .unwrap();
73//!
74//!    for i in 0..16 {
75//!        tp.submit(Box::new(ExampleCommand::new(i)));
76//!    }
77//!
78//!    tp.shutdown();
79//!    tp.join()
80//!}
81//! ```
82//!
83//! Install a thread local value in all threads of the thread pool
84//! ```
85//!use std::thread;
86//!use std::time::Duration;
87//!use std::cell::RefCell;
88//!use command_executor::command::Command;
89//!use command_executor::shutdown_mode::ShutdownMode;
90//!use command_executor::thread_pool_builder::ThreadPoolBuilder;
91//!
92//!#[derive(Clone)]
93//!struct Config {
94//!    sleep_time: u64,
95//!}
96//!
97//!impl Default for Config {
98//!    fn default() -> Self {
99//!        Config {
100//!            sleep_time: 1,
101//!        }
102//!    }
103//!}
104//!
105//!thread_local! {
106//!    static THREAD_LOCAL_CONFIG: RefCell<Option<Config>> = RefCell::new(None);
107//!}
108//!
109//!struct ThreadLocalExampleCommand {
110//!    payload: i32,
111//!}
112//!
113//!impl ThreadLocalExampleCommand {
114//!    pub fn new(payload: i32) -> ThreadLocalExampleCommand {
115//!        ThreadLocalExampleCommand { payload }
116//!    }
117//!}
118//!
119//!impl Command for ThreadLocalExampleCommand {
120//!    fn execute(&self) -> Result<(), anyhow::Error> {
121//!        THREAD_LOCAL_CONFIG.with(
122//!            |config| {
123//!                let sleep_time = config.borrow().as_ref().unwrap().sleep_time;
124//!                thread::sleep(Duration::from_millis(sleep_time));
125//!                println!(
126//!                    "processing {} in {}",
127//!                    self.payload,
128//!                    thread::current().name().unwrap_or("unnamed")
129//!                );
130//!            }
131//!        );
132//!        Ok(())
133//!    }
134//!}
135//!
136//!fn main() -> Result<(), anyhow::Error> {
137//!    let mut thread_pool_builder = ThreadPoolBuilder::new();
138//!    let mut tp = thread_pool_builder
139//!        .with_name("example".to_string())
140//!        .with_tasks(4)
141//!        .with_queue_size(16)
142//!        .with_shutdown_mode(ShutdownMode::CompletePending)
143//!        .build()
144//!        .unwrap();
145//!
146//!    tp.set_thread_local(&THREAD_LOCAL_CONFIG, Some(Config::default()));
147//!
148//!    for i in 0..16 {
149//!        tp.submit(Box::new(ThreadLocalExampleCommand::new(i)));
150//!    }
151//!
152//!    tp.shutdown();
153//!    tp.join()
154//!}
155//! ```
156//!
157
// Public modules that make up the crate's API surface.
// The crate-level examples show that `command` exports the `Command` trait,
// `shutdown_mode` exports `ShutdownMode`, and `thread_pool_builder` exports
// `ThreadPoolBuilder`.
// NOTE(review): the contents of the remaining modules are not visible from this
// file; their names suggest the pool itself and the queue implementations and
// adapters behind it - confirm against the module sources.
158pub mod blocking_queue;
159pub mod thread_pool;
160pub mod thread_pool_builder;
161pub mod shutdown_mode;
162pub mod command;
163pub mod queue_type;
164pub mod crossbeam_blocking_queue;
165pub mod blocking_queue_adapter;