//! scepter 0.1.5
//!
//! Composable primitives for planet-scale time-series routing, indexing, and aggregation.
use std::collections::BTreeMap;
use std::ops::Range;

/// Health of a replica that is a candidate for answering a query over a target range.
///
/// Variant order is significant: the derived `Ord` ranks
/// `Available < Recovering < Unavailable`.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ReplicaState {
    /// Serving normally; nothing indicates the replica is behind.
    Available,
    /// Able to serve, but its data may still lag while it catches up.
    Recovering,
    /// Must never be chosen to execute a query.
    Unavailable,
}

/// Quality report a replica supplies while the resolver decides who serves a range.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ReplicaQuality {
    /// First timestamp the replica covers.
    pub start_time: u64,
    /// Last timestamp the replica covers.
    pub end_time: u64,
    /// Points actually observed inside the covered interval.
    pub points: u64,
    /// Points a complete result would contain for the covered interval.
    pub expected_points: u64,
    /// `true` when the replica claims full coverage of the target.
    pub complete: bool,
    /// Operational state reported for the replica.
    pub state: ReplicaState,
}

impl ReplicaQuality {
    /// Assembles a quality report from its individual fields.
    pub fn new(
        start_time: u64,
        end_time: u64,
        points: u64,
        expected_points: u64,
        complete: bool,
        state: ReplicaState,
    ) -> Self {
        Self {
            state,
            complete,
            expected_points,
            points,
            end_time,
            start_time,
        }
    }

    /// Width of the covered interval, `end_time - start_time`, clamped to `0`
    /// when `end_time` precedes `start_time`.
    pub fn coverage_span(self) -> u64 {
        self.end_time.checked_sub(self.start_time).unwrap_or(0)
    }

    /// Fraction of expected points that were observed, capped at `1.0`.
    ///
    /// When no points are expected the replica is treated as fully dense (`1.0`).
    pub fn density(self) -> f64 {
        match self.expected_points {
            0 => 1.0,
            expected => f64::min(self.points as f64 / expected as f64, 1.0),
        }
    }

    /// `true` unless the replica is [`ReplicaState::Unavailable`].
    pub fn selectable(self) -> bool {
        !matches!(self.state, ReplicaState::Unavailable)
    }

    /// Captures every ranking criterion used when ordering replicas.
    fn rank(self) -> ReplicaRank {
        let Self {
            complete,
            state,
            end_time,
            points,
            ..
        } = self;
        ReplicaRank {
            selectable: self.selectable(),
            complete,
            state,
            density: self.density(),
            coverage_span: self.coverage_span(),
            end_time,
            points,
        }
    }
}

/// A single replica offered as a possible answer for one target range.
#[derive(Clone, Debug, PartialEq)]
pub struct ReplicaCandidate<ReplicaId> {
    /// Target range this candidate would serve.
    pub range: Range<Vec<u8>>,
    /// Identifier of the offering replica.
    pub replica: ReplicaId,
    /// Quality report supplied by the replica.
    pub quality: ReplicaQuality,
}

impl<Id> ReplicaCandidate<Id> {
    /// Bundles a target range, a replica identifier, and its quality report.
    pub fn new(range: Range<Vec<u8>>, replica: Id, quality: ReplicaQuality) -> Self {
        Self {
            quality,
            replica,
            range,
        }
    }
}

/// Outcome of resolving one target range: the chosen primary plus ranked fallbacks.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedRange<ReplicaId> {
    /// Target range that was resolved.
    pub range: Range<Vec<u8>>,
    /// Replica chosen to execute queries for the range.
    pub primary: ReplicaId,
    /// Backup replicas, best first, bounded by the resolver's fallback limit.
    pub fallbacks: Vec<ReplicaId>,
    /// Quality report of the chosen primary.
    pub quality: ReplicaQuality,
}

/// Selects the best replica per range and records ordered fallbacks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicaResolver {
    max_fallbacks: usize,
}

impl Default for ReplicaResolver {
    fn default() -> Self {
        Self { max_fallbacks: 2 }
    }
}

impl ReplicaResolver {
    /// Creates a resolver that keeps up to two fallbacks per range.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a resolver with an explicit fallback limit.
    pub fn with_max_fallbacks(max_fallbacks: usize) -> Self {
        Self { max_fallbacks }
    }

    /// Returns the fallback limit.
    pub fn max_fallbacks(self) -> usize {
        self.max_fallbacks
    }

