use proc_macro2::TokenStream;
use quote::quote;
use crate::codegen::CodeGen;
use crate::profiler::Profiler;
impl CodeGen {
    /// Name of the directory that receives every profiling artifact:
    /// `<program_name>_log`.
    fn profile_log_dir(&self) -> String {
        format!("{}_log", self.config.program_name())
    }

    /// Escapes `{` and `}` so `text` can be spliced into a `format!` string
    /// literal emitted into the generated program.
    ///
    /// Without this, a program name containing a brace would make the
    /// generated `format!(...)` call fail to compile.
    fn escape_format_braces(text: &str) -> String {
        text.replace('{', "{{").replace('}', "}}")
    }

    /// Emits the `OpStats` struct used by the generated program to accumulate
    /// per-operator scheduling time, or nothing when profiling is disabled.
    pub(crate) fn gen_time_profile_struct(&self) -> TokenStream {
        if !self.config.profiling_enabled() {
            return quote! {};
        }
        quote! {
            #[derive(Clone, Debug, Default)]
            struct OpStats {
                // Operator name and `{:?}`-formatted address, set from `Operates` events.
                name: String,
                addr: String,
                // Sum of the Start→Stop scheduling intervals, and how many were summed.
                total_active: Duration,
                activations: u64,
                // Timestamp of a `Start` whose matching `Stop` has not been seen yet.
                current_start: Option<Duration>,
            }
        }
    }

