use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap};
use std::fs;
use std::process::{Command, Stdio};
use std::rc::Rc;
use dfir_lang::diagnostic::Diagnostics;
use dfir_lang::graph::DfirGraph;
use proc_macro2::Span;
use quote::quote;
use sha2::{Digest, Sha256};
use slotmap::SparseSecondaryMap;
use stageleft::QuotedWithContext;
use syn::visit_mut::VisitMut;
use tempfile::TempPath;
use trybuild_internals_api::{cargo, dependencies, path};
use crate::compile::builder::ExternalPortId;
use crate::compile::deploy_provider::{Deploy, DynSourceSink, Node, RegisterPort};
#[cfg(feature = "deploy")]
use crate::compile::trybuild::generate::LinkingMode;
use crate::compile::trybuild::generate::{
CONCURRENT_TEST_LOCK, IS_TEST, TrybuildConfig, create_trybuild, write_atomic,
};
use crate::compile::trybuild::rewriters::UseTestModeStaged;
use crate::deploy::deploy_runtime::cluster_membership_stream;
use crate::location::dynamic::LocationId;
use crate::location::member_id::TaglessMemberId;
use crate::location::{LocationKey, MembershipEvent};
use crate::staging_util::get_this_crate;
crate::newtype_counter! {
pub struct SimNodePort(usize);
pub struct SimExternalPort(usize);
}
#[derive(Clone)]
pub struct SimNode {
pub shared_port_counter: Rc<RefCell<SimNodePort>>,
}
impl Node for SimNode {
type Port = SimNodePort;
type Meta = ();
type InstantiateEnv = ();
fn next_port(&self) -> Self::Port {
self.shared_port_counter.borrow_mut().get_and_increment()
}
fn update_meta(&self, _meta: &Self::Meta) {}
fn instantiate(
&self,
_env: &mut Self::InstantiateEnv,
_meta: &mut Self::Meta,
_graph: DfirGraph,
_extra_stmts: &[syn::Stmt],
_sidecars: &[syn::Expr],
) {
}
}
#[derive(Clone, Default)]
pub(crate) struct SimExternalPortRegistry {
pub(crate) port_counter: SimExternalPort,
pub(crate) registered: HashMap<ExternalPortId, SimExternalPort>,
}
#[derive(Clone, Default)]
pub struct SimExternal {
pub(crate) shared_inner: Rc<RefCell<SimExternalPortRegistry>>,
}
impl Node for SimExternal {
type Port = SimExternalPort;
type Meta = ();
type InstantiateEnv = ();
fn next_port(&self) -> Self::Port {
self.shared_inner
.borrow_mut()
.port_counter
.get_and_increment()
}
fn update_meta(&self, _meta: &Self::Meta) {
todo!("SimExternal::update_meta is not yet implemented")
}
fn instantiate(
&self,
_env: &mut Self::InstantiateEnv,
_meta: &mut Self::Meta,
_graph: DfirGraph,
_extra_stmts: &[syn::Stmt],
_sidecars: &[syn::Expr],
) {
}
}
impl<'a> RegisterPort<'a, SimDeploy> for SimExternal {
fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
assert!(
self.shared_inner
.borrow_mut()
.registered
.insert(external_port_id, port)
.is_none_or(|old| old == port)
);
}
#[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
fn as_bytes_bidi(
&self,
_external_port_id: ExternalPortId,
) -> impl Future<
Output = DynSourceSink<
Result<bytes::BytesMut, std::io::Error>,
bytes::Bytes,
std::io::Error,
>,
> + 'a {
async { todo!("SimExternal::as_bytes_bidi is not yet supported in simulation") }
}
#[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
fn as_bincode_bidi<InT, OutT>(
&self,
_external_port_id: ExternalPortId,
) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
where
InT: serde::Serialize + 'static,
OutT: serde::de::DeserializeOwned + 'static,
{
async { todo!("SimExternal::as_bincode_bidi is not yet supported in simulation") }
}
#[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
fn as_bincode_sink<T>(
&self,
_external_port_id: ExternalPortId,
) -> impl Future<Output = std::pin::Pin<Box<dyn futures::Sink<T, Error = std::io::Error>>>> + 'a
where
T: serde::Serialize + 'static,
{
async { todo!("SimExternal::as_bincode_sink is not yet supported in simulation") }
}
#[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
fn as_bincode_source<T>(
&self,
_external_port_id: ExternalPortId,
) -> impl Future<Output = std::pin::Pin<Box<dyn futures::Stream<Item = T>>>> + 'a
where
T: serde::de::DeserializeOwned + 'static,
{
async { todo!("SimExternal::as_bincode_source is not yet supported in simulation") }
}
}
pub(super) struct SimDeploy {}
impl<'a> Deploy<'a> for SimDeploy {
type Meta = ();
type InstantiateEnv = ();
type Process = SimNode;
type Cluster = SimNode;
type External = SimExternal;
fn o2o_sink_source(
_env: &mut Self::InstantiateEnv,
_p1: &Self::Process,
p1_port: &<Self::Process as Node>::Port,
_p2: &Self::Process,
p2_port: &<Self::Process as Node>::Port,
_name: Option<&str>,
_networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr) {
let ident_sink =
syn::Ident::new(&format!("__hydro_o2o_sink_{}", p1_port), Span::call_site());
let ident_source = syn::Ident::new(
&format!("__hydro_o2o_source_{}", p2_port),
Span::call_site(),
);
(
syn::parse_quote!(#ident_sink),
syn::parse_quote!(#ident_source),
)
}
fn o2o_connect(
_p1: &Self::Process,
_p1_port: &<Self::Process as Node>::Port,
_p2: &Self::Process,
_p2_port: &<Self::Process as Node>::Port,
) -> Box<dyn FnOnce()> {
Box::new(|| {})
}
fn o2m_sink_source(
_env: &mut Self::InstantiateEnv,
_p1: &Self::Process,
p1_port: &<Self::Process as Node>::Port,
_c2: &Self::Cluster,
c2_port: &<Self::Cluster as Node>::Port,
_name: Option<&str>,
_networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr) {
let ident_sink =
syn::Ident::new(&format!("__hydro_o2m_sink_{}", p1_port), Span::call_site());
let ident_source = syn::Ident::new(
&format!("__hydro_o2m_source_{}", c2_port),
Span::call_site(),
);
(
syn::parse_quote!(#ident_sink),
syn::parse_quote!(#ident_source),
)
}
fn o2m_connect(
_p1: &Self::Process,
_p1_port: &<Self::Process as Node>::Port,
_c2: &Self::Cluster,
_c2_port: &<Self::Cluster as Node>::Port,
) -> Box<dyn FnOnce()> {
Box::new(|| {})
}
fn m2o_sink_source(
_env: &mut Self::InstantiateEnv,
_c1: &Self::Cluster,
c1_port: &<Self::Cluster as Node>::Port,
_p2: &Self::Process,
p2_port: &<Self::Process as Node>::Port,
_name: Option<&str>,
_networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr) {
let ident_sink =
syn::Ident::new(&format!("__hydro_m2o_sink_{}", c1_port), Span::call_site());
let ident_source = syn::Ident::new(
&format!("__hydro_m2o_source_{}", p2_port),
Span::call_site(),
);
(
syn::parse_quote!(#ident_sink),
syn::parse_quote!(#ident_source),
)
}
fn m2o_connect(
_c1: &Self::Cluster,
_c1_port: &<Self::Cluster as Node>::Port,
_p2: &Self::Process,
_p2_port: &<Self::Process as Node>::Port,
) -> Box<dyn FnOnce()> {
Box::new(|| {})
}
fn m2m_sink_source(
_env: &mut Self::InstantiateEnv,
_c1: &Self::Cluster,
c1_port: &<Self::Cluster as Node>::Port,
_c2: &Self::Cluster,
c2_port: &<Self::Cluster as Node>::Port,
_name: Option<&str>,
_networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr) {
let ident_sink =
syn::Ident::new(&format!("__hydro_m2m_sink_{}", c1_port), Span::call_site());
let ident_source = syn::Ident::new(
&format!("__hydro_m2m_source_{}", c2_port),
Span::call_site(),
);
(
syn::parse_quote!(#ident_sink),
syn::parse_quote!(#ident_source),
)
}
fn m2m_connect(
_c1: &Self::Cluster,
_c1_port: &<Self::Cluster as Node>::Port,
_c2: &Self::Cluster,
_c2_port: &<Self::Cluster as Node>::Port,
) -> Box<dyn FnOnce()> {
Box::new(|| {})
}
fn e2o_many_source(
_extra_stmts: &mut Vec<syn::Stmt>,
_p2: &Self::Process,
_p2_port: &<Self::Process as Node>::Port,
_codec_type: &syn::Type,
_shared_handle: String,
) -> syn::Expr {
todo!("e2o_many_source is not yet supported in simulation")
}
fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
todo!("e2o_many_sink is not yet supported in simulation")
}
fn e2o_source(
_extra_stmts: &mut Vec<syn::Stmt>,
_p1: &Self::External,
p1_port: &<Self::External as Node>::Port,
_p2: &Self::Process,
_p2_port: &<Self::Process as Node>::Port,
_codec_type: &syn::Type,
_shared_handle: String,
) -> syn::Expr {
let ident = syn::Ident::new("__hydro_external_in", Span::call_site());
let p1_port_usize = p1_port.0;
syn::parse_quote!({
let (__sender, __receiver) = __root_dfir_rs::util::unbounded_channel::<__root_dfir_rs::bytes::Bytes>();
#ident.insert(#p1_port_usize, __sender);
__receiver
})
}
fn e2o_connect(
_p1: &Self::External,
_p1_port: &<Self::External as Node>::Port,
_p2: &Self::Process,
_p2_port: &<Self::Process as Node>::Port,
_many: bool,
_server_hint: crate::location::NetworkHint,
) -> Box<dyn FnOnce()> {
Box::new(|| {})
}
fn o2e_sink(
_p1: &Self::Process,
_p1_port: &<Self::Process as Node>::Port,
_p2: &Self::External,
p2_port: &<Self::External as Node>::Port,
_shared_handle: String,
) -> syn::Expr {
let ident = syn::Ident::new("__hydro_external_out", Span::call_site());
let p2_port_usize = p2_port.0;
syn::parse_quote!({
let (__sender, __receiver) = __root_dfir_rs::util::unbounded_channel::<__root_dfir_rs::bytes::Bytes>();
#ident.insert(#p2_port_usize, __root_dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream::new(__receiver.into_inner()));
__sender
})
}
fn e2m_source(
_extra_stmts: &mut Vec<syn::Stmt>,
_p1: &Self::External,
p1_port: &<Self::External as Node>::Port,
_c2: &Self::Cluster,
_c2_port: &<Self::Cluster as Node>::Port,
_codec_type: &syn::Type,
_shared_handle: String,
) -> syn::Expr {
let ident = syn::Ident::new("__hydro_cluster_external_in", Span::call_site());
let p1_port_usize = p1_port.0;
syn::parse_quote!({
let (__sender, __receiver) = __root_dfir_rs::util::unbounded_channel::<__root_dfir_rs::bytes::Bytes>();
#ident.entry(#p1_port_usize).or_insert_with(Vec::new).push(__sender);
__receiver
})
}
fn e2m_connect(
_p1: &Self::External,
_p1_port: &<Self::External as Node>::Port,
_c2: &Self::Cluster,
_c2_port: &<Self::Cluster as Node>::Port,
_server_hint: crate::location::NetworkHint,
) -> Box<dyn FnOnce()> {
Box::new(|| {})
}
fn m2e_sink(
_c1: &Self::Cluster,
_c1_port: &<Self::Cluster as Node>::Port,
_p2: &Self::External,
p2_port: &<Self::External as Node>::Port,
_shared_handle: String,
) -> syn::Expr {
let ident = syn::Ident::new("__hydro_cluster_external_out", Span::call_site());
let p2_port_usize = p2_port.0;
syn::parse_quote!({
let (__sender, __receiver) = __root_dfir_rs::util::unbounded_channel::<__root_dfir_rs::bytes::Bytes>();
#ident.entry(#p2_port_usize).or_insert_with(Vec::new).push(__root_dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream::new(__receiver.into_inner()));
__sender
})
}
#[expect(unreachable_code, reason = "todo!() is unreachable")]
fn cluster_ids(
_of_cluster: LocationKey,
) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
todo!("cluster_ids is not yet supported in simulation");
stageleft::q!(todo!())
}
#[expect(unreachable_code, reason = "todo!() is unreachable")]
fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
todo!("cluster_self_id is not yet supported in simulation");
stageleft::q!(todo!())
}
fn cluster_membership_stream(
_env: &mut Self::InstantiateEnv,
_at_location: &LocationId,
location_id: &LocationId,
) -> impl QuotedWithContext<
'a,
Box<dyn futures::Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>,
(),
> {
cluster_membership_stream(location_id)
}
}
pub(super) fn compile_sim(bin: String, trybuild: TrybuildConfig) -> Result<TempPath, ()> {
let mut command = Command::new("cargo");
let is_fuzz = std::env::var("BOLERO_FUZZER").is_ok();
let crate_to_compile = if is_fuzz {
trybuild.project_dir.clone()
} else {
path!(trybuild.project_dir / "dylib-examples")
};
command.current_dir(&crate_to_compile);
command.args(["rustc", "--locked"]);
command.args(["--example", "sim-dylib"]);
command.args(["--target-dir", trybuild.target_dir.to_str().unwrap()]);
if let Some(features) = &trybuild.features {
command.args(["--features", &features.join(",")]);
}
command.args(["--config", "build.incremental = false"]);
command.args(["--crate-type", "cdylib"]);
command.arg("--message-format=json-diagnostic-rendered-ansi");
command.env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
command.env("TRYBUILD_LIB_NAME", &bin);
command.arg("--");
if cfg!(target_os = "linux") {
let debug_path = if let Ok(target) = std::env::var("CARGO_BUILD_TARGET") {
path!(trybuild.target_dir / target / "debug")
} else {
path!(trybuild.target_dir / "debug")
};
command.args([&format!(
"-Clink-arg=-Wl,-rpath,{}",
debug_path.to_str().unwrap()
)]);
if cfg!(target_env = "gnu") {
command.arg(
"-Clink-args=-Wl,-z,nodelete",
);
}
}
if let Ok(fuzzer) = std::env::var("BOLERO_FUZZER") {
command.env_remove("BOLERO_FUZZER");
if fuzzer == "libfuzzer" {
#[cfg(target_os = "macos")]
{
command.args(["-Clink-arg=-undefined", "-Clink-arg=dynamic_lookup"]);
}
#[cfg(target_os = "linux")]
{
command.args(["-Clink-arg=-Wl,--unresolved-symbols=ignore-all"]);
}
}
}
let mut spawned = command
.stdout(Stdio::piped())
.stdin(Stdio::null())
.spawn()
.unwrap();
let reader = std::io::BufReader::new(spawned.stdout.take().unwrap());
let mut out = Err(());
for message in cargo_metadata::Message::parse_stream(reader) {
match message.unwrap() {
cargo_metadata::Message::CompilerArtifact(artifact) => {
let is_output = artifact.target.is_example();
if is_output {
use std::path::PathBuf;
let path = artifact.filenames.first().unwrap();
let path_buf: PathBuf = path.clone().into();
out = Ok(path_buf);
}
}
cargo_metadata::Message::CompilerMessage(mut msg) => {
if let Some(rendered) = msg.message.rendered.as_mut() {
let file_names = msg
.message
.spans
.iter()
.map(|s| &s.file_name)
.collect::<std::collections::BTreeSet<_>>();
for file_name in file_names {
*rendered = rendered.replace(
file_name,
&format!("(full path) {}/{file_name}", trybuild.project_dir.display()),
)
}
}
eprintln!("{}", msg.message);
}
cargo_metadata::Message::TextLine(line) => {
eprintln!("{}", line);
}
cargo_metadata::Message::BuildFinished(_) => {}
cargo_metadata::Message::BuildScriptExecuted(_) => {}
msg => panic!("Unexpected message type: {:?}", msg),
}
}
spawned.wait().unwrap();
let out_file = tempfile::NamedTempFile::new().unwrap().into_temp_path();
fs::copy(out.as_ref().unwrap(), &out_file).unwrap();
Ok(out_file)
}
pub(super) fn create_sim_graph_trybuild(
process_graphs: BTreeMap<LocationId, DfirGraph>,
cluster_graphs: BTreeMap<LocationId, DfirGraph>,
cluster_max_sizes: SparseSecondaryMap<LocationKey, usize>,
process_tick_graphs: BTreeMap<LocationId, DfirGraph>,
cluster_tick_graphs: BTreeMap<LocationId, DfirGraph>,
extra_stmts_global: Vec<syn::Stmt>,
extra_stmts_cluster: BTreeMap<LocationId, Vec<syn::Stmt>>,
) -> (String, TrybuildConfig) {
let source_dir = cargo::manifest_dir().unwrap();
let source_manifest = dependencies::get_manifest(&source_dir).unwrap();
let crate_name = source_manifest.package.name.replace("-", "_");
let is_test = IS_TEST.load(std::sync::atomic::Ordering::Relaxed);
let generated_code = compile_sim_graph_trybuild(
process_graphs,
cluster_graphs,
cluster_max_sizes,
process_tick_graphs,
cluster_tick_graphs,
extra_stmts_global,
extra_stmts_cluster,
&crate_name,
is_test,
);
let inlined_staged = if is_test {
let raw_toml_manifest = toml::from_str::<toml::Value>(
&fs::read_to_string(path!(source_dir / "Cargo.toml")).unwrap(),
)
.unwrap();
let maybe_custom_lib_path = raw_toml_manifest
.get("lib")
.and_then(|lib| lib.get("path"))
.and_then(|path| path.as_str());
let mut gen_staged = stageleft_tool::gen_staged_trybuild(
&maybe_custom_lib_path
.map(|s| path!(source_dir / s))
.unwrap_or_else(|| path!(source_dir / "src" / "lib.rs")),
&path!(source_dir / "Cargo.toml"),
&crate_name,
Some("hydro___test".to_owned()),
);
gen_staged.attrs.insert(
0,
syn::parse_quote! {
#![allow(
unused,
ambiguous_glob_reexports,
clippy::suspicious_else_formatting,
unexpected_cfgs,
reason = "generated code"
)]
},
);
Some(prettyplease::unparse(&gen_staged))
} else {
None
};
let source = prettyplease::unparse(&generated_code);
let hash = format!("{:X}", Sha256::digest(&source))
.chars()
.take(8)
.collect::<String>();
let bin_name = hash;
let (project_dir, target_dir, mut cur_bin_enabled_features) = create_trybuild().unwrap();
let is_fuzz = std::env::var("BOLERO_FUZZER").is_ok();
let examples_dir = if is_fuzz {
path!(project_dir / "examples")
} else {
path!(project_dir / "dylib-examples" / "examples")
};
fs::create_dir_all(path!(project_dir / "src")).unwrap();
fs::create_dir_all(&examples_dir).unwrap();
let out_path = path!(examples_dir / format!("{bin_name}.rs"));
{
let _concurrent_test_lock = CONCURRENT_TEST_LOCK.lock().unwrap();
write_atomic(source.as_ref(), &out_path).unwrap();
}
if let Some(inlined_staged) = inlined_staged {
let staged_path = path!(project_dir / "src" / "__staged.rs");
{
let _concurrent_test_lock = CONCURRENT_TEST_LOCK.lock().unwrap();
write_atomic(inlined_staged.as_bytes(), &staged_path).unwrap();
}
}
if is_test {
if cur_bin_enabled_features.is_none() {
cur_bin_enabled_features = Some(vec![]);
}
cur_bin_enabled_features
.as_mut()
.unwrap()
.push("hydro___test".to_owned());
}
(
bin_name,
TrybuildConfig {
project_dir,
target_dir,
features: cur_bin_enabled_features,
#[cfg(feature = "deploy")]
linking_mode: LinkingMode::Dynamic,
},
)
}
#[expect(clippy::too_many_arguments, reason = "necessary for code generation")]
fn compile_sim_graph_trybuild(
process_graphs: BTreeMap<LocationId, DfirGraph>,
cluster_graphs: BTreeMap<LocationId, DfirGraph>,
cluster_max_sizes: SparseSecondaryMap<LocationKey, usize>,
process_tick_graphs: BTreeMap<LocationId, DfirGraph>,
cluster_tick_graphs: BTreeMap<LocationId, DfirGraph>,
mut extra_stmts_global: Vec<syn::Stmt>,
mut extra_stmts_cluster: BTreeMap<LocationId, Vec<syn::Stmt>>,
crate_name: &str,
is_test: bool,
) -> syn::File {
let mut diagnostics = Diagnostics::new();
let mut dfir_into_code = |g: &DfirGraph| {
let mut dfir_expr: syn::Expr = syn::parse2(
g.as_code_with_options(
"e! { __root_dfir_rs },
true,
false,
quote!(),
&mut diagnostics,
)
.expect("DFIR code generation failed with diagnostics."),
)
.unwrap();
if is_test {
UseTestModeStaged { crate_name }.visit_expr_mut(&mut dfir_expr);
}
dfir_expr
};
let mut dfir_into_code_erased = |g: &DfirGraph| -> syn::Expr {
let inner = dfir_into_code(g);
syn::parse_quote! {
__root_dfir_rs::scheduled::context::Dfir::into_erased(#inner)
}
};
if is_test {
extra_stmts_global.iter_mut().for_each(|stmt| {
UseTestModeStaged { crate_name }.visit_stmt_mut(stmt);
});
extra_stmts_cluster.values_mut().for_each(|stmts| {
stmts.iter_mut().for_each(|stmt| {
UseTestModeStaged { crate_name }.visit_stmt_mut(stmt);
})
});
}
let process_dfir_exprs = process_graphs
.into_iter()
.map(|(lid, g)| {
let dfir_expr = dfir_into_code_erased(&g);
let ser_lid = serde_json::to_string(&lid).unwrap();
syn::parse_quote!((#ser_lid, None, #dfir_expr))
})
.collect::<Vec<syn::Expr>>();
let mut cluster_ticks_grouped_by_root = cluster_tick_graphs.into_iter().fold::<BTreeMap<
LocationId,
Vec<(LocationId, DfirGraph)>,
>, _>(
BTreeMap::new(),
|mut acc, (lid, g)| {
let root = lid.root();
acc.entry(root.clone()).or_default().push((lid, g));
acc
},
);
let root = get_this_crate();
let cluster_dfir_stmts = cluster_graphs
.into_iter()
.map(|(lid, g)| {
let dfir_expr = dfir_into_code_erased(&g);
let tick_dfir_stmts = cluster_ticks_grouped_by_root
.remove(&lid)
.unwrap_or_default()
.into_iter()
.map(|(tick_lid, tick_g)| {
let tick_dfir_expr = dfir_into_code_erased(&tick_g);
let ser_tick_lid = serde_json::to_string(&tick_lid).unwrap();
syn::parse_quote! {
__tick_dfirs.push((
#ser_tick_lid,
Some(__current_cluster_id),
#tick_dfir_expr
));
}
})
.collect::<Vec<syn::Stmt>>();
let ser_lid = serde_json::to_string(&lid).unwrap();
let extra_stmts_per_cluster =
extra_stmts_cluster.get(&lid).cloned().unwrap_or_default();
let max_size = cluster_max_sizes.get(lid.key()).cloned().unwrap() as u32;
let self_id_ident = syn::Ident::new(
&format!("__hydro_lang_cluster_self_id_{}", lid.key()),
Span::call_site(),
);
syn::parse_quote! {
for __current_cluster_id in 0..#max_size {
__async_dfirs.push((
#ser_lid,
Some(__current_cluster_id),
{
#(#extra_stmts_per_cluster)*
let #self_id_ident = &*Box::leak(Box::new(#root::__staged::location::TaglessMemberId::from_raw_id(__current_cluster_id)));
#(#tick_dfir_stmts)*
#dfir_expr
}
));
}
}
})
.collect::<Vec<syn::Stmt>>();
let process_tick_dfir_exprs = process_tick_graphs
.into_iter()
.map(|(lid, g)| {
let dfir_expr = dfir_into_code_erased(&g);
let ser_lid = serde_json::to_string(&lid).unwrap();
syn::parse_quote!((#ser_lid, None, #dfir_expr))
})
.collect::<Vec<syn::Expr>>();
let mut cluster_max_sizes = cluster_max_sizes.into_iter().collect::<Vec<_>>();
cluster_max_sizes.sort();
let cluster_ids_stmts = cluster_max_sizes.into_iter()
.map(|(loc_key, max_size)| {
let ident = syn::Ident::new(
&format!(
"__hydro_lang_cluster_ids_{}",
loc_key,
),
Span::call_site(),
);
let elements = (0..max_size as u32)
.map(|i| syn::parse_quote! { #i })
.collect::<Vec<syn::Expr>>();
syn::parse_quote! {
let #ident: &'static [#root::__staged::location::TaglessMemberId] = Box::leak(Box::new([#(#root::__staged::location::TaglessMemberId::from_raw_id(#elements)),*]));
}
})
.collect::<Vec<syn::Stmt>>();
let orig_crate_name = quote::format_ident!("{}", crate_name);
let trybuild_crate_name_ident = quote::format_ident!("{}_hydro_trybuild", crate_name);
let source_ast: syn::File = syn::parse_quote! {
use #trybuild_crate_name_ident::__root as #orig_crate_name;
use #trybuild_crate_name_ident::__staged::__deps::*;
use #root::prelude::*;
use #root::runtime_support::dfir_rs as __root_dfir_rs;
pub use #trybuild_crate_name_ident::__staged;
#[allow(unused)]
fn __hydro_runtime_core<'a>(
__hydro_external_out: &mut ::std::collections::HashMap<usize, __root_dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream<__root_dfir_rs::bytes::Bytes>>,
__hydro_external_in: &mut ::std::collections::HashMap<usize, __root_dfir_rs::tokio::sync::mpsc::UnboundedSender<__root_dfir_rs::bytes::Bytes>>,
__hydro_cluster_external_out: &mut ::std::collections::HashMap<usize, Vec<__root_dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream<__root_dfir_rs::bytes::Bytes>>>,
__hydro_cluster_external_in: &mut ::std::collections::HashMap<usize, Vec<__root_dfir_rs::tokio::sync::mpsc::UnboundedSender<__root_dfir_rs::bytes::Bytes>>>,
__println_handler: fn(::std::fmt::Arguments<'_>),
__eprintln_handler: fn(::std::fmt::Arguments<'_>),
) -> (
Vec<(&'static str, Option<u32>, __root_dfir_rs::scheduled::context::DfirErased)>,
Vec<(&'static str, Option<u32>, __root_dfir_rs::scheduled::context::DfirErased)>,
#root::sim::runtime::Hooks<&'static str>,
#root::sim::runtime::InlineHooks<&'static str>,
) {
macro_rules! println {
($($arg:tt)*) => ({
__println_handler(::std::format_args!($($arg)*));
})
}
macro_rules! eprintln {
($($arg:tt)*) => ({
__eprintln_handler(::std::format_args!($($arg)*));
})
}
macro_rules! dbg {
() => {
eprintln!("[{}:{}:{}]", ::std::file!(), ::std::line!(), ::std::column!())
};
($val:expr $(,)?) => {
match $val {
tmp => {
eprintln!("[{}:{}:{}] {} = {:#?}",
::std::file!(),
::std::line!(),
::std::column!(),
::std::stringify!($val),
&&tmp as &dyn ::std::fmt::Debug,
);
tmp
}
}
};
($($val:expr),+ $(,)?) => {
($(dbg!($val)),+,)
};
}
let mut __hydro_hooks: ::std::collections::HashMap<(&'static str, Option<u32>), ::std::vec::Vec<Box<dyn #root::sim::runtime::SimHook>>> = ::std::collections::HashMap::new();
let mut __hydro_inline_hooks: ::std::collections::HashMap<(&'static str, Option<u32>), ::std::vec::Vec<Box<dyn #root::sim::runtime::SimInlineHook>>> = ::std::collections::HashMap::new();
#(#extra_stmts_global)*
#(#cluster_ids_stmts)*
let mut __async_dfirs = vec![#(#process_dfir_exprs),*];
let mut __tick_dfirs = vec![#(#process_tick_dfir_exprs),*];
#(#cluster_dfir_stmts)*
(__async_dfirs, __tick_dfirs, __hydro_hooks, __hydro_inline_hooks)
}
#[unsafe(no_mangle)]
unsafe extern "Rust" fn __hydro_runtime(
should_color: bool,
__hydro_external_out: &mut ::std::collections::HashMap<usize, __root_dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream<__root_dfir_rs::bytes::Bytes>>,
__hydro_external_in: &mut ::std::collections::HashMap<usize, __root_dfir_rs::tokio::sync::mpsc::UnboundedSender<__root_dfir_rs::bytes::Bytes>>,
__hydro_cluster_external_out: &mut ::std::collections::HashMap<usize, Vec<__root_dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream<__root_dfir_rs::bytes::Bytes>>>,
__hydro_cluster_external_in: &mut ::std::collections::HashMap<usize, Vec<__root_dfir_rs::tokio::sync::mpsc::UnboundedSender<__root_dfir_rs::bytes::Bytes>>>,
__println_handler: fn(::std::fmt::Arguments<'_>),
__eprintln_handler: fn(::std::fmt::Arguments<'_>),
) -> (
Vec<(&'static str, Option<u32>, __root_dfir_rs::scheduled::context::DfirErased)>,
Vec<(&'static str, Option<u32>, __root_dfir_rs::scheduled::context::DfirErased)>,
#root::sim::runtime::Hooks<&'static str>,
#root::sim::runtime::InlineHooks<&'static str>,
) {
#root::runtime_support::colored::control::set_override(should_color);
__hydro_runtime_core(__hydro_external_out, __hydro_external_in, __hydro_cluster_external_out, __hydro_cluster_external_in, __println_handler, __eprintln_handler)
}
};
source_ast
}