use std::collections::{HashMap, HashSet};
use crate::common::{BoxError, Config};
use crate::optimizer::Optimizer;
use crate::parser::Program;
use crate::planner::StratumPlanner;
use crate::profiler::Profiler;
use crate::stratifier::Stratifier;
/// Plan for a whole program: one [`StratumPlanner`] per stratum reported by the
/// [`Stratifier`], stored in stratum order.
#[derive(Debug)]
pub struct ProgramPlanner {
    strata: Vec<StratumPlanner>,
}

impl ProgramPlanner {
    /// Stratifies `program` and plans every resulting stratum.
    ///
    /// A single [`Optimizer`] and the caller's optional profiler are threaded through the
    /// planning of every stratum. Once all strata are planned, duplicate non-recursive
    /// transformations are pruned across strata by `prune_cross_stratum_duplicates`.
    ///
    /// # Errors
    ///
    /// Returns an error if stratification fails, or the error of the first stratum whose
    /// planning fails (later strata are not planned).
    pub fn from_program(
        config: &Config,
        program: &Program,
        profiler: &mut Option<Profiler>,
    ) -> Result<Self, BoxError> {
        let stratifier = Stratifier::from_program(program, config.is_extended())?;
        // Shared by every stratum rather than recreated per stratum.
        let mut optimizer = Optimizer::new();

        let mut strata: Vec<StratumPlanner> = Vec::new();
        for (stratum_idx, rule_refs) in stratifier.stratum().iter().enumerate() {
            // The stratifier hands out borrowed rules; `from_rules` is given owned copies.
            let rules: Vec<_> = rule_refs.iter().copied().cloned().collect();
            let planner = StratumPlanner::from_rules(
                config,
                &rules,
                &mut optimizer,
                profiler,
                &stratifier,
                stratum_idx,
            )?;
            strata.push(planner);
        }

        prune_cross_stratum_duplicates(&mut strata);
        Ok(Self { strata })
    }

