1use crate::app::Subsystem;
6use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
7use crate::config::{CuConfig, CuGraph, MAX_RATE_TARGET_HZ, NodeId, RuntimeConfig};
8use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
9use crate::cutask::{BincodeAdapter, Freezable};
10#[cfg(feature = "std")]
11use crate::monitoring::ExecutionProbeHandle;
12#[cfg(feature = "std")]
13use crate::monitoring::MonitorExecutionProbe;
14use crate::monitoring::{
15 ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
16 ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
17 take_last_completed_handle_bytes,
18};
19#[cfg(all(feature = "std", feature = "parallel-rt"))]
20use crate::parallel_rt::{ParallelRt, ParallelRtMetadata};
21use crate::resource::ResourceManager;
22use compact_str::CompactString;
23use cu29_clock::{ClockProvider, CuDuration, CuTime, RobotClock};
24use cu29_traits::CuResult;
25use cu29_traits::WriteStream;
26use cu29_traits::{CopperListTuple, CuError};
27
28#[cfg(target_os = "none")]
29#[allow(unused_imports)]
30use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
31#[cfg(target_os = "none")]
32#[allow(unused_imports)]
33use cu29_log_derive::info;
34#[cfg(target_os = "none")]
35#[allow(unused_imports)]
36use cu29_log_runtime::log;
37#[cfg(all(target_os = "none", debug_assertions))]
38#[allow(unused_imports)]
39use cu29_log_runtime::log_debug_mode;
40#[cfg(target_os = "none")]
41#[allow(unused_imports)]
42use cu29_value::to_value;
43
44#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
45use alloc::alloc::{alloc_zeroed, handle_alloc_error};
46use alloc::boxed::Box;
47use alloc::collections::{BTreeSet, VecDeque};
48use alloc::format;
49use alloc::string::{String, ToString};
50use alloc::vec::Vec;
51use bincode::enc::EncoderImpl;
52use bincode::enc::write::{SizeWriter, SliceWriter};
53use bincode::error::EncodeError;
54use bincode::{Decode, Encode};
55#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
56use core::alloc::Layout;
57use core::fmt::Result as FmtResult;
58use core::fmt::{Debug, Formatter};
59use core::marker::PhantomData;
60
61#[cfg(all(feature = "std", feature = "async-cl-io"))]
62use std::sync::mpsc::{Receiver, SyncSender, TryRecvError, sync_channel};
63#[cfg(all(feature = "std", feature = "async-cl-io"))]
64use std::thread::JoinHandle;
65
/// Factory signature building the task tuple `CT` from per-node instance
/// configs (in graph order) and the shared resource manager.
pub type TasksInstantiator<CT> =
    for<'c> fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>;
/// Factory signature building the bridge tuple `CB` from the config and resources.
pub type BridgesInstantiator<CB> = fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>;
/// Factory signature building the monitor `M` from the config plus monitoring
/// metadata and runtime hooks.
pub type MonitorInstantiator<M> = fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M;
70
/// Statically generated ingredients needed to assemble a runtime: the task,
/// bridge and monitor factories plus the static monitoring metadata tables.
pub struct CuRuntimeParts<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI> {
    /// Factory building the task tuple `CT`.
    pub tasks_instanciator: TI,
    /// Static metadata describing every monitored component.
    pub monitored_components: &'static [MonitorComponentMetadata],
    /// Maps CopperList entries to component ids for monitoring.
    pub culist_component_mapping: &'static [ComponentId],
    /// Static metadata driving the parallel runtime (parallel-rt only).
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    pub parallel_rt_metadata: &'static ParallelRtMetadata,
    /// Factory building the monitor `M`.
    pub monitor_instanciator: MI,
    /// Factory building the bridge tuple `CB`.
    pub bridges_instanciator: BI,
    // Zero-sized marker tying the otherwise-unused generic parameters to the type.
    _payload: PhantomData<(CT, CB, P, M, [(); NBCL])>,
}
81
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI>
    CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>
{
    /// Bundles the generated factories and static metadata tables.
    /// `const` so the parts can be built in const/static contexts.
    pub const fn new(
        tasks_instanciator: TI,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: MI,
        bridges_instanciator: BI,
    ) -> Self {
        Self {
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
            _payload: PhantomData,
        }
    }
}
106
/// Builder assembling a runtime from a config, the generated
/// [`CuRuntimeParts`] and the CopperList/keyframe log writers.
pub struct CuRuntimeBuilder<
    'cfg,
    CT,
    CB,
    P: CopperListTuple,
    M: CuMonitor,
    const NBCL: usize,
    TI,
    BI,
    MI,
    CLW,
    KFW,
> {
    clock: RobotClock,
    config: &'cfg CuConfig,
    mission: &'cfg str,
    // Defaults to Subsystem::new(None, 0) until overridden via with_subsystem().
    subsystem: Subsystem,
    // Defaults to 0 until overridden via with_instance_id().
    instance_id: u32,
    // Must be supplied (with_resources / try_with_resources_instantiator)
    // before build(), which fails otherwise.
    resources: Option<ResourceManager>,
    parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
    copperlists_logger: CLW,
    keyframes_logger: KFW,
}
130
131impl<'cfg, CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI, CLW, KFW>
132 CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
133{
134 pub fn new(
135 clock: RobotClock,
136 config: &'cfg CuConfig,
137 mission: &'cfg str,
138 parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
139 copperlists_logger: CLW,
140 keyframes_logger: KFW,
141 ) -> Self {
142 Self {
143 clock,
144 config,
145 mission,
146 subsystem: Subsystem::new(None, 0),
147 instance_id: 0,
148 resources: None,
149 parts,
150 copperlists_logger,
151 keyframes_logger,
152 }
153 }
154
155 pub fn with_subsystem(mut self, subsystem: Subsystem) -> Self {
156 self.subsystem = subsystem;
157 self
158 }
159
160 pub fn with_instance_id(mut self, instance_id: u32) -> Self {
161 self.instance_id = instance_id;
162 self
163 }
164
165 pub fn with_resources(mut self, resources: ResourceManager) -> Self {
166 self.resources = Some(resources);
167 self
168 }
169
170 pub fn try_with_resources_instantiator(
171 mut self,
172 resources_instantiator: impl FnOnce(&CuConfig) -> CuResult<ResourceManager>,
173 ) -> CuResult<Self> {
174 self.resources = Some(resources_instantiator(self.config)?);
175 Ok(self)
176 }
177}
178
#[inline]
/// Returns "now" for performance measurements.
///
/// With the `sysclock-perf` feature, readings come from a lazily-created,
/// process-wide dedicated `RobotClock` — presumably so perf timings stay on a
/// real system clock even when `_clock` is simulated; confirm against
/// `RobotClock::new` semantics. Otherwise the supplied clock is used.
pub fn perf_now(_clock: &RobotClock) -> CuTime {
    #[cfg(all(feature = "std", feature = "sysclock-perf"))]
    {
        // One shared clock instance for the whole process.
        static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
        return PERF_CLOCK.get_or_init(RobotClock::new).now();
    }

    // Unreachable when the block above returns; the allow silences that lint.
    #[allow(unreachable_code)]
    _clock.now()
}
200
/// Final busy-wait window (ns) for the high-precision rate limiter: sleep
/// until this close to the deadline, then spin. 200 µs.
#[cfg(all(feature = "std", feature = "high-precision-limiter"))]
const HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS: u64 = 200_000;
203
204#[inline]
206pub fn rate_target_period(rate_target_hz: u64) -> CuResult<CuDuration> {
207 if rate_target_hz == 0 {
208 return Err(CuError::from(
209 "Runtime rate target cannot be zero. Set runtime.rate_target_hz to at least 1.",
210 ));
211 }
212
213 if rate_target_hz > MAX_RATE_TARGET_HZ {
214 return Err(CuError::from(format!(
215 "Runtime rate target ({rate_target_hz} Hz) exceeds the supported maximum of {MAX_RATE_TARGET_HZ} Hz."
216 )));
217 }
218
219 Ok(CuDuration::from(MAX_RATE_TARGET_HZ / rate_target_hz))
220}
221
/// Fixed-rate loop limiter: tracks a rolling absolute deadline spaced
/// `period` apart so callers can hold a target loop frequency.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LoopRateLimiter {
    // Time between consecutive deadlines.
    period: CuDuration,
    // Absolute time of the next tick.
    next_deadline: CuTime,
}
233
impl LoopRateLimiter {
    /// Creates a limiter ticking at `rate_target_hz`; the first deadline is
    /// one period from `clock`'s now.
    #[inline]
    pub fn from_rate_target_hz(rate_target_hz: u64, clock: &RobotClock) -> CuResult<Self> {
        let period = rate_target_period(rate_target_hz)?;
        Ok(Self {
            period,
            next_deadline: clock.now() + period,
        })
    }

    /// True when the current deadline has already passed.
    #[inline]
    pub fn is_ready(&self, clock: &RobotClock) -> bool {
        self.remaining(clock).is_none()
    }

