use proc_macro2::{Ident, Span, TokenStream};
use quote::quote;
use syn::Index;
use crate::codegen::CodeGen;
use crate::codegen::ty::tuple_type;
use crate::parser::{DataType, Relation};
use crate::profiler::{Profiler, with_profiler};
/// Code fragments gathered by `CodeGen::collect_inspectors`.
///
/// Each vector holds one token stream per generated snippet; the vectors are
/// kept separate so the caller can splice each kind of snippet where it belongs
/// in the generated program.
#[derive(Default)]
pub(crate) struct InspectorCodegen {
    /// `let buf_<name>: Arc<Mutex<Vec<Vec<_>>>> = ...;` declarations, one per
    /// relation marked for output.
    pub buf_declarations: Vec<TokenStream>,
    /// `let buf_<name> = buf_<name>.clone();` re-bindings of the buffers above.
    pub buf_clones: Vec<TokenStream>,
    /// `let local_<name>: Rc<RefCell<Vec<_>>> = ...;` declarations, one per
    /// relation marked for output.
    pub local_decls: Vec<TokenStream>,
    /// Inspector blocks attached to relation collections: size inspectors for
    /// `printsize` relations and content inspectors for output relations.
    pub inspect_stmts: Vec<TokenStream>,
    /// Statements that move a `local_<name>` buffer into its `buf_<name>`.
    pub flush_stmts: Vec<TokenStream>,
    /// `let size_<name>: Arc<Mutex<(Ts, i32)>> = ...;` declarations, one per
    /// `printsize` relation.
    pub size_cell_decls: Vec<TokenStream>,
    /// `let size_<name> = size_<name>.clone();` re-bindings of the cells above.
    pub size_cell_clones: Vec<TokenStream>,
}
impl CodeGen {
    /// Builds the inspector fragments for every IDB relation of the program.
    ///
    /// For each relation this:
    /// - emits a shared size cell plus a size inspector when `printsize()` is set;
    /// - flags the ordered-float feature when any column is `Float32`/`Float64`;
    /// - emits a shared output buffer, a local buffer, a content inspector and
    ///   a flush statement when `output()` is set.
    ///
    /// Feature flags on `self.features` and the optional `profiler` are updated
    /// along the way. The collected fragments are returned to the caller.
    pub(crate) fn collect_inspectors(
        &mut self,
        profiler: &mut Option<Profiler>,
    ) -> InspectorCodegen {
        with_profiler(profiler, |p| p.update_inspect_block());

        let mut fragments = InspectorCodegen::default();
        for relation in self.program.idbs() {
            let global = self.find_global_ident(relation.fingerprint());
            let rel_name = relation.name();
            let column_types = relation.data_type();

            if relation.printsize() {
                self.features.mark_as_collection();
                self.features.mark_timely_map();

                let size_cell = Ident::new(&format!("size_{}", rel_name), Span::call_site());
                fragments.size_cell_decls.push(quote! {
                    let #size_cell: std::sync::Arc<std::sync::Mutex<(Ts, i32)>> =
                        std::sync::Arc::new(std::sync::Mutex::new(<(Ts, i32)>::default()));
                });
                fragments.size_cell_clones.push(quote! {
                    let #size_cell = #size_cell.clone();
                });

                let size_inspector =
                    self.gen_size_inspector(&global, rel_name, &size_cell, profiler);
                fragments.inspect_stmts.push(size_inspector);
            }

            let has_float_column = column_types
                .iter()
                .any(|dt| matches!(dt, DataType::Float32 | DataType::Float64));
            if has_float_column {
                self.features.mark_ordered_float();
            }

            // Everything below only concerns relations whose content is written out.
            if !relation.output() {
                continue;
            }

            if column_types.contains(&DataType::String) {
                self.features.mark_string_resolve();
            }
            self.features.mark_output_buffers();

            // The profiler distinguishes terminal sinks from file sinks.
            let to_stdout = self.config.output_to_stdout();
            with_profiler(profiler, |p| {
                if to_stdout {
                    p.inspect_content_terminal_operator(rel_name.to_string(), rel_name.to_string());
                } else {
                    p.inspect_content_file_operator(rel_name.to_string(), rel_name.to_string());
                }
            });

            let (buf_decl, buf_clone, buf_ident) = self.gen_buf_declaration(rel_name, relation);
            let (local_decl, content_inspector, flush) =
                self.gen_write_inspector_mem(&global, &buf_ident, relation);

            fragments.buf_declarations.push(buf_decl);
            fragments.buf_clones.push(buf_clone);
            fragments.local_decls.push(local_decl);
            fragments.inspect_stmts.push(content_inspector);
            fragments.flush_stmts.push(flush);
        }
        fragments
    }
}
impl CodeGen {
fn gen_size_inspector(
&self,
var: &Ident,
name: &str,
cell_ident: &Ident,
profiler: &mut Option<Profiler>,
) -> TokenStream {
let maybe_probe = if self.config.is_incremental() {
quote! { .probe_with(&mut probe) }
} else {
quote! {}
};
with_profiler(profiler, |p| {
p.inspect_size_operator(name.to_string(), name.to_string());
});
let dedup = if self.config.is_datalog_batch() {
quote! {
.consolidate()
.inner
.flat_map(move |(_, t, _)| std::iter::once(((), t.clone(), 1_i32)))
}
} else {
quote! {
.threshold(|_, w| if *w > 0 { 1i32 } else { 0 })
.inner
.flat_map(move |(_, t, d)| std::iter::once(((), t.clone(), d)))
}
};
quote! {{
let #cell_ident = #cell_ident.clone();
#var.clone()
#dedup
.as_collection()
.map(|_| ())
.consolidate()
.inspect(move |(_data, time, size)| {
*#cell_ident.lock().unwrap() = (time.clone(), *size);
})
#maybe_probe;
}}
}
}
impl CodeGen {
fn gen_buf_declaration(&self, name: &str, idb: &Relation) -> (TokenStream, TokenStream, Ident) {
let buf_ident = Ident::new(&format!("buf_{}", name), Span::call_site());
let inner_ty = tuple_type(idb, self.features.string_intern());
let declaration = quote! {
let #buf_ident: Arc<Mutex<Vec<Vec<#inner_ty>>>> =
Arc::new(Mutex::new(Vec::new()));
};
let clone_stmt = quote! {
let #buf_ident = #buf_ident.clone();
};
(declaration, clone_stmt, buf_ident)
}
fn gen_write_inspector_mem(
&self,
var: &Ident,
buf_ident: &Ident,
idb: &Relation,
) -> (TokenStream, TokenStream, TokenStream) {
let (maybe_consolidate, maybe_probe) = if self.config.is_incremental() {
(
quote! { .consolidate() },
quote! { .probe_with(&mut probe) },
)
} else {
(quote! {}, quote! {})
};
let local_ident = Ident::new(&format!("local_{}", idb.name()), Span::call_site());
let (data_pat, data_expr) = if idb.arity() == 0 {
(quote! { _data }, quote! { () })
} else {
(quote! { data }, quote! { data.clone() })
};
let (diff_pat, diff_expr) = if self.config.is_batch() {
(quote! { _diff }, quote! { 1_i32 })
} else {
(quote! { diff }, quote! { *diff })
};
let inspect_pattern = quote! { (#data_pat, time, #diff_pat) };
let push_stmt = quote! {
#local_ident
.borrow_mut()
.push((#data_expr, time.clone(), #diff_expr));
};
let inner_ty = tuple_type(idb, self.features.string_intern());
let local_decl = quote! {
let #local_ident: Rc<RefCell<Vec<#inner_ty>>> =
Rc::new(RefCell::new(Vec::new()));
};
let inspect_stmt = quote! {{
let #local_ident = #local_ident.clone();
#var
#maybe_consolidate
.inspect(move |#inspect_pattern| {
#push_stmt
})
#maybe_probe;
}};
let flush_stmt = quote! {
#buf_ident.lock().unwrap().push(std::mem::take(&mut *#local_ident.borrow_mut()));
};
(local_decl, inspect_stmt, flush_stmt)
}
}
/// Generates the block that drains a relation's shared output buffer and
/// writes each row.
///
/// `sink_preamble` is spliced in before the first row is written, and
/// `write_row` is spliced in once per row with a `row` binding in scope.
///
/// The strategy depends on the relation's ordering and limit settings:
/// - no ordering: rows are written buffer by buffer, in stored order (any
///   limit is not consulted here);
/// - ordering without limit: each inner buffer is sorted, then the buffers are
///   combined through `::flowlog_runtime::sort::k_way_merge`;
/// - ordering with limit `n`: all buffers are flattened and passed through
///   `::flowlog_runtime::sort::topk` before being written.
///
/// `string_intern` is forwarded to `tuple_type` and `order_comparators`.
pub fn gen_drain_block(
    buf_ident: &Ident,
    idb: &Relation,
    sink_preamble: TokenStream,
    write_row: TokenStream,
    string_intern: bool,
) -> TokenStream {
    let elem_ty = tuple_type(idb, string_intern);
    let order_by = idb.output_order_by();
    let limit = idb.output_limit();

    // Unordered output needs no comparator: stream the buffers as they are.
    let spec = match order_by.as_ref() {
        Some(spec) => spec,
        None => {
            return quote! {{
                #sink_preamble
                for worker_buf in #buf_ident.lock().unwrap().drain(..) {
                    for row in &worker_buf {
                        #write_row
                    }
                }
            }};
        }
    };

    // Every comparator returns early on the first non-equal column; the
    // generated closures fall through to `Equal` when all columns tie.
    let comparators = order_comparators(spec, string_intern);

    match limit {
        None => quote! {{
            let mut per_worker: Vec<Vec<#elem_ty>> =
                std::mem::take(&mut *#buf_ident.lock().unwrap());
            for buf in per_worker.iter_mut() {
                buf.sort_by(|a: &#elem_ty, b: &#elem_ty| {
                    #(#comparators)*
                    std::cmp::Ordering::Equal
                });
            }
            #sink_preamble
            ::flowlog_runtime::sort::k_way_merge(
                per_worker,
                |a: &#elem_ty, b: &#elem_ty| {
                    #(#comparators)*
                    std::cmp::Ordering::Equal
                },
                |val| {
                    let row = &val;
                    #write_row
                },
            );
        }},
        Some(n) => quote! {{
            let all: Vec<#elem_ty> = #buf_ident.lock().unwrap()
                .drain(..).flatten().collect();
            let all = ::flowlog_runtime::sort::topk(all, #n, |a: &#elem_ty, b: &#elem_ty| {
                #(#comparators)*
                std::cmp::Ordering::Equal
            });
            #sink_preamble
            for row in &all {
                #write_row
            }
        }},
    }
}
/// Builds the expression that reads column `col_idx` from a stored row.
///
/// The generated expression is `<base>.0.<col_idx>`, i.e. the column is taken
/// from the first component of `base`. When the column is a `String` and
/// `string_intern` is enabled, the access is wrapped in `resolve(..)`.
pub fn field_accessor(
    col_idx: usize,
    data_type: &DataType,
    base: TokenStream,
    string_intern: bool,
) -> TokenStream {
    let column = Index::from(col_idx);
    let raw_access = quote! { #base.0.#column };

    let needs_resolve = string_intern && matches!(data_type, DataType::String);
    if !needs_resolve {
        return raw_access;
    }
    quote! { resolve(#raw_access) }
}
/// Builds one comparison snippet per ordering column.
///
/// Each entry of `spec` is `(column index, column type, ascending)`. The
/// snippet for a column compares that column of `a` and `b` (operands swapped
/// for descending order) and `return`s the result unless it is `Equal`, so the
/// snippets are meant to be placed in sequence inside a comparator closure
/// whose parameters are named `a` and `b`.
pub(crate) fn order_comparators(
    spec: &[(usize, DataType, bool)],
    string_intern: bool,
) -> Vec<TokenStream> {
    let mut snippets = Vec::with_capacity(spec.len());
    for (col_idx, data_type, ascending) in spec {
        let on_a = field_accessor(*col_idx, data_type, quote! { a }, string_intern);
        let on_b = field_accessor(*col_idx, data_type, quote! { b }, string_intern);

        // Descending order is expressed by swapping the operands.
        let (lhs, rhs) = if *ascending { (on_a, on_b) } else { (on_b, on_a) };

        snippets.push(quote! {
            let cmp = #lhs.cmp(&#rhs);
            if cmp != std::cmp::Ordering::Equal { return cmp; }
        });
    }
    snippets
}