use std::io;
use std::path::Path;
use proc_macro2::TokenStream;
use quote::quote;
use crate::common::pretty_print;
use crate::build::engine::{gen_lib_engine, gen_lib_incremental_engine};
use crate::build::imports::gen_lib_imports;
use crate::build::pipeline::Pipeline;
use crate::build::relation::user::gen_public_rel_module;
use crate::build::results::{gen_batch_results, gen_incremental_results};
use crate::codegen::Features;
pub(crate) fn assemble(pipeline: &Pipeline, out_dir: &Path) -> io::Result<String> {
let config = &pipeline.config;
let string_intern = pipeline.features.string_intern();
let semiring_mod = gen_semiring_mod(pipeline, out_dir);
let lib_imports = gen_lib_imports(
&pipeline.relations,
&pipeline.features,
config.profiling_enabled(),
);
let type_declarations = &pipeline.parts.type_declarations;
let profile_structs = &pipeline.parts.profile_structs;
let profile_ops = &pipeline.parts.profile_ops;
let rel_module = gen_public_rel_module(&pipeline.program);
let (results_struct, lib_engine) = if config.is_incremental() {
(
gen_incremental_results(&pipeline.program),
gen_lib_incremental_engine(&pipeline.program, string_intern, &pipeline.parts),
)
} else {
(
gen_batch_results(&pipeline.program),
gen_lib_engine(&pipeline.program, string_intern, &pipeline.parts),
)
};
let udf_mod = gen_udf_mod(&pipeline.features, config.udf_file().map(Path::new))?;
Ok(pretty_print(quote! {
pub use __flowlog_gen::*;
#[allow(
dead_code,
unused_imports,
unused_variables,
unused_mut,
non_camel_case_types,
non_snake_case,
clippy::all,
)]
mod __flowlog_gen {
use ::flowlog_runtime::differential_dataflow;
use ::flowlog_runtime::timely;
use ::flowlog_runtime::serde;
use ::flowlog_runtime::ordered_float;
#semiring_mod
#lib_imports
#type_declarations
#profile_structs
#profile_ops
#rel_module
#results_struct
#udf_mod
#lib_engine
}
}))
}
fn gen_udf_mod(features: &Features, udf_file: Option<&Path>) -> io::Result<TokenStream> {
if !features.udf() {
return Ok(quote! {});
}
let path = udf_file.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"program uses `.extern fn` but no UDF file was configured — \
call `Builder::udf_file(..)` with the path to your UDF impls",
)
})?;
let abs = path.canonicalize().map_err(|e| {
io::Error::new(
e.kind(),
format!("failed to resolve UDF file '{}': {e}", path.display()),
)
})?;
let path_lit = abs.to_string_lossy().into_owned();
Ok(quote! {
#[path = #path_lit]
mod udf;
})
}
fn gen_semiring_mod(pipeline: &Pipeline, out_dir: &Path) -> TokenStream {
if pipeline.parts.semiring_modules.is_empty() {
return quote! {};
}
let mod_path = out_dir.join("semiring").join("mod.rs");
let mod_path_str = mod_path.to_string_lossy().into_owned();
quote! {
#[path = #mod_path_str]
mod semiring;
}
}