scepter 0.1.5

Composable primitives for planet-scale time-series routing, indexing, and aggregation.
Documentation
use scepter::{
    AdmissionWindow, BucketLayout, BucketedAggregator, CollectionAggregator, DeltaPoint,
    Distribution, ExecutionLevel, FanoutPlan, FieldHintIndex, FieldPredicate, IngestDecision,
    IngestRouter, IssueKind, KeyEncoder, LogicalPlan, PlanNode, PushdownPlanner, QueryHealth,
    RangeAssigner, ReplicaCandidate, ReplicaQuality, ReplicaResolver, ReplicaState, SumReducer,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut ranges = RangeAssigner::new();
    ranges.assign(b"a".to_vec()..b"m".to_vec(), "leaf-a")?;
    ranges.assign(b"m".to_vec()..b"z".to_vec(), "leaf-m")?;

    let router = IngestRouter::new(ranges);
    let encoded_series = KeyEncoder::new()
        .push_field("ComputeTask")
        .push_field("monarch-api")
        .clone()
        .finish();
    assert!(!encoded_series.is_empty());

    let route = router.route("monarch-api", 1_700_000_000);
    assert_eq!(route, IngestDecision::Route("leaf-m"));

    let mut hints = FieldHintIndex::new();
    hints.insert_value("ComputeTask", "job", "monarch-api", "leaf-m");
    hints.insert_value("ComputeTask", "job", "monarch-worker", "leaf-m");
    hints.insert_value("ComputeTask", "job", "atlas-api", "leaf-a");

    let candidates = hints.candidates(
        "ComputeTask",
        "job",
        &FieldPredicate::Equals("monarch-api".to_owned()),
    );
    assert_eq!(candidates.len(), 1);
    assert!(candidates.contains("leaf-m"));

    let fanout = FanoutPlan::new(ExecutionLevel::Leaf, candidates).partial();
    assert_eq!(fanout.children.len(), 1);

    let plan = LogicalPlan::new(vec![
        PlanNode::Scan {
            metric: "/rpc/server/latency".to_owned(),
        },
        PlanNode::Filter {
            field: "job".to_owned(),
        },
        PlanNode::GroupBy {
            fields: vec!["zone".to_owned()],
        },
        PlanNode::Project {
            fields: vec!["p99".to_owned()],
        },
    ]);
    let fragments = PushdownPlanner::new().fragments(&plan);
    assert_eq!(fragments.len(), 3);

    let layout = BucketLayout::fixed_width(0.0, 50.0, 4)?;
    let mut leaf_a = Distribution::<()>::from_layout(&layout);
    leaf_a.record(42.0, 3)?;

    let mut leaf_m = Distribution::<()>::from_layout(&layout);
    leaf_m.record(71.0, 4)?;
    leaf_m.record(140.0, 1)?;

    let mut aggregate = CollectionAggregator::new();
    aggregate.add("/rpc/server/latency", leaf_a)?;
    aggregate.add("/rpc/server/latency", leaf_m)?;
    let merged = aggregate.into_inner();
    let latency = &merged["/rpc/server/latency"];

    let mut disk_iops = BucketedAggregator::with_reducer(60, AdmissionWindow::new(10), SumReducer)?;
    disk_iops.ingest(DeltaPoint {
        key: ("cluster-a", "storage-user"),
        end_time: 12,
        value: 3_000_u64,
    })?;
    disk_iops.ingest(DeltaPoint {
        key: ("cluster-a", "storage-user"),
        end_time: 38,
        value: 2_000_u64,
    })?;
    let finalized_iops = disk_iops.advance_to(70);

    let replicas = ReplicaResolver::with_max_fallbacks(1).resolve(vec![
        ReplicaCandidate::new(
            b"m".to_vec()..b"z".to_vec(),
            "leaf-m-a",
            ReplicaQuality::new(
                1_700_000_000,
                1_700_000_060,
                60,
                60,
                true,
                ReplicaState::Available,
            ),
        ),
        ReplicaCandidate::new(
            b"m".to_vec()..b"z".to_vec(),
            "leaf-m-b",
            ReplicaQuality::new(
                1_700_000_000,
                1_700_000_060,
                55,
                60,
                true,
                ReplicaState::Available,
            ),
        ),
    ]);
    assert_eq!(replicas[0].primary, "leaf-m-a");

    let mut health = QueryHealth::with_expected_children(2);
    health.record_completed();
    health.push_issue("zone-west", IssueKind::PrunedZone, "soft deadline elapsed");

    println!("route={route:?}");
    println!(
        "hints={} postings={} fanout={:?}",
        hints.hint_count(),
        hints.posting_count(),
        fanout.children
    );
    println!(
        "fragments={} samples={} p99={:?}",
        fragments.len(),
        latency.total_count(),
        latency.percentile(99.0)?
    );
    println!("finalized_iops={finalized_iops:?}");
    println!(
        "replica_primary={} query_partial={} completeness={:.2}",
        replicas[0].primary,
        health.is_partial(),
        health.completeness()
    );

    Ok(())
}