cu29_runtime/
curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in their application struct.
//!
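//! A minimal usage sketch (this assumes the `cu29_derive` crate and a
//! `copperconfig.ron` graph definition, as in the project examples):
//!
//! ```ignore
//! use cu29_derive::copper_runtime;
//!
//! #[copper_runtime(config = "copperconfig.ron")]
//! struct MyApplication {}
//! ```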

use crate::config::{ComponentConfig, Node};
use crate::config::{CuConfig, CuGraph, NodeId};
use crate::copperlist::{CopperList, CopperListState, CuListsManager};
use crate::monitoring::CuMonitor;
use cu29_clock::{ClockProvider, RobotClock};
use cu29_log_runtime::LoggerRuntime;
use cu29_traits::{CopperListTuple, CuResult, WriteStream};
use cu29_unifiedlog::UnifiedLoggerWrite;
use std::sync::{Arc, Mutex};

use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;
use std::fmt::Debug;

/// Just a simple struct to hold the various bits needed to run a Copper application.
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// This is the main structure that will be injected as a member of the Application struct.
/// `CT` is the tuple of all the tasks, in order of execution.
/// `P` is the payload tuple of a copper list, representing the input/output messages for all the tasks.
/// `M` is the monitor type and `NBCL` is the number of copper lists in the pool.
pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// The monitor attached to this runtime.
    pub monitor: M,

    /// Copper lists hold, in order, all the input/output messages for all the tasks.
    pub copper_lists_manager: CuListsManager<P, NBCL>,

    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// Logger for the copper lists (the tasks' I/O), if task logging is enabled.
    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

/// To be able to share the clock, we make the runtime a clock provider.
impl<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> ClockProvider
    for CuRuntime<CT, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

impl<CT, P: CopperListTuple + 'static, M: CuMonitor, const NBCL: usize> CuRuntime<CT, P, M, NBCL> {
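    /// Builds the runtime from the config: instantiates the tasks and the monitor via the
    /// provided instanciator closures, and wires the copper list logger if task logging is
    /// enabled (or if no logging section is present) in the config.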
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl Fn(Vec<Option<&ComponentConfig>>) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        logger: impl WriteStream<CopperList<P>> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();
        let tasks = tasks_instanciator(all_instances_configs)?;

        let monitor = monitor_instanciator(config);

        // The type needs to be declared explicitly as `cargo check` was failing without it.
        let logger_: Option<Box<dyn WriteStream<CopperList<P>>>> =
            if let Some(logging_config) = &config.logging {
                if logging_config.enable_task_logging {
                    Some(Box::new(logger))
                } else {
                    None
                }
            } else {
                Some(Box::new(logger))
            };

        let runtime = Self {
            tasks,
            monitor,
            copper_lists_manager: CuListsManager::new(), // placeholder
            clock,
            logger: logger_,
        };

        Ok(runtime)
    }

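    /// Returns how many copper lists are still free in the pool (NBCL minus the lists in flight).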
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.copper_lists_manager.len()
    }

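    /// Marks the copper list `culistid` as done processing, then frees every copper list in the
    /// contiguous run of done lists at the top of the circular buffer, serializing each one
    /// first if task logging is enabled. Lists finished out of order stay in place until the
    /// lists above them are done.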
    pub fn end_of_processing(&mut self, culistid: u32) {
        let mut is_top = true;
        let mut nb_done = 0;
        self.copper_lists_manager.iter_mut().for_each(|cl| {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            // If we have a series of copper lists that are done processing at the top of
            // the circular buffer, serialize them all and free them.
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl).unwrap();
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        });
        for _ in 0..nb_done {
            let _ = self.copper_lists_manager.pop();
        }
    }
}

/// Copper tasks can be of three types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// Node id of the task to execute.
    pub node_id: NodeId,
    /// Node instance.
    pub node: Node,
    /// Type of the task.
    pub task_type: CuTaskType,

    /// The indices in the copper list of the input messages, with their types.
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// The index in the copper list of the output message, with its type.
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "   CuExecutionStep: Node Id: {}", self.node_id)?;
        writeln!(f, "                  task_type: {:?}", self.node.get_type())?;
        writeln!(f, "                       task: {:?}", self.task_type)?;
        writeln!(f, "              input_msg_types: {:?}", self.input_msg_indices_types)?;
        writeln!(f, "       output_msg_type: {:?}", self.output_msg_index_type)?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan: a sequence of execution units
/// (steps or nested loops) that are executed multiple times.
/// If `loop_count` is None, the loop is infinite.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        write!(f, "   count: {:?}", self.loop_count)?;
        Ok(())
    }
}

/// An execution unit in the plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

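/// Finds the output (copper list index, message type) of the already planned step for
/// `node_id`, recursing into nested loops; returns None if that node is not planned yet.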
fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &[CuExecutionUnit],
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

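/// Derives the task type from the graph topology: a node with no incoming edges is a
/// source, a node with no outgoing edges is a sink, and everything else is a regular task.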
pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.0.neighbors_directed(node_id.into(), Incoming).count() == 0 {
        CuTaskType::Source
    } else if graph.0.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}

/// Gets the input step from the plan by its plan id, then returns the index of the config
/// graph edge that connects that input node to the given output node.
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

/// The connection id used here is the index of the config graph edge that corresponds to
/// the wanted connection.
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}

/// Explores a subbranch and builds the partial plan out of it.
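/// Returns the updated next copper list output index, and whether at least one node of the
/// branch could be planned (a node is deferred when one of its inputs is not planned yet).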
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(feature = "macro_debug")]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(feature = "macro_debug")]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(feature = "macro_debug")]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(feature = "macro_debug")]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(feature = "macro_debug")]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// This is the main heuristic used to compute an execution plan at compile time.
/// TODO(gbin): Make that heuristic pluggable.
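///
/// A minimal sketch mirroring the unit tests below (it assumes a config with one source
/// connected to one sink):
///
/// ```ignore
/// let mut config = CuConfig::default();
/// let graph = config.get_graph_mut(None).unwrap();
/// graph.add_node(Node::new("a", "TestSource")).unwrap();
/// graph.add_node(Node::new("b", "TestSink")).unwrap();
/// graph.connect(0, 1, "()").unwrap();
/// let plan = compute_runtime_plan(graph).unwrap();
/// // The plan lists the source step before the sink step.
/// ```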
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(feature = "macro_debug")]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: std::collections::VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(feature = "macro_debug")]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(feature = "macro_debug")]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(feature = "macro_debug")]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(feature = "macro_debug")]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(feature = "macro_debug")]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(feature = "macro_debug")]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(feature = "macro_debug")]
                    eprintln!("      → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}

// Tests
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask<'_> for TestSource {
        type Output = ();
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _empty_msg: Self::Output) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask<'_> for TestSink {
        type Input = ();

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: Self::Input) -> CuResult<()> {
            Ok(())
        }
    }

    // These should be generated by the derive macro.
    type Tasks = (TestSource, TestSink);
    type Msgs = ((),);

    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
        )
        .unwrap();

        // Now emulate the generated runtime.
        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist0 = copperlists
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(runtime.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist1 = copperlists
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(runtime.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist2 = copperlists.create();
            assert!(culist2.is_none());
            assert_eq!(runtime.available_copper_lists(), 0);
        }

        // Free in order; this should let the top of the stack be serialized and freed.
        runtime.end_of_processing(1);
        assert_eq!(runtime.available_copper_lists(), 1);

        // Re-add a CL.
        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist2 = copperlists
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(runtime.available_copper_lists(), 0);
        }

        // Free out of order, the #0 first.
        runtime.end_of_processing(0);
        // This should not free up the top of the stack.
        assert_eq!(runtime.available_copper_lists(), 0);

        // Free up the top of the stack.
        runtime.end_of_processing(2);
        // This should free up 2 CLs.
        assert_eq!(runtime.available_copper_lists(), 2);
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Note that the source2 connection is made before the source1 connection.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // The edge id depends on the order the connection is created, not on the
        // node id, and that is what determines the input order.
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Since the src2 connection was added before the src1 connection, the src2 type
        // should come first.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // A more complex topology that tripped the scheduler.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // Case 1 connection order.
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // A more complex topology that tripped the scheduler, variation 2.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // Case 2 connection order.
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
    }
759}