// ---------------- [ File: hydro2_async_scheduler/src/worker_pool.rs ]
crate::ix!();
/// A scoped pool of one aggregator plus N worker threads, wired together
/// with bounded channels. Constructed via [`WorkerPool::new_in_scope`] or
/// the derived (owned-pattern) builder.
#[derive(Builder)]
#[builder(setter(into), pattern = "owned")]
pub struct WorkerPool<'threads, T>
where
    T: Debug + Send + Sync + 'threads
{
    // Submission side of the aggregator's input channel. Per `shutdown`,
    // dropping this sender is what lets the aggregator observe end-of-input
    // ("aggregator sees None") and wind down.
    main_tasks_tx: Sender<TaskItem<'threads, T>>,

    // Join handles for the aggregator + worker threads spawned in
    // `new_in_scope`; joined (best-effort) in `shutdown`.
    threads: Vec<ScopedJoinHandle<'threads, ()>>,

    // Shared receive side for all workers' `TaskResult`s. Wrapped in an
    // async mutex so concurrent callers of `try_recv_result` serialize
    // access to the single receiver.
    results_rx: AsyncMutex<Receiver<TaskResult>>,

    // Test-only clone of the results sender — presumably so tests can
    // inject `TaskResult`s directly; not set by `new_in_scope`.
    #[cfg(test)]
    #[builder(default)]
    pub(crate) results_tx_for_test: Option<Sender<TaskResult>>
}
27
28impl<'threads, T> WorkerPool<'threads, T>
29where
30 T: Debug + Send + Sync + 'threads,
31{
32 pub fn new_in_scope(
42 scope: &'threads Scope<'threads, '_>,
43 num_workers: usize,
44 buffer_size: usize,
45 ) -> Self {
46
47 eprintln!(
48 "WorkerPool::new_in_scope => setting up aggregator + {} workers, buffer_size={}",
49 num_workers, buffer_size
50 );
51
52 let (main_tasks_tx, main_tasks_rx) = mpsc::channel::<TaskItem<'threads, T>>(buffer_size);
54 eprintln!("WorkerPool::new_in_scope => created main_tasks channel (aggregator consumer)");
55
56 let (results_tx, results_rx) = mpsc::channel::<TaskResult>(buffer_size);
58 eprintln!("WorkerPool::new_in_scope => created results channel for all workers");
59
60 let (worker_senders, worker_receivers) = create_worker_channels(num_workers, buffer_size);
63
64 let threads = spawn_aggregator_and_workers(
66 scope,
67 main_tasks_rx,
68 worker_senders,
69 worker_receivers,
70 results_tx
71 );
72
73 eprintln!("WorkerPool::new_in_scope => aggregator + {} workers => returning WorkerPool", num_workers);
74
75 WorkerPool {
76 main_tasks_tx,
77 threads,
78 results_rx: AsyncMutex::new(results_rx),
79
80 #[cfg(test)]
81 results_tx_for_test: None
82 }
83 }
84
85 pub async fn submit(&self, item: TaskItem<'threads, T>) -> Result<(), NetworkError> {
87 eprintln!("WorkerPool::submit => sending to aggregator main_tasks channel => node_idx={}", item.node_idx());
88 match self.main_tasks_tx.try_send(item) {
90 Ok(()) => Ok(()),
91 Err(_e) => Err(NetworkError::ResourceExhaustion {
92 resource: "WorkerPool Main Tasks Channel".into(),
93 }),
94 }
95 }
96
97 pub async fn try_recv_result(&self) -> Option<TaskResult> {
99 let mut guard = self.results_rx.lock().await;
100 let res = guard.try_recv().ok();
101 if let Some(ref r) = res {
102 eprintln!("WorkerPool::try_recv_result => got a result => node_idx={}", r.node_idx());
103 }
104 res
105 }
106
107 pub fn is_main_channel_closed(&self) -> bool {
108 self.main_tasks_tx.is_closed()
109 }
110
111 pub fn close_main_tasks_channel(&self) {
113 eprintln!("WorkerPool::close_main_tasks_channel => about to drop main_tasks_tx");
114 eprintln!("Pointer: {:p}", &self.main_tasks_tx);
115 drop(&self.main_tasks_tx);
116 eprintln!("WorkerPool::close_main_tasks_channel => after drop(main_tasks_tx)");
117 }
118
119 pub fn shutdown(self) {
122 eprintln!("WorkerPool::shutdown => dropping main_tasks_tx => aggregator sees None => eventually done");
123 drop(self.main_tasks_tx);
124
125 for (i, th) in self.threads.into_iter().enumerate() {
126 eprintln!("WorkerPool::shutdown => joining aggregator/worker thread #{}", i);
127 let _ = th.join();
128 }
129 eprintln!("WorkerPool::shutdown => all aggregator+worker threads joined => done");
130 }
131}