1use super::ir::{HypergraphRule, VertexId};
20use xlog_core::RelId;
21use xlog_ir::rir::HelperSplitSpec;
22use xlog_stats::StatsSnapshot;
23
/// Fallback value returned by `buried_skew_threshold` when the
/// `XLOG_BURIED_SKEW_THRESHOLD` environment variable is unset, unparsable,
/// non-finite, or not strictly positive.
///
/// The threshold is compared against the ratio between the hottest non-leader
/// variable's heat and the leader variable's heat.
const DEFAULT_BURIED_SKEW_THRESHOLD: f64 = 3.0;
25
/// A strategy that produces an ordering of the vertices of a hypergraph rule.
pub trait VariableOrder {
    /// Returns a static, human-readable identifier for this strategy.
    fn name(&self) -> &'static str;

    /// Returns the vertices of `hg` in the order chosen by this strategy.
    fn order(&self, hg: &HypergraphRule) -> Vec<VertexId>;
}
44
45#[derive(Debug, Clone, Copy, Default)]
53pub struct AppearanceOrder;
54
55impl VariableOrder for AppearanceOrder {
56 fn name(&self) -> &'static str {
57 "appearance"
58 }
59
60 fn order(&self, hg: &HypergraphRule) -> Vec<VertexId> {
61 hg.vertex_ids().collect()
62 }
63}
64
/// Read-only access to the statistics the planner needs.
///
/// Every accessor returns `None` when the requested statistic is not
/// available; the planner treats a missing statistic as "cannot plan".
pub trait StatsSource {
    /// Row count of relation `rel_id`.
    fn relation_cardinality(&self, rel_id: RelId) -> Option<u64>;

    /// Number-of-distinct-values estimate for column `col_idx` of `rel_id`.
    fn column_ndv(&self, rel_id: RelId, col_idx: usize) -> Option<u64>;

    /// Selectivity of joining `left_rel.left_col` with `right_rel.right_col`.
    fn join_selectivity(
        &self,
        left_rel: RelId,
        right_rel: RelId,
        left_col: usize,
        right_col: usize,
    ) -> Option<f64>;

    /// Prefix-degree statistics for column `col_idx` of `rel_id`, as
    /// `(average degree, maximum degree)`.
    fn prefix_degree(&self, rel_id: RelId, col_idx: usize) -> Option<(f64, f64)>;

