1use crate::app::Subsystem;
6use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node, TaskKind};
7use crate::config::{
8 CuConfig, CuGraph, MAX_RATE_TARGET_HZ, NodeId, RuntimeConfig, resolve_task_kind_for_id,
9};
10use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
11use crate::cutask::{BincodeAdapter, Freezable};
12#[cfg(feature = "std")]
13use crate::monitoring::ExecutionProbeHandle;
14#[cfg(feature = "std")]
15use crate::monitoring::MonitorExecutionProbe;
16use crate::monitoring::{
17 ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
18 ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
19 take_last_completed_handle_bytes,
20};
21#[cfg(all(feature = "std", feature = "parallel-rt"))]
22use crate::parallel_rt::{ParallelRt, ParallelRtMetadata};
23use crate::resource::ResourceManager;
24use compact_str::CompactString;
25use cu29_clock::{ClockProvider, CuDuration, CuTime, RobotClock};
26use cu29_traits::CuResult;
27use cu29_traits::WriteStream;
28use cu29_traits::{CopperListTuple, CuError};
29
30#[cfg(target_os = "none")]
31#[allow(unused_imports)]
32use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
33#[cfg(target_os = "none")]
34#[allow(unused_imports)]
35use cu29_log_derive::info;
36#[cfg(target_os = "none")]
37#[allow(unused_imports)]
38use cu29_log_runtime::log;
39#[cfg(all(target_os = "none", debug_assertions))]
40#[allow(unused_imports)]
41use cu29_log_runtime::log_debug_mode;
42#[cfg(target_os = "none")]
43#[allow(unused_imports)]
44use cu29_value::to_value;
45
46#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
47use alloc::alloc::{alloc_zeroed, handle_alloc_error};
48use alloc::boxed::Box;
49use alloc::collections::{BTreeSet, VecDeque};
50use alloc::format;
51use alloc::string::{String, ToString};
52use alloc::vec::Vec;
53use bincode::enc::EncoderImpl;
54use bincode::enc::write::{SizeWriter, SliceWriter};
55use bincode::error::EncodeError;
56use bincode::{Decode, Encode};
57#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
58use core::alloc::Layout;
59use core::fmt::Result as FmtResult;
60use core::fmt::{Debug, Formatter};
61use core::marker::PhantomData;
62
63#[cfg(all(feature = "std", feature = "async-cl-io"))]
64use std::sync::mpsc::{Receiver, SyncSender, TryRecvError, sync_channel};
65#[cfg(all(feature = "std", feature = "async-cl-io"))]
66use std::thread::JoinHandle;
67
/// Factory that builds the task tuple `CT` from each graph node's optional
/// instance config, drawing shared resources from the manager.
#[doc(hidden)]
pub type TasksInstantiator<CT> =
    for<'c> fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>;
/// Factory that builds the bridge tuple `CB` from the deployment config.
#[doc(hidden)]
pub type BridgesInstantiator<CB> = fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>;
/// Factory that builds the monitor `M` from the config plus static monitoring
/// metadata and runtime probes.
#[doc(hidden)]
pub type MonitorInstantiator<M> = fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M;
75
/// Code-generated pieces needed to assemble a `CuRuntime`: instantiator
/// callbacks plus static monitoring/scheduling metadata.
#[doc(hidden)]
pub struct CuRuntimeParts<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI> {
    /// Builds the task tuple from per-node configs.
    pub tasks_instanciator: TI,
    /// Static description of every monitored component.
    pub monitored_components: &'static [MonitorComponentMetadata],
    /// Maps CopperList slots to monitoring component ids.
    pub culist_component_mapping: &'static [ComponentId],
    /// Static metadata driving the parallel runtime scheduler.
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    pub parallel_rt_metadata: &'static ParallelRtMetadata,
    /// Builds the monitor from config + metadata + runtime probes.
    pub monitor_instanciator: MI,
    /// Builds the bridge tuple from the config.
    pub bridges_instanciator: BI,
    // Anchors the otherwise-unused type parameters (CT, CB, P, M, NBCL).
    _payload: PhantomData<(CT, CB, P, M, [(); NBCL])>,
}
87
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI>
    CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>
{
    /// Bundles the generated instantiators and static metadata.
    ///
    /// `const` so generated code can evaluate it in const/static context.
    /// Note: the `parallel_rt_metadata` parameter only exists when the
    /// parallel runtime is compiled in.
    pub const fn new(
        tasks_instanciator: TI,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: MI,
        bridges_instanciator: BI,
    ) -> Self {
        Self {
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
            _payload: PhantomData,
        }
    }
}
112
/// Step-by-step builder for `CuRuntime`: generated code supplies the
/// instantiators and loggers; callers may add subsystem, instance id and
/// resources before calling `build`.
#[doc(hidden)]
pub struct CuRuntimeBuilder<
    'cfg,
    CT,
    CB,
    P: CopperListTuple,
    M: CuMonitor,
    const NBCL: usize,
    TI,
    BI,
    MI,
    CLW,
    KFW,
> {
    /// Clock shared by the future runtime.
    clock: RobotClock,
    /// Full deployment configuration.
    config: &'cfg CuConfig,
    /// Mission (graph) to instantiate from `config`.
    mission: &'cfg str,
    /// Subsystem identity; defaults to an anonymous subsystem.
    subsystem: Subsystem,
    /// Instance id reported to monitoring; defaults to 0.
    instance_id: u32,
    /// Must be populated (directly or via instantiator) before `build`.
    resources: Option<ResourceManager>,
    /// Code-generated instantiators and static metadata.
    parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
    /// Sink for serialized CopperLists.
    copperlists_logger: CLW,
    /// Sink for serialized keyframes.
    keyframes_logger: KFW,
}
137
impl<'cfg, CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI, CLW, KFW>
    CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
{
    /// Starts a builder with defaults: anonymous subsystem, instance id 0,
    /// and no resource manager (resources must be provided before `build`).
    pub fn new(
        clock: RobotClock,
        config: &'cfg CuConfig,
        mission: &'cfg str,
        parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
        copperlists_logger: CLW,
        keyframes_logger: KFW,
    ) -> Self {
        Self {
            clock,
            config,
            mission,
            subsystem: Subsystem::new(None, 0),
            instance_id: 0,
            resources: None,
            parts,
            copperlists_logger,
            keyframes_logger,
        }
    }

    /// Sets the subsystem this runtime belongs to.
    pub fn with_subsystem(mut self, subsystem: Subsystem) -> Self {
        self.subsystem = subsystem;
        self
    }

    /// Sets the instance id reported in monitoring metadata.
    pub fn with_instance_id(mut self, instance_id: u32) -> Self {
        self.instance_id = instance_id;
        self
    }

    /// Supplies an already-built resource manager.
    pub fn with_resources(mut self, resources: ResourceManager) -> Self {
        self.resources = Some(resources);
        self
    }

    /// Builds the resource manager from the config via the given closure.
    ///
    /// # Errors
    /// Propagates any error returned by `resources_instantiator`.
    pub fn try_with_resources_instantiator(
        mut self,
        resources_instantiator: impl FnOnce(&CuConfig) -> CuResult<ResourceManager>,
    ) -> CuResult<Self> {
        self.resources = Some(resources_instantiator(self.config)?);
        Ok(self)
    }
}
185
/// Returns "now" for performance measurements.
///
/// With `std` + `sysclock-perf`, readings come from a dedicated, lazily
/// created process-wide `RobotClock` (via `OnceLock`), decoupling perf
/// timestamps from the caller's (possibly simulated) clock. Otherwise the
/// caller's clock is used directly.
#[inline]
pub fn perf_now(_clock: &RobotClock) -> CuTime {
    #[cfg(all(feature = "std", feature = "sysclock-perf"))]
    {
        static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
        return PERF_CLOCK.get_or_init(RobotClock::new).now();
    }

    // Only reachable when `sysclock-perf` is off; the allow silences the
    // unreachable warning in the perf-clock build.
    #[allow(unreachable_code)]
    _clock.now()
}
207
/// How close to the deadline (in nanoseconds) the high-precision limiter
/// switches from `thread::sleep` to busy-spinning (200 µs).
#[cfg(all(feature = "std", feature = "high-precision-limiter"))]
const HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS: u64 = 200_000;
210
211#[inline]
213pub fn rate_target_period(rate_target_hz: u64) -> CuResult<CuDuration> {
214 if rate_target_hz == 0 {
215 return Err(CuError::from(
216 "Runtime rate target cannot be zero. Set runtime.rate_target_hz to at least 1.",
217 ));
218 }
219
220 if rate_target_hz > MAX_RATE_TARGET_HZ {
221 return Err(CuError::from(format!(
222 "Runtime rate target ({rate_target_hz} Hz) exceeds the supported maximum of {MAX_RATE_TARGET_HZ} Hz."
223 )));
224 }
225
226 Ok(CuDuration::from(MAX_RATE_TARGET_HZ / rate_target_hz))
227}
228
/// Paces a loop at a fixed rate using absolute, phase-aligned deadlines so
/// timing does not drift with per-iteration jitter.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LoopRateLimiter {
    /// Target loop period.
    period: CuDuration,
    /// Absolute time the next iteration may start.
    next_deadline: CuTime,
}
240
impl LoopRateLimiter {
    /// Builds a limiter from a target frequency; the first deadline is one
    /// period after `clock.now()`.
    ///
    /// # Errors
    /// Propagates `rate_target_period`'s validation (zero or above max).
    #[inline]
    pub fn from_rate_target_hz(rate_target_hz: u64, clock: &RobotClock) -> CuResult<Self> {
        let period = rate_target_period(rate_target_hz)?;
        Ok(Self {
            period,
            next_deadline: clock.now() + period,
        })
    }

