scepter 0.1.5

Composable primitives for planet-scale time-series routing, indexing, and aggregation.
Documentation
use std::collections::HashSet;
use std::hash::Hash;

/// Tier of the hierarchical query-execution tree.
///
/// Variants are declared from lowest to highest tier, so the derived ordering yields
/// `Leaf < Zone < Root`.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ExecutionLevel {
    /// Runs next to the stored series.
    Leaf,
    /// Runs across several leaves.
    Zone,
    /// Runs across several zones.
    Root,
}

/// A single logical operation in a query plan.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PlanNode {
    /// Reads one metric family.
    Scan {
        /// Name of the metric family that is read.
        metric: String,
    },
    /// Filters on a single field.
    Filter {
        /// Name of the field the filter applies to.
        field: String,
    },
    /// Joins on the named key fields.
    Join {
        /// Names of the join-key fields.
        on: Vec<String>,
    },
    /// Groups rows by the named fields.
    GroupBy {
        /// Names of the grouping fields.
        fields: Vec<String>,
    },
    /// Keeps only the named output fields.
    Project {
        /// Names of the fields kept in the output.
        fields: Vec<String>,
    },
}

impl PlanNode {
    /// Returns the lowest [`ExecutionLevel`] at which this operation can be evaluated.
    ///
    /// Only a `GroupBy` whose field list contains the exact string `"zone"` reports
    /// [`ExecutionLevel::Zone`]. Every other operation, including any other `GroupBy`,
    /// reports [`ExecutionLevel::Leaf`].
    pub fn lowest_valid_level(&self) -> ExecutionLevel {
        match self {
            Self::GroupBy { fields } => {
                let groups_by_zone = fields.iter().any(|name| name.as_str() == "zone");
                if groups_by_zone {
                    ExecutionLevel::Zone
                } else {
                    ExecutionLevel::Leaf
                }
            }
            Self::Scan { .. } | Self::Filter { .. } | Self::Join { .. } | Self::Project { .. } => {
                ExecutionLevel::Leaf
            }
        }
    }
}

/// An ordered sequence of [`PlanNode`] operations forming one logical query.
///
/// Nodes are stored, and later evaluated, in the order they were supplied.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LogicalPlan {
    nodes: Vec<PlanNode>,
}

impl LogicalPlan {
    /// Builds a plan that evaluates `nodes` in the given order.
    pub fn new(nodes: Vec<PlanNode>) -> Self {
        LogicalPlan { nodes }
    }

    /// Borrows the plan's nodes, first-evaluated node first.
    pub fn nodes(&self) -> &[PlanNode] {
        self.nodes.as_slice()
    }
}

/// A run of consecutive plan nodes that all execute at the same [`ExecutionLevel`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct QueryFragment {
    /// Tier of the execution tree that runs this fragment.
    pub level: ExecutionLevel,
    /// Plan nodes that belong to this fragment.
    pub nodes: Vec<PlanNode>,
}

/// Splits logical plans into execution-level fragments.
#[derive(Debug, Clone, Default)]
pub struct PushdownPlanner;

impl PushdownPlanner {
    /// Creates a pushdown planner.
    pub fn new() -> Self {
        Self
    }

    /// Splits `plan` into contiguous fragments, each assigned to one execution level.
    ///
    /// Nodes are visited in evaluation order. Each node is placed at its
    /// [`PlanNode::lowest_valid_level`], but never below the level of the fragment before
    /// it. Later operations consume the output of earlier ones, so once an operation has
    /// been lifted (e.g. a `GroupBy` on `"zone"` to [`ExecutionLevel::Zone`]), everything
    /// after it must run at that level or higher. Fragment levels are therefore
    /// non-decreasing, and consecutive nodes that resolve to the same level share one
    /// fragment. An empty plan yields no fragments.
    pub fn fragments(&self, plan: &LogicalPlan) -> Vec<QueryFragment> {
        let mut fragments = Vec::<QueryFragment>::new();

        for node in plan.nodes() {
            let lowest = node.lowest_valid_level();
            match fragments.last_mut() {
                // The last fragment's level is the running maximum. A node whose own lowest
                // level does not exceed it stays in that fragment instead of dropping back
                // down to a lower level.
                Some(fragment) if fragment.level >= lowest => fragment.nodes.push(node.clone()),
                _ => fragments.push(QueryFragment {
                    level: lowest,
                    nodes: vec![node.clone()],
                }),
            }
        }

        fragments
    }
}

/// Candidate child fanout for distributed execution.
///
/// The `Eq + Hash` bound sits on the struct itself because the derived `PartialEq`/`Eq`
/// compare the `children` set, and that comparison needs `ChildId: Eq + Hash`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FanoutPlan<ChildId: Eq + Hash> {
    /// Level at which this fanout applies.
    pub level: ExecutionLevel,
    /// Candidate child IDs to query.
    pub children: HashSet<ChildId>,
    /// Whether children produce partial aggregates.
    pub partial: bool,
}

impl<ChildId: Eq + Hash> FanoutPlan<ChildId> {
    /// Creates a fanout plan at `level` over `children`, with `partial` set to `false`.
    pub fn new(level: ExecutionLevel, children: HashSet<ChildId>) -> Self {
        Self {
            level,
            children,
            partial: false,
        }
    }

    /// Marks this fanout as producing partial aggregates and returns it.
    ///
    /// Consumes `self`. If the returned value is dropped, the change is lost.
    #[must_use]
    pub fn partial(mut self) -> Self {
        self.partial = true;
        self
    }

    /// Restricts the candidate children to those also present in `other`.
    ///
    /// Children are filtered in place, so `ChildId` does not need to be `Clone`.
    /// Intersecting with an empty set leaves no candidates.
    pub fn intersect(&mut self, other: &HashSet<ChildId>) {
        self.children.retain(|child| other.contains(child));
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pushdown_planner_places_zone_grouping_at_zone_level() {
        // Scan and filter resolve to leaf level; grouping by "zone" resolves to zone level.
        let scan = PlanNode::Scan {
            metric: "/rpc/server/latency".to_owned(),
        };
        let filter = PlanNode::Filter {
            field: "job".to_owned(),
        };
        let group_by_zone = PlanNode::GroupBy {
            fields: vec!["zone".to_owned()],
        };
        let plan = LogicalPlan::new(vec![scan, filter, group_by_zone]);

        let planner = PushdownPlanner::new();
        let fragments = planner.fragments(&plan);

        assert_eq!(fragments[0].level, ExecutionLevel::Leaf);
        assert_eq!(fragments[1].level, ExecutionLevel::Zone);
    }
}