1use crate::app::Subsystem;
6use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node, TaskKind};
7use crate::config::{
8 CuConfig, CuGraph, MAX_RATE_TARGET_HZ, NodeId, RuntimeConfig, resolve_task_kind_for_id,
9};
10use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
11use crate::cutask::{BincodeAdapter, Freezable};
12#[cfg(feature = "std")]
13use crate::monitoring::ExecutionProbeHandle;
14#[cfg(feature = "std")]
15use crate::monitoring::MonitorExecutionProbe;
16use crate::monitoring::{
17 ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
18 ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
19 take_last_completed_handle_bytes,
20};
21#[cfg(all(feature = "std", feature = "parallel-rt"))]
22use crate::parallel_rt::{ParallelRt, ParallelRtMetadata};
23use crate::resource::ResourceManager;
24use compact_str::CompactString;
25use cu29_clock::{ClockProvider, CuDuration, CuTime, RobotClock};
26use cu29_traits::CuResult;
27use cu29_traits::WriteStream;
28use cu29_traits::{CopperListTuple, CuError};
29
30#[cfg(target_os = "none")]
31#[allow(unused_imports)]
32use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
33#[cfg(target_os = "none")]
34#[allow(unused_imports)]
35use cu29_log_derive::info;
36#[cfg(target_os = "none")]
37#[allow(unused_imports)]
38use cu29_log_runtime::log;
39#[cfg(all(target_os = "none", debug_assertions))]
40#[allow(unused_imports)]
41use cu29_log_runtime::log_debug_mode;
42#[cfg(target_os = "none")]
43#[allow(unused_imports)]
44use cu29_value::to_value;
45
46#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
47use alloc::alloc::{alloc_zeroed, handle_alloc_error};
48use alloc::boxed::Box;
49use alloc::collections::{BTreeSet, VecDeque};
50use alloc::format;
51use alloc::string::{String, ToString};
52use alloc::vec::Vec;
53use bincode::enc::EncoderImpl;
54use bincode::enc::write::{SizeWriter, SliceWriter};
55use bincode::error::EncodeError;
56use bincode::{Decode, Encode};
57#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
58use core::alloc::Layout;
59use core::fmt::Result as FmtResult;
60use core::fmt::{Debug, Formatter};
61use core::marker::PhantomData;
62
63#[cfg(all(feature = "std", feature = "async-cl-io"))]
64use std::sync::mpsc::{Receiver, SyncSender, TryRecvError, sync_channel};
65#[cfg(all(feature = "std", feature = "async-cl-io"))]
66use std::thread::JoinHandle;
67
/// Function pointer that builds the task set `CT` from a list of optional per-component
/// configurations and the shared [`ResourceManager`].
#[doc(hidden)]
pub type TasksInstantiator<CT> =
    for<'c> fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>;
/// Function pointer that builds the bridge set `CB` from the whole configuration and the
/// shared [`ResourceManager`].
#[doc(hidden)]
pub type BridgesInstantiator<CB> = fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>;
/// Function pointer that builds the monitor `M` from the configuration, the monitoring
/// metadata and the monitoring runtime handle. Unlike the other instantiators it is infallible.
#[doc(hidden)]
pub type MonitorInstantiator<M> = fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M;
75
/// Bundle of the application-specific pieces handed to a [`CuRuntimeBuilder`]: the three
/// instantiator callables plus the static monitoring tables.
///
/// The type parameters `CT`, `CB`, `P`, `M` and `NBCL` are not stored; they are carried
/// through `_payload` so the builder can name them later.
#[doc(hidden)]
pub struct CuRuntimeParts<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI> {
    /// Callable used to instantiate the tasks (`CT`).
    pub tasks_instanciator: TI,
    /// Static metadata for the monitored components, forwarded to the monitoring metadata.
    pub monitored_components: &'static [MonitorComponentMetadata],
    /// Static table of component ids, forwarded to the monitoring metadata.
    // NOTE(review): presumably maps CopperList slots to component ids — confirm with the generator.
    pub culist_component_mapping: &'static [ComponentId],
    /// Static metadata for the parallel runtime (only with `std` + `parallel-rt`).
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    pub parallel_rt_metadata: &'static ParallelRtMetadata,
    /// Callable used to instantiate the monitor (`M`).
    pub monitor_instanciator: MI,
    /// Callable used to instantiate the bridges (`CB`).
    pub bridges_instanciator: BI,
    // Zero-sized marker tying the otherwise unused generic parameters to this struct.
    _payload: PhantomData<(CT, CB, P, M, [(); NBCL])>,
}
87
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI>
    CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>
{
    /// Assembles the parts bundle.
    ///
    /// This is a `const fn`, so the bundle can be built in a constant context. Every argument
    /// is stored as-is in the field of the same name; `parallel_rt_metadata` only exists when
    /// both the `std` and `parallel-rt` features are enabled.
    pub const fn new(
        tasks_instanciator: TI,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: MI,
        bridges_instanciator: BI,
    ) -> Self {
        Self {
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
            _payload: PhantomData,
        }
    }
}
112
/// Step-by-step builder for a [`CuRuntime`].
///
/// It borrows the configuration and mission name for `'cfg`, owns the clock, the
/// application parts and the two loggers, and collects the optional settings
/// (subsystem, instance id, resources) before the runtime is built.
#[doc(hidden)]
pub struct CuRuntimeBuilder<
    'cfg,
    CT,
    CB,
    P: CopperListTuple,
    M: CuMonitor,
    const NBCL: usize,
    TI,
    BI,
    MI,
    CLW,
    KFW,
> {
    // Clock handed over to the runtime.
    clock: RobotClock,
    // Borrowed application configuration.
    config: &'cfg CuConfig,
    // Name of the mission whose graph is instantiated.
    mission: &'cfg str,
    // Subsystem identity; defaults to `Subsystem::new(None, 0)`.
    subsystem: Subsystem,
    // Instance identifier; defaults to 0.
    instance_id: u32,
    // Resource manager; must be supplied before building (it starts as `None`).
    resources: Option<ResourceManager>,
    // Instantiators and static monitoring tables.
    parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
    // Sink for CopperList records.
    copperlists_logger: CLW,
    // Sink for keyframe records.
    keyframes_logger: KFW,
}
137
138impl<'cfg, CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI, CLW, KFW>
139 CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
140{
141 pub fn new(
142 clock: RobotClock,
143 config: &'cfg CuConfig,
144 mission: &'cfg str,
145 parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
146 copperlists_logger: CLW,
147 keyframes_logger: KFW,
148 ) -> Self {
149 Self {
150 clock,
151 config,
152 mission,
153 subsystem: Subsystem::new(None, 0),
154 instance_id: 0,
155 resources: None,
156 parts,
157 copperlists_logger,
158 keyframes_logger,
159 }
160 }
161
162 pub fn with_subsystem(mut self, subsystem: Subsystem) -> Self {
163 self.subsystem = subsystem;
164 self
165 }
166
167 pub fn with_instance_id(mut self, instance_id: u32) -> Self {
168 self.instance_id = instance_id;
169 self
170 }
171
172 pub fn with_resources(mut self, resources: ResourceManager) -> Self {
173 self.resources = Some(resources);
174 self
175 }
176
177 pub fn try_with_resources_instantiator(
178 mut self,
179 resources_instantiator: impl FnOnce(&CuConfig) -> CuResult<ResourceManager>,
180 ) -> CuResult<Self> {
181 self.resources = Some(resources_instantiator(self.config)?);
182 Ok(self)
183 }
184}
185
186#[inline]
197pub fn perf_now(_clock: &RobotClock) -> CuTime {
198 #[cfg(all(feature = "std", feature = "sysclock-perf"))]
199 {
200 static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
201 return PERF_CLOCK.get_or_init(RobotClock::new).now();
202 }
203
204 #[allow(unreachable_code)]
205 _clock.now()
206}
207
/// Size of the busy-wait window used by the high-precision limiter: the limiter sleeps until
/// this much time is left before the deadline, then spins on the clock for the remainder.
// NOTE(review): the `_NS` suffix indicates nanoseconds (i.e. 200 µs) — confirm against `CuDuration::from`.
#[cfg(all(feature = "std", feature = "high-precision-limiter"))]
const HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS: u64 = 200_000;
210
211#[inline]
213pub fn rate_target_period(rate_target_hz: u64) -> CuResult<CuDuration> {
214 if rate_target_hz == 0 {
215 return Err(CuError::from(
216 "Runtime rate target cannot be zero. Set runtime.rate_target_hz to at least 1.",
217 ));
218 }
219
220 if rate_target_hz > MAX_RATE_TARGET_HZ {
221 return Err(CuError::from(format!(
222 "Runtime rate target ({rate_target_hz} Hz) exceeds the supported maximum of {MAX_RATE_TARGET_HZ} Hz."
223 )));
224 }
225
226 Ok(CuDuration::from(MAX_RATE_TARGET_HZ / rate_target_hz))
227}
228
/// Fixed-period rate limiter for a loop.
///
/// It tracks the next deadline on a [`RobotClock`] timeline. Callers wait for that deadline
/// and then mark a tick, which moves the deadline forward by whole periods.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LoopRateLimiter {
    // Duration of one loop iteration at the target rate.
    period: CuDuration,
    // Clock time before which the loop is not allowed to start its next iteration.
    next_deadline: CuTime,
}
240
241impl LoopRateLimiter {
242 #[inline]
243 pub fn from_rate_target_hz(rate_target_hz: u64, clock: &RobotClock) -> CuResult<Self> {
244 let period = rate_target_period(rate_target_hz)?;
245 Ok(Self {
246 period,
247 next_deadline: clock.now() + period,
248 })
249 }
250
251 #[inline]
252 pub fn is_ready(&self, clock: &RobotClock) -> bool {
253 self.remaining(clock).is_none()
254 }
255
256 #[inline]
257 pub fn remaining(&self, clock: &RobotClock) -> Option<CuDuration> {
258 let now = clock.now();
259 if now < self.next_deadline {
260 Some(self.next_deadline - now)
261 } else {
262 None
263 }
264 }
265
266 #[inline]
267 pub fn wait_until_ready(&self, clock: &RobotClock) {
268 let deadline = self.next_deadline;
269 let Some(remaining) = self.remaining(clock) else {
270 return;
271 };
272
273 #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
274 {
275 let spin_window = self.spin_window();
276 if remaining > spin_window {
277 std::thread::sleep(std::time::Duration::from(remaining - spin_window));
278 }
279 while clock.now() < deadline {
280 core::hint::spin_loop();
281 }
282 }
283
284 #[cfg(all(feature = "std", not(feature = "high-precision-limiter")))]
285 {
286 let _ = deadline;
287 std::thread::sleep(std::time::Duration::from(remaining));
288 }
289
290 #[cfg(not(feature = "std"))]
291 {
292 let _ = remaining;
293 while clock.now() < deadline {
294 core::hint::spin_loop();
295 }
296 }
297 }
298
299 #[inline]
300 pub fn mark_tick(&mut self, clock: &RobotClock) {
301 self.advance_from(clock.now());
302 }
303
304 #[inline]
305 pub fn limit(&mut self, clock: &RobotClock) {
306 self.wait_until_ready(clock);
307 self.mark_tick(clock);
308 }
309
310 #[inline]
311 fn advance_from(&mut self, now: CuTime) {
312 let steps = if now < self.next_deadline {
313 1
314 } else {
315 (now - self.next_deadline).as_nanos() / self.period.as_nanos() + 1
316 };
317 self.next_deadline += steps * self.period;
318 }
319
320 #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
321 #[inline]
322 fn spin_window(&self) -> CuDuration {
323 let _ = self.period;
324 CuDuration::from(HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS)
325 }
326
327 #[cfg(test)]
328 #[inline]
329 fn next_deadline(&self) -> CuTime {
330 self.next_deadline
331 }
332}
333
/// Marker bound for CopperList payloads.
///
/// With `std` + `async-cl-io` it requires `Send` and is implemented for every `Send` type.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub trait AsyncCopperListPayload: Send {}