    /// True when the current deadline has already passed.
    #[inline]
    pub fn is_ready(&self, clock: &RobotClock) -> bool {
        self.remaining(clock).is_none()
    }

    /// Time left until the next deadline, or `None` if it is already due.
    #[inline]
    pub fn remaining(&self, clock: &RobotClock) -> Option<CuDuration> {
        let now = clock.now();
        if now < self.next_deadline {
            Some(self.next_deadline - now)
        } else {
            None
        }
    }

    /// Blocks until the next deadline. Strategy depends on features:
    /// - `high-precision-limiter`: sleep until within the spin window, then
    ///   busy-spin to the exact deadline;
    /// - plain `std`: a single `thread::sleep` for the remaining time;
    /// - `no_std`: pure busy-spin on the clock.
    #[inline]
    pub fn wait_until_ready(&self, clock: &RobotClock) {
        let deadline = self.next_deadline;
        let Some(remaining) = self.remaining(clock) else {
            // Already past the deadline: nothing to wait for.
            return;
        };

        #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
        {
            let spin_window = self.spin_window();
            if remaining > spin_window {
                // Sleep the bulk of the wait; keep `spin_window` for spinning.
                std::thread::sleep(std::time::Duration::from(remaining - spin_window));
            }
            while clock.now() < deadline {
                core::hint::spin_loop();
            }
        }

        #[cfg(all(feature = "std", not(feature = "high-precision-limiter")))]
        {
            let _ = deadline;
            std::thread::sleep(std::time::Duration::from(remaining));
        }

        #[cfg(not(feature = "std"))]
        {
            let _ = remaining;
            while clock.now() < deadline {
                core::hint::spin_loop();
            }
        }
    }

    /// Advances the deadline after a loop iteration completed.
    #[inline]
    pub fn mark_tick(&mut self, clock: &RobotClock) {
        self.advance_from(clock.now());
    }

    /// Convenience: wait for the deadline, then schedule the next one.
    #[inline]
    pub fn limit(&mut self, clock: &RobotClock) {
        self.wait_until_ready(clock);
        self.mark_tick(clock);
    }

    /// Moves `next_deadline` forward past `now`. If we are late by more than
    /// one period, whole periods are skipped (no catch-up bursts) and the
    /// deadline stays phase-aligned with the original schedule.
    #[inline]
    fn advance_from(&mut self, now: CuTime) {
        let steps = if now < self.next_deadline {
            1
        } else {
            (now - self.next_deadline).as_nanos() / self.period.as_nanos() + 1
        };
        self.next_deadline += steps * self.period;
    }

    /// Fixed duration reserved for busy-spinning before a deadline.
    /// (`self.period` is deliberately unused for now.)
    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
    #[inline]
    fn spin_window(&self) -> CuDuration {
        let _ = self.period;
        CuDuration::from(HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS)
    }

    // Test-only accessor for deadline assertions.
    #[cfg(test)]
    #[inline]
    fn next_deadline(&self) -> CuTime {
        self.next_deadline
    }
}
333
/// Marker bound for CopperList payloads handled by the I/O backends.
///
/// With `async-cl-io` the payload crosses a thread boundary, so `Send` is
/// required; the blanket impl makes the bound transparent for payload types.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub trait AsyncCopperListPayload: Send {}

#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<T: Send> AsyncCopperListPayload for T {}

/// Without async CopperList I/O the bound is empty: every type qualifies.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub trait AsyncCopperListPayload {}

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
impl<T> AsyncCopperListPayload for T {}
347
/// Outcome of one runtime processing step.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[doc(hidden)]
pub enum ProcessStepOutcome {
    /// Keep processing the current CopperList.
    Continue,
    /// Stop processing the current CopperList (per the variant name —
    /// exact abort semantics are defined by the caller of this type).
    AbortCopperList,
}

/// Result alias returned by process-step functions.
#[doc(hidden)]
pub type ProcessStepResult = CuResult<ProcessStepOutcome>;
364
/// Bincode-encodes a completed CopperList for the remote-debug snapshot slot.
///
/// # Errors
/// Wraps any bincode encoding failure in a `CuError` with context.
#[cfg(feature = "remote-debug")]
fn encode_completed_copperlist_snapshot<P: CopperListTuple>(
    cl: &CopperList<P>,
) -> CuResult<Vec<u8>> {
    let config = bincode::config::standard();
    match bincode::encode_to_vec(cl, config) {
        Ok(bytes) => Ok(bytes),
        Err(e) => Err(CuError::new_with_cause(
            "Failed to encode completed CopperList snapshot",
            e,
        )),
    }
}
372
/// CopperList manager that serializes completed lists inline, on the runtime
/// thread, backed by a fixed-capacity `CuListsManager` pool.
#[doc(hidden)]
pub struct SyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Fixed pool/ring of `NBCL` CopperLists.
    inner: CuListsManager<P, NBCL>,
    /// Destination stream; `None` disables CopperList logging.
    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
    /// Encoded snapshot of the last completed list (remote debugging).
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    /// Bytes written for the last logged list (0 when nothing was logged).
    pub last_encoded_bytes: u64,
    /// Out-of-band handle bytes for the last logged list — presumably what
    /// `take_last_completed_handle_bytes` reports; TODO confirm semantics.
    pub last_handle_bytes: u64,
}
387
impl<P: CopperListTuple + Default, const NBCL: usize> SyncCopperListsManager<P, NBCL> {
    /// Creates the manager; `logger: None` disables serialization entirely.
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit,
    {
        Ok(Self {
            inner: CuListsManager::new(),
            logger,
            #[cfg(feature = "remote-debug")]
            last_completed_encoded: None,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Id the next created CopperList will get.
    pub fn next_cl_id(&self) -> u64 {
        self.inner.next_cl_id()
    }

    /// Id of the most recently created CopperList.
    pub fn last_cl_id(&self) -> u64 {
        self.inner.last_cl_id()
    }

    /// Most recently created, still-live CopperList, if any.
    pub fn peek(&self) -> Option<&CopperList<P>> {
        self.inner.peek()
    }

    /// Encoded snapshot of the last completed list (remote debugging).
    #[cfg(feature = "remote-debug")]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        self.last_completed_encoded.as_deref()
    }

    /// Stub when remote debugging is compiled out: always `None`.
    #[cfg(not(feature = "remote-debug"))]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        None
    }

    /// Overrides the stored snapshot (e.g. from an external source).
    #[cfg(feature = "remote-debug")]
    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
        self.last_completed_encoded = snapshot;
    }

