1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//! Thread pool implementation in Rust
//!
//! Command Executor is a multiple producer/multiple consumer thread pool implementation in Rust
//! that provides an interface for concurrent command execution with backpressure. Main design
//! goals of this implementation are:
//! * control of memory consumption - achieved through use of a bounded blocking queue
//! * indication of workload completion - achieved through a shutdown "after all work is complete"
//! * maintain a predictable model of execution - commands are submitted for execution in FIFO order with
//! backpressure until all work is done. Note that the actual order of execution depends on
//! the system scheduler; however, you can assume that command `n + 1` will be dequeued by an
//! executing thread after command `n` was dequeued.
//!
//! # Use cases
//! The use case for this approach to parallelism is handling very large data sets that cannot fit
//! into memory, or workloads whose memory consumption must stay within specified bounds. When the
//! data set fits into memory, or memory consumption is not an issue, better results are almost always
//! achieved using [Rayon](https://crates.io/crates/rayon) or other data parallelism libraries that
//! use non-blocking queues.
//! An example of such a use case was converting a large and very dense Protobuf file - about
//! 67 GB of sorted data - into about 1 TB of pg_dump files ready to load into a PostgreSQL
//! database. Using this library it was possible to create a pipeline that parallelized the
//! Protobuf decoding and pg_dump encoding and finally wrote the merged result while maintaining
//! the initial order and keeping the memory within bounds.
//!
//! # Issues
//! Issues are welcome and appreciated. Please submit them at <https://github.com/navigatorsguild/command-executor/issues>
//!
//! # Benchmarks
//! [Benchmarks](https://github.com/navigatorsguild/command-executor/wiki/Benchmarks) generated by
//! [benchmark-rs](https://crates.io/crates/benchmark-rs)
//!
//! # Examples
//!
//! Submit commands for execution and wait for completion
//! ```
//!use std::thread;
//!use std::time::Duration;
//!use command_executor::command::Command;
//!use command_executor::shutdown_mode::ShutdownMode;
//!use command_executor::thread_pool_builder::ThreadPoolBuilder;
//!
//!struct ExampleCommand {
//!    payload: i32,
//!}
//!
//!impl ExampleCommand {
//!    pub fn new(payload: i32) -> ExampleCommand {
//!        ExampleCommand {
//!            payload,
//!        }
//!    }
//!}
//!
//!impl Command for ExampleCommand {
//!    fn execute(&self) -> Result<(), anyhow::Error> {
//!        println!("processing {} in {}", self.payload, thread::current().name().unwrap_or("unnamed"));
//!        thread::sleep(Duration::from_millis(10));
//!        Ok(())
//!    }
//!}
//!
//!pub fn main() -> Result<(), anyhow::Error> {
//!    let mut thread_pool_builder = ThreadPoolBuilder::new();
//!    let mut tp = thread_pool_builder
//!        .name("example".to_string())
//!        .tasks(4)
//!        .queue_size(16)
//!        .shutdown_mode(ShutdownMode::CompletePending)
//!        .build()
//!        .unwrap();
//!
//!    for i in 0..16 {
//!        tp.submit(Box::new(ExampleCommand::new(i)));
//!    }
//!
//!    tp.shutdown();
//!    tp.join()
//!}
//! ```
//!
//! Install a thread local value in all threads of the thread pool
//! ```
//!use std::thread;
//!use std::time::Duration;
//!use std::cell::RefCell;
//!use command_executor::command::Command;
//!use command_executor::shutdown_mode::ShutdownMode;
//!use command_executor::thread_pool_builder::ThreadPoolBuilder;
//!
//!#[derive(Clone)]
//!struct Config {
//!    sleep_time: u64,
//!}
//!
//!impl Default for Config {
//!    fn default() -> Self {
//!        Config {
//!            sleep_time: 1,
//!        }
//!    }
//!}
//!
//!thread_local! {
//!    static THREAD_LOCAL_CONFIG: RefCell<Option<Config>> = RefCell::new(None);
//!}
//!
//!struct ThreadLocalExampleCommand {
//!    payload: i32,
//!}
//!
//!impl ThreadLocalExampleCommand {
//!    pub fn new(payload: i32) -> ThreadLocalExampleCommand {
//!        ThreadLocalExampleCommand { payload }
//!    }
//!}
//!
//!impl Command for ThreadLocalExampleCommand {
//!    fn execute(&self) -> Result<(), anyhow::Error> {
//!        THREAD_LOCAL_CONFIG.with(
//!            |config| {
//!                let sleep_time = config.borrow().as_ref().unwrap().sleep_time;
//!                thread::sleep(Duration::from_millis(sleep_time));
//!                println!(
//!                    "processing {} in {}",
//!                    self.payload,
//!                    thread::current().name().unwrap_or("unnamed")
//!                );
//!            }
//!        );
//!        Ok(())
//!    }
//!}
//!
//!fn main() -> Result<(), anyhow::Error> {
//!    let mut thread_pool_builder = ThreadPoolBuilder::new();
//!    let mut tp = thread_pool_builder
//!        .name("example".to_string())
//!        .tasks(4)
//!        .queue_size(16)
//!        .shutdown_mode(ShutdownMode::CompletePending)
//!        .build()
//!        .unwrap();
//!
//!    tp.set_thread_local(&THREAD_LOCAL_CONFIG, Some(Config::default()));
//!
//!    for i in 0..16 {
//!        tp.submit(Box::new(ThreadLocalExampleCommand::new(i)));
//!    }
//!
//!    tp.shutdown();
//!    tp.join()
//!}
//! ```
//!

// Public modules of the crate. The notes below are based on the crate-level docs and
// examples above; the module sources themselves are not visible from this file.
// Blocking queue; the crate docs name a bounded blocking queue as the means of bounding memory use.
pub mod blocking_queue;
// NOTE(review): presumably home of the pool type returned by `ThreadPoolBuilder::build()` - confirm.
pub mod thread_pool;
// Exposes `ThreadPoolBuilder`, used in the examples to set name, tasks, queue size and shutdown mode.
pub mod thread_pool_builder;
// NOTE(review): purpose not evident from this file - confirm against the module source.
pub mod signal;
// Exposes `ShutdownMode` (the examples use `ShutdownMode::CompletePending`).
pub mod shutdown_mode;
// Exposes the `Command` trait that submitted work implements via `execute(&self)`.
pub mod command;