Skip to main content

cu29_runtime/
curuntime.rs

1//! CuRuntime is the heart of what copper is running on the robot.
2//! It is exposed to the user via the `copper_runtime` macro injecting it as a field in their application struct.
3//!
4
5use crate::app::Subsystem;
6use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node, TaskKind};
7use crate::config::{
8    CuConfig, CuGraph, MAX_RATE_TARGET_HZ, NodeId, RuntimeConfig, resolve_task_kind_for_id,
9};
10use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
11use crate::cutask::{BincodeAdapter, Freezable};
12#[cfg(feature = "std")]
13use crate::monitoring::ExecutionProbeHandle;
14#[cfg(feature = "std")]
15use crate::monitoring::MonitorExecutionProbe;
16use crate::monitoring::{
17    ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
18    ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
19    take_last_completed_handle_bytes,
20};
21#[cfg(all(feature = "std", feature = "parallel-rt"))]
22use crate::parallel_rt::{ParallelRt, ParallelRtMetadata};
23use crate::resource::ResourceManager;
24use compact_str::CompactString;
25use cu29_clock::{ClockProvider, CuDuration, CuTime, RobotClock};
26use cu29_traits::CuResult;
27use cu29_traits::WriteStream;
28use cu29_traits::{CopperListTuple, CuError};
29
30#[cfg(target_os = "none")]
31#[allow(unused_imports)]
32use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
33#[cfg(target_os = "none")]
34#[allow(unused_imports)]
35use cu29_log_derive::info;
36#[cfg(target_os = "none")]
37#[allow(unused_imports)]
38use cu29_log_runtime::log;
39#[cfg(all(target_os = "none", debug_assertions))]
40#[allow(unused_imports)]
41use cu29_log_runtime::log_debug_mode;
42#[cfg(target_os = "none")]
43#[allow(unused_imports)]
44use cu29_value::to_value;
45
46#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
47use alloc::alloc::{alloc_zeroed, handle_alloc_error};
48use alloc::boxed::Box;
49use alloc::collections::{BTreeSet, VecDeque};
50use alloc::format;
51use alloc::string::{String, ToString};
52use alloc::vec::Vec;
53use bincode::enc::EncoderImpl;
54use bincode::enc::write::{SizeWriter, SliceWriter};
55use bincode::error::EncodeError;
56use bincode::{Decode, Encode};
57#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
58use core::alloc::Layout;
59use core::fmt::Result as FmtResult;
60use core::fmt::{Debug, Formatter};
61use core::marker::PhantomData;
62
63#[cfg(all(feature = "std", feature = "async-cl-io"))]
64use std::sync::mpsc::{Receiver, SyncSender, TryRecvError, sync_channel};
65#[cfg(all(feature = "std", feature = "async-cl-io"))]
66use std::thread::JoinHandle;
67
68#[doc(hidden)]
69pub type TasksInstantiator<CT> =
70    for<'c> fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>;
71#[doc(hidden)]
72pub type BridgesInstantiator<CB> = fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>;
73#[doc(hidden)]
74pub type MonitorInstantiator<M> = fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M;
75
76#[doc(hidden)]
77pub struct CuRuntimeParts<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI> {
78    pub tasks_instanciator: TI,
79    pub monitored_components: &'static [MonitorComponentMetadata],
80    pub culist_component_mapping: &'static [ComponentId],
81    #[cfg(all(feature = "std", feature = "parallel-rt"))]
82    pub parallel_rt_metadata: &'static ParallelRtMetadata,
83    pub monitor_instanciator: MI,
84    pub bridges_instanciator: BI,
85    _payload: PhantomData<(CT, CB, P, M, [(); NBCL])>,
86}
87
88impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI>
89    CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>
90{
91    pub const fn new(
92        tasks_instanciator: TI,
93        monitored_components: &'static [MonitorComponentMetadata],
94        culist_component_mapping: &'static [ComponentId],
95        #[cfg(all(feature = "std", feature = "parallel-rt"))]
96        parallel_rt_metadata: &'static ParallelRtMetadata,
97        monitor_instanciator: MI,
98        bridges_instanciator: BI,
99    ) -> Self {
100        Self {
101            tasks_instanciator,
102            monitored_components,
103            culist_component_mapping,
104            #[cfg(all(feature = "std", feature = "parallel-rt"))]
105            parallel_rt_metadata,
106            monitor_instanciator,
107            bridges_instanciator,
108            _payload: PhantomData,
109        }
110    }
111}
112
113#[doc(hidden)]
114pub struct CuRuntimeBuilder<
115    'cfg,
116    CT,
117    CB,
118    P: CopperListTuple,
119    M: CuMonitor,
120    const NBCL: usize,
121    TI,
122    BI,
123    MI,
124    CLW,
125    KFW,
126> {
127    clock: RobotClock,
128    config: &'cfg CuConfig,
129    mission: &'cfg str,
130    subsystem: Subsystem,
131    instance_id: u32,
132    resources: Option<ResourceManager>,
133    parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
134    copperlists_logger: CLW,
135    keyframes_logger: KFW,
136}
137
138impl<'cfg, CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI, CLW, KFW>
139    CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
140{
141    pub fn new(
142        clock: RobotClock,
143        config: &'cfg CuConfig,
144        mission: &'cfg str,
145        parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
146        copperlists_logger: CLW,
147        keyframes_logger: KFW,
148    ) -> Self {
149        Self {
150            clock,
151            config,
152            mission,
153            subsystem: Subsystem::new(None, 0),
154            instance_id: 0,
155            resources: None,
156            parts,
157            copperlists_logger,
158            keyframes_logger,
159        }
160    }
161
162    pub fn with_subsystem(mut self, subsystem: Subsystem) -> Self {
163        self.subsystem = subsystem;
164        self
165    }
166
167    pub fn with_instance_id(mut self, instance_id: u32) -> Self {
168        self.instance_id = instance_id;
169        self
170    }
171
172    pub fn with_resources(mut self, resources: ResourceManager) -> Self {
173        self.resources = Some(resources);
174        self
175    }
176
177    pub fn try_with_resources_instantiator(
178        mut self,
179        resources_instantiator: impl FnOnce(&CuConfig) -> CuResult<ResourceManager>,
180    ) -> CuResult<Self> {
181        self.resources = Some(resources_instantiator(self.config)?);
182        Ok(self)
183    }
184}
185
186/// Returns a monotonic instant used for local runtime performance timing.
187///
188/// When `sysclock-perf` (and `std`) are enabled this uses a process-local
189/// `RobotClock::new()` instance for timing. The returned value is a
190/// monotonically increasing duration since an unspecified origin (typically
191/// process or runtime initialization), not a wall-clock time-of-day. When
192/// `sysclock-perf` is disabled it delegates to the provided `RobotClock`.
193///
194/// This is intentionally separate from `LoopRateLimiter`, which always uses the
195/// provided `RobotClock` so `runtime.rate_target_hz` stays tied to robot time.
196#[inline]
197pub fn perf_now(_clock: &RobotClock) -> CuTime {
198    #[cfg(all(feature = "std", feature = "sysclock-perf"))]
199    {
200        static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
201        return PERF_CLOCK.get_or_init(RobotClock::new).now();
202    }
203
204    #[allow(unreachable_code)]
205    _clock.now()
206}
207
/// Length, in nanoseconds, of the final busy-wait window used by
/// `LoopRateLimiter::wait_until_ready` in high-precision mode: the limiter
/// sleeps until this much time remains and spins for the rest.
#[cfg(all(feature = "std", feature = "high-precision-limiter"))]
const HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS: u64 = 200_000;
210
211/// Convert a configured runtime rate target to an integer-nanosecond period.
212#[inline]
213pub fn rate_target_period(rate_target_hz: u64) -> CuResult<CuDuration> {
214    if rate_target_hz == 0 {
215        return Err(CuError::from(
216            "Runtime rate target cannot be zero. Set runtime.rate_target_hz to at least 1.",
217        ));
218    }
219
220    if rate_target_hz > MAX_RATE_TARGET_HZ {
221        return Err(CuError::from(format!(
222            "Runtime rate target ({rate_target_hz} Hz) exceeds the supported maximum of {MAX_RATE_TARGET_HZ} Hz."
223        )));
224    }
225
226    Ok(CuDuration::from(MAX_RATE_TARGET_HZ / rate_target_hz))
227}
228
229/// Runtime loop limiter that preserves phase with absolute deadlines.
230///
231/// This is intentionally a small runtime helper so generated applications do
232/// not have to open-code loop scheduling policy. Deadlines are tracked against
233/// the provided `RobotClock`, even when `sysclock-perf` is enabled for
234/// process-time measurements.
235#[derive(Clone, Copy, Debug, PartialEq, Eq)]
236pub struct LoopRateLimiter {
237    period: CuDuration,
238    next_deadline: CuTime,
239}
240
241impl LoopRateLimiter {
242    #[inline]
243    pub fn from_rate_target_hz(rate_target_hz: u64, clock: &RobotClock) -> CuResult<Self> {
244        let period = rate_target_period(rate_target_hz)?;
245        Ok(Self {
246            period,
247            next_deadline: clock.now() + period,
248        })
249    }
250
251    #[inline]
252    pub fn is_ready(&self, clock: &RobotClock) -> bool {
253        self.remaining(clock).is_none()
254    }
255
256    #[inline]
257    pub fn remaining(&self, clock: &RobotClock) -> Option<CuDuration> {
258        let now = clock.now();
259        if now < self.next_deadline {
260            Some(self.next_deadline - now)
261        } else {
262            None
263        }
264    }
265
266    #[inline]
267    pub fn wait_until_ready(&self, clock: &RobotClock) {
268        let deadline = self.next_deadline;
269        let Some(remaining) = self.remaining(clock) else {
270            return;
271        };
272
273        #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
274        {
275            let spin_window = self.spin_window();
276            if remaining > spin_window {
277                std::thread::sleep(std::time::Duration::from(remaining - spin_window));
278            }
279            while clock.now() < deadline {
280                core::hint::spin_loop();
281            }
282        }
283
284        #[cfg(all(feature = "std", not(feature = "high-precision-limiter")))]
285        {
286            let _ = deadline;
287            std::thread::sleep(std::time::Duration::from(remaining));
288        }
289
290        #[cfg(not(feature = "std"))]
291        {
292            let _ = remaining;
293            while clock.now() < deadline {
294                core::hint::spin_loop();
295            }
296        }
297    }
298
299    #[inline]
300    pub fn mark_tick(&mut self, clock: &RobotClock) {
301        self.advance_from(clock.now());
302    }
303
304    #[inline]
305    pub fn limit(&mut self, clock: &RobotClock) {
306        self.wait_until_ready(clock);
307        self.mark_tick(clock);
308    }
309
310    #[inline]
311    fn advance_from(&mut self, now: CuTime) {
312        let steps = if now < self.next_deadline {
313            1
314        } else {
315            (now - self.next_deadline).as_nanos() / self.period.as_nanos() + 1
316        };
317        self.next_deadline += steps * self.period;
318    }
319
320    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
321    #[inline]
322    fn spin_window(&self) -> CuDuration {
323        let _ = self.period;
324        CuDuration::from(HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS)
325    }
326
327    #[cfg(test)]
328    #[inline]
329    fn next_deadline(&self) -> CuTime {
330        self.next_deadline
331    }
332}
333
/// Marker bound for CopperList payloads handled by the CopperList I/O path.
///
/// With `std` + `async-cl-io` the marker requires `Send`.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub trait AsyncCopperListPayload: Send {}