    /// No-op when remote debugging is compiled out.
    #[cfg(not(feature = "remote-debug"))]
    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}

    /// Takes a fresh CopperList from the pool.
    ///
    /// # Errors
    /// Fails when all `NBCL` slots are already in flight.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
        self.inner
            .create()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))
    }

    /// Marks `culistid` done, then retires every completed list from the top
    /// of the ring in order, serializing each one if logging is enabled.
    ///
    /// Lists behind a still-processing entry stay queued (`is_top` turns
    /// false at the first non-done entry), preserving on-stream ordering.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        self.last_handle_bytes = 0;
        // Split borrow: the snapshot slot is written while iterating `inner`.
        #[cfg(feature = "remote-debug")]
        let last_completed_encoded = &mut self.last_completed_encoded;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
                #[cfg(feature = "remote-debug")]
                {
                    *last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
                }
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
                    self.last_handle_bytes = take_last_completed_handle_bytes();
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        // Pop the freed entries off the ring.
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Nothing is ever pending in the synchronous manager.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        Ok(())
    }

    /// Number of free CopperList slots.
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        Ok(NBCL - self.inner.len())
    }

    /// Boxed-list variant (parallel runtime): serializes inline if logging is
    /// enabled and hands the list straight back for reuse.
    #[cfg(feature = "std")]
    pub fn end_of_processing_boxed(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        culist.change_state(CopperListState::DoneProcessing);
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;
        if let Some(logger) = &mut self.logger {
            culist.change_state(CopperListState::BeingSerialized);
            logger.log(&culist)?;
            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
            self.last_handle_bytes = take_last_completed_handle_bytes();
        }
        culist.change_state(CopperListState::Free);
        Ok(OwnedCopperListSubmission::Recycled(culist))
    }

    /// No async completions exist to reclaim in the synchronous manager.
    #[cfg(feature = "std")]
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        Ok(None)
    }

    /// Blocking reclaim is meaningless here and always errors.
    #[cfg(feature = "std")]
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        Err(CuError::from(
            "Synchronous CopperList I/O cannot block waiting for boxed completions",
        ))
    }

    /// Nothing pending: always returns an empty list.
    #[cfg(feature = "std")]
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        Ok(Vec::new())
    }
}
515
/// What happened to a boxed CopperList handed to `end_of_processing_boxed`.
#[cfg(feature = "std")]
#[doc(hidden)]
pub enum OwnedCopperListSubmission<P: CopperListTuple> {
    /// Handled synchronously; the list is immediately ready for reuse.
    Recycled(Box<CopperList<P>>),
    /// Queued for async serialization; reclaim it later.
    Pending,
}
525
/// Message the serializer thread sends back: the list plus the logging
/// outcome (`(encoded_bytes, handle_bytes)` on success).
#[cfg(all(feature = "std", feature = "async-cl-io"))]
struct AsyncCopperListCompletion<P: CopperListTuple> {
    culist: Box<CopperList<P>>,
    log_result: CuResult<(u64, u64)>,
}
531
/// Heap-allocates a `CopperList<P>` as zeroed memory directly, avoiding the
/// large stack temporary that `Box::new` would create for big payloads.
#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
fn allocate_zeroed_copperlist<P>() -> Box<CopperList<P>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // SAFETY: the allocation uses the exact layout of `CopperList<P>`; the
    // null case is diverted to `handle_alloc_error`; and ownership of the raw
    // pointer is transferred to exactly one `Box`. The all-zero bit pattern
    // is presumably a valid initial state for `CopperList<P>` given the
    // `CuListZeroedInit` bound, whose `init_zeroed` fixes up the message
    // storage right after — TODO(review): confirm zeroed bytes are valid for
    // the CopperList header fields as well.
    let mut culist = unsafe {
        let layout = Layout::new::<CopperList<P>>();
        let ptr = alloc_zeroed(layout) as *mut CopperList<P>;
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        Box::from_raw(ptr)
    };
    culist.msgs.init_zeroed();
    culist
}
549
/// Pre-allocates the pool of `NBCL` zero-initialized boxed CopperLists used
/// by the parallel runtime.
#[cfg(all(feature = "std", feature = "parallel-rt"))]
pub fn allocate_boxed_copperlists<P, const NBCL: usize>() -> Vec<Box<CopperList<P>>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // `collect` sizes the Vec from the exact-range iterator's size hint, so
    // this performs the same single reservation as a with_capacity/push loop.
    (0..NBCL)
        .map(|_| allocate_zeroed_copperlist::<P>())
        .collect()
}
561
/// CopperList manager that offloads serialization to a dedicated thread so
/// the runtime loop never blocks on log I/O (unless the pool runs dry).
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub struct AsyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Recycled, ready-to-use CopperLists.
    free_pool: Vec<Box<CopperList<P>>>,
    /// The CopperList currently being filled, if any.
    current: Option<Box<CopperList<P>>>,
    /// Encoded snapshot of the last completed list (remote debugging).
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    /// Lists queued at the serializer but not yet reclaimed.
    pending_count: usize,
    /// Monotonic id assigned to the next created list.
    next_cl_id: u64,
    /// Queue to the serializer thread (`None` when logging is disabled).
    pending_sender: Option<SyncSender<Box<CopperList<P>>>>,
    /// Completions coming back from the serializer thread.
    completion_receiver: Option<Receiver<AsyncCopperListCompletion<P>>>,
    /// Join handle for the "cu-async-cl-io" worker thread.
    worker_handle: Option<JoinHandle<()>>,
    /// Bytes written for the last reclaimed list.
    pub last_encoded_bytes: u64,
    /// Out-of-band handle bytes for the last reclaimed list.
    pub last_handle_bytes: u64,
}
580
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> AsyncCopperListsManager<P, NBCL> {
    /// Builds the manager, pre-allocating `NBCL` zeroed CopperLists.
    ///
    /// With a logger, a dedicated "cu-async-cl-io" thread is spawned that
    /// serializes finished lists off the hot path; without one, lists are
    /// recycled synchronously and nothing is ever pending.
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit + AsyncCopperListPayload + 'static,
    {
        let mut free_pool = Vec::with_capacity(NBCL);
        for _ in 0..NBCL {
            free_pool.push(allocate_zeroed_copperlist::<P>());
        }

        let (pending_sender, completion_receiver, worker_handle) = if let Some(mut logger) = logger
        {
            // Bounded channels sized to the pool: the worker can never hold
            // more lists than exist.
            let (pending_sender, pending_receiver) = sync_channel::<Box<CopperList<P>>>(NBCL);
            let (completion_sender, completion_receiver) =
                sync_channel::<AsyncCopperListCompletion<P>>(NBCL);
            let worker_handle = std::thread::Builder::new()
                .name("cu-async-cl-io".to_string())
                .spawn(move || {
                    // Serialize each queued list, then hand it back with the
                    // logging outcome. The loop ends when the sender side is
                    // dropped, or after reporting a logging error (the error
                    // is delivered through the completion before stopping).
                    while let Ok(mut culist) = pending_receiver.recv() {
                        culist.change_state(CopperListState::BeingSerialized);
                        let log_result = logger.log(&culist).map(|_| {
                            (
                                logger.last_log_bytes().unwrap_or(0) as u64,
                                take_last_completed_handle_bytes(),
                            )
                        });
                        let should_stop = log_result.is_err();
                        if completion_sender
                            .send(AsyncCopperListCompletion { culist, log_result })
                            .is_err()
                        {
                            break;
                        }
                        if should_stop {
                            break;
                        }
                    }
                })
                .map_err(|e| {
                    CuError::from("Failed to spawn async CopperList serializer thread")
                        .add_cause(e.to_string().as_str())
                })?;
            (
                Some(pending_sender),
                Some(completion_receiver),
                Some(worker_handle),
            )
        } else {
            (None, None, None)
        };

        Ok(Self {
            free_pool,
            current: None,
            #[cfg(feature = "remote-debug")]
            last_completed_encoded: None,
            pending_count: 0,
            next_cl_id: 0,
            pending_sender,
            completion_receiver,
            worker_handle,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Id the next created CopperList will get.
    pub fn next_cl_id(&self) -> u64 {
        self.next_cl_id
    }

    /// Id of the most recently created CopperList (0 if none created yet).
    pub fn last_cl_id(&self) -> u64 {
        self.next_cl_id.saturating_sub(1)
    }

    /// The CopperList currently being filled, if any.
    pub fn peek(&self) -> Option<&CopperList<P>> {
        self.current.as_deref()
    }

    /// Encoded snapshot of the last completed list (remote debugging).
    #[cfg(feature = "remote-debug")]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        self.last_completed_encoded.as_deref()
    }

    /// Stub when remote debugging is compiled out: always `None`.
    #[cfg(not(feature = "remote-debug"))]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        None
    }

    /// Overrides the stored snapshot (e.g. from an external source).
    #[cfg(feature = "remote-debug")]
    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
        self.last_completed_encoded = snapshot;
    }

    /// No-op when remote debugging is compiled out.
    #[cfg(not(feature = "remote-debug"))]
    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}

    /// Takes a fresh CopperList from the pool and makes it current, blocking
    /// on the serializer if the pool is exhausted.
    ///
    /// # Errors
    /// Fails when a list is already active or the serializer thread died.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Attempted to create a CopperList while another one is still active",
            ));
        }

        // Drain finished completions first; only block if truly dry.
        self.reclaim_completed()?;
        while self.free_pool.is_empty() {
            self.wait_for_completion()?;
        }

        let culist = self
            .free_pool
            .pop()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))?;
        self.current = Some(culist);

        let current = self
            .current
            .as_mut()
            .expect("current CopperList is missing");
        current.id = self.next_cl_id;
        current.change_state(CopperListState::Initialized);
        self.next_cl_id += 1;
        Ok(current.as_mut())
    }

    /// Captures an encoded snapshot of a completed list for remote debugging.
    #[cfg(feature = "remote-debug")]
    fn capture_completed_snapshot(&mut self, cl: &CopperList<P>) -> CuResult<()> {
        self.last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
        Ok(())
    }

    /// No-op when remote debugging is compiled out.
    #[cfg(not(feature = "remote-debug"))]
    fn capture_completed_snapshot(&mut self, _cl: &CopperList<P>) -> CuResult<()> {
        Ok(())
    }

    /// Finishes the active CopperList: queues it for async serialization (if
    /// logging) or recycles it immediately. Byte counters are reset here and
    /// only updated later, when the matching completion is reclaimed.
    ///
    /// # Errors
    /// Fails when no list is active, the id does not match the active list,
    /// or the serializer channel is closed.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        self.reclaim_completed()?;

        let mut culist = self.current.take().ok_or_else(|| {
            CuError::from("Attempted to finish processing without an active CopperList")
        })?;

        if culist.id != culistid {
            return Err(CuError::from(format!(
                "Attempted to finish CopperList #{culistid} while CopperList #{} is active",
                culist.id
            )));
        }

        culist.change_state(CopperListState::DoneProcessing);
        self.capture_completed_snapshot(&culist)?;
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;

        if let Some(pending_sender) = &self.pending_sender {
            culist.change_state(CopperListState::QueuedForSerialization);
            pending_sender.send(culist).map_err(|e| {
                CuError::from("Failed to enqueue CopperList for async serialization")
                    .add_cause(e.to_string().as_str())
            })?;
            self.pending_count += 1;
            self.reclaim_completed()?;
        } else {
            // No logger: recycle straight back into the pool.
            culist.change_state(CopperListState::Free);
            self.free_pool.push(culist);
        }

        Ok(())
    }

    /// Blocks until every queued CopperList has been serialized.
    ///
    /// # Errors
    /// Fails if a list is still active or the serializer reported an error.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }

        while self.pending_count > 0 {
            self.wait_for_completion()?;
        }
        Ok(())
    }

    /// Free slots after opportunistically reclaiming finished completions.
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        self.reclaim_completed()?;
        Ok(self.free_pool.len())
    }

    /// Boxed-list variant (parallel runtime); returns `Pending` when the list
    /// was queued for async serialization, `Recycled` otherwise.
    pub fn end_of_processing_boxed(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        self.reclaim_completed()?;
        culist.change_state(CopperListState::DoneProcessing);
        self.capture_completed_snapshot(&culist)?;
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;

        if let Some(pending_sender) = &self.pending_sender {
            culist.change_state(CopperListState::QueuedForSerialization);
            pending_sender.send(culist).map_err(|e| {
                CuError::from("Failed to enqueue CopperList for async serialization")
                    .add_cause(e.to_string().as_str())
            })?;
            self.pending_count += 1;
            self.reclaim_completed()?;
            Ok(OwnedCopperListSubmission::Pending)
        } else {
            culist.change_state(CopperListState::Free);
            Ok(OwnedCopperListSubmission::Recycled(culist))
        }
    }

    /// Non-blocking reclaim of one serialized CopperList, if available.
    ///
    /// # Errors
    /// Fails if the serializer thread disconnected or reported a log error.
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        // Scope the receiver borrow so `handle_completion` can take &mut self.
        let recv_result = {
            let Some(completion_receiver) = self.completion_receiver.as_ref() else {
                return Ok(None);
            };
            completion_receiver.try_recv()
        };
        match recv_result {
            Ok(completion) => self.handle_completion(completion).map(Some),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Err(CuError::from(
                "Async CopperList serializer thread disconnected unexpectedly",
            )),
        }
    }

    /// Blocking reclaim of one serialized CopperList.
    ///
    /// # Errors
    /// Fails when no serializer is active or the channel broke.
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        let completion = self
            .completion_receiver
            .as_ref()
            .ok_or_else(|| {
                CuError::from("No async CopperList serializer is active to return a free slot")
            })?
            .recv()
            .map_err(|e| {
                CuError::from("Failed to receive completion from async CopperList serializer")
                    .add_cause(e.to_string().as_str())
            })?;
        self.handle_completion(completion)
    }

    /// Drains every pending completion and returns the reclaimed lists.
    ///
    /// # Errors
    /// Fails if a list is still active or any completion carried an error.
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        let mut reclaimed = Vec::with_capacity(self.pending_count);
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }
        while self.pending_count > 0 {
            reclaimed.push(self.wait_reclaim_boxed()?);
        }
        Ok(reclaimed)
    }

    /// Moves every already-finished completion back into the free pool.
    fn reclaim_completed(&mut self) -> CuResult<()> {
        loop {
            let Some(culist) = self.try_reclaim_boxed()? else {
                break;
            };
            self.free_pool.push(culist);
        }
        Ok(())
    }

    /// Blocks for one completion and returns its list to the free pool.
    fn wait_for_completion(&mut self) -> CuResult<()> {
        let culist = self.wait_reclaim_boxed()?;
        self.free_pool.push(culist);
        Ok(())
    }

    /// Bookkeeping for one completion: decrement the pending count, record
    /// byte counters on success, free the list, then surface any log error.
    fn handle_completion(
        &mut self,
        mut completion: AsyncCopperListCompletion<P>,
    ) -> CuResult<Box<CopperList<P>>> {
        self.pending_count = self.pending_count.saturating_sub(1);
        if let Ok((encoded_bytes, handle_bytes)) = completion.log_result.as_ref() {
            self.last_encoded_bytes = *encoded_bytes;
            self.last_handle_bytes = *handle_bytes;
        }
        completion.culist.change_state(CopperListState::Free);
        // Propagate a logging failure after the list has been freed.
        completion.log_result?;
        Ok(completion.culist)
    }

    /// Flushes pending work, closes the queue, and joins the worker thread.
    fn shutdown_worker(&mut self) -> CuResult<()> {
        self.finish_pending()?;
        // Dropping the sender lets the worker's recv loop terminate.
        self.pending_sender.take();
        if let Some(worker_handle) = self.worker_handle.take() {
            worker_handle.join().map_err(|_| {
                CuError::from("Async CopperList serializer thread panicked while joining")
            })?;
        }
        Ok(())
    }
}
880
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> Drop for AsyncCopperListsManager<P, NBCL> {
    /// Best-effort shutdown: flush pending lists and join the serializer.
    /// Errors are deliberately swallowed — `Drop` cannot propagate them.
    fn drop(&mut self) {
        let _ = self.shutdown_worker();
    }
}
887
/// With async CopperList I/O, serialization runs on the worker thread.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub type CopperListsManager<P, const NBCL: usize> = AsyncCopperListsManager<P, NBCL>;

