scepter 0.1.5

Composable primitives for planet-scale time-series routing, indexing, and aggregation.
Documentation
use std::collections::BTreeMap;
use std::error::Error;
use std::fmt;
use std::ops::Range;

use crate::key::LexicographicKey;

/// Failure modes reported when creating, reassigning, or splitting key ranges.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ShardError {
    /// The requested range is empty: its start is not strictly below its end.
    EmptyRange,
    /// No assignment begins at the requested start key.
    RangeNotFound,
    /// The split key does not fall strictly inside the range being split.
    SplitOutsideRange,
}

impl fmt::Display for ShardError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            ShardError::EmptyRange => "range start must be less than range end",
            ShardError::RangeNotFound => "range was not found",
            ShardError::SplitOutsideRange => "split key must be inside the range",
        })
    }
}

/// `ShardError` wraps no underlying cause, so the default `source` (`None`) applies.
impl Error for ShardError {}

/// Assigns ordered key ranges to workers.
///
/// Assignments are stored in a `BTreeMap` keyed by each range's inclusive start key.
/// Each entry records the exclusive end key and the owning worker.
#[derive(Debug, Clone)]
pub struct RangeAssigner<WorkerId> {
    ranges: BTreeMap<Vec<u8>, RangeAssignment<WorkerId>>,
}

// Written by hand rather than derived: `#[derive(Default)]` would add a
// `WorkerId: Default` bound even though an empty map needs no worker value.
impl<WorkerId> Default for RangeAssigner<WorkerId> {
    fn default() -> Self {
        Self {
            ranges: BTreeMap::new(),
        }
    }
}

/// Worker assignment for one range.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeAssignment<WorkerId> {
    /// Exclusive end key for this assignment.
    pub end: Vec<u8>,
    /// Worker that owns `[start, end)`.
    pub worker: WorkerId,
}

/// Traffic observed on a single key range.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct LoadSample {
    /// Write operations per second.
    pub writes_per_second: f64,
    /// Ingested bytes per second.
    pub bytes_per_second: f64,
    /// Query operations per second.
    pub queries_per_second: f64,
}

impl LoadSample {
    /// Collapses the sample into a single number used to rank ranges by load.
    ///
    /// Each write and each query counts as one unit; ingested bytes count as
    /// one unit per 1024 bytes.
    pub fn score(self) -> f64 {
        const BYTES_PER_UNIT: f64 = 1024.0;
        let operations = self.writes_per_second + self.queries_per_second;
        operations + self.bytes_per_second / BYTES_PER_UNIT
    }
}

/// A load sample tagged with the range it was measured on and that range's owner.
#[derive(Clone, Debug, PartialEq)]
pub struct RangeLoad<WorkerId> {
    /// Encoded key range the sample was taken for.
    pub range: Range<Vec<u8>>,
    /// Worker that owns the range.
    pub worker: WorkerId,
    /// Load observed on the range.
    pub load: LoadSample,
}

// No method clones a worker id, so no `Clone` bound is required here.
impl<WorkerId> RangeAssigner<WorkerId> {
    /// Creates an empty range assigner.
    pub fn new() -> Self {
        Self {
            ranges: BTreeMap::new(),
        }
    }

    /// Assigns the half-open key range `[range.start, range.end)` to `worker`.
    ///
    /// An existing assignment with the same start key is replaced, including its
    /// end key. No check is made against other assignments. Callers must keep
    /// ranges disjoint, because lookups only consult the nearest assignment that
    /// starts at or before the key.
    ///
    /// # Errors
    ///
    /// Returns `ShardError::EmptyRange` if `range.start >= range.end`.
    pub fn assign(&mut self, range: Range<Vec<u8>>, worker: WorkerId) -> Result<(), ShardError> {
        if range.start >= range.end {
            return Err(ShardError::EmptyRange);
        }

        self.ranges.insert(
            range.start,
            RangeAssignment {
                end: range.end,
                worker,
            },
        );
        Ok(())
    }

    /// Backwards-compatible alias for `assign`.
    ///
    /// # Errors
    ///
    /// Same as `assign`.
    pub fn try_assign(
        &mut self,
        range: Range<Vec<u8>>,
        worker: WorkerId,
    ) -> Result<(), ShardError> {
        self.assign(range, worker)
    }

