//! CuRuntime is the heart of what copper is running on the robot.
//! It is exposed to the user via the `copper_runtime` macro injecting it as a field in their application struct.
//!
5use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9#[cfg(feature = "std")]
10use crate::monitoring::ExecutionProbeHandle;
11#[cfg(feature = "std")]
12use crate::monitoring::MonitorExecutionProbe;
13use crate::monitoring::{
14    ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
15    ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
16};
17use crate::resource::ResourceManager;
18use compact_str::CompactString;
19use cu29_clock::{ClockProvider, CuTime, RobotClock};
20use cu29_traits::CuResult;
21use cu29_traits::WriteStream;
22use cu29_traits::{CopperListTuple, CuError};
23
24#[cfg(target_os = "none")]
25#[allow(unused_imports)]
26use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
27#[cfg(target_os = "none")]
28#[allow(unused_imports)]
29use cu29_log_derive::info;
30#[cfg(target_os = "none")]
31#[allow(unused_imports)]
32use cu29_log_runtime::log;
33#[cfg(all(target_os = "none", debug_assertions))]
34#[allow(unused_imports)]
35use cu29_log_runtime::log_debug_mode;
36#[cfg(target_os = "none")]
37#[allow(unused_imports)]
38use cu29_value::to_value;
39
40use alloc::boxed::Box;
41use alloc::collections::{BTreeSet, VecDeque};
42use alloc::format;
43use alloc::string::{String, ToString};
44use alloc::vec::Vec;
45use bincode::enc::EncoderImpl;
46use bincode::enc::write::{SizeWriter, SliceWriter};
47use bincode::error::EncodeError;
48use bincode::{Decode, Encode};
49use core::fmt::Result as FmtResult;
50use core::fmt::{Debug, Formatter};
51
52#[cfg(feature = "std")]
53use cu29_log_runtime::LoggerRuntime;
54#[cfg(feature = "std")]
55use cu29_unifiedlog::UnifiedLoggerWrite;
56#[cfg(feature = "std")]
57use std::sync::{Arc, Mutex};
58
/// Just a simple struct to hold the various bits needed to run a Copper application.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Shared handle to the unified logger sink, behind a mutex so multiple
    /// writers can append to it.
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Runtime for the structured logging subsystem.
    pub logger_runtime: LoggerRuntime,
    /// The robot clock shared by the whole application.
    pub clock: RobotClock,
}
66
67/// Returns a monotonic instant used for local runtime performance timing.
68///
69/// When `sysclock-perf` (and `std`) are enabled this uses a process-local
70/// `RobotClock::new()` instance for timing. The returned value is a
71/// monotonically increasing duration since an unspecified origin (typically
72/// process or runtime initialization), not a wall-clock time-of-day. When
73/// `sysclock-perf` is disabled it delegates to the provided `RobotClock`.
74#[inline]
75pub fn perf_now(_clock: &RobotClock) -> CuTime {
76    #[cfg(all(feature = "std", feature = "sysclock-perf"))]
77    {
78        static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
79        return PERF_CLOCK.get_or_init(RobotClock::new).now();
80    }
81
82    #[allow(unreachable_code)]
83    _clock.now()
84}
85
/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Fixed-capacity pool/ring of `NBCL` copper lists.
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
    /// Last encoded size returned by logger.log
    pub last_encoded_bytes: u64,
}
94
impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
    /// Marks the copper list `culistid` as done, then retires the leading run of
    /// fully-processed lists in iteration order: each is (optionally) serialized
    /// to the logger, marked `Free`, and finally popped from the pool.
    ///
    /// A done list sitting behind one that is still processing is left in place,
    /// which keeps serialization in order.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        // `is_top` stays true only while we are still inside the contiguous
        // leading run of done lists.
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                    // Remember how many bytes the last log call produced.
                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        // Release the freed lists back to the pool.
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Number of copper lists still available out of the `NBCL` in the pool.
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}
125
/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Optional override for the timestamp to stamp the next keyframe (used by deterministic replay).
    forced_timestamp: Option<CuTime>,

    /// If set, reuse this keyframe verbatim (e.g., during replay) instead of re-freezing state.
    locked: bool,

    /// Logger for the state of the tasks (frozen tasks)
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists.
    keyframe_interval: u32,

    /// Bytes written by the last keyframe log
    pub last_encoded_bytes: u64,
}
146
impl KeyFramesManager {
    /// True when a keyframe is due for this copper list: keyframe logging is
    /// enabled and `culistid` falls on the configured interval.
    fn is_keyframe(&self, culistid: u64) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
    }

    /// Rearms the current keyframe for `culistid` if one is due, stamping it
    /// with the forced timestamp (if set) or the current clock time.
    pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            // If a recorded keyframe was preloaded for this CL, keep it as-is.
            if self.locked && self.inner.culistid == culistid {
                return;
            }
            let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
            self.inner.reset(culistid, ts);
            self.locked = false;
        }
    }

    /// Force the timestamp of the next keyframe to a given value.
    #[cfg(feature = "std")]
    pub fn set_forced_timestamp(&mut self, ts: CuTime) {
        self.forced_timestamp = Some(ts);
    }

    /// Serializes one task's state into the current keyframe, returning the
    /// number of bytes appended (0 when no keyframe is due or the keyframe is
    /// locked for verbatim replay).
    ///
    /// Errors if the keyframe was last reset for a different `culistid`, or if
    /// serialization fails.
    pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.locked {
                // We are replaying a recorded keyframe verbatim; don't mutate it.
                return Ok(0);
            }
            if self.inner.culistid != culistid {
                return Err(CuError::from(format!(
                    "Freezing task for culistid {} but current keyframe is {}",
                    culistid, self.inner.culistid
                )));
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    /// Generic helper to freeze any `Freezable` state (task or bridge) into the current keyframe.
    pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
        self.freeze_task(culistid, item)
    }

    /// Logs the accumulated keyframe if one was due for this CL and records its
    /// encoded size; otherwise resets `last_encoded_bytes` to 0.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            // is_keyframe() just checked that the logger exists.
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)?;
            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
            // Clear the lock so the next CL can rebuild normally unless re-locked.
            self.locked = false;
            Ok(())
        } else {
            // Not a keyframe for this CL; ensure we don't carry stale sizes forward.
            self.last_encoded_bytes = 0;
            Ok(())
        }
    }

    /// Preload a recorded keyframe so it is logged verbatim on the matching CL.
    #[cfg(feature = "std")]
    pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
        self.inner = keyframe.clone();
        self.forced_timestamp = Some(keyframe.timestamp);
        self.locked = true;
    }
}
218
/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// CL is the type of the copper list, representing the input/output messages for all the tasks.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    pub resources: ResourceManager,

    /// The runtime monitoring.
    pub monitor: M,

    /// Runtime-side execution progress probe for watchdog/diagnostic monitors.
    ///
    /// This probe is written from the generated execution plan before each component
    /// step. Monitors consume it asynchronously (typically from watchdog threads) to
    /// report the last known component/step/culist when the runtime appears stalled.
    // With `std` the probe is shared (Arc under the hood); without `std` it is owned inline.
    #[cfg(feature = "std")]
    pub execution_probe: ExecutionProbeHandle,
    #[cfg(not(feature = "std"))]
    pub execution_probe: RuntimeExecutionProbe,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}
