use scepter::{
BucketLayout, CollectionAggregator, Distribution, ExecutionLevel, FanoutPlan, FieldHintIndex,
FieldPredicate, IngestDecision, IngestRouter, KeyEncoder, LogicalPlan, PlanNode,
PushdownPlanner, RangeAssigner,
};
/// End-to-end walkthrough of the `scepter` items imported at the top of the file.
///
/// In order, this:
/// 1. assigns two key ranges to leaves and routes one key through an `IngestRouter`,
/// 2. encodes a two-field series key with `KeyEncoder`,
/// 3. looks up candidate leaves for an equality predicate in a `FieldHintIndex`,
/// 4. builds a leaf-level `FanoutPlan` from those candidates,
/// 5. asks `PushdownPlanner` for the fragments of a four-node `LogicalPlan`,
/// 6. records samples into two `Distribution`s and merges them under one metric
///    name with `CollectionAggregator`,
/// 7. prints a three-line summary.
///
/// # Errors
///
/// Propagates any error returned by the fallible calls used here
/// (`assign`, `fixed_width`, `record`, `add`, `percentile`).
///
/// # Panics
///
/// Panics if one of the assertions below does not hold.
/// NOTE(review): indexing the merged collection by metric name presumably
/// panics as well when the key is absent — confirm against `scepter`.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Single source of truth for the metric name used by the scan node, both
    // aggregator inserts, and the final lookup.
    const LATENCY_METRIC: &str = "/rpc/server/latency";

    // --- Ingest routing -----------------------------------------------------
    let mut assigner = RangeAssigner::new();
    for (key_range, leaf) in [
        (vec![b'a']..vec![b'm'], "leaf-a"),
        (vec![b'm']..vec![b'z'], "leaf-m"),
    ] {
        assigner.assign(key_range, leaf)?;
    }
    let ingest = IngestRouter::new(assigner);

    // --- Series key encoding ------------------------------------------------
    // The call chain is kept exactly as written (including `.clone()`), since
    // the receiver types of `push_field`/`finish` are not visible from here.
    let series_key = KeyEncoder::new()
        .push_field("ComputeTask")
        .push_field("monarch-api")
        .clone()
        .finish();
    assert!(!series_key.is_empty());

    let route = ingest.route("monarch-api", 1_700_000_000);
    assert_eq!(route, IngestDecision::Route("leaf-m"));

    // --- Field-hint lookup --------------------------------------------------
    let mut hint_index = FieldHintIndex::new();
    for (job, leaf) in [
        ("monarch-api", "leaf-m"),
        ("monarch-worker", "leaf-m"),
        ("atlas-api", "leaf-a"),
    ] {
        hint_index.insert_value("ComputeTask", "job", job, leaf);
    }

    let job_is_monarch_api = FieldPredicate::Equals(String::from("monarch-api"));
    let matching_leaves = hint_index.candidates("ComputeTask", "job", &job_is_monarch_api);
    assert_eq!(matching_leaves.len(), 1);
    assert!(matching_leaves.contains("leaf-m"));

    // --- Fan-out ------------------------------------------------------------
    let leaf_fanout = FanoutPlan::new(ExecutionLevel::Leaf, matching_leaves).partial();
    assert_eq!(leaf_fanout.children.len(), 1);

    // --- Pushdown planning --------------------------------------------------
    let nodes = vec![
        PlanNode::Scan {
            metric: String::from(LATENCY_METRIC),
        },
        PlanNode::Filter {
            field: String::from("job"),
        },
        PlanNode::GroupBy {
            fields: vec![String::from("zone")],
        },
        PlanNode::Project {
            fields: vec![String::from("p99")],
        },
    ];
    let query = LogicalPlan::new(nodes);
    let pushed = PushdownPlanner::new().fragments(&query);
    assert_eq!(pushed.len(), 3);

    // --- Distribution aggregation -------------------------------------------
    let buckets = BucketLayout::fixed_width(0.0, 50.0, 4)?;

    let from_leaf_a = {
        let mut dist = Distribution::<()>::from_layout(&buckets);
        dist.record(42.0, 3)?;
        dist
    };
    let from_leaf_m = {
        let mut dist = Distribution::<()>::from_layout(&buckets);
        dist.record(71.0, 4)?;
        dist.record(140.0, 1)?;
        dist
    };

    let mut collector = CollectionAggregator::new();
    collector.add(LATENCY_METRIC, from_leaf_a)?;
    collector.add(LATENCY_METRIC, from_leaf_m)?;
    let by_metric = collector.into_inner();
    let latency = &by_metric[LATENCY_METRIC];

    // --- Summary ------------------------------------------------------------
    println!("route={route:?}");
    println!(
        "hints={} postings={} fanout={:?}",
        hint_index.hint_count(),
        hint_index.posting_count(),
        leaf_fanout.children
    );

    // Evaluated right before the last line is printed, so a `percentile`
    // error still surfaces only after the first two lines are written.
    let fragment_total = pushed.len();
    let sample_total = latency.total_count();
    let p99 = latency.percentile(99.0)?;
    println!(
        "fragments={} samples={} p99={:?}",
        fragment_total, sample_total, p99
    );

    Ok(())
}