    /// Reassigns the range that starts exactly at `start` to `worker`.
    ///
    /// The range's end key is left unchanged.
    ///
    /// # Errors
    ///
    /// Returns `ShardError::RangeNotFound` if no assignment starts at `start`.
    pub fn reassign_start(&mut self, start: &[u8], worker: WorkerId) -> Result<(), ShardError> {
        let Some(assignment) = self.ranges.get_mut(start) else {
            return Err(ShardError::RangeNotFound);
        };
        assignment.worker = worker;
        Ok(())
    }

    /// Splits the range beginning at `range_start` at `split_key`.
    ///
    /// On success, the left half `[range_start, split_key)` keeps its worker.
    /// The right half `[split_key, old_end)` is assigned to `right_worker`.
    ///
    /// # Errors
    ///
    /// - `ShardError::RangeNotFound` if no assignment starts at `range_start`.
    /// - `ShardError::SplitOutsideRange` if `split_key` is not strictly between
    ///   `range_start` and the range's end, since either half would then be empty.
    ///
    /// The assigner is left unchanged on error.
    pub fn split_at(
        &mut self,
        range_start: &[u8],
        split_key: Vec<u8>,
        right_worker: WorkerId,
    ) -> Result<(), ShardError> {
        let Some(left) = self.ranges.get_mut(range_start) else {
            return Err(ShardError::RangeNotFound);
        };

        if split_key.as_slice() <= range_start || split_key >= left.end {
            return Err(ShardError::SplitOutsideRange);
        }

        // Shrink the left half in place and hand its previous end to the right half.
        let right_end = std::mem::replace(&mut left.end, split_key.clone());
        self.ranges.insert(
            split_key,
            RangeAssignment {
                end: right_end,
                worker: right_worker,
            },
        );
        Ok(())
    }

    /// Looks up the worker for a typed lexicographic key.
    ///
    /// The key is encoded with `encoded_key()` and resolved via `worker_for_encoded`.
    pub fn worker_for<K: LexicographicKey + ?Sized>(&self, key: &K) -> Option<&WorkerId> {
        let encoded = key.encoded_key();
        self.worker_for_encoded(&encoded)
    }

    /// Looks up the worker for an already encoded key.
    ///
    /// Returns `None` in any of these cases:
    /// - the key sorts before every assigned range;
    /// - the key falls in a gap between ranges;
    /// - the key equals or exceeds the exclusive end of the nearest preceding range.
    pub fn worker_for_encoded(&self, encoded: &[u8]) -> Option<&WorkerId> {
        // Range directly over `[u8]` (Vec<u8>: Borrow<[u8]>) so a lookup does not
        // have to allocate an owned `Vec<u8>` just to express the upper bound.
        self.ranges
            .range::<[u8], _>((
                std::ops::Bound::<&[u8]>::Unbounded,
                std::ops::Bound::Included(encoded),
            ))
            .next_back()
            // The bound already guarantees `start <= encoded`; only the end remains.
            .and_then(|(_, assignment)| {
                (encoded < assignment.end.as_slice()).then_some(&assignment.worker)
            })
    }

    /// Returns all range assignments keyed by inclusive start key, in ascending key order.
    pub fn assignments(&self) -> impl Iterator<Item = (&Vec<u8>, &RangeAssignment<WorkerId>)> {
        self.ranges.iter()
    }
}

