use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use crate::query::LogicalPlan;
/// Errors raised while configuring standing-query evaluation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StandingError {
    /// An evaluator shard was requested for a shard count of zero, so there
    /// is no shard the query could be assigned to.
    EmptyShardSet,
}

impl std::fmt::Display for StandingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EmptyShardSet => {
                f.write_str("evaluator shard count must be greater than zero")
            }
        }
    }
}

// Implementing `Error` lets callers propagate this type with `?` into
// `Box<dyn Error>` and other generic error containers.
impl std::error::Error for StandingError {}
/// A fixed evaluation interval, stored in milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Schedule {
    /// Interval length in milliseconds.
    pub every_millis: u64,
}

impl Schedule {
    /// Builds a schedule from an interval given in whole seconds.
    ///
    /// The conversion to milliseconds saturates: an input large enough to
    /// overflow `u64` yields `u64::MAX` instead of wrapping or panicking.
    pub fn every_seconds(seconds: u64) -> Self {
        const MILLIS_PER_SECOND: u64 = 1000;
        let every_millis = seconds.saturating_mul(MILLIS_PER_SECOND);
        Schedule { every_millis }
    }
}
/// A named query definition bundled with its evaluation schedule.
///
/// This type only carries data; nothing in this file evaluates the plan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StandingQuery {
/// Name of the query.
/// NOTE(review): presumably the same name that is passed to
/// `EvaluatorShard::for_query` / `EvaluatorShard::owns` — confirm with callers.
pub name: String,
/// Interval associated with this query.
pub schedule: Schedule,
/// The query itself, as a `crate::query::LogicalPlan`.
pub plan: LogicalPlan,
/// NOTE(review): looks like the name of the metric the query's results are
/// written to — confirm against the evaluator that consumes this struct.
pub output_metric: String,
}
/// One shard out of a fixed-size set of evaluator shards.
///
/// `for_query` builds values whose `shard` is the hash-derived index for a
/// query name. Both fields are public, so values can also be constructed
/// directly without the zero check that `for_query` performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EvaluatorShard {
/// Index of this shard; `for_query` always produces a value in
/// `0..shard_count`.
pub shard: usize,
/// Total number of shards that query names are distributed over.
pub shard_count: usize,
}
impl EvaluatorShard {
    /// Determines which shard the query called `name` belongs to when queries
    /// are spread over `shard_count` shards.
    ///
    /// # Errors
    ///
    /// Returns [`StandingError::EmptyShardSet`] when `shard_count` is zero.
    pub fn for_query(name: &str, shard_count: usize) -> Result<Self, StandingError> {
        match shard_count {
            0 => Err(StandingError::EmptyShardSet),
            count => Ok(Self {
                shard: shard_for(name, count),
                shard_count: count,
            }),
        }
    }

    /// Reports whether `query_name` hashes to this shard under this shard's
    /// `shard_count`.
    pub fn owns(&self, query_name: &str) -> bool {
        let assigned = shard_for(query_name, self.shard_count);
        assigned == self.shard
    }
}
/// Maps `name` onto a shard index in `0..shard_count`.
///
/// The index is the `DefaultHasher` hash of `name` reduced modulo
/// `shard_count`. The standard library documents `DefaultHasher`'s algorithm
/// as unspecified and subject to change between Rust releases, so the mapping
/// is only guaranteed to agree between binaries built with the same toolchain.
///
/// `shard_count` must be non-zero: this is checked with `debug_assert!`, and
/// the remainder operation itself panics on a zero divisor.
fn shard_for(name: &str, shard_count: usize) -> usize {
    debug_assert!(shard_count > 0);
    let mut hasher = DefaultHasher::new();
    name.hash(&mut hasher);
    // Reduce in the u64 domain *before* narrowing. Casting the 64-bit hash to
    // `usize` first would discard the high bits on targets where `usize` is
    // narrower than 64 bits, making the chosen shard depend on pointer width.
    let bucket = hasher.finish() % (shard_count as u64);
    // `bucket < shard_count <= usize::MAX`, so this cast cannot truncate.
    bucket as usize
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolving the same query name twice must yield the same shard, and
    /// that shard must report that it owns the name.
    #[test]
    fn evaluator_shards_are_stable_for_same_name() {
        const QUERY: &str = "alert.rpc_latency";
        const SHARDS: usize = 16;

        let first = EvaluatorShard::for_query(QUERY, SHARDS).unwrap();
        let second = EvaluatorShard::for_query(QUERY, SHARDS).unwrap();

        assert_eq!(first, second);
        assert!(first.owns(QUERY));
    }
}