scepter 0.1.0

Composable primitives for planet-scale time-series routing, indexing, and aggregation.
Documentation
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use crate::query::LogicalPlan;

/// Errors produced while assigning standing queries to evaluator shards.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StandingError {
    /// A shard was requested from a shard set containing zero shards.
    EmptyShardSet,
}

impl std::fmt::Display for StandingError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EmptyShardSet => f.write_str("evaluator shard set is empty"),
        }
    }
}

// Implementing `Error` lets callers propagate this type with `?` into
// `Box<dyn Error>` and other generic error containers.
impl std::error::Error for StandingError {}

/// A repeat interval, stored as a whole number of milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Schedule {
    /// Length of one interval in milliseconds.
    pub every_millis: u64,
}

impl Schedule {
    /// Builds a schedule whose interval is `seconds`, converted to milliseconds.
    ///
    /// The conversion saturates: when `seconds * 1000` does not fit in a
    /// `u64`, the interval is clamped to `u64::MAX` rather than wrapping.
    pub fn every_seconds(seconds: u64) -> Self {
        const MILLIS_PER_SECOND: u64 = 1000;

        // `checked_mul` yields `None` exactly when the product overflows,
        // which is the case where we clamp to the maximum.
        let every_millis = seconds
            .checked_mul(MILLIS_PER_SECOND)
            .unwrap_or(u64::MAX);

        Schedule { every_millis }
    }
}

/// A named query bundled with its schedule, logical plan, and output metric name.
///
/// NOTE(review): presumably `plan` is evaluated once per `schedule` interval and
/// the result is published under `output_metric` — confirm against the evaluator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StandingQuery {
    /// Name of the query.
    /// NOTE(review): likely the string passed to `EvaluatorShard::for_query` and
    /// `EvaluatorShard::owns` — confirm with callers.
    pub name: String,
    /// Repeat interval associated with this query.
    pub schedule: Schedule,
    /// Logical plan associated with this query.
    pub plan: LogicalPlan,
    /// Name of the output metric.
    pub output_metric: String,
}

/// One shard's position within a fixed-size set of evaluator shards.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EvaluatorShard {
    /// Index of this shard. Values produced by [`EvaluatorShard::for_query`]
    /// always lie in `0..shard_count`.
    pub shard: usize,
    /// Total number of shards in the set.
    pub shard_count: usize,
}

impl EvaluatorShard {
    /// Returns the shard that the query called `name` maps to within a set of
    /// `shard_count` shards.
    ///
    /// # Errors
    ///
    /// Returns [`StandingError::EmptyShardSet`] when `shard_count` is zero.
    pub fn for_query(name: &str, shard_count: usize) -> Result<Self, StandingError> {
        if shard_count == 0 {
            return Err(StandingError::EmptyShardSet);
        }
        Ok(Self {
            shard: shard_for(name, shard_count),
            shard_count,
        })
    }

    /// Reports whether `query_name` maps to this shard.
    ///
    /// Both fields are public, so a value with `shard_count == 0` can be built
    /// without going through [`EvaluatorShard::for_query`]. An empty shard set
    /// owns nothing, so this returns `false` in that case instead of hitting
    /// the modulo-by-zero panic inside the shard computation.
    pub fn owns(&self, query_name: &str) -> bool {
        self.shard_count != 0 && shard_for(query_name, self.shard_count) == self.shard
    }
}

/// Maps `name` to a shard index in `0..shard_count` by hashing the name.
///
/// `shard_count` must be non-zero; the modulo below panics otherwise.
///
/// NOTE(review): the algorithm behind `DefaultHasher` is unspecified and may
/// change between Rust releases, so assignments are only guaranteed to agree
/// among binaries built against the same standard library. Switching to a
/// fixed hash would re-map existing queries and needs a deliberate migration.
fn shard_for(name: &str, shard_count: usize) -> usize {
    debug_assert!(shard_count > 0);
    let mut hasher = DefaultHasher::new();
    name.hash(&mut hasher);
    // Reduce in `u64` *before* narrowing. Casting the 64-bit hash to `usize`
    // first would drop its high bits on 32-bit targets, making the chosen
    // shard differ from the one a 64-bit host computes for the same name.
    // The remainder is strictly less than `shard_count`, so narrowing it back
    // to `usize` cannot truncate.
    (hasher.finish() % (shard_count as u64)) as usize
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Resolving the same query name twice against the same shard count must
    /// give equal shards, and the resolved shard must claim that name.
    #[test]
    fn evaluator_shards_are_stable_for_same_name() {
        const QUERY: &str = "alert.rpc_latency";
        const SHARDS: usize = 16;

        let resolve = || EvaluatorShard::for_query(QUERY, SHARDS).unwrap();
        let (first, second) = (resolve(), resolve());

        assert_eq!(first, second);
        assert!(first.owns(QUERY));
    }
}