257
258/// To be able to share the clock we make the runtime a clock provider.
259impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
260    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
261{
262    fn get_clock(&self) -> RobotClock {
263        self.clock.clone()
264    }
265}
266
/// A KeyFrame is recording a snapshot of the tasks state before a given copperlist.
/// It is a double encapsulation: this one recording the culistid and another even in
/// bincode in the serialized_tasks.
#[derive(Clone, Encode, Decode)]
pub struct KeyFrame {
    /// This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u64,
    /// This is the timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    /// This is the bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}
279
280impl KeyFrame {
281    fn new() -> Self {
282        KeyFrame {
283            culistid: 0,
284            timestamp: CuTime::default(),
285            serialized_tasks: Vec::new(),
286        }
287    }
288
289    /// This is to be able to avoid reallocations
290    fn reset(&mut self, culistid: u64, timestamp: CuTime) {
291        self.culistid = culistid;
292        self.timestamp = timestamp;
293        self.serialized_tasks.clear();
294    }
295
296    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
297    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
298        let cfg = bincode::config::standard();
299        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
300        BincodeAdapter(task).encode(&mut sizer)?;
301        let need = sizer.into_writer().bytes_written as usize;
302
303        let start = self.serialized_tasks.len();
304        self.serialized_tasks.resize(start + need, 0);
305        let mut enc =
306            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
307        BincodeAdapter(task).encode(&mut enc)?;
308        Ok(need)
309    }
310}
311
/// Identifies where the effective runtime configuration came from.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    /// Configuration supplied programmatically, overriding any file.
    ProgrammaticOverride,
    /// Configuration loaded from an external file.
    ExternalFile,
    /// Default configuration bundled with the application.
    BundledDefault,
}
319
/// Build-time stack identification metadata.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    /// Name of the application.
    pub app_name: String,
    /// Version of the application.
    pub app_version: String,
    /// Git commit the binary was built from, when known.
    pub git_commit: Option<String>,
    /// Whether the working tree was dirty at build time, when known.
    pub git_dirty: Option<bool>,
}
328
/// Runtime lifecycle events emitted in the dedicated lifecycle section.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    /// The runtime was instantiated with the given effective configuration.
    Instantiated {
        /// Where the effective configuration came from.
        config_source: RuntimeLifecycleConfigSource,
        /// The effective configuration, serialized as RON.
        effective_config_ron: String,
        /// Build-time identification of the running stack.
        stack: RuntimeLifecycleStackInfo,
    },
    /// A mission started running.
    MissionStarted {
        mission: String,
    },
    /// A mission stopped running.
    MissionStopped {
        mission: String,
        // TODO(lifecycle): replace free-form reason with a typed stop reason enum once
        // std/no-std behavior and panic integration are split in a follow-up PR.
        reason: String,
    },
    // TODO(lifecycle): wire panic hook / no_std equivalent to emit this event consistently.
    /// A panic was observed, with its message and (when available) location.
    Panic {
        message: String,
        file: Option<String>,
        line: Option<u32>,
        column: Option<u32>,
    },
    /// The runtime finished shutting down.
    ShutdownCompleted,
}
355
/// One event record persisted in the `UnifiedLogType::RuntimeLifecycle` section.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    /// When the event happened, per the robot clock.
    pub timestamp: CuTime,
    /// The lifecycle event itself.
    pub event: RuntimeLifecycleEvent,
}
362
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    /// Records runtime execution progress in the shared probe.
    ///
    /// This is intentionally lightweight and does not call monitor callbacks.
    #[inline]
    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
        self.execution_probe.record(marker);
    }

    // FIXME(gbin): this became REALLY ugly with no-std
    /// Builds the runtime (std flavor): instantiates the resource manager via
    /// `resources_instanciator`, then delegates everything else to
    /// [`Self::new_with_resources`].
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: &str,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let resources = resources_instanciator(config)?;
        Self::new_with_resources(
            clock,
            config,
            mission,
            resources,
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            monitor_instanciator,
            bridges_instanciator,
            copperlists_logger,
            keyframes_logger,
        )
    }

    /// Builds the runtime (std flavor) from an already-constructed
    /// `ResourceManager`: resolves the mission graph, instantiates tasks,
    /// monitor and bridges, wires the copperlist/keyframe loggers according to
    /// the logging config, and assembles the final `CuRuntime`.
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new_with_resources(
        clock: RobotClock,
        config: &CuConfig,
        mission: &str,
        mut resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(Some(mission))?;
        // Per-node instance configs, in node iteration order, handed to the task factory.
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
        // Shared probe: the runtime writes progress, monitors read it asynchronously.
        let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
        let monitor_metadata = CuMonitoringMetadata::new(
            CompactString::from(mission),
            monitored_components,
            culist_component_mapping,
            CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
            build_monitor_topology(config, mission)?,
            None,
        )?;
        let monitor_runtime =
            CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
        let monitor = monitor_instanciator(config, monitor_metadata, monitor_runtime);
        let bridges = bridges_instanciator(config, &mut resources)?;

        // Decide whether to keep the provided loggers based on the logging config:
        // explicit enable, explicit disable, or default-on when no config is present.
        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // explicit no enable logging
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
            last_encoded_bytes: 0,
        };
        // NOTE(review): a cfg(target_os = "none") block inside the std-only
        // constructor looks unreachable in practice — confirm it is intentional.
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
            last_encoded_bytes: 0,
            forced_timestamp: None,
            locked: false,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            execution_probe,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    /// Builds the runtime (no-std flavor): instantiates the resource manager
    /// via `resources_instanciator`, then delegates everything else to
    /// [`Self::new_with_resources`]. Emits progress logs on bare-metal targets.
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: &str,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: resources instanciator");
        let resources = resources_instanciator(config)?;
        Self::new_with_resources(
            clock,
            config,
            mission,
            resources,
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            monitor_instanciator,
            bridges_instanciator,
            copperlists_logger,
            keyframes_logger,
        )
    }

    /// Builds the runtime (no-std flavor) from an already-constructed
    /// `ResourceManager`. Mirrors the std version, but uses an inline (non-shared)
    /// execution probe and an "unavailable" monitoring runtime, and emits
    /// progress logs on bare-metal targets.
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new_with_resources(
        clock: RobotClock,
        config: &CuConfig,
        mission: &str,
        mut resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: get graph");
        let graph = config.get_graph(Some(mission))?;
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: graph ok");
        // Per-node instance configs, in node iteration order, handed to the task factory.
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: tasks instanciator");
        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator");
        let monitor_metadata = CuMonitoringMetadata::new(
            CompactString::from(mission),
            monitored_components,
            culist_component_mapping,
            CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
            build_monitor_topology(config, mission)?,
            None,
        )?;
        // No watchdog-thread support without std, so the monitoring runtime is a stub.
        let monitor_runtime = CuMonitoringRuntime::unavailable();
        let monitor = monitor_instanciator(config, monitor_metadata, monitor_runtime);
        let execution_probe = RuntimeExecutionProbe::default();
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator ok");
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: bridges instanciator");
        let bridges = bridges_instanciator(config, &mut resources)?;

        // Decide whether to keep the provided loggers based on the logging config:
        // explicit enable, explicit disable, or default-on when no config is present.
        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // explicit no enable logging
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
            last_encoded_bytes: 0,
        };
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
            last_encoded_bytes: 0,
            forced_timestamp: None,
            locked: false,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            execution_probe,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}