/// Returns the range with the highest composite load score.
///
/// Scores come from `LoadSample::score`. When several ranges share the highest
/// score, the last of them in `ranges` is returned. Returns `None` only when
/// `ranges` is empty.
///
/// A NaN score ranks below every real score. Plain `f64::total_cmp` would order
/// a NaN above `+inf` or below `-inf` depending on its sign bit. That would make
/// the result depend on how the NaN was produced, and could let one bad sample
/// mask genuinely hot ranges.
pub fn hottest_range<WorkerId>(ranges: &[RangeLoad<WorkerId>]) -> Option<&RangeLoad<WorkerId>> {
    /// Score used for ranking, with NaN demoted to negative infinity.
    fn rank<W>(entry: &RangeLoad<W>) -> f64 {
        let score = entry.load.score();
        if score.is_nan() {
            f64::NEG_INFINITY
        } else {
            score
        }
    }

    ranges
        .iter()
        .max_by(|&left, &right| rank(left).total_cmp(&rank(right)))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn range_assigner_finds_matching_worker() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"a".to_vec()..b"m".to_vec(), "leaf-1")
            .unwrap();
        assigner
            .assign(b"m".to_vec()..b"z".to_vec(), "leaf-2")
            .unwrap();

        assert_eq!(assigner.worker_for("beta"), Some(&"leaf-1"));
        assert_eq!(assigner.worker_for("omega"), Some(&"leaf-2"));
    }

    #[test]
    fn assign_rejects_empty_and_inverted_ranges() {
        let mut assigner: RangeAssigner<&str> = RangeAssigner::new();

        assert_eq!(
            assigner.assign(b"m".to_vec()..b"m".to_vec(), "leaf-1"),
            Err(ShardError::EmptyRange)
        );
        assert_eq!(
            assigner.try_assign(b"z".to_vec()..b"a".to_vec(), "leaf-1"),
            Err(ShardError::EmptyRange)
        );
        assert_eq!(assigner.assignments().count(), 0);
    }

    #[test]
    fn lookups_respect_inclusive_start_exclusive_end_and_gaps() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"b".to_vec()..b"d".to_vec(), "leaf-1")
            .unwrap();
        assigner
            .assign(b"f".to_vec()..b"h".to_vec(), "leaf-2")
            .unwrap();

        // Before the first range.
        assert_eq!(assigner.worker_for_encoded(b"a"), None);
        // Start keys are inclusive.
        assert_eq!(assigner.worker_for_encoded(b"b"), Some(&"leaf-1"));
        // End keys are exclusive, and gaps map to no worker.
        assert_eq!(assigner.worker_for_encoded(b"d"), None);
        assert_eq!(assigner.worker_for_encoded(b"e"), None);
        assert_eq!(assigner.worker_for_encoded(b"f"), Some(&"leaf-2"));
        assert_eq!(assigner.worker_for_encoded(b"h"), None);
    }

    #[test]
    fn reassign_start_updates_only_existing_ranges() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"a".to_vec()..b"m".to_vec(), "leaf-1")
            .unwrap();

        assigner.reassign_start(b"a", "leaf-3").unwrap();
        assert_eq!(assigner.worker_for_encoded(b"c"), Some(&"leaf-3"));
        assert_eq!(
            assigner.reassign_start(b"b", "leaf-4"),
            Err(ShardError::RangeNotFound)
        );
    }

    #[test]
    fn hottest_range_uses_combined_load_score() {
        let ranges = vec![
            RangeLoad {
                range: b"a".to_vec()..b"m".to_vec(),
                worker: "leaf-1",
                load: LoadSample {
                    writes_per_second: 1.0,
                    bytes_per_second: 0.0,
                    queries_per_second: 1.0,
                },
            },
            RangeLoad {
                range: b"m".to_vec()..b"z".to_vec(),
                worker: "leaf-2",
                load: LoadSample {
                    writes_per_second: 2.0,
                    bytes_per_second: 4096.0,
                    queries_per_second: 1.0,
                },
            },
        ];

        assert_eq!(hottest_range(&ranges).unwrap().worker, "leaf-2");
    }

    #[test]
    fn hottest_range_of_empty_slice_is_none() {
        let ranges: Vec<RangeLoad<&str>> = Vec::new();
        assert!(hottest_range(&ranges).is_none());
    }

    #[test]
    fn split_at_creates_two_routable_ranges() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"a".to_vec()..b"z".to_vec(), "leaf-1")
            .unwrap();

        assigner
            .split_at(b"a", b"m".to_vec(), "leaf-2")
            .expect("split should be valid");

        assert_eq!(assigner.worker_for("beta"), Some(&"leaf-1"));
        assert_eq!(assigner.worker_for("omega"), Some(&"leaf-2"));
    }

    #[test]
    fn split_at_rejects_invalid_requests_without_mutating() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"c".to_vec()..b"m".to_vec(), "leaf-1")
            .unwrap();

        assert_eq!(
            assigner.split_at(b"x", b"d".to_vec(), "leaf-2"),
            Err(ShardError::RangeNotFound)
        );
        // Splitting at the start, at the end, or outside would leave an empty half.
        assert_eq!(
            assigner.split_at(b"c", b"c".to_vec(), "leaf-2"),
            Err(ShardError::SplitOutsideRange)
        );
        assert_eq!(
            assigner.split_at(b"c", b"m".to_vec(), "leaf-2"),
            Err(ShardError::SplitOutsideRange)
        );
        assert_eq!(
            assigner.split_at(b"c", b"a".to_vec(), "leaf-2"),
            Err(ShardError::SplitOutsideRange)
        );

        let all: Vec<_> = assigner.assignments().collect();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].0, &b"c".to_vec());
        assert_eq!(all[0].1.end, b"m".to_vec());
        assert_eq!(all[0].1.worker, "leaf-1");
    }
}