    /// Key-heat statistics for column `col_idx` of `rel_id`, as
    /// `(heat, skew factor)`.
    fn key_heat(&self, rel_id: RelId, col_idx: usize) -> Option<(f64, f64)>;
}
93
94impl StatsSource for StatsSnapshot {
95 fn relation_cardinality(&self, rel_id: RelId) -> Option<u64> {
96 let card = self
97 .relations
98 .iter()
99 .find(|rel| rel.rel_id == rel_id)?
100 .cardinality;
101 (card > 0).then_some(card)
102 }
103
104 fn column_ndv(&self, rel_id: RelId, col_idx: usize) -> Option<u64> {
105 let ndv = self
106 .relations
107 .iter()
108 .find(|rel| rel.rel_id == rel_id)?
109 .get_column(col_idx)?
110 .distinct_estimate;
111 (ndv > 0).then_some(ndv)
112 }
113
114 fn join_selectivity(
115 &self,
116 left_rel: RelId,
117 right_rel: RelId,
118 left_col: usize,
119 right_col: usize,
120 ) -> Option<f64> {
121 self.join_selectivities.iter().find_map(|sel| {
122 let direct = sel.left_rel == left_rel
123 && sel.right_rel == right_rel
124 && sel.left_keys.as_slice() == [left_col]
125 && sel.right_keys.as_slice() == [right_col];
126 let swapped = sel.left_rel == right_rel
127 && sel.right_rel == left_rel
128 && sel.left_keys.as_slice() == [right_col]
129 && sel.right_keys.as_slice() == [left_col];
130 (direct || swapped).then_some(sel.selectivity)
131 })
132 }
133
134 fn prefix_degree(&self, rel_id: RelId, col_idx: usize) -> Option<(f64, f64)> {
135 let degree = self
136 .relations
137 .iter()
138 .find(|rel| rel.rel_id == rel_id)?
139 .get_prefix_degree(col_idx)?;
140 Some((degree.avg_degree, degree.max_degree))
141 }
142
143 fn key_heat(&self, rel_id: RelId, col_idx: usize) -> Option<(f64, f64)> {
144 let heat = self
145 .relations
146 .iter()
147 .find(|rel| rel.rel_id == rel_id)?
148 .get_key_heat(col_idx)?;
149 Some((heat.heat, heat.skew_factor))
150 }
151}
152
153#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct KCliqueEdge {
156 pub rel_id: RelId,
158 pub left: VertexId,
160 pub right: VertexId,
162 pub left_col: usize,
164 pub right_col: usize,
166}
167
168impl KCliqueEdge {
169 pub fn touches(&self, other: &KCliqueEdge) -> bool {
171 self.left == other.left
172 || self.left == other.right
173 || self.right == other.left
174 || self.right == other.right
175 }
176
177 fn endpoint_col(&self, vertex: VertexId) -> Option<usize> {
178 if self.left == vertex {
179 Some(self.left_col)
180 } else if self.right == vertex {
181 Some(self.right_col)
182 } else {
183 None
184 }
185 }
186
187 fn other_endpoint(&self, vertex: VertexId) -> Option<VertexId> {
188 if self.left == vertex {
189 Some(self.right)
190 } else if self.right == vertex {
191 Some(self.left)
192 } else {
193 None
194 }
195 }
196}
197
198#[derive(Debug, Clone, PartialEq, Eq)]
200pub struct KCliqueShape {
201 variable_count: u8,
202 edges: Vec<KCliqueEdge>,
203}
204
205impl KCliqueShape {
206 pub fn complete(variable_count: u8, first_rel_id: RelId) -> Option<Self> {
208 valid_variable_count(variable_count)?;
209 let mut edges = Vec::new();
210 let mut next_rel = first_rel_id.0;
211
212 for left in 0..variable_count {
213 for right in (left + 1)..variable_count {
214 edges.push(KCliqueEdge {
215 rel_id: RelId(next_rel),
216 left: VertexId(usize::from(left)),
217 right: VertexId(usize::from(right)),
218 left_col: 0,
219 right_col: 1,
220 });
221 next_rel = next_rel.checked_add(1)?;
222 }
223 }
224
225 Some(Self {
226 variable_count,
227 edges,
228 })
229 }
230
231 pub fn from_edges(variable_count: u8, edges: Vec<KCliqueEdge>) -> Option<Self> {
237 valid_variable_count(variable_count)?;
238 (!edges.is_empty()).then_some(())?;
239 Some(Self {
240 variable_count,
241 edges,
242 })
243 }
244
245 pub fn cycle4(first_rel_id: RelId) -> Option<Self> {
247 let variable_count = 4;
248 valid_variable_count(variable_count)?;
249 let endpoints = [(0, 1), (1, 2), (2, 3), (3, 0)];
250 let mut edges = Vec::new();
251
252 for (idx, (left, right)) in endpoints.iter().enumerate() {
253 edges.push(KCliqueEdge {
254 rel_id: RelId(first_rel_id.0.checked_add(idx as u32)?),
255 left: VertexId(*left),
256 right: VertexId(*right),
257 left_col: 0,
258 right_col: 1,
259 });
260 }
261
262 Some(Self {
263 variable_count,
264 edges,
265 })
266 }
267
268 pub fn variable_count(&self) -> u8 {
270 self.variable_count
271 }
272
273 pub fn edges(&self) -> &[KCliqueEdge] {
275 &self.edges
276 }
277
278 fn variables(&self) -> impl Iterator<Item = VertexId> + '_ {
279 (0..self.variable_count).map(|idx| VertexId(usize::from(idx)))
280 }
281}
282
/// Complete planning result produced by `plan_kclique_var_order`.
#[derive(Debug, Clone, PartialEq)]
pub struct FullVariableOrder {
    /// Variables in the order chosen by the greedy planner.
    pub variable_order: Vec<VertexId>,
    /// Indices into the shape's edge list, sorted by the positions of each
    /// edge's endpoints in `variable_order` (later endpoint first, then the
    /// earlier endpoint, then relation id).
    pub edge_permutation: Vec<usize>,
    /// One share per variable, sorted by variable.
    pub variable_share_allocation: Vec<VariableShare>,
    /// The two cost estimates that drove `predicted_winner`.
    pub cost_prediction: CostPredictionRecord,
    /// Which execution path the cost estimates favour.
    pub predicted_winner: PredictedWinner,
    /// Helper split specs emitted when buried skew is detected; empty
    /// otherwise.
    pub helper_split_specs: Vec<HelperSplitSpec>,
}
299
/// Share assigned to a single variable by `share_allocation`.
#[derive(Debug, Clone, PartialEq)]
pub struct VariableShare {
    /// The variable this share belongs to.
    pub variable: VertexId,
    /// The variable's inverse root score divided by the sum of all inverse
    /// root scores.
    pub share: f64,
}
308
/// Cost estimates for the two candidate execution paths, as computed by
/// `cost_prediction`.
#[derive(Debug, Clone, PartialEq)]
pub struct CostPredictionRecord {
    /// Sum of all per-variable root scores.
    pub wcoj_cost: f64,
    /// Sum of the edge relations' cardinalities, scaled by
    /// `1 + average selectivity of touching edge pairs`.
    pub hash_cost: f64,
}
317
/// Execution path predicted to be cheaper by `plan_kclique_var_order`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PredictedWinner {
    /// Chosen when `wcoj_cost <= hash_cost`.
    WcojPath,
    /// Chosen when `wcoj_cost <= hash_cost` does not hold.
    HashPath,
}
326
327pub fn plan_kclique_var_order<S: StatsSource>(
335 shape: &KCliqueShape,
336 stats: &S,
337) -> Option<FullVariableOrder> {
338 if shape.edges.is_empty() {
339 return None;
340 }
341 valid_variable_count(shape.variable_count)?;
342 require_complete_stats(shape, stats)?;
343
344 let root_scores = root_scores(shape, stats)?;
345 let variable_order = greedy_variable_order(shape, &root_scores);
346 let edge_permutation = edge_permutation(shape, &variable_order);
347 let variable_share_allocation = share_allocation(&root_scores);
348 let cost_prediction = cost_prediction(shape, stats, &root_scores)?;
349 let predicted_winner = if cost_prediction.wcoj_cost <= cost_prediction.hash_cost {
350 PredictedWinner::WcojPath
351 } else {
352 PredictedWinner::HashPath
353 };
354 let helper_split_specs = helper_split_specs_for_buried_skew(shape, stats, &variable_order)?;
355
356 Some(FullVariableOrder {
357 variable_order,
358 edge_permutation,
359 variable_share_allocation,
360 cost_prediction,
361 predicted_winner,
362 helper_split_specs,
363 })
364}
365
/// Accepts variable counts from 2 through 8 inclusive; anything else yields
/// `None` so callers can bail out with `?`.
fn valid_variable_count(variable_count: u8) -> Option<()> {
    match variable_count {
        2..=8 => Some(()),
        _ => None,
    }
}
369
370fn require_complete_stats<S: StatsSource>(shape: &KCliqueShape, stats: &S) -> Option<()> {
371 for edge in shape.edges() {
372 stats.relation_cardinality(edge.rel_id)?;
373 checked_ndv(stats, edge.rel_id, edge.left_col)?;
374 checked_ndv(stats, edge.rel_id, edge.right_col)?;
375 checked_prefix(stats, edge.rel_id, edge.left_col)?;
376 checked_prefix(stats, edge.rel_id, edge.right_col)?;
377 checked_heat(stats, edge.rel_id, edge.left_col)?;
378 checked_heat(stats, edge.rel_id, edge.right_col)?;
379 }
380
381 for (left_idx, left_edge) in shape.edges().iter().enumerate() {
382 for right_edge in shape.edges().iter().skip(left_idx + 1) {
383 if left_edge.touches(right_edge) {
384 checked_selectivity(
385 stats,
386 left_edge.rel_id,
387 right_edge.rel_id,
388 left_edge.left_col,
389 right_edge.left_col,
390 )?;
391 }
392 }
393 }
394
395 Some(())
396}
397
398fn checked_ndv<S: StatsSource>(stats: &S, rel_id: RelId, col_idx: usize) -> Option<u64> {
399 let ndv = stats.column_ndv(rel_id, col_idx)?;
400 (ndv > 0).then_some(ndv)
401}
402
403fn checked_prefix<S: StatsSource>(stats: &S, rel_id: RelId, col_idx: usize) -> Option<(f64, f64)> {
404 let (avg, max) = stats.prefix_degree(rel_id, col_idx)?;
405 (avg.is_finite() && max.is_finite() && avg > 0.0 && max >= avg).then_some((avg, max))
406}
407
408fn checked_heat<S: StatsSource>(stats: &S, rel_id: RelId, col_idx: usize) -> Option<(f64, f64)> {
409 let (heat, skew) = stats.key_heat(rel_id, col_idx)?;
410 (heat.is_finite() && skew.is_finite() && heat >= 0.0 && skew >= 0.0).then_some((heat, skew))
411}
412
413fn checked_selectivity<S: StatsSource>(
414 stats: &S,
415 left_rel: RelId,
416 right_rel: RelId,
417 left_col: usize,
418 right_col: usize,
419) -> Option<f64> {
420 let selectivity = stats.join_selectivity(left_rel, right_rel, left_col, right_col)?;
421 (selectivity.is_finite() && (0.0..=1.0).contains(&selectivity)).then_some(selectivity)
422}
423
424fn root_scores<S: StatsSource>(shape: &KCliqueShape, stats: &S) -> Option<Vec<(VertexId, f64)>> {
425 let mut scores = Vec::new();
426
427 for variable in shape.variables() {
428 let mut score = 0.0;
429 for edge in shape.edges() {
430 let Some(col_idx) = edge.endpoint_col(variable) else {
431 continue;
432 };
433 let card = stats.relation_cardinality(edge.rel_id)? as f64;
434 let ndv = checked_ndv(stats, edge.rel_id, col_idx)? as f64;
435 let (avg_degree, max_degree) = checked_prefix(stats, edge.rel_id, col_idx)?;
436 let (heat, skew) = checked_heat(stats, edge.rel_id, col_idx)?;
437 let prefix_skew = (max_degree / avg_degree).max(1.0);
438 let density = card / ndv;
439 score += density
440 * avg_degree.powi(2)
441 * (1.0 + heat).powi(2)
442 * (1.0 + 0.25 * skew)
443 * prefix_skew.sqrt();
444 }
445 scores.push((variable, score.max(f64::EPSILON)));
446 }
447
448 scores.sort_by_key(|entry| entry.0);
449 Some(scores)
450}
451
452fn greedy_variable_order(shape: &KCliqueShape, root_scores: &[(VertexId, f64)]) -> Vec<VertexId> {
453 let mut remaining: Vec<VertexId> = root_scores.iter().map(|(var, _)| *var).collect();
454 let mut order = Vec::with_capacity(remaining.len());
455
456 while !remaining.is_empty() {
457 let best_pos = remaining
458 .iter()
459 .enumerate()
460 .min_by(|(_, left), (_, right)| {
461 let left_score = marginal_score(shape, root_scores, **left, &order);
462 let right_score = marginal_score(shape, root_scores, **right, &order);
463 left_score
464 .total_cmp(&right_score)
465 .then_with(|| left.cmp(right))
466 })
467 .map(|(idx, _)| idx)
468 .unwrap_or(0);
469 order.push(remaining.remove(best_pos));
470 }
471
472 order
473}
474
475fn marginal_score(
476 shape: &KCliqueShape,
477 root_scores: &[(VertexId, f64)],
478 variable: VertexId,
479 bound: &[VertexId],
480) -> f64 {
481 let root = root_scores
482 .iter()
483 .find(|(var, _)| *var == variable)
484 .map(|(_, score)| *score)
485 .unwrap_or(f64::MAX);
486 let bound_edges = shape
487 .edges()
488 .iter()
489 .filter(|edge| {
490 edge.endpoint_col(variable).is_some()
491 && edge
492 .other_endpoint(variable)
493 .is_some_and(|other| bound.contains(&other))
494 })
495 .count() as f64;
496
497 root / (1.0 + bound_edges).powi(2)
498}
499
500fn edge_permutation(shape: &KCliqueShape, variable_order: &[VertexId]) -> Vec<usize> {
501 let mut positions = vec![0usize; usize::from(shape.variable_count())];
502 for (pos, variable) in variable_order.iter().enumerate() {
503 positions[variable.0] = pos;
504 }
505
506 let mut indexed: Vec<(usize, usize, usize, RelId)> = shape
507 .edges()
508 .iter()
509 .enumerate()
510 .map(|(idx, edge)| {
511 let left = positions[edge.left.0];
512 let right = positions[edge.right.0];
513 (idx, left.max(right), left.min(right), edge.rel_id)
514 })
515 .collect();
516 indexed.sort_by_key(|(_, max_pos, min_pos, rel_id)| (*max_pos, *min_pos, *rel_id));
517 indexed.into_iter().map(|(idx, _, _, _)| idx).collect()
518}
519
520fn helper_split_specs_for_buried_skew<S: StatsSource>(
521 shape: &KCliqueShape,
522 stats: &S,
523 variable_order: &[VertexId],
524) -> Option<Vec<HelperSplitSpec>> {
525 let leader = *variable_order.first()?;
526 let variable_heat = per_variable_heat(shape, stats)?;
527 let leader_heat = variable_heat
528 .iter()
529 .find(|(variable, _)| *variable == leader)
530 .map(|(_, heat)| *heat)?
531 .max(f64::EPSILON);
532 let (hot_variable, hot_heat) = variable_heat
533 .iter()
534 .copied()
535 .filter(|(variable, _)| *variable != leader)
536 .max_by(|left, right| {
537 left.1
538 .total_cmp(&right.1)
539 .then_with(|| right.0.cmp(&left.0))
540 })?;
541 let threshold = buried_skew_threshold();
542 if hot_heat / leader_heat < threshold {
543 return Some(Vec::new());
544 }
545
546 let helper_vertices: Vec<VertexId> = variable_order
547 .iter()
548 .copied()
549 .filter(|variable| *variable != hot_variable)
550 .take(2)
551 .collect();
552 if helper_vertices.len() != 2 {
553 return Some(Vec::new());
554 }
555 let edge_hot_left = clique_edge_idx_for_vars(shape, hot_variable, helper_vertices[0])?;
556 let edge_hot_right = clique_edge_idx_for_vars(shape, hot_variable, helper_vertices[1])?;
557 let leader_edge = clique_edge_idx_for_vars(shape, helper_vertices[0], helper_vertices[1])?;
558
559 Some(vec![HelperSplitSpec {
560 helper_id: 0,
561 variable: u8::try_from(hot_variable.0).ok()?,
562 edge_slots: vec![
563 u8::try_from(edge_hot_left).ok()?,
564 u8::try_from(edge_hot_right).ok()?,
565 u8::try_from(leader_edge).ok()?,
566 ],
567 }])
568}
569
570fn buried_skew_threshold() -> f64 {
571 std::env::var("XLOG_BURIED_SKEW_THRESHOLD")
572 .ok()
573 .and_then(|raw| raw.parse::<f64>().ok())
574 .filter(|value| value.is_finite() && *value > 0.0)
575 .unwrap_or(DEFAULT_BURIED_SKEW_THRESHOLD)
576}
577
578fn per_variable_heat<S: StatsSource>(
579 shape: &KCliqueShape,
580 stats: &S,
581) -> Option<Vec<(VertexId, f64)>> {
582 let mut heats = Vec::new();
583 for variable in shape.variables() {
584 let mut heat = 0.0f64;
585 for edge in shape.edges() {
586 let Some(col_idx) = edge.endpoint_col(variable) else {
587 continue;
588 };
589 let (key_heat, skew_factor) = checked_heat(stats, edge.rel_id, col_idx)?;
590 heat = heat.max(key_heat.max(skew_factor));
591 }
592 heats.push((variable, heat.max(f64::EPSILON)));
593 }
594 Some(heats)
595}
596
597fn clique_edge_idx_for_vars(
598 shape: &KCliqueShape,
599 left: VertexId,
600 right: VertexId,
601) -> Option<usize> {
602 let (left, right) = if left <= right {
603 (left, right)
604 } else {
605 (right, left)
606 };
607 shape
608 .edges()
609 .iter()
610 .position(|edge| edge.left == left && edge.right == right)
611}
612
613fn share_allocation(root_scores: &[(VertexId, f64)]) -> Vec<VariableShare> {
614 let inverse_sum: f64 = root_scores.iter().map(|(_, score)| 1.0 / *score).sum();
615 let mut shares: Vec<VariableShare> = root_scores
616 .iter()
617 .map(|(variable, score)| VariableShare {
618 variable: *variable,
619 share: (1.0 / *score) / inverse_sum,
620 })
621 .collect();
622 shares.sort_by_key(|share| share.variable);
623 shares
624}
625
626fn cost_prediction<S: StatsSource>(
627 shape: &KCliqueShape,
628 stats: &S,
629 root_scores: &[(VertexId, f64)],
630) -> Option<CostPredictionRecord> {
631 let wcoj_cost = root_scores.iter().map(|(_, score)| *score).sum::<f64>();
632 let mut hash_cost = 0.0;
633
634 for edge in shape.edges() {
635 hash_cost += stats.relation_cardinality(edge.rel_id)? as f64;
636 }
637
638 let avg_selectivity = average_touching_selectivity(shape, stats)?;
639 let hash_cost = hash_cost * (1.0 + avg_selectivity);
640
641 Some(CostPredictionRecord {
642 wcoj_cost,
643 hash_cost,
644 })
645}
646
647fn average_touching_selectivity<S: StatsSource>(shape: &KCliqueShape, stats: &S) -> Option<f64> {
648 let mut total = 0.0;
649 let mut count = 0usize;
650
651 for (left_idx, left_edge) in shape.edges().iter().enumerate() {
652 for right_edge in shape.edges().iter().skip(left_idx + 1) {
653 if left_edge.touches(right_edge) {
654 total += checked_selectivity(
655 stats,
656 left_edge.rel_id,
657 right_edge.rel_id,
658 left_edge.left_col,
659 right_edge.left_col,
660 )?;
661 count += 1;
662 }
663 }
664 }
665
666 if count == 0 {
667 Some(1.0)
668 } else {
669 Some(total / count as f64)
670 }
671}