654
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// Only produces output messages (no incoming graph edges).
    Source,
    /// Consumes and produces messages.
    Regular,
    /// Only consumes input messages (no outgoing graph edges).
    Sink,
}
665
/// Output slot of a task inside the copper list, with the message type names
/// produced there.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Index of the output message within the copper list.
    pub culist_index: u32,
    /// Type names of the messages produced at this slot.
    pub msg_types: Vec<String>,
}
671
/// One input message of a task, locating it in the copper list and in the
/// config graph.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// Index of the message within the copper list.
    pub culist_index: u32,
    /// Type name of the message.
    pub msg_type: String,
    /// Output port of the producing task this input comes from.
    pub src_port: usize,
    /// Index of the config graph edge realizing this connection.
    pub edge_id: usize,
}
679
/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// the index in the copper list of the output message and its type
    /// (`None` for tasks with no output, e.g. sinks)
    pub output_msg_pack: Option<CuOutputPack>,
}
695
696impl Debug for CuExecutionStep {
697    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
698        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
699        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
700        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
701        f.write_str(
702            format!(
703                "              input_msg_types: {:?}\n",
704                self.input_msg_indices_types
705            )
706            .as_str(),
707        )?;
708        f.write_str(format!("       output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
709        Ok(())
710    }
711}
712
/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of Execution units (loop or steps) that are executed
/// multiple times.
/// if loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    /// Units (steps or nested loops) executed on each iteration.
    pub steps: Vec<CuExecutionUnit>,
    /// Number of iterations; `None` means loop forever.
    pub loop_count: Option<u32>,
}
721
722impl Debug for CuExecutionLoop {
723    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
724        f.write_str("CuExecutionLoop:\n")?;
725        for step in &self.steps {
726            match step {
727                CuExecutionUnit::Step(step) => {
728                    step.fmt(f)?;
729                }
730                CuExecutionUnit::Loop(l) => {
731                    l.fmt(f)?;
732                }
733            }
734        }
735
736        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
737        Ok(())
738    }
739}
740
/// One unit of the execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single task execution step (boxed: the step struct is large).
    Step(Box<CuExecutionStep>),
    /// A nested loop of units.
    Loop(CuExecutionLoop),
}
747
748fn find_output_pack_from_nodeid(
749    node_id: NodeId,
750    steps: &Vec<CuExecutionUnit>,
751) -> Option<CuOutputPack> {
752    for step in steps {
753        match step {
754            CuExecutionUnit::Loop(loop_unit) => {
755                if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
756                    return Some(output_pack);
757                }
758            }
759            CuExecutionUnit::Step(step) => {
760                if step.node_id == node_id {
761                    return step.output_msg_pack.clone();
762                }
763            }
764        }
765    }
766    None
767}
768
769pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
770    if graph.incoming_neighbor_count(node_id) == 0 {
771        CuTaskType::Source
772    } else if graph.outgoing_neighbor_count(node_id) == 0 {
773        CuTaskType::Sink
774    } else {
775        CuTaskType::Regular
776    }
777}
778
779/// The connection id used here is the index of the config graph edge that equates to the wanted
780/// connection.
781fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
782    input_msg_indices_types.sort_by_key(|input| input.edge_id);
783}
784
/// Collects the output message type names for `node_id`, de-duplicated and
/// sorted by their effective order (ties broken by name for determinism).
fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
    edge_ids.sort();

    // (order, msg) pairs; a message seen more than once keeps its smallest order.
    let mut msg_order: Vec<(usize, String)> = Vec::new();
    let mut record_msg = |msg: String, order: usize| {
        if let Some((existing_order, _)) = msg_order
            .iter_mut()
            .find(|(_, existing_msg)| *existing_msg == msg)
        {
            if order < *existing_order {
                *existing_order = order;
            }
            return;
        }
        msg_order.push((order, msg));
    };

    for edge_id in edge_ids {
        if let Some(edge) = graph.edge(edge_id) {
            // usize::MAX marks "no explicit order"; fall back to the edge id.
            let order = if edge.order == usize::MAX {
                edge_id
            } else {
                edge.order
            };
            record_msg(edge.msg.clone(), order);
        }
    }
    // Also record outputs declared on the node itself (presumably outputs with
    // no connecting edge — TODO confirm `nc_outputs_with_order` semantics).
    if let Some(node) = graph.get_node(node_id) {
        for (msg, order) in node.nc_outputs_with_order() {
            record_msg(msg.clone(), order);
        }
    }

    msg_order.sort_by(|(order_a, msg_a), (order_b, msg_b)| {
        order_a.cmp(order_b).then_with(|| msg_a.cmp(msg_b))
    });
    msg_order.into_iter().map(|(_, msg)| msg).collect()
}
824/// Explores a subbranch and build the partial plan out of it.
825fn plan_tasks_tree_branch(
826    graph: &CuGraph,
827    mut next_culist_output_index: u32,
828    starting_point: NodeId,
829    plan: &mut Vec<CuExecutionUnit>,
830) -> (u32, bool) {
831    #[cfg(all(feature = "std", feature = "macro_debug"))]
832    eprintln!("-- starting branch from node {starting_point}");
833
834    let mut handled = false;
835
836    for id in graph.bfs_nodes(starting_point) {
837        let node_ref = graph.get_node(id).unwrap();
838        #[cfg(all(feature = "std", feature = "macro_debug"))]
839        eprintln!("  Visiting node: {node_ref:?}");
840
841        let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
842        let output_msg_pack: Option<CuOutputPack>;
843        let task_type = find_task_type_for_id(graph, id);
844
845        match task_type {
846            CuTaskType::Source => {
847                #[cfg(all(feature = "std", feature = "macro_debug"))]
848                eprintln!("    → Source node, assign output index {next_culist_output_index}");
849                let msg_types = collect_output_msg_types(graph, id);
850                if msg_types.is_empty() {
851                    panic!(
852                        "Source node '{}' has no outgoing connections",
853                        node_ref.get_id()
854                    );
855                }
856                output_msg_pack = Some(CuOutputPack {
857                    culist_index: next_culist_output_index,
858                    msg_types,
859                });
860                next_culist_output_index += 1;
861            }
862            CuTaskType::Sink => {
863                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
864                edge_ids.sort();
865                #[cfg(all(feature = "std", feature = "macro_debug"))]
866                eprintln!("    → Sink with incoming edges: {edge_ids:?}");
867                for edge_id in edge_ids {
868                    let edge = graph
869                        .edge(edge_id)
870                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
871                    let pid = graph
872                        .get_node_id_by_name(edge.src.as_str())
873                        .unwrap_or_else(|| {
874                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
875                        });
876                    let output_pack = find_output_pack_from_nodeid(pid, plan);
877                    if let Some(output_pack) = output_pack {
878                        #[cfg(all(feature = "std", feature = "macro_debug"))]
879                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
880                        let msg_type = edge.msg.as_str();
881                        let src_port = output_pack
882                            .msg_types
883                            .iter()
884                            .position(|msg| msg == msg_type)
885                            .unwrap_or_else(|| {
886                                panic!(
887                                    "Missing output port for message type '{msg_type}' on node {pid}"
888                                )
889                            });
890                        input_msg_indices_types.push(CuInputMsg {
891                            culist_index: output_pack.culist_index,
892                            msg_type: msg_type.to_string(),
893                            src_port,
894                            edge_id,
895                        });
896                    } else {
897                        #[cfg(all(feature = "std", feature = "macro_debug"))]
898                        eprintln!("      ✗ Input from {pid} not ready, returning");
899                        return (next_culist_output_index, handled);
900                    }
901                }
902                output_msg_pack = Some(CuOutputPack {
903                    culist_index: next_culist_output_index,
904                    msg_types: Vec::from(["()".to_string()]),
905                });
906                next_culist_output_index += 1;
907            }
908            CuTaskType::Regular => {
909                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
910                edge_ids.sort();
911                #[cfg(all(feature = "std", feature = "macro_debug"))]
912                eprintln!("    → Regular task with incoming edges: {edge_ids:?}");
913                for edge_id in edge_ids {
914                    let edge = graph
915                        .edge(edge_id)
916                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
917                    let pid = graph
918                        .get_node_id_by_name(edge.src.as_str())
919                        .unwrap_or_else(|| {
920                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
921                        });
922                    let output_pack = find_output_pack_from_nodeid(pid, plan);
923                    if let Some(output_pack) = output_pack {
924                        #[cfg(all(feature = "std", feature = "macro_debug"))]
925                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
926                        let msg_type = edge.msg.as_str();
927                        let src_port = output_pack
928                            .msg_types
929                            .iter()
930                            .position(|msg| msg == msg_type)
931                            .unwrap_or_else(|| {
932                                panic!(
933                                    "Missing output port for message type '{msg_type}' on node {pid}"
934                                )
935                            });
936                        input_msg_indices_types.push(CuInputMsg {
937                            culist_index: output_pack.culist_index,
938                            msg_type: msg_type.to_string(),
939                            src_port,
940                            edge_id,
941                        });
942                    } else {
943                        #[cfg(all(feature = "std", feature = "macro_debug"))]
944                        eprintln!("      ✗ Input from {pid} not ready, returning");
945                        return (next_culist_output_index, handled);
946                    }
947                }
948                let msg_types = collect_output_msg_types(graph, id);
949                if msg_types.is_empty() {
950                    panic!(
951                        "Regular node '{}' has no outgoing connections",
952                        node_ref.get_id()
953                    );
954                }
955                output_msg_pack = Some(CuOutputPack {
956                    culist_index: next_culist_output_index,
957                    msg_types,
958                });
959                next_culist_output_index += 1;
960            }
961        }
962
963        sort_inputs_by_cnx_id(&mut input_msg_indices_types);
964
965        if let Some(pos) = plan
966            .iter()
967            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
968        {
969            #[cfg(all(feature = "std", feature = "macro_debug"))]
970            eprintln!("    → Already in plan, modifying existing step");
971            let mut step = plan.remove(pos);
972            if let CuExecutionUnit::Step(ref mut s) = step {
973                s.input_msg_indices_types = input_msg_indices_types;
974            }
975            plan.push(step);
976        } else {
977            #[cfg(all(feature = "std", feature = "macro_debug"))]
978            eprintln!("    → New step added to plan");
979            let step = CuExecutionStep {
980                node_id: id,
981                node: node_ref.clone(),
982                task_type,
983                input_msg_indices_types,
984                output_msg_pack,
985            };
986            plan.push(CuExecutionUnit::Step(Box::new(step)));
987        }
988
989        handled = true;
990    }
991
992    #[cfg(all(feature = "std", feature = "macro_debug"))]
993    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
994    (next_culist_output_index, handled)
995}
996
997/// This is the main heuristics to compute an execution plan at compilation time.
998/// TODO(gbin): Make that heuristic pluggable.
999pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1000    #[cfg(all(feature = "std", feature = "macro_debug"))]
1001    eprintln!("[runtime plan]");
1002    let mut plan = Vec::new();
1003    let mut next_culist_output_index = 0u32;
1004
1005    let mut queue: VecDeque<NodeId> = graph
1006        .node_ids()
1007        .into_iter()
1008        .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
1009        .collect();
1010
1011    #[cfg(all(feature = "std", feature = "macro_debug"))]
1012    eprintln!("Initial source nodes: {queue:?}");
1013
1014    while let Some(start_node) = queue.pop_front() {
1015        #[cfg(all(feature = "std", feature = "macro_debug"))]
1016        eprintln!("→ Starting BFS from source {start_node}");
1017        for node_id in graph.bfs_nodes(start_node) {
1018            let already_in_plan = plan
1019                .iter()
1020                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1021            if already_in_plan {
1022                #[cfg(all(feature = "std", feature = "macro_debug"))]
1023                eprintln!("    → Node {node_id} already planned, skipping");
1024                continue;
1025            }
1026
1027            #[cfg(all(feature = "std", feature = "macro_debug"))]
1028            eprintln!("    Planning from node {node_id}");
1029            let (new_index, handled) =
1030                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
1031            next_culist_output_index = new_index;
1032
1033            if !handled {
1034                #[cfg(all(feature = "std", feature = "macro_debug"))]
1035                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1036                continue;
1037            }
1038
1039            #[cfg(all(feature = "std", feature = "macro_debug"))]
1040            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
1041            for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1042                #[cfg(all(feature = "std", feature = "macro_debug"))]
1043                eprintln!("      → Enqueueing neighbor {neighbor}");
1044                queue.push_back(neighbor);
1045            }
1046        }
1047    }
1048
1049    let mut planned_nodes = BTreeSet::new();
1050    for unit in &plan {
1051        if let CuExecutionUnit::Step(step) = unit {
1052            planned_nodes.insert(step.node_id);
1053        }
1054    }
1055
1056    let mut missing = Vec::new();
1057    for node_id in graph.node_ids() {
1058        if !planned_nodes.contains(&node_id) {
1059            if let Some(node) = graph.get_node(node_id) {
1060                missing.push(node.get_id().to_string());
1061            } else {
1062                missing.push(format!("node_id_{node_id}"));
1063            }
1064        }
1065    }
1066
1067    if !missing.is_empty() {
1068        missing.sort();
1069        return Err(CuError::from(format!(
1070            "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1071            missing.join(", ")
1072        )));
1073    }
1074
1075    Ok(CuExecutionLoop {
1076        steps: plan,
1077        loop_count: None,
1078    })
1079}
1080
1081//tests
1082#[cfg(test)]
1083mod tests {
1084    use super::*;
1085    use crate::config::Node;
1086    use crate::context::CuContext;
1087    use crate::cutask::CuSinkTask;
1088    use crate::cutask::{CuSrcTask, Freezable};
1089    use crate::monitoring::NoMonitor;
1090    use crate::reflect::Reflect;
1091    use bincode::Encode;
1092    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1093    use serde_derive::{Deserialize, Serialize};
1094
    /// Minimal source task used to exercise the runtime plumbing in tests.
    #[derive(Reflect)]
    pub struct TestSource {}

    // No state to freeze: the default Freezable behavior is enough.
    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        // Emits the unit type: these tests only care about graph wiring.
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // No-op: leaves the output message untouched and always succeeds.
        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1114
    /// Minimal sink task used to exercise the runtime plumbing in tests.
    #[derive(Reflect)]
    pub struct TestSink {}

    // No state to freeze: the default Freezable behavior is enough.
    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        // Consumes the unit type: these tests only care about graph wiring.
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // No-op: ignores the input and always succeeds.
        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1135
    // Those should be generated by the derive macro
    type Tasks = (TestSource, TestSink);

    /// Empty copper-list payload matching the unit messages of the test tasks.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        // No stamped data to expose for the unit payload.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        // No task ids are associated with the empty payload.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        // Nothing to zero: the payload is the unit type.
        fn init_zeroed(&mut self) {}
    }
