1use proc_macro::TokenStream;
2use quote::{format_ident, quote};
3use std::collections::{BTreeMap, HashMap};
4use std::fs::read_to_string;
5use std::path::Path;
6use std::process::Command;
7use syn::Fields::{Named, Unnamed};
8use syn::meta::parser;
9use syn::parse::Parser;
10use syn::{
11 Field, Fields, ItemImpl, ItemStruct, LitStr, Type, TypeTuple, parse_macro_input, parse_quote,
12 parse_str,
13};
14
15use crate::utils::{config_id_to_bridge_const, config_id_to_enum, config_id_to_struct_member};
16use cu29_runtime::config::CuConfig;
17use cu29_runtime::config::{
18 BridgeChannelConfigRepresentation, ConfigGraphs, CuGraph, Flavor, Node, NodeId,
19 ResourceBundleConfig, read_configuration,
20};
21use cu29_runtime::curuntime::{
22 CuExecutionLoop, CuExecutionStep, CuExecutionUnit, CuTaskType, compute_runtime_plan,
23 find_task_type_for_id,
24};
25use cu29_traits::{CuError, CuResult};
26use proc_macro2::{Ident, Span};
27
28mod bundle_resources;
29mod resources;
30mod utils;
31
32const DEFAULT_CLNB: usize = 2; #[inline]
35fn int2sliceindex(i: u32) -> syn::Index {
36 syn::Index::from(i as usize)
37}
38
39#[inline(always)]
40fn return_error(msg: String) -> TokenStream {
41 syn::Error::new(Span::call_site(), msg)
42 .to_compile_error()
43 .into()
44}
45
46fn rtsan_guard_tokens() -> proc_macro2::TokenStream {
47 if cfg!(feature = "rtsan") {
48 quote! {
49 let _rt_guard = ::cu29::rtsan::ScopedSanitizeRealtime::default();
50 }
51 } else {
52 quote! {}
53 }
54}
55
/// Runs `git -C <repo_root> <args...>` and returns its standard output with
/// surrounding whitespace removed.
///
/// Yields `None` when the process cannot be spawned, when it exits with a
/// non-success status, or when its output is not valid UTF-8.
fn git_output_trimmed(repo_root: &Path, args: &[&str]) -> Option<String> {
    let mut git = Command::new("git");
    git.arg("-C").arg(repo_root).args(args);

    let finished = git.output().ok()?;
    finished
        .status
        .success()
        .then_some(finished.stdout)
        .and_then(|raw| String::from_utf8(raw).ok())
        .map(|text| text.trim().to_string())
}
69
70fn detect_git_info(repo_root: &Path) -> (Option<String>, Option<bool>) {
71 let in_repo = git_output_trimmed(repo_root, &["rev-parse", "--is-inside-work-tree"])
72 .is_some_and(|value| value == "true");
73 if !in_repo {
74 return (None, None);
75 }
76
77 let commit = git_output_trimmed(repo_root, &["rev-parse", "HEAD"]).filter(|s| !s.is_empty());
78 let dirty = git_output_trimmed(repo_root, &["status", "--porcelain"]).map(|s| !s.is_empty());
80 (commit, dirty)
81}
82
/// Arguments accepted by the `#[copper_runtime(...)]` attribute, as produced
/// by `CopperRuntimeArgs::parse_tokens`.
#[derive(Debug, Clone)]
struct CopperRuntimeArgs {
    /// Value of the mandatory `config = "..."` property.
    config_path: String,
    /// Value of the optional `subsystem = "..."` property.
    subsystem_id: Option<String>,
    /// Set by the `sim_mode` property (bare flag or `sim_mode = <bool>`).
    sim_mode: bool,
    /// Set by the `ignore_resources` property (bare flag or
    /// `ignore_resources = <bool>`).
    ignore_resources: bool,
}
90
91impl CopperRuntimeArgs {
92 fn parse_tokens(args: proc_macro2::TokenStream) -> Result<Self, syn::Error> {
93 let mut config_file: Option<LitStr> = None;
94 let mut subsystem_id: Option<LitStr> = None;
95 let mut sim_mode = false;
96 let mut ignore_resources = false;
97
98 let parser = parser(|meta| {
99 if meta.path.is_ident("config") {
100 config_file = Some(meta.value()?.parse()?);
101 Ok(())
102 } else if meta.path.is_ident("subsystem") {
103 subsystem_id = Some(meta.value()?.parse()?);
104 Ok(())
105 } else if meta.path.is_ident("sim_mode") {
106 if meta.input.peek(syn::Token![=]) {
107 meta.input.parse::<syn::Token![=]>()?;
108 let value: syn::LitBool = meta.input.parse()?;
109 sim_mode = value.value();
110 } else {
111 sim_mode = true;
112 }
113 Ok(())
114 } else if meta.path.is_ident("ignore_resources") {
115 if meta.input.peek(syn::Token![=]) {
116 meta.input.parse::<syn::Token![=]>()?;
117 let value: syn::LitBool = meta.input.parse()?;
118 ignore_resources = value.value();
119 } else {
120 ignore_resources = true;
121 }
122 Ok(())
123 } else {
124 Err(meta.error("unsupported property"))
125 }
126 });
127
128 parser.parse2(args)?;
129
130 let config_path = config_file
131 .ok_or_else(|| {
132 syn::Error::new(
133 Span::call_site(),
134 "Expected config file attribute like #[copper_runtime(config = \"path\")]",
135 )
136 })?
137 .value();
138
139 Ok(Self {
140 config_path,
141 subsystem_id: subsystem_id.map(|value| value.value()),
142 sim_mode,
143 ignore_resources,
144 })
145 }
146}
147
/// Result of resolving the runtime configuration for one `copper_runtime`
/// invocation (returned by `resolve_runtime_config`, defined elsewhere).
#[derive(Debug)]
struct ResolvedRuntimeConfig {
    /// Parsed configuration the macro generates code from.
    local_config: CuConfig,
    /// Textual configuration content handed to the generated code.
    /// NOTE(review): exact format (e.g. RON) is not visible here — confirm.
    bundled_local_config_content: String,
    /// Subsystem identifier, when one applies.
    subsystem_id: Option<String>,
    /// Numeric subsystem code; emitted as an unsuffixed `u16` literal by
    /// `copper_runtime`.
    subsystem_code: u16,
}
155
/// Function-like procedural macro entry point; forwards its input unchanged
/// to `resources::resources` and returns whatever that produces.
#[proc_macro]
pub fn resources(input: TokenStream) -> TokenStream {
    resources::resources(input)
}
160
/// Function-like procedural macro entry point; forwards its input unchanged
/// to `bundle_resources::bundle_resources` and returns whatever that produces.
#[proc_macro]
pub fn bundle_resources(input: TokenStream) -> TokenStream {
    bundle_resources::bundle_resources(input)
}
165
166#[proc_macro]
170pub fn gen_cumsgs(config_path_lit: TokenStream) -> TokenStream {
171 #[cfg(feature = "std")]
172 let std = true;
173
174 #[cfg(not(feature = "std"))]
175 let std = false;
176 let config = parse_macro_input!(config_path_lit as LitStr).value();
177 if !std::path::Path::new(&config_full_path(&config)).exists() {
178 return return_error(format!(
179 "The configuration file `{config}` does not exist. Please provide a valid path."
180 ));
181 }
182 #[cfg(feature = "macro_debug")]
183 eprintln!("[gen culist support with {config:?}]");
184 let cuconfig = match read_config(&config) {
185 Ok(cuconfig) => cuconfig,
186 Err(e) => return return_error(e.to_string()),
187 };
188
189 let extra_imports = if !std {
190 quote! {
191 use core::fmt::Debug;
192 use core::fmt::Formatter;
193 use core::fmt::Result as FmtResult;
194 use alloc::vec;
195 use alloc::vec::Vec;
196 }
197 } else {
198 quote! {
199 use std::fmt::Debug;
200 use std::fmt::Formatter;
201 use std::fmt::Result as FmtResult;
202 }
203 };
204
205 let common_imports = quote! {
206 use cu29::bincode::Encode;
207 use cu29::bincode::enc::Encoder;
208 use cu29::bincode::error::EncodeError;
209 use cu29::bincode::Decode;
210 use cu29::bincode::de::Decoder;
211 use cu29::bincode::error::DecodeError;
212 use cu29::copperlist::CopperList;
213 use cu29::prelude::ErasedCuStampedData;
214 use cu29::prelude::ErasedCuStampedDataSet;
215 use cu29::prelude::MatchingTasks;
216 use cu29::prelude::Serialize;
217 use cu29::prelude::CuMsg;
218 use cu29::prelude::CuMsgMetadata;
219 use cu29::prelude::CuListZeroedInit;
220 use cu29::prelude::CuCompactString;
221 #extra_imports
222 };
223
224 let with_uses = match &cuconfig.graphs {
225 ConfigGraphs::Simple(graph) => {
226 let support = match build_gen_cumsgs_support(&cuconfig, graph, None) {
227 Ok(support) => support,
228 Err(e) => return return_error(e.to_string()),
229 };
230
231 quote! {
232 mod cumsgs {
233 #common_imports
234 #support
235 }
236 use cumsgs::CuStampedDataSet;
237 type CuMsgs=CuStampedDataSet;
238 }
239 }
240 ConfigGraphs::Missions(graphs) => {
241 let mut missions: Vec<_> = graphs.iter().collect();
242 missions.sort_by(|a, b| a.0.cmp(b.0));
243
244 let mut mission_modules = Vec::<proc_macro2::TokenStream>::new();
245 for (mission, graph) in missions {
246 let mission_mod = match parse_str::<Ident>(mission.as_str()) {
247 Ok(id) => id,
248 Err(_) => {
249 return return_error(format!(
250 "Mission '{mission}' is not a valid Rust identifier for gen_cumsgs output."
251 ));
252 }
253 };
254
255 let support = match build_gen_cumsgs_support(&cuconfig, graph, Some(mission)) {
256 Ok(support) => support,
257 Err(e) => return return_error(e.to_string()),
258 };
259
260 mission_modules.push(quote! {
261 pub mod #mission_mod {
262 #common_imports
263 #support
264 }
265 });
266 }
267
268 let default_exports = if graphs.contains_key("default") {
269 quote! {
270 use cumsgs::default::CuStampedDataSet;
271 type CuMsgs=CuStampedDataSet;
272 }
273 } else {
274 quote! {}
275 };
276
277 quote! {
278 mod cumsgs {
279 #(#mission_modules)*
280 }
281 #default_exports
282 }
283 }
284 };
285 with_uses.into()
286}
287
288fn build_gen_cumsgs_support(
289 cuconfig: &CuConfig,
290 graph: &CuGraph,
291 mission_label: Option<&str>,
292) -> CuResult<proc_macro2::TokenStream> {
293 let task_specs = CuTaskSpecSet::from_graph(graph);
294 let channel_usage = collect_bridge_channel_usage(graph);
295 let mut bridge_specs = build_bridge_specs(cuconfig, graph, &channel_usage);
296 let (culist_plan, exec_entities, plan_to_original) =
297 build_execution_plan(graph, &task_specs, &mut bridge_specs).map_err(|e| {
298 if let Some(mission) = mission_label {
299 CuError::from(format!(
300 "Could not compute copperlist plan for mission '{mission}': {e}"
301 ))
302 } else {
303 CuError::from(format!("Could not compute copperlist plan: {e}"))
304 }
305 })?;
306 let task_names = collect_task_names(graph);
307 let (culist_order, node_output_positions) = collect_culist_metadata(
308 &culist_plan,
309 &exec_entities,
310 &mut bridge_specs,
311 &plan_to_original,
312 );
313
314 #[cfg(feature = "macro_debug")]
315 if let Some(mission) = mission_label {
316 eprintln!(
317 "[The CuStampedDataSet matching tasks ids for mission '{mission}' are {:?}]",
318 culist_order
319 );
320 } else {
321 eprintln!(
322 "[The CuStampedDataSet matching tasks ids are {:?}]",
323 culist_order
324 );
325 }
326
327 Ok(gen_culist_support(
328 &culist_plan,
329 &culist_order,
330 &node_output_positions,
331 &task_names,
332 &bridge_specs,
333 ))
334}
335
/// Renders the copperlist support code for one execution plan: the
/// `CuStampedDataSet` tuple struct, the `CuList` alias, per-task/per-bridge
/// accessors, payload size helpers, and the trait impls built by the
/// `build_culist_*` helpers.
///
/// * `runtime_plan` - plan whose output packs define the copperlist slots.
/// * `culist_indices_in_plan_order` - slot indices, visited in this order when
///   generating `collect_metadata`, `compute_payload_bytes` and the
///   `init_zeroed` resets.
/// * `node_output_positions` - slot index of each task node's output.
/// * `task_names` - `(node id, task id, member name)` triples; the member name
///   is used to build accessor method names.
/// * `bridge_specs` - bridges whose channels may own copperlist slots.
///
/// # Panics
///
/// Panics when an index does not match an output pack, when a task is missing
/// from `node_output_positions`, or when a bridge channel points to an
/// out-of-range or already-claimed slot.
fn gen_culist_support(
    runtime_plan: &CuExecutionLoop,
    culist_indices_in_plan_order: &[usize],
    node_output_positions: &HashMap<NodeId, usize>,
    task_names: &[(NodeId, String, String)],
    bridge_specs: &[BridgeSpec],
) -> proc_macro2::TokenStream {
    #[cfg(feature = "macro_debug")]
    eprintln!("[Extract msgs types]");
    let output_packs = extract_output_packs(runtime_plan);
    let slot_types: Vec<Type> = output_packs.iter().map(|pack| pack.slot_type()).collect();

    // One copperlist slot per output pack.
    let culist_size = output_packs.len();

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the copperlist struct]");
    let msgs_types_tuple: TypeTuple = build_culist_tuple(&slot_types);
    // Total number of individual messages across all packs (a multi-output
    // pack contributes one per port).
    let cumsg_count: usize = output_packs.iter().map(|pack| pack.msg_types.len()).sum();

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the copperlist tuple bincode support]");
    let msgs_types_tuple_encode = build_culist_tuple_encode(&output_packs);
    let msgs_types_tuple_decode = build_culist_tuple_decode(&slot_types, cumsg_count);

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the copperlist tuple debug support]");
    let msgs_types_tuple_debug = build_culist_tuple_debug(&slot_types);

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the copperlist tuple serialize support]");
    let msgs_types_tuple_serialize = build_culist_tuple_serialize(&slot_types);

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the default tuple support]");
    let msgs_types_tuple_default = build_culist_tuple_default(&slot_types, cumsg_count);

    #[cfg(feature = "macro_debug")]
    eprintln!("[build erasedcumsgs]");

    let erasedmsg_trait_impl = build_culist_erasedcumsgs(&output_packs);

    // One metadata reference per plan entry; for a multi-output pack only the
    // metadata of the first port (`.0`) is exposed.
    let metadata_accessors: Vec<proc_macro2::TokenStream> = culist_indices_in_plan_order
        .iter()
        .map(|idx| {
            let slot_index = syn::Index::from(*idx);
            let pack = output_packs
                .get(*idx)
                .unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
            if pack.is_multi() {
                quote! { &culist.msgs.0.#slot_index.0.metadata }
            } else {
                quote! { &culist.msgs.0.#slot_index.metadata }
            }
        })
        .collect();
    // Statements for `init_zeroed`: reset status text, process time bounds and
    // origin of every message (every port of multi-output packs).
    let mut zeroed_init_tokens: Vec<proc_macro2::TokenStream> = Vec::new();
    for idx in culist_indices_in_plan_order {
        let slot_index = syn::Index::from(*idx);
        let pack = output_packs
            .get(*idx)
            .unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
        if pack.is_multi() {
            for port_idx in 0..pack.msg_types.len() {
                let port_index = syn::Index::from(port_idx);
                zeroed_init_tokens.push(quote! {
                    self.0.#slot_index.#port_index.metadata.status_txt = CuCompactString::default();
                    self.0.#slot_index.#port_index.metadata.process_time.start =
                        cu29::clock::OptionCuTime::none();
                    self.0.#slot_index.#port_index.metadata.process_time.end =
                        cu29::clock::OptionCuTime::none();
                    self.0.#slot_index.#port_index.metadata.origin = None;
                });
            }
        } else {
            zeroed_init_tokens.push(quote! {
                self.0.#slot_index.metadata.status_txt = CuCompactString::default();
                self.0.#slot_index.metadata.process_time.start = cu29::clock::OptionCuTime::none();
                self.0.#slot_index.metadata.process_time.end = cu29::clock::OptionCuTime::none();
                self.0.#slot_index.metadata.origin = None;
            });
        }
    }
    // NOTE(review): the returned array length is `output_packs.len()` while the
    // number of elements is `culist_indices_in_plan_order.len()`; the generated
    // code only type-checks when both are equal.
    let collect_metadata_function = quote! {
        pub fn collect_metadata<'a>(culist: &'a CuList) -> [&'a CuMsgMetadata; #culist_size] {
            [#( #metadata_accessors, )*]
        }
    };

    // Statements for `compute_payload_bytes`: for each present payload, use the
    // cached IO stats when available, otherwise compute them, and accumulate
    // resident and handle bytes.
    // NOTE(review): the flat cache index here is numbered while walking
    // `culist_indices_in_plan_order`, whereas `payload_raw_bytes_accumulators`
    // below numbers it while walking `output_packs` in slot order. The two
    // agree only if the plan order visits slots as 0, 1, 2, ... — confirm.
    let payload_bytes_accumulators: Vec<proc_macro2::TokenStream> = culist_indices_in_plan_order
        .iter()
        .scan(0usize, |flat_idx, idx| {
            let slot_index = syn::Index::from(*idx);
            let pack = output_packs
                .get(*idx)
                .unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
            if pack.is_multi() {
                let iter = (0..pack.msg_types.len()).map(|port_idx| {
                    let port_index = syn::Index::from(port_idx);
                    let cache_index = syn::Index::from(*flat_idx);
                    *flat_idx += 1;
                    quote! {
                        if let Some(payload) = culist.msgs.0.#slot_index.#port_index.payload() {
                            let cached = culist.msgs.1.get(#cache_index);
                            let io = if cached.present {
                                cu29::monitoring::PayloadIoStats {
                                    resident_bytes: cached.resident_bytes as usize,
                                    encoded_bytes: cached.encoded_bytes as usize,
                                    handle_bytes: cached.handle_bytes as usize,
                                }
                            } else {
                                cu29::monitoring::payload_io_stats(payload)?
                            };
                            raw += io.resident_bytes;
                            handles += io.handle_bytes;
                        }
                    }
                });
                Some(quote! { #(#iter)* })
            } else {
                let cache_index = syn::Index::from(*flat_idx);
                *flat_idx += 1;
                Some(quote! {
                    if let Some(payload) = culist.msgs.0.#slot_index.payload() {
                        let cached = culist.msgs.1.get(#cache_index);
                        let io = if cached.present {
                            cu29::monitoring::PayloadIoStats {
                                resident_bytes: cached.resident_bytes as usize,
                                encoded_bytes: cached.encoded_bytes as usize,
                                handle_bytes: cached.handle_bytes as usize,
                            }
                        } else {
                            cu29::monitoring::payload_io_stats(payload)?
                        };
                        raw += io.resident_bytes;
                        handles += io.handle_bytes;
                    }
                })
            }
        })
        .collect();

    // Statements for `payload_raw_bytes`: push one `Option<u64>` per message,
    // in slot order; `None` when the payload is absent or its stats cannot be
    // computed.
    let payload_raw_bytes_accumulators: Vec<proc_macro2::TokenStream> = output_packs
        .iter()
        .enumerate()
        .scan(0usize, |flat_idx, (slot_idx, pack)| {
            let slot_index = syn::Index::from(slot_idx);
            if pack.is_multi() {
                let iter = (0..pack.msg_types.len()).map(|port_idx| {
                    let port_index = syn::Index::from(port_idx);
                    let cache_index = syn::Index::from(*flat_idx);
                    *flat_idx += 1;
                    quote! {
                        if let Some(payload) = self.0.#slot_index.#port_index.payload() {
                            let cached = self.1.get(#cache_index);
                            bytes.push(if cached.present {
                                Some(cached.resident_bytes)
                            } else {
                                cu29::monitoring::payload_io_stats(payload)
                                    .ok()
                                    .map(|io| io.resident_bytes as u64)
                            });
                        } else {
                            bytes.push(None);
                        }
                    }
                });
                Some(quote! { #(#iter)* })
            } else {
                let cache_index = syn::Index::from(*flat_idx);
                *flat_idx += 1;
                Some(quote! {
                    if let Some(payload) = self.0.#slot_index.payload() {
                        let cached = self.1.get(#cache_index);
                        bytes.push(if cached.present {
                            Some(cached.resident_bytes)
                        } else {
                            cu29::monitoring::payload_io_stats(payload)
                                .ok()
                                .map(|io| io.resident_bytes as u64)
                        });
                    } else {
                        bytes.push(None);
                    }
                })
            }
        })
        .collect();

    let compute_payload_bytes_fn = quote! {
        pub fn compute_payload_bytes(culist: &CuList) -> cu29::prelude::CuResult<(u64, u64)> {
            let mut raw: usize = 0;
            let mut handles: usize = 0;
            #(#payload_bytes_accumulators)*
            Ok((raw as u64, handles as u64))
        }
    };

    let payload_raw_bytes_impl = quote! {
        impl ::cu29::CuPayloadRawBytes for CuStampedDataSet {
            fn payload_raw_bytes(&self) -> Vec<Option<u64>> {
                let mut bytes: Vec<Option<u64>> = Vec::with_capacity(#cumsg_count);
                #(#payload_raw_bytes_accumulators)*
                bytes
            }
        }
    };

    // Per-slot bookkeeping: the id reported by `get_all_task_ids` and the name
    // used for logviz paths. Filled by tasks first, then by bridge channels.
    let mut slot_origin_ids: Vec<Option<String>> = vec![None; output_packs.len()];
    let mut slot_task_names: Vec<Option<String>> = vec![None; output_packs.len()];

    // Accessor methods placed in `impl CuStampedDataSet`.
    let mut methods = Vec::new();
    for (node_id, task_id, member_name) in task_names {
        let output_position = node_output_positions.get(node_id).unwrap_or_else(|| {
            panic!("Task {task_id} (node id: {node_id}) not found in execution order")
        });
        let pack = output_packs
            .get(*output_position)
            .unwrap_or_else(|| panic!("Missing output pack for task {task_id}"));
        let slot_index = syn::Index::from(*output_position);
        slot_origin_ids[*output_position] = Some(task_id.clone());
        slot_task_names[*output_position] = Some(member_name.clone());

        if pack.msg_types.len() == 1 {
            // Single output: `get_<member>_output`.
            let fn_name = format_ident!("get_{}_output", member_name);
            let payload_type = pack.msg_types.first().unwrap();
            methods.push(quote! {
                #[allow(dead_code)]
                pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
                    &self.0.#slot_index
                }
            });
        } else {
            // Several outputs: `get_<member>_output_<n>` per port, plus
            // `get_<member>_outputs` for the whole slot.
            let outputs_fn = format_ident!("get_{}_outputs", member_name);
            let slot_type = pack.slot_type();
            for (port_idx, payload_type) in pack.msg_types.iter().enumerate() {
                let fn_name = format_ident!("get_{}_output_{}", member_name, port_idx);
                let port_index = syn::Index::from(port_idx);
                methods.push(quote! {
                    #[allow(dead_code)]
                    pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
                        &self.0.#slot_index.#port_index
                    }
                });
            }
            methods.push(quote! {
                #[allow(dead_code)]
                pub fn #outputs_fn(&self) -> &#slot_type {
                    &self.0.#slot_index
                }
            });
        }
    }

    // Claim slots for bridge channels as `bridge::<bridge id>::rx|tx::<channel id>`.
    // A slot that already has an origin (task or other channel) is an error.
    for spec in bridge_specs {
        for channel in &spec.rx_channels {
            if let Some(culist_index) = channel.culist_index {
                let origin_id = format!("bridge::{}::rx::{}", spec.id, channel.id);
                let Some(existing_slot) = slot_origin_ids.get_mut(culist_index) else {
                    panic!(
                        "Bridge origin '{origin_id}' points to out-of-range copperlist slot {culist_index}"
                    );
                };
                if let Some(existing) = existing_slot.as_ref() {
                    panic!(
                        "Duplicate slot origin assignment for slot {culist_index}: '{existing}' and '{origin_id}'"
                    );
                }
                *existing_slot = Some(origin_id.clone());
                let Some(slot_name) = slot_task_names.get_mut(culist_index) else {
                    panic!(
                        "Bridge origin '{origin_id}' points to out-of-range name slot {culist_index}"
                    );
                };
                *slot_name = Some(origin_id);
            }
        }
        for channel in &spec.tx_channels {
            if let Some(culist_index) = channel.culist_index {
                let origin_id = format!("bridge::{}::tx::{}", spec.id, channel.id);
                let Some(existing_slot) = slot_origin_ids.get_mut(culist_index) else {
                    panic!(
                        "Bridge origin '{origin_id}' points to out-of-range copperlist slot {culist_index}"
                    );
                };
                if let Some(existing) = existing_slot.as_ref() {
                    panic!(
                        "Duplicate slot origin assignment for slot {culist_index}: '{existing}' and '{origin_id}'"
                    );
                }
                *existing_slot = Some(origin_id.clone());
                let Some(slot_name) = slot_task_names.get_mut(culist_index) else {
                    panic!(
                        "Bridge origin '{origin_id}' points to out-of-range name slot {culist_index}"
                    );
                };
                *slot_name = Some(origin_id);
            }
        }
    }

    let task_name_literals = flatten_slot_origin_ids(&output_packs, slot_origin_ids);

    // One logging block per message. Built unconditionally, but only emitted
    // when this crate is compiled with the `logviz` feature (see below).
    let mut logviz_blocks = Vec::new();
    for (slot_idx, pack) in output_packs.iter().enumerate() {
        if pack.msg_types.is_empty() {
            continue;
        }
        let slot_index = syn::Index::from(slot_idx);
        let slot_name = slot_task_names.get(slot_idx).and_then(|name| name.as_ref());

        if pack.is_multi() {
            for (port_idx, _) in pack.msg_types.iter().enumerate() {
                let port_index = syn::Index::from(port_idx);
                // Path is `<name>/<port>`, or `slot_<idx>/<port>` for unnamed slots.
                let path_expr = if let Some(name) = slot_name {
                    let lit = LitStr::new(name, Span::call_site());
                    quote! { format!("{}/{}", #lit, #port_idx) }
                } else {
                    quote! { format!("slot_{}/{}", #slot_idx, #port_idx) }
                };
                logviz_blocks.push(quote! {
                    {
                        let msg = &self.0.#slot_index.#port_index;
                        if let Some(payload) = msg.payload() {
                            ::cu29_logviz::apply_tov(rec, &msg.tov);
                            let path = #path_expr;
                            ::cu29_logviz::log_payload_auto(rec, &path, payload)?;
                        }
                    }
                });
            }
        } else {
            // Path is `<name>`, or `slot_<idx>` for unnamed slots.
            let path_expr = if let Some(name) = slot_name {
                let lit = LitStr::new(name, Span::call_site());
                quote! { #lit.to_string() }
            } else {
                quote! { format!("slot_{}", #slot_idx) }
            };
            logviz_blocks.push(quote! {
                {
                    let msg = &self.0.#slot_index;
                    if let Some(payload) = msg.payload() {
                        ::cu29_logviz::apply_tov(rec, &msg.tov);
                        let path = #path_expr;
                        ::cu29_logviz::log_payload_auto(rec, &path, payload)?;
                    }
                }
            });
        }
    }

    let logviz_impl = if cfg!(feature = "logviz") {
        quote! {
            impl ::cu29_logviz::LogvizDataSet for CuStampedDataSet {
                fn logviz_emit(
                    &self,
                    rec: &::cu29_logviz::RecordingStream,
                ) -> ::cu29::prelude::CuResult<()> {
                    #(#logviz_blocks)*
                    Ok(())
                }
            }
        }
    } else {
        quote! {}
    };
    // Accessors for bridge Rx channels that own a slot:
    // `get_<bridge>_rx_<channel>`. Tx channels get no accessor here.
    for spec in bridge_specs {
        for channel in &spec.rx_channels {
            if let Some(culist_index) = channel.culist_index {
                let slot_index = syn::Index::from(culist_index);
                let bridge_name = config_id_to_struct_member(spec.id.as_str());
                let channel_name = config_id_to_struct_member(channel.id.as_str());
                let fn_name = format_ident!("get_{}_rx_{}", bridge_name, channel_name);
                let msg_type = &channel.msg_type;

                methods.push(quote! {
                    #[allow(dead_code)]
                    pub fn #fn_name(&self) -> &CuMsg<#msg_type> {
                        &self.0.#slot_index
                    }
                });
            }
        }
    }

    // Assemble the final output. Field `.0` of `CuStampedDataSet` is the
    // message tuple, field `.1` the per-message IO stats cache.
    quote! {
        #collect_metadata_function
        #compute_payload_bytes_fn

        pub struct CuStampedDataSet(pub #msgs_types_tuple, cu29::monitoring::CuMsgIoCache<#cumsg_count>);

        pub type CuList = CopperList<CuStampedDataSet>;

        impl CuStampedDataSet {
            #(#methods)*

            #[allow(dead_code)]
            fn get_tuple(&self) -> &#msgs_types_tuple {
                &self.0
            }

            #[allow(dead_code)]
            fn get_tuple_mut(&mut self) -> &mut #msgs_types_tuple {
                &mut self.0
            }
        }

        #payload_raw_bytes_impl
        #logviz_impl

        impl MatchingTasks for CuStampedDataSet {
            #[allow(dead_code)]
            fn get_all_task_ids() -> &'static [&'static str] {
                &[#(#task_name_literals),*]
            }
        }

        #msgs_types_tuple_encode
        #msgs_types_tuple_decode

        #msgs_types_tuple_debug

        #msgs_types_tuple_serialize

        #msgs_types_tuple_default

        #erasedmsg_trait_impl

        impl CuListZeroedInit for CuStampedDataSet {
            fn init_zeroed(&mut self) {
                self.1.clear();
                #(#zeroed_init_tokens)*
            }
        }
    }
}
783
784fn gen_sim_support(
785 runtime_plan: &CuExecutionLoop,
786 exec_entities: &[ExecutionEntity],
787 bridge_specs: &[BridgeSpec],
788) -> proc_macro2::TokenStream {
789 #[cfg(feature = "macro_debug")]
790 eprintln!("[Sim: Build SimEnum]");
791 let plan_enum: Vec<proc_macro2::TokenStream> = runtime_plan
792 .steps
793 .iter()
794 .map(|unit| match unit {
795 CuExecutionUnit::Step(step) => match &exec_entities[step.node_id as usize].kind {
796 ExecutionEntityKind::Task { .. } => {
797 let enum_entry_name = config_id_to_enum(step.node.get_id().as_str());
798 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
799 let inputs: Vec<Type> = step
800 .input_msg_indices_types
801 .iter()
802 .map(|input| {
803 parse_str::<Type>(format!("CuMsg<{}>", input.msg_type).as_str()).unwrap()
804 })
805 .collect();
806 let output: Option<Type> = step.output_msg_pack.as_ref().map(|pack| {
807 let msg_types: Vec<Type> = pack
808 .msg_types
809 .iter()
810 .map(|msg_type| {
811 parse_str::<Type>(msg_type.as_str()).unwrap_or_else(|_| {
812 panic!("Could not transform {msg_type} into a message Rust type.")
813 })
814 })
815 .collect();
816 build_output_slot_type(&msg_types)
817 });
818 let no_output = parse_str::<Type>("CuMsg<()>").unwrap();
819 let output = output.as_ref().unwrap_or(&no_output);
820
821 let inputs_type = if inputs.is_empty() {
822 quote! { () }
823 } else if inputs.len() == 1 {
824 let input = inputs.first().unwrap();
825 quote! { &'a #input }
826 } else {
827 quote! { &'a (#(&'a #inputs),*) }
828 };
829
830 quote! {
831 #enum_ident(CuTaskCallbackState<#inputs_type, &'a mut #output>)
832 }
833 }
834 ExecutionEntityKind::BridgeRx { bridge_index, channel_index } => {
835 let bridge_spec = &bridge_specs[*bridge_index];
836 let channel = &bridge_spec.rx_channels[*channel_index];
837 let enum_entry_name = config_id_to_enum(&format!("{}_rx_{}", bridge_spec.id, channel.id));
838 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
839 let channel_type: Type = parse_str::<Type>(channel.msg_type_name.as_str()).unwrap();
840 let bridge_type = runtime_bridge_type_for_spec(bridge_spec, true);
841 let _const_ident = &channel.const_ident;
842 quote! {
843 #enum_ident {
844 channel: &'static cu29::cubridge::BridgeChannel<< <#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet >::Id, #channel_type>,
845 msg: &'a mut CuMsg<#channel_type>,
846 }
847 }
848 }
849 ExecutionEntityKind::BridgeTx { bridge_index, channel_index } => {
850 let bridge_spec = &bridge_specs[*bridge_index];
851 let channel = &bridge_spec.tx_channels[*channel_index];
852 let enum_entry_name = config_id_to_enum(&format!("{}_tx_{}", bridge_spec.id, channel.id));
853 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
854 let channel_type: Type = parse_str::<Type>(channel.msg_type_name.as_str()).unwrap();
855 let output_pack = step
856 .output_msg_pack
857 .as_ref()
858 .expect("Bridge Tx channel missing output pack for sim support");
859 let output_types: Vec<Type> = output_pack
860 .msg_types
861 .iter()
862 .map(|msg_type| {
863 parse_str::<Type>(msg_type.as_str()).unwrap_or_else(|_| {
864 panic!("Could not transform {msg_type} into a message Rust type.")
865 })
866 })
867 .collect();
868 let output_type = build_output_slot_type(&output_types);
869 let bridge_type = runtime_bridge_type_for_spec(bridge_spec, true);
870 let _const_ident = &channel.const_ident;
871 quote! {
872 #enum_ident {
873 channel: &'static cu29::cubridge::BridgeChannel<< <#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet >::Id, #channel_type>,
874 msg: &'a CuMsg<#channel_type>,
875 output: &'a mut #output_type,
876 }
877 }
878 }
879 },
880 CuExecutionUnit::Loop(_) => {
881 todo!("Needs to be implemented")
882 }
883 })
884 .collect();
885
886 let mut variants = plan_enum;
888
889 for bridge_spec in bridge_specs {
891 let enum_entry_name = config_id_to_enum(&format!("{}_bridge", bridge_spec.id));
892 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
893 variants.push(quote! {
894 #enum_ident(cu29::simulation::CuBridgeLifecycleState)
895 });
896 }
897
898 variants.push(quote! { __Phantom(core::marker::PhantomData<&'a ()>) });
899 quote! {
900 #[allow(dead_code, unused_lifetimes)]
902 pub enum SimStep<'a> {
903 #(#variants),*
904 }
905 }
906}
907
908fn gen_recorded_replay_support(
909 runtime_plan: &CuExecutionLoop,
910 exec_entities: &[ExecutionEntity],
911 bridge_specs: &[BridgeSpec],
912) -> proc_macro2::TokenStream {
913 let replay_arms: Vec<proc_macro2::TokenStream> = runtime_plan
914 .steps
915 .iter()
916 .filter_map(|unit| match unit {
917 CuExecutionUnit::Step(step) => match &exec_entities[step.node_id as usize].kind {
918 ExecutionEntityKind::Task { .. } => {
919 let enum_entry_name = config_id_to_enum(step.node.get_id().as_str());
920 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
921 let output_pack = step
922 .output_msg_pack
923 .as_ref()
924 .expect("Task step missing output pack for recorded replay");
925 let culist_index = int2sliceindex(output_pack.culist_index);
926 Some(quote! {
927 SimStep::#enum_ident(CuTaskCallbackState::Process(_, output)) => {
928 *output = recorded.msgs.0.#culist_index.clone();
929 SimOverride::ExecutedBySim
930 }
931 })
932 }
933 ExecutionEntityKind::BridgeRx {
934 bridge_index,
935 channel_index,
936 } => {
937 let bridge_spec = &bridge_specs[*bridge_index];
938 let channel = &bridge_spec.rx_channels[*channel_index];
939 let enum_entry_name =
940 config_id_to_enum(&format!("{}_rx_{}", bridge_spec.id, channel.id));
941 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
942 let output_pack = step
943 .output_msg_pack
944 .as_ref()
945 .expect("Bridge Rx channel missing output pack for recorded replay");
946 let port_index = output_pack
947 .msg_types
948 .iter()
949 .position(|msg| msg == &channel.msg_type_name)
950 .unwrap_or_else(|| {
951 panic!(
952 "Bridge Rx channel '{}' missing output port for '{}'",
953 channel.id, channel.msg_type_name
954 )
955 });
956 let culist_index = int2sliceindex(output_pack.culist_index);
957 let recorded_slot = if output_pack.msg_types.len() == 1 {
958 quote! { recorded.msgs.0.#culist_index.clone() }
959 } else {
960 let port_index = syn::Index::from(port_index);
961 quote! { recorded.msgs.0.#culist_index.#port_index.clone() }
962 };
963 Some(quote! {
964 SimStep::#enum_ident { msg, .. } => {
965 *msg = #recorded_slot;
966 SimOverride::ExecutedBySim
967 }
968 })
969 }
970 ExecutionEntityKind::BridgeTx {
971 bridge_index,
972 channel_index,
973 } => {
974 let bridge_spec = &bridge_specs[*bridge_index];
975 let channel = &bridge_spec.tx_channels[*channel_index];
976 let enum_entry_name =
977 config_id_to_enum(&format!("{}_tx_{}", bridge_spec.id, channel.id));
978 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
979 let output_pack = step
980 .output_msg_pack
981 .as_ref()
982 .expect("Bridge Tx channel missing output pack for recorded replay");
983 let culist_index = int2sliceindex(output_pack.culist_index);
984 Some(quote! {
985 SimStep::#enum_ident { output, .. } => {
986 *output = recorded.msgs.0.#culist_index.clone();
987 SimOverride::ExecutedBySim
988 }
989 })
990 }
991 },
992 CuExecutionUnit::Loop(_) => None,
993 })
994 .collect();
995
996 quote! {
997 #[allow(dead_code)]
998 pub fn recorded_replay_step<'a>(
999 step: SimStep<'a>,
1000 recorded: &CopperList<CuStampedDataSet>,
1001 ) -> SimOverride {
1002 match step {
1003 #(#replay_arms),*,
1004 _ => SimOverride::ExecuteByRuntime,
1005 }
1006 }
1007 }
1008}
1009
1010#[proc_macro_attribute]
1018pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
1019 #[cfg(feature = "macro_debug")]
1020 eprintln!("[entry]");
1021 let mut application_struct = parse_macro_input!(input as ItemStruct);
1022
1023 let application_name = &application_struct.ident;
1024 let builder_name = format_ident!("{}Builder", application_name);
1025 let runtime_args = match CopperRuntimeArgs::parse_tokens(args.into()) {
1026 Ok(runtime_args) => runtime_args,
1027 Err(err) => return err.to_compile_error().into(),
1028 };
1029 let config_file = runtime_args.config_path.clone();
1030 let sim_mode = runtime_args.sim_mode;
1031 let ignore_resources = runtime_args.ignore_resources;
1032
1033 #[cfg(feature = "std")]
1034 let std = true;
1035
1036 #[cfg(not(feature = "std"))]
1037 let std = false;
1038 let signal_handler = cfg!(feature = "signal-handler");
1039 let parallel_rt_enabled = cfg!(feature = "parallel-rt");
1040 let rt_guard = rtsan_guard_tokens();
1041
1042 if ignore_resources && !sim_mode {
1043 return return_error(
1044 "`ignore_resources` is only supported when `sim_mode` is enabled".to_string(),
1045 );
1046 }
1047
1048 let resolved_runtime_config = match resolve_runtime_config(&runtime_args) {
1058 Ok(resolved_runtime_config) => resolved_runtime_config,
1059 Err(e) => return return_error(e.to_string()),
1060 };
1061 let subsystem_code = resolved_runtime_config.subsystem_code;
1062 let subsystem_id = resolved_runtime_config.subsystem_id.clone();
1063 let copper_config_content = resolved_runtime_config.bundled_local_config_content.clone();
1064 let copper_config = resolved_runtime_config.local_config;
1065 let copperlist_count = copper_config
1066 .logging
1067 .as_ref()
1068 .and_then(|logging| logging.copperlist_count)
1069 .unwrap_or(DEFAULT_CLNB);
1070 let copperlist_count_tokens = proc_macro2::Literal::usize_unsuffixed(copperlist_count);
1071 let caller_root = utils::caller_crate_root();
1072 let (git_commit, git_dirty) = detect_git_info(&caller_root);
1073 let git_commit_tokens = if let Some(commit) = git_commit {
1074 quote! { Some(#commit.to_string()) }
1075 } else {
1076 quote! { None }
1077 };
1078 let git_dirty_tokens = if let Some(dirty) = git_dirty {
1079 quote! { Some(#dirty) }
1080 } else {
1081 quote! { None }
1082 };
1083 let subsystem_code_literal = proc_macro2::Literal::u16_unsuffixed(subsystem_code);
1084 let subsystem_id_tokens = if let Some(subsystem_id) = subsystem_id.as_deref() {
1085 quote! { Some(#subsystem_id) }
1086 } else {
1087 quote! { None }
1088 };
1089
1090 #[cfg(feature = "macro_debug")]
1091 eprintln!("[build monitor type]");
1092 let monitor_configs = copper_config.get_monitor_configs();
1093 let (monitor_type, monitor_instanciator_body) = if monitor_configs.is_empty() {
1094 (
1095 quote! { NoMonitor },
1096 quote! {
1097 let monitor_metadata = metadata.with_subsystem_id(#subsystem_id_tokens);
1098 let monitor = NoMonitor::new(monitor_metadata, runtime)
1099 .expect("Failed to create NoMonitor.");
1100 monitor
1101 },
1102 )
1103 } else if monitor_configs.len() == 1 {
1104 let only_monitor_type = parse_str::<Type>(monitor_configs[0].get_type())
1105 .expect("Could not transform the monitor type name into a Rust type.");
1106 (
1107 quote! { #only_monitor_type },
1108 quote! {
1109 let monitor_metadata = metadata.with_monitor_config(
1110 config
1111 .get_monitor_configs()
1112 .first()
1113 .and_then(|entry| entry.get_config().cloned())
1114 )
1115 .with_subsystem_id(#subsystem_id_tokens);
1116 let monitor = #only_monitor_type::new(monitor_metadata, runtime)
1117 .expect("Failed to create the given monitor.");
1118 monitor
1119 },
1120 )
1121 } else {
1122 let monitor_types: Vec<Type> = monitor_configs
1123 .iter()
1124 .map(|monitor_config| {
1125 parse_str::<Type>(monitor_config.get_type())
1126 .expect("Could not transform the monitor type name into a Rust type.")
1127 })
1128 .collect();
1129 let monitor_bindings: Vec<Ident> = (0..monitor_types.len())
1130 .map(|idx| format_ident!("__cu_monitor_{idx}"))
1131 .collect();
1132 let monitor_indices: Vec<syn::Index> =
1133 (0..monitor_types.len()).map(syn::Index::from).collect();
1134
1135 let monitor_builders: Vec<proc_macro2::TokenStream> = monitor_types
1136 .iter()
1137 .zip(monitor_bindings.iter())
1138 .zip(monitor_indices.iter())
1139 .map(|((monitor_ty, monitor_binding), monitor_idx)| {
1140 quote! {
1141 let __cu_monitor_cfg_entry = config
1142 .get_monitor_configs()
1143 .get(#monitor_idx)
1144 .and_then(|entry| entry.get_config().cloned());
1145 let __cu_monitor_metadata = metadata
1146 .clone()
1147 .with_monitor_config(__cu_monitor_cfg_entry)
1148 .with_subsystem_id(#subsystem_id_tokens);
1149 let #monitor_binding = #monitor_ty::new(__cu_monitor_metadata, runtime.clone())
1150 .expect("Failed to create one of the configured monitors.");
1151 }
1152 })
1153 .collect();
1154 let tuple_type: TypeTuple = parse_quote! { (#(#monitor_types),*,) };
1155 (
1156 quote! { #tuple_type },
1157 quote! {
1158 #(#monitor_builders)*
1159 let monitor: #tuple_type = (#(#monitor_bindings),*,);
1160 monitor
1161 },
1162 )
1163 };
1164
1165 #[cfg(feature = "macro_debug")]
1167 eprintln!("[build runtime field]");
1168 let runtime_field: Field = if sim_mode {
1170 parse_quote! {
1171 copper_runtime: cu29::curuntime::CuRuntime<CuSimTasks, CuBridges, CuStampedDataSet, #monitor_type, #copperlist_count_tokens>
1172 }
1173 } else {
1174 parse_quote! {
1175 copper_runtime: cu29::curuntime::CuRuntime<CuTasks, CuBridges, CuStampedDataSet, #monitor_type, #copperlist_count_tokens>
1176 }
1177 };
1178 let lifecycle_stream_field: Field = parse_quote! {
1179 runtime_lifecycle_stream: Option<Box<dyn WriteStream<RuntimeLifecycleRecord>>>
1180 };
1181 let logger_runtime_field: Field = parse_quote! {
1182 logger_runtime: cu29::prelude::LoggerRuntime
1183 };
1184
1185 #[cfg(feature = "macro_debug")]
1186 eprintln!("[match struct anonymity]");
1187 match &mut application_struct.fields {
1188 Named(fields_named) => {
1189 fields_named.named.push(runtime_field);
1190 fields_named.named.push(lifecycle_stream_field);
1191 fields_named.named.push(logger_runtime_field);
1192 }
1193 Unnamed(fields_unnamed) => {
1194 fields_unnamed.unnamed.push(runtime_field);
1195 fields_unnamed.unnamed.push(lifecycle_stream_field);
1196 fields_unnamed.unnamed.push(logger_runtime_field);
1197 }
1198 Fields::Unit => {
1199 panic!(
1200 "This struct is a unit struct, it should have named or unnamed fields. use struct Something {{}} and not struct Something;"
1201 )
1202 }
1203 };
1204
1205 let all_missions = copper_config.graphs.get_all_missions_graphs();
1206 let mut all_missions_tokens = Vec::<proc_macro2::TokenStream>::new();
1207 for (mission, graph) in &all_missions {
1208 let git_commit_tokens = git_commit_tokens.clone();
1209 let git_dirty_tokens = git_dirty_tokens.clone();
1210 let mission_mod = parse_str::<Ident>(mission.as_str())
1211 .expect("Could not make an identifier of the mission name");
1212
1213 #[cfg(feature = "macro_debug")]
1214 eprintln!("[extract tasks ids & types]");
1215 let task_specs = CuTaskSpecSet::from_graph(graph);
1216
1217 let culist_channel_usage = collect_bridge_channel_usage(graph);
1218 let mut culist_bridge_specs =
1219 build_bridge_specs(&copper_config, graph, &culist_channel_usage);
1220 let (culist_plan, culist_exec_entities, culist_plan_to_original) =
1221 match build_execution_plan(graph, &task_specs, &mut culist_bridge_specs) {
1222 Ok(plan) => plan,
1223 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
1224 };
1225 let task_names = collect_task_names(graph);
1226 let (culist_call_order, node_output_positions) = collect_culist_metadata(
1227 &culist_plan,
1228 &culist_exec_entities,
1229 &mut culist_bridge_specs,
1230 &culist_plan_to_original,
1231 );
1232
1233 #[cfg(feature = "macro_debug")]
1234 {
1235 eprintln!("[runtime plan for mission {mission}]");
1236 eprintln!("{culist_plan:?}");
1237 }
1238
1239 let culist_support: proc_macro2::TokenStream = gen_culist_support(
1240 &culist_plan,
1241 &culist_call_order,
1242 &node_output_positions,
1243 &task_names,
1244 &culist_bridge_specs,
1245 );
1246
1247 let (
1248 threadpool_bundle_index,
1249 resources_module,
1250 resources_instanciator_fn,
1251 task_resource_mappings,
1252 bridge_resource_mappings,
1253 ) = if ignore_resources {
1254 if task_specs.background_flags.iter().any(|&flag| flag) {
1255 return return_error(
1256 "`ignore_resources` cannot be used with background tasks because they require the threadpool resource bundle"
1257 .to_string(),
1258 );
1259 }
1260
1261 let bundle_specs: Vec<BundleSpec> = Vec::new();
1262 let resource_specs: Vec<ResourceKeySpec> = Vec::new();
1263 let (resources_module, resources_instanciator_fn) =
1264 match build_resources_module(&bundle_specs) {
1265 Ok(tokens) => tokens,
1266 Err(e) => return return_error(e.to_string()),
1267 };
1268 let task_resource_mappings =
1269 match build_task_resource_mappings(&resource_specs, &task_specs) {
1270 Ok(tokens) => tokens,
1271 Err(e) => return return_error(e.to_string()),
1272 };
1273 let bridge_resource_mappings =
1274 build_bridge_resource_mappings(&resource_specs, &culist_bridge_specs, sim_mode);
1275 (
1276 None,
1277 resources_module,
1278 resources_instanciator_fn,
1279 task_resource_mappings,
1280 bridge_resource_mappings,
1281 )
1282 } else {
1283 let bundle_specs = match build_bundle_specs(&copper_config, mission.as_str()) {
1284 Ok(specs) => specs,
1285 Err(e) => return return_error(e.to_string()),
1286 };
1287 let threadpool_bundle_index = if task_specs.background_flags.iter().any(|&flag| flag) {
1288 match bundle_specs
1289 .iter()
1290 .position(|bundle| bundle.id == "threadpool")
1291 {
1292 Some(index) => Some(index),
1293 None => {
1294 return return_error(
1295 "Background tasks require the threadpool bundle to be configured"
1296 .to_string(),
1297 );
1298 }
1299 }
1300 } else {
1301 None
1302 };
1303
1304 let resource_specs = match collect_resource_specs(
1305 graph,
1306 &task_specs,
1307 &culist_bridge_specs,
1308 &bundle_specs,
1309 ) {
1310 Ok(specs) => specs,
1311 Err(e) => return return_error(e.to_string()),
1312 };
1313
1314 let (resources_module, resources_instanciator_fn) =
1315 match build_resources_module(&bundle_specs) {
1316 Ok(tokens) => tokens,
1317 Err(e) => return return_error(e.to_string()),
1318 };
1319 let task_resource_mappings =
1320 match build_task_resource_mappings(&resource_specs, &task_specs) {
1321 Ok(tokens) => tokens,
1322 Err(e) => return return_error(e.to_string()),
1323 };
1324 let bridge_resource_mappings =
1325 build_bridge_resource_mappings(&resource_specs, &culist_bridge_specs, sim_mode);
1326 (
1327 threadpool_bundle_index,
1328 resources_module,
1329 resources_instanciator_fn,
1330 task_resource_mappings,
1331 bridge_resource_mappings,
1332 )
1333 };
1334
1335 let task_ids = task_specs.ids.clone();
1336 let ids = build_monitored_ids(&task_ids, &mut culist_bridge_specs);
1337 let parallel_rt_stage_entries = match build_parallel_rt_stage_entries(
1338 &culist_plan,
1339 &culist_exec_entities,
1340 &task_specs,
1341 &culist_bridge_specs,
1342 ) {
1343 Ok(entries) => entries,
1344 Err(e) => return return_error(e.to_string()),
1345 };
1346 let parallel_rt_metadata_defs = if std && parallel_rt_enabled {
1347 Some(quote! {
1348 pub const PARALLEL_RT_STAGES: &'static [cu29::parallel_rt::ParallelRtStageMetadata] =
1349 &[#( #parallel_rt_stage_entries ),*];
1350 pub const PARALLEL_RT_METADATA: cu29::parallel_rt::ParallelRtMetadata =
1351 cu29::parallel_rt::ParallelRtMetadata::new(PARALLEL_RT_STAGES);
1352 })
1353 } else {
1354 None
1355 };
1356 let monitored_component_entries: Vec<proc_macro2::TokenStream> = ids
1357 .iter()
1358 .enumerate()
1359 .map(|(idx, id)| {
1360 let id_lit = LitStr::new(id, Span::call_site());
1361 if idx < task_specs.task_types.len() {
1362 let task_ty = &task_specs.task_types[idx];
1363 let component_type = match task_specs.cutypes[idx] {
1364 CuTaskType::Source => quote! { cu29::monitoring::ComponentType::Source },
1365 CuTaskType::Regular => quote! { cu29::monitoring::ComponentType::Task },
1366 CuTaskType::Sink => quote! { cu29::monitoring::ComponentType::Sink },
1367 };
1368 quote! {
1369 cu29::monitoring::MonitorComponentMetadata::new(
1370 #id_lit,
1371 #component_type,
1372 Some(stringify!(#task_ty)),
1373 )
1374 }
1375 } else {
1376 quote! {
1377 cu29::monitoring::MonitorComponentMetadata::new(
1378 #id_lit,
1379 cu29::monitoring::ComponentType::Bridge,
1380 None,
1381 )
1382 }
1383 }
1384 })
1385 .collect();
1386 let culist_component_mapping = match build_monitor_culist_component_mapping(
1387 &culist_plan,
1388 &culist_exec_entities,
1389 &culist_bridge_specs,
1390 ) {
1391 Ok(mapping) => mapping,
1392 Err(e) => return return_error(e),
1393 };
1394
1395 let task_reflect_read_arms: Vec<proc_macro2::TokenStream> = task_specs
1396 .ids
1397 .iter()
1398 .enumerate()
1399 .map(|(index, task_id)| {
1400 let task_index = syn::Index::from(index);
1401 let task_id_lit = LitStr::new(task_id, Span::call_site());
1402 quote! {
1403 #task_id_lit => Some(&self.copper_runtime.tasks.#task_index as &dyn cu29::reflect::Reflect),
1404 }
1405 })
1406 .collect();
1407
1408 let task_reflect_write_arms: Vec<proc_macro2::TokenStream> = task_specs
1409 .ids
1410 .iter()
1411 .enumerate()
1412 .map(|(index, task_id)| {
1413 let task_index = syn::Index::from(index);
1414 let task_id_lit = LitStr::new(task_id, Span::call_site());
1415 quote! {
1416 #task_id_lit => Some(&mut self.copper_runtime.tasks.#task_index as &mut dyn cu29::reflect::Reflect),
1417 }
1418 })
1419 .collect();
1420
1421 let mut reflect_registry_types: BTreeMap<String, Type> = BTreeMap::new();
1422 let mut add_reflect_type = |ty: Type| {
1423 let key = quote! { #ty }.to_string();
1424 reflect_registry_types.entry(key).or_insert(ty);
1425 };
1426
1427 for task_type in &task_specs.task_types {
1428 add_reflect_type(task_type.clone());
1429 }
1430
1431 let mut sim_bridge_channel_decls = Vec::<proc_macro2::TokenStream>::new();
1432 let bridge_runtime_types: Vec<Type> = culist_bridge_specs
1433 .iter()
1434 .map(|spec| {
1435 if sim_mode && !spec.run_in_sim {
1436 let (tx_set_ident, tx_id_ident, rx_set_ident, rx_id_ident) =
1437 sim_bridge_channel_set_idents(spec.tuple_index);
1438
1439 if !spec.tx_channels.is_empty() {
1440 let tx_entries = spec.tx_channels.iter().map(|channel| {
1441 let entry_ident = Ident::new(
1442 &channel.const_ident.to_string().to_lowercase(),
1443 Span::call_site(),
1444 );
1445 let msg_type = &channel.msg_type;
1446 quote! { #entry_ident => #msg_type, }
1447 });
1448 sim_bridge_channel_decls.push(quote! {
1449 cu29::tx_channels! {
1450 pub struct #tx_set_ident : #tx_id_ident {
1451 #(#tx_entries)*
1452 }
1453 }
1454 });
1455 }
1456
1457 if !spec.rx_channels.is_empty() {
1458 let rx_entries = spec.rx_channels.iter().map(|channel| {
1459 let entry_ident = Ident::new(
1460 &channel.const_ident.to_string().to_lowercase(),
1461 Span::call_site(),
1462 );
1463 let msg_type = &channel.msg_type;
1464 quote! { #entry_ident => #msg_type, }
1465 });
1466 sim_bridge_channel_decls.push(quote! {
1467 cu29::rx_channels! {
1468 pub struct #rx_set_ident : #rx_id_ident {
1469 #(#rx_entries)*
1470 }
1471 }
1472 });
1473 }
1474 }
1475 runtime_bridge_type_for_spec(spec, sim_mode)
1476 })
1477 .collect();
1478 let sim_bridge_channel_defs = quote! { #(#sim_bridge_channel_decls)* };
1479
1480 for (bridge_index, bridge_spec) in culist_bridge_specs.iter().enumerate() {
1481 add_reflect_type(bridge_runtime_types[bridge_index].clone());
1482 for channel in bridge_spec
1483 .rx_channels
1484 .iter()
1485 .chain(bridge_spec.tx_channels.iter())
1486 {
1487 add_reflect_type(channel.msg_type.clone());
1488 }
1489 }
1490
1491 for output_pack in extract_output_packs(&culist_plan) {
1492 for msg_type in output_pack.msg_types {
1493 add_reflect_type(msg_type);
1494 }
1495 }
1496
1497 let reflect_type_registration_calls: Vec<proc_macro2::TokenStream> = reflect_registry_types
1498 .values()
1499 .map(|ty| {
1500 quote! {
1501 registry.register::<#ty>();
1502 }
1503 })
1504 .collect();
1505
1506 let bridges_type_tokens: proc_macro2::TokenStream = if bridge_runtime_types.is_empty() {
1507 quote! { () }
1508 } else {
1509 let bridge_types_for_tuple = bridge_runtime_types.clone();
1510 let tuple: TypeTuple = parse_quote! { (#(#bridge_types_for_tuple),*,) };
1511 quote! { #tuple }
1512 };
1513
1514 let bridge_binding_idents: Vec<Ident> = culist_bridge_specs
1515 .iter()
1516 .enumerate()
1517 .map(|(idx, _)| format_ident!("bridge_{idx}"))
1518 .collect();
1519
1520 let bridge_init_statements: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1521 .iter()
1522 .enumerate()
1523 .map(|(idx, spec)| {
1524 let binding_ident = &bridge_binding_idents[idx];
1525 let bridge_mapping_ref = bridge_resource_mappings.refs[idx].clone();
1526 let bridge_type = &bridge_runtime_types[idx];
1527 let bridge_name = spec.id.clone();
1528 let config_index = syn::Index::from(spec.config_index);
1529 let binding_error = LitStr::new(
1530 &format!("Failed to bind resources for bridge '{}'", bridge_name),
1531 Span::call_site(),
1532 );
1533 let tx_configs: Vec<proc_macro2::TokenStream> = spec
1534 .tx_channels
1535 .iter()
1536 .map(|channel| {
1537 let const_ident = &channel.const_ident;
1538 let channel_name = channel.id.clone();
1539 let channel_config_index = syn::Index::from(channel.config_index);
1540 quote! {
1541 {
1542 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
1543 cu29::config::BridgeChannelConfigRepresentation::Tx { route, config, .. } => {
1544 (route.clone(), config.clone())
1545 }
1546 _ => panic!(
1547 "Bridge '{}' channel '{}' expected to be Tx",
1548 #bridge_name,
1549 #channel_name
1550 ),
1551 };
1552 cu29::cubridge::BridgeChannelConfig::from_static(
1553 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
1554 channel_route,
1555 channel_config,
1556 )
1557 }
1558 }
1559 })
1560 .collect();
1561 let rx_configs: Vec<proc_macro2::TokenStream> = spec
1562 .rx_channels
1563 .iter()
1564 .map(|channel| {
1565 let const_ident = &channel.const_ident;
1566 let channel_name = channel.id.clone();
1567 let channel_config_index = syn::Index::from(channel.config_index);
1568 quote! {
1569 {
1570 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
1571 cu29::config::BridgeChannelConfigRepresentation::Rx { route, config, .. } => {
1572 (route.clone(), config.clone())
1573 }
1574 _ => panic!(
1575 "Bridge '{}' channel '{}' expected to be Rx",
1576 #bridge_name,
1577 #channel_name
1578 ),
1579 };
1580 cu29::cubridge::BridgeChannelConfig::from_static(
1581 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
1582 channel_route,
1583 channel_config,
1584 )
1585 }
1586 }
1587 })
1588 .collect();
1589 quote! {
1590 let #binding_ident = {
1591 let bridge_cfg = config
1592 .bridges
1593 .get(#config_index)
1594 .unwrap_or_else(|| panic!("Bridge '{}' missing from configuration", #bridge_name));
1595 let bridge_mapping = #bridge_mapping_ref;
1596 let bridge_resources = <<#bridge_type as cu29::cubridge::CuBridge>::Resources<'_> as ResourceBindings>::from_bindings(
1597 resources,
1598 bridge_mapping,
1599 )
1600 .map_err(|e| cu29::CuError::new_with_cause(#binding_error, e))?;
1601 let tx_channels: &[cu29::cubridge::BridgeChannelConfig<
1602 <<#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet>::Id,
1603 >] = &[#(#tx_configs),*];
1604 let rx_channels: &[cu29::cubridge::BridgeChannelConfig<
1605 <<#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet>::Id,
1606 >] = &[#(#rx_configs),*];
1607 <#bridge_type as cu29::cubridge::CuBridge>::new(
1608 bridge_cfg.config.as_ref(),
1609 tx_channels,
1610 rx_channels,
1611 bridge_resources,
1612 )?
1613 };
1614 }
1615 })
1616 .collect();
1617
1618 let bridges_instanciator = if culist_bridge_specs.is_empty() {
1619 quote! {
1620 pub fn bridges_instanciator(_config: &CuConfig, resources: &mut ResourceManager) -> CuResult<CuBridges> {
1621 let _ = resources;
1622 Ok(())
1623 }
1624 }
1625 } else {
1626 let bridge_bindings = bridge_binding_idents.clone();
1627 quote! {
1628 pub fn bridges_instanciator(config: &CuConfig, resources: &mut ResourceManager) -> CuResult<CuBridges> {
1629 #(#bridge_init_statements)*
1630 Ok((#(#bridge_bindings),*,))
1631 }
1632 }
1633 };
1634
1635 let all_sim_tasks_types: Vec<Type> = task_specs
1636 .ids
1637 .iter()
1638 .zip(&task_specs.cutypes)
1639 .zip(&task_specs.sim_task_types)
1640 .zip(&task_specs.background_flags)
1641 .zip(&task_specs.run_in_sim_flags)
1642 .zip(task_specs.output_types.iter())
1643 .map(|(((((task_id, task_type), sim_type), background), run_in_sim), output_type)| {
1644 match task_type {
1645 CuTaskType::Source => {
1646 if *background {
1647 panic!("CuSrcTask {task_id} cannot be a background task, it should be a regular task.");
1648 }
1649 if *run_in_sim {
1650 sim_type.clone()
1651 } else {
1652 let msg_type = graph
1653 .get_node_output_msg_type(task_id.as_str())
1654 .unwrap_or_else(|| panic!("CuSrcTask {task_id} should have an outgoing connection with a valid output msg type"));
1655 let sim_task_name = format!("CuSimSrcTask<{msg_type}>");
1656 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
1657 }
1658 }
1659 CuTaskType::Regular => {
1660 if *background {
1661 if let Some(out_ty) = output_type {
1662 parse_quote!(CuAsyncTask<#sim_type, #out_ty>)
1663 } else {
1664 panic!("{task_id}: If a task is background, it has to have an output");
1665 }
1666 } else {
1667 sim_type.clone()
1669 }
1670 },
1671 CuTaskType::Sink => {
1672 if *background {
1673 panic!("CuSinkTask {task_id} cannot be a background task, it should be a regular task.");
1674 }
1675
1676 if *run_in_sim {
1677 sim_type.clone()
1679 }
1680 else {
1681 let msg_types = graph
1683 .get_node_input_msg_types(task_id.as_str())
1684 .unwrap_or_else(|| panic!("CuSinkTask {task_id} should have an incoming connection with a valid input msg type"));
1685 let msg_type = if msg_types.len() == 1 {
1686 format!("({},)", msg_types[0])
1687 } else {
1688 format!("({})", msg_types.join(", "))
1689 };
1690 let sim_task_name = format!("CuSimSinkTask<{msg_type}>");
1691 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
1692 }
1693 }
1694 }
1695 })
1696 .collect();
1697
1698 #[cfg(feature = "macro_debug")]
1699 eprintln!("[build task tuples]");
1700
1701 let task_types = &task_specs.task_types;
1702 let task_types_tuple: TypeTuple = if task_types.is_empty() {
1705 parse_quote! { () }
1706 } else {
1707 parse_quote! { (#(#task_types),*,) }
1708 };
1709
1710 let task_types_tuple_sim: TypeTuple = if all_sim_tasks_types.is_empty() {
1711 parse_quote! { () }
1712 } else {
1713 parse_quote! { (#(#all_sim_tasks_types),*,) }
1714 };
1715
1716 #[cfg(feature = "macro_debug")]
1717 eprintln!("[gen instances]");
1718 let task_sim_instances_init_code = all_sim_tasks_types
1719 .iter()
1720 .enumerate()
1721 .map(|(index, ty)| {
1722 let additional_error_info = format!(
1723 "Failed to get create instance for {}, instance index {}.",
1724 task_specs.type_names[index], index
1725 );
1726 let mapping_ref = task_resource_mappings.refs[index].clone();
1727 let background = task_specs.background_flags[index];
1728 let inner_task_type = &task_specs.sim_task_types[index];
1729 match task_specs.cutypes[index] {
1730 CuTaskType::Source => quote! {
1731 {
1732 let resources = <<#ty as CuSrcTask>::Resources<'_> as ResourceBindings>::from_bindings(
1733 resources,
1734 #mapping_ref,
1735 ).map_err(|e| e.add_cause(#additional_error_info))?;
1736 <#ty as CuSrcTask>::new(all_instances_configs[#index], resources)
1737 .map_err(|e| e.add_cause(#additional_error_info))?
1738 }
1739 },
1740 CuTaskType::Regular => {
1741 if background {
1742 let threadpool_bundle_index = threadpool_bundle_index
1743 .expect("threadpool bundle missing for background tasks");
1744 quote! {
1745 {
1746 let inner_resources = <<#inner_task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
1747 resources,
1748 #mapping_ref,
1749 ).map_err(|e| e.add_cause(#additional_error_info))?;
1750 let threadpool_key = cu29::resource::ResourceKey::new(
1751 cu29::resource::BundleIndex::new(#threadpool_bundle_index),
1752 <cu29::resource::ThreadPoolBundle as cu29::resource::ResourceBundleDecl>::Id::BgThreads as usize,
1753 );
1754 let threadpool = resources.borrow_shared_arc(threadpool_key)?;
1755 let resources = cu29::cuasynctask::CuAsyncTaskResources {
1756 inner: inner_resources,
1757 threadpool,
1758 };
1759 <#ty as CuTask>::new(all_instances_configs[#index], resources)
1760 .map_err(|e| e.add_cause(#additional_error_info))?
1761 }
1762 }
1763 } else {
1764 quote! {
1765 {
1766 let resources = <<#ty as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
1767 resources,
1768 #mapping_ref,
1769 ).map_err(|e| e.add_cause(#additional_error_info))?;
1770 <#ty as CuTask>::new(all_instances_configs[#index], resources)
1771 .map_err(|e| e.add_cause(#additional_error_info))?
1772 }
1773 }
1774 }
1775 }
1776 CuTaskType::Sink => quote! {
1777 {
1778 let resources = <<#ty as CuSinkTask>::Resources<'_> as ResourceBindings>::from_bindings(
1779 resources,
1780 #mapping_ref,
1781 ).map_err(|e| e.add_cause(#additional_error_info))?;
1782 <#ty as CuSinkTask>::new(all_instances_configs[#index], resources)
1783 .map_err(|e| e.add_cause(#additional_error_info))?
1784 }
1785 },
1786 }
1787 })
1788 .collect::<Vec<_>>();
1789
1790 let task_instances_init_code = task_specs
1791 .instantiation_types
1792 .iter()
1793 .zip(&task_specs.background_flags)
1794 .enumerate()
1795 .map(|(index, (task_type, background))| {
1796 let additional_error_info = format!(
1797 "Failed to get create instance for {}, instance index {}.",
1798 task_specs.type_names[index], index
1799 );
1800 let mapping_ref = task_resource_mappings.refs[index].clone();
1801 let inner_task_type = &task_specs.sim_task_types[index];
1802 match task_specs.cutypes[index] {
1803 CuTaskType::Source => quote! {
1804 {
1805 let resources = <<#task_type as CuSrcTask>::Resources<'_> as ResourceBindings>::from_bindings(
1806 resources,
1807 #mapping_ref,
1808 ).map_err(|e| e.add_cause(#additional_error_info))?;
1809 <#task_type as CuSrcTask>::new(all_instances_configs[#index], resources)
1810 .map_err(|e| e.add_cause(#additional_error_info))?
1811 }
1812 },
1813 CuTaskType::Regular => {
1814 if *background {
1815 let threadpool_bundle_index = threadpool_bundle_index
1816 .expect("threadpool bundle missing for background tasks");
1817 quote! {
1818 {
1819 let inner_resources = <<#inner_task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
1820 resources,
1821 #mapping_ref,
1822 ).map_err(|e| e.add_cause(#additional_error_info))?;
1823 let threadpool_key = cu29::resource::ResourceKey::new(
1824 cu29::resource::BundleIndex::new(#threadpool_bundle_index),
1825 <cu29::resource::ThreadPoolBundle as cu29::resource::ResourceBundleDecl>::Id::BgThreads as usize,
1826 );
1827 let threadpool = resources.borrow_shared_arc(threadpool_key)?;
1828 let resources = cu29::cuasynctask::CuAsyncTaskResources {
1829 inner: inner_resources,
1830 threadpool,
1831 };
1832 <#task_type as CuTask>::new(all_instances_configs[#index], resources)
1833 .map_err(|e| e.add_cause(#additional_error_info))?
1834 }
1835 }
1836 } else {
1837 quote! {
1838 {
1839 let resources = <<#task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
1840 resources,
1841 #mapping_ref,
1842 ).map_err(|e| e.add_cause(#additional_error_info))?;
1843 <#task_type as CuTask>::new(all_instances_configs[#index], resources)
1844 .map_err(|e| e.add_cause(#additional_error_info))?
1845 }
1846 }
1847 }
1848 }
1849 CuTaskType::Sink => quote! {
1850 {
1851 let resources = <<#task_type as CuSinkTask>::Resources<'_> as ResourceBindings>::from_bindings(
1852 resources,
1853 #mapping_ref,
1854 ).map_err(|e| e.add_cause(#additional_error_info))?;
1855 <#task_type as CuSinkTask>::new(all_instances_configs[#index], resources)
1856 .map_err(|e| e.add_cause(#additional_error_info))?
1857 }
1858 },
1859 }
1860 })
1861 .collect::<Vec<_>>();
1862
1863 let (
1866 task_restore_code,
1867 task_start_calls,
1868 task_stop_calls,
1869 task_preprocess_calls,
1870 task_postprocess_calls,
1871 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>, Vec<_>) = itertools::multiunzip(
1872 (0..task_specs.task_types.len())
1873 .map(|index| {
1874 let task_index = int2sliceindex(index as u32);
1875 let task_tuple_index = syn::Index::from(index);
1876 let task_enum_name = config_id_to_enum(&task_specs.ids[index]);
1877 let enum_name = Ident::new(&task_enum_name, Span::call_site());
1878 (
1879 quote! {
1881 tasks.#task_tuple_index.thaw(&mut decoder).map_err(|e| CuError::from("Failed to thaw").add_cause(&e.to_string()))?
1882 },
1883 { let monitoring_action = quote! {
1885 let decision = self.copper_runtime.monitor.process_error(cu29::monitoring::ComponentId::new(#index), CuComponentState::Start, &error);
1886 match decision {
1887 Decision::Abort => {
1888 debug!("Start: ABORT decision from monitoring. Component '{}' errored out \
1889 during start. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
1890 return Ok(());
1891
1892 }
1893 Decision::Ignore => {
1894 debug!("Start: IGNORE decision from monitoring. Component '{}' errored out \
1895 during start. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
1896 }
1897 Decision::Shutdown => {
1898 debug!("Start: SHUTDOWN decision from monitoring. Component '{}' errored out \
1899 during start. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
1900 return Err(CuError::new_with_cause("Component errored out during start.", error));
1901 }
1902 }
1903 };
1904
1905 let call_sim_callback = if sim_mode {
1906 quote! {
1907 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Start));
1909
1910 let doit = if let SimOverride::Errored(reason) = ovr {
1911 let error: CuError = reason.into();
1912 #monitoring_action
1913 false
1914 }
1915 else {
1916 ovr == SimOverride::ExecuteByRuntime
1917 };
1918 }
1919 } else {
1920 quote! {
1921 let doit = true; }
1923 };
1924
1925
1926 quote! {
1927 #call_sim_callback
1928 if doit {
1929 self.copper_runtime.record_execution_marker(
1930 cu29::monitoring::ExecutionMarker {
1931 component_id: cu29::monitoring::ComponentId::new(#index),
1932 step: CuComponentState::Start,
1933 culistid: None,
1934 }
1935 );
1936 let task = &mut self.copper_runtime.tasks.#task_index;
1937 ctx.set_current_task(#index);
1938 if let Err(error) = task.start(&ctx) {
1939 #monitoring_action
1940 }
1941 }
1942 }
1943 },
1944 { let monitoring_action = quote! {
1946 let decision = self.copper_runtime.monitor.process_error(cu29::monitoring::ComponentId::new(#index), CuComponentState::Stop, &error);
1947 match decision {
1948 Decision::Abort => {
1949 debug!("Stop: ABORT decision from monitoring. Component '{}' errored out \
1950 during stop. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
1951 return Ok(());
1952
1953 }
1954 Decision::Ignore => {
1955 debug!("Stop: IGNORE decision from monitoring. Component '{}' errored out \
1956 during stop. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
1957 }
1958 Decision::Shutdown => {
1959 debug!("Stop: SHUTDOWN decision from monitoring. Component '{}' errored out \
1960 during stop. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
1961 return Err(CuError::new_with_cause("Component errored out during stop.", error));
1962 }
1963 }
1964 };
1965 let call_sim_callback = if sim_mode {
1966 quote! {
1967 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Stop));
1969
1970 let doit = if let SimOverride::Errored(reason) = ovr {
1971 let error: CuError = reason.into();
1972 #monitoring_action
1973 false
1974 }
1975 else {
1976 ovr == SimOverride::ExecuteByRuntime
1977 };
1978 }
1979 } else {
1980 quote! {
1981 let doit = true; }
1983 };
1984 quote! {
1985 #call_sim_callback
1986 if doit {
1987 self.copper_runtime.record_execution_marker(
1988 cu29::monitoring::ExecutionMarker {
1989 component_id: cu29::monitoring::ComponentId::new(#index),
1990 step: CuComponentState::Stop,
1991 culistid: None,
1992 }
1993 );
1994 let task = &mut self.copper_runtime.tasks.#task_index;
1995 ctx.set_current_task(#index);
1996 if let Err(error) = task.stop(&ctx) {
1997 #monitoring_action
1998 }
1999 }
2000 }
2001 },
2002 { let monitoring_action = quote! {
2004 let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#index), CuComponentState::Preprocess, &error);
2005 match decision {
2006 Decision::Abort => {
2007 debug!("Preprocess: ABORT decision from monitoring. Component '{}' errored out \
2008 during preprocess. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
2009 return Ok(());
2010
2011 }
2012 Decision::Ignore => {
2013 debug!("Preprocess: IGNORE decision from monitoring. Component '{}' errored out \
2014 during preprocess. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
2015 }
2016 Decision::Shutdown => {
2017 debug!("Preprocess: SHUTDOWN decision from monitoring. Component '{}' errored out \
2018 during preprocess. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
2019 return Err(CuError::new_with_cause("Component errored out during preprocess.", error));
2020 }
2021 }
2022 };
2023 let call_sim_callback = if sim_mode {
2024 quote! {
2025 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Preprocess));
2027
2028 let doit = if let SimOverride::Errored(reason) = ovr {
2029 let error: CuError = reason.into();
2030 #monitoring_action
2031 false
2032 } else {
2033 ovr == SimOverride::ExecuteByRuntime
2034 };
2035 }
2036 } else {
2037 quote! {
2038 let doit = true; }
2040 };
2041 quote! {
2042 #call_sim_callback
2043 if doit {
2044 execution_probe.record(cu29::monitoring::ExecutionMarker {
2045 component_id: cu29::monitoring::ComponentId::new(#index),
2046 step: CuComponentState::Preprocess,
2047 culistid: None,
2048 });
2049 ctx.set_current_task(#index);
2050 let maybe_error = {
2051 #rt_guard
2052 tasks.#task_index.preprocess(&ctx)
2053 };
2054 if let Err(error) = maybe_error {
2055 #monitoring_action
2056 }
2057 }
2058 }
2059 },
2060 { let monitoring_action = quote! {
2062 let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#index), CuComponentState::Postprocess, &error);
2063 match decision {
2064 Decision::Abort => {
2065 debug!("Postprocess: ABORT decision from monitoring. Component '{}' errored out \
2066 during postprocess. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
2067 return Ok(());
2068
2069 }
2070 Decision::Ignore => {
2071 debug!("Postprocess: IGNORE decision from monitoring. Component '{}' errored out \
2072 during postprocess. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
2073 }
2074 Decision::Shutdown => {
2075 debug!("Postprocess: SHUTDOWN decision from monitoring. Component '{}' errored out \
2076 during postprocess. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#index)));
2077 return Err(CuError::new_with_cause("Component errored out during postprocess.", error));
2078 }
2079 }
2080 };
2081 let call_sim_callback = if sim_mode {
2082 quote! {
2083 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Postprocess));
2085
2086 let doit = if let SimOverride::Errored(reason) = ovr {
2087 let error: CuError = reason.into();
2088 #monitoring_action
2089 false
2090 } else {
2091 ovr == SimOverride::ExecuteByRuntime
2092 };
2093 }
2094 } else {
2095 quote! {
2096 let doit = true; }
2098 };
2099 quote! {
2100 #call_sim_callback
2101 if doit {
2102 execution_probe.record(cu29::monitoring::ExecutionMarker {
2103 component_id: cu29::monitoring::ComponentId::new(#index),
2104 step: CuComponentState::Postprocess,
2105 culistid: None,
2106 });
2107 ctx.set_current_task(#index);
2108 let maybe_error = {
2109 #rt_guard
2110 tasks.#task_index.postprocess(&ctx)
2111 };
2112 if let Err(error) = maybe_error {
2113 #monitoring_action
2114 }
2115 }
2116 }
2117 }
2118 )
2119 })
2120 );
2121
2122 let bridge_start_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
2123 .iter()
2124 .map(|spec| {
2125 let bridge_index = int2sliceindex(spec.tuple_index as u32);
2126 let monitor_index = syn::Index::from(
2127 spec.monitor_index
2128 .expect("Bridge missing monitor index for start"),
2129 );
2130 let enum_ident = Ident::new(
2131 &config_id_to_enum(&format!("{}_bridge", spec.id)),
2132 Span::call_site(),
2133 );
2134 let call_sim = if sim_mode {
2135 quote! {
2136 let doit = {
2137 let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Start);
2138 let ovr = sim_callback(state);
2139 if let SimOverride::Errored(reason) = ovr {
2140 let error: CuError = reason.into();
2141 let decision = self.copper_runtime.monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Start, &error);
2142 match decision {
2143 Decision::Abort => { debug!("Start: ABORT decision from monitoring. Component '{}' errored out during start. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); return Ok(()); }
2144 Decision::Ignore => { debug!("Start: IGNORE decision from monitoring. Component '{}' errored out during start. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); false }
2145 Decision::Shutdown => { debug!("Start: SHUTDOWN decision from monitoring. Component '{}' errored out during start. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); return Err(CuError::new_with_cause("Component errored out during start.", error)); }
2146 }
2147 } else {
2148 ovr == SimOverride::ExecuteByRuntime
2149 }
2150 };
2151 }
2152 } else {
2153 quote! { let doit = true; }
2154 };
2155 quote! {
2156 {
2157 #call_sim
2158 if !doit { return Ok(()); }
2159 self.copper_runtime.record_execution_marker(
2160 cu29::monitoring::ExecutionMarker {
2161 component_id: cu29::monitoring::ComponentId::new(#monitor_index),
2162 step: CuComponentState::Start,
2163 culistid: None,
2164 }
2165 );
2166 ctx.clear_current_task();
2167 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
2168 if let Err(error) = bridge.start(&ctx) {
2169 let decision = self.copper_runtime.monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Start, &error);
2170 match decision {
2171 Decision::Abort => {
2172 debug!("Start: ABORT decision from monitoring. Component '{}' errored out during start. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2173 return Ok(());
2174 }
2175 Decision::Ignore => {
2176 debug!("Start: IGNORE decision from monitoring. Component '{}' errored out during start. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2177 }
2178 Decision::Shutdown => {
2179 debug!("Start: SHUTDOWN decision from monitoring. Component '{}' errored out during start. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2180 return Err(CuError::new_with_cause("Component errored out during start.", error));
2181 }
2182 }
2183 }
2184 }
2185 }
2186 })
2187 .collect();
2188
2189 let bridge_stop_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
2190 .iter()
2191 .map(|spec| {
2192 let bridge_index = int2sliceindex(spec.tuple_index as u32);
2193 let monitor_index = syn::Index::from(
2194 spec.monitor_index
2195 .expect("Bridge missing monitor index for stop"),
2196 );
2197 let enum_ident = Ident::new(
2198 &config_id_to_enum(&format!("{}_bridge", spec.id)),
2199 Span::call_site(),
2200 );
2201 let call_sim = if sim_mode {
2202 quote! {
2203 let doit = {
2204 let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Stop);
2205 let ovr = sim_callback(state);
2206 if let SimOverride::Errored(reason) = ovr {
2207 let error: CuError = reason.into();
2208 let decision = self.copper_runtime.monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Stop, &error);
2209 match decision {
2210 Decision::Abort => { debug!("Stop: ABORT decision from monitoring. Component '{}' errored out during stop. Aborting all the other stops.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); return Ok(()); }
2211 Decision::Ignore => { debug!("Stop: IGNORE decision from monitoring. Component '{}' errored out during stop. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); false }
2212 Decision::Shutdown => { debug!("Stop: SHUTDOWN decision from monitoring. Component '{}' errored out during stop. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); return Err(CuError::new_with_cause("Component errored out during stop.", error)); }
2213 }
2214 } else {
2215 ovr == SimOverride::ExecuteByRuntime
2216 }
2217 };
2218 }
2219 } else {
2220 quote! { let doit = true; }
2221 };
2222 quote! {
2223 {
2224 #call_sim
2225 if !doit { return Ok(()); }
2226 self.copper_runtime.record_execution_marker(
2227 cu29::monitoring::ExecutionMarker {
2228 component_id: cu29::monitoring::ComponentId::new(#monitor_index),
2229 step: CuComponentState::Stop,
2230 culistid: None,
2231 }
2232 );
2233 ctx.clear_current_task();
2234 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
2235 if let Err(error) = bridge.stop(&ctx) {
2236 let decision = self.copper_runtime.monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Stop, &error);
2237 match decision {
2238 Decision::Abort => {
2239 debug!("Stop: ABORT decision from monitoring. Component '{}' errored out during stop. Aborting all the other stops.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2240 return Ok(());
2241 }
2242 Decision::Ignore => {
2243 debug!("Stop: IGNORE decision from monitoring. Component '{}' errored out during stop. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2244 }
2245 Decision::Shutdown => {
2246 debug!("Stop: SHUTDOWN decision from monitoring. Component '{}' errored out during stop. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2247 return Err(CuError::new_with_cause("Component errored out during stop.", error));
2248 }
2249 }
2250 }
2251 }
2252 }
2253 })
2254 .collect();
2255
2256 let bridge_preprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
2257 .iter()
2258 .map(|spec| {
2259 let bridge_index = int2sliceindex(spec.tuple_index as u32);
2260 let monitor_index = syn::Index::from(
2261 spec.monitor_index
2262 .expect("Bridge missing monitor index for preprocess"),
2263 );
2264 let enum_ident = Ident::new(
2265 &config_id_to_enum(&format!("{}_bridge", spec.id)),
2266 Span::call_site(),
2267 );
2268 let call_sim = if sim_mode {
2269 quote! {
2270 let doit = {
2271 let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Preprocess);
2272 let ovr = sim_callback(state);
2273 if let SimOverride::Errored(reason) = ovr {
2274 let error: CuError = reason.into();
2275 let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Preprocess, &error);
2276 match decision {
2277 Decision::Abort => { debug!("Preprocess: ABORT decision from monitoring. Component '{}' errored out during preprocess. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); return Ok(()); }
2278 Decision::Ignore => { debug!("Preprocess: IGNORE decision from monitoring. Component '{}' errored out during preprocess. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); false }
2279 Decision::Shutdown => { debug!("Preprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during preprocess. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); return Err(CuError::new_with_cause("Component errored out during preprocess.", error)); }
2280 }
2281 } else {
2282 ovr == SimOverride::ExecuteByRuntime
2283 }
2284 };
2285 }
2286 } else {
2287 quote! { let doit = true; }
2288 };
2289 quote! {
2290 {
2291 #call_sim
2292 if doit {
2293 ctx.clear_current_task();
2294 let bridge = &mut __cu_bridges.#bridge_index;
2295 execution_probe.record(cu29::monitoring::ExecutionMarker {
2296 component_id: cu29::monitoring::ComponentId::new(#monitor_index),
2297 step: CuComponentState::Preprocess,
2298 culistid: None,
2299 });
2300 let maybe_error = {
2301 #rt_guard
2302 bridge.preprocess(&ctx)
2303 };
2304 if let Err(error) = maybe_error {
2305 let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Preprocess, &error);
2306 match decision {
2307 Decision::Abort => {
2308 debug!("Preprocess: ABORT decision from monitoring. Component '{}' errored out during preprocess. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2309 return Ok(());
2310 }
2311 Decision::Ignore => {
2312 debug!("Preprocess: IGNORE decision from monitoring. Component '{}' errored out during preprocess. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2313 }
2314 Decision::Shutdown => {
2315 debug!("Preprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during preprocess. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2316 return Err(CuError::new_with_cause("Component errored out during preprocess.", error));
2317 }
2318 }
2319 }
2320 }
2321 }
2322 }
2323 })
2324 .collect();
2325
2326 let bridge_postprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
2327 .iter()
2328 .map(|spec| {
2329 let bridge_index = int2sliceindex(spec.tuple_index as u32);
2330 let monitor_index = syn::Index::from(
2331 spec.monitor_index
2332 .expect("Bridge missing monitor index for postprocess"),
2333 );
2334 let enum_ident = Ident::new(
2335 &config_id_to_enum(&format!("{}_bridge", spec.id)),
2336 Span::call_site(),
2337 );
2338 let call_sim = if sim_mode {
2339 quote! {
2340 let doit = {
2341 let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Postprocess);
2342 let ovr = sim_callback(state);
2343 if let SimOverride::Errored(reason) = ovr {
2344 let error: CuError = reason.into();
2345 let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Postprocess, &error);
2346 match decision {
2347 Decision::Abort => { debug!("Postprocess: ABORT decision from monitoring. Component '{}' errored out during postprocess. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); return Ok(()); }
2348 Decision::Ignore => { debug!("Postprocess: IGNORE decision from monitoring. Component '{}' errored out during postprocess. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); false }
2349 Decision::Shutdown => { debug!("Postprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during postprocess. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index))); return Err(CuError::new_with_cause("Component errored out during postprocess.", error)); }
2350 }
2351 } else {
2352 ovr == SimOverride::ExecuteByRuntime
2353 }
2354 };
2355 }
2356 } else {
2357 quote! { let doit = true; }
2358 };
2359 quote! {
2360 {
2361 #call_sim
2362 if doit {
2363 ctx.clear_current_task();
2364 let bridge = &mut __cu_bridges.#bridge_index;
2365 kf_manager.freeze_any(clid, bridge)?;
2366 execution_probe.record(cu29::monitoring::ExecutionMarker {
2367 component_id: cu29::monitoring::ComponentId::new(#monitor_index),
2368 step: CuComponentState::Postprocess,
2369 culistid: Some(clid),
2370 });
2371 let maybe_error = {
2372 #rt_guard
2373 bridge.postprocess(&ctx)
2374 };
2375 if let Err(error) = maybe_error {
2376 let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Postprocess, &error);
2377 match decision {
2378 Decision::Abort => {
2379 debug!("Postprocess: ABORT decision from monitoring. Component '{}' errored out during postprocess. Aborting all the other starts.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2380 return Ok(());
2381 }
2382 Decision::Ignore => {
2383 debug!("Postprocess: IGNORE decision from monitoring. Component '{}' errored out during postprocess. The runtime will continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2384 }
2385 Decision::Shutdown => {
2386 debug!("Postprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during postprocess. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
2387 return Err(CuError::new_with_cause("Component errored out during postprocess.", error));
2388 }
2389 }
2390 }
2391 }
2392 }
2393 }
2394 })
2395 .collect();
2396
// Merge the bridge and task lifecycle snippets into the final call lists.
// Ordering as written here: bridges come first for start and preprocess,
// tasks come first for stop and postprocess.
let mut start_calls = bridge_start_calls;
start_calls.extend(task_start_calls);
let mut stop_calls = task_stop_calls;
stop_calls.extend(bridge_stop_calls);
let mut preprocess_calls = bridge_preprocess_calls;
preprocess_calls.extend(task_preprocess_calls);
let mut postprocess_calls = task_postprocess_calls;
postprocess_calls.extend(bridge_postprocess_calls);
// The parallel run path is only generated with `std`, with the parallel
// runtime enabled, and outside sim mode.
let parallel_rt_run_supported = std && parallel_rt_enabled && !sim_mode;
2406
2407 let bridge_restore_code: Vec<proc_macro2::TokenStream> = culist_bridge_specs
2409 .iter()
2410 .enumerate()
2411 .map(|(index, _)| {
2412 let bridge_tuple_index = syn::Index::from(index);
2413 quote! {
2414 __cu_bridges.#bridge_tuple_index
2415 .thaw(&mut decoder)
2416 .map_err(|e| CuError::from("Failed to thaw bridge").add_cause(&e.to_string()))?
2417 }
2418 })
2419 .collect();
2420
2421 let output_pack_sizes = collect_output_pack_sizes(&culist_plan);
2422 let runtime_plan_code_and_logging: Vec<(
2423 proc_macro2::TokenStream,
2424 proc_macro2::TokenStream,
2425 )> = culist_plan
2426 .steps
2427 .iter()
2428 .map(|unit| match unit {
2429 CuExecutionUnit::Step(step) => {
2430 #[cfg(feature = "macro_debug")]
2431 eprintln!(
2432 "{} -> {} as {:?}. task_id: {} Input={:?}, Output={:?}",
2433 step.node.get_id(),
2434 step.node.get_type(),
2435 step.task_type,
2436 step.node_id,
2437 step.input_msg_indices_types,
2438 step.output_msg_pack
2439 );
2440
2441 match &culist_exec_entities[step.node_id as usize].kind {
2442 ExecutionEntityKind::Task { task_index } => generate_task_execution_tokens(
2443 step,
2444 *task_index,
2445 &task_specs,
2446 StepGenerationContext::new(
2447 &output_pack_sizes,
2448 sim_mode,
2449 &mission_mod,
2450 ParallelLifecyclePlacement::default(),
2451 false,
2452 ),
2453 TaskExecutionTokens::new(quote! {}, {
2454 let node_index = int2sliceindex(*task_index as u32);
2455 quote! { tasks.#node_index }
2456 }),
2457 ),
2458 ExecutionEntityKind::BridgeRx {
2459 bridge_index,
2460 channel_index,
2461 } => {
2462 let spec = &culist_bridge_specs[*bridge_index];
2463 generate_bridge_rx_execution_tokens(
2464 step,
2465 spec,
2466 *channel_index,
2467 StepGenerationContext::new(
2468 &output_pack_sizes,
2469 sim_mode,
2470 &mission_mod,
2471 ParallelLifecyclePlacement::default(),
2472 false,
2473 ),
2474 {
2475 let bridge_tuple_index =
2476 int2sliceindex(spec.tuple_index as u32);
2477 quote! { let bridge = &mut __cu_bridges.#bridge_tuple_index; }
2478 },
2479 )
2480 }
2481 ExecutionEntityKind::BridgeTx {
2482 bridge_index,
2483 channel_index,
2484 } => {
2485 let spec = &culist_bridge_specs[*bridge_index];
2486 generate_bridge_tx_execution_tokens(
2487 step,
2488 spec,
2489 *channel_index,
2490 StepGenerationContext::new(
2491 &output_pack_sizes,
2492 sim_mode,
2493 &mission_mod,
2494 ParallelLifecyclePlacement::default(),
2495 false,
2496 ),
2497 {
2498 let bridge_tuple_index =
2499 int2sliceindex(spec.tuple_index as u32);
2500 quote! { let bridge = &mut __cu_bridges.#bridge_tuple_index; }
2501 },
2502 )
2503 }
2504 }
2505 }
2506 CuExecutionUnit::Loop(_) => {
2507 panic!("Execution loops are not supported in runtime generation");
2508 }
2509 })
2510 .collect();
2511 let parallel_lifecycle_placements = if parallel_rt_run_supported {
2512 Some(build_parallel_lifecycle_placements(
2513 &culist_plan,
2514 &culist_exec_entities,
2515 ))
2516 } else {
2517 None
2518 };
2519 let runtime_plan_parallel_code_and_logging: Option<
2520 Vec<(proc_macro2::TokenStream, proc_macro2::TokenStream)>,
2521 > = if parallel_rt_run_supported {
2522 Some(
2523 culist_plan
2524 .steps
2525 .iter()
2526 .enumerate()
2527 .map(|(step_index, unit)| match unit {
2528 CuExecutionUnit::Step(step) => match &culist_exec_entities
2529 [step.node_id as usize]
2530 .kind
2531 {
2532 ExecutionEntityKind::Task { task_index } => {
2533 let task_index_ts = int2sliceindex(*task_index as u32);
2534 generate_task_execution_tokens(
2535 step,
2536 *task_index,
2537 &task_specs,
2538 StepGenerationContext::new(
2539 &output_pack_sizes,
2540 false,
2541 &mission_mod,
2542 parallel_lifecycle_placements
2543 .as_ref()
2544 .expect("parallel lifecycle placements missing")[step_index],
2545 true,
2546 ),
2547 TaskExecutionTokens::new(quote! {
2548 let _task_lock = step_rt.task_locks.#task_index_ts.lock().expect("parallel task lock poisoned");
2549 let task = unsafe { step_rt.task_ptrs.#task_index_ts.as_mut() };
2550 }, quote! { (*task) }),
2551 )
2552 }
2553 ExecutionEntityKind::BridgeRx {
2554 bridge_index,
2555 channel_index,
2556 } => {
2557 let spec = &culist_bridge_specs[*bridge_index];
2558 let bridge_index_ts = int2sliceindex(spec.tuple_index as u32);
2559 generate_bridge_rx_execution_tokens(
2560 step,
2561 spec,
2562 *channel_index,
2563 StepGenerationContext::new(
2564 &output_pack_sizes,
2565 false,
2566 &mission_mod,
2567 parallel_lifecycle_placements
2568 .as_ref()
2569 .expect("parallel lifecycle placements missing")
2570 [step_index],
2571 true,
2572 ),
2573 quote! {
2574 let _bridge_lock = step_rt.bridge_locks.#bridge_index_ts.lock().expect("parallel bridge lock poisoned");
2575 let bridge = unsafe { step_rt.bridge_ptrs.#bridge_index_ts.as_mut() };
2576 },
2577 )
2578 }
2579 ExecutionEntityKind::BridgeTx {
2580 bridge_index,
2581 channel_index,
2582 } => {
2583 let spec = &culist_bridge_specs[*bridge_index];
2584 let bridge_index_ts = int2sliceindex(spec.tuple_index as u32);
2585 generate_bridge_tx_execution_tokens(
2586 step,
2587 spec,
2588 *channel_index,
2589 StepGenerationContext::new(
2590 &output_pack_sizes,
2591 false,
2592 &mission_mod,
2593 parallel_lifecycle_placements
2594 .as_ref()
2595 .expect("parallel lifecycle placements missing")[step_index],
2596 true,
2597 ),
2598 quote! {
2599 let _bridge_lock = step_rt.bridge_locks.#bridge_index_ts.lock().expect("parallel bridge lock poisoned");
2600 let bridge = unsafe { step_rt.bridge_ptrs.#bridge_index_ts.as_mut() };
2601 },
2602 )
2603 }
2604 },
2605 CuExecutionUnit::Loop(_) => {
2606 panic!("Execution loops are not supported in runtime generation");
2607 }
2608 })
2609 .collect(),
2610 )
2611 } else {
2612 None
2613 };
2614
2615 let sim_support = if sim_mode {
2616 Some(gen_sim_support(
2617 &culist_plan,
2618 &culist_exec_entities,
2619 &culist_bridge_specs,
2620 ))
2621 } else {
2622 None
2623 };
2624
2625 let recorded_replay_support = if sim_mode {
2626 Some(gen_recorded_replay_support(
2627 &culist_plan,
2628 &culist_exec_entities,
2629 &culist_bridge_specs,
2630 ))
2631 } else {
2632 None
2633 };
2634
2635 let (run_one_iteration, start_all_tasks, stop_all_tasks, run) = if sim_mode {
2636 (
2637 quote! {
2638 fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
2639 },
2640 quote! {
2641 fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
2642 },
2643 quote! {
2644 fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
2645 },
2646 quote! {
2647 fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
2648 },
2649 )
2650 } else {
2651 (
2652 quote! {
2653 fn run_one_iteration(&mut self) -> CuResult<()>
2654 },
2655 quote! {
2656 fn start_all_tasks(&mut self) -> CuResult<()>
2657 },
2658 quote! {
2659 fn stop_all_tasks(&mut self) -> CuResult<()>
2660 },
2661 quote! {
2662 fn run(&mut self) -> CuResult<()>
2663 },
2664 )
2665 };
2666
2667 let sim_callback_arg = if sim_mode {
2668 Some(quote!(sim_callback))
2669 } else {
2670 None
2671 };
2672
2673 let app_trait = if sim_mode {
2674 quote!(CuSimApplication)
2675 } else {
2676 quote!(CuApplication)
2677 };
2678
2679 let sim_callback_on_new_calls = task_specs.ids.iter().enumerate().map(|(i, id)| {
2680 let enum_name = config_id_to_enum(id);
2681 let enum_ident = Ident::new(&enum_name, Span::call_site());
2682 quote! {
2683 sim_callback(SimStep::#enum_ident(CuTaskCallbackState::New(all_instances_configs[#i].cloned())));
2685 }
2686 });
2687
2688 let sim_callback_on_new_bridges = culist_bridge_specs.iter().map(|spec| {
2689 let enum_ident = Ident::new(
2690 &config_id_to_enum(&format!("{}_bridge", spec.id)),
2691 Span::call_site(),
2692 );
2693 let cfg_index = syn::Index::from(spec.config_index);
2694 quote! {
2695 sim_callback(SimStep::#enum_ident(
2696 cu29::simulation::CuBridgeLifecycleState::New(config.bridges[#cfg_index].config.clone())
2697 ));
2698 }
2699 });
2700
2701 let sim_callback_on_new = if sim_mode {
2702 Some(quote! {
2703 let graph = config.get_graph(Some(#mission)).expect("Could not find the mission #mission");
2704 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
2705 .get_all_nodes()
2706 .iter()
2707 .map(|(_, node)| node.get_instance_config())
2708 .collect();
2709 #(#sim_callback_on_new_calls)*
2710 #(#sim_callback_on_new_bridges)*
2711 })
2712 } else {
2713 None
2714 };
2715
2716 let (runtime_plan_code, preprocess_logging_calls): (Vec<_>, Vec<_>) =
2717 itertools::multiunzip(runtime_plan_code_and_logging);
2718 let process_step_tasks_type = if sim_mode {
2719 quote!(CuSimTasks)
2720 } else {
2721 quote!(CuTasks)
2722 };
// Builds the three artefacts of the parallel runtime, one entry per plan step:
//   * the identifiers of the generated step functions,
//   * the step function definitions themselves,
//   * the code spawning one worker thread per stage.
// All three are empty when no parallel plan was generated.
let (
    parallel_process_step_idents,
    parallel_process_step_fn_defs,
    parallel_stage_worker_spawns,
): (
    Vec<Ident>,
    Vec<proc_macro2::TokenStream>,
    Vec<proc_macro2::TokenStream>,
) = if let Some(runtime_plan_parallel_code_and_logging) =
    &runtime_plan_parallel_code_and_logging
{
    // Only the step code is needed here; the second half of each pair is dropped.
    let (runtime_plan_parallel_step_code, _): (Vec<_>, Vec<_>) =
        itertools::multiunzip(runtime_plan_parallel_code_and_logging.clone());
    // One function name per step: `__cu_parallel_process_step_<index>`.
    let parallel_process_step_idents: Vec<Ident> = (0..runtime_plan_parallel_step_code
        .len())
        .map(|index| format_ident!("__cu_parallel_process_step_{index}"))
        .collect();
    // Each step function unpacks the per-step runtime into the local names the
    // generated step code refers to, then runs that code as its body.
    let parallel_process_step_fn_defs: Vec<proc_macro2::TokenStream> =
        parallel_process_step_idents
            .iter()
            .zip(runtime_plan_parallel_step_code.iter())
            .map(|(step_ident, step_code)| {
                quote! {
                    #[inline(always)]
                    fn #step_ident(
                        step_rt: &mut ParallelProcessStepRuntime<'_>,
                    ) -> cu29::curuntime::ProcessStepResult {
                        let clock = step_rt.clock;
                        let execution_probe = step_rt.execution_probe;
                        let monitor = step_rt.monitor;
                        let kf_manager = ParallelKeyFrameAccessor::new(
                            step_rt.kf_manager_ptr,
                            step_rt.kf_lock,
                        );
                        let culist = &mut *step_rt.culist;
                        let clid = step_rt.clid;
                        let ctx = &mut step_rt.ctx;
                        let msgs = &mut culist.msgs.0;
                        #step_code
                    }
                }
            })
            .collect();
    // One worker per stage: it receives jobs, runs the stage's step function and
    // either forwards the copper list to the next stage or reports a terminal
    // result on `done_tx`.
    let parallel_stage_worker_spawns: Vec<proc_macro2::TokenStream> =
        parallel_process_step_idents
            .iter()
            .enumerate()
            .map(|(stage_index, step_ident)| {
                let stage_index_lit = syn::Index::from(stage_index);
                let receiver_ident =
                    format_ident!("__cu_parallel_stage_rx_{stage_index}");
                quote! {
                    {
                        // Take this stage's receiver; `next_stage_tx` is `None`
                        // once `stage_senders` is exhausted.
                        let mut #receiver_ident = stage_receivers
                            .next()
                            .expect("parallel stage receiver missing");
                        let mut next_stage_tx = stage_senders.next();
                        // Per-worker copies/clones moved into the thread closure.
                        let done_tx = done_tx.clone();
                        let shutdown = std::sync::Arc::clone(&shutdown);
                        let clock = clock.clone();
                        let instance_id = instance_id;
                        let subsystem_code = subsystem_code;
                        let execution_probe_ptr = execution_probe_ptr;
                        let monitor_ptr = monitor_ptr;
                        let task_ptrs = task_ptrs;
                        let task_locks = std::sync::Arc::clone(&task_locks);
                        let bridge_ptrs = bridge_ptrs;
                        let bridge_locks = std::sync::Arc::clone(&bridge_locks);
                        let kf_manager_ptr = kf_manager_ptr;
                        let kf_lock = std::sync::Arc::clone(&kf_lock);
                        scope.spawn(move || {
                            loop {
                                // A closed channel ends the worker.
                                let job = match #receiver_ident.recv() {
                                    Ok(job) => job,
                                    Err(_) => break,
                                };
                                let clid = job.clid;
                                let culist = job.culist;

                                // Once `shutdown` is set, jobs are not processed;
                                // the copper list is handed back with an error.
                                let terminal_result = if shutdown.load(Ordering::Acquire) {
                                    #mission_mod::ParallelWorkerResult {
                                        clid,
                                        culist: Some(culist),
                                        outcome: Err(CuError::from(
                                            "Parallel runtime shutting down after an earlier stage failure",
                                        )),
                                        raw_payload_bytes: 0,
                                        handle_bytes: 0,
                                    }
                                } else {
                                    // Panics in the step are caught and reported
                                    // as an error result (see the `Err(payload)` arm).
                                    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                                        // NOTE(review): validity of these pointers for the
                                        // worker's lifetime is established outside this
                                        // block -- confirm at the creation site.
                                        let execution_probe = unsafe { execution_probe_ptr.as_ref() };
                                        let monitor = unsafe { monitor_ptr.as_ref() };
                                        let mut culist = culist;
                                        let mut step_rt = #mission_mod::ParallelProcessStepRuntime {
                                            clock: &clock,
                                            execution_probe,
                                            monitor,
                                            task_ptrs: &task_ptrs,
                                            task_locks: task_locks.as_ref(),
                                            bridge_ptrs: &bridge_ptrs,
                                            bridge_locks: bridge_locks.as_ref(),
                                            kf_manager_ptr,
                                            kf_lock: kf_lock.as_ref(),
                                            culist: culist.as_mut(),
                                            clid,
                                            ctx: cu29::context::CuContext::from_runtime_metadata(
                                                clock.clone(),
                                                clid,
                                                instance_id,
                                                subsystem_code,
                                                #mission_mod::TASK_IDS,
                                            ),
                                        };
                                        let outcome = #step_ident(&mut step_rt);
                                        // Release the borrow of `culist` before returning it.
                                        drop(step_rt);
                                        (culist, outcome)
                                    })) {
                                        Ok((culist, Ok(cu29::curuntime::ProcessStepOutcome::Continue))) => {
                                            // Re-check `shutdown`: another stage may have
                                            // failed while this step was running.
                                            if shutdown.load(Ordering::Acquire) {
                                                #mission_mod::ParallelWorkerResult {
                                                    clid,
                                                    culist: Some(culist),
                                                    outcome: Err(CuError::from(
                                                        "Parallel runtime shutting down after an earlier stage failure",
                                                    )),
                                                    raw_payload_bytes: 0,
                                                    handle_bytes: 0,
                                                }
                                            } else if let Some(next_stage_tx) = next_stage_tx.as_mut() {
                                                // Not the last stage: forward the copper list.
                                                let forwarded_job = #mission_mod::ParallelWorkerJob { clid, culist };
                                                match next_stage_tx.send(forwarded_job) {
                                                    // Forwarded: nothing to report yet.
                                                    Ok(()) => continue,
                                                    Err(send_error) => {
                                                        // Recover the job from the send error so
                                                        // the copper list is not lost.
                                                        let failed_job = send_error.0;
                                                        shutdown.store(true, Ordering::Release);
                                                        #mission_mod::ParallelWorkerResult {
                                                            clid,
                                                            culist: Some(failed_job.culist),
                                                            outcome: Err(CuError::from(format!(
                                                                "Parallel stage {} could not hand CopperList #{} to the next stage",
                                                                #stage_index_lit,
                                                                clid
                                                            ))),
                                                            raw_payload_bytes: 0,
                                                            handle_bytes: 0,
                                                        }
                                                    }
                                                }
                                            } else {
                                                // Last stage: the copper list completed.
                                                #mission_mod::ParallelWorkerResult {
                                                    clid,
                                                    culist: Some(culist),
                                                    outcome: Ok(cu29::curuntime::ProcessStepOutcome::Continue),
                                                    raw_payload_bytes: 0,
                                                    handle_bytes: 0,
                                                }
                                            }
                                        }
                                        // Abort is reported right away; the list is not
                                        // forwarded and `shutdown` is left untouched.
                                        Ok((culist, Ok(cu29::curuntime::ProcessStepOutcome::AbortCopperList))) => {
                                            #mission_mod::ParallelWorkerResult {
                                                clid,
                                                culist: Some(culist),
                                                outcome: Ok(cu29::curuntime::ProcessStepOutcome::AbortCopperList),
                                                raw_payload_bytes: 0,
                                                handle_bytes: 0,
                                            }
                                        }
                                        // A step error raises the shutdown flag.
                                        Ok((culist, Err(error))) => {
                                            shutdown.store(true, Ordering::Release);
                                            #mission_mod::ParallelWorkerResult {
                                                clid,
                                                culist: Some(culist),
                                                outcome: Err(error),
                                                raw_payload_bytes: 0,
                                                handle_bytes: 0,
                                            }
                                        }
                                        // The step panicked: the copper list was moved into
                                        // the closure and cannot be returned (`culist: None`).
                                        Err(payload) => {
                                            shutdown.store(true, Ordering::Release);
                                            let panic_message =
                                                cu29::monitoring::panic_payload_to_string(payload.as_ref());
                                            #mission_mod::ParallelWorkerResult {
                                                clid,
                                                culist: None,
                                                outcome: Err(CuError::from(format!(
                                                    "Panic while processing CopperList #{} in stage {}: {}",
                                                    clid,
                                                    #stage_index_lit,
                                                    panic_message
                                                ))),
                                                raw_payload_bytes: 0,
                                                handle_bytes: 0,
                                            }
                                        }
                                    }
                                };

                                // Stop the worker when nobody listens for results anymore.
                                if done_tx.send(terminal_result).is_err() {
                                    break;
                                }
                            }
                        });
                    }
                }
            })
            .collect();
    (
        parallel_process_step_idents,
        parallel_process_step_fn_defs,
        parallel_stage_worker_spawns,
    )
} else {
    (Vec::new(), Vec::new(), Vec::new())
};
2938 let parallel_process_stage_count_tokens =
2939 proc_macro2::Literal::usize_unsuffixed(parallel_process_step_idents.len());
2940 let parallel_task_ptrs_type = if task_types.is_empty() {
2941 quote! { () }
2942 } else {
2943 let elems = task_types
2944 .iter()
2945 .map(|ty| quote! { ParallelSharedPtr<#ty> });
2946 quote! { (#(#elems),*,) }
2947 };
2948 let parallel_task_locks_type = if task_types.is_empty() {
2949 quote! { () }
2950 } else {
2951 let elems = (0..task_types.len()).map(|_| quote! { std::sync::Mutex<()> });
2952 quote! { (#(#elems),*,) }
2953 };
2954 let parallel_task_ptr_values = if task_types.is_empty() {
2955 quote! { () }
2956 } else {
2957 let elems = (0..task_types.len()).map(|index| {
2958 let index = syn::Index::from(index);
2959 quote! { ParallelSharedPtr::new(&mut runtime.tasks.#index as *mut _) }
2960 });
2961 quote! { (#(#elems),*,) }
2962 };
2963 let parallel_task_lock_values = if task_types.is_empty() {
2964 quote! { () }
2965 } else {
2966 let elems = (0..task_types.len()).map(|_| quote! { std::sync::Mutex::new(()) });
2967 quote! { (#(#elems),*,) }
2968 };
2969 let parallel_bridge_ptrs_type = if bridge_runtime_types.is_empty() {
2970 quote! { () }
2971 } else {
2972 let elems = bridge_runtime_types
2973 .iter()
2974 .map(|ty| quote! { ParallelSharedPtr<#ty> });
2975 quote! { (#(#elems),*,) }
2976 };
2977 let parallel_bridge_locks_type = if bridge_runtime_types.is_empty() {
2978 quote! { () }
2979 } else {
2980 let elems = (0..bridge_runtime_types.len()).map(|_| quote! { std::sync::Mutex<()> });
2981 quote! { (#(#elems),*,) }
2982 };
2983 let parallel_bridge_ptr_values = if bridge_runtime_types.is_empty() {
2984 quote! { () }
2985 } else {
2986 let elems = (0..bridge_runtime_types.len()).map(|index| {
2987 let index = syn::Index::from(index);
2988 quote! { ParallelSharedPtr::new(&mut runtime.bridges.#index as *mut _) }
2989 });
2990 quote! { (#(#elems),*,) }
2991 };
2992 let parallel_bridge_lock_values = if bridge_runtime_types.is_empty() {
2993 quote! { () }
2994 } else {
2995 let elems =
2996 (0..bridge_runtime_types.len()).map(|_| quote! { std::sync::Mutex::new(()) });
2997 quote! { (#(#elems),*,) }
2998 };
    // Support items for the parallel runtime. They are emitted only when
    // `parallel_rt_run_supported` is set; otherwise nothing is generated.
    let parallel_rt_support_tokens = if parallel_rt_run_supported {
        quote! {
            // Tuple aliases filled in from the per-task / per-bridge token lists.
            type ParallelTaskPtrs = #parallel_task_ptrs_type;
            type ParallelTaskLocks = #parallel_task_locks_type;
            type ParallelBridgePtrs = #parallel_bridge_ptrs_type;
            type ParallelBridgeLocks = #parallel_bridge_locks_type;

            // Copyable wrapper around a raw `*mut T`.
            struct ParallelSharedPtr<T>(*mut T);

            impl<T> Clone for ParallelSharedPtr<T> {
                #[inline(always)]
                fn clone(&self) -> Self {
                    *self
                }
            }

            impl<T> Copy for ParallelSharedPtr<T> {}

            impl<T> ParallelSharedPtr<T> {
                #[inline(always)]
                const fn new(ptr: *mut T) -> Self {
                    Self(ptr)
                }

                // Stores a `*const T` as `*mut T`.
                // NOTE(review): handles built this way presumably must only be
                // read through `as_ref` -- confirm at the call sites.
                #[inline(always)]
                const fn from_ref(ptr: *const T) -> Self {
                    Self(ptr as *mut T)
                }

                // Dereferences the raw pointer into `&mut T` with a caller-chosen
                // lifetime; validity and exclusivity are the caller's obligation.
                #[inline(always)]
                unsafe fn as_mut<'a>(self) -> &'a mut T {
                    unsafe { &mut *self.0 }
                }

                // Dereferences the raw pointer into `&T` with a caller-chosen
                // lifetime; validity is the caller's obligation.
                #[inline(always)]
                unsafe fn as_ref<'a>(self) -> &'a T {
                    unsafe { &*self.0 }
                }
            }

            // The wrapper is declared `Send` and `Sync` whenever `T: Send`.
            // NOTE(review): `Sync` only requires `T: Send` here; soundness
            // presumably relies on the accompanying `Mutex<()>` locks -- confirm.
            unsafe impl<T: Send> Send for ParallelSharedPtr<T> {}
            unsafe impl<T: Send> Sync for ParallelSharedPtr<T> {}

            // Pairs a pointer to the keyframes manager with the mutex that is
            // locked around each access made through this accessor.
            struct ParallelKeyFrameAccessor<'a> {
                ptr: ParallelSharedPtr<cu29::curuntime::KeyFramesManager>,
                lock: &'a std::sync::Mutex<()>,
            }

            impl<'a> ParallelKeyFrameAccessor<'a> {
                #[inline(always)]
                fn new(
                    ptr: ParallelSharedPtr<cu29::curuntime::KeyFramesManager>,
                    lock: &'a std::sync::Mutex<()>,
                ) -> Self {
                    Self { ptr, lock }
                }

                // Takes the lock, then forwards to `KeyFramesManager::freeze_task`.
                // Panics if the lock is poisoned.
                #[inline(always)]
                fn freeze_task(
                    &self,
                    culistid: u64,
                    task: &impl cu29::cutask::Freezable,
                ) -> CuResult<usize> {
                    let _guard = self.lock.lock().expect("parallel keyframe lock poisoned");
                    let manager = unsafe { self.ptr.as_mut() };
                    manager.freeze_task(culistid, task)
                }

                // Takes the lock, then forwards to `KeyFramesManager::freeze_any`.
                // Panics if the lock is poisoned.
                #[inline(always)]
                fn freeze_any(
                    &self,
                    culistid: u64,
                    item: &impl cu29::cutask::Freezable,
                ) -> CuResult<usize> {
                    let _guard = self.lock.lock().expect("parallel keyframe lock poisoned");
                    let manager = unsafe { self.ptr.as_mut() };
                    manager.freeze_any(culistid, item)
                }
            }

            // Borrowed state for processing one CopperList.
            // NOTE(review): presumably the argument bundle handed to each generated
            // process-step function -- confirm against the step definitions.
            struct ParallelProcessStepRuntime<'a> {
                clock: &'a RobotClock,
                execution_probe: &'a cu29::monitoring::RuntimeExecutionProbe,
                monitor: &'a #monitor_type,
                task_ptrs: &'a ParallelTaskPtrs,
                task_locks: &'a ParallelTaskLocks,
                bridge_ptrs: &'a ParallelBridgePtrs,
                bridge_locks: &'a ParallelBridgeLocks,
                kf_manager_ptr: ParallelSharedPtr<cu29::curuntime::KeyFramesManager>,
                kf_lock: &'a std::sync::Mutex<()>,
                culist: &'a mut CuList,
                clid: u64,
                ctx: cu29::context::CuContext,
            }

            // A boxed CopperList tagged with its id.
            struct ParallelWorkerJob {
                clid: u64,
                culist: Box<CuList>,
            }

            // Outcome for one CopperList. `culist` is optional so a result can
            // be reported without handing the CopperList back.
            struct ParallelWorkerResult {
                clid: u64,
                culist: Option<Box<CuList>>,
                outcome: cu29::curuntime::ProcessStepResult,
                raw_payload_bytes: u64,
                handle_bytes: u64,
            }

            // Empty function whose `where` clause lists the thread-safety bounds
            // the parallel runtime expects of the generated types.
            // NOTE(review): presumably intended as a compile-time assertion -- confirm.
            #[inline(always)]
            fn assert_parallel_rt_send_bounds()
            where
                CuList: Send,
                #process_step_tasks_type: Send,
                CuBridges: Send,
                #monitor_type: Sync,
            {
            }

            // The generated per-stage process-step functions.
            #(#parallel_process_step_fn_defs)*
        }
    } else {
        quote! {}
    };
3122
3123 let config_load_stmt =
3124 build_config_load_stmt(std, application_name, subsystem_id.as_deref());
3125
    // Generated guard: the `logging.copperlist_count` found in the loaded config
    // (falling back to the compiled-in count when absent) must equal the count
    // compiled into the binary; otherwise the generated function returns an error.
    let copperlist_count_check = quote! {
        let configured_copperlist_count = config
            .logging
            .as_ref()
            .and_then(|logging| logging.copperlist_count)
            .unwrap_or(#copperlist_count_tokens);
        if configured_copperlist_count != #copperlist_count_tokens {
            return Err(CuError::from(format!(
                "Configured logging.copperlist_count ({configured_copperlist_count}) does not match the runtime compiled into this binary ({})",
                #copperlist_count_tokens
            )));
        }
    };
3139
3140 let prepare_config_sig = if std {
3141 quote! {
3142 fn prepare_config(
3143 instance_id: u32,
3144 config_override: Option<CuConfig>,
3145 ) -> CuResult<(CuConfig, RuntimeLifecycleConfigSource)>
3146 }
3147 } else {
3148 quote! {
3149 fn prepare_config() -> CuResult<(CuConfig, RuntimeLifecycleConfigSource)>
3150 }
3151 };
3152
3153 let prepare_config_call = if std {
3154 quote! { Self::prepare_config(instance_id, config_override)? }
3155 } else {
3156 quote! { Self::prepare_config()? }
3157 };
3158
3159 let prepare_resources_sig = if std {
3160 quote! {
3161 pub fn prepare_resources_for_instance(
3162 instance_id: u32,
3163 config_override: Option<CuConfig>,
3164 ) -> CuResult<AppResources>
3165 }
3166 } else {
3167 quote! {
3168 pub fn prepare_resources() -> CuResult<AppResources>
3169 }
3170 };
3171
3172 let prepare_resources_compat_fn = if std {
3173 Some(quote! {
3174 pub fn prepare_resources(
3175 config_override: Option<CuConfig>,
3176 ) -> CuResult<AppResources> {
3177 Self::prepare_resources_for_instance(0, config_override)
3178 }
3179 })
3180 } else {
3181 None
3182 };
3183
3184 let init_resources_compat_fn = if std {
3185 Some(quote! {
3186 pub fn init_resources_for_instance(
3187 instance_id: u32,
3188 config_override: Option<CuConfig>,
3189 ) -> CuResult<AppResources> {
3190 Self::prepare_resources_for_instance(instance_id, config_override)
3191 }
3192
3193 pub fn init_resources(
3194 config_override: Option<CuConfig>,
3195 ) -> CuResult<AppResources> {
3196 Self::prepare_resources(config_override)
3197 }
3198 })
3199 } else {
3200 Some(quote! {
3201 pub fn init_resources() -> CuResult<AppResources> {
3202 Self::prepare_resources()
3203 }
3204 })
3205 };
3206
3207 let build_with_resources_sig = if sim_mode {
3208 quote! {
3209 fn build_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
3210 clock: RobotClock,
3211 unified_logger: Arc<Mutex<L>>,
3212 app_resources: AppResources,
3213 instance_id: u32,
3214 sim_callback: &mut impl FnMut(SimStep) -> SimOverride,
3215 ) -> CuResult<Self>
3216 }
3217 } else {
3218 quote! {
3219 fn build_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
3220 clock: RobotClock,
3221 unified_logger: Arc<Mutex<L>>,
3222 app_resources: AppResources,
3223 instance_id: u32,
3224 ) -> CuResult<Self>
3225 }
3226 };
3227 let parallel_rt_metadata_arg = if std && parallel_rt_enabled {
3228 Some(quote! {
3229 &#mission_mod::PARALLEL_RT_METADATA,
3230 })
3231 } else {
3232 None
3233 };
3234
3235 let kill_handler = if std && signal_handler {
3236 Some(quote! {
3237 ctrlc::set_handler(move || {
3238 STOP_FLAG.store(true, Ordering::SeqCst);
3239 }).expect("Error setting Ctrl-C handler");
3240 })
3241 } else {
3242 None
3243 };
3244
3245 let run_loop = if std {
3246 quote! {{
3247 let mut rate_limiter = self
3248 .copper_runtime
3249 .runtime_config
3250 .rate_target_hz
3251 .map(|rate| cu29::curuntime::LoopRateLimiter::from_rate_target_hz(
3252 rate,
3253 &self.copper_runtime.clock,
3254 ))
3255 .transpose()?;
3256 loop {
3257 let result = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(
3258 || <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg)
3259 )) {
3260 Ok(result) => result,
3261 Err(payload) => {
3262 let panic_message = cu29::monitoring::panic_payload_to_string(payload.as_ref());
3263 self.copper_runtime.monitor.process_panic(&panic_message);
3264 let _ = self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::Panic {
3265 message: panic_message.clone(),
3266 file: None,
3267 line: None,
3268 column: None,
3269 });
3270 Err(CuError::from(format!(
3271 "Panic while running one iteration: {}",
3272 panic_message
3273 )))
3274 }
3275 };
3276
3277 if let Some(rate_limiter) = rate_limiter.as_mut() {
3278 rate_limiter.limit(&self.copper_runtime.clock);
3279 }
3280
3281 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
3282 break result;
3283 }
3284 }
3285 }}
3286 } else {
3287 quote! {{
3288 let mut rate_limiter = self
3289 .copper_runtime
3290 .runtime_config
3291 .rate_target_hz
3292 .map(|rate| cu29::curuntime::LoopRateLimiter::from_rate_target_hz(
3293 rate,
3294 &self.copper_runtime.clock,
3295 ))
3296 .transpose()?;
3297 loop {
3298 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
3299 if let Some(rate_limiter) = rate_limiter.as_mut() {
3300 rate_limiter.limit(&self.copper_runtime.clock);
3301 }
3302
3303 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
3304 break result;
3305 }
3306 }
3307 }}
3308 };
3309
    #[cfg(feature = "macro_debug")]
    eprintln!("[build the run methods]");
    // Body of the generated `run` method. When `parallel_rt_run_supported` is
    // set, a dispatcher feeds CopperLists through a pipeline of stage workers
    // and commits their results in id order; otherwise the sequential
    // `run_loop` is used. Both variants start all tasks first, log a failing
    // result, stop all tasks, log shutdown completion and return the result.
    let run_body: proc_macro2::TokenStream = if parallel_rt_run_supported {
        quote! {
            static STOP_FLAG: AtomicBool = AtomicBool::new(false);

            #kill_handler

            <Self as #app_trait<S, L>>::start_all_tasks(self)?;
            let result = std::thread::scope(|scope| -> CuResult<()> {
                #mission_mod::assert_parallel_rt_send_bounds();

                let runtime = &mut self.copper_runtime;
                let clock = &runtime.clock;
                let instance_id = runtime.instance_id();
                let subsystem_code = runtime.subsystem_code();
                let execution_probe = runtime.execution_probe.as_ref();
                let monitor = &runtime.monitor;
                let cl_manager = &mut runtime.copperlists_manager;
                let parallel_rt = &runtime.parallel_rt;
                // Raw-pointer handles and lock tuples for the spawned workers.
                // NOTE(review): the pointers alias fields of `runtime`; soundness
                // presumably relies on the matching locks being held -- confirm.
                let execution_probe_ptr =
                    #mission_mod::ParallelSharedPtr::from_ref(execution_probe as *const _);
                let monitor_ptr =
                    #mission_mod::ParallelSharedPtr::from_ref(monitor as *const _);
                let task_ptrs: #mission_mod::ParallelTaskPtrs = #parallel_task_ptr_values;
                let task_locks = std::sync::Arc::new(#parallel_task_lock_values);
                let bridge_ptrs: #mission_mod::ParallelBridgePtrs = #parallel_bridge_ptr_values;
                let bridge_locks = std::sync::Arc::new(#parallel_bridge_lock_values);
                let kf_manager_ptr =
                    #mission_mod::ParallelSharedPtr::new(&mut runtime.keyframes_manager as *mut _);
                let kf_lock = std::sync::Arc::new(std::sync::Mutex::new(()));
                // Pool of boxed CopperLists available for launching.
                let mut free_copperlists =
                    cu29::curuntime::allocate_boxed_copperlists::<CuStampedDataSet, #copperlist_count_tokens>();
                let start_clid = cl_manager.next_cl_id();
                parallel_rt.reset_cursors(start_clid);

                let stage_count = #parallel_process_stage_count_tokens;
                debug_assert_eq!(parallel_rt.metadata().process_stage_count(), stage_count);
                if stage_count == 0 {
                    return Err(CuError::from(
                        "Parallel runtime requires at least one generated process stage",
                    ));
                }

                // One job queue per stage, each with capacity of at least 1.
                let queue_capacity = parallel_rt.in_flight_limit().max(1);
                let mut stage_senders = Vec::with_capacity(stage_count);
                let mut stage_receivers = Vec::with_capacity(stage_count);
                for _stage_index in 0..stage_count {
                    let (stage_tx, stage_rx) =
                        cu29::parallel_queue::stage_queue::<#mission_mod::ParallelWorkerJob>(
                            queue_capacity,
                        );
                    stage_senders.push(stage_tx);
                    stage_receivers.push(stage_rx);
                }
                // Channel on which results come back to this dispatcher.
                let (done_tx, done_rx) =
                    std::sync::mpsc::channel::<#mission_mod::ParallelWorkerResult>();
                let shutdown = std::sync::Arc::new(AtomicBool::new(false));
                let mut stage_senders = stage_senders.into_iter();
                // The dispatcher keeps the sender of the first stage queue.
                let mut entry_stage_tx = stage_senders
                    .next()
                    .expect("parallel stage pipeline has no entry queue");
                let mut stage_receivers = stage_receivers.into_iter();
                #(#parallel_stage_worker_spawns)*
                // Drop the dispatcher's own result sender.
                // NOTE(review): presumably so `done_rx` reports disconnection once
                // every worker has exited -- confirm against the worker spawns.
                drop(done_tx);

                let mut dispatch_limiter = runtime
                    .runtime_config
                    .rate_target_hz
                    .map(|rate| cu29::curuntime::LoopRateLimiter::from_rate_target_hz(rate, clock))
                    .transpose()?;
                let mut in_flight = 0usize;
                let mut stop_launching = false;
                let mut next_launch_clid = start_clid;
                let mut next_commit_clid = start_clid;
                // Received results keyed by CopperList id, held until their id is
                // the next one to commit.
                let mut pending_results =
                    std::collections::BTreeMap::<u64, #mission_mod::ParallelWorkerResult>::new();
                // Id of the launched, not yet committed CopperList that captures
                // a keyframe, if any.
                let mut active_keyframe_clid: Option<u64> = None;
                let mut fatal_error: Option<CuError> = None;

                loop {
                    // Return any CopperLists the manager can hand back to the pool.
                    while let Some(recycled_culist) = cl_manager.try_reclaim_boxed()? {
                        free_copperlists.push(recycled_culist);
                    }

                    if !stop_launching && fatal_error.is_none() {
                        let next_clid = next_launch_clid;
                        let rate_ready = dispatch_limiter
                            .as_ref()
                            .map(|limiter| limiter.is_ready(clock))
                            .unwrap_or(true);
                        // A keyframe-capturing CopperList is not launched while
                        // another one is still uncommitted.
                        let keyframe_ready = {
                            let _keyframe_lock = kf_lock.lock().expect("parallel keyframe lock poisoned");
                            let kf_manager = unsafe { kf_manager_ptr.as_mut() };
                            active_keyframe_clid.is_none() || !kf_manager.captures_keyframe(next_clid)
                        };

                        if in_flight < parallel_rt.in_flight_limit()
                            && rate_ready
                            && keyframe_ready
                            && !free_copperlists.is_empty()
                        {
                            let should_launch = true;

                            if should_launch {
                                let mut culist = free_copperlists
                                    .pop()
                                    .expect("parallel CopperList pool unexpectedly empty");
                                let clid = next_clid;
                                culist.id = clid;
                                culist.change_state(cu29::copperlist::CopperListState::Initialized);
                                {
                                    let _keyframe_lock =
                                        kf_lock.lock().expect("parallel keyframe lock poisoned");
                                    let kf_manager = unsafe { kf_manager_ptr.as_mut() };
                                    kf_manager.reset(clid, clock);
                                    if kf_manager.captures_keyframe(clid) {
                                        active_keyframe_clid = Some(clid);
                                    }
                                }
                                culist.change_state(cu29::copperlist::CopperListState::Processing);
                                culist.msgs.init_zeroed();
                                // Hand the CopperList to the first stage; a failed
                                // send raises `shutdown` and aborts the dispatcher.
                                entry_stage_tx
                                    .send(#mission_mod::ParallelWorkerJob {
                                        clid,
                                        culist,
                                    })
                                    .map_err(|e| {
                                        shutdown.store(true, Ordering::Release);
                                        CuError::from("Failed to enqueue CopperList for parallel stage processing")
                                            .add_cause(e.to_string().as_str())
                                    })?;
                                next_launch_clid += 1;
                                in_flight += 1;
                                if let Some(limiter) = dispatch_limiter.as_mut() {
                                    limiter.mark_tick(clock);
                                }
                            }

                            if STOP_FLAG.load(Ordering::SeqCst) {
                                stop_launching = true;
                            }
                            continue;
                        }
                    }

                    // Nothing is in flight: exit if stopping or failed, otherwise
                    // wait for a free CopperList or for the rate limiter.
                    if in_flight == 0 {
                        if stop_launching || fatal_error.is_some() {
                            break;
                        }

                        if free_copperlists.is_empty() {
                            free_copperlists.push(cl_manager.wait_reclaim_boxed()?);
                            continue;
                        }

                        if let Some(limiter) = dispatch_limiter.as_ref()
                            && !limiter.is_ready(clock)
                        {
                            limiter.wait_until_ready(clock);
                            continue;
                        }
                    }

                    // Wait for the next result. While still launching with a rate
                    // limiter and work in flight, the wait is bounded by the
                    // limiter's remaining time; in every other case it blocks.
                    let recv_result = if !stop_launching && fatal_error.is_none() {
                        if let Some(limiter) = dispatch_limiter.as_ref() {
                            if let Some(remaining) = limiter.remaining(clock)
                                && in_flight > 0
                            {
                                done_rx.recv_timeout(std::time::Duration::from(remaining))
                            } else {
                                done_rx
                                    .recv()
                                    .map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected)
                            }
                        } else {
                            done_rx
                                .recv()
                                .map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected)
                        }
                    } else {
                        done_rx
                            .recv()
                            .map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected)
                    };

                    let worker_result = match recv_result {
                        Ok(worker_result) => worker_result,
                        Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                            if STOP_FLAG.load(Ordering::SeqCst) {
                                stop_launching = true;
                            }
                            continue;
                        }
                        Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                            shutdown.store(true, Ordering::Release);
                            return Err(CuError::from(
                                "Parallel stage worker disconnected unexpectedly",
                            ));
                        }
                    };
                    in_flight = in_flight.saturating_sub(1);
                    pending_results.insert(worker_result.clid, worker_result);

                    // Commit buffered results in increasing CopperList-id order.
                    while let Some(worker_result) = pending_results.remove(&next_commit_clid) {
                        if fatal_error.is_none()
                            && parallel_rt.current_commit_clid() != worker_result.clid
                        {
                            shutdown.store(true, Ordering::Release);
                            fatal_error = Some(CuError::from(format!(
                                "Parallel commit checkpoint out of sync: expected {}, got {}",
                                parallel_rt.current_commit_clid(),
                                worker_result.clid
                            )));
                            stop_launching = true;
                        }

                        let mut worker_result = worker_result;
                        if fatal_error.is_none() {
                            match worker_result.outcome {
                                // Aborted CopperList: notify the monitor and end
                                // processing; no logging hooks, keyframe finalisation
                                // or I/O statistics.
                                Ok(cu29::curuntime::ProcessStepOutcome::AbortCopperList) => {
                                    let mut culist = worker_result
                                        .culist
                                        .take()
                                        .expect("parallel abort result missing CopperList ownership");
                                    let mut commit_ctx = cu29::context::CuContext::from_runtime_metadata(
                                        clock.clone(),
                                        worker_result.clid,
                                        instance_id,
                                        subsystem_code,
                                        #mission_mod::TASK_IDS,
                                    );
                                    commit_ctx.clear_current_task();
                                    let monitor_result = monitor.process_copperlist(
                                        &commit_ctx,
                                        #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)),
                                    );
                                    match cl_manager.end_of_processing_boxed(culist)? {
                                        cu29::curuntime::OwnedCopperListSubmission::Recycled(culist) => {
                                            free_copperlists.push(culist);
                                        }
                                        cu29::curuntime::OwnedCopperListSubmission::Pending => {}
                                    }
                                    // The monitor's verdict is propagated only after
                                    // the CopperList has been handed back.
                                    monitor_result?;
                                }
                                // Completed CopperList: monitor, logging hooks, end of
                                // processing, keyframe finalisation, then I/O stats.
                                Ok(cu29::curuntime::ProcessStepOutcome::Continue) => {
                                    let mut culist = worker_result
                                        .culist
                                        .take()
                                        .expect("parallel worker result missing CopperList ownership");
                                    let mut commit_ctx = cu29::context::CuContext::from_runtime_metadata(
                                        clock.clone(),
                                        worker_result.clid,
                                        instance_id,
                                        subsystem_code,
                                        #mission_mod::TASK_IDS,
                                    );
                                    commit_ctx.clear_current_task();
                                    let monitor_result = monitor.process_copperlist(
                                        &commit_ctx,
                                        #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)),
                                    );

                                    #(#preprocess_logging_calls)*

                                    match cl_manager.end_of_processing_boxed(culist)? {
                                        cu29::curuntime::OwnedCopperListSubmission::Recycled(culist) => {
                                            free_copperlists.push(culist);
                                        }
                                        cu29::curuntime::OwnedCopperListSubmission::Pending => {}
                                    }
                                    let keyframe_bytes = {
                                        let _keyframe_lock =
                                            kf_lock.lock().expect("parallel keyframe lock poisoned");
                                        let kf_manager = unsafe { kf_manager_ptr.as_mut() };
                                        kf_manager.end_of_processing(worker_result.clid)?;
                                        kf_manager.last_encoded_bytes
                                    };
                                    monitor_result?;
                                    let stats = cu29::monitoring::CopperListIoStats {
                                        raw_culist_bytes: core::mem::size_of::<CuList>() as u64
                                            + cl_manager.last_handle_bytes,
                                        handle_bytes: cl_manager.last_handle_bytes,
                                        encoded_culist_bytes: cl_manager.last_encoded_bytes,
                                        keyframe_bytes,
                                        structured_log_bytes_total: ::cu29::prelude::structured_log_bytes_total(),
                                        culistid: worker_result.clid,
                                    };
                                    monitor.observe_copperlist_io(stats);

                                }
                                // Worker failure: record it as fatal, stop launching,
                                // and return the CopperList (if any) to the pool.
                                Err(error) => {
                                    shutdown.store(true, Ordering::Release);
                                    stop_launching = true;
                                    fatal_error = Some(error);
                                    if let Some(mut culist) = worker_result.culist.take() {
                                        culist.change_state(cu29::copperlist::CopperListState::Free);
                                        free_copperlists.push(culist);
                                    }
                                }
                            }
                        } else if let Some(mut culist) = worker_result.culist.take() {
                            // A fatal error is already recorded: discard the result
                            // and free its CopperList.
                            culist.change_state(cu29::copperlist::CopperListState::Free);
                            free_copperlists.push(culist);
                        }

                        if active_keyframe_clid == Some(worker_result.clid) {
                            active_keyframe_clid = None;
                        }
                        parallel_rt.release_commit(worker_result.clid + 1);
                        next_commit_clid += 1;
                    }

                    if STOP_FLAG.load(Ordering::SeqCst) {
                        stop_launching = true;
                    }
                }

                // Drop the entry sender, collect whatever the manager still holds,
                // and report the recorded fatal error if there is one.
                drop(entry_stage_tx);
                free_copperlists.extend(cl_manager.finish_pending_boxed()?);
                if let Some(error) = fatal_error {
                    Err(error)
                } else {
                    Ok(())
                }
            });

            if result.is_err() {
                error!("A task errored out: {}", &result);
            }
            <Self as #app_trait<S, L>>::stop_all_tasks(self, #sim_callback_arg)?;
            let _ = self.log_shutdown_completed();
            result
        }
    } else {
        quote! {
            static STOP_FLAG: AtomicBool = AtomicBool::new(false);

            #kill_handler

            <Self as #app_trait<S, L>>::start_all_tasks(self, #sim_callback_arg)?;
            let result = #run_loop;

            if result.is_err() {
                error!("A task errored out: {}", &result);
            }
            <Self as #app_trait<S, L>>::stop_all_tasks(self, #sim_callback_arg)?;
            let _ = self.log_shutdown_completed();
            result
        }
    };
    // Generated method bodies: `run_one_iteration`, `restore_keyframe`,
    // `start_all_tasks`, `stop_all_tasks` and `run`. The signatures of all but
    // `restore_keyframe` are interpolated from tokens built elsewhere.
    let run_methods: proc_macro2::TokenStream = quote! {

        #run_one_iteration {

            let runtime = &mut self.copper_runtime;
            let clock = &runtime.clock;
            let instance_id = runtime.instance_id();
            let subsystem_code = runtime.subsystem_code();
            let execution_probe = &runtime.execution_probe;
            let monitor = &mut runtime.monitor;
            let tasks = &mut runtime.tasks;
            let __cu_bridges = &mut runtime.bridges;
            let cl_manager = &mut runtime.copperlists_manager;
            let kf_manager = &mut runtime.keyframes_manager;
            let iteration_clid = cl_manager.next_cl_id();
            // Context in scope for the preprocess calls below.
            let mut ctx = cu29::context::CuContext::from_runtime_metadata(
                clock.clone(),
                iteration_clid,
                instance_id,
                subsystem_code,
                #mission_mod::TASK_IDS,
            );
            let mut __cu_abort_copperlist = false;

            #(#preprocess_calls)*

            let culist = cl_manager.create()?;
            let clid = culist.id;
            debug_assert_eq!(clid, iteration_clid);
            kf_manager.reset(clid, clock);
            culist.change_state(cu29::copperlist::CopperListState::Processing);
            culist.msgs.init_zeroed();
            // A fresh context shadows the earlier one for the process steps.
            let mut ctx = cu29::context::CuContext::from_runtime_metadata(
                clock.clone(),
                iteration_clid,
                instance_id,
                subsystem_code,
                #mission_mod::TASK_IDS,
            );
            {
                let msgs = &mut culist.msgs.0;
                // Labelled block wrapping the generated runtime plan.
                // NOTE(review): the label presumably lets generated steps break
                // out early -- confirm against the step code.
                '__cu_process_steps: {
                    #(#runtime_plan_code)*
                }
            }
            // Aborted CopperList: notify the monitor and end processing, then
            // return without logging hooks, keyframe finalisation, I/O stats or
            // postprocess calls.
            if __cu_abort_copperlist {
                ctx.clear_current_task();
                let monitor_result = monitor.process_copperlist(&ctx, #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)));
                cl_manager.end_of_processing(clid)?;
                monitor_result?;
                return Ok(());
            }
            ctx.clear_current_task();
            let monitor_result = monitor.process_copperlist(&ctx, #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)));

            #(#preprocess_logging_calls)*

            cl_manager.end_of_processing(clid)?;
            kf_manager.end_of_processing(clid)?;
            // The monitor's verdict is propagated only after both managers
            // have finished with this CopperList.
            monitor_result?;
            let stats = cu29::monitoring::CopperListIoStats {
                raw_culist_bytes: core::mem::size_of::<CuList>() as u64 + cl_manager.last_handle_bytes,
                handle_bytes: cl_manager.last_handle_bytes,
                encoded_culist_bytes: cl_manager.last_encoded_bytes,
                keyframe_bytes: kf_manager.last_encoded_bytes,
                structured_log_bytes_total: ::cu29::prelude::structured_log_bytes_total(),
                culistid: clid,
            };
            monitor.observe_copperlist_io(stats);

            #(#postprocess_calls)*
            Ok(())
        }

        // Decodes `keyframe.serialized_tasks` with bincode's standard config and
        // runs the generated task and bridge restore code against the decoder.
        fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
            let runtime = &mut self.copper_runtime;
            let clock = &runtime.clock;
            let tasks = &mut runtime.tasks;
            let __cu_bridges = &mut runtime.bridges;
            let config = cu29::bincode::config::standard();
            let reader = cu29::bincode::de::read::SliceReader::new(&keyframe.serialized_tasks);
            let mut decoder = DecoderImpl::new(reader, config, ());
            #(#task_restore_code);*;
            #(#bridge_restore_code);*;
            Ok(())
        }

        #start_all_tasks {
            // Failure to log the lifecycle event is deliberately ignored.
            let _ = self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::MissionStarted {
                mission: #mission.to_string(),
            });
            let lifecycle_clid = self.copper_runtime.copperlists_manager.last_cl_id();
            let mut ctx = cu29::context::CuContext::from_runtime_metadata(
                self.copper_runtime.clock.clone(),
                lifecycle_clid,
                self.copper_runtime.instance_id(),
                self.copper_runtime.subsystem_code(),
                #mission_mod::TASK_IDS,
            );
            #(#start_calls)*
            ctx.clear_current_task();
            self.copper_runtime.monitor.start(&ctx)?;
            Ok(())
        }

        #stop_all_tasks {
            let lifecycle_clid = self.copper_runtime.copperlists_manager.last_cl_id();
            let mut ctx = cu29::context::CuContext::from_runtime_metadata(
                self.copper_runtime.clock.clone(),
                lifecycle_clid,
                self.copper_runtime.instance_id(),
                self.copper_runtime.subsystem_code(),
                #mission_mod::TASK_IDS,
            );
            #(#stop_calls)*
            ctx.clear_current_task();
            self.copper_runtime.monitor.stop(&ctx)?;
            self.copper_runtime.copperlists_manager.finish_pending()?;
            // Failure to log the lifecycle event is deliberately ignored.
            let _ = self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::MissionStopped {
                mission: #mission.to_string(),
                reason: "stop_all_tasks".to_string(),
            });
            Ok(())
        }

        #run {
            #run_body
        }
    };
3800
3801 let tasks_type = if sim_mode {
3802 quote!(CuSimTasks)
3803 } else {
3804 quote!(CuTasks)
3805 };
3806
3807 let tasks_instanciator_fn = if sim_mode {
3808 quote!(tasks_instanciator_sim)
3809 } else {
3810 quote!(tasks_instanciator)
3811 };
3812
3813 let app_impl_decl = if sim_mode {
3814 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuSimApplication<S, L> for #application_name)
3815 } else {
3816 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuApplication<S, L> for #application_name)
3817 };
3818
3819 let simstep_type_decl = if sim_mode {
3820 quote!(
3821 type Step<'z> = SimStep<'z>;
3822 )
3823 } else {
3824 quote!()
3825 };
3826
    // Generated `AppResources` struct: the loaded config, the record of where
    // that config came from, and the resource manager built from it.
    let app_resources_struct = quote! {
        pub struct AppResources {
            pub config: CuConfig,
            pub config_source: RuntimeLifecycleConfigSource,
            pub resources: ResourceManager,
        }
    };
3834
    // Generated `prepare_config`: runs the config load statement and the
    // CopperList count check, then returns `(config, config_source)`. The
    // `info!` progress messages are compiled only for `target_os = "none"`.
    let prepare_config_fn = quote! {
        #prepare_config_sig {
            let config_filename = #config_file;

            #[cfg(target_os = "none")]
            ::cu29::prelude::info!("CuApp init: config file {}", config_filename);
            #[cfg(target_os = "none")]
            ::cu29::prelude::info!("CuApp init: loading config");
            // NOTE(review): `config` and `config_source` are presumably bound by
            // the load statement below -- confirm in `build_config_load_stmt`.
            #config_load_stmt
            #copperlist_count_check
            #[cfg(target_os = "none")]
            ::cu29::prelude::info!("CuApp init: config loaded");
            if let Some(runtime) = &config.runtime {
                #[cfg(target_os = "none")]
                ::cu29::prelude::info!(
                    "CuApp init: rate_target_hz={}",
                    runtime.rate_target_hz.unwrap_or(0)
                );
            } else {
                #[cfg(target_os = "none")]
                ::cu29::prelude::info!("CuApp init: rate_target_hz=none");
            }

            Ok((config, config_source))
        }
    };
3861
    // Generated resource-preparation function: obtains the config via
    // `prepare_config`, builds the resources with the mission's
    // `resources_instanciator`, and bundles everything into `AppResources`.
    // The `info!` progress messages are compiled only for `target_os = "none"`.
    let prepare_resources_fn = quote! {
        #prepare_resources_sig {
            let (config, config_source) = #prepare_config_call;

            #[cfg(target_os = "none")]
            ::cu29::prelude::info!("CuApp init: building resources");
            let resources = #mission_mod::resources_instanciator(&config)?;
            #[cfg(target_os = "none")]
            ::cu29::prelude::info!("CuApp init: resources ready");

            Ok(AppResources {
                config,
                config_source,
                resources,
            })
        }
    };
3879
3880 let new_with_resources_compat_fn = if sim_mode {
3881 quote! {
3882 pub fn new_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
3883 clock: RobotClock,
3884 unified_logger: Arc<Mutex<L>>,
3885 app_resources: AppResources,
3886 instance_id: u32,
3887 sim_callback: &mut impl FnMut(SimStep) -> SimOverride,
3888 ) -> CuResult<Self> {
3889 Self::build_with_resources(
3890 clock,
3891 unified_logger,
3892 app_resources,
3893 instance_id,
3894 sim_callback,
3895 )
3896 }
3897 }
3898 } else {
3899 quote! {
3900 pub fn new_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
3901 clock: RobotClock,
3902 unified_logger: Arc<Mutex<L>>,
3903 app_resources: AppResources,
3904 instance_id: u32,
3905 ) -> CuResult<Self> {
3906 Self::build_with_resources(clock, unified_logger, app_resources, instance_id)
3907 }
3908 }
3909 };
3910
3911 let build_with_resources_fn = quote! {
3912 #build_with_resources_sig {
3913 let AppResources {
3914 config,
3915 config_source,
3916 resources,
3917 } = app_resources;
3918
3919 let structured_stream = ::cu29::prelude::stream_write::<
3920 ::cu29::prelude::CuLogEntry,
3921 S,
3922 >(
3923 unified_logger.clone(),
3924 ::cu29::prelude::UnifiedLogType::StructuredLogLine,
3925 4096 * 10,
3926 )?;
3927 let logger_runtime = ::cu29::prelude::LoggerRuntime::init(
3928 clock.clone(),
3929 structured_stream,
3930 None::<::cu29::prelude::NullLog>,
3931 );
3932
3933 let mut default_section_size = size_of::<super::#mission_mod::CuList>() * 64;
3936 if let Some(section_size_mib) = config.logging.as_ref().and_then(|l| l.section_size_mib) {
3938 default_section_size = section_size_mib as usize * 1024usize * 1024usize;
3940 }
3941 #[cfg(target_os = "none")]
3942 ::cu29::prelude::info!(
3943 "CuApp new: copperlist section size={}",
3944 default_section_size
3945 );
3946 #[cfg(target_os = "none")]
3947 ::cu29::prelude::info!("CuApp new: creating copperlist stream");
3948 let copperlist_stream = stream_write::<#mission_mod::CuList, S>(
3949 unified_logger.clone(),
3950 UnifiedLogType::CopperList,
3951 default_section_size,
3952 )?;
3956 #[cfg(target_os = "none")]
3957 ::cu29::prelude::info!("CuApp new: copperlist stream ready");
3958
3959 #[cfg(target_os = "none")]
3960 ::cu29::prelude::info!("CuApp new: creating keyframes stream");
3961 let keyframes_stream = stream_write::<KeyFrame, S>(
3962 unified_logger.clone(),
3963 UnifiedLogType::FrozenTasks,
3964 1024 * 1024 * 10, )?;
3966 #[cfg(target_os = "none")]
3967 ::cu29::prelude::info!("CuApp new: keyframes stream ready");
3968
3969 #[cfg(target_os = "none")]
3970 ::cu29::prelude::info!("CuApp new: creating runtime lifecycle stream");
3971 let mut runtime_lifecycle_stream = stream_write::<RuntimeLifecycleRecord, S>(
3972 unified_logger.clone(),
3973 UnifiedLogType::RuntimeLifecycle,
3974 1024 * 64, )?;
3976 let effective_config_ron = config
3977 .serialize_ron()
3978 .unwrap_or_else(|_| "<failed to serialize config>".to_string());
3979 let stack_info = RuntimeLifecycleStackInfo {
3980 app_name: env!("CARGO_PKG_NAME").to_string(),
3981 app_version: env!("CARGO_PKG_VERSION").to_string(),
3982 git_commit: #git_commit_tokens,
3983 git_dirty: #git_dirty_tokens,
3984 subsystem_id: #application_name::subsystem().id().map(str::to_string),
3985 subsystem_code: #application_name::subsystem().code(),
3986 instance_id,
3987 };
3988 runtime_lifecycle_stream.log(&RuntimeLifecycleRecord {
3989 timestamp: clock.now(),
3990 event: RuntimeLifecycleEvent::Instantiated {
3991 config_source,
3992 effective_config_ron,
3993 stack: stack_info,
3994 },
3995 })?;
3996 #[cfg(target_os = "none")]
3997 ::cu29::prelude::info!("CuApp new: runtime lifecycle stream ready");
3998
3999 #[cfg(target_os = "none")]
4000 ::cu29::prelude::info!("CuApp new: building runtime");
4001 let copper_runtime = CuRuntimeBuilder::<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #copperlist_count_tokens, _, _, _, _, _>::new(
4002 clock,
4003 &config,
4004 #mission,
4005 CuRuntimeParts::new(
4006 #mission_mod::#tasks_instanciator_fn,
4007 #mission_mod::MONITORED_COMPONENTS,
4008 #mission_mod::CULIST_COMPONENT_MAPPING,
4009 #parallel_rt_metadata_arg
4010 #mission_mod::monitor_instanciator,
4011 #mission_mod::bridges_instanciator,
4012 ),
4013 copperlist_stream,
4014 keyframes_stream,
4015 )
4016 .with_subsystem(#application_name::subsystem())
4017 .with_instance_id(instance_id)
4018 .with_resources(resources)
4019 .build()?;
4020 #[cfg(target_os = "none")]
4021 ::cu29::prelude::info!("CuApp new: runtime built");
4022
4023 let application = Ok(#application_name {
4024 copper_runtime,
4025 runtime_lifecycle_stream: Some(Box::new(runtime_lifecycle_stream)),
4026 logger_runtime,
4027 });
4028
4029 #sim_callback_on_new
4030
4031 application
4032 }
4033 };
4034
4035 let app_inherent_impl = quote! {
4036 impl #application_name {
4037 const SUBSYSTEM: cu29::prelude::app::Subsystem =
4038 cu29::prelude::app::Subsystem::new(#subsystem_id_tokens, #subsystem_code_literal);
4039
4040 #[inline]
4041 pub fn subsystem() -> cu29::prelude::app::Subsystem {
4042 Self::SUBSYSTEM
4043 }
4044
4045 pub fn original_config() -> String {
4046 #copper_config_content.to_string()
4047 }
4048
4049 pub fn register_reflect_types(registry: &mut cu29::reflect::TypeRegistry) {
4050 #(#reflect_type_registration_calls)*
4051 }
4052
4053 #[inline]
4055 pub fn clock(&self) -> cu29::clock::RobotClock {
4056 self.copper_runtime.clock()
4057 }
4058
4059 pub fn log_runtime_lifecycle_event(
4061 &mut self,
4062 event: RuntimeLifecycleEvent,
4063 ) -> CuResult<()> {
4064 let timestamp = self.copper_runtime.clock.now();
4065 let Some(stream) = self.runtime_lifecycle_stream.as_mut() else {
4066 return Err(CuError::from("Runtime lifecycle stream is not initialized"));
4067 };
4068 stream.log(&RuntimeLifecycleRecord { timestamp, event })
4069 }
4070
4071 pub fn log_shutdown_completed(&mut self) -> CuResult<()> {
4075 self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::ShutdownCompleted)
4076 }
4077
4078 #prepare_config_fn
4079 #prepare_resources_compat_fn
4080 #prepare_resources_fn
4081 #init_resources_compat_fn
4082 #new_with_resources_compat_fn
4083 #build_with_resources_fn
4084
4085 #[inline]
4087 pub fn copper_runtime_mut(&mut self) -> &mut CuRuntime<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #copperlist_count_tokens> {
4088 &mut self.copper_runtime
4089 }
4090 }
4091 };
4092
4093 let app_metadata_impl = quote! {
4094 impl cu29::prelude::app::CuSubsystemMetadata for #application_name {
4095 fn subsystem() -> cu29::prelude::app::Subsystem {
4096 #application_name::subsystem()
4097 }
4098 }
4099 };
4100
4101 let app_reflect_impl = quote! {
4102 impl cu29::reflect::ReflectTaskIntrospection for #application_name {
4103 fn reflect_task(&self, task_id: &str) -> Option<&dyn cu29::reflect::Reflect> {
4104 match task_id {
4105 #(#task_reflect_read_arms)*
4106 _ => None,
4107 }
4108 }
4109
4110 fn reflect_task_mut(
4111 &mut self,
4112 task_id: &str,
4113 ) -> Option<&mut dyn cu29::reflect::Reflect> {
4114 match task_id {
4115 #(#task_reflect_write_arms)*
4116 _ => None,
4117 }
4118 }
4119
4120 fn register_reflect_types(registry: &mut cu29::reflect::TypeRegistry) {
4121 #application_name::register_reflect_types(registry);
4122 }
4123 }
4124 };
4125
4126 #[cfg(feature = "std")]
4127 #[cfg(feature = "macro_debug")]
4128 eprintln!("[build result]");
4129 let application_impl = quote! {
4130 #app_impl_decl {
4131 #simstep_type_decl
4132
4133 fn get_original_config() -> String {
4134 Self::original_config()
4135 }
4136
4137 #run_methods
4138 }
4139 };
4140
4141 let recorded_replay_app_impl = if sim_mode {
4142 Some(quote! {
4143 impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
4144 CuRecordedReplayApplication<S, L> for #application_name
4145 {
4146 type RecordedDataSet = #mission_mod::CuStampedDataSet;
4147
4148 fn replay_recorded_copperlist(
4149 &mut self,
4150 clock_mock: &RobotClockMock,
4151 copperlist: &CopperList<Self::RecordedDataSet>,
4152 keyframe: Option<&KeyFrame>,
4153 ) -> CuResult<()> {
4154 if let Some(keyframe) = keyframe {
4155 if keyframe.culistid != copperlist.id {
4156 return Err(CuError::from(format!(
4157 "Recorded keyframe culistid {} does not match copperlist {}",
4158 keyframe.culistid, copperlist.id
4159 )));
4160 }
4161
4162 if !self.copper_runtime_mut().captures_keyframe(copperlist.id) {
4163 return Err(CuError::from(format!(
4164 "CopperList {} is not configured to capture a keyframe in this runtime",
4165 copperlist.id
4166 )));
4167 }
4168
4169 self.copper_runtime_mut()
4170 .set_forced_keyframe_timestamp(keyframe.timestamp);
4171 self.copper_runtime_mut().lock_keyframe(keyframe);
4172 clock_mock.set_value(keyframe.timestamp.as_nanos());
4173 } else {
4174 let timestamp =
4175 cu29::simulation::recorded_copperlist_timestamp(copperlist)
4176 .ok_or_else(|| {
4177 CuError::from(format!(
4178 "Recorded copperlist {} has no process_time.start timestamps",
4179 copperlist.id
4180 ))
4181 })?;
4182 clock_mock.set_value(timestamp.as_nanos());
4183 }
4184
4185 let mut sim_callback = |step: SimStep<'_>| -> SimOverride {
4186 #mission_mod::recorded_replay_step(step, copperlist)
4187 };
4188 <Self as CuSimApplication<S, L>>::run_one_iteration(self, &mut sim_callback)
4189 }
4190 }
4191 })
4192 } else {
4193 None
4194 };
4195
4196 let distributed_replay_app_impl = if sim_mode {
4197 Some(quote! {
4198 impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
4199 cu29::prelude::app::CuDistributedReplayApplication<S, L> for #application_name
4200 {
4201 fn build_distributed_replay(
4202 clock: cu29::clock::RobotClock,
4203 unified_logger: std::sync::Arc<std::sync::Mutex<L>>,
4204 instance_id: u32,
4205 config_override: Option<cu29::config::CuConfig>,
4206 ) -> CuResult<Self> {
4207 let mut noop =
4208 |_step: SimStep<'_>| cu29::simulation::SimOverride::ExecuteByRuntime;
4209 let builder = Self::builder()
4210 .with_logger::<S, L>(unified_logger)
4211 .with_clock(clock)
4212 .with_instance_id(instance_id);
4213 let builder = if let Some(config_override) = config_override {
4214 builder.with_config(config_override)
4215 } else {
4216 builder
4217 };
4218 builder.with_sim_callback(&mut noop).build()
4219 }
4220 }
4221 })
4222 } else {
4223 None
4224 };
4225
4226 let builder_prepare_config_call = if std {
4227 quote! { #application_name::prepare_config(self.instance_id, self.config_override)? }
4228 } else {
4229 quote! {{
4230 let _ = self.config_override;
4231 #application_name::prepare_config()?
4232 }}
4233 };
4234
4235 let builder_with_config_method = if std {
4236 Some(quote! {
4237 #[allow(dead_code)]
4238 pub fn with_config(mut self, config_override: CuConfig) -> Self {
4239 self.config_override = Some(config_override);
4240 self
4241 }
4242 })
4243 } else {
4244 None
4245 };
4246
4247 let builder_default_clock = if std {
4248 quote! { Some(RobotClock::default()) }
4249 } else {
4250 quote! { None }
4251 };
4252
4253 let (
4254 builder_struct,
4255 builder_impl,
4256 builder_ctor,
4257 builder_log_path_generics,
4258 builder_sim_callback_method,
4259 builder_build_sim_callback_arg,
4260 ) = if sim_mode {
4261 (
4262 quote! {
4263 #[allow(dead_code)]
4264 pub struct #builder_name<'a, F, S, L, R>
4265 where
4266 S: SectionStorage + 'static,
4267 L: UnifiedLogWrite<S> + 'static,
4268 R: FnOnce(&CuConfig) -> CuResult<ResourceManager>,
4269 F: FnMut(SimStep) -> SimOverride,
4270 {
4271 clock: Option<RobotClock>,
4272 unified_logger: Arc<Mutex<L>>,
4273 instance_id: u32,
4274 config_override: Option<CuConfig>,
4275 resources_factory: R,
4276 sim_callback: Option<&'a mut F>,
4277 _storage: core::marker::PhantomData<S>,
4278 }
4279 },
4280 quote! {
4281 impl<'a, F, S, L, R> #builder_name<'a, F, S, L, R>
4282 where
4283 S: SectionStorage + 'static,
4284 L: UnifiedLogWrite<S> + 'static,
4285 R: FnOnce(&CuConfig) -> CuResult<ResourceManager>,
4286 F: FnMut(SimStep) -> SimOverride,
4287 },
4288 quote! {
4289 #[allow(dead_code)]
4290 pub fn builder<'a, F>() -> #builder_name<'a, F, cu29::prelude::NoopSectionStorage, cu29::prelude::NoopLogger, fn(&CuConfig) -> CuResult<ResourceManager>>
4291 where
4292 F: FnMut(SimStep) -> SimOverride,
4293 {
4294 #builder_name {
4295 clock: #builder_default_clock,
4296 unified_logger: Arc::new(Mutex::new(cu29::prelude::NoopLogger::new())),
4297 instance_id: 0,
4298 config_override: None,
4299 resources_factory: #mission_mod::resources_instanciator as fn(&CuConfig) -> CuResult<ResourceManager>,
4300 sim_callback: None,
4301 _storage: core::marker::PhantomData,
4302 }
4303 }
4304 },
4305 quote! {'a, F, MmapSectionStorage, UnifiedLoggerWrite, R},
4306 Some(quote! {
4307 #[allow(dead_code)]
4308 pub fn with_sim_callback(mut self, sim_callback: &'a mut F) -> Self {
4309 self.sim_callback = Some(sim_callback);
4310 self
4311 }
4312 }),
4313 Some(quote! {
4314 self.sim_callback
4315 .ok_or(CuError::from("Sim callback missing from builder"))?,
4316 }),
4317 )
4318 } else {
4319 (
4320 quote! {
4321 #[allow(dead_code)]
4322 pub struct #builder_name<S, L, R>
4323 where
4324 S: SectionStorage + 'static,
4325 L: UnifiedLogWrite<S> + 'static,
4326 R: FnOnce(&CuConfig) -> CuResult<ResourceManager>,
4327 {
4328 clock: Option<RobotClock>,
4329 unified_logger: Arc<Mutex<L>>,
4330 instance_id: u32,
4331 config_override: Option<CuConfig>,
4332 resources_factory: R,
4333 _storage: core::marker::PhantomData<S>,
4334 }
4335 },
4336 quote! {
4337 impl<S, L, R> #builder_name<S, L, R>
4338 where
4339 S: SectionStorage + 'static,
4340 L: UnifiedLogWrite<S> + 'static,
4341 R: FnOnce(&CuConfig) -> CuResult<ResourceManager>,
4342 },
4343 quote! {
4344 #[allow(dead_code)]
4345 pub fn builder() -> #builder_name<cu29::prelude::NoopSectionStorage, cu29::prelude::NoopLogger, fn(&CuConfig) -> CuResult<ResourceManager>> {
4346 #builder_name {
4347 clock: #builder_default_clock,
4348 unified_logger: Arc::new(Mutex::new(cu29::prelude::NoopLogger::new())),
4349 instance_id: 0,
4350 config_override: None,
4351 resources_factory: #mission_mod::resources_instanciator as fn(&CuConfig) -> CuResult<ResourceManager>,
4352 _storage: core::marker::PhantomData,
4353 }
4354 }
4355 },
4356 quote! {MmapSectionStorage, UnifiedLoggerWrite, R},
4357 None,
4358 None,
4359 )
4360 };
4361
4362 let builder_with_logger_generics = if sim_mode {
4363 quote! {'a, F, S2, L2, R}
4364 } else {
4365 quote! {S2, L2, R}
4366 };
4367
4368 let builder_with_resources_generics = if sim_mode {
4369 quote! {'a, F, S, L, R2}
4370 } else {
4371 quote! {S, L, R2}
4372 };
4373
4374 let builder_sim_callback_field_copy = if sim_mode {
4375 Some(quote! {
4376 sim_callback: self.sim_callback,
4377 })
4378 } else {
4379 None
4380 };
4381
4382 let builder_with_log_path_method = if std {
4383 Some(quote! {
4384 #[allow(dead_code)]
4385 pub fn with_log_path(
4386 self,
4387 path: impl AsRef<std::path::Path>,
4388 slab_size: Option<usize>,
4389 ) -> CuResult<#builder_name<#builder_log_path_generics>> {
4390 let preallocated_size = slab_size.unwrap_or(1024 * 1024 * 10);
4391 let logger = cu29::prelude::UnifiedLoggerBuilder::new()
4392 .write(true)
4393 .create(true)
4394 .file_base_name(path.as_ref())
4395 .preallocated_size(preallocated_size)
4396 .build()
4397 .map_err(|e| CuError::new_with_cause("Failed to create unified logger", e))?;
4398 let logger = match logger {
4399 cu29::prelude::UnifiedLogger::Write(logger) => logger,
4400 cu29::prelude::UnifiedLogger::Read(_) => {
4401 return Err(CuError::from(
4402 "UnifiedLoggerBuilder did not create a write-capable logger",
4403 ));
4404 }
4405 };
4406 Ok(self.with_logger::<MmapSectionStorage, UnifiedLoggerWrite>(Arc::new(Mutex::new(
4407 logger,
4408 ))))
4409 }
4410 })
4411 } else {
4412 None
4413 };
4414
4415 let builder_with_unified_logger_method = if std {
4416 Some(quote! {
4417 #[allow(dead_code)]
4418 pub fn with_unified_logger(
4419 self,
4420 unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
4421 ) -> #builder_name<#builder_log_path_generics> {
4422 self.with_logger::<MmapSectionStorage, UnifiedLoggerWrite>(unified_logger)
4423 }
4424 })
4425 } else {
4426 None
4427 };
4428
4429 let std_application_impl = if sim_mode {
4431 Some(quote! {
4433 impl #application_name {
4434 pub fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
4435 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self, sim_callback)
4436 }
4437 pub fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
4438 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self, sim_callback)
4439 }
4440 pub fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
4441 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self, sim_callback)
4442 }
4443 pub fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
4444 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self, sim_callback)
4445 }
4446 pub fn replay_recorded_copperlist(
4447 &mut self,
4448 clock_mock: &RobotClockMock,
4449 copperlist: &CopperList<CuStampedDataSet>,
4450 keyframe: Option<&KeyFrame>,
4451 ) -> CuResult<()> {
4452 <Self as CuRecordedReplayApplication<MmapSectionStorage, UnifiedLoggerWrite>>::replay_recorded_copperlist(
4453 self,
4454 clock_mock,
4455 copperlist,
4456 keyframe,
4457 )
4458 }
4459 }
4460 })
4461 } else if std {
4462 Some(quote! {
4464 impl #application_name {
4465 pub fn start_all_tasks(&mut self) -> CuResult<()> {
4466 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self)
4467 }
4468 pub fn run_one_iteration(&mut self) -> CuResult<()> {
4469 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self)
4470 }
4471 pub fn run(&mut self) -> CuResult<()> {
4472 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self)
4473 }
4474 pub fn stop_all_tasks(&mut self) -> CuResult<()> {
4475 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self)
4476 }
4477 }
4478 })
4479 } else {
4480 None };
4482
4483 let application_builder = Some(quote! {
4484 #builder_struct
4485
4486 #builder_impl
4487 {
4488 #[allow(dead_code)]
4489 pub fn with_clock(mut self, clock: RobotClock) -> Self {
4490 self.clock = Some(clock);
4491 self
4492 }
4493
4494 #[allow(dead_code)]
4495 pub fn with_logger<S2, L2>(
4496 self,
4497 unified_logger: Arc<Mutex<L2>>,
4498 ) -> #builder_name<#builder_with_logger_generics>
4499 where
4500 S2: SectionStorage + 'static,
4501 L2: UnifiedLogWrite<S2> + 'static,
4502 {
4503 #builder_name {
4504 clock: self.clock,
4505 unified_logger,
4506 instance_id: self.instance_id,
4507 config_override: self.config_override,
4508 resources_factory: self.resources_factory,
4509 #builder_sim_callback_field_copy
4510 _storage: core::marker::PhantomData,
4511 }
4512 }
4513
4514 #builder_with_unified_logger_method
4515
4516 #[allow(dead_code)]
4517 pub fn with_instance_id(mut self, instance_id: u32) -> Self {
4518 self.instance_id = instance_id;
4519 self
4520 }
4521
4522 pub fn with_resources<R2>(self, resources_factory: R2) -> #builder_name<#builder_with_resources_generics>
4523 where
4524 R2: FnOnce(&CuConfig) -> CuResult<ResourceManager>,
4525 {
4526 #builder_name {
4527 clock: self.clock,
4528 unified_logger: self.unified_logger,
4529 instance_id: self.instance_id,
4530 config_override: self.config_override,
4531 resources_factory,
4532 #builder_sim_callback_field_copy
4533 _storage: core::marker::PhantomData,
4534 }
4535 }
4536
4537 #builder_with_config_method
4538 #builder_with_log_path_method
4539 #builder_sim_callback_method
4540
4541 #[allow(dead_code)]
4542 pub fn build(self) -> CuResult<#application_name> {
4543 let clock = self
4544 .clock
4545 .ok_or(CuError::from("Clock missing from builder"))?;
4546 let (config, config_source) = #builder_prepare_config_call;
4547 let resources = (self.resources_factory)(&config)?;
4548 let app_resources = AppResources {
4549 config,
4550 config_source,
4551 resources,
4552 };
4553 #application_name::build_with_resources(
4554 clock,
4555 self.unified_logger,
4556 app_resources,
4557 self.instance_id,
4558 #builder_build_sim_callback_arg
4559 )
4560 }
4561 }
4562 });
4563
4564 let app_builder_inherent_impl = quote! {
4565 impl #application_name {
4566 #builder_ctor
4567 }
4568 };
4569
4570 let sim_imports = if sim_mode {
4571 Some(quote! {
4572 use cu29::simulation::SimOverride;
4573 use cu29::simulation::CuTaskCallbackState;
4574 use cu29::simulation::CuSimSrcTask;
4575 use cu29::simulation::CuSimSinkTask;
4576 use cu29::simulation::CuSimBridge;
4577 use cu29::prelude::app::CuSimApplication;
4578 use cu29::prelude::app::CuRecordedReplayApplication;
4579 use cu29::cubridge::BridgeChannelSet;
4580 })
4581 } else {
4582 None
4583 };
4584
4585 let sim_tasks = if sim_mode {
4586 Some(quote! {
4587 pub type CuSimTasks = #task_types_tuple_sim;
4590 })
4591 } else {
4592 None
4593 };
4594
4595 let sim_inst_body = if task_sim_instances_init_code.is_empty() {
4596 quote! {
4597 let _ = resources;
4598 Ok(())
4599 }
4600 } else {
4601 quote! { Ok(( #(#task_sim_instances_init_code),*, )) }
4602 };
4603
4604 let sim_tasks_instanciator = if sim_mode {
4605 Some(quote! {
4606 pub fn tasks_instanciator_sim(
4607 all_instances_configs: Vec<Option<&ComponentConfig>>,
4608 resources: &mut ResourceManager,
4609 ) -> CuResult<CuSimTasks> {
4610 #sim_inst_body
4611 }})
4612 } else {
4613 None
4614 };
4615
4616 let tasks_inst_body_std = if task_instances_init_code.is_empty() {
4617 quote! {
4618 let _ = resources;
4619 Ok(())
4620 }
4621 } else {
4622 quote! { Ok(( #(#task_instances_init_code),*, )) }
4623 };
4624
4625 let tasks_inst_body_nostd = if task_instances_init_code.is_empty() {
4626 quote! {
4627 let _ = resources;
4628 Ok(())
4629 }
4630 } else {
4631 quote! { Ok(( #(#task_instances_init_code),*, )) }
4632 };
4633
4634 let tasks_instanciator = if std {
4635 quote! {
4636 pub fn tasks_instanciator<'c>(
4637 all_instances_configs: Vec<Option<&'c ComponentConfig>>,
4638 resources: &mut ResourceManager,
4639 ) -> CuResult<CuTasks> {
4640 #tasks_inst_body_std
4641 }
4642 }
4643 } else {
4644 quote! {
4646 pub fn tasks_instanciator<'c>(
4647 all_instances_configs: Vec<Option<&'c ComponentConfig>>,
4648 resources: &mut ResourceManager,
4649 ) -> CuResult<CuTasks> {
4650 #tasks_inst_body_nostd
4651 }
4652 }
4653 };
4654
4655 let imports = if std {
4656 quote! {
4657 use cu29::rayon::ThreadPool;
4658 use cu29::cuasynctask::CuAsyncTask;
4659 use cu29::resource::{ResourceBindings, ResourceManager};
4660 use cu29::prelude::SectionStorage;
4661 use cu29::prelude::UnifiedLoggerWrite;
4662 use cu29::prelude::memmap::MmapSectionStorage;
4663 use std::fmt::{Debug, Formatter};
4664 use std::fmt::Result as FmtResult;
4665 use std::mem::size_of;
4666 use std::boxed::Box;
4667 use std::sync::Arc;
4668 use std::sync::atomic::{AtomicBool, Ordering};
4669 use std::sync::Mutex;
4670 }
4671 } else {
4672 quote! {
4673 use alloc::boxed::Box;
4674 use alloc::sync::Arc;
4675 use alloc::string::String;
4676 use alloc::string::ToString;
4677 use core::sync::atomic::{AtomicBool, Ordering};
4678 use core::fmt::{Debug, Formatter};
4679 use core::fmt::Result as FmtResult;
4680 use core::mem::size_of;
4681 use spin::Mutex;
4682 use cu29::prelude::SectionStorage;
4683 use cu29::resource::{ResourceBindings, ResourceManager};
4684 }
4685 };
4686
4687 let task_mapping_defs = task_resource_mappings.defs.clone();
4688 let bridge_mapping_defs = bridge_resource_mappings.defs.clone();
4689
4690 let mission_mod_tokens = quote! {
4692 mod #mission_mod {
4693 use super::*; use cu29::bincode::Encode;
4696 use cu29::bincode::enc::Encoder;
4697 use cu29::bincode::error::EncodeError;
4698 use cu29::bincode::Decode;
4699 use cu29::bincode::de::Decoder;
4700 use cu29::bincode::de::DecoderImpl;
4701 use cu29::bincode::error::DecodeError;
4702 use cu29::clock::RobotClock;
4703 use cu29::clock::RobotClockMock;
4704 use cu29::config::CuConfig;
4705 use cu29::config::ComponentConfig;
4706 use cu29::curuntime::CuRuntime;
4707 use cu29::curuntime::CuRuntimeBuilder;
4708 use cu29::curuntime::CuRuntimeParts;
4709 use cu29::curuntime::KeyFrame;
4710 use cu29::curuntime::RuntimeLifecycleConfigSource;
4711 use cu29::curuntime::RuntimeLifecycleEvent;
4712 use cu29::curuntime::RuntimeLifecycleRecord;
4713 use cu29::curuntime::RuntimeLifecycleStackInfo;
4714 use cu29::CuResult;
4715 use cu29::CuError;
4716 use cu29::cutask::CuSrcTask;
4717 use cu29::cutask::CuSinkTask;
4718 use cu29::cutask::CuTask;
4719 use cu29::cutask::CuMsg;
4720 use cu29::cutask::CuMsgMetadata;
4721 use cu29::copperlist::CopperList;
4722 use cu29::monitoring::CuMonitor; use cu29::monitoring::CuComponentState;
4724 use cu29::monitoring::Decision;
4725 use cu29::prelude::app::CuApplication;
4726 use cu29::prelude::debug;
4727 use cu29::prelude::stream_write;
4728 use cu29::prelude::UnifiedLogType;
4729 use cu29::prelude::UnifiedLogWrite;
4730 use cu29::prelude::WriteStream;
4731
4732 #imports
4733
4734 #sim_imports
4735
4736 #[allow(unused_imports)]
4738 use cu29::monitoring::NoMonitor;
4739
4740 pub type CuTasks = #task_types_tuple;
4744 pub type CuBridges = #bridges_type_tokens;
4745 #sim_bridge_channel_defs
4746 #resources_module
4747 #resources_instanciator_fn
4748 #task_mapping_defs
4749 #bridge_mapping_defs
4750
4751 #sim_tasks
4752 #sim_support
4753 #recorded_replay_support
4754 #sim_tasks_instanciator
4755
4756 pub const TASK_IDS: &'static [&'static str] = &[#( #task_ids ),*];
4757 pub const MONITORED_COMPONENTS: &'static [cu29::monitoring::MonitorComponentMetadata] =
4758 &[#( #monitored_component_entries ),*];
4759 pub const CULIST_COMPONENT_MAPPING: &'static [cu29::monitoring::ComponentId] =
4760 &[#( cu29::monitoring::ComponentId::new(#culist_component_mapping) ),*];
4761 pub const MONITOR_LAYOUT: cu29::monitoring::CopperListLayout =
4762 cu29::monitoring::CopperListLayout::new(
4763 MONITORED_COMPONENTS,
4764 CULIST_COMPONENT_MAPPING,
4765 );
4766 #parallel_rt_metadata_defs
4767
4768 #[inline]
4769 pub fn monitor_component_label(
4770 component_id: cu29::monitoring::ComponentId,
4771 ) -> &'static str {
4772 MONITORED_COMPONENTS[component_id.index()].id()
4773 }
4774
4775 #culist_support
4776 #parallel_rt_support_tokens
4777
4778 #tasks_instanciator
4779 #bridges_instanciator
4780
4781 pub fn monitor_instanciator(
4782 config: &CuConfig,
4783 metadata: ::cu29::monitoring::CuMonitoringMetadata,
4784 runtime: ::cu29::monitoring::CuMonitoringRuntime,
4785 ) -> #monitor_type {
4786 #monitor_instanciator_body
4787 }
4788
4789 #app_resources_struct
4791 pub #application_struct
4792
4793 #app_inherent_impl
4794 #app_builder_inherent_impl
4795 #app_metadata_impl
4796 #app_reflect_impl
4797 #application_impl
4798 #recorded_replay_app_impl
4799 #distributed_replay_app_impl
4800
4801 #std_application_impl
4802
4803 #application_builder
4804 }
4805
4806 };
4807 all_missions_tokens.push(mission_mod_tokens);
4808 }
4809
4810 let default_application_tokens = if all_missions.contains_key("default") {
4811 let default_builder = quote! {
4812 #[allow(unused_imports)]
4813 use default::#builder_name;
4814 };
4815 quote! {
4816 #default_builder
4817
4818 #[allow(unused_imports)]
4819 use default::AppResources;
4820
4821 #[allow(unused_imports)]
4822 use default::resources as app_resources;
4823
4824 #[allow(unused_imports)]
4825 use default::#application_name;
4826 }
4827 } else {
4828 quote!() };
4830
4831 let result: proc_macro2::TokenStream = quote! {
4832 #(#all_missions_tokens)*
4833 #default_application_tokens
4834 };
4835
4836 result.into()
4837}
4838
4839fn resolve_runtime_config(args: &CopperRuntimeArgs) -> CuResult<ResolvedRuntimeConfig> {
4840 let caller_root = utils::caller_crate_root();
4841 resolve_runtime_config_with_root(args, &caller_root)
4842}
4843
4844fn resolve_runtime_config_with_root(
4845 args: &CopperRuntimeArgs,
4846 caller_root: &Path,
4847) -> CuResult<ResolvedRuntimeConfig> {
4848 let filename = config_full_path_from_root(caller_root, &args.config_path);
4849 if !Path::new(&filename).exists() {
4850 return Err(CuError::from(format!(
4851 "The configuration file `{}` does not exist. Please provide a valid path.",
4852 args.config_path
4853 )));
4854 }
4855
4856 if let Some(subsystem_id) = args.subsystem_id.as_deref() {
4857 let multi_config = cu29_runtime::config::read_multi_configuration(filename.as_str())
4858 .map_err(|e| {
4859 CuError::from(format!(
4860 "When `subsystem = \"{subsystem_id}\"` is provided, `config = \"{}\"` must point to a valid multi-Copper configuration: {e}",
4861 args.config_path
4862 ))
4863 })?;
4864 let subsystem = multi_config.subsystem(subsystem_id).ok_or_else(|| {
4865 CuError::from(format!(
4866 "Subsystem '{subsystem_id}' was not found in multi-Copper configuration '{}'.",
4867 args.config_path
4868 ))
4869 })?;
4870 let bundled_local_config_content = read_to_string(&subsystem.config_path).map_err(|e| {
4871 CuError::from(format!(
4872 "Failed to read bundled local configuration for subsystem '{subsystem_id}' from '{}'.",
4873 subsystem.config_path
4874 ))
4875 .add_cause(e.to_string().as_str())
4876 })?;
4877
4878 Ok(ResolvedRuntimeConfig {
4879 local_config: subsystem.config.clone(),
4880 bundled_local_config_content,
4881 subsystem_id: Some(subsystem_id.to_string()),
4882 subsystem_code: subsystem.subsystem_code,
4883 })
4884 } else {
4885 Ok(ResolvedRuntimeConfig {
4886 local_config: read_configuration(filename.as_str())?,
4887 bundled_local_config_content: read_to_string(&filename).map_err(|e| {
4888 CuError::from(format!(
4889 "Could not read the configuration file '{}'.",
4890 args.config_path
4891 ))
4892 .add_cause(e.to_string().as_str())
4893 })?,
4894 subsystem_id: None,
4895 subsystem_code: 0,
4896 })
4897 }
4898}
4899
4900fn build_config_load_stmt(
4901 std_enabled: bool,
4902 application_name: &Ident,
4903 subsystem_id: Option<&str>,
4904) -> proc_macro2::TokenStream {
4905 if std_enabled {
4906 if let Some(subsystem_id) = subsystem_id {
4907 quote! {
4908 let (config, config_source) = if let Some(overridden_config) = config_override {
4909 debug!("CuConfig: Overridden programmatically.");
4910 (overridden_config, RuntimeLifecycleConfigSource::ProgrammaticOverride)
4911 } else if ::std::path::Path::new(config_filename).exists() {
4912 let subsystem_id = #application_name::subsystem()
4913 .id()
4914 .expect("generated multi-Copper runtime is missing a subsystem id");
4915 debug!(
4916 "CuConfig: Reading multi-Copper configuration from file: {} (subsystem={})",
4917 config_filename,
4918 subsystem_id
4919 );
4920 let multi_config = cu29::config::read_multi_configuration(config_filename)?;
4921 (
4922 multi_config.resolve_subsystem_config_for_instance(subsystem_id, instance_id)?,
4923 RuntimeLifecycleConfigSource::ExternalFile,
4924 )
4925 } else {
4926 let original_config = Self::original_config();
4927 debug!(
4928 "CuConfig: Using the bundled subsystem configuration compiled into the binary (subsystem={}).",
4929 #subsystem_id
4930 );
4931 if instance_id != 0 {
4932 debug!(
4933 "CuConfig: runtime file '{}' is missing, so instance-specific overrides for instance_id={} cannot be resolved; using bundled subsystem defaults.",
4934 config_filename,
4935 instance_id
4936 );
4937 }
4938 (
4939 cu29::config::read_configuration_str(original_config, None)?,
4940 RuntimeLifecycleConfigSource::BundledDefault,
4941 )
4942 };
4943 }
4944 } else {
4945 quote! {
4946 let _ = instance_id;
4947 let (config, config_source) = if let Some(overridden_config) = config_override {
4948 debug!("CuConfig: Overridden programmatically.");
4949 (overridden_config, RuntimeLifecycleConfigSource::ProgrammaticOverride)
4950 } else if ::std::path::Path::new(config_filename).exists() {
4951 debug!("CuConfig: Reading configuration from file: {}", config_filename);
4952 (
4953 cu29::config::read_configuration(config_filename)?,
4954 RuntimeLifecycleConfigSource::ExternalFile,
4955 )
4956 } else {
4957 let original_config = Self::original_config();
4958 debug!("CuConfig: Using the bundled configuration compiled into the binary.");
4959 (
4960 cu29::config::read_configuration_str(original_config, None)?,
4961 RuntimeLifecycleConfigSource::BundledDefault,
4962 )
4963 };
4964 }
4965 }
4966 } else {
4967 quote! {
4968 let original_config = Self::original_config();
4970 debug!("CuConfig: Using the bundled configuration compiled into the binary.");
4971 let config = cu29::config::read_configuration_str(original_config, None)?;
4972 let config_source = RuntimeLifecycleConfigSource::BundledDefault;
4973 }
4974 }
4975}
4976
4977fn config_full_path(config_file: &str) -> String {
4978 config_full_path_from_root(&utils::caller_crate_root(), config_file)
4979}
4980
/// Joins `config_file` onto `caller_root` and returns the result as a `String`.
///
/// Follows `PathBuf::push` semantics: an absolute `config_file` replaces
/// `caller_root` instead of being appended to it.
///
/// # Panics
/// Panics if the joined path is not valid UTF-8.
fn config_full_path_from_root(caller_root: &Path, config_file: &str) -> String {
    let joined = caller_root.join(config_file);
    joined
        .to_str()
        .map(str::to_owned)
        .expect("Could not interpret the config file name")
}
4990
4991fn read_config(config_file: &str) -> CuResult<CuConfig> {
4992 let filename = config_full_path(config_file);
4993 read_configuration(filename.as_str())
4994}
4995
4996fn extract_tasks_output_types(graph: &CuGraph) -> Vec<Option<Type>> {
4997 graph
4998 .get_all_nodes()
4999 .iter()
5000 .map(|(_, node)| {
5001 let id = node.get_id();
5002 let type_str = graph.get_node_output_msg_type(id.as_str());
5003 type_str.map(|type_str| {
5004 parse_str::<Type>(type_str.as_str()).expect("Could not parse output message type.")
5005 })
5006 })
5007 .collect()
5008}
5009
/// Parallel per-task tables derived from a `CuGraph` by
/// `CuTaskSpecSet::from_graph`.
///
/// Unless noted otherwise, entry `i` of each vector describes the `i`-th node
/// of the graph whose flavor is `Flavor::Task`.
struct CuTaskSpecSet {
    /// Task ids, taken from `Node::get_id`.
    pub ids: Vec<String>,
    /// Task kind of each task, as computed by `find_task_type_for_id`.
    pub cutypes: Vec<CuTaskType>,
    /// Whether each task is flagged as background (`Node::is_background`).
    pub background_flags: Vec<bool>,
    /// Whether logging is enabled for each task (`Node::is_logging_enabled`).
    pub logging_enabled: Vec<bool>,
    /// Configured type name of each task (`Node::get_type`), unparsed.
    pub type_names: Vec<String>,
    /// Parsed Rust type of each task; background tasks are wrapped as
    /// `CuAsyncTask<TaskType, OutputType>`.
    pub task_types: Vec<Type>,
    /// Types used to instantiate each task.
    /// NOTE(review): derived from the type names, background flags and output
    /// types; the exact mapping is outside this view — confirm in `from_graph`.
    pub instantiation_types: Vec<Type>,
    /// Task types used when generating simulation code.
    /// NOTE(review): population is outside this view — confirm in `from_graph`.
    pub sim_task_types: Vec<Type>,
    /// Presumably whether each task really runs in simulation mode.
    /// NOTE(review): population is outside this view — confirm in `from_graph`.
    pub run_in_sim_flags: Vec<bool>,
    /// Output message types from `extract_tasks_output_types`, which produces
    /// one entry per graph node (not only task nodes).
    #[allow(dead_code)]
    pub output_types: Vec<Option<Type>>,
    /// Presumably maps a graph node id to its index in the task tables, with
    /// `None` for nodes that are not tasks.
    /// NOTE(review): population is outside this view — confirm in `from_graph`.
    pub node_id_to_task_index: Vec<Option<usize>>,
}
5024
5025impl CuTaskSpecSet {
5026 pub fn from_graph(graph: &CuGraph) -> Self {
5027 let all_id_nodes: Vec<(NodeId, &Node)> = graph
5028 .get_all_nodes()
5029 .into_iter()
5030 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
5031 .collect();
5032
5033 let ids = all_id_nodes
5034 .iter()
5035 .map(|(_, node)| node.get_id().to_string())
5036 .collect();
5037
5038 let cutypes = all_id_nodes
5039 .iter()
5040 .map(|(id, _)| find_task_type_for_id(graph, *id))
5041 .collect();
5042
5043 let background_flags: Vec<bool> = all_id_nodes
5044 .iter()
5045 .map(|(_, node)| node.is_background())
5046 .collect();
5047
5048 let logging_enabled: Vec<bool> = all_id_nodes
5049 .iter()
5050 .map(|(_, node)| node.is_logging_enabled())
5051 .collect();
5052
5053 let type_names: Vec<String> = all_id_nodes
5054 .iter()
5055 .map(|(_, node)| node.get_type().to_string())
5056 .collect();
5057
5058 let output_types = extract_tasks_output_types(graph);
5059
5060 let task_types = type_names
5061 .iter()
5062 .zip(background_flags.iter())
5063 .zip(output_types.iter())
5064 .map(|((name, &background), output_type)| {
5065 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
5066 panic!("Could not transform {name} into a Task Rust type: {error}");
5067 });
5068 if background {
5069 if let Some(output_type) = output_type {
5070 parse_quote!(CuAsyncTask<#name_type, #output_type>)
5071 } else {
5072 panic!("{name}: If a task is background, it has to have an output");
5073 }
5074 } else {
5075 name_type
5076 }
5077 })
5078 .collect();
5079
5080 let instantiation_types = type_names
5081 .iter()
5082 .zip(background_flags.iter())
5083 .zip(output_types.iter())
5084 .map(|((name, &background), output_type)| {
5085 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
5086 panic!("Could not transform {name} into a Task Rust type: {error}");
5087 });
5088 if background {
5089 if let Some(output_type) = output_type {
5090 parse_quote!(CuAsyncTask::<#name_type, #output_type>)
5091 } else {
5092 panic!("{name}: If a task is background, it has to have an output");
5093 }
5094 } else {
5095 name_type
5096 }
5097 })
5098 .collect();
5099
5100 let sim_task_types = type_names
5101 .iter()
5102 .map(|name| {
5103 parse_str::<Type>(name).unwrap_or_else(|err| {
5104 eprintln!("Could not transform {name} into a Task Rust type.");
5105 panic!("{err}")
5106 })
5107 })
5108 .collect();
5109
5110 let run_in_sim_flags = all_id_nodes
5111 .iter()
5112 .map(|(_, node)| node.is_run_in_sim())
5113 .collect();
5114
5115 let mut node_id_to_task_index = vec![None; graph.node_count()];
5116 for (index, (node_id, _)) in all_id_nodes.iter().enumerate() {
5117 node_id_to_task_index[*node_id as usize] = Some(index);
5118 }
5119
5120 Self {
5121 ids,
5122 cutypes,
5123 background_flags,
5124 logging_enabled,
5125 type_names,
5126 task_types,
5127 instantiation_types,
5128 sim_task_types,
5129 run_in_sim_flags,
5130 output_types,
5131 node_id_to_task_index,
5132 }
5133 }
5134}
5135
/// The parsed message types stored in one copperlist output slot.
#[derive(Clone)]
struct OutputPack {
    /// Message types of the slot; empty, one, or several entries.
    msg_types: Vec<Type>,
}
5140
5141impl OutputPack {
5142 fn slot_type(&self) -> Type {
5143 build_output_slot_type(&self.msg_types)
5144 }
5145
5146 fn is_multi(&self) -> bool {
5147 self.msg_types.len() > 1
5148 }
5149}
5150
5151fn build_output_slot_type(msg_types: &[Type]) -> Type {
5152 if msg_types.is_empty() {
5153 parse_quote! { () }
5154 } else if msg_types.len() == 1 {
5155 let msg_type = msg_types.first().unwrap();
5156 parse_quote! { CuMsg<#msg_type> }
5157 } else {
5158 parse_quote! { ( #( CuMsg<#msg_types> ),* ) }
5159 }
5160}
5161
5162fn flatten_slot_origin_ids(
5163 output_packs: &[OutputPack],
5164 slot_origin_ids: Vec<Option<String>>,
5165) -> Vec<String> {
5166 let mut ids = Vec::new();
5167 for (slot, pack) in output_packs.iter().enumerate() {
5168 if pack.msg_types.is_empty() {
5169 continue;
5170 }
5171 let origin = slot_origin_ids
5172 .get(slot)
5173 .and_then(|origin| origin.as_ref())
5174 .unwrap_or_else(|| panic!("Missing slot origin id for copperlist output slot {slot}"));
5175 for _ in 0..pack.msg_types.len() {
5176 ids.push(origin.clone());
5177 }
5178 }
5179 ids
5180}
5181
5182fn extract_output_packs(runtime_plan: &CuExecutionLoop) -> Vec<OutputPack> {
5183 let mut packs: Vec<(u32, OutputPack)> = runtime_plan
5184 .steps
5185 .iter()
5186 .filter_map(|unit| match unit {
5187 CuExecutionUnit::Step(step) => {
5188 if let Some(output_pack) = &step.output_msg_pack {
5189 let msg_types: Vec<Type> = output_pack
5190 .msg_types
5191 .iter()
5192 .map(|output_msg_type| {
5193 parse_str::<Type>(output_msg_type.as_str()).unwrap_or_else(|_| {
5194 panic!(
5195 "Could not transform {output_msg_type} into a message Rust type."
5196 )
5197 })
5198 })
5199 .collect();
5200 Some((output_pack.culist_index, OutputPack { msg_types }))
5201 } else {
5202 None
5203 }
5204 }
5205 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
5206 })
5207 .collect();
5208
5209 packs.sort_by_key(|(index, _)| *index);
5210 packs.into_iter().map(|(_, pack)| pack).collect()
5211}
5212
5213fn collect_output_pack_sizes(runtime_plan: &CuExecutionLoop) -> Vec<usize> {
5214 let mut sizes: Vec<(u32, usize)> = runtime_plan
5215 .steps
5216 .iter()
5217 .filter_map(|unit| match unit {
5218 CuExecutionUnit::Step(step) => step
5219 .output_msg_pack
5220 .as_ref()
5221 .map(|output_pack| (output_pack.culist_index, output_pack.msg_types.len())),
5222 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
5223 })
5224 .collect();
5225
5226 sizes.sort_by_key(|(index, _)| *index);
5227 sizes.into_iter().map(|(_, size)| size).collect()
5228}
5229
5230fn build_culist_tuple(slot_types: &[Type]) -> TypeTuple {
5232 if slot_types.is_empty() {
5233 parse_quote! { () }
5234 } else {
5235 parse_quote! { ( #( #slot_types ),* ) }
5236 }
5237}
5238
5239fn build_culist_tuple_encode(output_packs: &[OutputPack]) -> ItemImpl {
5241 let mut flat_idx = 0usize;
5242 let mut encode_fields = Vec::new();
5243
5244 for (slot_idx, pack) in output_packs.iter().enumerate() {
5245 let slot_index = syn::Index::from(slot_idx);
5246 if pack.is_multi() {
5247 for port_idx in 0..pack.msg_types.len() {
5248 let port_index = syn::Index::from(port_idx);
5249 let cache_index = flat_idx;
5250 flat_idx += 1;
5251 encode_fields.push(quote! {
5252 __cu_capture.select_slot(#cache_index);
5253 self.0.#slot_index.#port_index.encode(encoder)?;
5254 });
5255 }
5256 } else {
5257 let cache_index = flat_idx;
5258 flat_idx += 1;
5259 encode_fields.push(quote! {
5260 __cu_capture.select_slot(#cache_index);
5261 self.0.#slot_index.encode(encoder)?;
5262 });
5263 }
5264 }
5265
5266 parse_quote! {
5267 impl Encode for CuStampedDataSet {
5268 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
5269 let __cu_capture = cu29::monitoring::start_copperlist_io_capture(&self.1);
5270 #(#encode_fields)*
5271 Ok(())
5272 }
5273 }
5274 }
5275}
5276
5277fn build_culist_tuple_decode(slot_types: &[Type], cumsg_count: usize) -> ItemImpl {
5279 let indices: Vec<usize> = (0..slot_types.len()).collect();
5280
5281 let decode_fields: Vec<_> = indices
5282 .iter()
5283 .map(|i| {
5284 let slot_type = &slot_types[*i];
5285 quote! { <#slot_type as Decode<()>>::decode(decoder)? }
5286 })
5287 .collect();
5288
5289 parse_quote! {
5290 impl Decode<()> for CuStampedDataSet {
5291 fn decode<D: Decoder<Context=()>>(decoder: &mut D) -> Result<Self, DecodeError> {
5292 Ok(CuStampedDataSet(
5293 (
5294 #(#decode_fields),*
5295 ),
5296 cu29::monitoring::CuMsgIoCache::<#cumsg_count>::default(),
5297 ))
5298 }
5299 }
5300 }
5301}
5302
5303fn build_culist_erasedcumsgs(output_packs: &[OutputPack]) -> ItemImpl {
5304 let mut casted_fields: Vec<proc_macro2::TokenStream> = Vec::new();
5305 for (idx, pack) in output_packs.iter().enumerate() {
5306 let slot_index = syn::Index::from(idx);
5307 if pack.is_multi() {
5308 for port_idx in 0..pack.msg_types.len() {
5309 let port_index = syn::Index::from(port_idx);
5310 casted_fields.push(quote! {
5311 &self.0.#slot_index.#port_index as &dyn ErasedCuStampedData
5312 });
5313 }
5314 } else {
5315 casted_fields.push(quote! { &self.0.#slot_index as &dyn ErasedCuStampedData });
5316 }
5317 }
5318 parse_quote! {
5319 impl ErasedCuStampedDataSet for CuStampedDataSet {
5320 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
5321 vec![
5322 #(#casted_fields),*
5323 ]
5324 }
5325 }
5326 }
5327}
5328
5329fn build_culist_tuple_debug(slot_types: &[Type]) -> ItemImpl {
5330 let indices: Vec<usize> = (0..slot_types.len()).collect();
5331
5332 let debug_fields: Vec<_> = indices
5333 .iter()
5334 .map(|i| {
5335 let idx = syn::Index::from(*i);
5336 quote! { .field(&self.0.#idx) }
5337 })
5338 .collect();
5339
5340 parse_quote! {
5341 impl Debug for CuStampedDataSet {
5342 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
5343 f.debug_tuple("CuStampedDataSet")
5344 #(#debug_fields)*
5345 .finish()
5346 }
5347 }
5348 }
5349}
5350
5351fn build_culist_tuple_serialize(slot_types: &[Type]) -> ItemImpl {
5353 let indices: Vec<usize> = (0..slot_types.len()).collect();
5354 let tuple_len = slot_types.len();
5355
5356 let serialize_fields: Vec<_> = indices
5358 .iter()
5359 .map(|i| {
5360 let idx = syn::Index::from(*i);
5361 quote! { &self.0.#idx }
5362 })
5363 .collect();
5364
5365 parse_quote! {
5366 impl Serialize for CuStampedDataSet {
5367 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
5368 where
5369 S: serde::Serializer,
5370 {
5371 use serde::ser::SerializeTuple;
5372 let mut tuple = serializer.serialize_tuple(#tuple_len)?;
5373 #(tuple.serialize_element(#serialize_fields)?;)*
5374 tuple.end()
5375 }
5376 }
5377 }
5378}
5379
5380fn build_culist_tuple_default(slot_types: &[Type], cumsg_count: usize) -> ItemImpl {
5382 let default_fields: Vec<_> = slot_types
5383 .iter()
5384 .map(|slot_type| quote! { <#slot_type as Default>::default() })
5385 .collect();
5386
5387 parse_quote! {
5388 impl Default for CuStampedDataSet {
5389 fn default() -> CuStampedDataSet
5390 {
5391 CuStampedDataSet(
5392 (
5393 #(#default_fields),*
5394 ),
5395 cu29::monitoring::CuMsgIoCache::<#cumsg_count>::default(),
5396 )
5397 }
5398 }
5399 }
5400}
5401
5402fn collect_bridge_channel_usage(graph: &CuGraph) -> HashMap<BridgeChannelKey, String> {
5403 let mut usage = HashMap::new();
5404 for cnx in graph.edges() {
5405 if let Some(channel) = &cnx.src_channel {
5406 let key = BridgeChannelKey {
5407 bridge_id: cnx.src.clone(),
5408 channel_id: channel.clone(),
5409 direction: BridgeChannelDirection::Rx,
5410 };
5411 usage
5412 .entry(key)
5413 .and_modify(|msg| {
5414 if msg != &cnx.msg {
5415 panic!(
5416 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
5417 cnx.src, channel, msg, cnx.msg
5418 );
5419 }
5420 })
5421 .or_insert(cnx.msg.clone());
5422 }
5423 if let Some(channel) = &cnx.dst_channel {
5424 let key = BridgeChannelKey {
5425 bridge_id: cnx.dst.clone(),
5426 channel_id: channel.clone(),
5427 direction: BridgeChannelDirection::Tx,
5428 };
5429 usage
5430 .entry(key)
5431 .and_modify(|msg| {
5432 if msg != &cnx.msg {
5433 panic!(
5434 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
5435 cnx.dst, channel, msg, cnx.msg
5436 );
5437 }
5438 })
5439 .or_insert(cnx.msg.clone());
5440 }
5441 }
5442 usage
5443}
5444
5445fn build_bridge_specs(
5446 config: &CuConfig,
5447 graph: &CuGraph,
5448 channel_usage: &HashMap<BridgeChannelKey, String>,
5449) -> Vec<BridgeSpec> {
5450 let mut specs = Vec::new();
5451 for (bridge_index, bridge_cfg) in config.bridges.iter().enumerate() {
5452 if graph.get_node_id_by_name(bridge_cfg.id.as_str()).is_none() {
5453 continue;
5454 }
5455
5456 let type_path = parse_str::<Type>(bridge_cfg.type_.as_str()).unwrap_or_else(|err| {
5457 panic!(
5458 "Could not parse bridge type '{}' for '{}': {err}",
5459 bridge_cfg.type_, bridge_cfg.id
5460 )
5461 });
5462
5463 let mut rx_channels = Vec::new();
5464 let mut tx_channels = Vec::new();
5465
5466 for (channel_index, channel) in bridge_cfg.channels.iter().enumerate() {
5467 match channel {
5468 BridgeChannelConfigRepresentation::Rx { id, .. } => {
5469 let key = BridgeChannelKey {
5470 bridge_id: bridge_cfg.id.clone(),
5471 channel_id: id.clone(),
5472 direction: BridgeChannelDirection::Rx,
5473 };
5474 if let Some(msg_type) = channel_usage.get(&key) {
5475 let msg_type_name = msg_type.clone();
5476 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
5477 panic!(
5478 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
5479 bridge_cfg.id, id
5480 )
5481 });
5482 let const_ident =
5483 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
5484 rx_channels.push(BridgeChannelSpec {
5485 id: id.clone(),
5486 const_ident,
5487 msg_type,
5488 msg_type_name,
5489 config_index: channel_index,
5490 plan_node_id: None,
5491 culist_index: None,
5492 monitor_index: None,
5493 });
5494 }
5495 }
5496 BridgeChannelConfigRepresentation::Tx { id, .. } => {
5497 let key = BridgeChannelKey {
5498 bridge_id: bridge_cfg.id.clone(),
5499 channel_id: id.clone(),
5500 direction: BridgeChannelDirection::Tx,
5501 };
5502 if let Some(msg_type) = channel_usage.get(&key) {
5503 let msg_type_name = msg_type.clone();
5504 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
5505 panic!(
5506 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
5507 bridge_cfg.id, id
5508 )
5509 });
5510 let const_ident =
5511 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
5512 tx_channels.push(BridgeChannelSpec {
5513 id: id.clone(),
5514 const_ident,
5515 msg_type,
5516 msg_type_name,
5517 config_index: channel_index,
5518 plan_node_id: None,
5519 culist_index: None,
5520 monitor_index: None,
5521 });
5522 }
5523 }
5524 }
5525 }
5526
5527 if rx_channels.is_empty() && tx_channels.is_empty() {
5528 continue;
5529 }
5530
5531 specs.push(BridgeSpec {
5532 id: bridge_cfg.id.clone(),
5533 type_path,
5534 run_in_sim: bridge_cfg.is_run_in_sim(),
5535 config_index: bridge_index,
5536 tuple_index: 0,
5537 monitor_index: None,
5538 rx_channels,
5539 tx_channels,
5540 });
5541 }
5542
5543 for (tuple_index, spec) in specs.iter_mut().enumerate() {
5544 spec.tuple_index = tuple_index;
5545 }
5546
5547 specs
5548}
5549
5550fn collect_task_names(graph: &CuGraph) -> Vec<(NodeId, String, String)> {
5551 graph
5552 .get_all_nodes()
5553 .iter()
5554 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
5555 .map(|(node_id, node)| {
5556 (
5557 *node_id,
5558 node.get_id().to_string(),
5559 config_id_to_struct_member(node.get_id().as_str()),
5560 )
5561 })
5562 .collect()
5563}
5564
/// Identifies the component a resource binding belongs to.
#[derive(Clone, Copy)]
enum ResourceOwner {
    /// A task, identified by its task index.
    Task(usize),
    /// A bridge, identified by its index in the bridge spec list.
    Bridge(usize),
}
5570
/// One resource binding declared on a node, resolved against the known bundles.
#[derive(Clone)]
struct ResourceKeySpec {
    /// Index of the referenced bundle in the bundle spec list.
    bundle_index: usize,
    /// Provider path of the referenced bundle.
    provider_path: syn::Path,
    /// Resource name, i.e. the part after the first `.` of the `bundle.resource` path.
    resource_name: String,
    /// Name under which the owner binds the resource.
    binding_name: String,
    /// Task or bridge that declared the binding.
    owner: ResourceOwner,
}
5579
5580fn parse_resource_path(path: &str) -> CuResult<(String, String)> {
5581 let (bundle_id, name) = path.split_once('.').ok_or_else(|| {
5582 CuError::from(format!(
5583 "Resource '{path}' is missing a bundle prefix (expected bundle.resource)"
5584 ))
5585 })?;
5586
5587 if bundle_id.is_empty() || name.is_empty() {
5588 return Err(CuError::from(format!(
5589 "Resource '{path}' must use the 'bundle.resource' format"
5590 )));
5591 }
5592
5593 Ok((bundle_id.to_string(), name.to_string()))
5594}
5595
5596fn collect_resource_specs(
5597 graph: &CuGraph,
5598 task_specs: &CuTaskSpecSet,
5599 bridge_specs: &[BridgeSpec],
5600 bundle_specs: &[BundleSpec],
5601) -> CuResult<Vec<ResourceKeySpec>> {
5602 let mut bridge_lookup: BTreeMap<String, usize> = BTreeMap::new();
5603 for (idx, spec) in bridge_specs.iter().enumerate() {
5604 bridge_lookup.insert(spec.id.clone(), idx);
5605 }
5606
5607 let mut bundle_lookup: HashMap<String, (usize, syn::Path)> = HashMap::new();
5608 for (index, bundle) in bundle_specs.iter().enumerate() {
5609 bundle_lookup.insert(bundle.id.clone(), (index, bundle.provider_path.clone()));
5610 }
5611
5612 let mut specs = Vec::new();
5613
5614 for (node_id, node) in graph.get_all_nodes() {
5615 let resources = node.get_resources();
5616 if let Some(resources) = resources {
5617 let task_index = task_specs.node_id_to_task_index[node_id as usize];
5618 let owner = if let Some(task_index) = task_index {
5619 ResourceOwner::Task(task_index)
5620 } else if node.get_flavor() == Flavor::Bridge {
5621 let bridge_index = bridge_lookup.get(&node.get_id()).ok_or_else(|| {
5622 CuError::from(format!(
5623 "Resource mapping attached to unknown bridge node '{}'",
5624 node.get_id()
5625 ))
5626 })?;
5627 ResourceOwner::Bridge(*bridge_index)
5628 } else {
5629 return Err(CuError::from(format!(
5630 "Resource mapping attached to non-task node '{}'",
5631 node.get_id()
5632 )));
5633 };
5634
5635 for (binding_name, path) in resources {
5636 let (bundle_id, resource_name) = parse_resource_path(path)?;
5637 let (bundle_index, provider_path) =
5638 bundle_lookup.get(&bundle_id).ok_or_else(|| {
5639 CuError::from(format!(
5640 "Resource '{}' references unknown bundle '{}'",
5641 path, bundle_id
5642 ))
5643 })?;
5644 specs.push(ResourceKeySpec {
5645 bundle_index: *bundle_index,
5646 provider_path: provider_path.clone(),
5647 resource_name,
5648 binding_name: binding_name.clone(),
5649 owner,
5650 });
5651 }
5652 }
5653 }
5654
5655 Ok(specs)
5656}
5657
5658fn build_bundle_list<'a>(config: &'a CuConfig, mission: &str) -> Vec<&'a ResourceBundleConfig> {
5659 config
5660 .resources
5661 .iter()
5662 .filter(|bundle| {
5663 bundle
5664 .missions
5665 .as_ref()
5666 .is_none_or(|missions| missions.iter().any(|m| m == mission))
5667 })
5668 .collect()
5669}
5670
/// A resource bundle selected for a mission, with its provider parsed as a Rust path.
struct BundleSpec {
    /// Bundle id as written in the configuration.
    id: String,
    /// Parsed path of the bundle provider.
    provider_path: syn::Path,
}
5675
5676fn build_bundle_specs(config: &CuConfig, mission: &str) -> CuResult<Vec<BundleSpec>> {
5677 build_bundle_list(config, mission)
5678 .into_iter()
5679 .map(|bundle| {
5680 let provider_path: syn::Path =
5681 syn::parse_str(bundle.provider.as_str()).map_err(|err| {
5682 CuError::from(format!(
5683 "Failed to parse provider path '{}' for bundle '{}': {err}",
5684 bundle.provider, bundle.id
5685 ))
5686 })?;
5687 Ok(BundleSpec {
5688 id: bundle.id.clone(),
5689 provider_path,
5690 })
5691 })
5692 .collect()
5693}
5694
5695fn build_resources_module(
5696 bundle_specs: &[BundleSpec],
5697) -> CuResult<(proc_macro2::TokenStream, proc_macro2::TokenStream)> {
5698 let bundle_consts = bundle_specs.iter().enumerate().map(|(index, bundle)| {
5699 let const_ident = Ident::new(
5700 &config_id_to_bridge_const(bundle.id.as_str()),
5701 Span::call_site(),
5702 );
5703 quote! { pub const #const_ident: BundleIndex = BundleIndex::new(#index); }
5704 });
5705
5706 let resources_module = quote! {
5707 pub mod resources {
5708 #![allow(dead_code)]
5709 use cu29::resource::BundleIndex;
5710
5711 pub mod bundles {
5712 use super::BundleIndex;
5713 #(#bundle_consts)*
5714 }
5715 }
5716 };
5717
5718 let bundle_counts = bundle_specs.iter().map(|bundle| {
5719 let provider_path = &bundle.provider_path;
5720 quote! { <#provider_path as cu29::resource::ResourceBundleDecl>::Id::COUNT }
5721 });
5722
5723 let bundle_inits = bundle_specs
5724 .iter()
5725 .enumerate()
5726 .map(|(index, bundle)| {
5727 let bundle_id = LitStr::new(bundle.id.as_str(), Span::call_site());
5728 let provider_path = &bundle.provider_path;
5729 quote! {
5730 let bundle_cfg = config
5731 .resources
5732 .iter()
5733 .find(|b| b.id == #bundle_id)
5734 .unwrap_or_else(|| panic!("Resource bundle '{}' missing from configuration", #bundle_id));
5735 let bundle_ctx = cu29::resource::BundleContext::<#provider_path>::new(
5736 cu29::resource::BundleIndex::new(#index),
5737 #bundle_id,
5738 );
5739 <#provider_path as cu29::resource::ResourceBundle>::build(
5740 bundle_ctx,
5741 bundle_cfg.config.as_ref(),
5742 &mut manager,
5743 )?;
5744 }
5745 })
5746 .collect::<Vec<_>>();
5747
5748 let resources_instanciator = quote! {
5749 pub fn resources_instanciator(config: &CuConfig) -> CuResult<cu29::resource::ResourceManager> {
5750 let bundle_counts: &[usize] = &[ #(#bundle_counts),* ];
5751 let mut manager = cu29::resource::ResourceManager::new(bundle_counts);
5752 #(#bundle_inits)*
5753 Ok(manager)
5754 }
5755 };
5756
5757 Ok((resources_module, resources_instanciator))
5758}
5759
/// Generated tokens describing resource binding maps for a list of owners.
struct ResourceMappingTokens {
    /// Constant definitions (entry slices and binding maps) for owners that have bindings.
    defs: proc_macro2::TokenStream,
    /// One expression per owner: `Some(&MAP)` when it has bindings, `None` otherwise.
    refs: Vec<proc_macro2::TokenStream>,
}
5764
5765fn build_task_resource_mappings(
5766 resource_specs: &[ResourceKeySpec],
5767 task_specs: &CuTaskSpecSet,
5768) -> CuResult<ResourceMappingTokens> {
5769 let mut per_task: Vec<Vec<&ResourceKeySpec>> = vec![Vec::new(); task_specs.ids.len()];
5770
5771 for spec in resource_specs {
5772 let ResourceOwner::Task(task_index) = spec.owner else {
5773 continue;
5774 };
5775 per_task
5776 .get_mut(task_index)
5777 .ok_or_else(|| {
5778 CuError::from(format!(
5779 "Resource '{}' mapped to invalid task index {}",
5780 spec.binding_name, task_index
5781 ))
5782 })?
5783 .push(spec);
5784 }
5785
5786 let mut mapping_defs = Vec::new();
5787 let mut mapping_refs = Vec::new();
5788
5789 for (idx, entries) in per_task.iter().enumerate() {
5790 if entries.is_empty() {
5791 mapping_refs.push(quote! { None });
5792 continue;
5793 }
5794
5795 let binding_task_type = if task_specs.background_flags[idx] {
5796 &task_specs.sim_task_types[idx]
5797 } else {
5798 &task_specs.task_types[idx]
5799 };
5800
5801 let binding_trait = match task_specs.cutypes[idx] {
5802 CuTaskType::Source => quote! { CuSrcTask },
5803 CuTaskType::Regular => quote! { CuTask },
5804 CuTaskType::Sink => quote! { CuSinkTask },
5805 };
5806
5807 let entries_ident = format_ident!("TASK{}_RES_ENTRIES", idx);
5808 let map_ident = format_ident!("TASK{}_RES_MAPPING", idx);
5809 let binding_type = quote! {
5810 <<#binding_task_type as #binding_trait>::Resources<'_> as ResourceBindings>::Binding
5811 };
5812 let entry_tokens = entries.iter().map(|spec| {
5813 let binding_ident =
5814 Ident::new(&config_id_to_enum(spec.binding_name.as_str()), Span::call_site());
5815 let resource_ident =
5816 Ident::new(&config_id_to_enum(spec.resource_name.as_str()), Span::call_site());
5817 let bundle_index = spec.bundle_index;
5818 let provider_path = &spec.provider_path;
5819 quote! {
5820 (#binding_type::#binding_ident, cu29::resource::ResourceKey::new(
5821 cu29::resource::BundleIndex::new(#bundle_index),
5822 <#provider_path as cu29::resource::ResourceBundleDecl>::Id::#resource_ident as usize,
5823 ))
5824 }
5825 });
5826
5827 mapping_defs.push(quote! {
5828 const #entries_ident: &[(#binding_type, cu29::resource::ResourceKey)] = &[ #(#entry_tokens),* ];
5829 const #map_ident: cu29::resource::ResourceBindingMap<#binding_type> =
5830 cu29::resource::ResourceBindingMap::new(#entries_ident);
5831 });
5832 mapping_refs.push(quote! { Some(&#map_ident) });
5833 }
5834
5835 Ok(ResourceMappingTokens {
5836 defs: quote! { #(#mapping_defs)* },
5837 refs: mapping_refs,
5838 })
5839}
5840
5841fn build_bridge_resource_mappings(
5842 resource_specs: &[ResourceKeySpec],
5843 bridge_specs: &[BridgeSpec],
5844 sim_mode: bool,
5845) -> ResourceMappingTokens {
5846 let mut per_bridge: Vec<Vec<&ResourceKeySpec>> = vec![Vec::new(); bridge_specs.len()];
5847
5848 for spec in resource_specs {
5849 let ResourceOwner::Bridge(bridge_index) = spec.owner else {
5850 continue;
5851 };
5852 if sim_mode && !bridge_specs[bridge_index].run_in_sim {
5853 continue;
5854 }
5855 per_bridge[bridge_index].push(spec);
5856 }
5857
5858 let mut mapping_defs = Vec::new();
5859 let mut mapping_refs = Vec::new();
5860
5861 for (idx, entries) in per_bridge.iter().enumerate() {
5862 if entries.is_empty() {
5863 mapping_refs.push(quote! { None });
5864 continue;
5865 }
5866
5867 let bridge_type = &bridge_specs[idx].type_path;
5868 let binding_type = quote! {
5869 <<#bridge_type as cu29::cubridge::CuBridge>::Resources<'_> as ResourceBindings>::Binding
5870 };
5871 let entries_ident = format_ident!("BRIDGE{}_RES_ENTRIES", idx);
5872 let map_ident = format_ident!("BRIDGE{}_RES_MAPPING", idx);
5873 let entry_tokens = entries.iter().map(|spec| {
5874 let binding_ident =
5875 Ident::new(&config_id_to_enum(spec.binding_name.as_str()), Span::call_site());
5876 let resource_ident =
5877 Ident::new(&config_id_to_enum(spec.resource_name.as_str()), Span::call_site());
5878 let bundle_index = spec.bundle_index;
5879 let provider_path = &spec.provider_path;
5880 quote! {
5881 (#binding_type::#binding_ident, cu29::resource::ResourceKey::new(
5882 cu29::resource::BundleIndex::new(#bundle_index),
5883 <#provider_path as cu29::resource::ResourceBundleDecl>::Id::#resource_ident as usize,
5884 ))
5885 }
5886 });
5887
5888 mapping_defs.push(quote! {
5889 const #entries_ident: &[(#binding_type, cu29::resource::ResourceKey)] = &[ #(#entry_tokens),* ];
5890 const #map_ident: cu29::resource::ResourceBindingMap<#binding_type> =
5891 cu29::resource::ResourceBindingMap::new(#entries_ident);
5892 });
5893 mapping_refs.push(quote! { Some(&#map_ident) });
5894 }
5895
5896 ResourceMappingTokens {
5897 defs: quote! { #(#mapping_defs)* },
5898 refs: mapping_refs,
5899 }
5900}
5901
/// Builds the graph used for runtime planning and computes the runtime plan from it.
///
/// The plan graph mirrors every task node of `graph` and adds one synthetic node per rx/tx
/// channel listed in `bridge_specs`. Every edge of `graph` is then re-created between the
/// corresponding plan nodes before the result is handed to `compute_runtime_plan`.
///
/// As a side effect, `plan_node_id` is set on every rx/tx channel spec of `bridge_specs`.
///
/// Returns the runtime plan, the execution entities indexed by plan node id, and a map
/// from plan node id to original node id (filled for task nodes only).
///
/// # Errors
/// Propagates failures from adding nodes, connecting edges or computing the runtime plan.
///
/// # Panics
/// Panics when a task is missing from `task_specs`, when plan node ids are not assigned
/// sequentially, or when an edge references a node or bridge channel absent from the plan.
fn build_execution_plan(
    graph: &CuGraph,
    task_specs: &CuTaskSpecSet,
    bridge_specs: &mut [BridgeSpec],
) -> CuResult<(
    CuExecutionLoop,
    Vec<ExecutionEntity>,
    HashMap<NodeId, NodeId>,
)> {
    let mut plan_graph = CuGraph::default();
    let mut exec_entities = Vec::new();
    let mut original_to_plan = HashMap::new();
    let mut plan_to_original = HashMap::new();
    let mut name_to_original = HashMap::new();
    let mut channel_nodes = HashMap::new();

    // Mirror the task nodes; every node name is recorded, but only tasks enter the plan.
    for (node_id, node) in graph.get_all_nodes() {
        name_to_original.insert(node.get_id(), node_id);
        if node.get_flavor() != Flavor::Task {
            continue;
        }
        let plan_node_id = plan_graph.add_node(node.clone())?;
        let task_index = task_specs.node_id_to_task_index[node_id as usize]
            .expect("Task missing from specifications");
        plan_to_original.insert(plan_node_id, node_id);
        original_to_plan.insert(node_id, plan_node_id);
        // `exec_entities` is indexed by plan node id, so ids must grow one by one.
        if plan_node_id as usize != exec_entities.len() {
            panic!("Unexpected node ordering while mirroring tasks in plan graph");
        }
        exec_entities.push(ExecutionEntity {
            kind: ExecutionEntityKind::Task { task_index },
        });
    }

    // Add one synthetic plan node per bridge channel: rx channels first, then tx channels.
    for (bridge_index, spec) in bridge_specs.iter_mut().enumerate() {
        for (channel_index, channel_spec) in spec.rx_channels.iter_mut().enumerate() {
            let mut node = Node::new(
                format!("{}::rx::{}", spec.id, channel_spec.id).as_str(),
                "__CuBridgeRxChannel",
            );
            node.set_flavor(Flavor::Bridge);
            let plan_node_id = plan_graph.add_node(node)?;
            if plan_node_id as usize != exec_entities.len() {
                panic!("Unexpected node ordering while inserting bridge rx channel");
            }
            channel_spec.plan_node_id = Some(plan_node_id);
            exec_entities.push(ExecutionEntity {
                kind: ExecutionEntityKind::BridgeRx {
                    bridge_index,
                    channel_index,
                },
            });
            channel_nodes.insert(
                BridgeChannelKey {
                    bridge_id: spec.id.clone(),
                    channel_id: channel_spec.id.clone(),
                    direction: BridgeChannelDirection::Rx,
                },
                plan_node_id,
            );
        }

        for (channel_index, channel_spec) in spec.tx_channels.iter_mut().enumerate() {
            let mut node = Node::new(
                format!("{}::tx::{}", spec.id, channel_spec.id).as_str(),
                "__CuBridgeTxChannel",
            );
            node.set_flavor(Flavor::Bridge);
            let plan_node_id = plan_graph.add_node(node)?;
            if plan_node_id as usize != exec_entities.len() {
                panic!("Unexpected node ordering while inserting bridge tx channel");
            }
            channel_spec.plan_node_id = Some(plan_node_id);
            exec_entities.push(ExecutionEntity {
                kind: ExecutionEntityKind::BridgeTx {
                    bridge_index,
                    channel_index,
                },
            });
            channel_nodes.insert(
                BridgeChannelKey {
                    bridge_id: spec.id.clone(),
                    channel_id: channel_spec.id.clone(),
                    direction: BridgeChannelDirection::Tx,
                },
                plan_node_id,
            );
        }
    }

    // Re-create every edge between plan nodes.
    for cnx in graph.edges() {
        // A source with a channel is a bridge rx channel node; otherwise it is a task.
        let src_plan = if let Some(channel) = &cnx.src_channel {
            let key = BridgeChannelKey {
                bridge_id: cnx.src.clone(),
                channel_id: channel.clone(),
                direction: BridgeChannelDirection::Rx,
            };
            *channel_nodes
                .get(&key)
                .unwrap_or_else(|| panic!("Bridge source {:?} missing from plan graph", key))
        } else {
            let node_id = name_to_original
                .get(&cnx.src)
                .copied()
                .unwrap_or_else(|| panic!("Unknown source node '{}'", cnx.src));
            *original_to_plan
                .get(&node_id)
                .unwrap_or_else(|| panic!("Source node '{}' missing from plan", cnx.src))
        };

        // A destination with a channel is a bridge tx channel node; otherwise it is a task.
        let dst_plan = if let Some(channel) = &cnx.dst_channel {
            let key = BridgeChannelKey {
                bridge_id: cnx.dst.clone(),
                channel_id: channel.clone(),
                direction: BridgeChannelDirection::Tx,
            };
            *channel_nodes
                .get(&key)
                .unwrap_or_else(|| panic!("Bridge destination {:?} missing from plan graph", key))
        } else {
            let node_id = name_to_original
                .get(&cnx.dst)
                .copied()
                .unwrap_or_else(|| panic!("Unknown destination node '{}'", cnx.dst));
            *original_to_plan
                .get(&node_id)
                .unwrap_or_else(|| panic!("Destination node '{}' missing from plan", cnx.dst))
        };

        // Channel names are passed as `None`: the plan nodes already stand for the channels.
        plan_graph
            .connect_ext_with_order(
                src_plan,
                dst_plan,
                &cnx.msg,
                cnx.missions.clone(),
                None,
                None,
                cnx.order,
            )
            .map_err(|e| CuError::from(e.to_string()))?;
    }

    let runtime_plan = compute_runtime_plan(&plan_graph)?;
    Ok((runtime_plan, exec_entities, plan_to_original))
}
6047
6048fn collect_culist_metadata(
6049 runtime_plan: &CuExecutionLoop,
6050 exec_entities: &[ExecutionEntity],
6051 bridge_specs: &mut [BridgeSpec],
6052 plan_to_original: &HashMap<NodeId, NodeId>,
6053) -> (Vec<usize>, HashMap<NodeId, usize>) {
6054 let mut culist_order = Vec::new();
6055 let mut node_output_positions = HashMap::new();
6056
6057 for unit in &runtime_plan.steps {
6058 if let CuExecutionUnit::Step(step) = unit
6059 && let Some(output_pack) = &step.output_msg_pack
6060 {
6061 let output_idx = output_pack.culist_index;
6062 culist_order.push(output_idx as usize);
6063 match &exec_entities[step.node_id as usize].kind {
6064 ExecutionEntityKind::Task { .. } => {
6065 if let Some(original_node_id) = plan_to_original.get(&step.node_id) {
6066 node_output_positions.insert(*original_node_id, output_idx as usize);
6067 }
6068 }
6069 ExecutionEntityKind::BridgeRx {
6070 bridge_index,
6071 channel_index,
6072 } => {
6073 bridge_specs[*bridge_index].rx_channels[*channel_index].culist_index =
6074 Some(output_idx as usize);
6075 }
6076 ExecutionEntityKind::BridgeTx {
6077 bridge_index,
6078 channel_index,
6079 } => {
6080 bridge_specs[*bridge_index].tx_channels[*channel_index].culist_index =
6081 Some(output_idx as usize);
6082 }
6083 }
6084 }
6085 }
6086
6087 (culist_order, node_output_positions)
6088}
6089
6090fn build_monitor_culist_component_mapping(
6091 runtime_plan: &CuExecutionLoop,
6092 exec_entities: &[ExecutionEntity],
6093 bridge_specs: &[BridgeSpec],
6094) -> Result<Vec<usize>, String> {
6095 let mut mapping = Vec::new();
6096 for unit in &runtime_plan.steps {
6097 if let CuExecutionUnit::Step(step) = unit
6098 && step.output_msg_pack.is_some()
6099 {
6100 let Some(entity) = exec_entities.get(step.node_id as usize) else {
6101 return Err(format!(
6102 "Missing execution entity for plan node {} while building monitor mapping",
6103 step.node_id
6104 ));
6105 };
6106 let component_index = match &entity.kind {
6107 ExecutionEntityKind::Task { task_index } => *task_index,
6108 ExecutionEntityKind::BridgeRx {
6109 bridge_index,
6110 channel_index,
6111 } => bridge_specs
6112 .get(*bridge_index)
6113 .and_then(|spec| spec.rx_channels.get(*channel_index))
6114 .and_then(|channel| channel.monitor_index)
6115 .ok_or_else(|| {
6116 format!(
6117 "Missing monitor index for bridge rx {}:{}",
6118 bridge_index, channel_index
6119 )
6120 })?,
6121 ExecutionEntityKind::BridgeTx {
6122 bridge_index,
6123 channel_index,
6124 } => bridge_specs
6125 .get(*bridge_index)
6126 .and_then(|spec| spec.tx_channels.get(*channel_index))
6127 .and_then(|channel| channel.monitor_index)
6128 .ok_or_else(|| {
6129 format!(
6130 "Missing monitor index for bridge tx {}:{}",
6131 bridge_index, channel_index
6132 )
6133 })?,
6134 };
6135 mapping.push(component_index);
6136 }
6137 }
6138 Ok(mapping)
6139}
6140
6141fn build_parallel_rt_stage_entries(
6142 runtime_plan: &CuExecutionLoop,
6143 exec_entities: &[ExecutionEntity],
6144 task_specs: &CuTaskSpecSet,
6145 bridge_specs: &[BridgeSpec],
6146) -> CuResult<Vec<proc_macro2::TokenStream>> {
6147 let mut entries = Vec::new();
6148
6149 for unit in &runtime_plan.steps {
6150 let CuExecutionUnit::Step(step) = unit else {
6151 todo!("parallel runtime metadata for nested loops is not implemented yet")
6152 };
6153
6154 let entity = exec_entities.get(step.node_id as usize).ok_or_else(|| {
6155 CuError::from(format!(
6156 "Missing execution entity for runtime plan node {} while building parallel runtime metadata",
6157 step.node_id
6158 ))
6159 })?;
6160
6161 let (label, kind_tokens, component_index) = match &entity.kind {
6162 ExecutionEntityKind::Task { task_index } => (
6163 task_specs
6164 .ids
6165 .get(*task_index)
6166 .cloned()
6167 .ok_or_else(|| {
6168 CuError::from(format!(
6169 "Missing task id for task index {} while building parallel runtime metadata",
6170 task_index
6171 ))
6172 })?,
6173 quote! { cu29::parallel_rt::ParallelRtStageKind::Task },
6174 *task_index,
6175 ),
6176 ExecutionEntityKind::BridgeRx {
6177 bridge_index,
6178 channel_index,
6179 } => {
6180 let bridge = bridge_specs.get(*bridge_index).ok_or_else(|| {
6181 CuError::from(format!(
6182 "Missing bridge spec {} while building parallel runtime metadata",
6183 bridge_index
6184 ))
6185 })?;
6186 let channel = bridge.rx_channels.get(*channel_index).ok_or_else(|| {
6187 CuError::from(format!(
6188 "Missing bridge rx channel {}:{} while building parallel runtime metadata",
6189 bridge_index, channel_index
6190 ))
6191 })?;
6192 let component_index = channel.monitor_index.ok_or_else(|| {
6193 CuError::from(format!(
6194 "Missing monitor index for bridge rx {}:{} while building parallel runtime metadata",
6195 bridge_index, channel_index
6196 ))
6197 })?;
6198 (
6199 format!("bridge::{}::rx::{}", bridge.id, channel.id),
6200 quote! { cu29::parallel_rt::ParallelRtStageKind::BridgeRx },
6201 component_index,
6202 )
6203 }
6204 ExecutionEntityKind::BridgeTx {
6205 bridge_index,
6206 channel_index,
6207 } => {
6208 let bridge = bridge_specs.get(*bridge_index).ok_or_else(|| {
6209 CuError::from(format!(
6210 "Missing bridge spec {} while building parallel runtime metadata",
6211 bridge_index
6212 ))
6213 })?;
6214 let channel = bridge.tx_channels.get(*channel_index).ok_or_else(|| {
6215 CuError::from(format!(
6216 "Missing bridge tx channel {}:{} while building parallel runtime metadata",
6217 bridge_index, channel_index
6218 ))
6219 })?;
6220 let component_index = channel.monitor_index.ok_or_else(|| {
6221 CuError::from(format!(
6222 "Missing monitor index for bridge tx {}:{} while building parallel runtime metadata",
6223 bridge_index, channel_index
6224 ))
6225 })?;
6226 (
6227 format!("bridge::{}::tx::{}", bridge.id, channel.id),
6228 quote! { cu29::parallel_rt::ParallelRtStageKind::BridgeTx },
6229 component_index,
6230 )
6231 }
6232 };
6233
6234 let node_id = step.node_id;
6235 entries.push(quote! {
6236 cu29::parallel_rt::ParallelRtStageMetadata::new(
6237 #label,
6238 #kind_tokens,
6239 #node_id,
6240 cu29::monitoring::ComponentId::new(#component_index),
6241 )
6242 });
6243 }
6244
6245 Ok(entries)
6246}
6247
6248#[allow(dead_code)]
6249fn build_monitored_ids(task_ids: &[String], bridge_specs: &mut [BridgeSpec]) -> Vec<String> {
6250 let mut names = task_ids.to_vec();
6251 for spec in bridge_specs.iter_mut() {
6252 spec.monitor_index = Some(names.len());
6253 names.push(format!("bridge::{}", spec.id));
6254 for channel in spec.rx_channels.iter_mut() {
6255 channel.monitor_index = Some(names.len());
6256 names.push(format!("bridge::{}::rx::{}", spec.id, channel.id));
6257 }
6258 for channel in spec.tx_channels.iter_mut() {
6259 channel.monitor_index = Some(names.len());
6260 names.push(format!("bridge::{}::tx::{}", spec.id, channel.id));
6261 }
6262 }
6263 names
6264}
6265
6266fn wrap_process_step_tokens(
6267 wrap_process_step: bool,
6268 body: proc_macro2::TokenStream,
6269) -> proc_macro2::TokenStream {
6270 if wrap_process_step {
6271 quote! {{
6272 let __cu_process_step_result: cu29::curuntime::ProcessStepResult = (|| {
6273 #body
6274 Ok(cu29::curuntime::ProcessStepOutcome::Continue)
6275 })();
6276 __cu_process_step_result
6277 }}
6278 } else {
6279 body
6280 }
6281}
6282
6283fn abort_process_step_tokens(wrap_process_step: bool) -> proc_macro2::TokenStream {
6284 if wrap_process_step {
6285 quote! {
6286 return Ok(cu29::curuntime::ProcessStepOutcome::AbortCopperList);
6287 }
6288 } else {
6289 quote! {
6290 __cu_abort_copperlist = true;
6291 break '__cu_process_steps;
6292 }
6293 }
6294}
6295
6296fn parallel_task_lifecycle_tokens(
6297 task_kind: CuTaskType,
6298 task_type: &Type,
6299 component_index: usize,
6300 mission_mod: &Ident,
6301 task_instance: &proc_macro2::TokenStream,
6302 placement: ParallelLifecyclePlacement,
6303) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
6304 let rt_guard = rtsan_guard_tokens();
6305 let abort_process_step = abort_process_step_tokens(true);
6306 let task_trait = match task_kind {
6307 CuTaskType::Source => quote! { cu29::cutask::CuSrcTask },
6308 CuTaskType::Sink => quote! { cu29::cutask::CuSinkTask },
6309 CuTaskType::Regular => quote! { cu29::cutask::CuTask },
6310 };
6311
6312 let preprocess = if placement.preprocess {
6313 quote! {
6314 execution_probe.record(cu29::monitoring::ExecutionMarker {
6315 component_id: cu29::monitoring::ComponentId::new(#component_index),
6316 step: CuComponentState::Preprocess,
6317 culistid: Some(clid),
6318 });
6319 ctx.set_current_task(#component_index);
6320 let maybe_error = {
6321 #rt_guard
6322 <#task_type as #task_trait>::preprocess(&mut #task_instance, &ctx)
6323 };
6324 if let Err(error) = maybe_error {
6325 let decision = monitor.process_error(
6326 cu29::monitoring::ComponentId::new(#component_index),
6327 CuComponentState::Preprocess,
6328 &error,
6329 );
6330 match decision {
6331 Decision::Abort => {
6332 debug!(
6333 "Preprocess: ABORT decision from monitoring. Component '{}' errored out during preprocess. Aborting CopperList {}.",
6334 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index)),
6335 clid
6336 );
6337 #abort_process_step
6338 }
6339 Decision::Ignore => {
6340 debug!(
6341 "Preprocess: IGNORE decision from monitoring. Component '{}' errored out during preprocess. The runtime will continue.",
6342 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6343 );
6344 }
6345 Decision::Shutdown => {
6346 debug!(
6347 "Preprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during preprocess. The runtime cannot continue.",
6348 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6349 );
6350 return Err(CuError::new_with_cause(
6351 "Component errored out during preprocess.",
6352 error,
6353 ));
6354 }
6355 }
6356 }
6357 }
6358 } else {
6359 quote! {}
6360 };
6361
6362 let postprocess = if placement.postprocess {
6363 quote! {
6364 execution_probe.record(cu29::monitoring::ExecutionMarker {
6365 component_id: cu29::monitoring::ComponentId::new(#component_index),
6366 step: CuComponentState::Postprocess,
6367 culistid: Some(clid),
6368 });
6369 ctx.set_current_task(#component_index);
6370 let maybe_error = {
6371 #rt_guard
6372 <#task_type as #task_trait>::postprocess(&mut #task_instance, &ctx)
6373 };
6374 if let Err(error) = maybe_error {
6375 let decision = monitor.process_error(
6376 cu29::monitoring::ComponentId::new(#component_index),
6377 CuComponentState::Postprocess,
6378 &error,
6379 );
6380 match decision {
6381 Decision::Abort => {
6382 debug!(
6383 "Postprocess: ABORT decision from monitoring. Component '{}' errored out during postprocess. Continuing with the completed CopperList.",
6384 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6385 );
6386 }
6387 Decision::Ignore => {
6388 debug!(
6389 "Postprocess: IGNORE decision from monitoring. Component '{}' errored out during postprocess. The runtime will continue.",
6390 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6391 );
6392 }
6393 Decision::Shutdown => {
6394 debug!(
6395 "Postprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during postprocess. The runtime cannot continue.",
6396 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6397 );
6398 return Err(CuError::new_with_cause(
6399 "Component errored out during postprocess.",
6400 error,
6401 ));
6402 }
6403 }
6404 }
6405 }
6406 } else {
6407 quote! {}
6408 };
6409
6410 (preprocess, postprocess)
6411}
6412
6413fn parallel_bridge_lifecycle_tokens(
6414 bridge_type: &Type,
6415 component_index: usize,
6416 mission_mod: &Ident,
6417 placement: ParallelLifecyclePlacement,
6418) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
6419 let rt_guard = rtsan_guard_tokens();
6420 let abort_process_step = abort_process_step_tokens(true);
6421
6422 let preprocess = if placement.preprocess {
6423 quote! {
6424 execution_probe.record(cu29::monitoring::ExecutionMarker {
6425 component_id: cu29::monitoring::ComponentId::new(#component_index),
6426 step: CuComponentState::Preprocess,
6427 culistid: Some(clid),
6428 });
6429 ctx.clear_current_task();
6430 let maybe_error = {
6431 #rt_guard
6432 <#bridge_type as cu29::cubridge::CuBridge>::preprocess(bridge, &ctx)
6433 };
6434 if let Err(error) = maybe_error {
6435 let decision = monitor.process_error(
6436 cu29::monitoring::ComponentId::new(#component_index),
6437 CuComponentState::Preprocess,
6438 &error,
6439 );
6440 match decision {
6441 Decision::Abort => {
6442 debug!(
6443 "Preprocess: ABORT decision from monitoring. Component '{}' errored out during preprocess. Aborting CopperList {}.",
6444 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index)),
6445 clid
6446 );
6447 #abort_process_step
6448 }
6449 Decision::Ignore => {
6450 debug!(
6451 "Preprocess: IGNORE decision from monitoring. Component '{}' errored out during preprocess. The runtime will continue.",
6452 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6453 );
6454 }
6455 Decision::Shutdown => {
6456 debug!(
6457 "Preprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during preprocess. The runtime cannot continue.",
6458 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6459 );
6460 return Err(CuError::new_with_cause(
6461 "Component errored out during preprocess.",
6462 error,
6463 ));
6464 }
6465 }
6466 }
6467 }
6468 } else {
6469 quote! {}
6470 };
6471
6472 let postprocess = if placement.postprocess {
6473 quote! {
6474 kf_manager.freeze_any(clid, bridge)?;
6475 execution_probe.record(cu29::monitoring::ExecutionMarker {
6476 component_id: cu29::monitoring::ComponentId::new(#component_index),
6477 step: CuComponentState::Postprocess,
6478 culistid: Some(clid),
6479 });
6480 ctx.clear_current_task();
6481 let maybe_error = {
6482 #rt_guard
6483 <#bridge_type as cu29::cubridge::CuBridge>::postprocess(bridge, &ctx)
6484 };
6485 if let Err(error) = maybe_error {
6486 let decision = monitor.process_error(
6487 cu29::monitoring::ComponentId::new(#component_index),
6488 CuComponentState::Postprocess,
6489 &error,
6490 );
6491 match decision {
6492 Decision::Abort => {
6493 debug!(
6494 "Postprocess: ABORT decision from monitoring. Component '{}' errored out during postprocess. Continuing with the completed CopperList.",
6495 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6496 );
6497 }
6498 Decision::Ignore => {
6499 debug!(
6500 "Postprocess: IGNORE decision from monitoring. Component '{}' errored out during postprocess. The runtime will continue.",
6501 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6502 );
6503 }
6504 Decision::Shutdown => {
6505 debug!(
6506 "Postprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during postprocess. The runtime cannot continue.",
6507 #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#component_index))
6508 );
6509 return Err(CuError::new_with_cause(
6510 "Component errored out during postprocess.",
6511 error,
6512 ));
6513 }
6514 }
6515 }
6516 }
6517 } else {
6518 quote! {}
6519 };
6520
6521 (preprocess, postprocess)
6522}
6523
/// Settings shared by the per-step token generators.
#[derive(Clone, Copy)]
struct StepGenerationContext<'a> {
    /// Number of messages in each output pack, indexed by culist index. Used to decide
    /// whether an input must be addressed through a port index of its producing pack.
    output_pack_sizes: &'a [usize],
    /// Whether simulation-specific code (sim callbacks) is generated.
    sim_mode: bool,
    /// Module whose `monitor_component_label` function the generated code calls.
    mission_mod: &'a Ident,
    /// Forwarded to the parallel lifecycle token generators to select whether
    /// preprocess / postprocess blocks are emitted.
    lifecycle_placement: ParallelLifecyclePlacement,
    /// Forwarded to `wrap_process_step_tokens` and `abort_process_step_tokens` to select
    /// the wrapped (closure) form of a step.
    wrap_process_step: bool,
}
6532
6533impl<'a> StepGenerationContext<'a> {
6534 fn new(
6535 output_pack_sizes: &'a [usize],
6536 sim_mode: bool,
6537 mission_mod: &'a Ident,
6538 lifecycle_placement: ParallelLifecyclePlacement,
6539 wrap_process_step: bool,
6540 ) -> Self {
6541 Self {
6542 output_pack_sizes,
6543 sim_mode,
6544 mission_mod,
6545 lifecycle_placement,
6546 wrap_process_step,
6547 }
6548 }
6549}
6550
/// Token fragments describing how generated code reaches a task instance.
struct TaskExecutionTokens {
    /// Statements spliced in at the very start of the generated step.
    setup: proc_macro2::TokenStream,
    /// Expression tokens designating the task instance (used as the receiver of
    /// `process` / `sim_tick` and borrowed for `freeze_task`).
    instance: proc_macro2::TokenStream,
}
6555
6556impl TaskExecutionTokens {
6557 fn new(setup: proc_macro2::TokenStream, instance: proc_macro2::TokenStream) -> Self {
6558 Self { setup, instance }
6559 }
6560}
6561
6562fn generate_task_execution_tokens(
6563 step: &CuExecutionStep,
6564 task_index: usize,
6565 task_specs: &CuTaskSpecSet,
6566 ctx: StepGenerationContext<'_>,
6567 task_tokens: TaskExecutionTokens,
6568) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
6569 let StepGenerationContext {
6570 output_pack_sizes,
6571 sim_mode,
6572 mission_mod,
6573 lifecycle_placement,
6574 wrap_process_step,
6575 } = ctx;
6576 let TaskExecutionTokens {
6577 setup: task_setup,
6578 instance: task_instance,
6579 } = task_tokens;
6580 let abort_process_step = abort_process_step_tokens(wrap_process_step);
6581 let comment_str = format!(
6582 "DEBUG ->> {} ({:?}) Id:{} I:{:?} O:{:?}",
6583 step.node.get_id(),
6584 step.task_type,
6585 step.node_id,
6586 step.input_msg_indices_types,
6587 step.output_msg_pack
6588 );
6589 let comment_tokens = quote! {{
6590 let _ = stringify!(#comment_str);
6591 }};
6592 let tid = task_index;
6593 let task_enum_name = config_id_to_enum(&task_specs.ids[tid]);
6594 let enum_name = Ident::new(&task_enum_name, Span::call_site());
6595 let task_hint = config_id_to_struct_member(&task_specs.ids[tid]);
6596 let source_slot_match_trait_ident = format_ident!(
6597 "__CuOutputSlotMustMatchTaskOutput__Task_{}__Add_dst___nc___connections_for_unused_outputs",
6598 task_hint
6599 );
6600 let source_slot_match_fn_ident = format_ident!(
6601 "__cu_source_output_slot_or_add_dst___nc___for_unused_outputs__task_{}",
6602 task_hint
6603 );
6604 let regular_slot_match_trait_ident = format_ident!(
6605 "__CuOutputSlotMustMatchTaskOutput__Task_{}__Add_dst___nc___connections_for_unused_outputs",
6606 task_hint
6607 );
6608 let regular_slot_match_fn_ident = format_ident!(
6609 "__cu_task_output_slot_or_add_dst___nc___for_unused_outputs__task_{}",
6610 task_hint
6611 );
6612 let rt_guard = rtsan_guard_tokens();
6613 let run_in_sim_flag = task_specs.run_in_sim_flags[tid];
6614 let task_type = &task_specs.task_types[tid];
6615 let (parallel_task_preprocess, parallel_task_postprocess) = parallel_task_lifecycle_tokens(
6616 step.task_type,
6617 task_type,
6618 tid,
6619 mission_mod,
6620 &task_instance,
6621 lifecycle_placement,
6622 );
6623 let maybe_sim_tick = if sim_mode && !run_in_sim_flag {
6624 quote! {
6625 if !doit {
6626 #task_instance.sim_tick();
6627 }
6628 }
6629 } else {
6630 quote!()
6631 };
6632
6633 let output_pack = step
6634 .output_msg_pack
6635 .as_ref()
6636 .expect("Task should have an output message pack.");
6637 let output_culist_index = int2sliceindex(output_pack.culist_index);
6638 let output_ports: Vec<syn::Index> = (0..output_pack.msg_types.len())
6639 .map(syn::Index::from)
6640 .collect();
6641 let output_clear_payload = if output_ports.len() == 1 {
6642 quote! { cumsg_output.clear_payload(); }
6643 } else {
6644 quote! { #(cumsg_output.#output_ports.clear_payload();)* }
6645 };
6646 let output_start_time = if output_ports.len() == 1 {
6647 quote! {
6648 if cumsg_output.metadata.process_time.start.is_none() {
6649 cumsg_output.metadata.process_time.start = cu29::curuntime::perf_now(clock).into();
6650 }
6651 }
6652 } else {
6653 quote! {
6654 let start_time = cu29::curuntime::perf_now(clock).into();
6655 #( if cumsg_output.#output_ports.metadata.process_time.start.is_none() {
6656 cumsg_output.#output_ports.metadata.process_time.start = start_time;
6657 } )*
6658 }
6659 };
6660 let output_end_time = if output_ports.len() == 1 {
6661 quote! {
6662 if cumsg_output.metadata.process_time.end.is_none() {
6663 cumsg_output.metadata.process_time.end = cu29::curuntime::perf_now(clock).into();
6664 }
6665 }
6666 } else {
6667 quote! {
6668 let end_time = cu29::curuntime::perf_now(clock).into();
6669 #( if cumsg_output.#output_ports.metadata.process_time.end.is_none() {
6670 cumsg_output.#output_ports.metadata.process_time.end = end_time;
6671 } )*
6672 }
6673 };
6674
6675 match step.task_type {
6676 CuTaskType::Source => {
6677 let monitoring_action = quote! {
6678 debug!("Component {}: Error during process: {}", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), &error);
6679 let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#tid), CuComponentState::Process, &error);
6680 match decision {
6681 Decision::Abort => {
6682 debug!("Process: ABORT decision from monitoring. Component '{}' errored out \
6683 during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), clid);
6684 #abort_process_step
6685 }
6686 Decision::Ignore => {
6687 debug!("Process: IGNORE decision from monitoring. Component '{}' errored out \
6688 during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
6689 let cumsg_output = &mut msgs.#output_culist_index;
6690 #output_clear_payload
6691 }
6692 Decision::Shutdown => {
6693 debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out \
6694 during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
6695 return Err(CuError::new_with_cause("Component errored out during process.", error));
6696 }
6697 }
6698 };
6699
6700 let call_sim_callback = if sim_mode {
6701 quote! {
6702 let doit = {
6703 let cumsg_output = &mut msgs.#output_culist_index;
6704 let state = CuTaskCallbackState::Process((), cumsg_output);
6705 let ovr = sim_callback(SimStep::#enum_name(state));
6706
6707 if let SimOverride::Errored(reason) = ovr {
6708 let error: CuError = reason.into();
6709 #monitoring_action
6710 false
6711 } else {
6712 ovr == SimOverride::ExecuteByRuntime
6713 }
6714 };
6715 }
6716 } else {
6717 quote! { let doit = true; }
6718 };
6719
6720 let logging_tokens = if !task_specs.logging_enabled[tid] {
6721 quote! {
6722 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
6723 #output_clear_payload
6724 }
6725 } else {
6726 quote!()
6727 };
6728 let source_process_tokens = quote! {
6729 #[allow(non_camel_case_types)]
6730 trait #source_slot_match_trait_ident<Expected> {
6731 fn __cu_cast_output_slot(slot: &mut Self) -> &mut Expected;
6732 }
6733 impl<T> #source_slot_match_trait_ident<T> for T {
6734 fn __cu_cast_output_slot(slot: &mut Self) -> &mut T {
6735 slot
6736 }
6737 }
6738
6739 fn #source_slot_match_fn_ident<'a, Task, Slot>(
6740 _task: &Task,
6741 slot: &'a mut Slot,
6742 ) -> &'a mut Task::Output<'static>
6743 where
6744 Task: cu29::cutask::CuSrcTask,
6745 Slot: #source_slot_match_trait_ident<Task::Output<'static>>,
6746 {
6747 <Slot as #source_slot_match_trait_ident<Task::Output<'static>>>::__cu_cast_output_slot(slot)
6748 }
6749
6750 #output_start_time
6751 let result = {
6752 let cumsg_output = #source_slot_match_fn_ident::<
6753 _,
6754 _,
6755 >(&#task_instance, cumsg_output);
6756 #rt_guard
6757 ctx.set_current_task(#tid);
6758 #task_instance.process(&ctx, cumsg_output)
6759 };
6760 #output_end_time
6761 result
6762 };
6763
6764 (
6765 wrap_process_step_tokens(
6766 wrap_process_step,
6767 quote! {
6768 #task_setup
6769 #parallel_task_preprocess
6770 #comment_tokens
6771 kf_manager.freeze_task(clid, &#task_instance)?;
6772 #call_sim_callback
6773 let cumsg_output = &mut msgs.#output_culist_index;
6774 #maybe_sim_tick
6775 let maybe_error = if doit {
6776 execution_probe.record(cu29::monitoring::ExecutionMarker {
6777 component_id: cu29::monitoring::ComponentId::new(#tid),
6778 step: CuComponentState::Process,
6779 culistid: Some(clid),
6780 });
6781 #source_process_tokens
6782 } else {
6783 Ok(())
6784 };
6785 if let Err(error) = maybe_error {
6786 #monitoring_action
6787 }
6788 #parallel_task_postprocess
6789 },
6790 ),
6791 logging_tokens,
6792 )
6793 }
6794 CuTaskType::Sink => {
6795 let input_exprs: Vec<proc_macro2::TokenStream> = step
6796 .input_msg_indices_types
6797 .iter()
6798 .map(|input| {
6799 let input_index = int2sliceindex(input.culist_index);
6800 let output_size = output_pack_sizes
6801 .get(input.culist_index as usize)
6802 .copied()
6803 .unwrap_or_else(|| {
6804 panic!(
6805 "Missing output pack size for culist index {}",
6806 input.culist_index
6807 )
6808 });
6809 if output_size > 1 {
6810 let port_index = syn::Index::from(input.src_port);
6811 quote! { msgs.#input_index.#port_index }
6812 } else {
6813 quote! { msgs.#input_index }
6814 }
6815 })
6816 .collect();
6817 let inputs_type = if input_exprs.len() == 1 {
6818 let input = input_exprs.first().unwrap();
6819 quote! { #input }
6820 } else {
6821 quote! { (#(&#input_exprs),*) }
6822 };
6823
6824 let monitoring_action = quote! {
6825 debug!("Component {}: Error during process: {}", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), &error);
6826 let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#tid), CuComponentState::Process, &error);
6827 match decision {
6828 Decision::Abort => {
6829 debug!("Process: ABORT decision from monitoring. Component '{}' errored out \
6830 during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), clid);
6831 #abort_process_step
6832 }
6833 Decision::Ignore => {
6834 debug!("Process: IGNORE decision from monitoring. Component '{}' errored out \
6835 during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
6836 let cumsg_output = &mut msgs.#output_culist_index;
6837 #output_clear_payload
6838 }
6839 Decision::Shutdown => {
6840 debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out \
6841 during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
6842 return Err(CuError::new_with_cause("Component errored out during process.", error));
6843 }
6844 }
6845 };
6846
6847 let call_sim_callback = if sim_mode {
6848 quote! {
6849 let doit = {
6850 let cumsg_input = &#inputs_type;
6851 let cumsg_output = &mut msgs.#output_culist_index;
6852 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
6853 let ovr = sim_callback(SimStep::#enum_name(state));
6854
6855 if let SimOverride::Errored(reason) = ovr {
6856 let error: CuError = reason.into();
6857 #monitoring_action
6858 false
6859 } else {
6860 ovr == SimOverride::ExecuteByRuntime
6861 }
6862 };
6863 }
6864 } else {
6865 quote! { let doit = true; }
6866 };
6867
6868 (
6869 wrap_process_step_tokens(
6870 wrap_process_step,
6871 quote! {
6872 #task_setup
6873 #parallel_task_preprocess
6874 #comment_tokens
6875 kf_manager.freeze_task(clid, &#task_instance)?;
6876 #call_sim_callback
6877 let cumsg_input = &#inputs_type;
6878 let cumsg_output = &mut msgs.#output_culist_index;
6879 let maybe_error = if doit {
6880 execution_probe.record(cu29::monitoring::ExecutionMarker {
6881 component_id: cu29::monitoring::ComponentId::new(#tid),
6882 step: CuComponentState::Process,
6883 culistid: Some(clid),
6884 });
6885 #output_start_time
6886 let result = {
6887 #rt_guard
6888 ctx.set_current_task(#tid);
6889 #task_instance.process(&ctx, cumsg_input)
6890 };
6891 #output_end_time
6892 result
6893 } else {
6894 Ok(())
6895 };
6896 if let Err(error) = maybe_error {
6897 #monitoring_action
6898 }
6899 #parallel_task_postprocess
6900 },
6901 ),
6902 quote! {},
6903 )
6904 }
6905 CuTaskType::Regular => {
6906 let input_exprs: Vec<proc_macro2::TokenStream> = step
6907 .input_msg_indices_types
6908 .iter()
6909 .map(|input| {
6910 let input_index = int2sliceindex(input.culist_index);
6911 let output_size = output_pack_sizes
6912 .get(input.culist_index as usize)
6913 .copied()
6914 .unwrap_or_else(|| {
6915 panic!(
6916 "Missing output pack size for culist index {}",
6917 input.culist_index
6918 )
6919 });
6920 if output_size > 1 {
6921 let port_index = syn::Index::from(input.src_port);
6922 quote! { msgs.#input_index.#port_index }
6923 } else {
6924 quote! { msgs.#input_index }
6925 }
6926 })
6927 .collect();
6928 let inputs_type = if input_exprs.len() == 1 {
6929 let input = input_exprs.first().unwrap();
6930 quote! { #input }
6931 } else {
6932 quote! { (#(&#input_exprs),*) }
6933 };
6934
6935 let monitoring_action = quote! {
6936 debug!("Component {}: Error during process: {}", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), &error);
6937 let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#tid), CuComponentState::Process, &error);
6938 match decision {
6939 Decision::Abort => {
6940 debug!("Process: ABORT decision from monitoring. Component '{}' errored out \
6941 during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), clid);
6942 #abort_process_step
6943 }
6944 Decision::Ignore => {
6945 debug!("Process: IGNORE decision from monitoring. Component '{}' errored out \
6946 during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
6947 let cumsg_output = &mut msgs.#output_culist_index;
6948 #output_clear_payload
6949 }
6950 Decision::Shutdown => {
6951 debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out \
6952 during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
6953 return Err(CuError::new_with_cause("Component errored out during process.", error));
6954 }
6955 }
6956 };
6957
6958 let call_sim_callback = if sim_mode {
6959 quote! {
6960 let doit = {
6961 let cumsg_input = &#inputs_type;
6962 let cumsg_output = &mut msgs.#output_culist_index;
6963 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
6964 let ovr = sim_callback(SimStep::#enum_name(state));
6965
6966 if let SimOverride::Errored(reason) = ovr {
6967 let error: CuError = reason.into();
6968 #monitoring_action
6969 false
6970 }
6971 else {
6972 ovr == SimOverride::ExecuteByRuntime
6973 }
6974 };
6975 }
6976 } else {
6977 quote! { let doit = true; }
6978 };
6979
6980 let logging_tokens = if !task_specs.logging_enabled[tid] {
6981 quote! {
6982 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
6983 #output_clear_payload
6984 }
6985 } else {
6986 quote!()
6987 };
6988 let regular_process_tokens = quote! {
6989 #[allow(non_camel_case_types)]
6990 trait #regular_slot_match_trait_ident<Expected> {
6991 fn __cu_cast_output_slot(slot: &mut Self) -> &mut Expected;
6992 }
6993 impl<T> #regular_slot_match_trait_ident<T> for T {
6994 fn __cu_cast_output_slot(slot: &mut Self) -> &mut T {
6995 slot
6996 }
6997 }
6998
6999 fn #regular_slot_match_fn_ident<'a, Task, Slot>(
7000 _task: &Task,
7001 slot: &'a mut Slot,
7002 ) -> &'a mut Task::Output<'static>
7003 where
7004 Task: cu29::cutask::CuTask,
7005 Slot: #regular_slot_match_trait_ident<Task::Output<'static>>,
7006 {
7007 <Slot as #regular_slot_match_trait_ident<Task::Output<'static>>>::__cu_cast_output_slot(slot)
7008 }
7009
7010 #output_start_time
7011 let result = {
7012 let cumsg_output = #regular_slot_match_fn_ident::<
7013 _,
7014 _,
7015 >(&#task_instance, cumsg_output);
7016 #rt_guard
7017 ctx.set_current_task(#tid);
7018 #task_instance.process(&ctx, cumsg_input, cumsg_output)
7019 };
7020 #output_end_time
7021 result
7022 };
7023
7024 (
7025 wrap_process_step_tokens(
7026 wrap_process_step,
7027 quote! {
7028 #task_setup
7029 #parallel_task_preprocess
7030 #comment_tokens
7031 kf_manager.freeze_task(clid, &#task_instance)?;
7032 #call_sim_callback
7033 let cumsg_input = &#inputs_type;
7034 let cumsg_output = &mut msgs.#output_culist_index;
7035 let maybe_error = if doit {
7036 execution_probe.record(cu29::monitoring::ExecutionMarker {
7037 component_id: cu29::monitoring::ComponentId::new(#tid),
7038 step: CuComponentState::Process,
7039 culistid: Some(clid),
7040 });
7041 #regular_process_tokens
7042 } else {
7043 Ok(())
7044 };
7045 if let Err(error) = maybe_error {
7046 #monitoring_action
7047 }
7048 #parallel_task_postprocess
7049 },
7050 ),
7051 logging_tokens,
7052 )
7053 }
7054 }
7055}
7056
/// Builds the generated code for the process step of one bridge Rx channel.
///
/// The emitted step binds `cumsg_output` to the channel's slot in the copper list, optionally
/// consults the simulation callback and, when the step has to run, calls `bridge.receive`
/// for the channel and routes any error through `monitor.process_error`.
///
/// # Parameters
/// - `step`: execution step of the channel; its `output_msg_pack` locates the output slot.
/// - `bridge_spec`: bridge that owns the channel.
/// - `channel_index`: index into `bridge_spec.rx_channels`.
/// - `ctx`: shared generation settings (sim mode, mission module, lifecycle placement and
///   process-step wrapping); its `output_pack_sizes` is ignored for Rx channels.
/// - `bridge_setup`: tokens emitted first in the step.
///   NOTE(review): presumably these bind the `bridge` variable used by the generated code;
///   confirm against the caller.
///
/// # Returns
/// A pair whose first element is the step body passed through `wrap_process_step_tokens`
/// and whose second element is always empty.
/// NOTE(review): the second slot presumably mirrors the logging tokens returned by the task
/// generator; confirm against the caller.
///
/// # Panics
/// Panics if the step has no output pack, if the channel's message type is not one of the
/// output pack's message types, or if the channel or the bridge has no monitor index.
fn generate_bridge_rx_execution_tokens(
    step: &CuExecutionStep,
    bridge_spec: &BridgeSpec,
    channel_index: usize,
    ctx: StepGenerationContext<'_>,
    bridge_setup: proc_macro2::TokenStream,
) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
    let StepGenerationContext {
        output_pack_sizes: _,
        sim_mode,
        mission_mod,
        lifecycle_placement,
        wrap_process_step,
    } = ctx;
    let rt_guard = rtsan_guard_tokens();
    let abort_process_step = abort_process_step_tokens(wrap_process_step);
    let channel = &bridge_spec.rx_channels[channel_index];
    let output_pack = step
        .output_msg_pack
        .as_ref()
        .expect("Bridge Rx channel missing output pack");
    // The lookup doubles as a validation: it panics even when the pack has a single slot.
    let port_index = output_pack
        .msg_types
        .iter()
        .position(|msg| msg == &channel.msg_type_name)
        .unwrap_or_else(|| {
            panic!(
                "Bridge Rx channel '{}' missing output port for '{}'",
                channel.id, channel.msg_type_name
            )
        });
    let culist_index_ts = int2sliceindex(output_pack.culist_index);
    // A single-message pack is addressed directly; otherwise the port selects a tuple member.
    let output_ref = if output_pack.msg_types.len() == 1 {
        quote! { &mut msgs.#culist_index_ts }
    } else {
        let port_index = syn::Index::from(port_index);
        quote! { &mut msgs.#culist_index_ts.#port_index }
    };
    let monitor_index = syn::Index::from(
        channel
            .monitor_index
            .expect("Bridge Rx channel missing monitor index"),
    );
    let bridge_type = runtime_bridge_type_for_spec(bridge_spec, sim_mode);
    let (parallel_bridge_preprocess, parallel_bridge_postprocess) =
        parallel_bridge_lifecycle_tokens(
            &bridge_type,
            bridge_spec
                .monitor_index
                .expect("Bridge missing monitor index for lifecycle"),
            mission_mod,
            lifecycle_placement,
        );
    let const_ident = &channel.const_ident;
    // Name of the `SimStep` variant for this channel: "<bridge id>_rx_<channel id>".
    let enum_ident = Ident::new(
        &config_id_to_enum(&format!("{}_rx_{}", bridge_spec.id, channel.id)),
        Span::call_site(),
    );

    // In sim mode the generated code asks `sim_callback` first: an `Errored` override goes
    // through the monitor decision, any other override runs the bridge only when it equals
    // `ExecuteByRuntime`. Outside sim mode the step always runs.
    let call_sim_callback = if sim_mode {
        quote! {
            let doit = {
                let state = SimStep::#enum_ident {
                    channel: &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
                    msg: cumsg_output,
                };
                let ovr = sim_callback(state);
                if let SimOverride::Errored(reason) = ovr {
                    let error: CuError = reason.into();
                    let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Process, &error);
                    match decision {
                        Decision::Abort => {
                            debug!("Process: ABORT decision from monitoring. Component '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)), clid);
                            #abort_process_step
                        }
                        Decision::Ignore => {
                            debug!("Process: IGNORE decision from monitoring. Component '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
                            cumsg_output.clear_payload();
                            false
                        }
                        Decision::Shutdown => {
                            debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
                            return Err(CuError::new_with_cause("Component errored out during process.", error));
                        }
                    }
                } else {
                    ovr == SimOverride::ExecuteByRuntime
                }
            };
        }
    } else {
        quote! { let doit = true; }
    };
    (
        wrap_process_step_tokens(
            wrap_process_step,
            quote! {
                #bridge_setup
                #parallel_bridge_preprocess
                let cumsg_output = #output_ref;
                #call_sim_callback
                if doit {
                    execution_probe.record(cu29::monitoring::ExecutionMarker {
                        component_id: cu29::monitoring::ComponentId::new(#monitor_index),
                        step: CuComponentState::Process,
                        culistid: Some(clid),
                    });
                    // The start/end timestamps bracket only the `receive` call.
                    cumsg_output.metadata.process_time.start = cu29::curuntime::perf_now(clock).into();
                    let maybe_error = {
                        #rt_guard
                        ctx.clear_current_task();
                        bridge.receive(
                            &ctx,
                            &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
                            cumsg_output,
                        )
                    };
                    cumsg_output.metadata.process_time.end = cu29::curuntime::perf_now(clock).into();
                    if let Err(error) = maybe_error {
                        let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Process, &error);
                        match decision {
                            Decision::Abort => {
                                debug!("Process: ABORT decision from monitoring. Component '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)), clid);
                                #abort_process_step
                            }
                            Decision::Ignore => {
                                debug!("Process: IGNORE decision from monitoring. Component '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
                                cumsg_output.clear_payload();
                            }
                            Decision::Shutdown => {
                                debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
                                return Err(CuError::new_with_cause("Component errored out during process.", error));
                            }
                        }
                    }
                }
                #parallel_bridge_postprocess
            },
        ),
        quote! {},
    )
}
7199
/// Builds the generated code for the process step of one bridge Tx channel.
///
/// The emitted step binds `cumsg_input` to the message feeding the channel and `cumsg_output`
/// to the channel's own slot in the copper list, optionally consults the simulation callback
/// and, when the step has to run, calls `bridge.send` with the input message and routes any
/// error through `monitor.process_error`.
///
/// # Parameters
/// - `step`: execution step of the channel; its first input locates the message to send and
///   its `output_msg_pack` locates the channel's own slot.
/// - `bridge_spec`: bridge that owns the channel.
/// - `channel_index`: index into `bridge_spec.tx_channels`.
/// - `ctx`: shared generation settings; `output_pack_sizes` tells whether the input slot
///   holds several messages and therefore needs a port index.
/// - `bridge_setup`: tokens emitted first in the step.
///   NOTE(review): presumably these bind the `bridge` variable used by the generated code;
///   confirm against the caller.
///
/// # Returns
/// A pair whose first element is the step body passed through `wrap_process_step_tokens`
/// and whose second element is always empty.
///
/// # Panics
/// Panics if the channel or the bridge has no monitor index, if the step has no input, if
/// `output_pack_sizes` has no entry for the input's copper-list index, if the step has no
/// output pack, or if that output pack does not hold exactly one message type.
fn generate_bridge_tx_execution_tokens(
    step: &CuExecutionStep,
    bridge_spec: &BridgeSpec,
    channel_index: usize,
    ctx: StepGenerationContext<'_>,
    bridge_setup: proc_macro2::TokenStream,
) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
    let StepGenerationContext {
        output_pack_sizes,
        sim_mode,
        mission_mod,
        lifecycle_placement,
        wrap_process_step,
    } = ctx;
    let rt_guard = rtsan_guard_tokens();
    let abort_process_step = abort_process_step_tokens(wrap_process_step);
    let channel = &bridge_spec.tx_channels[channel_index];
    let monitor_index = syn::Index::from(
        channel
            .monitor_index
            .expect("Bridge Tx channel missing monitor index"),
    );
    // Only the first input is used; the presence of further inputs is not checked here.
    let input = step
        .input_msg_indices_types
        .first()
        .expect("Bridge Tx channel should have exactly one input");
    let input_index = int2sliceindex(input.culist_index);
    let output_size = output_pack_sizes
        .get(input.culist_index as usize)
        .copied()
        .unwrap_or_else(|| {
            panic!(
                "Missing output pack size for culist index {}",
                input.culist_index
            )
        });
    // A multi-message source slot is a tuple: pick the member given by the source port.
    let input_ref = if output_size > 1 {
        let port_index = syn::Index::from(input.src_port);
        quote! { &mut msgs.#input_index.#port_index }
    } else {
        quote! { &mut msgs.#input_index }
    };
    let output_pack = step
        .output_msg_pack
        .as_ref()
        .expect("Bridge Tx channel missing output pack");
    if output_pack.msg_types.len() != 1 {
        panic!(
            "Bridge Tx channel '{}' expected a single output message slot, got {}",
            channel.id,
            output_pack.msg_types.len()
        );
    }
    let output_index = int2sliceindex(output_pack.culist_index);
    let output_ref = quote! { &mut msgs.#output_index };
    let bridge_type = runtime_bridge_type_for_spec(bridge_spec, sim_mode);
    let (parallel_bridge_preprocess, parallel_bridge_postprocess) =
        parallel_bridge_lifecycle_tokens(
            &bridge_type,
            bridge_spec
                .monitor_index
                .expect("Bridge missing monitor index for lifecycle"),
            mission_mod,
            lifecycle_placement,
        );
    let const_ident = &channel.const_ident;
    // Name of the `SimStep` variant for this channel: "<bridge id>_tx_<channel id>".
    let enum_ident = Ident::new(
        &config_id_to_enum(&format!("{}_tx_{}", bridge_spec.id, channel.id)),
        Span::call_site(),
    );

    // In sim mode the generated code asks `sim_callback` first: an `Errored` override goes
    // through the monitor decision, any other override runs the bridge only when it equals
    // `ExecuteByRuntime`. Outside sim mode the step always runs.
    let call_sim_callback = if sim_mode {
        quote! {
            let doit = {
                let state = SimStep::#enum_ident {
                    channel: &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
                    msg: &*cumsg_input,
                    output: cumsg_output,
                };
                let ovr = sim_callback(state);
                if let SimOverride::Errored(reason) = ovr {
                    let error: CuError = reason.into();
                    let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Process, &error);
                    match decision {
                        Decision::Abort => {
                            debug!("Process: ABORT decision from monitoring. Component '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)), clid);
                            #abort_process_step
                        }
                        Decision::Ignore => {
                            debug!("Process: IGNORE decision from monitoring. Component '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
                            false
                        }
                        Decision::Shutdown => {
                            debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
                            return Err(CuError::new_with_cause("Component errored out during process.", error));
                        }
                    }
                } else {
                    ovr == SimOverride::ExecuteByRuntime
                }
            };
        }
    } else {
        quote! { let doit = true; }
    };
    (
        wrap_process_step_tokens(
            wrap_process_step,
            quote! {
                #bridge_setup
                #parallel_bridge_preprocess
                let cumsg_input = #input_ref;
                let cumsg_output = #output_ref;
                #call_sim_callback
                if doit {
                    execution_probe.record(cu29::monitoring::ExecutionMarker {
                        component_id: cu29::monitoring::ComponentId::new(#monitor_index),
                        step: CuComponentState::Process,
                        culistid: Some(clid),
                    });
                    cumsg_output.metadata.process_time.start = cu29::curuntime::perf_now(clock).into();
                    let maybe_error = {
                        #rt_guard
                        ctx.clear_current_task();
                        bridge.send(
                            &ctx,
                            &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
                            &*cumsg_input,
                        )
                    };
                    if let Err(error) = maybe_error {
                        let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#monitor_index), CuComponentState::Process, &error);
                        match decision {
                            Decision::Abort => {
                                debug!("Process: ABORT decision from monitoring. Component '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)), clid);
                                #abort_process_step
                            }
                            Decision::Ignore => {
                                debug!("Process: IGNORE decision from monitoring. Component '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
                            }
                            Decision::Shutdown => {
                                debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#monitor_index)));
                                return Err(CuError::new_with_cause("Component errored out during process.", error));
                            }
                        }
                    }
                    // NOTE(review): the end timestamp is taken after error handling, so it
                    // includes the monitor call and is not written when the Shutdown arm
                    // returns; the Rx generator stamps it right after the bridge call.
                    // Confirm the asymmetry is intended.
                    cumsg_output.metadata.process_time.end = cu29::curuntime::perf_now(clock).into();
                }
                #parallel_bridge_postprocess
            },
        ),
        quote! {},
    )
}
7354
/// Direction of a bridge channel; part of [`BridgeChannelKey`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum BridgeChannelDirection {
    /// Receive side of the bridge.
    Rx,
    /// Transmit side of the bridge.
    Tx,
}
7360
/// Hashable identity of a bridge channel: bridge id, channel id and direction.
// NOTE(review): presumably used as a map/set key when collecting channel usage; the use
// sites are outside this part of the file.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BridgeChannelKey {
    /// Identifier of the bridge the channel belongs to.
    bridge_id: String,
    /// Identifier of the channel within the bridge.
    channel_id: String,
    /// Whether the channel is an Rx or a Tx channel.
    direction: BridgeChannelDirection,
}
7367
/// Description of one bridge channel as consumed by the step generators.
#[derive(Clone)]
struct BridgeChannelSpec {
    /// Channel identifier; combined with the bridge id to name the channel's `SimStep`
    /// variant and quoted in generator panic messages.
    id: String,
    /// Name of the associated constant that selects this channel on the bridge's `Rx`/`Tx`
    /// channel type in the generated code.
    const_ident: Ident,
    /// Parsed message type of the channel (currently allowed to be unused).
    #[allow(dead_code)]
    msg_type: Type,
    /// Textual message type; the Rx generator matches it against the output pack's
    /// `msg_types` to find the output port.
    msg_type_name: String,
    /// NOTE(review): presumably the channel's position in the bridge configuration; confirm.
    config_index: usize,
    /// NOTE(review): presumably the channel's node id in the execution plan once it has
    /// been planned; confirm.
    plan_node_id: Option<NodeId>,
    /// NOTE(review): presumably the channel's slot in the copper list once assigned; confirm.
    culist_index: Option<usize>,
    /// Monitoring component index; the step generators require it to be set and emit it
    /// into `ComponentId::new(..)`.
    monitor_index: Option<usize>,
}
7380
/// Description of one bridge and its channels as consumed by the code generators.
#[derive(Clone)]
struct BridgeSpec {
    /// Bridge identifier; prefixes the `SimStep` variant names of its channels.
    id: String,
    /// User-declared bridge type; used as the runtime type unless the bridge is replaced
    /// by a simulated bridge.
    type_path: Type,
    /// When `false`, the bridge is replaced by `cu29::simulation::CuSimBridge` in sim mode.
    run_in_sim: bool,
    /// NOTE(review): presumably the bridge's position in the configuration; confirm.
    config_index: usize,
    /// Index embedded in the names of the generated sim channel-set types.
    /// NOTE(review): presumably also the bridge's position in the bridges tuple; confirm.
    tuple_index: usize,
    /// Monitoring component index of the bridge itself; required by the step generators
    /// for the lifecycle tokens.
    monitor_index: Option<usize>,
    /// Receive channels, indexed by the `channel_index` given to the Rx generator.
    rx_channels: Vec<BridgeChannelSpec>,
    /// Transmit channels, indexed by the `channel_index` given to the Tx generator.
    tx_channels: Vec<BridgeChannelSpec>,
}
7392
/// Per-step flags produced by `build_parallel_lifecycle_placements`.
///
/// Both flags default to `false`.
#[derive(Clone, Copy, Debug, Default)]
struct ParallelLifecyclePlacement {
    /// Set on the first step of the plan that belongs to a given task or bridge.
    preprocess: bool,
    /// Set on the last step of the plan that belongs to a given task or bridge.
    postprocess: bool,
}
7398
/// Identity of the component a plan step belongs to, used to find the first and last step
/// of each component.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum ParallelLifecycleKey {
    /// A task, identified by its task index.
    Task(usize),
    /// A bridge, identified by its bridge index; all Rx and Tx channels of one bridge share
    /// the same key.
    Bridge(usize),
}
7404
7405fn build_parallel_lifecycle_placements(
7406 culist_plan: &CuExecutionLoop,
7407 culist_exec_entities: &[ExecutionEntity],
7408) -> Vec<ParallelLifecyclePlacement> {
7409 let step_keys: Vec<Option<ParallelLifecycleKey>> = culist_plan
7410 .steps
7411 .iter()
7412 .map(|unit| match unit {
7413 CuExecutionUnit::Step(step) => {
7414 match &culist_exec_entities[step.node_id as usize].kind {
7415 ExecutionEntityKind::Task { task_index } => {
7416 Some(ParallelLifecycleKey::Task(*task_index))
7417 }
7418 ExecutionEntityKind::BridgeRx { bridge_index, .. }
7419 | ExecutionEntityKind::BridgeTx { bridge_index, .. } => {
7420 Some(ParallelLifecycleKey::Bridge(*bridge_index))
7421 }
7422 }
7423 }
7424 CuExecutionUnit::Loop(_) => None,
7425 })
7426 .collect();
7427
7428 let mut placements = vec![ParallelLifecyclePlacement::default(); step_keys.len()];
7429 let mut seen_forward = std::collections::HashSet::new();
7430 for (index, key) in step_keys.iter().enumerate() {
7431 let Some(key) = key else {
7432 continue;
7433 };
7434 if seen_forward.insert(*key) {
7435 placements[index].preprocess = true;
7436 }
7437 }
7438
7439 let mut seen_reverse = std::collections::HashSet::new();
7440 for (index, key) in step_keys.iter().enumerate().rev() {
7441 let Some(key) = key else {
7442 continue;
7443 };
7444 if seen_reverse.insert(*key) {
7445 placements[index].postprocess = true;
7446 }
7447 }
7448
7449 placements
7450}
7451
7452fn sim_bridge_channel_set_idents(bridge_tuple_index: usize) -> (Ident, Ident, Ident, Ident) {
7453 (
7454 format_ident!("__CuSimBridge{}TxChannels", bridge_tuple_index),
7455 format_ident!("__CuSimBridge{}TxId", bridge_tuple_index),
7456 format_ident!("__CuSimBridge{}RxChannels", bridge_tuple_index),
7457 format_ident!("__CuSimBridge{}RxId", bridge_tuple_index),
7458 )
7459}
7460
7461fn runtime_bridge_type_for_spec(bridge_spec: &BridgeSpec, sim_mode: bool) -> Type {
7462 if sim_mode && !bridge_spec.run_in_sim {
7463 let (tx_set_ident, _tx_id_ident, rx_set_ident, _rx_id_ident) =
7464 sim_bridge_channel_set_idents(bridge_spec.tuple_index);
7465 let tx_type: Type = if bridge_spec.tx_channels.is_empty() {
7466 parse_quote!(cu29::simulation::CuNoBridgeChannels)
7467 } else {
7468 parse_quote!(#tx_set_ident)
7469 };
7470 let rx_type: Type = if bridge_spec.rx_channels.is_empty() {
7471 parse_quote!(cu29::simulation::CuNoBridgeChannels)
7472 } else {
7473 parse_quote!(#rx_set_ident)
7474 };
7475 parse_quote!(cu29::simulation::CuSimBridge<#tx_type, #rx_type>)
7476 } else {
7477 bridge_spec.type_path.clone()
7478 }
7479}
7480
/// One entry of the execution-entity table that plan steps index by `node_id`.
#[derive(Clone)]
struct ExecutionEntity {
    /// What the entity is: a task or one channel of a bridge.
    kind: ExecutionEntityKind,
}
7485
/// Kind of component behind an execution-plan step.
#[derive(Clone)]
enum ExecutionEntityKind {
    /// A regular task.
    Task {
        /// Index of the task.
        // NOTE(review): presumably an index into the task spec set; confirm.
        task_index: usize,
    },
    /// A receive channel of a bridge.
    BridgeRx {
        /// Index of the owning bridge.
        bridge_index: usize,
        /// Index of the channel.
        // NOTE(review): presumably an index into the bridge's `rx_channels`; confirm.
        channel_index: usize,
    },
    /// A transmit channel of a bridge.
    BridgeTx {
        /// Index of the owning bridge.
        bridge_index: usize,
        /// Index of the channel.
        // NOTE(review): presumably an index into the bridge's `tx_channels`; confirm.
        channel_index: usize,
    },
}
7500
7501#[cfg(test)]
7502mod tests {
7503 use std::fs;
7504 use std::path::{Path, PathBuf};
7505
/// Returns a path under the system temp directory named `cu29_derive_<name>_<nanos>`,
/// where `<nanos>` is the current time in nanoseconds since the Unix epoch.
///
/// The directory itself is not created.
fn unique_test_dir(name: &str) -> PathBuf {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before unix epoch");
    let nanos = since_epoch.as_nanos();

    let mut dir = std::env::temp_dir();
    dir.push(format!("cu29_derive_{name}_{nanos}"));
    dir
}
7513
/// Writes `content` to `path`, creating missing parent directories first.
///
/// Panics if the directories cannot be created or the file cannot be written.
fn write_file(path: &Path, content: &str) {
    // A path without a parent component needs no directory preparation.
    let prepared = match path.parent() {
        Some(parent) => fs::create_dir_all(parent),
        None => Ok(()),
    };
    prepared.expect("create parent dirs");
    fs::write(path, content).expect("write file");
}
7520
/// Runs the `trybuild` compile-fail and compile-pass suites.
///
/// Before the suites run, the expected `<case>.stderr` of every compile-fail case is
/// refreshed from its toolchain-specific variant (`<case>.stderr.beta` on the beta channel,
/// `<case>.stderr.stable` on every other channel) whenever that variant exists.
#[test]
fn test_compile_fail() {
    use rustc_version::{Channel, version_meta};
    use std::{
        env, fs,
        path::{Path, PathBuf},
    };

    let log_index_dir = env::temp_dir()
        .join("cu29_derive_trybuild_log_index")
        .join("a")
        .join("b")
        .join("c");
    fs::create_dir_all(&log_index_dir).unwrap();
    // SAFETY: `set_var` is only sound while no other thread reads or writes the process
    // environment. NOTE(review): the test harness runs tests on several threads, so this
    // relies on no other test touching the environment at the same time; confirm.
    unsafe {
        env::set_var("LOG_INDEX_DIR", &log_index_dir);
    }

    // Querying the toolchain spawns `rustc`; the answer cannot change during the test, so
    // ask once instead of once per source file.
    let on_beta = matches!(
        version_meta().expect("query rustc version").channel,
        Channel::Beta
    );

    let dir = Path::new("tests/compile_fail");
    for entry in fs::read_dir(dir).unwrap() {
        let entry = entry.unwrap();
        if !entry.file_type().unwrap().is_dir() {
            continue;
        }
        for file in fs::read_dir(entry.path()).unwrap() {
            let file = file.unwrap();
            let p = file.path();
            if p.extension().and_then(|x| x.to_str()) != Some("rs") {
                continue;
            }

            let base = p.with_extension("stderr");
            let src = if on_beta {
                PathBuf::from(format!("{}.beta", base.display()))
            } else {
                PathBuf::from(format!("{}.stable", base.display()))
            };

            if src.exists() {
                fs::copy(src, &base).unwrap();
            }
        }
    }

    let t = trybuild::TestCases::new();
    t.compile_fail("tests/compile_fail/*/*.rs");
    t.pass("tests/compile_pass/*/*.rs");
}
7566
/// The output pack of a multi-output source must list its message types in declaration
/// order (`i32`, then `bool`) for the
/// `multi_output_source_non_first_connected_valid.ron` configuration.
#[test]
fn runtime_plan_keeps_nc_order_for_non_first_connected_output() {
    use super::*;
    use cu29::config::CuConfig;
    use cu29::curuntime::{CuExecutionUnit, compute_runtime_plan};

    let config: CuConfig =
        read_config("tests/config/multi_output_source_non_first_connected_valid.ron")
            .expect("failed to read test config");
    let graph = config.get_graph(None).expect("missing graph");
    let src_id = graph.get_node_id_by_name("src").expect("missing src node");
    let runtime = compute_runtime_plan(graph).expect("runtime plan failed");

    // Keep only plain steps, then pick the one that belongs to the source node.
    let src_step = runtime
        .steps
        .iter()
        .filter_map(|unit| match unit {
            CuExecutionUnit::Step(step) => Some(step),
            CuExecutionUnit::Loop(_) => None,
        })
        .find(|step| step.node_id == src_id)
        .expect("missing source step");

    let src_pack = src_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(src_pack.msg_types, vec!["i32", "bool"]);
}
7594
/// Flattening the slot origins must repeat a task id once per message of its output pack:
/// the two-output `src` appears twice, followed by `sink`.
#[test]
fn matching_task_ids_are_flattened_per_output_message() {
    use super::*;
    use cu29::config::CuConfig;

    let config: CuConfig =
        read_config("tests/config/multi_output_source_non_first_connected_valid.ron")
            .expect("failed to read test config");
    let graph = config.get_graph(None).expect("missing graph");

    // Build the plan inputs in the same order as the macro does: the bridge specs are
    // mutated by both the plan construction and the metadata collection.
    let task_specs = CuTaskSpecSet::from_graph(graph);
    let channel_usage = collect_bridge_channel_usage(graph);
    let mut bridge_specs = build_bridge_specs(&config, graph, &channel_usage);
    let (runtime_plan, exec_entities, plan_to_original) =
        build_execution_plan(graph, &task_specs, &mut bridge_specs)
            .expect("runtime plan failed");
    let output_packs = extract_output_packs(&runtime_plan);
    let task_names = collect_task_names(graph);
    let (_, node_output_positions) = collect_culist_metadata(
        &runtime_plan,
        &exec_entities,
        &mut bridge_specs,
        &plan_to_original,
    );

    // Record, for every output slot, the id of the task that produces it.
    let mut slot_origin_ids: Vec<Option<String>> = vec![None; output_packs.len()];
    for (node_id, task_id, _) in task_names {
        match node_output_positions.get(&node_id) {
            Some(&slot) => slot_origin_ids[slot] = Some(task_id),
            None => panic!("Task {task_id} (node id: {node_id}) not found"),
        }
    }

    let flattened_ids = flatten_slot_origin_ids(&output_packs, slot_origin_ids);
    let expected_ids = vec!["src".to_string(), "src".to_string(), "sink".to_string()];
    assert_eq!(flattened_ids, expected_ids);
}
7637
/// A resource binding declared on a bridge node (`serial` -> `fc.serial0`) must be
/// collected as a single spec owned by bridge 0 and resolved against bundle `fc`.
#[test]
fn bridge_resources_are_collected() {
    use super::*;
    use cu29::config::{CuGraph, Flavor, Node};
    use std::collections::HashMap;
    use syn::parse_str;

    // Bridge node carrying one resource binding.
    let bindings = HashMap::from([("serial".to_string(), "fc.serial0".to_string())]);
    let mut radio = Node::new_with_flavor("radio", "bridge::Dummy", Flavor::Bridge);
    radio.set_resources(Some(bindings));

    let mut graph = CuGraph::default();
    graph.add_node(radio).expect("bridge node");

    let task_specs = CuTaskSpecSet::from_graph(&graph);
    let radio_spec = BridgeSpec {
        id: "radio".to_string(),
        type_path: parse_str("bridge::Dummy").unwrap(),
        run_in_sim: true,
        config_index: 0,
        tuple_index: 0,
        monitor_index: None,
        rx_channels: Vec::new(),
        tx_channels: Vec::new(),
    };

    // Configuration declaring the bundle the binding refers to.
    let mut config = cu29::config::CuConfig::default();
    config.resources.push(ResourceBundleConfig {
        id: "fc".to_string(),
        provider: "board::Bundle".to_string(),
        config: None,
        missions: None,
    });
    let bundle_specs = build_bundle_specs(&config, "default").expect("bundle specs");

    let specs = collect_resource_specs(&graph, &task_specs, &[radio_spec], &bundle_specs)
        .expect("collect specs");
    assert_eq!(specs.len(), 1);

    let collected = &specs[0];
    assert!(matches!(collected.owner, ResourceOwner::Bridge(0)));
    assert_eq!(collected.binding_name, "serial");
    assert_eq!(collected.bundle_index, 0);
    assert_eq!(collected.resource_name, "serial0");
}
7680
/// Parsing macro arguments must pick up the config path, the subsystem id and both
/// bare flags.
#[test]
fn copper_runtime_args_parse_subsystem_mode() {
    use super::*;
    use quote::quote;

    let macro_args = quote!(
        config = "multi_copper.ron",
        subsystem = "ping",
        sim_mode,
        ignore_resources
    );
    let args = CopperRuntimeArgs::parse_tokens(macro_args).expect("parse runtime args");

    assert_eq!(args.config_path, "multi_copper.ron");
    assert_eq!(args.subsystem_id.as_deref(), Some("ping"));
    assert!(args.sim_mode);
    assert!(args.ignore_resources);
}
7699
/// Resolving a multi-subsystem config for subsystem `beta` must load `beta.ron` as the
/// local config and report subsystem code 1.
///
/// The fixture files are written to a unique temp directory that is removed again once
/// all assertions have passed.
#[test]
fn resolve_runtime_config_from_multi_config_selects_local_subsystem() {
    use super::*;

    let root = unique_test_dir("multi_runtime_resolve");
    let alpha_config = root.join("alpha.ron");
    let beta_config = root.join("beta.ron");
    let network_config = root.join("multi.ron");

    write_file(
        &alpha_config,
        r#"
(
 tasks: [
 (id: "src", type: "AlphaSource", run_in_sim: true),
 (id: "sink", type: "AlphaSink", run_in_sim: true),
 ],
 cnx: [
 (src: "src", dst: "sink", msg: "u32"),
 ],
)
"#,
    );
    write_file(
        &beta_config,
        r#"
(
 tasks: [
 (id: "src", type: "BetaSource", run_in_sim: true),
 (id: "sink", type: "BetaSink", run_in_sim: true),
 ],
 cnx: [
 (src: "src", dst: "sink", msg: "u64"),
 ],
)
"#,
    );
    write_file(
        &network_config,
        r#"
(
 subsystems: [
 (id: "beta", config: "beta.ron"),
 (id: "alpha", config: "alpha.ron"),
 ],
 interconnects: [],
)
"#,
    );

    let args = CopperRuntimeArgs {
        config_path: "multi.ron".to_string(),
        subsystem_id: Some("beta".to_string()),
        sim_mode: false,
        ignore_resources: false,
    };

    let resolved =
        resolve_runtime_config_with_root(&args, &root).expect("resolve multi runtime config");

    assert_eq!(resolved.subsystem_id.as_deref(), Some("beta"));
    assert_eq!(resolved.subsystem_code, 1);
    let graph = resolved
        .local_config
        .get_graph(None)
        .expect("resolved local config graph");
    assert!(graph.get_node_id_by_name("src").is_some());
    assert!(resolved.bundled_local_config_content.contains("BetaSource"));

    // Best-effort cleanup so repeated runs do not pile up temp directories. A failed
    // removal must not fail the test; on an assertion failure the files are left in
    // place for inspection.
    let _ = std::fs::remove_dir_all(&root);
}
7769
/// Resolving a multi-subsystem config for a subsystem id that is not declared must fail
/// with an error that names the missing subsystem.
///
/// The fixture files are written to a unique temp directory that is removed again once
/// the assertion has passed.
#[test]
fn resolve_runtime_config_rejects_missing_subsystem() {
    use super::*;

    let root = unique_test_dir("multi_runtime_missing_subsystem");
    let alpha_config = root.join("alpha.ron");
    let network_config = root.join("multi.ron");

    write_file(
        &alpha_config,
        r#"
(
 tasks: [
 (id: "src", type: "AlphaSource", run_in_sim: true),
 (id: "sink", type: "AlphaSink", run_in_sim: true),
 ],
 cnx: [
 (src: "src", dst: "sink", msg: "u32"),
 ],
)
"#,
    );
    write_file(
        &network_config,
        r#"
(
 subsystems: [
 (id: "alpha", config: "alpha.ron"),
 ],
 interconnects: [],
)
"#,
    );

    let args = CopperRuntimeArgs {
        config_path: "multi.ron".to_string(),
        subsystem_id: Some("missing".to_string()),
        sim_mode: false,
        ignore_resources: false,
    };

    let err = resolve_runtime_config_with_root(&args, &root).expect_err("missing subsystem");
    assert!(err.to_string().contains("Subsystem 'missing'"));

    // Best-effort cleanup so repeated runs do not pile up temp directories. A failed
    // removal must not fail the test; on an assertion failure the files are left in
    // place for inspection.
    let _ = std::fs::remove_dir_all(&root);
}
7814}