use proptest::prelude::*;
use scepter::{
AdmissionWindow, Bucket, BucketLayout, BucketedAggregator, CumulativePoint, DeltaPoint,
Distribution, ExcerptStrategy, FieldHintIndex, FieldPredicate, KeyEncoder, LexicographicKey,
RangeAssigner, SumReducer,
};
proptest! {
#[test]
#[test]
fn u64_lexicographic_encoding_preserves_numeric_order(a in any::<u64>(), b in any::<u64>()) {
    // Byte-wise comparison of the encoded keys must agree with the numeric
    // comparison of the source values, for every pair of u64s.
    let numeric = a.cmp(&b);
    let encoded = a.encoded_key().cmp(&b.encoded_key());
    prop_assert_eq!(numeric, encoded);
}
#[test]
#[test]
fn i64_lexicographic_encoding_preserves_numeric_order(a in any::<i64>(), b in any::<i64>()) {
    // Signed values (including negatives) must keep their numeric order once
    // mapped to lexicographically comparable byte keys.
    let numeric = a.cmp(&b);
    let encoded = a.encoded_key().cmp(&b.encoded_key());
    prop_assert_eq!(numeric, encoded);
}
#[test]
#[test]
fn finite_f64_lexicographic_encoding_preserves_numeric_order(
    a in -1.0e12_f64..1.0e12,
    b in -1.0e12_f64..1.0e12,
) {
    // The range strategies only produce finite values, so this assume is a
    // defensive guard rather than a filter.
    prop_assume!(!a.is_nan() && !b.is_nan());
    // total_cmp gives a total order over f64, matching the byte-key order.
    let numeric = a.total_cmp(&b);
    let encoded = a.encoded_key().cmp(&b.encoded_key());
    prop_assert_eq!(numeric, encoded);
}
#[test]
#[test]
fn compound_keys_preserve_later_field_order(prefix in "[a-z]{1,8}", left in 0_u64..1_000_000, right in 0_u64..1_000_000) {
    // When the first field is identical, the compound-key order must be
    // decided entirely by the second field.
    let encode = |suffix: &u64| {
        let mut key = KeyEncoder::new();
        key.push_field(prefix.as_str()).push_field(suffix);
        key.finish()
    };
    prop_assert_eq!(left.cmp(&right), encode(&left).cmp(&encode(&right)));
}
#[test]
fn range_assigner_routes_to_the_unique_containing_range(key in 0_u8..=254) {
let mut assigner = RangeAssigner::new();
assigner.assign(vec![0]..vec![85], "low").unwrap();
assigner.assign(vec![85]..vec![170], "mid").unwrap();
assigner.assign(vec![170]..vec![255], "high").unwrap();
let expected = match key {
0..=84 => Some(&"low"),
85..=169 => Some(&"mid"),
170..=254 => Some(&"high"),
255 => None,
};
prop_assert_eq!(assigner.worker_for_encoded(&[key]), expected);
}
#[test]
#[test]
fn range_split_preserves_routing_for_each_side(split in 1_u8..=254, key in 0_u8..=254) {
    // Splitting [0, 255) at `split` must keep keys below the split point on
    // the original worker and route the rest to the newly added one.
    let mut assigner = RangeAssigner::new();
    assigner.assign(vec![0]..vec![255], "left").unwrap();
    assigner.split_at(&[0], vec![split], "right").unwrap();
    let expected = if key < split { "left" } else { "right" };
    prop_assert_eq!(assigner.worker_for_encoded(&[key]), Some(&expected));
}
#[test]
fn field_hint_exact_lookup_returns_inserted_child(value in "[a-z]{1,24}") {
let mut index = FieldHintIndex::new();
index.insert_value("Target", "job", &value, "leaf");
let candidates = index.candidates("Target", "job", &FieldPredicate::Equals(value));
prop_assert!(candidates.contains("leaf"));
}
#[test]
#[test]
fn full_excerpt_strategy_does_not_match_distinct_full_values(
    left in "[a-z]{1,12}",
    right in "[a-z]{1,12}",
) {
    prop_assume!(left != right);
    // With full-value excerpts an equality lookup is exact: it must include
    // the child inserted under the queried value and exclude the other.
    let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::Full);
    index.insert_value("Metric", "name", &left, "left-child");
    index.insert_value("Metric", "name", &right, "right-child");
    let candidates = index.candidates("Metric", "name", &FieldPredicate::Equals(left));
    prop_assert!(candidates.contains("left-child"));
    prop_assert!(!candidates.contains("right-child"));
}
#[test]
fn distribution_merge_total_count_is_additive(
(left_counts, right_counts) in equal_length_count_vectors(1..16, 0_u64..10_000),
) {
let left_total: u64 = left_counts.iter().sum();
let right_total: u64 = right_counts.iter().sum();
let mut left = distribution_from_counts(&left_counts);
let right = distribution_from_counts(&right_counts);
left.try_merge(&right).unwrap();
prop_assert_eq!(left.total_count(), left_total + right_total);
}
#[test]
#[test]
fn percentile_is_inside_bucket_layout(counts in prop::collection::vec(0_u64..1000, 1..16), percentile in 0.0_f64..=100.0) {
    // Percentiles are only meaningful for a non-empty population.
    prop_assume!(counts.iter().any(|&count| count > 0));
    let distribution = distribution_from_counts(&counts);
    let value = distribution.percentile(percentile).unwrap().unwrap();
    // Whatever the requested percentile, the reported value must lie within
    // the span covered by the bucket layout.
    let buckets = distribution.buckets();
    let low = buckets.first().unwrap().range.start;
    let high = buckets.last().unwrap().range.end;
    prop_assert!(low <= value);
    prop_assert!(value <= high);
}
#[test]
fn cumulative_delta_total_is_saturating_difference(
(previous_counts, increments) in equal_length_count_vectors(1..16, 0_u64..1000),
) {
let current_counts: Vec<u64> = previous_counts
.iter()
.zip(&increments)
.map(|(previous, increment)| previous + increment)
.collect();
let previous_total: u64 = previous_counts.iter().sum();
let current_total: u64 = current_counts.iter().sum();
let current = CumulativePoint {
start: 1,
timestamp: 20,
value: distribution_from_counts(¤t_counts),
};
let previous = CumulativePoint {
start: 1,
timestamp: 10,
value: distribution_from_counts(&previous_counts),
};
let delta = current.delta_since(&previous).unwrap();
prop_assert_eq!(delta.value.total_count(), current_total - previous_total);
}
#[test]
fn bucket_layout_from_bounds_accepts_strictly_increasing_bounds(
start in -10_000_i64..10_000,
steps in prop::collection::vec(1_i64..100, 1..16),
) {
let mut bounds = Vec::with_capacity(steps.len() + 1);
let mut current = start as f64;
bounds.push(current);
for step in steps {
current += step as f64;
bounds.push(current);
}
let layout = BucketLayout::from_bounds(&bounds).unwrap();
prop_assert_eq!(layout.ranges.len(), bounds.len() - 1);
}
#[test]
fn bucketed_aggregation_sum_matches_input_for_one_bucket(
values in prop::collection::vec(0_u64..1_000, 1..64),
) {
let mut aggregator =
BucketedAggregator::with_reducer(60, AdmissionWindow::new(0), SumReducer).unwrap();
let expected: u64 = values.iter().sum();
for value in values {
aggregator
.ingest(DeltaPoint {
key: "user-a",
end_time: 10,
value,
})
.unwrap();
}
let finalized = aggregator.advance_to(60);
prop_assert_eq!(finalized.len(), 1);
prop_assert_eq!(finalized[0].value, expected);
}
#[test]
fn bucketed_aggregation_rejects_points_before_admission_cutoff(
watermark in 1_u64..10_000,
window in 0_u64..1_000,
) {
let mut aggregator =
BucketedAggregator::with_reducer(10, AdmissionWindow::new(window), SumReducer).unwrap();
aggregator.advance_to(watermark);
let cutoff = watermark.saturating_sub(window);
prop_assume!(cutoff > 0);
let result = aggregator.ingest(DeltaPoint {
key: "user-a",
end_time: cutoff - 1,
value: 1_u64,
});
prop_assert!(matches!(result, Err(scepter::CollectionError::LateDelta)));
}
}
/// Builds a `Distribution` whose `i`-th bucket spans `i..i + 1` (as `f64`)
/// and holds `counts[i]` observations.
fn distribution_from_counts(counts: &[u64]) -> Distribution<()> {
    // `.collect()` keeps the target collection type inferred from the
    // parameter of `Distribution::new`.
    Distribution::new(
        counts
            .iter()
            .copied()
            .enumerate()
            .map(|(index, count)| Bucket {
                range: index as f64..(index + 1) as f64,
                count,
            })
            .collect(),
    )
}
/// Strategy producing a pair of `u64` vectors that share a single random
/// length drawn from `len_range`, with elements drawn from `value_range`.
fn equal_length_count_vectors(
    len_range: std::ops::Range<usize>,
    value_range: std::ops::Range<u64>,
) -> impl Strategy<Value = (Vec<u64>, Vec<u64>)> {
    // Pick the length first so both vectors are guaranteed equal-sized.
    len_range.prop_flat_map(move |len| {
        let left = prop::collection::vec(value_range.clone(), len);
        let right = prop::collection::vec(value_range.clone(), len);
        (left, right)
    })
}