hydro2_async_scheduler/
process_task.rs

1// ---------------- [ File: src/process_task.rs ]
2crate::ix!();
3
4/// Processes the `task` by locking the network, executing the node operator,
5/// and computing Freed children. More detailed logs to help ensure we see
6/// exact timing and concurrency behavior.
7pub async fn process_task<'threads, T>(
8    task: &mut TaskItem<'threads, T>,
9    worker_id: usize,
10) -> (Vec<usize>, Option<NetworkError>)
11where
12    T: Debug + Send + Sync + 'threads,
13{
14    let node_start = Instant::now();
15    let node_idx   = *task.node_idx();
16    eprintln!(
17        "worker #{worker_id} => process_task => starting node_idx={} at t={:?} (ms resolution)",
18        node_idx,
19        node_start.elapsed().as_millis(),
20    );
21
22    let (freed_children, error) = {
23        // Lock the network to ensure consistent node access
24        let mut net_guard = task.network().lock().await;
25        let node_count    = net_guard.nodes().len();
26
27        if node_idx >= node_count {
28            eprintln!(
29                "worker #{worker_id} => process_task => node_idx={} is out-of-bounds (network has {} nodes)",
30                node_idx,
31                node_count
32            );
33            (Vec::new(), Some(NetworkError::InvalidNode { node_idx }))
34        } else {
35            // Actually execute
36            execute_node(
37                &mut net_guard,
38                node_idx,
39                &task.output_tx(),
40                worker_id,
41                node_start,
42                task.shared_in_degs(),
43            ).await
44        }
45    };
46
47    eprintln!(
48        "worker #{worker_id} => process_task => done node_idx={} => freed_children={:?}, error={:?}",
49        node_idx,
50        freed_children,
51        error
52    );
53
54    (freed_children, error)
55}
56
#[cfg(test)]
mod process_task_tests {
    use super::*;

    /// A node index that lies beyond the network's node list must yield
    /// `NetworkError::InvalidNode` carrying that index, and no freed children.
    #[traced_test]
    async fn test_process_task_out_of_bounds() {
        let mut t = mock_minimal_task_item_with_permit(5);
        {
            // Check the precondition instead of merely assuming it: index 5
            // is only out of bounds if the mock network has at most 5 nodes.
            // The guard is dropped at the end of this scope so that
            // `process_task` can take the lock itself.
            let net_guard = t.network().lock().await;
            assert!(
                net_guard.nodes().len() <= 5,
                "mock network must have at most 5 nodes for index 5 to be out of bounds"
            );
        }

        let (freed, err) = process_task(&mut t, 123).await;
        assert!(freed.is_empty());
        match err {
            Some(NetworkError::InvalidNode { node_idx }) => {
                assert_eq!(node_idx, 5)
            }
            other => panic!("Expected InvalidNode error, got {other:?}"),
        }
    }

    /// With a node present at index 0, processing node 0 must not report an
    /// error.
    #[traced_test]
    async fn test_process_task_ok() {
        let mut t = mock_minimal_task_item_with_permit(0);
        {
            // Insert one node so node_idx=0 is valid; release the lock before
            // calling `process_task`, which locks the network itself.
            let mut net_guard = t.network().lock().await;
            net_guard.nodes_mut().push(
                node![0 => NoOpOperator::default()]
            );
        }

        // The freed children depend on the network's edges, which this test
        // does not set up, so only the error slot is checked here.
        let (_freed, err) = process_task(&mut t, 0).await;
        assert!(err.is_none(), "unexpected error: {err:?}");
    }
}
94        // Freed children depends on your network edges => verify as needed
95    }
96}