#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<T: Send> AsyncCopperListPayload for T {}

/// Marker bound for CopperList payloads.
///
/// Without `std` + `async-cl-io` it carries no requirement and is implemented for every
/// type, so generic code can name the same bound in both configurations.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub trait AsyncCopperListPayload {}

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
impl<T> AsyncCopperListPayload for T {}
347
/// Outcome of one processing step.
// NOTE(review): the producers and consumers of this enum are outside this excerpt; the
// variant descriptions below are inferred from the names — confirm against the callers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[doc(hidden)]
pub enum ProcessStepOutcome {
    /// Keep going with the current CopperList.
    Continue,
    /// Stop working on the current CopperList.
    AbortCopperList,
}
360
/// Result of a processing step: a [`ProcessStepOutcome`] on success, a `CuError` on failure.
#[doc(hidden)]
pub type ProcessStepResult = CuResult<ProcessStepOutcome>;
364
/// Serializes a CopperList into a byte vector using bincode's standard configuration.
///
/// # Errors
/// Wraps any bincode encoding failure in a `CuError`.
#[cfg(feature = "remote-debug")]
fn encode_completed_copperlist_snapshot<P: CopperListTuple>(
    cl: &CopperList<P>,
) -> CuResult<Vec<u8>> {
    let encoding = bincode::config::standard();
    match bincode::encode_to_vec(cl, encoding) {
        Ok(bytes) => Ok(bytes),
        Err(e) => Err(CuError::new_with_cause(
            "Failed to encode completed CopperList snapshot",
            e,
        )),
    }
}
372
/// CopperList manager that serializes completed lists inline, on the calling thread.
#[doc(hidden)]
pub struct SyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    // Storage for the `NBCL` CopperLists.
    inner: CuListsManager<P, NBCL>,
    // Optional sink for completed CopperLists; `None` disables logging.
    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
    // Encoded bytes of the most recently completed CopperList (remote-debug only).
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    /// Byte count reported by the logger for the last logged CopperList (0 if none was logged).
    pub last_encoded_bytes: u64,
    /// Value returned by `take_last_completed_handle_bytes()` after the last logged CopperList.
    pub last_handle_bytes: u64,
}
387
388impl<P: CopperListTuple + Default, const NBCL: usize> SyncCopperListsManager<P, NBCL> {
389 pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
390 where
391 P: CuListZeroedInit,
392 {
393 Ok(Self {
394 inner: CuListsManager::new(),
395 logger,
396 #[cfg(feature = "remote-debug")]
397 last_completed_encoded: None,
398 last_encoded_bytes: 0,
399 last_handle_bytes: 0,
400 })
401 }
402
403 pub fn next_cl_id(&self) -> u64 {
404 self.inner.next_cl_id()
405 }
406
407 pub fn last_cl_id(&self) -> u64 {
408 self.inner.last_cl_id()
409 }
410
411 pub fn peek(&self) -> Option<&CopperList<P>> {
412 self.inner.peek()
413 }
414
415 #[cfg(feature = "remote-debug")]
416 pub fn last_completed_encoded(&self) -> Option<&[u8]> {
417 self.last_completed_encoded.as_deref()
418 }
419
420 #[cfg(not(feature = "remote-debug"))]
421 pub fn last_completed_encoded(&self) -> Option<&[u8]> {
422 None
423 }
424
425 #[cfg(feature = "remote-debug")]
426 pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
427 self.last_completed_encoded = snapshot;
428 }
429
430 #[cfg(not(feature = "remote-debug"))]
431 pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}
432
433 pub fn create(&mut self) -> CuResult<&mut CopperList<P>>
434 where
435 P: CuListZeroedInit,
436 {
437 self.inner
438 .create()
439 .ok_or_else(|| CuError::from("Ran out of space for copper lists"))
440 }
441
442 pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
443 #[cfg(debug_assertions)]
444 self.debug_assert_end_of_processing_target(culistid);
445
446 let mut is_top = true;
447 let mut nb_done = 0;
448 self.last_encoded_bytes = 0;
449 self.last_handle_bytes = 0;
450 #[cfg(feature = "remote-debug")]
451 let last_completed_encoded = &mut self.last_completed_encoded;
452 for cl in self.inner.iter_mut() {
453 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
454 cl.change_state(CopperListState::DoneProcessing);
455 #[cfg(feature = "remote-debug")]
456 {
457 *last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
458 }
459 }
460 if is_top && cl.get_state() == CopperListState::DoneProcessing {
461 if let Some(logger) = &mut self.logger {
462 cl.change_state(CopperListState::BeingSerialized);
463 logger.log(cl)?;
464 self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
465 self.last_handle_bytes = take_last_completed_handle_bytes();
466 }
467 cl.change_state(CopperListState::Free);
468 nb_done += 1;
469 } else {
470 is_top = false;
471 }
472 }
473 for _ in 0..nb_done {
474 let _ = self.inner.pop();
475 }
476 Ok(())
477 }
478
479 pub fn finish_pending(&mut self) -> CuResult<()> {
480 Ok(())
481 }
482
483 pub fn available_copper_lists(&mut self) -> CuResult<usize> {
484 Ok(NBCL - self.inner.len())
485 }
486
487 #[cfg(feature = "std")]
488 pub fn end_of_processing_boxed(
489 &mut self,
490 mut culist: Box<CopperList<P>>,
491 ) -> CuResult<OwnedCopperListSubmission<P>> {
492 #[cfg(debug_assertions)]
493 debug_assert_processing_completion_state(culist.as_ref(), "sync boxed end_of_processing");
494
495 culist.change_state(CopperListState::DoneProcessing);
496 self.last_encoded_bytes = 0;
497 self.last_handle_bytes = 0;
498 if let Some(logger) = &mut self.logger {
499 culist.change_state(CopperListState::BeingSerialized);
500 logger.log(&culist)?;
501 self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
502 self.last_handle_bytes = take_last_completed_handle_bytes();
503 }
504 culist.change_state(CopperListState::Free);
505 Ok(OwnedCopperListSubmission::Recycled(culist))
506 }
507
508 #[cfg(feature = "std")]
509 pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
510 Ok(None)
511 }
512
513 #[cfg(feature = "std")]
514 pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
515 Err(CuError::from(
516 "Synchronous CopperList I/O cannot block waiting for boxed completions",
517 ))
518 }
519
520 #[cfg(feature = "std")]
521 pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
522 Ok(Vec::new())
523 }
524
525 #[cfg(debug_assertions)]
526 fn debug_assert_end_of_processing_target(&self, culistid: u64) {
527 let mut matches = 0usize;
528 let mut state = None;
529 for cl in self.inner.iter() {
530 if cl.id == culistid {
531 matches += 1;
532 state = Some(cl.get_state());
533 }
534 }
535
536 assert_eq!(
537 matches, 1,
538 "sync end_of_processing expected exactly one active CopperList #{culistid}, found {matches}"
539 );
540 assert_eq!(
541 state,
542 Some(CopperListState::Processing),
543 "sync end_of_processing expected CopperList #{culistid} to be Processing, found {:?}",
544 state
545 );
546 }
547}
548
/// What happened to a boxed CopperList handed to `end_of_processing_boxed`.
#[cfg(feature = "std")]
#[doc(hidden)]
pub enum OwnedCopperListSubmission<P: CopperListTuple> {
    /// The list was fully handled and is returned to the caller for reuse.
    Recycled(Box<CopperList<P>>),
    /// The list was queued for asynchronous serialization and will be returned later
    /// through one of the reclaim methods.
    Pending,
}
558
/// Message sent by the serializer thread back to the manager once a CopperList was logged.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
struct AsyncCopperListCompletion<P: CopperListTuple> {
    // The CopperList being handed back.
    culist: Box<CopperList<P>>,
    // On success: (bytes reported by the logger, value of `take_last_completed_handle_bytes()`).
    log_result: CuResult<(u64, u64)>,
}
564
/// Heap-allocates a zero-filled `CopperList<P>` directly (without building it on the stack
/// first) and then lets the payload finish its initialization through `init_zeroed`.
///
/// Aborts through `handle_alloc_error` when the allocator returns null.
#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
fn allocate_zeroed_copperlist<P>() -> Box<CopperList<P>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // SAFETY (review notes):
    // - `ptr` comes from the global allocator with exactly the layout of `CopperList<P>` and
    //   is null-checked before `Box::from_raw` takes ownership of it.
    // - NOTE(review): this relies on an all-zero bit pattern being a valid `CopperList<P>`
    //   (presumably the contract behind the `CuListZeroedInit` bound) and on the type not
    //   being zero-sized, since `alloc_zeroed` requires a non-zero-size layout — confirm.
    let mut culist = unsafe {
        let layout = Layout::new::<CopperList<P>>();
        let ptr = alloc_zeroed(layout) as *mut CopperList<P>;
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        Box::from_raw(ptr)
    };
    // Give the payload the chance to fix up anything that must not stay zeroed.
    culist.msgs.init_zeroed();
    culist
}
582
/// Allocates a pool of `NBCL` zero-initialized, individually boxed CopperLists.
#[cfg(all(feature = "std", feature = "parallel-rt"))]
pub fn allocate_boxed_copperlists<P, const NBCL: usize>() -> Vec<Box<CopperList<P>>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    (0..NBCL)
        .map(|_| allocate_zeroed_copperlist::<P>())
        .collect()
}
594
/// CopperList manager that hands completed lists to a background thread for serialization.
///
/// Each CopperList is an individually boxed allocation that moves between the free pool,
/// the single active slot, and the serializer thread.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub struct AsyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    // CopperLists ready to be handed out by `create`.
    free_pool: Vec<Box<CopperList<P>>>,
    // The CopperList currently being processed, if any.
    current: Option<Box<CopperList<P>>>,
    // Encoded bytes of the most recently completed CopperList (remote-debug only).
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    // Number of CopperLists sent to the serializer and not yet received back.
    pending_count: usize,
    // Id assigned to the next created CopperList.
    next_cl_id: u64,
    // Channel to the serializer thread; `None` when no logger was configured.
    pending_sender: Option<SyncSender<Box<CopperList<P>>>>,
    // Channel on which the serializer thread returns finished CopperLists.
    completion_receiver: Option<Receiver<AsyncCopperListCompletion<P>>>,
    // Join handle of the serializer thread.
    worker_handle: Option<JoinHandle<()>>,
    /// Byte count reported by the logger for the last successfully received completion.
    pub last_encoded_bytes: u64,
    /// Handle byte count reported with the last successfully received completion.
    pub last_handle_bytes: u64,
}
613
614#[cfg(all(feature = "std", feature = "async-cl-io"))]
615impl<P: CopperListTuple + Default, const NBCL: usize> AsyncCopperListsManager<P, NBCL> {
616 pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
617 where
618 P: CuListZeroedInit + AsyncCopperListPayload + 'static,
619 {
620 let mut free_pool = Vec::with_capacity(NBCL);
621 for _ in 0..NBCL {
622 free_pool.push(allocate_zeroed_copperlist::<P>());
623 }
624
625 let (pending_sender, completion_receiver, worker_handle) = if let Some(mut logger) = logger
626 {
627 let (pending_sender, pending_receiver) = sync_channel::<Box<CopperList<P>>>(NBCL);
628 let (completion_sender, completion_receiver) =
629 sync_channel::<AsyncCopperListCompletion<P>>(NBCL);
630 let worker_handle = std::thread::Builder::new()
631 .name("cu-async-cl-io".to_string())
632 .spawn(move || {
633 while let Ok(mut culist) = pending_receiver.recv() {
634 culist.change_state(CopperListState::BeingSerialized);
635 let log_result = logger.log(&culist).map(|_| {
636 (
637 logger.last_log_bytes().unwrap_or(0) as u64,
638 take_last_completed_handle_bytes(),
639 )
640 });
641 let should_stop = log_result.is_err();
642 if completion_sender
643 .send(AsyncCopperListCompletion { culist, log_result })
644 .is_err()
645 {
646 break;
647 }
648 if should_stop {
649 break;
650 }
651 }
652 })
653 .map_err(|e| {
654 CuError::from("Failed to spawn async CopperList serializer thread")
655 .add_cause(e.to_string().as_str())
656 })?;
657 (
658 Some(pending_sender),
659 Some(completion_receiver),
660 Some(worker_handle),
661 )
662 } else {
663 (None, None, None)
664 };
665
666 Ok(Self {
667 free_pool,
668 current: None,
669 #[cfg(feature = "remote-debug")]
670 last_completed_encoded: None,
671 pending_count: 0,
672 next_cl_id: 0,
673 pending_sender,
674 completion_receiver,
675 worker_handle,
676 last_encoded_bytes: 0,
677 last_handle_bytes: 0,
678 })
679 }
680
    /// Id that the next created CopperList will receive.
    pub fn next_cl_id(&self) -> u64 {
        self.next_cl_id
    }

    /// Id of the most recently created CopperList.
    ///
    /// Uses a saturating subtraction, so it returns 0 when no CopperList was created yet.
    pub fn last_cl_id(&self) -> u64 {
        self.next_cl_id.saturating_sub(1)
    }

    /// Borrows the CopperList that is currently active, if any.
    pub fn peek(&self) -> Option<&CopperList<P>> {
        self.current.as_deref()
    }
692
    /// Encoded bytes of the last completed CopperList, when a snapshot was captured.
    #[cfg(feature = "remote-debug")]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        self.last_completed_encoded.as_deref()
    }

    /// Without `remote-debug` no snapshot is kept, so this is always `None`.
    #[cfg(not(feature = "remote-debug"))]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        None
    }

    /// Overrides (or clears) the stored snapshot of the last completed CopperList.
    #[cfg(feature = "remote-debug")]
    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
        self.last_completed_encoded = snapshot;
    }

    /// Without `remote-debug` the snapshot is discarded.
    #[cfg(not(feature = "remote-debug"))]
    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}
