messaging_thread_pool/samples/randoms_batch/mod.rs
1//! # RandomsBatch - Advanced Nested Thread Pool Example
2//!
3//! This module demonstrates advanced patterns including:
4//! - Generic pool items
5//! - Custom initialization types
6//! - Nested thread pools (a pool item that contains another pool)
7//! - Mocking nested dependencies for testing
8
9use std::fmt::Debug;
10use std::sync::Arc;
11
12use crate::pool_item;
13use crate::samples::Randoms;
14use crate::{id_provider::IdProvider, *};
15
16use super::{RandomsAddRequest, SumRequest, SumResponse};
17
18/// Trait for abstracting the inner thread pool type.
19///
20/// This enables:
21/// - Using a real `ThreadPool<Randoms>` in production
22/// - Using a `SenderAndReceiverMock` in tests
23///
24/// # Example
25///
26/// ```rust,ignore
27/// // Production code uses RandomsThreadPool
28/// let batch: RandomsBatch<RandomsThreadPool> = /* ... */;
29///
30/// // Test code uses a mock
31/// let batch: RandomsBatch<SenderAndReceiverMock<_, _>> = /* ... */;
32/// ```
33pub trait InnerThreadPool: Debug + Send {
34 /// The concrete thread pool type that implements `SenderAndReceiver<Randoms>`
35 type ThreadPool: SenderAndReceiver<Randoms> + Send + Sync + Debug;
36}
37
38/// Marker type for using a real `ThreadPool<Randoms>` as the inner pool.
39#[derive(Debug)]
40pub struct RandomsThreadPool;
41impl InnerThreadPool for RandomsThreadPool {
42 type ThreadPool = ThreadPool<Randoms>;
43}
44
45/// Implement `InnerThreadPool` for mock types to enable testing.
46impl<T: RequestWithResponse<Randoms> + Send + Sync> InnerThreadPool
47 for SenderAndReceiverMock<Randoms, T>
48where
49 <T as request_with_response::RequestWithResponse<Randoms>>::Response: Send,
50{
51 type ThreadPool = SenderAndReceiverMock<Randoms, T>;
52}
53
54/// Custom initialization request for `RandomsBatch`.
55///
56/// This demonstrates using a custom `Init` type instead of the generated one.
57/// Custom Init types are needed when:
58/// - The constructor needs more than just an ID
59/// - You need to pass complex configuration
60/// - The pool item is generic and needs type information
61///
62/// # Fields
63///
64/// - `id` - Unique identifier for this batch
65/// - `number_of_contained_randoms` - How many `Randoms` items to create
66/// - `id_provider` - Shared ID generator (ensures unique IDs across batches)
67/// - `randoms_thread_pool` - Shared inner thread pool for the `Randoms`
68#[derive(Debug, Clone)]
69pub struct RandomsBatchAddRequest<P: InnerThreadPool> {
70 pub id: u64,
71 pub number_of_contained_randoms: usize,
72 pub id_provider: Arc<dyn IdProvider>,
73 /// This thread pool is shared by all Randoms across all RandomsBatches
74 pub randoms_thread_pool: Arc<P::ThreadPool>,
75}
76
77impl<P: InnerThreadPool> RandomsBatchAddRequest<P> {
78 pub fn id_provider(&self) -> &dyn IdProvider {
79 self.id_provider.as_ref()
80 }
81
82 pub fn id(&self) -> u64 {
83 self.id
84 }
85}
86
87impl<P: InnerThreadPool> IdTargeted for RandomsBatchAddRequest<P> {
88 fn id(&self) -> u64 {
89 self.id
90 }
91}
92
93impl<P: InnerThreadPool> RequestWithResponse<RandomsBatch<P>> for RandomsBatchAddRequest<P> {
94 type Response = AddResponse;
95}
96
97impl<P: InnerThreadPool> From<RandomsBatchAddRequest<P>>
98 for ThreadRequestResponse<RandomsBatch<P>>
99{
100 fn from(request: RandomsBatchAddRequest<P>) -> Self {
101 ThreadRequestResponse::<RandomsBatch<P>>::AddPoolItem(RequestResponse::Request(request))
102 }
103}
104
105/// A batch of `Randoms` items, demonstrating nested thread pools.
106///
107/// This is an advanced example showing:
108/// - **Generic pool items**: `RandomsBatch<P>` is generic over the inner pool type
109/// - **Custom Init type**: Uses `RandomsBatchAddRequest<P>` instead of generated init
110/// - **Nested pools**: Each batch references an inner pool of `Randoms`
111/// - **Shared resources**: Multiple batches share the same inner pool and ID provider
112///
113/// # Architecture
114///
115/// ```text
116/// ┌─────────────────────────────────────────┐
117/// │ ThreadPool<RandomsBatch<P>> │ ← Outer pool (manages batches)
118/// │ ┌─────────────┐ ┌─────────────┐ │
119/// │ │ Batch (id=0)│ │ Batch (id=1)│ ... │
120/// │ │ refs→[1,2] │ │ refs→[3,4] │ │
121/// │ └──────┬──────┘ └──────┬──────┘ │
122/// └─────────┼────────────────┼──────────────┘
123/// │ │
124/// ▼ ▼
125/// ┌─────────────────────────────────────────┐
126/// │ Arc<ThreadPool<Randoms>> │ ← Inner pool (shared)
127/// │ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
128/// │ │ R(1) │ │ R(2) │ │ R(3) │ │ R(4) │ │
129/// │ └──────┘ └──────┘ └──────┘ └──────┘ │
130/// └─────────────────────────────────────────┘
131/// ```
132///
133/// # Example: Real Thread Pools
134///
135/// ```rust
136/// use std::sync::Arc;
137/// use messaging_thread_pool::{ThreadPool, id_provider::id_provider_mutex::IdProviderMutex, samples::*};
138///
139/// // Create the outer pool for batches
140/// let batch_pool = ThreadPool::<RandomsBatch<RandomsThreadPool>>::new(2);
141///
142/// // Create the shared inner pool for Randoms
143/// let randoms_pool = Arc::new(ThreadPool::<Randoms>::new(4));
144///
145/// // Create shared ID provider (ensures unique IDs across all batches)
146/// let id_provider = Arc::new(IdProviderMutex::new(0));
147///
148/// // Create a batch - this will create 10 Randoms in the inner pool
149/// batch_pool.send_and_receive_once(RandomsBatchAddRequest {
150/// id: 0,
151/// number_of_contained_randoms: 10,
152/// id_provider: id_provider.clone(),
153/// randoms_thread_pool: randoms_pool.clone(),
154/// }).unwrap();
155///
156/// // Query the batch - it will in turn query its Randoms
157/// let response = batch_pool.send_and_receive_once(
158/// SumOfSumsRequest(0, std::marker::PhantomData)
159/// ).unwrap();
160/// ```
161///
162/// # Example: Mocking the Inner Pool
163///
164/// See `tests/example_two_level.rs` for a complete mocking example.
165#[derive(Debug)]
166pub struct RandomsBatch<P: InnerThreadPool> {
167 pub id: u64,
168 pub contained_random_ids: Vec<u64>,
169 pub id_provider: Arc<dyn IdProvider>,
170 pub randoms_thread_pool: Arc<P::ThreadPool>,
171}
172
173#[pool_item(Init = "RandomsBatchAddRequest<P>")]
174impl<P: InnerThreadPool> RandomsBatch<P> {
175 pub fn new(add_request: RandomsBatchAddRequest<P>) -> Self {
176 let mut new = Self {
177 id: add_request.id,
178 contained_random_ids: vec![],
179 id_provider: Arc::clone(&add_request.id_provider),
180 randoms_thread_pool: Arc::clone(&add_request.randoms_thread_pool),
181 };
182
183 let mut ids = Vec::<u64>::default();
184 new.randoms_thread_pool()
185 .send_and_receive(
186 (0..add_request.number_of_contained_randoms)
187 .map(|_| RandomsAddRequest(new.id_provider.next_id())),
188 )
189 .expect("randoms thread pool to be available")
190 .for_each(|r: AddResponse| {
191 assert!(r.result().is_ok(), "Request to add Randoms failed");
192 ids.push(r.id());
193 });
194
195 new.contained_random_ids_mut().append(&mut ids);
196 new
197 }
198
199 pub fn randoms_thread_pool(&self) -> &P::ThreadPool {
200 self.randoms_thread_pool.as_ref()
201 }
202
203 #[messaging(SumOfSumsRequest, SumOfSumsResponse)]
204 pub fn sum_of_sums(&self) -> u128 {
205 // to get the sum of sums need to message the controls Randoms to get their sums
206 // and then add them all up
207 self.randoms_thread_pool()
208 .send_and_receive(self.contained_random_ids.iter().map(|id| SumRequest(*id)))
209 .expect("randoms thread pool to be available")
210 .map(|response: SumResponse| response.sum())
211 .sum()
212 }
213
214 pub fn contained_random_ids_mut(&mut self) -> &mut Vec<u64> {
215 &mut self.contained_random_ids
216 }
217}