scepter 0.1.3

Composable primitives for planet-scale time-series routing, indexing, and aggregation.
Documentation
use scepter::{
    BucketLayout, CollectionAggregator, Distribution, ExecutionLevel, FanoutPlan, FieldHintIndex,
    FieldPredicate, IngestDecision, IngestRouter, KeyEncoder, LogicalPlan, PlanNode,
    PushdownPlanner, RangeAssigner,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut ranges = RangeAssigner::new();
    ranges.assign(b"a".to_vec()..b"m".to_vec(), "leaf-a")?;
    ranges.assign(b"m".to_vec()..b"z".to_vec(), "leaf-m")?;

    let router = IngestRouter::new(ranges);
    let encoded_series = KeyEncoder::new()
        .push_field("ComputeTask")
        .push_field("monarch-api")
        .clone()
        .finish();
    assert!(!encoded_series.is_empty());

    let route = router.route("monarch-api", 1_700_000_000);
    assert_eq!(route, IngestDecision::Route("leaf-m"));

    let mut hints = FieldHintIndex::new();
    hints.insert_value("ComputeTask", "job", "monarch-api", "leaf-m");
    hints.insert_value("ComputeTask", "job", "monarch-worker", "leaf-m");
    hints.insert_value("ComputeTask", "job", "atlas-api", "leaf-a");

    let candidates = hints.candidates(
        "ComputeTask",
        "job",
        &FieldPredicate::Equals("monarch-api".to_owned()),
    );
    assert_eq!(candidates.len(), 1);
    assert!(candidates.contains("leaf-m"));

    let fanout = FanoutPlan::new(ExecutionLevel::Leaf, candidates).partial();
    assert_eq!(fanout.children.len(), 1);

    let plan = LogicalPlan::new(vec![
        PlanNode::Scan {
            metric: "/rpc/server/latency".to_owned(),
        },
        PlanNode::Filter {
            field: "job".to_owned(),
        },
        PlanNode::GroupBy {
            fields: vec!["zone".to_owned()],
        },
        PlanNode::Project {
            fields: vec!["p99".to_owned()],
        },
    ]);
    let fragments = PushdownPlanner::new().fragments(&plan);
    assert_eq!(fragments.len(), 3);

    let layout = BucketLayout::fixed_width(0.0, 50.0, 4)?;
    let mut leaf_a = Distribution::<()>::from_layout(&layout);
    leaf_a.record(42.0, 3)?;

    let mut leaf_m = Distribution::<()>::from_layout(&layout);
    leaf_m.record(71.0, 4)?;
    leaf_m.record(140.0, 1)?;

    let mut aggregate = CollectionAggregator::new();
    aggregate.add("/rpc/server/latency", leaf_a)?;
    aggregate.add("/rpc/server/latency", leaf_m)?;
    let merged = aggregate.into_inner();
    let latency = &merged["/rpc/server/latency"];

    println!("route={route:?}");
    println!(
        "hints={} postings={} fanout={:?}",
        hints.hint_count(),
        hints.posting_count(),
        fanout.children
    );
    println!(
        "fragments={} samples={} p99={:?}",
        fragments.len(),
        latency.total_count(),
        latency.percentile(99.0)?
    );

    Ok(())
}