use std::collections::HashSet;
use scepter::{
ngrams, AdmissionWindow, BucketLayout, BucketedAggregator, CollectionAggregator,
CollectionError, DeltaPoint, Distribution, DistributionError, EvaluatorShard, ExcerptStrategy,
Field, FieldHintIndex, FieldPredicate, FieldValue, IssueKind, KeyEncoder, LexicographicKey,
LoadSample, MaxReducer, Mergeable, MetricKind, MetricSchema, MinReducer, PartialAggregate,
PlanNode, QueryFragment, QueryHealth, RangeAssigner, ReplicaCandidate, ReplicaQuality,
ReplicaResolver, ReplicaState, Schedule, ShardError, StandingError, StandingQuery, Sum,
SumReducer, TargetSchema, TimeSeriesKey, ValueKind,
};
/// `Sum` accumulates every added value and reports the total on `finish`.
#[test]
fn sum_aggregator_accumulates_and_finishes() {
    let mut total = Sum::<u64>::default();
    for value in [3_u64, 4] {
        total.add(value);
    }
    assert_eq!(total.finish(), 7);
}
/// Fixed-width layouts produce exact, contiguous bucket ranges, and reject
/// inverted or non-finite bounds as well as a zero bucket count.
#[test]
fn fixed_width_layout_builds_exact_contiguous_ranges() {
    let layout = BucketLayout::fixed_width(-1.0, 0.5, 3).unwrap();
    assert_eq!(layout.ranges, vec![-1.0..-0.5, -0.5..0.0, 0.0..0.5]);
    // Inverted, infinite, and NaN bounds are all invalid bucket ranges.
    for (lower, upper) in [(0.0, -1.0), (f64::INFINITY, 1.0), (0.0, f64::NAN)] {
        assert_eq!(
            BucketLayout::fixed_width(lower, upper, 1),
            Err(DistributionError::InvalidBucketRange)
        );
    }
    // A zero bucket count is reported as a distinct error.
    assert_eq!(
        BucketLayout::fixed_width(0.0, 1.0, 0),
        Err(DistributionError::EmptyLayout)
    );
}
/// Bucket lookup: lower edges map into their bucket, the upper boundary and
/// the final upper edge fall into the last bucket, and values outside the
/// layout return `None`.
#[test]
fn bucket_for_handles_edges_and_out_of_range_values() {
    let layout = BucketLayout::fixed_width(0.0, 10.0, 2).unwrap();
    let cases = [
        (0.0, Some(0)),
        (10.0, Some(1)),
        (20.0, Some(1)),
        (-0.1, None),
        (20.1, None),
    ];
    for (value, expected) in cases {
        assert_eq!(layout.bucket_for(value), expected);
    }
}
/// Recording a value updates only the bucket that contains it, and values
/// past the layout's absolute bound are rejected.
#[test]
fn record_updates_only_matching_bucket() {
    let layout = BucketLayout::fixed_width(0.0, 10.0, 2).unwrap();
    let mut distribution = Distribution::<()>::from_layout(&layout);
    for (value, count) in [(5.0, 3), (10.0, 2), (15.0, 4), (20.0, 5)] {
        distribution.record(value, count).unwrap();
    }
    // 5.0 lands in bucket 0; 10.0, 15.0 and 20.0 all land in bucket 1.
    assert_eq!(distribution.buckets()[0].count, 3);
    assert_eq!(distribution.buckets()[1].count, 11);
    assert_eq!(
        distribution.record(25.0, 1),
        Err(DistributionError::BucketOutOfBounds)
    );
}
/// Exemplars attach to a specific bucket index; out-of-range indices fail.
#[test]
fn exemplars_are_set_and_checked_by_bucket() {
    let layout = BucketLayout::fixed_width(0.0, 1.0, 1).unwrap();
    let mut distribution = Distribution::from_layout(&layout);
    let in_range = scepter::Exemplar {
        value: 0.25,
        payload: "trace-1",
    };
    distribution.set_exemplar(0, in_range).unwrap();
    assert_eq!(distribution.exemplar(0).unwrap().payload, "trace-1");
    // The layout has a single bucket, so index 1 does not exist.
    let out_of_range = scepter::Exemplar {
        value: 2.0,
        payload: "trace-2",
    };
    assert_eq!(
        distribution.set_exemplar(1, out_of_range),
        Err(DistributionError::BucketOutOfBounds)
    );
}
/// Merging or diffing distributions with different layouts fails, as does a
/// percentile request above 100.
#[test]
fn distribution_rejects_incompatible_layouts_and_bad_percentiles() {
    let mut base = BucketLayout::fixed_width(0.0, 1.0, 1)
        .unwrap()
        .empty_distribution::<()>();
    let mismatched = BucketLayout::fixed_width(1.0, 1.0, 1)
        .unwrap()
        .empty_distribution::<()>();
    assert_eq!(
        base.try_merge(&mismatched),
        Err(DistributionError::IncompatibleLayout)
    );
    assert_eq!(
        base.delta(&mismatched),
        Err(DistributionError::IncompatibleLayout)
    );
    assert_eq!(
        base.percentile(101.0),
        Err(DistributionError::InvalidPercentile)
    );
}
/// Every public error type renders a stable, human-readable `Display` message.
#[test]
fn public_errors_have_human_readable_messages() {
    // The errors come from different types, so compare their rendered text.
    let cases = [
        (
            DistributionError::InvalidPercentile.to_string(),
            "percentile must be finite and between 0 and 100",
        ),
        (
            scepter::HintError::EmptyExcerpt.to_string(),
            "excerpt size must be greater than zero",
        ),
        (
            ShardError::SplitOutsideRange.to_string(),
            "split key must be inside the range",
        ),
        (
            StandingError::EmptyShardSet.to_string(),
            "shard count must be greater than zero",
        ),
        (
            CollectionError::<std::convert::Infallible>::LateDelta.to_string(),
            "delta point is outside the admission window",
        ),
    ];
    for (rendered, expected) in cases {
        assert_eq!(rendered, expected);
    }
}
/// `delta_since` refuses to compute a delta across a counter reset (later
/// start time) or against an out-of-order timestamp.
#[test]
fn cumulative_delta_rejects_reset_and_out_of_order_points() {
    let empty = BucketLayout::fixed_width(0.0, 1.0, 1)
        .unwrap()
        .empty_distribution::<()>();
    // All points share the same (empty) value; only timing fields matter here.
    let point = |start, timestamp| scepter::CumulativePoint {
        start,
        timestamp,
        value: empty.clone(),
    };
    // A newer start time signals that the cumulative counter was reset.
    assert_eq!(
        point(2, 30).delta_since(&point(1, 20)),
        Err(DistributionError::ResetOrOutOfOrderPoint)
    );
    // An earlier timestamp with the same start is out of order.
    assert_eq!(
        point(1, 10).delta_since(&point(1, 20)),
        Err(DistributionError::ResetOrOutOfOrderPoint)
    );
}
/// N-gram generation pads with `^`/`$` sentinels and yields fixed-size grams.
#[test]
fn ngram_generation_is_padded_and_sized() {
    let bigrams = ngrams("ab", 2);
    assert_eq!(bigrams, vec!["^a", "ab", "b$"]);
    let quadgrams = ExcerptStrategy::NGram(4).excerpts("go");
    assert_eq!(quadgrams, vec!["^^^g", "^^go", "^go$", "go$$", "o$$$"]);
}
/// `candidate_union` returns the union of the children matching any of the
/// supplied predicates.
#[test]
fn candidate_union_combines_predicate_results() {
    let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::Full);
    for (value, leaf) in [("latency", "leaf-1"), ("errors", "leaf-2")] {
        index.insert_value("Metric", "name", value, leaf);
    }
    let predicates = [
        FieldPredicate::Equals("latency".to_owned()),
        FieldPredicate::Equals("errors".to_owned()),
    ];
    let candidates = index.candidate_union("Metric", "name", &predicates);
    assert_eq!(candidates, HashSet::from(["leaf-1", "leaf-2"]));
}
/// With 1-gram excerpts an equality predicate on a single character matches
/// any value containing that character.
#[test]
fn custom_excerpt_strategy_changes_index_semantics() {
    let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::NGram(1));
    index.insert_value("Target", "job", "abc", "leaf-1");
    let predicate = FieldPredicate::Equals("b".to_owned());
    let candidates = index.candidates("Target", "job", &predicate);
    assert_eq!(candidates, HashSet::from(["leaf-1"]));
}
/// Key encoding agrees between owned/borrowed strings, orders booleans
/// false-before-true, and delegates `FieldValue` variants to their inner type.
#[test]
fn key_encoding_covers_string_bool_and_field_values() {
    let owned = "abc".to_owned();
    assert_eq!(owned.encoded_key(), "abc".encoded_key());
    assert!(false.encoded_key() < true.encoded_key());
    assert_eq!(FieldValue::I64(-3).encoded_key(), (-3_i64).encoded_key());
    assert_eq!(FieldValue::F64(1.5).encoded_key(), 1.5_f64.encoded_key());
}
/// Encoded fields are joined with a NUL byte separator.
#[test]
fn key_encoder_uses_separators_between_fields() {
    // NOTE(review): the `.clone()` appears required because `push_field`
    // chains by borrow while `finish` consumes the encoder — confirm in API.
    let encoded = KeyEncoder::new()
        .push_field("ab")
        .push_field("cd")
        .clone()
        .finish();
    let expected: &[u8] = b"ab\0cd";
    assert_eq!(encoded, expected);
}
/// Target schemas report their fields, metric schemas keep their kind, and
/// time-series keys encode target and metric fields in order.
#[test]
fn schema_and_series_keys_expose_exact_semantics() {
    let target = TargetSchema::new(
        "ComputeTask",
        vec![Field::new("cluster", ValueKind::String)],
        "cluster",
    );
    assert!(target.has_field("cluster"));
    assert!(!target.has_field("job"));

    let metric = MetricSchema::new(
        "/rpc/server/latency",
        vec![Field::new("command", ValueKind::String)],
        ValueKind::Distribution,
        MetricKind::Cumulative,
    );
    assert_eq!(metric.metric_kind, MetricKind::Cumulative);

    let key = TimeSeriesKey {
        target_schema: "ComputeTask".to_owned(),
        target_fields: vec![FieldValue::String("aa".to_owned())],
        metric_name: "/rpc/server/latency".to_owned(),
        metric_fields: vec![FieldValue::String("Query".to_owned())],
    };
    // The target key is the schema name plus target field values, encoded.
    let expected_target = KeyEncoder::new()
        .push_field("ComputeTask")
        .push_field("aa")
        .clone()
        .finish();
    assert_eq!(key.target_key(), expected_target);
    // The series key terminates with the final metric field value.
    assert!(key.series_key().ends_with(b"Query"));
}
/// Plan nodes report the lowest level they may execute at, fanout plans
/// intersect child sets, and query fragments carry their nodes.
#[test]
fn query_nodes_and_fanout_intersection_have_observable_behavior() {
    // Grouping by nothing can run at a leaf; grouping by a field needs a zone.
    let ungrouped = PlanNode::GroupBy { fields: vec![] };
    assert_eq!(ungrouped.lowest_valid_level(), scepter::ExecutionLevel::Leaf);
    let grouped = PlanNode::GroupBy {
        fields: vec!["zone".to_owned()],
    };
    assert_eq!(grouped.lowest_valid_level(), scepter::ExecutionLevel::Zone);

    let mut fanout = scepter::FanoutPlan::new(
        scepter::ExecutionLevel::Leaf,
        HashSet::from(["leaf-1", "leaf-2"]),
    );
    fanout.intersect(&HashSet::from(["leaf-2", "leaf-3"]));
    assert_eq!(fanout.children, HashSet::from(["leaf-2"]));

    let fragment = QueryFragment {
        level: scepter::ExecutionLevel::Leaf,
        nodes: vec![PlanNode::Scan {
            metric: "m".to_owned(),
        }],
    };
    assert_eq!(fragment.nodes.len(), 1);
}
/// Load samples produce a scalar score, and range assignment supports
/// reassignment plus explicit errors for unknown, empty, or bad split ranges.
#[test]
fn load_score_and_range_assignment_operations_are_observable() {
    let sample = LoadSample {
        writes_per_second: 2.0,
        bytes_per_second: 2048.0,
        queries_per_second: 3.0,
    };
    assert_eq!(sample.score(), 7.0);

    let mut assigner = RangeAssigner::new();
    assigner.assign(vec![0]..vec![10], "leaf-1").unwrap();
    // The range end is exclusive, so key [10] is unassigned.
    assert_eq!(assigner.worker_for_encoded(&[10]), None);
    assigner.reassign_start(&[0], "leaf-2").unwrap();
    assert_eq!(assigner.worker_for_encoded(&[5]), Some(&"leaf-2"));
    // Reassignment requires an exact range-start key.
    assert_eq!(
        assigner.reassign_start(&[9], "leaf-3"),
        Err(ShardError::RangeNotFound)
    );
    assert_eq!(
        assigner.try_assign(vec![9]..vec![9], "leaf-3"),
        Err(ShardError::EmptyRange)
    );
    // Splitting at the range's own start key is outside the splittable span.
    assert_eq!(
        assigner.split_at(&[0], vec![0], "leaf-4"),
        Err(ShardError::SplitOutsideRange)
    );
    assert_eq!(assigner.assignments().count(), 1);
}
/// Schedules convert seconds to millis, evaluator shards own their assigned
/// query names, and standing queries expose their output metric.
#[test]
fn standing_query_schedule_and_sharding_are_concrete() {
    assert_eq!(Schedule::every_seconds(5).every_millis, 5000);
    let shard = EvaluatorShard::for_query("query-a", 8).unwrap();
    assert!(shard.shard < 8);
    assert!(shard.owns("query-a"));
    // Scan generated names until one hashes to a different shard.
    let foreign = (0..100)
        .map(|i| format!("query-b-{i}"))
        .find(|candidate| !shard.owns(candidate))
        .expect("at least one generated query should map elsewhere");
    assert!(!shard.owns(&foreign));
    let standing = StandingQuery {
        name: "query-a".to_owned(),
        schedule: Schedule::every_seconds(5),
        plan: scepter::LogicalPlan::default(),
        output_metric: "/derived/query-a".to_owned(),
    };
    assert_eq!(standing.output_metric, "/derived/query-a");
}
/// The collection aggregator merges values for the same key via `Mergeable`.
#[test]
fn collection_aggregator_uses_mergeable_values() {
    // Minimal Mergeable implementation: a summed counter that cannot fail.
    #[derive(Debug, Clone, PartialEq)]
    struct Counter(u64);
    impl Mergeable for Counter {
        type Error = std::convert::Infallible;
        fn merge_from(&mut self, other: Self) -> Result<(), Self::Error> {
            self.0 += other.0;
            Ok(())
        }
    }
    let mut aggregator = CollectionAggregator::new();
    for value in [2, 5] {
        aggregator.add("k", Counter(value)).unwrap();
    }
    assert_eq!(aggregator.into_inner()["k"], Counter(7));
}
/// The blanket `Mergeable` impl for `u64` merges by addition.
#[test]
fn u64_mergeable_adds_values() {
    let mut accumulated = 10_u64;
    accumulated.merge_from(7).unwrap();
    assert_eq!(accumulated, 17);
}
/// Bucketed aggregation: deltas open period-aligned buckets, `advance_to`
/// finalizes buckets past the watermark, and late deltas are rejected.
#[test]
fn bucketed_collection_aggregation_exposes_window_semantics() {
    let mut aggregator =
        BucketedAggregator::with_reducer(10, AdmissionWindow::new(5), SumReducer).unwrap();
    assert_eq!(aggregator.period(), 10);
    assert_eq!(aggregator.offset(), 0);
    assert_eq!(aggregator.admission().length(), 5);
    assert_eq!(aggregator.watermark(), 0);
    // All deltas in this test target the same series key.
    let delta = |end_time, value: u64| DeltaPoint {
        key: "user-a",
        end_time,
        value,
    };
    aggregator.ingest(delta(9, 2)).unwrap();
    aggregator.ingest(delta(10, 5)).unwrap();
    // end_time 9 and end_time 10 fall into different period-10 buckets.
    assert_eq!(aggregator.open_bucket_count(), 2);
    let finalized = aggregator.advance_to(15);
    assert_eq!(finalized.len(), 1);
    assert_eq!(finalized[0].start, 0);
    assert_eq!(finalized[0].end, 10);
    assert_eq!(finalized[0].value, 2);
    assert_eq!(aggregator.watermark(), 15);
    // Once the watermark has advanced, earlier deltas are rejected as late.
    assert_eq!(
        aggregator.ingest(delta(9, 1)),
        Err(CollectionError::LateDelta)
    );
}
/// `ingest_at` advances time and hands back any buckets it finalized.
#[test]
fn bucketed_collection_ingest_at_returns_finalized_buckets() {
    let mut aggregator =
        BucketedAggregator::with_reducer(10, AdmissionWindow::new(0), SumReducer).unwrap();
    let early = DeltaPoint {
        key: "user-a",
        end_time: 2,
        value: 3_u64,
    };
    aggregator.ingest(early).unwrap();
    let later = DeltaPoint {
        key: "user-a",
        end_time: 11,
        value: 4_u64,
    };
    // Ingesting at time 10 closes the first period's bucket and returns it.
    let finalized = aggregator.ingest_at(10, later).unwrap();
    assert_eq!(finalized.len(), 1);
    assert_eq!(finalized[0].value, 3);
    assert_eq!(aggregator.open_bucket_count(), 1);
}
/// Min/max reducers keep the smallest/largest value seen within a bucket.
#[test]
fn bucketed_collection_min_and_max_reducers_are_observable() {
    let mut min =
        BucketedAggregator::with_reducer(10, AdmissionWindow::new(0), MinReducer).unwrap();
    let mut max =
        BucketedAggregator::with_reducer(10, AdmissionWindow::new(0), MaxReducer).unwrap();
    // Every delta hits the same key and bucket so the reducers compete.
    let point = |value| DeltaPoint {
        key: "cpu",
        end_time: 1,
        value,
    };
    for value in [8_u64, 3, 11] {
        min.ingest(point(value)).unwrap();
        max.ingest(point(value)).unwrap();
    }
    assert_eq!(min.advance_to(10)[0].value, 3);
    assert_eq!(max.advance_to(10)[0].value, 11);
}
/// Resolution picks the best available replica as primary and ranks the
/// remaining selectable candidates as fallbacks; unavailable ones are dropped.
#[test]
fn replica_resolver_prefers_available_complete_dense_replicas() {
    assert_eq!(ReplicaResolver::with_max_fallbacks(2).max_fallbacks(), 2);
    // All candidates cover the same range and differ only by density/state.
    let range = b"a".to_vec()..b"m".to_vec();
    let candidate = |name, density, state| {
        ReplicaCandidate::new(
            range.clone(),
            name,
            ReplicaQuality::new(10, 40, density, 100, true, state),
        )
    };
    let candidates = vec![
        candidate("recovering-complete", 100, ReplicaState::Recovering),
        candidate("available-dense", 95, ReplicaState::Available),
        candidate("available-sparse", 70, ReplicaState::Available),
        candidate("unavailable", 100, ReplicaState::Unavailable),
    ];
    let resolved = ReplicaResolver::with_max_fallbacks(2).resolve(candidates);
    assert_eq!(resolved.len(), 1);
    assert_eq!(resolved[0].primary, "available-dense");
    assert_eq!(
        resolved[0].fallbacks,
        vec!["available-sparse", "recovering-complete"]
    );
    assert_eq!(resolved[0].quality.density(), 0.95);
}
/// Each key range is resolved independently and results come back in range
/// order, regardless of the order candidates were supplied in.
#[test]
fn replica_resolver_resolves_ranges_independently() {
    let right = ReplicaCandidate::new(
        b"m".to_vec()..b"z".to_vec(),
        "right",
        ReplicaQuality::new(0, 60, 60, 60, true, ReplicaState::Available),
    );
    let left = ReplicaCandidate::new(
        b"a".to_vec()..b"m".to_vec(),
        "left",
        ReplicaQuality::new(0, 30, 30, 60, false, ReplicaState::Available),
    );
    // Supplied right-first on purpose; resolution still sorts by range.
    let resolved = ReplicaResolver::with_max_fallbacks(0).resolve(vec![right, left]);
    assert_eq!(resolved.len(), 2);
    assert_eq!(resolved[0].range, b"a".to_vec()..b"m".to_vec());
    assert_eq!(resolved[0].fallbacks.len(), 0);
    assert_eq!(resolved[1].range, b"m".to_vec()..b"z".to_vec());
}
/// Query health tracks completed children against the expected count and
/// records per-child issues in order.
#[test]
fn query_health_tracks_degraded_children() {
    let mut health = QueryHealth::with_expected_children(3);
    for _ in 0..2 {
        health.record_completed();
    }
    health.push_issue("zone-a", IssueKind::PrunedZone, "soft deadline elapsed");
    health.push_issue("leaf-7", IssueKind::MissingReplica, "no selectable replica");
    assert!(health.is_partial());
    assert_eq!(health.expected_children(), 3);
    assert_eq!(health.completed_children(), 2);
    assert_eq!(health.completeness(), 2.0 / 3.0);
    let issues = health.issues();
    assert_eq!(issues.len(), 2);
    assert_eq!(issues[1].kind, IssueKind::MissingReplica);
}
/// A query is partial either because an issue was recorded (even at full
/// completeness) or because children never responded (zero completeness).
#[test]
fn query_health_distinguishes_issue_only_and_missing_child_partiality() {
    // All children completed, but one issue was pushed.
    let mut flagged = QueryHealth::with_expected_children(1);
    flagged.record_completed();
    flagged.push_issue("zone-a", IssueKind::StaleConfig, "using cached config");
    assert!(flagged.is_partial());
    assert_eq!(flagged.completeness(), 1.0);
    // No issues, but the expected child never reported in.
    let unanswered = QueryHealth::<&str>::with_expected_children(1);
    assert!(unanswered.is_partial());
    assert_eq!(unanswered.completeness(), 0.0);
}
#[cfg(feature = "compressed-postings")]
#[test]
fn compressed_postings_match_generic_hint_candidates() {
    // The compressed (numeric) hint index must yield exactly the same
    // candidate set as the generic index for identical inserts and predicate.
    use scepter::NumericFieldHintIndex;
    let mut generic = FieldHintIndex::with_strategy(ExcerptStrategy::Trigram);
    let mut compressed = NumericFieldHintIndex::with_strategy(ExcerptStrategy::Trigram);
    // Mirror every insert into both indexes; children are numeric ids here.
    for (child, value) in [(1_u32, "monarch"), (2, "monitor"), (3, "archival-monarch")] {
        generic.insert_value("ComputeTask", "job", value, child);
        compressed.insert_value("ComputeTask", "job", value, child);
    }
    let predicate = FieldPredicate::Contains("monarch".to_owned());
    let generic_candidates = generic.candidates("ComputeTask", "job", &predicate);
    let compressed_candidates = compressed.candidates("ComputeTask", "job", &predicate);
    // NOTE(review): the `as usize`/`as u64` casts suggest the compressed
    // index reports sizes as u64 while the generic one uses usize — confirm.
    assert_eq!(
        generic_candidates.len(),
        compressed_candidates.len() as usize
    );
    // Same cardinality plus full containment implies set equality.
    for child in generic_candidates {
        assert!(compressed_candidates.contains(child));
    }
    assert_eq!(compressed.posting_count(), generic.posting_count() as u64);
}
/// Arrow exports keep one row per input record and name their schema fields.
#[cfg(feature = "arrow")]
#[test]
fn arrow_exports_preserve_row_counts_and_schema_names() {
    use scepter::{finalized_u64_buckets_to_record_batch, query_health_to_record_batch};
    let first = scepter::FinalizedBucket {
        key: "series-a",
        start: 0,
        end: 60,
        value: 3_u64,
    };
    let second = scepter::FinalizedBucket {
        key: "series-b",
        start: 60,
        end: 120,
        value: 5_u64,
    };
    let buckets = vec![first, second];
    let bucket_batch = finalized_u64_buckets_to_record_batch(&buckets).unwrap();
    assert_eq!(bucket_batch.num_rows(), 2);
    assert_eq!(bucket_batch.schema().field(3).name(), "value");

    let mut health = QueryHealth::with_expected_children(1);
    health.push_issue("zone-a", IssueKind::TimedOut, "deadline elapsed");
    let issue_batch = query_health_to_record_batch(&health).unwrap();
    assert_eq!(issue_batch.num_rows(), 1);
    assert_eq!(issue_batch.schema().field(1).name(), "kind");
}
/// Encoding to zstd-compressed CBOR and decoding back is a lossless
/// round trip for wire buckets.
#[cfg(all(feature = "cbor", feature = "zstd"))]
#[test]
fn wire_helpers_round_trip_compressed_cbor_payloads() {
    use scepter::{from_cbor_zstd, to_cbor_zstd, WireFinalizedBucket};
    let original = WireFinalizedBucket {
        key: "series-a".to_owned(),
        start: 0,
        end: 60,
        value: 8_u64,
    };
    // Compression level 1: the test only cares about fidelity, not ratio.
    let bytes = to_cbor_zstd(&original, 1).unwrap();
    let round_tripped: WireFinalizedBucket<String, u64> = from_cbor_zstd(&bytes).unwrap();
    assert_eq!(round_tripped, original);
}