use std::collections::HashSet;
use scepter::{
ngrams, BucketLayout, CollectionAggregator, Distribution, DistributionError, EvaluatorShard,
ExcerptStrategy, Field, FieldHintIndex, FieldPredicate, FieldValue, KeyEncoder,
LexicographicKey, LoadSample, Mergeable, MetricKind, MetricSchema, PartialAggregate, PlanNode,
QueryFragment, RangeAssigner, Schedule, ShardError, StandingError, StandingQuery, Sum,
TargetSchema, TimeSeriesKey, ValueKind,
};
/// Adds 3 and 4 to a default `Sum<u64>` and checks that `finish` reports 7.
#[test]
fn sum_aggregator_accumulates_and_finishes() {
    let mut accumulator = Sum::<u64>::default();
    for sample in [3, 4] {
        accumulator.add(sample);
    }
    let total = accumulator.finish();
    assert_eq!(total, 7);
}
/// Checks the ranges produced by `BucketLayout::fixed_width(-1.0, 0.5, 3)` and the errors
/// returned for a negative, infinite, or NaN argument and for a bucket count of zero.
#[test]
fn fixed_width_layout_builds_exact_contiguous_ranges() {
    let layout = BucketLayout::fixed_width(-1.0, 0.5, 3).unwrap();
    let expected_ranges = vec![-1.0..-0.5, -0.5..0.0, 0.0..0.5];
    assert_eq!(layout.ranges, expected_ranges);

    // Every argument triple below must be reported as an invalid bucket range.
    let rejected_arguments = [
        (0.0, -1.0, 1),
        (f64::INFINITY, 1.0, 1),
        (0.0, f64::NAN, 1),
    ];
    for (first, second, count) in rejected_arguments {
        assert_eq!(
            BucketLayout::fixed_width(first, second, count),
            Err(DistributionError::InvalidBucketRange),
            "arguments ({first}, {second}, {count})"
        );
    }

    let zero_buckets = BucketLayout::fixed_width(0.0, 1.0, 0);
    assert_eq!(zero_buckets, Err(DistributionError::EmptyLayout));
}
/// Looks up bucket indices on a layout built with `fixed_width(0.0, 10.0, 2)`:
/// 0.0 maps to bucket 0, 10.0 and 20.0 map to bucket 1, and -0.1 / 20.1 map to no bucket.
#[test]
fn bucket_for_handles_edges_and_out_of_range_values() {
    let layout = BucketLayout::fixed_width(0.0, 10.0, 2).unwrap();
    let cases = [
        (0.0, Some(0)),
        (10.0, Some(1)),
        (20.0, Some(1)),
        (-0.1, None),
        (20.1, None),
    ];
    for (value, expected) in cases {
        assert_eq!(layout.bucket_for(value), expected, "value {value}");
    }
}
/// Records four observations into a two-bucket distribution and checks the per-bucket
/// counts (3 and 11), then checks that recording 25.0 fails with `BucketOutOfBounds`.
#[test]
fn record_updates_only_matching_bucket() {
    let layout = BucketLayout::fixed_width(0.0, 10.0, 2).unwrap();
    let mut histogram = Distribution::<()>::from_layout(&layout);

    // (value, second argument passed to `record`)
    let observations = [(5.0, 3), (10.0, 2), (15.0, 4), (20.0, 5)];
    for (value, weight) in observations {
        histogram.record(value, weight).unwrap();
    }

    {
        // Keep the bucket view in its own scope so it is gone before the next `record` call.
        let buckets = histogram.buckets();
        assert_eq!(buckets[0].count, 3);
        assert_eq!(buckets[1].count, 11);
    }

    let beyond_layout = histogram.record(25.0, 1);
    assert_eq!(beyond_layout, Err(DistributionError::BucketOutOfBounds));
}
/// Stores an exemplar in bucket 0 of a single-bucket distribution and reads it back, then
/// checks that storing one at index 1 fails with `BucketOutOfBounds`.
#[test]
fn exemplars_are_set_and_checked_by_bucket() {
    let layout = BucketLayout::fixed_width(0.0, 1.0, 1).unwrap();
    let mut distribution = Distribution::from_layout(&layout);

    let first_trace = scepter::Exemplar {
        value: 0.25,
        payload: "trace-1",
    };
    distribution.set_exemplar(0, first_trace).unwrap();
    assert_eq!(distribution.exemplar(0).unwrap().payload, "trace-1");

    // The layout has exactly one bucket, so index 1 does not exist.
    let second_trace = scepter::Exemplar {
        value: 2.0,
        payload: "trace-2",
    };
    let rejected = distribution.set_exemplar(1, second_trace);
    assert_eq!(rejected, Err(DistributionError::BucketOutOfBounds));
}
/// Builds two empty distributions from layouts created with different first arguments
/// (0.0 and 1.0) and checks that `try_merge` and `delta` report `IncompatibleLayout`, and
/// that `percentile(101.0)` reports `InvalidPercentile`.
#[test]
fn distribution_rejects_incompatible_layouts_and_bad_percentiles() {
    let layout_from_zero = BucketLayout::fixed_width(0.0, 1.0, 1).unwrap();
    let mut left = layout_from_zero.empty_distribution::<()>();
    let layout_from_one = BucketLayout::fixed_width(1.0, 1.0, 1).unwrap();
    let right = layout_from_one.empty_distribution::<()>();

    let merge_outcome = left.try_merge(&right);
    assert_eq!(merge_outcome, Err(DistributionError::IncompatibleLayout));

    let delta_outcome = left.delta(&right);
    assert_eq!(delta_outcome, Err(DistributionError::IncompatibleLayout));

    let percentile_outcome = left.percentile(101.0);
    assert_eq!(
        percentile_outcome,
        Err(DistributionError::InvalidPercentile)
    );
}
/// Checks the `to_string()` text of one variant from each of four public error types.
#[test]
fn public_errors_have_human_readable_messages() {
    // (rendered message, expected text)
    let rendered = [
        (
            DistributionError::InvalidPercentile.to_string(),
            "percentile must be finite and between 0 and 100",
        ),
        (
            scepter::HintError::EmptyExcerpt.to_string(),
            "excerpt size must be greater than zero",
        ),
        (
            ShardError::SplitOutsideRange.to_string(),
            "split key must be inside the range",
        ),
        (
            StandingError::EmptyShardSet.to_string(),
            "shard count must be greater than zero",
        ),
    ];
    for (actual, expected) in rendered {
        assert_eq!(actual, expected);
    }
}
/// Checks that `CumulativePoint::delta_since` returns `ResetOrOutOfOrderPoint` in two cases:
/// the current point has a different `start` than the previous one, and the current point
/// has an earlier `timestamp` than the previous one.
#[test]
fn cumulative_delta_rejects_reset_and_out_of_order_points() {
    let empty = BucketLayout::fixed_width(0.0, 1.0, 1)
        .unwrap()
        .empty_distribution::<()>();

    // Case 1: `start` changes from 1 to 2 between the previous and current point.
    let before_reset = scepter::CumulativePoint {
        start: 1,
        timestamp: 20,
        value: empty.clone(),
    };
    let after_reset = scepter::CumulativePoint {
        start: 2,
        timestamp: 30,
        value: empty.clone(),
    };
    assert_eq!(
        after_reset.delta_since(&before_reset),
        Err(DistributionError::ResetOrOutOfOrderPoint)
    );

    // Case 2: same `start`, but the current timestamp (10) precedes the previous one (20).
    let newer = scepter::CumulativePoint {
        start: 1,
        timestamp: 20,
        value: empty.clone(),
    };
    let stale = scepter::CumulativePoint {
        start: 1,
        timestamp: 10,
        value: empty,
    };
    assert_eq!(
        stale.delta_since(&newer),
        Err(DistributionError::ResetOrOutOfOrderPoint)
    );
}
/// Checks the exact n-gram lists produced for "ab" with size 2 and for "go" with the
/// `NGram(4)` excerpt strategy, including the `^` and `$` padding characters.
#[test]
fn ngram_generation_is_padded_and_sized() {
    let bigrams = ngrams("ab", 2);
    assert_eq!(bigrams, vec!["^a", "ab", "b$"]);

    let strategy = ExcerptStrategy::NGram(4);
    let four_grams = strategy.excerpts("go");
    assert_eq!(
        four_grams,
        vec!["^^^g", "^^go", "^go$", "go$$", "o$$$"]
    );
}
/// Inserts two values for `Metric.name` under different leaves and checks that
/// `candidate_union` over two `Equals` predicates returns both leaves.
#[test]
fn candidate_union_combines_predicate_results() {
    let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::Full);
    for (value, leaf) in [("latency", "leaf-1"), ("errors", "leaf-2")] {
        index.insert_value("Metric", "name", value, leaf);
    }

    let predicates = [
        FieldPredicate::Equals(String::from("latency")),
        FieldPredicate::Equals(String::from("errors")),
    ];
    let candidates = index.candidate_union("Metric", "name", &predicates);

    let both_leaves = HashSet::from(["leaf-1", "leaf-2"]);
    assert_eq!(candidates, both_leaves);
}
/// With an `NGram(1)` index holding the value "abc", checks that an `Equals("b")`
/// predicate returns the leaf that stored it.
#[test]
fn custom_excerpt_strategy_changes_index_semantics() {
    let strategy = ExcerptStrategy::NGram(1);
    let mut index = FieldHintIndex::with_strategy(strategy);
    index.insert_value("Target", "job", "abc", "leaf-1");

    let single_letter = FieldPredicate::Equals(String::from("b"));
    let matching_leaves = index.candidates("Target", "job", &single_letter);
    assert_eq!(matching_leaves, HashSet::from(["leaf-1"]));
}
/// Checks `encoded_key` on several types: an owned string encodes like the equal `&str`,
/// `false` encodes below `true`, and `FieldValue::I64` / `FieldValue::F64` encode like the
/// bare `i64` / `f64` they wrap.
#[test]
fn key_encoding_covers_string_bool_and_field_values() {
    let owned_text = String::from("abc");
    assert_eq!(owned_text.encoded_key(), "abc".encoded_key());

    assert!(false.encoded_key() < true.encoded_key());

    let wrapped_int = FieldValue::I64(-3);
    assert_eq!(wrapped_int.encoded_key(), (-3_i64).encoded_key());

    let wrapped_float = FieldValue::F64(1.5);
    assert_eq!(wrapped_float.encoded_key(), 1.5_f64.encoded_key());
}
/// Pushes the fields "ab" and "cd" into a `KeyEncoder` and checks that the finished bytes
/// are the two fields joined by a single NUL byte.
#[test]
fn key_encoder_uses_separators_between_fields() {
    let mut builder = KeyEncoder::new();
    // NOTE(review): the chain result is cloned before `finish`, as in the original;
    // presumably `push_field` hands back a borrowed encoder — confirm against the crate.
    let encoder = builder.push_field("ab").push_field("cd").clone();
    let bytes = encoder.finish();
    assert_eq!(bytes, b"ab\0cd");
}
/// Exercises schema and key types together: `TargetSchema::has_field`, the `metric_kind`
/// field of `MetricSchema`, and `TimeSeriesKey::target_key` / `series_key` compared against
/// bytes built with `KeyEncoder`.
#[test]
fn schema_and_series_keys_expose_exact_semantics() {
    let target_fields = vec![Field::new("cluster", ValueKind::String)];
    let schema = TargetSchema::new("ComputeTask", target_fields, "cluster");
    assert!(schema.has_field("cluster"));
    assert!(!schema.has_field("job"));

    let metric_fields = vec![Field::new("command", ValueKind::String)];
    let metric = MetricSchema::new(
        "/rpc/server/latency",
        metric_fields,
        ValueKind::Distribution,
        MetricKind::Cumulative,
    );
    assert_eq!(metric.metric_kind, MetricKind::Cumulative);

    let key = TimeSeriesKey {
        target_schema: String::from("ComputeTask"),
        target_fields: vec![FieldValue::String(String::from("aa"))],
        metric_name: String::from("/rpc/server/latency"),
        metric_fields: vec![FieldValue::String(String::from("Query"))],
    };

    // The target key must equal the encoding of the schema name followed by the target field.
    let mut builder = KeyEncoder::new();
    let expected_target_key = builder
        .push_field("ComputeTask")
        .push_field("aa")
        .clone()
        .finish();
    assert_eq!(key.target_key(), expected_target_key);

    let series_key = key.series_key();
    assert!(series_key.ends_with(b"Query"));
}
/// Checks `PlanNode::lowest_valid_level` for an empty and a "zone" `GroupBy`, that
/// `FanoutPlan::intersect` keeps only the shared child, and that a `QueryFragment` built
/// with one scan node holds one node.
#[test]
fn query_nodes_and_fanout_intersection_have_observable_behavior() {
    let ungrouped = PlanNode::GroupBy { fields: vec![] };
    assert_eq!(
        ungrouped.lowest_valid_level(),
        scepter::ExecutionLevel::Leaf
    );

    let grouped_by_zone = PlanNode::GroupBy {
        fields: vec![String::from("zone")],
    };
    assert_eq!(
        grouped_by_zone.lowest_valid_level(),
        scepter::ExecutionLevel::Zone
    );

    let initial_children = HashSet::from(["leaf-1", "leaf-2"]);
    let mut fanout = scepter::FanoutPlan::new(scepter::ExecutionLevel::Leaf, initial_children);
    let allowed = HashSet::from(["leaf-2", "leaf-3"]);
    fanout.intersect(&allowed);
    // Only "leaf-2" appears in both sets.
    assert_eq!(fanout.children, HashSet::from(["leaf-2"]));

    let scan = PlanNode::Scan {
        metric: String::from("m"),
    };
    let fragment = QueryFragment {
        level: scepter::ExecutionLevel::Leaf,
        nodes: vec![scan],
    };
    assert_eq!(fragment.nodes.len(), 1);
}
/// Checks `LoadSample::score` for one sample, then walks a `RangeAssigner` through
/// assignment, lookup, reassignment and three rejected operations, and finally checks that
/// exactly one assignment remains.
#[test]
fn load_score_and_range_assignment_operations_are_observable() {
    let sample = LoadSample {
        writes_per_second: 2.0,
        bytes_per_second: 2048.0,
        queries_per_second: 3.0,
    };
    let score = sample.score();
    assert_eq!(score, 7.0);

    let mut assigner = RangeAssigner::new();
    let initial_range = vec![0]..vec![10];
    assigner.assign(initial_range, "leaf-1").unwrap();

    // Looking up the range's end key finds no worker.
    let at_end_key = assigner.worker_for_encoded(&[10]);
    assert_eq!(at_end_key, None);

    assigner.reassign_start(&[0], "leaf-2").unwrap();
    let inside_range = assigner.worker_for_encoded(&[5]);
    assert_eq!(inside_range, Some(&"leaf-2"));

    let unknown_start = assigner.reassign_start(&[9], "leaf-3");
    assert_eq!(unknown_start, Err(ShardError::RangeNotFound));

    let zero_width = assigner.try_assign(vec![9]..vec![9], "leaf-3");
    assert_eq!(zero_width, Err(ShardError::EmptyRange));

    // Splitting at the range's own start key is rejected.
    let split_on_start = assigner.split_at(&[0], vec![0], "leaf-4");
    assert_eq!(split_on_start, Err(ShardError::SplitOutsideRange));

    let remaining = assigner.assignments().count();
    assert_eq!(remaining, 1);
}
/// Checks that `Schedule::every_seconds(5)` stores 5000 in `every_millis`, that the shard
/// chosen by `EvaluatorShard::for_query("query-a", 8)` has an index below 8 and owns
/// "query-a" but not every generated name, and that a `StandingQuery` keeps its
/// `output_metric`.
#[test]
fn standing_query_schedule_and_sharding_are_concrete() {
    let five_seconds = Schedule::every_seconds(5);
    assert_eq!(five_seconds.every_millis, 5000);

    let shard = EvaluatorShard::for_query("query-a", 8).unwrap();
    assert!(shard.shard < 8);
    assert!(shard.owns("query-a"));

    // Probe up to 100 generated names for one this shard does not own.
    let mut generated_names = (0..100).map(|index| format!("query-b-{index}"));
    let other_name = generated_names
        .find(|candidate| !shard.owns(candidate))
        .expect("at least one generated query should map elsewhere");
    assert!(!shard.owns(&other_name));

    let standing = StandingQuery {
        name: String::from("query-a"),
        schedule: Schedule::every_seconds(5),
        plan: scepter::LogicalPlan::default(),
        output_metric: String::from("/derived/query-a"),
    };
    assert_eq!(standing.output_metric, "/derived/query-a");
}
/// Defines a local `Mergeable` counter, adds two values under the same key to a
/// `CollectionAggregator`, and checks that the stored value is their merged sum.
#[test]
fn collection_aggregator_uses_mergeable_values() {
    /// Test-only value whose merge adds the wrapped numbers.
    #[derive(Debug, Clone, PartialEq)]
    struct Counter(u64);

    impl Mergeable for Counter {
        type Error = std::convert::Infallible;

        fn merge_from(&mut self, other: Self) -> Result<(), Self::Error> {
            let Counter(incoming) = other;
            self.0 += incoming;
            Ok(())
        }
    }

    let mut aggregator = CollectionAggregator::new();
    for part in [Counter(2), Counter(5)] {
        aggregator.add("k", part).unwrap();
    }

    let merged = aggregator.into_inner();
    assert_eq!(merged["k"], Counter(7));
}
/// Merges 7 into a `u64` holding 10 via `merge_from` and checks that the result is 17.
#[test]
fn u64_mergeable_adds_values() {
    let mut total: u64 = 10;
    Mergeable::merge_from(&mut total, 7).unwrap();
    assert_eq!(total, 17);
}