/// Every `Send` type is automatically an async CopperList payload.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<T: Send> AsyncCopperListPayload for T {}

/// Marker bound for CopperList payloads handled by the CopperList I/O path.
///
/// Without `std` + `async-cl-io` the marker carries no requirement at all.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub trait AsyncCopperListPayload {}

/// Every type satisfies the marker when async CopperList I/O is disabled.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
impl<T> AsyncCopperListPayload for T {}
347
/// Control-flow result returned by one generated process stage.
///
/// `AbortCopperList` keeps the existing runtime semantics for monitor
/// decisions that abort the current CopperList without shutting the runtime
/// down. Ordered cleanup and log handoff remain the responsibility of the
/// outer driver.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ProcessStepOutcome {
    /// Carry on with the next stage of the current CopperList.
    Continue,
    /// Stop processing the current CopperList; the runtime itself keeps going.
    AbortCopperList,
}
360
361/// Result type used by generated process-step functions.
362#[doc(hidden)]
363pub type ProcessStepResult = CuResult<ProcessStepOutcome>;
364
/// Serializes `cl` to a byte vector with bincode's standard configuration.
///
/// Only compiled with the `remote-debug` feature.
///
/// # Errors
/// Wraps the bincode encoding error in a `CuError`.
#[cfg(feature = "remote-debug")]
fn encode_completed_copperlist_snapshot<P: CopperListTuple>(
    cl: &CopperList<P>,
) -> CuResult<Vec<u8>> {
    match bincode::encode_to_vec(cl, bincode::config::standard()) {
        Ok(bytes) => Ok(bytes),
        Err(e) => Err(CuError::new_with_cause(
            "Failed to encode completed CopperList snapshot",
            e,
        )),
    }
}
372
373/// Manages the lifecycle of the copper lists and logging on the synchronous path.
374#[doc(hidden)]
375pub struct SyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
376    inner: CuListsManager<P, NBCL>,
377    /// Logger for the copper lists (messages between tasks)
378    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
379    /// Remote-debug snapshot of the most recently completed CopperList.
380    #[cfg(feature = "remote-debug")]
381    last_completed_encoded: Option<Vec<u8>>,
382    /// Last encoded size returned by logger.log
383    pub last_encoded_bytes: u64,
384    /// Last handle-backed payload bytes observed during logger.log
385    pub last_handle_bytes: u64,
386}
387
388impl<P: CopperListTuple + Default, const NBCL: usize> SyncCopperListsManager<P, NBCL> {
389    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
390    where
391        P: CuListZeroedInit,
392    {
393        Ok(Self {
394            inner: CuListsManager::new(),
395            logger,
396            #[cfg(feature = "remote-debug")]
397            last_completed_encoded: None,
398            last_encoded_bytes: 0,
399            last_handle_bytes: 0,
400        })
401    }
402
403    pub fn next_cl_id(&self) -> u64 {
404        self.inner.next_cl_id()
405    }
406
407    pub fn last_cl_id(&self) -> u64 {
408        self.inner.last_cl_id()
409    }
410
411    pub fn peek(&self) -> Option<&CopperList<P>> {
412        self.inner.peek()
413    }
414
415    #[cfg(feature = "remote-debug")]
416    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
417        self.last_completed_encoded.as_deref()
418    }
419
420    #[cfg(not(feature = "remote-debug"))]
421    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
422        None
423    }
424
425    #[cfg(feature = "remote-debug")]
426    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
427        self.last_completed_encoded = snapshot;
428    }
429
430    #[cfg(not(feature = "remote-debug"))]
431    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}
432
433    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
434        self.inner
435            .create()
436            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))
437    }
438
439    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
440        let mut is_top = true;
441        let mut nb_done = 0;
442        self.last_handle_bytes = 0;
443        #[cfg(feature = "remote-debug")]
444        let last_completed_encoded = &mut self.last_completed_encoded;
445        for cl in self.inner.iter_mut() {
446            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
447                cl.change_state(CopperListState::DoneProcessing);
448                #[cfg(feature = "remote-debug")]
449                {
450                    *last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
451                }
452            }
453            if is_top && cl.get_state() == CopperListState::DoneProcessing {
454                if let Some(logger) = &mut self.logger {
455                    cl.change_state(CopperListState::BeingSerialized);
456                    logger.log(cl)?;
457                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
458                    self.last_handle_bytes = take_last_completed_handle_bytes();
459                }
460                cl.change_state(CopperListState::Free);
461                nb_done += 1;
462            } else {
463                is_top = false;
464            }
465        }
466        for _ in 0..nb_done {
467            let _ = self.inner.pop();
468        }
469        Ok(())
470    }
471
472    pub fn finish_pending(&mut self) -> CuResult<()> {
473        Ok(())
474    }
475
476    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
477        Ok(NBCL - self.inner.len())
478    }
479
480    #[cfg(feature = "std")]
481    pub fn end_of_processing_boxed(
482        &mut self,
483        mut culist: Box<CopperList<P>>,
484    ) -> CuResult<OwnedCopperListSubmission<P>> {
485        culist.change_state(CopperListState::DoneProcessing);
486        self.last_encoded_bytes = 0;
487        self.last_handle_bytes = 0;
488        if let Some(logger) = &mut self.logger {
489            culist.change_state(CopperListState::BeingSerialized);
490            logger.log(&culist)?;
491            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
492            self.last_handle_bytes = take_last_completed_handle_bytes();
493        }
494        culist.change_state(CopperListState::Free);
495        Ok(OwnedCopperListSubmission::Recycled(culist))
496    }
497
498    #[cfg(feature = "std")]
499    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
500        Ok(None)
501    }
502
503    #[cfg(feature = "std")]
504    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
505        Err(CuError::from(
506            "Synchronous CopperList I/O cannot block waiting for boxed completions",
507        ))
508    }
509
510    #[cfg(feature = "std")]
511    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
512        Ok(Vec::new())
513    }
514}
515
/// Result of handing an owned boxed CopperList to the runtime-side CL I/O path.
///
/// It tells the caller whether it got its box back right away or has to
/// collect it later through one of the reclaim calls.
#[cfg(feature = "std")]
#[doc(hidden)]
pub enum OwnedCopperListSubmission<P: CopperListTuple> {
    /// The CL has been fully handled; the caller may recycle the box immediately.
    Recycled(Box<CopperList<P>>),
    /// The CL was queued asynchronously; a later reclaim call will return it.
    Pending,
}
525
/// Message sent back by the serializer thread once it is done with one
/// CopperList.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
struct AsyncCopperListCompletion<P: CopperListTuple> {
    /// The CopperList being handed back to the manager.
    culist: Box<CopperList<P>>,
    /// On success `(encoded bytes, handle-backed bytes)` for this list,
    /// otherwise the logging error.
    log_result: CuResult<(u64, u64)>,
}
531
/// Allocates one CopperList directly on the heap from zeroed memory and runs
/// `init_zeroed` on its messages.
///
/// Aborts through `handle_alloc_error` when the allocation fails.
#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
fn allocate_zeroed_copperlist<P>() -> Box<CopperList<P>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    let layout = Layout::new::<CopperList<P>>();
    // SAFETY: We allocate zeroed memory with the exact layout of
    // `CopperList<P>`, check the pointer for null before taking ownership, and
    // immediately initialize required fields right after.
    // NOTE(review): assumes the layout is non-zero-sized and that an all-zero
    // bit pattern is acceptable for `CopperList<P>` — presumably what
    // `CuListZeroedInit` stands for; confirm.
    let mut boxed = unsafe {
        let raw = alloc_zeroed(layout) as *mut CopperList<P>;
        if raw.is_null() {
            handle_alloc_error(layout);
        }
        Box::from_raw(raw)
    };
    boxed.msgs.init_zeroed();
    boxed
}
549
/// Allocates `NBCL` zero-initialized boxed CopperLists and returns them as a
/// pool.
#[cfg(all(feature = "std", feature = "parallel-rt"))]
pub fn allocate_boxed_copperlists<P, const NBCL: usize>() -> Vec<Box<CopperList<P>>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    let mut pool = Vec::with_capacity(NBCL);
    pool.extend((0..NBCL).map(|_| allocate_zeroed_copperlist::<P>()));
    pool
}
561
/// Manages the lifecycle of the copper lists and logging on the asynchronous path.
///
/// CopperLists are individually boxed. A completed list is sent to a
/// serializer thread over a bounded channel and comes back through a second
/// channel once it has been logged.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub struct AsyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Boxed CopperLists ready to be handed out by `create`.
    free_pool: Vec<Box<CopperList<P>>>,
    /// The CopperList currently being filled, if any.
    current: Option<Box<CopperList<P>>>,
    /// Remote-debug snapshot of the most recently completed CopperList.
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    /// Number of CopperLists sent to the serializer and not yet received back.
    pending_count: usize,
    /// Id that the next created CopperList will receive.
    next_cl_id: u64,
    /// Sending side of the queue to the serializer; `None` without a logger.
    pending_sender: Option<SyncSender<Box<CopperList<P>>>>,
    /// Receiving side of the completion queue; `None` without a logger.
    completion_receiver: Option<Receiver<AsyncCopperListCompletion<P>>>,
    /// Join handle of the serializer thread; `None` without a logger.
    worker_handle: Option<JoinHandle<()>>,
    /// Encoded size reported for the most recently reclaimed CopperList.
    pub last_encoded_bytes: u64,
    /// Handle-backed payload bytes reported for the most recently reclaimed CopperList.
    pub last_handle_bytes: u64,
}
580
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> AsyncCopperListsManager<P, NBCL> {
    /// Allocates the pool of `NBCL` CopperLists and, when a logger is given,
    /// spawns the `cu-async-cl-io` serializer thread.
    ///
    /// Without a logger no thread and no channels are created; completed lists
    /// are then recycled immediately.
    ///
    /// # Errors
    /// Fails when the serializer thread cannot be spawned.
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit + AsyncCopperListPayload + 'static,
    {
        let mut free_pool = Vec::with_capacity(NBCL);
        free_pool.extend((0..NBCL).map(|_| allocate_zeroed_copperlist::<P>()));

        let (pending_sender, completion_receiver, worker_handle) = match logger {
            None => (None, None, None),
            Some(mut logger) => {
                let (pending_sender, pending_receiver) = sync_channel::<Box<CopperList<P>>>(NBCL);
                let (completion_sender, completion_receiver) =
                    sync_channel::<AsyncCopperListCompletion<P>>(NBCL);
                let worker_handle = std::thread::Builder::new()
                    .name("cu-async-cl-io".to_string())
                    .spawn(move || {
                        // Runs until the manager drops its sender, the manager
                        // stops listening for completions, or logging fails.
                        while let Ok(mut culist) = pending_receiver.recv() {
                            culist.change_state(CopperListState::BeingSerialized);
                            let log_result = logger.log(&culist).map(|_| {
                                (
                                    logger.last_log_bytes().unwrap_or(0) as u64,
                                    take_last_completed_handle_bytes(),
                                )
                            });
                            let should_stop = log_result.is_err();
                            // The list always travels back, even on failure,
                            // so the manager learns about the error.
                            let delivered = completion_sender
                                .send(AsyncCopperListCompletion { culist, log_result })
                                .is_ok();
                            if !delivered || should_stop {
                                break;
                            }
                        }
                    })
                    .map_err(|e| {
                        CuError::from("Failed to spawn async CopperList serializer thread")
                            .add_cause(e.to_string().as_str())
                    })?;
                (
                    Some(pending_sender),
                    Some(completion_receiver),
                    Some(worker_handle),
                )
            }
        };

        Ok(Self {
            free_pool,
            current: None,
            #[cfg(feature = "remote-debug")]
            last_completed_encoded: None,
            pending_count: 0,
            next_cl_id: 0,
            pending_sender,
            completion_receiver,
            worker_handle,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Id that the next created CopperList will receive.
    pub fn next_cl_id(&self) -> u64 {
        self.next_cl_id
    }

    /// Id of the most recently created CopperList (`0` when none was created).
    pub fn last_cl_id(&self) -> u64 {
        self.next_cl_id.saturating_sub(1)
    }

    /// Borrows the CopperList currently being filled, if any.
    pub fn peek(&self) -> Option<&CopperList<P>> {
        self.current.as_deref()
    }

    /// Encoded snapshot of the most recently completed CopperList, if any.
    #[cfg(feature = "remote-debug")]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        self.last_completed_encoded.as_deref()
    }

    /// Without `remote-debug` no snapshot is ever kept.
    #[cfg(not(feature = "remote-debug"))]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        None
    }

    /// Overwrites the stored snapshot.
    #[cfg(feature = "remote-debug")]
    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
        self.last_completed_encoded = snapshot;
    }

    /// Without `remote-debug` the snapshot is discarded.
    #[cfg(not(feature = "remote-debug"))]
    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}

    /// Takes a CopperList from the free pool, assigns it the next id and makes
    /// it the active list.
    ///
    /// Blocks on the serializer while the free pool is empty.
    ///
    /// # Errors
    /// Fails when a list is already active, or when reclaiming or waiting for
    /// a serialized list fails.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Attempted to create a CopperList while another one is still active",
            ));
        }

        self.reclaim_completed()?;
        while self.free_pool.is_empty() {
            self.wait_for_completion()?;
        }

        let mut culist = self
            .free_pool
            .pop()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))?;
        culist.id = self.next_cl_id;
        culist.change_state(CopperListState::Initialized);
        self.next_cl_id += 1;

        // `Option::insert` stores the list and hands back a reference to it,
        // so no panicking re-lookup of `current` is needed.
        Ok(&mut **self.current.insert(culist))
    }

    /// Stores an encoded snapshot of `cl` for remote debugging.
    #[cfg(feature = "remote-debug")]
    fn capture_completed_snapshot(&mut self, cl: &CopperList<P>) -> CuResult<()> {
        self.last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
        Ok(())
    }

    /// Without `remote-debug` there is nothing to capture.
    #[cfg(not(feature = "remote-debug"))]
    fn capture_completed_snapshot(&mut self, _cl: &CopperList<P>) -> CuResult<()> {
        Ok(())
    }

    /// Finishes the active CopperList, which must have id `culistid`, and
    /// submits it for serialization (or recycles it when there is no logger).
    ///
    /// On an id mismatch the active list is left in place, so the caller can
    /// still finish it with the right id; it is no longer dropped from the pool.
    ///
    /// # Errors
    /// Fails when no list is active, when the id does not match, or when
    /// snapshotting, enqueueing or reclaiming fails.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        self.reclaim_completed()?;

        let culist = match self.current.take() {
            Some(active) if active.id == culistid => active,
            Some(active) => {
                let active_id = active.id;
                // Put the list back: rejecting the request must not destroy it.
                self.current = Some(active);
                return Err(CuError::from(format!(
                    "Attempted to finish CopperList #{culistid} while CopperList #{} is active",
                    active_id
                )));
            }
            None => {
                return Err(CuError::from(
                    "Attempted to finish processing without an active CopperList",
                ));
            }
        };

        if let OwnedCopperListSubmission::Recycled(culist) = self.submit_finished(culist)? {
            self.free_pool.push(culist);
        }
        Ok(())
    }

    /// Blocks until every queued CopperList has come back from the serializer.
    ///
    /// # Errors
    /// Fails when a list is still active or when a completion reports an error.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }

        while self.pending_count > 0 {
            self.wait_for_completion()?;
        }
        Ok(())
    }

    /// Number of CopperLists in the free pool after collecting the completions
    /// that are already available.
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        self.reclaim_completed()?;
        Ok(self.free_pool.len())
    }

    /// Submits a caller-owned boxed CopperList.
    ///
    /// Returns `Pending` when the list was queued for the serializer, or
    /// `Recycled` with the box when there is no logger.
    ///
    /// # Errors
    /// Fails when reclaiming, snapshotting or enqueueing fails.
    pub fn end_of_processing_boxed(
        &mut self,
        culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        self.reclaim_completed()?;
        self.submit_finished(culist)
    }

    /// Returns one serialized CopperList if a completion is already available,
    /// without blocking.
    ///
    /// # Errors
    /// Fails when the serializer thread is gone or the completion carries a
    /// logging error.
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        let recv_result = match self.completion_receiver.as_ref() {
            Some(completion_receiver) => completion_receiver.try_recv(),
            None => return Ok(None),
        };
        match recv_result {
            Ok(completion) => self.handle_completion(completion).map(Some),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Err(CuError::from(
                "Async CopperList serializer thread disconnected unexpectedly",
            )),
        }
    }

    /// Blocks until the serializer hands one CopperList back.
    ///
    /// # Errors
    /// Fails when there is no serializer, when it is gone, or when the
    /// completion carries a logging error.
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        let Some(completion_receiver) = self.completion_receiver.as_ref() else {
            return Err(CuError::from(
                "No async CopperList serializer is active to return a free slot",
            ));
        };
        let completion = completion_receiver.recv().map_err(|e| {
            CuError::from("Failed to receive completion from async CopperList serializer")
                .add_cause(e.to_string().as_str())
        })?;
        self.handle_completion(completion)
    }

    /// Blocks until every queued CopperList has come back and returns them.
    ///
    /// # Errors
    /// Fails when a list is still active or when a completion reports an error.
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        // Check first so the error path does not allocate.
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }
        let mut reclaimed = Vec::with_capacity(self.pending_count);
        while self.pending_count > 0 {
            reclaimed.push(self.wait_reclaim_boxed()?);
        }
        Ok(reclaimed)
    }

    /// Common tail of both `end_of_processing` variants: marks the list done,
    /// snapshots it, resets the byte counters and either queues it for the
    /// serializer (`Pending`) or frees it on the spot (`Recycled`).
    fn submit_finished(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        culist.change_state(CopperListState::DoneProcessing);
        self.capture_completed_snapshot(&culist)?;
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;

        let Some(pending_sender) = self.pending_sender.as_ref() else {
            culist.change_state(CopperListState::Free);
            return Ok(OwnedCopperListSubmission::Recycled(culist));
        };

        culist.change_state(CopperListState::QueuedForSerialization);
        pending_sender.send(culist).map_err(|e| {
            CuError::from("Failed to enqueue CopperList for async serialization")
                .add_cause(e.to_string().as_str())
        })?;
        self.pending_count += 1;
        self.reclaim_completed()?;
        Ok(OwnedCopperListSubmission::Pending)
    }

    /// Moves every already-available completion back into the free pool.
    fn reclaim_completed(&mut self) -> CuResult<()> {
        while let Some(culist) = self.try_reclaim_boxed()? {
            self.free_pool.push(culist);
        }
        Ok(())
    }

    /// Blocks for one completion and returns its list to the free pool.
    fn wait_for_completion(&mut self) -> CuResult<()> {
        let culist = self.wait_reclaim_boxed()?;
        self.free_pool.push(culist);
        Ok(())
    }

    /// Accounts for one completion: decrements the pending counter, frees the
    /// list and records the byte counters of a successful log.
    ///
    /// # Errors
    /// Returns the logging error carried by the completion; the list is
    /// dropped in that case.
    fn handle_completion(
        &mut self,
        completion: AsyncCopperListCompletion<P>,
    ) -> CuResult<Box<CopperList<P>>> {
        let AsyncCopperListCompletion {
            mut culist,
            log_result,
        } = completion;
        self.pending_count = self.pending_count.saturating_sub(1);
        culist.change_state(CopperListState::Free);
        let (encoded_bytes, handle_bytes) = log_result?;
        self.last_encoded_bytes = encoded_bytes;
        self.last_handle_bytes = handle_bytes;
        Ok(culist)
    }

    /// Flushes pending work, closes the queue to the serializer and joins it.
    ///
    /// The channel is closed and the thread joined even when the flush fails
    /// (for example because a CopperList is still active), so the worker is
    /// never left detached with queued lists still to be written.
    ///
    /// # Errors
    /// Returns the flush error if there was one, otherwise a join failure.
    fn shutdown_worker(&mut self) -> CuResult<()> {
        let flush_result = self.finish_pending();
        if flush_result.is_err() {
            // Best effort: keep collecting completions so the worker cannot
            // block on a full completion channel while we join it. Stops at
            // the first failure, which also means the worker is exiting.
            while self.pending_count > 0 {
                if self.wait_for_completion().is_err() {
                    break;
                }
            }
        }

        // Dropping the sender ends the worker's receive loop.
        self.pending_sender.take();
        let join_result = match self.worker_handle.take() {
            Some(worker_handle) => worker_handle.join().map_err(|_| {
                CuError::from("Async CopperList serializer thread panicked while joining")
            }),
            None => Ok(()),
        };
        flush_result.and(join_result)
    }
}
880
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P, const NBCL: usize> Drop for AsyncCopperListsManager<P, NBCL>
where
    P: CopperListTuple + Default,
{
    /// Shuts the worker down on a best-effort basis.
    fn drop(&mut self) {
        // A destructor cannot report failures, so the result is deliberately discarded.
        let _ = self.shutdown_worker();
    }
}
887
// CopperList manager selected at compile time: the async implementation is used only when
// both the `std` and `async-cl-io` features are enabled.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub type CopperListsManager<P, const NBCL: usize> = AsyncCopperListsManager<P, NBCL>;