710
711 pub fn create(&mut self) -> CuResult<&mut CopperList<P>>
712 where
713 P: CuListZeroedInit,
714 {
715 if self.current.is_some() {
716 return Err(CuError::from(
717 "Attempted to create a CopperList while another one is still active",
718 ));
719 }
720
721 self.reclaim_completed()?;
722 while self.free_pool.is_empty() {
723 self.wait_for_completion()?;
724 }
725
726 let culist = self
727 .free_pool
728 .pop()
729 .ok_or_else(|| CuError::from("Ran out of space for copper lists"))?;
730 self.current = Some(culist);
731
732 let current = self
733 .current
734 .as_mut()
735 .expect("current CopperList is missing");
736 current.reset_for_runtime_use(self.next_cl_id);
737 self.next_cl_id += 1;
738 Ok(current.as_mut())
739 }
740
    /// Encodes `cl` and stores the bytes as the last completed snapshot.
    ///
    /// # Errors
    /// Propagates the encoding failure; the previous snapshot is left untouched in that case.
    #[cfg(feature = "remote-debug")]
    fn capture_completed_snapshot(&mut self, cl: &CopperList<P>) -> CuResult<()> {
        self.last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
        Ok(())
    }

    /// Without `remote-debug` no snapshot is captured; always succeeds.
    #[cfg(not(feature = "remote-debug"))]
    fn capture_completed_snapshot(&mut self, _cl: &CopperList<P>) -> CuResult<()> {
        Ok(())
    }
751
752 pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
753 self.reclaim_completed()?;
754
755 let mut culist = self.current.take().ok_or_else(|| {
756 CuError::from("Attempted to finish processing without an active CopperList")
757 })?;
758
759 if culist.id != culistid {
760 return Err(CuError::from(format!(
761 "Attempted to finish CopperList #{culistid} while CopperList #{} is active",
762 culist.id
763 )));
764 }
765 #[cfg(debug_assertions)]
766 debug_assert_processing_completion_state(culist.as_ref(), "async end_of_processing");
767
768 culist.change_state(CopperListState::DoneProcessing);
769 self.capture_completed_snapshot(&culist)?;
770 self.last_encoded_bytes = 0;
771 self.last_handle_bytes = 0;
772
773 if let Some(pending_sender) = &self.pending_sender {
774 culist.change_state(CopperListState::QueuedForSerialization);
775 pending_sender.send(culist).map_err(|e| {
776 CuError::from("Failed to enqueue CopperList for async serialization")
777 .add_cause(e.to_string().as_str())
778 })?;
779 self.pending_count += 1;
780 self.reclaim_completed()?;
781 } else {
782 culist.change_state(CopperListState::Free);
783 self.free_pool.push(culist);
784 }
785
786 Ok(())
787 }
788
789 pub fn finish_pending(&mut self) -> CuResult<()> {
790 if self.current.is_some() {
791 return Err(CuError::from(
792 "Cannot flush CopperList I/O while a CopperList is still active",
793 ));
794 }
795
796 while self.pending_count > 0 {
797 self.wait_for_completion()?;
798 }
799 Ok(())
800 }
801
802 pub fn available_copper_lists(&mut self) -> CuResult<usize> {
803 self.reclaim_completed()?;
804 Ok(self.free_pool.len())
805 }
806
807 pub fn end_of_processing_boxed(
808 &mut self,
809 mut culist: Box<CopperList<P>>,
810 ) -> CuResult<OwnedCopperListSubmission<P>> {
811 self.reclaim_completed()?;
812 #[cfg(debug_assertions)]
813 debug_assert_processing_completion_state(culist.as_ref(), "async boxed end_of_processing");
814 culist.change_state(CopperListState::DoneProcessing);
815 self.capture_completed_snapshot(&culist)?;
816 self.last_encoded_bytes = 0;
817 self.last_handle_bytes = 0;
818
819 if let Some(pending_sender) = &self.pending_sender {
820 culist.change_state(CopperListState::QueuedForSerialization);
821 pending_sender.send(culist).map_err(|e| {
822 CuError::from("Failed to enqueue CopperList for async serialization")
823 .add_cause(e.to_string().as_str())
824 })?;
825 self.pending_count += 1;
826 self.reclaim_completed()?;
827 Ok(OwnedCopperListSubmission::Pending)
828 } else {
829 culist.change_state(CopperListState::Free);
830 Ok(OwnedCopperListSubmission::Recycled(culist))
831 }
832 }
833
834 pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
835 let recv_result = {
836 let Some(completion_receiver) = self.completion_receiver.as_ref() else {
837 return Ok(None);
838 };
839 completion_receiver.try_recv()
840 };
841 match recv_result {
842 Ok(completion) => self.handle_completion(completion).map(Some),
843 Err(TryRecvError::Empty) => Ok(None),
844 Err(TryRecvError::Disconnected) => Err(CuError::from(
845 "Async CopperList serializer thread disconnected unexpectedly",
846 )),
847 }
848 }
849
850 pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
851 let completion = self
852 .completion_receiver
853 .as_ref()
854 .ok_or_else(|| {
855 CuError::from("No async CopperList serializer is active to return a free slot")
856 })?
857 .recv()
858 .map_err(|e| {
859 CuError::from("Failed to receive completion from async CopperList serializer")
860 .add_cause(e.to_string().as_str())
861 })?;
862 self.handle_completion(completion)
863 }
864
865 pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
866 let mut reclaimed = Vec::with_capacity(self.pending_count);
867 if self.current.is_some() {
868 return Err(CuError::from(
869 "Cannot flush CopperList I/O while a CopperList is still active",
870 ));
871 }
872 while self.pending_count > 0 {
873 reclaimed.push(self.wait_reclaim_boxed()?);
874 }
875 Ok(reclaimed)
876 }
877
878 fn reclaim_completed(&mut self) -> CuResult<()> {
879 loop {
880 let Some(culist) = self.try_reclaim_boxed()? else {
881 break;
882 };
883 self.free_pool.push(culist);
884 }
885 Ok(())
886 }
887
888 fn wait_for_completion(&mut self) -> CuResult<()> {
889 let culist = self.wait_reclaim_boxed()?;
890 self.free_pool.push(culist);
891 Ok(())
892 }
893
894 fn handle_completion(
895 &mut self,
896 mut completion: AsyncCopperListCompletion<P>,
897 ) -> CuResult<Box<CopperList<P>>> {
898 self.pending_count = self.pending_count.saturating_sub(1);
899 if let Ok((encoded_bytes, handle_bytes)) = completion.log_result.as_ref() {
900 self.last_encoded_bytes = *encoded_bytes;
901 self.last_handle_bytes = *handle_bytes;
902 }
903 completion.culist.change_state(CopperListState::Free);
904 completion.log_result?;
905 Ok(completion.culist)
906 }
907
908 fn shutdown_worker(&mut self) -> CuResult<()> {
909 self.finish_pending()?;
910 self.pending_sender.take();
911 if let Some(worker_handle) = self.worker_handle.take() {
912 worker_handle.join().map_err(|_| {
913 CuError::from("Async CopperList serializer thread panicked while joining")
914 })?;
915 }
916 Ok(())
917 }
918}
919
/// Best-effort shutdown of the serializer thread when the manager goes away.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> Drop for AsyncCopperListsManager<P, NBCL> {
    fn drop(&mut self) {
        // Errors cannot be reported from `drop`, so the shutdown result is ignored.
        let _ = self.shutdown_worker();
    }
}
926
/// Debug-build check that `culist` is in the `Processing` state before it is completed.
///
/// `context` names the calling operation and is included in the panic message.
#[cfg(all(feature = "std", debug_assertions))]
fn debug_assert_processing_completion_state<P: CopperListTuple>(
    culist: &CopperList<P>,
    context: &str,
) {
    let observed = culist.get_state();
    assert_eq!(
        observed,
        CopperListState::Processing,
        "{context} expected CopperList #{} to be Processing, found {}",
        culist.id,
        observed
    );
}
940
/// CopperList manager selected at compile time: the asynchronous implementation when both
/// `std` and `async-cl-io` are enabled.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub type CopperListsManager<P, const NBCL: usize> = AsyncCopperListsManager<P, NBCL>;

/// CopperList manager selected at compile time: the synchronous implementation in every
/// configuration without `std` + `async-cl-io`.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub type CopperListsManager<P, const NBCL: usize> = SyncCopperListsManager<P, NBCL>;
948
/// Builds and logs keyframes (frozen task state) at a fixed CopperList interval.
pub struct KeyFramesManager {
    // Keyframe currently being assembled (or replayed when `locked`).
    inner: KeyFrame,

    // Timestamp to use for the next keyframe reset instead of the clock; consumed on use.
    forced_timestamp: Option<CuTime>,

    // When set, `inner` holds a keyframe installed by `lock_keyframe` and must not be mutated.
    locked: bool,

    // Sink for keyframes; `None` disables keyframe capture entirely.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    // A keyframe is captured for every CopperList id that is a multiple of this interval.
    keyframe_interval: u32,

