cu29_runtime/
curuntime.rs

1//! CuRuntime is the heart of what copper is running on the robot.
2//! It is exposed to the user via the `copper_runtime` macro injecting it as a field in their application struct.
3//!
4
5use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::{CuMonitor, build_monitor_topology};
10use crate::resource::ResourceManager;
11use cu29_clock::{ClockProvider, CuTime, RobotClock};
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_traits::{CopperListTuple, CuError};
15
16#[cfg(target_os = "none")]
17#[allow(unused_imports)]
18use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
19#[cfg(target_os = "none")]
20#[allow(unused_imports)]
21use cu29_log_derive::info;
22#[cfg(target_os = "none")]
23#[allow(unused_imports)]
24use cu29_log_runtime::log;
25#[cfg(all(target_os = "none", debug_assertions))]
26#[allow(unused_imports)]
27use cu29_log_runtime::log_debug_mode;
28#[cfg(target_os = "none")]
29#[allow(unused_imports)]
30use cu29_value::to_value;
31
32use alloc::boxed::Box;
33use alloc::collections::{BTreeSet, VecDeque};
34use alloc::format;
35use alloc::string::{String, ToString};
36use alloc::vec::Vec;
37use bincode::enc::EncoderImpl;
38use bincode::enc::write::{SizeWriter, SliceWriter};
39use bincode::error::EncodeError;
40use bincode::{Decode, Encode};
41use core::fmt::Result as FmtResult;
42use core::fmt::{Debug, Formatter};
43
44#[cfg(feature = "std")]
45use cu29_log_runtime::LoggerRuntime;
46#[cfg(feature = "std")]
47use cu29_unifiedlog::UnifiedLoggerWrite;
48#[cfg(feature = "std")]
49use std::sync::{Arc, Mutex};
50
/// Just a simple struct to hold the various bits needed to run a Copper application.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Shared handle to the unified log sink (shared with the logger runtime).
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Runtime for the structured (text) logger; kept alive for the application's lifetime.
    pub logger_runtime: LoggerRuntime,
    /// The clock every component of the application should share.
    pub clock: RobotClock,
}
58
/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Pool of up to NBCL in-flight copper lists.
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks); `None` disables logging.
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}
65
impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
    /// Marks the copper list identified by `culistid` as done, then retires every
    /// copper list in the leading run of `DoneProcessing` lists (in the manager's
    /// iteration order — presumably oldest first; TODO confirm against CuListsManager),
    /// logging each one before freeing it when a logger is configured.
    ///
    /// Lists are retired strictly in order: an out-of-order completion stays queued
    /// until all lists ahead of it are done too.
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        // True while we are still inside the leading run of finished lists.
        let mut is_top = true;
        // How many lists were freed in this pass; they are popped after the scan.
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    // Serialize the list to the log before releasing its slot.
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                // First not-done list ends the retirable run; everything after stays.
                is_top = false;
            }
        }
        // Pop the freed lists so their slots can be reused.
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Number of copper-list slots currently free (capacity NBCL minus in-flight lists).
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}
95
/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks); `None` disables keyframing.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists.
    /// Set to 0 only when logging is disabled (the logger is then also `None`).
    keyframe_interval: u32,
}
107
108impl KeyFramesManager {
109    fn is_keyframe(&self, culistid: u32) -> bool {
110        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
111    }
112
113    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
114        if self.is_keyframe(culistid) {
115            self.inner.reset(culistid, clock.now());
116        }
117    }
118
119    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
120        if self.is_keyframe(culistid) {
121            if self.inner.culistid != culistid {
122                panic!("Freezing task for a different culistid");
123            }
124            self.inner
125                .add_frozen_task(task)
126                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
127        } else {
128            Ok(0)
129        }
130    }
131
132    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
133        if self.is_keyframe(culistid) {
134            let logger = self.logger.as_mut().unwrap();
135            logger.log(&self.inner)
136        } else {
137            Ok(())
138        }
139    }
140}
141
/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// CB is the tuple of all the instantiated bridges.
/// P is the payload tuple of the copper list, representing the input/output messages for all the tasks.
/// NBCL is the number of copper lists kept in flight.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    pub resources: ResourceManager,

    /// The runtime monitoring.
    pub monitor: M,

    /// The copper lists (messages between tasks) and their optional logger.
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// Periodic snapshots of the frozen task states and their optional logger.
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}
170
171/// To be able to share the clock we make the runtime a clock provider.
172impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
173    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
174{
175    fn get_clock(&self) -> RobotClock {
176        self.clock.clone()
177    }
178}
179
/// A KeyFrame is recording a snapshot of the tasks state before a given copperlist.
/// It is a double encapsulation: this one recording the culistid and another even in
/// bincode in the serialized_tasks.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    /// The id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    /// The timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    /// The concatenated bincode representation of all the tasks, appended in execution order.
    pub serialized_tasks: Vec<u8>,
}
192
193impl KeyFrame {
194    fn new() -> Self {
195        KeyFrame {
196            culistid: 0,
197            timestamp: CuTime::default(),
198            serialized_tasks: Vec::new(),
199        }
200    }
201
202    /// This is to be able to avoid reallocations
203    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
204        self.culistid = culistid;
205        self.timestamp = timestamp;
206        self.serialized_tasks.clear();
207    }
208
209    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
210    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
211        let cfg = bincode::config::standard();
212        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
213        BincodeAdapter(task).encode(&mut sizer)?;
214        let need = sizer.into_writer().bytes_written as usize;
215
216        let start = self.serialized_tasks.len();
217        self.serialized_tasks.resize(start + need, 0);
218        let mut enc =
219            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
220        BincodeAdapter(task).encode(&mut enc)?;
221        Ok(need)
222    }
223}
224
225impl<
226    CT,
227    CB,
228    P: CopperListTuple + CuListZeroedInit + Default + 'static,
229    M: CuMonitor,
230    const NBCL: usize,
231> CuRuntime<CT, CB, P, M, NBCL>
232{
233    // FIXME(gbin): this became REALLY ugly with no-std
234    #[allow(clippy::too_many_arguments)]
235    #[cfg(feature = "std")]
236    pub fn new(
237        clock: RobotClock,
238        config: &CuConfig,
239        mission: Option<&str>,
240        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
241        tasks_instanciator: impl for<'c> Fn(
242            Vec<Option<&'c ComponentConfig>>,
243            &mut ResourceManager,
244        ) -> CuResult<CT>,
245        monitor_instanciator: impl Fn(&CuConfig) -> M,
246        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
247        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
248        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
249    ) -> CuResult<Self> {
250        let resources = resources_instanciator(config)?;
251        Self::new_with_resources(
252            clock,
253            config,
254            mission,
255            resources,
256            tasks_instanciator,
257            monitor_instanciator,
258            bridges_instanciator,
259            copperlists_logger,
260            keyframes_logger,
261        )
262    }
263
264    #[allow(clippy::too_many_arguments)]
265    #[cfg(feature = "std")]
266    pub fn new_with_resources(
267        clock: RobotClock,
268        config: &CuConfig,
269        mission: Option<&str>,
270        mut resources: ResourceManager,
271        tasks_instanciator: impl for<'c> Fn(
272            Vec<Option<&'c ComponentConfig>>,
273            &mut ResourceManager,
274        ) -> CuResult<CT>,
275        monitor_instanciator: impl Fn(&CuConfig) -> M,
276        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
277        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
278        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
279    ) -> CuResult<Self> {
280        let graph = config.get_graph(mission)?;
281        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
282            .get_all_nodes()
283            .iter()
284            .map(|(_, node)| node.get_instance_config())
285            .collect();
286
287        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
288        let mut monitor = monitor_instanciator(config);
289        if let Ok(topology) = build_monitor_topology(config, mission) {
290            monitor.set_topology(topology);
291        }
292        let bridges = bridges_instanciator(config, &mut resources)?;
293
294        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
295            Some(logging_config) if logging_config.enable_task_logging => (
296                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
297                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
298                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
299            ),
300            Some(_) => (None, None, 0), // explicit no enable logging
301            None => (
302                // default
303                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
304                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
305                DEFAULT_KEYFRAME_INTERVAL,
306            ),
307        };
308
309        let copperlists_manager = CopperListsManager {
310            inner: CuListsManager::new(),
311            logger: copperlists_logger,
312        };
313        #[cfg(target_os = "none")]
314        {
315            let cl_size = core::mem::size_of::<CopperList<P>>();
316            let total_bytes = cl_size.saturating_mul(NBCL);
317            info!(
318                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
319                NBCL, cl_size, total_bytes
320            );
321        }
322
323        let keyframes_manager = KeyFramesManager {
324            inner: KeyFrame::new(),
325            logger: keyframes_logger,
326            keyframe_interval,
327        };
328
329        let runtime_config = config.runtime.clone().unwrap_or_default();
330
331        let runtime = Self {
332            tasks,
333            bridges,
334            resources,
335            monitor,
336            clock,
337            copperlists_manager,
338            keyframes_manager,
339            runtime_config,
340        };
341
342        Ok(runtime)
343    }
344
345    #[allow(clippy::too_many_arguments)]
346    #[cfg(not(feature = "std"))]
347    pub fn new(
348        clock: RobotClock,
349        config: &CuConfig,
350        mission: Option<&str>,
351        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
352        tasks_instanciator: impl for<'c> Fn(
353            Vec<Option<&'c ComponentConfig>>,
354            &mut ResourceManager,
355        ) -> CuResult<CT>,
356        monitor_instanciator: impl Fn(&CuConfig) -> M,
357        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
358        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
359        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
360    ) -> CuResult<Self> {
361        #[cfg(target_os = "none")]
362        info!("CuRuntime::new: resources instanciator");
363        let resources = resources_instanciator(config)?;
364        Self::new_with_resources(
365            clock,
366            config,
367            mission,
368            resources,
369            tasks_instanciator,
370            monitor_instanciator,
371            bridges_instanciator,
372            copperlists_logger,
373            keyframes_logger,
374        )
375    }
376
377    #[allow(clippy::too_many_arguments)]
378    #[cfg(not(feature = "std"))]
379    pub fn new_with_resources(
380        clock: RobotClock,
381        config: &CuConfig,
382        mission: Option<&str>,
383        mut resources: ResourceManager,
384        tasks_instanciator: impl for<'c> Fn(
385            Vec<Option<&'c ComponentConfig>>,
386            &mut ResourceManager,
387        ) -> CuResult<CT>,
388        monitor_instanciator: impl Fn(&CuConfig) -> M,
389        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
390        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
391        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
392    ) -> CuResult<Self> {
393        #[cfg(target_os = "none")]
394        info!("CuRuntime::new: get graph");
395        let graph = config.get_graph(mission)?;
396        #[cfg(target_os = "none")]
397        info!("CuRuntime::new: graph ok");
398        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
399            .get_all_nodes()
400            .iter()
401            .map(|(_, node)| node.get_instance_config())
402            .collect();
403
404        #[cfg(target_os = "none")]
405        info!("CuRuntime::new: tasks instanciator");
406        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
407
408        #[cfg(target_os = "none")]
409        info!("CuRuntime::new: monitor instanciator");
410        let mut monitor = monitor_instanciator(config);
411        #[cfg(target_os = "none")]
412        info!("CuRuntime::new: monitor instanciator ok");
413        #[cfg(target_os = "none")]
414        info!("CuRuntime::new: build monitor topology");
415        if let Ok(topology) = build_monitor_topology(config, mission) {
416            #[cfg(target_os = "none")]
417            info!("CuRuntime::new: monitor topology ok");
418            monitor.set_topology(topology);
419            #[cfg(target_os = "none")]
420            info!("CuRuntime::new: monitor topology set");
421        }
422        #[cfg(target_os = "none")]
423        info!("CuRuntime::new: bridges instanciator");
424        let bridges = bridges_instanciator(config, &mut resources)?;
425
426        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
427            Some(logging_config) if logging_config.enable_task_logging => (
428                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
429                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
430                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
431            ),
432            Some(_) => (None, None, 0), // explicit no enable logging
433            None => (
434                // default
435                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
436                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
437                DEFAULT_KEYFRAME_INTERVAL,
438            ),
439        };
440
441        let copperlists_manager = CopperListsManager {
442            inner: CuListsManager::new(),
443            logger: copperlists_logger,
444        };
445        #[cfg(target_os = "none")]
446        {
447            let cl_size = core::mem::size_of::<CopperList<P>>();
448            let total_bytes = cl_size.saturating_mul(NBCL);
449            info!(
450                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
451                NBCL, cl_size, total_bytes
452            );
453        }
454
455        let keyframes_manager = KeyFramesManager {
456            inner: KeyFrame::new(),
457            logger: keyframes_logger,
458            keyframe_interval,
459        };
460
461        let runtime_config = config.runtime.clone().unwrap_or_default();
462
463        let runtime = Self {
464            tasks,
465            bridges,
466            resources,
467            monitor,
468            clock,
469            copperlists_manager,
470            keyframes_manager,
471            runtime_config,
472        };
473
474        Ok(runtime)
475    }
476}
477
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// No incoming connections in the graph: produces messages only.
    Source,
    /// Has both incoming and outgoing connections.
    Regular,
    /// No outgoing connections in the graph: consumes messages only.
    Sink,
}
488
/// Where a task's outputs live in the copper list: the slot index plus the
/// message type carried on each output port.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Index of the output slot in the copper list.
    pub culist_index: u32,
    /// Message type names, one per output port (ordered by ascending edge id, deduplicated).
    pub msg_types: Vec<String>,
}
494
/// Describes one input of a task: where to read it in the copper list and
/// which upstream connection it comes from.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// Index of the producing task's output slot in the copper list.
    pub culist_index: u32,
    /// Type name of the message carried on this connection.
    pub msg_type: String,
    /// Position of the message type within the producer's output pack.
    pub src_port: usize,
    /// Index of the config-graph edge this input corresponds to.
    pub edge_id: usize,
}
502
/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance (cloned from the config graph)
    pub node: Node,
    /// CuTaskType: type of the task (Source / Regular / Sink)
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types,
    /// sorted by config-graph edge id
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// the index in the copper list of the output message and its type
    pub output_msg_pack: Option<CuOutputPack>,
}
518
519impl Debug for CuExecutionStep {
520    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
521        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
522        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
523        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
524        f.write_str(
525            format!(
526                "              input_msg_types: {:?}\n",
527                self.input_msg_indices_types
528            )
529            .as_str(),
530        )?;
531        f.write_str(format!("       output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
532        Ok(())
533    }
534}
535
/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of Execution units (loop or steps) that are executed
/// multiple times.
/// if loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    /// The units (steps or nested loops) executed on each iteration, in order.
    pub steps: Vec<CuExecutionUnit>,
    /// Number of iterations; `None` means loop forever.
    pub loop_count: Option<u32>,
}
544
545impl Debug for CuExecutionLoop {
546    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
547        f.write_str("CuExecutionLoop:\n")?;
548        for step in &self.steps {
549            match step {
550                CuExecutionUnit::Step(step) => {
551                    step.fmt(f)?;
552                }
553                CuExecutionUnit::Loop(l) => {
554                    l.fmt(f)?;
555                }
556            }
557        }
558
559        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
560        Ok(())
561    }
562}
563
/// One unit of the execution plan: either a single task step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single task execution step.
    Step(CuExecutionStep),
    /// A nested loop of units.
    Loop(CuExecutionLoop),
}
570
571fn find_output_pack_from_nodeid(
572    node_id: NodeId,
573    steps: &Vec<CuExecutionUnit>,
574) -> Option<CuOutputPack> {
575    for step in steps {
576        match step {
577            CuExecutionUnit::Loop(loop_unit) => {
578                if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
579                    return Some(output_pack);
580                }
581            }
582            CuExecutionUnit::Step(step) => {
583                if step.node_id == node_id {
584                    return step.output_msg_pack.clone();
585                }
586            }
587        }
588    }
589    None
590}
591
592pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
593    if graph.incoming_neighbor_count(node_id) == 0 {
594        CuTaskType::Source
595    } else if graph.outgoing_neighbor_count(node_id) == 0 {
596        CuTaskType::Sink
597    } else {
598        CuTaskType::Regular
599    }
600}
601
602/// The connection id used here is the index of the config graph edge that equates to the wanted
603/// connection.
604fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
605    input_msg_indices_types.sort_by_key(|input| input.edge_id);
606}
607
608fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
609    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
610    edge_ids.sort();
611
612    let mut msg_types = Vec::new();
613    let mut seen = Vec::new();
614    for edge_id in edge_ids {
615        if let Some(edge) = graph.edge(edge_id) {
616            if seen.iter().any(|msg| msg == &edge.msg) {
617                continue;
618            }
619            seen.push(edge.msg.clone());
620            msg_types.push(edge.msg.clone());
621        }
622    }
623    msg_types
624}
/// Explores a subbranch and build the partial plan out of it.
///
/// BFS-walks the graph from `starting_point`, appending one `CuExecutionStep` per
/// reachable node to `plan`. Each non-source node's inputs are resolved against the
/// output packs of already-planned producers; if any producer is not planned yet,
/// the walk aborts early (the caller is expected to retry from another start).
///
/// Returns the updated next free copper-list slot index and whether at least one
/// node was added/updated in the plan during this walk.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    // Becomes true once any node of this branch has been planned.
    let mut handled = false;

    for id in graph.bfs_nodes(starting_point) {
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
        let output_msg_pack: Option<CuOutputPack>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            // Sources have no inputs: just claim the next output slot.
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                let msg_types = collect_output_msg_types(graph, id);
                if msg_types.is_empty() {
                    panic!(
                        "Source node '{}' has no outgoing connections",
                        node_ref.get_id()
                    );
                }
                output_msg_pack = Some(CuOutputPack {
                    culist_index: next_culist_output_index,
                    msg_types,
                });
                next_culist_output_index += 1;
            }
            // Sinks resolve all their inputs; their "output" is a unit placeholder
            // slot so every step owns a copper-list index.
            CuTaskType::Sink => {
                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
                edge_ids.sort();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Sink with incoming edges: {edge_ids:?}");
                for edge_id in edge_ids {
                    let edge = graph
                        .edge(edge_id)
                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
                    let pid = graph
                        .get_node_id_by_name(edge.src.as_str())
                        .unwrap_or_else(|| {
                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
                        });
                    let output_pack = find_output_pack_from_nodeid(pid, plan);
                    if let Some(output_pack) = output_pack {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
                        let msg_type = edge.msg.as_str();
                        // The input's port is the position of its type in the
                        // producer's output pack.
                        let src_port = output_pack
                            .msg_types
                            .iter()
                            .position(|msg| msg == msg_type)
                            .unwrap_or_else(|| {
                                panic!(
                                    "Missing output port for message type '{msg_type}' on node {pid}"
                                )
                            });
                        input_msg_indices_types.push(CuInputMsg {
                            culist_index: output_pack.culist_index,
                            msg_type: msg_type.to_string(),
                            src_port,
                            edge_id,
                        });
                    } else {
                        // Producer not planned yet: abort this branch; a later walk
                        // will come back to this node once its inputs exist.
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_pack = Some(CuOutputPack {
                    culist_index: next_culist_output_index,
                    msg_types: Vec::from(["()".to_string()]),
                });
                next_culist_output_index += 1;
            }
            // Regular tasks resolve inputs like a sink AND claim an output slot
            // like a source.
            CuTaskType::Regular => {
                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
                edge_ids.sort();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Regular task with incoming edges: {edge_ids:?}");
                for edge_id in edge_ids {
                    let edge = graph
                        .edge(edge_id)
                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
                    let pid = graph
                        .get_node_id_by_name(edge.src.as_str())
                        .unwrap_or_else(|| {
                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
                        });
                    let output_pack = find_output_pack_from_nodeid(pid, plan);
                    if let Some(output_pack) = output_pack {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
                        let msg_type = edge.msg.as_str();
                        let src_port = output_pack
                            .msg_types
                            .iter()
                            .position(|msg| msg == msg_type)
                            .unwrap_or_else(|| {
                                panic!(
                                    "Missing output port for message type '{msg_type}' on node {pid}"
                                )
                            });
                        input_msg_indices_types.push(CuInputMsg {
                            culist_index: output_pack.culist_index,
                            msg_type: msg_type.to_string(),
                            src_port,
                            edge_id,
                        });
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                let msg_types = collect_output_msg_types(graph, id);
                if msg_types.is_empty() {
                    panic!(
                        "Regular node '{}' has no outgoing connections",
                        node_ref.get_id()
                    );
                }
                output_msg_pack = Some(CuOutputPack {
                    culist_index: next_culist_output_index,
                    msg_types,
                });
                next_culist_output_index += 1;
            }
        }

        // Keep the input tuple layout aligned with the config's connection order.
        sort_inputs_by_cnx_id(&mut input_msg_indices_types);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            // Node was planned by an earlier walk: refresh its inputs and move the
            // step to the back so it runs after its (re-resolved) producers.
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_pack,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}
797
798/// This is the main heuristics to compute an execution plan at compilation time.
799/// TODO(gbin): Make that heuristic pluggable.
800pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
801    #[cfg(all(feature = "std", feature = "macro_debug"))]
802    eprintln!("[runtime plan]");
803    let mut plan = Vec::new();
804    let mut next_culist_output_index = 0u32;
805
806    let mut queue: VecDeque<NodeId> = graph
807        .node_ids()
808        .into_iter()
809        .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
810        .collect();
811
812    #[cfg(all(feature = "std", feature = "macro_debug"))]
813    eprintln!("Initial source nodes: {queue:?}");
814
815    while let Some(start_node) = queue.pop_front() {
816        #[cfg(all(feature = "std", feature = "macro_debug"))]
817        eprintln!("→ Starting BFS from source {start_node}");
818        for node_id in graph.bfs_nodes(start_node) {
819            let already_in_plan = plan
820                .iter()
821                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
822            if already_in_plan {
823                #[cfg(all(feature = "std", feature = "macro_debug"))]
824                eprintln!("    → Node {node_id} already planned, skipping");
825                continue;
826            }
827
828            #[cfg(all(feature = "std", feature = "macro_debug"))]
829            eprintln!("    Planning from node {node_id}");
830            let (new_index, handled) =
831                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
832            next_culist_output_index = new_index;
833
834            if !handled {
835                #[cfg(all(feature = "std", feature = "macro_debug"))]
836                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
837                continue;
838            }
839
840            #[cfg(all(feature = "std", feature = "macro_debug"))]
841            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
842            for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
843                #[cfg(all(feature = "std", feature = "macro_debug"))]
844                eprintln!("      → Enqueueing neighbor {neighbor}");
845                queue.push_back(neighbor);
846            }
847        }
848    }
849
850    let mut planned_nodes = BTreeSet::new();
851    for unit in &plan {
852        if let CuExecutionUnit::Step(step) = unit {
853            planned_nodes.insert(step.node_id);
854        }
855    }
856
857    let mut missing = Vec::new();
858    for node_id in graph.node_ids() {
859        if !planned_nodes.contains(&node_id) {
860            if let Some(node) = graph.get_node(node_id) {
861                missing.push(node.get_id().to_string());
862            } else {
863                missing.push(format!("node_id_{node_id}"));
864            }
865        }
866    }
867
868    if !missing.is_empty() {
869        missing.sort();
870        return Err(CuError::from(format!(
871            "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
872            missing.join(", ")
873        )));
874    }
875
876    Ok(CuExecutionLoop {
877        steps: plan,
878        loop_count: None,
879    })
880}
881
882//tests
883#[cfg(test)]
884mod tests {
885    use super::*;
886    use crate::config::Node;
887    use crate::cutask::CuSinkTask;
888    use crate::cutask::{CuSrcTask, Freezable};
889    use crate::monitoring::NoMonitor;
890    use bincode::Encode;
891    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
892    use serde_derive::{Deserialize, Serialize};
893
    /// Minimal stateless source task used to exercise the runtime plumbing.
    pub struct TestSource {}

    // No state to persist: the empty impl relies on Freezable's defaults.
    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        /// Builds the task; config and resources are ignored.
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// No-op: leaves the unit output message untouched and always succeeds.
        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }
916
    /// Minimal stateless sink task used to exercise the runtime plumbing.
    pub struct TestSink {}

    // No state to persist: the empty impl relies on Freezable's defaults.
    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        /// Builds the task; config and resources are ignored.
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// No-op: consumes the unit input and always succeeds.
        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
936
    // Those should be generated by the derive macro
    type Tasks = (TestSource, TestSink);

    /// Stand-in for the macro-generated message set: carries no payload.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        // An empty message set exposes no stamped data.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        // No task ids are associated with an empty message set.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        // Nothing to zero-initialize in a unit payload.
        fn init_zeroed(&mut self) {}
    }
958
959    #[cfg(feature = "std")]
960    fn tasks_instanciator(
961        all_instances_configs: Vec<Option<&ComponentConfig>>,
962        _resources: &mut ResourceManager,
963    ) -> CuResult<Tasks> {
964        Ok((
965            TestSource::new(all_instances_configs[0], ())?,
966            TestSink::new(all_instances_configs[1], ())?,
967        ))
968    }
969
970    #[cfg(not(feature = "std"))]
971    fn tasks_instanciator(
972        all_instances_configs: Vec<Option<&ComponentConfig>>,
973        _resources: &mut ResourceManager,
974    ) -> CuResult<Tasks> {
975        Ok((
976            TestSource::new(all_instances_configs[0], ())?,
977            TestSink::new(all_instances_configs[1], ())?,
978        ))
979    }
980
    /// Builds the monitor used by the tests: a monitor that does nothing.
    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }
