// hydro2_async_scheduler/process_task.rs
crate::ix!();

4pub async fn process_task<'threads, T>(
8 task: &mut TaskItem<'threads, T>,
9 worker_id: usize,
10) -> (Vec<usize>, Option<NetworkError>)
11where
12 T: Debug + Send + Sync + 'threads,
13{
14 let node_start = Instant::now();
15 let node_idx = *task.node_idx();
16 eprintln!(
17 "worker #{worker_id} => process_task => starting node_idx={} at t={:?} (ms resolution)",
18 node_idx,
19 node_start.elapsed().as_millis(),
20 );
21
22 let (freed_children, error) = {
23 let mut net_guard = task.network().lock().await;
25 let node_count = net_guard.nodes().len();
26
27 if node_idx >= node_count {
28 eprintln!(
29 "worker #{worker_id} => process_task => node_idx={} is out-of-bounds (network has {} nodes)",
30 node_idx,
31 node_count
32 );
33 (Vec::new(), Some(NetworkError::InvalidNode { node_idx }))
34 } else {
35 execute_node(
37 &mut net_guard,
38 node_idx,
39 &task.output_tx(),
40 worker_id,
41 node_start,
42 task.shared_in_degs(),
43 ).await
44 }
45 };
46
47 eprintln!(
48 "worker #{worker_id} => process_task => done node_idx={} => freed_children={:?}, error={:?}",
49 node_idx,
50 freed_children,
51 error
52 );
53
54 (freed_children, error)
55}
56
#[cfg(test)]
mod process_task_tests {
    use super::*;

    /// A task whose node index lies outside the network must free no children
    /// and report `NetworkError::InvalidNode` with that same index.
    #[traced_test]
    async fn test_process_task_out_of_bounds() {
        let mut t = mock_minimal_task_item_with_permit(5);

        // Precondition check: index 5 must not exist in the mock network.
        // The guard lives only inside this scope so the lock is released
        // before `process_task` tries to acquire it.
        {
            let net_guard = t.network().lock().await;
            assert!(
                net_guard.nodes().len() <= 5,
                "mock network must have at most 5 nodes for index 5 to be out of bounds"
            );
        }

        let (freed, err) = process_task(&mut t, 123).await;
        assert!(freed.is_empty());
        match err {
            Some(NetworkError::InvalidNode { node_idx }) => {
                assert_eq!(node_idx, 5)
            }
            _ => panic!("Expected InvalidNode error"),
        }
    }

    /// A task pointing at an existing node must complete without an error.
    #[traced_test]
    async fn test_process_task_ok() {
        let mut t = mock_minimal_task_item_with_permit(0);

        // Give the network a single node at index 0 so the task is in range.
        // The guard is scoped so the lock is free again for `process_task`.
        {
            let mut net_guard = t.network().lock().await;
            net_guard.nodes_mut().push(
                node![0 => NoOpOperator::default()]
            );
        }

        // Only the error slot is checked here; the freed-children list is
        // intentionally ignored.
        let (_freed, err) = process_task(&mut t, 0).await;
        assert!(err.is_none());
    }
}