use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Write as _;
use crate::parser::{Atom, FlowLogRule, Predicate};
use crate::planner::{Transformation, TransformationInfo};
mod common; mod core; mod fuse; mod post; mod prepare; mod sip;
#[derive(Debug)]
pub(crate) struct RulePlanner {
rule: FlowLogRule,
transformation_infos: Vec<TransformationInfo>,
producer_consumer: HashMap<u64, (Vec<usize>, Vec<usize>)>,
}
impl RulePlanner {
pub(crate) fn new(rule: FlowLogRule) -> Self {
Self {
rule,
transformation_infos: Vec::new(),
producer_consumer: HashMap::new(),
}
}
#[inline]
pub(crate) fn transformation_infos(&self) -> &Vec<TransformationInfo> {
&self.transformation_infos
}
#[inline]
pub(crate) fn rule(&self) -> &FlowLogRule {
&self.rule
}
}
impl RulePlanner {
pub(crate) fn generate_rule_plan_tree_debug_map(&self) -> BTreeMap<u64, (String, Vec<u64>)> {
let mut debug_info_map: BTreeMap<u64, (String, Vec<u64>)> = BTreeMap::new();
if self.transformation_infos.is_empty() {
return debug_info_map;
}
let atom_labels = self.rhs_atom_labels();
let mut referenced_children = BTreeSet::new();
for info in &self.transformation_infos {
let tx = match info {
TransformationInfo::KVToKV { .. } => Transformation::kv_to_kv(info),
TransformationInfo::JoinToKV { .. } => Transformation::join(info),
TransformationInfo::AntiJoinToKV { .. } => Transformation::antijoin(info),
};
let (label, children) = Self::build_transformation_debug_entry(&tx);
referenced_children.extend(children.iter().copied());
debug_info_map.insert(tx.output().fingerprint(), (label, children));
}
for child_fp in referenced_children {
debug_info_map
.entry(child_fp)
.or_insert_with(|| (atom_labels[&child_fp].clone(), Vec::new()));
}
debug_info_map
}
pub(crate) fn rhs_atom_labels(&self) -> HashMap<u64, String> {
self.rhs_atom_map(Atom::to_string)
}
pub(crate) fn rhs_atom_names(&self) -> HashMap<u64, String> {
self.rhs_atom_map(|atom| atom.name().to_string())
}
fn rhs_atom_map<F>(&self, mut value_of: F) -> HashMap<u64, String>
where
F: FnMut(&Atom) -> String,
{
let mut out = HashMap::new();
for predicate in self.rule.rhs() {
if let Predicate::PositiveAtom(atom) | Predicate::NegativeAtom(atom) = predicate {
out.entry(atom.fingerprint())
.or_insert_with(|| value_of(atom));
}
}
out
}
fn build_transformation_debug_entry(tx: &Transformation) -> (String, Vec<u64>) {
let children = tx.input_fingerprints();
let mut label = tx.operation_name().to_string();
if !label.is_empty() {
label.push(' ');
}
label.push_str(&tx.flow().to_string());
(label, children)
}
pub(crate) fn transformation_infos_dump(&self) -> String {
if self.transformation_infos.is_empty() {
return " (none)".to_string();
}
let mut out = String::new();
for (i, tx) in self.transformation_infos.iter().enumerate() {
let _ = write!(out, " [{:>3}] {}", i, tx);
}
out
}
}
impl std::fmt::Display for RulePlanner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Some(last) = self.transformation_infos.last() else {
return writeln!(f, "Plan Tree: (empty)");
};
let root = last.output_info_fp();
let debug_map = self.generate_rule_plan_tree_debug_map();
writeln!(f)?;
writeln!(f, "Rule:\n{}", self.rule)?;
writeln!(f)?;
writeln!(f, "Plan Tree:")?;
let mut walker = Walker {
debug_info_map: &debug_map,
ids: HashMap::new(),
next_id: 1,
expanded: HashSet::new(),
stack: HashSet::new(),
};
let root_uid = walker.get_id(root);
writeln!(f, "#{} {}", root_uid, walker.node_title(root))?;
walker.expanded.insert(root);
walker.stack.insert(root);
let rc: Vec<_> = walker.children(root).to_vec();
for (i, cid) in rc.iter().enumerate() {
walker.fmt_node(f, *cid, "", i + 1 == rc.len())?;
}
walker.stack.remove(&root);
Ok(())
}
}
struct Walker<'a> {
debug_info_map: &'a BTreeMap<u64, (String, Vec<u64>)>,
ids: HashMap<u64, usize>,
next_id: usize,
expanded: HashSet<u64>,
stack: HashSet<u64>,
}
impl<'a> Walker<'a> {
fn node_title(&self, id: u64) -> &str {
self.debug_info_map
.get(&id)
.map_or("<unknown>", |(lbl, _)| lbl.as_str())
}
fn children(&self, id: u64) -> &[u64] {
self.debug_info_map
.get(&id)
.map_or(&[], |(_, kids)| kids.as_slice())
}
fn get_id(&mut self, node: u64) -> usize {
*self.ids.entry(node).or_insert_with(|| {
let id = self.next_id;
self.next_id += 1;
id
})
}
fn fmt_node(
&mut self,
f: &mut std::fmt::Formatter<'_>,
node: u64,
prefix: &str,
is_last: bool,
) -> std::fmt::Result {
let (branch, spacer) = if is_last {
("└── ", " ")
} else {
("├── ", "│ ")
};
let uid = self.get_id(node);
let title = self.node_title(node);
if self.stack.contains(&node) {
writeln!(f, "{}{}⟲ #{} (cycle)", prefix, branch, uid)?;
return Ok(());
}
if self.expanded.contains(&node) {
writeln!(f, "{}{}↪ #{}", prefix, branch, uid)?;
return Ok(());
}
writeln!(f, "{}{}#{} {}", prefix, branch, uid, title)?;
self.expanded.insert(node);
self.stack.insert(node);
let kids: Vec<_> = self.children(node).to_vec();
let next_prefix = format!("{}{}", prefix, spacer);
for (i, cid) in kids.iter().enumerate() {
self.fmt_node(f, *cid, &next_prefix, i + 1 == kids.len())?;
}
self.stack.remove(&node);
Ok(())
}
}