messaging_thread_pool/samples/randoms_batch/
mod.rs

1//! # RandomsBatch - Advanced Nested Thread Pool Example
2//!
3//! This module demonstrates advanced patterns including:
4//! - Generic pool items
5//! - Custom initialization types
6//! - Nested thread pools (a pool item that contains another pool)
7//! - Mocking nested dependencies for testing
8
9use std::fmt::Debug;
10use std::sync::Arc;
11
12use crate::pool_item;
13use crate::samples::Randoms;
14use crate::{id_provider::IdProvider, *};
15
16use super::{RandomsAddRequest, SumRequest, SumResponse};
17
18/// Trait for abstracting the inner thread pool type.
19///
20/// This enables:
21/// - Using a real `ThreadPool<Randoms>` in production
22/// - Using a `SenderAndReceiverMock` in tests
23///
24/// # Example
25///
26/// ```rust,ignore
27/// // Production code uses RandomsThreadPool
28/// let batch: RandomsBatch<RandomsThreadPool> = /* ... */;
29///
30/// // Test code uses a mock
31/// let batch: RandomsBatch<SenderAndReceiverMock<_, _>> = /* ... */;
32/// ```
33pub trait InnerThreadPool: Debug + Send {
34    /// The concrete thread pool type that implements `SenderAndReceiver<Randoms>`
35    type ThreadPool: SenderAndReceiver<Randoms> + Send + Sync + Debug;
36}
37
38/// Marker type for using a real `ThreadPool<Randoms>` as the inner pool.
39#[derive(Debug)]
40pub struct RandomsThreadPool;
41impl InnerThreadPool for RandomsThreadPool {
42    type ThreadPool = ThreadPool<Randoms>;
43}
44
45/// Implement `InnerThreadPool` for mock types to enable testing.
46impl<T: RequestWithResponse<Randoms> + Send + Sync> InnerThreadPool
47    for SenderAndReceiverMock<Randoms, T>
48where
49    <T as request_with_response::RequestWithResponse<Randoms>>::Response: Send,
50{
51    type ThreadPool = SenderAndReceiverMock<Randoms, T>;
52}
53
54/// Custom initialization request for `RandomsBatch`.
55///
56/// This demonstrates using a custom `Init` type instead of the generated one.
57/// Custom Init types are needed when:
58/// - The constructor needs more than just an ID
59/// - You need to pass complex configuration
60/// - The pool item is generic and needs type information
61///
62/// # Fields
63///
64/// - `id` - Unique identifier for this batch
65/// - `number_of_contained_randoms` - How many `Randoms` items to create
66/// - `id_provider` - Shared ID generator (ensures unique IDs across batches)
67/// - `randoms_thread_pool` - Shared inner thread pool for the `Randoms`
68#[derive(Debug, Clone)]
69pub struct RandomsBatchAddRequest<P: InnerThreadPool> {
70    pub id: u64,
71    pub number_of_contained_randoms: usize,
72    pub id_provider: Arc<dyn IdProvider>,
73    /// This thread pool is shared by all Randoms across all RandomsBatches
74    pub randoms_thread_pool: Arc<P::ThreadPool>,
75}
76
77impl<P: InnerThreadPool> RandomsBatchAddRequest<P> {
78    pub fn id_provider(&self) -> &dyn IdProvider {
79        self.id_provider.as_ref()
80    }
81
82    pub fn id(&self) -> u64 {
83        self.id
84    }
85}
86
87impl<P: InnerThreadPool> IdTargeted for RandomsBatchAddRequest<P> {
88    fn id(&self) -> u64 {
89        self.id
90    }
91}
92
93impl<P: InnerThreadPool> RequestWithResponse<RandomsBatch<P>> for RandomsBatchAddRequest<P> {
94    type Response = AddResponse;
95}
96
97impl<P: InnerThreadPool> From<RandomsBatchAddRequest<P>>
98    for ThreadRequestResponse<RandomsBatch<P>>
99{
100    fn from(request: RandomsBatchAddRequest<P>) -> Self {
101        ThreadRequestResponse::<RandomsBatch<P>>::AddPoolItem(RequestResponse::Request(request))
102    }
103}
104
105/// A batch of `Randoms` items, demonstrating nested thread pools.
106///
107/// This is an advanced example showing:
108/// - **Generic pool items**: `RandomsBatch<P>` is generic over the inner pool type
109/// - **Custom Init type**: Uses `RandomsBatchAddRequest<P>` instead of generated init
110/// - **Nested pools**: Each batch references an inner pool of `Randoms`
111/// - **Shared resources**: Multiple batches share the same inner pool and ID provider
112///
113/// # Architecture
114///
115/// ```text
116/// ┌─────────────────────────────────────────┐
117/// │     ThreadPool<RandomsBatch<P>>         │ ← Outer pool (manages batches)
118/// │  ┌─────────────┐  ┌─────────────┐       │
119/// │  │ Batch (id=0)│  │ Batch (id=1)│  ...  │
120/// │  │  refs→[1,2] │  │  refs→[3,4] │       │
121/// │  └──────┬──────┘  └──────┬──────┘       │
122/// └─────────┼────────────────┼──────────────┘
123///           │                │
124///           ▼                ▼
125/// ┌─────────────────────────────────────────┐
126/// │       Arc<ThreadPool<Randoms>>          │ ← Inner pool (shared)
127/// │  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐    │
128/// │  │ R(1) │ │ R(2) │ │ R(3) │ │ R(4) │    │
129/// │  └──────┘ └──────┘ └──────┘ └──────┘    │
130/// └─────────────────────────────────────────┘
131/// ```
132///
133/// # Example: Real Thread Pools
134///
135/// ```rust
136/// use std::sync::Arc;
137/// use messaging_thread_pool::{ThreadPool, id_provider::id_provider_mutex::IdProviderMutex, samples::*};
138///
139/// // Create the outer pool for batches
140/// let batch_pool = ThreadPool::<RandomsBatch<RandomsThreadPool>>::new(2);
141///
142/// // Create the shared inner pool for Randoms
143/// let randoms_pool = Arc::new(ThreadPool::<Randoms>::new(4));
144///
145/// // Create shared ID provider (ensures unique IDs across all batches)
146/// let id_provider = Arc::new(IdProviderMutex::new(0));
147///
148/// // Create a batch - this will create 10 Randoms in the inner pool
149/// batch_pool.send_and_receive_once(RandomsBatchAddRequest {
150///     id: 0,
151///     number_of_contained_randoms: 10,
152///     id_provider: id_provider.clone(),
153///     randoms_thread_pool: randoms_pool.clone(),
154/// }).unwrap();
155///
156/// // Query the batch - it will in turn query its Randoms
157/// let response = batch_pool.send_and_receive_once(
158///     SumOfSumsRequest(0, std::marker::PhantomData)
159/// ).unwrap();
160/// ```
161///
162/// # Example: Mocking the Inner Pool
163///
164/// See `tests/example_two_level.rs` for a complete mocking example.
165#[derive(Debug)]
166pub struct RandomsBatch<P: InnerThreadPool> {
167    pub id: u64,
168    pub contained_random_ids: Vec<u64>,
169    pub id_provider: Arc<dyn IdProvider>,
170    pub randoms_thread_pool: Arc<P::ThreadPool>,
171}
172
173#[pool_item(Init = "RandomsBatchAddRequest<P>")]
174impl<P: InnerThreadPool> RandomsBatch<P> {
175    pub fn new(add_request: RandomsBatchAddRequest<P>) -> Self {
176        let mut new = Self {
177            id: add_request.id,
178            contained_random_ids: vec![],
179            id_provider: Arc::clone(&add_request.id_provider),
180            randoms_thread_pool: Arc::clone(&add_request.randoms_thread_pool),
181        };
182
183        let mut ids = Vec::<u64>::default();
184        new.randoms_thread_pool()
185            .send_and_receive(
186                (0..add_request.number_of_contained_randoms)
187                    .map(|_| RandomsAddRequest(new.id_provider.next_id())),
188            )
189            .expect("randoms thread pool to be available")
190            .for_each(|r: AddResponse| {
191                assert!(r.result().is_ok(), "Request to add Randoms failed");
192                ids.push(r.id());
193            });
194
195        new.contained_random_ids_mut().append(&mut ids);
196        new
197    }
198
199    pub fn randoms_thread_pool(&self) -> &P::ThreadPool {
200        self.randoms_thread_pool.as_ref()
201    }
202
203    #[messaging(SumOfSumsRequest, SumOfSumsResponse)]
204    pub fn sum_of_sums(&self) -> u128 {
205        // to get the sum of sums need to message the controls Randoms to get their sums
206        // and then add them all up
207        self.randoms_thread_pool()
208            .send_and_receive(self.contained_random_ids.iter().map(|id| SumRequest(*id)))
209            .expect("randoms thread pool to be available")
210            .map(|response: SumResponse| response.sum())
211            .sum()
212    }
213
214    pub fn contained_random_ids_mut(&mut self) -> &mut Vec<u64> {
215        &mut self.contained_random_ids
216    }
217}