use std::collections::HashSet;
use std::hash::Hash;
/// Tier of the execution hierarchy that a plan node or query fragment is
/// assigned to.
///
/// `PartialOrd`/`Ord` are derived, so the ordering follows declaration order:
/// `Leaf < Zone < Root`. Reordering the variants changes every comparison
/// made on this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ExecutionLevel {
    /// Lowest tier in the derived ordering.
    Leaf,
    /// Middle tier; sorts above `Leaf` and below `Root`.
    Zone,
    /// Highest tier in the derived ordering.
    Root,
}
/// A single operator in a logical query plan.
///
/// Each variant carries only the names it refers to (metric or field names);
/// predicates, join kinds, and similar details are not modeled here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanNode {
    /// Reads a metric.
    Scan {
        /// Identifier of the metric to read (e.g. `/rpc/server/latency`).
        metric: String,
    },
    /// Filters on a single field.
    Filter {
        /// Name of the field the filter refers to.
        field: String,
    },
    /// Joins on a list of fields.
    Join {
        /// Names of the fields the join is keyed on.
        on: Vec<String>,
    },
    /// Groups by a list of fields.
    GroupBy {
        /// Names of the grouping fields.
        fields: Vec<String>,
    },
    /// Projects a list of fields.
    Project {
        /// Names of the fields to keep.
        fields: Vec<String>,
    },
}
impl PlanNode {
    /// Returns the lowest [`ExecutionLevel`] at which this node may run.
    ///
    /// A `GroupBy` whose field list contains the exact, case-sensitive name
    /// `"zone"` yields [`ExecutionLevel::Zone`]. Every other node, including a
    /// `GroupBy` without that field, yields [`ExecutionLevel::Leaf`].
    pub fn lowest_valid_level(&self) -> ExecutionLevel {
        // The match stays exhaustive so that adding a variant forces a
        // decision about its placement.
        let groups_by_zone = match self {
            Self::GroupBy { fields } => fields.iter().any(|name| name.as_str() == "zone"),
            Self::Scan { .. }
            | Self::Filter { .. }
            | Self::Join { .. }
            | Self::Project { .. } => false,
        };

        if groups_by_zone {
            ExecutionLevel::Zone
        } else {
            ExecutionLevel::Leaf
        }
    }
}
/// An ordered list of [`PlanNode`]s.
///
/// The `Default` value is a plan with no nodes.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LogicalPlan {
    /// Nodes in the order they were handed to [`LogicalPlan::new`].
    nodes: Vec<PlanNode>,
}

impl LogicalPlan {
    /// Builds a plan that owns `nodes`, keeping their order.
    pub fn new(nodes: Vec<PlanNode>) -> Self {
        LogicalPlan { nodes }
    }

    /// Borrows the plan's nodes as a slice, in stored order.
    pub fn nodes(&self) -> &[PlanNode] {
        self.nodes.as_slice()
    }
}
/// A group of plan nodes assigned to a single execution level.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryFragment {
    /// Level this fragment is assigned to.
    pub level: ExecutionLevel,
    /// Nodes belonging to the fragment, in plan order.
    pub nodes: Vec<PlanNode>,
}
/// Stateless planner that splits a [`LogicalPlan`] into [`QueryFragment`]s,
/// one per run of nodes that execute at the same [`ExecutionLevel`].
#[derive(Debug, Clone, Default)]
pub struct PushdownPlanner;

impl PushdownPlanner {
    /// Creates a planner. The type carries no state, so this is equivalent to
    /// `PushdownPlanner::default()`.
    pub fn new() -> Self {
        Self
    }

    /// Partitions `plan` into fragments, walking the nodes in plan order.
    ///
    /// Each node is placed at the lowest level it allows
    /// ([`PlanNode::lowest_valid_level`]), but never below the level of the
    /// fragment that precedes it. Fragment levels are therefore
    /// non-decreasing: once a node has forced the plan up to a higher level,
    /// later nodes that could run lower join that fragment instead of
    /// starting a new lower-level fragment after it.
    ///
    /// Returns an empty vector for an empty plan. Nodes are cloned into the
    /// fragments; `plan` is left untouched.
    pub fn fragments(&self, plan: &LogicalPlan) -> Vec<QueryFragment> {
        let mut fragments = Vec::<QueryFragment>::new();
        for node in plan.nodes() {
            let lowest = node.lowest_valid_level();
            match fragments.last_mut() {
                // The current fragment already runs at or above the node's
                // minimum level, so the node stays with it. Comparing with
                // `>=` rather than `==` is what keeps levels from dropping.
                Some(fragment) if fragment.level >= lowest => fragment.nodes.push(node.clone()),
                // First node, or the node needs a strictly higher level.
                _ => fragments.push(QueryFragment {
                    level: lowest,
                    nodes: vec![node.clone()],
                }),
            }
        }
        fragments
    }
}
/// A set of child identifiers to fan out to, tagged with an execution level.
///
/// The `Eq + Hash` bound sits on the struct itself because the derived
/// `PartialEq`/`Eq` impls compare the `HashSet<ChildId>` field, which is only
/// comparable when `ChildId: Eq + Hash`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FanoutPlan<ChildId: Eq + Hash> {
    /// Level this fan-out is associated with.
    pub level: ExecutionLevel,
    /// Identifiers of the children included in the fan-out.
    pub children: HashSet<ChildId>,
    /// Marker flag; `false` unless set explicitly.
    // NOTE(review): presumably signals that the child set is incomplete;
    // confirm against the code that consumes this flag.
    pub partial: bool,
}
impl<ChildId: Eq + Hash> FanoutPlan<ChildId> {
    /// Creates a fan-out over `children` at `level`, with the `partial` flag
    /// cleared.
    pub fn new(level: ExecutionLevel, children: HashSet<ChildId>) -> Self {
        Self {
            level,
            children,
            partial: false,
        }
    }

    /// Consumes the plan and returns it with the `partial` flag set to `true`.
    pub fn partial(mut self) -> Self {
        self.partial = true;
        self
    }

    /// Keeps only the children that are also present in `other`, in place.
    ///
    /// `level` and `partial` are left unchanged. No `Clone` bound is needed:
    /// `HashSet::retain` drops non-matching elements where they are, so this
    /// works for identifier types that cannot be cloned.
    pub fn intersect(&mut self, other: &HashSet<ChildId>) {
        self.children.retain(|child| other.contains(child));
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A scan followed by a filter lands in a leaf fragment, and a group-by
    /// on `zone` opens a zone-level fragment after it.
    #[test]
    fn pushdown_planner_places_zone_grouping_at_zone_level() {
        let scan = PlanNode::Scan {
            metric: String::from("/rpc/server/latency"),
        };
        let filter = PlanNode::Filter {
            field: String::from("job"),
        };
        let group_by_zone = PlanNode::GroupBy {
            fields: vec![String::from("zone")],
        };
        let plan = LogicalPlan::new(vec![scan, filter, group_by_zone]);

        let planner = PushdownPlanner::new();
        let fragments = planner.fragments(&plan);

        assert_eq!(fragments[0].level, ExecutionLevel::Leaf);
        assert_eq!(fragments[1].level, ExecutionLevel::Zone);
    }
}