hydro2_async_scheduler/
fetch_next_task.rs1crate::ix!();
3
4pub async fn fetch_next_task<'threads, T>(
8 worker_rx: &mut Receiver<TaskItem<'threads, T>>,
9 worker_id: usize,
10) -> Option<TaskItem<'threads, T>>
11where
12 T: Debug + Send + Sync + 'threads
13{
14 eprintln!("worker #{worker_id} => waiting on fetch_next_task(...)");
15 let maybe_task = worker_rx.recv().await;
16 match &maybe_task {
17 Some(t) => {
18 eprintln!(
19 "worker #{worker_id} => fetch_next_task => received TaskItem for node={}, concurrency={}",
20 t.node_idx(),
21 if t.permit().is_some() { "acquired" } else { "none" },
22 );
23 }
24 None => {
25 eprintln!("worker #{worker_id} => fetch_next_task => channel closed (no more tasks)");
26 }
27 }
28 maybe_task
29}
30
31#[cfg(test)]
32mod fetch_next_task_tests {
33 use super::*;
34
35 #[traced_test]
37 async fn test_fetch_next_task_empty_channel() {
38 let (tx, mut rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(1);
39 drop(tx); let maybe_task = fetch_next_task(&mut rx, 0).await;
42 assert!(maybe_task.is_none(), "No tasks => None");
43 }
44
45 #[traced_test]
47 async fn test_fetch_next_task_one_item() {
48 let (tx, mut rx) = mpsc::channel::<TaskItem<'static,TestWireIO<i32>>>(1);
49
50 let item = mock_minimal_task_item_with_permit(0);
51 tx.send(item).await.unwrap();
52 drop(tx); let maybe_task = fetch_next_task(&mut rx, 99).await;
55 assert!(maybe_task.is_some(), "Expected one task => Some");
56 let t = maybe_task.unwrap();
57 assert_eq!(*t.node_idx(), 0);
58
59 let second = fetch_next_task(&mut rx, 99).await;
61 assert!(second.is_none());
62 }
63
64 #[traced_test]
66 async fn test_fetch_next_task_multiple_items() {
67 let (tx, mut rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(10);
68
69 for i in 0..3 {
71 let it = mock_minimal_task_item_with_permit(i);
72 tx.send(it).await.unwrap();
73 }
74 drop(tx);
75
76 let task1 = fetch_next_task(&mut rx, 1).await;
78 assert!(task1.is_some());
79 assert_eq!(*task1.as_ref().unwrap().node_idx(), 0);
80
81 let task2 = fetch_next_task(&mut rx, 1).await;
83 assert!(task2.is_some());
84 assert_eq!(*task2.as_ref().unwrap().node_idx(), 1);
85
86 let task3 = fetch_next_task(&mut rx, 1).await;
88 assert!(task3.is_some());
89 assert_eq!(*task3.as_ref().unwrap().node_idx(), 2);
90
91 let t4 = fetch_next_task(&mut rx, 1).await;
93 assert!(t4.is_none());
94 }
95
96 #[traced_test]
100 async fn test_fetch_next_task_with_producer() -> Result<(),NetworkError> {
101 let (tx, mut rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(2);
102
103 let producer = tokio::spawn(async move {
105 for i in 0..5 {
106 let item = mock_minimal_task_item_with_permit(i);
107 tx.send(item).await.unwrap();
108 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
109 }
110 });
112
113 let consumer = tokio::spawn(async move {
115 let mut out = Vec::new();
116 loop {
117 match fetch_next_task(&mut rx, 10).await {
118 Some(t) => {
119 out.push(*t.node_idx());
120 }
121 None => break,
122 }
123 }
125 out
126 });
127
128 let (p, results) = tokio::join!(producer, consumer);
129 let mut results = results.expect("expected to be able to join");
130 p.expect("Producer panicked");
131 results.sort_unstable();
132 assert_eq!(results, vec![0,1,2,3,4]);
134 Ok(())
135 }
136
137 #[traced_test]
142 async fn test_fetch_next_task_concurrency_permit_vs_none() {
143 let (tx, mut rx) = mpsc::channel::<TaskItem<'static,TestWireIO<i32>>>(2);
144
145 let mut it1 = mock_minimal_task_item_with_permit(111);
147 assert!(it1.permit().is_some()); let mut it2 = mock_minimal_task_item_with_permit(222);
151 *it2.permit_mut() = None;
152
153 tx.send(it1).await.unwrap();
154 tx.send(it2).await.unwrap();
155 drop(tx);
156
157 let first = fetch_next_task(&mut rx, 77).await.unwrap();
158 assert_eq!(*first.node_idx(), 111);
159 assert!(first.permit().is_some());
160
161 let second = fetch_next_task(&mut rx, 77).await.unwrap();
162 assert_eq!(*second.node_idx(), 222);
163 assert!(second.permit().is_none());
164
165 let none = fetch_next_task(&mut rx, 77).await;
166 assert!(none.is_none());
167 }
168
169 #[traced_test]
173 async fn test_fetch_next_task_blocking_scenario() {
174 let (tx, mut rx) = mpsc::channel::<TaskItem<'static,TestWireIO<i32>>>(2);
175
176 let consumer = tokio::spawn(async move {
180 let maybe_t = fetch_next_task(&mut rx, 55).await;
181 maybe_t
182 });
183
184 let producer = tokio::spawn(async move {
185 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
186 let item = mock_minimal_task_item_with_permit(999);
187 tx.send(item).await.unwrap();
188 });
190
191 let (cons_res, _) = tokio::join!(consumer, producer);
192 let maybe_task = cons_res.expect("Consumer panicked");
193 assert!(maybe_task.is_some());
194 assert_eq!(*maybe_task.as_ref().unwrap().node_idx(), 999);
195 }
196
197 #[traced_test]
202 async fn test_fetch_next_task_multi_fetchers_producers() {
203 let (tx, rx) = mpsc::channel::<TaskItem<'static,TestWireIO<i32>>>(5);
204 let rx_arc = Arc::new(AsyncMutex::new(rx));
205
206 let txc1 = tx.clone();
207 let txc2 = tx.clone();
208
209 let p1 = {
211 tokio::spawn(async move {
212 for i in 0..5 {
213 let item = mock_minimal_task_item_with_permit(i);
214 txc1.send(item).await.unwrap();
215 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
216 }
217 })
218 };
219
220 let p2 = {
221 tokio::spawn(async move {
222 for j in 5..10 {
223 let item = mock_minimal_task_item_with_permit(j);
224 txc2.send(item).await.unwrap();
228 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
229 }
230 })
231 };
232
233 let rxa = rx_arc.clone();
237 let fetcher_a = tokio::spawn(async move {
238 let mut out = Vec::new();
239 loop {
240 let mut guard = rxa.lock().await;
241 if let Some(t) = fetch_next_task(&mut *guard, 111).await {
242 out.push(*t.node_idx());
243 } else {
244 break;
245 }
246 }
247 out
248 });
249
250 let rxb = rx_arc.clone();
251 let fetcher_b = tokio::spawn(async move {
252 let mut out = Vec::new();
253 loop {
254 let mut guard = rxb.lock().await;
255 if let Some(t) = fetch_next_task(&mut *guard, 222).await {
256 out.push(*t.node_idx());
257 } else {
258 break;
259 }
260 }
261 out
262 });
263
264 let _ = tokio::join!(p1, p2);
266 drop(tx); let (a_res, b_res) = tokio::join!(fetcher_a, fetcher_b);
269
270 let list_a = a_res.expect("FetcherA panicked");
271 let list_b = b_res.expect("FetcherB panicked");
272 let total_count = list_a.len() + list_b.len();
273
274 assert_eq!(total_count, 10);
276
277 let mut combined = Vec::new();
279 combined.extend(list_a);
280 combined.extend(list_b);
281 combined.sort_unstable();
282 combined.dedup();
283 assert_eq!(combined, vec![0,1,2,3,4,5,6,7,8,9]);
284 }
285}