    /// Byte count reported by the logger for the last keyframe (0 when the last CopperList
    /// was not a keyframe).
    pub last_encoded_bytes: u64,
}
969
970impl KeyFramesManager {
971 fn is_keyframe(&self, culistid: u64) -> bool {
972 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
973 }
974
975 #[inline]
976 pub fn captures_keyframe(&self, culistid: u64) -> bool {
977 self.is_keyframe(culistid)
978 }
979
980 pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
981 if self.is_keyframe(culistid) {
982 if self.locked && self.inner.culistid == culistid {
984 return;
985 }
986 let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
987 self.inner.reset(culistid, ts);
988 self.locked = false;
989 }
990 }
991
992 #[cfg(feature = "std")]
994 pub fn set_forced_timestamp(&mut self, ts: CuTime) {
995 self.forced_timestamp = Some(ts);
996 }
997
998 pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
999 if self.is_keyframe(culistid) {
1000 if self.locked {
1001 return Ok(0);
1003 }
1004 if self.inner.culistid != culistid {
1005 return Err(CuError::from(format!(
1006 "Freezing task for culistid {} but current keyframe is {}",
1007 culistid, self.inner.culistid
1008 )));
1009 }
1010 self.inner
1011 .add_frozen_task(task)
1012 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
1013 } else {
1014 Ok(0)
1015 }
1016 }
1017
1018 pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
1020 self.freeze_task(culistid, item)
1021 }
1022
1023 pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
1024 if self.is_keyframe(culistid) {
1025 let logger = self.logger.as_mut().unwrap();
1026 logger.log(&self.inner)?;
1027 self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
1028 self.locked = false;
1030 Ok(())
1031 } else {
1032 self.last_encoded_bytes = 0;
1034 Ok(())
1035 }
1036 }
1037
1038 #[cfg(feature = "std")]
1040 pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
1041 self.inner = keyframe.clone();
1042 self.forced_timestamp = Some(keyframe.timestamp);
1043 self.locked = true;
1044 }
1045}
1046
/// The assembled runtime: clock, instantiated tasks and bridges, monitor, resources, and the
/// managers for CopperLists and keyframes.
///
/// `CT` is the task set, `CB` the bridge set, `P` the CopperList payload tuple, `M` the
/// monitor, and `NBCL` the number of CopperLists.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    // Clock shared by the runtime; cloned out through the accessors.
    clock: RobotClock,

    // Numeric code of the subsystem this runtime belongs to.
    subsystem_code: u16,

    /// Instance identifier supplied to the builder.
    #[doc(hidden)]
    pub instance_id: u32,

    /// Instantiated tasks.
    #[doc(hidden)]
    pub tasks: CT,

    /// Instantiated bridges.
    #[doc(hidden)]
    pub bridges: CB,

    /// Resource manager used while instantiating tasks and bridges.
    #[doc(hidden)]
    pub resources: ResourceManager,

    /// Monitor instance.
    #[doc(hidden)]
    pub monitor: M,

    /// Execution probe handle (the `std` flavour).
    // NOTE(review): presumably shared with the monitor's execution probe — confirm.
    #[cfg(feature = "std")]
    #[doc(hidden)]
    pub execution_probe: ExecutionProbeHandle,
    /// Execution probe, stored inline when `std` is not available.
    #[cfg(not(feature = "std"))]
    #[doc(hidden)]
    pub execution_probe: RuntimeExecutionProbe,

    /// CopperList manager (synchronous or asynchronous depending on features).
    #[doc(hidden)]
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// Keyframe manager.
    #[doc(hidden)]
    pub keyframes_manager: KeyFramesManager,

    /// Parallel runtime state (only with `std` + `parallel-rt`).
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    #[doc(hidden)]
    pub parallel_rt: ParallelRt<NBCL>,

    /// Runtime section of the configuration.
    #[doc(hidden)]
    pub runtime_config: RuntimeConfig,
}
1106
/// Lets the runtime act as a clock source for code that is generic over `ClockProvider`.
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload,
    M: CuMonitor,
    const NBCL: usize,
> ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    /// Returns a clone of the runtime's clock.
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}
1120
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> CuRuntime<CT, CB, P, M, NBCL> {
    /// Returns a clone of the runtime's clock.
    #[inline]
    pub fn clock(&self) -> RobotClock {
        self.clock.clone()
    }

    /// Borrows the runtime's clock without cloning it.
    #[doc(hidden)]
    #[inline]
    pub fn clock_ref(&self) -> &RobotClock {
        &self.clock
    }

    /// Numeric code of the subsystem this runtime belongs to.
    #[inline]
    pub fn subsystem_code(&self) -> u16 {
        self.subsystem_code
    }

    /// Instance identifier of this runtime.
    #[inline]
    pub fn instance_id(&self) -> u32 {
        self.instance_id
    }
}
1147
1148impl<
1149 'cfg,
1150 CT,
1151 CB,
1152 P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
1153 M: CuMonitor,
1154 const NBCL: usize,
1155 TI,
1156 BI,
1157 MI,
1158 CLW,
1159 KFW,
1160> CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
1161where
1162 TI: for<'c> Fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>,
1163 BI: Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1164 MI: Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1165 CLW: WriteStream<CopperList<P>> + 'static,
1166 KFW: WriteStream<KeyFrame> + 'static,
1167{
1168 pub fn build(self) -> CuResult<CuRuntime<CT, CB, P, M, NBCL>> {
1169 let Self {
1170 clock,
1171 config,
1172 mission,
1173 subsystem,
1174 instance_id,
1175 resources,
1176 parts,
1177 copperlists_logger,
1178 keyframes_logger,
1179 } = self;
1180 let mut resources =
1181 resources.ok_or_else(|| CuError::from("Resources missing from CuRuntimeBuilder"))?;
1182
1183 let graph = config.get_graph(Some(mission))?;
1184 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1185 .get_all_nodes()
1186 .iter()
1187 .map(|(_, node)| node.get_instance_config())
1188 .collect();
1189
1190 let tasks = (parts.tasks_instanciator)(all_instances_configs, &mut resources)?;
1191
1192 #[cfg(feature = "std")]
1193 let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
1194 #[cfg(not(feature = "std"))]
1195 let execution_probe = RuntimeExecutionProbe::default();
1196 let monitor_metadata = CuMonitoringMetadata::new(
1197 CompactString::from(mission),
1198 parts.monitored_components,
1199 parts.culist_component_mapping,
1200 CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
1201 build_monitor_topology(config, mission)?,
1202 None,
1203 )?
1204 .with_subsystem_id(subsystem.id())
1205 .with_instance_id(instance_id);
1206 #[cfg(feature = "std")]
1207 let monitor_runtime =
1208 CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
1209 #[cfg(not(feature = "std"))]
1210 let monitor_runtime = CuMonitoringRuntime::unavailable();
1211 let monitor = (parts.monitor_instanciator)(config, monitor_metadata, monitor_runtime);
1212 let bridges = (parts.bridges_instanciator)(config, &mut resources)?;
1213
1214 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
1215 Some(logging_config) if logging_config.enable_task_logging => (
1216 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1217 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1218 logging_config.keyframe_interval.unwrap(),
1219 ),
1220 Some(_) => (None, None, 0),
1221 None => (
1222 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1223 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1224 DEFAULT_KEYFRAME_INTERVAL,
1225 ),
1226 };
1227
1228 let copperlists_manager = CopperListsManager::new(copperlists_logger)?;
1229 #[cfg(target_os = "none")]
1230 {
1231 let cl_size = core::mem::size_of::<CopperList<P>>();
1232 let total_bytes = cl_size.saturating_mul(NBCL);
1233 info!(
1234 "CuRuntimeBuilder: copperlists count={} cl_size={} total_bytes={}",
1235 NBCL, cl_size, total_bytes
1236 );
1237 }
1238
1239 let keyframes_manager = KeyFramesManager {
1240 inner: KeyFrame::new(),
1241 logger: keyframes_logger,
1242 keyframe_interval,
1243 last_encoded_bytes: 0,
1244 forced_timestamp: None,
1245 locked: false,
1246 };
1247 #[cfg(all(feature = "std", feature = "parallel-rt"))]
1248 let parallel_rt = ParallelRt::new(parts.parallel_rt_metadata)?;
1249
1250 let runtime_config = config.runtime.clone().unwrap_or_default();
1251 runtime_config.validate()?;
1252
1253 Ok(CuRuntime {
1254 subsystem_code: subsystem.code(),
1255 instance_id,
1256 tasks,
1257 bridges,
1258 resources,
1259 monitor,
1260 execution_probe,
1261 clock,
1262 copperlists_manager,
1263 keyframes_manager,
1264 #[cfg(all(feature = "std", feature = "parallel-rt"))]
1265 parallel_rt,
1266 runtime_config,
1267 })
1268 }
1269}
1270
/// Serialized snapshot of task state tied to one CopperList id and timestamp.
#[derive(Clone, Encode, Decode)]
pub struct KeyFrame {
    /// CopperList id this keyframe was last reset for.
    pub culistid: u64,
    /// Timestamp supplied when the keyframe was last reset.
    pub timestamp: CuTime,
    /// Back-to-back bincode encodings of the tasks added with `add_frozen_task`.
    pub serialized_tasks: Vec<u8>,
}
1283
1284impl KeyFrame {
1285 fn new() -> Self {
1286 KeyFrame {
1287 culistid: 0,
1288 timestamp: CuTime::default(),
1289 serialized_tasks: Vec::new(),
1290 }
1291 }
1292
1293 fn reset(&mut self, culistid: u64, timestamp: CuTime) {
1295 self.culistid = culistid;
1296 self.timestamp = timestamp;
1297 self.serialized_tasks.clear();
1298 }
1299
1300 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
1302 let cfg = bincode::config::standard();
1303 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
1304 BincodeAdapter(task).encode(&mut sizer)?;
1305 let need = sizer.into_writer().bytes_written as usize;
1306
1307 let start = self.serialized_tasks.len();
1308 self.serialized_tasks.resize(start + need, 0);
1309 let mut enc =
1310 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
1311 BincodeAdapter(task).encode(&mut enc)?;
1312 Ok(need)
1313 }
1314}
1315
/// Origin of the configuration reported by [`RuntimeLifecycleEvent::Instantiated`].
///
/// NOTE(review): the per-variant descriptions are inferred from the variant
/// names; confirm against the code that emits these events.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    /// Presumably a configuration supplied programmatically.
    ProgrammaticOverride,
    /// Presumably a configuration loaded from a file outside the binary.
    ExternalFile,
    /// Presumably the default configuration shipped with the application.
    BundledDefault,
}
1323
/// Identification data attached to [`RuntimeLifecycleEvent::Instantiated`].
///
/// NOTE(review): field meanings follow the field names; how each value is
/// populated is not visible here.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    /// Application name.
    pub app_name: String,
    /// Application version string.
    pub app_version: String,
    /// Git commit, when known.
    pub git_commit: Option<String>,
    /// Whether the git tree was dirty, when known.
    pub git_dirty: Option<bool>,
    /// Subsystem identifier, when one is set.
    pub subsystem_id: Option<String>,
    /// Numeric subsystem code.
    pub subsystem_code: u16,
    /// Numeric instance id.
    pub instance_id: u32,
}
1335
/// Events carried by a [`RuntimeLifecycleRecord`].
///
/// NOTE(review): when each event is emitted is not visible in this part of
/// the file; descriptions follow the variant and field names.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    /// A runtime was instantiated.
    Instantiated {
        /// Where the configuration came from.
        config_source: RuntimeLifecycleConfigSource,
        /// The effective configuration as a RON string (per the field name).
        effective_config_ron: String,
        /// Application / subsystem identification.
        stack: RuntimeLifecycleStackInfo,
    },
    /// A mission started.
    MissionStarted {
        /// Mission name.
        mission: String,
    },
    /// A mission stopped.
    MissionStopped {
        /// Mission name.
        mission: String,
        /// Free-form stop reason.
        reason: String,
    },
    /// A panic was recorded.
    Panic {
        /// Panic message.
        message: String,
        /// Source file of the panic, when known.
        file: Option<String>,
        /// Line of the panic, when known.
        line: Option<u32>,
        /// Column of the panic, when known.
        column: Option<u32>,
    },
    /// Shutdown finished.
    ShutdownCompleted,
}
1362
/// A [`RuntimeLifecycleEvent`] paired with a timestamp.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    /// Timestamp associated with the event.
    pub timestamp: CuTime,
    /// The recorded event.
    pub event: RuntimeLifecycleEvent,
}
1369
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    /// Forwards `marker` to the runtime's execution probe.
    #[inline]
    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
        self.execution_probe.record(marker);
    }

    /// Borrows the runtime's [`RuntimeExecutionProbe`].
    ///
    /// The field has a different shape per build: with `std` it is reached
    /// through `as_ref()`, without `std` it is borrowed directly.
    #[inline]
    pub fn execution_probe_ref(&self) -> &RuntimeExecutionProbe {
        #[cfg(feature = "std")]
        {
            self.execution_probe.as_ref()
        }

        #[cfg(not(feature = "std"))]
        {
            &self.execution_probe
        }
    }
}
1403
/// Role of a node in the execution plan.
///
/// For non-task nodes, `find_task_type_for_id` derives the role from the
/// node's edges; for task nodes it is converted from `TaskKind`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// Outputs only (no incoming edges when derived from the graph).
    Source,
    /// Any other combination of inputs and outputs.
    Regular,
    /// Inputs only (no outgoing edges when derived from the graph).
    Sink,
}
1414
1415impl From<TaskKind> for CuTaskType {
1416 fn from(value: TaskKind) -> Self {
1417 match value {
1418 TaskKind::Source => CuTaskType::Source,
1419 TaskKind::Regular => CuTaskType::Regular,
1420 TaskKind::Sink => CuTaskType::Sink,
1421 }
1422 }
1423}
1424
/// Output slot assigned to a planned step.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Index handed out by the planner; it increases by one per planned node.
    pub culist_index: u32,
    /// Message type names carried by this slot (`"()"` for sinks).
    pub msg_types: Vec<String>,
}
1430
/// One resolved input of a planned step, i.e. one incoming edge.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// `culist_index` of the upstream node's output pack.
    pub culist_index: u32,
    /// Message type name taken from the edge.
    pub msg_type: String,
    /// Position of `msg_type` within the upstream pack's `msg_types`.
    pub src_port: usize,
    /// Id of the graph edge this input comes from.
    pub edge_id: usize,
    /// The edge's `order` value; inputs are sorted by it.
    pub connection_order: usize,
}
1439
/// A single planned node together with its resolved inputs and output slot.
pub struct CuExecutionStep {
    /// Graph id of the node.
    pub node_id: NodeId,
    /// Clone of the node's configuration entry.
    pub node: Node,
    /// Role of the node as computed by `find_task_type_for_id`.
    pub task_type: CuTaskType,