1157
1158    #[cfg(feature = "std")]
1159    fn tasks_instanciator(
1160        all_instances_configs: Vec<Option<&ComponentConfig>>,
1161        _resources: &mut ResourceManager,
1162    ) -> CuResult<Tasks> {
1163        Ok((
1164            TestSource::new(all_instances_configs[0], ())?,
1165            TestSink::new(all_instances_configs[1], ())?,
1166        ))
1167    }
1168
1169    #[cfg(not(feature = "std"))]
1170    fn tasks_instanciator(
1171        all_instances_configs: Vec<Option<&ComponentConfig>>,
1172        _resources: &mut ResourceManager,
1173    ) -> CuResult<Tasks> {
1174        Ok((
1175            TestSource::new(all_instances_configs[0], ())?,
1176            TestSink::new(all_instances_configs[1], ())?,
1177        ))
1178    }
1179
    // Builds the monitor for the test runtime; NoMonitor discards everything.
    fn monitor_instanciator(
        _config: &CuConfig,
        metadata: CuMonitoringMetadata,
        runtime: CuMonitoringRuntime,
    ) -> NoMonitor {
        NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
    }
1187
    // No bridges are needed by these tests.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    // Provides an empty resource set: the test tasks take no resources.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }
1195
    /// Write stream that drops every entry; stands in for the log writers.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