    /// The planned strata, in the order the stratifier produced them.
    pub fn strata(&self) -> &[StratumPlanner] {
        &self.strata
    }
}
/// Removes non-recursive transformations that repeat one already emitted by an earlier
/// stratum, or earlier in the same stratum, unless an IDB they read may have changed since.
///
/// Transformations are identified by their output fingerprint. While visiting strata in
/// order, the pass records two things:
/// - for every fingerprint, the set of IDB fingerprints it depends on, accumulated through
///   the inputs of the transformations seen so far;
/// - for every output fingerprint, the stratum that last kept it.
///
/// A repeat in stratum `idx` of a fingerprint last kept in stratum `prev` is retained only
/// if one of its IDB dependencies is written by some stratum `k` with `prev < k <= idx`.
/// Otherwise the closure returns `false` and the transformation is not retained.
fn prune_cross_stratum_duplicates(strata: &mut [StratumPlanner]) {
    // IDB fingerprint -> indices (ascending) of the strata that write it.
    let mut writers: HashMap<u64, Vec<usize>> = HashMap::new();
    for (stratum_idx, stratum) in strata.iter().enumerate() {
        for idb in stratum.idb_to_heads_map().keys() {
            writers.entry(*idb).or_default().push(stratum_idx);
        }
    }

    // Fingerprint -> IDB fingerprints it depends on. Each IDB starts out depending only on
    // itself; transformation outputs gain entries as they are visited below.
    let mut depends_on: HashMap<u64, HashSet<u64>> = HashMap::with_capacity(writers.len());
    for &idb in writers.keys() {
        depends_on.insert(idb, HashSet::from([idb]));
    }

    // Output fingerprint -> index of the stratum that most recently kept it.
    let mut last_emitted: HashMap<u64, usize> = HashMap::new();

    for (idx, stratum) in strata.iter_mut().enumerate() {
        stratum.retain_non_recursive_transformations(|t| {
            let out_fp = t.output().fingerprint();

            // Union of the IDB dependencies of every input this transformation reads.
            // Inputs with no recorded dependencies contribute nothing.
            let mut deps: HashSet<u64> = HashSet::new();
            for input_fp in t.input_fingerprints() {
                if let Some(input_deps) = depends_on.get(&input_fp) {
                    deps.extend(input_deps.iter().copied());
                }
            }
            depends_on.entry(out_fp).or_default().extend(&deps);

            // First sighting is always kept. A repeat is kept only when some IDB it depends
            // on is written after the stratum that last kept it, up to and including this one.
            let keep = if let Some(&prev) = last_emitted.get(&out_fp) {
                deps.iter().any(|idb| {
                    writers[idb]
                        .iter()
                        .any(|&writer| prev < writer && writer <= idx)
                })
            } else {
                true
            };

            if keep {
                last_emitted.insert(out_fp, idx);
            }
            keep
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use crate::common::SourceMap;

    /// Parses, type-checks and plans `src` using a default [`Config`].
    ///
    /// `Program::parse` is given a path, so the source is first written to a temporary
    /// file that stays alive until planning finishes.
    fn analyze(src: &str) -> ProgramPlanner {
        let config = Config::default();
        let mut tmp = tempfile::NamedTempFile::new().expect("tempfile");
        tmp.write_all(src.as_bytes()).expect("write");
        let mut sm = SourceMap::new();
        let mut program =
            Program::parse(&tmp.path().to_string_lossy(), false, &mut sm).expect("parse");
        crate::typechecker::check_program(&mut program, &config).expect("typecheck");
        ProgramPlanner::from_program(&config, &program, &mut None).expect("plan")
    }

    const DYCK_SRC: &str = "\
        .decl Arc(x: int32, y: int32, l: int32)\n\
        .input Arc(IO=\"file\", filename=\"Arc.csv\", delimiter=\",\")\n\
        .decl Zero(x: int32, y: int32)\n\
        .printsize Zero\n\
        .decl One(x: int32, y: int32)\n\
        .printsize One\n\
        .decl Dyck(x: int32, y: int32)\n\
        .printsize Dyck\n\
        Zero(x, y) :- Arc(x, y, 0).\n\
        One(x, y) :- Arc(x, y, 1).\n\
        Dyck(x, y) :- Zero(x, z), Zero(z, y).\n\
        Dyck(x, y) :- One(x, z), One(z, y).\n\
        Dyck(x, y) :- Zero(x, z), Dyck(z, w), Zero(w, y).\n\
        Dyck(x, y) :- One(x, z), Dyck(z, w), One(w, y).\n\
        Dyck(x, y) :- Dyck(x, z), Dyck(z, y).\n";

    /// After pruning, every non-recursive transformation fingerprint survives exactly once
    /// across all strata of the Dyck program.
    #[test]
    fn dyck_prune_collapses_cross_stratum_duplicates() {
        let pp = analyze(DYCK_SRC);
        assert_eq!(pp.strata().len(), 3, "dyck should stratify into 3 strata");
        let mut owner: HashMap<u64, usize> = HashMap::new();
        for (idx, stratum) in pp.strata().iter().enumerate() {
            for t in stratum.non_recursive_transformations() {
                let fp = t.output().fingerprint();
                if let Some(prev) = owner.insert(fp, idx) {
                    // Distinguish an in-stratum duplicate from a cross-stratum one so the
                    // failure message does not read "both stratum N and stratum N".
                    if prev == idx {
                        panic!("fp 0x{fp:016x} survives twice in stratum {idx}");
                    }
                    panic!("fp 0x{fp:016x} survives in both stratum {prev} and stratum {idx}");
                }
            }
        }
        assert_eq!(
            owner.len(),
            8,
            "expected 8 non-recursive transformations after prune"
        );
    }

    const RHS_ID_SHARING_SRC: &str = "\
        .decl A(x: int32, y: int32)\n\
        .decl B(x: int32, y: int32)\n\
        .decl C(x: int32, y: int32)\n\
        .decl Out1(x: int32, y: int32)\n\
        .decl Out2(x: int32, y: int32)\n\
        .input A(IO=\"file\", filename=\"A.csv\", delimiter=\",\")\n\
        .input B(IO=\"file\", filename=\"B.csv\", delimiter=\",\")\n\
        .input C(IO=\"file\", filename=\"C.csv\", delimiter=\",\")\n\
        .output Out1\n\
        .output Out2\n\
        Out1(x, y) :- A(x, z), B(z, y).\n\
        Out2(x, y) :- B(z, y), C(x, z).\n";

    /// Two rules that key `B` the same way must share a single unary transformation over
    /// `B`, and both binary joins must read its output.
    #[test]
    fn rhs_id_does_not_split_identical_arrangements() {
        let pp = analyze(RHS_ID_SHARING_SRC);
        let b_fp = crate::common::compute_fp("b");
        let b_arrangements: Vec<_> = pp
            .strata()
            .iter()
            .flat_map(|s| s.non_recursive_transformations())
            .filter(|t| t.is_unary() && t.unary_input().fingerprint() == b_fp)
            .collect();
        assert_eq!(
            b_arrangements.len(),
            1,
            "both rules key B on its first column; the arrangement must be shared"
        );
        let shared_fp = b_arrangements[0].output().fingerprint();
        let consumers = pp
            .strata()
            .iter()
            .flat_map(|s| s.non_recursive_transformations())
            .filter(|t| !t.is_unary())
            .filter(|t| {
                let (left, right) = t.binary_input();
                left.fingerprint() == shared_fp || right.fingerprint() == shared_fp
            })
            .count();
        assert_eq!(
            consumers, 2,
            "both joins must consume the shared B arrangement"
        );
    }

    /// Rules that differ only in which join column lands in which head position must keep
    /// two distinct head fingerprints.
    #[test]
    fn swapped_output_columns_stay_distinct() {
        let pp = analyze(
            "\
            .decl R(k: int32, v: int32)\n\
            .decl S(k: int32, v: int32)\n\
            .decl P(a: int32, b: int32)\n\
            .decl Q(a: int32, b: int32)\n\
            .input R(IO=\"file\", filename=\"R.csv\", delimiter=\",\")\n\
            .input S(IO=\"file\", filename=\"S.csv\", delimiter=\",\")\n\
            .output P\n\
            .output Q\n\
            P(a, b) :- R(k, a), S(k, b).\n\
            Q(a, b) :- R(k, b), S(k, a).\n",
        );
        let heads: Vec<u64> = pp
            .strata()
            .iter()
            .flat_map(|s| s.idb_to_heads_map().values().flatten().copied())
            .collect();
        assert_eq!(heads.len(), 2, "P and Q must each keep their own head");
        assert_ne!(
            heads[0], heads[1],
            "swapped output columns must not collapse into one head"
        );
    }

    /// Within one program, any two transformations sharing an output fingerprint must have
    /// the same operation name, input fingerprints and flow.
    #[test]
    fn equal_fingerprint_implies_equal_content() {
        use crate::planner::TransformationFlow;
        use std::collections::hash_map::Entry;
        for src in [DYCK_SRC, RHS_ID_SHARING_SRC] {
            let pp = analyze(src);
            let mut seen: HashMap<u64, (&str, Vec<u64>, TransformationFlow)> = HashMap::new();
            for stratum in pp.strata() {
                for planner in stratum.rule_planners() {
                    for tx in planner.transformations() {
                        let fp = tx.output().fingerprint();
                        let content = (
                            tx.operation_name(),
                            tx.input_fingerprints(),
                            tx.flow().clone(),
                        );
                        // Single hash lookup: record first sighting, compare on repeats.
                        match seen.entry(fp) {
                            Entry::Vacant(slot) => {
                                slot.insert(content);
                            }
                            Entry::Occupied(slot) => assert_eq!(
                                *slot.get(),
                                content,
                                "fingerprint 0x{fp:016x} maps to two different contents"
                            ),
                        }
                    }
                }
            }
        }
    }
}