/// Otherwise CopperLists are serialized inline on the runtime thread.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub type CopperListsManager<P, const NBCL: usize> = SyncCopperListsManager<P, NBCL>;
895
/// Builds and logs keyframes: periodic snapshots of the tasks' frozen state
/// captured every `keyframe_interval` CopperLists.
pub struct KeyFramesManager {
    /// Keyframe currently being assembled.
    inner: KeyFrame,

    /// One-shot timestamp override for the next `reset` (replay support).
    forced_timestamp: Option<CuTime>,

    /// When true, `inner` was injected via `lock_keyframe` and must not be
    /// overwritten by task freezes.
    locked: bool,

    /// Destination stream; `None` disables keyframe capture entirely.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// A keyframe is captured every `keyframe_interval` CopperLists.
    keyframe_interval: u32,

    /// Size of the last logged keyframe (0 when nothing was logged).
    pub last_encoded_bytes: u64,
}
916
impl KeyFramesManager {
    /// A CopperList id is a keyframe when keyframe logging is active and the
    /// id falls on the configured interval. (Interval 0 is only paired with a
    /// `None` logger at build time, so `is_multiple_of(0)` is not a concern.)
    fn is_keyframe(&self, culistid: u64) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
    }

    /// Public probe: will this CopperList id produce a keyframe?
    #[inline]
    pub fn captures_keyframe(&self, culistid: u64) -> bool {
        self.is_keyframe(culistid)
    }

    /// Prepares the keyframe buffer for `culistid` if it is a keyframe id.
    ///
    /// A locked keyframe for the same id (injected via `lock_keyframe`) is
    /// preserved; otherwise the buffer is reset using the forced timestamp,
    /// if one was set, or the current clock time.
    pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            // Keep an externally injected keyframe for this exact id intact.
            if self.locked && self.inner.culistid == culistid {
                return;
            }
            let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
            self.inner.reset(culistid, ts);
            self.locked = false;
        }
    }

    /// Forces the timestamp used by the next `reset` (one-shot).
    #[cfg(feature = "std")]
    pub fn set_forced_timestamp(&mut self, ts: CuTime) {
        self.forced_timestamp = Some(ts);
    }

    /// Serializes a task's frozen state into the current keyframe.
    ///
    /// Returns the number of bytes appended, or 0 when `culistid` is not a
    /// keyframe id or the keyframe is locked.
    ///
    /// # Errors
    /// Fails when `culistid` does not match the keyframe being built, or
    /// when serialization fails.
    pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.locked {
                // Locked keyframes are replayed as-is; don't overwrite them.
                return Ok(0);
            }
            if self.inner.culistid != culistid {
                return Err(CuError::from(format!(
                    "Freezing task for culistid {} but current keyframe is {}",
                    culistid, self.inner.culistid
                )));
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    /// Freezes any `Freezable` item (same semantics as `freeze_task`).
    pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
        self.freeze_task(culistid, item)
    }

    /// Writes out the keyframe for `culistid`, if it is a keyframe id, and
    /// records the encoded size in `last_encoded_bytes` (0 otherwise).
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            // `is_keyframe` guarantees the logger is present.
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)?;
            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
            self.locked = false;
            Ok(())
        } else {
            self.last_encoded_bytes = 0;
            Ok(())
        }
    }

    /// Installs an externally provided keyframe (e.g. replay/restore) and
    /// locks it so subsequent freezes for its id become no-ops.
    #[cfg(feature = "std")]
    pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
        self.inner = keyframe.clone();
        self.forced_timestamp = Some(keyframe.timestamp);
        self.locked = true;
    }
}
993
/// The fully assembled Copper runtime for one mission: task and bridge
/// instances, monitor, logging managers, and the shared clock.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// Clock shared by every component of this runtime.
    clock: RobotClock,

    /// Code of the owning subsystem (from `Subsystem::code`).
    subsystem_code: u16,

    /// Instance id reported in monitoring metadata.
    #[doc(hidden)]
    pub instance_id: u32,

    /// Generated task tuple.
    #[doc(hidden)]
    pub tasks: CT,

    /// Generated bridge tuple.
    #[doc(hidden)]
    pub bridges: CB,

    /// Shared resources handed to tasks and bridges at instantiation.
    #[doc(hidden)]
    pub resources: ResourceManager,

    /// Monitor receiving runtime telemetry.
    #[doc(hidden)]
    pub monitor: M,

    /// Execution probe; on std targets it is a handle shared with the
    /// monitor, on bare metal the probe lives inline.
    #[cfg(feature = "std")]
    #[doc(hidden)]
    pub execution_probe: ExecutionProbeHandle,
    #[cfg(not(feature = "std"))]
    #[doc(hidden)]
    pub execution_probe: RuntimeExecutionProbe,

    /// CopperList pool + serialization backend (sync or async per features).
    #[doc(hidden)]
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// Periodic task-state snapshot capture.
    #[doc(hidden)]
    pub keyframes_manager: KeyFramesManager,

    /// Parallel scheduler state (`parallel-rt` feature only).
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    #[doc(hidden)]
    pub parallel_rt: ParallelRt<NBCL>,

    /// Validated `runtime` section of the configuration.
    #[doc(hidden)]
    pub runtime_config: RuntimeConfig,
}
1053
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload,
    M: CuMonitor,
    const NBCL: usize,
> ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    /// Hands out the runtime clock (a clone of the shared clock).
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}
1067
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> CuRuntime<CT, CB, P, M, NBCL> {
    /// Clone of the runtime clock.
    #[inline]
    pub fn clock(&self) -> RobotClock {
        self.clock.clone()
    }

    /// Borrow of the runtime clock (avoids the clone).
    #[doc(hidden)]
    #[inline]
    pub fn clock_ref(&self) -> &RobotClock {
        &self.clock
    }

    /// Code of the owning subsystem.
    #[inline]
    pub fn subsystem_code(&self) -> u16 {
        self.subsystem_code
    }

    /// Instance id of this runtime.
    #[inline]
    pub fn instance_id(&self) -> u32 {
        self.instance_id
    }
}
1094
1095impl<
1096 'cfg,
1097 CT,
1098 CB,
1099 P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
1100 M: CuMonitor,
1101 const NBCL: usize,
1102 TI,
1103 BI,
1104 MI,
1105 CLW,
1106 KFW,
1107> CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
1108where
1109 TI: for<'c> Fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>,
1110 BI: Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1111 MI: Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1112 CLW: WriteStream<CopperList<P>> + 'static,
1113 KFW: WriteStream<KeyFrame> + 'static,
1114{
1115 pub fn build(self) -> CuResult<CuRuntime<CT, CB, P, M, NBCL>> {
1116 let Self {
1117 clock,
1118 config,
1119 mission,
1120 subsystem,
1121 instance_id,
1122 resources,
1123 parts,
1124 copperlists_logger,
1125 keyframes_logger,
1126 } = self;
1127 let mut resources =
1128 resources.ok_or_else(|| CuError::from("Resources missing from CuRuntimeBuilder"))?;
1129
1130 let graph = config.get_graph(Some(mission))?;
1131 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1132 .get_all_nodes()
1133 .iter()
1134 .map(|(_, node)| node.get_instance_config())
1135 .collect();
1136
1137 let tasks = (parts.tasks_instanciator)(all_instances_configs, &mut resources)?;
1138
1139 #[cfg(feature = "std")]
1140 let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
1141 #[cfg(not(feature = "std"))]
1142 let execution_probe = RuntimeExecutionProbe::default();
1143 let monitor_metadata = CuMonitoringMetadata::new(
1144 CompactString::from(mission),
1145 parts.monitored_components,
1146 parts.culist_component_mapping,
1147 CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
1148 build_monitor_topology(config, mission)?,
1149 None,
1150 )?
1151 .with_subsystem_id(subsystem.id())
1152 .with_instance_id(instance_id);
1153 #[cfg(feature = "std")]
1154 let monitor_runtime =
1155 CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
1156 #[cfg(not(feature = "std"))]
1157 let monitor_runtime = CuMonitoringRuntime::unavailable();
1158 let monitor = (parts.monitor_instanciator)(config, monitor_metadata, monitor_runtime);
1159 let bridges = (parts.bridges_instanciator)(config, &mut resources)?;
1160
1161 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
1162 Some(logging_config) if logging_config.enable_task_logging => (
1163 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1164 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1165 logging_config.keyframe_interval.unwrap(),
1166 ),
1167 Some(_) => (None, None, 0),
1168 None => (
1169 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1170 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1171 DEFAULT_KEYFRAME_INTERVAL,
1172 ),
1173 };
1174
1175 let copperlists_manager = CopperListsManager::new(copperlists_logger)?;
1176 #[cfg(target_os = "none")]
1177 {
1178 let cl_size = core::mem::size_of::<CopperList<P>>();
1179 let total_bytes = cl_size.saturating_mul(NBCL);
1180 info!(
1181 "CuRuntimeBuilder: copperlists count={} cl_size={} total_bytes={}",
1182 NBCL, cl_size, total_bytes
1183 );
1184 }
1185
1186 let keyframes_manager = KeyFramesManager {
1187 inner: KeyFrame::new(),
1188 logger: keyframes_logger,
1189 keyframe_interval,
1190 last_encoded_bytes: 0,
1191 forced_timestamp: None,
1192 locked: false,
1193 };
1194 #[cfg(all(feature = "std", feature = "parallel-rt"))]
1195 let parallel_rt = ParallelRt::new(parts.parallel_rt_metadata)?;
1196
1197 let runtime_config = config.runtime.clone().unwrap_or_default();
1198 runtime_config.validate()?;
1199
1200 Ok(CuRuntime {
1201 subsystem_code: subsystem.code(),
1202 instance_id,
1203 tasks,
1204 bridges,
1205 resources,
1206 monitor,
1207 execution_probe,
1208 clock,
1209 copperlists_manager,
1210 keyframes_manager,
1211 #[cfg(all(feature = "std", feature = "parallel-rt"))]
1212 parallel_rt,
1213 runtime_config,
1214 })
1215 }
1216}
1217
/// Snapshot of serialized task state tied to one copper list.
/// (Presumably consumed as a recovery/replay keyframe by log readers — the
/// consumers are outside this file; confirm before relying on that.)
#[derive(Clone, Encode, Decode)]
pub struct KeyFrame {
    // Id of the copper list this keyframe was captured for.
    pub culistid: u64,
    // Time the snapshot was taken.
    pub timestamp: CuTime,
    // Bincode-encoded task states, appended one after another via
    // `add_frozen_task`.
    pub serialized_tasks: Vec<u8>,
}
1230
1231impl KeyFrame {
1232 fn new() -> Self {
1233 KeyFrame {
1234 culistid: 0,
1235 timestamp: CuTime::default(),
1236 serialized_tasks: Vec::new(),
1237 }
1238 }
1239
1240 fn reset(&mut self, culistid: u64, timestamp: CuTime) {
1242 self.culistid = culistid;
1243 self.timestamp = timestamp;
1244 self.serialized_tasks.clear();
1245 }
1246
1247 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
1249 let cfg = bincode::config::standard();
1250 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
1251 BincodeAdapter(task).encode(&mut sizer)?;
1252 let need = sizer.into_writer().bytes_written as usize;
1253
1254 let start = self.serialized_tasks.len();
1255 self.serialized_tasks.resize(start + need, 0);
1256 let mut enc =
1257 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
1258 BincodeAdapter(task).encode(&mut enc)?;
1259 Ok(need)
1260 }
1261}
1262
/// Where the effective runtime configuration came from.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    /// Configuration was overridden programmatically at startup.
    ProgrammaticOverride,
    /// Configuration was loaded from an external file.
    ExternalFile,
    /// Configuration is the default bundled with the application.
    BundledDefault,
}
1270
/// Identity of the application stack, embedded in lifecycle records.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    pub app_name: String,
    pub app_version: String,
    // Git commit hash of the build, when known at build time.
    pub git_commit: Option<String>,
    // Whether the build tree had uncommitted changes, when known.
    pub git_dirty: Option<bool>,
    pub subsystem_id: Option<String>,
    pub subsystem_code: u16,
    pub instance_id: u32,
}
1282
/// Lifecycle transitions recorded by the runtime.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    /// The runtime was constructed with the given effective configuration.
    Instantiated {
        config_source: RuntimeLifecycleConfigSource,
        // Full effective configuration, serialized as RON.
        effective_config_ron: String,
        stack: RuntimeLifecycleStackInfo,
    },
    MissionStarted {
        mission: String,
    },
    MissionStopped {
        mission: String,
        // Free-form explanation of why the mission stopped.
        reason: String,
    },
    /// A panic was observed; location fields are best-effort.
    Panic {
        message: String,
        file: Option<String>,
        line: Option<u32>,
        column: Option<u32>,
    },
    ShutdownCompleted,
}
1309
/// A timestamped lifecycle event.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    pub timestamp: CuTime,
    pub event: RuntimeLifecycleEvent,
}
1316
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    /// Records an execution marker on the runtime's probe (with `std` the
    /// probe is the one shared with the monitor — see the builder).
    #[inline]
    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
        self.execution_probe.record(marker);
    }

    /// Borrows the underlying execution probe. With `std` the probe lives in
    /// an `Arc`; without `std` it is owned by the runtime directly.
    #[inline]
    pub fn execution_probe_ref(&self) -> &RuntimeExecutionProbe {
        #[cfg(feature = "std")]
        {
            self.execution_probe.as_ref()
        }

        #[cfg(not(feature = "std"))]
        {
            &self.execution_probe
        }
    }
}
1350
/// Runtime classification of a task by its position in the data flow.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// Produces messages only (no inputs).
    Source,
    /// Consumes and produces messages.
    Regular,
    /// Consumes messages only (no declared outputs).
    Sink,
}
1361
1362impl From<TaskKind> for CuTaskType {
1363 fn from(value: TaskKind) -> Self {
1364 match value {
1365 TaskKind::Source => CuTaskType::Source,
1366 TaskKind::Regular => CuTaskType::Regular,
1367 TaskKind::Sink => CuTaskType::Sink,
1368 }
1369 }
1370}
1371
/// The output of an execution step: a copper-list slot together with the
/// message types emitted on it.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Copper-list slot assigned to this task's output.
    pub culist_index: u32,
    /// Message types emitted, in output-port order.
    pub msg_types: Vec<String>,
}
1377
/// A single resolved input of an execution step.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// Copper-list slot the producing task writes into.
    pub culist_index: u32,
    /// Message type carried by the connection.
    pub msg_type: String,
    /// Index of `msg_type` within the producer's output pack.
    pub src_port: usize,
    /// Graph edge this input came from.
    pub edge_id: usize,
    /// Declared connection order; inputs are sorted by this value.
    pub connection_order: usize,
}
1386
/// One entry of the execution plan: a task plus the copper-list slots it
/// reads from and writes to.
pub struct CuExecutionStep {
    /// Graph node this step executes.
    pub node_id: NodeId,
    /// Copy of the node's configuration entry.
    pub node: Node,
    /// Source / Regular / Sink classification of the task.
    pub task_type: CuTaskType,

