hydro2_async_scheduler/
process_immediate.rs1crate::ix!();
3
/// Outcome reported by `process_immediate_ready_node_received` after the
/// scheduler loop has received once from the ready-node channel.
///
/// `process_immediate` only uses the value to pick a log line; no variant
/// terminates the loop directly.
///
/// The enum is fieldless, so it is `Copy`: a value can still be used after
/// it has been matched on, compared, or passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadyNodeReceivedOperationOutcome {
    /// The ready-node channel produced no index because it is closed
    /// (presumably `recv()` yielded `None` — confirm in the handler).
    ChannelClosed,

    /// A ready node index was received and submitted for execution
    /// (presumably to the worker pool — confirm in the handler).
    NodeSubmitted,

    /// The message required no action.
    NoOp,
}
14
/// Outcome reported by `process_immediate_freed_child_received` after the
/// scheduler loop has received once from the freed-child channel.
///
/// `process_immediate` only uses the value to pick a log line; no variant
/// terminates the loop directly.
///
/// The enum is fieldless, so it is `Copy`: a value can still be used after
/// it has been matched on, compared, or passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FreedChildReceivedOperationOutcome {
    /// The freed-child channel produced no index because it is closed
    /// (presumably `recv()` yielded `None` — confirm in the handler).
    ChannelClosed,

    /// A freed child index was received and queued again
    /// (exact destination queue is decided by the handler — confirm there).
    ChildRequeued,

    /// The message required no action.
    NoOp,
}
25
26pub async fn process_immediate<'threads, T>(
27 network: Arc<AsyncMutex<Network<T>>>,
28 concurrency_limit: Arc<Semaphore>,
29 worker_pool: &WorkerPool<'threads, T>,
30 mut ready_nodes_rx: Receiver<usize>,
31 mut child_nodes_rx: Receiver<usize>,
32 completed_nodes: SharedCompletedNodes,
33 shared_in_degs: Arc<AsyncMutex<Vec<usize>>>,
34 total_node_count: usize,
35 output_tx: Option<StreamingOutputSender<T>>,
36 checkpoint_cb: Option<Arc<dyn CheckpointCallback>>,
37 child_nodes_tx: Sender<usize>,
38 ready_nodes_tx: Sender<usize>,
39) -> Result<(), NetworkError>
40where
41 T: Debug + Send + Sync + 'threads,
42{
43 eprintln!("process_immediate => starting => total_node_count={}", total_node_count);
44
45 let mut in_flight = InFlightCounter::default();
46
47 loop {
48 if check_all_nodes_done(&completed_nodes, total_node_count).await {
50 eprintln!("process_immediate => all_nodes_done => break");
51 break;
52 }
53
54 if in_flight.get() == 0 && ready_nodes_rx.is_closed() {
58 if child_nodes_rx.is_closed() {
59 eprintln!("process_immediate => in_flight=0, both channels closed => break");
60 break;
61 }
62 eprintln!("process_immediate => Freed channel open but no tasks in flight => break");
64 break;
65 }
66
67 eprintln!(
68 "process_immediate => top_of_loop => in_flight={}, checking select!",
69 in_flight.get()
70 );
71
72 tokio::select! {
74
75 maybe_idx = ready_nodes_rx.recv() => {
77 let outcome = process_immediate_ready_node_received(
78 maybe_idx,
79 concurrency_limit.clone(),
80 &network,
81 &shared_in_degs,
82 &output_tx,
83 &checkpoint_cb,
84 &child_nodes_tx,
85 &ready_nodes_tx,
86 &completed_nodes,
87 worker_pool,
88 &mut in_flight, ).await?;
90
91 match outcome {
92 ReadyNodeReceivedOperationOutcome::ChannelClosed => {
93 eprintln!("process_immediate => ready_nodes_rx closed => maybe break");
96 },
99 ReadyNodeReceivedOperationOutcome::NodeSubmitted => {
100 eprintln!("process_immediate => NodeSubmitted => continuing loop");
102 },
103 ReadyNodeReceivedOperationOutcome::NoOp => {
104 eprintln!("process_immediate => no-op => continuing loop");
106 },
107 }
108 },
109
110 maybe_child = child_nodes_rx.recv() => {
112 let outcome = process_immediate_freed_child_received(
113 maybe_child,
114 &child_nodes_tx,
115 worker_pool,
116 &completed_nodes,
117 &mut in_flight
118 ).await?;
119
120 match outcome {
121 FreedChildReceivedOperationOutcome::ChannelClosed => {
122 eprintln!("process_immediate => Freed channel closed => maybe break soon");
124 },
125 FreedChildReceivedOperationOutcome::ChildRequeued => {
126 eprintln!("process_immediate => Freed child => continuing loop");
128 },
129 FreedChildReceivedOperationOutcome::NoOp => {
130 eprintln!("process_immediate => Freed child => no-op => continuing loop");
131 },
132 }
133 },
134
135 _ = poll_worker_results(worker_pool, &completed_nodes, &mut in_flight) => {
137 let snapshot = completed_nodes.as_slice().await;
138 eprintln!("process_immediate => polled results => completed={:?}", snapshot);
139 }
140 }
141 }
142
143 eprintln!("process_immediate => leftover drain => drain_leftover_results(...)");
144 drain_leftover_results(worker_pool).await?;
145 eprintln!("process_immediate => done => returning Ok");
146
147 Ok(())
148}