    /// Resolves candidate replicas into one primary per target range.
    pub fn resolve<ReplicaId>(
        self,
        candidates: impl IntoIterator<Item = ReplicaCandidate<ReplicaId>>,
    ) -> Vec<ResolvedRange<ReplicaId>> {
        let mut by_range = BTreeMap::<RangeKey, Vec<ReplicaCandidate<ReplicaId>>>::new();
        for candidate in candidates {
            by_range
                .entry(RangeKey::from(candidate.range.clone()))
                .or_default()
                .push(candidate);
        }

        let mut resolved = Vec::new();
        for (_, mut candidates) in by_range {
            candidates.retain(|candidate| candidate.quality.selectable());
            if candidates.is_empty() {
                continue;
            }

            candidates.sort_by(|left, right| compare_quality(right.quality, left.quality));
            let primary = candidates.remove(0);
            let fallbacks = candidates
                .into_iter()
                .take(self.max_fallbacks)
                .map(|candidate| candidate.replica)
                .collect();

            resolved.push(ResolvedRange {
                range: primary.range,
                primary: primary.replica,
                fallbacks,
                quality: primary.quality,
            });
        }
        resolved
    }
}

/// Owned `(start, end)` pair used to group candidates that name the same range.
///
/// Field order drives the derived `Ord`: keys sort by `start`, then by `end`.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct RangeKey {
    start: Vec<u8>,
    end: Vec<u8>,
}

impl From<Range<Vec<u8>>> for RangeKey {
    /// Moves the range's bounds into a key without copying their bytes.
    fn from(Range { start, end }: Range<Vec<u8>>) -> Self {
        Self { start, end }
    }
}

/// Private snapshot of the criteria used to order replicas, taken from a
/// `ReplicaQuality`.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ReplicaRank {
    /// Whether the replica may be chosen at all.
    selectable: bool,
    /// Whether the replica reports complete target coverage.
    complete: bool,
    /// Operational state of the replica.
    state: ReplicaState,
    /// Observed / expected point ratio, capped at `1.0`.
    density: f64,
    /// Width of the covered time interval.
    coverage_span: u64,
    /// Last covered timestamp.
    end_time: u64,
    /// Raw observed point count.
    points: u64,
}

/// Orders two quality reports so that the better replica compares as greater.
///
/// Criteria, most significant first: selectable, complete, state preference,
/// density, coverage span, end time, point count. Later criteria only break ties.
fn compare_quality(left: ReplicaQuality, right: ReplicaQuality) -> std::cmp::Ordering {
    let lhs = left.rank();
    let rhs = right.rank();

    // Tuples compare lexicographically, matching a chain of tie-breakers.
    let coarse = (lhs.selectable, lhs.complete, state_score(lhs.state))
        .cmp(&(rhs.selectable, rhs.complete, state_score(rhs.state)));
    // `f64` is not `Ord`, so density is compared on its own via `total_cmp`.
    let by_density = lhs.density.total_cmp(&rhs.density);
    let fine = (lhs.coverage_span, lhs.end_time, lhs.points)
        .cmp(&(rhs.coverage_span, rhs.end_time, rhs.points));

    coarse.then(by_density).then(fine)
}

