1use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::{CuMonitor, build_monitor_topology};
10use crate::resource::ResourceManager;
11use cu29_clock::{ClockProvider, CuTime, RobotClock};
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_traits::{CopperListTuple, CuError};
15
16#[cfg(target_os = "none")]
17#[allow(unused_imports)]
18use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
19#[cfg(target_os = "none")]
20#[allow(unused_imports)]
21use cu29_log_derive::info;
22#[cfg(target_os = "none")]
23#[allow(unused_imports)]
24use cu29_log_runtime::log;
25#[cfg(all(target_os = "none", debug_assertions))]
26#[allow(unused_imports)]
27use cu29_log_runtime::log_debug_mode;
28#[cfg(target_os = "none")]
29#[allow(unused_imports)]
30use cu29_value::to_value;
31
32use alloc::boxed::Box;
33use alloc::collections::{BTreeSet, VecDeque};
34use alloc::format;
35use alloc::string::{String, ToString};
36use alloc::vec::Vec;
37use bincode::enc::EncoderImpl;
38use bincode::enc::write::{SizeWriter, SliceWriter};
39use bincode::error::EncodeError;
40use bincode::{Decode, Encode};
41use core::fmt::Result as FmtResult;
42use core::fmt::{Debug, Formatter};
43
44#[cfg(feature = "std")]
45use cu29_log_runtime::LoggerRuntime;
46#[cfg(feature = "std")]
47use cu29_unifiedlog::UnifiedLoggerWrite;
48#[cfg(feature = "std")]
49use std::sync::{Arc, Mutex};
50
51#[cfg(feature = "std")]
53pub struct CopperContext {
54 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
55 pub logger_runtime: LoggerRuntime,
56 pub clock: RobotClock,
57}
58
59pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
61 pub inner: CuListsManager<P, NBCL>,
62 pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
64}
65
66impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
67 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
68 let mut is_top = true;
69 let mut nb_done = 0;
70 for cl in self.inner.iter_mut() {
71 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
72 cl.change_state(CopperListState::DoneProcessing);
73 }
74 if is_top && cl.get_state() == CopperListState::DoneProcessing {
75 if let Some(logger) = &mut self.logger {
76 cl.change_state(CopperListState::BeingSerialized);
77 logger.log(cl)?;
78 }
79 cl.change_state(CopperListState::Free);
80 nb_done += 1;
81 } else {
82 is_top = false;
83 }
84 }
85 for _ in 0..nb_done {
86 let _ = self.inner.pop();
87 }
88 Ok(())
89 }
90
91 pub fn available_copper_lists(&self) -> usize {
92 NBCL - self.inner.len()
93 }
94}
95
96pub struct KeyFramesManager {
98 inner: KeyFrame,
100
101 logger: Option<Box<dyn WriteStream<KeyFrame>>>,
103
104 keyframe_interval: u32,
106}
107
108impl KeyFramesManager {
109 fn is_keyframe(&self, culistid: u32) -> bool {
110 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
111 }
112
113 pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
114 if self.is_keyframe(culistid) {
115 self.inner.reset(culistid, clock.now());
116 }
117 }
118
119 pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
120 if self.is_keyframe(culistid) {
121 if self.inner.culistid != culistid {
122 panic!("Freezing task for a different culistid");
123 }
124 self.inner
125 .add_frozen_task(task)
126 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
127 } else {
128 Ok(0)
129 }
130 }
131
132 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
133 if self.is_keyframe(culistid) {
134 let logger = self.logger.as_mut().unwrap();
135 logger.log(&self.inner)
136 } else {
137 Ok(())
138 }
139 }
140}
141
142pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
146 pub clock: RobotClock, pub tasks: CT,
151
152 pub bridges: CB,
154
155 pub resources: ResourceManager,
157
158 pub monitor: M,
160
161 pub copperlists_manager: CopperListsManager<P, NBCL>,
163
164 pub keyframes_manager: KeyFramesManager,
166
167 pub runtime_config: RuntimeConfig,
169}
170
171impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
173 ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
174{
175 fn get_clock(&self) -> RobotClock {
176 self.clock.clone()
177 }
178}
179
180#[derive(Encode, Decode)]
184pub struct KeyFrame {
185 pub culistid: u32,
187 pub timestamp: CuTime,
189 pub serialized_tasks: Vec<u8>,
191}
192
193impl KeyFrame {
194 fn new() -> Self {
195 KeyFrame {
196 culistid: 0,
197 timestamp: CuTime::default(),
198 serialized_tasks: Vec::new(),
199 }
200 }
201
202 fn reset(&mut self, culistid: u32, timestamp: CuTime) {
204 self.culistid = culistid;
205 self.timestamp = timestamp;
206 self.serialized_tasks.clear();
207 }
208
209 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
211 let cfg = bincode::config::standard();
212 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
213 BincodeAdapter(task).encode(&mut sizer)?;
214 let need = sizer.into_writer().bytes_written as usize;
215
216 let start = self.serialized_tasks.len();
217 self.serialized_tasks.resize(start + need, 0);
218 let mut enc =
219 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
220 BincodeAdapter(task).encode(&mut enc)?;
221 Ok(need)
222 }
223}
224
225impl<
226 CT,
227 CB,
228 P: CopperListTuple + CuListZeroedInit + Default + 'static,
229 M: CuMonitor,
230 const NBCL: usize,
231> CuRuntime<CT, CB, P, M, NBCL>
232{
233 #[allow(clippy::too_many_arguments)]
235 #[cfg(feature = "std")]
236 pub fn new(
237 clock: RobotClock,
238 config: &CuConfig,
239 mission: Option<&str>,
240 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
241 tasks_instanciator: impl for<'c> Fn(
242 Vec<Option<&'c ComponentConfig>>,
243 &mut ResourceManager,
244 ) -> CuResult<CT>,
245 monitor_instanciator: impl Fn(&CuConfig) -> M,
246 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
247 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
248 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
249 ) -> CuResult<Self> {
250 let resources = resources_instanciator(config)?;
251 Self::new_with_resources(
252 clock,
253 config,
254 mission,
255 resources,
256 tasks_instanciator,
257 monitor_instanciator,
258 bridges_instanciator,
259 copperlists_logger,
260 keyframes_logger,
261 )
262 }
263
264 #[allow(clippy::too_many_arguments)]
265 #[cfg(feature = "std")]
266 pub fn new_with_resources(
267 clock: RobotClock,
268 config: &CuConfig,
269 mission: Option<&str>,
270 mut resources: ResourceManager,
271 tasks_instanciator: impl for<'c> Fn(
272 Vec<Option<&'c ComponentConfig>>,
273 &mut ResourceManager,
274 ) -> CuResult<CT>,
275 monitor_instanciator: impl Fn(&CuConfig) -> M,
276 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
277 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
278 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
279 ) -> CuResult<Self> {
280 let graph = config.get_graph(mission)?;
281 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
282 .get_all_nodes()
283 .iter()
284 .map(|(_, node)| node.get_instance_config())
285 .collect();
286
287 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
288 let mut monitor = monitor_instanciator(config);
289 if let Ok(topology) = build_monitor_topology(config, mission) {
290 monitor.set_topology(topology);
291 }
292 let bridges = bridges_instanciator(config, &mut resources)?;
293
294 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
295 Some(logging_config) if logging_config.enable_task_logging => (
296 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
297 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
298 logging_config.keyframe_interval.unwrap(), ),
300 Some(_) => (None, None, 0), None => (
302 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
304 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
305 DEFAULT_KEYFRAME_INTERVAL,
306 ),
307 };
308
309 let copperlists_manager = CopperListsManager {
310 inner: CuListsManager::new(),
311 logger: copperlists_logger,
312 };
313 #[cfg(target_os = "none")]
314 {
315 let cl_size = core::mem::size_of::<CopperList<P>>();
316 let total_bytes = cl_size.saturating_mul(NBCL);
317 info!(
318 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
319 NBCL, cl_size, total_bytes
320 );
321 }
322
323 let keyframes_manager = KeyFramesManager {
324 inner: KeyFrame::new(),
325 logger: keyframes_logger,
326 keyframe_interval,
327 };
328
329 let runtime_config = config.runtime.clone().unwrap_or_default();
330
331 let runtime = Self {
332 tasks,
333 bridges,
334 resources,
335 monitor,
336 clock,
337 copperlists_manager,
338 keyframes_manager,
339 runtime_config,
340 };
341
342 Ok(runtime)
343 }
344
345 #[allow(clippy::too_many_arguments)]
346 #[cfg(not(feature = "std"))]
347 pub fn new(
348 clock: RobotClock,
349 config: &CuConfig,
350 mission: Option<&str>,
351 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
352 tasks_instanciator: impl for<'c> Fn(
353 Vec<Option<&'c ComponentConfig>>,
354 &mut ResourceManager,
355 ) -> CuResult<CT>,
356 monitor_instanciator: impl Fn(&CuConfig) -> M,
357 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
358 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
359 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
360 ) -> CuResult<Self> {
361 #[cfg(target_os = "none")]
362 info!("CuRuntime::new: resources instanciator");
363 let resources = resources_instanciator(config)?;
364 Self::new_with_resources(
365 clock,
366 config,
367 mission,
368 resources,
369 tasks_instanciator,
370 monitor_instanciator,
371 bridges_instanciator,
372 copperlists_logger,
373 keyframes_logger,
374 )
375 }
376
377 #[allow(clippy::too_many_arguments)]
378 #[cfg(not(feature = "std"))]
379 pub fn new_with_resources(
380 clock: RobotClock,
381 config: &CuConfig,
382 mission: Option<&str>,
383 mut resources: ResourceManager,
384 tasks_instanciator: impl for<'c> Fn(
385 Vec<Option<&'c ComponentConfig>>,
386 &mut ResourceManager,
387 ) -> CuResult<CT>,
388 monitor_instanciator: impl Fn(&CuConfig) -> M,
389 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
390 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
391 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
392 ) -> CuResult<Self> {
393 #[cfg(target_os = "none")]
394 info!("CuRuntime::new: get graph");
395 let graph = config.get_graph(mission)?;
396 #[cfg(target_os = "none")]
397 info!("CuRuntime::new: graph ok");
398 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
399 .get_all_nodes()
400 .iter()
401 .map(|(_, node)| node.get_instance_config())
402 .collect();
403
404 #[cfg(target_os = "none")]
405 info!("CuRuntime::new: tasks instanciator");
406 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
407
408 #[cfg(target_os = "none")]
409 info!("CuRuntime::new: monitor instanciator");
410 let mut monitor = monitor_instanciator(config);
411 #[cfg(target_os = "none")]
412 info!("CuRuntime::new: monitor instanciator ok");
413 #[cfg(target_os = "none")]
414 info!("CuRuntime::new: build monitor topology");
415 if let Ok(topology) = build_monitor_topology(config, mission) {
416 #[cfg(target_os = "none")]
417 info!("CuRuntime::new: monitor topology ok");
418 monitor.set_topology(topology);
419 #[cfg(target_os = "none")]
420 info!("CuRuntime::new: monitor topology set");
421 }
422 #[cfg(target_os = "none")]
423 info!("CuRuntime::new: bridges instanciator");
424 let bridges = bridges_instanciator(config, &mut resources)?;
425
426 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
427 Some(logging_config) if logging_config.enable_task_logging => (
428 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
429 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
430 logging_config.keyframe_interval.unwrap(), ),
432 Some(_) => (None, None, 0), None => (
434 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
436 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
437 DEFAULT_KEYFRAME_INTERVAL,
438 ),
439 };
440
441 let copperlists_manager = CopperListsManager {
442 inner: CuListsManager::new(),
443 logger: copperlists_logger,
444 };
445 #[cfg(target_os = "none")]
446 {
447 let cl_size = core::mem::size_of::<CopperList<P>>();
448 let total_bytes = cl_size.saturating_mul(NBCL);
449 info!(
450 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
451 NBCL, cl_size, total_bytes
452 );
453 }
454
455 let keyframes_manager = KeyFramesManager {
456 inner: KeyFrame::new(),
457 logger: keyframes_logger,
458 keyframe_interval,
459 };
460
461 let runtime_config = config.runtime.clone().unwrap_or_default();
462
463 let runtime = Self {
464 tasks,
465 bridges,
466 resources,
467 monitor,
468 clock,
469 copperlists_manager,
470 keyframes_manager,
471 runtime_config,
472 };
473
474 Ok(runtime)
475 }
476}
477
478#[derive(Debug, PartialEq, Eq, Clone, Copy)]
483pub enum CuTaskType {
484 Source,
485 Regular,
486 Sink,
487}
488
489#[derive(Debug, Clone)]
490pub struct CuOutputPack {
491 pub culist_index: u32,
492 pub msg_types: Vec<String>,
493}
494
495#[derive(Debug, Clone)]
496pub struct CuInputMsg {
497 pub culist_index: u32,
498 pub msg_type: String,
499 pub src_port: usize,
500 pub edge_id: usize,
501}
502
503pub struct CuExecutionStep {
505 pub node_id: NodeId,
507 pub node: Node,
509 pub task_type: CuTaskType,
511
512 pub input_msg_indices_types: Vec<CuInputMsg>,
514
515 pub output_msg_pack: Option<CuOutputPack>,
517}
518
519impl Debug for CuExecutionStep {
520 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
521 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
522 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
523 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
524 f.write_str(
525 format!(
526 " input_msg_types: {:?}\n",
527 self.input_msg_indices_types
528 )
529 .as_str(),
530 )?;
531 f.write_str(format!(" output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
532 Ok(())
533 }
534}
535
536pub struct CuExecutionLoop {
541 pub steps: Vec<CuExecutionUnit>,
542 pub loop_count: Option<u32>,
543}
544
545impl Debug for CuExecutionLoop {
546 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
547 f.write_str("CuExecutionLoop:\n")?;
548 for step in &self.steps {
549 match step {
550 CuExecutionUnit::Step(step) => {
551 step.fmt(f)?;
552 }
553 CuExecutionUnit::Loop(l) => {
554 l.fmt(f)?;
555 }
556 }
557 }
558
559 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
560 Ok(())
561 }
562}
563
564#[derive(Debug)]
566pub enum CuExecutionUnit {
567 Step(CuExecutionStep),
568 Loop(CuExecutionLoop),
569}
570
571fn find_output_pack_from_nodeid(
572 node_id: NodeId,
573 steps: &Vec<CuExecutionUnit>,
574) -> Option<CuOutputPack> {
575 for step in steps {
576 match step {
577 CuExecutionUnit::Loop(loop_unit) => {
578 if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
579 return Some(output_pack);
580 }
581 }
582 CuExecutionUnit::Step(step) => {
583 if step.node_id == node_id {
584 return step.output_msg_pack.clone();
585 }
586 }
587 }
588 }
589 None
590}
591
592pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
593 if graph.incoming_neighbor_count(node_id) == 0 {
594 CuTaskType::Source
595 } else if graph.outgoing_neighbor_count(node_id) == 0 {
596 CuTaskType::Sink
597 } else {
598 CuTaskType::Regular
599 }
600}
601
602fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
605 input_msg_indices_types.sort_by_key(|input| input.edge_id);
606}
607
608fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
609 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
610 edge_ids.sort();
611
612 let mut msg_types = Vec::new();
613 let mut seen = Vec::new();
614 for edge_id in edge_ids {
615 if let Some(edge) = graph.edge(edge_id) {
616 if seen.iter().any(|msg| msg == &edge.msg) {
617 continue;
618 }
619 seen.push(edge.msg.clone());
620 msg_types.push(edge.msg.clone());
621 }
622 }
623 msg_types
624}
625fn plan_tasks_tree_branch(
627 graph: &CuGraph,
628 mut next_culist_output_index: u32,
629 starting_point: NodeId,
630 plan: &mut Vec<CuExecutionUnit>,
631) -> (u32, bool) {
632 #[cfg(all(feature = "std", feature = "macro_debug"))]
633 eprintln!("-- starting branch from node {starting_point}");
634
635 let mut handled = false;
636
637 for id in graph.bfs_nodes(starting_point) {
638 let node_ref = graph.get_node(id).unwrap();
639 #[cfg(all(feature = "std", feature = "macro_debug"))]
640 eprintln!(" Visiting node: {node_ref:?}");
641
642 let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
643 let output_msg_pack: Option<CuOutputPack>;
644 let task_type = find_task_type_for_id(graph, id);
645
646 match task_type {
647 CuTaskType::Source => {
648 #[cfg(all(feature = "std", feature = "macro_debug"))]
649 eprintln!(" → Source node, assign output index {next_culist_output_index}");
650 let msg_types = collect_output_msg_types(graph, id);
651 if msg_types.is_empty() {
652 panic!(
653 "Source node '{}' has no outgoing connections",
654 node_ref.get_id()
655 );
656 }
657 output_msg_pack = Some(CuOutputPack {
658 culist_index: next_culist_output_index,
659 msg_types,
660 });
661 next_culist_output_index += 1;
662 }
663 CuTaskType::Sink => {
664 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
665 edge_ids.sort();
666 #[cfg(all(feature = "std", feature = "macro_debug"))]
667 eprintln!(" → Sink with incoming edges: {edge_ids:?}");
668 for edge_id in edge_ids {
669 let edge = graph
670 .edge(edge_id)
671 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
672 let pid = graph
673 .get_node_id_by_name(edge.src.as_str())
674 .unwrap_or_else(|| {
675 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
676 });
677 let output_pack = find_output_pack_from_nodeid(pid, plan);
678 if let Some(output_pack) = output_pack {
679 #[cfg(all(feature = "std", feature = "macro_debug"))]
680 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
681 let msg_type = edge.msg.as_str();
682 let src_port = output_pack
683 .msg_types
684 .iter()
685 .position(|msg| msg == msg_type)
686 .unwrap_or_else(|| {
687 panic!(
688 "Missing output port for message type '{msg_type}' on node {pid}"
689 )
690 });
691 input_msg_indices_types.push(CuInputMsg {
692 culist_index: output_pack.culist_index,
693 msg_type: msg_type.to_string(),
694 src_port,
695 edge_id,
696 });
697 } else {
698 #[cfg(all(feature = "std", feature = "macro_debug"))]
699 eprintln!(" ✗ Input from {pid} not ready, returning");
700 return (next_culist_output_index, handled);
701 }
702 }
703 output_msg_pack = Some(CuOutputPack {
704 culist_index: next_culist_output_index,
705 msg_types: Vec::from(["()".to_string()]),
706 });
707 next_culist_output_index += 1;
708 }
709 CuTaskType::Regular => {
710 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
711 edge_ids.sort();
712 #[cfg(all(feature = "std", feature = "macro_debug"))]
713 eprintln!(" → Regular task with incoming edges: {edge_ids:?}");
714 for edge_id in edge_ids {
715 let edge = graph
716 .edge(edge_id)
717 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
718 let pid = graph
719 .get_node_id_by_name(edge.src.as_str())
720 .unwrap_or_else(|| {
721 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
722 });
723 let output_pack = find_output_pack_from_nodeid(pid, plan);
724 if let Some(output_pack) = output_pack {
725 #[cfg(all(feature = "std", feature = "macro_debug"))]
726 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
727 let msg_type = edge.msg.as_str();
728 let src_port = output_pack
729 .msg_types
730 .iter()
731 .position(|msg| msg == msg_type)
732 .unwrap_or_else(|| {
733 panic!(
734 "Missing output port for message type '{msg_type}' on node {pid}"
735 )
736 });
737 input_msg_indices_types.push(CuInputMsg {
738 culist_index: output_pack.culist_index,
739 msg_type: msg_type.to_string(),
740 src_port,
741 edge_id,
742 });
743 } else {
744 #[cfg(all(feature = "std", feature = "macro_debug"))]
745 eprintln!(" ✗ Input from {pid} not ready, returning");
746 return (next_culist_output_index, handled);
747 }
748 }
749 let msg_types = collect_output_msg_types(graph, id);
750 if msg_types.is_empty() {
751 panic!(
752 "Regular node '{}' has no outgoing connections",
753 node_ref.get_id()
754 );
755 }
756 output_msg_pack = Some(CuOutputPack {
757 culist_index: next_culist_output_index,
758 msg_types,
759 });
760 next_culist_output_index += 1;
761 }
762 }
763
764 sort_inputs_by_cnx_id(&mut input_msg_indices_types);
765
766 if let Some(pos) = plan
767 .iter()
768 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
769 {
770 #[cfg(all(feature = "std", feature = "macro_debug"))]
771 eprintln!(" → Already in plan, modifying existing step");
772 let mut step = plan.remove(pos);
773 if let CuExecutionUnit::Step(ref mut s) = step {
774 s.input_msg_indices_types = input_msg_indices_types;
775 }
776 plan.push(step);
777 } else {
778 #[cfg(all(feature = "std", feature = "macro_debug"))]
779 eprintln!(" → New step added to plan");
780 let step = CuExecutionStep {
781 node_id: id,
782 node: node_ref.clone(),
783 task_type,
784 input_msg_indices_types,
785 output_msg_pack,
786 };
787 plan.push(CuExecutionUnit::Step(step));
788 }
789
790 handled = true;
791 }
792
793 #[cfg(all(feature = "std", feature = "macro_debug"))]
794 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
795 (next_culist_output_index, handled)
796}
797
798pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
801 #[cfg(all(feature = "std", feature = "macro_debug"))]
802 eprintln!("[runtime plan]");
803 let mut plan = Vec::new();
804 let mut next_culist_output_index = 0u32;
805
806 let mut queue: VecDeque<NodeId> = graph
807 .node_ids()
808 .into_iter()
809 .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
810 .collect();
811
812 #[cfg(all(feature = "std", feature = "macro_debug"))]
813 eprintln!("Initial source nodes: {queue:?}");
814
815 while let Some(start_node) = queue.pop_front() {
816 #[cfg(all(feature = "std", feature = "macro_debug"))]
817 eprintln!("→ Starting BFS from source {start_node}");
818 for node_id in graph.bfs_nodes(start_node) {
819 let already_in_plan = plan
820 .iter()
821 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
822 if already_in_plan {
823 #[cfg(all(feature = "std", feature = "macro_debug"))]
824 eprintln!(" → Node {node_id} already planned, skipping");
825 continue;
826 }
827
828 #[cfg(all(feature = "std", feature = "macro_debug"))]
829 eprintln!(" Planning from node {node_id}");
830 let (new_index, handled) =
831 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
832 next_culist_output_index = new_index;
833
834 if !handled {
835 #[cfg(all(feature = "std", feature = "macro_debug"))]
836 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
837 continue;
838 }
839
840 #[cfg(all(feature = "std", feature = "macro_debug"))]
841 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
842 for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
843 #[cfg(all(feature = "std", feature = "macro_debug"))]
844 eprintln!(" → Enqueueing neighbor {neighbor}");
845 queue.push_back(neighbor);
846 }
847 }
848 }
849
850 let mut planned_nodes = BTreeSet::new();
851 for unit in &plan {
852 if let CuExecutionUnit::Step(step) = unit {
853 planned_nodes.insert(step.node_id);
854 }
855 }
856
857 let mut missing = Vec::new();
858 for node_id in graph.node_ids() {
859 if !planned_nodes.contains(&node_id) {
860 if let Some(node) = graph.get_node(node_id) {
861 missing.push(node.get_id().to_string());
862 } else {
863 missing.push(format!("node_id_{node_id}"));
864 }
865 }
866 }
867
868 if !missing.is_empty() {
869 missing.sort();
870 return Err(CuError::from(format!(
871 "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
872 missing.join(", ")
873 )));
874 }
875
876 Ok(CuExecutionLoop {
877 steps: plan,
878 loop_count: None,
879 })
880}
881
882#[cfg(test)]
884mod tests {
885 use super::*;
886 use crate::config::Node;
887 use crate::cutask::CuSinkTask;
888 use crate::cutask::{CuSrcTask, Freezable};
889 use crate::monitoring::NoMonitor;
890 use bincode::Encode;
891 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
892 use serde_derive::{Deserialize, Serialize};
893
894 pub struct TestSource {}
895
896 impl Freezable for TestSource {}
897
898 impl CuSrcTask for TestSource {
899 type Resources<'r> = ();
900 type Output<'m> = ();
901 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
902 where
903 Self: Sized,
904 {
905 Ok(Self {})
906 }
907
908 fn process(
909 &mut self,
910 _clock: &RobotClock,
911 _empty_msg: &mut Self::Output<'_>,
912 ) -> CuResult<()> {
913 Ok(())
914 }
915 }
916
917 pub struct TestSink {}
918
919 impl Freezable for TestSink {}
920
921 impl CuSinkTask for TestSink {
922 type Resources<'r> = ();
923 type Input<'m> = ();
924
925 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
926 where
927 Self: Sized,
928 {
929 Ok(Self {})
930 }
931
932 fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
933 Ok(())
934 }
935 }
936
937 type Tasks = (TestSource, TestSink);
939
940 #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
941 struct Msgs(());
942
943 impl ErasedCuStampedDataSet for Msgs {
944 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
945 Vec::new()
946 }
947 }
948
949 impl MatchingTasks for Msgs {
950 fn get_all_task_ids() -> &'static [&'static str] {
951 &[]
952 }
953 }
954
955 impl CuListZeroedInit for Msgs {
956 fn init_zeroed(&mut self) {}
957 }
958
959 #[cfg(feature = "std")]
960 fn tasks_instanciator(
961 all_instances_configs: Vec<Option<&ComponentConfig>>,
962 _resources: &mut ResourceManager,
963 ) -> CuResult<Tasks> {
964 Ok((
965 TestSource::new(all_instances_configs[0], ())?,
966 TestSink::new(all_instances_configs[1], ())?,
967 ))
968 }
969
970 #[cfg(not(feature = "std"))]
971 fn tasks_instanciator(
972 all_instances_configs: Vec<Option<&ComponentConfig>>,
973 _resources: &mut ResourceManager,
974 ) -> CuResult<Tasks> {
975 Ok((
976 TestSource::new(all_instances_configs[0], ())?,
977 TestSink::new(all_instances_configs[1], ())?,
978 ))
979 }
980
981 fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
982 NoMonitor {}
983 }
984
985 fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
986 Ok(())
987 }
988
989 fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
990 Ok(ResourceManager::new(&[]))
991 }
992
993 #[derive(Debug)]
994 struct FakeWriter {}
995
996 impl<E: Encode> WriteStream<E> for FakeWriter {
997 fn log(&mut self, _obj: &E) -> CuResult<()> {
998 Ok(())
999 }
1000 }
1001
1002 #[test]
1003 fn test_runtime_instantiation() {
1004 let mut config = CuConfig::default();
1005 let graph = config.get_graph_mut(None).unwrap();
1006 graph.add_node(Node::new("a", "TestSource")).unwrap();
1007 graph.add_node(Node::new("b", "TestSink")).unwrap();
1008 graph.connect(0, 1, "()").unwrap();
1009 let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
1010 RobotClock::default(),
1011 &config,
1012 None,
1013 resources_instanciator,
1014 tasks_instanciator,
1015 monitor_instanciator,
1016 bridges_instanciator,
1017 FakeWriter {},
1018 FakeWriter {},
1019 );
1020 assert!(runtime.is_ok());
1021 }
1022
1023 #[test]
1024 fn test_copperlists_manager_lifecycle() {
1025 let mut config = CuConfig::default();
1026 let graph = config.get_graph_mut(None).unwrap();
1027 graph.add_node(Node::new("a", "TestSource")).unwrap();
1028 graph.add_node(Node::new("b", "TestSink")).unwrap();
1029 graph.connect(0, 1, "()").unwrap();
1030
1031 let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
1032 RobotClock::default(),
1033 &config,
1034 None,
1035 resources_instanciator,
1036 tasks_instanciator,
1037 monitor_instanciator,
1038 bridges_instanciator,
1039 FakeWriter {},
1040 FakeWriter {},
1041 )
1042 .unwrap();
1043
1044 {
1046 let copperlists = &mut runtime.copperlists_manager;
1047 let culist0 = copperlists
1048 .inner
1049 .create()
1050 .expect("Ran out of space for copper lists");
1051 let id = culist0.id;
1053 assert_eq!(id, 0);
1054 culist0.change_state(CopperListState::Processing);
1055 assert_eq!(copperlists.available_copper_lists(), 1);
1056 }
1057
1058 {
1059 let copperlists = &mut runtime.copperlists_manager;
1060 let culist1 = copperlists
1061 .inner
1062 .create()
1063 .expect("Ran out of space for copper lists"); let id = culist1.id;
1065 assert_eq!(id, 1);
1066 culist1.change_state(CopperListState::Processing);
1067 assert_eq!(copperlists.available_copper_lists(), 0);
1068 }
1069
1070 {
1071 let copperlists = &mut runtime.copperlists_manager;
1072 let culist2 = copperlists.inner.create();
1073 assert!(culist2.is_none());
1074 assert_eq!(copperlists.available_copper_lists(), 0);
1075 let _ = copperlists.end_of_processing(1);
1077 assert_eq!(copperlists.available_copper_lists(), 1);
1078 }
1079
1080 {
1082 let copperlists = &mut runtime.copperlists_manager;
1083 let culist2 = copperlists
1084 .inner
1085 .create()
1086 .expect("Ran out of space for copper lists"); let id = culist2.id;
1088 assert_eq!(id, 2);
1089 culist2.change_state(CopperListState::Processing);
1090 assert_eq!(copperlists.available_copper_lists(), 0);
1091 let _ = copperlists.end_of_processing(0);
1093 assert_eq!(copperlists.available_copper_lists(), 0);
1095
1096 let _ = copperlists.end_of_processing(2);
1098 assert_eq!(copperlists.available_copper_lists(), 2);
1101 }
1102 }
1103
1104 #[test]
1105 fn test_runtime_task_input_order() {
1106 let mut config = CuConfig::default();
1107 let graph = config.get_graph_mut(None).unwrap();
1108 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
1109 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
1110 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
1111
1112 assert_eq!(src1_id, 0);
1113 assert_eq!(src2_id, 1);
1114
1115 let src1_type = "src1_type";
1117 let src2_type = "src2_type";
1118 graph.connect(src2_id, sink_id, src2_type).unwrap();
1119 graph.connect(src1_id, sink_id, src1_type).unwrap();
1120
1121 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
1122 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
1123 assert_eq!(src1_edge_id, 1);
1126 assert_eq!(src2_edge_id, 0);
1127
1128 let runtime = compute_runtime_plan(graph).unwrap();
1129 let sink_step = runtime
1130 .steps
1131 .iter()
1132 .find_map(|step| match step {
1133 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
1134 _ => None,
1135 })
1136 .unwrap();
1137
1138 assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
1141 assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
1142 }
1143
1144 #[test]
1145 fn test_runtime_output_ports_unique_ordered() {
1146 let mut config = CuConfig::default();
1147 let graph = config.get_graph_mut(None).unwrap();
1148 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1149 let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1150 let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1151 let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
1152 let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
1153
1154 graph.connect(src_id, dst_a_id, "msg::A").unwrap();
1155 graph.connect(src_id, dst_b_id, "msg::B").unwrap();
1156 graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
1157 graph.connect(src_id, dst_c_id, "msg::C").unwrap();
1158
1159 let runtime = compute_runtime_plan(graph).unwrap();
1160 let src_step = runtime
1161 .steps
1162 .iter()
1163 .find_map(|step| match step {
1164 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1165 _ => None,
1166 })
1167 .unwrap();
1168
1169 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1170 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
1171
1172 let dst_a_step = runtime
1173 .steps
1174 .iter()
1175 .find_map(|step| match step {
1176 CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
1177 _ => None,
1178 })
1179 .unwrap();
1180 let dst_b_step = runtime
1181 .steps
1182 .iter()
1183 .find_map(|step| match step {
1184 CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
1185 _ => None,
1186 })
1187 .unwrap();
1188 let dst_a2_step = runtime
1189 .steps
1190 .iter()
1191 .find_map(|step| match step {
1192 CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
1193 _ => None,
1194 })
1195 .unwrap();
1196 let dst_c_step = runtime
1197 .steps
1198 .iter()
1199 .find_map(|step| match step {
1200 CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
1201 _ => None,
1202 })
1203 .unwrap();
1204
1205 assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
1206 assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
1207 assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
1208 assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
1209 }
1210
1211 #[test]
1212 fn test_runtime_output_ports_fanout_single() {
1213 let mut config = CuConfig::default();
1214 let graph = config.get_graph_mut(None).unwrap();
1215 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1216 let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1217 let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1218
1219 graph.connect(src_id, dst_a_id, "i32").unwrap();
1220 graph.connect(src_id, dst_b_id, "i32").unwrap();
1221
1222 let runtime = compute_runtime_plan(graph).unwrap();
1223 let src_step = runtime
1224 .steps
1225 .iter()
1226 .find_map(|step| match step {
1227 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1228 _ => None,
1229 })
1230 .unwrap();
1231
1232 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1233 assert_eq!(output_pack.msg_types, vec!["i32"]);
1234 }
1235
1236 #[test]
1237 fn test_runtime_plan_diamond_case1() {
1238 let mut config = CuConfig::default();
1240 let graph = config.get_graph_mut(None).unwrap();
1241 let cam0_id = graph
1242 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1243 .unwrap();
1244 let inf0_id = graph
1245 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1246 .unwrap();
1247 let broadcast_id = graph
1248 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1249 .unwrap();
1250
1251 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1253 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1254 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1255
1256 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1257 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
1258
1259 assert_eq!(edge_cam0_to_inf0, 0);
1260 assert_eq!(edge_cam0_to_broadcast, 1);
1261
1262 let runtime = compute_runtime_plan(graph).unwrap();
1263 let broadcast_step = runtime
1264 .steps
1265 .iter()
1266 .find_map(|step| match step {
1267 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1268 _ => None,
1269 })
1270 .unwrap();
1271
1272 assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1273 assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1274 }
1275
1276 #[test]
1277 fn test_runtime_plan_diamond_case2() {
1278 let mut config = CuConfig::default();
1280 let graph = config.get_graph_mut(None).unwrap();
1281 let cam0_id = graph
1282 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1283 .unwrap();
1284 let inf0_id = graph
1285 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1286 .unwrap();
1287 let broadcast_id = graph
1288 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1289 .unwrap();
1290
1291 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1293 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1294 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1295
1296 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1297 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1298
1299 assert_eq!(edge_cam0_to_broadcast, 0);
1300 assert_eq!(edge_cam0_to_inf0, 1);
1301
1302 let runtime = compute_runtime_plan(graph).unwrap();
1303 let broadcast_step = runtime
1304 .steps
1305 .iter()
1306 .find_map(|step| match step {
1307 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1308 _ => None,
1309 })
1310 .unwrap();
1311
1312 assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1313 assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1314 }
1315}