    /// Time left until the next deadline, or `None` if it is already due.
    #[inline]
    pub fn remaining(&self, clock: &RobotClock) -> Option<CuDuration> {
        let now = clock.now();
        if now < self.next_deadline {
            Some(self.next_deadline - now)
        } else {
            None
        }
    }

    /// Blocks until the next deadline. The strategy is feature-selected:
    /// sleep-then-spin (std + high-precision-limiter), plain sleep (std
    /// only), or pure spin (no_std).
    #[inline]
    pub fn wait_until_ready(&self, clock: &RobotClock) {
        let deadline = self.next_deadline;
        // Already past the deadline: nothing to wait for.
        let Some(remaining) = self.remaining(clock) else {
            return;
        };

        #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
        {
            // Sleep through the bulk of the wait, then busy-spin the final
            // window to hit the deadline more precisely than sleep allows.
            let spin_window = self.spin_window();
            if remaining > spin_window {
                std::thread::sleep(std::time::Duration::from(remaining - spin_window));
            }
            while clock.now() < deadline {
                core::hint::spin_loop();
            }
        }

        #[cfg(all(feature = "std", not(feature = "high-precision-limiter")))]
        {
            let _ = deadline;
            std::thread::sleep(std::time::Duration::from(remaining));
        }

        #[cfg(not(feature = "std"))]
        {
            let _ = remaining;
            while clock.now() < deadline {
                core::hint::spin_loop();
            }
        }
    }

    /// Records that a tick happened now and schedules the next deadline.
    #[inline]
    pub fn mark_tick(&mut self, clock: &RobotClock) {
        self.advance_from(clock.now());
    }

    /// Convenience: wait for the deadline, then advance it.
    #[inline]
    pub fn limit(&mut self, clock: &RobotClock) {
        self.wait_until_ready(clock);
        self.mark_tick(clock);
    }

    /// Advances the deadline past `now`, skipping whole missed periods so a
    /// long stall does not trigger a burst of catch-up ticks.
    #[inline]
    fn advance_from(&mut self, now: CuTime) {
        let steps = if now < self.next_deadline {
            1
        } else {
            // How many full periods we are late, plus one for the next tick.
            (now - self.next_deadline).as_nanos() / self.period.as_nanos() + 1
        };
        self.next_deadline += steps * self.period;
    }

    /// Length of the final busy-wait window used by the high-precision path.
    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
    #[inline]
    fn spin_window(&self) -> CuDuration {
        let _ = self.period;
        CuDuration::from(HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS)
    }

    // Test-only accessor for the scheduled deadline.
    #[cfg(test)]
    #[inline]
    fn next_deadline(&self) -> CuTime {
        self.next_deadline
    }
}
326
/// Marker bound for CopperList payloads. It requires `Send` only when the
/// async serializer thread exists (std + async-cl-io), and is a no-op bound
/// otherwise, so the same generic code compiles in both configurations.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub trait AsyncCopperListPayload: Send {}

#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<T: Send> AsyncCopperListPayload for T {}

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub trait AsyncCopperListPayload {}

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
impl<T> AsyncCopperListPayload for T {}
340
/// Outcome of one processing step.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ProcessStepOutcome {
    /// Keep going with the current CopperList.
    Continue,
    /// Abort the CopperList currently in flight.
    AbortCopperList,
}

/// Result of a single processing step.
pub type ProcessStepResult = CuResult<ProcessStepOutcome>;
355
/// Synchronous CopperList manager: lists live in a fixed pool of `NBCL`
/// slots and serialization happens inline on the caller's thread.
pub struct SyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    // Fixed-capacity pool of CopperLists.
    inner: CuListsManager<P, NBCL>,
    // Optional serialization sink; None disables CopperList logging entirely.
    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
    /// Bytes written by the most recent CopperList serialization (0 if none).
    pub last_encoded_bytes: u64,
    /// Value of `take_last_completed_handle_bytes()` captured at the most
    /// recent serialization.
    pub last_handle_bytes: u64,
}
366
impl<P: CopperListTuple + Default, const NBCL: usize> SyncCopperListsManager<P, NBCL> {
    /// Builds a manager over a fresh pool of `NBCL` CopperLists.
    /// `logger` is the optional serialization sink; `None` disables logging.
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit,
    {
        Ok(Self {
            inner: CuListsManager::new(),
            logger,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Id that will be assigned to the next created CopperList.
    pub fn next_cl_id(&self) -> u64 {
        self.inner.next_cl_id()
    }

    /// Id of the most recently created CopperList.
    pub fn last_cl_id(&self) -> u64 {
        self.inner.last_cl_id()
    }

    /// Reserves the next CopperList slot, failing when the pool is exhausted.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
        self.inner
            .create()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))
    }