    /// Inputs of this step, sorted by connection order.
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// Output slot assigned to this step.
    pub output_msg_pack: Option<CuOutputPack>,
}
1455
1456impl Debug for CuExecutionStep {
1457 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1458 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
1459 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
1460 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
1461 f.write_str(
1462 format!(
1463 " input_msg_types: {:?}\n",
1464 self.input_msg_indices_types
1465 )
1466 .as_str(),
1467 )?;
1468 f.write_str(format!(" output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
1469 Ok(())
1470 }
1471}
1472
/// An ordered group of execution units.
pub struct CuExecutionLoop {
    /// Units in execution order; a unit may itself be a nested loop.
    pub steps: Vec<CuExecutionUnit>,
    /// Optional iteration count; `compute_runtime_plan` always sets `None`.
    /// NOTE(review): presumably `None` means "no fixed count" — confirm.
    pub loop_count: Option<u32>,
}
1481
1482impl Debug for CuExecutionLoop {
1483 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1484 f.write_str("CuExecutionLoop:\n")?;
1485 for step in &self.steps {
1486 match step {
1487 CuExecutionUnit::Step(step) => {
1488 step.fmt(f)?;
1489 }
1490 CuExecutionUnit::Loop(l) => {
1491 l.fmt(f)?;
1492 }
1493 }
1494 }
1495
1496 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
1497 Ok(())
1498 }
1499}
1500
/// Element of an execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single planned node (boxed).
    Step(Box<CuExecutionStep>),
    /// A nested group of units.
    Loop(CuExecutionLoop),
}
1507
1508fn find_output_pack_from_nodeid(
1509 node_id: NodeId,
1510 steps: &Vec<CuExecutionUnit>,
1511) -> Option<CuOutputPack> {
1512 for step in steps {
1513 match step {
1514 CuExecutionUnit::Loop(loop_unit) => {
1515 if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
1516 return Some(output_pack);
1517 }
1518 }
1519 CuExecutionUnit::Step(step) if step.node_id == node_id => {
1520 return step.output_msg_pack.clone();
1521 }
1522 _ => {}
1523 }
1524 }
1525 None
1526}
1527
1528pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuResult<CuTaskType> {
1529 let node = graph
1530 .get_node(node_id)
1531 .ok_or_else(|| CuError::from(format!("Node id {node_id} not found")))?;
1532
1533 if node.get_flavor() == crate::config::Flavor::Task {
1534 return resolve_task_kind_for_id(graph, node_id).map(Into::into);
1535 }
1536
1537 let has_inputs = !graph.get_dst_edges(node_id)?.is_empty();
1538 let has_outputs = !graph.get_src_edges(node_id)?.is_empty();
1539 Ok(match (has_inputs, has_outputs) {
1540 (false, true) => CuTaskType::Source,
1541 (true, false) => CuTaskType::Sink,
1542 _ => CuTaskType::Regular,
1543 })
1544}
1545
1546fn sort_inputs_by_connection_order(input_msg_indices_types: &mut [CuInputMsg]) {
1551 input_msg_indices_types.sort_by_key(|input| input.connection_order);
1552}
1553
1554fn plan_tasks_tree_branch(
1556 graph: &CuGraph,
1557 mut next_culist_output_index: u32,
1558 starting_point: NodeId,
1559 plan: &mut Vec<CuExecutionUnit>,
1560) -> CuResult<(u32, bool)> {
1561 #[cfg(all(feature = "std", feature = "macro_debug"))]
1562 eprintln!("-- starting branch from node {starting_point}");
1563
1564 let mut handled = false;
1565
1566 for id in graph.bfs_nodes(starting_point) {
1567 let node_ref = graph.get_node(id).unwrap();
1568 #[cfg(all(feature = "std", feature = "macro_debug"))]
1569 eprintln!(" Visiting node: {node_ref:?}");
1570
1571 let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
1572 let output_msg_pack: Option<CuOutputPack>;
1573 let task_type = find_task_type_for_id(graph, id)?;
1574
1575 match task_type {
1576 CuTaskType::Source => {
1577 #[cfg(all(feature = "std", feature = "macro_debug"))]
1578 eprintln!(" → Source node, assign output index {next_culist_output_index}");
1579 let msg_types = graph.get_node_output_msg_types_by_id(id)?;
1580 if msg_types.is_empty() {
1581 return Err(CuError::from(format!(
1582 "Source node '{}' has no declared outputs",
1583 node_ref.get_id()
1584 )));
1585 }
1586 output_msg_pack = Some(CuOutputPack {
1587 culist_index: next_culist_output_index,
1588 msg_types,
1589 });
1590 next_culist_output_index += 1;
1591 }
1592 CuTaskType::Sink => {
1593 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1594 edge_ids.sort();
1595 #[cfg(all(feature = "std", feature = "macro_debug"))]
1596 eprintln!(" → Sink with incoming edges: {edge_ids:?}");
1597 for edge_id in edge_ids {
1598 let edge = graph
1599 .edge(edge_id)
1600 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1601 let pid = graph
1602 .get_node_id_by_name(edge.src.as_str())
1603 .unwrap_or_else(|| {
1604 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1605 });
1606 let output_pack = find_output_pack_from_nodeid(pid, plan);
1607 if let Some(output_pack) = output_pack {
1608 #[cfg(all(feature = "std", feature = "macro_debug"))]
1609 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
1610 let msg_type = edge.msg.as_str();
1611 let src_port = output_pack
1612 .msg_types
1613 .iter()
1614 .position(|msg| msg == msg_type)
1615 .unwrap_or_else(|| {
1616 panic!(
1617 "Missing output port for message type '{msg_type}' on node {pid}"
1618 )
1619 });
1620 input_msg_indices_types.push(CuInputMsg {
1621 culist_index: output_pack.culist_index,
1622 msg_type: msg_type.to_string(),
1623 src_port,
1624 edge_id,
1625 connection_order: edge.order,
1626 });
1627 } else {
1628 #[cfg(all(feature = "std", feature = "macro_debug"))]
1629 eprintln!(" ✗ Input from {pid} not ready, returning");
1630 return Ok((next_culist_output_index, handled));
1631 }
1632 }
1633 output_msg_pack = Some(CuOutputPack {
1634 culist_index: next_culist_output_index,
1635 msg_types: Vec::from(["()".to_string()]),
1636 });
1637 next_culist_output_index += 1;
1638 }
1639 CuTaskType::Regular => {
1640 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1641 edge_ids.sort();
1642 #[cfg(all(feature = "std", feature = "macro_debug"))]
1643 eprintln!(" → Regular task with incoming edges: {edge_ids:?}");
1644 for edge_id in edge_ids {
1645 let edge = graph
1646 .edge(edge_id)
1647 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1648 let pid = graph
1649 .get_node_id_by_name(edge.src.as_str())
1650 .unwrap_or_else(|| {
1651 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1652 });
1653 let output_pack = find_output_pack_from_nodeid(pid, plan);
1654 if let Some(output_pack) = output_pack {
1655 #[cfg(all(feature = "std", feature = "macro_debug"))]
1656 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
1657 let msg_type = edge.msg.as_str();
1658 let src_port = output_pack
1659 .msg_types
1660 .iter()
1661 .position(|msg| msg == msg_type)
1662 .unwrap_or_else(|| {
1663 panic!(
1664 "Missing output port for message type '{msg_type}' on node {pid}"
1665 )
1666 });
1667 input_msg_indices_types.push(CuInputMsg {
1668 culist_index: output_pack.culist_index,
1669 msg_type: msg_type.to_string(),
1670 src_port,
1671 edge_id,
1672 connection_order: edge.order,
1673 });
1674 } else {
1675 #[cfg(all(feature = "std", feature = "macro_debug"))]
1676 eprintln!(" ✗ Input from {pid} not ready, returning");
1677 return Ok((next_culist_output_index, handled));
1678 }
1679 }
1680 let msg_types = graph.get_node_output_msg_types_by_id(id)?;
1681 if msg_types.is_empty() {
1682 return Err(CuError::from(format!(
1683 "Regular node '{}' has no declared outputs",
1684 node_ref.get_id()
1685 )));
1686 }
1687 output_msg_pack = Some(CuOutputPack {
1688 culist_index: next_culist_output_index,
1689 msg_types,
1690 });
1691 next_culist_output_index += 1;
1692 }
1693 }
1694
1695 sort_inputs_by_connection_order(&mut input_msg_indices_types);
1696
1697 if let Some(pos) = plan
1698 .iter()
1699 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
1700 {
1701 #[cfg(all(feature = "std", feature = "macro_debug"))]
1702 eprintln!(" → Already in plan, modifying existing step");
1703 let mut step = plan.remove(pos);
1704 if let CuExecutionUnit::Step(ref mut s) = step {
1705 s.input_msg_indices_types = input_msg_indices_types;
1706 }
1707 plan.push(step);
1708 } else {
1709 #[cfg(all(feature = "std", feature = "macro_debug"))]
1710 eprintln!(" → New step added to plan");
1711 let step = CuExecutionStep {
1712 node_id: id,
1713 node: node_ref.clone(),
1714 task_type,
1715 input_msg_indices_types,
1716 output_msg_pack,
1717 };
1718 plan.push(CuExecutionUnit::Step(Box::new(step)));
1719 }
1720
1721 handled = true;
1722 }
1723
1724 #[cfg(all(feature = "std", feature = "macro_debug"))]
1725 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
1726 Ok((next_culist_output_index, handled))
1727}
1728
1729pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1732 #[cfg(all(feature = "std", feature = "macro_debug"))]
1733 eprintln!("[runtime plan]");
1734 let mut plan = Vec::new();
1735 let mut next_culist_output_index = 0u32;
1736
1737 let mut queue: VecDeque<NodeId> = VecDeque::new();
1738 for node_id in graph.node_ids() {
1739 if find_task_type_for_id(graph, node_id)? == CuTaskType::Source {
1740 queue.push_back(node_id);
1741 }
1742 }
1743
1744 #[cfg(all(feature = "std", feature = "macro_debug"))]
1745 eprintln!("Initial source nodes: {queue:?}");
1746
1747 while let Some(start_node) = queue.pop_front() {
1748 #[cfg(all(feature = "std", feature = "macro_debug"))]
1749 eprintln!("→ Starting BFS from source {start_node}");
1750 for node_id in graph.bfs_nodes(start_node) {
1751 let already_in_plan = plan
1752 .iter()
1753 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1754 if already_in_plan {
1755 #[cfg(all(feature = "std", feature = "macro_debug"))]
1756 eprintln!(" → Node {node_id} already planned, skipping");
1757 continue;
1758 }
1759
1760 #[cfg(all(feature = "std", feature = "macro_debug"))]
1761 eprintln!(" Planning from node {node_id}");
1762 let (new_index, handled) =
1763 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan)?;
1764 next_culist_output_index = new_index;
1765
1766 if !handled {
1767 #[cfg(all(feature = "std", feature = "macro_debug"))]
1768 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1769 continue;
1770 }
1771
1772 #[cfg(all(feature = "std", feature = "macro_debug"))]
1773 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
1774 for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1775 #[cfg(all(feature = "std", feature = "macro_debug"))]
1776 eprintln!(" → Enqueueing neighbor {neighbor}");
1777 queue.push_back(neighbor);
1778 }
1779 }
1780 }
1781
1782 let mut planned_nodes = BTreeSet::new();
1783 for unit in &plan {
1784 if let CuExecutionUnit::Step(step) = unit {
1785 planned_nodes.insert(step.node_id);
1786 }
1787 }
1788
1789 let mut missing = Vec::new();
1790 for node_id in graph.node_ids() {
1791 if !planned_nodes.contains(&node_id) {
1792 if let Some(node) = graph.get_node(node_id) {
1793 missing.push(node.get_id().to_string());
1794 } else {
1795 missing.push(format!("node_id_{node_id}"));
1796 }
1797 }
1798 }
1799
1800 if !missing.is_empty() {
1801 missing.sort();
1802 return Err(CuError::from(format!(
1803 "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1804 missing.join(", ")
1805 )));
1806 }
1807
1808 Ok(CuExecutionLoop {
1809 steps: plan,
1810 loop_count: None,
1811 })
1812}
1813
1814#[cfg(test)]
1816mod tests {
1817 use super::*;
1818 use crate::config::Node;
1819 use crate::context::CuContext;
1820 use crate::cutask::CuSinkTask;
1821 use crate::cutask::{CuSrcTask, Freezable};
1822 use crate::monitoring::NoMonitor;
1823 use crate::reflect::Reflect;
1824 use bincode::Encode;
1825 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1826 use serde_derive::{Deserialize, Serialize};
1827 #[cfg(feature = "std")]
1828 use std::sync::{Arc, Mutex};
1829
    /// Stateless source task used as a fixture by the runtime tests.
    #[derive(Reflect)]
    pub struct TestSource {}

    // Empty impl: no `Freezable` method is overridden here.
    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        /// Ignores its arguments and always succeeds.
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// No-op: leaves the output untouched and returns `Ok(())`.
        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1849
    /// Stateless sink task used as a fixture by the runtime tests.
    #[derive(Reflect)]
    pub struct TestSink {}

    // Empty impl: no `Freezable` method is overridden here.
    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        /// Ignores its arguments and always succeeds.
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// No-op: ignores the input and returns `Ok(())`.
        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1870
1871 type Tasks = (TestSource, TestSink);
1873 type TestRuntime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;
1874 const TEST_NBCL: usize = 2;
1875
    /// Unit-payload CopperList message set used by the runtime fixture.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        /// Exposes no messages.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        /// Reports no task ids.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        /// Nothing to initialize.
        fn init_zeroed(&mut self) {}
    }
