// cu29_runtime — curuntime.rs
1//! CuRuntime is the heart of what copper is running on the robot.
2//! It is exposed to the user via the `copper_runtime` macro injecting it as a field in their application struct.
3//!
4
5use crate::app::Subsystem;
6use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node, TaskKind};
7use crate::config::{
8    CuConfig, CuGraph, MAX_RATE_TARGET_HZ, NodeId, RuntimeConfig, resolve_task_kind_for_id,
9};
10use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
11use crate::cutask::{BincodeAdapter, Freezable};
12#[cfg(feature = "std")]
13use crate::monitoring::ExecutionProbeHandle;
14#[cfg(feature = "std")]
15use crate::monitoring::MonitorExecutionProbe;
16use crate::monitoring::{
17    ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
18    ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
19    take_last_completed_handle_bytes,
20};
21#[cfg(all(feature = "std", feature = "parallel-rt"))]
22use crate::parallel_rt::{ParallelRt, ParallelRtMetadata};
23use crate::resource::ResourceManager;
24use compact_str::CompactString;
25use cu29_clock::{ClockProvider, CuDuration, CuTime, RobotClock};
26use cu29_traits::CuResult;
27use cu29_traits::WriteStream;
28use cu29_traits::{CopperListTuple, CuError};
29
30#[cfg(target_os = "none")]
31#[allow(unused_imports)]
32use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
33#[cfg(target_os = "none")]
34#[allow(unused_imports)]
35use cu29_log_derive::info;
36#[cfg(target_os = "none")]
37#[allow(unused_imports)]
38use cu29_log_runtime::log;
39#[cfg(all(target_os = "none", debug_assertions))]
40#[allow(unused_imports)]
41use cu29_log_runtime::log_debug_mode;
42#[cfg(target_os = "none")]
43#[allow(unused_imports)]
44use cu29_value::to_value;
45
46#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
47use alloc::alloc::{alloc_zeroed, handle_alloc_error};
48use alloc::boxed::Box;
49use alloc::collections::{BTreeSet, VecDeque};
50use alloc::format;
51use alloc::string::{String, ToString};
52use alloc::vec::Vec;
53use bincode::enc::EncoderImpl;
54use bincode::enc::write::{SizeWriter, SliceWriter};
55use bincode::error::EncodeError;
56use bincode::{Decode, Encode};
57#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
58use core::alloc::Layout;
59use core::fmt::Result as FmtResult;
60use core::fmt::{Debug, Formatter};
61use core::marker::PhantomData;
62
63#[cfg(all(feature = "std", feature = "async-cl-io"))]
64use std::sync::mpsc::{Receiver, SyncSender, TryRecvError, sync_channel};
65#[cfg(all(feature = "std", feature = "async-cl-io"))]
66use std::thread::JoinHandle;
67
#[doc(hidden)]
/// Factory signature the `copper_runtime` macro uses to build the task tuple `CT`
/// from each node's optional `ComponentConfig` plus the shared resource manager.
pub type TasksInstantiator<CT> =
    for<'c> fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>;
#[doc(hidden)]
/// Factory signature building the bridges tuple `CB` from the whole config.
pub type BridgesInstantiator<CB> = fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>;
#[doc(hidden)]
/// Factory signature building the user-selected monitor `M` from config and
/// the monitoring metadata/runtime handles.
pub type MonitorInstantiator<M> = fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M;
75
#[doc(hidden)]
/// Compile-time bundle of everything the `copper_runtime` macro generates for a
/// mission: the instantiator callables plus static monitoring metadata.
pub struct CuRuntimeParts<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI> {
    /// Builds the task tuple `CT` (see `TasksInstantiator`).
    pub tasks_instanciator: TI,
    /// Static metadata describing each monitored component.
    pub monitored_components: &'static [MonitorComponentMetadata],
    /// Maps CopperList payload slots back to their `ComponentId`.
    pub culist_component_mapping: &'static [ComponentId],
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    /// Static scheduling metadata for the parallel runtime.
    pub parallel_rt_metadata: &'static ParallelRtMetadata,
    /// Builds the monitor `M` (see `MonitorInstantiator`).
    pub monitor_instanciator: MI,
    /// Builds the bridges tuple `CB` (see `BridgesInstantiator`).
    pub bridges_instanciator: BI,
    // Anchors the otherwise-unused generic parameters (CT, CB, P, M, NBCL)
    // without storing any value.
    _payload: PhantomData<(CT, CB, P, M, [(); NBCL])>,
}
87
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI>
    CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>
{
    /// Assembles the macro-generated parts. `const` so the whole bundle can be
    /// built in a `static` by generated code. The `parallel_rt_metadata`
    /// parameter only exists when both `std` and `parallel-rt` are enabled.
    pub const fn new(
        tasks_instanciator: TI,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: MI,
        bridges_instanciator: BI,
    ) -> Self {
        Self {
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
            _payload: PhantomData,
        }
    }
}
112
#[doc(hidden)]
/// Staged construction of the runtime: collects the clock, config, mission and
/// macro-generated parts, plus optional subsystem/instance/resource overrides,
/// before the runtime proper is built.
pub struct CuRuntimeBuilder<
    'cfg,
    CT,
    CB,
    P: CopperListTuple,
    M: CuMonitor,
    const NBCL: usize,
    TI,
    BI,
    MI,
    CLW,
    KFW,
> {
    // Clock driving the runtime loop and timestamps.
    clock: RobotClock,
    // Borrowed application config; lives as long as the builder ('cfg).
    config: &'cfg CuConfig,
    // Mission name selecting which graph/section of the config applies.
    mission: &'cfg str,
    // Subsystem placement; defaults to a root subsystem (see `new`).
    subsystem: Subsystem,
    // Instance discriminator for multi-instance deployments; defaults to 0.
    instance_id: u32,
    // Optional pre-built resource manager; `None` until provided.
    resources: Option<ResourceManager>,
    // Macro-generated instantiators and monitoring metadata.
    parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
    // Writer for serialized CopperLists (generic over the logger type).
    copperlists_logger: CLW,
    // Writer for task keyframes (generic over the logger type).
    keyframes_logger: KFW,
}
137
138impl<'cfg, CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI, CLW, KFW>
139    CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
140{
141    pub fn new(
142        clock: RobotClock,
143        config: &'cfg CuConfig,
144        mission: &'cfg str,
145        parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
146        copperlists_logger: CLW,
147        keyframes_logger: KFW,
148    ) -> Self {
149        Self {
150            clock,
151            config,
152            mission,
153            subsystem: Subsystem::new(None, 0),
154            instance_id: 0,
155            resources: None,
156            parts,
157            copperlists_logger,
158            keyframes_logger,
159        }
160    }
161
162    pub fn with_subsystem(mut self, subsystem: Subsystem) -> Self {
163        self.subsystem = subsystem;
164        self
165    }
166
167    pub fn with_instance_id(mut self, instance_id: u32) -> Self {
168        self.instance_id = instance_id;
169        self
170    }
171
172    pub fn with_resources(mut self, resources: ResourceManager) -> Self {
173        self.resources = Some(resources);
174        self
175    }
176
177    pub fn try_with_resources_instantiator(
178        mut self,
179        resources_instantiator: impl FnOnce(&CuConfig) -> CuResult<ResourceManager>,
180    ) -> CuResult<Self> {
181        self.resources = Some(resources_instantiator(self.config)?);
182        Ok(self)
183    }
184}
185
/// Returns a monotonic instant used for local runtime performance timing.
///
/// When `sysclock-perf` (and `std`) are enabled this uses a process-local
/// `RobotClock::new()` instance for timing. The returned value is a
/// monotonically increasing duration since an unspecified origin (typically
/// process or runtime initialization), not a wall-clock time-of-day. When
/// `sysclock-perf` is disabled it delegates to the provided `RobotClock`.
///
/// This is intentionally separate from `LoopRateLimiter`, which always uses the
/// provided `RobotClock` so `runtime.rate_target_hz` stays tied to robot time.
#[inline]
pub fn perf_now(_clock: &RobotClock) -> CuTime {
    #[cfg(all(feature = "std", feature = "sysclock-perf"))]
    {
        // One lazily-created process-local clock shared by every caller.
        static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
        return PERF_CLOCK.get_or_init(RobotClock::new).now();
    }

    // Fallback: robot time. When the cfg branch above is compiled in, this is
    // dead code behind its `return`, hence the allow.
    #[allow(unreachable_code)]
    _clock.now()
}
207
/// Window (in nanoseconds) before the deadline during which the high-precision
/// limiter stops sleeping and busy-spins instead (see
/// `LoopRateLimiter::wait_until_ready`).
#[cfg(all(feature = "std", feature = "high-precision-limiter"))]
const HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS: u64 = 200_000;
210
211/// Convert a configured runtime rate target to an integer-nanosecond period.
212#[inline]
213pub fn rate_target_period(rate_target_hz: u64) -> CuResult<CuDuration> {
214    if rate_target_hz == 0 {
215        return Err(CuError::from(
216            "Runtime rate target cannot be zero. Set runtime.rate_target_hz to at least 1.",
217        ));
218    }
219
220    if rate_target_hz > MAX_RATE_TARGET_HZ {
221        return Err(CuError::from(format!(
222            "Runtime rate target ({rate_target_hz} Hz) exceeds the supported maximum of {MAX_RATE_TARGET_HZ} Hz."
223        )));
224    }
225
226    Ok(CuDuration::from(MAX_RATE_TARGET_HZ / rate_target_hz))
227}
228
/// Runtime loop limiter that preserves phase with absolute deadlines.
///
/// This is intentionally a small runtime helper so generated applications do
/// not have to open-code loop scheduling policy. Deadlines are tracked against
/// the provided `RobotClock`, even when `sysclock-perf` is enabled for
/// process-time measurements.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LoopRateLimiter {
    // Target loop period, derived from `rate_target_period`.
    period: CuDuration,
    // Absolute robot-time deadline of the next tick (keeps phase stable).
    next_deadline: CuTime,
}
240
impl LoopRateLimiter {
    /// Builds a limiter from `runtime.rate_target_hz`; the first deadline is
    /// one period from `clock.now()`. Errors come from `rate_target_period`.
    #[inline]
    pub fn from_rate_target_hz(rate_target_hz: u64, clock: &RobotClock) -> CuResult<Self> {
        let period = rate_target_period(rate_target_hz)?;
        Ok(Self {
            period,
            next_deadline: clock.now() + period,
        })
    }

    /// True once the current deadline has been reached or passed.
    #[inline]
    pub fn is_ready(&self, clock: &RobotClock) -> bool {
        self.remaining(clock).is_none()
    }

    /// Time left until the deadline, or `None` if it has already passed.
    #[inline]
    pub fn remaining(&self, clock: &RobotClock) -> Option<CuDuration> {
        let now = clock.now();
        if now < self.next_deadline {
            Some(self.next_deadline - now)
        } else {
            None
        }
    }