    /// Marks `culistid` as done and retires (serializes + frees) every list
    /// in the contiguous done run at the front of the pool.
    ///
    /// Lists can finish out of order; only the leading done prefix is
    /// retired so pool order is preserved (`is_top` tracks that prefix).
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        self.last_handle_bytes = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
                    self.last_handle_bytes = take_last_completed_handle_bytes();
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        // Pop the freed prefix off the pool.
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Nothing is ever deferred in the synchronous path: no-op.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        Ok(())
    }

    /// Number of free slots remaining in the pool.
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        Ok(NBCL - self.inner.len())
    }

    /// Serializes an externally owned (boxed) CopperList inline and returns
    /// it immediately for reuse.
    #[cfg(feature = "std")]
    pub fn end_of_processing_boxed(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        culist.change_state(CopperListState::DoneProcessing);
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;
        if let Some(logger) = &mut self.logger {
            culist.change_state(CopperListState::BeingSerialized);
            logger.log(&culist)?;
            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
            self.last_handle_bytes = take_last_completed_handle_bytes();
        }
        culist.change_state(CopperListState::Free);
        Ok(OwnedCopperListSubmission::Recycled(culist))
    }

    /// No async completions exist in the sync path: always `Ok(None)`.
    #[cfg(feature = "std")]
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        Ok(None)
    }

    /// Blocking reclamation is meaningless without a worker thread: always errors.
    #[cfg(feature = "std")]
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        Err(CuError::from(
            "Synchronous CopperList I/O cannot block waiting for boxed completions",
        ))
    }

    /// Nothing can be pending in the sync path: returns an empty list.
    #[cfg(feature = "std")]
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        Ok(Vec::new())
    }
}
464
/// What became of a boxed CopperList submitted for end-of-processing.
#[cfg(feature = "std")]
pub enum OwnedCopperListSubmission<P: CopperListTuple> {
    /// Handled inline; the list is immediately reusable.
    Recycled(Box<CopperList<P>>),
    /// Queued for async serialization; reclaim it later via
    /// `try_reclaim_boxed`/`wait_reclaim_boxed`.
    Pending,
}
473
/// Message sent back from the async serializer thread: the list itself plus
/// the logging outcome (`(encoded_bytes, handle_bytes)` on success).
#[cfg(all(feature = "std", feature = "async-cl-io"))]
struct AsyncCopperListCompletion<P: CopperListTuple> {
    culist: Box<CopperList<P>>,
    log_result: CuResult<(u64, u64)>,
}
479
/// Heap-allocates one CopperList directly as zeroed memory (avoiding a large
/// stack temporary), then lets the payload finish its zero-state init.
#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
fn allocate_zeroed_copperlist<P>() -> Box<CopperList<P>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // SAFETY: the pointer comes from the global allocator with the exact
    // layout of CopperList<P>, and allocation failure is handled via
    // handle_alloc_error (which diverges), so Box::from_raw takes ownership
    // of a valid, correctly sized allocation. An all-zero bit pattern is
    // assumed to be a valid initial state per the CuListZeroedInit contract,
    // finalized by init_zeroed() below — TODO confirm that contract.
    let mut culist = unsafe {
        let layout = Layout::new::<CopperList<P>>();
        let ptr = alloc_zeroed(layout) as *mut CopperList<P>;
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        Box::from_raw(ptr)
    };
    culist.msgs.init_zeroed();
    culist
}
497
/// Heap-allocates the full pool of `NBCL` zero-initialized CopperLists used
/// by the parallel runtime.
#[cfg(all(feature = "std", feature = "parallel-rt"))]
pub fn allocate_boxed_copperlists<P, const NBCL: usize>() -> Vec<Box<CopperList<P>>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // One zeroed allocation per slot; collect() pre-reserves NBCL entries.
    (0..NBCL)
        .map(|_| allocate_zeroed_copperlist::<P>())
        .collect()
}
509
/// CopperList manager that offloads serialization to a dedicated worker
/// thread so the control loop never blocks on log I/O (std + async-cl-io).
#[cfg(all(feature = "std", feature = "async-cl-io"))]
pub struct AsyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    // Lists ready to be handed out by create().
    free_pool: Vec<Box<CopperList<P>>>,
    // The single list currently being filled, if any.
    current: Option<Box<CopperList<P>>>,
    // Lists sent to the worker but not yet returned as completions.
    pending_count: usize,
    // Monotonic id assigned to the next created list.
    next_cl_id: u64,
    // Sends lists to the serializer thread (None when logging is disabled).
    pending_sender: Option<SyncSender<Box<CopperList<P>>>>,
    // Receives serialized lists back from the worker.
    completion_receiver: Option<Receiver<AsyncCopperListCompletion<P>>>,
    // Join handle of the serializer thread.
    worker_handle: Option<JoinHandle<()>>,
    /// Bytes written by the most recently completed serialization.
    pub last_encoded_bytes: u64,
    /// Handle bytes reported for the most recently completed serialization.
    pub last_handle_bytes: u64,
}
525
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> AsyncCopperListsManager<P, NBCL> {
    /// Builds the manager with `NBCL` heap-allocated zeroed CopperLists.
    ///
    /// When `logger` is `Some`, a dedicated "cu-async-cl-io" thread is
    /// spawned: it serializes lists received over a bounded channel and sends
    /// each one back (with its log result) over a completion channel. With no
    /// logger, lists are recycled directly and no thread is created.
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit + AsyncCopperListPayload + 'static,
    {
        let mut free_pool = Vec::with_capacity(NBCL);
        for _ in 0..NBCL {
            free_pool.push(allocate_zeroed_copperlist::<P>());
        }

        let (pending_sender, completion_receiver, worker_handle) = if let Some(mut logger) = logger
        {
            // Both channels are bounded to NBCL, so the worker can never hold
            // more lists than the pool contains.
            let (pending_sender, pending_receiver) = sync_channel::<Box<CopperList<P>>>(NBCL);
            let (completion_sender, completion_receiver) =
                sync_channel::<AsyncCopperListCompletion<P>>(NBCL);
            let worker_handle = std::thread::Builder::new()
                .name("cu-async-cl-io".to_string())
                .spawn(move || {
                    // Loop ends when the sender side is dropped (shutdown).
                    while let Ok(mut culist) = pending_receiver.recv() {
                        culist.change_state(CopperListState::BeingSerialized);
                        let log_result = logger.log(&culist).map(|_| {
                            (
                                logger.last_log_bytes().unwrap_or(0) as u64,
                                take_last_completed_handle_bytes(),
                            )
                        });
                        // Always report the completion first (so the list is
                        // returned), then stop on the first logging failure.
                        let should_stop = log_result.is_err();
                        if completion_sender
                            .send(AsyncCopperListCompletion { culist, log_result })
                            .is_err()
                        {
                            break;
                        }
                        if should_stop {
                            break;
                        }
                    }
                })
                .map_err(|e| {
                    CuError::from("Failed to spawn async CopperList serializer thread")
                        .add_cause(e.to_string().as_str())
                })?;
            (
                Some(pending_sender),
                Some(completion_receiver),
                Some(worker_handle),
            )
        } else {
            (None, None, None)
        };

        Ok(Self {
            free_pool,
            current: None,
            pending_count: 0,
            next_cl_id: 0,
            pending_sender,
            completion_receiver,
            worker_handle,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Id that will be assigned to the next created CopperList.
    pub fn next_cl_id(&self) -> u64 {
        self.next_cl_id
    }

    /// Id of the most recently created CopperList (saturates at 0).
    pub fn last_cl_id(&self) -> u64 {
        self.next_cl_id.saturating_sub(1)
    }

    /// Pops a free list (blocking on worker completions if the pool is
    /// drained), stamps it with the next id and makes it the active list.
    ///
    /// # Errors
    /// Fails when another list is still active or the worker disconnected.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Attempted to create a CopperList while another one is still active",
            ));
        }

        // Drain finished completions first, then block until a slot frees up.
        self.reclaim_completed()?;
        while self.free_pool.is_empty() {
            self.wait_for_completion()?;
        }

        let culist = self
            .free_pool
            .pop()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))?;
        self.current = Some(culist);

        let current = self
            .current
            .as_mut()
            .expect("current CopperList is missing");
        current.id = self.next_cl_id;
        current.change_state(CopperListState::Initialized);
        self.next_cl_id += 1;
        Ok(current.as_mut())
    }

    /// Finishes the active CopperList: enqueues it for async serialization,
    /// or recycles it immediately when logging is disabled.
    ///
    /// # Errors
    /// Fails when no list is active, when `culistid` does not match the
    /// active list, or when the worker channel is closed.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        self.reclaim_completed()?;

        let mut culist = self.current.take().ok_or_else(|| {
            CuError::from("Attempted to finish processing without an active CopperList")
        })?;

        if culist.id != culistid {
            return Err(CuError::from(format!(
                "Attempted to finish CopperList #{culistid} while CopperList #{} is active",
                culist.id
            )));
        }

        culist.change_state(CopperListState::DoneProcessing);
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;

        if let Some(pending_sender) = &self.pending_sender {
            culist.change_state(CopperListState::QueuedForSerialization);
            pending_sender.send(culist).map_err(|e| {
                CuError::from("Failed to enqueue CopperList for async serialization")
                    .add_cause(e.to_string().as_str())
            })?;
            self.pending_count += 1;
            self.reclaim_completed()?;
        } else {
            // Logging disabled: recycle straight back into the pool.
            culist.change_state(CopperListState::Free);
            self.free_pool.push(culist);
        }

        Ok(())
    }

    /// Blocks until every queued list has been serialized and reclaimed.
    /// Errors if a list is still active.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }

        while self.pending_count > 0 {
            self.wait_for_completion()?;
        }
        Ok(())
    }

    /// Number of immediately available lists (after draining completions).
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        self.reclaim_completed()?;
        Ok(self.free_pool.len())
    }

    /// Like [`Self::end_of_processing`] but for an externally owned list:
    /// returns `Pending` when queued to the worker, `Recycled` otherwise.
    pub fn end_of_processing_boxed(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        self.reclaim_completed()?;
        culist.change_state(CopperListState::DoneProcessing);
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;

        if let Some(pending_sender) = &self.pending_sender {
            culist.change_state(CopperListState::QueuedForSerialization);
            pending_sender.send(culist).map_err(|e| {
                CuError::from("Failed to enqueue CopperList for async serialization")
                    .add_cause(e.to_string().as_str())
            })?;
            self.pending_count += 1;
            self.reclaim_completed()?;
            Ok(OwnedCopperListSubmission::Pending)
        } else {
            culist.change_state(CopperListState::Free);
            Ok(OwnedCopperListSubmission::Recycled(culist))
        }
    }

    /// Non-blocking reclaim of one completed list, if any.
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        let recv_result = {
            // No worker means nothing can ever complete.
            let Some(completion_receiver) = self.completion_receiver.as_ref() else {
                return Ok(None);
            };
            completion_receiver.try_recv()
        };
        match recv_result {
            Ok(completion) => self.handle_completion(completion).map(Some),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Err(CuError::from(
                "Async CopperList serializer thread disconnected unexpectedly",
            )),
        }
    }

    /// Blocking reclaim of one completed list.
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        let completion = self
            .completion_receiver
            .as_ref()
            .ok_or_else(|| {
                CuError::from("No async CopperList serializer is active to return a free slot")
            })?
            .recv()
            .map_err(|e| {
                CuError::from("Failed to receive completion from async CopperList serializer")
                    .add_cause(e.to_string().as_str())
            })?;
        self.handle_completion(completion)
    }

    /// Drains all pending completions, returning the reclaimed lists.
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        let mut reclaimed = Vec::with_capacity(self.pending_count);
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }
        while self.pending_count > 0 {
            reclaimed.push(self.wait_reclaim_boxed()?);
        }
        Ok(reclaimed)
    }

    /// Moves every already-completed list back into the free pool (non-blocking).
    fn reclaim_completed(&mut self) -> CuResult<()> {
        loop {
            let Some(culist) = self.try_reclaim_boxed()? else {
                break;
            };
            self.free_pool.push(culist);
        }
        Ok(())
    }

    /// Blocks for one completion and returns its list to the free pool.
    fn wait_for_completion(&mut self) -> CuResult<()> {
        let culist = self.wait_reclaim_boxed()?;
        self.free_pool.push(culist);
        Ok(())
    }

    /// Accounts for one completion: updates the byte counters on success,
    /// frees the list, and propagates the worker's logging error if any.
    fn handle_completion(
        &mut self,
        mut completion: AsyncCopperListCompletion<P>,
    ) -> CuResult<Box<CopperList<P>>> {
        self.pending_count = self.pending_count.saturating_sub(1);
        if let Ok((encoded_bytes, handle_bytes)) = completion.log_result.as_ref() {
            self.last_encoded_bytes = *encoded_bytes;
            self.last_handle_bytes = *handle_bytes;
        }
        completion.culist.change_state(CopperListState::Free);
        completion.log_result?;
        Ok(completion.culist)
    }

    /// Flushes pending work, closes the channel (ending the worker loop),
    /// and joins the worker thread.
    fn shutdown_worker(&mut self) -> CuResult<()> {
        self.finish_pending()?;
        self.pending_sender.take();
        if let Some(worker_handle) = self.worker_handle.take() {
            worker_handle.join().map_err(|_| {
                CuError::from("Async CopperList serializer thread panicked while joining")
            })?;
        }
        Ok(())
    }
}
788
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> Drop for AsyncCopperListsManager<P, NBCL> {
    /// Best-effort shutdown: drain pending lists and join the worker thread.
    /// Errors are swallowed because drop cannot propagate them.
    fn drop(&mut self) {
        let _ = self.shutdown_worker();
    }
}
795
/// The CopperList manager used by the runtime: async (worker-threaded)
/// when std + async-cl-io are enabled, synchronous otherwise.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
pub type CopperListsManager<P, const NBCL: usize> = AsyncCopperListsManager<P, NBCL>;

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
pub type CopperListsManager<P, const NBCL: usize> = SyncCopperListsManager<P, NBCL>;
801
/// Manages periodic keyframes: snapshots of frozen task state captured every
/// `keyframe_interval` CopperLists and written to the keyframe logger.
pub struct KeyFramesManager {
    // Keyframe currently being assembled.
    inner: KeyFrame,

