hydro2_async_scheduler/
process_immediate.rs

1// ---------------- [ File: src/process_immediate.rs ]
2crate::ix!();
3
4#[derive(Debug, PartialEq, Eq)]
5pub enum ReadyNodeReceivedOperationOutcome {
6    /// We tried to read from `ready_nodes_rx` but got `None`; channel is closed.
7    ChannelClosed,
8    /// We read a `Some(node_idx)`, and actually submitted it to the worker => in_flight++.
9    NodeSubmitted,
10    /// We read a `Some(node_idx)` but decided not to do anything (rare).
11    NoOp,
12    // Possibly more variants if needed
13}
14
15#[derive(Debug, PartialEq, Eq)]
16pub enum FreedChildReceivedOperationOutcome {
17    /// Freed channel is closed => no more Freed children.
18    ChannelClosed,
19    /// Freed child was re‐queued => in_flight++ if we submitted.
20    ChildRequeued,
21    /// Freed child arrived but we decided not to requeue (rare).
22    NoOp,
23    // Possibly more variants
24}
25
26pub async fn process_immediate<'threads, T>(
27    network:            Arc<AsyncMutex<Network<T>>>,
28    concurrency_limit:  Arc<Semaphore>,
29    worker_pool:        &WorkerPool<'threads, T>,
30    mut ready_nodes_rx: Receiver<usize>,
31    mut child_nodes_rx: Receiver<usize>,
32    completed_nodes:    SharedCompletedNodes,
33    shared_in_degs:     Arc<AsyncMutex<Vec<usize>>>,
34    total_node_count:   usize,
35    output_tx:          Option<StreamingOutputSender<T>>,
36    checkpoint_cb:      Option<Arc<dyn CheckpointCallback>>,
37    child_nodes_tx:     Sender<usize>,
38    ready_nodes_tx:     Sender<usize>,
39) -> Result<(), NetworkError>
40where
41    T: Debug + Send + Sync + 'threads,
42{
43    eprintln!("process_immediate => starting => total_node_count={}", total_node_count);
44
45    let mut in_flight = InFlightCounter::default();
46
47    loop {
48        // (A) If we have completed all nodes, break
49        if check_all_nodes_done(&completed_nodes, total_node_count).await {
50            eprintln!("process_immediate => all_nodes_done => break");
51            break;
52        }
53
54        // (B) If `in_flight == 0` and `ready_nodes_rx` is closed => 
55        //     no more ready nodes. If Freed is also closed (or we can't get Freed),
56        //     we can break to avoid hanging. 
57        if in_flight.get() == 0 && ready_nodes_rx.is_closed() {
58            if child_nodes_rx.is_closed() {
59                eprintln!("process_immediate => in_flight=0, both channels closed => break");
60                break;
61            }
62            // Freed channel is open but no tasks in flight => no Freed can appear => break
63            eprintln!("process_immediate => Freed channel open but no tasks in flight => break");
64            break;
65        }
66
67        eprintln!(
68            "process_immediate => top_of_loop => in_flight={}, checking select!",
69            in_flight.get()
70        );
71
72        // (C) Attempt to read from channels or poll worker results
73        tokio::select! {
74
75            // (C1) Ready nodes
76            maybe_idx = ready_nodes_rx.recv() => {
77                let outcome = process_immediate_ready_node_received(
78                    maybe_idx,
79                    concurrency_limit.clone(),
80                    &network,
81                    &shared_in_degs,
82                    &output_tx,
83                    &checkpoint_cb,
84                    &child_nodes_tx,
85                    &ready_nodes_tx,
86                    &completed_nodes,
87                    worker_pool,
88                    &mut in_flight,  // pass in_flight so we can inc/dec
89                ).await?;
90
91                match outcome {
92                    ReadyNodeReceivedOperationOutcome::ChannelClosed => {
93                        // The channel is closed => we might break or just continue
94                        // Typically you'd do: 
95                        eprintln!("process_immediate => ready_nodes_rx closed => maybe break");
96                        // No immediate break => we rely on the top-of-loop checks 
97                        // to see if Freed is also closed or if in_flight=0
98                    },
99                    ReadyNodeReceivedOperationOutcome::NodeSubmitted => {
100                        // We already did `in_flight.increment()` inside the function
101                        eprintln!("process_immediate => NodeSubmitted => continuing loop");
102                    },
103                    ReadyNodeReceivedOperationOutcome::NoOp => {
104                        // Possibly do nothing
105                        eprintln!("process_immediate => no-op => continuing loop");
106                    },
107                }
108            },
109
110            // (C2) Freed children
111            maybe_child = child_nodes_rx.recv() => {
112                let outcome = process_immediate_freed_child_received(
113                    maybe_child,
114                    &child_nodes_tx,
115                    worker_pool,
116                    &completed_nodes,
117                    &mut in_flight
118                ).await?;
119
120                match outcome {
121                    FreedChildReceivedOperationOutcome::ChannelClosed => {
122                        // Freed channel is closed => no Freed => rely on loop checks
123                        eprintln!("process_immediate => Freed channel closed => maybe break soon");
124                    },
125                    FreedChildReceivedOperationOutcome::ChildRequeued => {
126                        // We presumably did in_flight.increment() if we submitted
127                        eprintln!("process_immediate => Freed child => continuing loop");
128                    },
129                    FreedChildReceivedOperationOutcome::NoOp => {
130                        eprintln!("process_immediate => Freed child => no-op => continuing loop");
131                    },
132                }
133            },
134
135            // (C3) Poll worker results => might decrement in_flight if tasks finished
136            _ = poll_worker_results(worker_pool, &completed_nodes, &mut in_flight) => {
137                let snapshot = completed_nodes.as_slice().await;
138                eprintln!("process_immediate => polled results => completed={:?}", snapshot);
139            }
140        }
141    }
142
143    eprintln!("process_immediate => leftover drain => drain_leftover_results(...)");
144    drain_leftover_results(worker_pool).await?;
145    eprintln!("process_immediate => done => returning Ok");
146
147    Ok(())
148}