    /// Blocks until the current deadline. Strategy depends on features:
    /// - std + high-precision-limiter: sleep until close, then busy-spin.
    /// - std only: a single `thread::sleep` for the remaining time.
    /// - no_std: pure busy-spin against the clock.
    #[inline]
    pub fn wait_until_ready(&self, clock: &RobotClock) {
        let deadline = self.next_deadline;
        // Already past the deadline: nothing to wait for.
        let Some(remaining) = self.remaining(clock) else {
            return;
        };

        #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
        {
            // Coarse sleep until we are within the spin window, then spin so
            // we wake as close to the deadline as the clock allows.
            let spin_window = self.spin_window();
            if remaining > spin_window {
                std::thread::sleep(std::time::Duration::from(remaining - spin_window));
            }
            while clock.now() < deadline {
                core::hint::spin_loop();
            }
        }

        #[cfg(all(feature = "std", not(feature = "high-precision-limiter")))]
        {
            let _ = deadline;
            std::thread::sleep(std::time::Duration::from(remaining));
        }

        #[cfg(not(feature = "std"))]
        {
            let _ = remaining;
            while clock.now() < deadline {
                core::hint::spin_loop();
            }
        }
    }

    /// Records that a tick happened now, advancing the next absolute deadline.
    #[inline]
    pub fn mark_tick(&mut self, clock: &RobotClock) {
        self.advance_from(clock.now());
    }

    /// Convenience: wait for the deadline, then advance it (one loop tick).
    #[inline]
    pub fn limit(&mut self, clock: &RobotClock) {
        self.wait_until_ready(clock);
        self.mark_tick(clock);
    }

    /// Advances `next_deadline` past `now` by whole periods. If we are behind,
    /// skip the missed periods in one jump (no burst of catch-up ticks) while
    /// keeping the phase aligned to the original schedule.
    #[inline]
    fn advance_from(&mut self, now: CuTime) {
        let steps = if now < self.next_deadline {
            1
        } else {
            (now - self.next_deadline).as_nanos() / self.period.as_nanos() + 1
        };
        self.next_deadline += steps * self.period;
    }

    /// Fixed busy-spin window; independent of the period for now.
    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
    #[inline]
    fn spin_window(&self) -> CuDuration {
        let _ = self.period;
        CuDuration::from(HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS)
    }

    // Test-only accessor for deadline assertions.
    #[cfg(test)]
    #[inline]
    fn next_deadline(&self) -> CuTime {
        self.next_deadline
    }
}
333
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
/// Marker bound for CopperList payloads. With async CL I/O enabled the payload
/// crosses a thread boundary, so `Send` is required...
pub trait AsyncCopperListPayload: Send {}

#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<T: Send> AsyncCopperListPayload for T {}

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
/// ...and without async CL I/O the bound is empty, so the same generated code
/// compiles on both configurations.
pub trait AsyncCopperListPayload {}

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
impl<T> AsyncCopperListPayload for T {}
347
/// Control-flow result returned by one generated process stage.
///
/// `AbortCopperList` preserves the current runtime semantics for monitor
/// decisions that abort the current CopperList without shutting the runtime
/// down. The outer driver remains responsible for ordered cleanup and log
/// handoff.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[doc(hidden)]
pub enum ProcessStepOutcome {
    /// Proceed to the next stage of the current CopperList.
    Continue,
    /// Drop the current CopperList; the runtime itself keeps running.
    AbortCopperList,
}

/// Result type used by generated process-step functions.
#[doc(hidden)]
pub type ProcessStepResult = CuResult<ProcessStepOutcome>;
364
/// Serializes a completed CopperList with bincode's standard config, producing
/// the byte snapshot exposed to remote debuggers.
#[cfg(feature = "remote-debug")]
fn encode_completed_copperlist_snapshot<P: CopperListTuple>(
    cl: &CopperList<P>,
) -> CuResult<Vec<u8>> {
    match bincode::encode_to_vec(cl, bincode::config::standard()) {
        Ok(bytes) => Ok(bytes),
        Err(e) => Err(CuError::new_with_cause(
            "Failed to encode completed CopperList snapshot",
            e,
        )),
    }
}
372
/// Manages the lifecycle of the copper lists and logging on the synchronous path.
#[doc(hidden)]
pub struct SyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    // Fixed-capacity pool/ring of NBCL in-flight copper lists.
    inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
    /// Remote-debug snapshot of the most recently completed CopperList.
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    /// Last encoded size returned by logger.log
    pub last_encoded_bytes: u64,
    /// Last handle-backed payload bytes observed during logger.log
    pub last_handle_bytes: u64,
}
387
impl<P: CopperListTuple + Default, const NBCL: usize> SyncCopperListsManager<P, NBCL> {
    /// Builds the synchronous manager with an optional CopperList logger.
    /// Currently infallible; `CuResult` keeps parity with the async manager.
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit,
    {
        Ok(Self {
            inner: CuListsManager::new(),
            logger,
            #[cfg(feature = "remote-debug")]
            last_completed_encoded: None,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Id the next created CopperList will get.
    pub fn next_cl_id(&self) -> u64 {
        self.inner.next_cl_id()
    }

    /// Id of the most recently created CopperList.
    pub fn last_cl_id(&self) -> u64 {
        self.inner.last_cl_id()
    }

    /// Borrow the most recent in-flight CopperList, if any.
    pub fn peek(&self) -> Option<&CopperList<P>> {
        self.inner.peek()
    }

    /// Encoded snapshot of the last completed CopperList (remote-debug builds).
    #[cfg(feature = "remote-debug")]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        self.last_completed_encoded.as_deref()
    }

    // Non-remote-debug stub so callers compile unchanged.
    #[cfg(not(feature = "remote-debug"))]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        None
    }

    #[cfg(feature = "remote-debug")]
    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
        self.last_completed_encoded = snapshot;
    }