1894
    /// Message set holding a single `i32`, so tests can observe payload contents.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct IntMsgs(i32);

    impl ErasedCuStampedDataSet for IntMsgs {
        /// Exposes no messages.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for IntMsgs {
        /// Reports no task ids.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for IntMsgs {
        /// Deliberately leaves the stored integer untouched.
        fn init_zeroed(&mut self) {}
    }
1913
1914 #[cfg(feature = "std")]
1915 fn tasks_instanciator(
1916 all_instances_configs: Vec<Option<&ComponentConfig>>,
1917 _resources: &mut ResourceManager,
1918 ) -> CuResult<Tasks> {
1919 Ok((
1920 TestSource::new(all_instances_configs[0], ())?,
1921 TestSink::new(all_instances_configs[1], ())?,
1922 ))
1923 }
1924
1925 #[cfg(not(feature = "std"))]
1926 fn tasks_instanciator(
1927 all_instances_configs: Vec<Option<&ComponentConfig>>,
1928 _resources: &mut ResourceManager,
1929 ) -> CuResult<Tasks> {
1930 Ok((
1931 TestSource::new(all_instances_configs[0], ())?,
1932 TestSink::new(all_instances_configs[1], ())?,
1933 ))
1934 }
1935
    /// Creates the `NoMonitor` used by the tests; the config is ignored.
    fn monitor_instanciator(
        _config: &CuConfig,
        metadata: CuMonitoringMetadata,
        runtime: CuMonitoringRuntime,
    ) -> NoMonitor {
        NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
    }
1943
    /// The test runtime has no bridges: always returns `Ok(())`.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }
1947
    /// Returns a `ResourceManager` built from an empty slice; the config is ignored.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }
1951
    /// Write stream that discards everything it is asked to log.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        /// Accepts any object and reports success without storing it.
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
1960
    /// Write stream that records the ids of the CopperLists it is asked to log
    /// and can be told to fail on one specific id.
    #[cfg(not(feature = "async-cl-io"))]
    #[derive(Debug)]
    struct RecordingSyncWriter {
        /// Shared list of logged CopperList ids, in call order.
        ids: Arc<Mutex<Vec<u64>>>,
        /// Value reported by `last_log_bytes`.
        last_log_bytes: usize,
        /// When `Some(id)`, logging the CopperList with that id returns an error.
        fail_on: Option<u64>,
    }

    #[cfg(not(feature = "async-cl-io"))]
    impl WriteStream<CopperList<IntMsgs>> for RecordingSyncWriter {
        /// Records the id first, then fails if it matches `fail_on`; a failing
        /// id is therefore still present in `ids`.
        fn log(&mut self, culist: &CopperList<IntMsgs>) -> CuResult<()> {
            self.ids.lock().unwrap().push(culist.id);
            if self.fail_on == Some(culist.id) {
                return Err(CuError::from(format!(
                    "logger failed for CopperList #{}",
                    culist.id
                )));
            }
            Ok(())
        }

        /// Always reports the configured `last_log_bytes`.
        fn last_log_bytes(&self) -> Option<usize> {
            Some(self.last_log_bytes)
        }
    }
1986
    /// Builds a runtime for a two-node graph (source "a" connected to sink "b")
    /// and checks that construction succeeds.
    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime: CuResult<TestRuntime> =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build());
        assert!(runtime.is_ok());
    }
2015
    /// `rate_target_period(0)` must fail with the "cannot be zero" message.
    #[test]
    fn test_rate_target_period_rejects_zero() {
        let err = rate_target_period(0).expect_err("zero rate target should fail");
        assert!(
            err.to_string()
                .contains("Runtime rate target cannot be zero"),
            "unexpected error: {err}"
        );
    }
2025
    /// At 100 Hz the first deadline is 10 ms; ticking exactly at the deadline
    /// moves the next deadline to 20 ms.
    #[test]
    fn test_loop_rate_limiter_advances_to_next_period_when_on_time() {
        let (clock, mock) = RobotClock::mock();
        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(10_000_000));

        mock.set_value(10_000_000);
        limiter.mark_tick(&clock);

        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(20_000_000));
    }
2037
    /// Ticking late (35 ms at 100 Hz) yields a next deadline of 40 ms: missed
    /// periods are skipped and the deadline stays on the 10 ms grid.
    #[test]
    fn test_loop_rate_limiter_skips_missed_periods_without_resetting_phase() {
        let (clock, mock) = RobotClock::mock();
        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();

        mock.set_value(35_000_000);
        limiter.mark_tick(&clock);

        assert_eq!(limiter.next_deadline(), CuTime::from_nanos(40_000_000));
    }
2048
    /// The spin window equals `CuDuration::from(200_000)` at both 1 kHz and
    /// 10 kHz, i.e. it does not scale with the rate target.
    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
    #[test]
    fn test_loop_rate_limiter_spin_window_is_fixed_scheduler_window() {
        let (clock, _) = RobotClock::mock();
        let limiter = LoopRateLimiter::from_rate_target_hz(1_000, &clock).unwrap();
        assert_eq!(limiter.spin_window(), CuDuration::from(200_000));

        let fast = LoopRateLimiter::from_rate_target_hz(10_000, &clock).unwrap();
        assert_eq!(fast.spin_window(), CuDuration::from(200_000));
    }
2059
    /// Walks the runtime's CopperLists manager (capacity `TEST_NBCL`) through
    /// allocation, exhaustion and release, checking the free-slot count after
    /// each step.
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime: TestRuntime =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build())
            .unwrap();

        // First allocation: id 0, one slot left.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        // Second allocation: id 1, manager is now full.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
        }

        // A third allocation fails; finishing list 1 frees one slot.
        // NOTE(review): the results of `end_of_processing` are discarded in
        // this test; only the free-slot counts are checked.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.create();
            assert!(culist2.is_err());
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        // Allocate id 2. Finishing list 0 alone frees nothing while list 2 is
        // still processing; finishing list 2 brings the count back to 2.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            let _ = copperlists.end_of_processing(0);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);

            let _ = copperlists.end_of_processing(2);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
        }
    }
2145
    /// Checks `next_cl_id`, `last_cl_id` and `peek` on a fresh manager and
    /// again after one CopperList has been created and written to.
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_sync_copperlists_accessors_passthrough_to_inner_manager() {
        let mut copperlists = SyncCopperListsManager::<IntMsgs, 2>::new(None).unwrap();

        // Fresh manager: both ids are 0 and there is nothing to peek at.
        assert_eq!(copperlists.next_cl_id(), 0);
        assert_eq!(copperlists.last_cl_id(), 0);
        assert!(copperlists.peek().is_none());

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 11;
            assert_eq!(culist.id, 0);
            assert_eq!(culist.get_state(), CopperListState::Initialized);
        }

        // After one create: the next id advanced and peek sees the written list.
        assert_eq!(copperlists.next_cl_id(), 1);
        assert_eq!(copperlists.last_cl_id(), 0);
        let peeked = copperlists.peek().unwrap();
        assert_eq!(peeked.id, 0);
        assert_eq!(peeked.msgs.0, 11);
        assert_eq!(peeked.get_state(), CopperListState::Initialized);
    }
