// scepter 0.1.3
//
// Tests for the public API of scepter: composable primitives for planet-scale
// time-series routing, indexing, and aggregation. See the crate documentation
// for the APIs exercised below.
use std::collections::HashSet;

use scepter::{
    ngrams, BucketLayout, CollectionAggregator, Distribution, DistributionError, EvaluatorShard,
    ExcerptStrategy, Field, FieldHintIndex, FieldPredicate, FieldValue, KeyEncoder,
    LexicographicKey, LoadSample, Mergeable, MetricKind, MetricSchema, PartialAggregate, PlanNode,
    QueryFragment, RangeAssigner, Schedule, ShardError, StandingError, StandingQuery, Sum,
    TargetSchema, TimeSeriesKey, ValueKind,
};

/// `Sum` folds every added value into a running total that `finish` returns.
#[test]
fn sum_aggregator_accumulates_and_finishes() {
    let mut total = Sum::<u64>::default();
    for value in [3, 4] {
        total.add(value);
    }

    let finished = total.finish();
    assert_eq!(finished, 7);
}

/// `fixed_width(start, width, count)` produces `count` adjacent ranges of equal
/// width beginning at `start`. A negative width, a non-finite start or a NaN width
/// is an invalid range, and zero buckets is an empty layout.
#[test]
fn fixed_width_layout_builds_exact_contiguous_ranges() {
    let layout = BucketLayout::fixed_width(-1.0, 0.5, 3).unwrap();
    let expected = vec![-1.0..-0.5, -0.5..0.0, 0.0..0.5];
    assert_eq!(layout.ranges, expected);

    // Each (start, width) pair below must be rejected as an invalid bucket range.
    let invalid_ranges = [(0.0, -1.0), (f64::INFINITY, 1.0), (0.0, f64::NAN)];
    for (start, width) in invalid_ranges {
        assert_eq!(
            BucketLayout::fixed_width(start, width, 1),
            Err(DistributionError::InvalidBucketRange)
        );
    }

    // Asking for zero buckets is reported separately from a bad range.
    assert_eq!(
        BucketLayout::fixed_width(0.0, 1.0, 0),
        Err(DistributionError::EmptyLayout)
    );
}

/// A value maps to the bucket whose range contains it. The last bucket also
/// accepts its upper edge, and values outside the layout map to `None`.
#[test]
fn bucket_for_handles_edges_and_out_of_range_values() {
    let layout = BucketLayout::fixed_width(0.0, 10.0, 2).unwrap();

    let cases = [
        (0.0, Some(0)),
        (10.0, Some(1)),
        (20.0, Some(1)),
        (-0.1, None),
        (20.1, None),
    ];
    for (value, expected) in cases {
        assert_eq!(layout.bucket_for(value), expected, "bucket_for({value})");
    }
}

/// Each `record` adds its count to the single bucket containing the value.
/// Values beyond the layout are rejected with `BucketOutOfBounds`.
#[test]
fn record_updates_only_matching_bucket() {
    let layout = BucketLayout::fixed_width(0.0, 10.0, 2).unwrap();
    let mut distribution = Distribution::<()>::from_layout(&layout);

    // 5.0 lands in bucket 0; 10.0, 15.0 and the top edge 20.0 all land in bucket 1.
    for (value, count) in [(5.0, 3), (10.0, 2), (15.0, 4), (20.0, 5)] {
        distribution.record(value, count).unwrap();
    }

    {
        let buckets = distribution.buckets();
        assert_eq!(buckets[0].count, 3);
        assert_eq!(buckets[1].count, 2 + 4 + 5);
    }

    assert_eq!(
        distribution.record(25.0, 1),
        Err(DistributionError::BucketOutOfBounds)
    );
}

/// An exemplar can be attached to an existing bucket index and read back.
/// An index past the last bucket is rejected with `BucketOutOfBounds`.
#[test]
fn exemplars_are_set_and_checked_by_bucket() {
    let layout = BucketLayout::fixed_width(0.0, 1.0, 1).unwrap();
    let mut distribution = Distribution::from_layout(&layout);

    let in_range = scepter::Exemplar {
        value: 0.25,
        payload: "trace-1",
    };
    distribution.set_exemplar(0, in_range).unwrap();
    assert_eq!(distribution.exemplar(0).unwrap().payload, "trace-1");

    // The layout has only bucket 0, so index 1 does not exist.
    let out_of_range = scepter::Exemplar {
        value: 2.0,
        payload: "trace-2",
    };
    assert_eq!(
        distribution.set_exemplar(1, out_of_range),
        Err(DistributionError::BucketOutOfBounds)
    );
}

