use scepter::{
AdmissionWindow, BucketLayout, BucketedAggregator, CollectionAggregator, DeltaPoint,
Distribution, ExecutionLevel, FanoutPlan, FieldHintIndex, FieldPredicate, IngestDecision,
IngestRouter, IssueKind, KeyEncoder, LogicalPlan, PlanNode, PushdownPlanner, QueryHealth,
RangeAssigner, ReplicaCandidate, ReplicaQuality, ReplicaResolver, ReplicaState, SumReducer,
};
/// End-to-end walkthrough of the scepter pipeline: ingest routing, field-hint
/// pruning, plan pushdown, distribution merging, delta aggregation, replica
/// selection, and query-health accounting. Returns the first error from any
/// fallible stage via `?`.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // --- Ingest routing: key ranges decide which leaf owns a series. ---
    let mut assigner = RangeAssigner::new();
    for (span, leaf) in [
        (b"a".to_vec()..b"m".to_vec(), "leaf-a"),
        (b"m".to_vec()..b"z".to_vec(), "leaf-m"),
    ] {
        assigner.assign(span, leaf)?;
    }
    let router = IngestRouter::new(assigner);

    // Encode a series key from its schema fields. NOTE(review): the clone()
    // presumably detaches a `&mut Self` builder borrow before finish()
    // consumes the encoder — confirm against KeyEncoder's signatures.
    let series_key = KeyEncoder::new()
        .push_field("ComputeTask")
        .push_field("monarch-api")
        .clone()
        .finish();
    assert!(!series_key.is_empty());

    let route = router.route("monarch-api", 1_700_000_000);
    assert_eq!(route, IngestDecision::Route("leaf-m"));

    // --- Field hints: value postings used to prune the query fanout. ---
    let mut hint_index = FieldHintIndex::new();
    for (job, leaf) in [
        ("monarch-api", "leaf-m"),
        ("monarch-worker", "leaf-m"),
        ("atlas-api", "leaf-a"),
    ] {
        hint_index.insert_value("ComputeTask", "job", job, leaf);
    }
    let leaf_candidates = hint_index.candidates(
        "ComputeTask",
        "job",
        &FieldPredicate::Equals("monarch-api".to_owned()),
    );
    // Only one leaf carries series for this exact job value.
    assert_eq!(leaf_candidates.len(), 1);
    assert!(leaf_candidates.contains("leaf-m"));

    let fanout = FanoutPlan::new(ExecutionLevel::Leaf, leaf_candidates).partial();
    assert_eq!(fanout.children.len(), 1);

    // --- Logical plan: split into fragments that can be pushed to leaves. ---
    let logical_plan = LogicalPlan::new(vec![
        PlanNode::Scan {
            metric: String::from("/rpc/server/latency"),
        },
        PlanNode::Filter {
            field: String::from("job"),
        },
        PlanNode::GroupBy {
            fields: vec![String::from("zone")],
        },
        PlanNode::Project {
            fields: vec![String::from("p99")],
        },
    ]);
    let plan_fragments = PushdownPlanner::new().fragments(&logical_plan);
    assert_eq!(plan_fragments.len(), 3);

    // --- Distributions: merge per-leaf histograms sharing one layout. ---
    let bucket_layout = BucketLayout::fixed_width(0.0, 50.0, 4)?;
    let mut dist_a = Distribution::<()>::from_layout(&bucket_layout);
    dist_a.record(42.0, 3)?;
    let mut dist_m = Distribution::<()>::from_layout(&bucket_layout);
    dist_m.record(71.0, 4)?;
    dist_m.record(140.0, 1)?;

    let mut collector = CollectionAggregator::new();
    collector.add("/rpc/server/latency", dist_a)?;
    collector.add("/rpc/server/latency", dist_m)?;
    let merged_series = collector.into_inner();
    let latency = &merged_series["/rpc/server/latency"];

    // --- Delta aggregation: bucketed sums over a bounded admission window. ---
    let mut iops = BucketedAggregator::with_reducer(60, AdmissionWindow::new(10), SumReducer)?;
    for point in [
        DeltaPoint {
            key: ("cluster-a", "storage-user"),
            end_time: 12,
            value: 3_000_u64,
        },
        DeltaPoint {
            key: ("cluster-a", "storage-user"),
            end_time: 38,
            value: 2_000_u64,
        },
    ] {
        iops.ingest(point)?;
    }
    let finalized_iops = iops.advance_to(70);

    // --- Replica selection: both replicas cover the same range; the resolver
    // picks a primary and keeps at most one fallback. The two candidates
    // differ only in the third quality parameter (60 vs 55).
    let replicas = ReplicaResolver::with_max_fallbacks(1).resolve(
        [("leaf-m-a", 60), ("leaf-m-b", 55)]
            .into_iter()
            .map(|(name, quality_metric)| {
                ReplicaCandidate::new(
                    b"m".to_vec()..b"z".to_vec(),
                    name,
                    ReplicaQuality::new(
                        1_700_000_000,
                        1_700_000_060,
                        quality_metric,
                        60,
                        true,
                        ReplicaState::Available,
                    ),
                )
            })
            .collect::<Vec<_>>(),
    );
    assert_eq!(replicas[0].primary, "leaf-m-a");

    // --- Query health: partial-result accounting surfaced to the caller. ---
    let mut health = QueryHealth::with_expected_children(2);
    health.record_completed();
    health.push_issue("zone-west", IssueKind::PrunedZone, "soft deadline elapsed");

    // Report every stage's outcome.
    println!("route={route:?}");
    println!(
        "hints={} postings={} fanout={:?}",
        hint_index.hint_count(),
        hint_index.posting_count(),
        fanout.children
    );
    println!(
        "fragments={} samples={} p99={:?}",
        plan_fragments.len(),
        latency.total_count(),
        latency.percentile(99.0)?
    );
    println!("finalized_iops={finalized_iops:?}");
    println!(
        "replica_primary={} query_partial={} completeness={:.2}",
        replicas[0].primary,
        health.is_partial(),
        health.completeness()
    );
    Ok(())
}