984
    /// Bridge instanciation hook: the test graphs declare no bridges, so this is a no-op.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }
988
    /// Builds an empty resource manager; the test tasks require no resources.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }
992
    /// Write stream that discards everything; stands in for the real loggers
    /// when constructing a `CuRuntime` in tests.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        // Accepts and drops any encodable object.
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
1001
1002    #[test]
1003    fn test_runtime_instantiation() {
1004        let mut config = CuConfig::default();
1005        let graph = config.get_graph_mut(None).unwrap();
1006        graph.add_node(Node::new("a", "TestSource")).unwrap();
1007        graph.add_node(Node::new("b", "TestSink")).unwrap();
1008        graph.connect(0, 1, "()").unwrap();
1009        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
1010            RobotClock::default(),
1011            &config,
1012            None,
1013            resources_instanciator,
1014            tasks_instanciator,
1015            monitor_instanciator,
1016            bridges_instanciator,
1017            FakeWriter {},
1018            FakeWriter {},
1019        );
1020        assert!(runtime.is_ok());
1021    }
1022
    #[test]
    fn test_copperlists_manager_lifecycle() {
        // Drives a two-slot copper list pool (the `2` const generic) through
        // create / end_of_processing cycles, checking both in-order and
        // out-of-order release behavior.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulates the generated runtime
        {
            // First list takes id 0 and leaves one free slot.
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            // Second list takes id 1 and exhausts the pool.
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            // Pool exhausted: create() must refuse a third list.
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order, should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Readd a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, the #0 first
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Free up the top of the stack
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs

            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }
1103
1104    #[test]
1105    fn test_runtime_task_input_order() {
1106        let mut config = CuConfig::default();
1107        let graph = config.get_graph_mut(None).unwrap();
1108        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
1109        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
1110        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
1111
1112        assert_eq!(src1_id, 0);
1113        assert_eq!(src2_id, 1);
1114
1115        // note that the source2 connection is before the source1
1116        let src1_type = "src1_type";
1117        let src2_type = "src2_type";
1118        graph.connect(src2_id, sink_id, src2_type).unwrap();
1119        graph.connect(src1_id, sink_id, src1_type).unwrap();
1120
1121        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
1122        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
1123        // the edge id depends on the order the connection is created, not
1124        // on the node id, and that is what determines the input order
1125        assert_eq!(src1_edge_id, 1);
1126        assert_eq!(src2_edge_id, 0);
1127
1128        let runtime = compute_runtime_plan(graph).unwrap();
1129        let sink_step = runtime
1130            .steps
1131            .iter()
1132            .find_map(|step| match step {
1133                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
1134                _ => None,
1135            })
1136            .unwrap();
1137
1138        // since the src2 connection was added before src1 connection, the src2 type should be
1139        // first
1140        assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
1141        assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
1142    }
1143
1144    #[test]
1145    fn test_runtime_output_ports_unique_ordered() {
1146        let mut config = CuConfig::default();
1147        let graph = config.get_graph_mut(None).unwrap();
1148        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1149        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1150        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1151        let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
1152        let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
1153
1154        graph.connect(src_id, dst_a_id, "msg::A").unwrap();
1155        graph.connect(src_id, dst_b_id, "msg::B").unwrap();
1156        graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
1157        graph.connect(src_id, dst_c_id, "msg::C").unwrap();
1158
1159        let runtime = compute_runtime_plan(graph).unwrap();
1160        let src_step = runtime
1161            .steps
1162            .iter()
1163            .find_map(|step| match step {
1164                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1165                _ => None,
1166            })
1167            .unwrap();
1168
1169        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1170        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
1171
1172        let dst_a_step = runtime
1173            .steps
1174            .iter()
1175            .find_map(|step| match step {
1176                CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
1177                _ => None,
1178            })
1179            .unwrap();
1180        let dst_b_step = runtime
1181            .steps
1182            .iter()
1183            .find_map(|step| match step {
1184                CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
1185                _ => None,
1186            })
1187            .unwrap();
1188        let dst_a2_step = runtime
1189            .steps
1190            .iter()
1191            .find_map(|step| match step {
1192                CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
1193                _ => None,
1194            })
1195            .unwrap();
1196        let dst_c_step = runtime
1197            .steps
1198            .iter()
1199            .find_map(|step| match step {
1200                CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
1201                _ => None,
1202            })
1203            .unwrap();
1204
1205        assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
1206        assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
1207        assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
1208        assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
1209    }
1210
1211    #[test]
1212    fn test_runtime_output_ports_fanout_single() {
1213        let mut config = CuConfig::default();
1214        let graph = config.get_graph_mut(None).unwrap();
1215        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1216        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1217        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1218
1219        graph.connect(src_id, dst_a_id, "i32").unwrap();
1220        graph.connect(src_id, dst_b_id, "i32").unwrap();
1221
1222        let runtime = compute_runtime_plan(graph).unwrap();
1223        let src_step = runtime
1224            .steps
1225            .iter()
1226            .find_map(|step| match step {
1227                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1228                _ => None,
1229            })
1230            .unwrap();
1231
1232        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1233        assert_eq!(output_pack.msg_types, vec!["i32"]);
1234    }
1235
1236    #[test]
1237    fn test_runtime_plan_diamond_case1() {
1238        // more complex topology that tripped the scheduler
1239        let mut config = CuConfig::default();
1240        let graph = config.get_graph_mut(None).unwrap();
1241        let cam0_id = graph
1242            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1243            .unwrap();
1244        let inf0_id = graph
1245            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1246            .unwrap();
1247        let broadcast_id = graph
1248            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1249            .unwrap();
1250
1251        // case 1 order
1252        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1253        graph.connect(cam0_id, inf0_id, "i32").unwrap();
1254        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1255
1256        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1257        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
1258
1259        assert_eq!(edge_cam0_to_inf0, 0);
1260        assert_eq!(edge_cam0_to_broadcast, 1);
1261
1262        let runtime = compute_runtime_plan(graph).unwrap();
1263        let broadcast_step = runtime
1264            .steps
1265            .iter()
1266            .find_map(|step| match step {
1267                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1268                _ => None,
1269            })
1270            .unwrap();
1271
1272        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1273        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1274    }
1275
1276    #[test]
1277    fn test_runtime_plan_diamond_case2() {
1278        // more complex topology that tripped the scheduler variation 2
1279        let mut config = CuConfig::default();
1280        let graph = config.get_graph_mut(None).unwrap();
1281        let cam0_id = graph
1282            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1283            .unwrap();
1284        let inf0_id = graph
1285            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1286            .unwrap();
1287        let broadcast_id = graph
1288            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1289            .unwrap();
1290
1291        // case 2 order
1292        graph.connect(cam0_id, inf0_id, "i32").unwrap();
1293        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1294        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1295
1296        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1297        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1298
1299        assert_eq!(edge_cam0_to_broadcast, 0);
1300        assert_eq!(edge_cam0_to_inf0, 1);
1301
1302        let runtime = compute_runtime_plan(graph).unwrap();
1303        let broadcast_step = runtime
1304            .steps
1305            .iter()
1306            .find_map(|step| match step {
1307                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1308                _ => None,
1309            })
1310            .unwrap();
1311
1312        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1313        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1314    }
1315}