2169
    /// With a single slot, a reclaimed CopperList comes back with a new id and
    /// `Initialized` state while its payload still holds the previous value.
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_sync_reclaimed_slot_reuse_reinitializes_state_but_preserves_payload_storage() {
        let mut copperlists = SyncCopperListsManager::<IntMsgs, 1>::new(None).unwrap();

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 41;
            culist.change_state(CopperListState::Processing);
            assert_eq!(culist.id, 0);
        }

        copperlists.end_of_processing(0).unwrap();
        assert_eq!(copperlists.available_copper_lists().unwrap(), 1);

        let reused = copperlists.create().unwrap();
        assert_eq!(reused.id, 1);
        assert_eq!(reused.get_state(), CopperListState::Initialized);
        // The payload was not reset on reuse.
        assert_eq!(reused.msgs.0, 41);
    }
2190
    /// Debug builds only: ending processing for an id that was never created
    /// (99) must panic with the expected message.
    #[cfg(all(not(feature = "async-cl-io"), debug_assertions))]
    #[test]
    #[should_panic(expected = "sync end_of_processing expected exactly one active CopperList #99")]
    fn test_sync_end_of_processing_unknown_id_panics_in_debug() {
        let mut copperlists = SyncCopperListsManager::<IntMsgs, 2>::new(None).unwrap();

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 10;
            culist.change_state(CopperListState::Processing);
        }
        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 20;
            culist.change_state(CopperListState::Processing);
        }

        let _ = copperlists.end_of_processing(99);
    }
2210
    /// Debug builds only: ending processing for a CopperList that is still
    /// `Initialized` (never moved to `Processing`) must panic.
    #[cfg(all(not(feature = "async-cl-io"), debug_assertions))]
    #[test]
    #[should_panic(expected = "sync end_of_processing expected CopperList #0 to be Processing")]
    fn test_sync_end_of_processing_wrong_state_panics_in_debug() {
        let mut copperlists = SyncCopperListsManager::<IntMsgs, 1>::new(None).unwrap();

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 10;
            assert_eq!(culist.get_state(), CopperListState::Initialized);
        }

        let _ = copperlists.end_of_processing(0);
    }
2225
    /// With lists 0 and 1 both processing, finishing list 0 logs nothing and
    /// frees nothing; finishing list 1 then logs both in the order `[1, 0]`
    /// and frees both slots.
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_sync_end_of_processing_serializes_done_suffix_from_newest_to_oldest() {
        let ids = Arc::new(Mutex::new(Vec::new()));
        let mut copperlists =
            SyncCopperListsManager::<IntMsgs, 2>::new(Some(Box::new(RecordingSyncWriter {
                ids: ids.clone(),
                last_log_bytes: 17,
                fail_on: None,
            })))
            .unwrap();

        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 10;
            culist.change_state(CopperListState::Processing);
        }
        {
            let culist = copperlists.create().unwrap();
            culist.msgs.0 = 20;
            culist.change_state(CopperListState::Processing);
        }

        // The older list finishes first: nothing is logged or freed yet.
        copperlists.end_of_processing(0).unwrap();
        assert!(ids.lock().unwrap().is_empty());
        assert_eq!(copperlists.available_copper_lists().unwrap(), 0);

        copperlists.end_of_processing(1).unwrap();

        assert_eq!(*ids.lock().unwrap(), vec![1, 0]);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
    }
2258
/// A successful sync flush must refresh the manager's byte counters.
///
/// The writer is configured with `last_log_bytes: 17` and 32 handle bytes are
/// recorded through the monitoring capture. Completing CopperList #0 is then
/// expected to log id 0, report `last_encoded_bytes == 17` and
/// `last_handle_bytes == 32`, and make the single slot available again.
#[cfg(not(feature = "async-cl-io"))]
#[test]
fn test_sync_end_of_processing_updates_logger_counters_on_success() {
    let logged_ids = Arc::new(Mutex::new(Vec::new()));
    let writer = RecordingSyncWriter {
        ids: logged_ids.clone(),
        last_log_bytes: 17,
        fail_on: None,
    };
    let mut manager =
        SyncCopperListsManager::<IntMsgs, 1>::new(Some(Box::new(writer))).unwrap();
    let io_cache = crate::monitoring::CuMsgIoCache::<1>::default();

    {
        let slot = manager.create().unwrap();
        slot.msgs.0 = 10;
        slot.change_state(CopperListState::Processing);
    }

    {
        // `capture` goes out of scope at the end of this block, before the flush below.
        let capture = crate::monitoring::start_copperlist_io_capture(&io_cache);
        capture.select_slot(0);
        crate::monitoring::record_payload_handle_bytes(32);
    }

    manager.end_of_processing(0).unwrap();

    let logged = logged_ids.lock().unwrap().clone();
    assert_eq!(logged, vec![0]);
    assert_eq!(manager.last_encoded_bytes, 17);
    assert_eq!(manager.last_handle_bytes, 32);
    assert_eq!(manager.available_copper_lists().unwrap(), 1);
}
2291
/// A writer failure must surface as an error and leave the slot in place.
///
/// The writer is set to fail on id 0. The test expects the error text to
/// mention "logger failed for CopperList #0", the id to have been offered to
/// the writer once, no slot to be freed, both byte counters to stay at 0, and
/// the peeked CopperList to be #0 in the `BeingSerialized` state.
#[cfg(not(feature = "async-cl-io"))]
#[test]
fn test_sync_end_of_processing_preserves_slot_on_logger_error() {
    let logged_ids = Arc::new(Mutex::new(Vec::new()));
    let failing_writer = RecordingSyncWriter {
        ids: logged_ids.clone(),
        last_log_bytes: 17,
        fail_on: Some(0),
    };
    let mut manager =
        SyncCopperListsManager::<IntMsgs, 1>::new(Some(Box::new(failing_writer))).unwrap();

    {
        let slot = manager.create().unwrap();
        slot.change_state(CopperListState::Processing);
    }

    let err = manager.end_of_processing(0).unwrap_err();

    let rendered = err.to_string();
    assert!(
        rendered.contains("logger failed for CopperList #0"),
        "unexpected error: {err}"
    );

    let attempted = logged_ids.lock().unwrap().clone();
    assert_eq!(attempted, vec![0]);
    assert_eq!(manager.available_copper_lists().unwrap(), 0);
    assert_eq!(manager.last_encoded_bytes, 0);
    assert_eq!(manager.last_handle_bytes, 0);

    let head = manager.peek().unwrap();
    assert_eq!(head.id, 0);
    assert_eq!(head.get_state(), CopperListState::BeingSerialized);
}
2324
/// The boxed sync completion path must also reject a CopperList that was never
/// moved to `Processing` (debug builds only).
///
/// A standalone CopperList with id 7 is built and handed over as-is; the call
/// is expected to panic with the message named in `should_panic`.
#[cfg(all(not(feature = "async-cl-io"), feature = "std", debug_assertions))]
#[test]
#[should_panic(
    expected = "sync boxed end_of_processing expected CopperList #7 to be Processing"
)]
fn test_sync_end_of_processing_boxed_wrong_state_panics_in_debug() {
    let mut manager = SyncCopperListsManager::<IntMsgs, 1>::new(None).unwrap();

    let payload = IntMsgs::default();
    let detached = Box::new(CopperList::new(7, payload));

    // The return value is discarded; reaching the panic is what the test checks.
    let _ = manager.end_of_processing_boxed(detached);
}
2336
/// Test writer that remembers the id of every CopperList passed to `log`.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[derive(Debug, Default)]
struct RecordingWriter {
    /// Ids seen by `log`, in call order, behind a shared handle.
    ids: Arc<Mutex<Vec<u64>>>,
}