// Every other feature combination falls back to the synchronous implementation.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub type CopperListsManager<P, const NBCL: usize> = SyncCopperListsManager<P, NBCL>;
895
/// Manages the frozen tasks state and logging.
///
/// It owns the keyframe being assembled for the current CopperList and, when a logger is
/// configured, writes it out at the end of processing for CopperList ids that are
/// multiples of `keyframe_interval`.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Optional override for the timestamp to stamp the next keyframe (used by deterministic replay).
    forced_timestamp: Option<CuTime>,

    /// If set, reuse this keyframe verbatim (e.g., during replay) instead of re-freezing state.
    locked: bool,

    /// Logger for the state of the tasks (frozen tasks). When `None`, no CopperList is
    /// treated as a keyframe.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Keyframe period, in CopperLists: a keyframe is captured only when the CopperList id
    /// is a multiple of this value.
    keyframe_interval: u32,

    /// Bytes written by the last keyframe log
    pub last_encoded_bytes: u64,
}
916
917impl KeyFramesManager {
918    fn is_keyframe(&self, culistid: u64) -> bool {
919        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
920    }
921
922    #[inline]
923    pub fn captures_keyframe(&self, culistid: u64) -> bool {
924        self.is_keyframe(culistid)
925    }
926
927    pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
928        if self.is_keyframe(culistid) {
929            // If a recorded keyframe was preloaded for this CL, keep it as-is.
930            if self.locked && self.inner.culistid == culistid {
931                return;
932            }
933            let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
934            self.inner.reset(culistid, ts);
935            self.locked = false;
936        }
937    }
938
939    /// Force the timestamp of the next keyframe to a given value.
940    #[cfg(feature = "std")]
941    pub fn set_forced_timestamp(&mut self, ts: CuTime) {
942        self.forced_timestamp = Some(ts);
943    }
944
945    pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
946        if self.is_keyframe(culistid) {
947            if self.locked {
948                // We are replaying a recorded keyframe verbatim; don't mutate it.
949                return Ok(0);
950            }
951            if self.inner.culistid != culistid {
952                return Err(CuError::from(format!(
953                    "Freezing task for culistid {} but current keyframe is {}",
954                    culistid, self.inner.culistid
955                )));
956            }
957            self.inner
958                .add_frozen_task(task)
959                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
960        } else {
961            Ok(0)
962        }
963    }
964
965    /// Generic helper to freeze any `Freezable` state (task or bridge) into the current keyframe.
966    pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
967        self.freeze_task(culistid, item)
968    }
969
970    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
971        if self.is_keyframe(culistid) {
972            let logger = self.logger.as_mut().unwrap();
973            logger.log(&self.inner)?;
974            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
975            // Clear the lock so the next CL can rebuild normally unless re-locked.
976            self.locked = false;
977            Ok(())
978        } else {
979            // Not a keyframe for this CL; ensure we don't carry stale sizes forward.
980            self.last_encoded_bytes = 0;
981            Ok(())
982        }
983    }
984
985    /// Preload a recorded keyframe so it is logged verbatim on the matching CL.
986    #[cfg(feature = "std")]
987    pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
988        self.inner = keyframe.clone();
989        self.forced_timestamp = Some(keyframe.timestamp);
990        self.locked = true;
991    }
992}
993
/// This is the main structure that will be injected as a member of the Application struct.
///
/// Type parameters:
/// - `CT` is the tuple of all the tasks in order of execution.
/// - `CB` is the tuple of all instantiated bridges.
/// - `P` is the payload tuple of the copper list, representing the input/output messages
///   for all the tasks.
/// - `M` is the monitor implementation.
/// - `NBCL` is the number of copper lists handled by the copper list manager.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    clock: RobotClock,

    /// Compile-time subsystem identity for this Copper process.
    subsystem_code: u16,

    /// Deployment/runtime instance identity for this Copper process.
    #[doc(hidden)]
    pub instance_id: u32,

    /// The tuple of all the tasks in order of execution.
    #[doc(hidden)]
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    #[doc(hidden)]
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    #[doc(hidden)]
    pub resources: ResourceManager,

    /// The runtime monitoring.
    #[doc(hidden)]
    pub monitor: M,

    /// Runtime-side execution progress probe for watchdog/diagnostic monitors.
    ///
    /// This probe is written from the generated execution plan before each component
    /// step. Monitors consume it asynchronously (typically from watchdog threads) to
    /// report the last known component/step/culist when the runtime appears stalled.
    #[cfg(feature = "std")]
    #[doc(hidden)]
    pub execution_probe: ExecutionProbeHandle,
    #[cfg(not(feature = "std"))]
    #[doc(hidden)]
    pub execution_probe: RuntimeExecutionProbe,

    /// The logger for the copper lists (messages between tasks)
    #[doc(hidden)]
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    #[doc(hidden)]
    pub keyframes_manager: KeyFramesManager,

    /// Feature-gated container for deterministic multi-CopperList execution.
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    #[doc(hidden)]
    pub parallel_rt: ParallelRt<NBCL>,

    /// The runtime configuration controlling the behavior of the run loop
    #[doc(hidden)]
    pub runtime_config: RuntimeConfig,
}
1053
1054/// To be able to share the clock we make the runtime a clock provider.
1055impl<
1056    CT,
1057    CB,
1058    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload,
1059    M: CuMonitor,
1060    const NBCL: usize,
1061> ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
1062{
1063    fn get_clock(&self) -> RobotClock {
1064        self.clock.clone()
1065    }
1066}
1067
1068impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> CuRuntime<CT, CB, P, M, NBCL> {
1069    /// Returns a clone of the runtime clock handle.
1070    #[inline]
1071    pub fn clock(&self) -> RobotClock {
1072        self.clock.clone()
1073    }
1074
1075    /// Returns the runtime clock by reference for generated runtime code.
1076    #[doc(hidden)]
1077    #[inline]
1078    pub fn clock_ref(&self) -> &RobotClock {
1079        &self.clock
1080    }
1081
1082    /// Returns the compile-time subsystem code for this process.
1083    #[inline]
1084    pub fn subsystem_code(&self) -> u16 {
1085        self.subsystem_code
1086    }
1087
1088    /// Returns the configured runtime instance id for this process.
1089    #[inline]
1090    pub fn instance_id(&self) -> u32 {
1091        self.instance_id
1092    }
1093}
1094
1095impl<
1096    'cfg,
1097    CT,
1098    CB,
1099    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
1100    M: CuMonitor,
1101    const NBCL: usize,
1102    TI,
1103    BI,
1104    MI,
1105    CLW,
1106    KFW,
1107> CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
1108where
1109    TI: for<'c> Fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>,
1110    BI: Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1111    MI: Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1112    CLW: WriteStream<CopperList<P>> + 'static,
1113    KFW: WriteStream<KeyFrame> + 'static,
1114{
1115    pub fn build(self) -> CuResult<CuRuntime<CT, CB, P, M, NBCL>> {
1116        let Self {
1117            clock,
1118            config,
1119            mission,
1120            subsystem,
1121            instance_id,
1122            resources,
1123            parts,
1124            copperlists_logger,
1125            keyframes_logger,
1126        } = self;
1127        let mut resources =
1128            resources.ok_or_else(|| CuError::from("Resources missing from CuRuntimeBuilder"))?;
1129
1130        let graph = config.get_graph(Some(mission))?;
1131        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1132            .get_all_nodes()
1133            .iter()
1134            .map(|(_, node)| node.get_instance_config())
1135            .collect();
1136
1137        let tasks = (parts.tasks_instanciator)(all_instances_configs, &mut resources)?;
1138
1139        #[cfg(feature = "std")]
1140        let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
1141        #[cfg(not(feature = "std"))]
1142        let execution_probe = RuntimeExecutionProbe::default();
1143        let monitor_metadata = CuMonitoringMetadata::new(
1144            CompactString::from(mission),
1145            parts.monitored_components,
1146            parts.culist_component_mapping,
1147            CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
1148            build_monitor_topology(config, mission)?,
1149            None,
1150        )?
1151        .with_subsystem_id(subsystem.id())
1152        .with_instance_id(instance_id);
1153        #[cfg(feature = "std")]
1154        let monitor_runtime =
1155            CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
1156        #[cfg(not(feature = "std"))]
1157        let monitor_runtime = CuMonitoringRuntime::unavailable();
1158        let monitor = (parts.monitor_instanciator)(config, monitor_metadata, monitor_runtime);
1159        let bridges = (parts.bridges_instanciator)(config, &mut resources)?;
1160
1161        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
1162            Some(logging_config) if logging_config.enable_task_logging => (
1163                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1164                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1165                logging_config.keyframe_interval.unwrap(),
1166            ),
1167            Some(_) => (None, None, 0),
1168            None => (
1169                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1170                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1171                DEFAULT_KEYFRAME_INTERVAL,
1172            ),
1173        };
1174
1175        let copperlists_manager = CopperListsManager::new(copperlists_logger)?;
1176        #[cfg(target_os = "none")]
1177        {
1178            let cl_size = core::mem::size_of::<CopperList<P>>();
1179            let total_bytes = cl_size.saturating_mul(NBCL);
1180            info!(
1181                "CuRuntimeBuilder: copperlists count={} cl_size={} total_bytes={}",
1182                NBCL, cl_size, total_bytes
1183            );
1184        }
1185
1186        let keyframes_manager = KeyFramesManager {
1187            inner: KeyFrame::new(),
1188            logger: keyframes_logger,
1189            keyframe_interval,
1190            last_encoded_bytes: 0,
1191            forced_timestamp: None,
1192            locked: false,
1193        };
1194        #[cfg(all(feature = "std", feature = "parallel-rt"))]
1195        let parallel_rt = ParallelRt::new(parts.parallel_rt_metadata)?;
1196
1197        let runtime_config = config.runtime.clone().unwrap_or_default();
1198        runtime_config.validate()?;
1199
1200        Ok(CuRuntime {
1201            subsystem_code: subsystem.code(),
1202            instance_id,
1203            tasks,
1204            bridges,
1205            resources,
1206            monitor,
1207            execution_probe,
1208            clock,
1209            copperlists_manager,
1210            keyframes_manager,
1211            #[cfg(all(feature = "std", feature = "parallel-rt"))]
1212            parallel_rt,
1213            runtime_config,
1214        })
1215    }
1216}
1217
/// A KeyFrame records a snapshot of the tasks state before a given copperlist.
///
/// It is a double encapsulation: this outer structure records the culistid and the
/// timestamp, while the task states themselves are stored already bincode-encoded in
/// `serialized_tasks`.
#[derive(Clone, Encode, Decode)]
pub struct KeyFrame {
    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u64,
    // This is the timestamp when the keyframe was created, using the robot clock
    // (or a forced timestamp when one was set on the keyframes manager).
    pub timestamp: CuTime,
    // This is the bincode representation of the frozen tasks, appended one after the other
    // in the order they were frozen.
    pub serialized_tasks: Vec<u8>,
}
1230
1231impl KeyFrame {
1232    fn new() -> Self {
1233        KeyFrame {
1234            culistid: 0,
1235            timestamp: CuTime::default(),
1236            serialized_tasks: Vec::new(),
1237        }
1238    }
1239
1240    /// This is to be able to avoid reallocations
1241    fn reset(&mut self, culistid: u64, timestamp: CuTime) {
1242        self.culistid = culistid;
1243        self.timestamp = timestamp;
1244        self.serialized_tasks.clear();
1245    }
1246
1247    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
1248    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
1249        let cfg = bincode::config::standard();
1250        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
1251        BincodeAdapter(task).encode(&mut sizer)?;
1252        let need = sizer.into_writer().bytes_written as usize;
1253
1254        let start = self.serialized_tasks.len();
1255        self.serialized_tasks.resize(start + need, 0);
1256        let mut enc =
1257            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
1258        BincodeAdapter(task).encode(&mut enc)?;
1259        Ok(need)
1260    }
1261}
1262
/// Identifies where the effective runtime configuration came from.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    // NOTE(review): presumably a configuration supplied from code — confirm.
    ProgrammaticOverride,
    // NOTE(review): presumably a configuration loaded from a file outside the binary — confirm.
    ExternalFile,
    // NOTE(review): presumably the configuration bundled with the application — confirm.
    BundledDefault,
}
1270
/// Stack and process identification metadata persisted in the runtime lifecycle log.
///
/// The optional fields are `None` when the corresponding information is not available.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    pub app_name: String,
    pub app_version: String,
    pub git_commit: Option<String>,
    pub git_dirty: Option<bool>,
    pub subsystem_id: Option<String>,
    // NOTE(review): presumably the same values as the runtime's `subsystem_code` and
    // `instance_id` — confirm at the place where this record is built.
    pub subsystem_code: u16,
    pub instance_id: u32,
}
1282
/// Runtime lifecycle events emitted in the dedicated lifecycle section.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    // Carries the configuration source, the effective configuration
    // (presumably as RON text, going by the field name) and the stack identification.
    Instantiated {
        config_source: RuntimeLifecycleConfigSource,
        effective_config_ron: String,
        stack: RuntimeLifecycleStackInfo,
    },
    // Names the mission that was started.
    MissionStarted {
        mission: String,
    },
    // Names the mission that was stopped, with a free-form reason.
    MissionStopped {
        mission: String,
        // TODO(lifecycle): replace free-form reason with a typed stop reason enum once
        // std/no-std behavior and panic integration are split in a follow-up PR.
        reason: String,
    },
    // TODO(lifecycle): wire panic hook / no_std equivalent to emit this event consistently.
    // Carries the panic message and, when known, its source location.
    Panic {
        message: String,
        file: Option<String>,
        line: Option<u32>,
        column: Option<u32>,
    },
    ShutdownCompleted,
}
1309
/// One event record persisted in the `UnifiedLogType::RuntimeLifecycle` section.
///
/// It pairs a lifecycle event with a timestamp.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    // NOTE(review): presumably the time at which the event was emitted — confirm.
    pub timestamp: CuTime,
    // The lifecycle event itself.
    pub event: RuntimeLifecycleEvent,
}
1316
1317impl<
1318    CT,
1319    CB,
1320    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
1321    M: CuMonitor,
1322    const NBCL: usize,
1323> CuRuntime<CT, CB, P, M, NBCL>
1324{
1325    /// Records runtime execution progress in the shared probe.
1326    ///
1327    /// This is intentionally lightweight and does not call monitor callbacks.
1328    #[inline]
1329    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
1330        self.execution_probe.record(marker);
1331    }
1332
1333    /// Returns a shared reference to the concrete runtime execution probe.
1334    ///
1335    /// The generated runtime uses this when it needs a uniform
1336    /// `&RuntimeExecutionProbe` view across `std` and `no_std` builds.
1337    #[inline]
1338    pub fn execution_probe_ref(&self) -> &RuntimeExecutionProbe {
1339        #[cfg(feature = "std")]
1340        {
1341            self.execution_probe.as_ref()
1342        }
1343
1344        #[cfg(not(feature = "std"))]
1345        {
1346            &self.execution_probe
1347        }
1348    }
1349}
1350
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    // Only produces output messages.
    Source,
    // Consumes input messages and produces output messages.
    Regular,
    // Only consumes input messages.
    Sink,
}
1361
1362impl From<TaskKind> for CuTaskType {
1363    fn from(value: TaskKind) -> Self {
1364        match value {
1365            TaskKind::Source => CuTaskType::Source,
1366            TaskKind::Regular => CuTaskType::Regular,
1367            TaskKind::Sink => CuTaskType::Sink,
1368        }
1369    }
1370}
1371
/// Output slot of a planned step: where its outputs live in the copper list and which
/// message types it carries.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Index of the output slot in the copper list.
    pub culist_index: u32,
    /// Message types of the outputs. The position of a type in this list is what
    /// `CuInputMsg::src_port` refers to.
    pub msg_types: Vec<String>,
}
1377
/// One input of a planned step, resolved against the output pack of the producing node.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// Copper list index of the producer's output pack.
    pub culist_index: u32,
    /// Message type carried by the edge.
    pub msg_type: String,
    /// Position of `msg_type` within the producer's output message types.
    pub src_port: usize,
    /// Id of the graph edge this input comes from.
    pub edge_id: usize,
    /// Order of the connection, taken from the edge; inputs are sorted on it.
    pub connection_order: usize,
}
1386
/// This structure represents a step in the execution plan, i.e. the execution of one node
/// together with the copper list slots it reads from and writes to.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// the index in the copper list of the output message and its type
    pub output_msg_pack: Option<CuOutputPack>,
}
1402
1403impl Debug for CuExecutionStep {
1404    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1405        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
1406        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
1407        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
1408        f.write_str(
1409            format!(
1410                "              input_msg_types: {:?}\n",
1411                self.input_msg_indices_types
1412            )
1413            .as_str(),
1414        )?;
1415        f.write_str(format!("       output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
1416        Ok(())
1417    }
1418}
1419
/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of Execution units (loop or steps) that are executed
/// multiple times.
/// if loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    /// Units (steps or nested loops) making up the body of the loop, in execution order.
    pub steps: Vec<CuExecutionUnit>,
    /// Number of iterations; `None` means the loop is infinite.
    pub loop_count: Option<u32>,
}
1428
1429impl Debug for CuExecutionLoop {
1430    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1431        f.write_str("CuExecutionLoop:\n")?;
1432        for step in &self.steps {
1433            match step {
1434                CuExecutionUnit::Step(step) => {
1435                    step.fmt(f)?;
1436                }
1437                CuExecutionUnit::Loop(l) => {
1438                    l.fmt(f)?;
1439                }
1440            }
1441        }
1442
1443        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
1444        Ok(())
1445    }
1446}
1447
/// This enum represents one unit of the execution plan: either a single step or a loop
/// made of further units.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single step (boxed).
    Step(Box<CuExecutionStep>),
    /// A loop over a sequence of units.
    Loop(CuExecutionLoop),
}
1454
1455fn find_output_pack_from_nodeid(
1456    node_id: NodeId,
1457    steps: &Vec<CuExecutionUnit>,
1458) -> Option<CuOutputPack> {
1459    for step in steps {
1460        match step {
1461            CuExecutionUnit::Loop(loop_unit) => {
1462                if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
1463                    return Some(output_pack);
1464                }
1465            }
1466            CuExecutionUnit::Step(step) if step.node_id == node_id => {
1467                return step.output_msg_pack.clone();
1468            }
1469            _ => {}
1470        }
1471    }
1472    None
1473}
1474
1475pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuResult<CuTaskType> {
1476    let node = graph
1477        .get_node(node_id)
1478        .ok_or_else(|| CuError::from(format!("Node id {node_id} not found")))?;
1479
1480    if node.get_flavor() == crate::config::Flavor::Task {
1481        return resolve_task_kind_for_id(graph, node_id).map(Into::into);
1482    }
1483
1484    let has_inputs = !graph.get_dst_edges(node_id)?.is_empty();
1485    let has_outputs = !graph.get_src_edges(node_id)?.is_empty();
1486    Ok(match (has_inputs, has_outputs) {
1487        (false, true) => CuTaskType::Source,
1488        (true, false) => CuTaskType::Sink,
1489        _ => CuTaskType::Regular,
1490    })
1491}
1492
1493/// Preserve the original serialized connection order across missions.
1494///
1495/// Edge ids are assigned per mission graph, so they are not stable enough to describe a shared
1496/// input layout when missions selectively include connections.
1497fn sort_inputs_by_connection_order(input_msg_indices_types: &mut [CuInputMsg]) {
1498    input_msg_indices_types.sort_by_key(|input| input.connection_order);
1499}
1500
1501/// Explores a subbranch and build the partial plan out of it.
1502fn plan_tasks_tree_branch(
1503    graph: &CuGraph,
1504    mut next_culist_output_index: u32,
1505    starting_point: NodeId,
1506    plan: &mut Vec<CuExecutionUnit>,
1507) -> CuResult<(u32, bool)> {
1508    #[cfg(all(feature = "std", feature = "macro_debug"))]
1509    eprintln!("-- starting branch from node {starting_point}");
1510
1511    let mut handled = false;
1512
1513    for id in graph.bfs_nodes(starting_point) {
1514        let node_ref = graph.get_node(id).unwrap();
1515        #[cfg(all(feature = "std", feature = "macro_debug"))]
1516        eprintln!("  Visiting node: {node_ref:?}");
1517
1518        let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
1519        let output_msg_pack: Option<CuOutputPack>;
1520        let task_type = find_task_type_for_id(graph, id)?;
1521
1522        match task_type {
1523            CuTaskType::Source => {
1524                #[cfg(all(feature = "std", feature = "macro_debug"))]
1525                eprintln!("    → Source node, assign output index {next_culist_output_index}");
1526                let msg_types = graph.get_node_output_msg_types_by_id(id)?;
1527                if msg_types.is_empty() {
1528                    return Err(CuError::from(format!(
1529                        "Source node '{}' has no declared outputs",
1530                        node_ref.get_id()
1531                    )));
1532                }
1533                output_msg_pack = Some(CuOutputPack {
1534                    culist_index: next_culist_output_index,
1535                    msg_types,
1536                });
1537                next_culist_output_index += 1;
1538            }
1539            CuTaskType::Sink => {
1540                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1541                edge_ids.sort();
1542                #[cfg(all(feature = "std", feature = "macro_debug"))]
1543                eprintln!("    → Sink with incoming edges: {edge_ids:?}");
1544                for edge_id in edge_ids {
1545                    let edge = graph
1546                        .edge(edge_id)
1547                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1548                    let pid = graph
1549                        .get_node_id_by_name(edge.src.as_str())
1550                        .unwrap_or_else(|| {
1551                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1552                        });
1553                    let output_pack = find_output_pack_from_nodeid(pid, plan);
1554                    if let Some(output_pack) = output_pack {
1555                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1556                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
1557                        let msg_type = edge.msg.as_str();
1558                        let src_port = output_pack
1559                            .msg_types
1560                            .iter()
1561                            .position(|msg| msg == msg_type)
1562                            .unwrap_or_else(|| {
1563                                panic!(
1564                                    "Missing output port for message type '{msg_type}' on node {pid}"
1565                                )
1566                            });
1567                        input_msg_indices_types.push(CuInputMsg {
1568                            culist_index: output_pack.culist_index,
1569                            msg_type: msg_type.to_string(),
1570                            src_port,
1571                            edge_id,
1572                            connection_order: edge.order,
1573                        });
1574                    } else {
1575                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1576                        eprintln!("      ✗ Input from {pid} not ready, returning");
1577                        return Ok((next_culist_output_index, handled));
1578                    }
1579                }
1580                output_msg_pack = Some(CuOutputPack {
1581                    culist_index: next_culist_output_index,
1582                    msg_types: Vec::from(["()".to_string()]),
1583                });
1584                next_culist_output_index += 1;
1585            }
1586            CuTaskType::Regular => {
1587                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1588                edge_ids.sort();
1589                #[cfg(all(feature = "std", feature = "macro_debug"))]
1590                eprintln!("    → Regular task with incoming edges: {edge_ids:?}");
1591                for edge_id in edge_ids {
1592                    let edge = graph
1593                        .edge(edge_id)
1594                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1595                    let pid = graph
1596                        .get_node_id_by_name(edge.src.as_str())
1597                        .unwrap_or_else(|| {
1598                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1599                        });
1600                    let output_pack = find_output_pack_from_nodeid(pid, plan);
1601                    if let Some(output_pack) = output_pack {
1602                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1603                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
1604                        let msg_type = edge.msg.as_str();
1605                        let src_port = output_pack
1606                            .msg_types
1607                            .iter()
1608                            .position(|msg| msg == msg_type)
1609                            .unwrap_or_else(|| {
1610                                panic!(
1611                                    "Missing output port for message type '{msg_type}' on node {pid}"
1612                                )
1613                            });
1614                        input_msg_indices_types.push(CuInputMsg {
1615                            culist_index: output_pack.culist_index,
1616                            msg_type: msg_type.to_string(),
1617                            src_port,
1618                            edge_id,
1619                            connection_order: edge.order,
1620                        });
1621                    } else {
1622                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1623                        eprintln!("      ✗ Input from {pid} not ready, returning");
1624                        return Ok((next_culist_output_index, handled));
1625                    }
1626                }
1627                let msg_types = graph.get_node_output_msg_types_by_id(id)?;
1628                if msg_types.is_empty() {
1629                    return Err(CuError::from(format!(
1630                        "Regular node '{}' has no declared outputs",
1631                        node_ref.get_id()
1632                    )));
1633                }
1634                output_msg_pack = Some(CuOutputPack {
1635                    culist_index: next_culist_output_index,
1636                    msg_types,
1637                });
1638                next_culist_output_index += 1;
1639            }
1640        }
1641
1642        sort_inputs_by_connection_order(&mut input_msg_indices_types);
1643
1644        if let Some(pos) = plan
1645            .iter()
1646            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
1647        {
1648            #[cfg(all(feature = "std", feature = "macro_debug"))]
1649            eprintln!("    → Already in plan, modifying existing step");
1650            let mut step = plan.remove(pos);
1651            if let CuExecutionUnit::Step(ref mut s) = step {
1652                s.input_msg_indices_types = input_msg_indices_types;
1653            }
1654            plan.push(step);
1655        } else {
1656            #[cfg(all(feature = "std", feature = "macro_debug"))]
1657            eprintln!("    → New step added to plan");
1658            let step = CuExecutionStep {
1659                node_id: id,
1660                node: node_ref.clone(),
1661                task_type,
1662                input_msg_indices_types,
1663                output_msg_pack,
1664            };
1665            plan.push(CuExecutionUnit::Step(Box::new(step)));
1666        }
1667
1668        handled = true;
1669    }
1670
1671    #[cfg(all(feature = "std", feature = "macro_debug"))]
1672    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
1673    Ok((next_culist_output_index, handled))
1674}
1675
1676/// This is the main heuristics to compute an execution plan at compilation time.
1677/// TODO(gbin): Make that heuristic pluggable.
1678pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1679    #[cfg(all(feature = "std", feature = "macro_debug"))]
1680    eprintln!("[runtime plan]");
1681    let mut plan = Vec::new();
1682    let mut next_culist_output_index = 0u32;
1683
1684    let mut queue: VecDeque<NodeId> = VecDeque::new();
1685    for node_id in graph.node_ids() {
1686        if find_task_type_for_id(graph, node_id)? == CuTaskType::Source {
1687            queue.push_back(node_id);
1688        }
1689    }
1690
1691    #[cfg(all(feature = "std", feature = "macro_debug"))]
1692    eprintln!("Initial source nodes: {queue:?}");
1693
1694    while let Some(start_node) = queue.pop_front() {
1695        #[cfg(all(feature = "std", feature = "macro_debug"))]
1696        eprintln!("→ Starting BFS from source {start_node}");
1697        for node_id in graph.bfs_nodes(start_node) {
1698            let already_in_plan = plan
1699                .iter()
1700                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1701            if already_in_plan {
1702                #[cfg(all(feature = "std", feature = "macro_debug"))]
1703                eprintln!("    → Node {node_id} already planned, skipping");
1704                continue;
1705            }
1706
1707            #[cfg(all(feature = "std", feature = "macro_debug"))]
1708            eprintln!("    Planning from node {node_id}");
1709            let (new_index, handled) =
1710                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan)?;
1711            next_culist_output_index = new_index;
1712
1713            if !handled {
1714                #[cfg(all(feature = "std", feature = "macro_debug"))]
1715                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1716                continue;
1717            }
1718
1719            #[cfg(all(feature = "std", feature = "macro_debug"))]
1720            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
1721            for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1722                #[cfg(all(feature = "std", feature = "macro_debug"))]
1723                eprintln!("      → Enqueueing neighbor {neighbor}");
1724                queue.push_back(neighbor);
1725            }
1726        }
1727    }
1728
1729    let mut planned_nodes = BTreeSet::new();
1730    for unit in &plan {
1731        if let CuExecutionUnit::Step(step) = unit {
1732            planned_nodes.insert(step.node_id);
1733        }
1734    }
1735
1736    let mut missing = Vec::new();
1737    for node_id in graph.node_ids() {
1738        if !planned_nodes.contains(&node_id) {
1739            if let Some(node) = graph.get_node(node_id) {
1740                missing.push(node.get_id().to_string());
1741            } else {
1742                missing.push(format!("node_id_{node_id}"));
1743            }
1744        }
1745    }
1746
1747    if !missing.is_empty() {
1748        missing.sort();
1749        return Err(CuError::from(format!(
1750            "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1751            missing.join(", ")
1752        )));
1753    }
1754
1755    Ok(CuExecutionLoop {
1756        steps: plan,
1757        loop_count: None,
1758    })
1759}
1760
1761//tests
1762#[cfg(test)]
1763mod tests {
1764    use super::*;
1765    use crate::config::Node;
1766    use crate::context::CuContext;
1767    use crate::cutask::CuSinkTask;
1768    use crate::cutask::{CuSrcTask, Freezable};
1769    use crate::monitoring::NoMonitor;
1770    use crate::reflect::Reflect;
1771    use bincode::Encode;
1772    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1773    use serde_derive::{Deserialize, Serialize};
1774
1775    #[derive(Reflect)]
1776    pub struct TestSource {}
1777
1778    impl Freezable for TestSource {}
1779
1780    impl CuSrcTask for TestSource {
1781        type Resources<'r> = ();
1782        type Output<'m> = ();
1783        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
1784        where
1785            Self: Sized,
1786        {
1787            Ok(Self {})
1788        }
1789
1790        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
1791            Ok(())
1792        }
1793    }
1794
1795    #[derive(Reflect)]
1796    pub struct TestSink {}
1797
1798    impl Freezable for TestSink {}
1799
1800    impl CuSinkTask for TestSink {
1801        type Resources<'r> = ();
1802        type Input<'m> = ();
1803
1804        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
1805        where
1806            Self: Sized,
1807        {
1808            Ok(Self {})
1809        }
1810
1811        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
1812            Ok(())
1813        }
1814    }
1815
1816    // Those should be generated by the derive macro
1817    type Tasks = (TestSource, TestSink);
1818    type TestRuntime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;
1819    const TEST_NBCL: usize = 2;
1820
    /// Placeholder copper-list payload for the tests: it wraps a unit value and carries
    /// no messages.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        /// Returns an empty list: this payload holds no stamped messages.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        /// Returns an empty slice: no task ids are associated with this payload.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        /// Nothing to initialize for a unit payload.
        fn init_zeroed(&mut self) {}
    }