    /// Resolved inputs, sorted by declared connection order.
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// Output slot and message types; sinks are given a virtual `()` output.
    pub output_msg_pack: Option<CuOutputPack>,
}
1402
1403impl Debug for CuExecutionStep {
1404 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1405 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
1406 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
1407 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
1408 f.write_str(
1409 format!(
1410 " input_msg_types: {:?}\n",
1411 self.input_msg_indices_types
1412 )
1413 .as_str(),
1414 )?;
1415 f.write_str(format!(" output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
1416 Ok(())
1417 }
1418}
1419
/// A (possibly nested) sequence of execution units.
pub struct CuExecutionLoop {
    /// Steps and nested loops, in execution order.
    pub steps: Vec<CuExecutionUnit>,
    /// Iteration count; `None` presumably means "unbounded" — the consumers
    /// are outside this file, confirm before relying on that.
    pub loop_count: Option<u32>,
}
1428
1429impl Debug for CuExecutionLoop {
1430 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1431 f.write_str("CuExecutionLoop:\n")?;
1432 for step in &self.steps {
1433 match step {
1434 CuExecutionUnit::Step(step) => {
1435 step.fmt(f)?;
1436 }
1437 CuExecutionUnit::Loop(l) => {
1438 l.fmt(f)?;
1439 }
1440 }
1441 }
1442
1443 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
1444 Ok(())
1445 }
1446}
1447
/// One unit of the execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(Box<CuExecutionStep>),
    Loop(CuExecutionLoop),
}
1454
1455fn find_output_pack_from_nodeid(
1456 node_id: NodeId,
1457 steps: &Vec<CuExecutionUnit>,
1458) -> Option<CuOutputPack> {
1459 for step in steps {
1460 match step {
1461 CuExecutionUnit::Loop(loop_unit) => {
1462 if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
1463 return Some(output_pack);
1464 }
1465 }
1466 CuExecutionUnit::Step(step) if step.node_id == node_id => {
1467 return step.output_msg_pack.clone();
1468 }
1469 _ => {}
1470 }
1471 }
1472 None
1473}
1474
1475pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuResult<CuTaskType> {
1476 let node = graph
1477 .get_node(node_id)
1478 .ok_or_else(|| CuError::from(format!("Node id {node_id} not found")))?;
1479
1480 if node.get_flavor() == crate::config::Flavor::Task {
1481 return resolve_task_kind_for_id(graph, node_id).map(Into::into);
1482 }
1483
1484 let has_inputs = !graph.get_dst_edges(node_id)?.is_empty();
1485 let has_outputs = !graph.get_src_edges(node_id)?.is_empty();
1486 Ok(match (has_inputs, has_outputs) {
1487 (false, true) => CuTaskType::Source,
1488 (true, false) => CuTaskType::Sink,
1489 _ => CuTaskType::Regular,
1490 })
1491}
1492
1493fn sort_inputs_by_connection_order(input_msg_indices_types: &mut [CuInputMsg]) {
1498 input_msg_indices_types.sort_by_key(|input| input.connection_order);
1499}
1500
1501fn plan_tasks_tree_branch(
1503 graph: &CuGraph,
1504 mut next_culist_output_index: u32,
1505 starting_point: NodeId,
1506 plan: &mut Vec<CuExecutionUnit>,
1507) -> CuResult<(u32, bool)> {
1508 #[cfg(all(feature = "std", feature = "macro_debug"))]
1509 eprintln!("-- starting branch from node {starting_point}");
1510
1511 let mut handled = false;
1512
1513 for id in graph.bfs_nodes(starting_point) {
1514 let node_ref = graph.get_node(id).unwrap();
1515 #[cfg(all(feature = "std", feature = "macro_debug"))]
1516 eprintln!(" Visiting node: {node_ref:?}");
1517
1518 let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
1519 let output_msg_pack: Option<CuOutputPack>;
1520 let task_type = find_task_type_for_id(graph, id)?;
1521
1522 match task_type {
1523 CuTaskType::Source => {
1524 #[cfg(all(feature = "std", feature = "macro_debug"))]
1525 eprintln!(" → Source node, assign output index {next_culist_output_index}");
1526 let msg_types = graph.get_node_output_msg_types_by_id(id)?;
1527 if msg_types.is_empty() {
1528 return Err(CuError::from(format!(
1529 "Source node '{}' has no declared outputs",
1530 node_ref.get_id()
1531 )));
1532 }
1533 output_msg_pack = Some(CuOutputPack {
1534 culist_index: next_culist_output_index,
1535 msg_types,
1536 });
1537 next_culist_output_index += 1;
1538 }
1539 CuTaskType::Sink => {
1540 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1541 edge_ids.sort();
1542 #[cfg(all(feature = "std", feature = "macro_debug"))]
1543 eprintln!(" → Sink with incoming edges: {edge_ids:?}");
1544 for edge_id in edge_ids {
1545 let edge = graph
1546 .edge(edge_id)
1547 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1548 let pid = graph
1549 .get_node_id_by_name(edge.src.as_str())
1550 .unwrap_or_else(|| {
1551 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1552 });
1553 let output_pack = find_output_pack_from_nodeid(pid, plan);
1554 if let Some(output_pack) = output_pack {
1555 #[cfg(all(feature = "std", feature = "macro_debug"))]
1556 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
1557 let msg_type = edge.msg.as_str();
1558 let src_port = output_pack
1559 .msg_types
1560 .iter()
1561 .position(|msg| msg == msg_type)
1562 .unwrap_or_else(|| {
1563 panic!(
1564 "Missing output port for message type '{msg_type}' on node {pid}"
1565 )
1566 });
1567 input_msg_indices_types.push(CuInputMsg {
1568 culist_index: output_pack.culist_index,
1569 msg_type: msg_type.to_string(),
1570 src_port,
1571 edge_id,
1572 connection_order: edge.order,
1573 });
1574 } else {
1575 #[cfg(all(feature = "std", feature = "macro_debug"))]
1576 eprintln!(" ✗ Input from {pid} not ready, returning");
1577 return Ok((next_culist_output_index, handled));
1578 }
1579 }
1580 output_msg_pack = Some(CuOutputPack {
1581 culist_index: next_culist_output_index,
1582 msg_types: Vec::from(["()".to_string()]),
1583 });
1584 next_culist_output_index += 1;
1585 }
1586 CuTaskType::Regular => {
1587 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1588 edge_ids.sort();
1589 #[cfg(all(feature = "std", feature = "macro_debug"))]
1590 eprintln!(" → Regular task with incoming edges: {edge_ids:?}");
1591 for edge_id in edge_ids {
1592 let edge = graph
1593 .edge(edge_id)
1594 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1595 let pid = graph
1596 .get_node_id_by_name(edge.src.as_str())
1597 .unwrap_or_else(|| {
1598 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1599 });
1600 let output_pack = find_output_pack_from_nodeid(pid, plan);
1601 if let Some(output_pack) = output_pack {
1602 #[cfg(all(feature = "std", feature = "macro_debug"))]
1603 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
1604 let msg_type = edge.msg.as_str();
1605 let src_port = output_pack
1606 .msg_types
1607 .iter()
1608 .position(|msg| msg == msg_type)
1609 .unwrap_or_else(|| {
1610 panic!(
1611 "Missing output port for message type '{msg_type}' on node {pid}"
1612 )
1613 });
1614 input_msg_indices_types.push(CuInputMsg {
1615 culist_index: output_pack.culist_index,
1616 msg_type: msg_type.to_string(),
1617 src_port,
1618 edge_id,
1619 connection_order: edge.order,
1620 });
1621 } else {
1622 #[cfg(all(feature = "std", feature = "macro_debug"))]
1623 eprintln!(" ✗ Input from {pid} not ready, returning");
1624 return Ok((next_culist_output_index, handled));
1625 }
1626 }
1627 let msg_types = graph.get_node_output_msg_types_by_id(id)?;
1628 if msg_types.is_empty() {
1629 return Err(CuError::from(format!(
1630 "Regular node '{}' has no declared outputs",
1631 node_ref.get_id()
1632 )));
1633 }
1634 output_msg_pack = Some(CuOutputPack {
1635 culist_index: next_culist_output_index,
1636 msg_types,
1637 });
1638 next_culist_output_index += 1;
1639 }
1640 }
1641
1642 sort_inputs_by_connection_order(&mut input_msg_indices_types);
1643
1644 if let Some(pos) = plan
1645 .iter()
1646 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
1647 {
1648 #[cfg(all(feature = "std", feature = "macro_debug"))]
1649 eprintln!(" → Already in plan, modifying existing step");
1650 let mut step = plan.remove(pos);
1651 if let CuExecutionUnit::Step(ref mut s) = step {
1652 s.input_msg_indices_types = input_msg_indices_types;
1653 }
1654 plan.push(step);
1655 } else {
1656 #[cfg(all(feature = "std", feature = "macro_debug"))]
1657 eprintln!(" → New step added to plan");
1658 let step = CuExecutionStep {
1659 node_id: id,
1660 node: node_ref.clone(),
1661 task_type,
1662 input_msg_indices_types,
1663 output_msg_pack,
1664 };
1665 plan.push(CuExecutionUnit::Step(Box::new(step)));
1666 }
1667
1668 handled = true;
1669 }
1670
1671 #[cfg(all(feature = "std", feature = "macro_debug"))]
1672 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
1673 Ok((next_culist_output_index, handled))
1674}
1675
1676pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1679 #[cfg(all(feature = "std", feature = "macro_debug"))]
1680 eprintln!("[runtime plan]");
1681 let mut plan = Vec::new();
1682 let mut next_culist_output_index = 0u32;
1683
1684 let mut queue: VecDeque<NodeId> = VecDeque::new();
1685 for node_id in graph.node_ids() {
1686 if find_task_type_for_id(graph, node_id)? == CuTaskType::Source {
1687 queue.push_back(node_id);
1688 }
1689 }
1690
1691 #[cfg(all(feature = "std", feature = "macro_debug"))]
1692 eprintln!("Initial source nodes: {queue:?}");
1693
1694 while let Some(start_node) = queue.pop_front() {
1695 #[cfg(all(feature = "std", feature = "macro_debug"))]
1696 eprintln!("→ Starting BFS from source {start_node}");
1697 for node_id in graph.bfs_nodes(start_node) {
1698 let already_in_plan = plan
1699 .iter()
1700 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1701 if already_in_plan {
1702 #[cfg(all(feature = "std", feature = "macro_debug"))]
1703 eprintln!(" → Node {node_id} already planned, skipping");
1704 continue;
1705 }
1706
1707 #[cfg(all(feature = "std", feature = "macro_debug"))]
1708 eprintln!(" Planning from node {node_id}");
1709 let (new_index, handled) =
1710 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan)?;
1711 next_culist_output_index = new_index;
1712
1713 if !handled {
1714 #[cfg(all(feature = "std", feature = "macro_debug"))]
1715 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1716 continue;
1717 }
1718
1719 #[cfg(all(feature = "std", feature = "macro_debug"))]
1720 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
1721 for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1722 #[cfg(all(feature = "std", feature = "macro_debug"))]
1723 eprintln!(" → Enqueueing neighbor {neighbor}");
1724 queue.push_back(neighbor);
1725 }
1726 }
1727 }
1728
1729 let mut planned_nodes = BTreeSet::new();
1730 for unit in &plan {
1731 if let CuExecutionUnit::Step(step) = unit {
1732 planned_nodes.insert(step.node_id);
1733 }
1734 }
1735
1736 let mut missing = Vec::new();
1737 for node_id in graph.node_ids() {
1738 if !planned_nodes.contains(&node_id) {
1739 if let Some(node) = graph.get_node(node_id) {
1740 missing.push(node.get_id().to_string());
1741 } else {
1742 missing.push(format!("node_id_{node_id}"));
1743 }
1744 }
1745 }
1746
1747 if !missing.is_empty() {
1748 missing.sort();
1749 return Err(CuError::from(format!(
1750 "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1751 missing.join(", ")
1752 )));
1753 }
1754
1755 Ok(CuExecutionLoop {
1756 steps: plan,
1757 loop_count: None,
1758 })
1759}
1760
1761#[cfg(test)]
1763mod tests {
1764 use super::*;
1765 use crate::config::Node;
1766 use crate::context::CuContext;
1767 use crate::cutask::CuSinkTask;
1768 use crate::cutask::{CuSrcTask, Freezable};
1769 use crate::monitoring::NoMonitor;
1770 use crate::reflect::Reflect;
1771 use bincode::Encode;
1772 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1773 use serde_derive::{Deserialize, Serialize};
1774
    // Minimal no-op source task used to exercise the runtime builder in tests.
    #[derive(Reflect)]
    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // Does nothing: the tests only care about plan/lifecycle wiring.
        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1794
    // Minimal no-op sink task, counterpart of TestSource.
    #[derive(Reflect)]
    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // Discards its input: the tests only care about plan/lifecycle wiring.
        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1815
    // Task tuple and runtime aliases shared by the tests below.
    type Tasks = (TestSource, TestSink);
    type TestRuntime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;
    // Copper-list pool size used by TestRuntime (must match the `2` above).
    const TEST_NBCL: usize = 2;

    // Empty payload type: the test copper lists carry no real messages.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());
1823
    impl ErasedCuStampedDataSet for Msgs {
        // No stamped data: Msgs is an empty placeholder payload.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        // No task ids map onto the empty payload.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        // Nothing to zero-initialize in the empty payload.
        fn init_zeroed(&mut self) {}
    }
1839
    // Builds the (source, sink) task tuple from per-node instance configs.
    // NOTE(review): the std and no_std bodies are identical; the cfg split
    // looks redundant and could probably be collapsed — kept as-is here.
    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0], ())?,
            TestSink::new(all_instances_configs[1], ())?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0], ())?,
            TestSink::new(all_instances_configs[1], ())?,
        ))
    }