    // Non-remote-debug stub: snapshot is discarded.
    #[cfg(not(feature = "remote-debug"))]
    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}

    /// Claims a fresh CopperList slot; errors when all NBCL slots are in use.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>>
    where
        P: CuListZeroedInit,
    {
        self.inner
            .create()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))
    }

    /// Marks CopperList `culistid` done, then logs and frees every list that is
    /// done *from the top of the queue down*, preserving completion order in
    /// the log. Lists behind a still-processing one stay queued.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        #[cfg(debug_assertions)]
        self.debug_assert_end_of_processing_target(culistid);

        let mut is_top = true;
        let mut nb_done = 0;
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;
        // Split borrow: the snapshot slot is borrowed separately so the loop
        // can mutate it while iterating `inner` (remote-debug only).
        #[cfg(feature = "remote-debug")]
        let last_completed_encoded = &mut self.last_completed_encoded;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
                #[cfg(feature = "remote-debug")]
                {
                    *last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
                }
            }
            // Only a contiguous done-prefix is drained; once a non-done list
            // is hit, is_top latches false for the rest of the scan.
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
                    self.last_handle_bytes = take_last_completed_handle_bytes();
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        // Pop exactly the freed prefix off the queue.
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// No-op on the sync path: logging happens inline in `end_of_processing`.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        Ok(())
    }

    /// Number of free CopperList slots remaining.
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        Ok(NBCL - self.inner.len())
    }

    /// Boxed variant used by the parallel runtime: logs the owned list inline
    /// and hands it straight back for recycling (never `Pending` on this path).
    #[cfg(feature = "std")]
    pub fn end_of_processing_boxed(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        #[cfg(debug_assertions)]
        debug_assert_processing_completion_state(culist.as_ref(), "sync boxed end_of_processing");

        culist.change_state(CopperListState::DoneProcessing);
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;
        if let Some(logger) = &mut self.logger {
            culist.change_state(CopperListState::BeingSerialized);
            logger.log(&culist)?;
            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
            self.last_handle_bytes = take_last_completed_handle_bytes();
        }
        culist.change_state(CopperListState::Free);
        Ok(OwnedCopperListSubmission::Recycled(culist))
    }

    /// Sync path never queues, so there is never anything to reclaim.
    #[cfg(feature = "std")]
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        Ok(None)
    }

    /// Blocking reclaim is meaningless on the sync path: nothing is ever
    /// pending, so waiting would deadlock. Always an error.
    #[cfg(feature = "std")]
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        Err(CuError::from(
            "Synchronous CopperList I/O cannot block waiting for boxed completions",
        ))
    }

    /// Nothing pending on the sync path; returns an empty batch.
    #[cfg(feature = "std")]
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        Ok(Vec::new())
    }

    // Debug-build invariant: exactly one active list carries `culistid` and it
    // must still be in the Processing state when finished.
    #[cfg(debug_assertions)]
    fn debug_assert_end_of_processing_target(&self, culistid: u64) {
        let mut matches = 0usize;
        let mut state = None;
        for cl in self.inner.iter() {
            if cl.id == culistid {
                matches += 1;
                state = Some(cl.get_state());
            }
        }

        assert_eq!(
            matches, 1,
            "sync end_of_processing expected exactly one active CopperList #{culistid}, found {matches}"
        );
        assert_eq!(
            state,
            Some(CopperListState::Processing),
            "sync end_of_processing expected CopperList #{culistid} to be Processing, found {:?}",
            state
        );
    }
}
548
/// Result of handing an owned boxed CopperList to the runtime-side CL I/O path.
///
/// The sync manager always answers `Recycled`; the async manager answers
/// `Pending` whenever a serializer worker is attached.
#[cfg(feature = "std")]
#[doc(hidden)]
pub enum OwnedCopperListSubmission<P: CopperListTuple> {
    /// The CL has been fully handled and can be recycled immediately by the caller.
    Recycled(Box<CopperList<P>>),
    /// The CL was queued asynchronously and will be returned by a later reclaim call.
    Pending,
}
558
// Message sent back from the serializer worker thread: the CopperList being
// returned plus the outcome of its log write (encoded bytes, handle bytes).
#[cfg(all(feature = "std", feature = "async-cl-io"))]
struct AsyncCopperListCompletion<P: CopperListTuple> {
    culist: Box<CopperList<P>>,
    log_result: CuResult<(u64, u64)>,
}
564
/// Heap-allocates one zero-initialized boxed CopperList. Zeroing is used so
/// large payloads never hit the stack and arrive in a known state; the payload
/// then finishes initialization via `init_zeroed`.
#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
fn allocate_zeroed_copperlist<P>() -> Box<CopperList<P>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // SAFETY: We allocate zeroed memory and immediately initialize required fields.
    // The CuListZeroedInit bound is the caller-visible contract that an
    // all-zero bit pattern plus `init_zeroed` yields a valid `CopperList<P>`;
    // allocation failure is routed to `handle_alloc_error` (never a null deref).
    let mut culist = unsafe {
        let layout = Layout::new::<CopperList<P>>();
        let ptr = alloc_zeroed(layout) as *mut CopperList<P>;
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        Box::from_raw(ptr)
    };
    culist.msgs.init_zeroed();
    culist
}
582
/// Pre-allocates the parallel runtime's free pool: NBCL zero-initialized boxed
/// CopperLists (see `allocate_zeroed_copperlist`).
#[cfg(all(feature = "std", feature = "parallel-rt"))]
pub fn allocate_boxed_copperlists<P, const NBCL: usize>() -> Vec<Box<CopperList<P>>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // `collect` reserves the full NBCL capacity up front from the range's
    // size hint, matching the explicit with_capacity/push loop it replaces.
    (0..NBCL)
        .map(|_| allocate_zeroed_copperlist::<P>())
        .collect()
}
594
/// Manages the lifecycle of the copper lists and logging on the asynchronous path.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub struct AsyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    // Recycled, ready-to-use boxed lists (at most NBCL overall in flight).
    free_pool: Vec<Box<CopperList<P>>>,
    // The single list currently being filled by the runtime, if any.
    current: Option<Box<CopperList<P>>>,
    /// Remote-debug snapshot of the most recently completed CopperList.
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    // Number of lists handed to the worker and not yet reclaimed.
    pending_count: usize,
    // Monotonic id assigned to the next created list.
    next_cl_id: u64,
    // Bounded channel feeding the serializer worker; None when no logger.
    pending_sender: Option<SyncSender<Box<CopperList<P>>>>,
    // Worker -> runtime channel returning serialized lists; None when no logger.
    completion_receiver: Option<Receiver<AsyncCopperListCompletion<P>>>,
    // Join handle of the serializer thread, taken on shutdown.
    worker_handle: Option<JoinHandle<()>>,
    /// Last encoded size returned by logger.log
    pub last_encoded_bytes: u64,
    /// Last handle-backed payload bytes observed during logger.log
    pub last_handle_bytes: u64,
}
613
614#[cfg(all(feature = "std", feature = "async-cl-io"))]
615impl<P: CopperListTuple + Default, const NBCL: usize> AsyncCopperListsManager<P, NBCL> {
616    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
617    where
618        P: CuListZeroedInit + AsyncCopperListPayload + 'static,
619    {
620        let mut free_pool = Vec::with_capacity(NBCL);
621        for _ in 0..NBCL {
622            free_pool.push(allocate_zeroed_copperlist::<P>());
623        }
624
625        let (pending_sender, completion_receiver, worker_handle) = if let Some(mut logger) = logger
626        {
627            let (pending_sender, pending_receiver) = sync_channel::<Box<CopperList<P>>>(NBCL);
628            let (completion_sender, completion_receiver) =
629                sync_channel::<AsyncCopperListCompletion<P>>(NBCL);
630            let worker_handle = std::thread::Builder::new()
631                .name("cu-async-cl-io".to_string())
632                .spawn(move || {
633                    while let Ok(mut culist) = pending_receiver.recv() {
634                        culist.change_state(CopperListState::BeingSerialized);
635                        let log_result = logger.log(&culist).map(|_| {
636                            (
637                                logger.last_log_bytes().unwrap_or(0) as u64,
638                                take_last_completed_handle_bytes(),
639                            )
640                        });
641                        let should_stop = log_result.is_err();
642                        if completion_sender
643                            .send(AsyncCopperListCompletion { culist, log_result })
644                            .is_err()
645                        {
646                            break;
647                        }
648                        if should_stop {
649                            break;
650                        }
651                    }
652                })
653                .map_err(|e| {
654                    CuError::from("Failed to spawn async CopperList serializer thread")
655                        .add_cause(e.to_string().as_str())
656                })?;
657            (
658                Some(pending_sender),
659                Some(completion_receiver),
660                Some(worker_handle),
661            )
662        } else {
663            (None, None, None)
664        };
665
666        Ok(Self {
667            free_pool,
668            current: None,
669            #[cfg(feature = "remote-debug")]
670            last_completed_encoded: None,
671            pending_count: 0,
672            next_cl_id: 0,
673            pending_sender,
674            completion_receiver,
675            worker_handle,
676            last_encoded_bytes: 0,
677            last_handle_bytes: 0,
678        })
679    }
680
681    pub fn next_cl_id(&self) -> u64 {
682        self.next_cl_id
683    }
684
685    pub fn last_cl_id(&self) -> u64 {
686        self.next_cl_id.saturating_sub(1)
687    }
688
689    pub fn peek(&self) -> Option<&CopperList<P>> {
690        self.current.as_deref()
691    }
692
693    #[cfg(feature = "remote-debug")]
694    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
695        self.last_completed_encoded.as_deref()
696    }
697
698    #[cfg(not(feature = "remote-debug"))]
699    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
700        None
701    }
702
703    #[cfg(feature = "remote-debug")]
704    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
705        self.last_completed_encoded = snapshot;
706    }
707
708    #[cfg(not(feature = "remote-debug"))]
709    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}
710
711    pub fn create(&mut self) -> CuResult<&mut CopperList<P>>
712    where
713        P: CuListZeroedInit,
714    {
715        if self.current.is_some() {
716            return Err(CuError::from(
717                "Attempted to create a CopperList while another one is still active",
718            ));
719        }
720
721        self.reclaim_completed()?;
722        while self.free_pool.is_empty() {
723            self.wait_for_completion()?;
724        }
725
726        let culist = self
727            .free_pool
728            .pop()
729            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))?;
730        self.current = Some(culist);
731
732        let current = self
733            .current
734            .as_mut()
735            .expect("current CopperList is missing");
736        current.reset_for_runtime_use(self.next_cl_id);
737        self.next_cl_id += 1;
738        Ok(current.as_mut())
739    }
740
741    #[cfg(feature = "remote-debug")]
742    fn capture_completed_snapshot(&mut self, cl: &CopperList<P>) -> CuResult<()> {
743        self.last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
744        Ok(())
745    }
746
747    #[cfg(not(feature = "remote-debug"))]
748    fn capture_completed_snapshot(&mut self, _cl: &CopperList<P>) -> CuResult<()> {
749        Ok(())
750    }
751
752    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
753        self.reclaim_completed()?;
754
755        let mut culist = self.current.take().ok_or_else(|| {
756            CuError::from("Attempted to finish processing without an active CopperList")
757        })?;
758
759        if culist.id != culistid {
760            return Err(CuError::from(format!(
761                "Attempted to finish CopperList #{culistid} while CopperList #{} is active",
762                culist.id
763            )));
764        }
765        #[cfg(debug_assertions)]
766        debug_assert_processing_completion_state(culist.as_ref(), "async end_of_processing");
767
768        culist.change_state(CopperListState::DoneProcessing);
769        self.capture_completed_snapshot(&culist)?;
770        self.last_encoded_bytes = 0;
771        self.last_handle_bytes = 0;
772
773        if let Some(pending_sender) = &self.pending_sender {
774            culist.change_state(CopperListState::QueuedForSerialization);
775            pending_sender.send(culist).map_err(|e| {
776                CuError::from("Failed to enqueue CopperList for async serialization")
777                    .add_cause(e.to_string().as_str())
778            })?;
779            self.pending_count += 1;
780            self.reclaim_completed()?;
781        } else {
782            culist.change_state(CopperListState::Free);
783            self.free_pool.push(culist);
784        }
785
786        Ok(())
787    }
788
789    pub fn finish_pending(&mut self) -> CuResult<()> {
790        if self.current.is_some() {
791            return Err(CuError::from(
792                "Cannot flush CopperList I/O while a CopperList is still active",
793            ));
794        }
795
796        while self.pending_count > 0 {
797            self.wait_for_completion()?;
798        }
799        Ok(())
800    }
801
802    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
803        self.reclaim_completed()?;
804        Ok(self.free_pool.len())
805    }
806
807    pub fn end_of_processing_boxed(
808        &mut self,
809        mut culist: Box<CopperList<P>>,
810    ) -> CuResult<OwnedCopperListSubmission<P>> {
811        self.reclaim_completed()?;
812        #[cfg(debug_assertions)]
813        debug_assert_processing_completion_state(culist.as_ref(), "async boxed end_of_processing");
814        culist.change_state(CopperListState::DoneProcessing);
815        self.capture_completed_snapshot(&culist)?;
816        self.last_encoded_bytes = 0;
817        self.last_handle_bytes = 0;
818
819        if let Some(pending_sender) = &self.pending_sender {
820            culist.change_state(CopperListState::QueuedForSerialization);
821            pending_sender.send(culist).map_err(|e| {
822                CuError::from("Failed to enqueue CopperList for async serialization")
823                    .add_cause(e.to_string().as_str())
824            })?;
825            self.pending_count += 1;
826            self.reclaim_completed()?;
827            Ok(OwnedCopperListSubmission::Pending)
828        } else {
829            culist.change_state(CopperListState::Free);
830            Ok(OwnedCopperListSubmission::Recycled(culist))
831        }
832    }
833
834    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
835        let recv_result = {
836            let Some(completion_receiver) = self.completion_receiver.as_ref() else {
837                return Ok(None);
838            };
839            completion_receiver.try_recv()
840        };
841        match recv_result {
842            Ok(completion) => self.handle_completion(completion).map(Some),
843            Err(TryRecvError::Empty) => Ok(None),
844            Err(TryRecvError::Disconnected) => Err(CuError::from(
845                "Async CopperList serializer thread disconnected unexpectedly",
846            )),
847        }
848    }
849
850    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
851        let completion = self
852            .completion_receiver
853            .as_ref()
854            .ok_or_else(|| {
855                CuError::from("No async CopperList serializer is active to return a free slot")
856            })?
857            .recv()
858            .map_err(|e| {
859                CuError::from("Failed to receive completion from async CopperList serializer")
860                    .add_cause(e.to_string().as_str())
861            })?;
862        self.handle_completion(completion)
863    }
864
865    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
866        let mut reclaimed = Vec::with_capacity(self.pending_count);
867        if self.current.is_some() {
868            return Err(CuError::from(
869                "Cannot flush CopperList I/O while a CopperList is still active",
870            ));
871        }
872        while self.pending_count > 0 {
873            reclaimed.push(self.wait_reclaim_boxed()?);
874        }
875        Ok(reclaimed)
876    }
877
878    fn reclaim_completed(&mut self) -> CuResult<()> {
879        loop {
880            let Some(culist) = self.try_reclaim_boxed()? else {
881                break;
882            };
883            self.free_pool.push(culist);
884        }
885        Ok(())
886    }
887
888    fn wait_for_completion(&mut self) -> CuResult<()> {
889        let culist = self.wait_reclaim_boxed()?;
890        self.free_pool.push(culist);
891        Ok(())
892    }
893
    /// Bookkeeping for one completion received from the serializer worker.
    ///
    /// Decrements the pending counter, records the byte counts of a successful
    /// log, marks the list `Free` again, and only then propagates any
    /// serialization error — so the slot is recycled even on failure.
    fn handle_completion(
        &mut self,
        mut completion: AsyncCopperListCompletion<P>,
    ) -> CuResult<Box<CopperList<P>>> {
        self.pending_count = self.pending_count.saturating_sub(1);
        // On success, remember how many bytes the encoded list and its handles took.
        if let Ok((encoded_bytes, handle_bytes)) = completion.log_result.as_ref() {
            self.last_encoded_bytes = *encoded_bytes;
            self.last_handle_bytes = *handle_bytes;
        }
        // Free the slot BEFORE checking the log result: the list must become
        // reusable even when its serialization failed.
        completion.culist.change_state(CopperListState::Free);
        completion.log_result?;
        Ok(completion.culist)
    }