/// Preference score for a replica state; a higher score ranks better.
fn state_score(state: ReplicaState) -> u8 {
    match state {
        ReplicaState::Unavailable => 0,
        ReplicaState::Recovering => 1,
        ReplicaState::Available => 2,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a quality report with start_time 10 and end_time 40.
    fn quality(
        points: u64,
        expected_points: u64,
        complete: bool,
        state: ReplicaState,
    ) -> ReplicaQuality {
        ReplicaQuality::new(10, 40, points, expected_points, complete, state)
    }

    /// Builds an owned byte range from two byte-string bounds.
    fn key_range(start: &[u8], end: &[u8]) -> std::ops::Range<Vec<u8>> {
        start.to_vec()..end.to_vec()
    }

    #[test]
    fn resolver_selects_best_replica_and_orders_fallbacks() {
        let candidates = vec![
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "recovering",
                quality(100, 100, true, ReplicaState::Recovering),
            ),
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "primary",
                quality(95, 100, true, ReplicaState::Available),
            ),
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "fallback",
                quality(80, 100, true, ReplicaState::Available),
            ),
        ];

        let resolved = ReplicaResolver::with_max_fallbacks(1).resolve(candidates);

        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].primary, "primary");
        assert_eq!(resolved[0].fallbacks, vec!["fallback"]);
    }

    #[test]
    fn resolver_skips_unavailable_replicas() {
        let candidates = vec![
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "down",
                quality(100, 100, true, ReplicaState::Unavailable),
            ),
            ReplicaCandidate::new(
                b"m".to_vec()..b"z".to_vec(),
                "up",
                quality(50, 100, false, ReplicaState::Available),
            ),
        ];

        let resolved = ReplicaResolver::new().resolve(candidates);

        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].range, b"m".to_vec()..b"z".to_vec());
        assert_eq!(resolved[0].primary, "up");
    }

    #[test]
    fn resolver_returns_empty_for_no_candidates() {
        let resolved = ReplicaResolver::new().resolve(Vec::<ReplicaCandidate<&str>>::new());
        assert!(resolved.is_empty());
    }

    #[test]
    fn resolver_drops_range_when_every_replica_is_unavailable() {
        let candidates = vec![
            ReplicaCandidate::new(
                key_range(b"a", b"m"),
                "down-1",
                quality(100, 100, true, ReplicaState::Unavailable),
            ),
            ReplicaCandidate::new(
                key_range(b"a", b"m"),
                "down-2",
                quality(100, 100, true, ReplicaState::Unavailable),
            ),
        ];

        assert!(ReplicaResolver::new().resolve(candidates).is_empty());
    }

    #[test]
    fn resolver_orders_ranges_by_key_and_honors_zero_fallbacks() {
        let candidates = vec![
            ReplicaCandidate::new(
                key_range(b"m", b"z"),
                "mz-1",
                quality(10, 100, false, ReplicaState::Available),
            ),
            ReplicaCandidate::new(
                key_range(b"a", b"m"),
                "am-1",
                quality(90, 100, true, ReplicaState::Available),
            ),
            ReplicaCandidate::new(
                key_range(b"a", b"m"),
                "am-2",
                quality(70, 100, true, ReplicaState::Available),
            ),
        ];

        let resolved = ReplicaResolver::with_max_fallbacks(0).resolve(candidates);

        assert_eq!(resolved.len(), 2);
        assert_eq!(resolved[0].range, key_range(b"a", b"m"));
        assert_eq!(resolved[0].primary, "am-1");
        assert!(resolved[0].fallbacks.is_empty());
        assert_eq!(resolved[1].range, key_range(b"m", b"z"));
        assert_eq!(resolved[1].primary, "mz-1");
        assert!(resolved[1].fallbacks.is_empty());
    }

    #[test]
    fn resolver_keeps_input_order_for_equal_quality() {
        let same = quality(90, 100, true, ReplicaState::Available);
        let candidates = vec![
            ReplicaCandidate::new(key_range(b"a", b"m"), "first", same),
            ReplicaCandidate::new(key_range(b"a", b"m"), "second", same),
            ReplicaCandidate::new(key_range(b"a", b"m"), "third", same),
        ];

        let resolved = ReplicaResolver::new().resolve(candidates);

        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].primary, "first");
        assert_eq!(resolved[0].fallbacks, vec!["second", "third"]);
    }

    #[test]
    fn complete_replica_outranks_denser_incomplete_replica() {
        let candidates = vec![
            ReplicaCandidate::new(
                key_range(b"a", b"m"),
                "partial",
                quality(100, 100, false, ReplicaState::Available),
            ),
            ReplicaCandidate::new(
                key_range(b"a", b"m"),
                "complete",
                quality(10, 100, true, ReplicaState::Recovering),
            ),
        ];

        let resolved = ReplicaResolver::new().resolve(candidates);

        assert_eq!(resolved[0].primary, "complete");
        assert!(resolved[0].quality.complete);
        assert_eq!(resolved[0].fallbacks, vec!["partial"]);
    }

    #[test]
    fn quality_reports_density_and_coverage() {
        let quality = ReplicaQuality::new(10, 42, 25, 50, false, ReplicaState::Available);

        assert_eq!(quality.coverage_span(), 32);
        assert_eq!(quality.density(), 0.5);
        assert!(quality.selectable());
    }

    #[test]
    fn quality_handles_degenerate_inputs() {
        // No expected points counts as fully dense.
        let empty = ReplicaQuality::new(0, 0, 0, 0, false, ReplicaState::Available);
        assert_eq!(empty.density(), 1.0);

        // More points than expected is capped at 1.0.
        let overfull = ReplicaQuality::new(0, 0, 150, 100, false, ReplicaState::Available);
        assert_eq!(overfull.density(), 1.0);

        // An inverted interval saturates to zero instead of underflowing.
        let inverted = ReplicaQuality::new(40, 10, 1, 1, true, ReplicaState::Recovering);
        assert_eq!(inverted.coverage_span(), 0);

        let down = ReplicaQuality::new(0, 0, 0, 0, true, ReplicaState::Unavailable);
        assert!(!down.selectable());
    }

    #[test]
    fn default_resolver_keeps_two_fallbacks() {
        assert_eq!(ReplicaResolver::new().max_fallbacks(), 2);
        assert_eq!(ReplicaResolver::default(), ReplicaResolver::with_max_fallbacks(2));
    }
}