1861
    // Monitor factory used by the tests; NoMonitor construction never fails.
    fn monitor_instanciator(
        _config: &CuConfig,
        metadata: CuMonitoringMetadata,
        runtime: CuMonitoringRuntime,
    ) -> NoMonitor {
        NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
    }

    // The tests use no bridges.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    // The tests use an empty resource set.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }
1877
    // Write sink that discards everything; used where a test does not care
    // about logger output.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
1886
    // End-to-end smoke test: a two-node source→sink graph must yield a
    // successfully built runtime.
    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime: CuResult<TestRuntime> =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build());
        assert!(runtime.is_ok());
    }
1915
    // A zero rate target must be rejected with a descriptive error.
    #[test]
    fn test_rate_target_period_rejects_zero() {
        let err = rate_target_period(0).expect_err("zero rate target should fail");
        assert!(
            err.to_string()
                .contains("Runtime rate target cannot be zero"),
            "unexpected error: {err}"
        );
    }
1925
    // At 100 Hz the deadline advances by exactly one 10 ms period when a tick
    // lands right on the period boundary.
    #[test]
    fn test_loop_rate_limiter_advances_to_next_period_when_on_time() {
        let (clock, mock) = RobotClock::mock();
        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(10_000_000));

        mock.set_value(10_000_000);
        limiter.mark_tick(&clock);

        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(20_000_000));
    }
1937
    // When a tick comes late (35 ms into 10 ms periods), missed periods are
    // skipped while keeping the deadline phase-aligned (next boundary: 40 ms).
    #[test]
    fn test_loop_rate_limiter_skips_missed_periods_without_resetting_phase() {
        let (clock, mock) = RobotClock::mock();
        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();

        mock.set_value(35_000_000);
        limiter.mark_tick(&clock);

        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(40_000_000));
    }
1948
    // The busy-spin window is a fixed 200 µs scheduler window regardless of
    // the configured rate.
    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
    #[test]
    fn test_loop_rate_limiter_spin_window_is_fixed_scheduler_window() {
        let (clock, _) = RobotClock::mock();
        let limiter = LoopRateLimiter::from_rate_target_hz(1_000, &clock).unwrap();
        assert_eq!(limiter.spin_window(), CuDuration::from(200_000));

        let fast = LoopRateLimiter::from_rate_target_hz(10_000, &clock).unwrap();
        assert_eq!(fast.spin_window(), CuDuration::from(200_000));
    }
1959
    // Exercises the copper-list pool: creation until exhaustion, then freeing
    // lists in various orders and checking how many slots become available.
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime: TestRuntime =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build())
            .unwrap();

        {
            // First list takes id 0; one of the two slots remains free.
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        {
            // Second list takes id 1; the pool is now exhausted.
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
        }

        {
            // Creation must fail while full; finishing list 1 frees one slot.
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.create();
            assert!(culist2.is_err());
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        {
            // Out-of-order completion: finishing 0 alone releases nothing
            // while 2 is still in flight; finishing 2 releases both.
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            let _ = copperlists.end_of_processing(0);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);

            let _ = copperlists.end_of_processing(2);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
        }
    }
2045
    // Write sink that records the copper-list ids it sees (with an artificial
    // delay) so the async flush-order test can observe ordering.
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[derive(Debug, Default)]
    struct RecordingWriter {
        ids: Arc<Mutex<Vec<u64>>>,
    }

    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    impl WriteStream<CopperList<Msgs>> for RecordingWriter {
        fn log(&mut self, culist: &CopperList<Msgs>) -> CuResult<()> {
            self.ids.lock().unwrap().push(culist.id);
            // Slow the writer down so flushes can queue up behind it.
            std::thread::sleep(std::time::Duration::from_millis(2));
            Ok(())
        }
    }