907
908    fn shutdown_worker(&mut self) -> CuResult<()> {
909        self.finish_pending()?;
910        self.pending_sender.take();
911        if let Some(worker_handle) = self.worker_handle.take() {
912            worker_handle.join().map_err(|_| {
913                CuError::from("Async CopperList serializer thread panicked while joining")
914            })?;
915        }
916        Ok(())
917    }
918}
919
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> Drop for AsyncCopperListsManager<P, NBCL> {
    /// Best-effort shutdown: `Drop` cannot surface errors, so any failure while
    /// flushing or joining the serializer worker is intentionally discarded.
    fn drop(&mut self) {
        let _ = self.shutdown_worker();
    }
}
926
#[cfg(all(feature = "std", debug_assertions))]
fn debug_assert_processing_completion_state<P: CopperListTuple>(
    culist: &CopperList<P>,
    context: &str,
) {
    // Debug-build sanity check: a CopperList reaching a completion path must
    // still be in the `Processing` state. `context` names the call site in the
    // panic message.
    assert_eq!(
        culist.get_state(),
        CopperListState::Processing,
        "{context} expected CopperList #{} to be Processing, found {}",
        culist.id,
        culist.get_state()
    );
}
940
// Compile-time selection of the CopperList manager: with `std` + `async-cl-io`
// serialization runs on a background worker thread, otherwise the synchronous
// in-thread manager is used.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub type CopperListsManager<P, const NBCL: usize> = AsyncCopperListsManager<P, NBCL>;

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub type CopperListsManager<P, const NBCL: usize> = SyncCopperListsManager<P, NBCL>;
948
/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Optional override for the timestamp to stamp the next keyframe (used by deterministic replay).
    forced_timestamp: Option<CuTime>,

    /// If set, reuse this keyframe verbatim (e.g., during replay) instead of re-freezing state.
    locked: bool,

    /// Logger for the state of the tasks (frozen tasks). `None` disables keyframing entirely.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` CopperLists (matched by CL id).
    keyframe_interval: u32,

    /// Bytes written by the last keyframe log (reset to 0 when the last CL was not a keyframe).
    pub last_encoded_bytes: u64,
}
969
970impl KeyFramesManager {
971    fn is_keyframe(&self, culistid: u64) -> bool {
972        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
973    }
974
975    #[inline]
976    pub fn captures_keyframe(&self, culistid: u64) -> bool {
977        self.is_keyframe(culistid)
978    }
979
980    pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
981        if self.is_keyframe(culistid) {
982            // If a recorded keyframe was preloaded for this CL, keep it as-is.
983            if self.locked && self.inner.culistid == culistid {
984                return;
985            }
986            let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
987            self.inner.reset(culistid, ts);
988            self.locked = false;
989        }
990    }
991
992    /// Force the timestamp of the next keyframe to a given value.
993    #[cfg(feature = "std")]
994    pub fn set_forced_timestamp(&mut self, ts: CuTime) {
995        self.forced_timestamp = Some(ts);
996    }
997
998    pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
999        if self.is_keyframe(culistid) {
1000            if self.locked {
1001                // We are replaying a recorded keyframe verbatim; don't mutate it.
1002                return Ok(0);
1003            }
1004            if self.inner.culistid != culistid {
1005                return Err(CuError::from(format!(
1006                    "Freezing task for culistid {} but current keyframe is {}",
1007                    culistid, self.inner.culistid
1008                )));
1009            }
1010            self.inner
1011                .add_frozen_task(task)
1012                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
1013        } else {
1014            Ok(0)
1015        }
1016    }
1017
1018    /// Generic helper to freeze any `Freezable` state (task or bridge) into the current keyframe.
1019    pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
1020        self.freeze_task(culistid, item)
1021    }
1022
1023    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
1024        if self.is_keyframe(culistid) {
1025            let logger = self.logger.as_mut().unwrap();
1026            logger.log(&self.inner)?;
1027            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
1028            // Clear the lock so the next CL can rebuild normally unless re-locked.
1029            self.locked = false;
1030            Ok(())
1031        } else {
1032            // Not a keyframe for this CL; ensure we don't carry stale sizes forward.
1033            self.last_encoded_bytes = 0;
1034            Ok(())
1035        }
1036    }
1037
1038    /// Preload a recorded keyframe so it is logged verbatim on the matching CL.
1039    #[cfg(feature = "std")]
1040    pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
1041        self.inner = keyframe.clone();
1042        self.forced_timestamp = Some(keyframe.timestamp);
1043        self.locked = true;
1044    }
1045}
1046
/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// CL is the type of the copper list, representing the input/output messages for all the tasks.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    clock: RobotClock,

    /// Compile-time subsystem identity for this Copper process.
    subsystem_code: u16,

    /// Deployment/runtime instance identity for this Copper process.
    #[doc(hidden)]
    pub instance_id: u32,

    /// The tuple of all the tasks in order of execution.
    #[doc(hidden)]
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    #[doc(hidden)]
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    #[doc(hidden)]
    pub resources: ResourceManager,

    /// The runtime monitoring.
    #[doc(hidden)]
    pub monitor: M,

    /// Runtime-side execution progress probe for watchdog/diagnostic monitors.
    ///
    /// This probe is written from the generated execution plan before each component
    /// step. Monitors consume it asynchronously (typically from watchdog threads) to
    /// report the last known component/step/culist when the runtime appears stalled.
    #[cfg(feature = "std")]
    #[doc(hidden)]
    pub execution_probe: ExecutionProbeHandle,
    /// Owned (non-shared) probe used on `no_std` builds.
    #[cfg(not(feature = "std"))]
    #[doc(hidden)]
    pub execution_probe: RuntimeExecutionProbe,

    /// The logger for the copper lists (messages between tasks)
    #[doc(hidden)]
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    #[doc(hidden)]
    pub keyframes_manager: KeyFramesManager,

    /// Feature-gated container for deterministic multi-CopperList execution.
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    #[doc(hidden)]
    pub parallel_rt: ParallelRt<NBCL>,

    /// The runtime configuration controlling the behavior of the run loop
    #[doc(hidden)]
    pub runtime_config: RuntimeConfig,
}
1106
1107/// To be able to share the clock we make the runtime a clock provider.
1108impl<
1109    CT,
1110    CB,
1111    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload,
1112    M: CuMonitor,
1113    const NBCL: usize,
1114> ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
1115{
1116    fn get_clock(&self) -> RobotClock {
1117        self.clock.clone()
1118    }
1119}
1120
/// Plain accessors available for any payload type (no extra trait bounds).
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> CuRuntime<CT, CB, P, M, NBCL> {
    /// Returns a clone of the runtime clock handle.
    #[inline]
    pub fn clock(&self) -> RobotClock {
        self.clock.clone()
    }

    /// Returns the runtime clock by reference for generated runtime code.
    #[doc(hidden)]
    #[inline]
    pub fn clock_ref(&self) -> &RobotClock {
        &self.clock
    }

    /// Returns the compile-time subsystem code for this process.
    #[inline]
    pub fn subsystem_code(&self) -> u16 {
        self.subsystem_code
    }

    /// Returns the configured runtime instance id for this process.
    #[inline]
    pub fn instance_id(&self) -> u32 {
        self.instance_id
    }
}
1147
1148impl<
1149    'cfg,
1150    CT,
1151    CB,
1152    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
1153    M: CuMonitor,
1154    const NBCL: usize,
1155    TI,
1156    BI,
1157    MI,
1158    CLW,
1159    KFW,
1160> CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
1161where
1162    TI: for<'c> Fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>,
1163    BI: Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1164    MI: Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1165    CLW: WriteStream<CopperList<P>> + 'static,
1166    KFW: WriteStream<KeyFrame> + 'static,
1167{
1168    pub fn build(self) -> CuResult<CuRuntime<CT, CB, P, M, NBCL>> {
1169        let Self {
1170            clock,
1171            config,
1172            mission,
1173            subsystem,
1174            instance_id,
1175            resources,
1176            parts,
1177            copperlists_logger,
1178            keyframes_logger,
1179        } = self;
1180        let mut resources =
1181            resources.ok_or_else(|| CuError::from("Resources missing from CuRuntimeBuilder"))?;
1182
1183        let graph = config.get_graph(Some(mission))?;
1184        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1185            .get_all_nodes()
1186            .iter()
1187            .map(|(_, node)| node.get_instance_config())
1188            .collect();
1189
1190        let tasks = (parts.tasks_instanciator)(all_instances_configs, &mut resources)?;
1191
1192        #[cfg(feature = "std")]
1193        let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
1194        #[cfg(not(feature = "std"))]
1195        let execution_probe = RuntimeExecutionProbe::default();
1196        let monitor_metadata = CuMonitoringMetadata::new(
1197            CompactString::from(mission),
1198            parts.monitored_components,
1199            parts.culist_component_mapping,
1200            CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
1201            build_monitor_topology(config, mission)?,
1202            None,
1203        )?
1204        .with_subsystem_id(subsystem.id())
1205        .with_instance_id(instance_id);
1206        #[cfg(feature = "std")]
1207        let monitor_runtime =
1208            CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
1209        #[cfg(not(feature = "std"))]
1210        let monitor_runtime = CuMonitoringRuntime::unavailable();
1211        let monitor = (parts.monitor_instanciator)(config, monitor_metadata, monitor_runtime);
1212        let bridges = (parts.bridges_instanciator)(config, &mut resources)?;
1213
1214        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
1215            Some(logging_config) if logging_config.enable_task_logging => (
1216                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1217                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1218                logging_config.keyframe_interval.unwrap(),
1219            ),
1220            Some(_) => (None, None, 0),
1221            None => (
1222                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1223                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1224                DEFAULT_KEYFRAME_INTERVAL,
1225            ),
1226        };
1227
1228        let copperlists_manager = CopperListsManager::new(copperlists_logger)?;
1229        #[cfg(target_os = "none")]
1230        {
1231            let cl_size = core::mem::size_of::<CopperList<P>>();
1232            let total_bytes = cl_size.saturating_mul(NBCL);
1233            info!(
1234                "CuRuntimeBuilder: copperlists count={} cl_size={} total_bytes={}",
1235                NBCL, cl_size, total_bytes
1236            );
1237        }
1238
1239        let keyframes_manager = KeyFramesManager {
1240            inner: KeyFrame::new(),
1241            logger: keyframes_logger,
1242            keyframe_interval,
1243            last_encoded_bytes: 0,
1244            forced_timestamp: None,
1245            locked: false,
1246        };
1247        #[cfg(all(feature = "std", feature = "parallel-rt"))]
1248        let parallel_rt = ParallelRt::new(parts.parallel_rt_metadata)?;
1249
1250        let runtime_config = config.runtime.clone().unwrap_or_default();
1251        runtime_config.validate()?;
1252
1253        Ok(CuRuntime {
1254            subsystem_code: subsystem.code(),
1255            instance_id,
1256            tasks,
1257            bridges,
1258            resources,
1259            monitor,
1260            execution_probe,
1261            clock,
1262            copperlists_manager,
1263            keyframes_manager,
1264            #[cfg(all(feature = "std", feature = "parallel-rt"))]
1265            parallel_rt,
1266            runtime_config,
1267        })
1268    }
1269}
1270
/// A KeyFrame is recording a snapshot of the tasks state before a given copperlist.
/// It is a double encapsulation: this one recording the culistid and another even in
/// bincode in the serialized_tasks.
#[derive(Clone, Encode, Decode)]
pub struct KeyFrame {
    /// Id of the copper list this keyframe is associated with (recorded before the copperlist).
    pub culistid: u64,
    /// Timestamp at which the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    /// Bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}
1283
1284impl KeyFrame {
1285    fn new() -> Self {
1286        KeyFrame {
1287            culistid: 0,
1288            timestamp: CuTime::default(),
1289            serialized_tasks: Vec::new(),
1290        }
1291    }
1292
1293    /// This is to be able to avoid reallocations
1294    fn reset(&mut self, culistid: u64, timestamp: CuTime) {
1295        self.culistid = culistid;
1296        self.timestamp = timestamp;
1297        self.serialized_tasks.clear();
1298    }
1299
1300    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
1301    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
1302        let cfg = bincode::config::standard();
1303        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
1304        BincodeAdapter(task).encode(&mut sizer)?;
1305        let need = sizer.into_writer().bytes_written as usize;
1306
1307        let start = self.serialized_tasks.len();
1308        self.serialized_tasks.resize(start + need, 0);
1309        let mut enc =
1310            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
1311        BincodeAdapter(task).encode(&mut enc)?;
1312        Ok(need)
1313    }
1314}
1315
/// Identifies where the effective runtime configuration came from.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    /// Configuration supplied programmatically by the application, overriding others.
    ProgrammaticOverride,
    /// Configuration loaded from an external file.
    ExternalFile,
    /// Default configuration bundled with the binary.
    BundledDefault,
}
1323
/// Stack and process identification metadata persisted in the runtime lifecycle log.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    /// Application name.
    pub app_name: String,
    /// Application version string.
    pub app_version: String,
    /// Git commit of the build, when available.
    pub git_commit: Option<String>,
    /// Whether the working tree was dirty at build time, when known.
    pub git_dirty: Option<bool>,
    /// Human-readable subsystem identifier, when available.
    pub subsystem_id: Option<String>,
    /// Compile-time subsystem code for this Copper process.
    pub subsystem_code: u16,
    /// Deployment/runtime instance identity for this Copper process.
    pub instance_id: u32,
}
1335
/// Runtime lifecycle events emitted in the dedicated lifecycle section.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    /// The runtime was constructed; records the effective config and its origin.
    Instantiated {
        config_source: RuntimeLifecycleConfigSource,
        /// Effective configuration serialized as RON.
        effective_config_ron: String,
        stack: RuntimeLifecycleStackInfo,
    },
    /// A mission started executing.
    MissionStarted {
        mission: String,
    },
    /// A mission stopped executing, with a free-form reason.
    MissionStopped {
        mission: String,
        // TODO(lifecycle): replace free-form reason with a typed stop reason enum once
        // std/no-std behavior and panic integration are split in a follow-up PR.
        reason: String,
    },
    // TODO(lifecycle): wire panic hook / no_std equivalent to emit this event consistently.
    /// A panic was captured, with its message and source location when known.
    Panic {
        message: String,
        file: Option<String>,
        line: Option<u32>,
        column: Option<u32>,
    },
    /// The runtime finished shutting down.
    ShutdownCompleted,
}
1362
/// One event record persisted in the `UnifiedLogType::RuntimeLifecycle` section.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    /// Robot-clock time at which the event was recorded.
    pub timestamp: CuTime,
    /// The lifecycle event itself.
    pub event: RuntimeLifecycleEvent,
}
1369
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    /// Records runtime execution progress in the shared probe.
    ///
    /// This is intentionally lightweight and does not call monitor callbacks.
    #[inline]
    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
        self.execution_probe.record(marker);
    }

    /// Returns a shared reference to the concrete runtime execution probe.
    ///
    /// The generated runtime uses this when it needs a uniform
    /// `&RuntimeExecutionProbe` view across `std` and `no_std` builds.
    #[inline]
    pub fn execution_probe_ref(&self) -> &RuntimeExecutionProbe {
        #[cfg(feature = "std")]
        {
            // std: the probe lives behind a shared handle; borrow the inner probe.
            self.execution_probe.as_ref()
        }

        #[cfg(not(feature = "std"))]
        {
            // no_std: the probe is stored inline; borrow it directly.
            &self.execution_probe
        }
    }
}
1403
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// Only produces output messages (usually drivers).
    Source,
    /// Consumes inputs and produces outputs (compute nodes).
    Regular,
    /// Only consumes input messages (usually actuators).
    Sink,
}
1414
1415impl From<TaskKind> for CuTaskType {
1416    fn from(value: TaskKind) -> Self {
1417        match value {
1418            TaskKind::Source => CuTaskType::Source,
1419            TaskKind::Regular => CuTaskType::Regular,
1420            TaskKind::Sink => CuTaskType::Sink,
1421        }
1422    }
1423}
1424
/// Describes a task's output slot in the copper list.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Index of this output slot in the copper list.
    pub culist_index: u32,
    /// Type names of the messages produced on this output; position = output port.
    pub msg_types: Vec<String>,
}
1430
/// Describes one input message a task consumes from the copper list.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// Copper-list index of the producing task's output slot.
    pub culist_index: u32,
    /// Type name of the message.
    pub msg_type: String,
    /// Port index within the producer's output pack carrying this message type.
    pub src_port: usize,
    /// Id of the graph edge this input comes from.
    pub edge_id: usize,
    /// Original serialized connection order, used to keep the input layout stable across missions.
    pub connection_order: usize,
}
1439
/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task (Source/Regular/Sink)
    pub task_type: CuTaskType,

    /// The indices in the copper list of the input messages and their types,
    /// sorted by original connection order.
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// The index in the copper list of the output message and its type.
    pub output_msg_pack: Option<CuOutputPack>,
}
1455
1456impl Debug for CuExecutionStep {
1457    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1458        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
1459        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
1460        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
1461        f.write_str(
1462            format!(
1463                "              input_msg_types: {:?}\n",
1464                self.input_msg_indices_types
1465            )
1466            .as_str(),
1467        )?;
1468        f.write_str(format!("       output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
1469        Ok(())
1470    }
1471}
1472
/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of Execution units (loop or steps) that are executed
/// multiple times.
/// if loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    /// The units (steps or nested loops) executed on each iteration.
    pub steps: Vec<CuExecutionUnit>,
    /// Number of iterations; `None` means the loop is infinite.
    pub loop_count: Option<u32>,
}
1481
1482impl Debug for CuExecutionLoop {
1483    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1484        f.write_str("CuExecutionLoop:\n")?;
1485        for step in &self.steps {
1486            match step {
1487                CuExecutionUnit::Step(step) => {
1488                    step.fmt(f)?;
1489                }
1490                CuExecutionUnit::Loop(l) => {
1491                    l.fmt(f)?;
1492                }
1493            }
1494        }
1495
1496        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
1497        Ok(())
1498    }
1499}
1500
/// One unit of the execution plan: either a single step or a (possibly nested) loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single task execution step.
    Step(Box<CuExecutionStep>),
    /// A repeated sequence of units.
    Loop(CuExecutionLoop),
}
1507
1508fn find_output_pack_from_nodeid(
1509    node_id: NodeId,
1510    steps: &Vec<CuExecutionUnit>,
1511) -> Option<CuOutputPack> {
1512    for step in steps {
1513        match step {
1514            CuExecutionUnit::Loop(loop_unit) => {
1515                if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
1516                    return Some(output_pack);
1517                }
1518            }
1519            CuExecutionUnit::Step(step) if step.node_id == node_id => {
1520                return step.output_msg_pack.clone();
1521            }
1522            _ => {}
1523        }
1524    }
1525    None
1526}
1527
1528pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuResult<CuTaskType> {
1529    let node = graph
1530        .get_node(node_id)
1531        .ok_or_else(|| CuError::from(format!("Node id {node_id} not found")))?;
1532
1533    if node.get_flavor() == crate::config::Flavor::Task {
1534        return resolve_task_kind_for_id(graph, node_id).map(Into::into);
1535    }
1536
1537    let has_inputs = !graph.get_dst_edges(node_id)?.is_empty();
1538    let has_outputs = !graph.get_src_edges(node_id)?.is_empty();
1539    Ok(match (has_inputs, has_outputs) {
1540        (false, true) => CuTaskType::Source,
1541        (true, false) => CuTaskType::Sink,
1542        _ => CuTaskType::Regular,
1543    })
1544}
1545
1546/// Preserve the original serialized connection order across missions.
1547///
1548/// Edge ids are assigned per mission graph, so they are not stable enough to describe a shared
1549/// input layout when missions selectively include connections.
1550fn sort_inputs_by_connection_order(input_msg_indices_types: &mut [CuInputMsg]) {
1551    input_msg_indices_types.sort_by_key(|input| input.connection_order);
1552}
1553
1554/// Explores a subbranch and build the partial plan out of it.
1555fn plan_tasks_tree_branch(
1556    graph: &CuGraph,
1557    mut next_culist_output_index: u32,
1558    starting_point: NodeId,
1559    plan: &mut Vec<CuExecutionUnit>,
1560) -> CuResult<(u32, bool)> {
1561    #[cfg(all(feature = "std", feature = "macro_debug"))]
1562    eprintln!("-- starting branch from node {starting_point}");
1563
1564    let mut handled = false;
1565
1566    for id in graph.bfs_nodes(starting_point) {
1567        let node_ref = graph.get_node(id).unwrap();
1568        #[cfg(all(feature = "std", feature = "macro_debug"))]
1569        eprintln!("  Visiting node: {node_ref:?}");
1570
1571        let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
1572        let output_msg_pack: Option<CuOutputPack>;
1573        let task_type = find_task_type_for_id(graph, id)?;
1574
1575        match task_type {
1576            CuTaskType::Source => {
1577                #[cfg(all(feature = "std", feature = "macro_debug"))]
1578                eprintln!("    → Source node, assign output index {next_culist_output_index}");
1579                let msg_types = graph.get_node_output_msg_types_by_id(id)?;
1580                if msg_types.is_empty() {
1581                    return Err(CuError::from(format!(
1582                        "Source node '{}' has no declared outputs",
1583                        node_ref.get_id()
1584                    )));
1585                }
1586                output_msg_pack = Some(CuOutputPack {
1587                    culist_index: next_culist_output_index,
1588                    msg_types,
1589                });
1590                next_culist_output_index += 1;
1591            }
1592            CuTaskType::Sink => {
1593                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1594                edge_ids.sort();
1595                #[cfg(all(feature = "std", feature = "macro_debug"))]
1596                eprintln!("    → Sink with incoming edges: {edge_ids:?}");
1597                for edge_id in edge_ids {
1598                    let edge = graph
1599                        .edge(edge_id)
1600                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1601                    let pid = graph
1602                        .get_node_id_by_name(edge.src.as_str())
1603                        .unwrap_or_else(|| {
1604                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1605                        });
1606                    let output_pack = find_output_pack_from_nodeid(pid, plan);
1607                    if let Some(output_pack) = output_pack {
1608                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1609                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
1610                        let msg_type = edge.msg.as_str();
1611                        let src_port = output_pack
1612                            .msg_types
1613                            .iter()
1614                            .position(|msg| msg == msg_type)
1615                            .unwrap_or_else(|| {
1616                                panic!(
1617                                    "Missing output port for message type '{msg_type}' on node {pid}"
1618                                )
1619                            });
1620                        input_msg_indices_types.push(CuInputMsg {
1621                            culist_index: output_pack.culist_index,
1622                            msg_type: msg_type.to_string(),
1623                            src_port,
1624                            edge_id,
1625                            connection_order: edge.order,
1626                        });
1627                    } else {
1628                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1629                        eprintln!("      ✗ Input from {pid} not ready, returning");
1630                        return Ok((next_culist_output_index, handled));
1631                    }
1632                }
1633                output_msg_pack = Some(CuOutputPack {
1634                    culist_index: next_culist_output_index,
1635                    msg_types: Vec::from(["()".to_string()]),
1636                });
1637                next_culist_output_index += 1;
1638            }
1639            CuTaskType::Regular => {
1640                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1641                edge_ids.sort();
1642                #[cfg(all(feature = "std", feature = "macro_debug"))]
1643                eprintln!("    → Regular task with incoming edges: {edge_ids:?}");
1644                for edge_id in edge_ids {
1645                    let edge = graph
1646                        .edge(edge_id)
1647                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1648                    let pid = graph
1649                        .get_node_id_by_name(edge.src.as_str())
1650                        .unwrap_or_else(|| {
1651                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1652                        });
1653                    let output_pack = find_output_pack_from_nodeid(pid, plan);
1654                    if let Some(output_pack) = output_pack {
1655                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1656                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
1657                        let msg_type = edge.msg.as_str();
1658                        let src_port = output_pack
1659                            .msg_types
1660                            .iter()
1661                            .position(|msg| msg == msg_type)
1662                            .unwrap_or_else(|| {
1663                                panic!(
1664                                    "Missing output port for message type '{msg_type}' on node {pid}"
1665                                )
1666                            });
1667                        input_msg_indices_types.push(CuInputMsg {
1668                            culist_index: output_pack.culist_index,
1669                            msg_type: msg_type.to_string(),
1670                            src_port,
1671                            edge_id,
1672                            connection_order: edge.order,
1673                        });
1674                    } else {
1675                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1676                        eprintln!("      ✗ Input from {pid} not ready, returning");
1677                        return Ok((next_culist_output_index, handled));
1678                    }
1679                }
1680                let msg_types = graph.get_node_output_msg_types_by_id(id)?;
1681                if msg_types.is_empty() {
1682                    return Err(CuError::from(format!(
1683                        "Regular node '{}' has no declared outputs",
1684                        node_ref.get_id()
1685                    )));
1686                }
1687                output_msg_pack = Some(CuOutputPack {
1688                    culist_index: next_culist_output_index,
1689                    msg_types,
1690                });
1691                next_culist_output_index += 1;
1692            }
1693        }
1694
1695        sort_inputs_by_connection_order(&mut input_msg_indices_types);
1696
1697        if let Some(pos) = plan
1698            .iter()
1699            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
1700        {
1701            #[cfg(all(feature = "std", feature = "macro_debug"))]
1702            eprintln!("    → Already in plan, modifying existing step");
1703            let mut step = plan.remove(pos);
1704            if let CuExecutionUnit::Step(ref mut s) = step {
1705                s.input_msg_indices_types = input_msg_indices_types;
1706            }
1707            plan.push(step);
1708        } else {
1709            #[cfg(all(feature = "std", feature = "macro_debug"))]
1710            eprintln!("    → New step added to plan");
1711            let step = CuExecutionStep {
1712                node_id: id,
1713                node: node_ref.clone(),
1714                task_type,
1715                input_msg_indices_types,
1716                output_msg_pack,
1717            };
1718            plan.push(CuExecutionUnit::Step(Box::new(step)));
1719        }
1720
1721        handled = true;
1722    }
1723
1724    #[cfg(all(feature = "std", feature = "macro_debug"))]
1725    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
1726    Ok((next_culist_output_index, handled))
1727}
1728
1729/// This is the main heuristics to compute an execution plan at compilation time.
1730/// TODO(gbin): Make that heuristic pluggable.
1731pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1732    #[cfg(all(feature = "std", feature = "macro_debug"))]
1733    eprintln!("[runtime plan]");
1734    let mut plan = Vec::new();
1735    let mut next_culist_output_index = 0u32;
1736
1737    let mut queue: VecDeque<NodeId> = VecDeque::new();
1738    for node_id in graph.node_ids() {
1739        if find_task_type_for_id(graph, node_id)? == CuTaskType::Source {
1740            queue.push_back(node_id);
1741        }
1742    }
1743
1744    #[cfg(all(feature = "std", feature = "macro_debug"))]
1745    eprintln!("Initial source nodes: {queue:?}");
1746
1747    while let Some(start_node) = queue.pop_front() {
1748        #[cfg(all(feature = "std", feature = "macro_debug"))]
1749        eprintln!("→ Starting BFS from source {start_node}");
1750        for node_id in graph.bfs_nodes(start_node) {
1751            let already_in_plan = plan
1752                .iter()
1753                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1754            if already_in_plan {
1755                #[cfg(all(feature = "std", feature = "macro_debug"))]
1756                eprintln!("    → Node {node_id} already planned, skipping");
1757                continue;
1758            }
1759
1760            #[cfg(all(feature = "std", feature = "macro_debug"))]
1761            eprintln!("    Planning from node {node_id}");
1762            let (new_index, handled) =
1763                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan)?;
1764            next_culist_output_index = new_index;
1765
1766            if !handled {
1767                #[cfg(all(feature = "std", feature = "macro_debug"))]
1768                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1769                continue;
1770            }
1771
1772            #[cfg(all(feature = "std", feature = "macro_debug"))]
1773            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
1774            for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1775                #[cfg(all(feature = "std", feature = "macro_debug"))]
1776                eprintln!("      → Enqueueing neighbor {neighbor}");
1777                queue.push_back(neighbor);
1778            }
1779        }
1780    }
1781
1782    let mut planned_nodes = BTreeSet::new();
1783    for unit in &plan {
1784        if let CuExecutionUnit::Step(step) = unit {
1785            planned_nodes.insert(step.node_id);
1786        }
1787    }
1788
1789    let mut missing = Vec::new();
1790    for node_id in graph.node_ids() {
1791        if !planned_nodes.contains(&node_id) {
1792            if let Some(node) = graph.get_node(node_id) {
1793                missing.push(node.get_id().to_string());
1794            } else {
1795                missing.push(format!("node_id_{node_id}"));
1796            }
1797        }
1798    }
1799
1800    if !missing.is_empty() {
1801        missing.sort();
1802        return Err(CuError::from(format!(
1803            "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1804            missing.join(", ")
1805        )));
1806    }
1807
1808    Ok(CuExecutionLoop {
1809        steps: plan,
1810        loop_count: None,
1811    })
1812}
1813
1814//tests
1815#[cfg(test)]
1816mod tests {
1817    use super::*;
1818    use crate::config::Node;
1819    use crate::context::CuContext;
1820    use crate::cutask::CuSinkTask;
1821    use crate::cutask::{CuSrcTask, Freezable};
1822    use crate::monitoring::NoMonitor;
1823    use crate::reflect::Reflect;
1824    use bincode::Encode;
1825    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1826    use serde_derive::{Deserialize, Serialize};
1827    #[cfg(feature = "std")]
1828    use std::sync::{Arc, Mutex};
1829
    /// Minimal no-op source task used to exercise runtime wiring in tests.
    #[derive(Reflect)]
    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        /// Construction ignores config and resources and never fails.
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// Produces nothing: the output message type is the unit type.
        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1849
    /// Minimal no-op sink task paired with [`TestSource`] in the test graphs.
    #[derive(Reflect)]
    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        /// Construction ignores config and resources and never fails.
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// Discards its (unit-typed) input.
        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1870
1871    // Those should be generated by the derive macro
1872    type Tasks = (TestSource, TestSink);
1873    type TestRuntime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;
1874    const TEST_NBCL: usize = 2;
1875
    /// Empty message set for tests that never inspect payload contents.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        // No stamped data to expose.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        // No task ids are associated with this payload.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        // Nothing to zero-initialize.
        fn init_zeroed(&mut self) {}
    }
