// scepter 0.1.5
//
// Composable primitives for planet-scale time-series routing, indexing, and aggregation.
// Property-based tests for the crate's public API; see the crate documentation for details.
use proptest::prelude::*;
use scepter::{
    AdmissionWindow, Bucket, BucketLayout, BucketedAggregator, CumulativePoint, DeltaPoint,
    Distribution, ExcerptStrategy, FieldHintIndex, FieldPredicate, KeyEncoder, LexicographicKey,
    RangeAssigner, SumReducer,
};

proptest! {
    // For any pair of u64 values, comparing their encoded keys must yield the same
    // ordering as comparing the numbers themselves.
    #[test]
    fn u64_lexicographic_encoding_preserves_numeric_order(left in any::<u64>(), right in any::<u64>()) {
        let numeric = left.cmp(&right);
        let encoded = left.encoded_key().cmp(&right.encoded_key());
        prop_assert_eq!(numeric, encoded);
    }

    // Signed variant of the ordering property: negative values must encode to keys that
    // sort before non-negative ones, and the ordering within each sign must be preserved.
    #[test]
    fn i64_lexicographic_encoding_preserves_numeric_order(left in any::<i64>(), right in any::<i64>()) {
        let (expected, actual) = (
            left.cmp(&right),
            left.encoded_key().cmp(&right.encoded_key()),
        );
        prop_assert_eq!(expected, actual);
    }

    // Encoded f64 keys must order exactly like `f64::total_cmp` on the original values.
    //
    // Both inputs come from a bounded half-open range, so every sample is finite and NaN
    // can never occur. No `prop_assume!` filter is needed (the old NaN guard was dead code).
    #[test]
    fn finite_f64_lexicographic_encoding_preserves_numeric_order(
        left in -1.0e12_f64..1.0e12,
        right in -1.0e12_f64..1.0e12,
    ) {
        prop_assert_eq!(left.total_cmp(&right), left.encoded_key().cmp(&right.encoded_key()));
    }

    // When two compound keys share the same leading string field, the test expects their
    // encoded ordering to be decided by the trailing u64 field, i.e. to match the numeric
    // ordering of `left` and `right`.
    #[test]
    fn compound_keys_preserve_later_field_order(prefix in "[a-z]{1,8}", left in 0_u64..1_000_000, right in 0_u64..1_000_000) {
        let mut left_encoder = KeyEncoder::new();
        left_encoder.push_field(prefix.as_str());
        left_encoder.push_field(&left);

        let mut right_encoder = KeyEncoder::new();
        right_encoder.push_field(prefix.as_str());
        right_encoder.push_field(&right);

        let by_value = left.cmp(&right);
        let by_key = left_encoder.finish().cmp(&right_encoder.finish());
        prop_assert_eq!(by_value, by_key);
    }

    // Three contiguous half-open ranges cover the single-byte keys [0]..[255]. Every key
    // inside that span must route to exactly the worker owning its range.
    //
    // The key [255] lies at the last range's exclusive end, so it is owned by no worker.
    // Drawing from the full u8 domain, instead of stopping at 254, actually exercises that
    // uncovered tail. Previously the `255 => None` arm existed only to satisfy match
    // exhaustiveness.
    #[test]
    fn range_assigner_routes_to_the_unique_containing_range(key in any::<u8>()) {
        let mut assigner = RangeAssigner::new();
        assigner.assign(vec![0]..vec![85], "low").unwrap();
        assigner.assign(vec![85]..vec![170], "mid").unwrap();
        assigner.assign(vec![170]..vec![255], "high").unwrap();

        let expected = match key {
            0..=84 => Some(&"low"),
            85..=169 => Some(&"mid"),
            170..=254 => Some(&"high"),
            // Outside every assigned range: no worker should be returned.
            255 => None,
        };

        prop_assert_eq!(assigner.worker_for_encoded(&[key]), expected);
    }

    // One range [0]..[255] owned by "left" is split at `split`. The test expects keys below
    // the split point to stay with "left" and keys at or above it to route to "right".
    #[test]
    fn range_split_preserves_routing_for_each_side(split in 1_u8..=254, key in 0_u8..=254) {
        let mut assigner = RangeAssigner::new();
        assigner.assign(vec![0]..vec![255], "left").unwrap();
        assigner.split_at(&[0], vec![split], "right").unwrap();

        let expected = Some(if key >= split { &"right" } else { &"left" });
        prop_assert_eq!(assigner.worker_for_encoded(&[key]), expected);
    }

    // Querying with an exact-match predicate for the value that was just inserted must
    // return the child it was inserted under.
    #[test]
    fn field_hint_exact_lookup_returns_inserted_child(value in "[a-z]{1,24}") {
        let mut index = FieldHintIndex::new();
        index.insert_value("Target", "job", &value, "leaf");

        let exact = FieldPredicate::Equals(value);
        let candidates = index.candidates("Target", "job", &exact);

        prop_assert!(candidates.contains("leaf"));
    }

    // With `ExcerptStrategy::Full`, an exact lookup for one value must return that value's
    // child and must not also return the child recorded for a different value.
    #[test]
    fn full_excerpt_strategy_does_not_match_distinct_full_values(
        left in "[a-z]{1,12}",
        right in "[a-z]{1,12}",
    ) {
        prop_assume!(left != right);

        let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::Full);
        // Insertion order is left first, then right.
        for (value, child) in [(&left, "left-child"), (&right, "right-child")] {
            index.insert_value("Metric", "name", value, child);
        }

        let candidates = index.candidates("Metric", "name", &FieldPredicate::Equals(left));

        prop_assert!(candidates.contains("left-child"));
        prop_assert!(!candidates.contains("right-child"));
    }

    // Both distributions come from equal-length count vectors, so they share the same
    // bucket layout. Merging them must yield a total count equal to the sum of every input
    // count.
    #[test]
    fn distribution_merge_total_count_is_additive(
        (left_counts, right_counts) in equal_length_count_vectors(1..16, 0_u64..10_000),
    ) {
        let expected_total: u64 = left_counts.iter().chain(&right_counts).sum();

        let right = distribution_from_counts(&right_counts);
        let mut left = distribution_from_counts(&left_counts);
        left.try_merge(&right).unwrap();

        prop_assert_eq!(left.total_count(), expected_total);
    }

    // For a distribution with at least one non-zero bucket, any percentile in [0, 100]
    // must land between the lower edge of the first bucket and the upper edge of the last.
    #[test]
    fn percentile_is_inside_bucket_layout(counts in prop::collection::vec(0_u64..1000, 1..16), percentile in 0.0_f64..=100.0) {
        prop_assume!(counts.iter().any(|count| *count > 0));

        let distribution = distribution_from_counts(&counts);
        let value = distribution.percentile(percentile).unwrap().unwrap();

        let buckets = distribution.buckets();
        let first = buckets.first().unwrap().range.start;
        let last = buckets.last().unwrap().range.end;

        prop_assert!(first <= value && value <= last);
    }

    // Two cumulative snapshots share a start time. The later one equals the earlier one
    // plus non-negative per-bucket increments, and the delta between them must carry the
    // difference of the totals.
    //
    // NOTE(review): increments are never negative, so this only exercises the
    // non-saturating path despite the test's name.
    #[test]
    fn cumulative_delta_total_is_saturating_difference(
        (previous_counts, increments) in equal_length_count_vectors(1..16, 0_u64..1000),
    ) {
        let previous_total: u64 = previous_counts.iter().sum();
        let current_counts: Vec<u64> = previous_counts
            .iter()
            .zip(&increments)
            .map(|(before, step)| before + step)
            .collect();
        let current_total: u64 = current_counts.iter().sum();

        let previous = CumulativePoint {
            start: 1,
            timestamp: 10,
            value: distribution_from_counts(&previous_counts),
        };
        let current = CumulativePoint {
            start: 1,
            timestamp: 20,
            value: distribution_from_counts(&current_counts),
        };

        let delta = current.delta_since(&previous).unwrap();

        prop_assert_eq!(delta.value.total_count(), current_total - previous_total);
    }

    // Bounds built as a running sum of strictly positive steps from `start` are strictly
    // increasing. `from_bounds` must accept them and produce one range per adjacent pair.
    #[test]
    fn bucket_layout_from_bounds_accepts_strictly_increasing_bounds(
        start in -10_000_i64..10_000,
        steps in prop::collection::vec(1_i64..100, 1..16),
    ) {
        let origin = start as f64;
        let bounds: Vec<f64> = std::iter::once(origin)
            .chain(steps.iter().scan(origin, |edge, step| {
                *edge += *step as f64;
                Some(*edge)
            }))
            .collect();

        let layout = BucketLayout::from_bounds(&bounds).unwrap();

        prop_assert_eq!(layout.ranges.len(), bounds.len() - 1);
    }

    // Every point uses the same key and end_time 10. The test expects them all to fall into
    // a single bucket (60 is presumably the bucket width — confirm), which `advance_to(60)`
    // finalizes with the sum of the ingested values.
    #[test]
    fn bucketed_aggregation_sum_matches_input_for_one_bucket(
        values in prop::collection::vec(0_u64..1_000, 1..64),
    ) {
        let expected = values.iter().copied().sum::<u64>();
        let mut aggregator =
            BucketedAggregator::with_reducer(60, AdmissionWindow::new(0), SumReducer).unwrap();

        for value in values {
            let point = DeltaPoint {
                key: "user-a",
                end_time: 10,
                value,
            };
            aggregator.ingest(point).unwrap();
        }

        let finalized = aggregator.advance_to(60);

        prop_assert_eq!(finalized.len(), 1);
        prop_assert_eq!(finalized[0].value, expected);
    }

    // After the watermark advances, a point ending strictly before the admission cutoff
    // must be rejected with `CollectionError::LateDelta`.
    //
    // `cutoff` is the boundary this test expects the aggregator to enforce: the watermark
    // minus the window, floored at zero. When the window reaches back to zero there is no
    // representable "earlier" end time (`cutoff - 1` would underflow). Those cases are
    // discarded up front, before any aggregator is built or advanced.
    #[test]
    fn bucketed_aggregation_rejects_points_before_admission_cutoff(
        watermark in 1_u64..10_000,
        window in 0_u64..1_000,
    ) {
        let cutoff = watermark.saturating_sub(window);
        prop_assume!(cutoff > 0);

        let mut aggregator =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(window), SumReducer).unwrap();
        aggregator.advance_to(watermark);

        let result = aggregator.ingest(DeltaPoint {
            key: "user-a",
            end_time: cutoff - 1,
            value: 1_u64,
        });

        prop_assert!(matches!(result, Err(scepter::CollectionError::LateDelta)));
    }
}

/// Builds a test [`Distribution`] with one unit-width bucket per entry of `counts`.
///
/// Bucket `i` spans `i as f64..(i + 1) as f64` and carries count `counts[i]`, so the buckets
/// tile `[0, counts.len())` contiguously. An empty slice yields a distribution with no buckets.
fn distribution_from_counts(counts: &[u64]) -> Distribution<()> {
    let bucket_for = |(index, &count): (usize, &u64)| Bucket {
        range: index as f64..(index + 1) as f64,
        count,
    };
    Distribution::new(counts.iter().enumerate().map(bucket_for).collect())
}

/// Strategy producing a pair of `u64` vectors that always have the same length.
///
/// One length is drawn from `len_range`. Both vectors are then generated independently
/// at that length, with each element drawn from `value_range`.
fn equal_length_count_vectors(
    len_range: std::ops::Range<usize>,
    value_range: std::ops::Range<u64>,
) -> impl Strategy<Value = (Vec<u64>, Vec<u64>)> {
    len_range.prop_flat_map(move |len| {
        let column = |values: std::ops::Range<u64>| prop::collection::vec(values, len);
        (column(value_range.clone()), column(value_range.clone()))
    })
}