Skip to main content

cu29_runtime/
curuntime.rs

1//! CuRuntime is the heart of what copper is running on the robot.
2//! It is exposed to the user via the `copper_runtime` macro injecting it as a field in their application struct.
3//!
4
5use crate::app::Subsystem;
6use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
7use crate::config::{CuConfig, CuGraph, MAX_RATE_TARGET_HZ, NodeId, RuntimeConfig};
8use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
9use crate::cutask::{BincodeAdapter, Freezable};
10#[cfg(feature = "std")]
11use crate::monitoring::ExecutionProbeHandle;
12#[cfg(feature = "std")]
13use crate::monitoring::MonitorExecutionProbe;
14use crate::monitoring::{
15    ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
16    ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
17    take_last_completed_handle_bytes,
18};
19#[cfg(all(feature = "std", feature = "parallel-rt"))]
20use crate::parallel_rt::{ParallelRt, ParallelRtMetadata};
21use crate::resource::ResourceManager;
22use compact_str::CompactString;
23use cu29_clock::{ClockProvider, CuDuration, CuTime, RobotClock};
24use cu29_traits::CuResult;
25use cu29_traits::WriteStream;
26use cu29_traits::{CopperListTuple, CuError};
27
28#[cfg(target_os = "none")]
29#[allow(unused_imports)]
30use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
31#[cfg(target_os = "none")]
32#[allow(unused_imports)]
33use cu29_log_derive::info;
34#[cfg(target_os = "none")]
35#[allow(unused_imports)]
36use cu29_log_runtime::log;
37#[cfg(all(target_os = "none", debug_assertions))]
38#[allow(unused_imports)]
39use cu29_log_runtime::log_debug_mode;
40#[cfg(target_os = "none")]
41#[allow(unused_imports)]
42use cu29_value::to_value;
43
44#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
45use alloc::alloc::{alloc_zeroed, handle_alloc_error};
46use alloc::boxed::Box;
47use alloc::collections::{BTreeSet, VecDeque};
48use alloc::format;
49use alloc::string::{String, ToString};
50use alloc::vec::Vec;
51use bincode::enc::EncoderImpl;
52use bincode::enc::write::{SizeWriter, SliceWriter};
53use bincode::error::EncodeError;
54use bincode::{Decode, Encode};
55#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
56use core::alloc::Layout;
57use core::fmt::Result as FmtResult;
58use core::fmt::{Debug, Formatter};
59use core::marker::PhantomData;
60
61#[cfg(all(feature = "std", feature = "async-cl-io"))]
62use std::sync::mpsc::{Receiver, SyncSender, TryRecvError, sync_channel};
63#[cfg(all(feature = "std", feature = "async-cl-io"))]
64use std::thread::JoinHandle;
65
66pub type TasksInstantiator<CT> =
67    for<'c> fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>;
68pub type BridgesInstantiator<CB> = fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>;
69pub type MonitorInstantiator<M> = fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M;
70
71pub struct CuRuntimeParts<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI> {
72    pub tasks_instanciator: TI,
73    pub monitored_components: &'static [MonitorComponentMetadata],
74    pub culist_component_mapping: &'static [ComponentId],
75    #[cfg(all(feature = "std", feature = "parallel-rt"))]
76    pub parallel_rt_metadata: &'static ParallelRtMetadata,
77    pub monitor_instanciator: MI,
78    pub bridges_instanciator: BI,
79    _payload: PhantomData<(CT, CB, P, M, [(); NBCL])>,
80}
81
82impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI>
83    CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>
84{
85    pub const fn new(
86        tasks_instanciator: TI,
87        monitored_components: &'static [MonitorComponentMetadata],
88        culist_component_mapping: &'static [ComponentId],
89        #[cfg(all(feature = "std", feature = "parallel-rt"))]
90        parallel_rt_metadata: &'static ParallelRtMetadata,
91        monitor_instanciator: MI,
92        bridges_instanciator: BI,
93    ) -> Self {
94        Self {
95            tasks_instanciator,
96            monitored_components,
97            culist_component_mapping,
98            #[cfg(all(feature = "std", feature = "parallel-rt"))]
99            parallel_rt_metadata,
100            monitor_instanciator,
101            bridges_instanciator,
102            _payload: PhantomData,
103        }
104    }
105}
106
107pub struct CuRuntimeBuilder<
108    'cfg,
109    CT,
110    CB,
111    P: CopperListTuple,
112    M: CuMonitor,
113    const NBCL: usize,
114    TI,
115    BI,
116    MI,
117    CLW,
118    KFW,
119> {
120    clock: RobotClock,
121    config: &'cfg CuConfig,
122    mission: &'cfg str,
123    subsystem: Subsystem,
124    instance_id: u32,
125    resources: Option<ResourceManager>,
126    parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
127    copperlists_logger: CLW,
128    keyframes_logger: KFW,
129}
130
131impl<'cfg, CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI, CLW, KFW>
132    CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
133{
134    pub fn new(
135        clock: RobotClock,
136        config: &'cfg CuConfig,
137        mission: &'cfg str,
138        parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
139        copperlists_logger: CLW,
140        keyframes_logger: KFW,
141    ) -> Self {
142        Self {
143            clock,
144            config,
145            mission,
146            subsystem: Subsystem::new(None, 0),
147            instance_id: 0,
148            resources: None,
149            parts,
150            copperlists_logger,
151            keyframes_logger,
152        }
153    }
154
155    pub fn with_subsystem(mut self, subsystem: Subsystem) -> Self {
156        self.subsystem = subsystem;
157        self
158    }
159
160    pub fn with_instance_id(mut self, instance_id: u32) -> Self {
161        self.instance_id = instance_id;
162        self
163    }
164
165    pub fn with_resources(mut self, resources: ResourceManager) -> Self {
166        self.resources = Some(resources);
167        self
168    }
169
170    pub fn try_with_resources_instantiator(
171        mut self,
172        resources_instantiator: impl FnOnce(&CuConfig) -> CuResult<ResourceManager>,
173    ) -> CuResult<Self> {
174        self.resources = Some(resources_instantiator(self.config)?);
175        Ok(self)
176    }
177}
178
179/// Returns a monotonic instant used for local runtime performance timing.
180///
181/// When `sysclock-perf` (and `std`) are enabled this uses a process-local
182/// `RobotClock::new()` instance for timing. The returned value is a
183/// monotonically increasing duration since an unspecified origin (typically
184/// process or runtime initialization), not a wall-clock time-of-day. When
185/// `sysclock-perf` is disabled it delegates to the provided `RobotClock`.
186///
187/// This is intentionally separate from `LoopRateLimiter`, which always uses the
188/// provided `RobotClock` so `runtime.rate_target_hz` stays tied to robot time.
189#[inline]
190pub fn perf_now(_clock: &RobotClock) -> CuTime {
191    #[cfg(all(feature = "std", feature = "sysclock-perf"))]
192    {
193        static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
194        return PERF_CLOCK.get_or_init(RobotClock::new).now();
195    }
196
197    #[allow(unreachable_code)]
198    _clock.now()
199}
200
201#[cfg(all(feature = "std", feature = "high-precision-limiter"))]
202const HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS: u64 = 200_000;
203
204/// Convert a configured runtime rate target to an integer-nanosecond period.
205#[inline]
206pub fn rate_target_period(rate_target_hz: u64) -> CuResult<CuDuration> {
207    if rate_target_hz == 0 {
208        return Err(CuError::from(
209            "Runtime rate target cannot be zero. Set runtime.rate_target_hz to at least 1.",
210        ));
211    }
212
213    if rate_target_hz > MAX_RATE_TARGET_HZ {
214        return Err(CuError::from(format!(
215            "Runtime rate target ({rate_target_hz} Hz) exceeds the supported maximum of {MAX_RATE_TARGET_HZ} Hz."
216        )));
217    }
218
219    Ok(CuDuration::from(MAX_RATE_TARGET_HZ / rate_target_hz))
220}
221
222/// Runtime loop limiter that preserves phase with absolute deadlines.
223///
224/// This is intentionally a small runtime helper so generated applications do
225/// not have to open-code loop scheduling policy. Deadlines are tracked against
226/// the provided `RobotClock`, even when `sysclock-perf` is enabled for
227/// process-time measurements.
228#[derive(Clone, Copy, Debug, PartialEq, Eq)]
229pub struct LoopRateLimiter {
230    period: CuDuration,
231    next_deadline: CuTime,
232}
233
234impl LoopRateLimiter {
235    #[inline]
236    pub fn from_rate_target_hz(rate_target_hz: u64, clock: &RobotClock) -> CuResult<Self> {
237        let period = rate_target_period(rate_target_hz)?;
238        Ok(Self {
239            period,
240            next_deadline: clock.now() + period,
241        })
242    }
243
244    #[inline]
245    pub fn is_ready(&self, clock: &RobotClock) -> bool {
246        self.remaining(clock).is_none()
247    }
248
249    #[inline]
250    pub fn remaining(&self, clock: &RobotClock) -> Option<CuDuration> {
251        let now = clock.now();
252        if now < self.next_deadline {
253            Some(self.next_deadline - now)
254        } else {
255            None
256        }
257    }
258
259    #[inline]
260    pub fn wait_until_ready(&self, clock: &RobotClock) {
261        let deadline = self.next_deadline;
262        let Some(remaining) = self.remaining(clock) else {
263            return;
264        };
265
266        #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
267        {
268            let spin_window = self.spin_window();
269            if remaining > spin_window {
270                std::thread::sleep(std::time::Duration::from(remaining - spin_window));
271            }
272            while clock.now() < deadline {
273                core::hint::spin_loop();
274            }
275        }
276
277        #[cfg(all(feature = "std", not(feature = "high-precision-limiter")))]
278        {
279            let _ = deadline;
280            std::thread::sleep(std::time::Duration::from(remaining));
281        }
282
283        #[cfg(not(feature = "std"))]
284        {
285            let _ = remaining;
286            while clock.now() < deadline {
287                core::hint::spin_loop();
288            }
289        }
290    }
291
292    #[inline]
293    pub fn mark_tick(&mut self, clock: &RobotClock) {
294        self.advance_from(clock.now());
295    }
296
297    #[inline]
298    pub fn limit(&mut self, clock: &RobotClock) {
299        self.wait_until_ready(clock);
300        self.mark_tick(clock);
301    }
302
303    #[inline]
304    fn advance_from(&mut self, now: CuTime) {
305        let steps = if now < self.next_deadline {
306            1
307        } else {
308            (now - self.next_deadline).as_nanos() / self.period.as_nanos() + 1
309        };
310        self.next_deadline += steps * self.period;
311    }
312
313    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
314    #[inline]
315    fn spin_window(&self) -> CuDuration {
316        let _ = self.period;
317        CuDuration::from(HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS)
318    }
319
320    #[cfg(test)]
321    #[inline]
322    fn next_deadline(&self) -> CuTime {
323        self.next_deadline
324    }
325}
326
327#[cfg(all(feature = "std", feature = "async-cl-io"))]
328#[doc(hidden)]
329pub trait AsyncCopperListPayload: Send {}
330
331#[cfg(all(feature = "std", feature = "async-cl-io"))]
332impl<T: Send> AsyncCopperListPayload for T {}
333
334#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
335#[doc(hidden)]
336pub trait AsyncCopperListPayload {}
337
338#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
339impl<T> AsyncCopperListPayload for T {}
340
341/// Control-flow result returned by one generated process stage.
342///
343/// `AbortCopperList` preserves the current runtime semantics for monitor
344/// decisions that abort the current CopperList without shutting the runtime
345/// down. The outer driver remains responsible for ordered cleanup and log
346/// handoff.
347#[derive(Clone, Copy, Debug, PartialEq, Eq)]
348pub enum ProcessStepOutcome {
349    Continue,
350    AbortCopperList,
351}
352
353/// Result type used by generated process-step functions.
354pub type ProcessStepResult = CuResult<ProcessStepOutcome>;
355
356/// Manages the lifecycle of the copper lists and logging on the synchronous path.
357pub struct SyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
358    inner: CuListsManager<P, NBCL>,
359    /// Logger for the copper lists (messages between tasks)
360    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
361    /// Last encoded size returned by logger.log
362    pub last_encoded_bytes: u64,
363    /// Last handle-backed payload bytes observed during logger.log
364    pub last_handle_bytes: u64,
365}
366
367impl<P: CopperListTuple + Default, const NBCL: usize> SyncCopperListsManager<P, NBCL> {
368    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
369    where
370        P: CuListZeroedInit,
371    {
372        Ok(Self {
373            inner: CuListsManager::new(),
374            logger,
375            last_encoded_bytes: 0,
376            last_handle_bytes: 0,
377        })
378    }
379
380    pub fn next_cl_id(&self) -> u64 {
381        self.inner.next_cl_id()
382    }
383
384    pub fn last_cl_id(&self) -> u64 {
385        self.inner.last_cl_id()
386    }
387
388    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
389        self.inner
390            .create()
391            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))
392    }
393
394    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
395        let mut is_top = true;
396        let mut nb_done = 0;
397        self.last_handle_bytes = 0;
398        for cl in self.inner.iter_mut() {
399            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
400                cl.change_state(CopperListState::DoneProcessing);
401            }
402            if is_top && cl.get_state() == CopperListState::DoneProcessing {
403                if let Some(logger) = &mut self.logger {
404                    cl.change_state(CopperListState::BeingSerialized);
405                    logger.log(cl)?;
406                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
407                    self.last_handle_bytes = take_last_completed_handle_bytes();
408                }
409                cl.change_state(CopperListState::Free);
410                nb_done += 1;
411            } else {
412                is_top = false;
413            }
414        }
415        for _ in 0..nb_done {
416            let _ = self.inner.pop();
417        }
418        Ok(())
419    }
420
421    pub fn finish_pending(&mut self) -> CuResult<()> {
422        Ok(())
423    }
424
425    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
426        Ok(NBCL - self.inner.len())
427    }
428
429    #[cfg(feature = "std")]
430    pub fn end_of_processing_boxed(
431        &mut self,
432        mut culist: Box<CopperList<P>>,
433    ) -> CuResult<OwnedCopperListSubmission<P>> {
434        culist.change_state(CopperListState::DoneProcessing);
435        self.last_encoded_bytes = 0;
436        self.last_handle_bytes = 0;
437        if let Some(logger) = &mut self.logger {
438            culist.change_state(CopperListState::BeingSerialized);
439            logger.log(&culist)?;
440            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
441            self.last_handle_bytes = take_last_completed_handle_bytes();
442        }
443        culist.change_state(CopperListState::Free);
444        Ok(OwnedCopperListSubmission::Recycled(culist))
445    }
446
447    #[cfg(feature = "std")]
448    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
449        Ok(None)
450    }
451
452    #[cfg(feature = "std")]
453    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
454        Err(CuError::from(
455            "Synchronous CopperList I/O cannot block waiting for boxed completions",
456        ))
457    }
458
459    #[cfg(feature = "std")]
460    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
461        Ok(Vec::new())
462    }
463}
464
465/// Result of handing an owned boxed CopperList to the runtime-side CL I/O path.
466#[cfg(feature = "std")]
467pub enum OwnedCopperListSubmission<P: CopperListTuple> {
468    /// The CL has been fully handled and can be recycled immediately by the caller.
469    Recycled(Box<CopperList<P>>),
470    /// The CL was queued asynchronously and will be returned by a later reclaim call.
471    Pending,
472}
473
474#[cfg(all(feature = "std", feature = "async-cl-io"))]
475struct AsyncCopperListCompletion<P: CopperListTuple> {
476    culist: Box<CopperList<P>>,
477    log_result: CuResult<(u64, u64)>,
478}
479
480#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
481fn allocate_zeroed_copperlist<P>() -> Box<CopperList<P>>
482where
483    P: CopperListTuple + CuListZeroedInit,
484{
485    // SAFETY: We allocate zeroed memory and immediately initialize required fields.
486    let mut culist = unsafe {
487        let layout = Layout::new::<CopperList<P>>();
488        let ptr = alloc_zeroed(layout) as *mut CopperList<P>;
489        if ptr.is_null() {
490            handle_alloc_error(layout);
491        }
492        Box::from_raw(ptr)
493    };
494    culist.msgs.init_zeroed();
495    culist
496}
497
498#[cfg(all(feature = "std", feature = "parallel-rt"))]
499pub fn allocate_boxed_copperlists<P, const NBCL: usize>() -> Vec<Box<CopperList<P>>>
500where
501    P: CopperListTuple + CuListZeroedInit,
502{
503    let mut free_pool = Vec::with_capacity(NBCL);
504    for _ in 0..NBCL {
505        free_pool.push(allocate_zeroed_copperlist::<P>());
506    }
507    free_pool
508}
509
510/// Manages the lifecycle of the copper lists and logging on the asynchronous path.
511#[cfg(all(feature = "std", feature = "async-cl-io"))]
512pub struct AsyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
513    free_pool: Vec<Box<CopperList<P>>>,
514    current: Option<Box<CopperList<P>>>,
515    pending_count: usize,
516    next_cl_id: u64,
517    pending_sender: Option<SyncSender<Box<CopperList<P>>>>,
518    completion_receiver: Option<Receiver<AsyncCopperListCompletion<P>>>,
519    worker_handle: Option<JoinHandle<()>>,
520    /// Last encoded size returned by logger.log
521    pub last_encoded_bytes: u64,
522    /// Last handle-backed payload bytes observed during logger.log
523    pub last_handle_bytes: u64,
524}
525
526#[cfg(all(feature = "std", feature = "async-cl-io"))]
527impl<P: CopperListTuple + Default, const NBCL: usize> AsyncCopperListsManager<P, NBCL> {
528    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
529    where
530        P: CuListZeroedInit + AsyncCopperListPayload + 'static,
531    {
532        let mut free_pool = Vec::with_capacity(NBCL);
533        for _ in 0..NBCL {
534            free_pool.push(allocate_zeroed_copperlist::<P>());
535        }
536
537        let (pending_sender, completion_receiver, worker_handle) = if let Some(mut logger) = logger
538        {
539            let (pending_sender, pending_receiver) = sync_channel::<Box<CopperList<P>>>(NBCL);
540            let (completion_sender, completion_receiver) =
541                sync_channel::<AsyncCopperListCompletion<P>>(NBCL);
542            let worker_handle = std::thread::Builder::new()
543                .name("cu-async-cl-io".to_string())
544                .spawn(move || {
545                    while let Ok(mut culist) = pending_receiver.recv() {
546                        culist.change_state(CopperListState::BeingSerialized);
547                        let log_result = logger.log(&culist).map(|_| {
548                            (
549                                logger.last_log_bytes().unwrap_or(0) as u64,
550                                take_last_completed_handle_bytes(),
551                            )
552                        });
553                        let should_stop = log_result.is_err();
554                        if completion_sender
555                            .send(AsyncCopperListCompletion { culist, log_result })
556                            .is_err()
557                        {
558                            break;
559                        }
560                        if should_stop {
561                            break;
562                        }
563                    }
564                })
565                .map_err(|e| {
566                    CuError::from("Failed to spawn async CopperList serializer thread")
567                        .add_cause(e.to_string().as_str())
568                })?;
569            (
570                Some(pending_sender),
571                Some(completion_receiver),
572                Some(worker_handle),
573            )
574        } else {
575            (None, None, None)
576        };
577
578        Ok(Self {
579            free_pool,
580            current: None,
581            pending_count: 0,
582            next_cl_id: 0,
583            pending_sender,
584            completion_receiver,
585            worker_handle,
586            last_encoded_bytes: 0,
587            last_handle_bytes: 0,
588        })
589    }
590
591    pub fn next_cl_id(&self) -> u64 {
592        self.next_cl_id
593    }
594
595    pub fn last_cl_id(&self) -> u64 {
596        self.next_cl_id.saturating_sub(1)
597    }
598
599    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
600        if self.current.is_some() {
601            return Err(CuError::from(
602                "Attempted to create a CopperList while another one is still active",
603            ));
604        }
605
606        self.reclaim_completed()?;
607        while self.free_pool.is_empty() {
608            self.wait_for_completion()?;
609        }
610
611        let culist = self
612            .free_pool
613            .pop()
614            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))?;
615        self.current = Some(culist);
616
617        let current = self
618            .current
619            .as_mut()
620            .expect("current CopperList is missing");
621        current.id = self.next_cl_id;
622        current.change_state(CopperListState::Initialized);
623        self.next_cl_id += 1;
624        Ok(current.as_mut())
625    }
626
627    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
628        self.reclaim_completed()?;
629
630        let mut culist = self.current.take().ok_or_else(|| {
631            CuError::from("Attempted to finish processing without an active CopperList")
632        })?;
633
634        if culist.id != culistid {
635            return Err(CuError::from(format!(
636                "Attempted to finish CopperList #{culistid} while CopperList #{} is active",
637                culist.id
638            )));
639        }
640
641        culist.change_state(CopperListState::DoneProcessing);
642        self.last_encoded_bytes = 0;
643        self.last_handle_bytes = 0;
644
645        if let Some(pending_sender) = &self.pending_sender {
646            culist.change_state(CopperListState::QueuedForSerialization);
647            pending_sender.send(culist).map_err(|e| {
648                CuError::from("Failed to enqueue CopperList for async serialization")
649                    .add_cause(e.to_string().as_str())
650            })?;
651            self.pending_count += 1;
652            self.reclaim_completed()?;
653        } else {
654            culist.change_state(CopperListState::Free);
655            self.free_pool.push(culist);
656        }
657
658        Ok(())
659    }
660
661    pub fn finish_pending(&mut self) -> CuResult<()> {
662        if self.current.is_some() {
663            return Err(CuError::from(
664                "Cannot flush CopperList I/O while a CopperList is still active",
665            ));
666        }
667
668        while self.pending_count > 0 {
669            self.wait_for_completion()?;
670        }
671        Ok(())
672    }
673
674    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
675        self.reclaim_completed()?;
676        Ok(self.free_pool.len())
677    }
678
679    pub fn end_of_processing_boxed(
680        &mut self,
681        mut culist: Box<CopperList<P>>,
682    ) -> CuResult<OwnedCopperListSubmission<P>> {
683        self.reclaim_completed()?;
684        culist.change_state(CopperListState::DoneProcessing);
685        self.last_encoded_bytes = 0;
686        self.last_handle_bytes = 0;
687
688        if let Some(pending_sender) = &self.pending_sender {
689            culist.change_state(CopperListState::QueuedForSerialization);
690            pending_sender.send(culist).map_err(|e| {
691                CuError::from("Failed to enqueue CopperList for async serialization")
692                    .add_cause(e.to_string().as_str())
693            })?;
694            self.pending_count += 1;
695            self.reclaim_completed()?;
696            Ok(OwnedCopperListSubmission::Pending)
697        } else {
698            culist.change_state(CopperListState::Free);
699            Ok(OwnedCopperListSubmission::Recycled(culist))
700        }
701    }
702
703    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
704        let recv_result = {
705            let Some(completion_receiver) = self.completion_receiver.as_ref() else {
706                return Ok(None);
707            };
708            completion_receiver.try_recv()
709        };
710        match recv_result {
711            Ok(completion) => self.handle_completion(completion).map(Some),
712            Err(TryRecvError::Empty) => Ok(None),
713            Err(TryRecvError::Disconnected) => Err(CuError::from(
714                "Async CopperList serializer thread disconnected unexpectedly",
715            )),
716        }
717    }
718
719    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
720        let completion = self
721            .completion_receiver
722            .as_ref()
723            .ok_or_else(|| {
724                CuError::from("No async CopperList serializer is active to return a free slot")
725            })?
726            .recv()
727            .map_err(|e| {
728                CuError::from("Failed to receive completion from async CopperList serializer")
729                    .add_cause(e.to_string().as_str())
730            })?;
731        self.handle_completion(completion)
732    }
733
734    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
735        let mut reclaimed = Vec::with_capacity(self.pending_count);
736        if self.current.is_some() {
737            return Err(CuError::from(
738                "Cannot flush CopperList I/O while a CopperList is still active",
739            ));
740        }
741        while self.pending_count > 0 {
742            reclaimed.push(self.wait_reclaim_boxed()?);
743        }
744        Ok(reclaimed)
745    }
746
747    fn reclaim_completed(&mut self) -> CuResult<()> {
748        loop {
749            let Some(culist) = self.try_reclaim_boxed()? else {
750                break;
751            };
752            self.free_pool.push(culist);
753        }
754        Ok(())
755    }
756
757    fn wait_for_completion(&mut self) -> CuResult<()> {
758        let culist = self.wait_reclaim_boxed()?;
759        self.free_pool.push(culist);
760        Ok(())
761    }
762
763    fn handle_completion(
764        &mut self,
765        mut completion: AsyncCopperListCompletion<P>,
766    ) -> CuResult<Box<CopperList<P>>> {
767        self.pending_count = self.pending_count.saturating_sub(1);
768        if let Ok((encoded_bytes, handle_bytes)) = completion.log_result.as_ref() {
769            self.last_encoded_bytes = *encoded_bytes;
770            self.last_handle_bytes = *handle_bytes;
771        }
772        completion.culist.change_state(CopperListState::Free);
773        completion.log_result?;
774        Ok(completion.culist)
775    }
776
777    fn shutdown_worker(&mut self) -> CuResult<()> {
778        self.finish_pending()?;
779        self.pending_sender.take();
780        if let Some(worker_handle) = self.worker_handle.take() {
781            worker_handle.join().map_err(|_| {
782                CuError::from("Async CopperList serializer thread panicked while joining")
783            })?;
784        }
785        Ok(())
786    }
787}
788
789#[cfg(all(feature = "std", feature = "async-cl-io"))]
790impl<P: CopperListTuple + Default, const NBCL: usize> Drop for AsyncCopperListsManager<P, NBCL> {
791    fn drop(&mut self) {
792        let _ = self.shutdown_worker();
793    }
794}
795
796#[cfg(all(feature = "std", feature = "async-cl-io"))]
797pub type CopperListsManager<P, const NBCL: usize> = AsyncCopperListsManager<P, NBCL>;
798
799#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
800pub type CopperListsManager<P, const NBCL: usize> = SyncCopperListsManager<P, NBCL>;
801
802/// Manages the frozen tasks state and logging.
803pub struct KeyFramesManager {
804    /// Where the serialized tasks are stored following the wave of execution of a CL.
805    inner: KeyFrame,
806
807    /// Optional override for the timestamp to stamp the next keyframe (used by deterministic replay).
808    forced_timestamp: Option<CuTime>,
809
810    /// If set, reuse this keyframe verbatim (e.g., during replay) instead of re-freezing state.
811    locked: bool,
812
813    /// Logger for the state of the tasks (frozen tasks)
814    logger: Option<Box<dyn WriteStream<KeyFrame>>>,
815
816    /// Capture a keyframe only each...
817    keyframe_interval: u32,
818
819    /// Bytes written by the last keyframe log
820    pub last_encoded_bytes: u64,
821}
822
823impl KeyFramesManager {
824    fn is_keyframe(&self, culistid: u64) -> bool {
825        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
826    }
827
828    #[inline]
829    pub fn captures_keyframe(&self, culistid: u64) -> bool {
830        self.is_keyframe(culistid)
831    }
832
833    pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
834        if self.is_keyframe(culistid) {
835            // If a recorded keyframe was preloaded for this CL, keep it as-is.
836            if self.locked && self.inner.culistid == culistid {
837                return;
838            }
839            let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
840            self.inner.reset(culistid, ts);
841            self.locked = false;
842        }
843    }
844
845    /// Force the timestamp of the next keyframe to a given value.
846    #[cfg(feature = "std")]
847    pub fn set_forced_timestamp(&mut self, ts: CuTime) {
848        self.forced_timestamp = Some(ts);
849    }
850
851    pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
852        if self.is_keyframe(culistid) {
853            if self.locked {
854                // We are replaying a recorded keyframe verbatim; don't mutate it.
855                return Ok(0);
856            }
857            if self.inner.culistid != culistid {
858                return Err(CuError::from(format!(
859                    "Freezing task for culistid {} but current keyframe is {}",
860                    culistid, self.inner.culistid
861                )));
862            }
863            self.inner
864                .add_frozen_task(task)
865                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
866        } else {
867            Ok(0)
868        }
869    }
870
871    /// Generic helper to freeze any `Freezable` state (task or bridge) into the current keyframe.
872    pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
873        self.freeze_task(culistid, item)
874    }
875
876    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
877        if self.is_keyframe(culistid) {
878            let logger = self.logger.as_mut().unwrap();
879            logger.log(&self.inner)?;
880            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
881            // Clear the lock so the next CL can rebuild normally unless re-locked.
882            self.locked = false;
883            Ok(())
884        } else {
885            // Not a keyframe for this CL; ensure we don't carry stale sizes forward.
886            self.last_encoded_bytes = 0;
887            Ok(())
888        }
889    }
890
891    /// Preload a recorded keyframe so it is logged verbatim on the matching CL.
892    #[cfg(feature = "std")]
893    pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
894        self.inner = keyframe.clone();
895        self.forced_timestamp = Some(keyframe.timestamp);
896        self.locked = true;
897    }
898}
899
900/// This is the main structure that will be injected as a member of the Application struct.
901/// CT is the tuple of all the tasks in order of execution.
902/// CL is the type of the copper list, representing the input/output messages for all the tasks.
903pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
904    /// The base clock the runtime will be using to record time.
905    pub clock: RobotClock, // TODO: remove public at some point
906
907    /// Compile-time subsystem identity for this Copper process.
908    subsystem_code: u16,
909
910    /// Deployment/runtime instance identity for this Copper process.
911    pub instance_id: u32,
912
913    /// The tuple of all the tasks in order of execution.
914    pub tasks: CT,
915
916    /// Tuple of all instantiated bridges.
917    pub bridges: CB,
918
919    /// Resource registry kept alive for tasks borrowing shared handles.
920    pub resources: ResourceManager,
921
922    /// The runtime monitoring.
923    pub monitor: M,
924
925    /// Runtime-side execution progress probe for watchdog/diagnostic monitors.
926    ///
927    /// This probe is written from the generated execution plan before each component
928    /// step. Monitors consume it asynchronously (typically from watchdog threads) to
929    /// report the last known component/step/culist when the runtime appears stalled.
930    #[cfg(feature = "std")]
931    pub execution_probe: ExecutionProbeHandle,
932    #[cfg(not(feature = "std"))]
933    pub execution_probe: RuntimeExecutionProbe,
934
935    /// The logger for the copper lists (messages between tasks)
936    pub copperlists_manager: CopperListsManager<P, NBCL>,
937
938    /// The logger for the state of the tasks (frozen tasks)
939    pub keyframes_manager: KeyFramesManager,
940
941    /// Feature-gated container for deterministic multi-CopperList execution.
942    #[cfg(all(feature = "std", feature = "parallel-rt"))]
943    pub parallel_rt: ParallelRt<NBCL>,
944
945    /// The runtime configuration controlling the behavior of the run loop
946    pub runtime_config: RuntimeConfig,
947}
948
949/// To be able to share the clock we make the runtime a clock provider.
950impl<
951    CT,
952    CB,
953    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload,
954    M: CuMonitor,
955    const NBCL: usize,
956> ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
957{
958    fn get_clock(&self) -> RobotClock {
959        self.clock.clone()
960    }
961}
962
963impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> CuRuntime<CT, CB, P, M, NBCL> {
964    /// Returns a clone of the runtime clock handle.
965    #[inline]
966    pub fn clock(&self) -> RobotClock {
967        self.clock.clone()
968    }
969
970    /// Returns the compile-time subsystem code for this process.
971    #[inline]
972    pub fn subsystem_code(&self) -> u16 {
973        self.subsystem_code
974    }
975
976    /// Returns the configured runtime instance id for this process.
977    #[inline]
978    pub fn instance_id(&self) -> u32 {
979        self.instance_id
980    }
981}
982
983impl<
984    'cfg,
985    CT,
986    CB,
987    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
988    M: CuMonitor,
989    const NBCL: usize,
990    TI,
991    BI,
992    MI,
993    CLW,
994    KFW,
995> CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
996where
997    TI: for<'c> Fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>,
998    BI: Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
999    MI: Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1000    CLW: WriteStream<CopperList<P>> + 'static,
1001    KFW: WriteStream<KeyFrame> + 'static,
1002{
1003    pub fn build(self) -> CuResult<CuRuntime<CT, CB, P, M, NBCL>> {
1004        let Self {
1005            clock,
1006            config,
1007            mission,
1008            subsystem,
1009            instance_id,
1010            resources,
1011            parts,
1012            copperlists_logger,
1013            keyframes_logger,
1014        } = self;
1015        let mut resources =
1016            resources.ok_or_else(|| CuError::from("Resources missing from CuRuntimeBuilder"))?;
1017
1018        let graph = config.get_graph(Some(mission))?;
1019        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1020            .get_all_nodes()
1021            .iter()
1022            .map(|(_, node)| node.get_instance_config())
1023            .collect();
1024
1025        let tasks = (parts.tasks_instanciator)(all_instances_configs, &mut resources)?;
1026
1027        #[cfg(feature = "std")]
1028        let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
1029        #[cfg(not(feature = "std"))]
1030        let execution_probe = RuntimeExecutionProbe::default();
1031        let monitor_metadata = CuMonitoringMetadata::new(
1032            CompactString::from(mission),
1033            parts.monitored_components,
1034            parts.culist_component_mapping,
1035            CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
1036            build_monitor_topology(config, mission)?,
1037            None,
1038        )?
1039        .with_subsystem_id(subsystem.id())
1040        .with_instance_id(instance_id);
1041        #[cfg(feature = "std")]
1042        let monitor_runtime =
1043            CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
1044        #[cfg(not(feature = "std"))]
1045        let monitor_runtime = CuMonitoringRuntime::unavailable();
1046        let monitor = (parts.monitor_instanciator)(config, monitor_metadata, monitor_runtime);
1047        let bridges = (parts.bridges_instanciator)(config, &mut resources)?;
1048
1049        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
1050            Some(logging_config) if logging_config.enable_task_logging => (
1051                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1052                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1053                logging_config.keyframe_interval.unwrap(),
1054            ),
1055            Some(_) => (None, None, 0),
1056            None => (
1057                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1058                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1059                DEFAULT_KEYFRAME_INTERVAL,
1060            ),
1061        };
1062
1063        let copperlists_manager = CopperListsManager::new(copperlists_logger)?;
1064        #[cfg(target_os = "none")]
1065        {
1066            let cl_size = core::mem::size_of::<CopperList<P>>();
1067            let total_bytes = cl_size.saturating_mul(NBCL);
1068            info!(
1069                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
1070                NBCL, cl_size, total_bytes
1071            );
1072        }
1073
1074        let keyframes_manager = KeyFramesManager {
1075            inner: KeyFrame::new(),
1076            logger: keyframes_logger,
1077            keyframe_interval,
1078            last_encoded_bytes: 0,
1079            forced_timestamp: None,
1080            locked: false,
1081        };
1082        #[cfg(all(feature = "std", feature = "parallel-rt"))]
1083        let parallel_rt = ParallelRt::new(parts.parallel_rt_metadata)?;
1084
1085        let runtime_config = config.runtime.clone().unwrap_or_default();
1086        runtime_config.validate()?;
1087
1088        Ok(CuRuntime {
1089            subsystem_code: subsystem.code(),
1090            instance_id,
1091            tasks,
1092            bridges,
1093            resources,
1094            monitor,
1095            execution_probe,
1096            clock,
1097            copperlists_manager,
1098            keyframes_manager,
1099            #[cfg(all(feature = "std", feature = "parallel-rt"))]
1100            parallel_rt,
1101            runtime_config,
1102        })
1103    }
1104}
1105
1106/// A KeyFrame is recording a snapshot of the tasks state before a given copperlist.
1107/// It is a double encapsulation: this one recording the culistid and another even in
1108/// bincode in the serialized_tasks.
1109#[derive(Clone, Encode, Decode)]
1110pub struct KeyFrame {
1111    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
1112    pub culistid: u64,
1113    // This is the timestamp when the keyframe was created, using the robot clock.
1114    pub timestamp: CuTime,
1115    // This is the bincode representation of the tuple of all the tasks.
1116    pub serialized_tasks: Vec<u8>,
1117}
1118
1119impl KeyFrame {
1120    fn new() -> Self {
1121        KeyFrame {
1122            culistid: 0,
1123            timestamp: CuTime::default(),
1124            serialized_tasks: Vec::new(),
1125        }
1126    }
1127
1128    /// This is to be able to avoid reallocations
1129    fn reset(&mut self, culistid: u64, timestamp: CuTime) {
1130        self.culistid = culistid;
1131        self.timestamp = timestamp;
1132        self.serialized_tasks.clear();
1133    }
1134
1135    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
1136    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
1137        let cfg = bincode::config::standard();
1138        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
1139        BincodeAdapter(task).encode(&mut sizer)?;
1140        let need = sizer.into_writer().bytes_written as usize;
1141
1142        let start = self.serialized_tasks.len();
1143        self.serialized_tasks.resize(start + need, 0);
1144        let mut enc =
1145            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
1146        BincodeAdapter(task).encode(&mut enc)?;
1147        Ok(need)
1148    }
1149}
1150
1151/// Identifies where the effective runtime configuration came from.
1152#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
1153pub enum RuntimeLifecycleConfigSource {
1154    ProgrammaticOverride,
1155    ExternalFile,
1156    BundledDefault,
1157}
1158
1159/// Stack and process identification metadata persisted in the runtime lifecycle log.
1160#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
1161pub struct RuntimeLifecycleStackInfo {
1162    pub app_name: String,
1163    pub app_version: String,
1164    pub git_commit: Option<String>,
1165    pub git_dirty: Option<bool>,
1166    pub subsystem_id: Option<String>,
1167    pub subsystem_code: u16,
1168    pub instance_id: u32,
1169}
1170
1171/// Runtime lifecycle events emitted in the dedicated lifecycle section.
1172#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
1173pub enum RuntimeLifecycleEvent {
1174    Instantiated {
1175        config_source: RuntimeLifecycleConfigSource,
1176        effective_config_ron: String,
1177        stack: RuntimeLifecycleStackInfo,
1178    },
1179    MissionStarted {
1180        mission: String,
1181    },
1182    MissionStopped {
1183        mission: String,
1184        // TODO(lifecycle): replace free-form reason with a typed stop reason enum once
1185        // std/no-std behavior and panic integration are split in a follow-up PR.
1186        reason: String,
1187    },
1188    // TODO(lifecycle): wire panic hook / no_std equivalent to emit this event consistently.
1189    Panic {
1190        message: String,
1191        file: Option<String>,
1192        line: Option<u32>,
1193        column: Option<u32>,
1194    },
1195    ShutdownCompleted,
1196}
1197
1198/// One event record persisted in the `UnifiedLogType::RuntimeLifecycle` section.
1199#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
1200pub struct RuntimeLifecycleRecord {
1201    pub timestamp: CuTime,
1202    pub event: RuntimeLifecycleEvent,
1203}
1204
1205impl<
1206    CT,
1207    CB,
1208    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
1209    M: CuMonitor,
1210    const NBCL: usize,
1211> CuRuntime<CT, CB, P, M, NBCL>
1212{
1213    /// Records runtime execution progress in the shared probe.
1214    ///
1215    /// This is intentionally lightweight and does not call monitor callbacks.
1216    #[inline]
1217    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
1218        self.execution_probe.record(marker);
1219    }
1220
1221    /// Returns a shared reference to the concrete runtime execution probe.
1222    ///
1223    /// The generated runtime uses this when it needs a uniform
1224    /// `&RuntimeExecutionProbe` view across `std` and `no_std` builds.
1225    #[inline]
1226    pub fn execution_probe_ref(&self) -> &RuntimeExecutionProbe {
1227        #[cfg(feature = "std")]
1228        {
1229            self.execution_probe.as_ref()
1230        }
1231
1232        #[cfg(not(feature = "std"))]
1233        {
1234            &self.execution_probe
1235        }
1236    }
1237
1238    // FIXME(gbin): this became REALLY ugly with no-std
1239    #[allow(clippy::too_many_arguments)]
1240    #[cfg(feature = "std")]
1241    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new(...).")]
1242    pub fn new(
1243        clock: RobotClock,
1244        subsystem_code: u16,
1245        config: &CuConfig,
1246        mission: &str,
1247        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
1248        tasks_instanciator: impl for<'c> Fn(
1249            Vec<Option<&'c ComponentConfig>>,
1250            &mut ResourceManager,
1251        ) -> CuResult<CT>,
1252        monitored_components: &'static [MonitorComponentMetadata],
1253        culist_component_mapping: &'static [ComponentId],
1254        #[cfg(all(feature = "std", feature = "parallel-rt"))]
1255        parallel_rt_metadata: &'static ParallelRtMetadata,
1256        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1257        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1258        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
1259        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
1260    ) -> CuResult<Self> {
1261        let parts = CuRuntimeParts::new(
1262            tasks_instanciator,
1263            monitored_components,
1264            culist_component_mapping,
1265            #[cfg(all(feature = "std", feature = "parallel-rt"))]
1266            parallel_rt_metadata,
1267            monitor_instanciator,
1268            bridges_instanciator,
1269        );
1270        CuRuntimeBuilder::new(
1271            clock,
1272            config,
1273            mission,
1274            parts,
1275            copperlists_logger,
1276            keyframes_logger,
1277        )
1278        .with_subsystem(Subsystem::new(None, subsystem_code))
1279        .try_with_resources_instantiator(resources_instanciator)?
1280        .build()
1281    }
1282
1283    #[allow(clippy::too_many_arguments)]
1284    #[cfg(feature = "std")]
1285    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new_with_resources(...).")]
1286    pub fn new_with_resources(
1287        clock: RobotClock,
1288        subsystem_code: u16,
1289        config: &CuConfig,
1290        mission: &str,
1291        resources: ResourceManager,
1292        tasks_instanciator: impl for<'c> Fn(
1293            Vec<Option<&'c ComponentConfig>>,
1294            &mut ResourceManager,
1295        ) -> CuResult<CT>,
1296        monitored_components: &'static [MonitorComponentMetadata],
1297        culist_component_mapping: &'static [ComponentId],
1298        #[cfg(all(feature = "std", feature = "parallel-rt"))]
1299        parallel_rt_metadata: &'static ParallelRtMetadata,
1300        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1301        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1302        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
1303        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
1304    ) -> CuResult<Self> {
1305        let parts = CuRuntimeParts::new(
1306            tasks_instanciator,
1307            monitored_components,
1308            culist_component_mapping,
1309            #[cfg(all(feature = "std", feature = "parallel-rt"))]
1310            parallel_rt_metadata,
1311            monitor_instanciator,
1312            bridges_instanciator,
1313        );
1314        CuRuntimeBuilder::new(
1315            clock,
1316            config,
1317            mission,
1318            parts,
1319            copperlists_logger,
1320            keyframes_logger,
1321        )
1322        .with_subsystem(Subsystem::new(None, subsystem_code))
1323        .with_resources(resources)
1324        .build()
1325    }
1326
1327    #[allow(clippy::too_many_arguments)]
1328    #[cfg(not(feature = "std"))]
1329    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new(...).")]
1330    pub fn new(
1331        clock: RobotClock,
1332        subsystem_code: u16,
1333        config: &CuConfig,
1334        mission: &str,
1335        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
1336        tasks_instanciator: impl for<'c> Fn(
1337            Vec<Option<&'c ComponentConfig>>,
1338            &mut ResourceManager,
1339        ) -> CuResult<CT>,
1340        monitored_components: &'static [MonitorComponentMetadata],
1341        culist_component_mapping: &'static [ComponentId],
1342        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1343        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1344        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
1345        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
1346    ) -> CuResult<Self> {
1347        let parts = CuRuntimeParts::new(
1348            tasks_instanciator,
1349            monitored_components,
1350            culist_component_mapping,
1351            monitor_instanciator,
1352            bridges_instanciator,
1353        );
1354        CuRuntimeBuilder::new(
1355            clock,
1356            config,
1357            mission,
1358            parts,
1359            copperlists_logger,
1360            keyframes_logger,
1361        )
1362        .with_subsystem(Subsystem::new(None, subsystem_code))
1363        .try_with_resources_instantiator(resources_instanciator)?
1364        .build()
1365    }
1366
1367    #[allow(clippy::too_many_arguments)]
1368    #[cfg(not(feature = "std"))]
1369    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new_with_resources(...).")]
1370    pub fn new_with_resources(
1371        clock: RobotClock,
1372        subsystem_code: u16,
1373        config: &CuConfig,
1374        mission: &str,
1375        resources: ResourceManager,
1376        tasks_instanciator: impl for<'c> Fn(
1377            Vec<Option<&'c ComponentConfig>>,
1378            &mut ResourceManager,
1379        ) -> CuResult<CT>,
1380        monitored_components: &'static [MonitorComponentMetadata],
1381        culist_component_mapping: &'static [ComponentId],
1382        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1383        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1384        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
1385        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
1386    ) -> CuResult<Self> {
1387        let parts = CuRuntimeParts::new(
1388            tasks_instanciator,
1389            monitored_components,
1390            culist_component_mapping,
1391            monitor_instanciator,
1392            bridges_instanciator,
1393        );
1394        CuRuntimeBuilder::new(
1395            clock,
1396            config,
1397            mission,
1398            parts,
1399            copperlists_logger,
1400            keyframes_logger,
1401        )
1402        .with_subsystem(Subsystem::new(None, subsystem_code))
1403        .with_resources(resources)
1404        .build()
1405    }
1406}
1407
1408/// Copper tasks can be of 3 types:
1409/// - Source: only producing output messages (usually used for drivers)
1410/// - Regular: processing input messages and producing output messages, more like compute nodes.
1411/// - Sink: only consuming input messages (usually used for actuators)
1412#[derive(Debug, PartialEq, Eq, Clone, Copy)]
1413pub enum CuTaskType {
1414    Source,
1415    Regular,
1416    Sink,
1417}
1418
1419#[derive(Debug, Clone)]
1420pub struct CuOutputPack {
1421    pub culist_index: u32,
1422    pub msg_types: Vec<String>,
1423}
1424
1425#[derive(Debug, Clone)]
1426pub struct CuInputMsg {
1427    pub culist_index: u32,
1428    pub msg_type: String,
1429    pub src_port: usize,
1430    pub edge_id: usize,
1431}
1432
1433/// This structure represents a step in the execution plan.
1434pub struct CuExecutionStep {
1435    /// NodeId: node id of the task to execute
1436    pub node_id: NodeId,
1437    /// Node: node instance
1438    pub node: Node,
1439    /// CuTaskType: type of the task
1440    pub task_type: CuTaskType,
1441
1442    /// the indices in the copper list of the input messages and their types
1443    pub input_msg_indices_types: Vec<CuInputMsg>,
1444
1445    /// the index in the copper list of the output message and its type
1446    pub output_msg_pack: Option<CuOutputPack>,
1447}
1448
1449impl Debug for CuExecutionStep {
1450    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1451        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
1452        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
1453        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
1454        f.write_str(
1455            format!(
1456                "              input_msg_types: {:?}\n",
1457                self.input_msg_indices_types
1458            )
1459            .as_str(),
1460        )?;
1461        f.write_str(format!("       output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
1462        Ok(())
1463    }
1464}
1465
1466/// This structure represents a loop in the execution plan.
1467/// It is used to represent a sequence of Execution units (loop or steps) that are executed
1468/// multiple times.
1469/// if loop_count is None, the loop is infinite.
1470pub struct CuExecutionLoop {
1471    pub steps: Vec<CuExecutionUnit>,
1472    pub loop_count: Option<u32>,
1473}
1474
1475impl Debug for CuExecutionLoop {
1476    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1477        f.write_str("CuExecutionLoop:\n")?;
1478        for step in &self.steps {
1479            match step {
1480                CuExecutionUnit::Step(step) => {
1481                    step.fmt(f)?;
1482                }
1483                CuExecutionUnit::Loop(l) => {
1484                    l.fmt(f)?;
1485                }
1486            }
1487        }
1488
1489        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
1490        Ok(())
1491    }
1492}
1493
1494/// This structure represents a step in the execution plan.
1495#[derive(Debug)]
1496pub enum CuExecutionUnit {
1497    Step(Box<CuExecutionStep>),
1498    Loop(CuExecutionLoop),
1499}
1500
1501fn find_output_pack_from_nodeid(
1502    node_id: NodeId,
1503    steps: &Vec<CuExecutionUnit>,
1504) -> Option<CuOutputPack> {
1505    for step in steps {
1506        match step {
1507            CuExecutionUnit::Loop(loop_unit) => {
1508                if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
1509                    return Some(output_pack);
1510                }
1511            }
1512            CuExecutionUnit::Step(step) => {
1513                if step.node_id == node_id {
1514                    return step.output_msg_pack.clone();
1515                }
1516            }
1517        }
1518    }
1519    None
1520}
1521
1522pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
1523    if graph.incoming_neighbor_count(node_id) == 0 {
1524        CuTaskType::Source
1525    } else if graph.outgoing_neighbor_count(node_id) == 0 {
1526        CuTaskType::Sink
1527    } else {
1528        CuTaskType::Regular
1529    }
1530}
1531
1532/// The connection id used here is the index of the config graph edge that equates to the wanted
1533/// connection.
1534fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
1535    input_msg_indices_types.sort_by_key(|input| input.edge_id);
1536}
1537
1538fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
1539    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
1540    edge_ids.sort();
1541
1542    let mut msg_order: Vec<(usize, String)> = Vec::new();
1543    let mut record_msg = |msg: String, order: usize| {
1544        if let Some((existing_order, _)) = msg_order
1545            .iter_mut()
1546            .find(|(_, existing_msg)| *existing_msg == msg)
1547        {
1548            if order < *existing_order {
1549                *existing_order = order;
1550            }
1551            return;
1552        }
1553        msg_order.push((order, msg));
1554    };
1555
1556    for edge_id in edge_ids {
1557        if let Some(edge) = graph.edge(edge_id) {
1558            let order = if edge.order == usize::MAX {
1559                edge_id
1560            } else {
1561                edge.order
1562            };
1563            record_msg(edge.msg.clone(), order);
1564        }
1565    }
1566    if let Some(node) = graph.get_node(node_id) {
1567        for (msg, order) in node.nc_outputs_with_order() {
1568            record_msg(msg.clone(), order);
1569        }
1570    }
1571
1572    msg_order.sort_by(|(order_a, msg_a), (order_b, msg_b)| {
1573        order_a.cmp(order_b).then_with(|| msg_a.cmp(msg_b))
1574    });
1575    msg_order.into_iter().map(|(_, msg)| msg).collect()
1576}
1577/// Explores a subbranch and build the partial plan out of it.
1578fn plan_tasks_tree_branch(
1579    graph: &CuGraph,
1580    mut next_culist_output_index: u32,
1581    starting_point: NodeId,
1582    plan: &mut Vec<CuExecutionUnit>,
1583) -> (u32, bool) {
1584    #[cfg(all(feature = "std", feature = "macro_debug"))]
1585    eprintln!("-- starting branch from node {starting_point}");
1586
1587    let mut handled = false;
1588
1589    for id in graph.bfs_nodes(starting_point) {
1590        let node_ref = graph.get_node(id).unwrap();
1591        #[cfg(all(feature = "std", feature = "macro_debug"))]
1592        eprintln!("  Visiting node: {node_ref:?}");
1593
1594        let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
1595        let output_msg_pack: Option<CuOutputPack>;
1596        let task_type = find_task_type_for_id(graph, id);
1597
1598        match task_type {
1599            CuTaskType::Source => {
1600                #[cfg(all(feature = "std", feature = "macro_debug"))]
1601                eprintln!("    → Source node, assign output index {next_culist_output_index}");
1602                let msg_types = collect_output_msg_types(graph, id);
1603                if msg_types.is_empty() {
1604                    panic!(
1605                        "Source node '{}' has no outgoing connections",
1606                        node_ref.get_id()
1607                    );
1608                }
1609                output_msg_pack = Some(CuOutputPack {
1610                    culist_index: next_culist_output_index,
1611                    msg_types,
1612                });
1613                next_culist_output_index += 1;
1614            }
1615            CuTaskType::Sink => {
1616                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1617                edge_ids.sort();
1618                #[cfg(all(feature = "std", feature = "macro_debug"))]
1619                eprintln!("    → Sink with incoming edges: {edge_ids:?}");
1620                for edge_id in edge_ids {
1621                    let edge = graph
1622                        .edge(edge_id)
1623                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1624                    let pid = graph
1625                        .get_node_id_by_name(edge.src.as_str())
1626                        .unwrap_or_else(|| {
1627                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1628                        });
1629                    let output_pack = find_output_pack_from_nodeid(pid, plan);
1630                    if let Some(output_pack) = output_pack {
1631                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1632                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
1633                        let msg_type = edge.msg.as_str();
1634                        let src_port = output_pack
1635                            .msg_types
1636                            .iter()
1637                            .position(|msg| msg == msg_type)
1638                            .unwrap_or_else(|| {
1639                                panic!(
1640                                    "Missing output port for message type '{msg_type}' on node {pid}"
1641                                )
1642                            });
1643                        input_msg_indices_types.push(CuInputMsg {
1644                            culist_index: output_pack.culist_index,
1645                            msg_type: msg_type.to_string(),
1646                            src_port,
1647                            edge_id,
1648                        });
1649                    } else {
1650                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1651                        eprintln!("      ✗ Input from {pid} not ready, returning");
1652                        return (next_culist_output_index, handled);
1653                    }
1654                }
1655                output_msg_pack = Some(CuOutputPack {
1656                    culist_index: next_culist_output_index,
1657                    msg_types: Vec::from(["()".to_string()]),
1658                });
1659                next_culist_output_index += 1;
1660            }
1661            CuTaskType::Regular => {
1662                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1663                edge_ids.sort();
1664                #[cfg(all(feature = "std", feature = "macro_debug"))]
1665                eprintln!("    → Regular task with incoming edges: {edge_ids:?}");
1666                for edge_id in edge_ids {
1667                    let edge = graph
1668                        .edge(edge_id)
1669                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1670                    let pid = graph
1671                        .get_node_id_by_name(edge.src.as_str())
1672                        .unwrap_or_else(|| {
1673                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1674                        });
1675                    let output_pack = find_output_pack_from_nodeid(pid, plan);
1676                    if let Some(output_pack) = output_pack {
1677                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1678                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
1679                        let msg_type = edge.msg.as_str();
1680                        let src_port = output_pack
1681                            .msg_types
1682                            .iter()
1683                            .position(|msg| msg == msg_type)
1684                            .unwrap_or_else(|| {
1685                                panic!(
1686                                    "Missing output port for message type '{msg_type}' on node {pid}"
1687                                )
1688                            });
1689                        input_msg_indices_types.push(CuInputMsg {
1690                            culist_index: output_pack.culist_index,
1691                            msg_type: msg_type.to_string(),
1692                            src_port,
1693                            edge_id,
1694                        });
1695                    } else {
1696                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1697                        eprintln!("      ✗ Input from {pid} not ready, returning");
1698                        return (next_culist_output_index, handled);
1699                    }
1700                }
1701                let msg_types = collect_output_msg_types(graph, id);
1702                if msg_types.is_empty() {
1703                    panic!(
1704                        "Regular node '{}' has no outgoing connections",
1705                        node_ref.get_id()
1706                    );
1707                }
1708                output_msg_pack = Some(CuOutputPack {
1709                    culist_index: next_culist_output_index,
1710                    msg_types,
1711                });
1712                next_culist_output_index += 1;
1713            }
1714        }
1715
1716        sort_inputs_by_cnx_id(&mut input_msg_indices_types);
1717
1718        if let Some(pos) = plan
1719            .iter()
1720            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
1721        {
1722            #[cfg(all(feature = "std", feature = "macro_debug"))]
1723            eprintln!("    → Already in plan, modifying existing step");
1724            let mut step = plan.remove(pos);
1725            if let CuExecutionUnit::Step(ref mut s) = step {
1726                s.input_msg_indices_types = input_msg_indices_types;
1727            }
1728            plan.push(step);
1729        } else {
1730            #[cfg(all(feature = "std", feature = "macro_debug"))]
1731            eprintln!("    → New step added to plan");
1732            let step = CuExecutionStep {
1733                node_id: id,
1734                node: node_ref.clone(),
1735                task_type,
1736                input_msg_indices_types,
1737                output_msg_pack,
1738            };
1739            plan.push(CuExecutionUnit::Step(Box::new(step)));
1740        }
1741
1742        handled = true;
1743    }
1744
1745    #[cfg(all(feature = "std", feature = "macro_debug"))]
1746    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
1747    (next_culist_output_index, handled)
1748}
1749
1750/// This is the main heuristics to compute an execution plan at compilation time.
1751/// TODO(gbin): Make that heuristic pluggable.
1752pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1753    #[cfg(all(feature = "std", feature = "macro_debug"))]
1754    eprintln!("[runtime plan]");
1755    let mut plan = Vec::new();
1756    let mut next_culist_output_index = 0u32;
1757
1758    let mut queue: VecDeque<NodeId> = graph
1759        .node_ids()
1760        .into_iter()
1761        .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
1762        .collect();
1763
1764    #[cfg(all(feature = "std", feature = "macro_debug"))]
1765    eprintln!("Initial source nodes: {queue:?}");
1766
1767    while let Some(start_node) = queue.pop_front() {
1768        #[cfg(all(feature = "std", feature = "macro_debug"))]
1769        eprintln!("→ Starting BFS from source {start_node}");
1770        for node_id in graph.bfs_nodes(start_node) {
1771            let already_in_plan = plan
1772                .iter()
1773                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1774            if already_in_plan {
1775                #[cfg(all(feature = "std", feature = "macro_debug"))]
1776                eprintln!("    → Node {node_id} already planned, skipping");
1777                continue;
1778            }
1779
1780            #[cfg(all(feature = "std", feature = "macro_debug"))]
1781            eprintln!("    Planning from node {node_id}");
1782            let (new_index, handled) =
1783                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
1784            next_culist_output_index = new_index;
1785
1786            if !handled {
1787                #[cfg(all(feature = "std", feature = "macro_debug"))]
1788                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1789                continue;
1790            }
1791
1792            #[cfg(all(feature = "std", feature = "macro_debug"))]
1793            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
1794            for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1795                #[cfg(all(feature = "std", feature = "macro_debug"))]
1796                eprintln!("      → Enqueueing neighbor {neighbor}");
1797                queue.push_back(neighbor);
1798            }
1799        }
1800    }
1801
1802    let mut planned_nodes = BTreeSet::new();
1803    for unit in &plan {
1804        if let CuExecutionUnit::Step(step) = unit {
1805            planned_nodes.insert(step.node_id);
1806        }
1807    }
1808
1809    let mut missing = Vec::new();
1810    for node_id in graph.node_ids() {
1811        if !planned_nodes.contains(&node_id) {
1812            if let Some(node) = graph.get_node(node_id) {
1813                missing.push(node.get_id().to_string());
1814            } else {
1815                missing.push(format!("node_id_{node_id}"));
1816            }
1817        }
1818    }
1819
1820    if !missing.is_empty() {
1821        missing.sort();
1822        return Err(CuError::from(format!(
1823            "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1824            missing.join(", ")
1825        )));
1826    }
1827
1828    Ok(CuExecutionLoop {
1829        steps: plan,
1830        loop_count: None,
1831    })
1832}
1833
1834//tests
1835#[cfg(test)]
1836mod tests {
1837    use super::*;
1838    use crate::config::Node;
1839    use crate::context::CuContext;
1840    use crate::cutask::CuSinkTask;
1841    use crate::cutask::{CuSrcTask, Freezable};
1842    use crate::monitoring::NoMonitor;
1843    use crate::reflect::Reflect;
1844    use bincode::Encode;
1845    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1846    use serde_derive::{Deserialize, Serialize};
1847
1848    #[derive(Reflect)]
1849    pub struct TestSource {}
1850
1851    impl Freezable for TestSource {}
1852
1853    impl CuSrcTask for TestSource {
1854        type Resources<'r> = ();
1855        type Output<'m> = ();
1856        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
1857        where
1858            Self: Sized,
1859        {
1860            Ok(Self {})
1861        }
1862
1863        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
1864            Ok(())
1865        }
1866    }
1867
1868    #[derive(Reflect)]
1869    pub struct TestSink {}
1870
1871    impl Freezable for TestSink {}
1872
1873    impl CuSinkTask for TestSink {
1874        type Resources<'r> = ();
1875        type Input<'m> = ();
1876
1877        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
1878        where
1879            Self: Sized,
1880        {
1881            Ok(Self {})
1882        }
1883
1884        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
1885            Ok(())
1886        }
1887    }
1888
1889    // Those should be generated by the derive macro
1890    type Tasks = (TestSource, TestSink);
1891    type TestRuntime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;
1892    const TEST_NBCL: usize = 2;
1893
1894    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
1895    struct Msgs(());
1896
1897    impl ErasedCuStampedDataSet for Msgs {
1898        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
1899            Vec::new()
1900        }
1901    }
1902
1903    impl MatchingTasks for Msgs {
1904        fn get_all_task_ids() -> &'static [&'static str] {
1905            &[]
1906        }
1907    }
1908
1909    impl CuListZeroedInit for Msgs {
1910        fn init_zeroed(&mut self) {}
1911    }
1912
1913    #[cfg(feature = "std")]
1914    fn tasks_instanciator(
1915        all_instances_configs: Vec<Option<&ComponentConfig>>,
1916        _resources: &mut ResourceManager,
1917    ) -> CuResult<Tasks> {
1918        Ok((
1919            TestSource::new(all_instances_configs[0], ())?,
1920            TestSink::new(all_instances_configs[1], ())?,
1921        ))
1922    }
1923
1924    #[cfg(not(feature = "std"))]
1925    fn tasks_instanciator(
1926        all_instances_configs: Vec<Option<&ComponentConfig>>,
1927        _resources: &mut ResourceManager,
1928    ) -> CuResult<Tasks> {
1929        Ok((
1930            TestSource::new(all_instances_configs[0], ())?,
1931            TestSink::new(all_instances_configs[1], ())?,
1932        ))
1933    }
1934
1935    fn monitor_instanciator(
1936        _config: &CuConfig,
1937        metadata: CuMonitoringMetadata,
1938        runtime: CuMonitoringRuntime,
1939    ) -> NoMonitor {
1940        NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
1941    }
1942
1943    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
1944        Ok(())
1945    }
1946
1947    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
1948        Ok(ResourceManager::new(&[]))
1949    }
1950
1951    #[derive(Debug)]
1952    struct FakeWriter {}
1953
1954    impl<E: Encode> WriteStream<E> for FakeWriter {
1955        fn log(&mut self, _obj: &E) -> CuResult<()> {
1956            Ok(())
1957        }
1958    }
1959
1960    #[test]
1961    fn test_runtime_instantiation() {
1962        let mut config = CuConfig::default();
1963        let graph = config.get_graph_mut(None).unwrap();
1964        graph.add_node(Node::new("a", "TestSource")).unwrap();
1965        graph.add_node(Node::new("b", "TestSink")).unwrap();
1966        graph.connect(0, 1, "()").unwrap();
1967        let runtime: CuResult<TestRuntime> =
1968            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
1969                RobotClock::default(),
1970                &config,
1971                crate::config::DEFAULT_MISSION_ID,
1972                CuRuntimeParts::new(
1973                    tasks_instanciator,
1974                    &[],
1975                    &[],
1976                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
1977                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
1978                    monitor_instanciator,
1979                    bridges_instanciator,
1980                ),
1981                FakeWriter {},
1982                FakeWriter {},
1983            )
1984            .try_with_resources_instantiator(resources_instanciator)
1985            .and_then(|builder| builder.build());
1986        assert!(runtime.is_ok());
1987    }
1988
1989    #[test]
1990    fn test_rate_target_period_rejects_zero() {
1991        let err = rate_target_period(0).expect_err("zero rate target should fail");
1992        assert!(
1993            err.to_string()
1994                .contains("Runtime rate target cannot be zero"),
1995            "unexpected error: {err}"
1996        );
1997    }
1998
1999    #[test]
2000    fn test_loop_rate_limiter_advances_to_next_period_when_on_time() {
2001        let (clock, mock) = RobotClock::mock();
2002        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
2003        assert_eq!(limiter.next_deadline(), CuDuration::from(10_000_000));
2004
2005        mock.set_value(10_000_000);
2006        limiter.mark_tick(&clock);
2007
2008        assert_eq!(limiter.next_deadline(), CuDuration::from(20_000_000));
2009    }
2010
2011    #[test]
2012    fn test_loop_rate_limiter_skips_missed_periods_without_resetting_phase() {
2013        let (clock, mock) = RobotClock::mock();
2014        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
2015
2016        mock.set_value(35_000_000);
2017        limiter.mark_tick(&clock);
2018
2019        assert_eq!(limiter.next_deadline(), CuDuration::from(40_000_000));
2020    }
2021
2022    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
2023    #[test]
2024    fn test_loop_rate_limiter_spin_window_is_fixed_scheduler_window() {
2025        let (clock, _) = RobotClock::mock();
2026        let limiter = LoopRateLimiter::from_rate_target_hz(1_000, &clock).unwrap();
2027        assert_eq!(limiter.spin_window(), CuDuration::from(200_000));
2028
2029        let fast = LoopRateLimiter::from_rate_target_hz(10_000, &clock).unwrap();
2030        assert_eq!(fast.spin_window(), CuDuration::from(200_000));
2031    }
2032
2033    #[cfg(not(feature = "async-cl-io"))]
2034    #[test]
2035    fn test_copperlists_manager_lifecycle() {
2036        let mut config = CuConfig::default();
2037        let graph = config.get_graph_mut(None).unwrap();
2038        graph.add_node(Node::new("a", "TestSource")).unwrap();
2039        graph.add_node(Node::new("b", "TestSink")).unwrap();
2040        graph.connect(0, 1, "()").unwrap();
2041
2042        let mut runtime: TestRuntime =
2043            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
2044                RobotClock::default(),
2045                &config,
2046                crate::config::DEFAULT_MISSION_ID,
2047                CuRuntimeParts::new(
2048                    tasks_instanciator,
2049                    &[],
2050                    &[],
2051                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
2052                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
2053                    monitor_instanciator,
2054                    bridges_instanciator,
2055                ),
2056                FakeWriter {},
2057                FakeWriter {},
2058            )
2059            .try_with_resources_instantiator(resources_instanciator)
2060            .and_then(|builder| builder.build())
2061            .unwrap();
2062
2063        // Now emulates the generated runtime
2064        {
2065            let copperlists = &mut runtime.copperlists_manager;
2066            let culist0 = copperlists
2067                .create()
2068                .expect("Ran out of space for copper lists");
2069            let id = culist0.id;
2070            assert_eq!(id, 0);
2071            culist0.change_state(CopperListState::Processing);
2072            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
2073        }
2074
2075        {
2076            let copperlists = &mut runtime.copperlists_manager;
2077            let culist1 = copperlists
2078                .create()
2079                .expect("Ran out of space for copper lists");
2080            let id = culist1.id;
2081            assert_eq!(id, 1);
2082            culist1.change_state(CopperListState::Processing);
2083            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
2084        }
2085
2086        {
2087            let copperlists = &mut runtime.copperlists_manager;
2088            let culist2 = copperlists.create();
2089            assert!(culist2.is_err());
2090            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
2091            // Free in order, should let the top of the stack be serialized and freed.
2092            let _ = copperlists.end_of_processing(1);
2093            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
2094        }
2095
2096        // Readd a CL
2097        {
2098            let copperlists = &mut runtime.copperlists_manager;
2099            let culist2 = copperlists
2100                .create()
2101                .expect("Ran out of space for copper lists");
2102            let id = culist2.id;
2103            assert_eq!(id, 2);
2104            culist2.change_state(CopperListState::Processing);
2105            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
2106            // Free out of order, the #0 first
2107            let _ = copperlists.end_of_processing(0);
2108            // Should not free up the top of the stack
2109            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
2110
2111            // Free up the top of the stack
2112            let _ = copperlists.end_of_processing(2);
2113            // This should free up 2 CLs
2114
2115            assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
2116        }
2117    }
2118
2119    #[cfg(all(feature = "std", feature = "async-cl-io"))]
2120    #[derive(Debug, Default)]
2121    struct RecordingWriter {
2122        ids: Arc<Mutex<Vec<u64>>>,
2123    }
2124
2125    #[cfg(all(feature = "std", feature = "async-cl-io"))]
2126    impl WriteStream<CopperList<Msgs>> for RecordingWriter {
2127        fn log(&mut self, culist: &CopperList<Msgs>) -> CuResult<()> {
2128            self.ids.lock().unwrap().push(culist.id);
2129            std::thread::sleep(std::time::Duration::from_millis(2));
2130            Ok(())
2131        }
2132    }
2133
2134    #[cfg(all(feature = "std", feature = "async-cl-io"))]
2135    #[test]
2136    fn test_async_copperlists_manager_flushes_in_order() {
2137        let ids = Arc::new(Mutex::new(Vec::new()));
2138        let mut copperlists = CopperListsManager::<Msgs, 4>::new(Some(Box::new(RecordingWriter {
2139            ids: ids.clone(),
2140        })))
2141        .unwrap();
2142
2143        for expected_id in 0..4 {
2144            let culist = copperlists.create().unwrap();
2145            assert_eq!(culist.id, expected_id);
2146            culist.change_state(CopperListState::Processing);
2147            copperlists.end_of_processing(expected_id).unwrap();
2148        }
2149
2150        copperlists.finish_pending().unwrap();
2151        assert_eq!(copperlists.available_copper_lists().unwrap(), 4);
2152        assert_eq!(*ids.lock().unwrap(), vec![0, 1, 2, 3]);
2153    }
2154
2155    #[test]
2156    fn test_runtime_task_input_order() {
2157        let mut config = CuConfig::default();
2158        let graph = config.get_graph_mut(None).unwrap();
2159        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
2160        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
2161        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
2162
2163        assert_eq!(src1_id, 0);
2164        assert_eq!(src2_id, 1);
2165
2166        // note that the source2 connection is before the source1
2167        let src1_type = "src1_type";
2168        let src2_type = "src2_type";
2169        graph.connect(src2_id, sink_id, src2_type).unwrap();
2170        graph.connect(src1_id, sink_id, src1_type).unwrap();
2171
2172        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
2173        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
2174        // the edge id depends on the order the connection is created, not
2175        // on the node id, and that is what determines the input order
2176        assert_eq!(src1_edge_id, 1);
2177        assert_eq!(src2_edge_id, 0);
2178
2179        let runtime = compute_runtime_plan(graph).unwrap();
2180        let sink_step = runtime
2181            .steps
2182            .iter()
2183            .find_map(|step| match step {
2184                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
2185                _ => None,
2186            })
2187            .unwrap();
2188
2189        // since the src2 connection was added before src1 connection, the src2 type should be
2190        // first
2191        assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
2192        assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
2193    }
2194
2195    #[test]
2196    fn test_runtime_output_ports_unique_ordered() {
2197        let mut config = CuConfig::default();
2198        let graph = config.get_graph_mut(None).unwrap();
2199        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2200        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2201        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2202        let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
2203        let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
2204
2205        graph.connect(src_id, dst_a_id, "msg::A").unwrap();
2206        graph.connect(src_id, dst_b_id, "msg::B").unwrap();
2207        graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
2208        graph.connect(src_id, dst_c_id, "msg::C").unwrap();
2209
2210        let runtime = compute_runtime_plan(graph).unwrap();
2211        let src_step = runtime
2212            .steps
2213            .iter()
2214            .find_map(|step| match step {
2215                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2216                _ => None,
2217            })
2218            .unwrap();
2219
2220        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2221        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
2222
2223        let dst_a_step = runtime
2224            .steps
2225            .iter()
2226            .find_map(|step| match step {
2227                CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
2228                _ => None,
2229            })
2230            .unwrap();
2231        let dst_b_step = runtime
2232            .steps
2233            .iter()
2234            .find_map(|step| match step {
2235                CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
2236                _ => None,
2237            })
2238            .unwrap();
2239        let dst_a2_step = runtime
2240            .steps
2241            .iter()
2242            .find_map(|step| match step {
2243                CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
2244                _ => None,
2245            })
2246            .unwrap();
2247        let dst_c_step = runtime
2248            .steps
2249            .iter()
2250            .find_map(|step| match step {
2251                CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
2252                _ => None,
2253            })
2254            .unwrap();
2255
2256        assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
2257        assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
2258        assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
2259        assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
2260    }
2261
2262    #[test]
2263    fn test_runtime_output_ports_fanout_single() {
2264        let mut config = CuConfig::default();
2265        let graph = config.get_graph_mut(None).unwrap();
2266        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2267        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2268        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2269
2270        graph.connect(src_id, dst_a_id, "i32").unwrap();
2271        graph.connect(src_id, dst_b_id, "i32").unwrap();
2272
2273        let runtime = compute_runtime_plan(graph).unwrap();
2274        let src_step = runtime
2275            .steps
2276            .iter()
2277            .find_map(|step| match step {
2278                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2279                _ => None,
2280            })
2281            .unwrap();
2282
2283        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2284        assert_eq!(output_pack.msg_types, vec!["i32"]);
2285    }
2286
2287    #[test]
2288    fn test_runtime_output_ports_include_nc_outputs() {
2289        let mut config = CuConfig::default();
2290        let graph = config.get_graph_mut(None).unwrap();
2291        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2292        let dst_id = graph.add_node(Node::new("dst", "Sink")).unwrap();
2293        graph.connect(src_id, dst_id, "msg::A").unwrap();
2294        graph
2295            .get_node_mut(src_id)
2296            .expect("missing source node")
2297            .add_nc_output("msg::B", usize::MAX);
2298
2299        let runtime = compute_runtime_plan(graph).unwrap();
2300        let src_step = runtime
2301            .steps
2302            .iter()
2303            .find_map(|step| match step {
2304                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2305                _ => None,
2306            })
2307            .unwrap();
2308        let dst_step = runtime
2309            .steps
2310            .iter()
2311            .find_map(|step| match step {
2312                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2313                _ => None,
2314            })
2315            .unwrap();
2316
2317        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2318        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2319        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 0);
2320    }
2321
2322    #[test]
2323    fn test_runtime_output_ports_respect_connection_order_with_nc() {
2324        let txt = r#"(
2325            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2326            cnx: [
2327                (src: "src", dst: "__nc__", msg: "msg::A"),
2328                (src: "src", dst: "sink", msg: "msg::B"),
2329            ]
2330        )"#;
2331        let config = CuConfig::deserialize_ron(txt).unwrap();
2332        let graph = config.get_graph(None).unwrap();
2333        let src_id = graph.get_node_id_by_name("src").unwrap();
2334        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2335
2336        let runtime = compute_runtime_plan(graph).unwrap();
2337        let src_step = runtime
2338            .steps
2339            .iter()
2340            .find_map(|step| match step {
2341                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2342                _ => None,
2343            })
2344            .unwrap();
2345        let dst_step = runtime
2346            .steps
2347            .iter()
2348            .find_map(|step| match step {
2349                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2350                _ => None,
2351            })
2352            .unwrap();
2353
2354        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2355        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2356        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2357    }
2358
2359    #[cfg(feature = "std")]
2360    #[test]
2361    fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
2362        let txt = r#"(
2363            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2364            cnx: [
2365                (src: "src", dst: "__nc__", msg: "msg::A"),
2366                (src: "src", dst: "sink", msg: "msg::B"),
2367            ]
2368        )"#;
2369        let tmp = tempfile::NamedTempFile::new().unwrap();
2370        std::fs::write(tmp.path(), txt).unwrap();
2371        let config = crate::config::read_configuration(tmp.path().to_str().unwrap()).unwrap();
2372        let graph = config.get_graph(None).unwrap();
2373        let src_id = graph.get_node_id_by_name("src").unwrap();
2374        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2375
2376        let runtime = compute_runtime_plan(graph).unwrap();
2377        let src_step = runtime
2378            .steps
2379            .iter()
2380            .find_map(|step| match step {
2381                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2382                _ => None,
2383            })
2384            .unwrap();
2385        let dst_step = runtime
2386            .steps
2387            .iter()
2388            .find_map(|step| match step {
2389                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2390                _ => None,
2391            })
2392            .unwrap();
2393
2394        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2395        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2396        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2397    }
2398
2399    #[test]
2400    fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
2401        let txt = r#"(
2402            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2403            cnx: [
2404                (src: "src", dst: "__nc__", msg: "i32"),
2405                (src: "src", dst: "sink", msg: "bool"),
2406            ]
2407        )"#;
2408        let config = CuConfig::deserialize_ron(txt).unwrap();
2409        let graph = config.get_graph(None).unwrap();
2410        let src_id = graph.get_node_id_by_name("src").unwrap();
2411        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2412
2413        let runtime = compute_runtime_plan(graph).unwrap();
2414        let src_step = runtime
2415            .steps
2416            .iter()
2417            .find_map(|step| match step {
2418                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2419                _ => None,
2420            })
2421            .unwrap();
2422        let dst_step = runtime
2423            .steps
2424            .iter()
2425            .find_map(|step| match step {
2426                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2427                _ => None,
2428            })
2429            .unwrap();
2430
2431        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2432        assert_eq!(output_pack.msg_types, vec!["i32", "bool"]);
2433        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2434    }
2435
2436    #[test]
2437    fn test_runtime_plan_diamond_case1() {
2438        // more complex topology that tripped the scheduler
2439        let mut config = CuConfig::default();
2440        let graph = config.get_graph_mut(None).unwrap();
2441        let cam0_id = graph
2442            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
2443            .unwrap();
2444        let inf0_id = graph
2445            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
2446            .unwrap();
2447        let broadcast_id = graph
2448            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
2449            .unwrap();
2450
2451        // case 1 order
2452        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
2453        graph.connect(cam0_id, inf0_id, "i32").unwrap();
2454        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
2455
2456        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
2457        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
2458
2459        assert_eq!(edge_cam0_to_inf0, 0);
2460        assert_eq!(edge_cam0_to_broadcast, 1);
2461
2462        let runtime = compute_runtime_plan(graph).unwrap();
2463        let broadcast_step = runtime
2464            .steps
2465            .iter()
2466            .find_map(|step| match step {
2467                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
2468                _ => None,
2469            })
2470            .unwrap();
2471
2472        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
2473        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
2474    }
2475
2476    #[test]
2477    fn test_runtime_plan_diamond_case2() {
2478        // more complex topology that tripped the scheduler variation 2
2479        let mut config = CuConfig::default();
2480        let graph = config.get_graph_mut(None).unwrap();
2481        let cam0_id = graph
2482            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
2483            .unwrap();
2484        let inf0_id = graph
2485            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
2486            .unwrap();
2487        let broadcast_id = graph
2488            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
2489            .unwrap();
2490
2491        // case 2 order
2492        graph.connect(cam0_id, inf0_id, "i32").unwrap();
2493        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
2494        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
2495
2496        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
2497        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
2498
2499        assert_eq!(edge_cam0_to_broadcast, 0);
2500        assert_eq!(edge_cam0_to_inf0, 1);
2501
2502        let runtime = compute_runtime_plan(graph).unwrap();
2503        let broadcast_step = runtime
2504            .steps
2505            .iter()
2506            .find_map(|step| match step {
2507                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
2508                _ => None,
2509            })
2510            .unwrap();
2511
2512        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
2513        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
2514    }
2515}