    // Timestamp to use for the next reset instead of the clock
    // (set when restoring a keyframe via lock_keyframe/set_forced_timestamp).
    forced_timestamp: Option<CuTime>,

    // When true, the current keyframe was installed via lock_keyframe and is
    // protected from re-reset/re-freeze for the same culistid.
    locked: bool,

    // Optional sink for serialized keyframes; None disables keyframing.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    // A keyframe is captured when culistid is a multiple of this interval.
    keyframe_interval: u32,

    /// Bytes written by the most recent keyframe serialization (0 if none).
    pub last_encoded_bytes: u64,
}
822
impl KeyFramesManager {
    /// True when keyframing is enabled and `culistid` lands on the interval.
    fn is_keyframe(&self, culistid: u64) -> bool {
        // logger.is_some() short-circuits, so keyframe_interval (0 when task
        // logging is disabled) is never consulted without a logger.
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
    }

    /// Public alias of [`Self::is_keyframe`].
    #[inline]
    pub fn captures_keyframe(&self, culistid: u64) -> bool {
        self.is_keyframe(culistid)
    }

    /// Begins a fresh keyframe for `culistid` if it is a keyframe boundary.
    ///
    /// A locked keyframe for the same id is left intact; otherwise any forced
    /// timestamp is consumed, falling back to the clock's now().
    pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            if self.locked && self.inner.culistid == culistid {
                return;
            }
            let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
            self.inner.reset(culistid, ts);
            self.locked = false;
        }
    }

    /// Forces the timestamp used by the next keyframe reset (e.g. when
    /// restoring state at a known point in time).
    #[cfg(feature = "std")]
    pub fn set_forced_timestamp(&mut self, ts: CuTime) {
        self.forced_timestamp = Some(ts);
    }

    /// Serializes `task` into the current keyframe, returning the bytes
    /// appended (0 when `culistid` is not a keyframe boundary, or when the
    /// keyframe is locked and already carries its frozen state).
    ///
    /// # Errors
    /// Fails when the current keyframe belongs to a different culistid or
    /// when serialization fails.
    pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.locked {
                return Ok(0);
            }
            if self.inner.culistid != culistid {
                return Err(CuError::from(format!(
                    "Freezing task for culistid {} but current keyframe is {}",
                    culistid, self.inner.culistid
                )));
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    /// Freezes any [`Freezable`]; same behavior as [`Self::freeze_task`].
    pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
        self.freeze_task(culistid, item)
    }

    /// Writes the finished keyframe to the logger when `culistid` is a
    /// keyframe boundary; records the encoded size either way.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            // is_keyframe() guarantees the logger is present here.
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)?;
            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
            self.locked = false;
            Ok(())
        } else {
            self.last_encoded_bytes = 0;
            Ok(())
        }
    }

    /// Installs a previously captured keyframe and locks it so subsequent
    /// reset/freeze calls for the same culistid do not overwrite it.
    #[cfg(feature = "std")]
    pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
        self.inner = keyframe.clone();
        self.forced_timestamp = Some(keyframe.timestamp);
        self.locked = true;
    }
}
899
/// Fully assembled Copper runtime: tasks, bridges, monitor, the CopperList
/// and keyframe managers, plus the shared clock and runtime configuration.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// Clock shared by the whole runtime.
    pub clock: RobotClock,
    // Numeric subsystem code; exposed read-only via subsystem_code().
    subsystem_code: u16,

    /// Instance id distinguishing multiple runtimes of the same application.
    pub instance_id: u32,

    /// The application task tuple.
    pub tasks: CT,

    /// The bridge tuple.
    pub bridges: CB,

    /// Shared resource manager the tasks/bridges were built against.
    pub resources: ResourceManager,

    /// The monitor instance.
    pub monitor: M,

    /// Probe recording execution markers (a shared handle under std, an
    /// owned probe on no_std).
    #[cfg(feature = "std")]
    pub execution_probe: ExecutionProbeHandle,
    #[cfg(not(feature = "std"))]
    pub execution_probe: RuntimeExecutionProbe,

    /// Manages the CopperList pool and its (possibly async) serialization.
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// Manages periodic keyframes of frozen task state.
    pub keyframes_manager: KeyFramesManager,

    /// Parallel execution engine (parallel-rt feature only).
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    pub parallel_rt: ParallelRt<NBCL>,

    /// Runtime configuration, validated at build time.
    pub runtime_config: RuntimeConfig,
}
948
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload,
    M: CuMonitor,
    const NBCL: usize,
> ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    /// Hands out a clone of the runtime's clock.
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}
962
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> CuRuntime<CT, CB, P, M, NBCL> {
    /// Returns a clone of the runtime clock.
    #[inline]
    pub fn clock(&self) -> RobotClock {
        self.clock.clone()
    }

    /// Numeric code of the subsystem this runtime belongs to.
    #[inline]
    pub fn subsystem_code(&self) -> u16 {
        self.subsystem_code
    }

    /// Instance id distinguishing multiple runtimes of the same application.
    #[inline]
    pub fn instance_id(&self) -> u32 {
        self.instance_id
    }
}
982
983impl<
984 'cfg,
985 CT,
986 CB,
987 P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
988 M: CuMonitor,
989 const NBCL: usize,
990 TI,
991 BI,
992 MI,
993 CLW,
994 KFW,
995> CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
996where
997 TI: for<'c> Fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>,
998 BI: Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
999 MI: Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1000 CLW: WriteStream<CopperList<P>> + 'static,
1001 KFW: WriteStream<KeyFrame> + 'static,
1002{
1003 pub fn build(self) -> CuResult<CuRuntime<CT, CB, P, M, NBCL>> {
1004 let Self {
1005 clock,
1006 config,
1007 mission,
1008 subsystem,
1009 instance_id,
1010 resources,
1011 parts,
1012 copperlists_logger,
1013 keyframes_logger,
1014 } = self;
1015 let mut resources =
1016 resources.ok_or_else(|| CuError::from("Resources missing from CuRuntimeBuilder"))?;
1017
1018 let graph = config.get_graph(Some(mission))?;
1019 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1020 .get_all_nodes()
1021 .iter()
1022 .map(|(_, node)| node.get_instance_config())
1023 .collect();
1024
1025 let tasks = (parts.tasks_instanciator)(all_instances_configs, &mut resources)?;
1026
1027 #[cfg(feature = "std")]
1028 let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
1029 #[cfg(not(feature = "std"))]
1030 let execution_probe = RuntimeExecutionProbe::default();
1031 let monitor_metadata = CuMonitoringMetadata::new(
1032 CompactString::from(mission),
1033 parts.monitored_components,
1034 parts.culist_component_mapping,
1035 CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
1036 build_monitor_topology(config, mission)?,
1037 None,
1038 )?
1039 .with_subsystem_id(subsystem.id())
1040 .with_instance_id(instance_id);
1041 #[cfg(feature = "std")]
1042 let monitor_runtime =
1043 CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
1044 #[cfg(not(feature = "std"))]
1045 let monitor_runtime = CuMonitoringRuntime::unavailable();
1046 let monitor = (parts.monitor_instanciator)(config, monitor_metadata, monitor_runtime);
1047 let bridges = (parts.bridges_instanciator)(config, &mut resources)?;
1048
1049 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
1050 Some(logging_config) if logging_config.enable_task_logging => (
1051 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1052 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1053 logging_config.keyframe_interval.unwrap(),
1054 ),
1055 Some(_) => (None, None, 0),
1056 None => (
1057 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1058 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1059 DEFAULT_KEYFRAME_INTERVAL,
1060 ),
1061 };
1062
1063 let copperlists_manager = CopperListsManager::new(copperlists_logger)?;
1064 #[cfg(target_os = "none")]
1065 {
1066 let cl_size = core::mem::size_of::<CopperList<P>>();
1067 let total_bytes = cl_size.saturating_mul(NBCL);
1068 info!(
1069 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
1070 NBCL, cl_size, total_bytes
1071 );
1072 }
1073
1074 let keyframes_manager = KeyFramesManager {
1075 inner: KeyFrame::new(),
1076 logger: keyframes_logger,
1077 keyframe_interval,
1078 last_encoded_bytes: 0,
1079 forced_timestamp: None,
1080 locked: false,
1081 };
1082 #[cfg(all(feature = "std", feature = "parallel-rt"))]
1083 let parallel_rt = ParallelRt::new(parts.parallel_rt_metadata)?;
1084
1085 let runtime_config = config.runtime.clone().unwrap_or_default();
1086 runtime_config.validate()?;
1087
1088 Ok(CuRuntime {
1089 subsystem_code: subsystem.code(),
1090 instance_id,
1091 tasks,
1092 bridges,
1093 resources,
1094 monitor,
1095 execution_probe,
1096 clock,
1097 copperlists_manager,
1098 keyframes_manager,
1099 #[cfg(all(feature = "std", feature = "parallel-rt"))]
1100 parallel_rt,
1101 runtime_config,
1102 })
1103 }
1104}
1105
/// A snapshot of frozen task state tied to a specific CopperList id, used to
/// restore the runtime at that point in the log.
#[derive(Clone, Encode, Decode)]
pub struct KeyFrame {
    /// Id of the CopperList this keyframe was captured for.
    pub culistid: u64,
    /// Capture time.
    pub timestamp: CuTime,
    /// Concatenated bincode encodings of each frozen task, in freeze order.
    pub serialized_tasks: Vec<u8>,
}
1118
impl KeyFrame {
    /// Empty keyframe bound to CopperList id 0.
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Rebinds this keyframe to a new id/timestamp, dropping previously
    /// frozen task bytes (the buffer keeps its capacity).
    fn reset(&mut self, culistid: u64, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Appends a bincode encoding of `task` to `serialized_tasks` and returns
    /// the number of bytes written.
    ///
    /// Encodes twice: a sizing pass with `SizeWriter` to learn the exact
    /// length, then the real pass into the pre-grown buffer via `SliceWriter`.
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let cfg = bincode::config::standard();
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}
1150
/// Where the effective configuration used to instantiate the runtime came from.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    /// Configuration supplied programmatically, overriding other sources.
    ProgrammaticOverride,
    /// Configuration loaded from an external file.
    ExternalFile,
    /// Default configuration bundled with the binary.
    BundledDefault,
}
1158
/// Identity of the application stack, recorded alongside lifecycle events.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    pub app_name: String,
    pub app_version: String,
    /// Git commit hash of the build, when known.
    pub git_commit: Option<String>,
    /// Whether the working tree was dirty at build time, when known.
    pub git_dirty: Option<bool>,
    pub subsystem_id: Option<String>,
    pub subsystem_code: u16,
    pub instance_id: u32,
}
1170
/// Lifecycle events recorded by the runtime, from instantiation to shutdown.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    /// The runtime was created.
    Instantiated {
        /// Where the effective configuration came from.
        config_source: RuntimeLifecycleConfigSource,
        /// The effective configuration, serialized (RON text, per the name).
        effective_config_ron: String,
        /// Identity of the application stack that instantiated the runtime.
        stack: RuntimeLifecycleStackInfo,
    },
    /// A mission started executing.
    MissionStarted {
        mission: String,
    },
    /// A mission stopped; `reason` is free-form text explaining why.
    MissionStopped {
        mission: String,
        reason: String,
    },
    /// A panic was observed, with its message and source location when known.
    Panic {
        message: String,
        file: Option<String>,
        line: Option<u32>,
        column: Option<u32>,
    },
    /// The runtime shut down cleanly.
    ShutdownCompleted,
}
1197
/// A lifecycle event paired with the robot-clock time at which it occurred.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    pub timestamp: CuTime,
    pub event: RuntimeLifecycleEvent,
}
1204
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    /// Forwards an execution marker to the runtime's execution probe
    /// (used to trace execution progress for monitoring).
    #[inline]
    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
        self.execution_probe.record(marker);
    }

    /// Returns a reference to the runtime's execution probe.
    ///
    /// Under `std` the probe is stored behind an indirection (hence
    /// `as_ref()`); without `std` it is stored inline in the struct.
    #[inline]
    pub fn execution_probe_ref(&self) -> &RuntimeExecutionProbe {
        #[cfg(feature = "std")]
        {
            self.execution_probe.as_ref()
        }

        #[cfg(not(feature = "std"))]
        {
            &self.execution_probe
        }
    }

    /// Legacy constructor (std): builds the resource manager through
    /// `resources_instanciator`, bundles all the instantiators into
    /// `CuRuntimeParts` and delegates to [`CuRuntimeBuilder`].
    ///
    /// Prefer using the builder directly.
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new(...).")]
    pub fn new(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        // Bundle the instantiators; the builder consumes them as one unit.
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .try_with_resources_instantiator(resources_instanciator)?
        .build()
    }

    /// Legacy constructor (std) taking an already-built `ResourceManager`
    /// instead of a resources instantiator; otherwise identical to `new`.
    ///
    /// Prefer using the builder directly.
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new_with_resources(...).")]
    pub fn new_with_resources(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .with_resources(resources)
        .build()
    }

    /// Legacy constructor (no_std variant): same as the std `new` but
    /// without the parallel-runtime metadata parameter.
    ///
    /// Prefer using the builder directly.
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new(...).")]
    pub fn new(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .try_with_resources_instantiator(resources_instanciator)?
        .build()
    }

    /// Legacy constructor (no_std variant) taking an already-built
    /// `ResourceManager`; otherwise identical to the no_std `new`.
    ///
    /// Prefer using the builder directly.
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new_with_resources(...).")]
    pub fn new_with_resources(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .with_resources(resources)
        .build()
    }
}
1407
/// Role of a task within the data-flow graph, derived from its connectivity
/// (see [`find_task_type_for_id`]).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// No incoming connections: produces data.
    Source,
    /// Both incoming and outgoing connections: transforms data.
    Regular,
    /// No outgoing connections: consumes data.
    Sink,
}
1418
/// Where a task writes its outputs in the copper list and which message
/// types it produces (one port per entry, in port order).
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Index of the task's output slot within the copper list.
    pub culist_index: u32,
    /// Message types produced, ordered by output port.
    pub msg_types: Vec<String>,
}
1424
/// One input consumed by a task: where to read it from and which
/// connection it arrives on.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// Copper-list slot of the producing task's output pack.
    pub culist_index: u32,
    /// Message type carried by the connection.
    pub msg_type: String,
    /// Port within the producer's output pack this input reads from.
    pub src_port: usize,
    /// Id of the graph edge this input arrives on.
    pub edge_id: usize,
}
1432
/// One concrete task invocation in the execution plan.
pub struct CuExecutionStep {
    /// Id of the node in the configuration graph.
    pub node_id: NodeId,
    /// Copy of the node's configuration.
    pub node: Node,
    /// Source / Regular / Sink classification of the task.
    pub task_type: CuTaskType,