1204
    // Smoke test: a minimal source→sink graph must produce a valid runtime.
    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        // 2 is the number of preallocated copper lists.
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            crate::config::DEFAULT_MISSION_ID,
            resources_instanciator,
            tasks_instanciator,
            &[],
            &[],
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }
1227
    // Exercises the copper-list pool (capacity 2): sequential ids, exhaustion
    // when both lists are in flight, and in-order vs out-of-order release.
    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            crate::config::DEFAULT_MISSION_ID,
            resources_instanciator,
            tasks_instanciator,
            &[],
            &[],
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulates the generated runtime
        {
            // First list: id 0, one slot left in the pool.
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            // Second list: id 1, pool now exhausted.
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            // Pool exhausted: create() must fail until a list is released.
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order, should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Readd a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, the #0 first
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Free up the top of the stack
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs

            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }
1310
    // Verifies that a task's inputs are ordered by connection (edge) creation
    // order, not by the ids of the source nodes.
    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // note that the source2 connection is before the source1
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // the edge id depends on the order the connection is created, not
        // on the node id, and that is what determines the input order
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // since the src2 connection was added before src1 connection, the src2 type should be
        // first
        assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
    }
1350
1351    #[test]
1352    fn test_runtime_output_ports_unique_ordered() {
1353        let mut config = CuConfig::default();
1354        let graph = config.get_graph_mut(None).unwrap();
1355        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1356        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1357        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1358        let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
1359        let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
1360
1361        graph.connect(src_id, dst_a_id, "msg::A").unwrap();
1362        graph.connect(src_id, dst_b_id, "msg::B").unwrap();
1363        graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
1364        graph.connect(src_id, dst_c_id, "msg::C").unwrap();
1365
1366        let runtime = compute_runtime_plan(graph).unwrap();
1367        let src_step = runtime
1368            .steps
1369            .iter()
1370            .find_map(|step| match step {
1371                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1372                _ => None,
1373            })
1374            .unwrap();
1375
1376        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1377        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
1378
1379        let dst_a_step = runtime
1380            .steps
1381            .iter()
1382            .find_map(|step| match step {
1383                CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
1384                _ => None,
1385            })
1386            .unwrap();
1387        let dst_b_step = runtime
1388            .steps
1389            .iter()
1390            .find_map(|step| match step {
1391                CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
1392                _ => None,
1393            })
1394            .unwrap();
1395        let dst_a2_step = runtime
1396            .steps
1397            .iter()
1398            .find_map(|step| match step {
1399                CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
1400                _ => None,
1401            })
1402            .unwrap();
1403        let dst_c_step = runtime
1404            .steps
1405            .iter()
1406            .find_map(|step| match step {
1407                CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
1408                _ => None,
1409            })
1410            .unwrap();
1411
1412        assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
1413        assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
1414        assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
1415        assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
1416    }
1417
1418    #[test]
1419    fn test_runtime_output_ports_fanout_single() {
1420        let mut config = CuConfig::default();
1421        let graph = config.get_graph_mut(None).unwrap();
1422        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1423        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1424        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1425
1426        graph.connect(src_id, dst_a_id, "i32").unwrap();
1427        graph.connect(src_id, dst_b_id, "i32").unwrap();
1428
1429        let runtime = compute_runtime_plan(graph).unwrap();
1430        let src_step = runtime
1431            .steps
1432            .iter()
1433            .find_map(|step| match step {
1434                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1435                _ => None,
1436            })
1437            .unwrap();
1438
1439        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1440        assert_eq!(output_pack.msg_types, vec!["i32"]);
1441    }
1442
1443    #[test]
1444    fn test_runtime_output_ports_include_nc_outputs() {
1445        let mut config = CuConfig::default();
1446        let graph = config.get_graph_mut(None).unwrap();
1447        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1448        let dst_id = graph.add_node(Node::new("dst", "Sink")).unwrap();
1449        graph.connect(src_id, dst_id, "msg::A").unwrap();
1450        graph
1451            .get_node_mut(src_id)
1452            .expect("missing source node")
1453            .add_nc_output("msg::B", usize::MAX);
1454
1455        let runtime = compute_runtime_plan(graph).unwrap();
1456        let src_step = runtime
1457            .steps
1458            .iter()
1459            .find_map(|step| match step {
1460                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1461                _ => None,
1462            })
1463            .unwrap();
1464        let dst_step = runtime
1465            .steps
1466            .iter()
1467            .find_map(|step| match step {
1468                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
1469                _ => None,
1470            })
1471            .unwrap();
1472
1473        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1474        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
1475        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 0);
1476    }
1477
1478    #[test]
1479    fn test_runtime_output_ports_respect_connection_order_with_nc() {
1480        let txt = r#"(
1481            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
1482            cnx: [
1483                (src: "src", dst: "__nc__", msg: "msg::A"),
1484                (src: "src", dst: "sink", msg: "msg::B"),
1485            ]
1486        )"#;
1487        let config = CuConfig::deserialize_ron(txt).unwrap();
1488        let graph = config.get_graph(None).unwrap();
1489        let src_id = graph.get_node_id_by_name("src").unwrap();
1490        let dst_id = graph.get_node_id_by_name("sink").unwrap();
1491
1492        let runtime = compute_runtime_plan(graph).unwrap();
1493        let src_step = runtime
1494            .steps
1495            .iter()
1496            .find_map(|step| match step {
1497                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1498                _ => None,
1499            })
1500            .unwrap();
1501        let dst_step = runtime
1502            .steps
1503            .iter()
1504            .find_map(|step| match step {
1505                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
1506                _ => None,
1507            })
1508            .unwrap();
1509
1510        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1511        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
1512        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
1513    }
1514
1515    #[cfg(feature = "std")]
1516    #[test]
1517    fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
1518        let txt = r#"(
1519            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
1520            cnx: [
1521                (src: "src", dst: "__nc__", msg: "msg::A"),
1522                (src: "src", dst: "sink", msg: "msg::B"),
1523            ]
1524        )"#;
1525        let tmp = tempfile::NamedTempFile::new().unwrap();
1526        std::fs::write(tmp.path(), txt).unwrap();
1527        let config = crate::config::read_configuration(tmp.path().to_str().unwrap()).unwrap();
1528        let graph = config.get_graph(None).unwrap();
1529        let src_id = graph.get_node_id_by_name("src").unwrap();
1530        let dst_id = graph.get_node_id_by_name("sink").unwrap();
1531
1532        let runtime = compute_runtime_plan(graph).unwrap();
1533        let src_step = runtime
1534            .steps
1535            .iter()
1536            .find_map(|step| match step {
1537                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1538                _ => None,
1539            })
1540            .unwrap();
1541        let dst_step = runtime
1542            .steps
1543            .iter()
1544            .find_map(|step| match step {
1545                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
1546                _ => None,
1547            })
1548            .unwrap();
1549
1550        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1551        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
1552        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
1553    }
1554
1555    #[test]
1556    fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
1557        let txt = r#"(
1558            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
1559            cnx: [
1560                (src: "src", dst: "__nc__", msg: "i32"),
1561                (src: "src", dst: "sink", msg: "bool"),
1562            ]
1563        )"#;
1564        let config = CuConfig::deserialize_ron(txt).unwrap();
1565        let graph = config.get_graph(None).unwrap();
1566        let src_id = graph.get_node_id_by_name("src").unwrap();
1567        let dst_id = graph.get_node_id_by_name("sink").unwrap();
1568
1569        let runtime = compute_runtime_plan(graph).unwrap();
1570        let src_step = runtime
1571            .steps
1572            .iter()
1573            .find_map(|step| match step {
1574                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1575                _ => None,
1576            })
1577            .unwrap();
1578        let dst_step = runtime
1579            .steps
1580            .iter()
1581            .find_map(|step| match step {
1582                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
1583                _ => None,
1584            })
1585            .unwrap();
1586
1587        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1588        assert_eq!(output_pack.msg_types, vec!["i32", "bool"]);
1589        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
1590    }
1591
1592    #[test]
1593    fn test_runtime_plan_diamond_case1() {
1594        // more complex topology that tripped the scheduler
1595        let mut config = CuConfig::default();
1596        let graph = config.get_graph_mut(None).unwrap();
1597        let cam0_id = graph
1598            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1599            .unwrap();
1600        let inf0_id = graph
1601            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1602            .unwrap();
1603        let broadcast_id = graph
1604            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1605            .unwrap();
1606
1607        // case 1 order
1608        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1609        graph.connect(cam0_id, inf0_id, "i32").unwrap();
1610        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1611
1612        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1613        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
1614
1615        assert_eq!(edge_cam0_to_inf0, 0);
1616        assert_eq!(edge_cam0_to_broadcast, 1);
1617
1618        let runtime = compute_runtime_plan(graph).unwrap();
1619        let broadcast_step = runtime
1620            .steps
1621            .iter()
1622            .find_map(|step| match step {
1623                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1624                _ => None,
1625            })
1626            .unwrap();
1627
1628        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1629        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1630    }
1631
1632    #[test]
1633    fn test_runtime_plan_diamond_case2() {
1634        // more complex topology that tripped the scheduler variation 2
1635        let mut config = CuConfig::default();
1636        let graph = config.get_graph_mut(None).unwrap();
1637        let cam0_id = graph
1638            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1639            .unwrap();
1640        let inf0_id = graph
1641            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1642            .unwrap();
1643        let broadcast_id = graph
1644            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1645            .unwrap();
1646
1647        // case 2 order
1648        graph.connect(cam0_id, inf0_id, "i32").unwrap();
1649        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1650        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1651
1652        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1653        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1654
1655        assert_eq!(edge_cam0_to_broadcast, 0);
1656        assert_eq!(edge_cam0_to_inf0, 1);
1657
1658        let runtime = compute_runtime_plan(graph).unwrap();
1659        let broadcast_step = runtime
1660            .steps
1661            .iter()
1662            .find_map(|step| match step {
1663                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1664                _ => None,
1665            })
1666            .unwrap();
1667
1668        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1669        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1670    }
1671}