1894
    /// Message set carrying a single i32 so tests can observe slot/payload reuse.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct IntMsgs(i32);

    impl ErasedCuStampedDataSet for IntMsgs {
        // No stamped data to expose.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for IntMsgs {
        // No task ids are associated with this payload.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for IntMsgs {
        // Nothing to zero-initialize.
        fn init_zeroed(&mut self) {}
    }
1913
    // Builds the (source, sink) task tuple from the per-task configs, mirroring
    // what the copper_runtime macro would generate.
    // NOTE(review): the std and no-std bodies are byte-identical today; the cfg
    // split presumably exists so they can diverge later — confirm before
    // collapsing them into one unconditional function.
    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0], ())?,
            TestSink::new(all_instances_configs[1], ())?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0], ())?,
            TestSink::new(all_instances_configs[1], ())?,
        ))
    }
1935
    // Monitor factory: NoMonitor performs no monitoring and cannot fail,
    // hence the expect().
    fn monitor_instanciator(
        _config: &CuConfig,
        metadata: CuMonitoringMetadata,
        runtime: CuMonitoringRuntime,
    ) -> NoMonitor {
        NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
    }

    // Bridge factory: the test graphs declare no bridges.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    // Resource factory: an empty resource manager is enough for these tests.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }
1951
    /// WriteStream that silently discards everything it is given.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