#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl WriteStream<CopperList<Msgs>> for RecordingWriter {
    /// Records `culist.id`, then sleeps for 2 ms and returns `Ok(())`.
    fn log(&mut self, culist: &CopperList<Msgs>) -> CuResult<()> {
        {
            // The guard lives only inside this block, so the lock is released before sleeping.
            let mut recorded = self.ids.lock().unwrap();
            recorded.push(culist.id);
        }

        let pause = std::time::Duration::from_millis(2);
        std::thread::sleep(pause);
        Ok(())
    }
}
2351
/// Four CopperLists completed one after another must reach the writer in id order.
///
/// Each iteration creates a slot, checks its id, marks it `Processing` and
/// completes it. After `finish_pending` all four slots are expected to be
/// available and the writer is expected to have recorded `[0, 1, 2, 3]`.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[test]
fn test_async_copperlists_manager_flushes_in_order() {
    let logged_ids = Arc::new(Mutex::new(Vec::new()));
    let writer = RecordingWriter {
        ids: logged_ids.clone(),
    };
    let mut manager = CopperListsManager::<Msgs, 4>::new(Some(Box::new(writer))).unwrap();

    for expected_id in 0..4 {
        let slot = manager.create().unwrap();
        assert_eq!(slot.id, expected_id);
        slot.change_state(CopperListState::Processing);
        manager.end_of_processing(expected_id).unwrap();
    }

    manager.finish_pending().unwrap();

    let free_slots = manager.available_copper_lists().unwrap();
    assert_eq!(free_slots, 4);
    let logged = logged_ids.lock().unwrap().clone();
    assert_eq!(logged, vec![0, 1, 2, 3]);
}
2372
/// Re-creating a CopperList in a reclaimed slot resets id and state but not the payload.
///
/// With a single slot and no writer, CopperList #0 is filled with 41 and
/// completed. The next `create` is expected to return id 1 in the
/// `Initialized` state while `msgs.0` still reads 41.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[test]
fn test_async_create_reinitializes_reclaimed_slot_state_but_preserves_payload_storage() {
    let mut manager = CopperListsManager::<IntMsgs, 1>::new(None).unwrap();

    {
        let first = manager.create().unwrap();
        assert_eq!(first.id, 0);
        assert_eq!(first.get_state(), CopperListState::Initialized);
        first.msgs.0 = 41;
        first.change_state(CopperListState::Processing);
    }

    manager.end_of_processing(0).unwrap();
    let free_slots = manager.available_copper_lists().unwrap();
    assert_eq!(free_slots, 1);

    let second = manager.create().unwrap();
    assert_eq!(second.id, 1);
    assert_eq!(second.get_state(), CopperListState::Initialized);
    // The payload written through the previous CopperList is still visible.
    assert_eq!(second.msgs.0, 41);
}
2394
/// The async `end_of_processing` path must reject a CopperList that is still
/// `Initialized` (debug builds only).
///
/// The slot is created and inspected but never switched to `Processing`; the
/// final call is expected to panic with the message named in `should_panic`.
#[cfg(all(feature = "std", feature = "async-cl-io", debug_assertions))]
#[test]
#[should_panic(expected = "async end_of_processing expected CopperList #0 to be Processing")]
fn test_async_end_of_processing_wrong_state_panics_in_debug() {
    let mut manager = CopperListsManager::<IntMsgs, 1>::new(None).unwrap();

    let slot = manager.create().unwrap();
    assert_eq!(slot.id, 0);
    let state = slot.get_state();
    assert_eq!(state, CopperListState::Initialized);

    // The return value is discarded; reaching the panic is what the test checks.
    let _ = manager.end_of_processing(0);
}
2407
/// The sink's inputs are expected in the order the edges were connected.
///
/// Source `b` (node id 1) is connected to the sink before source `a` (node id 0),
/// giving it edge id 0. The sink step is then expected to list the `src2_type`
/// input first and the `src1_type` input second.
#[test]
fn test_runtime_task_input_order() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let first_src = graph.add_node(Node::new("a", "Source1")).unwrap();
    let second_src = graph.add_node(Node::new("b", "Source2")).unwrap();
    let sink = graph.add_node(Node::new("c", "Sink")).unwrap();

    assert_eq!(first_src, 0);
    assert_eq!(second_src, 1);

    let first_src_msg = "src1_type";
    let second_src_msg = "src2_type";
    // The second source is wired up first on purpose.
    graph.connect(second_src, sink, second_src_msg).unwrap();
    graph.connect(first_src, sink, first_src_msg).unwrap();

    let first_src_edge = graph.get_src_edges(first_src).unwrap().first().copied().unwrap();
    let second_src_edge = graph.get_src_edges(second_src).unwrap().first().copied().unwrap();
    assert_eq!(first_src_edge, 1);
    assert_eq!(second_src_edge, 0);

    let runtime = compute_runtime_plan(graph).unwrap();
    let sink_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == sink)
        .unwrap();

    let inputs = &sink_step.input_msg_indices_types;
    assert_eq!(inputs[0].msg_type, second_src_msg);
    assert_eq!(inputs[1].msg_type, first_src_msg);
}
2447
/// Output ports of a fan-out source are deduplicated by message type and keep
/// first-seen order.
///
/// The source is connected with `msg::A`, `msg::B`, `msg::A`, `msg::C` (in that
/// order) to four sinks. Its output pack is expected to be `[A, B, C]`, and the
/// sinks are expected to read from ports 0, 1, 0 and 2 respectively.
#[test]
fn test_runtime_output_ports_unique_ordered() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let source = graph.add_node(Node::new("src", "Source")).unwrap();
    let sink_a = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
    let sink_b = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
    let sink_a2 = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
    let sink_c = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();

    graph.connect(source, sink_a, "msg::A").unwrap();
    graph.connect(source, sink_b, "msg::B").unwrap();
    graph.connect(source, sink_a2, "msg::A").unwrap();
    graph.connect(source, sink_c, "msg::C").unwrap();

    let runtime = compute_runtime_plan(graph).unwrap();

    let source_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == source)
        .unwrap();

    let produced = source_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(produced.msg_types, vec!["msg::A", "msg::B", "msg::C"]);

    let sink_a_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == sink_a)
        .unwrap();
    let sink_b_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == sink_b)
        .unwrap();
    let sink_a2_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == sink_a2)
        .unwrap();
    let sink_c_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == sink_c)
        .unwrap();

    // Both `msg::A` consumers are expected to share port 0.
    assert_eq!(sink_a_step.input_msg_indices_types[0].src_port, 0);
    assert_eq!(sink_b_step.input_msg_indices_types[0].src_port, 1);
    assert_eq!(sink_a2_step.input_msg_indices_types[0].src_port, 0);
    assert_eq!(sink_c_step.input_msg_indices_types[0].src_port, 2);
}
2514
/// Fanning the same message type out to two sinks yields a single output port.
///
/// Both connections from the source carry `i32`; the source step's output pack
/// is expected to contain exactly `["i32"]`.
#[test]
fn test_runtime_output_ports_fanout_single() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let source = graph.add_node(Node::new("src", "Source")).unwrap();
    let sink_a = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
    let sink_b = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();

    graph.connect(source, sink_a, "i32").unwrap();
    graph.connect(source, sink_b, "i32").unwrap();

    let runtime = compute_runtime_plan(graph).unwrap();
    let source_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == source)
        .unwrap();

    let produced = source_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(produced.msg_types, vec!["i32"]);
}
2539
/// NC outputs registered on a node show up in its output pack.
///
/// The source is connected to the sink with `msg::A` and then given an NC
/// output `msg::B` via `add_nc_output`. The output pack is expected to be
/// `[A, B]`, with the sink reading from port 0.
#[test]
fn test_runtime_output_ports_include_nc_outputs() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let source = graph.add_node(Node::new("src", "Source")).unwrap();
    let sink = graph.add_node(Node::new("dst", "Sink")).unwrap();
    graph.connect(source, sink, "msg::A").unwrap();

    let source_node = graph.get_node_mut(source).expect("missing source node");
    source_node.add_nc_output("msg::B", usize::MAX);

    let runtime = compute_runtime_plan(graph).unwrap();
    let source_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == source)
        .unwrap();
    let sink_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == sink)
        .unwrap();

    let produced = source_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(produced.msg_types, vec!["msg::A", "msg::B"]);
    assert_eq!(sink_step.input_msg_indices_types[0].src_port, 0);
}
2574
/// A task with an input and only an `__nc__` output is still planned as a regular task.
///
/// In the RON config, `regular` receives `msg::A` from `src` and its only
/// outgoing connection targets `__nc__` with `msg::B`. The planned step is
/// expected to have `CuTaskType::Regular` and an output pack of `["msg::B"]`.
#[test]
fn test_runtime_plan_infers_regular_task_when_outputs_are_nc_only() {
    let ron_config = r#"(
        tasks: [
            (id: "src", type: "a"),
            (id: "regular", type: "b"),
        ],
        cnx: [
            (src: "src", dst: "regular", msg: "msg::A"),
            (src: "regular", dst: "__nc__", msg: "msg::B"),
        ]
    )"#;
    let config = CuConfig::deserialize_ron(ron_config).unwrap();
    let graph = config.get_graph(None).unwrap();
    let regular = graph.get_node_id_by_name("regular").unwrap();

    let runtime = compute_runtime_plan(graph).unwrap();
    let planned = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == regular)
        .unwrap();

    assert_eq!(planned.task_type, CuTaskType::Regular);
    let produced = planned.output_msg_pack.as_ref().unwrap();
    assert_eq!(produced.msg_types, vec!["msg::B"]);
}
2607
/// An `__nc__` connection declared first takes the first output port.
///
/// The RON config lists `src -> __nc__` (`msg::A`) before `src -> sink`
/// (`msg::B`). The output pack is expected to be `[A, B]`, so the sink is
/// expected to read from port 1.
#[test]
fn test_runtime_output_ports_respect_connection_order_with_nc() {
    let ron_config = r#"(
        tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
        cnx: [
            (src: "src", dst: "__nc__", msg: "msg::A"),
            (src: "src", dst: "sink", msg: "msg::B"),
        ]
    )"#;
    let config = CuConfig::deserialize_ron(ron_config).unwrap();
    let graph = config.get_graph(None).unwrap();
    let source = graph.get_node_id_by_name("src").unwrap();
    let sink = graph.get_node_id_by_name("sink").unwrap();

    let runtime = compute_runtime_plan(graph).unwrap();
    let source_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == source)
        .unwrap();
    let sink_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == sink)
        .unwrap();

    let produced = source_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(produced.msg_types, vec!["msg::A", "msg::B"]);
    assert_eq!(sink_step.input_msg_indices_types[0].src_port, 1);
}
2644
/// Same port-ordering expectation as the in-memory variant, but the config is
/// written to a temporary file and loaded through `read_configuration`.
///
/// `src -> __nc__` (`msg::A`) is listed before `src -> sink` (`msg::B`); the
/// output pack is expected to be `[A, B]` and the sink is expected to read
/// from port 1.
#[cfg(feature = "std")]
#[test]
fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
    let ron_config = r#"(
        tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
        cnx: [
            (src: "src", dst: "__nc__", msg: "msg::A"),
            (src: "src", dst: "sink", msg: "msg::B"),
        ]
    )"#;
    // `config_file` must stay alive until the configuration has been read back.
    let config_file = tempfile::NamedTempFile::new().unwrap();
    let config_path = config_file.path();
    std::fs::write(config_path, ron_config).unwrap();
    let config = crate::config::read_configuration(config_path.to_str().unwrap()).unwrap();
    let graph = config.get_graph(None).unwrap();
    let source = graph.get_node_id_by_name("src").unwrap();
    let sink = graph.get_node_id_by_name("sink").unwrap();

    let runtime = compute_runtime_plan(graph).unwrap();
    let source_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == source)
        .unwrap();
    let sink_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == sink)
        .unwrap();

    let produced = source_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(produced.msg_types, vec!["msg::A", "msg::B"]);
    assert_eq!(sink_step.input_msg_indices_types[0].src_port, 1);
}
2684
/// Port ordering with an `__nc__` connection also holds for primitive message types.
///
/// `src -> __nc__` carries `i32` and is listed before `src -> sink`, which
/// carries `bool`. The output pack is expected to be `["i32", "bool"]` and the
/// sink is expected to read from port 1.
#[test]
fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
    let ron_config = r#"(
        tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
        cnx: [
            (src: "src", dst: "__nc__", msg: "i32"),
            (src: "src", dst: "sink", msg: "bool"),
        ]
    )"#;
    let config = CuConfig::deserialize_ron(ron_config).unwrap();
    let graph = config.get_graph(None).unwrap();
    let source = graph.get_node_id_by_name("src").unwrap();
    let sink = graph.get_node_id_by_name("sink").unwrap();

    let runtime = compute_runtime_plan(graph).unwrap();
    let source_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == source)
        .unwrap();
    let sink_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == sink)
        .unwrap();

    let produced = source_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(produced.msg_types, vec!["i32", "bool"]);
    assert_eq!(sink_step.input_msg_indices_types[0].src_port, 1);
}
2721
/// Diamond topology, ordering 1: `cam0 -> broadcast` is connected before `cam0 -> inf0`.
///
/// `cam0` feeds both `inf0` and `broadcast`, and `inf0` feeds `broadcast`. The
/// broadcast step is expected to list its `i32` input first and its `f32`
/// input second.
#[test]
fn test_runtime_plan_diamond_case1() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let cam0 = graph
        .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
        .unwrap();
    let inf0 = graph
        .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
        .unwrap();
    let broadcast = graph
        .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
        .unwrap();

    graph.connect(cam0, broadcast, "i32").unwrap();
    graph.connect(cam0, inf0, "i32").unwrap();
    graph.connect(inf0, broadcast, "f32").unwrap();

    // `get_src_edges(cam0)` is expected to list edge 1 first and edge 0 second.
    let first_listed_edge = graph.get_src_edges(cam0).unwrap().first().copied().unwrap();
    let second_listed_edge = graph.get_src_edges(cam0).unwrap()[1];

    assert_eq!(second_listed_edge, 0);
    assert_eq!(first_listed_edge, 1);

    let runtime = compute_runtime_plan(graph).unwrap();
    let broadcast_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == broadcast)
        .unwrap();

    let inputs = &broadcast_step.input_msg_indices_types;
    assert_eq!(inputs[0].msg_type, "i32");
    assert_eq!(inputs[1].msg_type, "f32");
}
2761
/// Diamond topology, ordering 2: `cam0 -> inf0` is connected before `cam0 -> broadcast`.
///
/// `cam0` feeds both `inf0` and `broadcast`, and `inf0` feeds `broadcast`. The
/// broadcast step is expected to list its `i32` input first and its `f32`
/// input second.
#[test]
fn test_runtime_plan_diamond_case2() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let cam0 = graph
        .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
        .unwrap();
    let inf0 = graph
        .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
        .unwrap();
    let broadcast = graph
        .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
        .unwrap();

    graph.connect(cam0, inf0, "i32").unwrap();
    graph.connect(cam0, broadcast, "i32").unwrap();
    graph.connect(inf0, broadcast, "f32").unwrap();

    // `get_src_edges(cam0)` is expected to list edge 1 first and edge 0 second.
    let first_listed_edge = graph.get_src_edges(cam0).unwrap().first().copied().unwrap();
    let second_listed_edge = graph.get_src_edges(cam0).unwrap()[1];

    assert_eq!(second_listed_edge, 0);
    assert_eq!(first_listed_edge, 1);

    let runtime = compute_runtime_plan(graph).unwrap();
    let broadcast_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            _ => None,
        })
        .find(|step| step.node_id == broadcast)
        .unwrap();

    let inputs = &broadcast_step.input_msg_indices_types;
    assert_eq!(inputs[0].msg_type, "i32");
    assert_eq!(inputs[1].msg_type, "f32");
}
2801}