    /// Inputs to read, sorted by connection (edge) id.
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// Where the task writes its outputs. The planner in this file always
    /// fills it for planned steps.
    pub output_msg_pack: Option<CuOutputPack>,
}
1448
1449impl Debug for CuExecutionStep {
1450 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1451 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
1452 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
1453 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
1454 f.write_str(
1455 format!(
1456 " input_msg_types: {:?}\n",
1457 self.input_msg_indices_types
1458 )
1459 .as_str(),
1460 )?;
1461 f.write_str(format!(" output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
1462 Ok(())
1463 }
1464}
1465
/// A (possibly nested) sequence of execution units, optionally bounded to a
/// number of iterations.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    /// Iteration bound. `compute_runtime_plan` produces `None`; presumably
    /// `None` means unbounded — confirm against the execution driver.
    pub loop_count: Option<u32>,
}
1474
1475impl Debug for CuExecutionLoop {
1476 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1477 f.write_str("CuExecutionLoop:\n")?;
1478 for step in &self.steps {
1479 match step {
1480 CuExecutionUnit::Step(step) => {
1481 step.fmt(f)?;
1482 }
1483 CuExecutionUnit::Loop(l) => {
1484 l.fmt(f)?;
1485 }
1486 }
1487 }
1488
1489 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
1490 Ok(())
1491 }
1492}
1493
/// A unit in the execution plan: either a single task step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// Boxed to keep the enum small (a step carries a node copy and I/O maps).
    Step(Box<CuExecutionStep>),
    Loop(CuExecutionLoop),
}
1500
1501fn find_output_pack_from_nodeid(
1502 node_id: NodeId,
1503 steps: &Vec<CuExecutionUnit>,
1504) -> Option<CuOutputPack> {
1505 for step in steps {
1506 match step {
1507 CuExecutionUnit::Loop(loop_unit) => {
1508 if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
1509 return Some(output_pack);
1510 }
1511 }
1512 CuExecutionUnit::Step(step) => {
1513 if step.node_id == node_id {
1514 return step.output_msg_pack.clone();
1515 }
1516 }
1517 }
1518 }
1519 None
1520}
1521
1522pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
1523 if graph.incoming_neighbor_count(node_id) == 0 {
1524 CuTaskType::Source
1525 } else if graph.outgoing_neighbor_count(node_id) == 0 {
1526 CuTaskType::Sink
1527 } else {
1528 CuTaskType::Regular
1529 }
1530}
1531
1532fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
1535 input_msg_indices_types.sort_by_key(|input| input.edge_id);
1536}
1537
1538fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
1539 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
1540 edge_ids.sort();
1541
1542 let mut msg_order: Vec<(usize, String)> = Vec::new();
1543 let mut record_msg = |msg: String, order: usize| {
1544 if let Some((existing_order, _)) = msg_order
1545 .iter_mut()
1546 .find(|(_, existing_msg)| *existing_msg == msg)
1547 {
1548 if order < *existing_order {
1549 *existing_order = order;
1550 }
1551 return;
1552 }
1553 msg_order.push((order, msg));
1554 };
1555
1556 for edge_id in edge_ids {
1557 if let Some(edge) = graph.edge(edge_id) {
1558 let order = if edge.order == usize::MAX {
1559 edge_id
1560 } else {
1561 edge.order
1562 };
1563 record_msg(edge.msg.clone(), order);
1564 }
1565 }
1566 if let Some(node) = graph.get_node(node_id) {
1567 for (msg, order) in node.nc_outputs_with_order() {
1568 record_msg(msg.clone(), order);
1569 }
1570 }
1571
1572 msg_order.sort_by(|(order_a, msg_a), (order_b, msg_b)| {
1573 order_a.cmp(order_b).then_with(|| msg_a.cmp(msg_b))
1574 });
1575 msg_order.into_iter().map(|(_, msg)| msg).collect()
1576}
1577fn plan_tasks_tree_branch(
1579 graph: &CuGraph,
1580 mut next_culist_output_index: u32,
1581 starting_point: NodeId,
1582 plan: &mut Vec<CuExecutionUnit>,
1583) -> (u32, bool) {
1584 #[cfg(all(feature = "std", feature = "macro_debug"))]
1585 eprintln!("-- starting branch from node {starting_point}");
1586
1587 let mut handled = false;
1588
1589 for id in graph.bfs_nodes(starting_point) {
1590 let node_ref = graph.get_node(id).unwrap();
1591 #[cfg(all(feature = "std", feature = "macro_debug"))]
1592 eprintln!(" Visiting node: {node_ref:?}");
1593
1594 let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
1595 let output_msg_pack: Option<CuOutputPack>;
1596 let task_type = find_task_type_for_id(graph, id);
1597
1598 match task_type {
1599 CuTaskType::Source => {
1600 #[cfg(all(feature = "std", feature = "macro_debug"))]
1601 eprintln!(" → Source node, assign output index {next_culist_output_index}");
1602 let msg_types = collect_output_msg_types(graph, id);
1603 if msg_types.is_empty() {
1604 panic!(
1605 "Source node '{}' has no outgoing connections",
1606 node_ref.get_id()
1607 );
1608 }
1609 output_msg_pack = Some(CuOutputPack {
1610 culist_index: next_culist_output_index,
1611 msg_types,
1612 });
1613 next_culist_output_index += 1;
1614 }
1615 CuTaskType::Sink => {
1616 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1617 edge_ids.sort();
1618 #[cfg(all(feature = "std", feature = "macro_debug"))]
1619 eprintln!(" → Sink with incoming edges: {edge_ids:?}");
1620 for edge_id in edge_ids {
1621 let edge = graph
1622 .edge(edge_id)
1623 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1624 let pid = graph
1625 .get_node_id_by_name(edge.src.as_str())
1626 .unwrap_or_else(|| {
1627 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1628 });
1629 let output_pack = find_output_pack_from_nodeid(pid, plan);
1630 if let Some(output_pack) = output_pack {
1631 #[cfg(all(feature = "std", feature = "macro_debug"))]
1632 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
1633 let msg_type = edge.msg.as_str();
1634 let src_port = output_pack
1635 .msg_types
1636 .iter()
1637 .position(|msg| msg == msg_type)
1638 .unwrap_or_else(|| {
1639 panic!(
1640 "Missing output port for message type '{msg_type}' on node {pid}"
1641 )
1642 });
1643 input_msg_indices_types.push(CuInputMsg {
1644 culist_index: output_pack.culist_index,
1645 msg_type: msg_type.to_string(),
1646 src_port,
1647 edge_id,
1648 });
1649 } else {
1650 #[cfg(all(feature = "std", feature = "macro_debug"))]
1651 eprintln!(" ✗ Input from {pid} not ready, returning");
1652 return (next_culist_output_index, handled);
1653 }
1654 }
1655 output_msg_pack = Some(CuOutputPack {
1656 culist_index: next_culist_output_index,
1657 msg_types: Vec::from(["()".to_string()]),
1658 });
1659 next_culist_output_index += 1;
1660 }
1661 CuTaskType::Regular => {
1662 let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1663 edge_ids.sort();
1664 #[cfg(all(feature = "std", feature = "macro_debug"))]
1665 eprintln!(" → Regular task with incoming edges: {edge_ids:?}");
1666 for edge_id in edge_ids {
1667 let edge = graph
1668 .edge(edge_id)
1669 .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1670 let pid = graph
1671 .get_node_id_by_name(edge.src.as_str())
1672 .unwrap_or_else(|| {
1673 panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1674 });
1675 let output_pack = find_output_pack_from_nodeid(pid, plan);
1676 if let Some(output_pack) = output_pack {
1677 #[cfg(all(feature = "std", feature = "macro_debug"))]
1678 eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
1679 let msg_type = edge.msg.as_str();
1680 let src_port = output_pack
1681 .msg_types
1682 .iter()
1683 .position(|msg| msg == msg_type)
1684 .unwrap_or_else(|| {
1685 panic!(
1686 "Missing output port for message type '{msg_type}' on node {pid}"
1687 )
1688 });
1689 input_msg_indices_types.push(CuInputMsg {
1690 culist_index: output_pack.culist_index,
1691 msg_type: msg_type.to_string(),
1692 src_port,
1693 edge_id,
1694 });
1695 } else {
1696 #[cfg(all(feature = "std", feature = "macro_debug"))]
1697 eprintln!(" ✗ Input from {pid} not ready, returning");
1698 return (next_culist_output_index, handled);
1699 }
1700 }
1701 let msg_types = collect_output_msg_types(graph, id);
1702 if msg_types.is_empty() {
1703 panic!(
1704 "Regular node '{}' has no outgoing connections",
1705 node_ref.get_id()
1706 );
1707 }
1708 output_msg_pack = Some(CuOutputPack {
1709 culist_index: next_culist_output_index,
1710 msg_types,
1711 });
1712 next_culist_output_index += 1;
1713 }
1714 }
1715
1716 sort_inputs_by_cnx_id(&mut input_msg_indices_types);
1717
1718 if let Some(pos) = plan
1719 .iter()
1720 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
1721 {
1722 #[cfg(all(feature = "std", feature = "macro_debug"))]
1723 eprintln!(" → Already in plan, modifying existing step");
1724 let mut step = plan.remove(pos);
1725 if let CuExecutionUnit::Step(ref mut s) = step {
1726 s.input_msg_indices_types = input_msg_indices_types;
1727 }
1728 plan.push(step);
1729 } else {
1730 #[cfg(all(feature = "std", feature = "macro_debug"))]
1731 eprintln!(" → New step added to plan");
1732 let step = CuExecutionStep {
1733 node_id: id,
1734 node: node_ref.clone(),
1735 task_type,
1736 input_msg_indices_types,
1737 output_msg_pack,
1738 };
1739 plan.push(CuExecutionUnit::Step(Box::new(step)));
1740 }
1741
1742 handled = true;
1743 }
1744
1745 #[cfg(all(feature = "std", feature = "macro_debug"))]
1746 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
1747 (next_culist_output_index, handled)
1748}
1749
1750pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1753 #[cfg(all(feature = "std", feature = "macro_debug"))]
1754 eprintln!("[runtime plan]");
1755 let mut plan = Vec::new();
1756 let mut next_culist_output_index = 0u32;
1757
1758 let mut queue: VecDeque<NodeId> = graph
1759 .node_ids()
1760 .into_iter()
1761 .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
1762 .collect();
1763
1764 #[cfg(all(feature = "std", feature = "macro_debug"))]
1765 eprintln!("Initial source nodes: {queue:?}");
1766
1767 while let Some(start_node) = queue.pop_front() {
1768 #[cfg(all(feature = "std", feature = "macro_debug"))]
1769 eprintln!("→ Starting BFS from source {start_node}");
1770 for node_id in graph.bfs_nodes(start_node) {
1771 let already_in_plan = plan
1772 .iter()
1773 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1774 if already_in_plan {
1775 #[cfg(all(feature = "std", feature = "macro_debug"))]
1776 eprintln!(" → Node {node_id} already planned, skipping");
1777 continue;
1778 }
1779
1780 #[cfg(all(feature = "std", feature = "macro_debug"))]
1781 eprintln!(" Planning from node {node_id}");
1782 let (new_index, handled) =
1783 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
1784 next_culist_output_index = new_index;
1785
1786 if !handled {
1787 #[cfg(all(feature = "std", feature = "macro_debug"))]
1788 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1789 continue;
1790 }
1791
1792 #[cfg(all(feature = "std", feature = "macro_debug"))]
1793 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
1794 for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1795 #[cfg(all(feature = "std", feature = "macro_debug"))]
1796 eprintln!(" → Enqueueing neighbor {neighbor}");
1797 queue.push_back(neighbor);
1798 }
1799 }
1800 }
1801
1802 let mut planned_nodes = BTreeSet::new();
1803 for unit in &plan {
1804 if let CuExecutionUnit::Step(step) = unit {
1805 planned_nodes.insert(step.node_id);
1806 }
1807 }
1808
1809 let mut missing = Vec::new();
1810 for node_id in graph.node_ids() {
1811 if !planned_nodes.contains(&node_id) {
1812 if let Some(node) = graph.get_node(node_id) {
1813 missing.push(node.get_id().to_string());
1814 } else {
1815 missing.push(format!("node_id_{node_id}"));
1816 }
1817 }
1818 }
1819
1820 if !missing.is_empty() {
1821 missing.sort();
1822 return Err(CuError::from(format!(
1823 "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1824 missing.join(", ")
1825 )));
1826 }
1827
1828 Ok(CuExecutionLoop {
1829 steps: plan,
1830 loop_count: None,
1831 })
1832}
1833
1834#[cfg(test)]
1836mod tests {
1837 use super::*;
1838 use crate::config::Node;
1839 use crate::context::CuContext;
1840 use crate::cutask::CuSinkTask;
1841 use crate::cutask::{CuSrcTask, Freezable};
1842 use crate::monitoring::NoMonitor;
1843 use crate::reflect::Reflect;
1844 use bincode::Encode;
1845 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1846 use serde_derive::{Deserialize, Serialize};
1847
    /// Minimal source task used to exercise runtime construction in tests.
    #[derive(Reflect)]
    pub struct TestSource {}

    // No state to freeze; the default (no-op) Freezable impl suffices.
    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // Produces nothing: the output payload type is `()`.
        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1867
    /// Minimal sink task used to exercise runtime construction in tests.
    #[derive(Reflect)]
    pub struct TestSink {}

    // No state to freeze; the default (no-op) Freezable impl suffices.
    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // Consumes and discards its `()` input.
        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1888
    /// Task tuple matching the two-node (source → sink) test graph.
    type Tasks = (TestSource, TestSink);
    /// Runtime under test: 2 copper lists in flight, no bridges, no monitor.
    type TestRuntime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;
    /// Same copper-list count as `TestRuntime`, usable as a const generic.
    const TEST_NBCL: usize = 2;
1893
    /// Empty payload standing in for the copper-list payload type in tests.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        // No stamped messages to expose.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        // No tasks are associated with this payload.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        // Nothing to zero-initialize.
        fn init_zeroed(&mut self) {}
    }
