1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
//! # Messaging thread pool
//!
//! Messaging thread pool is a collection of traits and structs for setting up a simple fixed-size thread pool
//! which holds a collection of a given type.
//!
//! Instances of the objects are identified by an id which is unique within the thread pool.
//!
//! Objects are distributed across the thread pool based on their id and ownership of the
//! object is held there
//!
//! Objects are communicated with via a user defined set of messages which effectively form an api.
//! These messages are sent and received over crossbeam channels.
//!
//! The objects need to implement a set of simple traits and define a set of request/response messages
//! to allow the thread pool infrastructure to handle the objects and to route messages to them.
//!
//! The lifetimes of the objects are easy to reason about as is the behaviour of the thread pools themselves.
//!
//! The original motivation was to provide support for a hierarchy of dependent, long-lived objects,
//! that each required their own thread pools to avoid complex threading dependencies
//! The objects in the thread pools were all CPU bound i.e. did not perform any significant I/O
//!
//! # Example
//! ```
//! use std::iter;
//! use messaging_thread_pool::{*, samples::*};
//!
//!    // creates a thread pool with 10 threads.
//!    // The lifetime of the elements created (the Randoms in this case) will be tied to the
//!    // life of this struct
//!    let thread_pool = ThreadPool::<Randoms>::new(10);
//!
//!    // create 1000 Randoms across the thread pool by sending a thousand add requests.
//!    // The creation of these objects (with the keys 0..1000) will be distributed across
//!    // the 10 threads in the pool.
//!    // Their owning thread will create and store them.
//!    // They will not be dropped until they are either requested to be dropped or until the
//!    // thread pool itself is dropped.
//!    thread_pool
//!        .send_and_receive((0..1000usize).map(|i| RandomsAddRequest(i)))
//!        .expect("thread pool to be available")
//!        .for_each(|response: AddResponse| assert!(response.result().is_ok()));
//!
//!    // now create 1000 messages asking them for the sum of the Randoms objects contained
//!    // random numbers.
//!    // The message will be routed to the thread to where the targeted object resides
//!    // This call will block until all of the work is done and the responses returned
//!    let sums: Vec<SumResponse> = thread_pool
//!        .send_and_receive((0..1000usize).map(|i| SumRequest(i)))
//!        .expect("thread pool to be available")
//!        .collect();
//!    assert_eq!(1000, sums.len());
//!
//!    // get the mean of the randoms for object with id 0, this will execute on thread 0
//!    // this call will block until complete
//!    let mean_response_0: MeanResponse = thread_pool
//!        .send_and_receive(iter::once(MeanRequest(0)))
//!        .expect("thread pool to be available")
//!        .nth(0)
//!        .unwrap();
//!    println!("{}", mean_response_0.mean());
//!
//!    // remove object with id 1
//!    // it will be dropped from the thread where it was residing
//!    thread_pool
//!        .send_and_receive(iter::once(RemovePoolItemRequest(1)))
//!        .expect("thread pool to be available")
//!        .for_each(|response: RemovePoolItemResponse| assert!(response.item_existed()));
//!
//!    // add a new object with id 1000
//!    thread_pool
//!        .send_and_receive(iter::once(RandomsAddRequest(1000)))
//!        .expect("thread pool to be available")
//!        .for_each(|response: AddResponse| assert!(response.result().is_ok()));
//!
//!    // all objects are dropped when the basic thread pool batcher is dropped
//!    // the threads are shut down and joined back to the main thread
//!    drop(thread_pool);
//! ```
//!
//! # Limitations
//!
//! The thread pool cannot be dynamically sized.\
//! It is fixed at creation.\
//! As there is a ThreadShutdown request it could be implied that therefore there should be a ThreadCreation request.
//! This is not the case and it is not intended that individual threads will be shutdown in isolation and in fact
//! this will lead to the thread pool panicking.\
//! The shutdown request is intended to be called only when the whole thread pool is finished with and in fact it
//! is probably best to avoid using it and to just drop the thread pool (which internally sends out all the required shutdown messages).\
//!
//! It was not really intended for anything other than long-lived CPU bound elements.
//!
use std::{cell::RefCell, sync::RwLock};

use thread_endpoint::ThreadEndpoint;

pub mod global_test_scope;
pub mod id_being_processed;
pub mod id_provider;
pub mod samples;
pub mod sender_couplet;

mod drop;
mod id_targeted;
mod new;
mod pool_item;
mod pool_thread;
mod receive;
mod request_response;
mod request_with_response;
mod send;
mod send_and_receive;
mod sender_and_receiver;
mod shutdown;
mod thread_endpoint;
mod thread_request_response;

pub use id_being_processed::*;
pub use id_targeted::IdTargeted;
pub use pool_item::*;
pub use request_response::RequestResponse;
pub use request_with_response::RequestWithResponse;
pub use sender_and_receiver::*;
pub use sender_couplet::*;
pub use thread_request_response::*;

// Per-thread slot holding the id of the pool item currently being processed
// on that worker thread; `None` when no item is being processed.
// NOTE(review): presumably read/written via the `id_being_processed` module —
// confirm against that module's implementation.
thread_local! {
    static ID_BEING_PROCESSED: RefCell<Option<usize>> = RefCell::new(None);
}

/// This struct represents a pool of threads that can target a particular type of
/// resource (a resource being a struct that implements [`PoolItem`])
///
/// In order to allow for distribution over multiple threads each resource must have an id
/// that allows for routing to a particular thread.
///
/// The pool holds one [`ThreadEndpoint`] per worker thread; the endpoints are
/// guarded by a [`RwLock`] so they can be read concurrently (e.g. when routing
/// messages) and written exclusively (e.g. during shutdown).
#[derive(Debug)]
pub struct ThreadPool<P>
where
    P: PoolItem,
{
    // One endpoint per worker thread; a message for a given pool-item id is
    // routed to the endpoint of the thread that owns that item.
    thread_endpoints: RwLock<Vec<ThreadEndpoint<P>>>,
}

impl<P> ThreadPool<P>
where
    P: PoolItem,
{
    /// Returns the number of worker threads held by this thread pool.
    ///
    /// # Panics
    ///
    /// Panics if the internal [`RwLock`] guarding the thread endpoints has
    /// been poisoned by a panicking writer (not expected in normal operation).
    pub fn thread_count(&self) -> usize {
        let endpoints = self
            .thread_endpoints
            .read()
            .expect("read should never be poisoned");
        endpoints.len()
    }
}

#[cfg(test)]
mod tests {
    use crate::{samples::*, ThreadPool};

    #[test]
    fn thread_pool_size_2_thread_count_2() {
        let pool = ThreadPool::<Randoms>::new(2);

        // two threads should have been created
        assert_eq!(2, pool.thread_count());

        // tidy up: shut the thread pool's threads down
        pool.shutdown();
    }

    #[test]
    fn thread_pool_size_1_thread_count_1() {
        let pool = ThreadPool::<Randoms>::new(1);

        // a single thread should have been created
        assert_eq!(1, pool.thread_count());

        // tidy up: shut the thread pool's threads down
        pool.shutdown();
    }
}