1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
//! Per-EDB `(handle, collection)` declarations for the dataflow scope.
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
use crate::codegen::CodeGen;
use crate::parser::DataType;
use crate::profiler::{Profiler, with_profiler};
impl CodeGen {
/// Emit the input declarations for every EDB relation of the program.
///
/// For each EDB named `<rel>` the returned token stream contains:
///
/// ```ignore
/// let (h<rel>, <rel>) = scope.new_collection::<_, Diff>();
/// let <rel> = <rel> <normalize>;
/// ```
///
/// where `<normalize>` is whatever `self.dedup_nonrecursive()` produced.
///
/// Besides generating code, this records which runtime features are needed:
/// - `mark_dd_input` whenever at least one EDB exists;
/// - `mark_string_intern` when string interning is enabled in the config and
///   some EDB's `data_type()` contains `DataType::String`;
/// - `mark_ordered_float` when some EDB's `data_type()` contains
///   `DataType::Float32` or `DataType::Float64`.
///
/// When a profiler is present, the input block is registered once, followed
/// by one input operator and one dedup operator per EDB.
///
/// Returns an empty vector (and marks no features) if the program has no EDBs.
pub(crate) fn gen_edb_decls(&mut self, profiler: &mut Option<Profiler>) -> Vec<TokenStream> {
    let normalize = self.dedup_nonrecursive();
    let edbs = self.program.edbs();
    if edbs.is_empty() {
        return Vec::new();
    }

    // Any EDB at all means the generated program needs DD input support.
    self.features.mark_dd_input();

    // String interning is only relevant when it is switched on *and* used.
    if self.config.str_intern_enabled() {
        let has_string = edbs
            .iter()
            .any(|rel| rel.data_type().contains(&DataType::String));
        if has_string {
            self.features.mark_string_intern();
        }
    }

    // Either float width triggers the ordered-float feature.
    let has_float = edbs.iter().any(|rel| {
        let types = rel.data_type();
        [DataType::Float32, DataType::Float64]
            .iter()
            .any(|ty| types.contains(ty))
    });
    if has_float {
        self.features.mark_ordered_float();
    }

    // Tell the profiler (if any) that the inputs block starts here.
    with_profiler(profiler, |p| {
        p.update_input_block();
    });

    let mut decls = Vec::with_capacity(edbs.len());
    for rel in edbs.iter() {
        let name = rel.name();
        let handle = format_ident!("h{}", name);
        let coll = format_ident!("{}", name);

        // Register the input operator and the dedup operator for this EDB.
        with_profiler(profiler, |p| {
            let coll_name = coll.to_string();
            p.input_edb_operator(name.to_string(), coll_name.clone());
            p.input_dedup_operator(name.to_string(), coll_name.clone(), coll_name);
        });

        decls.push(quote! {
            let (#handle, #coll) = scope.new_collection::<_, Diff>();
            let #coll = #coll #normalize;
        });
    }
    decls
}
/// Build the binding pattern and the matching return expression for the
/// input handles of all EDBs.
///
/// Returns `(binding, ret)` where `binding` is a pattern and `ret` is an
/// expression of the same shape. Handle identifiers are `h<name>` for every
/// name in `self.program.edb_names()`, the same naming used by
/// `gen_edb_decls`.
///
/// Non-incremental mode:
/// - 0 inputs: binding `_handles`, return `()`.
/// - 1 input: binding `hR`, return `hR`.
/// - N inputs: binding `(hA, hB, ...)`, return `(hA, hB, ...)`.
///
/// Incremental mode additionally carries a `probe` identifier as the last
/// element:
/// - 0 inputs: binding `probe`, return `probe`.
/// - 1+ inputs: binding `(hA, ..., probe)`, return `(hA, ..., probe)`.
///
/// NOTE(review): the previous documentation described `let mut ...` bindings
/// around `worker.dataflow(...)`, but no `mut` token is emitted here —
/// confirm whether the caller is expected to add mutability.
pub(crate) fn gen_handle_binding(&self) -> (TokenStream, TokenStream) {
    let edb_names = self.program.edb_names();
    let hs: Vec<_> = edb_names.iter().map(|n| format_ident!("h{}", n)).collect();

    if self.config.is_incremental() {
        let probe = format_ident!("probe");
        if hs.is_empty() {
            // Pattern and expression must have the same shape. The previous
            // code paired the 1-tuple pattern `(probe,)` with the bare
            // expression `probe`, which cannot type-check when combined.
            (quote! { #probe }, quote! { #probe })
        } else {
            // One or many handles produce the same tuple shape with the
            // probe appended, so both cases share a single template.
            let tuple = quote! { ( #(#hs),*, #probe ) };
            (tuple.clone(), tuple)
        }
    } else {
        match hs.as_slice() {
            // Nothing to bind: use a throwaway name and return unit.
            [] => (quote! { _handles }, quote! { () }),
            // A single handle is bound and returned bare, without a tuple.
            [h] => (quote! { #h }, quote! { #h }),
            _ => (quote! { ( #(#hs),* ) }, quote! { ( #(#hs),* ) }),
        }
    }
}
}