1912
1913 #[cfg(feature = "std")]
1914 fn tasks_instanciator(
1915 all_instances_configs: Vec<Option<&ComponentConfig>>,
1916 _resources: &mut ResourceManager,
1917 ) -> CuResult<Tasks> {
1918 Ok((
1919 TestSource::new(all_instances_configs[0], ())?,
1920 TestSink::new(all_instances_configs[1], ())?,
1921 ))
1922 }
1923
1924 #[cfg(not(feature = "std"))]
1925 fn tasks_instanciator(
1926 all_instances_configs: Vec<Option<&ComponentConfig>>,
1927 _resources: &mut ResourceManager,
1928 ) -> CuResult<Tasks> {
1929 Ok((
1930 TestSource::new(all_instances_configs[0], ())?,
1931 TestSink::new(all_instances_configs[1], ())?,
1932 ))
1933 }
1934
    /// Builds the no-op monitor used by these tests.
    fn monitor_instanciator(
        _config: &CuConfig,
        metadata: CuMonitoringMetadata,
        runtime: CuMonitoringRuntime,
    ) -> NoMonitor {
        NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
    }

    /// The test graph has no bridges, so this builds the unit bridge set.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    /// Empty resource set: the test tasks need no resources.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }
1950
    /// Write stream that discards everything; used by tests that don't
    /// inspect the logged copper lists or keyframes.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
1959
    #[test]
    fn test_runtime_instantiation() {
        // Smallest valid application graph: one source feeding one sink.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        // Building through the builder with no-op monitor/bridges/resources
        // and discarding loggers must succeed.
        let runtime: CuResult<TestRuntime> =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build());
        assert!(runtime.is_ok());
    }
1988
1989 #[test]
1990 fn test_rate_target_period_rejects_zero() {
1991 let err = rate_target_period(0).expect_err("zero rate target should fail");
1992 assert!(
1993 err.to_string()
1994 .contains("Runtime rate target cannot be zero"),
1995 "unexpected error: {err}"
1996 );
1997 }
1998
    #[test]
    fn test_loop_rate_limiter_advances_to_next_period_when_on_time() {
        // 100 Hz → a 10 ms period, expressed here in nanoseconds.
        let (clock, mock) = RobotClock::mock();
        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
        assert_eq!(limiter.next_deadline(), CuDuration::from(10_000_000));

        // Ticking exactly at the deadline advances it one full period.
        mock.set_value(10_000_000);
        limiter.mark_tick(&clock);

        assert_eq!(limiter.next_deadline(), CuDuration::from(20_000_000));
    }
2010
    #[test]
    fn test_loop_rate_limiter_skips_missed_periods_without_resetting_phase() {
        let (clock, mock) = RobotClock::mock();
        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();

        // Ticking 3.5 periods late: the next deadline snaps to the next
        // multiple of the 10 ms period (40 ms), keeping the original phase
        // rather than restarting from the tick time.
        mock.set_value(35_000_000);
        limiter.mark_tick(&clock);

        assert_eq!(limiter.next_deadline(), CuDuration::from(40_000_000));
    }
2021
    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
    #[test]
    fn test_loop_rate_limiter_spin_window_is_fixed_scheduler_window() {
        // The busy-wait (spin) window is a fixed 200 µs, independent of the
        // configured target rate.
        let (clock, _) = RobotClock::mock();
        let limiter = LoopRateLimiter::from_rate_target_hz(1_000, &clock).unwrap();
        assert_eq!(limiter.spin_window(), CuDuration::from(200_000));

        let fast = LoopRateLimiter::from_rate_target_hz(10_000, &clock).unwrap();
        assert_eq!(fast.spin_window(), CuDuration::from(200_000));
    }
2032
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_copperlists_manager_lifecycle() {
        // Minimal two-node graph so a TestRuntime (NBCL = 2) can be built.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime: TestRuntime =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build())
            .unwrap();

        {
            // First list: ids start at 0; one of the two slots remains free.
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        {
            // Second list exhausts the pool.
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
        }

        {
            // Pool is full: create() fails. Retiring list 1 frees one slot.
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.create();
            assert!(culist2.is_err());
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        {
            // Out-of-order retirement: finishing 0 alone frees nothing while
            // 2 is still in flight; finishing 2 then releases both slots.
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            let _ = copperlists.end_of_processing(0);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);

            let _ = copperlists.end_of_processing(2);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
        }
    }
2118
    /// Write stream that records the ids of the copper lists it logs and
    /// sleeps briefly so out-of-order flushing would be observable.
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[derive(Debug, Default)]
    struct RecordingWriter {
        ids: Arc<Mutex<Vec<u64>>>,
    }

    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    impl WriteStream<CopperList<Msgs>> for RecordingWriter {
        fn log(&mut self, culist: &CopperList<Msgs>) -> CuResult<()> {
            self.ids.lock().unwrap().push(culist.id);
            // Simulate slow I/O.
            std::thread::sleep(std::time::Duration::from_millis(2));
            Ok(())
        }
    }
2133
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[test]
    fn test_async_copperlists_manager_flushes_in_order() {
        let ids = Arc::new(Mutex::new(Vec::new()));
        let mut copperlists = CopperListsManager::<Msgs, 4>::new(Some(Box::new(RecordingWriter {
            ids: ids.clone(),
        })))
        .unwrap();

        // Create and immediately retire four lists.
        for expected_id in 0..4 {
            let culist = copperlists.create().unwrap();
            assert_eq!(culist.id, expected_id);
            culist.change_state(CopperListState::Processing);
            copperlists.end_of_processing(expected_id).unwrap();
        }

        // After draining, every slot is back and the (slow) writer must have
        // received the lists in id order.
        copperlists.finish_pending().unwrap();
        assert_eq!(copperlists.available_copper_lists().unwrap(), 4);
        assert_eq!(*ids.lock().unwrap(), vec![0, 1, 2, 3]);
    }
2154
    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Connect src2 first so edge ids do NOT follow node-id order.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Inputs must be ordered by edge id (connection declaration order),
        // not by the producing node's id.
        assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
    }