1839
1840    #[cfg(feature = "std")]
1841    fn tasks_instanciator(
1842        all_instances_configs: Vec<Option<&ComponentConfig>>,
1843        _resources: &mut ResourceManager,
1844    ) -> CuResult<Tasks> {
1845        Ok((
1846            TestSource::new(all_instances_configs[0], ())?,
1847            TestSink::new(all_instances_configs[1], ())?,
1848        ))
1849    }
1850
1851    #[cfg(not(feature = "std"))]
1852    fn tasks_instanciator(
1853        all_instances_configs: Vec<Option<&ComponentConfig>>,
1854        _resources: &mut ResourceManager,
1855    ) -> CuResult<Tasks> {
1856        Ok((
1857            TestSource::new(all_instances_configs[0], ())?,
1858            TestSink::new(all_instances_configs[1], ())?,
1859        ))
1860    }
1861
1862    fn monitor_instanciator(
1863        _config: &CuConfig,
1864        metadata: CuMonitoringMetadata,
1865        runtime: CuMonitoringRuntime,
1866    ) -> NoMonitor {
1867        NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
1868    }
1869
    /// Bridge factory for the tests: there are no bridges, so it always returns `Ok(())`.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }
1873
    /// Resource factory for the tests: builds a `ResourceManager` from an empty slice,
    /// ignoring the configuration.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }
1877
    /// Write stream stub that accepts any encodable object and discards it.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        /// Ignores the object and reports success.
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
1886
1887    #[test]
1888    fn test_runtime_instantiation() {
1889        let mut config = CuConfig::default();
1890        let graph = config.get_graph_mut(None).unwrap();
1891        graph.add_node(Node::new("a", "TestSource")).unwrap();
1892        graph.add_node(Node::new("b", "TestSink")).unwrap();
1893        graph.connect(0, 1, "()").unwrap();
1894        let runtime: CuResult<TestRuntime> =
1895            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
1896                RobotClock::default(),
1897                &config,
1898                crate::config::DEFAULT_MISSION_ID,
1899                CuRuntimeParts::new(
1900                    tasks_instanciator,
1901                    &[],
1902                    &[],
1903                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
1904                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
1905                    monitor_instanciator,
1906                    bridges_instanciator,
1907                ),
1908                FakeWriter {},
1909                FakeWriter {},
1910            )
1911            .try_with_resources_instantiator(resources_instanciator)
1912            .and_then(|builder| builder.build());
1913        assert!(runtime.is_ok());
1914    }
1915
1916    #[test]
1917    fn test_rate_target_period_rejects_zero() {
1918        let err = rate_target_period(0).expect_err("zero rate target should fail");
1919        assert!(
1920            err.to_string()
1921                .contains("Runtime rate target cannot be zero"),
1922            "unexpected error: {err}"
1923        );
1924    }
1925
1926    #[test]
1927    fn test_loop_rate_limiter_advances_to_next_period_when_on_time() {
1928        let (clock, mock) = RobotClock::mock();
1929        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
1930        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(10_000_000));
1931
1932        mock.set_value(10_000_000);
1933        limiter.mark_tick(&clock);
1934
1935        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(20_000_000));
1936    }
1937
1938    #[test]
1939    fn test_loop_rate_limiter_skips_missed_periods_without_resetting_phase() {
1940        let (clock, mock) = RobotClock::mock();
1941        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
1942
1943        mock.set_value(35_000_000);
1944        limiter.mark_tick(&clock);
1945
1946        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(40_000_000));
1947    }
1948
1949    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
1950    #[test]
1951    fn test_loop_rate_limiter_spin_window_is_fixed_scheduler_window() {
1952        let (clock, _) = RobotClock::mock();
1953        let limiter = LoopRateLimiter::from_rate_target_hz(1_000, &clock).unwrap();
1954        assert_eq!(limiter.spin_window(), CuDuration::from(200_000));
1955
1956        let fast = LoopRateLimiter::from_rate_target_hz(10_000, &clock).unwrap();
1957        assert_eq!(fast.spin_window(), CuDuration::from(200_000));
1958    }
1959
    /// Walks the copper-lists manager of a two-slot runtime through create / exhaust /
    /// free cycles, checking the number of available copper lists after each step,
    /// including an out-of-order release.
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime: TestRuntime =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build())
            .unwrap();

        // Now emulates the generated runtime
        // First copper list: gets id 0 and leaves one slot available.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        // Second copper list: gets id 1 and exhausts the manager.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
        }

        // A third creation must fail while both slots are in use.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.create();
            assert!(culist2.is_err());
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            // Free in order, should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        // Re-add a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            // Free out of order, the #0 first
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);

            // Free up the top of the stack
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs

            assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
        }
    }