/// Merging or diffing distributions built on different layouts fails, and a
/// percentile above 100 is rejected.
#[test]
fn distribution_rejects_incompatible_layouts_and_bad_percentiles() {
    // Both layouts have one bucket of width 1.0 but start at different offsets.
    let empty_from = |start: f64| {
        BucketLayout::fixed_width(start, 1.0, 1)
            .unwrap()
            .empty_distribution::<()>()
    };
    let mut left = empty_from(0.0);
    let right = empty_from(1.0);

    assert_eq!(
        left.try_merge(&right),
        Err(DistributionError::IncompatibleLayout)
    );
    assert_eq!(
        left.delta(&right),
        Err(DistributionError::IncompatibleLayout)
    );
    assert_eq!(
        left.percentile(101.0),
        Err(DistributionError::InvalidPercentile)
    );
}

/// Every public error type renders a human-readable `Display` message.
#[test]
fn public_errors_have_human_readable_messages() {
    let cases = [
        (
            DistributionError::InvalidPercentile.to_string(),
            "percentile must be finite and between 0 and 100",
        ),
        (
            scepter::HintError::EmptyExcerpt.to_string(),
            "excerpt size must be greater than zero",
        ),
        (
            ShardError::SplitOutsideRange.to_string(),
            "split key must be inside the range",
        ),
        (
            StandingError::EmptyShardSet.to_string(),
            "shard count must be greater than zero",
        ),
    ];

    for (rendered, expected) in cases {
        assert_eq!(rendered, expected);
    }
}

/// `delta_since` returns `ResetOrOutOfOrderPoint` in two cases:
/// - the current point has a later `start` than the previous one (reset);
/// - the current point's `timestamp` is earlier than the previous one's (out of order).
#[test]
fn cumulative_delta_rejects_reset_and_out_of_order_points() {
    let distribution = BucketLayout::fixed_width(0.0, 1.0, 1)
        .unwrap()
        .empty_distribution::<()>();
    // Every point carries a copy of the same empty distribution; only the
    // start/timestamp metadata varies between cases.
    let point = |start, timestamp| scepter::CumulativePoint {
        start,
        timestamp,
        value: distribution.clone(),
    };

    let cases = [
        // Reset: the series restarted (start 1 -> 2) between the two points.
        (point(2, 30), point(1, 20)),
        // Out of order: the current point is older than the previous one.
        (point(1, 10), point(1, 20)),
    ];
    for (current, previous) in &cases {
        assert_eq!(
            current.delta_since(previous),
            Err(DistributionError::ResetOrOutOfOrderPoint)
        );
    }
}

/// N-gram excerpts pad the value with `n - 1` `^` markers in front and `n - 1`
/// `$` markers behind, then slide a window of width `n` across the result.
#[test]
fn ngram_generation_is_padded_and_sized() {
    let bigrams = ngrams("ab", 2);
    assert_eq!(bigrams, vec!["^a", "ab", "b$"]);

    let four_grams = ExcerptStrategy::NGram(4).excerpts("go");
    let expected = vec!["^^^g", "^^go", "^go$", "go$$", "o$$$"];
    assert_eq!(four_grams, expected);
}

/// `candidate_union` returns every leaf matched by any of the given predicates.
#[test]
fn candidate_union_combines_predicate_results() {
    let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::Full);
    for (value, leaf) in [("latency", "leaf-1"), ("errors", "leaf-2")] {
        index.insert_value("Metric", "name", value, leaf);
    }

    let predicates = [
        FieldPredicate::Equals(String::from("latency")),
        FieldPredicate::Equals(String::from("errors")),
    ];
    let candidates = index.candidate_union("Metric", "name", &predicates);

    assert_eq!(candidates, HashSet::from(["leaf-1", "leaf-2"]));
}

