hydro2_async_scheduler/
fetch_next_task.rs

1// ---------------- [ File: src/fetch_next_task.rs ]
2crate::ix!();
3
4/// Retrieves the next `TaskItem` from the worker’s channel.
5/// We now log more details about whether we're waiting, whether a task is present, etc.
6/// This can help confirm if a worker is truly idle or if a hang might be from unreceived tasks.
7pub async fn fetch_next_task<'threads, T>(
8    worker_rx: &mut Receiver<TaskItem<'threads, T>>,
9    worker_id: usize,
10) -> Option<TaskItem<'threads, T>>
11where
12    T: Debug + Send + Sync + 'threads
13{
14    eprintln!("worker #{worker_id} => waiting on fetch_next_task(...)");
15    let maybe_task = worker_rx.recv().await;
16    match &maybe_task {
17        Some(t) => {
18            eprintln!(
19                "worker #{worker_id} => fetch_next_task => received TaskItem for node={}, concurrency={}",
20                t.node_idx(),
21                if t.permit().is_some() { "acquired" } else { "none" },
22            );
23        }
24        None => {
25            eprintln!("worker #{worker_id} => fetch_next_task => channel closed (no more tasks)");
26        }
27    }
28    maybe_task
29}
30
#[cfg(test)]
mod fetch_next_task_tests {
    use super::*;

