1use proc_macro::TokenStream;
2use quote::{format_ident, quote};
3use std::collections::{BTreeMap, HashMap};
4use std::fs::read_to_string;
5use syn::Fields::{Named, Unnamed};
6use syn::meta::parser;
7use syn::{
8 Field, Fields, ItemImpl, ItemStruct, LitStr, Type, TypeTuple, parse_macro_input, parse_quote,
9 parse_str,
10};
11
12#[cfg(feature = "macro_debug")]
13use crate::format::rustfmt_generated_code;
14use crate::utils::{config_id_to_bridge_const, config_id_to_enum, config_id_to_struct_member};
15use cu29_runtime::config::CuConfig;
16use cu29_runtime::config::{
17 BridgeChannelConfigRepresentation, CuGraph, Flavor, Node, NodeId, ResourceBundleConfig,
18 read_configuration,
19};
20use cu29_runtime::curuntime::{
21 CuExecutionLoop, CuExecutionStep, CuExecutionUnit, CuTaskType, compute_runtime_plan,
22 find_task_type_for_id,
23};
24use cu29_traits::{CuError, CuResult};
25use proc_macro2::{Ident, Span};
26
27mod bundle_resources;
28mod format;
29mod resources;
30mod utils;
31
32const DEFAULT_CLNB: usize = 2; #[inline]
35fn int2sliceindex(i: u32) -> syn::Index {
36 syn::Index::from(i as usize)
37}
38
39#[inline(always)]
40fn return_error(msg: String) -> TokenStream {
41 syn::Error::new(Span::call_site(), msg)
42 .to_compile_error()
43 .into()
44}
45
46fn rtsan_guard_tokens() -> proc_macro2::TokenStream {
47 if cfg!(feature = "rtsan") {
48 quote! {
49 let _rt_guard = ::cu29::rtsan::ScopedSanitizeRealtime::default();
50 }
51 } else {
52 quote! {}
53 }
54}
55
56#[proc_macro]
57pub fn resources(input: TokenStream) -> TokenStream {
58 resources::resources(input)
59}
60
61#[proc_macro]
62pub fn bundle_resources(input: TokenStream) -> TokenStream {
63 bundle_resources::bundle_resources(input)
64}
65
66#[proc_macro]
70pub fn gen_cumsgs(config_path_lit: TokenStream) -> TokenStream {
71 #[cfg(feature = "std")]
72 let std = true;
73
74 #[cfg(not(feature = "std"))]
75 let std = false;
76
77 let config = parse_macro_input!(config_path_lit as LitStr).value();
78 if !std::path::Path::new(&config_full_path(&config)).exists() {
79 return return_error(format!(
80 "The configuration file `{config}` does not exist. Please provide a valid path."
81 ));
82 }
83 #[cfg(feature = "macro_debug")]
84 eprintln!("[gen culist support with {config:?}]");
85 let cuconfig = match read_config(&config) {
86 Ok(cuconfig) => cuconfig,
87 Err(e) => return return_error(e.to_string()),
88 };
89 let graph = cuconfig
90 .get_graph(None) .expect("Could not find the specified mission for gen_cumsgs");
92 let task_specs = CuTaskSpecSet::from_graph(graph);
93 let channel_usage = collect_bridge_channel_usage(graph);
94 let mut bridge_specs = build_bridge_specs(&cuconfig, graph, &channel_usage);
95 let (culist_plan, exec_entities, plan_to_original) =
96 match build_execution_plan(graph, &task_specs, &mut bridge_specs) {
97 Ok(plan) => plan,
98 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
99 };
100 let task_member_names = collect_task_member_names(graph);
101 let (culist_order, node_output_positions) = collect_culist_metadata(
102 &culist_plan,
103 &exec_entities,
104 &mut bridge_specs,
105 &plan_to_original,
106 );
107
108 #[cfg(feature = "macro_debug")]
109 eprintln!(
110 "[The CuStampedDataSet matching tasks ids are {:?}]",
111 culist_order
112 );
113
114 let support = gen_culist_support(
115 &culist_plan,
116 &culist_order,
117 &node_output_positions,
118 &task_member_names,
119 );
120
121 let extra_imports = if !std {
122 quote! {
123 use core::fmt::Debug;
124 use core::fmt::Formatter;
125 use core::fmt::Result as FmtResult;
126 use alloc::vec;
127 }
128 } else {
129 quote! {
130 use std::fmt::Debug;
131 use std::fmt::Formatter;
132 use std::fmt::Result as FmtResult;
133 }
134 };
135
136 let with_uses = quote! {
137 mod cumsgs {
138 use cu29::bincode::Encode;
139 use cu29::bincode::enc::Encoder;
140 use cu29::bincode::error::EncodeError;
141 use cu29::bincode::Decode;
142 use cu29::bincode::de::Decoder;
143 use cu29::bincode::error::DecodeError;
144 use cu29::copperlist::CopperList;
145 use cu29::prelude::ErasedCuStampedData;
146 use cu29::prelude::ErasedCuStampedDataSet;
147 use cu29::prelude::MatchingTasks;
148 use cu29::prelude::Serialize;
149 use cu29::prelude::CuMsg;
150 use cu29::prelude::CuMsgMetadata;
151 use cu29::prelude::CuListZeroedInit;
152 use cu29::prelude::CuCompactString;
153 #extra_imports
154 #support
155 }
156 use cumsgs::CuStampedDataSet;
157 type CuMsgs=CuStampedDataSet;
158 };
159 with_uses.into()
160}
161
162fn gen_culist_support(
164 runtime_plan: &CuExecutionLoop,
165 culist_indices_in_plan_order: &[usize],
166 node_output_positions: &HashMap<NodeId, usize>,
167 task_member_names: &[(NodeId, String)],
168) -> proc_macro2::TokenStream {
169 #[cfg(feature = "macro_debug")]
170 eprintln!("[Extract msgs types]");
171 let output_packs = extract_output_packs(runtime_plan);
172 let slot_types: Vec<Type> = output_packs.iter().map(|pack| pack.slot_type()).collect();
173
174 let culist_size = output_packs.len();
175
176 #[cfg(feature = "macro_debug")]
177 eprintln!("[build the copperlist struct]");
178 let msgs_types_tuple: TypeTuple = build_culist_tuple(&slot_types);
179
180 #[cfg(feature = "macro_debug")]
181 eprintln!("[build the copperlist tuple bincode support]");
182 let msgs_types_tuple_encode = build_culist_tuple_encode(&slot_types);
183 let msgs_types_tuple_decode = build_culist_tuple_decode(&slot_types);
184
185 #[cfg(feature = "macro_debug")]
186 eprintln!("[build the copperlist tuple debug support]");
187 let msgs_types_tuple_debug = build_culist_tuple_debug(&slot_types);
188
189 #[cfg(feature = "macro_debug")]
190 eprintln!("[build the copperlist tuple serialize support]");
191 let msgs_types_tuple_serialize = build_culist_tuple_serialize(&slot_types);
192
193 #[cfg(feature = "macro_debug")]
194 eprintln!("[build the default tuple support]");
195 let msgs_types_tuple_default = build_culist_tuple_default(&slot_types);
196
197 #[cfg(feature = "macro_debug")]
198 eprintln!("[build erasedcumsgs]");
199
200 let erasedmsg_trait_impl = build_culist_erasedcumsgs(&output_packs);
201
202 let metadata_accessors: Vec<proc_macro2::TokenStream> = culist_indices_in_plan_order
203 .iter()
204 .map(|idx| {
205 let slot_index = syn::Index::from(*idx);
206 let pack = output_packs
207 .get(*idx)
208 .unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
209 if pack.is_multi() {
210 quote! { &culist.msgs.0.#slot_index.0.metadata }
211 } else {
212 quote! { &culist.msgs.0.#slot_index.metadata }
213 }
214 })
215 .collect();
216 let mut zeroed_init_tokens: Vec<proc_macro2::TokenStream> = Vec::new();
217 for idx in culist_indices_in_plan_order {
218 let slot_index = syn::Index::from(*idx);
219 let pack = output_packs
220 .get(*idx)
221 .unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
222 if pack.is_multi() {
223 for port_idx in 0..pack.msg_types.len() {
224 let port_index = syn::Index::from(port_idx);
225 zeroed_init_tokens.push(quote! {
226 self.0.#slot_index.#port_index.metadata.status_txt = CuCompactString::default();
227 });
228 }
229 } else {
230 zeroed_init_tokens.push(quote! {
231 self.0.#slot_index.metadata.status_txt = CuCompactString::default();
232 });
233 }
234 }
235 let collect_metadata_function = quote! {
236 pub fn collect_metadata<'a>(culist: &'a CuList) -> [&'a CuMsgMetadata; #culist_size] {
237 [#( #metadata_accessors, )*]
238 }
239 };
240
241 let task_name_literals: Vec<String> = task_member_names
242 .iter()
243 .map(|(_, name)| name.clone())
244 .collect();
245
246 let mut methods = Vec::new();
247 for (node_id, name) in task_member_names {
248 let output_position = node_output_positions
249 .get(node_id)
250 .unwrap_or_else(|| panic!("Task {name} (id: {node_id}) not found in execution order"));
251 let pack = output_packs
252 .get(*output_position)
253 .unwrap_or_else(|| panic!("Missing output pack for task {name}"));
254 let slot_index = syn::Index::from(*output_position);
255
256 if pack.msg_types.len() == 1 {
257 let fn_name = format_ident!("get_{}_output", name);
258 let payload_type = pack.msg_types.first().unwrap();
259 methods.push(quote! {
260 #[allow(dead_code)]
261 pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
262 &self.0.#slot_index
263 }
264 });
265 } else {
266 let outputs_fn = format_ident!("get_{}_outputs", name);
267 let slot_type = pack.slot_type();
268 for (port_idx, payload_type) in pack.msg_types.iter().enumerate() {
269 let fn_name = format_ident!("get_{}_output_{}", name, port_idx);
270 let port_index = syn::Index::from(port_idx);
271 methods.push(quote! {
272 #[allow(dead_code)]
273 pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
274 &self.0.#slot_index.#port_index
275 }
276 });
277 }
278 methods.push(quote! {
279 #[allow(dead_code)]
280 pub fn #outputs_fn(&self) -> &#slot_type {
281 &self.0.#slot_index
282 }
283 });
284 }
285 }
286
287 quote! {
289 #collect_metadata_function
290
291 pub struct CuStampedDataSet(pub #msgs_types_tuple);
292
293 pub type CuList = CopperList<CuStampedDataSet>;
294
295 impl CuStampedDataSet {
296 #(#methods)*
297
298 #[allow(dead_code)]
299 fn get_tuple(&self) -> &#msgs_types_tuple {
300 &self.0
301 }
302
303 #[allow(dead_code)]
304 fn get_tuple_mut(&mut self) -> &mut #msgs_types_tuple {
305 &mut self.0
306 }
307 }
308
309 impl MatchingTasks for CuStampedDataSet {
310 #[allow(dead_code)]
311 fn get_all_task_ids() -> &'static [&'static str] {
312 &[#(#task_name_literals),*]
313 }
314 }
315
316 #msgs_types_tuple_encode
318 #msgs_types_tuple_decode
319
320 #msgs_types_tuple_debug
322
323 #msgs_types_tuple_serialize
325
326 #msgs_types_tuple_default
328
329 #erasedmsg_trait_impl
331
332 impl CuListZeroedInit for CuStampedDataSet {
333 fn init_zeroed(&mut self) {
334 #(#zeroed_init_tokens)*
335 }
336 }
337 }
338}
339
340fn gen_sim_support(
341 runtime_plan: &CuExecutionLoop,
342 exec_entities: &[ExecutionEntity],
343) -> proc_macro2::TokenStream {
344 #[cfg(feature = "macro_debug")]
345 eprintln!("[Sim: Build SimEnum]");
346 let plan_enum: Vec<proc_macro2::TokenStream> = runtime_plan
347 .steps
348 .iter()
349 .filter_map(|unit| match unit {
350 CuExecutionUnit::Step(step) => {
351 if !matches!(
352 exec_entities[step.node_id as usize].kind,
353 ExecutionEntityKind::Task { .. }
354 ) {
355 return None;
356 }
357 let enum_entry_name = config_id_to_enum(step.node.get_id().as_str());
358 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
359 let inputs: Vec<Type> = step
360 .input_msg_indices_types
361 .iter()
362 .map(|input| {
363 parse_str::<Type>(format!("CuMsg<{}>", input.msg_type).as_str()).unwrap()
364 })
365 .collect();
366 let output: Option<Type> = step.output_msg_pack.as_ref().map(|pack| {
367 let msg_types: Vec<Type> = pack
368 .msg_types
369 .iter()
370 .map(|msg_type| {
371 parse_str::<Type>(msg_type.as_str()).unwrap_or_else(|_| {
372 panic!("Could not transform {msg_type} into a message Rust type.")
373 })
374 })
375 .collect();
376 build_output_slot_type(&msg_types)
377 });
378 let no_output = parse_str::<Type>("CuMsg<()>").unwrap();
379 let output = output.as_ref().unwrap_or(&no_output);
380
381 let inputs_type = if inputs.is_empty() {
382 quote! { () }
383 } else if inputs.len() == 1 {
384 let input = inputs.first().unwrap();
385 quote! { &'a #input }
386 } else {
387 quote! { &'a (#(&'a #inputs),*) }
388 };
389
390 Some(quote! {
391 #enum_ident(CuTaskCallbackState<#inputs_type, &'a mut #output>)
392 })
393 }
394 CuExecutionUnit::Loop(_) => {
395 todo!("Needs to be implemented")
396 }
397 })
398 .collect();
399 let mut variants = plan_enum;
400 variants.push(quote! { __Phantom(core::marker::PhantomData<&'a ()>) });
401 quote! {
402 #[allow(dead_code, unused_lifetimes)]
404 pub enum SimStep<'a> {
405 #(#variants),*
406 }
407 }
408}
409
410#[proc_macro_attribute]
414pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
415 #[cfg(feature = "macro_debug")]
416 eprintln!("[entry]");
417 let mut application_struct = parse_macro_input!(input as ItemStruct);
418
419 let application_name = &application_struct.ident;
420 let builder_name = format_ident!("{}Builder", application_name);
421
422 let mut config_file: Option<LitStr> = None;
423 let mut sim_mode = false;
424
425 #[cfg(feature = "std")]
426 let std = true;
427
428 #[cfg(not(feature = "std"))]
429 let std = false;
430
431 let rt_guard = rtsan_guard_tokens();
432
433 let attribute_config_parser = parser(|meta| {
435 if meta.path.is_ident("config") {
436 config_file = Some(meta.value()?.parse()?);
437 Ok(())
438 } else if meta.path.is_ident("sim_mode") {
439 if meta.input.peek(syn::Token![=]) {
441 meta.input.parse::<syn::Token![=]>()?;
442 let value: syn::LitBool = meta.input.parse()?;
443 sim_mode = value.value();
444 Ok(())
445 } else {
446 sim_mode = true;
448 Ok(())
449 }
450 } else {
451 Err(meta.error("unsupported property"))
452 }
453 });
454
455 #[cfg(feature = "macro_debug")]
456 eprintln!("[parse]");
457 parse_macro_input!(args with attribute_config_parser);
459
460 let config_file = match config_file {
471 Some(file) => file.value(),
472 None => {
473 return return_error(
474 "Expected config file attribute like #[CopperRuntime(config = \"path\")]"
475 .to_string(),
476 );
477 }
478 };
479
480 if !std::path::Path::new(&config_full_path(&config_file)).exists() {
481 return return_error(format!(
482 "The configuration file `{config_file}` does not exist. Please provide a valid path."
483 ));
484 }
485
486 let copper_config = match read_config(&config_file) {
487 Ok(cuconfig) => cuconfig,
488 Err(e) => return return_error(e.to_string()),
489 };
490 let copper_config_content = match read_to_string(config_full_path(config_file.as_str())) {
491 Ok(ok) => ok,
492 Err(e) => {
493 return return_error(format!(
494 "Could not read the config file (should not happen because we just succeeded just before). {e}"
495 ));
496 }
497 };
498
499 #[cfg(feature = "macro_debug")]
500 eprintln!("[build monitor type]");
501 let monitor_type = if let Some(monitor_config) = copper_config.get_monitor_config() {
502 let monitor_type = parse_str::<Type>(monitor_config.get_type())
503 .expect("Could not transform the monitor type name into a Rust type.");
504 quote! { #monitor_type }
505 } else {
506 quote! { NoMonitor }
507 };
508
509 #[cfg(feature = "macro_debug")]
511 eprintln!("[build runtime field]");
512 let runtime_field: Field = if sim_mode {
514 parse_quote! {
515 copper_runtime: cu29::curuntime::CuRuntime<CuSimTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
516 }
517 } else {
518 parse_quote! {
519 copper_runtime: cu29::curuntime::CuRuntime<CuTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
520 }
521 };
522
523 #[cfg(feature = "macro_debug")]
524 eprintln!("[match struct anonymity]");
525 match &mut application_struct.fields {
526 Named(fields_named) => {
527 fields_named.named.push(runtime_field);
528 }
529 Unnamed(fields_unnamed) => {
530 fields_unnamed.unnamed.push(runtime_field);
531 }
532 Fields::Unit => {
533 panic!(
534 "This struct is a unit struct, it should have named or unnamed fields. use struct Something {{}} and not struct Something;"
535 )
536 }
537 };
538
539 let all_missions = copper_config.graphs.get_all_missions_graphs();
540 let mut all_missions_tokens = Vec::<proc_macro2::TokenStream>::new();
541 for (mission, graph) in &all_missions {
542 let mission_mod = parse_str::<Ident>(mission.as_str())
543 .expect("Could not make an identifier of the mission name");
544
545 #[cfg(feature = "macro_debug")]
546 eprintln!("[extract tasks ids & types]");
547 let task_specs = CuTaskSpecSet::from_graph(graph);
548
549 let culist_channel_usage = collect_bridge_channel_usage(graph);
550 let mut culist_bridge_specs =
551 build_bridge_specs(&copper_config, graph, &culist_channel_usage);
552 let (culist_plan, culist_exec_entities, culist_plan_to_original) =
553 match build_execution_plan(graph, &task_specs, &mut culist_bridge_specs) {
554 Ok(plan) => plan,
555 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
556 };
557 let task_member_names = collect_task_member_names(graph);
558 let (culist_call_order, node_output_positions) = collect_culist_metadata(
559 &culist_plan,
560 &culist_exec_entities,
561 &mut culist_bridge_specs,
562 &culist_plan_to_original,
563 );
564
565 #[cfg(feature = "macro_debug")]
566 {
567 eprintln!("[runtime plan for mission {mission}]");
568 eprintln!("{culist_plan:?}");
569 }
570
571 let culist_support: proc_macro2::TokenStream = gen_culist_support(
572 &culist_plan,
573 &culist_call_order,
574 &node_output_positions,
575 &task_member_names,
576 );
577
578 let bundle_specs = match build_bundle_specs(&copper_config, mission.as_str()) {
579 Ok(specs) => specs,
580 Err(e) => return return_error(e.to_string()),
581 };
582 let threadpool_bundle_index = if task_specs.background_flags.iter().any(|&flag| flag) {
583 match bundle_specs
584 .iter()
585 .position(|bundle| bundle.id == "threadpool")
586 {
587 Some(index) => Some(index),
588 None => {
589 return return_error(
590 "Background tasks require the threadpool bundle to be configured"
591 .to_string(),
592 );
593 }
594 }
595 } else {
596 None
597 };
598
599 let resource_specs =
600 match collect_resource_specs(graph, &task_specs, &culist_bridge_specs, &bundle_specs) {
601 Ok(specs) => specs,
602 Err(e) => return return_error(e.to_string()),
603 };
604
605 let (resources_module, resources_instanciator_fn) =
606 match build_resources_module(&bundle_specs) {
607 Ok(tokens) => tokens,
608 Err(e) => return return_error(e.to_string()),
609 };
610 let task_resource_mappings =
611 match build_task_resource_mappings(&resource_specs, &task_specs) {
612 Ok(tokens) => tokens,
613 Err(e) => return return_error(e.to_string()),
614 };
615 let bridge_resource_mappings =
616 build_bridge_resource_mappings(&resource_specs, &culist_bridge_specs);
617
618 let ids = build_monitored_ids(&task_specs.ids, &mut culist_bridge_specs);
619
620 let bridge_types: Vec<Type> = culist_bridge_specs
621 .iter()
622 .map(|spec| spec.type_path.clone())
623 .collect();
624 let bridges_type_tokens: proc_macro2::TokenStream = if bridge_types.is_empty() {
625 quote! { () }
626 } else {
627 let tuple: TypeTuple = parse_quote! { (#(#bridge_types),*,) };
628 quote! { #tuple }
629 };
630
631 let bridge_binding_idents: Vec<Ident> = culist_bridge_specs
632 .iter()
633 .enumerate()
634 .map(|(idx, _)| format_ident!("bridge_{idx}"))
635 .collect();
636
637 let bridge_init_statements: Vec<proc_macro2::TokenStream> = culist_bridge_specs
638 .iter()
639 .enumerate()
640 .map(|(idx, spec)| {
641 let binding_ident = &bridge_binding_idents[idx];
642 let bridge_mapping_ref = bridge_resource_mappings.refs[idx].clone();
643 let bridge_type = &spec.type_path;
644 let bridge_name = spec.id.clone();
645 let config_index = syn::Index::from(spec.config_index);
646 let binding_error = LitStr::new(
647 &format!("Failed to bind resources for bridge '{}'", bridge_name),
648 Span::call_site(),
649 );
650 let tx_configs: Vec<proc_macro2::TokenStream> = spec
651 .tx_channels
652 .iter()
653 .map(|channel| {
654 let const_ident = &channel.const_ident;
655 let channel_name = channel.id.clone();
656 let channel_config_index = syn::Index::from(channel.config_index);
657 quote! {
658 {
659 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
660 cu29::config::BridgeChannelConfigRepresentation::Tx { route, config, .. } => {
661 (route.clone(), config.clone())
662 }
663 _ => panic!(
664 "Bridge '{}' channel '{}' expected to be Tx",
665 #bridge_name,
666 #channel_name
667 ),
668 };
669 cu29::cubridge::BridgeChannelConfig::from_static(
670 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
671 channel_route,
672 channel_config,
673 )
674 }
675 }
676 })
677 .collect();
678 let rx_configs: Vec<proc_macro2::TokenStream> = spec
679 .rx_channels
680 .iter()
681 .map(|channel| {
682 let const_ident = &channel.const_ident;
683 let channel_name = channel.id.clone();
684 let channel_config_index = syn::Index::from(channel.config_index);
685 quote! {
686 {
687 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
688 cu29::config::BridgeChannelConfigRepresentation::Rx { route, config, .. } => {
689 (route.clone(), config.clone())
690 }
691 _ => panic!(
692 "Bridge '{}' channel '{}' expected to be Rx",
693 #bridge_name,
694 #channel_name
695 ),
696 };
697 cu29::cubridge::BridgeChannelConfig::from_static(
698 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
699 channel_route,
700 channel_config,
701 )
702 }
703 }
704 })
705 .collect();
706 quote! {
707 let #binding_ident = {
708 let bridge_cfg = config
709 .bridges
710 .get(#config_index)
711 .unwrap_or_else(|| panic!("Bridge '{}' missing from configuration", #bridge_name));
712 let bridge_mapping = #bridge_mapping_ref;
713 let bridge_resources = <<#bridge_type as cu29::cubridge::CuBridge>::Resources<'_> as ResourceBindings>::from_bindings(
714 resources,
715 bridge_mapping,
716 )
717 .map_err(|e| cu29::CuError::new_with_cause(#binding_error, e))?;
718 let tx_channels: &[cu29::cubridge::BridgeChannelConfig<
719 <<#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet>::Id,
720 >] = &[#(#tx_configs),*];
721 let rx_channels: &[cu29::cubridge::BridgeChannelConfig<
722 <<#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet>::Id,
723 >] = &[#(#rx_configs),*];
724 <#bridge_type as cu29::cubridge::CuBridge>::new(
725 bridge_cfg.config.as_ref(),
726 tx_channels,
727 rx_channels,
728 bridge_resources,
729 )?
730 };
731 }
732 })
733 .collect();
734
735 let bridges_instanciator = if culist_bridge_specs.is_empty() {
736 quote! {
737 pub fn bridges_instanciator(_config: &CuConfig, resources: &mut ResourceManager) -> CuResult<CuBridges> {
738 let _ = resources;
739 Ok(())
740 }
741 }
742 } else {
743 let bridge_bindings = bridge_binding_idents.clone();
744 quote! {
745 pub fn bridges_instanciator(config: &CuConfig, resources: &mut ResourceManager) -> CuResult<CuBridges> {
746 #(#bridge_init_statements)*
747 Ok((#(#bridge_bindings),*,))
748 }
749 }
750 };
751
752 let all_sim_tasks_types: Vec<Type> = task_specs
753 .ids
754 .iter()
755 .zip(&task_specs.cutypes)
756 .zip(&task_specs.sim_task_types)
757 .zip(&task_specs.background_flags)
758 .zip(&task_specs.run_in_sim_flags)
759 .zip(task_specs.output_types.iter())
760 .map(|(((((task_id, task_type), sim_type), background), run_in_sim), output_type)| {
761 match task_type {
762 CuTaskType::Source => {
763 if *background {
764 panic!("CuSrcTask {task_id} cannot be a background task, it should be a regular task.");
765 }
766 if *run_in_sim {
767 sim_type.clone()
768 } else {
769 let msg_type = graph
770 .get_node_output_msg_type(task_id.as_str())
771 .unwrap_or_else(|| panic!("CuSrcTask {task_id} should have an outgoing connection with a valid output msg type"));
772 let sim_task_name = format!("CuSimSrcTask<{msg_type}>");
773 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
774 }
775 }
776 CuTaskType::Regular => {
777 if *background {
778 if let Some(out_ty) = output_type {
779 parse_quote!(CuAsyncTask<#sim_type, #out_ty>)
780 } else {
781 panic!("{task_id}: If a task is background, it has to have an output");
782 }
783 } else {
784 sim_type.clone()
786 }
787 },
788 CuTaskType::Sink => {
789 if *background {
790 panic!("CuSinkTask {task_id} cannot be a background task, it should be a regular task.");
791 }
792
793 if *run_in_sim {
794 sim_type.clone()
796 }
797 else {
798 let msg_types = graph
800 .get_node_input_msg_types(task_id.as_str())
801 .unwrap_or_else(|| panic!("CuSinkTask {task_id} should have an incoming connection with a valid input msg type"));
802 let msg_type = if msg_types.len() == 1 {
803 format!("({},)", msg_types[0])
804 } else {
805 format!("({})", msg_types.join(", "))
806 };
807 let sim_task_name = format!("CuSimSinkTask<{msg_type}>");
808 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
809 }
810 }
811 }
812 })
813 .collect();
814
815 #[cfg(feature = "macro_debug")]
816 eprintln!("[build task tuples]");
817
818 let task_types = &task_specs.task_types;
819 let task_types_tuple: TypeTuple = if task_types.is_empty() {
822 parse_quote! { () }
823 } else {
824 parse_quote! { (#(#task_types),*,) }
825 };
826
827 let task_types_tuple_sim: TypeTuple = if all_sim_tasks_types.is_empty() {
828 parse_quote! { () }
829 } else {
830 parse_quote! { (#(#all_sim_tasks_types),*,) }
831 };
832
833 #[cfg(feature = "macro_debug")]
834 eprintln!("[gen instances]");
835 let task_sim_instances_init_code = all_sim_tasks_types
836 .iter()
837 .enumerate()
838 .map(|(index, ty)| {
839 let additional_error_info = format!(
840 "Failed to get create instance for {}, instance index {}.",
841 task_specs.type_names[index], index
842 );
843 let mapping_ref = task_resource_mappings.refs[index].clone();
844 let background = task_specs.background_flags[index];
845 let inner_task_type = &task_specs.sim_task_types[index];
846 match task_specs.cutypes[index] {
847 CuTaskType::Source => quote! {
848 {
849 let resources = <<#ty as CuSrcTask>::Resources<'_> as ResourceBindings>::from_bindings(
850 resources,
851 #mapping_ref,
852 ).map_err(|e| e.add_cause(#additional_error_info))?;
853 <#ty as CuSrcTask>::new(all_instances_configs[#index], resources)
854 .map_err(|e| e.add_cause(#additional_error_info))?
855 }
856 },
857 CuTaskType::Regular => {
858 if background {
859 let threadpool_bundle_index = threadpool_bundle_index
860 .expect("threadpool bundle missing for background tasks");
861 quote! {
862 {
863 let inner_resources = <<#inner_task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
864 resources,
865 #mapping_ref,
866 ).map_err(|e| e.add_cause(#additional_error_info))?;
867 let threadpool_key = cu29::resource::ResourceKey::new(
868 cu29::resource::BundleIndex::new(#threadpool_bundle_index),
869 <cu29::resource::ThreadPoolBundle as cu29::resource::ResourceBundleDecl>::Id::BgThreads as usize,
870 );
871 let threadpool = resources.borrow_shared_arc(threadpool_key)?;
872 let resources = cu29::cuasynctask::CuAsyncTaskResources {
873 inner: inner_resources,
874 threadpool,
875 };
876 <#ty as CuTask>::new(all_instances_configs[#index], resources)
877 .map_err(|e| e.add_cause(#additional_error_info))?
878 }
879 }
880 } else {
881 quote! {
882 {
883 let resources = <<#ty as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
884 resources,
885 #mapping_ref,
886 ).map_err(|e| e.add_cause(#additional_error_info))?;
887 <#ty as CuTask>::new(all_instances_configs[#index], resources)
888 .map_err(|e| e.add_cause(#additional_error_info))?
889 }
890 }
891 }
892 }
893 CuTaskType::Sink => quote! {
894 {
895 let resources = <<#ty as CuSinkTask>::Resources<'_> as ResourceBindings>::from_bindings(
896 resources,
897 #mapping_ref,
898 ).map_err(|e| e.add_cause(#additional_error_info))?;
899 <#ty as CuSinkTask>::new(all_instances_configs[#index], resources)
900 .map_err(|e| e.add_cause(#additional_error_info))?
901 }
902 },
903 }
904 })
905 .collect::<Vec<_>>();
906
907 let task_instances_init_code = task_specs
908 .instantiation_types
909 .iter()
910 .zip(&task_specs.background_flags)
911 .enumerate()
912 .map(|(index, (task_type, background))| {
913 let additional_error_info = format!(
914 "Failed to get create instance for {}, instance index {}.",
915 task_specs.type_names[index], index
916 );
917 let mapping_ref = task_resource_mappings.refs[index].clone();
918 let inner_task_type = &task_specs.sim_task_types[index];
919 match task_specs.cutypes[index] {
920 CuTaskType::Source => quote! {
921 {
922 let resources = <<#task_type as CuSrcTask>::Resources<'_> as ResourceBindings>::from_bindings(
923 resources,
924 #mapping_ref,
925 ).map_err(|e| e.add_cause(#additional_error_info))?;
926 <#task_type as CuSrcTask>::new(all_instances_configs[#index], resources)
927 .map_err(|e| e.add_cause(#additional_error_info))?
928 }
929 },
930 CuTaskType::Regular => {
931 if *background {
932 let threadpool_bundle_index = threadpool_bundle_index
933 .expect("threadpool bundle missing for background tasks");
934 quote! {
935 {
936 let inner_resources = <<#inner_task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
937 resources,
938 #mapping_ref,
939 ).map_err(|e| e.add_cause(#additional_error_info))?;
940 let threadpool_key = cu29::resource::ResourceKey::new(
941 cu29::resource::BundleIndex::new(#threadpool_bundle_index),
942 <cu29::resource::ThreadPoolBundle as cu29::resource::ResourceBundleDecl>::Id::BgThreads as usize,
943 );
944 let threadpool = resources.borrow_shared_arc(threadpool_key)?;
945 let resources = cu29::cuasynctask::CuAsyncTaskResources {
946 inner: inner_resources,
947 threadpool,
948 };
949 <#task_type as CuTask>::new(all_instances_configs[#index], resources)
950 .map_err(|e| e.add_cause(#additional_error_info))?
951 }
952 }
953 } else {
954 quote! {
955 {
956 let resources = <<#task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
957 resources,
958 #mapping_ref,
959 ).map_err(|e| e.add_cause(#additional_error_info))?;
960 <#task_type as CuTask>::new(all_instances_configs[#index], resources)
961 .map_err(|e| e.add_cause(#additional_error_info))?
962 }
963 }
964 }
965 }
966 CuTaskType::Sink => quote! {
967 {
968 let resources = <<#task_type as CuSinkTask>::Resources<'_> as ResourceBindings>::from_bindings(
969 resources,
970 #mapping_ref,
971 ).map_err(|e| e.add_cause(#additional_error_info))?;
972 <#task_type as CuSinkTask>::new(all_instances_configs[#index], resources)
973 .map_err(|e| e.add_cause(#additional_error_info))?
974 }
975 },
976 }
977 })
978 .collect::<Vec<_>>();
979
980 let (
983 task_restore_code,
984 task_start_calls,
985 task_stop_calls,
986 task_preprocess_calls,
987 task_postprocess_calls,
988 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>, Vec<_>) = itertools::multiunzip(
989 (0..task_specs.task_types.len())
990 .map(|index| {
991 let task_index = int2sliceindex(index as u32);
992 let task_tuple_index = syn::Index::from(index);
993 let task_enum_name = config_id_to_enum(&task_specs.ids[index]);
994 let enum_name = Ident::new(&task_enum_name, Span::call_site());
995 (
996 quote! {
998 tasks.#task_tuple_index.thaw(&mut decoder).map_err(|e| CuError::from("Failed to thaw").add_cause(&e.to_string()))?
999 },
1000 { let monitoring_action = quote! {
1002 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Start, &error);
1003 match decision {
1004 Decision::Abort => {
1005 debug!("Start: ABORT decision from monitoring. Task '{}' errored out \
1006 during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
1007 return Ok(());
1008
1009 }
1010 Decision::Ignore => {
1011 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out \
1012 during start. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
1013 }
1014 Decision::Shutdown => {
1015 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out \
1016 during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
1017 return Err(CuError::new_with_cause("Task errored out during start.", error));
1018 }
1019 }
1020 };
1021
1022 let call_sim_callback = if sim_mode {
1023 quote! {
1024 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Start));
1026
1027 let doit = if let SimOverride::Errored(reason) = ovr {
1028 let error: CuError = reason.into();
1029 #monitoring_action
1030 false
1031 }
1032 else {
1033 ovr == SimOverride::ExecuteByRuntime
1034 };
1035 }
1036 } else {
1037 quote! {
1038 let doit = true; }
1040 };
1041
1042
1043 quote! {
1044 #call_sim_callback
1045 if doit {
1046 let task = &mut self.copper_runtime.tasks.#task_index;
1047 if let Err(error) = task.start(&self.copper_runtime.clock) {
1048 #monitoring_action
1049 }
1050 }
1051 }
1052 },
1053 { let monitoring_action = quote! {
1055 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Stop, &error);
1056 match decision {
1057 Decision::Abort => {
1058 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out \
1059 during stop. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
1060 return Ok(());
1061
1062 }
1063 Decision::Ignore => {
1064 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out \
1065 during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
1066 }
1067 Decision::Shutdown => {
1068 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out \
1069 during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
1070 return Err(CuError::new_with_cause("Task errored out during stop.", error));
1071 }
1072 }
1073 };
1074 let call_sim_callback = if sim_mode {
1075 quote! {
1076 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Stop));
1078
1079 let doit = if let SimOverride::Errored(reason) = ovr {
1080 let error: CuError = reason.into();
1081 #monitoring_action
1082 false
1083 }
1084 else {
1085 ovr == SimOverride::ExecuteByRuntime
1086 };
1087 }
1088 } else {
1089 quote! {
1090 let doit = true; }
1092 };
1093 quote! {
1094 #call_sim_callback
1095 if doit {
1096 let task = &mut self.copper_runtime.tasks.#task_index;
1097 if let Err(error) = task.stop(&self.copper_runtime.clock) {
1098 #monitoring_action
1099 }
1100 }
1101 }
1102 },
1103 { let monitoring_action = quote! {
1105 let decision = monitor.process_error(#index, CuTaskState::Preprocess, &error);
1106 match decision {
1107 Decision::Abort => {
1108 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out \
1109 during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
1110 return Ok(());
1111
1112 }
1113 Decision::Ignore => {
1114 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out \
1115 during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
1116 }
1117 Decision::Shutdown => {
1118 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
1119 during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
1120 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
1121 }
1122 }
1123 };
1124 let call_sim_callback = if sim_mode {
1125 quote! {
1126 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Preprocess));
1128
1129 let doit = if let SimOverride::Errored(reason) = ovr {
1130 let error: CuError = reason.into();
1131 #monitoring_action
1132 false
1133 } else {
1134 ovr == SimOverride::ExecuteByRuntime
1135 };
1136 }
1137 } else {
1138 quote! {
1139 let doit = true; }
1141 };
1142 quote! {
1143 #call_sim_callback
1144 if doit {
1145 let maybe_error = {
1146 #rt_guard
1147 tasks.#task_index.preprocess(clock)
1148 };
1149 if let Err(error) = maybe_error {
1150 #monitoring_action
1151 }
1152 }
1153 }
1154 },
1155 { let monitoring_action = quote! {
1157 let decision = monitor.process_error(#index, CuTaskState::Postprocess, &error);
1158 match decision {
1159 Decision::Abort => {
1160 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out \
1161 during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
1162 return Ok(());
1163
1164 }
1165 Decision::Ignore => {
1166 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out \
1167 during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
1168 }
1169 Decision::Shutdown => {
1170 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
1171 during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
1172 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
1173 }
1174 }
1175 };
1176 let call_sim_callback = if sim_mode {
1177 quote! {
1178 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Postprocess));
1180
1181 let doit = if let SimOverride::Errored(reason) = ovr {
1182 let error: CuError = reason.into();
1183 #monitoring_action
1184 false
1185 } else {
1186 ovr == SimOverride::ExecuteByRuntime
1187 };
1188 }
1189 } else {
1190 quote! {
1191 let doit = true; }
1193 };
1194 quote! {
1195 #call_sim_callback
1196 if doit {
1197 let maybe_error = {
1198 #rt_guard
1199 tasks.#task_index.postprocess(clock)
1200 };
1201 if let Err(error) = maybe_error {
1202 #monitoring_action
1203 }
1204 }
1205 }
1206 }
1207 )
1208 })
1209 );
1210
1211 let bridge_start_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1212 .iter()
1213 .map(|spec| {
1214 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1215 let monitor_index = syn::Index::from(
1216 spec.monitor_index
1217 .expect("Bridge missing monitor index for start"),
1218 );
1219 quote! {
1220 {
1221 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
1222 if let Err(error) = bridge.start(&self.copper_runtime.clock) {
1223 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Start, &error);
1224 match decision {
1225 Decision::Abort => {
1226 debug!("Start: ABORT decision from monitoring. Task '{}' errored out during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1227 return Ok(());
1228 }
1229 Decision::Ignore => {
1230 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out during start. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1231 }
1232 Decision::Shutdown => {
1233 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1234 return Err(CuError::new_with_cause("Task errored out during start.", error));
1235 }
1236 }
1237 }
1238 }
1239 }
1240 })
1241 .collect();
1242
1243 let bridge_stop_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1244 .iter()
1245 .map(|spec| {
1246 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1247 let monitor_index = syn::Index::from(
1248 spec.monitor_index
1249 .expect("Bridge missing monitor index for stop"),
1250 );
1251 quote! {
1252 {
1253 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
1254 if let Err(error) = bridge.stop(&self.copper_runtime.clock) {
1255 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Stop, &error);
1256 match decision {
1257 Decision::Abort => {
1258 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out during stop. Aborting all the other stops.", #mission_mod::TASKS_IDS[#monitor_index]);
1259 return Ok(());
1260 }
1261 Decision::Ignore => {
1262 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1263 }
1264 Decision::Shutdown => {
1265 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1266 return Err(CuError::new_with_cause("Task errored out during stop.", error));
1267 }
1268 }
1269 }
1270 }
1271 }
1272 })
1273 .collect();
1274
1275 let bridge_preprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1276 .iter()
1277 .map(|spec| {
1278 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1279 let monitor_index = syn::Index::from(
1280 spec.monitor_index
1281 .expect("Bridge missing monitor index for preprocess"),
1282 );
1283 quote! {
1284 {
1285 let bridge = &mut bridges.#bridge_index;
1286 let maybe_error = {
1287 #rt_guard
1288 bridge.preprocess(clock)
1289 };
1290 if let Err(error) = maybe_error {
1291 let decision = monitor.process_error(#monitor_index, CuTaskState::Preprocess, &error);
1292 match decision {
1293 Decision::Abort => {
1294 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1295 return Ok(());
1296 }
1297 Decision::Ignore => {
1298 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1299 }
1300 Decision::Shutdown => {
1301 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1302 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
1303 }
1304 }
1305 }
1306 }
1307 }
1308 })
1309 .collect();
1310
1311 let bridge_postprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1312 .iter()
1313 .map(|spec| {
1314 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1315 let monitor_index = syn::Index::from(
1316 spec.monitor_index
1317 .expect("Bridge missing monitor index for postprocess"),
1318 );
1319 quote! {
1320 {
1321 let bridge = &mut bridges.#bridge_index;
1322 let maybe_error = {
1323 #rt_guard
1324 bridge.postprocess(clock)
1325 };
1326 if let Err(error) = maybe_error {
1327 let decision = monitor.process_error(#monitor_index, CuTaskState::Postprocess, &error);
1328 match decision {
1329 Decision::Abort => {
1330 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1331 return Ok(());
1332 }
1333 Decision::Ignore => {
1334 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1335 }
1336 Decision::Shutdown => {
1337 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1338 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
1339 }
1340 }
1341 }
1342 }
1343 }
1344 })
1345 .collect();
1346
1347 let mut start_calls = bridge_start_calls;
1348 start_calls.extend(task_start_calls);
1349 let mut stop_calls = task_stop_calls;
1350 stop_calls.extend(bridge_stop_calls);
1351 let mut preprocess_calls = bridge_preprocess_calls;
1352 preprocess_calls.extend(task_preprocess_calls);
1353 let mut postprocess_calls = task_postprocess_calls;
1354 postprocess_calls.extend(bridge_postprocess_calls);
1355
1356 let output_pack_sizes = collect_output_pack_sizes(&culist_plan);
1357 let runtime_plan_code_and_logging: Vec<(
1358 proc_macro2::TokenStream,
1359 proc_macro2::TokenStream,
1360 )> = culist_plan
1361 .steps
1362 .iter()
1363 .map(|unit| match unit {
1364 CuExecutionUnit::Step(step) => {
1365 #[cfg(feature = "macro_debug")]
1366 eprintln!(
1367 "{} -> {} as {:?}. task_id: {} Input={:?}, Output={:?}",
1368 step.node.get_id(),
1369 step.node.get_type(),
1370 step.task_type,
1371 step.node_id,
1372 step.input_msg_indices_types,
1373 step.output_msg_pack
1374 );
1375
1376 match &culist_exec_entities[step.node_id as usize].kind {
1377 ExecutionEntityKind::Task { task_index } => generate_task_execution_tokens(
1378 step,
1379 *task_index,
1380 &task_specs,
1381 &output_pack_sizes,
1382 sim_mode,
1383 &mission_mod,
1384 ),
1385 ExecutionEntityKind::BridgeRx {
1386 bridge_index,
1387 channel_index,
1388 } => {
1389 let spec = &culist_bridge_specs[*bridge_index];
1390 generate_bridge_rx_execution_tokens(
1391 step,
1392 spec,
1393 *channel_index,
1394 &mission_mod,
1395 )
1396 }
1397 ExecutionEntityKind::BridgeTx {
1398 bridge_index,
1399 channel_index,
1400 } => {
1401 let spec = &culist_bridge_specs[*bridge_index];
1402 generate_bridge_tx_execution_tokens(
1403 step,
1404 spec,
1405 *channel_index,
1406 &output_pack_sizes,
1407 &mission_mod,
1408 )
1409 }
1410 }
1411 }
1412 CuExecutionUnit::Loop(_) => {
1413 panic!("Execution loops are not supported in runtime generation");
1414 }
1415 })
1416 .collect();
1417
1418 let sim_support = if sim_mode {
1419 Some(gen_sim_support(&culist_plan, &culist_exec_entities))
1420 } else {
1421 None
1422 };
1423
1424 let (new, run_one_iteration, start_all_tasks, stop_all_tasks, run) = if sim_mode {
1425 (
1426 quote! {
1427 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<Self>
1428 },
1429 quote! {
1430 fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1431 },
1432 quote! {
1433 fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1434 },
1435 quote! {
1436 fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1437 },
1438 quote! {
1439 fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1440 },
1441 )
1442 } else {
1443 (
1444 if std {
1445 quote! {
1446 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>) -> CuResult<Self>
1447 }
1448 } else {
1449 quote! {
1450 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>) -> CuResult<Self>
1452 }
1453 },
1454 quote! {
1455 fn run_one_iteration(&mut self) -> CuResult<()>
1456 },
1457 quote! {
1458 fn start_all_tasks(&mut self) -> CuResult<()>
1459 },
1460 quote! {
1461 fn stop_all_tasks(&mut self) -> CuResult<()>
1462 },
1463 quote! {
1464 fn run(&mut self) -> CuResult<()>
1465 },
1466 )
1467 };
1468
1469 let sim_callback_arg = if sim_mode {
1470 Some(quote!(sim_callback))
1471 } else {
1472 None
1473 };
1474
1475 let app_trait = if sim_mode {
1476 quote!(CuSimApplication)
1477 } else {
1478 quote!(CuApplication)
1479 };
1480
1481 let sim_callback_on_new_calls = task_specs.ids.iter().enumerate().map(|(i, id)| {
1482 let enum_name = config_id_to_enum(id);
1483 let enum_ident = Ident::new(&enum_name, Span::call_site());
1484 quote! {
1485 sim_callback(SimStep::#enum_ident(CuTaskCallbackState::New(all_instances_configs[#i].cloned())));
1487 }
1488 });
1489
1490 let sim_callback_on_new = if sim_mode {
1491 Some(quote! {
1492 let graph = config.get_graph(Some(#mission)).expect("Could not find the mission #mission");
1493 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1494 .get_all_nodes()
1495 .iter()
1496 .map(|(_, node)| node.get_instance_config())
1497 .collect();
1498 #(#sim_callback_on_new_calls)*
1499 })
1500 } else {
1501 None
1502 };
1503
1504 let (runtime_plan_code, preprocess_logging_calls): (Vec<_>, Vec<_>) =
1505 itertools::multiunzip(runtime_plan_code_and_logging);
1506
// Generated statement(s) that bind `config` inside `init_resources`.
// With std the precedence is: programmatic override, then the config file if it exists
// on disk, then the configuration embedded at compile time.
// Without std only the embedded configuration is used.
let config_load_stmt = if std {
    quote! {
        let config = if let Some(overridden_config) = config_override {
            debug!("CuConfig: Overridden programmatically: {}", overridden_config.serialize_ron());
            overridden_config
        } else if ::std::path::Path::new(config_filename).exists() {
            debug!("CuConfig: Reading configuration from file: {}", config_filename);
            cu29::config::read_configuration(config_filename)?
        } else {
            let original_config = Self::original_config();
            debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
            cu29::config::read_configuration_str(original_config, None)?
        };
    }
} else {
    quote! {
        let original_config = Self::original_config();
        debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
        let config = cu29::config::read_configuration_str(original_config, None)?;
    }
};
1529
1530 let init_resources_sig = if std {
1531 quote! {
1532 pub fn init_resources(config_override: Option<CuConfig>) -> CuResult<AppResources>
1533 }
1534 } else {
1535 quote! {
1536 pub fn init_resources() -> CuResult<AppResources>
1537 }
1538 };
1539
1540 let init_resources_call = if std {
1541 quote! { Self::init_resources(config_override)? }
1542 } else {
1543 quote! { Self::init_resources()? }
1544 };
1545
1546 let new_with_resources_sig = if sim_mode {
1547 quote! {
1548 pub fn new_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
1549 clock: RobotClock,
1550 unified_logger: Arc<Mutex<L>>,
1551 app_resources: AppResources,
1552 sim_callback: &mut impl FnMut(SimStep) -> SimOverride,
1553 ) -> CuResult<Self>
1554 }
1555 } else {
1556 quote! {
1557 pub fn new_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
1558 clock: RobotClock,
1559 unified_logger: Arc<Mutex<L>>,
1560 app_resources: AppResources,
1561 ) -> CuResult<Self>
1562 }
1563 };
1564
1565 let new_with_resources_call = if sim_mode {
1566 quote! { Self::new_with_resources(clock, unified_logger, app_resources, sim_callback) }
1567 } else {
1568 quote! { Self::new_with_resources(clock, unified_logger, app_resources) }
1569 };
1570
1571 let kill_handler = if std {
1572 Some(quote! {
1573 ctrlc::set_handler(move || {
1574 STOP_FLAG.store(true, Ordering::SeqCst);
1575 }).expect("Error setting Ctrl-C handler");
1576 })
1577 } else {
1578 None
1579 };
1580
1581 let run_loop = if std {
1582 quote! {
1583 loop {
1584 let iter_start = self.copper_runtime.clock.now();
1585 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1586
1587 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1588 let period: CuDuration = (1_000_000_000u64 / rate).into();
1589 let elapsed = self.copper_runtime.clock.now() - iter_start;
1590 if elapsed < period {
1591 std::thread::sleep(std::time::Duration::from_nanos(period.as_nanos() - elapsed.as_nanos()));
1592 }
1593 }
1594
1595 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1596 break result;
1597 }
1598 }
1599 }
1600 } else {
1601 quote! {
1602 loop {
1603 let iter_start = self.copper_runtime.clock.now();
1604 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1605 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1606 let period: CuDuration = (1_000_000_000u64 / rate).into();
1607 let elapsed = self.copper_runtime.clock.now() - iter_start;
1608 if elapsed < period {
1609 busy_wait_for(period - elapsed);
1610 }
1611 }
1612
1613 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1614 break result;
1615 }
1616 }
1617 }
1618 };
1619
// Bodies of the runtime methods spliced into the application trait impl: one iteration,
// keyframe restore, start/stop of all tasks and the top-level `run`.
#[cfg(feature = "macro_debug")]
eprintln!("[build the run methods]");
let run_methods = quote! {

    #run_one_iteration {

        // Borrow the runtime fields separately so the spliced fragments can use
        // `clock`, `monitor`, `tasks`, `bridges` and the managers side by side.
        let runtime = &mut self.copper_runtime;
        let clock = &runtime.clock;
        let monitor = &mut runtime.monitor;
        let tasks = &mut runtime.tasks;
        let bridges = &mut runtime.bridges;
        let cl_manager = &mut runtime.copperlists_manager;
        let kf_manager = &mut runtime.keyframes_manager;

        // Preprocess hooks (bridges first, then tasks).
        #(#preprocess_calls)*

        // Take a fresh copper list (panics if none is available), reset the keyframe
        // manager for its id and mark it as being processed.
        let culist = cl_manager.inner.create().expect("Ran out of space for copper lists"); let clid = culist.id;
        kf_manager.reset(clid, clock); culist.change_state(cu29::copperlist::CopperListState::Processing);
        culist.msgs.init_zeroed();
        {
            // `msgs` is the name the per-step generated code works against.
            let msgs = &mut culist.msgs.0;
            #(#runtime_plan_code)*
        } monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;

        #(#preprocess_logging_calls)*

        cl_manager.end_of_processing(clid)?;
        kf_manager.end_of_processing(clid)?;

        // Postprocess hooks (tasks first, then bridges).
        #(#postprocess_calls)*
        Ok(())
    }

    fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
        let runtime = &mut self.copper_runtime;
        let clock = &runtime.clock;
        let tasks = &mut runtime.tasks;
        // A single bincode decoder over the keyframe's serialized task bytes is shared
        // by all the restore fragments, which run in sequence.
        let config = cu29::bincode::config::standard();
        let reader = cu29::bincode::de::read::SliceReader::new(&keyframe.serialized_tasks);
        let mut decoder = DecoderImpl::new(reader, config, ());
        // NOTE(review): presumably the per-task `thaw(&mut decoder)` calls - confirm
        // against where `task_restore_code` is built.
        #(#task_restore_code);*;
        Ok(())
    }

    #start_all_tasks {
        #(#start_calls)*
        // The monitor is started after all bridges and tasks.
        self.copper_runtime.monitor.start(&self.copper_runtime.clock)?;
        Ok(())
    }

    #stop_all_tasks {
        #(#stop_calls)*
        // The monitor is stopped after all tasks and bridges.
        self.copper_runtime.monitor.stop(&self.copper_runtime.clock)?;
        Ok(())
    }

    #run {
        // Polled by the run loop; raised by the Ctrl-C handler when one is emitted.
        static STOP_FLAG: AtomicBool = AtomicBool::new(false);

        #kill_handler

        <Self as #app_trait<S, L>>::start_all_tasks(self, #sim_callback_arg)?;
        let result = #run_loop;

        if result.is_err() {
            error!("A task errored out: {}", &result);
        }
        // Stop is attempted even when the loop failed; an error from stopping is
        // returned instead of `result`.
        <Self as #app_trait<S, L>>::stop_all_tasks(self, #sim_callback_arg)?;
        result
    }
};
1698
1699 let tasks_type = if sim_mode {
1700 quote!(CuSimTasks)
1701 } else {
1702 quote!(CuTasks)
1703 };
1704
1705 let tasks_instanciator_fn = if sim_mode {
1706 quote!(tasks_instanciator_sim)
1707 } else {
1708 quote!(tasks_instanciator)
1709 };
1710
1711 let app_impl_decl = if sim_mode {
1712 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuSimApplication<S, L> for #application_name)
1713 } else {
1714 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuApplication<S, L> for #application_name)
1715 };
1716
1717 let simstep_type_decl = if sim_mode {
1718 quote!(
1719 type Step<'z> = SimStep<'z>;
1720 )
1721 } else {
1722 quote!()
1723 };
1724
// Generated struct holding what `init_resources` returns and `new_with_resources`
// consumes: the loaded configuration and the resource manager built from it.
let app_resources_struct = quote! {
    pub struct AppResources {
        pub config: CuConfig,
        pub resources: ResourceManager,
    }
};
1731
// Generated `init_resources`: loads the configuration (see `config_load_stmt`), builds
// the mission's resources from it and returns both as `AppResources`.
// The `#[cfg(target_os = "none")]` attributes are part of the generated code, so the
// extra `info!` traces are only compiled for targets whose `target_os` is "none".
let init_resources_fn = quote! {
    #init_resources_sig {
        let config_filename = #config_file;

        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp init: config file {}", config_filename);
        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp init: loading config");
        #config_load_stmt
        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp init: config loaded");
        // Trace the target rate; a missing rate is logged as 0.
        if let Some(runtime) = &config.runtime {
            #[cfg(target_os = "none")]
            ::cu29::prelude::info!(
                "CuApp init: rate_target_hz={}",
                runtime.rate_target_hz.unwrap_or(0)
            );
        } else {
            #[cfg(target_os = "none")]
            ::cu29::prelude::info!("CuApp init: rate_target_hz=none");
        }

        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp init: building resources");
        let resources = #mission_mod::resources_instanciator(&config)?;
        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp init: resources ready");

        Ok(AppResources { config, resources })
    }
};
1763
// Generated `new_with_resources`: opens the copper-list and keyframe log streams on the
// unified logger, builds the `CuRuntime` for this mission and wraps it in the
// application struct. In sim mode the simulation callback is then notified of every
// task creation before the application is returned.
let new_with_resources_fn = quote! {
    #new_with_resources_sig {
        let AppResources { config, resources } = app_resources;

        // On `target_os = "none"` targets the structured logger is initialised here.
        // NOTE(review): `_logger_runtime` goes out of scope at the end of this block -
        // confirm that `LoggerRuntime::init` does not rely on the value being kept.
        #[cfg(target_os = "none")]
        {
            let structured_stream = ::cu29::prelude::stream_write::<
                ::cu29::prelude::CuLogEntry,
                S,
            >(
                unified_logger.clone(),
                ::cu29::prelude::UnifiedLogType::StructuredLogLine,
                4096 * 10,
            )?;
            let _logger_runtime = ::cu29::prelude::LoggerRuntime::init(
                clock.clone(),
                structured_stream,
                None::<::cu29::prelude::NullLog>,
            );
        }

        // Default section size: room for 64 copper lists, unless the logging config
        // gives an explicit size in MiB (converted to bytes here).
        let mut default_section_size = size_of::<super::#mission_mod::CuList>() * 64;
        if let Some(section_size_mib) = config.logging.as_ref().and_then(|l| l.section_size_mib) {
            default_section_size = section_size_mib as usize * 1024usize * 1024usize;
        }
        #[cfg(target_os = "none")]
        ::cu29::prelude::info!(
            "CuApp new: copperlist section size={}",
            default_section_size
        );
        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp new: creating copperlist stream");
        let copperlist_stream = stream_write::<#mission_mod::CuList, S>(
            unified_logger.clone(),
            UnifiedLogType::CopperList,
            default_section_size,
        )?;
        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp new: copperlist stream ready");

        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp new: creating keyframes stream");
        // The keyframe stream is opened with a fixed 10 * 1024 * 1024 size argument.
        let keyframes_stream = stream_write::<KeyFrame, S>(
            unified_logger.clone(),
            UnifiedLogType::FrozenTasks,
            1024 * 1024 * 10, )?;
        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp new: keyframes stream ready");

        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp new: building runtime");
        let copper_runtime = CuRuntime::<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>::new_with_resources(
            clock,
            &config,
            Some(#mission),
            resources,
            #mission_mod::#tasks_instanciator_fn,
            #mission_mod::monitor_instanciator,
            #mission_mod::bridges_instanciator,
            copperlist_stream,
            keyframes_stream)?;
        #[cfg(target_os = "none")]
        ::cu29::prelude::info!("CuApp new: runtime built");

        // Built before the sim notifications because `copper_runtime` is moved here;
        // `config` stays available for the sim fragment below.
        let application = Ok(#application_name { copper_runtime });

        #sim_callback_on_new

        application
    }
};
1843
// Inherent impl of the application type: the configuration text embedded at compile
// time, plus the two-phase constructor (`init_resources` then `new_with_resources`).
let app_inherent_impl = quote! {
    impl #application_name {
        pub fn original_config() -> String {
            #copper_config_content.to_string()
        }

        #init_resources_fn

        #new_with_resources_fn
    }
};
1855
// The trace below needs both the `std` and `macro_debug` features of this crate.
#[cfg(feature = "std")]
#[cfg(feature = "macro_debug")]
eprintln!("[build result]");
// Implementation of the application trait (sim or non-sim): `new` chains
// `init_resources` and `new_with_resources`, and the run methods are spliced in.
let application_impl = quote! {
    #app_impl_decl {
        #simstep_type_decl

        #new {
            let app_resources = #init_resources_call;
            #new_with_resources_call
        }

        fn get_original_config() -> String {
            Self::original_config()
        }

        #run_methods
    }
};
1875
1876 let (
1877 builder_struct,
1878 builder_new,
1879 builder_impl,
1880 builder_sim_callback_method,
1881 builder_build_sim_callback_arg,
1882 ) = if sim_mode {
1883 (
1884 quote! {
1885 #[allow(dead_code)]
1886 pub struct #builder_name <'a, F> {
1887 clock: Option<RobotClock>,
1888 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1889 config_override: Option<CuConfig>,
1890 sim_callback: Option<&'a mut F>
1891 }
1892 },
1893 quote! {
1894 #[allow(dead_code)]
1895 pub fn new() -> Self {
1896 Self {
1897 clock: None,
1898 unified_logger: None,
1899 config_override: None,
1900 sim_callback: None,
1901 }
1902 }
1903 },
1904 quote! {
1905 impl<'a, F> #builder_name <'a, F>
1906 where
1907 F: FnMut(SimStep) -> SimOverride,
1908 },
1909 Some(quote! {
1910 pub fn with_sim_callback(mut self, sim_callback: &'a mut F) -> Self
1911 {
1912 self.sim_callback = Some(sim_callback);
1913 self
1914 }
1915 }),
1916 Some(quote! {
1917 self.sim_callback
1918 .ok_or(CuError::from("Sim callback missing from builder"))?,
1919 }),
1920 )
1921 } else {
1922 (
1923 quote! {
1924 #[allow(dead_code)]
1925 pub struct #builder_name {
1926 clock: Option<RobotClock>,
1927 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1928 config_override: Option<CuConfig>,
1929 }
1930 },
1931 quote! {
1932 #[allow(dead_code)]
1933 pub fn new() -> Self {
1934 Self {
1935 clock: None,
1936 unified_logger: None,
1937 config_override: None,
1938 }
1939 }
1940 },
1941 quote! {
1942 impl #builder_name
1943 },
1944 None,
1945 None,
1946 )
1947 };
1948
// Inherent convenience wrappers that forward to the application trait with the storage
// and logger type parameters fixed to `MmapSectionStorage` / `UnifiedLoggerWrite`, so
// callers do not have to spell out the trait.
// In sim mode the wrappers are emitted regardless of `std`; otherwise only with std.
let std_application_impl = if sim_mode {
    Some(quote! {
        impl #application_name {
            pub fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
                <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self, sim_callback)
            }
            pub fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
                <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self, sim_callback)
            }
            pub fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
                <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self, sim_callback)
            }
            pub fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
                <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self, sim_callback)
            }
        }
    })
} else if std {
    Some(quote! {
        impl #application_name {
            pub fn start_all_tasks(&mut self) -> CuResult<()> {
                <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self)
            }
            pub fn run_one_iteration(&mut self) -> CuResult<()> {
                <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self)
            }
            pub fn run(&mut self) -> CuResult<()> {
                <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self)
            }
            pub fn stop_all_tasks(&mut self) -> CuResult<()> {
                <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self)
            }
        }
    })
} else {
    // no_std and not simulating: no inherent wrappers are generated.
    None };
1989
1990 let application_builder = if std {
1991 Some(quote! {
1992 #builder_struct
1993
1994 #builder_impl
1995 {
1996 #builder_new
1997
1998 #[allow(dead_code)]
1999 pub fn with_clock(mut self, clock: RobotClock) -> Self {
2000 self.clock = Some(clock);
2001 self
2002 }
2003
2004 #[allow(dead_code)]
2005 pub fn with_unified_logger(mut self, unified_logger: Arc<Mutex<UnifiedLoggerWrite>>) -> Self {
2006 self.unified_logger = Some(unified_logger);
2007 self
2008 }
2009
2010 #[allow(dead_code)]
2011 pub fn with_context(mut self, copper_ctx: &CopperContext) -> Self {
2012 self.clock = Some(copper_ctx.clock.clone());
2013 self.unified_logger = Some(copper_ctx.unified_logger.clone());
2014 self
2015 }
2016
2017 #[allow(dead_code)]
2018 pub fn with_config(mut self, config_override: CuConfig) -> Self {
2019 self.config_override = Some(config_override);
2020 self
2021 }
2022
2023 #builder_sim_callback_method
2024
2025 #[allow(dead_code)]
2026 pub fn build(self) -> CuResult<#application_name> {
2027 #application_name::new(
2028 self.clock
2029 .ok_or(CuError::from("Clock missing from builder"))?,
2030 self.unified_logger
2031 .ok_or(CuError::from("Unified logger missing from builder"))?,
2032 self.config_override,
2033 #builder_build_sim_callback_arg
2034 )
2035 }
2036 }
2037 })
2038 } else {
2039 None
2041 };
2042
2043 let sim_imports = if sim_mode {
2044 Some(quote! {
2045 use cu29::simulation::SimOverride;
2046 use cu29::simulation::CuTaskCallbackState;
2047 use cu29::simulation::CuSimSrcTask;
2048 use cu29::simulation::CuSimSinkTask;
2049 use cu29::prelude::app::CuSimApplication;
2050 })
2051 } else {
2052 None
2053 };
2054
2055 let sim_tasks = if sim_mode {
2056 Some(quote! {
2057 pub type CuSimTasks = #task_types_tuple_sim;
2060 })
2061 } else {
2062 None
2063 };
2064
2065 let sim_inst_body = if task_sim_instances_init_code.is_empty() {
2066 quote! {
2067 let _ = resources;
2068 Ok(())
2069 }
2070 } else {
2071 quote! { Ok(( #(#task_sim_instances_init_code),*, )) }
2072 };
2073
2074 let sim_tasks_instanciator = if sim_mode {
2075 Some(quote! {
2076 pub fn tasks_instanciator_sim(
2077 all_instances_configs: Vec<Option<&ComponentConfig>>,
2078 resources: &mut ResourceManager,
2079 ) -> CuResult<CuSimTasks> {
2080 #sim_inst_body
2081 }})
2082 } else {
2083 None
2084 };
2085
2086 let tasks_inst_body_std = if task_instances_init_code.is_empty() {
2087 quote! {
2088 let _ = resources;
2089 Ok(())
2090 }
2091 } else {
2092 quote! { Ok(( #(#task_instances_init_code),*, )) }
2093 };
2094
2095 let tasks_inst_body_nostd = if task_instances_init_code.is_empty() {
2096 quote! {
2097 let _ = resources;
2098 Ok(())
2099 }
2100 } else {
2101 quote! { Ok(( #(#task_instances_init_code),*, )) }
2102 };
2103
2104 let tasks_instanciator = if std {
2105 quote! {
2106 pub fn tasks_instanciator<'c>(
2107 all_instances_configs: Vec<Option<&'c ComponentConfig>>,
2108 resources: &mut ResourceManager,
2109 ) -> CuResult<CuTasks> {
2110 #tasks_inst_body_std
2111 }
2112 }
2113 } else {
2114 quote! {
2116 pub fn tasks_instanciator<'c>(
2117 all_instances_configs: Vec<Option<&'c ComponentConfig>>,
2118 resources: &mut ResourceManager,
2119 ) -> CuResult<CuTasks> {
2120 #tasks_inst_body_nostd
2121 }
2122 }
2123 };
2124
2125 let imports = if std {
2126 quote! {
2127 use cu29::rayon::ThreadPool;
2128 use cu29::cuasynctask::CuAsyncTask;
2129 use cu29::curuntime::CopperContext;
2130 use cu29::resource::{ResourceBindings, ResourceManager};
2131 use cu29::prelude::SectionStorage;
2132 use cu29::prelude::UnifiedLoggerWrite;
2133 use cu29::prelude::memmap::MmapSectionStorage;
2134 use std::fmt::{Debug, Formatter};
2135 use std::fmt::Result as FmtResult;
2136 use std::mem::size_of;
2137 use std::sync::Arc;
2138 use std::sync::atomic::{AtomicBool, Ordering};
2139 use std::sync::Mutex;
2140 }
2141 } else {
2142 quote! {
2143 use alloc::sync::Arc;
2144 use alloc::string::String;
2145 use alloc::string::ToString;
2146 use core::sync::atomic::{AtomicBool, Ordering};
2147 use core::fmt::{Debug, Formatter};
2148 use core::fmt::Result as FmtResult;
2149 use core::mem::size_of;
2150 use spin::Mutex;
2151 use cu29::prelude::SectionStorage;
2152 use cu29::resource::{ResourceBindings, ResourceManager};
2153 }
2154 };
2155
2156 let task_mapping_defs = task_resource_mappings.defs.clone();
2157 let bridge_mapping_defs = bridge_resource_mappings.defs.clone();
2158
2159 let mission_mod_tokens = quote! {
2161 mod #mission_mod {
2162 use super::*; use cu29::bincode::Encode;
2165 use cu29::bincode::enc::Encoder;
2166 use cu29::bincode::error::EncodeError;
2167 use cu29::bincode::Decode;
2168 use cu29::bincode::de::Decoder;
2169 use cu29::bincode::de::DecoderImpl;
2170 use cu29::bincode::error::DecodeError;
2171 use cu29::clock::RobotClock;
2172 use cu29::config::CuConfig;
2173 use cu29::config::ComponentConfig;
2174 use cu29::curuntime::CuRuntime;
2175 use cu29::curuntime::KeyFrame;
2176 use cu29::CuResult;
2177 use cu29::CuError;
2178 use cu29::cutask::CuSrcTask;
2179 use cu29::cutask::CuSinkTask;
2180 use cu29::cutask::CuTask;
2181 use cu29::cutask::CuMsg;
2182 use cu29::cutask::CuMsgMetadata;
2183 use cu29::copperlist::CopperList;
2184 use cu29::monitoring::CuMonitor; use cu29::monitoring::CuTaskState;
2186 use cu29::monitoring::Decision;
2187 use cu29::prelude::app::CuApplication;
2188 use cu29::prelude::debug;
2189 use cu29::prelude::stream_write;
2190 use cu29::prelude::UnifiedLogType;
2191 use cu29::prelude::UnifiedLogWrite;
2192
2193 #imports
2194
2195 #sim_imports
2196
2197 #[allow(unused_imports)]
2199 use cu29::monitoring::NoMonitor;
2200
2201 pub type CuTasks = #task_types_tuple;
2205 pub type CuBridges = #bridges_type_tokens;
2206 #resources_module
2207 #resources_instanciator_fn
2208 #task_mapping_defs
2209 #bridge_mapping_defs
2210
2211 #sim_tasks
2212 #sim_support
2213 #sim_tasks_instanciator
2214
2215 pub const TASKS_IDS: &'static [&'static str] = &[#( #ids ),*];
2216
2217 #culist_support
2218 #tasks_instanciator
2219 #bridges_instanciator
2220
2221 pub fn monitor_instanciator(config: &CuConfig) -> #monitor_type {
2222 #monitor_type::new(config, #mission_mod::TASKS_IDS).expect("Failed to create the given monitor.")
2223 }
2224
2225 #app_resources_struct
2227 pub #application_struct
2228
2229 #app_inherent_impl
2230 #application_impl
2231
2232 #std_application_impl
2233
2234 #application_builder
2235 }
2236
2237 };
2238 all_missions_tokens.push(mission_mod_tokens);
2239 }
2240
2241 let default_application_tokens = if all_missions.contains_key("default") {
2242 let default_builder = if std {
2243 Some(quote! {
2244 #[allow(unused_imports)]
2246 use default::#builder_name;
2247 })
2248 } else {
2249 None
2250 };
2251 quote! {
2252 #default_builder
2253
2254 #[allow(unused_imports)]
2255 use default::AppResources;
2256
2257 #[allow(unused_imports)]
2258 use default::resources as app_resources;
2259
2260 #[allow(unused_imports)]
2261 use default::#application_name;
2262 }
2263 } else {
2264 quote!() };
2266
2267 let result: proc_macro2::TokenStream = quote! {
2268 #(#all_missions_tokens)*
2269 #default_application_tokens
2270 };
2271
2272 #[cfg(feature = "macro_debug")]
2274 {
2275 let formatted_code = rustfmt_generated_code(result.to_string());
2276 eprintln!("\n === Gen. Runtime ===\n");
2277 eprintln!("{formatted_code}");
2278 eprintln!("\n === === === === === ===\n");
2281 }
2282 result.into()
2283}
2284
2285fn read_config(config_file: &str) -> CuResult<CuConfig> {
2286 let filename = config_full_path(config_file);
2287
2288 read_configuration(filename.as_str())
2289}
2290
2291fn config_full_path(config_file: &str) -> String {
2292 let mut config_full_path = utils::caller_crate_root();
2293 config_full_path.push(config_file);
2294 let filename = config_full_path
2295 .as_os_str()
2296 .to_str()
2297 .expect("Could not interpret the config file name");
2298 filename.to_string()
2299}
2300
2301fn extract_tasks_output_types(graph: &CuGraph) -> Vec<Option<Type>> {
2302 graph
2303 .get_all_nodes()
2304 .iter()
2305 .map(|(_, node)| {
2306 let id = node.get_id();
2307 let type_str = graph.get_node_output_msg_type(id.as_str());
2308 type_str.map(|type_str| {
2309 parse_str::<Type>(type_str.as_str()).expect("Could not parse output message type.")
2310 })
2311 })
2312 .collect()
2313}
2314
/// Per-task information gathered from the `Flavor::Task` nodes of a configuration graph.
///
/// Except where noted, each vector holds one entry per task, in the order the task nodes are
/// returned by the graph.
struct CuTaskSpecSet {
    /// Configuration id of each task node.
    pub ids: Vec<String>,
    /// Task kind of each task, as reported by `find_task_type_for_id`.
    pub cutypes: Vec<CuTaskType>,
    /// Whether each task node is flagged as background.
    pub background_flags: Vec<bool>,
    /// Whether logging is enabled on each task node.
    pub logging_enabled: Vec<bool>,
    /// Type name of each task as written in the configuration.
    pub type_names: Vec<String>,
    /// Parsed task type; background tasks are wrapped as `CuAsyncTask<Task, Output>`.
    pub task_types: Vec<Type>,
    /// Same as `task_types`, but background tasks use the turbofish form
    /// `CuAsyncTask::<Task, Output>`.
    pub instantiation_types: Vec<Type>,
    /// Parsed task type without any background wrapping.
    pub sim_task_types: Vec<Type>,
    /// Whether each task node is flagged to run in simulation.
    pub run_in_sim_flags: Vec<bool>,
    /// Parsed output message types (`None` where no output type is reported).
    #[allow(dead_code)]
    pub output_types: Vec<Option<Type>>,
    /// Indexed by graph node id: `Some(task index)` for task nodes, `None` otherwise.
    pub node_id_to_task_index: Vec<Option<usize>>,
}
2329
2330impl CuTaskSpecSet {
2331 pub fn from_graph(graph: &CuGraph) -> Self {
2332 let all_id_nodes: Vec<(NodeId, &Node)> = graph
2333 .get_all_nodes()
2334 .into_iter()
2335 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
2336 .collect();
2337
2338 let ids = all_id_nodes
2339 .iter()
2340 .map(|(_, node)| node.get_id().to_string())
2341 .collect();
2342
2343 let cutypes = all_id_nodes
2344 .iter()
2345 .map(|(id, _)| find_task_type_for_id(graph, *id))
2346 .collect();
2347
2348 let background_flags: Vec<bool> = all_id_nodes
2349 .iter()
2350 .map(|(_, node)| node.is_background())
2351 .collect();
2352
2353 let logging_enabled: Vec<bool> = all_id_nodes
2354 .iter()
2355 .map(|(_, node)| node.is_logging_enabled())
2356 .collect();
2357
2358 let type_names: Vec<String> = all_id_nodes
2359 .iter()
2360 .map(|(_, node)| node.get_type().to_string())
2361 .collect();
2362
2363 let output_types = extract_tasks_output_types(graph);
2364
2365 let task_types = type_names
2366 .iter()
2367 .zip(background_flags.iter())
2368 .zip(output_types.iter())
2369 .map(|((name, &background), output_type)| {
2370 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
2371 panic!("Could not transform {name} into a Task Rust type: {error}");
2372 });
2373 if background {
2374 if let Some(output_type) = output_type {
2375 parse_quote!(CuAsyncTask<#name_type, #output_type>)
2376 } else {
2377 panic!("{name}: If a task is background, it has to have an output");
2378 }
2379 } else {
2380 name_type
2381 }
2382 })
2383 .collect();
2384
2385 let instantiation_types = type_names
2386 .iter()
2387 .zip(background_flags.iter())
2388 .zip(output_types.iter())
2389 .map(|((name, &background), output_type)| {
2390 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
2391 panic!("Could not transform {name} into a Task Rust type: {error}");
2392 });
2393 if background {
2394 if let Some(output_type) = output_type {
2395 parse_quote!(CuAsyncTask::<#name_type, #output_type>)
2396 } else {
2397 panic!("{name}: If a task is background, it has to have an output");
2398 }
2399 } else {
2400 name_type
2401 }
2402 })
2403 .collect();
2404
2405 let sim_task_types = type_names
2406 .iter()
2407 .map(|name| {
2408 parse_str::<Type>(name).unwrap_or_else(|err| {
2409 eprintln!("Could not transform {name} into a Task Rust type.");
2410 panic!("{err}")
2411 })
2412 })
2413 .collect();
2414
2415 let run_in_sim_flags = all_id_nodes
2416 .iter()
2417 .map(|(_, node)| node.is_run_in_sim())
2418 .collect();
2419
2420 let mut node_id_to_task_index = vec![None; graph.node_count()];
2421 for (index, (node_id, _)) in all_id_nodes.iter().enumerate() {
2422 node_id_to_task_index[*node_id as usize] = Some(index);
2423 }
2424
2425 Self {
2426 ids,
2427 cutypes,
2428 background_flags,
2429 logging_enabled,
2430 type_names,
2431 task_types,
2432 instantiation_types,
2433 sim_task_types,
2434 run_in_sim_flags,
2435 output_types,
2436 node_id_to_task_index,
2437 }
2438 }
2439}
2440
/// The message types that make up one output slot.
#[derive(Clone)]
struct OutputPack {
    /// Parsed message types, one per output of the slot.
    msg_types: Vec<Type>,
}
2445
2446impl OutputPack {
2447 fn slot_type(&self) -> Type {
2448 build_output_slot_type(&self.msg_types)
2449 }
2450
2451 fn is_multi(&self) -> bool {
2452 self.msg_types.len() > 1
2453 }
2454}
2455
2456fn build_output_slot_type(msg_types: &[Type]) -> Type {
2457 if msg_types.is_empty() {
2458 parse_quote! { () }
2459 } else if msg_types.len() == 1 {
2460 let msg_type = msg_types.first().unwrap();
2461 parse_quote! { CuMsg<#msg_type> }
2462 } else {
2463 parse_quote! { ( #( CuMsg<#msg_types> ),* ) }
2464 }
2465}
2466
2467fn extract_output_packs(runtime_plan: &CuExecutionLoop) -> Vec<OutputPack> {
2468 let mut packs: Vec<(u32, OutputPack)> = runtime_plan
2469 .steps
2470 .iter()
2471 .filter_map(|unit| match unit {
2472 CuExecutionUnit::Step(step) => {
2473 if let Some(output_pack) = &step.output_msg_pack {
2474 let msg_types: Vec<Type> = output_pack
2475 .msg_types
2476 .iter()
2477 .map(|output_msg_type| {
2478 parse_str::<Type>(output_msg_type.as_str()).unwrap_or_else(|_| {
2479 panic!(
2480 "Could not transform {output_msg_type} into a message Rust type."
2481 )
2482 })
2483 })
2484 .collect();
2485 Some((output_pack.culist_index, OutputPack { msg_types }))
2486 } else {
2487 None
2488 }
2489 }
2490 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
2491 })
2492 .collect();
2493
2494 packs.sort_by_key(|(index, _)| *index);
2495 packs.into_iter().map(|(_, pack)| pack).collect()
2496}
2497
2498fn collect_output_pack_sizes(runtime_plan: &CuExecutionLoop) -> Vec<usize> {
2499 let mut sizes: Vec<(u32, usize)> = runtime_plan
2500 .steps
2501 .iter()
2502 .filter_map(|unit| match unit {
2503 CuExecutionUnit::Step(step) => step
2504 .output_msg_pack
2505 .as_ref()
2506 .map(|output_pack| (output_pack.culist_index, output_pack.msg_types.len())),
2507 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
2508 })
2509 .collect();
2510
2511 sizes.sort_by_key(|(index, _)| *index);
2512 sizes.into_iter().map(|(_, size)| size).collect()
2513}
2514
2515fn build_culist_tuple(slot_types: &[Type]) -> TypeTuple {
2517 if slot_types.is_empty() {
2518 parse_quote! { () }
2519 } else {
2520 parse_quote! { ( #( #slot_types ),* ) }
2521 }
2522}
2523
2524fn build_culist_tuple_encode(slot_types: &[Type]) -> ItemImpl {
2526 let indices: Vec<usize> = (0..slot_types.len()).collect();
2527
2528 let encode_fields: Vec<_> = indices
2530 .iter()
2531 .map(|i| {
2532 let idx = syn::Index::from(*i);
2533 quote! { self.0.#idx.encode(encoder)?; }
2534 })
2535 .collect();
2536
2537 parse_quote! {
2538 impl Encode for CuStampedDataSet {
2539 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
2540 #(#encode_fields)*
2541 Ok(())
2542 }
2543 }
2544 }
2545}
2546
2547fn build_culist_tuple_decode(slot_types: &[Type]) -> ItemImpl {
2549 let indices: Vec<usize> = (0..slot_types.len()).collect();
2550
2551 let decode_fields: Vec<_> = indices
2552 .iter()
2553 .map(|i| {
2554 let slot_type = &slot_types[*i];
2555 quote! { <#slot_type as Decode<()>>::decode(decoder)? }
2556 })
2557 .collect();
2558
2559 parse_quote! {
2560 impl Decode<()> for CuStampedDataSet {
2561 fn decode<D: Decoder<Context=()>>(decoder: &mut D) -> Result<Self, DecodeError> {
2562 Ok(CuStampedDataSet ((
2563 #(#decode_fields),*
2564 )))
2565 }
2566 }
2567 }
2568}
2569
2570fn build_culist_erasedcumsgs(output_packs: &[OutputPack]) -> ItemImpl {
2571 let mut casted_fields: Vec<proc_macro2::TokenStream> = Vec::new();
2572 for (idx, pack) in output_packs.iter().enumerate() {
2573 let slot_index = syn::Index::from(idx);
2574 if pack.is_multi() {
2575 for port_idx in 0..pack.msg_types.len() {
2576 let port_index = syn::Index::from(port_idx);
2577 casted_fields.push(quote! {
2578 &self.0.#slot_index.#port_index as &dyn ErasedCuStampedData
2579 });
2580 }
2581 } else {
2582 casted_fields.push(quote! { &self.0.#slot_index as &dyn ErasedCuStampedData });
2583 }
2584 }
2585 parse_quote! {
2586 impl ErasedCuStampedDataSet for CuStampedDataSet {
2587 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
2588 vec![
2589 #(#casted_fields),*
2590 ]
2591 }
2592 }
2593 }
2594}
2595
2596fn build_culist_tuple_debug(slot_types: &[Type]) -> ItemImpl {
2597 let indices: Vec<usize> = (0..slot_types.len()).collect();
2598
2599 let debug_fields: Vec<_> = indices
2600 .iter()
2601 .map(|i| {
2602 let idx = syn::Index::from(*i);
2603 quote! { .field(&self.0.#idx) }
2604 })
2605 .collect();
2606
2607 parse_quote! {
2608 impl Debug for CuStampedDataSet {
2609 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
2610 f.debug_tuple("CuStampedDataSet")
2611 #(#debug_fields)*
2612 .finish()
2613 }
2614 }
2615 }
2616}
2617
2618fn build_culist_tuple_serialize(slot_types: &[Type]) -> ItemImpl {
2620 let indices: Vec<usize> = (0..slot_types.len()).collect();
2621 let tuple_len = slot_types.len();
2622
2623 let serialize_fields: Vec<_> = indices
2625 .iter()
2626 .map(|i| {
2627 let idx = syn::Index::from(*i);
2628 quote! { &self.0.#idx }
2629 })
2630 .collect();
2631
2632 parse_quote! {
2633 impl Serialize for CuStampedDataSet {
2634 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2635 where
2636 S: serde::Serializer,
2637 {
2638 use serde::ser::SerializeTuple;
2639 let mut tuple = serializer.serialize_tuple(#tuple_len)?;
2640 #(tuple.serialize_element(#serialize_fields)?;)*
2641 tuple.end()
2642 }
2643 }
2644 }
2645}
2646
2647fn build_culist_tuple_default(slot_types: &[Type]) -> ItemImpl {
2649 let default_fields: Vec<_> = slot_types
2650 .iter()
2651 .map(|slot_type| quote! { <#slot_type as Default>::default() })
2652 .collect();
2653
2654 parse_quote! {
2655 impl Default for CuStampedDataSet {
2656 fn default() -> CuStampedDataSet
2657 {
2658 CuStampedDataSet((
2659 #(#default_fields),*
2660 ))
2661 }
2662 }
2663 }
2664}
2665
2666fn collect_bridge_channel_usage(graph: &CuGraph) -> HashMap<BridgeChannelKey, String> {
2667 let mut usage = HashMap::new();
2668 for cnx in graph.edges() {
2669 if let Some(channel) = &cnx.src_channel {
2670 let key = BridgeChannelKey {
2671 bridge_id: cnx.src.clone(),
2672 channel_id: channel.clone(),
2673 direction: BridgeChannelDirection::Rx,
2674 };
2675 usage
2676 .entry(key)
2677 .and_modify(|msg| {
2678 if msg != &cnx.msg {
2679 panic!(
2680 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2681 cnx.src, channel, msg, cnx.msg
2682 );
2683 }
2684 })
2685 .or_insert(cnx.msg.clone());
2686 }
2687 if let Some(channel) = &cnx.dst_channel {
2688 let key = BridgeChannelKey {
2689 bridge_id: cnx.dst.clone(),
2690 channel_id: channel.clone(),
2691 direction: BridgeChannelDirection::Tx,
2692 };
2693 usage
2694 .entry(key)
2695 .and_modify(|msg| {
2696 if msg != &cnx.msg {
2697 panic!(
2698 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2699 cnx.dst, channel, msg, cnx.msg
2700 );
2701 }
2702 })
2703 .or_insert(cnx.msg.clone());
2704 }
2705 }
2706 usage
2707}
2708
2709fn build_bridge_specs(
2710 config: &CuConfig,
2711 graph: &CuGraph,
2712 channel_usage: &HashMap<BridgeChannelKey, String>,
2713) -> Vec<BridgeSpec> {
2714 let mut specs = Vec::new();
2715 for (bridge_index, bridge_cfg) in config.bridges.iter().enumerate() {
2716 if graph.get_node_id_by_name(bridge_cfg.id.as_str()).is_none() {
2717 continue;
2718 }
2719
2720 let type_path = parse_str::<Type>(bridge_cfg.type_.as_str()).unwrap_or_else(|err| {
2721 panic!(
2722 "Could not parse bridge type '{}' for '{}': {err}",
2723 bridge_cfg.type_, bridge_cfg.id
2724 )
2725 });
2726
2727 let mut rx_channels = Vec::new();
2728 let mut tx_channels = Vec::new();
2729
2730 for (channel_index, channel) in bridge_cfg.channels.iter().enumerate() {
2731 match channel {
2732 BridgeChannelConfigRepresentation::Rx { id, .. } => {
2733 let key = BridgeChannelKey {
2734 bridge_id: bridge_cfg.id.clone(),
2735 channel_id: id.clone(),
2736 direction: BridgeChannelDirection::Rx,
2737 };
2738 if let Some(msg_type) = channel_usage.get(&key) {
2739 let msg_type_name = msg_type.clone();
2740 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2741 panic!(
2742 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2743 bridge_cfg.id, id
2744 )
2745 });
2746 let const_ident =
2747 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2748 rx_channels.push(BridgeChannelSpec {
2749 id: id.clone(),
2750 const_ident,
2751 msg_type,
2752 msg_type_name,
2753 config_index: channel_index,
2754 plan_node_id: None,
2755 culist_index: None,
2756 monitor_index: None,
2757 });
2758 }
2759 }
2760 BridgeChannelConfigRepresentation::Tx { id, .. } => {
2761 let key = BridgeChannelKey {
2762 bridge_id: bridge_cfg.id.clone(),
2763 channel_id: id.clone(),
2764 direction: BridgeChannelDirection::Tx,
2765 };
2766 if let Some(msg_type) = channel_usage.get(&key) {
2767 let msg_type_name = msg_type.clone();
2768 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2769 panic!(
2770 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2771 bridge_cfg.id, id
2772 )
2773 });
2774 let const_ident =
2775 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2776 tx_channels.push(BridgeChannelSpec {
2777 id: id.clone(),
2778 const_ident,
2779 msg_type,
2780 msg_type_name,
2781 config_index: channel_index,
2782 plan_node_id: None,
2783 culist_index: None,
2784 monitor_index: None,
2785 });
2786 }
2787 }
2788 }
2789 }
2790
2791 if rx_channels.is_empty() && tx_channels.is_empty() {
2792 continue;
2793 }
2794
2795 specs.push(BridgeSpec {
2796 id: bridge_cfg.id.clone(),
2797 type_path,
2798 config_index: bridge_index,
2799 tuple_index: 0,
2800 monitor_index: None,
2801 rx_channels,
2802 tx_channels,
2803 });
2804 }
2805
2806 for (tuple_index, spec) in specs.iter_mut().enumerate() {
2807 spec.tuple_index = tuple_index;
2808 }
2809
2810 specs
2811}
2812
2813fn collect_task_member_names(graph: &CuGraph) -> Vec<(NodeId, String)> {
2814 graph
2815 .get_all_nodes()
2816 .iter()
2817 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
2818 .map(|(node_id, node)| (*node_id, config_id_to_struct_member(node.get_id().as_str())))
2819 .collect()
2820}
2821
/// Identifies which entity a resource binding belongs to.
#[derive(Clone, Copy)]
enum ResourceOwner {
    /// A task, identified by its index in the task specification set.
    Task(usize),
    /// A bridge, identified by its index in the bridge specification list.
    Bridge(usize),
}
2827
/// One resolved resource binding: which bundle resource is bound under which name, and for
/// which owner.
#[derive(Clone)]
struct ResourceKeySpec {
    /// Index of the bundle in the bundle specification list.
    bundle_index: usize,
    /// Provider path of that bundle.
    provider_path: syn::Path,
    /// Resource name, i.e. the part after the `.` in a `bundle.resource` path.
    resource_name: String,
    /// Name under which the owner refers to the resource.
    binding_name: String,
    /// Task or bridge the binding is attached to.
    owner: ResourceOwner,
}
2836
2837fn parse_resource_path(path: &str) -> CuResult<(String, String)> {
2838 let (bundle_id, name) = path.split_once('.').ok_or_else(|| {
2839 CuError::from(format!(
2840 "Resource '{path}' is missing a bundle prefix (expected bundle.resource)"
2841 ))
2842 })?;
2843
2844 if bundle_id.is_empty() || name.is_empty() {
2845 return Err(CuError::from(format!(
2846 "Resource '{path}' must use the 'bundle.resource' format"
2847 )));
2848 }
2849
2850 Ok((bundle_id.to_string(), name.to_string()))
2851}
2852
2853fn collect_resource_specs(
2854 graph: &CuGraph,
2855 task_specs: &CuTaskSpecSet,
2856 bridge_specs: &[BridgeSpec],
2857 bundle_specs: &[BundleSpec],
2858) -> CuResult<Vec<ResourceKeySpec>> {
2859 let mut bridge_lookup: BTreeMap<String, usize> = BTreeMap::new();
2860 for (idx, spec) in bridge_specs.iter().enumerate() {
2861 bridge_lookup.insert(spec.id.clone(), idx);
2862 }
2863
2864 let mut bundle_lookup: HashMap<String, (usize, syn::Path)> = HashMap::new();
2865 for (index, bundle) in bundle_specs.iter().enumerate() {
2866 bundle_lookup.insert(bundle.id.clone(), (index, bundle.provider_path.clone()));
2867 }
2868
2869 let mut specs = Vec::new();
2870
2871 for (node_id, node) in graph.get_all_nodes() {
2872 let resources = node.get_resources();
2873 if let Some(resources) = resources {
2874 let task_index = task_specs.node_id_to_task_index[node_id as usize];
2875 let owner = if let Some(task_index) = task_index {
2876 ResourceOwner::Task(task_index)
2877 } else if node.get_flavor() == Flavor::Bridge {
2878 let bridge_index = bridge_lookup.get(&node.get_id()).ok_or_else(|| {
2879 CuError::from(format!(
2880 "Resource mapping attached to unknown bridge node '{}'",
2881 node.get_id()
2882 ))
2883 })?;
2884 ResourceOwner::Bridge(*bridge_index)
2885 } else {
2886 return Err(CuError::from(format!(
2887 "Resource mapping attached to non-task node '{}'",
2888 node.get_id()
2889 )));
2890 };
2891
2892 for (binding_name, path) in resources {
2893 let (bundle_id, resource_name) = parse_resource_path(path)?;
2894 let (bundle_index, provider_path) =
2895 bundle_lookup.get(&bundle_id).ok_or_else(|| {
2896 CuError::from(format!(
2897 "Resource '{}' references unknown bundle '{}'",
2898 path, bundle_id
2899 ))
2900 })?;
2901 specs.push(ResourceKeySpec {
2902 bundle_index: *bundle_index,
2903 provider_path: provider_path.clone(),
2904 resource_name,
2905 binding_name: binding_name.clone(),
2906 owner,
2907 });
2908 }
2909 }
2910 }
2911
2912 Ok(specs)
2913}
2914
2915fn build_bundle_list<'a>(config: &'a CuConfig, mission: &str) -> Vec<&'a ResourceBundleConfig> {
2916 config
2917 .resources
2918 .iter()
2919 .filter(|bundle| {
2920 bundle
2921 .missions
2922 .as_ref()
2923 .is_none_or(|missions| missions.iter().any(|m| m == mission))
2924 })
2925 .collect()
2926}
2927
/// A resource bundle selected for a mission, with its provider already parsed.
struct BundleSpec {
    /// Bundle id as written in the configuration.
    id: String,
    /// Provider of the bundle, parsed as a Rust path.
    provider_path: syn::Path,
}
2932
2933fn build_bundle_specs(config: &CuConfig, mission: &str) -> CuResult<Vec<BundleSpec>> {
2934 build_bundle_list(config, mission)
2935 .into_iter()
2936 .map(|bundle| {
2937 let provider_path: syn::Path =
2938 syn::parse_str(bundle.provider.as_str()).map_err(|err| {
2939 CuError::from(format!(
2940 "Failed to parse provider path '{}' for bundle '{}': {err}",
2941 bundle.provider, bundle.id
2942 ))
2943 })?;
2944 Ok(BundleSpec {
2945 id: bundle.id.clone(),
2946 provider_path,
2947 })
2948 })
2949 .collect()
2950}
2951
2952fn build_resources_module(
2953 bundle_specs: &[BundleSpec],
2954) -> CuResult<(proc_macro2::TokenStream, proc_macro2::TokenStream)> {
2955 let bundle_consts = bundle_specs.iter().enumerate().map(|(index, bundle)| {
2956 let const_ident = Ident::new(
2957 &config_id_to_bridge_const(bundle.id.as_str()),
2958 Span::call_site(),
2959 );
2960 quote! { pub const #const_ident: BundleIndex = BundleIndex::new(#index); }
2961 });
2962
2963 let resources_module = quote! {
2964 pub mod resources {
2965 #![allow(dead_code)]
2966 use cu29::resource::BundleIndex;
2967
2968 pub mod bundles {
2969 use super::BundleIndex;
2970 #(#bundle_consts)*
2971 }
2972 }
2973 };
2974
2975 let bundle_counts = bundle_specs.iter().map(|bundle| {
2976 let provider_path = &bundle.provider_path;
2977 quote! { <#provider_path as cu29::resource::ResourceBundleDecl>::Id::COUNT }
2978 });
2979
2980 let bundle_inits = bundle_specs
2981 .iter()
2982 .enumerate()
2983 .map(|(index, bundle)| {
2984 let bundle_id = LitStr::new(bundle.id.as_str(), Span::call_site());
2985 let provider_path = &bundle.provider_path;
2986 quote! {
2987 let bundle_cfg = config
2988 .resources
2989 .iter()
2990 .find(|b| b.id == #bundle_id)
2991 .unwrap_or_else(|| panic!("Resource bundle '{}' missing from configuration", #bundle_id));
2992 let bundle_ctx = cu29::resource::BundleContext::<#provider_path>::new(
2993 cu29::resource::BundleIndex::new(#index),
2994 #bundle_id,
2995 );
2996 <#provider_path as cu29::resource::ResourceBundle>::build(
2997 bundle_ctx,
2998 bundle_cfg.config.as_ref(),
2999 &mut manager,
3000 )?;
3001 }
3002 })
3003 .collect::<Vec<_>>();
3004
3005 let resources_instanciator = quote! {
3006 pub fn resources_instanciator(config: &CuConfig) -> CuResult<cu29::resource::ResourceManager> {
3007 let bundle_counts: &[usize] = &[ #(#bundle_counts),* ];
3008 let mut manager = cu29::resource::ResourceManager::new(bundle_counts);
3009 #(#bundle_inits)*
3010 Ok(manager)
3011 }
3012 };
3013
3014 Ok((resources_module, resources_instanciator))
3015}
3016
/// Generated tokens describing resource binding maps for a list of owners.
struct ResourceMappingTokens {
    /// Constant definitions (entry tables and binding maps) for the owners that have bindings.
    defs: proc_macro2::TokenStream,
    /// One expression per owner: `Some(&MAP)` when it has bindings, `None` otherwise.
    refs: Vec<proc_macro2::TokenStream>,
}
3021
3022fn build_task_resource_mappings(
3023 resource_specs: &[ResourceKeySpec],
3024 task_specs: &CuTaskSpecSet,
3025) -> CuResult<ResourceMappingTokens> {
3026 let mut per_task: Vec<Vec<&ResourceKeySpec>> = vec![Vec::new(); task_specs.ids.len()];
3027
3028 for spec in resource_specs {
3029 let ResourceOwner::Task(task_index) = spec.owner else {
3030 continue;
3031 };
3032 per_task
3033 .get_mut(task_index)
3034 .ok_or_else(|| {
3035 CuError::from(format!(
3036 "Resource '{}' mapped to invalid task index {}",
3037 spec.binding_name, task_index
3038 ))
3039 })?
3040 .push(spec);
3041 }
3042
3043 let mut mapping_defs = Vec::new();
3044 let mut mapping_refs = Vec::new();
3045
3046 for (idx, entries) in per_task.iter().enumerate() {
3047 if entries.is_empty() {
3048 mapping_refs.push(quote! { None });
3049 continue;
3050 }
3051
3052 let binding_task_type = if task_specs.background_flags[idx] {
3053 &task_specs.sim_task_types[idx]
3054 } else {
3055 &task_specs.task_types[idx]
3056 };
3057
3058 let binding_trait = match task_specs.cutypes[idx] {
3059 CuTaskType::Source => quote! { CuSrcTask },
3060 CuTaskType::Regular => quote! { CuTask },
3061 CuTaskType::Sink => quote! { CuSinkTask },
3062 };
3063
3064 let entries_ident = format_ident!("TASK{}_RES_ENTRIES", idx);
3065 let map_ident = format_ident!("TASK{}_RES_MAPPING", idx);
3066 let binding_type = quote! {
3067 <<#binding_task_type as #binding_trait>::Resources<'_> as ResourceBindings>::Binding
3068 };
3069 let entry_tokens = entries.iter().map(|spec| {
3070 let binding_ident =
3071 Ident::new(&config_id_to_enum(spec.binding_name.as_str()), Span::call_site());
3072 let resource_ident =
3073 Ident::new(&config_id_to_enum(spec.resource_name.as_str()), Span::call_site());
3074 let bundle_index = spec.bundle_index;
3075 let provider_path = &spec.provider_path;
3076 quote! {
3077 (#binding_type::#binding_ident, cu29::resource::ResourceKey::new(
3078 cu29::resource::BundleIndex::new(#bundle_index),
3079 <#provider_path as cu29::resource::ResourceBundleDecl>::Id::#resource_ident as usize,
3080 ))
3081 }
3082 });
3083
3084 mapping_defs.push(quote! {
3085 const #entries_ident: &[(#binding_type, cu29::resource::ResourceKey)] = &[ #(#entry_tokens),* ];
3086 const #map_ident: cu29::resource::ResourceBindingMap<#binding_type> =
3087 cu29::resource::ResourceBindingMap::new(#entries_ident);
3088 });
3089 mapping_refs.push(quote! { Some(&#map_ident) });
3090 }
3091
3092 Ok(ResourceMappingTokens {
3093 defs: quote! { #(#mapping_defs)* },
3094 refs: mapping_refs,
3095 })
3096}
3097
3098fn build_bridge_resource_mappings(
3099 resource_specs: &[ResourceKeySpec],
3100 bridge_specs: &[BridgeSpec],
3101) -> ResourceMappingTokens {
3102 let mut per_bridge: Vec<Vec<&ResourceKeySpec>> = vec![Vec::new(); bridge_specs.len()];
3103
3104 for spec in resource_specs {
3105 let ResourceOwner::Bridge(bridge_index) = spec.owner else {
3106 continue;
3107 };
3108 per_bridge[bridge_index].push(spec);
3109 }
3110
3111 let mut mapping_defs = Vec::new();
3112 let mut mapping_refs = Vec::new();
3113
3114 for (idx, entries) in per_bridge.iter().enumerate() {
3115 if entries.is_empty() {
3116 mapping_refs.push(quote! { None });
3117 continue;
3118 }
3119
3120 let bridge_type = &bridge_specs[idx].type_path;
3121 let binding_type = quote! {
3122 <<#bridge_type as cu29::cubridge::CuBridge>::Resources<'_> as ResourceBindings>::Binding
3123 };
3124 let entries_ident = format_ident!("BRIDGE{}_RES_ENTRIES", idx);
3125 let map_ident = format_ident!("BRIDGE{}_RES_MAPPING", idx);
3126 let entry_tokens = entries.iter().map(|spec| {
3127 let binding_ident =
3128 Ident::new(&config_id_to_enum(spec.binding_name.as_str()), Span::call_site());
3129 let resource_ident =
3130 Ident::new(&config_id_to_enum(spec.resource_name.as_str()), Span::call_site());
3131 let bundle_index = spec.bundle_index;
3132 let provider_path = &spec.provider_path;
3133 quote! {
3134 (#binding_type::#binding_ident, cu29::resource::ResourceKey::new(
3135 cu29::resource::BundleIndex::new(#bundle_index),
3136 <#provider_path as cu29::resource::ResourceBundleDecl>::Id::#resource_ident as usize,
3137 ))
3138 }
3139 });
3140
3141 mapping_defs.push(quote! {
3142 const #entries_ident: &[(#binding_type, cu29::resource::ResourceKey)] = &[ #(#entry_tokens),* ];
3143 const #map_ident: cu29::resource::ResourceBindingMap<#binding_type> =
3144 cu29::resource::ResourceBindingMap::new(#entries_ident);
3145 });
3146 mapping_refs.push(quote! { Some(&#map_ident) });
3147 }
3148
3149 ResourceMappingTokens {
3150 defs: quote! { #(#mapping_defs)* },
3151 refs: mapping_refs,
3152 }
3153}
3154
/// Builds the graph used for planning and computes the runtime plan from it.
///
/// The plan graph contains a copy of every `Flavor::Task` node of `graph`, plus one synthetic
/// node per bridge channel in `bridge_specs` (named `<bridge>::rx::<channel>` or
/// `<bridge>::tx::<channel>`). Every edge of `graph` is then re-created between the
/// corresponding plan nodes. Each channel spec gets its `plan_node_id` filled in.
///
/// Returns the computed plan, the list of execution entities indexed by plan node id, and a
/// map from plan node id to original node id (task nodes only).
///
/// # Errors
///
/// Propagates failures from adding nodes, connecting edges, and computing the runtime plan.
///
/// # Panics
///
/// Panics if a task node has no task index, if a plan node id does not match the next
/// execution entity index, or if an edge endpoint cannot be found in the plan graph.
fn build_execution_plan(
    graph: &CuGraph,
    task_specs: &CuTaskSpecSet,
    bridge_specs: &mut [BridgeSpec],
) -> CuResult<(
    CuExecutionLoop,
    Vec<ExecutionEntity>,
    HashMap<NodeId, NodeId>,
)> {
    let mut plan_graph = CuGraph::default();
    let mut exec_entities = Vec::new();
    let mut original_to_plan = HashMap::new();
    let mut plan_to_original = HashMap::new();
    let mut name_to_original = HashMap::new();
    let mut channel_nodes = HashMap::new();

    // Pass 1: index every node by name and mirror the task nodes into the plan graph.
    for (node_id, node) in graph.get_all_nodes() {
        name_to_original.insert(node.get_id(), node_id);
        if node.get_flavor() != Flavor::Task {
            continue;
        }
        let plan_node_id = plan_graph.add_node(node.clone())?;
        let task_index = task_specs.node_id_to_task_index[node_id as usize]
            .expect("Task missing from specifications");
        plan_to_original.insert(plan_node_id, node_id);
        original_to_plan.insert(node_id, plan_node_id);
        // `exec_entities` is indexed by plan node id, so ids must be handed out sequentially.
        if plan_node_id as usize != exec_entities.len() {
            panic!("Unexpected node ordering while mirroring tasks in plan graph");
        }
        exec_entities.push(ExecutionEntity {
            kind: ExecutionEntityKind::Task { task_index },
        });
    }

    // Pass 2: add one synthetic plan node per bridge channel (rx first, then tx).
    for (bridge_index, spec) in bridge_specs.iter_mut().enumerate() {
        for (channel_index, channel_spec) in spec.rx_channels.iter_mut().enumerate() {
            let mut node = Node::new(
                format!("{}::rx::{}", spec.id, channel_spec.id).as_str(),
                "__CuBridgeRxChannel",
            );
            node.set_flavor(Flavor::Bridge);
            let plan_node_id = plan_graph.add_node(node)?;
            if plan_node_id as usize != exec_entities.len() {
                panic!("Unexpected node ordering while inserting bridge rx channel");
            }
            channel_spec.plan_node_id = Some(plan_node_id);
            exec_entities.push(ExecutionEntity {
                kind: ExecutionEntityKind::BridgeRx {
                    bridge_index,
                    channel_index,
                },
            });
            channel_nodes.insert(
                BridgeChannelKey {
                    bridge_id: spec.id.clone(),
                    channel_id: channel_spec.id.clone(),
                    direction: BridgeChannelDirection::Rx,
                },
                plan_node_id,
            );
        }

        for (channel_index, channel_spec) in spec.tx_channels.iter_mut().enumerate() {
            let mut node = Node::new(
                format!("{}::tx::{}", spec.id, channel_spec.id).as_str(),
                "__CuBridgeTxChannel",
            );
            node.set_flavor(Flavor::Bridge);
            let plan_node_id = plan_graph.add_node(node)?;
            if plan_node_id as usize != exec_entities.len() {
                panic!("Unexpected node ordering while inserting bridge tx channel");
            }
            channel_spec.plan_node_id = Some(plan_node_id);
            exec_entities.push(ExecutionEntity {
                kind: ExecutionEntityKind::BridgeTx {
                    bridge_index,
                    channel_index,
                },
            });
            channel_nodes.insert(
                BridgeChannelKey {
                    bridge_id: spec.id.clone(),
                    channel_id: channel_spec.id.clone(),
                    direction: BridgeChannelDirection::Tx,
                },
                plan_node_id,
            );
        }
    }

    // Pass 3: re-create every edge between the matching plan nodes.
    for cnx in graph.edges() {
        // A source with a channel is a bridge rx channel node; otherwise it is a mirrored task.
        let src_plan = if let Some(channel) = &cnx.src_channel {
            let key = BridgeChannelKey {
                bridge_id: cnx.src.clone(),
                channel_id: channel.clone(),
                direction: BridgeChannelDirection::Rx,
            };
            *channel_nodes
                .get(&key)
                .unwrap_or_else(|| panic!("Bridge source {:?} missing from plan graph", key))
        } else {
            let node_id = name_to_original
                .get(&cnx.src)
                .copied()
                .unwrap_or_else(|| panic!("Unknown source node '{}'", cnx.src));
            *original_to_plan
                .get(&node_id)
                .unwrap_or_else(|| panic!("Source node '{}' missing from plan", cnx.src))
        };

        // A destination with a channel is a bridge tx channel node; otherwise a mirrored task.
        let dst_plan = if let Some(channel) = &cnx.dst_channel {
            let key = BridgeChannelKey {
                bridge_id: cnx.dst.clone(),
                channel_id: channel.clone(),
                direction: BridgeChannelDirection::Tx,
            };
            *channel_nodes
                .get(&key)
                .unwrap_or_else(|| panic!("Bridge destination {:?} missing from plan graph", key))
        } else {
            let node_id = name_to_original
                .get(&cnx.dst)
                .copied()
                .unwrap_or_else(|| panic!("Unknown destination node '{}'", cnx.dst));
            *original_to_plan
                .get(&node_id)
                .unwrap_or_else(|| panic!("Destination node '{}' missing from plan", cnx.dst))
        };

        // Channel names are not forwarded: in the plan graph each channel is its own node.
        plan_graph
            .connect_ext(
                src_plan,
                dst_plan,
                &cnx.msg,
                cnx.missions.clone(),
                None,
                None,
            )
            .map_err(|e| CuError::from(e.to_string()))?;
    }

    let runtime_plan = compute_runtime_plan(&plan_graph)?;
    Ok((runtime_plan, exec_entities, plan_to_original))
}
3299
3300fn collect_culist_metadata(
3301 runtime_plan: &CuExecutionLoop,
3302 exec_entities: &[ExecutionEntity],
3303 bridge_specs: &mut [BridgeSpec],
3304 plan_to_original: &HashMap<NodeId, NodeId>,
3305) -> (Vec<usize>, HashMap<NodeId, usize>) {
3306 let mut culist_order = Vec::new();
3307 let mut node_output_positions = HashMap::new();
3308
3309 for unit in &runtime_plan.steps {
3310 if let CuExecutionUnit::Step(step) = unit
3311 && let Some(output_pack) = &step.output_msg_pack
3312 {
3313 let output_idx = output_pack.culist_index;
3314 culist_order.push(output_idx as usize);
3315 match &exec_entities[step.node_id as usize].kind {
3316 ExecutionEntityKind::Task { .. } => {
3317 if let Some(original_node_id) = plan_to_original.get(&step.node_id) {
3318 node_output_positions.insert(*original_node_id, output_idx as usize);
3319 }
3320 }
3321 ExecutionEntityKind::BridgeRx {
3322 bridge_index,
3323 channel_index,
3324 } => {
3325 bridge_specs[*bridge_index].rx_channels[*channel_index].culist_index =
3326 Some(output_idx as usize);
3327 }
3328 ExecutionEntityKind::BridgeTx { .. } => {}
3329 }
3330 }
3331 }
3332
3333 (culist_order, node_output_positions)
3334}
3335
3336#[allow(dead_code)]
3337fn build_monitored_ids(task_ids: &[String], bridge_specs: &mut [BridgeSpec]) -> Vec<String> {
3338 let mut names = task_ids.to_vec();
3339 for spec in bridge_specs.iter_mut() {
3340 spec.monitor_index = Some(names.len());
3341 names.push(format!("bridge::{}", spec.id));
3342 for channel in spec.rx_channels.iter_mut() {
3343 channel.monitor_index = Some(names.len());
3344 names.push(format!("bridge::{}::rx::{}", spec.id, channel.id));
3345 }
3346 for channel in spec.tx_channels.iter_mut() {
3347 channel.monitor_index = Some(names.len());
3348 names.push(format!("bridge::{}::tx::{}", spec.id, channel.id));
3349 }
3350 }
3351 names
3352}
3353
3354fn generate_task_execution_tokens(
3355 step: &CuExecutionStep,
3356 task_index: usize,
3357 task_specs: &CuTaskSpecSet,
3358 output_pack_sizes: &[usize],
3359 sim_mode: bool,
3360 mission_mod: &Ident,
3361) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
3362 let node_index = int2sliceindex(task_index as u32);
3363 let task_instance = quote! { tasks.#node_index };
3364 let comment_str = format!(
3365 "DEBUG ->> {} ({:?}) Id:{} I:{:?} O:{:?}",
3366 step.node.get_id(),
3367 step.task_type,
3368 step.node_id,
3369 step.input_msg_indices_types,
3370 step.output_msg_pack
3371 );
3372 let comment_tokens = quote! {{
3373 let _ = stringify!(#comment_str);
3374 }};
3375 let tid = task_index;
3376 let task_enum_name = config_id_to_enum(&task_specs.ids[tid]);
3377 let enum_name = Ident::new(&task_enum_name, Span::call_site());
3378 let rt_guard = rtsan_guard_tokens();
3379
3380 let output_pack = step
3381 .output_msg_pack
3382 .as_ref()
3383 .expect("Task should have an output message pack.");
3384 let output_culist_index = int2sliceindex(output_pack.culist_index);
3385 let output_ports: Vec<syn::Index> = (0..output_pack.msg_types.len())
3386 .map(syn::Index::from)
3387 .collect();
3388 let output_clear_payload = if output_ports.len() == 1 {
3389 quote! { cumsg_output.clear_payload(); }
3390 } else {
3391 quote! { #(cumsg_output.#output_ports.clear_payload();)* }
3392 };
3393 let output_start_time = if output_ports.len() == 1 {
3394 quote! { cumsg_output.metadata.process_time.start = clock.now().into(); }
3395 } else {
3396 quote! {
3397 let start_time = clock.now().into();
3398 #(cumsg_output.#output_ports.metadata.process_time.start = start_time;)*
3399 }
3400 };
3401 let output_end_time = if output_ports.len() == 1 {
3402 quote! { cumsg_output.metadata.process_time.end = clock.now().into(); }
3403 } else {
3404 quote! {
3405 let end_time = clock.now().into();
3406 #(cumsg_output.#output_ports.metadata.process_time.end = end_time;)*
3407 }
3408 };
3409
3410 match step.task_type {
3411 CuTaskType::Source => {
3412 let monitoring_action = quote! {
3413 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
3414 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
3415 match decision {
3416 Decision::Abort => {
3417 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
3418 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
3419 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
3420 cl_manager.end_of_processing(clid)?;
3421 return Ok(());
3422 }
3423 Decision::Ignore => {
3424 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
3425 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
3426 let cumsg_output = &mut msgs.#output_culist_index;
3427 #output_clear_payload
3428 }
3429 Decision::Shutdown => {
3430 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
3431 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
3432 return Err(CuError::new_with_cause("Task errored out during process.", error));
3433 }
3434 }
3435 };
3436
3437 let call_sim_callback = if sim_mode {
3438 quote! {
3439 let doit = {
3440 let cumsg_output = &mut msgs.#output_culist_index;
3441 let state = CuTaskCallbackState::Process((), cumsg_output);
3442 let ovr = sim_callback(SimStep::#enum_name(state));
3443
3444 if let SimOverride::Errored(reason) = ovr {
3445 let error: CuError = reason.into();
3446 #monitoring_action
3447 false
3448 } else {
3449 ovr == SimOverride::ExecuteByRuntime
3450 }
3451 };
3452 }
3453 } else {
3454 quote! { let doit = true; }
3455 };
3456
3457 let logging_tokens = if !task_specs.logging_enabled[tid] {
3458 quote! {
3459 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
3460 #output_clear_payload
3461 }
3462 } else {
3463 quote!()
3464 };
3465
3466 (
3467 quote! {
3468 {
3469 #comment_tokens
3470 kf_manager.freeze_task(clid, &#task_instance)?;
3471 #call_sim_callback
3472 let cumsg_output = &mut msgs.#output_culist_index;
3473 #output_start_time
3474 let maybe_error = if doit {
3475 #rt_guard
3476 #task_instance.process(clock, cumsg_output)
3477 } else {
3478 Ok(())
3479 };
3480 #output_end_time
3481 if let Err(error) = maybe_error {
3482 #monitoring_action
3483 }
3484 }
3485 },
3486 logging_tokens,
3487 )
3488 }
3489 CuTaskType::Sink => {
3490 let input_exprs: Vec<proc_macro2::TokenStream> = step
3491 .input_msg_indices_types
3492 .iter()
3493 .map(|input| {
3494 let input_index = int2sliceindex(input.culist_index);
3495 let output_size = output_pack_sizes
3496 .get(input.culist_index as usize)
3497 .copied()
3498 .unwrap_or_else(|| {
3499 panic!(
3500 "Missing output pack size for culist index {}",
3501 input.culist_index
3502 )
3503 });
3504 if output_size > 1 {
3505 let port_index = syn::Index::from(input.src_port);
3506 quote! { msgs.#input_index.#port_index }
3507 } else {
3508 quote! { msgs.#input_index }
3509 }
3510 })
3511 .collect();
3512 let inputs_type = if input_exprs.len() == 1 {
3513 let input = input_exprs.first().unwrap();
3514 quote! { #input }
3515 } else {
3516 quote! { (#(&#input_exprs),*) }
3517 };
3518
3519 let monitoring_action = quote! {
3520 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
3521 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
3522 match decision {
3523 Decision::Abort => {
3524 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
3525 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
3526 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
3527 cl_manager.end_of_processing(clid)?;
3528 return Ok(());
3529 }
3530 Decision::Ignore => {
3531 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
3532 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
3533 let cumsg_output = &mut msgs.#output_culist_index;
3534 #output_clear_payload
3535 }
3536 Decision::Shutdown => {
3537 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
3538 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
3539 return Err(CuError::new_with_cause("Task errored out during process.", error));
3540 }
3541 }
3542 };
3543
3544 let call_sim_callback = if sim_mode {
3545 quote! {
3546 let doit = {
3547 let cumsg_input = &#inputs_type;
3548 let cumsg_output = &mut msgs.#output_culist_index;
3549 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
3550 let ovr = sim_callback(SimStep::#enum_name(state));
3551
3552 if let SimOverride::Errored(reason) = ovr {
3553 let error: CuError = reason.into();
3554 #monitoring_action
3555 false
3556 } else {
3557 ovr == SimOverride::ExecuteByRuntime
3558 }
3559 };
3560 }
3561 } else {
3562 quote! { let doit = true; }
3563 };
3564
3565 (
3566 quote! {
3567 {
3568 #comment_tokens
3569 kf_manager.freeze_task(clid, &#task_instance)?;
3570 #call_sim_callback
3571 let cumsg_input = &#inputs_type;
3572 let cumsg_output = &mut msgs.#output_culist_index;
3573 #output_start_time
3574 let maybe_error = if doit {
3575 #rt_guard
3576 #task_instance.process(clock, cumsg_input)
3577 } else {
3578 Ok(())
3579 };
3580 #output_end_time
3581 if let Err(error) = maybe_error {
3582 #monitoring_action
3583 }
3584 }
3585 },
3586 quote! {},
3587 )
3588 }
3589 CuTaskType::Regular => {
3590 let input_exprs: Vec<proc_macro2::TokenStream> = step
3591 .input_msg_indices_types
3592 .iter()
3593 .map(|input| {
3594 let input_index = int2sliceindex(input.culist_index);
3595 let output_size = output_pack_sizes
3596 .get(input.culist_index as usize)
3597 .copied()
3598 .unwrap_or_else(|| {
3599 panic!(
3600 "Missing output pack size for culist index {}",
3601 input.culist_index
3602 )
3603 });
3604 if output_size > 1 {
3605 let port_index = syn::Index::from(input.src_port);
3606 quote! { msgs.#input_index.#port_index }
3607 } else {
3608 quote! { msgs.#input_index }
3609 }
3610 })
3611 .collect();
3612 let inputs_type = if input_exprs.len() == 1 {
3613 let input = input_exprs.first().unwrap();
3614 quote! { #input }
3615 } else {
3616 quote! { (#(&#input_exprs),*) }
3617 };
3618
3619 let monitoring_action = quote! {
3620 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
3621 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
3622 match decision {
3623 Decision::Abort => {
3624 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
3625 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
3626 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
3627 cl_manager.end_of_processing(clid)?;
3628 return Ok(());
3629 }
3630 Decision::Ignore => {
3631 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
3632 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
3633 let cumsg_output = &mut msgs.#output_culist_index;
3634 #output_clear_payload
3635 }
3636 Decision::Shutdown => {
3637 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
3638 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
3639 return Err(CuError::new_with_cause("Task errored out during process.", error));
3640 }
3641 }
3642 };
3643
3644 let call_sim_callback = if sim_mode {
3645 quote! {
3646 let doit = {
3647 let cumsg_input = &#inputs_type;
3648 let cumsg_output = &mut msgs.#output_culist_index;
3649 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
3650 let ovr = sim_callback(SimStep::#enum_name(state));
3651
3652 if let SimOverride::Errored(reason) = ovr {
3653 let error: CuError = reason.into();
3654 #monitoring_action
3655 false
3656 }
3657 else {
3658 ovr == SimOverride::ExecuteByRuntime
3659 }
3660 };
3661 }
3662 } else {
3663 quote! { let doit = true; }
3664 };
3665
3666 let logging_tokens = if !task_specs.logging_enabled[tid] {
3667 quote! {
3668 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
3669 #output_clear_payload
3670 }
3671 } else {
3672 quote!()
3673 };
3674
3675 (
3676 quote! {
3677 {
3678 #comment_tokens
3679 kf_manager.freeze_task(clid, &#task_instance)?;
3680 #call_sim_callback
3681 let cumsg_input = &#inputs_type;
3682 let cumsg_output = &mut msgs.#output_culist_index;
3683 #output_start_time
3684 let maybe_error = if doit {
3685 #rt_guard
3686 #task_instance.process(clock, cumsg_input, cumsg_output)
3687 } else {
3688 Ok(())
3689 };
3690 #output_end_time
3691 if let Err(error) = maybe_error {
3692 #monitoring_action
3693 }
3694 }
3695 },
3696 logging_tokens,
3697 )
3698 }
3699 }
3700}
3701
3702fn generate_bridge_rx_execution_tokens(
3703 step: &CuExecutionStep,
3704 bridge_spec: &BridgeSpec,
3705 channel_index: usize,
3706 mission_mod: &Ident,
3707) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
3708 let rt_guard = rtsan_guard_tokens();
3709 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
3710 let channel = &bridge_spec.rx_channels[channel_index];
3711 let output_pack = step
3712 .output_msg_pack
3713 .as_ref()
3714 .expect("Bridge Rx channel missing output pack");
3715 let port_index = output_pack
3716 .msg_types
3717 .iter()
3718 .position(|msg| msg == &channel.msg_type_name)
3719 .unwrap_or_else(|| {
3720 panic!(
3721 "Bridge Rx channel '{}' missing output port for '{}'",
3722 channel.id, channel.msg_type_name
3723 )
3724 });
3725 let culist_index_ts = int2sliceindex(output_pack.culist_index);
3726 let output_ref = if output_pack.msg_types.len() == 1 {
3727 quote! { &mut msgs.#culist_index_ts }
3728 } else {
3729 let port_index = syn::Index::from(port_index);
3730 quote! { &mut msgs.#culist_index_ts.#port_index }
3731 };
3732 let monitor_index = syn::Index::from(
3733 channel
3734 .monitor_index
3735 .expect("Bridge Rx channel missing monitor index"),
3736 );
3737 let bridge_type = &bridge_spec.type_path;
3738 let const_ident = &channel.const_ident;
3739 (
3740 quote! {
3741 {
3742 let bridge = &mut bridges.#bridge_tuple_index;
3743 let cumsg_output = #output_ref;
3744 cumsg_output.metadata.process_time.start = clock.now().into();
3745 let maybe_error = {
3746 #rt_guard
3747 bridge.receive(
3748 clock,
3749 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
3750 cumsg_output,
3751 )
3752 };
3753 cumsg_output.metadata.process_time.end = clock.now().into();
3754 if let Err(error) = maybe_error {
3755 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
3756 match decision {
3757 Decision::Abort => {
3758 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
3759 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
3760 cl_manager.end_of_processing(clid)?;
3761 return Ok(());
3762 }
3763 Decision::Ignore => {
3764 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
3765 let cumsg_output = #output_ref;
3766 cumsg_output.clear_payload();
3767 }
3768 Decision::Shutdown => {
3769 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
3770 return Err(CuError::new_with_cause("Task errored out during process.", error));
3771 }
3772 }
3773 }
3774 }
3775 },
3776 quote! {},
3777 )
3778}
3779
3780fn generate_bridge_tx_execution_tokens(
3781 step: &CuExecutionStep,
3782 bridge_spec: &BridgeSpec,
3783 channel_index: usize,
3784 output_pack_sizes: &[usize],
3785 mission_mod: &Ident,
3786) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
3787 let rt_guard = rtsan_guard_tokens();
3788 let channel = &bridge_spec.tx_channels[channel_index];
3789 let monitor_index = syn::Index::from(
3790 channel
3791 .monitor_index
3792 .expect("Bridge Tx channel missing monitor index"),
3793 );
3794 let input = step
3795 .input_msg_indices_types
3796 .first()
3797 .expect("Bridge Tx channel should have exactly one input");
3798 let input_index = int2sliceindex(input.culist_index);
3799 let output_size = output_pack_sizes
3800 .get(input.culist_index as usize)
3801 .copied()
3802 .unwrap_or_else(|| {
3803 panic!(
3804 "Missing output pack size for culist index {}",
3805 input.culist_index
3806 )
3807 });
3808 let input_ref = if output_size > 1 {
3809 let port_index = syn::Index::from(input.src_port);
3810 quote! { &mut msgs.#input_index.#port_index }
3811 } else {
3812 quote! { &mut msgs.#input_index }
3813 };
3814 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
3815 let bridge_type = &bridge_spec.type_path;
3816 let const_ident = &channel.const_ident;
3817 (
3818 quote! {
3819 {
3820 let bridge = &mut bridges.#bridge_tuple_index;
3821 let cumsg_input = #input_ref;
3822 cumsg_input.metadata.process_time.start = clock.now().into();
3824 let maybe_error = {
3825 #rt_guard
3826 bridge.send(
3827 clock,
3828 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
3829 &*cumsg_input,
3830 )
3831 };
3832 if let Err(error) = maybe_error {
3833 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
3834 match decision {
3835 Decision::Abort => {
3836 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
3837 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
3838 cl_manager.end_of_processing(clid)?;
3839 return Ok(());
3840 }
3841 Decision::Ignore => {
3842 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
3843 }
3844 Decision::Shutdown => {
3845 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
3846 return Err(CuError::new_with_cause("Task errored out during process.", error));
3847 }
3848 }
3849 }
3850 cumsg_input.metadata.process_time.end = clock.now().into();
3851 }
3852 },
3853 quote! {},
3854 )
3855}
3856
/// Side of a bridge a channel belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum BridgeChannelDirection {
    /// Receiving channel: the bridge produces messages into the graph (`bridge.receive`).
    Rx,
    /// Transmitting channel: the bridge consumes messages from the graph (`bridge.send`).
    Tx,
}
3862
3863#[derive(Clone, Debug, PartialEq, Eq, Hash)]
3864struct BridgeChannelKey {
3865 bridge_id: String,
3866 channel_id: String,
3867 direction: BridgeChannelDirection,
3868}
3869
3870#[derive(Clone)]
3871struct BridgeChannelSpec {
3872 id: String,
3873 const_ident: Ident,
3874 #[allow(dead_code)]
3875 msg_type: Type,
3876 msg_type_name: String,
3877 config_index: usize,
3878 plan_node_id: Option<NodeId>,
3879 culist_index: Option<usize>,
3880 monitor_index: Option<usize>,
3881}
3882
3883#[derive(Clone)]
3884struct BridgeSpec {
3885 id: String,
3886 type_path: Type,
3887 config_index: usize,
3888 tuple_index: usize,
3889 monitor_index: Option<usize>,
3890 rx_channels: Vec<BridgeChannelSpec>,
3891 tx_channels: Vec<BridgeChannelSpec>,
3892}
3893
3894#[derive(Clone)]
3895struct ExecutionEntity {
3896 kind: ExecutionEntityKind,
3897}
3898
/// The kinds of executable things a plan-graph node can represent.
#[derive(Clone)]
enum ExecutionEntityKind {
    /// A task, identified by its index in the task specifications.
    Task { task_index: usize },
    /// An Rx channel: `channel_index` within the `rx_channels` of bridge `bridge_index`.
    BridgeRx { bridge_index: usize, channel_index: usize },
    /// A Tx channel: `channel_index` within the `tx_channels` of bridge `bridge_index`.
    BridgeTx { bridge_index: usize, channel_index: usize },
}
3913
#[cfg(test)]
mod tests {
    /// Runs the `trybuild` compile-fail suite.
    ///
    /// Before running, each case's `<name>.stderr` expectation is overwritten with the
    /// `<name>.stderr.beta` or `<name>.stderr.stable` variant matching the toolchain channel,
    /// when such a variant exists.
    #[test]
    fn test_compile_fail() {
        use rustc_version::{Channel, version_meta};
        use std::fs;
        use std::path::{Path, PathBuf};

        let suite_root = Path::new("tests/compile_fail");
        for case_dir in fs::read_dir(suite_root).unwrap() {
            let case_dir = case_dir.unwrap();
            if !case_dir.file_type().unwrap().is_dir() {
                continue;
            }

            for candidate in fs::read_dir(case_dir.path()).unwrap() {
                let case_file = candidate.unwrap().path();
                let is_rust_source = matches!(
                    case_file.extension().and_then(|ext| ext.to_str()),
                    Some("rs")
                );
                if !is_rust_source {
                    continue;
                }

                let expected = case_file.with_extension("stderr");
                // Every channel other than beta falls back to the stable expectation.
                let variant = match version_meta().unwrap().channel {
                    Channel::Beta => PathBuf::from(format!("{}.beta", expected.display())),
                    _ => PathBuf::from(format!("{}.stable", expected.display())),
                };

                if variant.exists() {
                    fs::copy(variant, &expected).unwrap();
                }
            }
        }

        let cases = trybuild::TestCases::new();
        cases.compile_fail("tests/compile_fail/*/*.rs");
    }

    /// A resource binding declared on a bridge node must be resolved against the declared
    /// bundles and attributed to that bridge.
    #[test]
    fn bridge_resources_are_collected() {
        use super::*;
        use cu29::config::{CuGraph, Flavor, Node};
        use std::collections::HashMap;
        use syn::parse_str;

        // Graph with a single bridge node binding `serial` to `fc.serial0`.
        let mut radio = Node::new_with_flavor("radio", "bridge::Dummy", Flavor::Bridge);
        radio.set_resources(Some(HashMap::from([(
            "serial".to_string(),
            "fc.serial0".to_string(),
        )])));
        let mut graph = CuGraph::default();
        graph.add_node(radio).expect("bridge node");

        let task_specs = CuTaskSpecSet::from_graph(&graph);
        let bridges = [BridgeSpec {
            id: "radio".to_string(),
            type_path: parse_str("bridge::Dummy").unwrap(),
            config_index: 0,
            tuple_index: 0,
            monitor_index: None,
            rx_channels: Vec::new(),
            tx_channels: Vec::new(),
        }];

        // Configuration declaring the `fc` bundle the binding points into.
        let mut config = cu29::config::CuConfig::default();
        config.resources.push(ResourceBundleConfig {
            id: "fc".to_string(),
            provider: "board::Bundle".to_string(),
            config: None,
            missions: None,
        });
        let bundle_specs = build_bundle_specs(&config, "default").expect("bundle specs");

        let specs = collect_resource_specs(&graph, &task_specs, &bridges, &bundle_specs)
            .expect("collect specs");

        assert_eq!(specs.len(), 1);
        let spec = &specs[0];
        assert!(matches!(spec.owner, ResourceOwner::Bridge(0)));
        assert_eq!(spec.binding_name, "serial");
        assert_eq!(spec.bundle_index, 0);
        assert_eq!(spec.resource_name, "serial0");
    }
}