1960
    /// WriteStream test double that records every logged CopperList id and can
    /// simulate a logger failure on a chosen id.
    #[cfg(not(feature = "async-cl-io"))]
    #[derive(Debug)]
    struct RecordingSyncWriter {
        /// Ids of every CopperList handed to `log`, in call order.
        ids: Arc<Mutex<Vec<u64>>>,
        /// Fixed value reported back through `last_log_bytes()`.
        last_log_bytes: usize,
        /// When set, `log` fails for the CopperList with this id.
        fail_on: Option<u64>,
    }

    #[cfg(not(feature = "async-cl-io"))]
    impl WriteStream<CopperList<IntMsgs>> for RecordingSyncWriter {
        // Records the id first, then optionally simulates a failure, so the
        // call is observable even when it errors.
        fn log(&mut self, culist: &CopperList<IntMsgs>) -> CuResult<()> {
            self.ids.lock().unwrap().push(culist.id);
            if self.fail_on == Some(culist.id) {
                return Err(CuError::from(format!(
                    "logger failed for CopperList #{}",
                    culist.id
                )));
            }
            Ok(())
        }

        fn last_log_bytes(&self) -> Option<usize> {
            Some(self.last_log_bytes)
        }
    }
1986
1987    #[test]
1988    fn test_runtime_instantiation() {
1989        let mut config = CuConfig::default();
1990        let graph = config.get_graph_mut(None).unwrap();
1991        graph.add_node(Node::new("a", "TestSource")).unwrap();
1992        graph.add_node(Node::new("b", "TestSink")).unwrap();
1993        graph.connect(0, 1, "()").unwrap();
1994        let runtime: CuResult<TestRuntime> =
1995            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
1996                RobotClock::default(),
1997                &config,
1998                crate::config::DEFAULT_MISSION_ID,
1999                CuRuntimeParts::new(
2000                    tasks_instanciator,
2001                    &[],
2002                    &[],
2003                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
2004                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
2005                    monitor_instanciator,
2006                    bridges_instanciator,
2007                ),
2008                FakeWriter {},
2009                FakeWriter {},
2010            )
2011            .try_with_resources_instantiator(resources_instanciator)
2012            .and_then(|builder| builder.build());
2013        assert!(runtime.is_ok());
2014    }
2015
2016    #[test]
2017    fn test_rate_target_period_rejects_zero() {
2018        let err = rate_target_period(0).expect_err("zero rate target should fail");
2019        assert!(
2020            err.to_string()
2021                .contains("Runtime rate target cannot be zero"),
2022            "unexpected error: {err}"
2023        );
2024    }
2025
2026    #[test]
2027    fn test_loop_rate_limiter_advances_to_next_period_when_on_time() {
2028        let (clock, mock) = RobotClock::mock();
2029        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
2030        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(10_000_000));
2031
2032        mock.set_value(10_000_000);
2033        limiter.mark_tick(&clock);
2034
2035        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(20_000_000));
2036    }
2037
2038    #[test]
2039    fn test_loop_rate_limiter_skips_missed_periods_without_resetting_phase() {
2040        let (clock, mock) = RobotClock::mock();
2041        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
2042
2043        mock.set_value(35_000_000);
2044        limiter.mark_tick(&clock);
2045
2046        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(40_000_000));
2047    }
2048
2049    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
2050    #[test]
2051    fn test_loop_rate_limiter_spin_window_is_fixed_scheduler_window() {
2052        let (clock, _) = RobotClock::mock();
2053        let limiter = LoopRateLimiter::from_rate_target_hz(1_000, &clock).unwrap();
2054        assert_eq!(limiter.spin_window(), CuDuration::from(200_000));
2055
2056        let fast = LoopRateLimiter::from_rate_target_hz(10_000, &clock).unwrap();
2057        assert_eq!(fast.spin_window(), CuDuration::from(200_000));
2058    }
2059
    // Walks a 2-slot CopperList manager through create/process/free cycles and
    // checks slot accounting, including out-of-order frees: a freed list only
    // releases capacity once everything newer than it is also done.
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime: TestRuntime =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build())
            .unwrap();

        // Now emulates the generated runtime
        {
            // First slot: id 0, one slot left.
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        {
            // Second slot: id 1, manager now full.
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
        }

        {
            // Full manager: create() must fail until a slot is released.
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.create();
            assert!(culist2.is_err());
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            // Free in order, should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        // Readd a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            // Free out of order, the #0 first
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);

            // Free up the top of the stack
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs

            assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
        }
    }