2194
2195 #[test]
2196 fn test_runtime_output_ports_unique_ordered() {
2197 let mut config = CuConfig::default();
2198 let graph = config.get_graph_mut(None).unwrap();
2199 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2200 let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2201 let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2202 let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
2203 let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
2204
2205 graph.connect(src_id, dst_a_id, "msg::A").unwrap();
2206 graph.connect(src_id, dst_b_id, "msg::B").unwrap();
2207 graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
2208 graph.connect(src_id, dst_c_id, "msg::C").unwrap();
2209
2210 let runtime = compute_runtime_plan(graph).unwrap();
2211 let src_step = runtime
2212 .steps
2213 .iter()
2214 .find_map(|step| match step {
2215 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2216 _ => None,
2217 })
2218 .unwrap();
2219
2220 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2221 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
2222
2223 let dst_a_step = runtime
2224 .steps
2225 .iter()
2226 .find_map(|step| match step {
2227 CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
2228 _ => None,
2229 })
2230 .unwrap();
2231 let dst_b_step = runtime
2232 .steps
2233 .iter()
2234 .find_map(|step| match step {
2235 CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
2236 _ => None,
2237 })
2238 .unwrap();
2239 let dst_a2_step = runtime
2240 .steps
2241 .iter()
2242 .find_map(|step| match step {
2243 CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
2244 _ => None,
2245 })
2246 .unwrap();
2247 let dst_c_step = runtime
2248 .steps
2249 .iter()
2250 .find_map(|step| match step {
2251 CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
2252 _ => None,
2253 })
2254 .unwrap();
2255
2256 assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
2257 assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
2258 assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
2259 assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
2260 }
2261
2262 #[test]
2263 fn test_runtime_output_ports_fanout_single() {
2264 let mut config = CuConfig::default();
2265 let graph = config.get_graph_mut(None).unwrap();
2266 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2267 let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2268 let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2269
2270 graph.connect(src_id, dst_a_id, "i32").unwrap();
2271 graph.connect(src_id, dst_b_id, "i32").unwrap();
2272
2273 let runtime = compute_runtime_plan(graph).unwrap();
2274 let src_step = runtime
2275 .steps
2276 .iter()
2277 .find_map(|step| match step {
2278 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2279 _ => None,
2280 })
2281 .unwrap();
2282
2283 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2284 assert_eq!(output_pack.msg_types, vec!["i32"]);
2285 }
2286
2287 #[test]
2288 fn test_runtime_output_ports_include_nc_outputs() {
2289 let mut config = CuConfig::default();
2290 let graph = config.get_graph_mut(None).unwrap();
2291 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2292 let dst_id = graph.add_node(Node::new("dst", "Sink")).unwrap();
2293 graph.connect(src_id, dst_id, "msg::A").unwrap();
2294 graph
2295 .get_node_mut(src_id)
2296 .expect("missing source node")
2297 .add_nc_output("msg::B", usize::MAX);
2298
2299 let runtime = compute_runtime_plan(graph).unwrap();
2300 let src_step = runtime
2301 .steps
2302 .iter()
2303 .find_map(|step| match step {
2304 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2305 _ => None,
2306 })
2307 .unwrap();
2308 let dst_step = runtime
2309 .steps
2310 .iter()
2311 .find_map(|step| match step {
2312 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2313 _ => None,
2314 })
2315 .unwrap();
2316
2317 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2318 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2319 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 0);
2320 }
2321
2322 #[test]
2323 fn test_runtime_output_ports_respect_connection_order_with_nc() {
2324 let txt = r#"(
2325 tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2326 cnx: [
2327 (src: "src", dst: "__nc__", msg: "msg::A"),
2328 (src: "src", dst: "sink", msg: "msg::B"),
2329 ]
2330 )"#;
2331 let config = CuConfig::deserialize_ron(txt).unwrap();
2332 let graph = config.get_graph(None).unwrap();
2333 let src_id = graph.get_node_id_by_name("src").unwrap();
2334 let dst_id = graph.get_node_id_by_name("sink").unwrap();
2335
2336 let runtime = compute_runtime_plan(graph).unwrap();
2337 let src_step = runtime
2338 .steps
2339 .iter()
2340 .find_map(|step| match step {
2341 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2342 _ => None,
2343 })
2344 .unwrap();
2345 let dst_step = runtime
2346 .steps
2347 .iter()
2348 .find_map(|step| match step {
2349 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2350 _ => None,
2351 })
2352 .unwrap();
2353
2354 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2355 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2356 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2357 }
2358
2359 #[cfg(feature = "std")]
2360 #[test]
2361 fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
2362 let txt = r#"(
2363 tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2364 cnx: [
2365 (src: "src", dst: "__nc__", msg: "msg::A"),
2366 (src: "src", dst: "sink", msg: "msg::B"),
2367 ]
2368 )"#;
2369 let tmp = tempfile::NamedTempFile::new().unwrap();
2370 std::fs::write(tmp.path(), txt).unwrap();
2371 let config = crate::config::read_configuration(tmp.path().to_str().unwrap()).unwrap();
2372 let graph = config.get_graph(None).unwrap();
2373 let src_id = graph.get_node_id_by_name("src").unwrap();
2374 let dst_id = graph.get_node_id_by_name("sink").unwrap();
2375
2376 let runtime = compute_runtime_plan(graph).unwrap();
2377 let src_step = runtime
2378 .steps
2379 .iter()
2380 .find_map(|step| match step {
2381 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2382 _ => None,
2383 })
2384 .unwrap();
2385 let dst_step = runtime
2386 .steps
2387 .iter()
2388 .find_map(|step| match step {
2389 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2390 _ => None,
2391 })
2392 .unwrap();
2393
2394 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2395 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2396 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2397 }
2398
2399 #[test]
2400 fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
2401 let txt = r#"(
2402 tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2403 cnx: [
2404 (src: "src", dst: "__nc__", msg: "i32"),
2405 (src: "src", dst: "sink", msg: "bool"),
2406 ]
2407 )"#;
2408 let config = CuConfig::deserialize_ron(txt).unwrap();
2409 let graph = config.get_graph(None).unwrap();
2410 let src_id = graph.get_node_id_by_name("src").unwrap();
2411 let dst_id = graph.get_node_id_by_name("sink").unwrap();
2412
2413 let runtime = compute_runtime_plan(graph).unwrap();
2414 let src_step = runtime
2415 .steps
2416 .iter()
2417 .find_map(|step| match step {
2418 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2419 _ => None,
2420 })
2421 .unwrap();
2422 let dst_step = runtime
2423 .steps
2424 .iter()
2425 .find_map(|step| match step {
2426 CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2427 _ => None,
2428 })
2429 .unwrap();
2430
2431 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2432 assert_eq!(output_pack.msg_types, vec!["i32", "bool"]);
2433 assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2434 }
2435
2436 #[test]
2437 fn test_runtime_plan_diamond_case1() {
2438 let mut config = CuConfig::default();
2440 let graph = config.get_graph_mut(None).unwrap();
2441 let cam0_id = graph
2442 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
2443 .unwrap();
2444 let inf0_id = graph
2445 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
2446 .unwrap();
2447 let broadcast_id = graph
2448 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
2449 .unwrap();
2450
2451 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
2453 graph.connect(cam0_id, inf0_id, "i32").unwrap();
2454 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
2455
2456 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
2457 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
2458
2459 assert_eq!(edge_cam0_to_inf0, 0);
2460 assert_eq!(edge_cam0_to_broadcast, 1);
2461
2462 let runtime = compute_runtime_plan(graph).unwrap();
2463 let broadcast_step = runtime
2464 .steps
2465 .iter()
2466 .find_map(|step| match step {
2467 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
2468 _ => None,
2469 })
2470 .unwrap();
2471
2472 assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
2473 assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
2474 }
2475
2476 #[test]
2477 fn test_runtime_plan_diamond_case2() {
2478 let mut config = CuConfig::default();
2480 let graph = config.get_graph_mut(None).unwrap();
2481 let cam0_id = graph
2482 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
2483 .unwrap();
2484 let inf0_id = graph
2485 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
2486 .unwrap();
2487 let broadcast_id = graph
2488 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
2489 .unwrap();
2490
2491 graph.connect(cam0_id, inf0_id, "i32").unwrap();
2493 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
2494 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
2495
2496 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
2497 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
2498
2499 assert_eq!(edge_cam0_to_broadcast, 0);
2500 assert_eq!(edge_cam0_to_inf0, 1);
2501
2502 let runtime = compute_runtime_plan(graph).unwrap();
2503 let broadcast_step = runtime
2504 .steps
2505 .iter()
2506 .find_map(|step| match step {
2507 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
2508 _ => None,
2509 })
2510 .unwrap();
2511
2512 assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
2513 assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
2514 }
2515}