use proc_macro::TokenStream;
use quote::{format_ident, quote};
use std::collections::{BTreeMap, HashMap};
use std::fs::read_to_string;
use std::path::Path;
use std::process::Command;
use syn::Fields::{Named, Unnamed};
use syn::meta::parser;
use syn::{
Field, Fields, ItemImpl, ItemStruct, LitStr, Type, TypeTuple, parse_macro_input, parse_quote,
parse_str,
};
use crate::utils::{config_id_to_bridge_const, config_id_to_enum, config_id_to_struct_member};
use cu29_runtime::config::CuConfig;
use cu29_runtime::config::{
BridgeChannelConfigRepresentation, ConfigGraphs, CuGraph, Flavor, Node, NodeId,
ResourceBundleConfig, read_configuration,
};
use cu29_runtime::curuntime::{
CuExecutionLoop, CuExecutionStep, CuExecutionUnit, CuTaskType, compute_runtime_plan,
find_task_type_for_id,
};
use cu29_traits::{CuError, CuResult};
use proc_macro2::{Ident, Span};
mod bundle_resources;
mod resources;
mod utils;
/// Value interpolated as the last const-generic argument of the
/// `cu29::curuntime::CuRuntime<...>` type used for the generated `copper_runtime` field.
// NOTE(review): presumably the number of copper lists kept by the runtime — confirm
// against the `CuRuntime` definition.
const DEFAULT_CLNB: usize = 2;
#[inline]
fn int2sliceindex(i: u32) -> syn::Index {
syn::Index::from(i as usize)
}
#[inline(always)]
fn return_error(msg: String) -> TokenStream {
syn::Error::new(Span::call_site(), msg)
.to_compile_error()
.into()
}
/// Produces the statement that creates a `ScopedSanitizeRealtime` guard when this crate is
/// built with the `rtsan` feature, and an empty token stream otherwise.
fn rtsan_guard_tokens() -> proc_macro2::TokenStream {
    if !cfg!(feature = "rtsan") {
        return proc_macro2::TokenStream::new();
    }
    quote! {
        let _rt_guard = ::cu29::rtsan::ScopedSanitizeRealtime::default();
    }
}
/// Runs `git -C <repo_root> <args...>` and returns its standard output with surrounding
/// whitespace removed.
///
/// Returns `None` when the process cannot be spawned, exits with a non-success status, or
/// writes output that is not valid UTF-8.
fn git_output_trimmed(repo_root: &Path, args: &[&str]) -> Option<String> {
    let mut git = Command::new("git");
    git.arg("-C").arg(repo_root).args(args);

    let finished = git.output().ok()?;
    if finished.status.success() {
        String::from_utf8(finished.stdout)
            .ok()
            .map(|text| text.trim().to_string())
    } else {
        None
    }
}
/// Queries git for the state of the work tree at `repo_root`.
///
/// Returns `(commit, dirty)`:
/// * `commit` is the output of `git rev-parse HEAD` when that command succeeds with a
///   non-empty result;
/// * `dirty` is `Some(true)` when `git status --porcelain` prints anything, `Some(false)`
///   when it prints nothing, and `None` when the command fails.
///
/// Both are `None` when `git rev-parse --is-inside-work-tree` does not answer `true`.
fn detect_git_info(repo_root: &Path) -> (Option<String>, Option<bool>) {
    let inside = git_output_trimmed(repo_root, &["rev-parse", "--is-inside-work-tree"]);
    if inside.as_deref() != Some("true") {
        return (None, None);
    }

    let commit = match git_output_trimmed(repo_root, &["rev-parse", "HEAD"]) {
        Some(hash) if !hash.is_empty() => Some(hash),
        _ => None,
    };
    let dirty = git_output_trimmed(repo_root, &["status", "--porcelain"])
        .map(|status| !status.is_empty());

    (commit, dirty)
}
/// Function-like procedural macro entry point.
///
/// Forwards the input tokens unchanged to `resources::resources` (the `resources`
/// sub-module of this crate) and returns whatever that function produces.
#[proc_macro]
pub fn resources(input: TokenStream) -> TokenStream {
resources::resources(input)
}
/// Function-like procedural macro entry point.
///
/// Forwards the input tokens unchanged to `bundle_resources::bundle_resources` (the
/// `bundle_resources` sub-module of this crate) and returns whatever that function produces.
#[proc_macro]
pub fn bundle_resources(input: TokenStream) -> TokenStream {
bundle_resources::bundle_resources(input)
}
#[proc_macro]
pub fn gen_cumsgs(config_path_lit: TokenStream) -> TokenStream {
#[cfg(feature = "std")]
let std = true;
#[cfg(not(feature = "std"))]
let std = false;
let config = parse_macro_input!(config_path_lit as LitStr).value();
if !std::path::Path::new(&config_full_path(&config)).exists() {
return return_error(format!(
"The configuration file `{config}` does not exist. Please provide a valid path."
));
}
#[cfg(feature = "macro_debug")]
eprintln!("[gen culist support with {config:?}]");
let cuconfig = match read_config(&config) {
Ok(cuconfig) => cuconfig,
Err(e) => return return_error(e.to_string()),
};
let extra_imports = if !std {
quote! {
use core::fmt::Debug;
use core::fmt::Formatter;
use core::fmt::Result as FmtResult;
use alloc::vec;
use alloc::vec::Vec;
}
} else {
quote! {
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Result as FmtResult;
}
};
let common_imports = quote! {
use cu29::bincode::Encode;
use cu29::bincode::enc::Encoder;
use cu29::bincode::error::EncodeError;
use cu29::bincode::Decode;
use cu29::bincode::de::Decoder;
use cu29::bincode::error::DecodeError;
use cu29::copperlist::CopperList;
use cu29::prelude::ErasedCuStampedData;
use cu29::prelude::ErasedCuStampedDataSet;
use cu29::prelude::MatchingTasks;
use cu29::prelude::Serialize;
use cu29::prelude::CuMsg;
use cu29::prelude::CuMsgMetadata;
use cu29::prelude::CuListZeroedInit;
use cu29::prelude::CuCompactString;
#extra_imports
};
let with_uses = match &cuconfig.graphs {
ConfigGraphs::Simple(graph) => {
let support = match build_gen_cumsgs_support(&cuconfig, graph, None) {
Ok(support) => support,
Err(e) => return return_error(e.to_string()),
};
quote! {
mod cumsgs {
#common_imports
#support
}
use cumsgs::CuStampedDataSet;
type CuMsgs=CuStampedDataSet;
}
}
ConfigGraphs::Missions(graphs) => {
let mut missions: Vec<_> = graphs.iter().collect();
missions.sort_by(|a, b| a.0.cmp(b.0));
let mut mission_modules = Vec::<proc_macro2::TokenStream>::new();
for (mission, graph) in missions {
let mission_mod = match parse_str::<Ident>(mission.as_str()) {
Ok(id) => id,
Err(_) => {
return return_error(format!(
"Mission '{mission}' is not a valid Rust identifier for gen_cumsgs output."
));
}
};
let support = match build_gen_cumsgs_support(&cuconfig, graph, Some(mission)) {
Ok(support) => support,
Err(e) => return return_error(e.to_string()),
};
mission_modules.push(quote! {
pub mod #mission_mod {
#common_imports
#support
}
});
}
let default_exports = if graphs.contains_key("default") {
quote! {
use cumsgs::default::CuStampedDataSet;
type CuMsgs=CuStampedDataSet;
}
} else {
quote! {}
};
quote! {
mod cumsgs {
#(#mission_modules)*
}
#default_exports
}
}
};
with_uses.into()
}
/// Computes the copperlist plan for `graph` and returns the generated support code for it.
///
/// `mission_label`, when present, is only used to make the error message (and the
/// `macro_debug` trace) name the mission being processed.
///
/// # Errors
/// Returns a `CuError` when the execution plan cannot be built.
fn build_gen_cumsgs_support(
    cuconfig: &CuConfig,
    graph: &CuGraph,
    mission_label: Option<&str>,
) -> CuResult<proc_macro2::TokenStream> {
    let task_specs = CuTaskSpecSet::from_graph(graph);
    let channel_usage = collect_bridge_channel_usage(graph);
    let mut bridge_specs = build_bridge_specs(cuconfig, graph, &channel_usage);

    let (culist_plan, exec_entities, plan_to_original) =
        match build_execution_plan(graph, &task_specs, &mut bridge_specs) {
            Ok(plan) => plan,
            Err(e) => {
                let message = match mission_label {
                    Some(mission) => format!(
                        "Could not compute copperlist plan for mission '{mission}': {e}"
                    ),
                    None => format!("Could not compute copperlist plan: {e}"),
                };
                return Err(CuError::from(message));
            }
        };

    let task_names = collect_task_names(graph);
    let (culist_order, node_output_positions) = collect_culist_metadata(
        &culist_plan,
        &exec_entities,
        &mut bridge_specs,
        &plan_to_original,
    );

    #[cfg(feature = "macro_debug")]
    match mission_label {
        Some(mission) => eprintln!(
            "[The CuStampedDataSet matching tasks ids for mission '{mission}' are {:?}]",
            culist_order
        ),
        None => eprintln!(
            "[The CuStampedDataSet matching tasks ids are {:?}]",
            culist_order
        ),
    }

    let support = gen_culist_support(
        &culist_plan,
        &culist_order,
        &node_output_positions,
        &task_names,
        &bridge_specs,
    );
    Ok(support)
}
/// Generates the `CuStampedDataSet` tuple struct, the `CuList` alias and their supporting
/// items for one execution plan.
///
/// The returned tokens contain, in this order: `collect_metadata`, `compute_payload_bytes`,
/// the `CuStampedDataSet` struct and `CuList` alias, an inherent impl with per-task and
/// per-bridge-rx accessor methods, the `CuPayloadRawBytes` impl, the optional logviz impl,
/// the `MatchingTasks` impl, the encode/decode/debug/serialize/default impls produced by the
/// `build_culist_tuple_*` helpers, the erased-messages impl and the `CuListZeroedInit` impl.
///
/// Parameters:
/// * `runtime_plan` - plan from which the output packs (one per copperlist slot) are extracted.
/// * `culist_indices_in_plan_order` - slot indices, in the order they are visited when
///   emitting the metadata accessors, the zeroed-init statements and the payload-size sums.
/// * `node_output_positions` - slot index of each task node's output.
/// * `task_names` - `(node id, task id, member name)` triples; the member name is used to
///   build the accessor method names.
/// * `bridge_specs` - bridges whose rx/tx channels may own a copperlist slot.
///
/// # Panics
/// Panics when an index has no matching output pack, when a task is absent from
/// `node_output_positions`, when a bridge channel points to an out-of-range slot, or when a
/// bridge channel targets a slot that already has an origin assigned.
fn gen_culist_support(
runtime_plan: &CuExecutionLoop,
culist_indices_in_plan_order: &[usize],
node_output_positions: &HashMap<NodeId, usize>,
task_names: &[(NodeId, String, String)],
bridge_specs: &[BridgeSpec],
) -> proc_macro2::TokenStream {
#[cfg(feature = "macro_debug")]
eprintln!("[Extract msgs types]");
// One output pack per copperlist slot; the slot type is what is stored in the tuple.
let output_packs = extract_output_packs(runtime_plan);
let slot_types: Vec<Type> = output_packs.iter().map(|pack| pack.slot_type()).collect();
let culist_size = output_packs.len();
#[cfg(feature = "macro_debug")]
eprintln!("[build the copperlist struct]");
let msgs_types_tuple: TypeTuple = build_culist_tuple(&slot_types);
#[cfg(feature = "macro_debug")]
eprintln!("[build the copperlist tuple bincode support]");
let msgs_types_tuple_encode = build_culist_tuple_encode(&slot_types);
let msgs_types_tuple_decode = build_culist_tuple_decode(&slot_types);
#[cfg(feature = "macro_debug")]
eprintln!("[build the copperlist tuple debug support]");
let msgs_types_tuple_debug = build_culist_tuple_debug(&slot_types);
#[cfg(feature = "macro_debug")]
eprintln!("[build the copperlist tuple serialize support]");
let msgs_types_tuple_serialize = build_culist_tuple_serialize(&slot_types);
#[cfg(feature = "macro_debug")]
eprintln!("[build the default tuple support]");
let msgs_types_tuple_default = build_culist_tuple_default(&slot_types);
#[cfg(feature = "macro_debug")]
eprintln!("[build erasedcumsgs]");
let erasedmsg_trait_impl = build_culist_erasedcumsgs(&output_packs);
// One metadata reference expression per visited slot. For a multi-output slot only the
// metadata of the first port (`.0`) is referenced.
let metadata_accessors: Vec<proc_macro2::TokenStream> = culist_indices_in_plan_order
.iter()
.map(|idx| {
let slot_index = syn::Index::from(*idx);
let pack = output_packs
.get(*idx)
.unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
if pack.is_multi() {
quote! { &culist.msgs.0.#slot_index.0.metadata }
} else {
quote! { &culist.msgs.0.#slot_index.metadata }
}
})
.collect();
// Statements for `init_zeroed`: reset the status text and the process-time start/end of
// every message (every port of a multi-output slot).
let mut zeroed_init_tokens: Vec<proc_macro2::TokenStream> = Vec::new();
for idx in culist_indices_in_plan_order {
let slot_index = syn::Index::from(*idx);
let pack = output_packs
.get(*idx)
.unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
if pack.is_multi() {
for port_idx in 0..pack.msg_types.len() {
let port_index = syn::Index::from(port_idx);
zeroed_init_tokens.push(quote! {
self.0.#slot_index.#port_index.metadata.status_txt = CuCompactString::default();
self.0.#slot_index.#port_index.metadata.process_time.start =
cu29::clock::OptionCuTime::none();
self.0.#slot_index.#port_index.metadata.process_time.end =
cu29::clock::OptionCuTime::none();
});
}
} else {
zeroed_init_tokens.push(quote! {
self.0.#slot_index.metadata.status_txt = CuCompactString::default();
self.0.#slot_index.metadata.process_time.start = cu29::clock::OptionCuTime::none();
self.0.#slot_index.metadata.process_time.end = cu29::clock::OptionCuTime::none();
});
}
}
// NOTE(review): the returned array length is the number of output packs while the
// elements come from `culist_indices_in_plan_order`; this assumes both have the same
// length — confirm against `collect_culist_metadata`.
let collect_metadata_function = quote! {
pub fn collect_metadata<'a>(culist: &'a CuList) -> [&'a CuMsgMetadata; #culist_size] {
[#( #metadata_accessors, )*]
}
};
// Total number of individual messages across all slots (multi-output slots count each port).
let cumsg_count: usize = output_packs.iter().map(|pack| pack.msg_types.len()).sum();
// Statements for `compute_payload_bytes`: add the raw and handle byte counts of every
// payload that is present.
let payload_bytes_accumulators: Vec<proc_macro2::TokenStream> = culist_indices_in_plan_order
.iter()
.map(|idx| {
let slot_index = syn::Index::from(*idx);
let pack = output_packs
.get(*idx)
.unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
if pack.is_multi() {
let iter = (0..pack.msg_types.len()).map(|port_idx| {
let port_index = syn::Index::from(port_idx);
quote! {
if let Some(payload) = culist.msgs.0.#slot_index.#port_index.payload() {
raw += cu29::monitoring::CuPayloadSize::raw_bytes(payload);
handles += cu29::monitoring::CuPayloadSize::handle_bytes(payload);
}
}
});
quote! { #(#iter)* }
} else {
quote! {
if let Some(payload) = culist.msgs.0.#slot_index.payload() {
raw += cu29::monitoring::CuPayloadSize::raw_bytes(payload);
handles += cu29::monitoring::CuPayloadSize::handle_bytes(payload);
}
}
}
})
.collect();
// Statements for `payload_raw_bytes`: push one `Option<u64>` per message, in slot order
// (this one iterates the output packs directly, not `culist_indices_in_plan_order`).
let payload_raw_bytes_accumulators: Vec<proc_macro2::TokenStream> = output_packs
.iter()
.enumerate()
.map(|(slot_idx, pack)| {
let slot_index = syn::Index::from(slot_idx);
if pack.is_multi() {
let iter = (0..pack.msg_types.len()).map(|port_idx| {
let port_index = syn::Index::from(port_idx);
quote! {
if let Some(payload) = self.0.#slot_index.#port_index.payload() {
bytes.push(Some(
cu29::monitoring::CuPayloadSize::raw_bytes(payload) as u64
));
} else {
bytes.push(None);
}
}
});
quote! { #(#iter)* }
} else {
quote! {
if let Some(payload) = self.0.#slot_index.payload() {
bytes.push(Some(
cu29::monitoring::CuPayloadSize::raw_bytes(payload) as u64
));
} else {
bytes.push(None);
}
}
}
})
.collect();
let compute_payload_bytes_fn = quote! {
pub fn compute_payload_bytes(culist: &CuList) -> (u64, u64) {
let mut raw: usize = 0;
let mut handles: usize = 0;
#(#payload_bytes_accumulators)*
(raw as u64, handles as u64)
}
};
let payload_raw_bytes_impl = quote! {
impl ::cu29::CuPayloadRawBytes for CuStampedDataSet {
fn payload_raw_bytes(&self) -> Vec<Option<u64>> {
let mut bytes: Vec<Option<u64>> = Vec::with_capacity(#cumsg_count);
#(#payload_raw_bytes_accumulators)*
bytes
}
}
};
// Per-slot bookkeeping: the id of whatever produces the slot, and the name used for the
// logviz path. Both are filled by the task loop and the bridge loops below.
let mut slot_origin_ids: Vec<Option<String>> = vec![None; output_packs.len()];
let mut slot_task_names: Vec<Option<String>> = vec![None; output_packs.len()];
let mut methods = Vec::new();
// Accessor methods for task outputs: `get_<member>_output` for a single-message slot,
// `get_<member>_output_<n>` per port plus `get_<member>_outputs` otherwise.
for (node_id, task_id, member_name) in task_names {
let output_position = node_output_positions.get(node_id).unwrap_or_else(|| {
panic!("Task {task_id} (node id: {node_id}) not found in execution order")
});
let pack = output_packs
.get(*output_position)
.unwrap_or_else(|| panic!("Missing output pack for task {task_id}"));
let slot_index = syn::Index::from(*output_position);
slot_origin_ids[*output_position] = Some(task_id.clone());
slot_task_names[*output_position] = Some(member_name.clone());
if pack.msg_types.len() == 1 {
let fn_name = format_ident!("get_{}_output", member_name);
let payload_type = pack.msg_types.first().unwrap();
methods.push(quote! {
#[allow(dead_code)]
pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
&self.0.#slot_index
}
});
} else {
let outputs_fn = format_ident!("get_{}_outputs", member_name);
let slot_type = pack.slot_type();
for (port_idx, payload_type) in pack.msg_types.iter().enumerate() {
let fn_name = format_ident!("get_{}_output_{}", member_name, port_idx);
let port_index = syn::Index::from(port_idx);
methods.push(quote! {
#[allow(dead_code)]
pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
&self.0.#slot_index.#port_index
}
});
}
methods.push(quote! {
#[allow(dead_code)]
pub fn #outputs_fn(&self) -> &#slot_type {
&self.0.#slot_index
}
});
}
}
// Record bridge channels that own a copperlist slot under the origin id
// `bridge::<bridge id>::rx|tx::<channel id>`. A slot may only have one origin.
for spec in bridge_specs {
for channel in &spec.rx_channels {
if let Some(culist_index) = channel.culist_index {
let origin_id = format!("bridge::{}::rx::{}", spec.id, channel.id);
let Some(existing_slot) = slot_origin_ids.get_mut(culist_index) else {
panic!(
"Bridge origin '{origin_id}' points to out-of-range copperlist slot {culist_index}"
);
};
if let Some(existing) = existing_slot.as_ref() {
panic!(
"Duplicate slot origin assignment for slot {culist_index}: '{existing}' and '{origin_id}'"
);
}
*existing_slot = Some(origin_id.clone());
let Some(slot_name) = slot_task_names.get_mut(culist_index) else {
panic!(
"Bridge origin '{origin_id}' points to out-of-range name slot {culist_index}"
);
};
*slot_name = Some(origin_id);
}
}
for channel in &spec.tx_channels {
if let Some(culist_index) = channel.culist_index {
let origin_id = format!("bridge::{}::tx::{}", spec.id, channel.id);
let Some(existing_slot) = slot_origin_ids.get_mut(culist_index) else {
panic!(
"Bridge origin '{origin_id}' points to out-of-range copperlist slot {culist_index}"
);
};
if let Some(existing) = existing_slot.as_ref() {
panic!(
"Duplicate slot origin assignment for slot {culist_index}: '{existing}' and '{origin_id}'"
);
}
*existing_slot = Some(origin_id.clone());
let Some(slot_name) = slot_task_names.get_mut(culist_index) else {
panic!(
"Bridge origin '{origin_id}' points to out-of-range name slot {culist_index}"
);
};
*slot_name = Some(origin_id);
}
}
}
// These become the entries of the array returned by `get_all_task_ids`.
let task_name_literals = flatten_slot_origin_ids(&output_packs, slot_origin_ids);
// One logviz block per message. The entity path is the slot name (or `slot_<idx>` when
// the slot has no recorded name), with `/<port>` appended for multi-output slots.
let mut logviz_blocks = Vec::new();
for (slot_idx, pack) in output_packs.iter().enumerate() {
if pack.msg_types.is_empty() {
continue;
}
let slot_index = syn::Index::from(slot_idx);
let slot_name = slot_task_names.get(slot_idx).and_then(|name| name.as_ref());
if pack.is_multi() {
for (port_idx, _) in pack.msg_types.iter().enumerate() {
let port_index = syn::Index::from(port_idx);
let path_expr = if let Some(name) = slot_name {
let lit = LitStr::new(name, Span::call_site());
quote! { format!("{}/{}", #lit, #port_idx) }
} else {
quote! { format!("slot_{}/{}", #slot_idx, #port_idx) }
};
logviz_blocks.push(quote! {
{
let msg = &self.0.#slot_index.#port_index;
if let Some(payload) = msg.payload() {
::cu29_logviz::apply_tov(rec, &msg.tov);
let path = #path_expr;
::cu29_logviz::log_payload_auto(rec, &path, payload)?;
}
}
});
}
} else {
let path_expr = if let Some(name) = slot_name {
let lit = LitStr::new(name, Span::call_site());
quote! { #lit.to_string() }
} else {
quote! { format!("slot_{}", #slot_idx) }
};
logviz_blocks.push(quote! {
{
let msg = &self.0.#slot_index;
if let Some(payload) = msg.payload() {
::cu29_logviz::apply_tov(rec, &msg.tov);
let path = #path_expr;
::cu29_logviz::log_payload_auto(rec, &path, payload)?;
}
}
});
}
}
// The logviz impl is only emitted when this crate is built with the `logviz` feature.
let logviz_impl = if cfg!(feature = "logviz") {
quote! {
impl ::cu29_logviz::LogvizDataSet for CuStampedDataSet {
fn logviz_emit(
&self,
rec: &::cu29_logviz::RecordingStream,
) -> ::cu29::prelude::CuResult<()> {
#(#logviz_blocks)*
Ok(())
}
}
}
} else {
quote! {}
};
// Accessor methods `get_<bridge>_rx_<channel>` for bridge rx channels that own a slot.
// Only rx channels get an accessor here; tx channels do not.
for spec in bridge_specs {
for channel in &spec.rx_channels {
if let Some(culist_index) = channel.culist_index {
let slot_index = syn::Index::from(culist_index);
let bridge_name = config_id_to_struct_member(spec.id.as_str());
let channel_name = config_id_to_struct_member(channel.id.as_str());
let fn_name = format_ident!("get_{}_rx_{}", bridge_name, channel_name);
let msg_type = &channel.msg_type;
methods.push(quote! {
#[allow(dead_code)]
pub fn #fn_name(&self) -> &CuMsg<#msg_type> {
&self.0.#slot_index
}
});
}
}
}
// Assemble every generated item into the final token stream.
quote! {
#collect_metadata_function
#compute_payload_bytes_fn
pub struct CuStampedDataSet(pub #msgs_types_tuple);
pub type CuList = CopperList<CuStampedDataSet>;
impl CuStampedDataSet {
#(#methods)*
#[allow(dead_code)]
fn get_tuple(&self) -> &#msgs_types_tuple {
&self.0
}
#[allow(dead_code)]
fn get_tuple_mut(&mut self) -> &mut #msgs_types_tuple {
&mut self.0
}
}
#payload_raw_bytes_impl
#logviz_impl
impl MatchingTasks for CuStampedDataSet {
#[allow(dead_code)]
fn get_all_task_ids() -> &'static [&'static str] {
&[#(#task_name_literals),*]
}
}
#msgs_types_tuple_encode
#msgs_types_tuple_decode
#msgs_types_tuple_debug
#msgs_types_tuple_serialize
#msgs_types_tuple_default
#erasedmsg_trait_impl
impl CuListZeroedInit for CuStampedDataSet {
fn init_zeroed(&mut self) {
#(#zeroed_init_tokens)*
}
}
}
}
/// Generates the `SimStep<'a>` enum used by the simulation callback.
///
/// One variant is emitted per step of `runtime_plan`, in plan order:
/// * a task step becomes a tuple variant wrapping
///   `CuTaskCallbackState<inputs, &'a mut output>`, where `inputs` is `()`, a single
///   `&'a CuMsg<_>` or a reference to a tuple of such references, and `output` falls back
///   to `CuMsg<()>` when the step has no output pack;
/// * a bridge rx/tx step becomes a struct variant carrying the static channel descriptor
///   and a (mutable for rx, shared for tx) reference to the message.
///
/// After the plan steps, one `<bridge>_bridge` lifecycle variant is appended per bridge,
/// followed by a `__Phantom` variant that keeps the `'a` lifetime used.
///
/// # Panics
/// Panics when a message type name from the configuration is not a valid Rust type, when a
/// step's node id is out of range for `exec_entities`, and (via `todo!`) when the plan
/// contains a nested loop unit.
fn gen_sim_support(
    runtime_plan: &CuExecutionLoop,
    exec_entities: &[ExecutionEntity],
    bridge_specs: &[BridgeSpec],
) -> proc_macro2::TokenStream {
    /// Parses a configuration-supplied type name, naming the offending text on failure.
    fn parse_config_type(type_name: &str) -> Type {
        parse_str::<Type>(type_name).unwrap_or_else(|_| {
            panic!("Could not transform {type_name} into a message Rust type.")
        })
    }

    #[cfg(feature = "macro_debug")]
    eprintln!("[Sim: Build SimEnum]");
    let plan_enum: Vec<proc_macro2::TokenStream> = runtime_plan
        .steps
        .iter()
        .map(|unit| match unit {
            CuExecutionUnit::Step(step) => match &exec_entities[step.node_id as usize].kind {
                ExecutionEntityKind::Task { .. } => {
                    let enum_entry_name = config_id_to_enum(step.node.get_id().as_str());
                    let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
                    let inputs: Vec<Type> = step
                        .input_msg_indices_types
                        .iter()
                        .map(|input| parse_config_type(&format!("CuMsg<{}>", input.msg_type)))
                        .collect();
                    let output: Option<Type> = step.output_msg_pack.as_ref().map(|pack| {
                        let msg_types: Vec<Type> = pack
                            .msg_types
                            .iter()
                            .map(|msg_type| parse_config_type(msg_type.as_str()))
                            .collect();
                        build_output_slot_type(&msg_types)
                    });
                    // Steps without an output pack still need a type for the callback state.
                    let no_output = parse_str::<Type>("CuMsg<()>")
                        .expect("`CuMsg<()>` is always a valid Rust type");
                    let output = output.as_ref().unwrap_or(&no_output);
                    let inputs_type = if inputs.is_empty() {
                        quote! { () }
                    } else if inputs.len() == 1 {
                        let input = inputs.first().unwrap();
                        quote! { &'a #input }
                    } else {
                        quote! { &'a (#(&'a #inputs),*) }
                    };
                    quote! {
                        #enum_ident(CuTaskCallbackState<#inputs_type, &'a mut #output>)
                    }
                }
                ExecutionEntityKind::BridgeRx { bridge_index, channel_index } => {
                    let bridge_spec = &bridge_specs[*bridge_index];
                    let channel = &bridge_spec.rx_channels[*channel_index];
                    let enum_entry_name =
                        config_id_to_enum(&format!("{}_rx_{}", bridge_spec.id, channel.id));
                    let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
                    let channel_type = parse_config_type(channel.msg_type_name.as_str());
                    let bridge_type = runtime_bridge_type_for_spec(bridge_spec, true);
                    quote! {
                        #enum_ident {
                            channel: &'static cu29::cubridge::BridgeChannel<< <#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet >::Id, #channel_type>,
                            msg: &'a mut CuMsg<#channel_type>,
                        }
                    }
                }
                ExecutionEntityKind::BridgeTx { bridge_index, channel_index } => {
                    let bridge_spec = &bridge_specs[*bridge_index];
                    let channel = &bridge_spec.tx_channels[*channel_index];
                    let enum_entry_name =
                        config_id_to_enum(&format!("{}_tx_{}", bridge_spec.id, channel.id));
                    let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
                    let channel_type = parse_config_type(channel.msg_type_name.as_str());
                    let bridge_type = runtime_bridge_type_for_spec(bridge_spec, true);
                    quote! {
                        #enum_ident {
                            channel: &'static cu29::cubridge::BridgeChannel<< <#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet >::Id, #channel_type>,
                            msg: &'a CuMsg<#channel_type>,
                        }
                    }
                }
            },
            CuExecutionUnit::Loop(_) => {
                todo!("Needs to be implemented")
            }
        })
        .collect();

    // Bridge lifecycle variants come after the plan-step variants.
    let mut variants = plan_enum;
    for bridge_spec in bridge_specs {
        let enum_entry_name = config_id_to_enum(&format!("{}_bridge", bridge_spec.id));
        let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
        variants.push(quote! {
            #enum_ident(cu29::simulation::CuBridgeLifecycleState)
        });
    }
    // Keeps `'a` in use even when no other variant borrows anything.
    variants.push(quote! { __Phantom(core::marker::PhantomData<&'a ()>) });
    quote! {
        #[allow(dead_code, unused_lifetimes)]
        pub enum SimStep<'a> {
            #(#variants),*
        }
    }
}
#[proc_macro_attribute]
pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
#[cfg(feature = "macro_debug")]
eprintln!("[entry]");
let mut application_struct = parse_macro_input!(input as ItemStruct);
let application_name = &application_struct.ident;
let builder_name = format_ident!("{}Builder", application_name);
let mut config_file: Option<LitStr> = None;
let mut sim_mode = false;
let mut ignore_resources = false;
#[cfg(feature = "std")]
let std = true;
#[cfg(not(feature = "std"))]
let std = false;
let signal_handler = cfg!(feature = "signal-handler");
let rt_guard = rtsan_guard_tokens();
let attribute_config_parser = parser(|meta| {
if meta.path.is_ident("config") {
config_file = Some(meta.value()?.parse()?);
Ok(())
} else if meta.path.is_ident("sim_mode") {
if meta.input.peek(syn::Token![=]) {
meta.input.parse::<syn::Token![=]>()?;
let value: syn::LitBool = meta.input.parse()?;
sim_mode = value.value();
Ok(())
} else {
sim_mode = true;
Ok(())
}
} else if meta.path.is_ident("ignore_resources") {
if meta.input.peek(syn::Token![=]) {
meta.input.parse::<syn::Token![=]>()?;
let value: syn::LitBool = meta.input.parse()?;
ignore_resources = value.value();
Ok(())
} else {
ignore_resources = true;
Ok(())
}
} else {
Err(meta.error("unsupported property"))
}
});
#[cfg(feature = "macro_debug")]
eprintln!("[parse]");
parse_macro_input!(args with attribute_config_parser);
if ignore_resources && !sim_mode {
return return_error(
"`ignore_resources` is only supported when `sim_mode` is enabled".to_string(),
);
}
let config_file = match config_file {
Some(file) => file.value(),
None => {
return return_error(
"Expected config file attribute like #[CopperRuntime(config = \"path\")]"
.to_string(),
);
}
};
if !std::path::Path::new(&config_full_path(&config_file)).exists() {
return return_error(format!(
"The configuration file `{config_file}` does not exist. Please provide a valid path."
));
}
let copper_config = match read_config(&config_file) {
Ok(cuconfig) => cuconfig,
Err(e) => return return_error(e.to_string()),
};
let copper_config_content = match read_to_string(config_full_path(config_file.as_str())) {
Ok(ok) => ok,
Err(e) => {
return return_error(format!(
"Could not read the config file (should not happen because we just succeeded just before). {e}"
));
}
};
let caller_root = utils::caller_crate_root();
let (git_commit, git_dirty) = detect_git_info(&caller_root);
let git_commit_tokens = if let Some(commit) = git_commit {
quote! { Some(#commit.to_string()) }
} else {
quote! { None }
};
let git_dirty_tokens = if let Some(dirty) = git_dirty {
quote! { Some(#dirty) }
} else {
quote! { None }
};
#[cfg(feature = "macro_debug")]
eprintln!("[build monitor type]");
let monitor_configs = copper_config.get_monitor_configs();
let (monitor_type, monitor_instanciator_body) = if monitor_configs.is_empty() {
(
quote! { NoMonitor },
quote! {
let monitor = NoMonitor::new(metadata, runtime)
.expect("Failed to create NoMonitor.");
monitor
},
)
} else if monitor_configs.len() == 1 {
let only_monitor_type = parse_str::<Type>(monitor_configs[0].get_type())
.expect("Could not transform the monitor type name into a Rust type.");
(
quote! { #only_monitor_type },
quote! {
let monitor_metadata = metadata.with_monitor_config(
config
.get_monitor_configs()
.first()
.and_then(|entry| entry.get_config().cloned())
);
let monitor = #only_monitor_type::new(monitor_metadata, runtime)
.expect("Failed to create the given monitor.");
monitor
},
)
} else {
let monitor_types: Vec<Type> = monitor_configs
.iter()
.map(|monitor_config| {
parse_str::<Type>(monitor_config.get_type())
.expect("Could not transform the monitor type name into a Rust type.")
})
.collect();
let monitor_bindings: Vec<Ident> = (0..monitor_types.len())
.map(|idx| format_ident!("__cu_monitor_{idx}"))
.collect();
let monitor_indices: Vec<syn::Index> =
(0..monitor_types.len()).map(syn::Index::from).collect();
let monitor_builders: Vec<proc_macro2::TokenStream> = monitor_types
.iter()
.zip(monitor_bindings.iter())
.zip(monitor_indices.iter())
.map(|((monitor_ty, monitor_binding), monitor_idx)| {
quote! {
let __cu_monitor_cfg_entry = config
.get_monitor_configs()
.get(#monitor_idx)
.and_then(|entry| entry.get_config().cloned());
let __cu_monitor_metadata = metadata
.clone()
.with_monitor_config(__cu_monitor_cfg_entry);
let #monitor_binding = #monitor_ty::new(__cu_monitor_metadata, runtime.clone())
.expect("Failed to create one of the configured monitors.");
}
})
.collect();
let tuple_type: TypeTuple = parse_quote! { (#(#monitor_types),*,) };
(
quote! { #tuple_type },
quote! {
#(#monitor_builders)*
let monitor: #tuple_type = (#(#monitor_bindings),*,);
monitor
},
)
};
#[cfg(feature = "macro_debug")]
eprintln!("[build runtime field]");
let runtime_field: Field = if sim_mode {
parse_quote! {
copper_runtime: cu29::curuntime::CuRuntime<CuSimTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
}
} else {
parse_quote! {
copper_runtime: cu29::curuntime::CuRuntime<CuTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
}
};
let lifecycle_stream_field: Field = parse_quote! {
runtime_lifecycle_stream: Option<Box<dyn WriteStream<RuntimeLifecycleRecord>>>
};
#[cfg(feature = "macro_debug")]
eprintln!("[match struct anonymity]");
match &mut application_struct.fields {
Named(fields_named) => {
fields_named.named.push(runtime_field);
fields_named.named.push(lifecycle_stream_field);
}
Unnamed(fields_unnamed) => {
fields_unnamed.unnamed.push(runtime_field);
fields_unnamed.unnamed.push(lifecycle_stream_field);
}
Fields::Unit => {
panic!(
"This struct is a unit struct, it should have named or unnamed fields. use struct Something {{}} and not struct Something;"
)
}
};
let all_missions = copper_config.graphs.get_all_missions_graphs();
let mut all_missions_tokens = Vec::<proc_macro2::TokenStream>::new();
for (mission, graph) in &all_missions {
let git_commit_tokens = git_commit_tokens.clone();
let git_dirty_tokens = git_dirty_tokens.clone();
let mission_mod = parse_str::<Ident>(mission.as_str())
.expect("Could not make an identifier of the mission name");
#[cfg(feature = "macro_debug")]
eprintln!("[extract tasks ids & types]");
let task_specs = CuTaskSpecSet::from_graph(graph);
let culist_channel_usage = collect_bridge_channel_usage(graph);
let mut culist_bridge_specs =
build_bridge_specs(&copper_config, graph, &culist_channel_usage);
let (culist_plan, culist_exec_entities, culist_plan_to_original) =
match build_execution_plan(graph, &task_specs, &mut culist_bridge_specs) {
Ok(plan) => plan,
Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
};
let task_names = collect_task_names(graph);
let (culist_call_order, node_output_positions) = collect_culist_metadata(
&culist_plan,
&culist_exec_entities,
&mut culist_bridge_specs,
&culist_plan_to_original,
);
#[cfg(feature = "macro_debug")]
{
eprintln!("[runtime plan for mission {mission}]");
eprintln!("{culist_plan:?}");
}
let culist_support: proc_macro2::TokenStream = gen_culist_support(
&culist_plan,
&culist_call_order,
&node_output_positions,
&task_names,
&culist_bridge_specs,
);
let (
threadpool_bundle_index,
resources_module,
resources_instanciator_fn,
task_resource_mappings,
bridge_resource_mappings,
) = if ignore_resources {
if task_specs.background_flags.iter().any(|&flag| flag) {
return return_error(
"`ignore_resources` cannot be used with background tasks because they require the threadpool resource bundle"
.to_string(),
);
}
let bundle_specs: Vec<BundleSpec> = Vec::new();
let resource_specs: Vec<ResourceKeySpec> = Vec::new();
let (resources_module, resources_instanciator_fn) =
match build_resources_module(&bundle_specs) {
Ok(tokens) => tokens,
Err(e) => return return_error(e.to_string()),
};
let task_resource_mappings =
match build_task_resource_mappings(&resource_specs, &task_specs) {
Ok(tokens) => tokens,
Err(e) => return return_error(e.to_string()),
};
let bridge_resource_mappings =
build_bridge_resource_mappings(&resource_specs, &culist_bridge_specs, sim_mode);
(
None,
resources_module,
resources_instanciator_fn,
task_resource_mappings,
bridge_resource_mappings,
)
} else {
let bundle_specs = match build_bundle_specs(&copper_config, mission.as_str()) {
Ok(specs) => specs,
Err(e) => return return_error(e.to_string()),
};
let threadpool_bundle_index = if task_specs.background_flags.iter().any(|&flag| flag) {
match bundle_specs
.iter()
.position(|bundle| bundle.id == "threadpool")
{
Some(index) => Some(index),
None => {
return return_error(
"Background tasks require the threadpool bundle to be configured"
.to_string(),
);
}
}
} else {
None
};
let resource_specs = match collect_resource_specs(
graph,
&task_specs,
&culist_bridge_specs,
&bundle_specs,
) {
Ok(specs) => specs,
Err(e) => return return_error(e.to_string()),
};
let (resources_module, resources_instanciator_fn) =
match build_resources_module(&bundle_specs) {
Ok(tokens) => tokens,
Err(e) => return return_error(e.to_string()),
};
let task_resource_mappings =
match build_task_resource_mappings(&resource_specs, &task_specs) {
Ok(tokens) => tokens,
Err(e) => return return_error(e.to_string()),
};
let bridge_resource_mappings =
build_bridge_resource_mappings(&resource_specs, &culist_bridge_specs, sim_mode);
(
threadpool_bundle_index,
resources_module,
resources_instanciator_fn,
task_resource_mappings,
bridge_resource_mappings,
)
};
let task_ids = task_specs.ids.clone();
let ids = build_monitored_ids(&task_ids, &mut culist_bridge_specs);
let monitored_component_entries: Vec<proc_macro2::TokenStream> = ids
.iter()
.enumerate()
.map(|(idx, id)| {
let id_lit = LitStr::new(id, Span::call_site());
if idx < task_specs.task_types.len() {
let task_ty = &task_specs.task_types[idx];
let component_type = match task_specs.cutypes[idx] {
CuTaskType::Source => quote! { cu29::monitoring::ComponentType::Source },
CuTaskType::Regular => quote! { cu29::monitoring::ComponentType::Task },
CuTaskType::Sink => quote! { cu29::monitoring::ComponentType::Sink },
};
quote! {
cu29::monitoring::MonitorComponentMetadata::new(
#id_lit,
#component_type,
Some(stringify!(#task_ty)),
)
}
} else {
quote! {
cu29::monitoring::MonitorComponentMetadata::new(
#id_lit,
cu29::monitoring::ComponentType::Bridge,
None,
)
}
}
})
.collect();
let culist_component_mapping = match build_monitor_culist_component_mapping(
&culist_plan,
&culist_exec_entities,
&culist_bridge_specs,
) {
Ok(mapping) => mapping,
Err(e) => return return_error(e),
};
let task_reflect_read_arms: Vec<proc_macro2::TokenStream> = task_specs
.ids
.iter()
.enumerate()
.map(|(index, task_id)| {
let task_index = syn::Index::from(index);
let task_id_lit = LitStr::new(task_id, Span::call_site());
quote! {
#task_id_lit => Some(&self.copper_runtime.tasks.#task_index as &dyn cu29::reflect::Reflect),
}
})
.collect();
let task_reflect_write_arms: Vec<proc_macro2::TokenStream> = task_specs
.ids
.iter()
.enumerate()
.map(|(index, task_id)| {
let task_index = syn::Index::from(index);
let task_id_lit = LitStr::new(task_id, Span::call_site());
quote! {
#task_id_lit => Some(&mut self.copper_runtime.tasks.#task_index as &mut dyn cu29::reflect::Reflect),
}
})
.collect();
let mut reflect_registry_types: BTreeMap<String, Type> = BTreeMap::new();
let mut add_reflect_type = |ty: Type| {
let key = quote! { #ty }.to_string();
reflect_registry_types.entry(key).or_insert(ty);
};
for task_type in &task_specs.task_types {
add_reflect_type(task_type.clone());
}
// Channel-set declarations generated for bridges that do not run in simulation.
let mut sim_bridge_channel_decls = Vec::<proc_macro2::TokenStream>::new();
// Runtime type of each bridge. While walking the specs in simulation mode, bridges flagged as
// not running in sim also get `tx_channels!` / `rx_channels!` declarations pushed to
// `sim_bridge_channel_decls` (only for the directions that actually have channels).
let bridge_runtime_types: Vec<Type> = culist_bridge_specs
    .iter()
    .map(|spec| {
        let stubbed_in_sim = sim_mode && !spec.run_in_sim;
        if stubbed_in_sim {
            let (tx_set_ident, tx_id_ident, rx_set_ident, rx_id_ident) =
                sim_bridge_channel_set_idents(spec.tuple_index);
            if !spec.tx_channels.is_empty() {
                let mut tx_entries = Vec::new();
                for channel in spec.tx_channels.iter() {
                    // Entry names are the lowercased channel constant names.
                    let lowered = channel.const_ident.to_string().to_lowercase();
                    let entry_ident = Ident::new(&lowered, Span::call_site());
                    let msg_type = &channel.msg_type;
                    tx_entries.push(quote! { #entry_ident => #msg_type, });
                }
                sim_bridge_channel_decls.push(quote! {
                    cu29::tx_channels! {
                        pub struct #tx_set_ident : #tx_id_ident {
                            #(#tx_entries)*
                        }
                    }
                });
            }
            if !spec.rx_channels.is_empty() {
                let mut rx_entries = Vec::new();
                for channel in spec.rx_channels.iter() {
                    let lowered = channel.const_ident.to_string().to_lowercase();
                    let entry_ident = Ident::new(&lowered, Span::call_site());
                    let msg_type = &channel.msg_type;
                    rx_entries.push(quote! { #entry_ident => #msg_type, });
                }
                sim_bridge_channel_decls.push(quote! {
                    cu29::rx_channels! {
                        pub struct #rx_set_ident : #rx_id_ident {
                            #(#rx_entries)*
                        }
                    }
                });
            }
        }
        runtime_bridge_type_for_spec(spec, sim_mode)
    })
    .collect();
// All generated channel-set declarations, concatenated.
let sim_bridge_channel_defs = quote! { #(#sim_bridge_channel_decls)* };
// Each bridge contributes its runtime type plus the message type of every Rx, then Tx, channel.
for (bridge_spec, bridge_type) in culist_bridge_specs
    .iter()
    .zip(bridge_runtime_types.iter())
{
    add_reflect_type(bridge_type.clone());
    let rx_then_tx = bridge_spec
        .rx_channels
        .iter()
        .chain(bridge_spec.tx_channels.iter());
    for channel in rx_then_tx {
        add_reflect_type(channel.msg_type.clone());
    }
}
// Message types listed by the output packs of the copper list plan.
extract_output_packs(&culist_plan)
    .into_iter()
    .flat_map(|output_pack| output_pack.msg_types)
    .for_each(|msg_type| add_reflect_type(msg_type));
// One `registry.register::<T>();` statement per collected reflect type, in key order.
let reflect_type_registration_calls: Vec<proc_macro2::TokenStream> = reflect_registry_types
    .values()
    .map(|ty| quote! { registry.register::<#ty>(); })
    .collect();
// Tuple type holding every bridge runtime type (with a trailing comma so a single bridge is
// still a tuple); the unit type when there is no bridge.
let bridges_type_tokens: proc_macro2::TokenStream = match bridge_runtime_types.as_slice() {
    [] => quote! { () },
    bridge_types => {
        // Round-trip through `TypeTuple` so the assembled tokens are parsed as a tuple type.
        let tuple: TypeTuple = parse_quote! { (#(#bridge_types),*,) };
        quote! { #tuple }
    }
};
// Local variable names (`bridge_0`, `bridge_1`, ...) used for the bridge instances, one per
// bridge spec.
let bridge_binding_idents: Vec<Ident> = (0..culist_bridge_specs.len())
    .map(|idx| format_ident!("bridge_{}", idx))
    .collect();
// For every bridge, a `let bridge_N = { ... };` statement that:
//   1. looks the bridge up in `config.bridges` (panicking in generated code if absent),
//   2. binds its resources (a failure is wrapped with the bridge name and propagated),
//   3. builds the Tx and Rx channel configuration slices from the bridge configuration,
//   4. calls `CuBridge::new` and propagates its error.
let bridge_init_statements: Vec<proc_macro2::TokenStream> = culist_bridge_specs
    .iter()
    .enumerate()
    .map(|(idx, spec)| {
        let binding_ident = &bridge_binding_idents[idx];
        let bridge_mapping_ref = &bridge_resource_mappings.refs[idx];
        let bridge_type = &bridge_runtime_types[idx];
        let bridge_name = &spec.id;
        let config_index = syn::Index::from(spec.config_index);
        let binding_error = LitStr::new(
            &format!("Failed to bind resources for bridge '{}'", bridge_name),
            Span::call_site(),
        );

        // One block expression per Tx channel; a configuration entry of another kind panics
        // in the generated code.
        let mut tx_configs: Vec<proc_macro2::TokenStream> = Vec::new();
        for channel in spec.tx_channels.iter() {
            let const_ident = &channel.const_ident;
            let channel_name = &channel.id;
            let channel_config_index = syn::Index::from(channel.config_index);
            tx_configs.push(quote! {
                {
                    let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
                        cu29::config::BridgeChannelConfigRepresentation::Tx { route, config, .. } => {
                            (route.clone(), config.clone())
                        }
                        _ => panic!(
                            "Bridge '{}' channel '{}' expected to be Tx",
                            #bridge_name,
                            #channel_name
                        ),
                    };
                    cu29::cubridge::BridgeChannelConfig::from_static(
                        &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
                        channel_route,
                        channel_config,
                    )
                }
            });
        }

        // Same as above for the Rx channels.
        let mut rx_configs: Vec<proc_macro2::TokenStream> = Vec::new();
        for channel in spec.rx_channels.iter() {
            let const_ident = &channel.const_ident;
            let channel_name = &channel.id;
            let channel_config_index = syn::Index::from(channel.config_index);
            rx_configs.push(quote! {
                {
                    let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
                        cu29::config::BridgeChannelConfigRepresentation::Rx { route, config, .. } => {
                            (route.clone(), config.clone())
                        }
                        _ => panic!(
                            "Bridge '{}' channel '{}' expected to be Rx",
                            #bridge_name,
                            #channel_name
                        ),
                    };
                    cu29::cubridge::BridgeChannelConfig::from_static(
                        &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
                        channel_route,
                        channel_config,
                    )
                }
            });
        }

        quote! {
            let #binding_ident = {
                let bridge_cfg = config
                    .bridges
                    .get(#config_index)
                    .unwrap_or_else(|| panic!("Bridge '{}' missing from configuration", #bridge_name));
                let bridge_mapping = #bridge_mapping_ref;
                let bridge_resources = <<#bridge_type as cu29::cubridge::CuBridge>::Resources<'_> as ResourceBindings>::from_bindings(
                    resources,
                    bridge_mapping,
                )
                .map_err(|e| cu29::CuError::new_with_cause(#binding_error, e))?;
                let tx_channels: &[cu29::cubridge::BridgeChannelConfig<
                    <<#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet>::Id,
                >] = &[#(#tx_configs),*];
                let rx_channels: &[cu29::cubridge::BridgeChannelConfig<
                    <<#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet>::Id,
                >] = &[#(#rx_configs),*];
                <#bridge_type as cu29::cubridge::CuBridge>::new(
                    bridge_cfg.config.as_ref(),
                    tx_channels,
                    rx_channels,
                    bridge_resources,
                )?
            };
        }
    })
    .collect();
// Generated `bridges_instanciator` function: runs every bridge init statement and returns the
// instances as a tuple, or returns `Ok(())` without touching the config when there is no bridge.
let bridges_instanciator = if !culist_bridge_specs.is_empty() {
    quote! {
        pub fn bridges_instanciator(config: &CuConfig, resources: &mut ResourceManager) -> CuResult<CuBridges> {
            #(#bridge_init_statements)*
            Ok((#(#bridge_binding_idents),*,))
        }
    }
} else {
    quote! {
        pub fn bridges_instanciator(_config: &CuConfig, resources: &mut ResourceManager) -> CuResult<CuBridges> {
            let _ = resources;
            Ok(())
        }
    }
};
// Task types used when generating the simulation runtime, one per task:
//   * sources and sinks that run in sim keep their sim type, otherwise they are replaced by a
//     `CuSimSrcTask<..>` / `CuSimSinkTask<..>` placeholder typed after their connections;
//   * background regular tasks are wrapped in `CuAsyncTask<task, output>`;
//   * other regular tasks keep their sim type.
// Panics (failing macro expansion) on a background source or sink, on a background task
// without an output, and when a placeholder cannot be built.
let all_sim_tasks_types: Vec<Type> = task_specs
    .ids
    .iter()
    .zip(&task_specs.cutypes)
    .zip(&task_specs.sim_task_types)
    .zip(&task_specs.background_flags)
    .zip(&task_specs.run_in_sim_flags)
    .zip(task_specs.output_types.iter())
    .map(|(((((task_id, task_type), sim_type), background), run_in_sim), output_type)| {
        match task_type {
            CuTaskType::Source if *background => {
                panic!("CuSrcTask {task_id} cannot be a background task, it should be a regular task.")
            }
            CuTaskType::Sink if *background => {
                panic!("CuSinkTask {task_id} cannot be a background task, it should be a regular task.")
            }
            CuTaskType::Source | CuTaskType::Sink if *run_in_sim => sim_type.clone(),
            CuTaskType::Source => {
                let msg_type = graph
                    .get_node_output_msg_type(task_id.as_str())
                    .unwrap_or_else(|| panic!("CuSrcTask {task_id} should have an outgoing connection with a valid output msg type"));
                let sim_task_name = format!("CuSimSrcTask<{msg_type}>");
                parse_str(sim_task_name.as_str())
                    .unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
            }
            CuTaskType::Sink => {
                let msg_types = graph
                    .get_node_input_msg_types(task_id.as_str())
                    .unwrap_or_else(|| panic!("CuSinkTask {task_id} should have an incoming connection with a valid input msg type"));
                // A single input still has to be spelled as a one-element tuple.
                let msg_type = if msg_types.len() != 1 {
                    format!("({})", msg_types.join(", "))
                } else {
                    format!("({},)", msg_types[0])
                };
                let sim_task_name = format!("CuSimSinkTask<{msg_type}>");
                parse_str(sim_task_name.as_str())
                    .unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
            }
            CuTaskType::Regular if *background => match output_type {
                Some(out_ty) => parse_quote!(CuAsyncTask<#sim_type, #out_ty>),
                None => panic!("{task_id}: If a task is background, it has to have an output"),
            },
            CuTaskType::Regular => sim_type.clone(),
        }
    })
    .collect();
#[cfg(feature = "macro_debug")]
eprintln!("[build task tuples]");
// Tuple types for the task set (regular and simulation flavours). A trailing comma keeps a
// single task a tuple; an empty task list yields the unit type.
let task_types = &task_specs.task_types;
let task_types_tuple: TypeTuple = if !task_types.is_empty() {
    parse_quote! { (#(#task_types),*,) }
} else {
    parse_quote! { () }
};
let task_types_tuple_sim: TypeTuple = if !all_sim_tasks_types.is_empty() {
    parse_quote! { (#(#all_sim_tasks_types),*,) }
} else {
    parse_quote! { () }
};
#[cfg(feature = "macro_debug")]
eprintln!("[gen instances]");
// Instantiation expression for every simulation task type: bind the task resources, then call
// the `new` of the trait matching the task kind. Background regular tasks additionally borrow
// the shared thread pool and wrap both in `CuAsyncTaskResources`. Panics during expansion if a
// background task exists but no thread pool bundle index is available.
let task_sim_instances_init_code = all_sim_tasks_types
    .iter()
    .enumerate()
    .map(|(index, ty)| {
        let additional_error_info = format!(
            "Failed to get create instance for {}, instance index {}.",
            task_specs.type_names[index], index
        );
        let mapping_ref = task_resource_mappings.refs[index].clone();
        let inner_task_type = &task_specs.sim_task_types[index];
        let background_regular = task_specs.background_flags[index]
            && matches!(task_specs.cutypes[index], CuTaskType::Regular);
        if background_regular {
            let threadpool_bundle_index = threadpool_bundle_index
                .expect("threadpool bundle missing for background tasks");
            return quote! {
                {
                    let inner_resources = <<#inner_task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
                        resources,
                        #mapping_ref,
                    ).map_err(|e| e.add_cause(#additional_error_info))?;
                    let threadpool_key = cu29::resource::ResourceKey::new(
                        cu29::resource::BundleIndex::new(#threadpool_bundle_index),
                        <cu29::resource::ThreadPoolBundle as cu29::resource::ResourceBundleDecl>::Id::BgThreads as usize,
                    );
                    let threadpool = resources.borrow_shared_arc(threadpool_key)?;
                    let resources = cu29::cuasynctask::CuAsyncTaskResources {
                        inner: inner_resources,
                        threadpool,
                    };
                    <#ty as CuTask>::new(all_instances_configs[#index], resources)
                        .map_err(|e| e.add_cause(#additional_error_info))?
                }
            };
        }
        // The remaining cases only differ by the trait the task is instantiated through.
        let task_trait = match task_specs.cutypes[index] {
            CuTaskType::Source => quote! { CuSrcTask },
            CuTaskType::Regular => quote! { CuTask },
            CuTaskType::Sink => quote! { CuSinkTask },
        };
        quote! {
            {
                let resources = <<#ty as #task_trait>::Resources<'_> as ResourceBindings>::from_bindings(
                    resources,
                    #mapping_ref,
                ).map_err(|e| e.add_cause(#additional_error_info))?;
                <#ty as #task_trait>::new(all_instances_configs[#index], resources)
                    .map_err(|e| e.add_cause(#additional_error_info))?
            }
        }
    })
    .collect::<Vec<_>>();
// Instantiation expression for every task of the regular (non-simulation) runtime: bind the
// task resources, then call the `new` of the trait matching the task kind. Background regular
// tasks bind the resources of the inner task type, borrow the shared thread pool and pass both
// as `CuAsyncTaskResources`. Panics during expansion if a background task exists but no thread
// pool bundle index is available.
let task_instances_init_code = task_specs
    .instantiation_types
    .iter()
    .zip(&task_specs.background_flags)
    .enumerate()
    .map(|(index, (task_type, background))| {
        let additional_error_info = format!(
            "Failed to get create instance for {}, instance index {}.",
            task_specs.type_names[index], index
        );
        let mapping_ref = task_resource_mappings.refs[index].clone();
        let inner_task_type = &task_specs.sim_task_types[index];
        let background_regular =
            *background && matches!(task_specs.cutypes[index], CuTaskType::Regular);
        if background_regular {
            let threadpool_bundle_index = threadpool_bundle_index
                .expect("threadpool bundle missing for background tasks");
            return quote! {
                {
                    let inner_resources = <<#inner_task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
                        resources,
                        #mapping_ref,
                    ).map_err(|e| e.add_cause(#additional_error_info))?;
                    let threadpool_key = cu29::resource::ResourceKey::new(
                        cu29::resource::BundleIndex::new(#threadpool_bundle_index),
                        <cu29::resource::ThreadPoolBundle as cu29::resource::ResourceBundleDecl>::Id::BgThreads as usize,
                    );
                    let threadpool = resources.borrow_shared_arc(threadpool_key)?;
                    let resources = cu29::cuasynctask::CuAsyncTaskResources {
                        inner: inner_resources,
                        threadpool,
                    };
                    <#task_type as CuTask>::new(all_instances_configs[#index], resources)
                        .map_err(|e| e.add_cause(#additional_error_info))?
                }
            };
        }
        // The remaining cases only differ by the trait the task is instantiated through.
        let task_trait = match task_specs.cutypes[index] {
            CuTaskType::Source => quote! { CuSrcTask },
            CuTaskType::Regular => quote! { CuTask },
            CuTaskType::Sink => quote! { CuSinkTask },
        };
        quote! {
            {
                let resources = <<#task_type as #task_trait>::Resources<'_> as ResourceBindings>::from_bindings(
                    resources,
                    #mapping_ref,
                ).map_err(|e| e.add_cause(#additional_error_info))?;
                <#task_type as #task_trait>::new(all_instances_configs[#index], resources)
                    .map_err(|e| e.add_cause(#additional_error_info))?
            }
        }
    })
    .collect::<Vec<_>>();
// Per-task generated code, one entry per task for each of:
//   * restore: thaw the task from `decoder`,
//   * start / stop: lifecycle calls going through `self.copper_runtime`,
//   * preprocess / postprocess: hooks using the `tasks`, `monitor` and `execution_probe`
//     bindings of the generated code, executed under the optional realtime guard.
// In simulation mode every call is first offered to `sim_callback`; the runtime only executes
// it when the callback answers `SimOverride::ExecuteByRuntime`, and an `Errored` answer is
// routed through the same monitoring decision as a real failure.
let (
    task_restore_code,
    task_start_calls,
    task_stop_calls,
    task_preprocess_calls,
    task_postprocess_calls,
): (Vec<_>, Vec<_>, Vec<_>, Vec<_>, Vec<_>) = itertools::multiunzip(
    (0..task_specs.task_types.len())
        .map(|index| {
            // Position of the task in the task tuple. Built straight from the `usize` so no
            // narrowing cast is involved.
            let task_index = syn::Index::from(index);
            let enum_name = Ident::new(&config_id_to_enum(&task_specs.ids[index]), Span::call_site());
            let component_id = quote! { cu29::monitoring::ComponentId::new(#index) };
            let component_label = quote! { #mission_mod::monitor_component_label(#component_id) };

            // Handling of a failed phase: ask the monitor (reached through `monitor`) for a
            // decision, log it and abort / continue / shut down accordingly.
            // `state` is the `CuComponentState` variant name, `phase` the lowercase phase name
            // and `siblings` the plural used in the abort message.
            let monitoring_action = |monitor: proc_macro2::TokenStream, state: &str, phase: &str, siblings: &str| {
                let state_ident = format_ident!("{}", state);
                let abort_msg = format!(
                    "{state}: ABORT decision from monitoring. Component '{{}}' errored out during {phase}. Aborting all the other {siblings}."
                );
                let ignore_msg = format!(
                    "{state}: IGNORE decision from monitoring. Component '{{}}' errored out during {phase}. The runtime will continue."
                );
                let shutdown_msg = format!(
                    "{state}: SHUTDOWN decision from monitoring. Component '{{}}' errored out during {phase}. The runtime cannot continue."
                );
                let shutdown_cause = format!("Component errored out during {phase}.");
                quote! {
                    let decision = #monitor.process_error(#component_id, CuComponentState::#state_ident, &error);
                    match decision {
                        Decision::Abort => {
                            debug!(#abort_msg, #component_label);
                            return Ok(());
                        }
                        Decision::Ignore => {
                            debug!(#ignore_msg, #component_label);
                        }
                        Decision::Shutdown => {
                            debug!(#shutdown_msg, #component_label);
                            return Err(CuError::new_with_cause(#shutdown_cause, error));
                        }
                    }
                }
            };

            // Statements defining `doit`: always true outside simulation, otherwise decided
            // by the simulation callback for this task and phase.
            let sim_gate = |state: &str, on_error: &proc_macro2::TokenStream| {
                if sim_mode {
                    let state_ident = format_ident!("{}", state);
                    quote! {
                        let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::#state_ident));
                        let doit = if let SimOverride::Errored(reason) = ovr {
                            let error: CuError = reason.into();
                            #on_error
                            false
                        } else {
                            ovr == SimOverride::ExecuteByRuntime
                        };
                    }
                } else {
                    quote! { let doit = true; }
                }
            };

            // Start / stop template: marker and task access go through `self.copper_runtime`.
            let lifecycle_call = |state: &str, phase: &str, siblings: &str| {
                let state_ident = format_ident!("{}", state);
                let method = format_ident!("{}", phase);
                let on_error = monitoring_action(
                    quote! { self.copper_runtime.monitor },
                    state,
                    phase,
                    siblings,
                );
                let gate = sim_gate(state, &on_error);
                quote! {
                    #gate
                    if doit {
                        self.copper_runtime.record_execution_marker(
                            cu29::monitoring::ExecutionMarker {
                                component_id: #component_id,
                                step: CuComponentState::#state_ident,
                                culistid: None,
                            }
                        );
                        let task = &mut self.copper_runtime.tasks.#task_index;
                        ctx.set_current_task(#index);
                        if let Err(error) = task.#method(&ctx) {
                            #on_error
                        }
                    }
                }
            };

            // Preprocess / postprocess template: uses the local `tasks`, `monitor` and
            // `execution_probe` bindings and runs the call under `rt_guard`.
            let iteration_hook = |state: &str, phase: &str, siblings: &str| {
                let state_ident = format_ident!("{}", state);
                let method = format_ident!("{}", phase);
                let on_error = monitoring_action(quote! { monitor }, state, phase, siblings);
                let gate = sim_gate(state, &on_error);
                quote! {
                    #gate
                    if doit {
                        execution_probe.record(cu29::monitoring::ExecutionMarker {
                            component_id: #component_id,
                            step: CuComponentState::#state_ident,
                            culistid: None,
                        });
                        ctx.set_current_task(#index);
                        let maybe_error = {
                            #rt_guard
                            tasks.#task_index.#method(&ctx)
                        };
                        if let Err(error) = maybe_error {
                            #on_error
                        }
                    }
                }
            };

            (
                quote! {
                    tasks.#task_index.thaw(&mut decoder).map_err(|e| CuError::from("Failed to thaw").add_cause(&e.to_string()))?
                },
                lifecycle_call("Start", "start", "starts"),
                // The stop abort message talks about the other stops, matching the wording
                // used for bridges.
                lifecycle_call("Stop", "stop", "stops"),
                // NOTE(review): the preprocess/postprocess abort messages still say "starts",
                // which is the wording the bridge preprocess/postprocess messages use as well.
                iteration_hook("Preprocess", "preprocess", "starts"),
                iteration_hook("Postprocess", "postprocess", "starts"),
            )
        })
);
// Generated start code, one block per bridge.
//
// In simulation mode the call is first offered to `sim_callback`; the bridge is started by the
// runtime only when the answer is `SimOverride::ExecuteByRuntime`. A bridge that is not started
// by the runtime (handled by the simulation, or errored with an Ignore decision) is simply
// skipped: the generated code must NOT return from the enclosing function, otherwise every
// bridge and task start placed after this block would silently be skipped too.
// Panics during expansion if the spec has no monitor index.
let bridge_start_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
    .iter()
    .map(|spec| {
        let bridge_index = int2sliceindex(spec.tuple_index as u32);
        let monitor_index = syn::Index::from(
            spec.monitor_index
                .expect("Bridge missing monitor index for start"),
        );
        let enum_ident = Ident::new(
            &config_id_to_enum(&format!("{}_bridge", spec.id)),
            Span::call_site(),
        );
        let component_id = quote! { cu29::monitoring::ComponentId::new(#monitor_index) };
        let component_label = quote! { #mission_mod::monitor_component_label(#component_id) };
        // Monitoring decision for a start failure. `ignore_tail` is appended to the Ignore arm:
        // `false` where the match is the value of `doit`, nothing where it is a statement.
        let on_error = |ignore_tail: proc_macro2::TokenStream| {
            quote! {
                let decision = self.copper_runtime.monitor.process_error(#component_id, CuComponentState::Start, &error);
                match decision {
                    Decision::Abort => {
                        debug!("Start: ABORT decision from monitoring. Component '{}' errored out during start. Aborting all the other starts.", #component_label);
                        return Ok(());
                    }
                    Decision::Ignore => {
                        debug!("Start: IGNORE decision from monitoring. Component '{}' errored out during start. The runtime will continue.", #component_label);
                        #ignore_tail
                    }
                    Decision::Shutdown => {
                        debug!("Start: SHUTDOWN decision from monitoring. Component '{}' errored out during start. The runtime cannot continue.", #component_label);
                        return Err(CuError::new_with_cause("Component errored out during start.", error));
                    }
                }
            }
        };
        let call_sim = if sim_mode {
            let sim_error = on_error(quote! { false });
            quote! {
                let doit = {
                    let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Start);
                    let ovr = sim_callback(state);
                    if let SimOverride::Errored(reason) = ovr {
                        let error: CuError = reason.into();
                        #sim_error
                    } else {
                        ovr == SimOverride::ExecuteByRuntime
                    }
                };
            }
        } else {
            quote! { let doit = true; }
        };
        let runtime_error = on_error(quote! {});
        quote! {
            {
                #call_sim
                if doit {
                    self.copper_runtime.record_execution_marker(
                        cu29::monitoring::ExecutionMarker {
                            component_id: #component_id,
                            step: CuComponentState::Start,
                            culistid: None,
                        }
                    );
                    ctx.clear_current_task();
                    let bridge = &mut self.copper_runtime.bridges.#bridge_index;
                    if let Err(error) = bridge.start(&ctx) {
                        #runtime_error
                    }
                }
            }
        }
    })
    .collect();
// Generated stop code, one block per bridge.
//
// In simulation mode the call is first offered to `sim_callback`; the bridge is stopped by the
// runtime only when the answer is `SimOverride::ExecuteByRuntime`. A bridge that is not stopped
// by the runtime (handled by the simulation, or errored with an Ignore decision) is simply
// skipped: the generated code must NOT return from the enclosing function, otherwise the stop
// of every bridge placed after this block would silently be skipped too.
// Panics during expansion if the spec has no monitor index.
let bridge_stop_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
    .iter()
    .map(|spec| {
        let bridge_index = int2sliceindex(spec.tuple_index as u32);
        let monitor_index = syn::Index::from(
            spec.monitor_index
                .expect("Bridge missing monitor index for stop"),
        );
        let enum_ident = Ident::new(
            &config_id_to_enum(&format!("{}_bridge", spec.id)),
            Span::call_site(),
        );
        let component_id = quote! { cu29::monitoring::ComponentId::new(#monitor_index) };
        let component_label = quote! { #mission_mod::monitor_component_label(#component_id) };
        // Monitoring decision for a stop failure. `ignore_tail` is appended to the Ignore arm:
        // `false` where the match is the value of `doit`, nothing where it is a statement.
        let on_error = |ignore_tail: proc_macro2::TokenStream| {
            quote! {
                let decision = self.copper_runtime.monitor.process_error(#component_id, CuComponentState::Stop, &error);
                match decision {
                    Decision::Abort => {
                        debug!("Stop: ABORT decision from monitoring. Component '{}' errored out during stop. Aborting all the other stops.", #component_label);
                        return Ok(());
                    }
                    Decision::Ignore => {
                        debug!("Stop: IGNORE decision from monitoring. Component '{}' errored out during stop. The runtime will continue.", #component_label);
                        #ignore_tail
                    }
                    Decision::Shutdown => {
                        debug!("Stop: SHUTDOWN decision from monitoring. Component '{}' errored out during stop. The runtime cannot continue.", #component_label);
                        return Err(CuError::new_with_cause("Component errored out during stop.", error));
                    }
                }
            }
        };
        let call_sim = if sim_mode {
            let sim_error = on_error(quote! { false });
            quote! {
                let doit = {
                    let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Stop);
                    let ovr = sim_callback(state);
                    if let SimOverride::Errored(reason) = ovr {
                        let error: CuError = reason.into();
                        #sim_error
                    } else {
                        ovr == SimOverride::ExecuteByRuntime
                    }
                };
            }
        } else {
            quote! { let doit = true; }
        };
        let runtime_error = on_error(quote! {});
        quote! {
            {
                #call_sim
                if doit {
                    self.copper_runtime.record_execution_marker(
                        cu29::monitoring::ExecutionMarker {
                            component_id: #component_id,
                            step: CuComponentState::Stop,
                            culistid: None,
                        }
                    );
                    ctx.clear_current_task();
                    let bridge = &mut self.copper_runtime.bridges.#bridge_index;
                    if let Err(error) = bridge.stop(&ctx) {
                        #runtime_error
                    }
                }
            }
        }
    })
    .collect();
// Generated preprocess code, one block per bridge. In simulation mode `sim_callback` decides
// whether the runtime executes the call; an `Errored` answer and a real failure both go through
// the monitor decision. The bridge call itself runs under the optional realtime guard.
// Panics during expansion if the spec has no monitor index.
let bridge_preprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
    .iter()
    .map(|spec| {
        let bridge_index = int2sliceindex(spec.tuple_index as u32);
        let monitor_index = syn::Index::from(
            spec.monitor_index
                .expect("Bridge missing monitor index for preprocess"),
        );
        let enum_ident = Ident::new(
            &config_id_to_enum(&format!("{}_bridge", spec.id)),
            Span::call_site(),
        );
        let component_id = quote! { cu29::monitoring::ComponentId::new(#monitor_index) };
        let component_label = quote! { #mission_mod::monitor_component_label(#component_id) };
        // Monitoring decision shared by both error paths. `ignore_tail` is appended to the
        // Ignore arm: `false` where the match is the value of `doit`, nothing otherwise.
        let on_error = |ignore_tail: proc_macro2::TokenStream| {
            quote! {
                let decision = monitor.process_error(#component_id, CuComponentState::Preprocess, &error);
                match decision {
                    Decision::Abort => {
                        debug!("Preprocess: ABORT decision from monitoring. Component '{}' errored out during preprocess. Aborting all the other starts.", #component_label);
                        return Ok(());
                    }
                    Decision::Ignore => {
                        debug!("Preprocess: IGNORE decision from monitoring. Component '{}' errored out during preprocess. The runtime will continue.", #component_label);
                        #ignore_tail
                    }
                    Decision::Shutdown => {
                        debug!("Preprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during preprocess. The runtime cannot continue.", #component_label);
                        return Err(CuError::new_with_cause("Component errored out during preprocess.", error));
                    }
                }
            }
        };
        let call_sim = if !sim_mode {
            quote! { let doit = true; }
        } else {
            let sim_error = on_error(quote! { false });
            quote! {
                let doit = {
                    let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Preprocess);
                    let ovr = sim_callback(state);
                    if let SimOverride::Errored(reason) = ovr {
                        let error: CuError = reason.into();
                        #sim_error
                    } else {
                        ovr == SimOverride::ExecuteByRuntime
                    }
                };
            }
        };
        let runtime_error = on_error(quote! {});
        quote! {
            {
                #call_sim
                if doit {
                    ctx.clear_current_task();
                    let bridge = &mut __cu_bridges.#bridge_index;
                    execution_probe.record(cu29::monitoring::ExecutionMarker {
                        component_id: #component_id,
                        step: CuComponentState::Preprocess,
                        culistid: None,
                    });
                    let maybe_error = {
                        #rt_guard
                        bridge.preprocess(&ctx)
                    };
                    if let Err(error) = maybe_error {
                        #runtime_error
                    }
                }
            }
        }
    })
    .collect();
// Generated postprocess code, one block per bridge. In simulation mode `sim_callback` decides
// whether the runtime executes the call; an `Errored` answer and a real failure both go through
// the monitor decision. When executed, the bridge is first handed to
// `kf_manager.freeze_any(clid, ..)`, the marker records the current copper list id, and the
// bridge call runs under the optional realtime guard.
// Panics during expansion if the spec has no monitor index.
let bridge_postprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
    .iter()
    .map(|spec| {
        let bridge_index = int2sliceindex(spec.tuple_index as u32);
        let monitor_index = syn::Index::from(
            spec.monitor_index
                .expect("Bridge missing monitor index for postprocess"),
        );
        let enum_ident = Ident::new(
            &config_id_to_enum(&format!("{}_bridge", spec.id)),
            Span::call_site(),
        );
        let component_id = quote! { cu29::monitoring::ComponentId::new(#monitor_index) };
        let component_label = quote! { #mission_mod::monitor_component_label(#component_id) };
        // Monitoring decision shared by both error paths. `ignore_tail` is appended to the
        // Ignore arm: `false` where the match is the value of `doit`, nothing otherwise.
        let on_error = |ignore_tail: proc_macro2::TokenStream| {
            quote! {
                let decision = monitor.process_error(#component_id, CuComponentState::Postprocess, &error);
                match decision {
                    Decision::Abort => {
                        debug!("Postprocess: ABORT decision from monitoring. Component '{}' errored out during postprocess. Aborting all the other starts.", #component_label);
                        return Ok(());
                    }
                    Decision::Ignore => {
                        debug!("Postprocess: IGNORE decision from monitoring. Component '{}' errored out during postprocess. The runtime will continue.", #component_label);
                        #ignore_tail
                    }
                    Decision::Shutdown => {
                        debug!("Postprocess: SHUTDOWN decision from monitoring. Component '{}' errored out during postprocess. The runtime cannot continue.", #component_label);
                        return Err(CuError::new_with_cause("Component errored out during postprocess.", error));
                    }
                }
            }
        };
        let call_sim = if !sim_mode {
            quote! { let doit = true; }
        } else {
            let sim_error = on_error(quote! { false });
            quote! {
                let doit = {
                    let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Postprocess);
                    let ovr = sim_callback(state);
                    if let SimOverride::Errored(reason) = ovr {
                        let error: CuError = reason.into();
                        #sim_error
                    } else {
                        ovr == SimOverride::ExecuteByRuntime
                    }
                };
            }
        };
        let runtime_error = on_error(quote! {});
        quote! {
            {
                #call_sim
                if doit {
                    ctx.clear_current_task();
                    let bridge = &mut __cu_bridges.#bridge_index;
                    kf_manager.freeze_any(clid, bridge)?;
                    execution_probe.record(cu29::monitoring::ExecutionMarker {
                        component_id: #component_id,
                        step: CuComponentState::Postprocess,
                        culistid: Some(clid),
                    });
                    let maybe_error = {
                        #rt_guard
                        bridge.postprocess(&ctx)
                    };
                    if let Err(error) = maybe_error {
                        #runtime_error
                    }
                }
            }
        }
    })
    .collect();
let mut start_calls = bridge_start_calls;
start_calls.extend(task_start_calls);
let mut stop_calls = task_stop_calls;
stop_calls.extend(bridge_stop_calls);
let mut preprocess_calls = bridge_preprocess_calls;
preprocess_calls.extend(task_preprocess_calls);
let mut postprocess_calls = task_postprocess_calls;
postprocess_calls.extend(bridge_postprocess_calls);
let bridge_restore_code: Vec<proc_macro2::TokenStream> = culist_bridge_specs
.iter()
.enumerate()
.map(|(index, _)| {
let bridge_tuple_index = syn::Index::from(index);
quote! {
__cu_bridges.#bridge_tuple_index
.thaw(&mut decoder)
.map_err(|e| CuError::from("Failed to thaw bridge").add_cause(&e.to_string()))?
}
})
.collect();
let output_pack_sizes = collect_output_pack_sizes(&culist_plan);
let runtime_plan_code_and_logging: Vec<(
proc_macro2::TokenStream,
proc_macro2::TokenStream,
)> = culist_plan
.steps
.iter()
.map(|unit| match unit {
CuExecutionUnit::Step(step) => {
#[cfg(feature = "macro_debug")]
eprintln!(
"{} -> {} as {:?}. task_id: {} Input={:?}, Output={:?}",
step.node.get_id(),
step.node.get_type(),
step.task_type,
step.node_id,
step.input_msg_indices_types,
step.output_msg_pack
);
match &culist_exec_entities[step.node_id as usize].kind {
ExecutionEntityKind::Task { task_index } => generate_task_execution_tokens(
step,
*task_index,
&task_specs,
&output_pack_sizes,
sim_mode,
&mission_mod,
),
ExecutionEntityKind::BridgeRx {
bridge_index,
channel_index,
} => {
let spec = &culist_bridge_specs[*bridge_index];
generate_bridge_rx_execution_tokens(
step,
spec,
*channel_index,
&mission_mod,
sim_mode,
)
}
ExecutionEntityKind::BridgeTx {
bridge_index,
channel_index,
} => {
let spec = &culist_bridge_specs[*bridge_index];
generate_bridge_tx_execution_tokens(
step,
spec,
*channel_index,
&output_pack_sizes,
&mission_mod,
sim_mode,
)
}
}
}
CuExecutionUnit::Loop(_) => {
panic!("Execution loops are not supported in runtime generation");
}
})
.collect();
let sim_support = if sim_mode {
Some(gen_sim_support(
&culist_plan,
&culist_exec_entities,
&culist_bridge_specs,
))
} else {
None
};
let (new, run_one_iteration, start_all_tasks, stop_all_tasks, run) = if sim_mode {
(
quote! {
fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<Self>
},
quote! {
fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
},
quote! {
fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
},
quote! {
fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
},
quote! {
fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
},
)
} else {
(
if std {
quote! {
fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>) -> CuResult<Self>
}
} else {
quote! {
fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>) -> CuResult<Self>
}
},
quote! {
fn run_one_iteration(&mut self) -> CuResult<()>
},
quote! {
fn start_all_tasks(&mut self) -> CuResult<()>
},
quote! {
fn stop_all_tasks(&mut self) -> CuResult<()>
},
quote! {
fn run(&mut self) -> CuResult<()>
},
)
};
let sim_callback_arg = if sim_mode {
Some(quote!(sim_callback))
} else {
None
};
let app_trait = if sim_mode {
quote!(CuSimApplication)
} else {
quote!(CuApplication)
};
let sim_callback_on_new_calls = task_specs.ids.iter().enumerate().map(|(i, id)| {
let enum_name = config_id_to_enum(id);
let enum_ident = Ident::new(&enum_name, Span::call_site());
quote! {
sim_callback(SimStep::#enum_ident(CuTaskCallbackState::New(all_instances_configs[#i].cloned())));
}
});
let sim_callback_on_new_bridges = culist_bridge_specs.iter().map(|spec| {
let enum_ident = Ident::new(
&config_id_to_enum(&format!("{}_bridge", spec.id)),
Span::call_site(),
);
let cfg_index = syn::Index::from(spec.config_index);
quote! {
sim_callback(SimStep::#enum_ident(
cu29::simulation::CuBridgeLifecycleState::New(config.bridges[#cfg_index].config.clone())
));
}
});
let sim_callback_on_new = if sim_mode {
Some(quote! {
let graph = config.get_graph(Some(#mission)).expect("Could not find the mission #mission");
let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
.get_all_nodes()
.iter()
.map(|(_, node)| node.get_instance_config())
.collect();
#(#sim_callback_on_new_calls)*
#(#sim_callback_on_new_bridges)*
})
} else {
None
};
let (runtime_plan_code, preprocess_logging_calls): (Vec<_>, Vec<_>) =
itertools::multiunzip(runtime_plan_code_and_logging);
let config_load_stmt = if std {
quote! {
let (config, config_source) = if let Some(overridden_config) = config_override {
debug!("CuConfig: Overridden programmatically.");
(overridden_config, RuntimeLifecycleConfigSource::ProgrammaticOverride)
} else if ::std::path::Path::new(config_filename).exists() {
debug!("CuConfig: Reading configuration from file: {}", config_filename);
(
cu29::config::read_configuration(config_filename)?,
RuntimeLifecycleConfigSource::ExternalFile,
)
} else {
let original_config = Self::original_config();
debug!("CuConfig: Using the bundled configuration compiled into the binary.");
(
cu29::config::read_configuration_str(original_config, None)?,
RuntimeLifecycleConfigSource::BundledDefault,
)
};
}
} else {
quote! {
let original_config = Self::original_config();
debug!("CuConfig: Using the bundled configuration compiled into the binary.");
let config = cu29::config::read_configuration_str(original_config, None)?;
let config_source = RuntimeLifecycleConfigSource::BundledDefault;
}
};
let init_resources_sig = if std {
quote! {
pub fn init_resources(config_override: Option<CuConfig>) -> CuResult<AppResources>
}
} else {
quote! {
pub fn init_resources() -> CuResult<AppResources>
}
};
let init_resources_call = if std {
quote! { Self::init_resources(config_override)? }
} else {
quote! { Self::init_resources()? }
};
let new_with_resources_sig = if sim_mode {
quote! {
pub fn new_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
clock: RobotClock,
unified_logger: Arc<Mutex<L>>,
app_resources: AppResources,
sim_callback: &mut impl FnMut(SimStep) -> SimOverride,
) -> CuResult<Self>
}
} else {
quote! {
pub fn new_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
clock: RobotClock,
unified_logger: Arc<Mutex<L>>,
app_resources: AppResources,
) -> CuResult<Self>
}
};
let new_with_resources_call = if sim_mode {
quote! { Self::new_with_resources(clock, unified_logger, app_resources, sim_callback) }
} else {
quote! { Self::new_with_resources(clock, unified_logger, app_resources) }
};
let kill_handler = if std && signal_handler {
Some(quote! {
ctrlc::set_handler(move || {
STOP_FLAG.store(true, Ordering::SeqCst);
}).expect("Error setting Ctrl-C handler");
})
} else {
None
};
let run_loop = if std {
quote! {
loop {
let iter_start = cu29::curuntime::perf_now(&self.copper_runtime.clock);
let result = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(
|| <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg)
)) {
Ok(result) => result,
Err(payload) => {
let panic_message = cu29::monitoring::panic_payload_to_string(payload.as_ref());
self.copper_runtime.monitor.process_panic(&panic_message);
let _ = self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::Panic {
message: panic_message.clone(),
file: None,
line: None,
column: None,
});
Err(CuError::from(format!(
"Panic while running one iteration: {}",
panic_message
)))
}
};
if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
let period: CuDuration = (1_000_000_000u64 / rate).into();
let elapsed = cu29::curuntime::perf_now(&self.copper_runtime.clock) - iter_start;
if elapsed < period {
std::thread::sleep(std::time::Duration::from_nanos(period.as_nanos() - elapsed.as_nanos()));
}
}
if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
break result;
}
}
}
} else {
quote! {
loop {
let iter_start = cu29::curuntime::perf_now(&self.copper_runtime.clock);
let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
let period: CuDuration = (1_000_000_000u64 / rate).into();
let elapsed = cu29::curuntime::perf_now(&self.copper_runtime.clock) - iter_start;
if elapsed < period {
busy_wait_for(period - elapsed);
}
}
if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
break result;
}
}
}
};
#[cfg(feature = "macro_debug")]
eprintln!("[build the run methods]");
let run_methods: proc_macro2::TokenStream = quote! {
#run_one_iteration {
let runtime = &mut self.copper_runtime;
let clock = &runtime.clock;
let execution_probe = &runtime.execution_probe;
let monitor = &mut runtime.monitor;
let tasks = &mut runtime.tasks;
let __cu_bridges = &mut runtime.bridges;
let cl_manager = &mut runtime.copperlists_manager;
let kf_manager = &mut runtime.keyframes_manager;
let iteration_clid = cl_manager.inner.next_cl_id();
let mut ctx = cu29::context::CuContext::builder(clock.clone())
.cl_id(iteration_clid)
.task_ids(#mission_mod::TASK_IDS)
.build();
#(#preprocess_calls)*
let culist = cl_manager.inner.create().expect("Ran out of space for copper lists"); let clid = culist.id;
debug_assert_eq!(clid, iteration_clid);
kf_manager.reset(clid, clock); culist.change_state(cu29::copperlist::CopperListState::Processing);
culist.msgs.init_zeroed();
let mut ctx = cu29::context::CuContext::builder(clock.clone())
.cl_id(iteration_clid)
.task_ids(#mission_mod::TASK_IDS)
.build();
{
let msgs = &mut culist.msgs.0;
#(#runtime_plan_code)*
} let (raw_payload_bytes, handle_bytes) = #mission_mod::compute_payload_bytes(&culist);
ctx.clear_current_task();
let monitor_result = monitor.process_copperlist(&ctx, #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)));
#(#preprocess_logging_calls)*
cl_manager.end_of_processing(clid)?;
kf_manager.end_of_processing(clid)?;
monitor_result?;
let stats = cu29::monitoring::CopperListIoStats {
raw_culist_bytes: core::mem::size_of::<CuList>() as u64 + raw_payload_bytes,
handle_bytes,
encoded_culist_bytes: cl_manager.last_encoded_bytes,
keyframe_bytes: kf_manager.last_encoded_bytes,
structured_log_bytes_total: ::cu29::prelude::structured_log_bytes_total(),
culistid: clid,
};
monitor.observe_copperlist_io(stats);
#(#postprocess_calls)*
Ok(())
}
fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
let runtime = &mut self.copper_runtime;
let clock = &runtime.clock;
let tasks = &mut runtime.tasks;
let __cu_bridges = &mut runtime.bridges;
let config = cu29::bincode::config::standard();
let reader = cu29::bincode::de::read::SliceReader::new(&keyframe.serialized_tasks);
let mut decoder = DecoderImpl::new(reader, config, ());
#(#task_restore_code);*;
#(#bridge_restore_code);*;
Ok(())
}
#start_all_tasks {
let _ = self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::MissionStarted {
mission: #mission.to_string(),
});
let lifecycle_clid = self.copper_runtime.copperlists_manager.inner.last_cl_id();
let mut ctx = cu29::context::CuContext::builder(self.copper_runtime.clock.clone())
.cl_id(lifecycle_clid)
.task_ids(#mission_mod::TASK_IDS)
.build();
#(#start_calls)*
ctx.clear_current_task();
self.copper_runtime.monitor.start(&ctx)?;
Ok(())
}
#stop_all_tasks {
let lifecycle_clid = self.copper_runtime.copperlists_manager.inner.last_cl_id();
let mut ctx = cu29::context::CuContext::builder(self.copper_runtime.clock.clone())
.cl_id(lifecycle_clid)
.task_ids(#mission_mod::TASK_IDS)
.build();
#(#stop_calls)*
ctx.clear_current_task();
self.copper_runtime.monitor.stop(&ctx)?;
let _ = self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::MissionStopped {
mission: #mission.to_string(),
reason: "stop_all_tasks".to_string(),
});
Ok(())
}
#run {
static STOP_FLAG: AtomicBool = AtomicBool::new(false);
#kill_handler
<Self as #app_trait<S, L>>::start_all_tasks(self, #sim_callback_arg)?;
let result = #run_loop;
if result.is_err() {
error!("A task errored out: {}", &result);
}
<Self as #app_trait<S, L>>::stop_all_tasks(self, #sim_callback_arg)?;
let _ = self.log_shutdown_completed();
result
}
};
let tasks_type = if sim_mode {
quote!(CuSimTasks)
} else {
quote!(CuTasks)
};
let tasks_instanciator_fn = if sim_mode {
quote!(tasks_instanciator_sim)
} else {
quote!(tasks_instanciator)
};
let app_impl_decl = if sim_mode {
quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuSimApplication<S, L> for #application_name)
} else {
quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuApplication<S, L> for #application_name)
};
let simstep_type_decl = if sim_mode {
quote!(
type Step<'z> = SimStep<'z>;
)
} else {
quote!()
};
let app_resources_struct = quote! {
pub struct AppResources {
pub config: CuConfig,
pub config_source: RuntimeLifecycleConfigSource,
pub resources: ResourceManager,
}
};
let init_resources_fn = quote! {
#init_resources_sig {
let config_filename = #config_file;
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp init: config file {}", config_filename);
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp init: loading config");
#config_load_stmt
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp init: config loaded");
if let Some(runtime) = &config.runtime {
#[cfg(target_os = "none")]
::cu29::prelude::info!(
"CuApp init: rate_target_hz={}",
runtime.rate_target_hz.unwrap_or(0)
);
} else {
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp init: rate_target_hz=none");
}
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp init: building resources");
let resources = #mission_mod::resources_instanciator(&config)?;
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp init: resources ready");
Ok(AppResources {
config,
config_source,
resources,
})
}
};
let new_with_resources_fn = quote! {
#new_with_resources_sig {
let AppResources {
config,
config_source,
resources,
} = app_resources;
#[cfg(target_os = "none")]
{
let structured_stream = ::cu29::prelude::stream_write::<
::cu29::prelude::CuLogEntry,
S,
>(
unified_logger.clone(),
::cu29::prelude::UnifiedLogType::StructuredLogLine,
4096 * 10,
)?;
let _logger_runtime = ::cu29::prelude::LoggerRuntime::init(
clock.clone(),
structured_stream,
None::<::cu29::prelude::NullLog>,
);
}
let mut default_section_size = size_of::<super::#mission_mod::CuList>() * 64;
if let Some(section_size_mib) = config.logging.as_ref().and_then(|l| l.section_size_mib) {
default_section_size = section_size_mib as usize * 1024usize * 1024usize;
}
#[cfg(target_os = "none")]
::cu29::prelude::info!(
"CuApp new: copperlist section size={}",
default_section_size
);
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp new: creating copperlist stream");
let copperlist_stream = stream_write::<#mission_mod::CuList, S>(
unified_logger.clone(),
UnifiedLogType::CopperList,
default_section_size,
)?;
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp new: copperlist stream ready");
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp new: creating keyframes stream");
let keyframes_stream = stream_write::<KeyFrame, S>(
unified_logger.clone(),
UnifiedLogType::FrozenTasks,
1024 * 1024 * 10, )?;
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp new: keyframes stream ready");
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp new: creating runtime lifecycle stream");
let mut runtime_lifecycle_stream = stream_write::<RuntimeLifecycleRecord, S>(
unified_logger.clone(),
UnifiedLogType::RuntimeLifecycle,
1024 * 64, )?;
let effective_config_ron = config
.serialize_ron()
.unwrap_or_else(|_| "<failed to serialize config>".to_string());
let stack_info = RuntimeLifecycleStackInfo {
app_name: env!("CARGO_PKG_NAME").to_string(),
app_version: env!("CARGO_PKG_VERSION").to_string(),
git_commit: #git_commit_tokens,
git_dirty: #git_dirty_tokens,
};
runtime_lifecycle_stream.log(&RuntimeLifecycleRecord {
timestamp: clock.now(),
event: RuntimeLifecycleEvent::Instantiated {
config_source,
effective_config_ron,
stack: stack_info,
},
})?;
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp new: runtime lifecycle stream ready");
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp new: building runtime");
let copper_runtime = CuRuntime::<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>::new_with_resources(
clock,
&config,
#mission,
resources,
#mission_mod::#tasks_instanciator_fn,
#mission_mod::MONITORED_COMPONENTS,
#mission_mod::CULIST_COMPONENT_MAPPING,
#mission_mod::monitor_instanciator,
#mission_mod::bridges_instanciator,
copperlist_stream,
keyframes_stream)?;
#[cfg(target_os = "none")]
::cu29::prelude::info!("CuApp new: runtime built");
let application = Ok(#application_name {
copper_runtime,
runtime_lifecycle_stream: Some(Box::new(runtime_lifecycle_stream)),
});
#sim_callback_on_new
application
}
};
let app_inherent_impl = quote! {
impl #application_name {
pub fn original_config() -> String {
#copper_config_content.to_string()
}
pub fn register_reflect_types(registry: &mut cu29::reflect::TypeRegistry) {
#(#reflect_type_registration_calls)*
}
pub fn log_runtime_lifecycle_event(
&mut self,
event: RuntimeLifecycleEvent,
) -> CuResult<()> {
let timestamp = self.copper_runtime.clock.now();
let Some(stream) = self.runtime_lifecycle_stream.as_mut() else {
return Err(CuError::from("Runtime lifecycle stream is not initialized"));
};
stream.log(&RuntimeLifecycleRecord { timestamp, event })
}
pub fn log_shutdown_completed(&mut self) -> CuResult<()> {
self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::ShutdownCompleted)
}
#init_resources_fn
#new_with_resources_fn
#[inline]
pub fn copper_runtime_mut(&mut self) -> &mut CuRuntime<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #DEFAULT_CLNB> {
&mut self.copper_runtime
}
}
};
let app_reflect_impl = quote! {
impl cu29::reflect::ReflectTaskIntrospection for #application_name {
fn reflect_task(&self, task_id: &str) -> Option<&dyn cu29::reflect::Reflect> {
match task_id {
#(#task_reflect_read_arms)*
_ => None,
}
}
fn reflect_task_mut(
&mut self,
task_id: &str,
) -> Option<&mut dyn cu29::reflect::Reflect> {
match task_id {
#(#task_reflect_write_arms)*
_ => None,
}
}
fn register_reflect_types(registry: &mut cu29::reflect::TypeRegistry) {
#application_name::register_reflect_types(registry);
}
}
};
#[cfg(feature = "std")]
#[cfg(feature = "macro_debug")]
eprintln!("[build result]");
let application_impl = quote! {
#app_impl_decl {
#simstep_type_decl
#new {
let app_resources = #init_resources_call;
#new_with_resources_call
}
fn get_original_config() -> String {
Self::original_config()
}
#run_methods
}
};
let (
builder_struct,
builder_new,
builder_impl,
builder_sim_callback_method,
builder_build_sim_callback_arg,
) = if sim_mode {
(
quote! {
#[allow(dead_code)]
pub struct #builder_name <'a, F> {
clock: Option<RobotClock>,
unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
config_override: Option<CuConfig>,
sim_callback: Option<&'a mut F>
}
},
quote! {
#[allow(dead_code)]
pub fn new() -> Self {
Self {
clock: None,
unified_logger: None,
config_override: None,
sim_callback: None,
}
}
},
quote! {
impl<'a, F> #builder_name <'a, F>
where
F: FnMut(SimStep) -> SimOverride,
},
Some(quote! {
pub fn with_sim_callback(mut self, sim_callback: &'a mut F) -> Self
{
self.sim_callback = Some(sim_callback);
self
}
}),
Some(quote! {
self.sim_callback
.ok_or(CuError::from("Sim callback missing from builder"))?,
}),
)
} else {
(
quote! {
#[allow(dead_code)]
pub struct #builder_name {
clock: Option<RobotClock>,
unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
config_override: Option<CuConfig>,
}
},
quote! {
#[allow(dead_code)]
pub fn new() -> Self {
Self {
clock: None,
unified_logger: None,
config_override: None,
}
}
},
quote! {
impl #builder_name
},
None,
None,
)
};
let std_application_impl = if sim_mode {
Some(quote! {
impl #application_name {
pub fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
<Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self, sim_callback)
}
pub fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
<Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self, sim_callback)
}
pub fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
<Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self, sim_callback)
}
pub fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
<Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self, sim_callback)
}
}
})
} else if std {
Some(quote! {
impl #application_name {
pub fn start_all_tasks(&mut self) -> CuResult<()> {
<Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self)
}
pub fn run_one_iteration(&mut self) -> CuResult<()> {
<Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self)
}
pub fn run(&mut self) -> CuResult<()> {
<Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self)
}
pub fn stop_all_tasks(&mut self) -> CuResult<()> {
<Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self)
}
}
})
} else {
None };
let application_builder = if std {
Some(quote! {
#builder_struct
#builder_impl
{
#builder_new
#[allow(dead_code)]
pub fn with_clock(mut self, clock: RobotClock) -> Self {
self.clock = Some(clock);
self
}
#[allow(dead_code)]
pub fn with_unified_logger(mut self, unified_logger: Arc<Mutex<UnifiedLoggerWrite>>) -> Self {
self.unified_logger = Some(unified_logger);
self
}
#[allow(dead_code)]
pub fn with_context(mut self, copper_ctx: &CopperContext) -> Self {
self.clock = Some(copper_ctx.clock.clone());
self.unified_logger = Some(copper_ctx.unified_logger.clone());
self
}
#[allow(dead_code)]
pub fn with_config(mut self, config_override: CuConfig) -> Self {
self.config_override = Some(config_override);
self
}
#builder_sim_callback_method
#[allow(dead_code)]
pub fn build(self) -> CuResult<#application_name> {
#application_name::new(
self.clock
.ok_or(CuError::from("Clock missing from builder"))?,
self.unified_logger
.ok_or(CuError::from("Unified logger missing from builder"))?,
self.config_override,
#builder_build_sim_callback_arg
)
}
}
})
} else {
None
};
let sim_imports = if sim_mode {
Some(quote! {
use cu29::simulation::SimOverride;
use cu29::simulation::CuTaskCallbackState;
use cu29::simulation::CuSimSrcTask;
use cu29::simulation::CuSimSinkTask;
use cu29::simulation::CuSimBridge;
use cu29::prelude::app::CuSimApplication;
use cu29::cubridge::BridgeChannelSet;
})
} else {
None
};
let sim_tasks = if sim_mode {
Some(quote! {
pub type CuSimTasks = #task_types_tuple_sim;
})
} else {
None
};
let sim_inst_body = if task_sim_instances_init_code.is_empty() {
quote! {
let _ = resources;
Ok(())
}
} else {
quote! { Ok(( #(#task_sim_instances_init_code),*, )) }
};
let sim_tasks_instanciator = if sim_mode {
Some(quote! {
pub fn tasks_instanciator_sim(
all_instances_configs: Vec<Option<&ComponentConfig>>,
resources: &mut ResourceManager,
) -> CuResult<CuSimTasks> {
#sim_inst_body
}})
} else {
None
};
let tasks_inst_body_std = if task_instances_init_code.is_empty() {
quote! {
let _ = resources;
Ok(())
}
} else {
quote! { Ok(( #(#task_instances_init_code),*, )) }
};
let tasks_inst_body_nostd = if task_instances_init_code.is_empty() {
quote! {
let _ = resources;
Ok(())
}
} else {
quote! { Ok(( #(#task_instances_init_code),*, )) }
};
let tasks_instanciator = if std {
quote! {
pub fn tasks_instanciator<'c>(
all_instances_configs: Vec<Option<&'c ComponentConfig>>,
resources: &mut ResourceManager,
) -> CuResult<CuTasks> {
#tasks_inst_body_std
}
}
} else {
quote! {
pub fn tasks_instanciator<'c>(
all_instances_configs: Vec<Option<&'c ComponentConfig>>,
resources: &mut ResourceManager,
) -> CuResult<CuTasks> {
#tasks_inst_body_nostd
}
}
};
let imports = if std {
quote! {
use cu29::rayon::ThreadPool;
use cu29::cuasynctask::CuAsyncTask;
use cu29::curuntime::CopperContext;
use cu29::resource::{ResourceBindings, ResourceManager};
use cu29::prelude::SectionStorage;
use cu29::prelude::UnifiedLoggerWrite;
use cu29::prelude::memmap::MmapSectionStorage;
use std::fmt::{Debug, Formatter};
use std::fmt::Result as FmtResult;
use std::mem::size_of;
use std::boxed::Box;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
}
} else {
quote! {
use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::string::String;
use alloc::string::ToString;
use core::sync::atomic::{AtomicBool, Ordering};
use core::fmt::{Debug, Formatter};
use core::fmt::Result as FmtResult;
use core::mem::size_of;
use spin::Mutex;
use cu29::prelude::SectionStorage;
use cu29::resource::{ResourceBindings, ResourceManager};
}
};
let task_mapping_defs = task_resource_mappings.defs.clone();
let bridge_mapping_defs = bridge_resource_mappings.defs.clone();
let mission_mod_tokens = quote! {
mod #mission_mod {
use super::*;
use cu29::bincode::Encode;
use cu29::bincode::enc::Encoder;
use cu29::bincode::error::EncodeError;
use cu29::bincode::Decode;
use cu29::bincode::de::Decoder;
use cu29::bincode::de::DecoderImpl;
use cu29::bincode::error::DecodeError;
use cu29::clock::RobotClock;
use cu29::config::CuConfig;
use cu29::config::ComponentConfig;
use cu29::curuntime::CuRuntime;
use cu29::curuntime::KeyFrame;
use cu29::curuntime::RuntimeLifecycleConfigSource;
use cu29::curuntime::RuntimeLifecycleEvent;
use cu29::curuntime::RuntimeLifecycleRecord;
use cu29::curuntime::RuntimeLifecycleStackInfo;
use cu29::CuResult;
use cu29::CuError;
use cu29::cutask::CuSrcTask;
use cu29::cutask::CuSinkTask;
use cu29::cutask::CuTask;
use cu29::cutask::CuMsg;
use cu29::cutask::CuMsgMetadata;
use cu29::copperlist::CopperList;
use cu29::monitoring::CuMonitor; use cu29::monitoring::CuComponentState;
use cu29::monitoring::Decision;
use cu29::prelude::app::CuApplication;
use cu29::prelude::debug;
use cu29::prelude::stream_write;
use cu29::prelude::UnifiedLogType;
use cu29::prelude::UnifiedLogWrite;
use cu29::prelude::WriteStream;
#imports
#sim_imports
#[allow(unused_imports)]
use cu29::monitoring::NoMonitor;
pub type CuTasks = #task_types_tuple;
pub type CuBridges = #bridges_type_tokens;
#sim_bridge_channel_defs
#resources_module
#resources_instanciator_fn
#task_mapping_defs
#bridge_mapping_defs
#sim_tasks
#sim_support
#sim_tasks_instanciator
pub const TASK_IDS: &'static [&'static str] = &[#( #task_ids ),*];
pub const MONITORED_COMPONENTS: &'static [cu29::monitoring::MonitorComponentMetadata] =
&[#( #monitored_component_entries ),*];
pub const CULIST_COMPONENT_MAPPING: &'static [cu29::monitoring::ComponentId] =
&[#( cu29::monitoring::ComponentId::new(#culist_component_mapping) ),*];
pub const MONITOR_LAYOUT: cu29::monitoring::CopperListLayout =
cu29::monitoring::CopperListLayout::new(
MONITORED_COMPONENTS,
CULIST_COMPONENT_MAPPING,
);
#[inline]
pub fn monitor_component_label(
component_id: cu29::monitoring::ComponentId,
) -> &'static str {
MONITORED_COMPONENTS[component_id.index()].id()
}
#culist_support
#tasks_instanciator
#bridges_instanciator
pub fn monitor_instanciator(
config: &CuConfig,
metadata: ::cu29::monitoring::CuMonitoringMetadata,
runtime: ::cu29::monitoring::CuMonitoringRuntime,
) -> #monitor_type {
#monitor_instanciator_body
}
#app_resources_struct
pub #application_struct
#app_inherent_impl
#app_reflect_impl
#application_impl
#std_application_impl
#application_builder
}
};
all_missions_tokens.push(mission_mod_tokens);
}
let default_application_tokens = if all_missions.contains_key("default") {
let default_builder = if std {
Some(quote! {
#[allow(unused_imports)]
use default::#builder_name;
})
} else {
None
};
quote! {
#default_builder
#[allow(unused_imports)]
use default::AppResources;
#[allow(unused_imports)]
use default::resources as app_resources;
#[allow(unused_imports)]
use default::#application_name;
}
} else {
quote!() };
let result: proc_macro2::TokenStream = quote! {
#(#all_missions_tokens)*
#default_application_tokens
};
result.into()
}
/// Loads the Copper configuration named by `config_file`.
///
/// The name is first expanded with [`config_full_path`]; the resulting path is
/// handed to `read_configuration` and its result is returned unchanged.
fn read_config(config_file: &str) -> CuResult<CuConfig> {
    let resolved_path = config_full_path(config_file);
    let parsed_config = read_configuration(resolved_path.as_str());
    parsed_config
}
/// Builds the full path of `config_file` as a `String`.
///
/// `config_file` is pushed onto the path returned by `utils::caller_crate_root()`.
///
/// # Panics
///
/// Panics if the resulting path cannot be viewed as a `&str`.
fn config_full_path(config_file: &str) -> String {
    let mut resolved = utils::caller_crate_root();
    resolved.push(config_file);
    match resolved.as_os_str().to_str() {
        Some(utf8_path) => utf8_path.to_string(),
        None => panic!("Could not interpret the config file name"),
    }
}
/// Returns the parsed output message type of every node in `graph`.
///
/// The result has one entry per node returned by `CuGraph::get_all_nodes`, in
/// that order. An entry is `None` when `get_node_output_msg_type` reports no
/// output type for the node.
///
/// # Panics
///
/// Panics if an output type string is not a valid Rust type.
fn extract_tasks_output_types(graph: &CuGraph) -> Vec<Option<Type>> {
    let mut parsed_types = Vec::new();
    for (_, node) in graph.get_all_nodes().iter() {
        let node_name = node.get_id();
        let declared = graph.get_node_output_msg_type(node_name.as_str());
        let parsed = declared.map(|type_name| {
            parse_str::<Type>(type_name.as_str()).expect("Could not parse output message type.")
        });
        parsed_types.push(parsed);
    }
    parsed_types
}
/// Per-task metadata extracted from a [`CuGraph`], stored as parallel vectors.
///
/// Unless stated otherwise, every vector has one entry per node whose flavor is
/// `Flavor::Task`, in the order those nodes are returned by
/// `CuGraph::get_all_nodes`.
struct CuTaskSpecSet {
    /// Id of each task node (`Node::get_id`).
    pub ids: Vec<String>,
    /// Task kind of each task, as resolved by `find_task_type_for_id`.
    pub cutypes: Vec<CuTaskType>,
    /// `Node::is_background` for each task.
    pub background_flags: Vec<bool>,
    /// `Node::is_logging_enabled` for each task.
    pub logging_enabled: Vec<bool>,
    /// Raw type string of each task (`Node::get_type`).
    pub type_names: Vec<String>,
    /// Parsed Rust type of each task; background tasks are wrapped as
    /// `CuAsyncTask<T, O>` where `O` is the task's output message type.
    pub task_types: Vec<Type>,
    /// Same as `task_types`, except background tasks use the turbofish form
    /// `CuAsyncTask::<T, O>`.
    pub instantiation_types: Vec<Type>,
    /// `type_names` parsed as Rust types, without any `CuAsyncTask` wrapping.
    pub sim_task_types: Vec<Type>,
    /// `Node::is_run_in_sim` for each task.
    pub run_in_sim_flags: Vec<bool>,
    /// Parsed output message type of EVERY node of the graph (not only tasks),
    /// in `CuGraph::get_all_nodes` order. This vector is therefore not aligned
    /// with the task-indexed vectors above.
    #[allow(dead_code)]
    pub output_types: Vec<Option<Type>>,
    /// Indexed by node id, with `graph.node_count()` entries: `Some(task index)`
    /// for task nodes and `None` for every other node.
    pub node_id_to_task_index: Vec<Option<usize>>,
}

impl CuTaskSpecSet {
    /// Collects the specification of every task node of `graph`.
    ///
    /// # Panics
    ///
    /// Panics if a task type string or an output message type string cannot be
    /// parsed as a Rust type, or if a background task has no output type.
    pub fn from_graph(graph: &CuGraph) -> Self {
        let all_id_nodes: Vec<(NodeId, &Node)> = graph
            .get_all_nodes()
            .into_iter()
            .filter(|(_, node)| node.get_flavor() == Flavor::Task)
            .collect();

        let ids = all_id_nodes
            .iter()
            .map(|(_, node)| node.get_id().to_string())
            .collect();
        let cutypes = all_id_nodes
            .iter()
            .map(|(id, _)| find_task_type_for_id(graph, *id))
            .collect();
        let background_flags: Vec<bool> = all_id_nodes
            .iter()
            .map(|(_, node)| node.is_background())
            .collect();
        let logging_enabled: Vec<bool> = all_id_nodes
            .iter()
            .map(|(_, node)| node.is_logging_enabled())
            .collect();
        let type_names: Vec<String> = all_id_nodes
            .iter()
            .map(|(_, node)| node.get_type().to_string())
            .collect();

        // One entry per graph node (tasks and non-tasks alike).
        let output_types = extract_tasks_output_types(graph);

        // `output_types` follows the order of `get_all_nodes`, while the vectors
        // above only cover task nodes. Apply the same flavor filter so that the
        // i-th entry here belongs to the i-th task; zipping `output_types`
        // directly would pair a task with another node's output type whenever a
        // non-task node precedes it in the graph.
        let task_output_types: Vec<Option<&Type>> = graph
            .get_all_nodes()
            .iter()
            .zip(output_types.iter())
            .filter(|((_, node), _)| node.get_flavor() == Flavor::Task)
            .map(|(_, output_type)| output_type.as_ref())
            .collect();

        // Builds the Rust type of every task. Background tasks are wrapped in
        // `CuAsyncTask`, written with a turbofish when `turbofish` is true.
        let build_task_types = |turbofish: bool| -> Vec<Type> {
            type_names
                .iter()
                .zip(background_flags.iter())
                .zip(task_output_types.iter().copied())
                .map(|((name, &background), output_type)| -> Type {
                    let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
                        panic!("Could not transform {name} into a Task Rust type: {error}");
                    });
                    if !background {
                        return name_type;
                    }
                    let Some(output_type) = output_type else {
                        panic!("{name}: If a task is background, it has to have an output");
                    };
                    if turbofish {
                        parse_quote!(CuAsyncTask::<#name_type, #output_type>)
                    } else {
                        parse_quote!(CuAsyncTask<#name_type, #output_type>)
                    }
                })
                .collect()
        };
        let task_types = build_task_types(false);
        let instantiation_types = build_task_types(true);

        let sim_task_types = type_names
            .iter()
            .map(|name| {
                parse_str::<Type>(name).unwrap_or_else(|err| {
                    eprintln!("Could not transform {name} into a Task Rust type.");
                    panic!("{err}")
                })
            })
            .collect();
        let run_in_sim_flags = all_id_nodes
            .iter()
            .map(|(_, node)| node.is_run_in_sim())
            .collect();

        // Reverse lookup from node id to the position of the task in the vectors above.
        let mut node_id_to_task_index = vec![None; graph.node_count()];
        for (index, (node_id, _)) in all_id_nodes.iter().enumerate() {
            node_id_to_task_index[*node_id as usize] = Some(index);
        }

        Self {
            ids,
            cutypes,
            background_flags,
            logging_enabled,
            type_names,
            task_types,
            instantiation_types,
            sim_task_types,
            run_in_sim_flags,
            output_types,
            node_id_to_task_index,
        }
    }
}
/// The message types carried by a single output slot.
#[derive(Clone)]
struct OutputPack {
    /// Parsed Rust type of each message of the slot, in declaration order.
    msg_types: Vec<Type>,
}

impl OutputPack {
    /// Rust type used to store this pack, as computed by [`build_output_slot_type`].
    fn slot_type(&self) -> Type {
        let types: &[Type] = self.msg_types.as_slice();
        build_output_slot_type(types)
    }

    /// Returns `true` when the pack carries two or more messages.
    fn is_multi(&self) -> bool {
        self.msg_types.len() >= 2
    }
}
/// Builds the slot type for a list of output message types.
///
/// * no message type: the unit type `()`;
/// * exactly one: `CuMsg<T>`;
/// * several: a tuple `(CuMsg<A>, CuMsg<B>, ...)`.
fn build_output_slot_type(msg_types: &[Type]) -> Type {
    match msg_types {
        [] => parse_quote! { () },
        [msg_type] => parse_quote! { CuMsg<#msg_type> },
        _ => parse_quote! { ( #( CuMsg<#msg_types> ),* ) },
    }
}
/// Expands per-slot origin ids into one id per message.
///
/// For every output pack that has at least one message type, the origin id
/// found at the same position in `slot_origin_ids` is repeated once per
/// message type of that pack. Packs without message types contribute nothing.
///
/// # Panics
///
/// Panics when a non-empty pack has no origin id (entry missing or `None`).
fn flatten_slot_origin_ids(
    output_packs: &[OutputPack],
    slot_origin_ids: Vec<Option<String>>,
) -> Vec<String> {
    output_packs
        .iter()
        .enumerate()
        .filter(|(_, pack)| !pack.msg_types.is_empty())
        .flat_map(|(slot, pack)| {
            let origin = match slot_origin_ids.get(slot) {
                Some(Some(origin)) => origin,
                _ => panic!("Missing slot origin id for copperlist output slot {slot}"),
            };
            std::iter::repeat(origin.clone()).take(pack.msg_types.len())
        })
        .collect()
}
/// Collects the output packs of every step of `runtime_plan`, ordered by
/// their `culist_index`.
///
/// Steps without an output message pack are skipped. Each message type name
/// of a pack is parsed into a `syn::Type`.
///
/// # Panics
///
/// * Panics when a message type name cannot be parsed as a Rust type; the
///   message includes the underlying parse error so the offending
///   configuration entry can be diagnosed.
/// * Hits `todo!` when the plan contains a nested loop unit.
fn extract_output_packs(runtime_plan: &CuExecutionLoop) -> Vec<OutputPack> {
    let mut packs: Vec<(u32, OutputPack)> = Vec::new();
    for unit in &runtime_plan.steps {
        let step = match unit {
            CuExecutionUnit::Step(step) => step,
            CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
        };
        let Some(output_pack) = &step.output_msg_pack else {
            continue;
        };
        let msg_types: Vec<Type> = output_pack
            .msg_types
            .iter()
            .map(|output_msg_type| {
                parse_str::<Type>(output_msg_type.as_str()).unwrap_or_else(|err| {
                    panic!(
                        "Could not transform {output_msg_type} into a message Rust type: {err}"
                    )
                })
            })
            .collect();
        packs.push((output_pack.culist_index, OutputPack { msg_types }));
    }
    // The plan lists steps in execution order; the copperlist wants slot order.
    packs.sort_by_key(|(index, _)| *index);
    packs.into_iter().map(|(_, pack)| pack).collect()
}
/// Returns the number of message types of every output pack in
/// `runtime_plan`, ordered by the pack's `culist_index`.
///
/// Steps without an output message pack are skipped.
///
/// # Panics
///
/// Hits `todo!` when the plan contains a nested loop unit.
fn collect_output_pack_sizes(runtime_plan: &CuExecutionLoop) -> Vec<usize> {
    let mut indexed_sizes: Vec<(u32, usize)> = Vec::new();
    for unit in runtime_plan.steps.iter() {
        match unit {
            CuExecutionUnit::Step(step) => {
                if let Some(output_pack) = step.output_msg_pack.as_ref() {
                    indexed_sizes.push((output_pack.culist_index, output_pack.msg_types.len()));
                }
            }
            CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
        }
    }
    indexed_sizes.sort_by_key(|&(culist_index, _)| culist_index);
    indexed_sizes.into_iter().map(|(_, size)| size).collect()
}
/// Builds the tuple type `(Slot0, Slot1, ...)` from the given slot types.
///
/// An empty slice yields the unit type `()`.
///
/// Every element is followed by a comma. For zero or several elements this
/// is equivalent to the comma-separated form, but for a single element it is
/// required: `(T)` is a parenthesized type, not a tuple, and cannot be parsed
/// as a `TypeTuple`, whereas `(T,)` is a proper 1-tuple.
fn build_culist_tuple(slot_types: &[Type]) -> TypeTuple {
    parse_quote! { ( #( #slot_types, )* ) }
}
fn build_culist_tuple_encode(slot_types: &[Type]) -> ItemImpl {
let indices: Vec<usize> = (0..slot_types.len()).collect();
let encode_fields: Vec<_> = indices
.iter()
.map(|i| {
let idx = syn::Index::from(*i);
quote! { self.0.#idx.encode(encoder)?; }
})
.collect();
parse_quote! {
impl Encode for CuStampedDataSet {
fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
#(#encode_fields)*
Ok(())
}
}
}
}
fn build_culist_tuple_decode(slot_types: &[Type]) -> ItemImpl {
let indices: Vec<usize> = (0..slot_types.len()).collect();
let decode_fields: Vec<_> = indices
.iter()
.map(|i| {
let slot_type = &slot_types[*i];
quote! { <#slot_type as Decode<()>>::decode(decoder)? }
})
.collect();
parse_quote! {
impl Decode<()> for CuStampedDataSet {
fn decode<D: Decoder<Context=()>>(decoder: &mut D) -> Result<Self, DecodeError> {
Ok(CuStampedDataSet ((
#(#decode_fields),*
)))
}
}
}
}
/// Generates `impl ErasedCuStampedDataSet for CuStampedDataSet`.
///
/// The generated `cumsgs` returns a vector of `&dyn ErasedCuStampedData`:
/// one reference per port for multi-message packs (`self.0.<slot>.<port>`),
/// and a single reference to the slot itself otherwise (`self.0.<slot>`).
fn build_culist_erasedcumsgs(output_packs: &[OutputPack]) -> ItemImpl {
    let casted_fields: Vec<proc_macro2::TokenStream> = output_packs
        .iter()
        .enumerate()
        .flat_map(|(idx, pack)| {
            let slot_index = syn::Index::from(idx);
            if !pack.is_multi() {
                return vec![quote! { &self.0.#slot_index as &dyn ErasedCuStampedData }];
            }
            (0..pack.msg_types.len())
                .map(|port_idx| {
                    let port_index = syn::Index::from(port_idx);
                    quote! {
                        &self.0.#slot_index.#port_index as &dyn ErasedCuStampedData
                    }
                })
                .collect::<Vec<_>>()
        })
        .collect();
    parse_quote! {
        impl ErasedCuStampedDataSet for CuStampedDataSet {
            fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
                vec![
                    #(#casted_fields),*
                ]
            }
        }
    }
}
fn build_culist_tuple_debug(slot_types: &[Type]) -> ItemImpl {
let indices: Vec<usize> = (0..slot_types.len()).collect();
let debug_fields: Vec<_> = indices
.iter()
.map(|i| {
let idx = syn::Index::from(*i);
quote! { .field(&self.0.#idx) }
})
.collect();
parse_quote! {
impl Debug for CuStampedDataSet {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
f.debug_tuple("CuStampedDataSet")
#(#debug_fields)*
.finish()
}
}
}
}
fn build_culist_tuple_serialize(slot_types: &[Type]) -> ItemImpl {
let indices: Vec<usize> = (0..slot_types.len()).collect();
let tuple_len = slot_types.len();
let serialize_fields: Vec<_> = indices
.iter()
.map(|i| {
let idx = syn::Index::from(*i);
quote! { &self.0.#idx }
})
.collect();
parse_quote! {
impl Serialize for CuStampedDataSet {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut tuple = serializer.serialize_tuple(#tuple_len)?;
#(tuple.serialize_element(#serialize_fields)?;)*
tuple.end()
}
}
}
}
fn build_culist_tuple_default(slot_types: &[Type]) -> ItemImpl {
let default_fields: Vec<_> = slot_types
.iter()
.map(|slot_type| quote! { <#slot_type as Default>::default() })
.collect();
parse_quote! {
impl Default for CuStampedDataSet {
fn default() -> CuStampedDataSet
{
CuStampedDataSet((
#(#default_fields),*
))
}
}
}
}
/// Maps every bridge channel referenced by an edge of `graph` to the message
/// type name carried on it.
///
/// An edge with a source channel registers an `Rx` entry for the source
/// bridge; an edge with a destination channel registers a `Tx` entry for the
/// destination bridge.
///
/// # Panics
///
/// Panics when the same bridge channel (same bridge, channel and direction)
/// is used with two different message types.
fn collect_bridge_channel_usage(graph: &CuGraph) -> HashMap<BridgeChannelKey, String> {
    use std::collections::hash_map::Entry;

    let mut usage = HashMap::new();
    for cnx in graph.edges() {
        // Source side first, then destination side, mirroring the edge direction.
        let endpoints = [
            (
                &cnx.src,
                cnx.src_channel.as_ref(),
                BridgeChannelDirection::Rx,
            ),
            (
                &cnx.dst,
                cnx.dst_channel.as_ref(),
                BridgeChannelDirection::Tx,
            ),
        ];
        for (bridge, maybe_channel, direction) in endpoints {
            let Some(channel) = maybe_channel else {
                continue;
            };
            let key = BridgeChannelKey {
                bridge_id: bridge.clone(),
                channel_id: channel.clone(),
                direction,
            };
            match usage.entry(key) {
                Entry::Occupied(known) => {
                    let msg = known.get();
                    if msg != &cnx.msg {
                        panic!(
                            "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
                            bridge, channel, msg, cnx.msg
                        );
                    }
                }
                Entry::Vacant(free) => {
                    free.insert(cnx.msg.clone());
                }
            }
        }
    }
    usage
}
/// Builds one `BridgeSpec` per configured bridge that is present in `graph`
/// and has at least one channel listed in `channel_usage`.
///
/// For every retained channel the message type recorded in `channel_usage`
/// is parsed into a Rust type. `config_index` fields keep the position of the
/// bridge / channel in the configuration, while `tuple_index` is the position
/// of the spec in the returned vector. Plan, copperlist and monitor indices
/// are left as `None`.
///
/// # Panics
///
/// Panics when a bridge type or a channel message type cannot be parsed as a
/// Rust type.
fn build_bridge_specs(
    config: &CuConfig,
    graph: &CuGraph,
    channel_usage: &HashMap<BridgeChannelKey, String>,
) -> Vec<BridgeSpec> {
    let mut specs: Vec<BridgeSpec> = Vec::new();
    for (bridge_index, bridge_cfg) in config.bridges.iter().enumerate() {
        // Bridges that do not appear as a node of this graph are ignored.
        if graph.get_node_id_by_name(bridge_cfg.id.as_str()).is_none() {
            continue;
        }
        let type_path = match parse_str::<Type>(bridge_cfg.type_.as_str()) {
            Ok(parsed) => parsed,
            Err(err) => panic!(
                "Could not parse bridge type '{}' for '{}': {err}",
                bridge_cfg.type_, bridge_cfg.id
            ),
        };

        let mut rx_channels = Vec::new();
        let mut tx_channels = Vec::new();
        for (channel_index, channel) in bridge_cfg.channels.iter().enumerate() {
            // Both directions share the same handling; only the lookup key
            // direction and the destination list differ.
            let (id, direction, bucket) = match channel {
                BridgeChannelConfigRepresentation::Rx { id, .. } => {
                    (id, BridgeChannelDirection::Rx, &mut rx_channels)
                }
                BridgeChannelConfigRepresentation::Tx { id, .. } => {
                    (id, BridgeChannelDirection::Tx, &mut tx_channels)
                }
            };
            let key = BridgeChannelKey {
                bridge_id: bridge_cfg.id.clone(),
                channel_id: id.clone(),
                direction,
            };
            // Channels that no edge refers to are left out of the spec.
            let Some(msg_type) = channel_usage.get(&key) else {
                continue;
            };
            let msg_type_name = msg_type.clone();
            let msg_type = match parse_str::<Type>(msg_type) {
                Ok(parsed) => parsed,
                Err(err) => panic!(
                    "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
                    bridge_cfg.id, id
                ),
            };
            let const_ident =
                Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
            bucket.push(BridgeChannelSpec {
                id: id.clone(),
                const_ident,
                msg_type,
                msg_type_name,
                config_index: channel_index,
                plan_node_id: None,
                culist_index: None,
                monitor_index: None,
            });
        }

        if rx_channels.is_empty() && tx_channels.is_empty() {
            continue;
        }
        // Specs are only ever appended, so the current length is the final
        // position of this spec in the returned vector.
        let tuple_index = specs.len();
        specs.push(BridgeSpec {
            id: bridge_cfg.id.clone(),
            type_path,
            run_in_sim: bridge_cfg.is_run_in_sim(),
            config_index: bridge_index,
            tuple_index,
            monitor_index: None,
            rx_channels,
            tx_channels,
        });
    }
    specs
}
/// Lists the task nodes of `graph`.
///
/// Returns, for every node whose flavor is `Flavor::Task`, a tuple of the
/// node id in the graph, the configured id string, and that id converted by
/// `config_id_to_struct_member`.
fn collect_task_names(graph: &CuGraph) -> Vec<(NodeId, String, String)> {
    let mut names = Vec::new();
    for (node_id, node) in graph.get_all_nodes().iter() {
        if node.get_flavor() != Flavor::Task {
            continue;
        }
        let config_id = node.get_id().to_string();
        let member_name = config_id_to_struct_member(node.get_id().as_str());
        names.push((*node_id, config_id, member_name));
    }
    names
}
/// Component a resource binding belongs to.
#[derive(Clone, Copy)]
enum ResourceOwner {
/// A task, identified by its index in the task specification set.
Task(usize),
/// A bridge, identified by its index in the bridge specification slice.
Bridge(usize),
}
/// One resource binding declared on a node, resolved against the bundle list.
#[derive(Clone)]
struct ResourceKeySpec {
/// Position of the referenced bundle in the bundle specification slice.
bundle_index: usize,
/// Provider path of the referenced bundle.
provider_path: syn::Path,
/// Resource name, i.e. the part after the first `.` of `bundle.resource`.
resource_name: String,
/// Name under which the node declares the binding.
binding_name: String,
/// Task or bridge that owns the binding.
owner: ResourceOwner,
}
/// Splits a `bundle.resource` path at its first `.`.
///
/// Returns the bundle id and the resource name as owned strings. Anything
/// after the first `.` (including further dots) belongs to the resource name.
///
/// # Errors
///
/// Returns an error when `path` contains no `.`, or when either side of the
/// first `.` is empty.
fn parse_resource_path(path: &str) -> CuResult<(String, String)> {
    match path.split_once('.') {
        None => Err(CuError::from(format!(
            "Resource '{path}' is missing a bundle prefix (expected bundle.resource)"
        ))),
        Some(("", _)) | Some((_, "")) => Err(CuError::from(format!(
            "Resource '{path}' must use the 'bundle.resource' format"
        ))),
        Some((bundle_id, name)) => Ok((bundle_id.to_owned(), name.to_owned())),
    }
}
/// Resolves every resource binding declared on the nodes of `graph`.
///
/// For each node with resources, the owner is the task found through
/// `task_specs.node_id_to_task_index`, or otherwise the bridge spec whose id
/// matches the node id. Each `bundle.resource` path is split and the bundle
/// is looked up in `bundle_specs`.
///
/// # Errors
///
/// Returns an error when resources are attached to a bridge node without a
/// matching bridge spec, to a node that is neither a task nor a bridge, when
/// a path is malformed, or when a path references an unknown bundle.
fn collect_resource_specs(
    graph: &CuGraph,
    task_specs: &CuTaskSpecSet,
    bridge_specs: &[BridgeSpec],
    bundle_specs: &[BundleSpec],
) -> CuResult<Vec<ResourceKeySpec>> {
    let bridge_lookup: BTreeMap<String, usize> = bridge_specs
        .iter()
        .enumerate()
        .map(|(idx, spec)| (spec.id.clone(), idx))
        .collect();
    let bundle_lookup: HashMap<String, (usize, syn::Path)> = bundle_specs
        .iter()
        .enumerate()
        .map(|(index, bundle)| (bundle.id.clone(), (index, bundle.provider_path.clone())))
        .collect();

    let mut specs = Vec::new();
    for (node_id, node) in graph.get_all_nodes() {
        let Some(resources) = node.get_resources() else {
            continue;
        };

        let owner = match task_specs.node_id_to_task_index[node_id as usize] {
            Some(task_index) => ResourceOwner::Task(task_index),
            None if node.get_flavor() == Flavor::Bridge => {
                match bridge_lookup.get(&node.get_id()) {
                    Some(bridge_index) => ResourceOwner::Bridge(*bridge_index),
                    None => {
                        return Err(CuError::from(format!(
                            "Resource mapping attached to unknown bridge node '{}'",
                            node.get_id()
                        )));
                    }
                }
            }
            None => {
                return Err(CuError::from(format!(
                    "Resource mapping attached to non-task node '{}'",
                    node.get_id()
                )));
            }
        };

        for (binding_name, path) in resources {
            let (bundle_id, resource_name) = parse_resource_path(path)?;
            let Some((bundle_index, provider_path)) = bundle_lookup.get(&bundle_id) else {
                return Err(CuError::from(format!(
                    "Resource '{}' references unknown bundle '{}'",
                    path, bundle_id
                )));
            };
            specs.push(ResourceKeySpec {
                bundle_index: *bundle_index,
                provider_path: provider_path.clone(),
                resource_name,
                binding_name: binding_name.clone(),
                owner,
            });
        }
    }
    Ok(specs)
}
/// Selects the resource bundles of `config` that apply to `mission`.
///
/// A bundle applies when it declares no mission list at all, or when its
/// mission list contains `mission`. Configuration order is preserved.
fn build_bundle_list<'a>(config: &'a CuConfig, mission: &str) -> Vec<&'a ResourceBundleConfig> {
    let mut selected = Vec::new();
    for bundle in config.resources.iter() {
        let applies = match bundle.missions.as_ref() {
            None => true,
            Some(missions) => missions.iter().any(|m| m == mission),
        };
        if applies {
            selected.push(bundle);
        }
    }
    selected
}
/// A resource bundle selected for a mission, with its provider path parsed.
struct BundleSpec {
/// Bundle id as written in the configuration.
id: String,
/// Provider of the bundle, parsed from the configuration string into a path.
provider_path: syn::Path,
}
/// Builds a `BundleSpec` for every resource bundle that applies to `mission`.
///
/// The bundles are those returned by `build_bundle_list`, in the same order.
///
/// # Errors
///
/// Returns an error for the first bundle whose provider string cannot be
/// parsed as a Rust path.
fn build_bundle_specs(config: &CuConfig, mission: &str) -> CuResult<Vec<BundleSpec>> {
    let bundles = build_bundle_list(config, mission);
    let mut specs = Vec::with_capacity(bundles.len());
    for bundle in bundles {
        let provider_path = match syn::parse_str::<syn::Path>(bundle.provider.as_str()) {
            Ok(path) => path,
            Err(err) => {
                return Err(CuError::from(format!(
                    "Failed to parse provider path '{}' for bundle '{}': {err}",
                    bundle.provider, bundle.id
                )));
            }
        };
        specs.push(BundleSpec {
            id: bundle.id.clone(),
            provider_path,
        });
    }
    Ok(specs)
}
/// Generates the resource-related code for a set of bundles.
///
/// Returns two token streams:
///
/// 1. a `resources` module containing a `bundles` sub-module with one
///    `BundleIndex` constant per bundle (named via
///    `config_id_to_bridge_const`, valued with the bundle's position);
/// 2. a `resources_instanciator` function that creates a `ResourceManager`
///    sized from each provider's `Id::COUNT`, then, for every bundle in
///    order, looks up its configuration entry by id and calls the provider's
///    `ResourceBundle::build`.
///
/// The function itself always returns `Ok`.
fn build_resources_module(
    bundle_specs: &[BundleSpec],
) -> CuResult<(proc_macro2::TokenStream, proc_macro2::TokenStream)> {
    let bundle_total = bundle_specs.len();
    let mut bundle_consts: Vec<proc_macro2::TokenStream> = Vec::with_capacity(bundle_total);
    let mut bundle_counts: Vec<proc_macro2::TokenStream> = Vec::with_capacity(bundle_total);
    let mut bundle_inits: Vec<proc_macro2::TokenStream> = Vec::with_capacity(bundle_total);

    // One pass over the bundles produces the three per-bundle fragments.
    for (index, bundle) in bundle_specs.iter().enumerate() {
        let provider_path = &bundle.provider_path;
        let bundle_id = LitStr::new(bundle.id.as_str(), Span::call_site());
        let const_ident = Ident::new(
            &config_id_to_bridge_const(bundle.id.as_str()),
            Span::call_site(),
        );

        bundle_consts.push(quote! {
            pub const #const_ident: BundleIndex = BundleIndex::new(#index);
        });
        bundle_counts.push(quote! {
            <#provider_path as cu29::resource::ResourceBundleDecl>::Id::COUNT
        });
        bundle_inits.push(quote! {
            let bundle_cfg = config
                .resources
                .iter()
                .find(|b| b.id == #bundle_id)
                .unwrap_or_else(|| panic!("Resource bundle '{}' missing from configuration", #bundle_id));
            let bundle_ctx = cu29::resource::BundleContext::<#provider_path>::new(
                cu29::resource::BundleIndex::new(#index),
                #bundle_id,
            );
            <#provider_path as cu29::resource::ResourceBundle>::build(
                bundle_ctx,
                bundle_cfg.config.as_ref(),
                &mut manager,
            )?;
        });
    }

    let resources_module = quote! {
        pub mod resources {
            #![allow(dead_code)]
            use cu29::resource::BundleIndex;
            pub mod bundles {
                use super::BundleIndex;
                #(#bundle_consts)*
            }
        }
    };
    let resources_instanciator = quote! {
        pub fn resources_instanciator(config: &CuConfig) -> CuResult<cu29::resource::ResourceManager> {
            let bundle_counts: &[usize] = &[ #(#bundle_counts),* ];
            let mut manager = cu29::resource::ResourceManager::new(bundle_counts);
            #(#bundle_inits)*
            Ok(manager)
        }
    };
    Ok((resources_module, resources_instanciator))
}
/// Generated tokens describing resource binding maps for a list of components.
struct ResourceMappingTokens {
/// Definitions of the `*_RES_ENTRIES` / `*_RES_MAPPING` constants.
defs: proc_macro2::TokenStream,
/// One expression per component: `None`, or `Some(&<mapping const>)`.
refs: Vec<proc_macro2::TokenStream>,
}
/// Generates the resource binding maps of every task.
///
/// Resource specs owned by a task are grouped per task index. For each task
/// with at least one binding, two constants are generated
/// (`TASK<idx>_RES_ENTRIES` and `TASK<idx>_RES_MAPPING`) and the task's
/// reference is `Some(&TASK<idx>_RES_MAPPING)`; tasks without bindings get
/// `None`. Background tasks use the type from `sim_task_types` for the
/// binding type, other tasks use `task_types`.
///
/// # Errors
///
/// Returns an error when a spec points at a task index outside the task set.
fn build_task_resource_mappings(
    resource_specs: &[ResourceKeySpec],
    task_specs: &CuTaskSpecSet,
) -> CuResult<ResourceMappingTokens> {
    let task_count = task_specs.ids.len();
    let mut per_task: Vec<Vec<&ResourceKeySpec>> =
        (0..task_count).map(|_| Vec::new()).collect();
    for spec in resource_specs {
        if let ResourceOwner::Task(task_index) = spec.owner {
            match per_task.get_mut(task_index) {
                Some(bucket) => bucket.push(spec),
                None => {
                    return Err(CuError::from(format!(
                        "Resource '{}' mapped to invalid task index {}",
                        spec.binding_name, task_index
                    )));
                }
            }
        }
    }

    let mut mapping_defs = Vec::new();
    let mut mapping_refs = Vec::with_capacity(task_count);
    for (idx, entries) in per_task.iter().enumerate() {
        if entries.is_empty() {
            mapping_refs.push(quote! { None });
            continue;
        }

        let binding_task_type = if task_specs.background_flags[idx] {
            &task_specs.sim_task_types[idx]
        } else {
            &task_specs.task_types[idx]
        };
        let binding_trait = match task_specs.cutypes[idx] {
            CuTaskType::Source => quote! { CuSrcTask },
            CuTaskType::Regular => quote! { CuTask },
            CuTaskType::Sink => quote! { CuSinkTask },
        };
        let binding_type = quote! {
            <<#binding_task_type as #binding_trait>::Resources<'_> as ResourceBindings>::Binding
        };
        let entries_ident = format_ident!("TASK{}_RES_ENTRIES", idx);
        let map_ident = format_ident!("TASK{}_RES_MAPPING", idx);

        let mut entry_tokens = Vec::with_capacity(entries.len());
        for spec in entries.iter() {
            let binding_ident = Ident::new(
                &config_id_to_enum(spec.binding_name.as_str()),
                Span::call_site(),
            );
            let resource_ident = Ident::new(
                &config_id_to_enum(spec.resource_name.as_str()),
                Span::call_site(),
            );
            let bundle_index = spec.bundle_index;
            let provider_path = &spec.provider_path;
            entry_tokens.push(quote! {
                (#binding_type::#binding_ident, cu29::resource::ResourceKey::new(
                    cu29::resource::BundleIndex::new(#bundle_index),
                    <#provider_path as cu29::resource::ResourceBundleDecl>::Id::#resource_ident as usize,
                ))
            });
        }

        mapping_defs.push(quote! {
            const #entries_ident: &[(#binding_type, cu29::resource::ResourceKey)] = &[ #(#entry_tokens),* ];
            const #map_ident: cu29::resource::ResourceBindingMap<#binding_type> =
                cu29::resource::ResourceBindingMap::new(#entries_ident);
        });
        mapping_refs.push(quote! { Some(&#map_ident) });
    }

    Ok(ResourceMappingTokens {
        defs: quote! { #(#mapping_defs)* },
        refs: mapping_refs,
    })
}
/// Generates the resource binding maps of every bridge.
///
/// Resource specs owned by a bridge are grouped per bridge index. In
/// simulation mode, bindings of bridges that do not run in simulation are
/// dropped. For each bridge with at least one remaining binding, two
/// constants are generated (`BRIDGE<idx>_RES_ENTRIES` and
/// `BRIDGE<idx>_RES_MAPPING`) and the bridge's reference is
/// `Some(&BRIDGE<idx>_RES_MAPPING)`; other bridges get `None`.
///
/// # Panics
///
/// Panics when a spec points at a bridge index outside `bridge_specs`.
fn build_bridge_resource_mappings(
    resource_specs: &[ResourceKeySpec],
    bridge_specs: &[BridgeSpec],
    sim_mode: bool,
) -> ResourceMappingTokens {
    let bridge_count = bridge_specs.len();
    let mut per_bridge: Vec<Vec<&ResourceKeySpec>> =
        (0..bridge_count).map(|_| Vec::new()).collect();
    for spec in resource_specs {
        if let ResourceOwner::Bridge(bridge_index) = spec.owner {
            let skipped_in_sim = sim_mode && !bridge_specs[bridge_index].run_in_sim;
            if !skipped_in_sim {
                per_bridge[bridge_index].push(spec);
            }
        }
    }

    let mut mapping_defs = Vec::new();
    let mut mapping_refs = Vec::with_capacity(bridge_count);
    for (idx, entries) in per_bridge.iter().enumerate() {
        if entries.is_empty() {
            mapping_refs.push(quote! { None });
            continue;
        }

        let bridge_type = &bridge_specs[idx].type_path;
        let binding_type = quote! {
            <<#bridge_type as cu29::cubridge::CuBridge>::Resources<'_> as ResourceBindings>::Binding
        };
        let entries_ident = format_ident!("BRIDGE{}_RES_ENTRIES", idx);
        let map_ident = format_ident!("BRIDGE{}_RES_MAPPING", idx);

        let mut entry_tokens = Vec::with_capacity(entries.len());
        for spec in entries.iter() {
            let binding_ident = Ident::new(
                &config_id_to_enum(spec.binding_name.as_str()),
                Span::call_site(),
            );
            let resource_ident = Ident::new(
                &config_id_to_enum(spec.resource_name.as_str()),
                Span::call_site(),
            );
            let bundle_index = spec.bundle_index;
            let provider_path = &spec.provider_path;
            entry_tokens.push(quote! {
                (#binding_type::#binding_ident, cu29::resource::ResourceKey::new(
                    cu29::resource::BundleIndex::new(#bundle_index),
                    <#provider_path as cu29::resource::ResourceBundleDecl>::Id::#resource_ident as usize,
                ))
            });
        }

        mapping_defs.push(quote! {
            const #entries_ident: &[(#binding_type, cu29::resource::ResourceKey)] = &[ #(#entry_tokens),* ];
            const #map_ident: cu29::resource::ResourceBindingMap<#binding_type> =
                cu29::resource::ResourceBindingMap::new(#entries_ident);
        });
        mapping_refs.push(quote! { Some(&#map_ident) });
    }

    ResourceMappingTokens {
        defs: quote! { #(#mapping_defs)* },
        refs: mapping_refs,
    }
}
/// Builds the graph used for planning and computes the runtime plan from it.
///
/// The plan graph contains a copy of every task node of `graph`, followed by
/// one synthetic node per bridge rx channel and per bridge tx channel of
/// `bridge_specs`. Every edge of `graph` is then recreated between the
/// corresponding plan nodes.
///
/// Returns the computed runtime plan, the list of execution entities (whose
/// positions are the plan node ids), and a map from plan node id to original
/// node id for the mirrored task nodes.
///
/// As a side effect, `plan_node_id` is set on every rx/tx channel spec.
///
/// # Errors
///
/// Propagates failures from adding nodes, connecting nodes and
/// `compute_runtime_plan`.
///
/// # Panics
///
/// Panics when a task node has no task index, when a plan node id does not
/// match the next execution entity position, or when an edge endpoint cannot
/// be resolved to a plan node.
fn build_execution_plan(
graph: &CuGraph,
task_specs: &CuTaskSpecSet,
bridge_specs: &mut [BridgeSpec],
) -> CuResult<(
CuExecutionLoop,
Vec<ExecutionEntity>,
HashMap<NodeId, NodeId>,
)> {
let mut plan_graph = CuGraph::default();
let mut exec_entities = Vec::new();
let mut original_to_plan = HashMap::new();
let mut plan_to_original = HashMap::new();
let mut name_to_original = HashMap::new();
let mut channel_nodes = HashMap::new();
// Pass 1: mirror the task nodes. Every node name is recorded (bridges
// included) so that edges can be resolved by name later on.
for (node_id, node) in graph.get_all_nodes() {
name_to_original.insert(node.get_id(), node_id);
if node.get_flavor() != Flavor::Task {
continue;
}
let plan_node_id = plan_graph.add_node(node.clone())?;
let task_index = task_specs.node_id_to_task_index[node_id as usize]
.expect("Task missing from specifications");
plan_to_original.insert(plan_node_id, node_id);
original_to_plan.insert(node_id, plan_node_id);
// `exec_entities` is indexed by plan node id, so ids must come out
// sequentially, in insertion order.
if plan_node_id as usize != exec_entities.len() {
panic!("Unexpected node ordering while mirroring tasks in plan graph");
}
exec_entities.push(ExecutionEntity {
kind: ExecutionEntityKind::Task { task_index },
});
}
// Pass 2: add one synthetic plan node per bridge channel, rx channels
// first and then tx channels for each bridge.
for (bridge_index, spec) in bridge_specs.iter_mut().enumerate() {
for (channel_index, channel_spec) in spec.rx_channels.iter_mut().enumerate() {
let mut node = Node::new(
format!("{}::rx::{}", spec.id, channel_spec.id).as_str(),
"__CuBridgeRxChannel",
);
node.set_flavor(Flavor::Bridge);
let plan_node_id = plan_graph.add_node(node)?;
if plan_node_id as usize != exec_entities.len() {
panic!("Unexpected node ordering while inserting bridge rx channel");
}
channel_spec.plan_node_id = Some(plan_node_id);
exec_entities.push(ExecutionEntity {
kind: ExecutionEntityKind::BridgeRx {
bridge_index,
channel_index,
},
});
channel_nodes.insert(
BridgeChannelKey {
bridge_id: spec.id.clone(),
channel_id: channel_spec.id.clone(),
direction: BridgeChannelDirection::Rx,
},
plan_node_id,
);
}
for (channel_index, channel_spec) in spec.tx_channels.iter_mut().enumerate() {
let mut node = Node::new(
format!("{}::tx::{}", spec.id, channel_spec.id).as_str(),
"__CuBridgeTxChannel",
);
node.set_flavor(Flavor::Bridge);
let plan_node_id = plan_graph.add_node(node)?;
if plan_node_id as usize != exec_entities.len() {
panic!("Unexpected node ordering while inserting bridge tx channel");
}
channel_spec.plan_node_id = Some(plan_node_id);
exec_entities.push(ExecutionEntity {
kind: ExecutionEntityKind::BridgeTx {
bridge_index,
channel_index,
},
});
channel_nodes.insert(
BridgeChannelKey {
bridge_id: spec.id.clone(),
channel_id: channel_spec.id.clone(),
direction: BridgeChannelDirection::Tx,
},
plan_node_id,
);
}
}
// Pass 3: recreate every edge between plan nodes. An endpoint with a
// channel resolves to the synthetic channel node (source side: rx,
// destination side: tx); an endpoint without one resolves by node name.
for cnx in graph.edges() {
let src_plan = if let Some(channel) = &cnx.src_channel {
let key = BridgeChannelKey {
bridge_id: cnx.src.clone(),
channel_id: channel.clone(),
direction: BridgeChannelDirection::Rx,
};
*channel_nodes
.get(&key)
.unwrap_or_else(|| panic!("Bridge source {:?} missing from plan graph", key))
} else {
let node_id = name_to_original
.get(&cnx.src)
.copied()
.unwrap_or_else(|| panic!("Unknown source node '{}'", cnx.src));
*original_to_plan
.get(&node_id)
.unwrap_or_else(|| panic!("Source node '{}' missing from plan", cnx.src))
};
let dst_plan = if let Some(channel) = &cnx.dst_channel {
let key = BridgeChannelKey {
bridge_id: cnx.dst.clone(),
channel_id: channel.clone(),
direction: BridgeChannelDirection::Tx,
};
*channel_nodes
.get(&key)
.unwrap_or_else(|| panic!("Bridge destination {:?} missing from plan graph", key))
} else {
let node_id = name_to_original
.get(&cnx.dst)
.copied()
.unwrap_or_else(|| panic!("Unknown destination node '{}'", cnx.dst));
*original_to_plan
.get(&node_id)
.unwrap_or_else(|| panic!("Destination node '{}' missing from plan", cnx.dst))
};
// The two `None` arguments are presumably the source/destination channel
// names, dropped because channels are now nodes of their own — TODO
// confirm against `connect_ext_with_order`.
plan_graph
.connect_ext_with_order(
src_plan,
dst_plan,
&cnx.msg,
cnx.missions.clone(),
None,
None,
cnx.order,
)
.map_err(|e| CuError::from(e.to_string()))?;
}
let runtime_plan = compute_runtime_plan(&plan_graph)?;
Ok((runtime_plan, exec_entities, plan_to_original))
}
/// Records where every plan step writes its output in the copperlist.
///
/// For each step of `runtime_plan` that has an output message pack:
///
/// * the pack's `culist_index` is appended to the returned order vector;
/// * for a task, the index is stored in the returned map under the task's
///   original node id (when the plan node has one in `plan_to_original`);
/// * for a bridge rx/tx channel, the index is stored in the channel spec's
///   `culist_index`.
///
/// Non-step units of the plan are ignored.
///
/// # Panics
///
/// Panics when a step's node id, bridge index or channel index is out of
/// bounds for the corresponding slice.
fn collect_culist_metadata(
    runtime_plan: &CuExecutionLoop,
    exec_entities: &[ExecutionEntity],
    bridge_specs: &mut [BridgeSpec],
    plan_to_original: &HashMap<NodeId, NodeId>,
) -> (Vec<usize>, HashMap<NodeId, usize>) {
    let mut culist_order = Vec::new();
    let mut node_output_positions = HashMap::new();

    for unit in runtime_plan.steps.iter() {
        let CuExecutionUnit::Step(step) = unit else {
            continue;
        };
        let Some(output_pack) = step.output_msg_pack.as_ref() else {
            continue;
        };

        let slot = output_pack.culist_index as usize;
        culist_order.push(slot);

        match &exec_entities[step.node_id as usize].kind {
            ExecutionEntityKind::Task { .. } => {
                if let Some(&original_node_id) = plan_to_original.get(&step.node_id) {
                    node_output_positions.insert(original_node_id, slot);
                }
            }
            ExecutionEntityKind::BridgeRx {
                bridge_index,
                channel_index,
            } => {
                let channel = &mut bridge_specs[*bridge_index].rx_channels[*channel_index];
                channel.culist_index = Some(slot);
            }
            ExecutionEntityKind::BridgeTx {
                bridge_index,
                channel_index,
            } => {
                let channel = &mut bridge_specs[*bridge_index].tx_channels[*channel_index];
                channel.culist_index = Some(slot);
            }
        }
    }

    (culist_order, node_output_positions)
}
/// Maps every output-producing step of the plan to a monitor component index.
///
/// Steps are visited in plan order; steps without an output message pack and
/// non-step units are skipped. A task maps to its task index, a bridge
/// channel maps to the `monitor_index` stored in its channel spec.
///
/// # Errors
///
/// Returns a message when a step has no execution entity, or when a bridge
/// channel cannot be found or has no monitor index.
fn build_monitor_culist_component_mapping(
    runtime_plan: &CuExecutionLoop,
    exec_entities: &[ExecutionEntity],
    bridge_specs: &[BridgeSpec],
) -> Result<Vec<usize>, String> {
    let mut mapping = Vec::new();
    for unit in runtime_plan.steps.iter() {
        let step = match unit {
            CuExecutionUnit::Step(step) if step.output_msg_pack.is_some() => step,
            _ => continue,
        };

        let entity = exec_entities.get(step.node_id as usize).ok_or_else(|| {
            format!(
                "Missing execution entity for plan node {} while building monitor mapping",
                step.node_id
            )
        })?;

        let component_index = match &entity.kind {
            ExecutionEntityKind::Task { task_index } => *task_index,
            ExecutionEntityKind::BridgeRx {
                bridge_index,
                channel_index,
            } => {
                let monitor_index = bridge_specs
                    .get(*bridge_index)
                    .and_then(|spec| spec.rx_channels.get(*channel_index))
                    .and_then(|channel| channel.monitor_index);
                match monitor_index {
                    Some(index) => index,
                    None => {
                        return Err(format!(
                            "Missing monitor index for bridge rx {}:{}",
                            bridge_index, channel_index
                        ));
                    }
                }
            }
            ExecutionEntityKind::BridgeTx {
                bridge_index,
                channel_index,
            } => {
                let monitor_index = bridge_specs
                    .get(*bridge_index)
                    .and_then(|spec| spec.tx_channels.get(*channel_index))
                    .and_then(|channel| channel.monitor_index);
                match monitor_index {
                    Some(index) => index,
                    None => {
                        return Err(format!(
                            "Missing monitor index for bridge tx {}:{}",
                            bridge_index, channel_index
                        ));
                    }
                }
            }
        };
        mapping.push(component_index);
    }
    Ok(mapping)
}
/// Builds the list of monitored component names and assigns monitor indices.
///
/// The list starts with `task_ids`. Then, for every bridge in order, it gets
/// `bridge::<id>`, followed by one `bridge::<id>::rx::<channel>` entry per rx
/// channel and one `bridge::<id>::tx::<channel>` entry per tx channel. The
/// position of each entry is stored in the `monitor_index` of the matching
/// bridge or channel spec.
#[allow(dead_code)]
fn build_monitored_ids(task_ids: &[String], bridge_specs: &mut [BridgeSpec]) -> Vec<String> {
    let mut names: Vec<String> = task_ids.iter().cloned().collect();
    for spec in bridge_specs {
        spec.monitor_index = Some(names.len());
        names.push(format!("bridge::{}", spec.id));

        let bridge_id = &spec.id;
        for channel in &mut spec.rx_channels {
            channel.monitor_index = Some(names.len());
            names.push(format!("bridge::{}::rx::{}", bridge_id, channel.id));
        }
        for channel in &mut spec.tx_channels {
            channel.monitor_index = Some(names.len());
            names.push(format!("bridge::{}::tx::{}", bridge_id, channel.id));
        }
    }
    names
}
fn generate_task_execution_tokens(
step: &CuExecutionStep,
task_index: usize,
task_specs: &CuTaskSpecSet,
output_pack_sizes: &[usize],
sim_mode: bool,
mission_mod: &Ident,
) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
let node_index = int2sliceindex(task_index as u32);
let task_instance = quote! { tasks.#node_index };
let comment_str = format!(
"DEBUG ->> {} ({:?}) Id:{} I:{:?} O:{:?}",
step.node.get_id(),
step.task_type,
step.node_id,
step.input_msg_indices_types,
step.output_msg_pack
);
let comment_tokens = quote! {{
let _ = stringify!(#comment_str);
}};
let tid = task_index;
let task_enum_name = config_id_to_enum(&task_specs.ids[tid]);
let enum_name = Ident::new(&task_enum_name, Span::call_site());
let task_hint = config_id_to_struct_member(&task_specs.ids[tid]);
let source_slot_match_trait_ident = format_ident!(
"__CuOutputSlotMustMatchTaskOutput__Task_{}__Add_dst___nc___connections_for_unused_outputs",
task_hint
);
let source_slot_match_fn_ident = format_ident!(
"__cu_source_output_slot_or_add_dst___nc___for_unused_outputs__task_{}",
task_hint
);
let regular_slot_match_trait_ident = format_ident!(
"__CuOutputSlotMustMatchTaskOutput__Task_{}__Add_dst___nc___connections_for_unused_outputs",
task_hint
);
let regular_slot_match_fn_ident = format_ident!(
"__cu_task_output_slot_or_add_dst___nc___for_unused_outputs__task_{}",
task_hint
);
let rt_guard = rtsan_guard_tokens();
let run_in_sim_flag = task_specs.run_in_sim_flags[tid];
let maybe_sim_tick = if sim_mode && !run_in_sim_flag {
quote! {
if !doit {
#task_instance.sim_tick();
}
}
} else {
quote!()
};
let output_pack = step
.output_msg_pack
.as_ref()
.expect("Task should have an output message pack.");
let output_culist_index = int2sliceindex(output_pack.culist_index);
let output_ports: Vec<syn::Index> = (0..output_pack.msg_types.len())
.map(syn::Index::from)
.collect();
let output_clear_payload = if output_ports.len() == 1 {
quote! { cumsg_output.clear_payload(); }
} else {
quote! { #(cumsg_output.#output_ports.clear_payload();)* }
};
let output_start_time = if output_ports.len() == 1 {
quote! {
if cumsg_output.metadata.process_time.start.is_none() {
cumsg_output.metadata.process_time.start = cu29::curuntime::perf_now(clock).into();
}
}
} else {
quote! {
let start_time = cu29::curuntime::perf_now(clock).into();
#( if cumsg_output.#output_ports.metadata.process_time.start.is_none() {
cumsg_output.#output_ports.metadata.process_time.start = start_time;
} )*
}
};
let output_end_time = if output_ports.len() == 1 {
quote! {
if cumsg_output.metadata.process_time.end.is_none() {
cumsg_output.metadata.process_time.end = cu29::curuntime::perf_now(clock).into();
}
}
} else {
quote! {
let end_time = cu29::curuntime::perf_now(clock).into();
#( if cumsg_output.#output_ports.metadata.process_time.end.is_none() {
cumsg_output.#output_ports.metadata.process_time.end = end_time;
} )*
}
};
match step.task_type {
CuTaskType::Source => {
let monitoring_action = quote! {
debug!("Component {}: Error during process: {}", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), &error);
let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#tid), CuComponentState::Process, &error);
match decision {
Decision::Abort => {
debug!("Process: ABORT decision from monitoring. Component '{}' errored out \
during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), clid);
ctx.clear_current_task();
let monitor_result = monitor.process_copperlist(&ctx, #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)));
cl_manager.end_of_processing(clid)?;
monitor_result?;
return Ok(());
}
Decision::Ignore => {
debug!("Process: IGNORE decision from monitoring. Component '{}' errored out \
during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
let cumsg_output = &mut msgs.#output_culist_index;
#output_clear_payload
}
Decision::Shutdown => {
debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out \
during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
return Err(CuError::new_with_cause("Component errored out during process.", error));
}
}
};
let call_sim_callback = if sim_mode {
quote! {
let doit = {
let cumsg_output = &mut msgs.#output_culist_index;
let state = CuTaskCallbackState::Process((), cumsg_output);
let ovr = sim_callback(SimStep::#enum_name(state));
if let SimOverride::Errored(reason) = ovr {
let error: CuError = reason.into();
#monitoring_action
false
} else {
ovr == SimOverride::ExecuteByRuntime
}
};
}
} else {
quote! { let doit = true; }
};
let logging_tokens = if !task_specs.logging_enabled[tid] {
quote! {
let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
#output_clear_payload
}
} else {
quote!()
};
let source_process_tokens = quote! {
#[allow(non_camel_case_types)]
trait #source_slot_match_trait_ident<Expected> {
fn __cu_cast_output_slot(slot: &mut Self) -> &mut Expected;
}
impl<T> #source_slot_match_trait_ident<T> for T {
fn __cu_cast_output_slot(slot: &mut Self) -> &mut T {
slot
}
}
fn #source_slot_match_fn_ident<'a, Task, Slot>(
_task: &Task,
slot: &'a mut Slot,
) -> &'a mut Task::Output<'static>
where
Task: cu29::cutask::CuSrcTask,
Slot: #source_slot_match_trait_ident<Task::Output<'static>>,
{
<Slot as #source_slot_match_trait_ident<Task::Output<'static>>>::__cu_cast_output_slot(slot)
}
#output_start_time
let result = {
let cumsg_output = #source_slot_match_fn_ident::<
_,
_,
>(&#task_instance, cumsg_output);
#rt_guard
ctx.set_current_task(#tid);
#task_instance.process(&ctx, cumsg_output)
};
#output_end_time
result
};
(
quote! {
{
#comment_tokens
kf_manager.freeze_task(clid, &#task_instance)?;
#call_sim_callback
let cumsg_output = &mut msgs.#output_culist_index;
#maybe_sim_tick
let maybe_error = if doit {
execution_probe.record(cu29::monitoring::ExecutionMarker {
component_id: cu29::monitoring::ComponentId::new(#tid),
step: CuComponentState::Process,
culistid: Some(clid),
});
#source_process_tokens
} else {
Ok(())
};
if let Err(error) = maybe_error {
#monitoring_action
}
}
},
logging_tokens,
)
}
CuTaskType::Sink => {
let input_exprs: Vec<proc_macro2::TokenStream> = step
.input_msg_indices_types
.iter()
.map(|input| {
let input_index = int2sliceindex(input.culist_index);
let output_size = output_pack_sizes
.get(input.culist_index as usize)
.copied()
.unwrap_or_else(|| {
panic!(
"Missing output pack size for culist index {}",
input.culist_index
)
});
if output_size > 1 {
let port_index = syn::Index::from(input.src_port);
quote! { msgs.#input_index.#port_index }
} else {
quote! { msgs.#input_index }
}
})
.collect();
let inputs_type = if input_exprs.len() == 1 {
let input = input_exprs.first().unwrap();
quote! { #input }
} else {
quote! { (#(&#input_exprs),*) }
};
let monitoring_action = quote! {
debug!("Component {}: Error during process: {}", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), &error);
let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#tid), CuComponentState::Process, &error);
match decision {
Decision::Abort => {
debug!("Process: ABORT decision from monitoring. Component '{}' errored out \
during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), clid);
ctx.clear_current_task();
let monitor_result = monitor.process_copperlist(&ctx, #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)));
cl_manager.end_of_processing(clid)?;
monitor_result?;
return Ok(());
}
Decision::Ignore => {
debug!("Process: IGNORE decision from monitoring. Component '{}' errored out \
during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
let cumsg_output = &mut msgs.#output_culist_index;
#output_clear_payload
}
Decision::Shutdown => {
debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out \
during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
return Err(CuError::new_with_cause("Component errored out during process.", error));
}
}
};
let call_sim_callback = if sim_mode {
quote! {
let doit = {
let cumsg_input = &#inputs_type;
let cumsg_output = &mut msgs.#output_culist_index;
let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
let ovr = sim_callback(SimStep::#enum_name(state));
if let SimOverride::Errored(reason) = ovr {
let error: CuError = reason.into();
#monitoring_action
false
} else {
ovr == SimOverride::ExecuteByRuntime
}
};
}
} else {
quote! { let doit = true; }
};
(
quote! {
{
#comment_tokens
kf_manager.freeze_task(clid, &#task_instance)?;
#call_sim_callback
let cumsg_input = &#inputs_type;
let cumsg_output = &mut msgs.#output_culist_index;
let maybe_error = if doit {
execution_probe.record(cu29::monitoring::ExecutionMarker {
component_id: cu29::monitoring::ComponentId::new(#tid),
step: CuComponentState::Process,
culistid: Some(clid),
});
#output_start_time
let result = {
#rt_guard
ctx.set_current_task(#tid);
#task_instance.process(&ctx, cumsg_input)
};
#output_end_time
result
} else {
Ok(())
};
if let Err(error) = maybe_error {
#monitoring_action
}
}
},
quote! {},
)
}
CuTaskType::Regular => {
let input_exprs: Vec<proc_macro2::TokenStream> = step
.input_msg_indices_types
.iter()
.map(|input| {
let input_index = int2sliceindex(input.culist_index);
let output_size = output_pack_sizes
.get(input.culist_index as usize)
.copied()
.unwrap_or_else(|| {
panic!(
"Missing output pack size for culist index {}",
input.culist_index
)
});
if output_size > 1 {
let port_index = syn::Index::from(input.src_port);
quote! { msgs.#input_index.#port_index }
} else {
quote! { msgs.#input_index }
}
})
.collect();
let inputs_type = if input_exprs.len() == 1 {
let input = input_exprs.first().unwrap();
quote! { #input }
} else {
quote! { (#(&#input_exprs),*) }
};
let monitoring_action = quote! {
debug!("Component {}: Error during process: {}", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), &error);
let decision = monitor.process_error(cu29::monitoring::ComponentId::new(#tid), CuComponentState::Process, &error);
match decision {
Decision::Abort => {
debug!("Process: ABORT decision from monitoring. Component '{}' errored out \
during process. Skipping the processing of CL {}.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)), clid);
ctx.clear_current_task();
let monitor_result = monitor.process_copperlist(&ctx, #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)));
cl_manager.end_of_processing(clid)?;
monitor_result?;
return Ok(());
}
Decision::Ignore => {
debug!("Process: IGNORE decision from monitoring. Component '{}' errored out \
during process. The runtime will continue with a forced empty message.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
let cumsg_output = &mut msgs.#output_culist_index;
#output_clear_payload
}
Decision::Shutdown => {
debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out \
during process. The runtime cannot continue.", #mission_mod::monitor_component_label(cu29::monitoring::ComponentId::new(#tid)));
return Err(CuError::new_with_cause("Component errored out during process.", error));
}
}
};
let call_sim_callback = if sim_mode {
quote! {
let doit = {
let cumsg_input = &#inputs_type;
let cumsg_output = &mut msgs.#output_culist_index;
let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
let ovr = sim_callback(SimStep::#enum_name(state));
if let SimOverride::Errored(reason) = ovr {
let error: CuError = reason.into();
#monitoring_action
false
}
else {
ovr == SimOverride::ExecuteByRuntime
}
};
}
} else {
quote! { let doit = true; }
};
let logging_tokens = if !task_specs.logging_enabled[tid] {
quote! {
let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
#output_clear_payload
}
} else {
quote!()
};
let regular_process_tokens = quote! {
#[allow(non_camel_case_types)]
trait #regular_slot_match_trait_ident<Expected> {
fn __cu_cast_output_slot(slot: &mut Self) -> &mut Expected;
}
impl<T> #regular_slot_match_trait_ident<T> for T {
fn __cu_cast_output_slot(slot: &mut Self) -> &mut T {
slot
}
}
fn #regular_slot_match_fn_ident<'a, Task, Slot>(
_task: &Task,
slot: &'a mut Slot,
) -> &'a mut Task::Output<'static>
where
Task: cu29::cutask::CuTask,
Slot: #regular_slot_match_trait_ident<Task::Output<'static>>,
{
<Slot as #regular_slot_match_trait_ident<Task::Output<'static>>>::__cu_cast_output_slot(slot)
}
#output_start_time
let result = {
let cumsg_output = #regular_slot_match_fn_ident::<
_,
_,
>(&#task_instance, cumsg_output);
#rt_guard
ctx.set_current_task(#tid);
#task_instance.process(&ctx, cumsg_input, cumsg_output)
};
#output_end_time
result
};
(
quote! {
{
#comment_tokens
kf_manager.freeze_task(clid, &#task_instance)?;
#call_sim_callback
let cumsg_input = &#inputs_type;
let cumsg_output = &mut msgs.#output_culist_index;
let maybe_error = if doit {
execution_probe.record(cu29::monitoring::ExecutionMarker {
component_id: cu29::monitoring::ComponentId::new(#tid),
step: CuComponentState::Process,
culistid: Some(clid),
});
#regular_process_tokens
} else {
Ok(())
};
if let Err(error) = maybe_error {
#monitoring_action
}
}
},
logging_tokens,
)
}
}
}
/// Generates the runtime code for one bridge Rx channel step.
///
/// The emitted block borrows the bridge out of `__cu_bridges`, borrows the
/// channel's copperlist slot as `cumsg_output`, optionally consults the
/// simulation callback, and then calls `bridge.receive(..)` into that slot,
/// stamping `process_time.start` / `process_time.end` around the call.
/// Errors (from the sim callback or from `receive`) are routed through
/// `monitor.process_error` and handled according to the returned `Decision`.
///
/// Returns `(step_tokens, extra_tokens)`; the second element is always empty
/// for Rx channels.
///
/// # Panics
///
/// Panics (at macro expansion time) if `channel_index` is out of range, if the
/// step has no output pack, if the channel's message type is not part of the
/// output pack, or if the channel has no monitor index.
fn generate_bridge_rx_execution_tokens(
    step: &CuExecutionStep,
    bridge_spec: &BridgeSpec,
    channel_index: usize,
    mission_mod: &Ident,
    sim_mode: bool,
) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
    let rt_guard = rtsan_guard_tokens();
    let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
    let channel = &bridge_spec.rx_channels[channel_index];

    let output_pack = step
        .output_msg_pack
        .as_ref()
        .expect("Bridge Rx channel missing output pack");

    // The port is looked up unconditionally so that a mismatching message type
    // is reported even when the pack has a single slot.
    let port_position = match output_pack
        .msg_types
        .iter()
        .position(|msg| msg == &channel.msg_type_name)
    {
        Some(position) => position,
        None => panic!(
            "Bridge Rx channel '{}' missing output port for '{}'",
            channel.id, channel.msg_type_name
        ),
    };

    // A single-message pack is stored directly in the slot; a multi-message
    // pack is a tuple that must be indexed by port.
    let slot_index = int2sliceindex(output_pack.culist_index);
    let output_ref = match output_pack.msg_types.len() {
        1 => quote! { &mut msgs.#slot_index },
        _ => {
            let port = syn::Index::from(port_position);
            quote! { &mut msgs.#slot_index.#port }
        }
    };

    let monitor_index = syn::Index::from(
        channel
            .monitor_index
            .expect("Bridge Rx channel missing monitor index"),
    );
    let bridge_type = runtime_bridge_type_for_spec(bridge_spec, sim_mode);
    let const_ident = &channel.const_ident;
    let enum_ident = Ident::new(
        &config_id_to_enum(&format!("{}_rx_{}", bridge_spec.id, channel.id)),
        Span::call_site(),
    );

    // Token fragments shared by the simulation-error and runtime-error paths.
    let component_id = quote! { cu29::monitoring::ComponentId::new(#monitor_index) };
    let component_label = quote! { #mission_mod::monitor_component_label(#component_id) };
    let channel_const = quote! { &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident };
    let process_error_call = quote! {
        monitor.process_error(#component_id, CuComponentState::Process, &error)
    };
    let abort_arm = quote! {
        Decision::Abort => {
            debug!("Process: ABORT decision from monitoring. Component '{}' errored out during process. Skipping the processing of CL {}.", #component_label, clid);
            ctx.clear_current_task();
            let monitor_result = monitor.process_copperlist(&ctx, #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)));
            cl_manager.end_of_processing(clid)?;
            monitor_result?;
            return Ok(());
        }
    };
    let ignore_debug = quote! {
        debug!("Process: IGNORE decision from monitoring. Component '{}' errored out during process. The runtime will continue with a forced empty message.", #component_label);
    };
    let shutdown_arm = quote! {
        Decision::Shutdown => {
            debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out during process. The runtime cannot continue.", #component_label);
            return Err(CuError::new_with_cause("Component errored out during process.", error));
        }
    };

    // In simulation the callback decides whether the runtime performs the
    // receive (`doit`); outside simulation the receive always runs.
    let call_sim_callback = if sim_mode {
        quote! {
            let doit = {
                let state = SimStep::#enum_ident {
                    channel: #channel_const,
                    msg: cumsg_output,
                };
                let ovr = sim_callback(state);
                if let SimOverride::Errored(reason) = ovr {
                    let error: CuError = reason.into();
                    let decision = #process_error_call;
                    match decision {
                        #abort_arm
                        Decision::Ignore => {
                            #ignore_debug
                            cumsg_output.clear_payload();
                            false
                        }
                        #shutdown_arm
                    }
                } else {
                    ovr == SimOverride::ExecuteByRuntime
                }
            };
        }
    } else {
        quote! { let doit = true; }
    };

    let step_tokens = quote! {
        {
            let bridge = &mut __cu_bridges.#bridge_tuple_index;
            let cumsg_output = #output_ref;
            #call_sim_callback
            if doit {
                execution_probe.record(cu29::monitoring::ExecutionMarker {
                    component_id: #component_id,
                    step: CuComponentState::Process,
                    culistid: Some(clid),
                });
                cumsg_output.metadata.process_time.start = cu29::curuntime::perf_now(clock).into();
                let maybe_error = {
                    #rt_guard
                    ctx.clear_current_task();
                    bridge.receive(
                        &ctx,
                        #channel_const,
                        cumsg_output,
                    )
                };
                cumsg_output.metadata.process_time.end = cu29::curuntime::perf_now(clock).into();
                if let Err(error) = maybe_error {
                    let decision = #process_error_call;
                    match decision {
                        #abort_arm
                        Decision::Ignore => {
                            #ignore_debug
                            cumsg_output.clear_payload();
                        }
                        #shutdown_arm
                    }
                }
            }
        }
    };

    (step_tokens, quote! {})
}
/// Generates the runtime code for one bridge Tx channel step.
///
/// The emitted block borrows the bridge out of `__cu_bridges`, borrows the
/// upstream message as `cumsg_input` and the channel's own copperlist slot as
/// `cumsg_output`, optionally consults the simulation callback, and then calls
/// `bridge.send(..)` with the input. `process_time.start` / `process_time.end`
/// are stamped on the output slot's metadata immediately around the send, so
/// the recorded interval excludes monitoring error handling and is populated
/// before an Abort decision hands the metadata to `monitor.process_copperlist`
/// (same ordering as the Rx generator).
///
/// Errors (from the sim callback or from `send`) are routed through
/// `monitor.process_error` and handled according to the returned `Decision`.
///
/// Returns `(step_tokens, extra_tokens)`; the second element is always empty
/// for Tx channels.
///
/// # Panics
///
/// Panics (at macro expansion time) if `channel_index` is out of range, if the
/// channel has no monitor index, if the step has no input, if
/// `output_pack_sizes` has no entry for the input's copperlist index, if the
/// step has no output pack, or if that pack does not hold exactly one message.
fn generate_bridge_tx_execution_tokens(
    step: &CuExecutionStep,
    bridge_spec: &BridgeSpec,
    channel_index: usize,
    output_pack_sizes: &[usize],
    mission_mod: &Ident,
    sim_mode: bool,
) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
    let rt_guard = rtsan_guard_tokens();
    let channel = &bridge_spec.tx_channels[channel_index];
    let monitor_index = syn::Index::from(
        channel
            .monitor_index
            .expect("Bridge Tx channel missing monitor index"),
    );

    let input = step
        .input_msg_indices_types
        .first()
        .expect("Bridge Tx channel should have exactly one input");
    let input_index = int2sliceindex(input.culist_index);
    let output_size = output_pack_sizes
        .get(input.culist_index as usize)
        .copied()
        .unwrap_or_else(|| {
            panic!(
                "Missing output pack size for culist index {}",
                input.culist_index
            )
        });
    // A producer with several outputs stores them as a tuple that must be
    // indexed by source port; a single output is stored directly in the slot.
    let input_ref = if output_size > 1 {
        let port_index = syn::Index::from(input.src_port);
        quote! { &mut msgs.#input_index.#port_index }
    } else {
        quote! { &mut msgs.#input_index }
    };

    let output_pack = step
        .output_msg_pack
        .as_ref()
        .expect("Bridge Tx channel missing output pack");
    if output_pack.msg_types.len() != 1 {
        panic!(
            "Bridge Tx channel '{}' expected a single output message slot, got {}",
            channel.id,
            output_pack.msg_types.len()
        );
    }
    let output_index = int2sliceindex(output_pack.culist_index);
    let output_ref = quote! { &mut msgs.#output_index };

    let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
    let bridge_type = runtime_bridge_type_for_spec(bridge_spec, sim_mode);
    let const_ident = &channel.const_ident;
    let enum_ident = Ident::new(
        &config_id_to_enum(&format!("{}_tx_{}", bridge_spec.id, channel.id)),
        Span::call_site(),
    );

    // Token fragments shared by the simulation-error and runtime-error paths.
    let component_id = quote! { cu29::monitoring::ComponentId::new(#monitor_index) };
    let component_label = quote! { #mission_mod::monitor_component_label(#component_id) };
    let channel_const = quote! { &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident };
    let process_error_call = quote! {
        monitor.process_error(#component_id, CuComponentState::Process, &error)
    };
    let abort_arm = quote! {
        Decision::Abort => {
            debug!("Process: ABORT decision from monitoring. Component '{}' errored out during process. Skipping the processing of CL {}.", #component_label, clid);
            ctx.clear_current_task();
            let monitor_result = monitor.process_copperlist(&ctx, #mission_mod::MONITOR_LAYOUT.view(&#mission_mod::collect_metadata(&culist)));
            cl_manager.end_of_processing(clid)?;
            monitor_result?;
            return Ok(());
        }
    };
    let ignore_debug = quote! {
        debug!("Process: IGNORE decision from monitoring. Component '{}' errored out during process. The runtime will continue with a forced empty message.", #component_label);
    };
    let shutdown_arm = quote! {
        Decision::Shutdown => {
            debug!("Process: SHUTDOWN decision from monitoring. Component '{}' errored out during process. The runtime cannot continue.", #component_label);
            return Err(CuError::new_with_cause("Component errored out during process.", error));
        }
    };

    // In simulation the callback decides whether the runtime performs the
    // send (`doit`); outside simulation the send always runs.
    let call_sim_callback = if sim_mode {
        quote! {
            let doit = {
                let state = SimStep::#enum_ident {
                    channel: #channel_const,
                    msg: &*cumsg_input,
                };
                let ovr = sim_callback(state);
                if let SimOverride::Errored(reason) = ovr {
                    let error: CuError = reason.into();
                    let decision = #process_error_call;
                    match decision {
                        #abort_arm
                        Decision::Ignore => {
                            #ignore_debug
                            false
                        }
                        #shutdown_arm
                    }
                } else {
                    ovr == SimOverride::ExecuteByRuntime
                }
            };
        }
    } else {
        quote! { let doit = true; }
    };

    let step_tokens = quote! {
        {
            let bridge = &mut __cu_bridges.#bridge_tuple_index;
            let cumsg_input = #input_ref;
            let cumsg_output = #output_ref;
            #call_sim_callback
            if doit {
                execution_probe.record(cu29::monitoring::ExecutionMarker {
                    component_id: #component_id,
                    step: CuComponentState::Process,
                    culistid: Some(clid),
                });
                cumsg_output.metadata.process_time.start = cu29::curuntime::perf_now(clock).into();
                let maybe_error = {
                    #rt_guard
                    ctx.clear_current_task();
                    bridge.send(
                        &ctx,
                        #channel_const,
                        &*cumsg_input,
                    )
                };
                // Stamp the end time before any error handling: the Abort and
                // Shutdown arms return early, and monitoring work must not be
                // counted as send time.
                cumsg_output.metadata.process_time.end = cu29::curuntime::perf_now(clock).into();
                if let Err(error) = maybe_error {
                    let decision = #process_error_call;
                    match decision {
                        #abort_arm
                        Decision::Ignore => {
                            #ignore_debug
                        }
                        #shutdown_arm
                    }
                }
            }
        }
    };

    (step_tokens, quote! {})
}
/// Direction of a bridge channel as seen from the runtime.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum BridgeChannelDirection {
/// Receive side: the generated code calls `bridge.receive(..)` for these channels.
Rx,
/// Transmit side: the generated code calls `bridge.send(..)` for these channels.
Tx,
}
/// Identifies one channel of one bridge by bridge id, channel id and direction.
///
/// NOTE(review): derives `Eq` + `Hash`, so this is presumably used as a
/// map/set key elsewhere in the file — confirm against callers.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BridgeChannelKey {
/// Id of the bridge the channel belongs to.
bridge_id: String,
/// Id of the channel within that bridge.
channel_id: String,
/// Whether the channel is on the Rx or the Tx side of the bridge.
direction: BridgeChannelDirection,
}
/// Code-generation description of a single bridge channel (Rx or Tx).
#[derive(Clone)]
struct BridgeChannelSpec {
/// Channel id; combined with the bridge id to build the `SimStep` variant
/// name and used in panic messages.
id: String,
/// Name of the associated constant on the bridge's `Rx` / `Tx` channel set
/// that designates this channel
/// (`<Bridge as CuBridge>::Rx::#const_ident` / `::Tx::#const_ident`).
const_ident: Ident,
// NOTE(review): not read by the generators visible here, hence the
// dead-code allowance — confirm whether it is still needed.
#[allow(dead_code)]
/// Parsed message type carried by the channel.
msg_type: Type,
/// Message type as a string; matched against the step's output pack
/// `msg_types` to locate the channel's port.
msg_type_name: String,
/// NOTE(review): presumably the channel's position in the bridge's
/// configuration — confirm against the code that builds the spec.
config_index: usize,
/// NOTE(review): presumably the node id assigned to this channel in the
/// execution plan, when it takes part in it — confirm.
plan_node_id: Option<NodeId>,
/// NOTE(review): presumably the copperlist slot assigned to this channel,
/// when it has one — confirm.
culist_index: Option<usize>,
/// Index used to build the `ComponentId` reported to monitoring; the
/// execution token generators require it to be `Some`.
monitor_index: Option<usize>,
}
/// Code-generation description of a bridge and its channels.
#[derive(Clone)]
struct BridgeSpec {
/// Bridge id; combined with channel ids to build `SimStep` variant names.
id: String,
/// Concrete bridge type used by the runtime, unless it is replaced by a
/// simulated bridge (see `run_in_sim`).
type_path: Type,
/// When `false` and the code is generated in simulation mode, the bridge
/// type is replaced by `cu29::simulation::CuSimBridge<..>`.
run_in_sim: bool,
/// NOTE(review): presumably the bridge's position in the configuration —
/// confirm against the code that builds the spec.
config_index: usize,
/// Position of the bridge inside the `__cu_bridges` tuple; also embedded in
/// the names of the generated simulation channel-set types.
tuple_index: usize,
/// NOTE(review): presumably the monitoring component index of the bridge
/// itself (channels carry their own) — confirm.
monitor_index: Option<usize>,
/// Receive channels of the bridge.
rx_channels: Vec<BridgeChannelSpec>,
/// Transmit channels of the bridge.
tx_channels: Vec<BridgeChannelSpec>,
}
/// Builds the identifiers of the generated simulation channel types for the
/// bridge at `bridge_tuple_index`.
///
/// Returns, in order: the Tx channel-set ident, the Tx id ident, the Rx
/// channel-set ident and the Rx id ident.
fn sim_bridge_channel_set_idents(bridge_tuple_index: usize) -> (Ident, Ident, Ident, Ident) {
    let tx_channels = format_ident!("__CuSimBridge{}TxChannels", bridge_tuple_index);
    let tx_id = format_ident!("__CuSimBridge{}TxId", bridge_tuple_index);
    let rx_channels = format_ident!("__CuSimBridge{}RxChannels", bridge_tuple_index);
    let rx_id = format_ident!("__CuSimBridge{}RxId", bridge_tuple_index);
    (tx_channels, tx_id, rx_channels, rx_id)
}
fn runtime_bridge_type_for_spec(bridge_spec: &BridgeSpec, sim_mode: bool) -> Type {
if sim_mode && !bridge_spec.run_in_sim {
let (tx_set_ident, _tx_id_ident, rx_set_ident, _rx_id_ident) =
sim_bridge_channel_set_idents(bridge_spec.tuple_index);
let tx_type: Type = if bridge_spec.tx_channels.is_empty() {
parse_quote!(cu29::simulation::CuNoBridgeChannels)
} else {
parse_quote!(#tx_set_ident)
};
let rx_type: Type = if bridge_spec.rx_channels.is_empty() {
parse_quote!(cu29::simulation::CuNoBridgeChannels)
} else {
parse_quote!(#rx_set_ident)
};
parse_quote!(cu29::simulation::CuSimBridge<#tx_type, #rx_type>)
} else {
bridge_spec.type_path.clone()
}
}
/// One entity taking part in the execution plan; `kind` says whether it is a
/// task or a bridge channel and where to find it.
#[derive(Clone)]
struct ExecutionEntity {
/// What the entity is, together with the indices that locate it.
kind: ExecutionEntityKind,
}
/// The kinds of entities that can appear in the execution plan.
///
/// NOTE(review): the indices presumably point into the task spec set and the
/// bridge spec list (`rx_channels` / `tx_channels`) — confirm against the code
/// that builds the plan.
#[derive(Clone)]
enum ExecutionEntityKind {
/// A task, identified by its task index.
Task {
task_index: usize,
},
/// The receive channel `channel_index` of bridge `bridge_index`.
BridgeRx {
bridge_index: usize,
channel_index: usize,
},
/// The transmit channel `channel_index` of bridge `bridge_index`.
BridgeTx {
bridge_index: usize,
channel_index: usize,
},
}
#[cfg(test)]
mod tests {
    /// Runs the trybuild suites: everything under `tests/compile_fail` must
    /// fail to compile and everything under `tests/compile_pass` must compile.
    ///
    /// Before running, the expected-output file matching the current compiler
    /// channel (`*.stderr.beta` on beta, `*.stderr.stable` otherwise) is
    /// copied over `*.stderr` for every compile-fail case that provides one.
    #[test]
    fn test_compile_fail() {
        use rustc_version::{Channel, version_meta};
        use std::{
            env, fs,
            path::{Path, PathBuf},
        };

        let log_index_dir = env::temp_dir()
            .join("cu29_derive_trybuild_log_index")
            .join("a")
            .join("b")
            .join("c");
        fs::create_dir_all(&log_index_dir).unwrap();
        // SAFETY: `set_var` is only sound while no other thread reads or
        // writes the process environment.
        // NOTE(review): the test harness runs tests on several threads —
        // confirm no other test in this crate touches the environment.
        unsafe {
            env::set_var("LOG_INDEX_DIR", &log_index_dir);
        }

        // Resolve the compiler channel once instead of once per test file.
        let stderr_suffix = match version_meta().unwrap().channel {
            Channel::Beta => "beta",
            _ => "stable",
        };

        let dir = Path::new("tests/compile_fail");
        for entry in fs::read_dir(dir).unwrap() {
            let entry = entry.unwrap();
            if !entry.file_type().unwrap().is_dir() {
                continue;
            }
            for file in fs::read_dir(entry.path()).unwrap() {
                let file = file.unwrap();
                let p = file.path();
                if p.extension().and_then(|x| x.to_str()) != Some("rs") {
                    continue;
                }
                let base = p.with_extension("stderr");
                let src = PathBuf::from(format!("{}.{}", base.display(), stderr_suffix));
                if src.exists() {
                    fs::copy(src, &base).unwrap();
                }
            }
        }

        let t = trybuild::TestCases::new();
        t.compile_fail("tests/compile_fail/*/*.rs");
        t.pass("tests/compile_pass/*/*.rs");
    }

    /// The runtime plan for the multi-output source config must list the
    /// source step's output message types as `["i32", "bool"]`.
    #[test]
    fn runtime_plan_keeps_nc_order_for_non_first_connected_output() {
        use super::*;
        use cu29::config::CuConfig;
        use cu29::curuntime::{CuExecutionUnit, compute_runtime_plan};

        let config: CuConfig =
            read_config("tests/config/multi_output_source_non_first_connected_valid.ron")
                .expect("failed to read test config");
        let graph = config.get_graph(None).expect("missing graph");
        let src_id = graph.get_node_id_by_name("src").expect("missing src node");
        let runtime = compute_runtime_plan(graph).expect("runtime plan failed");
        let src_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
                _ => None,
            })
            .expect("missing source step");
        assert_eq!(
            src_step.output_msg_pack.as_ref().unwrap().msg_types,
            vec!["i32", "bool"]
        );
    }

    /// Flattening the slot origin ids for the multi-output source config must
    /// yield one entry per output message: `src`, `src`, `sink`.
    #[test]
    fn matching_task_ids_are_flattened_per_output_message() {
        use super::*;
        use cu29::config::CuConfig;

        let config: CuConfig =
            read_config("tests/config/multi_output_source_non_first_connected_valid.ron")
                .expect("failed to read test config");
        let graph = config.get_graph(None).expect("missing graph");
        let task_specs = CuTaskSpecSet::from_graph(graph);
        let channel_usage = collect_bridge_channel_usage(graph);
        let mut bridge_specs = build_bridge_specs(&config, graph, &channel_usage);
        let (runtime_plan, exec_entities, plan_to_original) =
            build_execution_plan(graph, &task_specs, &mut bridge_specs)
                .expect("runtime plan failed");
        let output_packs = extract_output_packs(&runtime_plan);
        let task_names = collect_task_names(graph);
        let (_, node_output_positions) = collect_culist_metadata(
            &runtime_plan,
            &exec_entities,
            &mut bridge_specs,
            &plan_to_original,
        );
        let mut slot_origin_ids: Vec<Option<String>> = vec![None; output_packs.len()];
        for (node_id, task_id, _) in task_names {
            let output_position = node_output_positions
                .get(&node_id)
                .unwrap_or_else(|| panic!("Task {task_id} (node id: {node_id}) not found"));
            slot_origin_ids[*output_position] = Some(task_id);
        }
        let flattened_ids = flatten_slot_origin_ids(&output_packs, slot_origin_ids);
        assert_eq!(
            flattened_ids,
            vec!["src".to_string(), "src".to_string(), "sink".to_string()]
        );
    }

    /// A resource binding declared on a bridge node (`serial` ->
    /// `fc.serial0`) must be collected as a single spec owned by bridge 0 and
    /// resolved against bundle 0.
    #[test]
    fn bridge_resources_are_collected() {
        use super::*;
        use cu29::config::{CuGraph, Flavor, Node};
        use std::collections::HashMap;
        use syn::parse_str;

        let mut graph = CuGraph::default();
        let mut node = Node::new_with_flavor("radio", "bridge::Dummy", Flavor::Bridge);
        let mut res = HashMap::new();
        res.insert("serial".to_string(), "fc.serial0".to_string());
        node.set_resources(Some(res));
        graph.add_node(node).expect("bridge node");
        let task_specs = CuTaskSpecSet::from_graph(&graph);
        let bridge_spec = BridgeSpec {
            id: "radio".to_string(),
            type_path: parse_str("bridge::Dummy").unwrap(),
            run_in_sim: true,
            config_index: 0,
            tuple_index: 0,
            monitor_index: None,
            rx_channels: Vec::new(),
            tx_channels: Vec::new(),
        };
        let mut config = cu29::config::CuConfig::default();
        config.resources.push(ResourceBundleConfig {
            id: "fc".to_string(),
            provider: "board::Bundle".to_string(),
            config: None,
            missions: None,
        });
        let bundle_specs = build_bundle_specs(&config, "default").expect("bundle specs");
        let specs = collect_resource_specs(&graph, &task_specs, &[bridge_spec], &bundle_specs)
            .expect("collect specs");
        assert_eq!(specs.len(), 1);
        assert!(matches!(specs[0].owner, ResourceOwner::Bridge(0)));
        assert_eq!(specs[0].binding_name, "serial");
        assert_eq!(specs[0].bundle_index, 0);
        assert_eq!(specs[0].resource_name, "serial0");
    }
}