2145
    // next_cl_id/last_cl_id/peek on the sync manager must reflect the inner
    // manager state before and after a create().
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_sync_copperlists_accessors_passthrough_to_inner_manager() {
        let mut copperlists = SyncCopperListsManager::<IntMsgs, 2>::new(None).unwrap();

        // Pristine manager: nothing created yet.
        assert_eq!(copperlists.next_cl_id(), 0);
        assert_eq!(copperlists.last_cl_id(), 0);
        assert!(copperlists.peek().is_none());

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 11;
            assert_eq!(culist.id, 0);
            assert_eq!(culist.get_state(), CopperListState::Initialized);
        }

        // After one create: the id counter advanced and peek sees the list.
        assert_eq!(copperlists.next_cl_id(), 1);
        assert_eq!(copperlists.last_cl_id(), 0);
        let peeked = copperlists.peek().unwrap();
        assert_eq!(peeked.id, 0);
        assert_eq!(peeked.msgs.0, 11);
        assert_eq!(peeked.get_state(), CopperListState::Initialized);
    }
2169
    // A reclaimed slot gets a fresh id and state, but the payload storage is
    // reused as-is (not re-zeroed) — the old value is still visible.
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_sync_reclaimed_slot_reuse_reinitializes_state_but_preserves_payload_storage() {
        let mut copperlists = SyncCopperListsManager::<IntMsgs, 1>::new(None).unwrap();

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 41;
            culist.change_state(CopperListState::Processing);
            assert_eq!(culist.id, 0);
        }

        copperlists.end_of_processing(0).unwrap();
        assert_eq!(copperlists.available_copper_lists().unwrap(), 1);

        // New id, reinitialized state, previous payload value survives.
        let reused = copperlists.create().unwrap();
        assert_eq!(reused.id, 1);
        assert_eq!(reused.get_state(), CopperListState::Initialized);
        assert_eq!(reused.msgs.0, 41);
    }
2190
    // Freeing an id that was never created must trip the debug-only invariant
    // check in the sync manager.
    #[cfg(all(not(feature = "async-cl-io"), debug_assertions))]
    #[test]
    #[should_panic(expected = "sync end_of_processing expected exactly one active CopperList #99")]
    fn test_sync_end_of_processing_unknown_id_panics_in_debug() {
        let mut copperlists = SyncCopperListsManager::<IntMsgs, 2>::new(None).unwrap();

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 10;
            culist.change_state(CopperListState::Processing);
        }
        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 20;
            culist.change_state(CopperListState::Processing);
        }

        // Id 99 does not exist: expected to panic (see should_panic above).
        let _ = copperlists.end_of_processing(99);
    }
2210
    // end_of_processing on a list still in Initialized (never moved to
    // Processing) must trip the debug-only state check.
    #[cfg(all(not(feature = "async-cl-io"), debug_assertions))]
    #[test]
    #[should_panic(expected = "sync end_of_processing expected CopperList #0 to be Processing")]
    fn test_sync_end_of_processing_wrong_state_panics_in_debug() {
        let mut copperlists = SyncCopperListsManager::<IntMsgs, 1>::new(None).unwrap();

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 10;
            assert_eq!(culist.get_state(), CopperListState::Initialized);
        }

        // Still Initialized, not Processing: expected to panic.
        let _ = copperlists.end_of_processing(0);
    }
2225
    // When an out-of-order free completes a contiguous "done" suffix, the sync
    // manager serializes that suffix newest-first (ids [1, 0] here).
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_sync_end_of_processing_serializes_done_suffix_from_newest_to_oldest() {
        let ids = Arc::new(Mutex::new(Vec::new()));
        let mut copperlists =
            SyncCopperListsManager::<IntMsgs, 2>::new(Some(Box::new(RecordingSyncWriter {
                ids: ids.clone(),
                last_log_bytes: 17,
                fail_on: None,
            })))
            .unwrap();

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 10;
            culist.change_state(CopperListState::Processing);
        }
        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 20;
            culist.change_state(CopperListState::Processing);
        }

        // Freeing #0 while #1 is still active must not serialize anything yet.
        copperlists.end_of_processing(0).unwrap();
        assert!(ids.lock().unwrap().is_empty());
        assert_eq!(copperlists.available_copper_lists().unwrap(), 0);

        // Freeing #1 completes the suffix: both get logged, newest first.
        copperlists.end_of_processing(1).unwrap();

        assert_eq!(*ids.lock().unwrap(), vec![1, 0]);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
    }
2258
    // A successful serialization must propagate the writer's byte count and the
    // captured IO handle bytes into the manager's counters.
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_sync_end_of_processing_updates_logger_counters_on_success() {
        let ids = Arc::new(Mutex::new(Vec::new()));
        let mut copperlists =
            SyncCopperListsManager::<IntMsgs, 1>::new(Some(Box::new(RecordingSyncWriter {
                ids: ids.clone(),
                last_log_bytes: 17,
                fail_on: None,
            })))
            .unwrap();
        let io_cache = crate::monitoring::CuMsgIoCache::<1>::default();

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 10;
            culist.change_state(CopperListState::Processing);
        }

        {
            // Record 32 handle bytes against slot 0 via the IO capture facility.
            let capture = crate::monitoring::start_copperlist_io_capture(&io_cache);
            capture.select_slot(0);
            crate::monitoring::record_payload_handle_bytes(32);
        }

        copperlists.end_of_processing(0).unwrap();

        // Writer saw #0; counters reflect the writer (17) and the capture (32).
        assert_eq!(*ids.lock().unwrap(), vec![0]);
        assert_eq!(copperlists.last_encoded_bytes, 17);
        assert_eq!(copperlists.last_handle_bytes, 32);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
    }
2291
    // If the writer fails, the error is propagated, the counters stay at zero,
    // and the slot is NOT reclaimed (it stays parked in BeingSerialized).
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_sync_end_of_processing_preserves_slot_on_logger_error() {
        let ids = Arc::new(Mutex::new(Vec::new()));
        let mut copperlists =
            SyncCopperListsManager::<IntMsgs, 1>::new(Some(Box::new(RecordingSyncWriter {
                ids: ids.clone(),
                last_log_bytes: 17,
                fail_on: Some(0),
            })))
            .unwrap();

        {
            let culist = copperlists.create().unwrap();
            culist.change_state(CopperListState::Processing);
        }

        let err = copperlists.end_of_processing(0).unwrap_err();

        assert!(
            err.to_string().contains("logger failed for CopperList #0"),
            "unexpected error: {err}"
        );
        // The writer was still called once before failing.
        assert_eq!(*ids.lock().unwrap(), vec![0]);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
        assert_eq!(copperlists.last_encoded_bytes, 0);
        assert_eq!(copperlists.last_handle_bytes, 0);

        // The failed list is still held, frozen mid-serialization.
        let peeked = copperlists.peek().unwrap();
        assert_eq!(peeked.id, 0);
        assert_eq!(peeked.get_state(), CopperListState::BeingSerialized);
    }
2324
    // The boxed variant performs the same debug-only state check: a list that
    // never reached Processing must panic.
    #[cfg(all(not(feature = "async-cl-io"), feature = "std", debug_assertions))]
    #[test]
    #[should_panic(
        expected = "sync boxed end_of_processing expected CopperList #7 to be Processing"
    )]
    fn test_sync_end_of_processing_boxed_wrong_state_panics_in_debug() {
        let mut copperlists = SyncCopperListsManager::<IntMsgs, 1>::new(None).unwrap();
        // Fresh list, still Initialized: expected to panic.
        let culist = Box::new(CopperList::new(7, IntMsgs::default()));

        let _ = copperlists.end_of_processing_boxed(culist);
    }
2336
    /// Async-path test writer: records logged ids and sleeps briefly so that
    /// out-of-order flushes would be observable if they happened.
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[derive(Debug, Default)]
    struct RecordingWriter {
        // Ids of every CopperList handed to `log`, in call order.
        ids: Arc<Mutex<Vec<u64>>>,
    }

    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    impl WriteStream<CopperList<Msgs>> for RecordingWriter {
        fn log(&mut self, culist: &CopperList<Msgs>) -> CuResult<()> {
            self.ids.lock().unwrap().push(culist.id);
            // Small delay to widen the window for ordering bugs to surface.
            std::thread::sleep(std::time::Duration::from_millis(2));
            Ok(())
        }
    }
2351
    // The async manager must flush CopperLists in id order even though the
    // writer is slow; finish_pending() drains everything before asserting.
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[test]
    fn test_async_copperlists_manager_flushes_in_order() {
        let ids = Arc::new(Mutex::new(Vec::new()));
        let mut copperlists = CopperListsManager::<Msgs, 4>::new(Some(Box::new(RecordingWriter {
            ids: ids.clone(),
        })))
        .unwrap();

        for expected_id in 0..4 {
            let culist = copperlists.create().unwrap();
            assert_eq!(culist.id, expected_id);
            culist.change_state(CopperListState::Processing);
            copperlists.end_of_processing(expected_id).unwrap();
        }

        copperlists.finish_pending().unwrap();
        assert_eq!(copperlists.available_copper_lists().unwrap(), 4);
        assert_eq!(*ids.lock().unwrap(), vec![0, 1, 2, 3]);
    }