2045
    /// Write stream that records the id of every copper list it is asked to log, in a
    /// list shared through an `Arc<Mutex<_>>`.
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[derive(Debug, Default)]
    struct RecordingWriter {
        // Ids of the logged copper lists, in the order `log` was called.
        ids: Arc<Mutex<Vec<u64>>>,
    }

    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    impl WriteStream<CopperList<Msgs>> for RecordingWriter {
        /// Appends the copper list id to the shared record, then sleeps for 2 ms.
        fn log(&mut self, culist: &CopperList<Msgs>) -> CuResult<()> {
            self.ids.lock().unwrap().push(culist.id);
            // Presumably simulates a slow sink so writes lag behind the producer.
            std::thread::sleep(std::time::Duration::from_millis(2));
            Ok(())
        }
    }
2060
2061    #[cfg(all(feature = "std", feature = "async-cl-io"))]
2062    #[test]
2063    fn test_async_copperlists_manager_flushes_in_order() {
2064        let ids = Arc::new(Mutex::new(Vec::new()));
2065        let mut copperlists = CopperListsManager::<Msgs, 4>::new(Some(Box::new(RecordingWriter {
2066            ids: ids.clone(),
2067        })))
2068        .unwrap();
2069
2070        for expected_id in 0..4 {
2071            let culist = copperlists.create().unwrap();
2072            assert_eq!(culist.id, expected_id);
2073            culist.change_state(CopperListState::Processing);
2074            copperlists.end_of_processing(expected_id).unwrap();
2075        }
2076
2077        copperlists.finish_pending().unwrap();
2078        assert_eq!(copperlists.available_copper_lists().unwrap(), 4);
2079        assert_eq!(*ids.lock().unwrap(), vec![0, 1, 2, 3]);
2080    }
2081
2082    #[test]
2083    fn test_runtime_task_input_order() {
2084        let mut config = CuConfig::default();
2085        let graph = config.get_graph_mut(None).unwrap();
2086        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
2087        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
2088        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
2089
2090        assert_eq!(src1_id, 0);
2091        assert_eq!(src2_id, 1);
2092
2093        // note that the source2 connection is before the source1
2094        let src1_type = "src1_type";
2095        let src2_type = "src2_type";
2096        graph.connect(src2_id, sink_id, src2_type).unwrap();
2097        graph.connect(src1_id, sink_id, src1_type).unwrap();
2098
2099        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
2100        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
2101        // the edge id depends on the order the connection is created, not
2102        // on the node id, and that is what determines the input order
2103        assert_eq!(src1_edge_id, 1);
2104        assert_eq!(src2_edge_id, 0);
2105
2106        let runtime = compute_runtime_plan(graph).unwrap();
2107        let sink_step = runtime
2108            .steps
2109            .iter()
2110            .find_map(|step| match step {
2111                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
2112                _ => None,
2113            })
2114            .unwrap();
2115
2116        // since the src2 connection was added before src1 connection, the src2 type should be
2117        // first
2118        assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
2119        assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
2120    }
2121
2122    #[test]
2123    fn test_runtime_output_ports_unique_ordered() {
2124        let mut config = CuConfig::default();
2125        let graph = config.get_graph_mut(None).unwrap();
2126        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2127        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2128        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2129        let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
2130        let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
2131
2132        graph.connect(src_id, dst_a_id, "msg::A").unwrap();
2133        graph.connect(src_id, dst_b_id, "msg::B").unwrap();
2134        graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
2135        graph.connect(src_id, dst_c_id, "msg::C").unwrap();
2136
2137        let runtime = compute_runtime_plan(graph).unwrap();
2138        let src_step = runtime
2139            .steps
2140            .iter()
2141            .find_map(|step| match step {
2142                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2143                _ => None,
2144            })
2145            .unwrap();
2146
2147        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2148        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
2149
2150        let dst_a_step = runtime
2151            .steps
2152            .iter()
2153            .find_map(|step| match step {
2154                CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
2155                _ => None,
2156            })
2157            .unwrap();
2158        let dst_b_step = runtime
2159            .steps
2160            .iter()
2161            .find_map(|step| match step {
2162                CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
2163                _ => None,
2164            })
2165            .unwrap();
2166        let dst_a2_step = runtime
2167            .steps
2168            .iter()
2169            .find_map(|step| match step {
2170                CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
2171                _ => None,
2172            })
2173            .unwrap();
2174        let dst_c_step = runtime
2175            .steps
2176            .iter()
2177            .find_map(|step| match step {
2178                CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
2179                _ => None,
2180            })
2181            .unwrap();
2182
2183        assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
2184        assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
2185        assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
2186        assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
2187    }
2188
2189    #[test]
2190    fn test_runtime_output_ports_fanout_single() {
2191        let mut config = CuConfig::default();
2192        let graph = config.get_graph_mut(None).unwrap();
2193        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2194        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2195        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2196
2197        graph.connect(src_id, dst_a_id, "i32").unwrap();
2198        graph.connect(src_id, dst_b_id, "i32").unwrap();
2199
2200        let runtime = compute_runtime_plan(graph).unwrap();
2201        let src_step = runtime
2202            .steps
2203            .iter()
2204            .find_map(|step| match step {
2205                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2206                _ => None,
2207            })
2208            .unwrap();
2209
2210        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2211        assert_eq!(output_pack.msg_types, vec!["i32"]);
2212    }
2213
2214    #[test]
2215    fn test_runtime_output_ports_include_nc_outputs() {
2216        let mut config = CuConfig::default();
2217        let graph = config.get_graph_mut(None).unwrap();
2218        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2219        let dst_id = graph.add_node(Node::new("dst", "Sink")).unwrap();
2220        graph.connect(src_id, dst_id, "msg::A").unwrap();
2221        graph
2222            .get_node_mut(src_id)
2223            .expect("missing source node")
2224            .add_nc_output("msg::B", usize::MAX);
2225
2226        let runtime = compute_runtime_plan(graph).unwrap();
2227        let src_step = runtime
2228            .steps
2229            .iter()
2230            .find_map(|step| match step {
2231                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2232                _ => None,
2233            })
2234            .unwrap();
2235        let dst_step = runtime
2236            .steps
2237            .iter()
2238            .find_map(|step| match step {
2239                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2240                _ => None,
2241            })
2242            .unwrap();
2243
2244        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2245        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2246        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 0);
2247    }
2248
2249    #[test]
2250    fn test_runtime_plan_infers_regular_task_when_outputs_are_nc_only() {
2251        let txt = r#"(
2252            tasks: [
2253                (id: "src", type: "a"),
2254                (id: "regular", type: "b"),
2255            ],
2256            cnx: [
2257                (src: "src", dst: "regular", msg: "msg::A"),
2258                (src: "regular", dst: "__nc__", msg: "msg::B"),
2259            ]
2260        )"#;
2261        let config = CuConfig::deserialize_ron(txt).unwrap();
2262        let graph = config.get_graph(None).unwrap();
2263        let regular_id = graph.get_node_id_by_name("regular").unwrap();
2264
2265        let runtime = compute_runtime_plan(graph).unwrap();
2266        let regular_step = runtime
2267            .steps
2268            .iter()
2269            .find_map(|step| match step {
2270                CuExecutionUnit::Step(step) if step.node_id == regular_id => Some(step),
2271                _ => None,
2272            })
2273            .unwrap();
2274
2275        assert_eq!(regular_step.task_type, CuTaskType::Regular);
2276        assert_eq!(
2277            regular_step.output_msg_pack.as_ref().unwrap().msg_types,
2278            vec!["msg::B"]
2279        );
2280    }
2281
2282    #[test]
2283    fn test_runtime_output_ports_respect_connection_order_with_nc() {
2284        let txt = r#"(
2285            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2286            cnx: [
2287                (src: "src", dst: "__nc__", msg: "msg::A"),
2288                (src: "src", dst: "sink", msg: "msg::B"),
2289            ]
2290        )"#;
2291        let config = CuConfig::deserialize_ron(txt).unwrap();
2292        let graph = config.get_graph(None).unwrap();
2293        let src_id = graph.get_node_id_by_name("src").unwrap();
2294        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2295
2296        let runtime = compute_runtime_plan(graph).unwrap();
2297        let src_step = runtime
2298            .steps
2299            .iter()
2300            .find_map(|step| match step {
2301                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2302                _ => None,
2303            })
2304            .unwrap();
2305        let dst_step = runtime
2306            .steps
2307            .iter()
2308            .find_map(|step| match step {
2309                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2310                _ => None,
2311            })
2312            .unwrap();
2313
2314        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2315        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2316        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2317    }
2318
2319    #[cfg(feature = "std")]
2320    #[test]
2321    fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
2322        let txt = r#"(
2323            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2324            cnx: [
2325                (src: "src", dst: "__nc__", msg: "msg::A"),
2326                (src: "src", dst: "sink", msg: "msg::B"),
2327            ]
2328        )"#;
2329        let tmp = tempfile::NamedTempFile::new().unwrap();
2330        std::fs::write(tmp.path(), txt).unwrap();
2331        let config = crate::config::read_configuration(tmp.path().to_str().unwrap()).unwrap();
2332        let graph = config.get_graph(None).unwrap();
2333        let src_id = graph.get_node_id_by_name("src").unwrap();
2334        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2335
2336        let runtime = compute_runtime_plan(graph).unwrap();
2337        let src_step = runtime
2338            .steps
2339            .iter()
2340            .find_map(|step| match step {
2341                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2342                _ => None,
2343            })
2344            .unwrap();
2345        let dst_step = runtime
2346            .steps
2347            .iter()
2348            .find_map(|step| match step {
2349                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2350                _ => None,
2351            })
2352            .unwrap();
2353
2354        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2355        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2356        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2357    }
2358
2359    #[test]
2360    fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
2361        let txt = r#"(
2362            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2363            cnx: [
2364                (src: "src", dst: "__nc__", msg: "i32"),
2365                (src: "src", dst: "sink", msg: "bool"),
2366            ]
2367        )"#;
2368        let config = CuConfig::deserialize_ron(txt).unwrap();
2369        let graph = config.get_graph(None).unwrap();
2370        let src_id = graph.get_node_id_by_name("src").unwrap();
2371        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2372
2373        let runtime = compute_runtime_plan(graph).unwrap();
2374        let src_step = runtime
2375            .steps
2376            .iter()
2377            .find_map(|step| match step {
2378                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2379                _ => None,
2380            })
2381            .unwrap();
2382        let dst_step = runtime
2383            .steps
2384            .iter()
2385            .find_map(|step| match step {
2386                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2387                _ => None,
2388            })
2389            .unwrap();
2390
2391        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2392        assert_eq!(output_pack.msg_types, vec!["i32", "bool"]);
2393        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2394    }
2395
2396    #[test]
2397    fn test_runtime_plan_diamond_case1() {
2398        // more complex topology that tripped the scheduler
2399        let mut config = CuConfig::default();
2400        let graph = config.get_graph_mut(None).unwrap();
2401        let cam0_id = graph
2402            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
2403            .unwrap();
2404        let inf0_id = graph
2405            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
2406            .unwrap();
2407        let broadcast_id = graph
2408            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
2409            .unwrap();
2410
2411        // case 1 order
2412        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
2413        graph.connect(cam0_id, inf0_id, "i32").unwrap();
2414        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
2415
2416        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
2417        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
2418
2419        assert_eq!(edge_cam0_to_inf0, 0);
2420        assert_eq!(edge_cam0_to_broadcast, 1);
2421
2422        let runtime = compute_runtime_plan(graph).unwrap();
2423        let broadcast_step = runtime
2424            .steps
2425            .iter()
2426            .find_map(|step| match step {
2427                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
2428                _ => None,
2429            })
2430            .unwrap();
2431
2432        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
2433        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
2434    }
2435
2436    #[test]
2437    fn test_runtime_plan_diamond_case2() {
2438        // more complex topology that tripped the scheduler variation 2
2439        let mut config = CuConfig::default();
2440        let graph = config.get_graph_mut(None).unwrap();
2441        let cam0_id = graph
2442            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
2443            .unwrap();
2444        let inf0_id = graph
2445            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
2446            .unwrap();
2447        let broadcast_id = graph
2448            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
2449            .unwrap();
2450
2451        // case 2 order
2452        graph.connect(cam0_id, inf0_id, "i32").unwrap();
2453        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
2454        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
2455
2456        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
2457        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
2458
2459        assert_eq!(edge_cam0_to_broadcast, 0);
2460        assert_eq!(edge_cam0_to_inf0, 1);
2461
2462        let runtime = compute_runtime_plan(graph).unwrap();
2463        let broadcast_step = runtime
2464            .steps
2465            .iter()
2466            .find_map(|step| match step {
2467                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
2468                _ => None,
2469            })
2470            .unwrap();
2471
2472        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
2473        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
2474    }
2475}