scepter 0.1.1

Composable primitives for planet-scale time-series routing, indexing, and aggregation.
Documentation
use std::collections::HashSet;
use std::hash::Hash;

/// Execution level tag carried by query fragments and fan-out plans.
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so
/// `Leaf < Zone < Root`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ExecutionLevel {
    /// Lowest-ordered level.
    Leaf,
    /// Middle level, ordered between `Leaf` and `Root`.
    Zone,
    /// Highest-ordered level.
    Root,
}

/// A single step of a logical query plan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanNode {
    /// Scan step; carries a metric name.
    Scan { metric: String },
    /// Filter step; carries the name of one field.
    Filter { field: String },
    /// Join step; carries the list of field names to join on.
    Join { on: Vec<String> },
    /// Group-by step; carries the grouping field names.
    GroupBy { fields: Vec<String> },
    /// Projection step; carries the projected field names.
    Project { fields: Vec<String> },
}

impl PlanNode {
    /// Returns the lowest [`ExecutionLevel`] this node is assigned to.
    ///
    /// A `GroupBy` whose field list contains the exact string `"zone"`
    /// (case-sensitive) yields [`ExecutionLevel::Zone`]; every other node,
    /// including a `GroupBy` without that field, yields
    /// [`ExecutionLevel::Leaf`].
    pub fn lowest_valid_level(&self) -> ExecutionLevel {
        match self {
            Self::GroupBy { fields } => {
                let groups_by_zone = fields.iter().map(String::as_str).any(|name| name == "zone");
                if groups_by_zone {
                    ExecutionLevel::Zone
                } else {
                    ExecutionLevel::Leaf
                }
            }
            // Listed explicitly (no `_` arm) so a new variant forces a decision here.
            Self::Scan { .. }
            | Self::Filter { .. }
            | Self::Join { .. }
            | Self::Project { .. } => ExecutionLevel::Leaf,
        }
    }
}

/// An ordered sequence of [`PlanNode`]s.
///
/// `Default` produces a plan with no nodes.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LogicalPlan {
    nodes: Vec<PlanNode>,
}

impl LogicalPlan {
    /// Wraps `nodes` in a plan, keeping them in the given order.
    pub fn new(nodes: Vec<PlanNode>) -> Self {
        LogicalPlan { nodes }
    }

    /// Borrows the plan's nodes in order.
    pub fn nodes(&self) -> &[PlanNode] {
        self.nodes.as_slice()
    }
}

/// A group of plan nodes tagged with one execution level.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryFragment {
    /// Execution level shared by every node in this fragment.
    pub level: ExecutionLevel,
    /// The nodes belonging to this fragment.
    pub nodes: Vec<PlanNode>,
}

/// Stateless planner that splits a [`LogicalPlan`] into [`QueryFragment`]s.
#[derive(Debug, Clone, Default)]
pub struct PushdownPlanner;

impl PushdownPlanner {
    /// Creates a planner; it holds no state.
    pub fn new() -> Self {
        PushdownPlanner
    }

    /// Splits `plan` into fragments of consecutive nodes that share a level.
    ///
    /// Nodes are visited in plan order. Each node's level comes from
    /// [`PlanNode::lowest_valid_level`]; a node joins the most recent fragment
    /// when the levels match and otherwise opens a new fragment. An empty plan
    /// yields an empty vector.
    ///
    /// NOTE(review): levels are computed per node, independently of earlier
    /// nodes, so a `Leaf` node that follows a `Zone` node starts a fresh
    /// `Leaf` fragment. Confirm that this is the intended placement.
    pub fn fragments(&self, plan: &LogicalPlan) -> Vec<QueryFragment> {
        let mut out: Vec<QueryFragment> = Vec::new();

        for step in plan.nodes().iter().cloned() {
            let target = step.lowest_valid_level();

            // Same level as the trailing fragment: extend it in place.
            if let Some(tail) = out.last_mut().filter(|tail| tail.level == target) {
                tail.nodes.push(step);
                continue;
            }

            // First node, or the level changed: open a new fragment.
            out.push(QueryFragment {
                level: target,
                nodes: vec![step],
            });
        }

        out
    }
}

/// A set of child identifiers at one execution level, plus a `partial` flag.
///
/// The `Eq + Hash` bound stays on the struct because the derived
/// `PartialEq`/`Eq` compare the `HashSet<ChildId>` field, which needs it.
///
/// NOTE(review): the meaning of `partial` is not established in this file;
/// presumably it marks a fan-out that covers only some children. Confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FanoutPlan<ChildId: Eq + Hash> {
    /// Execution level this fan-out is tagged with.
    pub level: ExecutionLevel,
    /// Identifiers of the children included in the fan-out.
    pub children: HashSet<ChildId>,
    /// Flag set by [`FanoutPlan::partial`]; `false` after [`FanoutPlan::new`].
    pub partial: bool,
}

impl<ChildId: Eq + Hash> FanoutPlan<ChildId> {
    /// Creates a plan for `children` at `level` with `partial` set to `false`.
    pub fn new(level: ExecutionLevel, children: HashSet<ChildId>) -> Self {
        Self {
            level,
            children,
            partial: false,
        }
    }

    /// Consumes the plan and returns it with the `partial` flag set to `true`.
    pub fn partial(mut self) -> Self {
        self.partial = true;
        self
    }

    /// Keeps only the children that are also present in `other`.
    ///
    /// Works in place via `HashSet::retain`, so no element is cloned and
    /// `ChildId` does not need to be `Clone`.
    pub fn intersect(&mut self, other: &HashSet<ChildId>) {
        self.children.retain(|child| other.contains(child));
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A scan and a filter followed by a group-by on `"zone"` must split into
    /// exactly two fragments: the first two nodes at `Leaf`, the group-by at
    /// `Zone`.
    #[test]
    fn pushdown_planner_places_zone_grouping_at_zone_level() {
        let scan = PlanNode::Scan {
            metric: "/rpc/server/latency".to_owned(),
        };
        let filter = PlanNode::Filter {
            field: "job".to_owned(),
        };
        let group_by = PlanNode::GroupBy {
            fields: vec!["zone".to_owned()],
        };
        let plan = LogicalPlan::new(vec![scan.clone(), filter.clone(), group_by.clone()]);

        let fragments = PushdownPlanner::new().fragments(&plan);

        // Compare whole fragments so the count and node grouping are pinned,
        // not just the levels at indices 0 and 1.
        assert_eq!(
            fragments,
            vec![
                QueryFragment {
                    level: ExecutionLevel::Leaf,
                    nodes: vec![scan, filter],
                },
                QueryFragment {
                    level: ExecutionLevel::Zone,
                    nodes: vec![group_by],
                },
            ]
        );
    }
}