1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
//! Core logic for rule planning.
//!
//! This module implements the core rule planning algorithm, focusing on joining
//! two positive atoms and applying optimization transformations in a fixed-point loop.
//!
//! Core logic relies on optimizer to give the index of the two atoms to join.
use tracing::trace;
use super::RulePlanner;
use crate::catalog::{
ArithmeticPos, AtomArgumentSignature, AtomSignature, Catalog, JoinPredicates,
};
use crate::planner::PlanError;
use crate::planner::{KeyValueLayout, TransformationInfo};
// =========================================================================
// Core Planning
// =========================================================================
impl RulePlanner {
/// This is the main entry point for the rule planning process. It performs a join
/// between two positive atoms and then applies optimization transformations in a
/// fixed-point loop until no more optimizations can be applied.
pub(crate) fn core(
&mut self,
catalog: &mut Catalog,
join_tuple_index: (usize, usize),
) -> Result<(), PlanError> {
trace!(
"Join:\n LHS atom: ({}, {})\n RHS atom: ({}, {})",
catalog.rule().rhs()[catalog.positive_atom_rhs_id(join_tuple_index.0)],
catalog.positive_atom_rhs_id(join_tuple_index.0),
catalog.rule().rhs()[catalog.positive_atom_rhs_id(join_tuple_index.1)],
catalog.positive_atom_rhs_id(join_tuple_index.1),
);
// Premap EDB atoms to match required key/value layouts
self.apply_join_premaps(catalog, join_tuple_index)?;
// Execute the initial join between the two selected atoms
self.apply_join(catalog, join_tuple_index)?;
trace!("Catalog:\n{}", catalog);
trace!("{}", "-".repeat(60));
// Apply optimization transformations until fixed point
loop {
// 1) Apply semijoin optimizations and comparison pushdown
// These optimizations can create new opportunities for projection
if self.apply_semijoin(catalog)? {
trace!("Catalog:\n{}", catalog);
trace!("{}", "-".repeat(60));
continue;
}
// 2) Remove unused arguments to reduce data volume
// This must come after semijoins as they may eliminate argument usage
if self.remove_unused_arguments(catalog)? {
trace!("Catalog:\n{}", catalog);
trace!("{}", "-".repeat(60));
continue;
}
// Fixed point reached - no more optimizations possible
break;
}
Ok(())
}
/// Premaps EDB atoms to match required key/value layouts.
fn apply_join_premaps(
&mut self,
catalog: &mut Catalog,
join_tuple_index: (usize, usize),
) -> Result<(), PlanError> {
let (lhs_idx, rhs_idx) = join_tuple_index;
for idx in [lhs_idx, rhs_idx] {
if catalog
.original_atom_fingerprints()
.contains(&catalog.positive_atom_fingerprint(idx))
{
self.create_edb_premap_transformations(catalog, idx, true)?;
}
}
Ok(())
}
/// Applies a join transformation between two positive atoms.
fn apply_join(
&mut self,
catalog: &mut Catalog,
join_tuple_index: (usize, usize),
) -> Result<(), PlanError> {
let current_transformation_index = self.transformation_infos.len();
let (lhs_idx, rhs_idx) = join_tuple_index;
// Extract LHS atom information and register as consumer
let lhs_pos_fp = catalog.positive_atom_fingerprint(lhs_idx);
let left_atom_signature = AtomSignature::new(true, lhs_idx);
let left_atom_argument_signatures = catalog.positive_atom_argument_signature(lhs_idx);
self.insert_consumer(
catalog.original_atom_fingerprints(),
lhs_pos_fp,
current_transformation_index,
)?;
// Extract RHS atom information and register as consumer
let rhs_pos_fp = catalog.positive_atom_fingerprint(rhs_idx);
let right_atom_signatures = vec![AtomSignature::new(true, rhs_idx)];
let right_atom_argument_signatures = catalog.positive_atom_argument_signature(rhs_idx);
self.insert_consumer(
catalog.original_atom_fingerprints(),
rhs_pos_fp,
current_transformation_index,
)?;
// Partition arguments into join keys and payload values
let (lhs_keys, lhs_vals, rhs_keys, rhs_vals) = Self::partition_shared_keys(
catalog,
left_atom_argument_signatures,
right_atom_argument_signatures,
);
fn labelled<'a>(
positions: &'a [ArithmeticPos],
catalog: &'a Catalog,
) -> Vec<(&'a ArithmeticPos, &'a String)> {
positions
.iter()
.map(|pos| {
(
pos,
catalog.signature_to_argument_str(pos.init().as_var_signature().unwrap()),
)
})
.collect()
}
trace!("Join keys: {:?}", labelled(&lhs_keys, catalog));
trace!("Join LHS values: {:?}", labelled(&lhs_vals, catalog));
trace!("Join RHS values: {:?}", labelled(&rhs_vals, catalog));
// Construct output argument list: keys + LHS values + RHS values
let new_arguments_list: Vec<AtomArgumentSignature> = lhs_keys
.iter()
.chain(lhs_vals.iter())
.chain(rhs_vals.iter())
.map(|pos| *pos.init().as_var_signature().unwrap())
.collect();
// Create the join transformation with proper key-value layouts
let lhs_name = catalog.positive_atom_name(lhs_idx)?.to_string();
let rhs_name = catalog.positive_atom_name(rhs_idx)?.to_string();
let lhs_key_names = Self::attrs_from_positions(&lhs_keys, catalog);
let new_name = Self::join_name(&lhs_name, &rhs_name, &lhs_key_names);
let tx = TransformationInfo::join_to_kv(
lhs_pos_fp,
lhs_name,
rhs_pos_fp,
rhs_name,
new_name.clone(),
KeyValueLayout::new(lhs_keys.clone(), lhs_vals.clone()),
KeyValueLayout::new(rhs_keys.clone(), rhs_vals.clone()),
KeyValueLayout::new(
lhs_keys,
lhs_vals.iter().chain(rhs_vals.iter()).cloned().collect(),
),
JoinPredicates::default(),
);
let new_fp = tx.output_info_fp();
self.insert_producer(new_fp, current_transformation_index);
trace!("Join transformation:\n{}", tx);
// Store the transformation info
self.transformation_infos.push(tx);
// Update catalog with the new joined atom
catalog.join_modify(
left_atom_signature,
right_atom_signatures,
vec![new_arguments_list],
vec![new_name],
vec![new_fp],
)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::super::common::test_setup;
use super::*;
/// `Out(x, z) :- A(x, y), B(y, z).` — shared var `y` is the join key.
/// Core must emit a `JoinToKV` whose output layout has `y` as the sole
/// join key, `x` (from A) and `z` (from B) as values, and both input
/// layouts keyed on `y`. A broken `partition_shared_keys` would route
/// `x` or `z` to the key position (cross product with wrong semantics)
/// or route `y` to values (no join at all, just stapling two streams).
///
/// Signatures are captured before `core()` runs because the pass
/// calls `update_rule` under the hood, rebuilding the catalog's
/// sig→name map around the joined atom. We compare by sig identity,
/// which pins each slot to the exact source-level argument regardless
/// of any post-join name remapping.
#[test]
fn core_join_emits_join_to_kv_with_shared_key_as_join_key() {
let (mut planner, mut catalog) = test_setup(
"\
.decl A(a: int32, b: int32)\n\
.decl B(a: int32, b: int32)\n\
.decl Out(x: int32, z: int32)\n\
.input A(IO=\"file\", filename=\"A.csv\", delimiter=\",\")\n\
.input B(IO=\"file\", filename=\"B.csv\", delimiter=\",\")\n\
.output Out\n\
Out(x, z) :- A(x, y), B(y, z).\n",
);
planner.prepare(&mut catalog).expect("prepare");
// Pin each source var to its pre-core argument signature.
let a_sigs = catalog.positive_atom_argument_signature(0).clone();
let b_sigs = catalog.positive_atom_argument_signature(1).clone();
let x_in_a = a_sigs[0];
let y_in_a = a_sigs[1];
let y_in_b = b_sigs[0];
let z_in_b = b_sigs[1];
planner.core(&mut catalog, (0, 1)).expect("core");
let join = planner
.transformation_infos()
.iter()
.find(|t| matches!(t, TransformationInfo::JoinToKV { .. }))
.expect("JoinToKV transformation missing");
let sig_of = |pos: &crate::catalog::ArithmeticPos| {
*pos.init().as_var_signature().expect("var signature")
};
let out = join.output_kv_layout();
assert_eq!(out.key().len(), 1, "exactly one join key");
assert_eq!(
sig_of(&out.key()[0]),
y_in_a,
"join key must be `y` from LHS (A's arg 1)"
);
assert_eq!(out.value().len(), 2, "two payload values");
assert_eq!(
sig_of(&out.value()[0]),
x_in_a,
"first output value must be `x` from LHS"
);
assert_eq!(
sig_of(&out.value()[1]),
z_in_b,
"second output value must be `z` from RHS"
);
// Both input layouts must also be keyed on `y` — if either side
// was keyed on its own local var, the join degenerates.
let (left, right) = join.input_kv_layout();
let right = right.expect("JoinToKV has a right input layout");
assert_eq!(sig_of(&left.key()[0]), y_in_a, "LHS input keyed on `y`");
assert_eq!(sig_of(&right.key()[0]), y_in_b, "RHS input keyed on `y`");
assert_eq!(sig_of(&left.value()[0]), x_in_a, "LHS payload is `x`");
assert_eq!(sig_of(&right.value()[0]), z_in_b, "RHS payload is `z`");
// Post-state: core must leave the catalog reduced to one atom
// (the join result) and flagged planned. A planner that emitted
// the right JoinToKV but forgot to call `catalog.join_modify`
// would fail here but pass the structural checks above.
assert_eq!(
catalog.positive_atom_number(),
1,
"two atoms must collapse into one after the join"
);
assert!(
catalog.is_planned(),
"catalog should be flagged planned after a complete 2-atom join"
);
}
}