1use crate::config::{ComponentConfig, Node};
6use crate::config::{CuConfig, CuGraph, NodeId};
7use crate::copperlist::{CopperList, CopperListState, CuListsManager};
8use crate::monitoring::CuMonitor;
9use cu29_clock::{ClockProvider, RobotClock};
10use cu29_log_runtime::LoggerRuntime;
11use cu29_traits::CopperListTuple;
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_unifiedlog::UnifiedLoggerWrite;
15use std::sync::{Arc, Mutex};
16
17use petgraph::prelude::*;
18use petgraph::visit::VisitMap;
19use petgraph::visit::Visitable;
20use std::fmt::Debug;
21
22pub struct CopperContext {
24 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
25 pub logger_runtime: LoggerRuntime,
26 pub clock: RobotClock,
27}
28
29pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
33 pub tasks: CT,
35
36 pub monitor: M,
37
38 pub copper_lists_manager: CuListsManager<P, NBCL>,
40
41 pub clock: RobotClock, logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
46}
47
48impl<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> ClockProvider
50 for CuRuntime<CT, P, M, NBCL>
51{
52 fn get_clock(&self) -> RobotClock {
53 self.clock.clone()
54 }
55}
56
57impl<CT, P: CopperListTuple + 'static, M: CuMonitor, const NBCL: usize> CuRuntime<CT, P, M, NBCL> {
58 pub fn new(
59 clock: RobotClock,
60 config: &CuConfig,
61 mission: Option<&str>,
62 tasks_instanciator: impl Fn(Vec<Option<&ComponentConfig>>) -> CuResult<CT>,
63 monitor_instanciator: impl Fn(&CuConfig) -> M,
64 logger: impl WriteStream<CopperList<P>> + 'static,
65 ) -> CuResult<Self> {
66 let graph = config.get_graph(mission)?;
67 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
68 .get_all_nodes()
69 .iter()
70 .map(|(_, node)| node.get_instance_config())
71 .collect();
72 let tasks = tasks_instanciator(all_instances_configs)?;
73
74 let monitor = monitor_instanciator(config);
75
76 let logger_: Option<Box<dyn WriteStream<CopperList<P>>>> =
78 if let Some(logging_config) = &config.logging {
79 if logging_config.enable_task_logging {
80 Some(Box::new(logger))
81 } else {
82 None
83 }
84 } else {
85 Some(Box::new(logger))
86 };
87
88 let runtime = Self {
89 tasks,
90 monitor,
91 copper_lists_manager: CuListsManager::new(), clock,
93 logger: logger_,
94 };
95
96 Ok(runtime)
97 }
98
99 pub fn available_copper_lists(&self) -> usize {
100 NBCL - self.copper_lists_manager.len()
101 }
102
103 pub fn end_of_processing(&mut self, culistid: u32) {
104 let mut is_top = true;
105 let mut nb_done = 0;
106 self.copper_lists_manager.iter_mut().for_each(|cl| {
107 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
108 cl.change_state(CopperListState::DoneProcessing);
109 }
110 if is_top && cl.get_state() == CopperListState::DoneProcessing {
113 if let Some(logger) = &mut self.logger {
114 cl.change_state(CopperListState::BeingSerialized);
115 logger.log(cl).unwrap();
116 }
117 cl.change_state(CopperListState::Free);
118 nb_done += 1;
119 } else {
120 is_top = false;
121 }
122 });
123 for _ in 0..nb_done {
124 let _ = self.copper_lists_manager.pop();
125 }
126 }
127}
128
129#[derive(Debug, PartialEq, Eq, Clone, Copy)]
134pub enum CuTaskType {
135 Source,
136 Regular,
137 Sink,
138}
139
140pub struct CuExecutionStep {
142 pub node_id: NodeId,
144 pub node: Node,
146 pub task_type: CuTaskType,
148
149 pub input_msg_indices_types: Vec<(u32, String)>,
151
152 pub output_msg_index_type: Option<(u32, String)>,
154}
155
156impl Debug for CuExecutionStep {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
159 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
160 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
161 f.write_str(
162 format!(
163 " input_msg_types: {:?}\n",
164 self.input_msg_indices_types
165 )
166 .as_str(),
167 )?;
168 f.write_str(
169 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
170 )?;
171 Ok(())
172 }
173}
174
175pub struct CuExecutionLoop {
180 pub steps: Vec<CuExecutionUnit>,
181 pub loop_count: Option<u32>,
182}
183
184impl Debug for CuExecutionLoop {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 f.write_str("CuExecutionLoop:\n")?;
187 for step in &self.steps {
188 match step {
189 CuExecutionUnit::Step(step) => {
190 step.fmt(f)?;
191 }
192 CuExecutionUnit::Loop(l) => {
193 l.fmt(f)?;
194 }
195 }
196 }
197
198 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
199 Ok(())
200 }
201}
202
203#[derive(Debug)]
205pub enum CuExecutionUnit {
206 Step(CuExecutionStep),
207 Loop(CuExecutionLoop),
208}
209
210fn find_output_index_type_from_nodeid(
211 node_id: NodeId,
212 steps: &Vec<CuExecutionUnit>,
213) -> Option<(u32, String)> {
214 for step in steps {
215 match step {
216 CuExecutionUnit::Loop(loop_unit) => {
217 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
218 return Some(index);
219 }
220 }
221 CuExecutionUnit::Step(step) => {
222 if step.node_id == node_id {
223 return step.output_msg_index_type.clone();
224 }
225 }
226 }
227 }
228 None
229}
230
231pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
232 if graph.0.neighbors_directed(node_id.into(), Incoming).count() == 0 {
233 CuTaskType::Source
234 } else if graph.0.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
235 CuTaskType::Sink
236 } else {
237 CuTaskType::Regular
238 }
239}
240
241fn find_edge_with_plan_input_id(
244 plan: &[CuExecutionUnit],
245 graph: &CuGraph,
246 plan_id: u32,
247 output_node_id: NodeId,
248) -> usize {
249 let input_node = plan
250 .get(plan_id as usize)
251 .expect("Input step should've been added to plan before the step that receives the input");
252 let CuExecutionUnit::Step(input_step) = input_node else {
253 panic!("Expected input to be from a step, not a loop");
254 };
255 let input_node_id = input_step.node_id;
256
257 graph
258 .0
259 .edges_connecting(input_node_id.into(), output_node_id.into())
260 .map(|edge| edge.id().index())
261 .next()
262 .expect("An edge connecting the input to the output should exist")
263}
264
265fn sort_inputs_by_cnx_id(
268 input_msg_indices_types: &mut [(u32, String)],
269 plan: &[CuExecutionUnit],
270 graph: &CuGraph,
271 curr_node_id: NodeId,
272) {
273 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
274 let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
275 let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
276 a_edge_id.cmp(&b_edge_id)
277 });
278}
279fn plan_tasks_tree_branch(
281 graph: &CuGraph,
282 mut next_culist_output_index: u32,
283 starting_point: NodeId,
284 plan: &mut Vec<CuExecutionUnit>,
285) -> (u32, bool) {
286 #[cfg(feature = "macro_debug")]
287 eprintln!("-- starting branch from node {starting_point}");
288
289 let mut visitor = Bfs::new(&graph.0, starting_point.into());
290 let mut handled = false;
291
292 while let Some(node) = visitor.next(&graph.0) {
293 let id = node.index() as NodeId;
294 let node_ref = graph.get_node(id).unwrap();
295 #[cfg(feature = "macro_debug")]
296 eprintln!(" Visiting node: {node_ref:?}");
297
298 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
299 let output_msg_index_type: Option<(u32, String)>;
300 let task_type = find_task_type_for_id(graph, id);
301
302 match task_type {
303 CuTaskType::Source => {
304 #[cfg(feature = "macro_debug")]
305 eprintln!(" → Source node, assign output index {next_culist_output_index}");
306 output_msg_index_type = Some((
307 next_culist_output_index,
308 graph
309 .0
310 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
311 .unwrap() .msg
313 .clone(),
314 ));
315 next_culist_output_index += 1;
316 }
317 CuTaskType::Sink => {
318 let parents: Vec<NodeIndex> =
319 graph.0.neighbors_directed(id.into(), Incoming).collect();
320 #[cfg(feature = "macro_debug")]
321 eprintln!(" → Sink with parents: {parents:?}");
322 for parent in &parents {
323 let pid = parent.index() as NodeId;
324 let index_type = find_output_index_type_from_nodeid(pid, plan);
325 if let Some(index_type) = index_type {
326 #[cfg(feature = "macro_debug")]
327 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
328 input_msg_indices_types.push(index_type);
329 } else {
330 #[cfg(feature = "macro_debug")]
331 eprintln!(" ✗ Input from {pid} not ready, returning");
332 return (next_culist_output_index, handled);
333 }
334 }
335 output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
336 next_culist_output_index += 1;
337 }
338 CuTaskType::Regular => {
339 let parents: Vec<NodeIndex> =
340 graph.0.neighbors_directed(id.into(), Incoming).collect();
341 #[cfg(feature = "macro_debug")]
342 eprintln!(" → Regular task with parents: {parents:?}");
343 for parent in &parents {
344 let pid = parent.index() as NodeId;
345 let index_type = find_output_index_type_from_nodeid(pid, plan);
346 if let Some(index_type) = index_type {
347 #[cfg(feature = "macro_debug")]
348 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
349 input_msg_indices_types.push(index_type);
350 } else {
351 #[cfg(feature = "macro_debug")]
352 eprintln!(" ✗ Input from {pid} not ready, returning");
353 return (next_culist_output_index, handled);
354 }
355 }
356 output_msg_index_type = Some((
357 next_culist_output_index,
358 graph
359 .0
360 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) .unwrap()
362 .msg
363 .clone(),
364 ));
365 next_culist_output_index += 1;
366 }
367 }
368
369 sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);
370
371 if let Some(pos) = plan
372 .iter()
373 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
374 {
375 #[cfg(feature = "macro_debug")]
376 eprintln!(" → Already in plan, modifying existing step");
377 let mut step = plan.remove(pos);
378 if let CuExecutionUnit::Step(ref mut s) = step {
379 s.input_msg_indices_types = input_msg_indices_types;
380 }
381 plan.push(step);
382 } else {
383 #[cfg(feature = "macro_debug")]
384 eprintln!(" → New step added to plan");
385 let step = CuExecutionStep {
386 node_id: id,
387 node: node_ref.clone(),
388 task_type,
389 input_msg_indices_types,
390 output_msg_index_type,
391 };
392 plan.push(CuExecutionUnit::Step(step));
393 }
394
395 handled = true;
396 }
397
398 #[cfg(feature = "macro_debug")]
399 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
400 (next_culist_output_index, handled)
401}
402
403pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
406 #[cfg(feature = "macro_debug")]
407 eprintln!("[runtime plan]");
408 let visited = graph.0.visit_map();
409 let mut plan = Vec::new();
410 let mut next_culist_output_index = 0u32;
411
412 let mut queue: std::collections::VecDeque<NodeId> = graph
413 .node_indices()
414 .iter()
415 .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
416 .map(|node| node.index() as NodeId)
417 .collect();
418
419 #[cfg(feature = "macro_debug")]
420 eprintln!("Initial source nodes: {queue:?}");
421
422 while let Some(start_node) = queue.pop_front() {
423 if visited.is_visited(&start_node) {
424 #[cfg(feature = "macro_debug")]
425 eprintln!("→ Skipping already visited source {start_node}");
426 continue;
427 }
428
429 #[cfg(feature = "macro_debug")]
430 eprintln!("→ Starting BFS from source {start_node}");
431 let mut bfs = Bfs::new(&graph.0, start_node.into());
432
433 while let Some(node_index) = bfs.next(&graph.0) {
434 let node_id = node_index.index() as NodeId;
435 let already_in_plan = plan
436 .iter()
437 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
438 if already_in_plan {
439 #[cfg(feature = "macro_debug")]
440 eprintln!(" → Node {node_id} already planned, skipping");
441 continue;
442 }
443
444 #[cfg(feature = "macro_debug")]
445 eprintln!(" Planning from node {node_id}");
446 let (new_index, handled) =
447 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
448 next_culist_output_index = new_index;
449
450 if !handled {
451 #[cfg(feature = "macro_debug")]
452 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
453 continue;
454 }
455
456 #[cfg(feature = "macro_debug")]
457 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
458 for neighbor in graph.0.neighbors(node_index) {
459 if !visited.is_visited(&neighbor) {
460 let nid = neighbor.index() as NodeId;
461 #[cfg(feature = "macro_debug")]
462 eprintln!(" → Enqueueing neighbor {nid}");
463 queue.push_back(nid);
464 }
465 }
466 }
467 }
468
469 Ok(CuExecutionLoop {
470 steps: plan,
471 loop_count: None,
472 })
473}
474
475#[cfg(test)]
477mod tests {
478 use super::*;
479 use crate::config::Node;
480 use crate::cutask::CuSinkTask;
481 use crate::cutask::{CuSrcTask, Freezable};
482 use crate::monitoring::NoMonitor;
483 use bincode::Encode;
484
485 pub struct TestSource {}
486
487 impl Freezable for TestSource {}
488
489 impl CuSrcTask<'_> for TestSource {
490 type Output = ();
491 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
492 where
493 Self: Sized,
494 {
495 Ok(Self {})
496 }
497
498 fn process(&mut self, _clock: &RobotClock, _empty_msg: Self::Output) -> CuResult<()> {
499 Ok(())
500 }
501 }
502
503 pub struct TestSink {}
504
505 impl Freezable for TestSink {}
506
507 impl CuSinkTask<'_> for TestSink {
508 type Input = ();
509
510 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
511 where
512 Self: Sized,
513 {
514 Ok(Self {})
515 }
516
517 fn process(&mut self, _clock: &RobotClock, _input: Self::Input) -> CuResult<()> {
518 Ok(())
519 }
520 }
521
522 type Tasks = (TestSource, TestSink);
524 type Msgs = ((),);
525
526 fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
527 Ok((
528 TestSource::new(all_instances_configs[0])?,
529 TestSink::new(all_instances_configs[1])?,
530 ))
531 }
532
533 fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
534 NoMonitor {}
535 }
536
537 #[derive(Debug)]
538 struct FakeWriter {}
539
540 impl<E: Encode> WriteStream<E> for FakeWriter {
541 fn log(&mut self, _obj: &E) -> CuResult<()> {
542 Ok(())
543 }
544 }
545
546 #[test]
547 fn test_runtime_instantiation() {
548 let mut config = CuConfig::default();
549 let graph = config.get_graph_mut(None).unwrap();
550 graph.add_node(Node::new("a", "TestSource")).unwrap();
551 graph.add_node(Node::new("b", "TestSink")).unwrap();
552 graph.connect(0, 1, "()").unwrap();
553 let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
554 RobotClock::default(),
555 &config,
556 None,
557 tasks_instanciator,
558 monitor_instanciator,
559 FakeWriter {},
560 );
561 assert!(runtime.is_ok());
562 }
563
564 #[test]
565 fn test_copperlists_manager_lifecycle() {
566 let mut config = CuConfig::default();
567 let graph = config.get_graph_mut(None).unwrap();
568 graph.add_node(Node::new("a", "TestSource")).unwrap();
569 graph.add_node(Node::new("b", "TestSink")).unwrap();
570 graph.connect(0, 1, "()").unwrap();
571 let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
572 RobotClock::default(),
573 &config,
574 None,
575 tasks_instanciator,
576 monitor_instanciator,
577 FakeWriter {},
578 )
579 .unwrap();
580
581 {
583 let copperlists = &mut runtime.copper_lists_manager;
584 let culist0 = copperlists
585 .create()
586 .expect("Ran out of space for copper lists");
587 let id = culist0.id;
589 assert_eq!(id, 0);
590 culist0.change_state(CopperListState::Processing);
591 assert_eq!(runtime.available_copper_lists(), 1);
592 }
593
594 {
595 let copperlists = &mut runtime.copper_lists_manager;
596 let culist1 = copperlists
597 .create()
598 .expect("Ran out of space for copper lists"); let id = culist1.id;
600 assert_eq!(id, 1);
601 culist1.change_state(CopperListState::Processing);
602 assert_eq!(runtime.available_copper_lists(), 0);
603 }
604
605 {
606 let copperlists = &mut runtime.copper_lists_manager;
607 let culist2 = copperlists.create();
608 assert!(culist2.is_none());
609 assert_eq!(runtime.available_copper_lists(), 0);
610 }
611
612 runtime.end_of_processing(1);
614 assert_eq!(runtime.available_copper_lists(), 1);
615
616 {
618 let copperlists = &mut runtime.copper_lists_manager;
619 let culist2 = copperlists
620 .create()
621 .expect("Ran out of space for copper lists"); let id = culist2.id;
623 assert_eq!(id, 2);
624 culist2.change_state(CopperListState::Processing);
625 assert_eq!(runtime.available_copper_lists(), 0);
626 }
627
628 runtime.end_of_processing(0);
630 assert_eq!(runtime.available_copper_lists(), 0);
632
633 runtime.end_of_processing(2);
635 assert_eq!(runtime.available_copper_lists(), 2);
638 }
639
640 #[test]
641 fn test_runtime_task_input_order() {
642 let mut config = CuConfig::default();
643 let graph = config.get_graph_mut(None).unwrap();
644 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
645 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
646 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
647
648 assert_eq!(src1_id, 0);
649 assert_eq!(src2_id, 1);
650
651 let src1_type = "src1_type";
653 let src2_type = "src2_type";
654 graph.connect(src2_id, sink_id, src2_type).unwrap();
655 graph.connect(src1_id, sink_id, src1_type).unwrap();
656
657 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
658 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
659 assert_eq!(src1_edge_id, 1);
662 assert_eq!(src2_edge_id, 0);
663
664 let runtime = compute_runtime_plan(graph).unwrap();
665 let sink_step = runtime
666 .steps
667 .iter()
668 .find_map(|step| match step {
669 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
670 _ => None,
671 })
672 .unwrap();
673
674 assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
677 assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
678 }
679
680 #[test]
681 fn test_runtime_plan_diamond_case1() {
682 let mut config = CuConfig::default();
684 let graph = config.get_graph_mut(None).unwrap();
685 let cam0_id = graph
686 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
687 .unwrap();
688 let inf0_id = graph
689 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
690 .unwrap();
691 let broadcast_id = graph
692 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
693 .unwrap();
694
695 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
697 graph.connect(cam0_id, inf0_id, "i32").unwrap();
698 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
699
700 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
701 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
702
703 assert_eq!(edge_cam0_to_inf0, 0);
704 assert_eq!(edge_cam0_to_broadcast, 1);
705
706 let runtime = compute_runtime_plan(graph).unwrap();
707 let broadcast_step = runtime
708 .steps
709 .iter()
710 .find_map(|step| match step {
711 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
712 _ => None,
713 })
714 .unwrap();
715
716 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
717 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
718 }
719
720 #[test]
721 fn test_runtime_plan_diamond_case2() {
722 let mut config = CuConfig::default();
724 let graph = config.get_graph_mut(None).unwrap();
725 let cam0_id = graph
726 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
727 .unwrap();
728 let inf0_id = graph
729 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
730 .unwrap();
731 let broadcast_id = graph
732 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
733 .unwrap();
734
735 graph.connect(cam0_id, inf0_id, "i32").unwrap();
737 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
738 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
739
740 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
741 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
742
743 assert_eq!(edge_cam0_to_broadcast, 0);
744 assert_eq!(edge_cam0_to_inf0, 1);
745
746 let runtime = compute_runtime_plan(graph).unwrap();
747 let broadcast_step = runtime
748 .steps
749 .iter()
750 .find_map(|step| match step {
751 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
752 _ => None,
753 })
754 .unwrap();
755
756 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
757 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
758 }
759}