    /// 1) If the channel is closed immediately, we expect `None`.
    #[traced_test]
    async fn test_fetch_next_task_empty_channel() {
        let (tx, mut rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(1);
        drop(tx); // close immediately => recv() must yield None

        let maybe_task = fetch_next_task(&mut rx, 0).await;
        assert!(maybe_task.is_none(), "No tasks => None");
    }

    /// 2) Single item => we fetch exactly one => then channel closes => no more.
    #[traced_test]
    async fn test_fetch_next_task_one_item() {
        let (tx, mut rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(1);

        let item = mock_minimal_task_item_with_permit(0);
        tx.send(item).await.unwrap();
        drop(tx); // channel closed after sending one

        let maybe_task = fetch_next_task(&mut rx, 99).await;
        assert!(maybe_task.is_some(), "Expected one task => Some");
        let t = maybe_task.unwrap();
        assert_eq!(*t.node_idx(), 0);

        // Channel is closed and drained => next call must return None.
        let second = fetch_next_task(&mut rx, 99).await;
        assert!(second.is_none());
    }

    /// 3) Multiple items => we fetch them sequentially with repeated calls,
    /// and they arrive in FIFO order.
    #[traced_test]
    async fn test_fetch_next_task_multiple_items() {
        let (tx, mut rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(10);

        // Enqueue 3 tasks, node indices 0..3.
        for i in 0..3 {
            let it = mock_minimal_task_item_with_permit(i);
            tx.send(it).await.unwrap();
        }
        drop(tx);

        // fetch #1
        let task1 = fetch_next_task(&mut rx, 1).await;
        assert!(task1.is_some());
        assert_eq!(*task1.as_ref().unwrap().node_idx(), 0);

        // fetch #2
        let task2 = fetch_next_task(&mut rx, 1).await;
        assert!(task2.is_some());
        assert_eq!(*task2.as_ref().unwrap().node_idx(), 1);

        // fetch #3
        let task3 = fetch_next_task(&mut rx, 1).await;
        assert!(task3.is_some());
        assert_eq!(*task3.as_ref().unwrap().node_idx(), 2);

        // done => None
        let t4 = fetch_next_task(&mut rx, 1).await;
        assert!(t4.is_none());
    }

    /// 4) Producer/consumer concurrency: a producer sends tasks slowly over a
    /// small (capacity-2) channel while the consumer repeatedly calls
    /// `fetch_next_task`, proving the fetch loop drains everything and then
    /// terminates on `None` once the producer's sender drops.
    #[traced_test]
    async fn test_fetch_next_task_with_producer() -> Result<(), NetworkError> {
        let (tx, mut rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(2);

        // Producer: 5 tasks with a small delay between sends. Its `tx` drops
        // when the task finishes, which closes the channel for the consumer.
        let producer = tokio::spawn(async move {
            for i in 0..5 {
                let item = mock_minimal_task_item_with_permit(i);
                tx.send(item).await.unwrap();
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            }
            // done => tx dropped here
        });

        // Consumer: keep calling fetch_next_task until the channel closes.
        let consumer = tokio::spawn(async move {
            let mut out = Vec::new();
            loop {
                match fetch_next_task(&mut rx, 10).await {
                    Some(t) => {
                        out.push(*t.node_idx());
                    }
                    None => break,
                }
            }
            out
        });

        let (p, results) = tokio::join!(producer, consumer);
        p.expect("Producer panicked");
        let mut results = results.expect("expected to be able to join");
        results.sort_unstable();
        // All 5 tasks must have been observed exactly once.
        assert_eq!(results, vec![0, 1, 2, 3, 4]);
        Ok(())
    }

    /// 5) A task's concurrency permit may be `Some` or `None`; `fetch_next_task`
    /// must pass both through untouched (the "concurrency=acquired" vs
    /// "concurrency=none" log line is cosmetic only).
    #[traced_test]
    async fn test_fetch_next_task_concurrency_permit_vs_none() {
        let (tx, mut rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(2);

        // Item #1 => keeps the permit the mock provides by default.
        // (No `mut` needed: it1 is never modified.)
        let it1 = mock_minimal_task_item_with_permit(111);
        assert!(it1.permit().is_some());

        // Item #2 => we forcibly remove the permit => None.
        let mut it2 = mock_minimal_task_item_with_permit(222);
        *it2.permit_mut() = None;

        tx.send(it1).await.unwrap();
        tx.send(it2).await.unwrap();
        drop(tx);

        let first = fetch_next_task(&mut rx, 77).await.unwrap();
        assert_eq!(*first.node_idx(), 111);
        assert!(first.permit().is_some());

        let second = fetch_next_task(&mut rx, 77).await.unwrap();
        assert_eq!(*second.node_idx(), 222);
        assert!(second.permit().is_none());

        let none = fetch_next_task(&mut rx, 77).await;
        assert!(none.is_none());
    }

    /// 6) `fetch_next_task` blocks while the channel is open and empty. The
    /// consumer must park until the producer (after ~100ms) sends one item,
    /// then resolve with that item even though the channel is never closed.
    #[traced_test]
    async fn test_fetch_next_task_blocking_scenario() {
        let (tx, mut rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(2);

        // Consumer: a single fetch that should park until the item arrives.
        let consumer = tokio::spawn(async move {
            fetch_next_task(&mut rx, 55).await
        });

        // Producer: delay, then send exactly one task. We deliberately do NOT
        // drop/close the channel — the consumer should finish on receipt alone.
        let producer = tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            let item = mock_minimal_task_item_with_permit(999);
            tx.send(item).await.unwrap();
        });

        let (cons_res, _) = tokio::join!(consumer, producer);
        let maybe_task = cons_res.expect("Consumer panicked");
        assert!(maybe_task.is_some());
        assert_eq!(*maybe_task.as_ref().unwrap().node_idx(), 999);
    }

    /// 7) Multiple fetchers + multiple producers: the receiver is shared via an
    /// async mutex, two producers send 5 tasks each, and two fetchers compete
    /// for them. Every task must be delivered exactly once across fetchers.
    ///
    /// NOTE(review): each fetcher holds the `AsyncMutex` guard across the
    /// `recv().await`, so the fetchers serialize per-task rather than truly
    /// racing — acceptable for this delivery-accounting test.
    #[traced_test]
    async fn test_fetch_next_task_multi_fetchers_producers() {
        let (tx, rx) = mpsc::channel::<TaskItem<'static, TestWireIO<i32>>>(5);
        let rx_arc = Arc::new(AsyncMutex::new(rx));

        let txc1 = tx.clone();
        let txc2 = tx.clone();

        // Two producers => each sends 5 tasks => total = 10.
        let p1 = {
            tokio::spawn(async move {
                for i in 0..5 {
                    let item = mock_minimal_task_item_with_permit(i);
                    txc1.send(item).await.unwrap();
                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                }
            })
        };

        let p2 = {
            tokio::spawn(async move {
                for j in 5..10 {
                    let item = mock_minimal_task_item_with_permit(j);
                    txc2.send(item).await.unwrap();
                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                }
            })
        };

        // Two fetchers => each loops, locking the shared receiver, until the
        // channel yields None. The channel only closes once the producers'
        // clones AND the main `tx` (dropped below) are all gone.
        let rxa = rx_arc.clone();
        let fetcher_a = tokio::spawn(async move {
            let mut out = Vec::new();
            loop {
                let mut guard = rxa.lock().await;
                if let Some(t) = fetch_next_task(&mut *guard, 111).await {
                    out.push(*t.node_idx());
                } else {
                    break;
                }
            }
            out
        });

        let rxb = rx_arc.clone();
        let fetcher_b = tokio::spawn(async move {
            let mut out = Vec::new();
            loop {
                let mut guard = rxb.lock().await;
                if let Some(t) = fetch_next_task(&mut *guard, 222).await {
                    out.push(*t.node_idx());
                } else {
                    break;
                }
            }
            out
        });

        // Wait for producers to finish, then drop the last sender so the
        // fetchers observe `None` and exit their loops.
        let _ = tokio::join!(p1, p2);
        drop(tx);

        let (a_res, b_res) = tokio::join!(fetcher_a, fetcher_b);

        let list_a = a_res.expect("FetcherA panicked");
        let list_b = b_res.expect("FetcherB panicked");
        let total_count = list_a.len() + list_b.len();

        // total tasks => 10 (5 from p1, 5 from p2)
        assert_eq!(total_count, 10);

        // No duplication, no loss: combine + sort + dedup must be exactly 0..10.
        let mut combined = Vec::new();
        combined.extend(list_a);
        combined.extend(list_b);
        combined.sort_unstable();
        combined.dedup();
        assert_eq!(combined, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }
}
285}