/// With single-character n-grams, an equality predicate on one character of a
/// stored value ("b" in "abc") still yields that value's leaf as a candidate.
#[test]
fn custom_excerpt_strategy_changes_index_semantics() {
    let strategy = ExcerptStrategy::NGram(1);
    let mut index = FieldHintIndex::with_strategy(strategy);
    index.insert_value("Target", "job", "abc", "leaf-1");

    let predicate = FieldPredicate::Equals(String::from("b"));
    let candidates = index.candidates("Target", "job", &predicate);

    assert_eq!(candidates, HashSet::from(["leaf-1"]));
}

/// `encoded_key` agrees between owned and borrowed strings and between
/// `FieldValue` wrappers and their raw values, and sorts `false` before `true`.
#[test]
fn key_encoding_covers_string_bool_and_field_values() {
    let owned = String::from("abc");
    assert_eq!(owned.encoded_key(), "abc".encoded_key());

    assert!(true.encoded_key() > false.encoded_key());

    let negative = FieldValue::I64(-3);
    assert_eq!(negative.encoded_key(), (-3_i64).encoded_key());

    let fractional = FieldValue::F64(1.5);
    assert_eq!(fractional.encoded_key(), 1.5_f64.encoded_key());
}

/// Consecutive fields pushed into a `KeyEncoder` are joined by a single NUL byte.
#[test]
fn key_encoder_uses_separators_between_fields() {
    let expected = b"ab\0cd";

    let encoded = KeyEncoder::new()
        .push_field("ab")
        .push_field("cd")
        .clone()
        .finish();

    assert_eq!(encoded, expected);
}

/// Schemas report the fields they declare. A `TimeSeriesKey`'s target key equals
/// the `KeyEncoder` encoding of the schema name followed by the target field
/// value, and its series key ends with the metric field value.
#[test]
fn schema_and_series_keys_expose_exact_semantics() {
    let target_schema = TargetSchema::new(
        "ComputeTask",
        vec![Field::new("cluster", ValueKind::String)],
        "cluster",
    );
    assert!(target_schema.has_field("cluster"));
    assert!(!target_schema.has_field("job"));

    let metric_schema = MetricSchema::new(
        "/rpc/server/latency",
        vec![Field::new("command", ValueKind::String)],
        ValueKind::Distribution,
        MetricKind::Cumulative,
    );
    assert_eq!(metric_schema.metric_kind, MetricKind::Cumulative);

    let series = TimeSeriesKey {
        target_schema: String::from("ComputeTask"),
        target_fields: vec![FieldValue::String(String::from("aa"))],
        metric_name: String::from("/rpc/server/latency"),
        metric_fields: vec![FieldValue::String(String::from("Query"))],
    };
    let expected_target_key = KeyEncoder::new()
        .push_field("ComputeTask")
        .push_field("aa")
        .clone()
        .finish();

    assert_eq!(series.target_key(), expected_target_key);
    let series_key = series.series_key();
    assert!(series_key.ends_with(b"Query"));
}

/// Checks three behaviours:
/// - an empty `GroupBy` can run at the leaf level, while grouping by "zone"
///   needs the zone level;
/// - fan-out intersection keeps only the children present in both sets;
/// - a fragment holds the nodes it was built with.
#[test]
fn query_nodes_and_fanout_intersection_have_observable_behavior() {
    let ungrouped = PlanNode::GroupBy { fields: Vec::new() };
    assert_eq!(ungrouped.lowest_valid_level(), scepter::ExecutionLevel::Leaf);

    let by_zone = PlanNode::GroupBy {
        fields: vec![String::from("zone")],
    };
    assert_eq!(by_zone.lowest_valid_level(), scepter::ExecutionLevel::Zone);

    let mut fanout = scepter::FanoutPlan::new(
        scepter::ExecutionLevel::Leaf,
        HashSet::from(["leaf-1", "leaf-2"]),
    );
    let other_children = HashSet::from(["leaf-2", "leaf-3"]);
    fanout.intersect(&other_children);
    assert_eq!(fanout.children, HashSet::from(["leaf-2"]));

    let scan = PlanNode::Scan {
        metric: String::from("m"),
    };
    let fragment = QueryFragment {
        level: scepter::ExecutionLevel::Leaf,
        nodes: vec![scan],
    };
    assert_eq!(fragment.nodes.len(), 1);
}

