1use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9#[cfg(feature = "std")]
10use crate::monitoring::ExecutionProbeHandle;
11#[cfg(feature = "std")]
12use crate::monitoring::MonitorExecutionProbe;
13use crate::monitoring::{
14 ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
15 ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
16};
17use crate::resource::ResourceManager;
18use compact_str::CompactString;
19use cu29_clock::{ClockProvider, CuTime, RobotClock};
20use cu29_traits::CuResult;
21use cu29_traits::WriteStream;
22use cu29_traits::{CopperListTuple, CuError};
23
24#[cfg(target_os = "none")]
25#[allow(unused_imports)]
26use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
27#[cfg(target_os = "none")]
28#[allow(unused_imports)]
29use cu29_log_derive::info;
30#[cfg(target_os = "none")]
31#[allow(unused_imports)]
32use cu29_log_runtime::log;
33#[cfg(all(target_os = "none", debug_assertions))]
34#[allow(unused_imports)]
35use cu29_log_runtime::log_debug_mode;
36#[cfg(target_os = "none")]
37#[allow(unused_imports)]
38use cu29_value::to_value;
39
40use alloc::boxed::Box;
41use alloc::collections::{BTreeSet, VecDeque};
42use alloc::format;
43use alloc::string::{String, ToString};
44use alloc::vec::Vec;
45use bincode::enc::EncoderImpl;
46use bincode::enc::write::{SizeWriter, SliceWriter};
47use bincode::error::EncodeError;
48use bincode::{Decode, Encode};
49use core::fmt::Result as FmtResult;
50use core::fmt::{Debug, Formatter};
51
52#[cfg(feature = "std")]
53use cu29_log_runtime::LoggerRuntime;
54#[cfg(feature = "std")]
55use cu29_unifiedlog::UnifiedLoggerWrite;
56#[cfg(feature = "std")]
57use std::sync::{Arc, Mutex};
58
59#[cfg(feature = "std")]
61pub struct CopperContext {
62 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
63 pub logger_runtime: LoggerRuntime,
64 pub clock: RobotClock,
65}
66
67#[inline]
75pub fn perf_now(_clock: &RobotClock) -> CuTime {
76 #[cfg(all(feature = "std", feature = "sysclock-perf"))]
77 {
78 static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
79 return PERF_CLOCK.get_or_init(RobotClock::new).now();
80 }
81
82 #[allow(unreachable_code)]
83 _clock.now()
84}
85
86pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
88 pub inner: CuListsManager<P, NBCL>,
89 pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
91 pub last_encoded_bytes: u64,
93}
94
95impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
96 pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
97 let mut is_top = true;
98 let mut nb_done = 0;
99 for cl in self.inner.iter_mut() {
100 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
101 cl.change_state(CopperListState::DoneProcessing);
102 }
103 if is_top && cl.get_state() == CopperListState::DoneProcessing {
104 if let Some(logger) = &mut self.logger {
105 cl.change_state(CopperListState::BeingSerialized);
106 logger.log(cl)?;
107 self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
108 }
109 cl.change_state(CopperListState::Free);
110 nb_done += 1;
111 } else {
112 is_top = false;
113 }
114 }
115 for _ in 0..nb_done {
116 let _ = self.inner.pop();
117 }
118 Ok(())
119 }
120
121 pub fn available_copper_lists(&self) -> usize {
122 NBCL - self.inner.len()
123 }
124}
125
126pub struct KeyFramesManager {
128 inner: KeyFrame,
130
131 forced_timestamp: Option<CuTime>,
133
134 locked: bool,
136
137 logger: Option<Box<dyn WriteStream<KeyFrame>>>,
139
140 keyframe_interval: u32,
142
143 pub last_encoded_bytes: u64,
145}
146
147impl KeyFramesManager {
148 fn is_keyframe(&self, culistid: u64) -> bool {
149 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
150 }
151
152 pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
153 if self.is_keyframe(culistid) {
154 if self.locked && self.inner.culistid == culistid {
156 return;
157 }
158 let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
159 self.inner.reset(culistid, ts);
160 self.locked = false;
161 }
162 }
163
164 #[cfg(feature = "std")]
166 pub fn set_forced_timestamp(&mut self, ts: CuTime) {
167 self.forced_timestamp = Some(ts);
168 }
169
170 pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
171 if self.is_keyframe(culistid) {
172 if self.locked {
173 return Ok(0);
175 }
176 if self.inner.culistid != culistid {
177 return Err(CuError::from(format!(
178 "Freezing task for culistid {} but current keyframe is {}",
179 culistid, self.inner.culistid
180 )));
181 }
182 self.inner
183 .add_frozen_task(task)
184 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
185 } else {
186 Ok(0)
187 }
188 }
189
190 pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
192 self.freeze_task(culistid, item)
193 }
194
195 pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
196 if self.is_keyframe(culistid) {
197 let logger = self.logger.as_mut().unwrap();
198 logger.log(&self.inner)?;
199 self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
200 self.locked = false;
202 Ok(())
203 } else {
204 self.last_encoded_bytes = 0;
206 Ok(())
207 }
208 }
209
210 #[cfg(feature = "std")]
212 pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
213 self.inner = keyframe.clone();
214 self.forced_timestamp = Some(keyframe.timestamp);
215 self.locked = true;
216 }
217}
218
219pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
223 pub clock: RobotClock, pub tasks: CT,
228
229 pub bridges: CB,
231
232 pub resources: ResourceManager,
234
235 pub monitor: M,
237
238 #[cfg(feature = "std")]
244 pub execution_probe: ExecutionProbeHandle,
245 #[cfg(not(feature = "std"))]
246 pub execution_probe: RuntimeExecutionProbe,
247
248 pub copperlists_manager: CopperListsManager<P, NBCL>,
250
251 pub keyframes_manager: KeyFramesManager,
253
254 pub runtime_config: RuntimeConfig,
256}
257
258impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
260 ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
261{
262 fn get_clock(&self) -> RobotClock {
263 self.clock.clone()
264 }
265}
266
267#[derive(Clone, Encode, Decode)]
271pub struct KeyFrame {
272 pub culistid: u64,
274 pub timestamp: CuTime,
276 pub serialized_tasks: Vec<u8>,
278}
279
280impl KeyFrame {
281 fn new() -> Self {
282 KeyFrame {
283 culistid: 0,
284 timestamp: CuTime::default(),
285 serialized_tasks: Vec::new(),
286 }
287 }
288
289 fn reset(&mut self, culistid: u64, timestamp: CuTime) {
291 self.culistid = culistid;
292 self.timestamp = timestamp;
293 self.serialized_tasks.clear();
294 }
295
296 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
298 let cfg = bincode::config::standard();
299 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
300 BincodeAdapter(task).encode(&mut sizer)?;
301 let need = sizer.into_writer().bytes_written as usize;
302
303 let start = self.serialized_tasks.len();
304 self.serialized_tasks.resize(start + need, 0);
305 let mut enc =
306 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
307 BincodeAdapter(task).encode(&mut enc)?;
308 Ok(need)
309 }
310}
311
312#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
314pub enum RuntimeLifecycleConfigSource {
315 ProgrammaticOverride,
316 ExternalFile,
317 BundledDefault,
318}
319
320#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
322pub struct RuntimeLifecycleStackInfo {
323 pub app_name: String,
324 pub app_version: String,
325 pub git_commit: Option<String>,
326 pub git_dirty: Option<bool>,
327}
328
329#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
331pub enum RuntimeLifecycleEvent {
332 Instantiated {
333 config_source: RuntimeLifecycleConfigSource,
334 effective_config_ron: String,
335 stack: RuntimeLifecycleStackInfo,
336 },
337 MissionStarted {
338 mission: String,
339 },
340 MissionStopped {
341 mission: String,
342 reason: String,
345 },
346 Panic {
348 message: String,
349 file: Option<String>,
350 line: Option<u32>,
351 column: Option<u32>,
352 },
353 ShutdownCompleted,
354}
355
356#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
358pub struct RuntimeLifecycleRecord {
359 pub timestamp: CuTime,
360 pub event: RuntimeLifecycleEvent,
361}
362
363impl<
364 CT,
365 CB,
366 P: CopperListTuple + CuListZeroedInit + Default + 'static,
367 M: CuMonitor,
368 const NBCL: usize,
369> CuRuntime<CT, CB, P, M, NBCL>
370{
371 #[inline]
375 pub fn record_execution_marker(&self, marker: ExecutionMarker) {
376 self.execution_probe.record(marker);
377 }
378
379 #[allow(clippy::too_many_arguments)]
381 #[cfg(feature = "std")]
382 pub fn new(
383 clock: RobotClock,
384 config: &CuConfig,
385 mission: &str,
386 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
387 tasks_instanciator: impl for<'c> Fn(
388 Vec<Option<&'c ComponentConfig>>,
389 &mut ResourceManager,
390 ) -> CuResult<CT>,
391 monitored_components: &'static [MonitorComponentMetadata],
392 culist_component_mapping: &'static [ComponentId],
393 monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
394 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
395 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
396 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
397 ) -> CuResult<Self> {
398 let resources = resources_instanciator(config)?;
399 Self::new_with_resources(
400 clock,
401 config,
402 mission,
403 resources,
404 tasks_instanciator,
405 monitored_components,
406 culist_component_mapping,
407 monitor_instanciator,
408 bridges_instanciator,
409 copperlists_logger,
410 keyframes_logger,
411 )
412 }
413
414 #[allow(clippy::too_many_arguments)]
415 #[cfg(feature = "std")]
416 pub fn new_with_resources(
417 clock: RobotClock,
418 config: &CuConfig,
419 mission: &str,
420 mut resources: ResourceManager,
421 tasks_instanciator: impl for<'c> Fn(
422 Vec<Option<&'c ComponentConfig>>,
423 &mut ResourceManager,
424 ) -> CuResult<CT>,
425 monitored_components: &'static [MonitorComponentMetadata],
426 culist_component_mapping: &'static [ComponentId],
427 monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
428 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
429 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
430 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
431 ) -> CuResult<Self> {
432 let graph = config.get_graph(Some(mission))?;
433 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
434 .get_all_nodes()
435 .iter()
436 .map(|(_, node)| node.get_instance_config())
437 .collect();
438
439 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
440 let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
441 let monitor_metadata = CuMonitoringMetadata::new(
442 CompactString::from(mission),
443 monitored_components,
444 culist_component_mapping,
445 CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
446 build_monitor_topology(config, mission)?,
447 None,
448 )?;
449 let monitor_runtime =
450 CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
451 let monitor = monitor_instanciator(config, monitor_metadata, monitor_runtime);
452 let bridges = bridges_instanciator(config, &mut resources)?;
453
454 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
455 Some(logging_config) if logging_config.enable_task_logging => (
456 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
457 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
458 logging_config.keyframe_interval.unwrap(), ),
460 Some(_) => (None, None, 0), None => (
462 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
464 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
465 DEFAULT_KEYFRAME_INTERVAL,
466 ),
467 };
468
469 let copperlists_manager = CopperListsManager {
470 inner: CuListsManager::new(),
471 logger: copperlists_logger,
472 last_encoded_bytes: 0,
473 };
474 #[cfg(target_os = "none")]
475 {
476 let cl_size = core::mem::size_of::<CopperList<P>>();
477 let total_bytes = cl_size.saturating_mul(NBCL);
478 info!(
479 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
480 NBCL, cl_size, total_bytes
481 );
482 }
483
484 let keyframes_manager = KeyFramesManager {
485 inner: KeyFrame::new(),
486 logger: keyframes_logger,
487 keyframe_interval,
488 last_encoded_bytes: 0,
489 forced_timestamp: None,
490 locked: false,
491 };
492
493 let runtime_config = config.runtime.clone().unwrap_or_default();
494
495 let runtime = Self {
496 tasks,
497 bridges,
498 resources,
499 monitor,
500 execution_probe,
501 clock,
502 copperlists_manager,
503 keyframes_manager,
504 runtime_config,
505 };
506
507 Ok(runtime)
508 }
509
510 #[allow(clippy::too_many_arguments)]
511 #[cfg(not(feature = "std"))]
512 pub fn new(
513 clock: RobotClock,
514 config: &CuConfig,
515 mission: &str,
516 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
517 tasks_instanciator: impl for<'c> Fn(
518 Vec<Option<&'c ComponentConfig>>,
519 &mut ResourceManager,
520 ) -> CuResult<CT>,
521 monitored_components: &'static [MonitorComponentMetadata],
522 culist_component_mapping: &'static [ComponentId],
523 monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
524 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
525 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
526 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
527 ) -> CuResult<Self> {
528 #[cfg(target_os = "none")]
529 info!("CuRuntime::new: resources instanciator");
530 let resources = resources_instanciator(config)?;
531 Self::new_with_resources(
532 clock,
533 config,
534 mission,
535 resources,
536 tasks_instanciator,
537 monitored_components,
538 culist_component_mapping,
539 monitor_instanciator,
540 bridges_instanciator,
541 copperlists_logger,
542 keyframes_logger,
543 )
544 }
545
546 #[allow(clippy::too_many_arguments)]
547 #[cfg(not(feature = "std"))]
548 pub fn new_with_resources(
549 clock: RobotClock,
550 config: &CuConfig,
551 mission: &str,
552 mut resources: ResourceManager,
553 tasks_instanciator: impl for<'c> Fn(
554 Vec<Option<&'c ComponentConfig>>,
555 &mut ResourceManager,
556 ) -> CuResult<CT>,
557 monitored_components: &'static [MonitorComponentMetadata],
558 culist_component_mapping: &'static [ComponentId],
559 monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
560 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
561 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
562 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
563 ) -> CuResult<Self> {
564 #[cfg(target_os = "none")]
565 info!("CuRuntime::new: get graph");
566 let graph = config.get_graph(Some(mission))?;
567 #[cfg(target_os = "none")]
568 info!("CuRuntime::new: graph ok");
569 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
570 .get_all_nodes()
571 .iter()
572 .map(|(_, node)| node.get_instance_config())
573 .collect();
574
575 #[cfg(target_os = "none")]
576 info!("CuRuntime::new: tasks instanciator");
577 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
578
579 #[cfg(target_os = "none")]
580 info!("CuRuntime::new: monitor instanciator");
581 let monitor_metadata = CuMonitoringMetadata::new(
582 CompactString::from(mission),
583 monitored_components,
584 culist_component_mapping,
585 CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
586 build_monitor_topology(config, mission)?,
587 None,
588 )?;
589 let monitor_runtime = CuMonitoringRuntime::unavailable();
590 let monitor = monitor_instanciator(config, monitor_metadata, monitor_runtime);
591 let execution_probe = RuntimeExecutionProbe::default();
592 #[cfg(target_os = "none")]
593 info!("CuRuntime::new: monitor instanciator ok");
594 #[cfg(target_os = "none")]
595 info!("CuRuntime::new: bridges instanciator");
596 let bridges = bridges_instanciator(config, &mut resources)?;
597
598 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
599 Some(logging_config) if logging_config.enable_task_logging => (
600 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
601 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
602 logging_config.keyframe_interval.unwrap(), ),
604 Some(_) => (None, None, 0), None => (
606 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
608 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
609 DEFAULT_KEYFRAME_INTERVAL,
610 ),
611 };
612
613 let copperlists_manager = CopperListsManager {
614 inner: CuListsManager::new(),
615 logger: copperlists_logger,
616 last_encoded_bytes: 0,
617 };
618 #[cfg(target_os = "none")]
619 {
620 let cl_size = core::mem::size_of::<CopperList<P>>();
621 let total_bytes = cl_size.saturating_mul(NBCL);
622 info!(
623 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
624 NBCL, cl_size, total_bytes
625 );
626 }
627
628 let keyframes_manager = KeyFramesManager {
629 inner: KeyFrame::new(),
630 logger: keyframes_logger,
631 keyframe_interval,
632 last_encoded_bytes: 0,
633 forced_timestamp: None,
634 locked: false,
635 };
636
637 let runtime_config = config.runtime.clone().unwrap_or_default();
638
639 let runtime = Self {
640 tasks,
641 bridges,
642 resources,
643 monitor,
644 execution_probe,
645 clock,
646 copperlists_manager,
647 keyframes_manager,
648 runtime_config,
649 };
650
651 Ok(runtime)
652 }
653}
654
655#[derive(Debug, PartialEq, Eq, Clone, Copy)]
660pub enum CuTaskType {
661 Source,
662 Regular,
663 Sink,
664}
665
666#[derive(Debug, Clone)]
667pub struct CuOutputPack {
668 pub culist_index: u32,
669 pub msg_types: Vec<String>,
670}
671
672#[derive(Debug, Clone)]
673pub struct CuInputMsg {
674 pub culist_index: u32,
675 pub msg_type: String,
676 pub src_port: usize,
677 pub edge_id: usize,
678}
679
680pub struct CuExecutionStep {
682 pub node_id: NodeId,
684 pub node: Node,
686 pub task_type: CuTaskType,
688
689 pub input_msg_indices_types: Vec<CuInputMsg>,
691
692 pub output_msg_pack: Option<CuOutputPack>,
694}
695
696impl Debug for CuExecutionStep {
697 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
698 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
699 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
700 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
701 f.write_str(
702 format!(
703 " input_msg_types: {:?}\n",
704 self.input_msg_indices_types
705 )
706 .as_str(),
707 )?;
708 f.write_str(format!(" output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
709 Ok(())
710 }
711}
712
713pub struct CuExecutionLoop {
718 pub steps: Vec<CuExecutionUnit>,
719 pub loop_count: Option<u32>,
720}
721
722impl Debug for CuExecutionLoop {
723 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
724 f.write_str("CuExecutionLoop:\n")?;
725 for step in &self.steps {
726 match step {
727 CuExecutionUnit::Step(step) => {
728 step.fmt(f)?;
729 }
730 CuExecutionUnit::Loop(l) => {
731 l.fmt(f)?;
732 }
733 }
734 }
735
736 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
737 Ok(())
738 }
739}
740
741#[derive(Debug)]
743pub enum CuExecutionUnit {
744 Step(Box<CuExecutionStep>),
745 Loop(CuExecutionLoop),
746}
747
748fn find_output_pack_from_nodeid(
749 node_id: NodeId,
750 steps: &Vec<CuExecutionUnit>,
751) -> Option<CuOutputPack> {
752 for step in steps {
753 match step {
754 CuExecutionUnit::Loop(loop_unit) => {
755 if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
756 return Some(output_pack);
757 }
758 }
759 CuExecutionUnit::Step(step) => {
760 if step.node_id == node_id {
761 return step.output_msg_pack.clone();
762 }
763 }
764 }
765 }
766 None
767}
768
769pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
770 if graph.incoming_neighbor_count(node_id) == 0 {
771 CuTaskType::Source
772 } else if graph.outgoing_neighbor_count(node_id) == 0 {
773 CuTaskType::Sink
774 } else {
775 CuTaskType::Regular
776 }
777}
778
779fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
782 input_msg_indices_types.sort_by_key(|input| input.edge_id);
783}
784
785fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
786 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
787 edge_ids.sort();
788
789 let mut msg_order: Vec<(usize, String)> = Vec::new();
790 let mut record_msg = |msg: String, order: usize| {
791 if let Some((existing_order, _)) = msg_order
792 .iter_mut()
793 .find(|(_, existing_msg)| *existing_msg == msg)
794 {
795 if order < *existing_order {
796 *existing_order = order;
797 }
798 return;
799 }
800 msg_order.push((order, msg));
801 };
802
803 for edge_id in edge_ids {
804 if let Some(edge) = graph.edge(edge_id) {
805 let order = if edge.order == usize::MAX {
806 edge_id
807 } else {
808 edge.order
809 };
810 record_msg(edge.msg.clone(), order);
811 }
812 }
813 if let Some(node) = graph.get_node(node_id) {
814 for (msg, order) in node.nc_outputs_with_order() {
815 record_msg(msg.clone(), order);
816 }
817 }
818
819 msg_order.sort_by(|(order_a, msg_a), (order_b, msg_b)| {
820 order_a.cmp(order_b).then_with(|| msg_a.cmp(msg_b))
821 });
822 msg_order.into_iter().map(|(_, msg)| msg).collect()
823}
824fn plan_tasks_tree_branch(
826 graph: &CuGraph,
827 mut next_culist_output_index: u32,
828 starting_point: NodeId,
829 plan: &mut Vec<CuExecutionUnit>,
830) -> (u32, bool) {
831 #[cfg(all(feature = "std", feature = "macro_debug"))]
832 eprintln!("-- starting branch from node {starting_point}");
833
834 let mut handled = false;
835
836 for id in graph.bfs_nodes(starting_point) {
837 let node_ref = graph.get_node(id).unwrap();
838 #[cfg(all(feature = "std", feature = "macro_debug"))]
839 eprintln!(" Visiting node: {node_ref:?}");
840
841 let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
842 let output_msg_pack: Option<CuOutputPack>;
843 let task_type = find_task_type_for_id(graph, id);
844
845 match task_type {
846 CuTaskType::Source => {
847 #[cfg(all(feature = "std", feature = "macro_debug"))]
848 eprintln!(" → Source node, assign output index {next_culist_output_index}");
849 let msg_types = collect_output_msg_types(graph, id);
850 if msg_types.is_empty() {
851 panic!(
852 "Source node '{}' has no outgoing connections",
853 node_ref.get_id()
854 );
855 }
856 output_msg_pack = Some(CuOutputPack {
857 culist_index: next_culist_output_index,
858 msg_types,
859 });
860 next_culist_output_index += 1;
861 }
862 CuTaskType::Sink => {
863 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
864 edge_ids.sort();
865 #[cfg(all(feature = "std", feature = "macro_debug"))]
866 eprintln!(" → Sink with incoming edges: {edge_ids:?}");
867 for edge_id in edge_ids {
868 let edge = graph
869 .edge(edge_id)
870 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
871 let pid = graph
872 .get_node_id_by_name(edge.src.as_str())
873 .unwrap_or_else(|| {
874 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
875 });
876 let output_pack = find_output_pack_from_nodeid(pid, plan);
877 if let Some(output_pack) = output_pack {
878 #[cfg(all(feature = "std", feature = "macro_debug"))]
879 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
880 let msg_type = edge.msg.as_str();
881 let src_port = output_pack
882 .msg_types
883 .iter()
884 .position(|msg| msg == msg_type)
885 .unwrap_or_else(|| {
886 panic!(
887 "Missing output port for message type '{msg_type}' on node {pid}"
888 )
889 });
890 input_msg_indices_types.push(CuInputMsg {
891 culist_index: output_pack.culist_index,
892 msg_type: msg_type.to_string(),
893 src_port,
894 edge_id,
895 });
896 } else {
897 #[cfg(all(feature = "std", feature = "macro_debug"))]
898 eprintln!(" ✗ Input from {pid} not ready, returning");
899 return (next_culist_output_index, handled);
900 }
901 }
902 output_msg_pack = Some(CuOutputPack {
903 culist_index: next_culist_output_index,
904 msg_types: Vec::from(["()".to_string()]),
905 });
906 next_culist_output_index += 1;
907 }
908 CuTaskType::Regular => {
909 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
910 edge_ids.sort();
911 #[cfg(all(feature = "std", feature = "macro_debug"))]
912 eprintln!(" → Regular task with incoming edges: {edge_ids:?}");
913 for edge_id in edge_ids {
914 let edge = graph
915 .edge(edge_id)
916 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
917 let pid = graph
918 .get_node_id_by_name(edge.src.as_str())
919 .unwrap_or_else(|| {
920 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
921 });
922 let output_pack = find_output_pack_from_nodeid(pid, plan);
923 if let Some(output_pack) = output_pack {
924 #[cfg(all(feature = "std", feature = "macro_debug"))]
925 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
926 let msg_type = edge.msg.as_str();
927 let src_port = output_pack
928 .msg_types
929 .iter()
930 .position(|msg| msg == msg_type)
931 .unwrap_or_else(|| {
932 panic!(
933 "Missing output port for message type '{msg_type}' on node {pid}"
934 )
935 });
936 input_msg_indices_types.push(CuInputMsg {
937 culist_index: output_pack.culist_index,
938 msg_type: msg_type.to_string(),
939 src_port,
940 edge_id,
941 });
942 } else {
943 #[cfg(all(feature = "std", feature = "macro_debug"))]
944 eprintln!(" ✗ Input from {pid} not ready, returning");
945 return (next_culist_output_index, handled);
946 }
947 }
948 let msg_types = collect_output_msg_types(graph, id);
949 if msg_types.is_empty() {
950 panic!(
951 "Regular node '{}' has no outgoing connections",
952 node_ref.get_id()
953 );
954 }
955 output_msg_pack = Some(CuOutputPack {
956 culist_index: next_culist_output_index,
957 msg_types,
958 });
959 next_culist_output_index += 1;
960 }
961 }
962
963 sort_inputs_by_cnx_id(&mut input_msg_indices_types);
964
965 if let Some(pos) = plan
966 .iter()
967 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
968 {
969 #[cfg(all(feature = "std", feature = "macro_debug"))]
970 eprintln!(" → Already in plan, modifying existing step");
971 let mut step = plan.remove(pos);
972 if let CuExecutionUnit::Step(ref mut s) = step {
973 s.input_msg_indices_types = input_msg_indices_types;
974 }
975 plan.push(step);
976 } else {
977 #[cfg(all(feature = "std", feature = "macro_debug"))]
978 eprintln!(" → New step added to plan");
979 let step = CuExecutionStep {
980 node_id: id,
981 node: node_ref.clone(),
982 task_type,
983 input_msg_indices_types,
984 output_msg_pack,
985 };
986 plan.push(CuExecutionUnit::Step(Box::new(step)));
987 }
988
989 handled = true;
990 }
991
992 #[cfg(all(feature = "std", feature = "macro_debug"))]
993 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
994 (next_culist_output_index, handled)
995}
996
997pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1000 #[cfg(all(feature = "std", feature = "macro_debug"))]
1001 eprintln!("[runtime plan]");
1002 let mut plan = Vec::new();
1003 let mut next_culist_output_index = 0u32;
1004
1005 let mut queue: VecDeque<NodeId> = graph
1006 .node_ids()
1007 .into_iter()
1008 .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
1009 .collect();
1010
1011 #[cfg(all(feature = "std", feature = "macro_debug"))]
1012 eprintln!("Initial source nodes: {queue:?}");
1013
1014 while let Some(start_node) = queue.pop_front() {
1015 #[cfg(all(feature = "std", feature = "macro_debug"))]
1016 eprintln!("→ Starting BFS from source {start_node}");
1017 for node_id in graph.bfs_nodes(start_node) {
1018 let already_in_plan = plan
1019 .iter()
1020 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1021 if already_in_plan {
1022 #[cfg(all(feature = "std", feature = "macro_debug"))]
1023 eprintln!(" → Node {node_id} already planned, skipping");
1024 continue;
1025 }
1026
1027 #[cfg(all(feature = "std", feature = "macro_debug"))]
1028 eprintln!(" Planning from node {node_id}");
1029 let (new_index, handled) =
1030 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
1031 next_culist_output_index = new_index;
1032
1033 if !handled {
1034 #[cfg(all(feature = "std", feature = "macro_debug"))]
1035 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1036 continue;
1037 }
1038
1039 #[cfg(all(feature = "std", feature = "macro_debug"))]
1040 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
1041 for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1042 #[cfg(all(feature = "std", feature = "macro_debug"))]
1043 eprintln!(" → Enqueueing neighbor {neighbor}");
1044 queue.push_back(neighbor);
1045 }
1046 }
1047 }
1048
1049 let mut planned_nodes = BTreeSet::new();
1050 for unit in &plan {
1051 if let CuExecutionUnit::Step(step) = unit {
1052 planned_nodes.insert(step.node_id);
1053 }
1054 }
1055
1056 let mut missing = Vec::new();
1057 for node_id in graph.node_ids() {
1058 if !planned_nodes.contains(&node_id) {
1059 if let Some(node) = graph.get_node(node_id) {
1060 missing.push(node.get_id().to_string());
1061 } else {
1062 missing.push(format!("node_id_{node_id}"));
1063 }
1064 }
1065 }
1066
1067 if !missing.is_empty() {
1068 missing.sort();
1069 return Err(CuError::from(format!(
1070 "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1071 missing.join(", ")
1072 )));
1073 }
1074
1075 Ok(CuExecutionLoop {
1076 steps: plan,
1077 loop_count: None,
1078 })
1079}
1080
1081#[cfg(test)]
1083mod tests {
1084 use super::*;
1085 use crate::config::Node;
1086 use crate::context::CuContext;
1087 use crate::cutask::CuSinkTask;
1088 use crate::cutask::{CuSrcTask, Freezable};
1089 use crate::monitoring::NoMonitor;
1090 use crate::reflect::Reflect;
1091 use bincode::Encode;
1092 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1093 use serde_derive::{Deserialize, Serialize};
1094
1095 #[derive(Reflect)]
1096 pub struct TestSource {}
1097
1098 impl Freezable for TestSource {}
1099
1100 impl CuSrcTask for TestSource {
1101 type Resources<'r> = ();
1102 type Output<'m> = ();
1103 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
1104 where
1105 Self: Sized,
1106 {
1107 Ok(Self {})
1108 }
1109
1110 fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
1111 Ok(())
1112 }
1113 }
1114
1115 #[derive(Reflect)]
1116 pub struct TestSink {}
1117
1118 impl Freezable for TestSink {}
1119
1120 impl CuSinkTask for TestSink {
1121 type Resources<'r> = ();
1122 type Input<'m> = ();
1123
1124 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
1125 where
1126 Self: Sized,
1127 {
1128 Ok(Self {})
1129 }
1130
1131 fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
1132 Ok(())
1133 }
1134 }
1135
1136 type Tasks = (TestSource, TestSink);
1138
1139 #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
1140 struct Msgs(());
1141
1142 impl ErasedCuStampedDataSet for Msgs {
1143 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
1144 Vec::new()
1145 }
1146 }
1147
1148 impl MatchingTasks for Msgs {
1149 fn get_all_task_ids() -> &'static [&'static str] {
1150 &[]
1151 }
1152 }
1153
1154 impl CuListZeroedInit for Msgs {
1155 fn init_zeroed(&mut self) {}
1156 }
1157
1158 #[cfg(feature = "std")]
1159 fn tasks_instanciator(
1160 all_instances_configs: Vec<Option<&ComponentConfig>>,
1161 _resources: &mut ResourceManager,
1162 ) -> CuResult<Tasks> {
1163 Ok((
1164 TestSource::new(all_instances_configs[0], ())?,
1165 TestSink::new(all_instances_configs[1], ())?,
1166 ))
1167 }
1168
1169 #[cfg(not(feature = "std"))]
1170 fn tasks_instanciator(
1171 all_instances_configs: Vec<Option<&ComponentConfig>>,
1172 _resources: &mut ResourceManager,
1173 ) -> CuResult<Tasks> {
1174 Ok((
1175 TestSource::new(all_instances_configs[0], ())?,
1176 TestSink::new(all_instances_configs[1], ())?,
1177 ))
1178 }
1179
1180 fn monitor_instanciator(
1181 _config: &CuConfig,
1182 metadata: CuMonitoringMetadata,
1183 runtime: CuMonitoringRuntime,
1184 ) -> NoMonitor {
1185 NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
1186 }
1187
1188 fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
1189 Ok(())
1190 }
1191
1192 fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
1193 Ok(ResourceManager::new(&[]))
1194 }
1195
1196 #[derive(Debug)]
1197 struct FakeWriter {}
1198
1199 impl<E: Encode> WriteStream<E> for FakeWriter {
1200 fn log(&mut self, _obj: &E) -> CuResult<()> {
1201 Ok(())
1202 }
1203 }
1204
1205 #[test]
1206 fn test_runtime_instantiation() {
1207 let mut config = CuConfig::default();
1208 let graph = config.get_graph_mut(None).unwrap();
1209 graph.add_node(Node::new("a", "TestSource")).unwrap();
1210 graph.add_node(Node::new("b", "TestSink")).unwrap();
1211 graph.connect(0, 1, "()").unwrap();
1212 let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
1213 RobotClock::default(),
1214 &config,
1215 crate::config::DEFAULT_MISSION_ID,
1216 resources_instanciator,
1217 tasks_instanciator,
1218 &[],
1219 &[],
1220 monitor_instanciator,
1221 bridges_instanciator,
1222 FakeWriter {},
1223 FakeWriter {},
1224 );
1225 assert!(runtime.is_ok());
1226 }
1227
1228 #[test]
1229 fn test_copperlists_manager_lifecycle() {
1230 let mut config = CuConfig::default();
1231 let graph = config.get_graph_mut(None).unwrap();
1232 graph.add_node(Node::new("a", "TestSource")).unwrap();
1233 graph.add_node(Node::new("b", "TestSink")).unwrap();
1234 graph.connect(0, 1, "()").unwrap();
1235
1236 let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
1237 RobotClock::default(),
1238 &config,
1239 crate::config::DEFAULT_MISSION_ID,
1240 resources_instanciator,
1241 tasks_instanciator,
1242 &[],
1243 &[],
1244 monitor_instanciator,
1245 bridges_instanciator,
1246 FakeWriter {},
1247 FakeWriter {},
1248 )
1249 .unwrap();
1250
1251 {
1253 let copperlists = &mut runtime.copperlists_manager;
1254 let culist0 = copperlists
1255 .inner
1256 .create()
1257 .expect("Ran out of space for copper lists");
1258 let id = culist0.id;
1260 assert_eq!(id, 0);
1261 culist0.change_state(CopperListState::Processing);
1262 assert_eq!(copperlists.available_copper_lists(), 1);
1263 }
1264
1265 {
1266 let copperlists = &mut runtime.copperlists_manager;
1267 let culist1 = copperlists
1268 .inner
1269 .create()
1270 .expect("Ran out of space for copper lists"); let id = culist1.id;
1272 assert_eq!(id, 1);
1273 culist1.change_state(CopperListState::Processing);
1274 assert_eq!(copperlists.available_copper_lists(), 0);
1275 }
1276
1277 {
1278 let copperlists = &mut runtime.copperlists_manager;
1279 let culist2 = copperlists.inner.create();
1280 assert!(culist2.is_none());
1281 assert_eq!(copperlists.available_copper_lists(), 0);
1282 let _ = copperlists.end_of_processing(1);
1284 assert_eq!(copperlists.available_copper_lists(), 1);
1285 }
1286
1287 {
1289 let copperlists = &mut runtime.copperlists_manager;
1290 let culist2 = copperlists
1291 .inner
1292 .create()
1293 .expect("Ran out of space for copper lists"); let id = culist2.id;
1295 assert_eq!(id, 2);
1296 culist2.change_state(CopperListState::Processing);
1297 assert_eq!(copperlists.available_copper_lists(), 0);
1298 let _ = copperlists.end_of_processing(0);
1300 assert_eq!(copperlists.available_copper_lists(), 0);
1302
1303 let _ = copperlists.end_of_processing(2);
1305 assert_eq!(copperlists.available_copper_lists(), 2);
1308 }
1309 }
1310
1311 #[test]
1312 fn test_runtime_task_input_order() {
1313 let mut config = CuConfig::default();
1314 let graph = config.get_graph_mut(None).unwrap();
1315 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
1316 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
1317 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
1318
1319 assert_eq!(src1_id, 0);
1320 assert_eq!(src2_id, 1);
1321
1322 let src1_type = "src1_type";
1324 let src2_type = "src2_type";
1325 graph.connect(src2_id, sink_id, src2_type).unwrap();
1326 graph.connect(src1_id, sink_id, src1_type).unwrap();
1327
1328 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
1329 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
1330 assert_eq!(src1_edge_id, 1);
1333 assert_eq!(src2_edge_id, 0);
1334
1335 let runtime = compute_runtime_plan(graph).unwrap();
1336 let sink_step = runtime
1337 .steps
1338 .iter()
1339 .find_map(|step| match step {
1340 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
1341 _ => None,
1342 })
1343 .unwrap();
1344
1345 assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
1348 assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
1349 }
1350
1351 #[test]
1352 fn test_runtime_output_ports_unique_ordered() {
1353 let mut config = CuConfig::default();
1354 let graph = config.get_graph_mut(None).unwrap();
1355 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1356 let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1357 let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1358 let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
1359 let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
1360
1361 graph.connect(src_id, dst_a_id, "msg::A").unwrap();
1362 graph.connect(src_id, dst_b_id, "msg::B").unwrap();
1363 graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
1364 graph.connect(src_id, dst_c_id, "msg::C").unwrap();
1365
1366 let runtime = compute_runtime_plan(graph).unwrap();
1367 let src_step = runtime
1368 .steps
1369 .iter()
1370 .find_map(|step| match step {
1371 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1372 _ => None,
1373 })
1374 .unwrap();
1375
1376 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1377 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
1378
1379 let dst_a_step = runtime
1380 .steps
1381 .iter()
1382 .find_map(|step| match step {
1383 CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
1384 _ => None,
1385 })
1386 .unwrap();
1387 let dst_b_step = runtime
1388 .steps
1389 .iter()
1390 .find_map(|step| match step {
1391 CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
1392 _ => None,
1393 })
1394 .unwrap();
1395 let dst_a2_step = runtime
1396 .steps
1397 .iter()
1398 .find_map(|step| match step {
1399 CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
1400 _ => None,
1401 })
1402 .unwrap();
1403 let dst_c_step = runtime
1404 .steps
1405 .iter()
1406 .find_map(|step| match step {
1407 CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
1408 _ => None,
1409 })
1410 .unwrap();
1411
1412 assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
1413 assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
1414 assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
1415 assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
1416 }
1417
1418 #[test]
1419 fn test_runtime_output_ports_fanout_single() {
1420 let mut config = CuConfig::default();
1421 let graph = config.get_graph_mut(None).unwrap();
1422 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1423 let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1424 let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1425
1426 graph.connect(src_id, dst_a_id, "i32").unwrap();
1427 graph.connect(src_id, dst_b_id, "i32").unwrap();
1428
1429 let runtime = compute_runtime_plan(graph).unwrap();
1430 let src_step = runtime
1431 .steps
1432 .iter()
1433 .find_map(|step| match step {
1434 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1435 _ => None,
1436 })
1437 .unwrap();
1438
1439 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1440 assert_eq!(output_pack.msg_types, vec!["i32"]);
1441 }
1442
1443 #[test]
1444 fn test_runtime_output_ports_include_nc_outputs() {
1445 let mut config = CuConfig::default();
1446 let graph = config.get_graph_mut(None).unwrap();
1447 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1448 let dst_id = graph.add_node(Node::new("dst", "Sink")).unwrap();
1449 graph.connect(src_id, dst_id, "msg::A").unwrap();
1450 graph
1451 .get_node_mut(src_id)
1452 .expect("missing source node")
1453 .add_nc_output("msg::B", usize::MAX);
1454
1455 let runtime = compute_runtime_plan(graph).unwrap();
1456 let src_step = runtime
1457 .steps
1458 .iter()
1459 .find_map(|step| match step {
1460 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1461 _ => None,
1462 })
1463 .unwrap();
1464 let dst_step = runtime
1465 .steps
1466 .iter()
1467 .find_map(|step| match step {
1468 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
1469 _ => None,
1470 })
1471 .unwrap();
1472
1473 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1474 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
1475 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 0);
1476 }
1477
1478 #[test]
1479 fn test_runtime_output_ports_respect_connection_order_with_nc() {
1480 let txt = r#"(
1481 tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
1482 cnx: [
1483 (src: "src", dst: "__nc__", msg: "msg::A"),
1484 (src: "src", dst: "sink", msg: "msg::B"),
1485 ]
1486 )"#;
1487 let config = CuConfig::deserialize_ron(txt).unwrap();
1488 let graph = config.get_graph(None).unwrap();
1489 let src_id = graph.get_node_id_by_name("src").unwrap();
1490 let dst_id = graph.get_node_id_by_name("sink").unwrap();
1491
1492 let runtime = compute_runtime_plan(graph).unwrap();
1493 let src_step = runtime
1494 .steps
1495 .iter()
1496 .find_map(|step| match step {
1497 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1498 _ => None,
1499 })
1500 .unwrap();
1501 let dst_step = runtime
1502 .steps
1503 .iter()
1504 .find_map(|step| match step {
1505 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
1506 _ => None,
1507 })
1508 .unwrap();
1509
1510 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1511 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
1512 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
1513 }
1514
1515 #[cfg(feature = "std")]
1516 #[test]
1517 fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
1518 let txt = r#"(
1519 tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
1520 cnx: [
1521 (src: "src", dst: "__nc__", msg: "msg::A"),
1522 (src: "src", dst: "sink", msg: "msg::B"),
1523 ]
1524 )"#;
1525 let tmp = tempfile::NamedTempFile::new().unwrap();
1526 std::fs::write(tmp.path(), txt).unwrap();
1527 let config = crate::config::read_configuration(tmp.path().to_str().unwrap()).unwrap();
1528 let graph = config.get_graph(None).unwrap();
1529 let src_id = graph.get_node_id_by_name("src").unwrap();
1530 let dst_id = graph.get_node_id_by_name("sink").unwrap();
1531
1532 let runtime = compute_runtime_plan(graph).unwrap();
1533 let src_step = runtime
1534 .steps
1535 .iter()
1536 .find_map(|step| match step {
1537 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1538 _ => None,
1539 })
1540 .unwrap();
1541 let dst_step = runtime
1542 .steps
1543 .iter()
1544 .find_map(|step| match step {
1545 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
1546 _ => None,
1547 })
1548 .unwrap();
1549
1550 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1551 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
1552 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
1553 }
1554
1555 #[test]
1556 fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
1557 let txt = r#"(
1558 tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
1559 cnx: [
1560 (src: "src", dst: "__nc__", msg: "i32"),
1561 (src: "src", dst: "sink", msg: "bool"),
1562 ]
1563 )"#;
1564 let config = CuConfig::deserialize_ron(txt).unwrap();
1565 let graph = config.get_graph(None).unwrap();
1566 let src_id = graph.get_node_id_by_name("src").unwrap();
1567 let dst_id = graph.get_node_id_by_name("sink").unwrap();
1568
1569 let runtime = compute_runtime_plan(graph).unwrap();
1570 let src_step = runtime
1571 .steps
1572 .iter()
1573 .find_map(|step| match step {
1574 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1575 _ => None,
1576 })
1577 .unwrap();
1578 let dst_step = runtime
1579 .steps
1580 .iter()
1581 .find_map(|step| match step {
1582 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
1583 _ => None,
1584 })
1585 .unwrap();
1586
1587 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1588 assert_eq!(output_pack.msg_types, vec!["i32", "bool"]);
1589 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
1590 }
1591
1592 #[test]
1593 fn test_runtime_plan_diamond_case1() {
1594 let mut config = CuConfig::default();
1596 let graph = config.get_graph_mut(None).unwrap();
1597 let cam0_id = graph
1598 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1599 .unwrap();
1600 let inf0_id = graph
1601 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1602 .unwrap();
1603 let broadcast_id = graph
1604 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1605 .unwrap();
1606
1607 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1609 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1610 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1611
1612 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1613 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
1614
1615 assert_eq!(edge_cam0_to_inf0, 0);
1616 assert_eq!(edge_cam0_to_broadcast, 1);
1617
1618 let runtime = compute_runtime_plan(graph).unwrap();
1619 let broadcast_step = runtime
1620 .steps
1621 .iter()
1622 .find_map(|step| match step {
1623 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1624 _ => None,
1625 })
1626 .unwrap();
1627
1628 assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1629 assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1630 }
1631
1632 #[test]
1633 fn test_runtime_plan_diamond_case2() {
1634 let mut config = CuConfig::default();
1636 let graph = config.get_graph_mut(None).unwrap();
1637 let cam0_id = graph
1638 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1639 .unwrap();
1640 let inf0_id = graph
1641 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1642 .unwrap();
1643 let broadcast_id = graph
1644 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1645 .unwrap();
1646
1647 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1649 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1650 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1651
1652 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1653 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1654
1655 assert_eq!(edge_cam0_to_broadcast, 0);
1656 assert_eq!(edge_cam0_to_inf0, 1);
1657
1658 let runtime = compute_runtime_plan(graph).unwrap();
1659 let broadcast_step = runtime
1660 .steps
1661 .iter()
1662 .find_map(|step| match step {
1663 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1664 _ => None,
1665 })
1666 .unwrap();
1667
1668 assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1669 assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1670 }
1671}