    /// Emits worker-side setup for time profiling, or nothing when profiling
    /// is disabled.
    ///
    /// The generated code:
    /// * declares `op_stats: Rc<RefCell<HashMap<usize, OpStats>>>`, keyed by
    ///   timely operator id;
    /// * on worker 0, best-effort creates the log directory and writes the
    ///   `__FLOWLOG_OPS_JSON` constant to `<log_dir>/ops.json` (I/O errors are
    ///   deliberately ignored);
    /// * registers a `"timely"` logger that records each operator's name and
    ///   address and accumulates the time between `Schedule` Start/Stop events.
    ///
    /// Expects `worker`, `Rc`, `RefCell`, `HashMap`, `OpStats`,
    /// `TimelyEventBuilder`, `TimelyEvent`, `StartStop` and
    /// `__FLOWLOG_OPS_JSON` to be in scope at the expansion site.
    pub(crate) fn gen_time_profile_init(&self) -> TokenStream {
        if !self.config.profiling_enabled() {
            return quote! {};
        }
        let log_dir = self.profile_log_dir();
        let ops_path = format!("{log_dir}/ops.json");
        quote! {
            let op_stats: Rc<RefCell<HashMap<usize, OpStats>>> =
                Rc::new(RefCell::new(HashMap::new()));
            let op_stats_in_log = Rc::clone(&op_stats);
            if worker.index() == 0 {
                let _ = std::fs::create_dir_all(#log_dir);
                let _ = std::fs::write(#ops_path, __FLOWLOG_OPS_JSON);
            }
            worker
                .log_register()
                .expect("failed to get log_register")
                .insert::<TimelyEventBuilder, _>("timely", move |_batch_time, data| {
                    let Some(data) = data else {
                        return;
                    };
                    for (ts, event) in data.iter() {
                        match event {
                            TimelyEvent::Operates(op) => {
                                let mut map = op_stats_in_log.borrow_mut();
                                let entry = map.entry(op.id).or_default();
                                entry.name = op.name.to_string();
                                entry.addr = format!("{:?}", op.addr);
                            }
                            TimelyEvent::Schedule(sched) => {
                                let mut map = op_stats_in_log.borrow_mut();
                                let entry = map.entry(sched.id).or_default();
                                match sched.start_stop {
                                    StartStop::Start => {
                                        entry.current_start = Some(*ts);
                                    }
                                    StartStop::Stop => {
                                        // A Stop with no recorded Start is ignored.
                                        if let Some(st) = entry.current_start.take() {
                                            // Clamp to zero if timestamps ever go backwards.
                                            entry.total_active += ts.saturating_sub(st);
                                            entry.activations += 1;
                                        }
                                    }
                                }
                            }
                            _ => {}
                        }
                    }
                });
        }
    }

    /// Emits code that writes the accumulated operator timing of a batch run
    /// to `<log_dir>/time/time_worker_t0_<index>.log`.
    ///
    /// Expects `index` (the worker index) and `op_stats` in scope at the
    /// expansion site.
    pub(crate) fn gen_time_profile_write_batch(&self) -> TokenStream {
        if !self.config.profiling_enabled() {
            return quote! {};
        }
        let dir = format!("{}/time", self.profile_log_dir());
        let path_fmt = format!(
            "{}/time_worker_t0_{{}}.log",
            Self::escape_format_braces(&dir)
        );
        let write = gen_time_profile_write_core(&dir, quote! { format!(#path_fmt, index) });
        // Scope the snippet so its locals, including any `RefCell` borrow of
        // `op_stats`, end here instead of leaking into the caller's scope.
        // A leaked `Ref` would make a later `borrow_mut()` panic.
        quote! {
            { #write }
        }
    }

    /// Emits code that writes the operator timing gathered since the previous
    /// write to `<log_dir>/time/time_worker_t<time_stamp - 1>_<index>.log`,
    /// then resets every operator's counters for the next interval.
    ///
    /// Expects `time_stamp`, `index` and `op_stats` in scope at the expansion
    /// site. NOTE(review): assumes `time_stamp >= 1` there. It presumably
    /// underflows for unsigned types otherwise — confirm at the call site.
    pub(crate) fn gen_time_profile_write_incremental(&self) -> TokenStream {
        if !self.config.profiling_enabled() {
            return quote! {};
        }
        let dir = format!("{}/time", self.profile_log_dir());
        let path_fmt = format!(
            "{}/time_worker_t{{}}_{{}}.log",
            Self::escape_format_braces(&dir)
        );
        let write =
            gen_time_profile_write_core(&dir, quote! { format!(#path_fmt, time_stamp - 1, index) });
        quote! {
            { #write }
            // Clearing `current_start` means a Stop delivered after this point,
            // whose Start was already seen, is not counted in either interval.
            for st in op_stats.borrow_mut().values_mut() {
                st.total_active = Duration::ZERO;
                st.activations = 0;
                st.current_start = None;
            }
        }
    }

    /// Emits the `DdArrangeStats` struct used to accumulate per-operator
    /// differential-dataflow arrangement statistics, or nothing when profiling
    /// is disabled.
    pub(crate) fn gen_memory_profile_struct(&self) -> TokenStream {
        if !self.config.profiling_enabled() {
            return quote! {};
        }
        quote! {
            #[derive(Clone, Debug, Default)]
            struct DdArrangeStats {
                // Number of `Batch` events and the sum of their lengths.
                batch_count: u64,
                batch_total_len: usize,
                // Completed merges only: count, summed input lengths, summed output length.
                merge_completes: u64,
                merge_input_total: usize,
                merge_output_total: usize,
                // Number of `Drop` events and the sum of their lengths.
                drop_count: u64,
                drop_total_len: usize,
                // Running sums of the `Batcher` events' size/capacity diffs.
                batcher_size: isize,
                batcher_capacity: isize,
            }
        }
    }

    /// Emits code that declares `dd_stats: Rc<RefCell<HashMap<usize,
    /// DdArrangeStats>>>` and registers a `"differential/arrange"` logger.
    ///
    /// The logger folds `Batch`, completed `Merge`, `Drop` and `Batcher`
    /// events into per-operator stats. It emits nothing when profiling is
    /// disabled. Expects `worker`, `Rc`, `RefCell`, `HashMap`,
    /// `DifferentialEventBuilder` and `DifferentialEvent` in scope.
    pub(crate) fn gen_memory_profile_init(&self) -> TokenStream {
        if !self.config.profiling_enabled() {
            return quote! {};
        }
        quote! {
            let dd_stats: Rc<RefCell<HashMap<usize, DdArrangeStats>>> =
                Rc::new(RefCell::new(HashMap::new()));
            let dd_stats_in_log = Rc::clone(&dd_stats);
            worker
                .log_register()
                .expect("failed to get log_register")
                .insert::<DifferentialEventBuilder, _>(
                    "differential/arrange",
                    move |_batch_time, data| {
                        let Some(data) = data else { return; };
                        for (_ts, event) in data.iter() {
                            match event {
                                DifferentialEvent::Batch(b) => {
                                    let mut map = dd_stats_in_log.borrow_mut();
                                    let e = map.entry(b.operator).or_default();
                                    e.batch_count += 1;
                                    e.batch_total_len += b.length;
                                }
                                DifferentialEvent::Merge(m) => {
                                    // Only completed merges carry an output length.
                                    if let Some(complete_len) = m.complete {
                                        let mut map = dd_stats_in_log.borrow_mut();
                                        let e = map.entry(m.operator).or_default();
                                        e.merge_completes += 1;
                                        e.merge_input_total += m.length1 + m.length2;
                                        e.merge_output_total += complete_len;
                                    }
                                }
                                DifferentialEvent::Drop(d) => {
                                    let mut map = dd_stats_in_log.borrow_mut();
                                    let e = map.entry(d.operator).or_default();
                                    e.drop_count += 1;
                                    e.drop_total_len += d.length;
                                }
                                DifferentialEvent::Batcher(b) => {
                                    let mut map = dd_stats_in_log.borrow_mut();
                                    let e = map.entry(b.operator).or_default();
                                    e.batcher_size += b.size_diff;
                                    e.batcher_capacity += b.capacity_diff;
                                }
                                _ => {}
                            }
                        }
                    },
                );
        }
    }

    /// Emits code that writes the arrangement statistics of a batch run to
    /// `<log_dir>/memory/memory_worker_t0_<index>.log`.
    ///
    /// Expects `index`, `op_stats` and `dd_stats` in scope at the expansion
    /// site.
    pub(crate) fn gen_memory_profile_write_batch(&self) -> TokenStream {
        if !self.config.profiling_enabled() {
            return quote! {};
        }
        let dir = format!("{}/memory", self.profile_log_dir());
        let path_fmt = format!(
            "{}/memory_worker_t0_{{}}.log",
            Self::escape_format_braces(&dir)
        );
        gen_memory_profile_write_core(&dir, quote! { format!(#path_fmt, index) })
    }

    /// Emits code that writes the arrangement statistics gathered since the
    /// previous write to `<log_dir>/memory/memory_worker_t<time_stamp - 1>_<index>.log`,
    /// then restarts the per-interval counters.
    ///
    /// Expects `time_stamp`, `index`, `op_stats` and `dd_stats` in scope.
    /// NOTE(review): assumes `time_stamp >= 1` at the call site — confirm.
    pub(crate) fn gen_memory_profile_write_incremental(&self) -> TokenStream {
        if !self.config.profiling_enabled() {
            return quote! {};
        }
        let dir = format!("{}/memory", self.profile_log_dir());
        let path_fmt = format!(
            "{}/memory_worker_t{{}}_{{}}.log",
            Self::escape_format_braces(&dir)
        );
        let write = gen_memory_profile_write_core(
            &dir,
            quote! { format!(#path_fmt, time_stamp - 1, index) },
        );
        quote! {
            { #write }
            for st in dd_stats.borrow_mut().values_mut() {
                // Event counters restart for the next interval. The batcher
                // fields are running sums of size/capacity *diffs* (a current
                // level), so zeroing them would lose that level; keep them.
                *st = DdArrangeStats {
                    batcher_size: st.batcher_size,
                    batcher_capacity: st.batcher_capacity,
                    ..DdArrangeStats::default()
                };
            }
        }
    }
}
/// Emits `const __FLOWLOG_OPS_JSON: &str = "...";` holding the profiler's
/// JSON description, or an empty token stream when `profiler` is `None`.
pub(crate) fn render_profile_ops_const(profiler: Option<&Profiler>) -> TokenStream {
    match profiler {
        Some(p) => {
            let ops_json = p.to_json_string();
            quote! {
                const __FLOWLOG_OPS_JSON: &str = #ops_json;
            }
        }
        None => quote! {},
    }
}
/// Emits a self-contained block that dumps `op_stats` to a column-aligned
/// text file.
///
/// The file has one header line, then one line per operator sorted by
/// operator id: address, activation count, total active time in
/// milliseconds, and name.
///
/// * `dir` — directory created before writing. The generated code panics
///   with `"failed to create <dir> directory"` if creation fails.
/// * `file_path_expr` — expression, evaluated in the generated code, giving
///   the output file path.
///
/// The snippet is wrapped in its own block and takes only a short-lived
/// borrow of `op_stats`, so no `Ref` or helper local outlives it. A lingering
/// borrow would make a later `borrow_mut()` on `op_stats` panic.
///
/// Expects `op_stats`, `OpStats`, `File`, `BufWriter` and `std::io::Write`
/// in scope at the expansion site.
fn gen_time_profile_write_core(dir: &str, file_path_expr: TokenStream) -> TokenStream {
    let create_msg = format!("failed to create {dir} directory");
    quote! {
        {
            // Snapshot the stats; the temporary borrow ends with this statement.
            let mut rows: Vec<(usize, OpStats)> = op_stats
                .borrow()
                .iter()
                .map(|(id, st)| (*id, st.clone()))
                .collect();
            // Operator ids are unique map keys, so an unstable sort is deterministic.
            rows.sort_unstable_by_key(|(id, _st)| *id);
            std::fs::create_dir_all(#dir).expect(#create_msg);
            let stats_file = File::create(#file_path_expr)
                .expect("failed to create operator stats log file");
            let mut stats_writer = BufWriter::new(stats_file);
            writeln!(
                stats_writer,
                "{:<20} {:<12} {:<16} {}",
                "addr", "activations", "total_active_ms", "name"
            )
            .ok();
            for (_id, st) in rows {
                let total_ms = st.total_active.as_secs_f64() * 1000.0;
                writeln!(
                    stats_writer,
                    "{:<20} {:<12} {:<16.3} {}",
                    st.addr, st.activations, total_ms, st.name
                )
                .ok();
            }
            stats_writer.flush().ok();
        }
    }
}
/// Emits a self-contained block that writes the per-operator arrangement
/// statistics in `dd_stats` to a column-aligned text file.
///
/// Each row is labelled with the operator's address and name from
/// `op_stats`. Operators not found there fall back to `"[id=<id>]"` and
/// `"<unknown>"`.
///
/// * `dir` — directory created before writing. The generated code panics
///   with `"failed to create <dir> directory"` if creation fails.
/// * `file_path_expr` — expression, evaluated in the generated code, giving
///   the output file path.
///
/// Rows are ordered by the numeric operator address, then by operator id.
/// The id tie-break matters because every unknown operator has an empty
/// parsed address. Without it their order would depend on `HashMap`
/// iteration order.
///
/// Expects `op_stats`, `dd_stats`, `DdArrangeStats`, `File`, `BufWriter` and
/// `std::io::Write` in scope at the expansion site.
fn gen_memory_profile_write_core(dir: &str, file_path_expr: TokenStream) -> TokenStream {
    let create_msg = format!("failed to create {dir} directory");
    quote! {
        {
            let op_map = op_stats.borrow();
            let dd_map = dd_stats.borrow();
            // (parsed address, operator id, address text, name, stats)
            let mut rows: Vec<(Vec<usize>, usize, String, String, DdArrangeStats)> = dd_map
                .iter()
                .map(|(id, st)| {
                    let (addr, name) = op_map
                        .get(id)
                        .map(|o| (o.addr.clone(), o.name.clone()))
                        .unwrap_or_else(|| (
                            format!("[id={}]", id),
                            "<unknown>".to_string(),
                        ));
                    // Parse "[a, b, c]" into [a, b, c] so addresses sort numerically.
                    let nums: Vec<usize> = addr
                        .trim_matches(|c: char| c == '[' || c == ']')
                        .split(',')
                        .filter_map(|s| s.trim().parse().ok())
                        .collect();
                    (nums, *id, addr, name, st.clone())
                })
                .collect();
            // Operator ids are unique, so this order is total and deterministic.
            rows.sort_unstable_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
            std::fs::create_dir_all(#dir).expect(#create_msg);
            let dd_file = File::create(#file_path_expr)
                .expect("failed to create DD arrange stats log file");
            let mut w = BufWriter::new(dd_file);
            writeln!(
                w,
                "{:<20} {:<14} {:<10} {:<14} {:<14} {:<14} {}",
                "addr", "batched_in", "merges", "merge_in", "merge_out", "dropped", "name"
            ).ok();
            for (_nums, _id, addr, name, st) in &rows {
                writeln!(
                    w,
                    "{:<20} {:<14} {:<10} {:<14} {:<14} {:<14} {}",
                    addr,
                    st.batch_total_len,
                    st.merge_completes,
                    st.merge_input_total,
                    st.merge_output_total,
                    st.drop_total_len,
                    name
                ).ok();
            }
            if rows.is_empty() {
                writeln!(w, "(no differential arrangement events recorded)").ok();
            }
            w.flush().ok();
        }
    }
}