use std::collections::BTreeMap;
use std::ops::Range;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ReplicaState {
Available,
Recovering,
Unavailable,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ReplicaQuality {
pub start_time: u64,
pub end_time: u64,
pub points: u64,
pub expected_points: u64,
pub complete: bool,
pub state: ReplicaState,
}
impl ReplicaQuality {
pub fn new(
start_time: u64,
end_time: u64,
points: u64,
expected_points: u64,
complete: bool,
state: ReplicaState,
) -> Self {
Self {
start_time,
end_time,
points,
expected_points,
complete,
state,
}
}
pub fn coverage_span(self) -> u64 {
self.end_time.saturating_sub(self.start_time)
}
pub fn density(self) -> f64 {
if self.expected_points == 0 {
return 1.0;
}
(self.points as f64 / self.expected_points as f64).min(1.0)
}
pub fn selectable(self) -> bool {
self.state != ReplicaState::Unavailable
}
fn rank(self) -> ReplicaRank {
ReplicaRank {
selectable: self.selectable(),
complete: self.complete,
state: self.state,
density: self.density(),
coverage_span: self.coverage_span(),
end_time: self.end_time,
points: self.points,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct ReplicaCandidate<ReplicaId> {
pub range: Range<Vec<u8>>,
pub replica: ReplicaId,
pub quality: ReplicaQuality,
}
impl<ReplicaId> ReplicaCandidate<ReplicaId> {
pub fn new(range: Range<Vec<u8>>, replica: ReplicaId, quality: ReplicaQuality) -> Self {
Self {
range,
replica,
quality,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedRange<ReplicaId> {
pub range: Range<Vec<u8>>,
pub primary: ReplicaId,
pub fallbacks: Vec<ReplicaId>,
pub quality: ReplicaQuality,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicaResolver {
max_fallbacks: usize,
}
impl Default for ReplicaResolver {
fn default() -> Self {
Self { max_fallbacks: 2 }
}
}
impl ReplicaResolver {
pub fn new() -> Self {
Self::default()
}
pub fn with_max_fallbacks(max_fallbacks: usize) -> Self {
Self { max_fallbacks }
}
pub fn max_fallbacks(self) -> usize {
self.max_fallbacks
}
pub fn resolve<ReplicaId>(
self,
candidates: impl IntoIterator<Item = ReplicaCandidate<ReplicaId>>,
) -> Vec<ResolvedRange<ReplicaId>> {
let mut by_range = BTreeMap::<RangeKey, Vec<ReplicaCandidate<ReplicaId>>>::new();
for candidate in candidates {
by_range
.entry(RangeKey::from(candidate.range.clone()))
.or_default()
.push(candidate);
}
let mut resolved = Vec::new();
for (_, mut candidates) in by_range {
candidates.retain(|candidate| candidate.quality.selectable());
if candidates.is_empty() {
continue;
}
candidates.sort_by(|left, right| compare_quality(right.quality, left.quality));
let primary = candidates.remove(0);
let fallbacks = candidates
.into_iter()
.take(self.max_fallbacks)
.map(|candidate| candidate.replica)
.collect();
resolved.push(ResolvedRange {
range: primary.range,
primary: primary.replica,
fallbacks,
quality: primary.quality,
});
}
resolved
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct RangeKey {
start: Vec<u8>,
end: Vec<u8>,
}
impl From<Range<Vec<u8>>> for RangeKey {
fn from(range: Range<Vec<u8>>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
struct ReplicaRank {
selectable: bool,
complete: bool,
state: ReplicaState,
density: f64,
coverage_span: u64,
end_time: u64,
points: u64,
}
fn compare_quality(left: ReplicaQuality, right: ReplicaQuality) -> std::cmp::Ordering {
let left = left.rank();
let right = right.rank();
left.selectable
.cmp(&right.selectable)
.then_with(|| left.complete.cmp(&right.complete))
.then_with(|| state_score(left.state).cmp(&state_score(right.state)))
.then_with(|| left.density.total_cmp(&right.density))
.then_with(|| left.coverage_span.cmp(&right.coverage_span))
.then_with(|| left.end_time.cmp(&right.end_time))
.then_with(|| left.points.cmp(&right.points))
}
fn state_score(state: ReplicaState) -> u8 {
match state {
ReplicaState::Available => 2,
ReplicaState::Recovering => 1,
ReplicaState::Unavailable => 0,
}
}
#[cfg(test)]
mod tests {
use super::*;
fn quality(
points: u64,
expected_points: u64,
complete: bool,
state: ReplicaState,
) -> ReplicaQuality {
ReplicaQuality::new(10, 40, points, expected_points, complete, state)
}
#[test]
fn resolver_selects_best_replica_and_orders_fallbacks() {
let candidates = vec![
ReplicaCandidate::new(
b"a".to_vec()..b"m".to_vec(),
"recovering",
quality(100, 100, true, ReplicaState::Recovering),
),
ReplicaCandidate::new(
b"a".to_vec()..b"m".to_vec(),
"primary",
quality(95, 100, true, ReplicaState::Available),
),
ReplicaCandidate::new(
b"a".to_vec()..b"m".to_vec(),
"fallback",
quality(80, 100, true, ReplicaState::Available),
),
];
let resolved = ReplicaResolver::with_max_fallbacks(1).resolve(candidates);
assert_eq!(resolved.len(), 1);
assert_eq!(resolved[0].primary, "primary");
assert_eq!(resolved[0].fallbacks, vec!["fallback"]);
}
#[test]
fn resolver_skips_unavailable_replicas() {
let candidates = vec![
ReplicaCandidate::new(
b"a".to_vec()..b"m".to_vec(),
"down",
quality(100, 100, true, ReplicaState::Unavailable),
),
ReplicaCandidate::new(
b"m".to_vec()..b"z".to_vec(),
"up",
quality(50, 100, false, ReplicaState::Available),
),
];
let resolved = ReplicaResolver::new().resolve(candidates);
assert_eq!(resolved.len(), 1);
assert_eq!(resolved[0].range, b"m".to_vec()..b"z".to_vec());
assert_eq!(resolved[0].primary, "up");
}
#[test]
fn quality_reports_density_and_coverage() {
let quality = ReplicaQuality::new(10, 42, 25, 50, false, ReplicaState::Available);
assert_eq!(quality.coverage_span(), 32);
assert_eq!(quality.density(), 0.5);
assert!(quality.selectable());
}
}