/// Checks that a load sample reduces to a single score. Also checks that a
/// `RangeAssigner` maps encoded key ranges to workers and rejects invalid edits
/// without changing the existing assignment.
#[test]
fn load_score_and_range_assignment_operations_are_observable() {
    // 2 writes/s + 2048 B/s + 3 queries/s scores 7.0. Presumably bytes are
    // weighted per KiB; NOTE(review): confirm against `LoadSample::score`.
    let sample = LoadSample {
        writes_per_second: 2.0,
        bytes_per_second: 2048.0,
        queries_per_second: 3.0,
    };
    assert_eq!(sample.score(), 7.0);

    let mut ranges = RangeAssigner::new();
    ranges.assign(vec![0]..vec![10], "leaf-1").unwrap();
    // The range's end key [10] is not owned by anyone.
    assert_eq!(ranges.worker_for_encoded(&[10]), None);

    // Move the range starting at [0] to a different worker.
    ranges.reassign_start(&[0], "leaf-2").unwrap();
    assert_eq!(ranges.worker_for_encoded(&[5]), Some(&"leaf-2"));

    // Invalid edits: no range starts at [9], [9]..[9] is empty, and splitting
    // at the range's own start key is rejected.
    assert_eq!(
        ranges.reassign_start(&[9], "leaf-3"),
        Err(ShardError::RangeNotFound)
    );
    assert_eq!(
        ranges.try_assign(vec![9]..vec![9], "leaf-3"),
        Err(ShardError::EmptyRange)
    );
    assert_eq!(
        ranges.split_at(&[0], vec![0], "leaf-4"),
        Err(ShardError::SplitOutsideRange)
    );

    // None of the rejected edits added an assignment.
    assert_eq!(ranges.assignments().count(), 1);
}

/// Checks schedule conversion, evaluator sharding by query name, and
/// construction of a `StandingQuery`.
#[test]
fn standing_query_schedule_and_sharding_are_concrete() {
    assert_eq!(Schedule::every_seconds(5).every_millis, 5000);

    let shard = EvaluatorShard::for_query("query-a", 8).unwrap();
    assert!(shard.shard < 8);
    assert!(shard.owns("query-a"));

    // Find a generated name that this shard does NOT own. `find` already
    // guarantees `!shard.owns(..)`, so re-asserting that would be a tautology.
    // Instead, check that the name's own shard is valid, differs from
    // `shard`, and claims the name.
    let other_name = (0..100)
        .map(|index| format!("query-b-{index}"))
        .find(|name| !shard.owns(name))
        .expect("at least one generated query should map elsewhere");
    let other_shard = EvaluatorShard::for_query(other_name.as_str(), 8).unwrap();
    assert!(other_shard.shard < 8);
    assert_ne!(other_shard.shard, shard.shard);
    assert!(other_shard.owns(&other_name));

    let standing = StandingQuery {
        name: "query-a".to_owned(),
        schedule: Schedule::every_seconds(5),
        plan: scepter::LogicalPlan::default(),
        output_metric: "/derived/query-a".to_owned(),
    };
    assert_eq!(standing.output_metric, "/derived/query-a");
}

/// `CollectionAggregator` combines values added under the same key through
/// their `Mergeable` implementation.
#[test]
fn collection_aggregator_uses_mergeable_values() {
    // Minimal mergeable value: merging adds the wrapped counts.
    #[derive(Debug, Clone, PartialEq)]
    struct Tally(u64);

    impl Mergeable for Tally {
        type Error = std::convert::Infallible;

        fn merge_from(&mut self, other: Self) -> Result<(), Self::Error> {
            let Tally(extra) = other;
            self.0 += extra;
            Ok(())
        }
    }

    let mut aggregator = CollectionAggregator::new();
    for count in [2, 5] {
        aggregator.add("k", Tally(count)).unwrap();
    }

    let merged = aggregator.into_inner();
    assert_eq!(merged["k"], Tally(7));
}

/// Merging one `u64` into another adds them: 10 merged with 7 yields 17.
#[test]
fn u64_mergeable_adds_values() {
    let mut total: u64 = 10;

    let outcome = total.merge_from(7);
    assert!(outcome.is_ok());

    assert_eq!(total, 17);
}