2372
    // Async-path twin of the sync reclaim test: a reused slot gets a fresh id
    // and state but keeps the previous payload bytes.
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[test]
    fn test_async_create_reinitializes_reclaimed_slot_state_but_preserves_payload_storage() {
        let mut copperlists = CopperListsManager::<IntMsgs, 1>::new(None).unwrap();

        {
            let culist = copperlists.create().unwrap();
            assert_eq!(culist.id, 0);
            assert_eq!(culist.get_state(), CopperListState::Initialized);
            culist.msgs.0 = 41;
            culist.change_state(CopperListState::Processing);
        }

        copperlists.end_of_processing(0).unwrap();
        assert_eq!(copperlists.available_copper_lists().unwrap(), 1);

        // New id, reinitialized state, previous payload value survives.
        let reused = copperlists.create().unwrap();
        assert_eq!(reused.id, 1);
        assert_eq!(reused.get_state(), CopperListState::Initialized);
        assert_eq!(reused.msgs.0, 41);
    }
2394
    // The async manager enforces the same debug-only state check as the sync
    // one: freeing a list that never reached Processing panics.
    #[cfg(all(feature = "std", feature = "async-cl-io", debug_assertions))]
    #[test]
    #[should_panic(expected = "async end_of_processing expected CopperList #0 to be Processing")]
    fn test_async_end_of_processing_wrong_state_panics_in_debug() {
        let mut copperlists = CopperListsManager::<IntMsgs, 1>::new(None).unwrap();

        let culist = copperlists.create().unwrap();
        assert_eq!(culist.id, 0);
        assert_eq!(culist.get_state(), CopperListState::Initialized);

        // Still Initialized: expected to panic.
        let _ = copperlists.end_of_processing(0);
    }
2407
    // Input ordering for a multi-input task follows edge creation order (edge
    // id / connection order), not node id order.
    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // note that the source2 connection is before the source1
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // the edge id depends on the order the connection is created, not
        // on the node id, and that is what determines the input order
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        // Locate the planned step for the sink, the only multi-input task here.
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // since the src2 connection was added before src1 connection, the src2 type should be
        // first
        assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
    }
2447
2448    #[test]
2449    fn test_runtime_output_ports_unique_ordered() {
2450        let mut config = CuConfig::default();
2451        let graph = config.get_graph_mut(None).unwrap();
2452        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2453        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2454        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2455        let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
2456        let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
2457
2458        graph.connect(src_id, dst_a_id, "msg::A").unwrap();
2459        graph.connect(src_id, dst_b_id, "msg::B").unwrap();
2460        graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
2461        graph.connect(src_id, dst_c_id, "msg::C").unwrap();
2462
2463        let runtime = compute_runtime_plan(graph).unwrap();
2464        let src_step = runtime
2465            .steps
2466            .iter()
2467            .find_map(|step| match step {
2468                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2469                _ => None,
2470            })
2471            .unwrap();
2472
2473        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2474        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
2475
2476        let dst_a_step = runtime
2477            .steps
2478            .iter()
2479            .find_map(|step| match step {
2480                CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
2481                _ => None,
2482            })
2483            .unwrap();
2484        let dst_b_step = runtime
2485            .steps
2486            .iter()
2487            .find_map(|step| match step {
2488                CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
2489                _ => None,
2490            })
2491            .unwrap();
2492        let dst_a2_step = runtime
2493            .steps
2494            .iter()
2495            .find_map(|step| match step {
2496                CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
2497                _ => None,
2498            })
2499            .unwrap();
2500        let dst_c_step = runtime
2501            .steps
2502            .iter()
2503            .find_map(|step| match step {
2504                CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
2505                _ => None,
2506            })
2507            .unwrap();
2508
2509        assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
2510        assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
2511        assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
2512        assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
2513    }
2514
2515    #[test]
2516    fn test_runtime_output_ports_fanout_single() {
2517        let mut config = CuConfig::default();
2518        let graph = config.get_graph_mut(None).unwrap();
2519        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2520        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2521        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2522
2523        graph.connect(src_id, dst_a_id, "i32").unwrap();
2524        graph.connect(src_id, dst_b_id, "i32").unwrap();
2525
2526        let runtime = compute_runtime_plan(graph).unwrap();
2527        let src_step = runtime
2528            .steps
2529            .iter()
2530            .find_map(|step| match step {
2531                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2532                _ => None,
2533            })
2534            .unwrap();
2535
2536        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2537        assert_eq!(output_pack.msg_types, vec!["i32"]);
2538    }
2539
2540    #[test]
2541    fn test_runtime_output_ports_include_nc_outputs() {
2542        let mut config = CuConfig::default();
2543        let graph = config.get_graph_mut(None).unwrap();
2544        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2545        let dst_id = graph.add_node(Node::new("dst", "Sink")).unwrap();
2546        graph.connect(src_id, dst_id, "msg::A").unwrap();
2547        graph
2548            .get_node_mut(src_id)
2549            .expect("missing source node")
2550            .add_nc_output("msg::B", usize::MAX);
2551
2552        let runtime = compute_runtime_plan(graph).unwrap();
2553        let src_step = runtime
2554            .steps
2555            .iter()
2556            .find_map(|step| match step {
2557                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2558                _ => None,
2559            })
2560            .unwrap();
2561        let dst_step = runtime
2562            .steps
2563            .iter()
2564            .find_map(|step| match step {
2565                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2566                _ => None,
2567            })
2568            .unwrap();
2569
2570        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2571        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2572        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 0);
2573    }
2574
2575    #[test]
2576    fn test_runtime_plan_infers_regular_task_when_outputs_are_nc_only() {
2577        let txt = r#"(
2578            tasks: [
2579                (id: "src", type: "a"),
2580                (id: "regular", type: "b"),
2581            ],
2582            cnx: [
2583                (src: "src", dst: "regular", msg: "msg::A"),
2584                (src: "regular", dst: "__nc__", msg: "msg::B"),
2585            ]
2586        )"#;
2587        let config = CuConfig::deserialize_ron(txt).unwrap();
2588        let graph = config.get_graph(None).unwrap();
2589        let regular_id = graph.get_node_id_by_name("regular").unwrap();
2590
2591        let runtime = compute_runtime_plan(graph).unwrap();
2592        let regular_step = runtime
2593            .steps
2594            .iter()
2595            .find_map(|step| match step {
2596                CuExecutionUnit::Step(step) if step.node_id == regular_id => Some(step),
2597                _ => None,
2598            })
2599            .unwrap();
2600
2601        assert_eq!(regular_step.task_type, CuTaskType::Regular);
2602        assert_eq!(
2603            regular_step.output_msg_pack.as_ref().unwrap().msg_types,
2604            vec!["msg::B"]
2605        );
2606    }
2607
2608    #[test]
2609    fn test_runtime_output_ports_respect_connection_order_with_nc() {
2610        let txt = r#"(
2611            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2612            cnx: [
2613                (src: "src", dst: "__nc__", msg: "msg::A"),
2614                (src: "src", dst: "sink", msg: "msg::B"),
2615            ]
2616        )"#;
2617        let config = CuConfig::deserialize_ron(txt).unwrap();
2618        let graph = config.get_graph(None).unwrap();
2619        let src_id = graph.get_node_id_by_name("src").unwrap();
2620        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2621
2622        let runtime = compute_runtime_plan(graph).unwrap();
2623        let src_step = runtime
2624            .steps
2625            .iter()
2626            .find_map(|step| match step {
2627                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2628                _ => None,
2629            })
2630            .unwrap();
2631        let dst_step = runtime
2632            .steps
2633            .iter()
2634            .find_map(|step| match step {
2635                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2636                _ => None,
2637            })
2638            .unwrap();
2639
2640        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2641        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2642        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2643    }
2644
2645    #[cfg(feature = "std")]
2646    #[test]
2647    fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
2648        let txt = r#"(
2649            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2650            cnx: [
2651                (src: "src", dst: "__nc__", msg: "msg::A"),
2652                (src: "src", dst: "sink", msg: "msg::B"),
2653            ]
2654        )"#;
2655        let tmp = tempfile::NamedTempFile::new().unwrap();
2656        std::fs::write(tmp.path(), txt).unwrap();
2657        let config = crate::config::read_configuration(tmp.path().to_str().unwrap()).unwrap();
2658        let graph = config.get_graph(None).unwrap();
2659        let src_id = graph.get_node_id_by_name("src").unwrap();
2660        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2661
2662        let runtime = compute_runtime_plan(graph).unwrap();
2663        let src_step = runtime
2664            .steps
2665            .iter()
2666            .find_map(|step| match step {
2667                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2668                _ => None,
2669            })
2670            .unwrap();
2671        let dst_step = runtime
2672            .steps
2673            .iter()
2674            .find_map(|step| match step {
2675                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2676                _ => None,
2677            })
2678            .unwrap();
2679
2680        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2681        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2682        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2683    }
2684
2685    #[test]
2686    fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
2687        let txt = r#"(
2688            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2689            cnx: [
2690                (src: "src", dst: "__nc__", msg: "i32"),
2691                (src: "src", dst: "sink", msg: "bool"),
2692            ]
2693        )"#;
2694        let config = CuConfig::deserialize_ron(txt).unwrap();
2695        let graph = config.get_graph(None).unwrap();
2696        let src_id = graph.get_node_id_by_name("src").unwrap();
2697        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2698
2699        let runtime = compute_runtime_plan(graph).unwrap();
2700        let src_step = runtime
2701            .steps
2702            .iter()
2703            .find_map(|step| match step {
2704                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2705                _ => None,
2706            })
2707            .unwrap();
2708        let dst_step = runtime
2709            .steps
2710            .iter()
2711            .find_map(|step| match step {
2712                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2713                _ => None,
2714            })
2715            .unwrap();
2716
2717        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2718        assert_eq!(output_pack.msg_types, vec!["i32", "bool"]);
2719        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2720    }
2721
    #[test]
    fn test_runtime_plan_diamond_case1() {
        // more complex topology that tripped the scheduler
        //
        // Diamond topology: cam0 feeds broadcast directly AND via inf0
        // (cam0 -> inf0 -> broadcast). The merging sink must receive its
        // inputs in a stable (i32, f32) order regardless of how edge ids
        // were assigned.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        // cam0 -> broadcast is connected first, cam0 -> inf0 second.
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        // NOTE(review): the labels below assume the first element returned by
        // get_src_edges(cam0_id) is the cam0->broadcast edge, yet the asserted
        // ids (inf0 = 0, broadcast = 1) are the reverse of the connection
        // creation order above — confirm against get_src_edges' iteration
        // order; the names may be swapped even though the asserts hold.
        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        // Pick the execution step planned for the merging sink.
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Whatever the edge ids are, the sink's inputs arrive as (i32, f32).
        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
    }
2761
    #[test]
    fn test_runtime_plan_diamond_case2() {
        // more complex topology that tripped the scheduler variation 2
        //
        // Same diamond topology as case 1 (cam0 -> broadcast, cam0 -> inf0,
        // inf0 -> broadcast) but with the two cam0 connections created in the
        // opposite order. The merging sink must still receive its inputs in
        // (i32, f32) order.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        // cam0 -> inf0 is connected first, cam0 -> broadcast second.
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        // NOTE(review): the labels below assume the first element returned by
        // get_src_edges(cam0_id) is the cam0->inf0 edge, yet the asserted ids
        // (broadcast = 0, inf0 = 1) are the reverse of the connection creation
        // order above — confirm against get_src_edges' iteration order; the
        // names may be swapped even though the asserts hold.
        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        // Pick the execution step planned for the merging sink.
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Whatever the edge ids are, the sink's inputs arrive as (i32, f32).
        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
    }
2801}