2060
    // Even with a deliberately slow async writer, copper lists must be flushed
    // in ascending id order.
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[test]
    fn test_async_copperlists_manager_flushes_in_order() {
        let ids = Arc::new(Mutex::new(Vec::new()));
        let mut copperlists = CopperListsManager::<Msgs, 4>::new(Some(Box::new(RecordingWriter {
            ids: ids.clone(),
        })))
        .unwrap();

        for expected_id in 0..4 {
            let culist = copperlists.create().unwrap();
            assert_eq!(culist.id, expected_id);
            culist.change_state(CopperListState::Processing);
            copperlists.end_of_processing(expected_id).unwrap();
        }

        copperlists.finish_pending().unwrap();
        assert_eq!(copperlists.available_copper_lists().unwrap(), 4);
        assert_eq!(*ids.lock().unwrap(), vec![0, 1, 2, 3]);
    }
2081
    // A sink's inputs must be ordered by connection declaration order, not by
    // edge id: src2 is connected first, so its message must come first even
    // though its edge id is lower.
    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        // Edge ids reflect insertion order: src2's edge was created first.
        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
    }
2121
    // Duplicate message types on a fan-out share one output port; ports are
    // assigned in order of first appearance (A=0, B=1, C=2).
    #[test]
    fn test_runtime_output_ports_unique_ordered() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
        let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
        let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();

        graph.connect(src_id, dst_a_id, "msg::A").unwrap();
        graph.connect(src_id, dst_b_id, "msg::B").unwrap();
        graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
        graph.connect(src_id, dst_c_id, "msg::C").unwrap();

        let runtime = compute_runtime_plan(graph).unwrap();
        let src_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
                _ => None,
            })
            .unwrap();

        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);

        let dst_a_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
                _ => None,
            })
            .unwrap();
        let dst_b_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
                _ => None,
            })
            .unwrap();
        let dst_a2_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
                _ => None,
            })
            .unwrap();
        let dst_c_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Both msg::A consumers must resolve to the same (shared) port 0.
        assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
        assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
        assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
        assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
    }
2188
    // Fanning the same message type out to two sinks yields a single output
    // port, not one per edge.
    #[test]
    fn test_runtime_output_ports_fanout_single() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();

        graph.connect(src_id, dst_a_id, "i32").unwrap();
        graph.connect(src_id, dst_b_id, "i32").unwrap();

        let runtime = compute_runtime_plan(graph).unwrap();
        let src_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
                _ => None,
            })
            .unwrap();

        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
        assert_eq!(output_pack.msg_types, vec!["i32"]);
    }
2213
    // Non-connected (nc) outputs are included in the output pack after the
    // connected ones, without disturbing the connected port indices.
    #[test]
    fn test_runtime_output_ports_include_nc_outputs() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
        let dst_id = graph.add_node(Node::new("dst", "Sink")).unwrap();
        graph.connect(src_id, dst_id, "msg::A").unwrap();
        graph
            .get_node_mut(src_id)
            .expect("missing source node")
            .add_nc_output("msg::B", usize::MAX);

        let runtime = compute_runtime_plan(graph).unwrap();
        let src_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
                _ => None,
            })
            .unwrap();
        let dst_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
                _ => None,
            })
            .unwrap();

        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 0);
    }
2248
2249 #[test]
2250 fn test_runtime_plan_infers_regular_task_when_outputs_are_nc_only() {
2251 let txt = r#"(
2252 tasks: [
2253 (id: "src", type: "a"),
2254 (id: "regular", type: "b"),
2255 ],
2256 cnx: [
2257 (src: "src", dst: "regular", msg: "msg::A"),
2258 (src: "regular", dst: "__nc__", msg: "msg::B"),
2259 ]
2260 )"#;
2261 let config = CuConfig::deserialize_ron(txt).unwrap();
2262 let graph = config.get_graph(None).unwrap();
2263 let regular_id = graph.get_node_id_by_name("regular").unwrap();
2264
2265 let runtime = compute_runtime_plan(graph).unwrap();
2266 let regular_step = runtime
2267 .steps
2268 .iter()
2269 .find_map(|step| match step {
2270 CuExecutionUnit::Step(step) if step.node_id == regular_id => Some(step),
2271 _ => None,
2272 })
2273 .unwrap();
2274
2275 assert_eq!(regular_step.task_type, CuTaskType::Regular);
2276 assert_eq!(
2277 regular_step.output_msg_pack.as_ref().unwrap().msg_types,
2278 vec!["msg::B"]
2279 );
2280 }
2281
2282 #[test]
2283 fn test_runtime_output_ports_respect_connection_order_with_nc() {
2284 let txt = r#"(
2285 tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2286 cnx: [
2287 (src: "src", dst: "__nc__", msg: "msg::A"),
2288 (src: "src", dst: "sink", msg: "msg::B"),
2289 ]
2290 )"#;
2291 let config = CuConfig::deserialize_ron(txt).unwrap();
2292 let graph = config.get_graph(None).unwrap();
2293 let src_id = graph.get_node_id_by_name("src").unwrap();
2294 let dst_id = graph.get_node_id_by_name("sink").unwrap();
2295
2296 let runtime = compute_runtime_plan(graph).unwrap();
2297 let src_step = runtime
2298 .steps
2299 .iter()
2300 .find_map(|step| match step {
2301 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2302 _ => None,
2303 })
2304 .unwrap();
2305 let dst_step = runtime
2306 .steps
2307 .iter()
2308 .find_map(|step| match step {
2309 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2310 _ => None,
2311 })
2312 .unwrap();
2313
2314 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2315 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2316 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2317 }
2318
2319 #[cfg(feature = "std")]
2320 #[test]
2321 fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
2322 let txt = r#"(
2323 tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2324 cnx: [
2325 (src: "src", dst: "__nc__", msg: "msg::A"),
2326 (src: "src", dst: "sink", msg: "msg::B"),
2327 ]
2328 )"#;
2329 let tmp = tempfile::NamedTempFile::new().unwrap();
2330 std::fs::write(tmp.path(), txt).unwrap();
2331 let config = crate::config::read_configuration(tmp.path().to_str().unwrap()).unwrap();
2332 let graph = config.get_graph(None).unwrap();
2333 let src_id = graph.get_node_id_by_name("src").unwrap();
2334 let dst_id = graph.get_node_id_by_name("sink").unwrap();
2335
2336 let runtime = compute_runtime_plan(graph).unwrap();
2337 let src_step = runtime
2338 .steps
2339 .iter()
2340 .find_map(|step| match step {
2341 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2342 _ => None,
2343 })
2344 .unwrap();
2345 let dst_step = runtime
2346 .steps
2347 .iter()
2348 .find_map(|step| match step {
2349 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2350 _ => None,
2351 })
2352 .unwrap();
2353
2354 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2355 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2356 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2357 }
2358
2359 #[test]
2360 fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
2361 let txt = r#"(
2362 tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2363 cnx: [
2364 (src: "src", dst: "__nc__", msg: "i32"),
2365 (src: "src", dst: "sink", msg: "bool"),
2366 ]
2367 )"#;
2368 let config = CuConfig::deserialize_ron(txt).unwrap();
2369 let graph = config.get_graph(None).unwrap();
2370 let src_id = graph.get_node_id_by_name("src").unwrap();
2371 let dst_id = graph.get_node_id_by_name("sink").unwrap();
2372
2373 let runtime = compute_runtime_plan(graph).unwrap();
2374 let src_step = runtime
2375 .steps
2376 .iter()
2377 .find_map(|step| match step {
2378 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2379 _ => None,
2380 })
2381 .unwrap();
2382 let dst_step = runtime
2383 .steps
2384 .iter()
2385 .find_map(|step| match step {
2386 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2387 _ => None,
2388 })
2389 .unwrap();
2390
2391 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2392 assert_eq!(output_pack.msg_types, vec!["i32", "bool"]);
2393 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2394 }
2395
    /// Diamond topology, case 1: cam0 feeds broadcast directly and also
    /// through inf0. With the direct cam0->broadcast connection declared
    /// first, broadcast's planned inputs must keep that order:
    /// input 0 = "i32" (from cam0), input 1 = "f32" (from inf0).
    #[test]
    fn test_runtime_plan_diamond_case1() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // Connection order matters for this case: direct edge to broadcast
        // first, then the cam0->inf0->broadcast path.
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        // NOTE(review): `first()` is bound to the name of the *first*
        // connection made (cam0->broadcast) yet asserted equal to 1, while
        // index [1] is asserted equal to 0. That only holds if
        // get_src_edges() yields edge ids in reverse insertion order (or the
        // two local names are swapped) — TODO confirm against get_src_edges.
        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        // Find the execution step planned for the broadcast (merging sink) node.
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Inputs arrive in connection-declaration order regardless of edge ids.
        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
    }
2435
    /// Diamond topology, case 2: same graph as case 1, but with the
    /// cam0->inf0 connection declared before the direct cam0->broadcast
    /// one. Broadcast's planned inputs must still be i32 (direct from cam0)
    /// at index 0 and f32 (via inf0) at index 1.
    #[test]
    fn test_runtime_plan_diamond_case2() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // Connection order is reversed relative to case 1: indirect path
        // first, direct edge to broadcast second.
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        // NOTE(review): as in case 1, `first()` is named after the first
        // connection made (cam0->inf0) but asserted equal to 1, and [1]
        // equal to 0 — this appears to rely on get_src_edges() returning
        // edge ids in reverse insertion order; TODO confirm.
        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        // Find the execution step planned for the broadcast (merging sink) node.
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Input slot order matches case 1 despite the different edge ids.
        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
    }
2475}