use std::collections::BTreeMap;
use std::ops::Range;
use crate::key::LexicographicKey;
/// Errors returned by [`RangeAssigner`] operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ShardError {
    /// The requested range has `start >= end`, so it would contain no keys.
    EmptyRange,
    /// No stored range starts exactly at the given key.
    RangeNotFound,
    /// The split key does not lie strictly between the range's start and end.
    SplitOutsideRange,
}

impl std::fmt::Display for ShardError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            ShardError::EmptyRange => "range is empty: start must be less than end",
            ShardError::RangeNotFound => "no range starts at the given key",
            ShardError::SplitOutsideRange => "split key must lie strictly inside the range",
        };
        f.write_str(message)
    }
}

// Lets callers propagate shard errors with `?` into `Box<dyn Error>`-style results.
impl std::error::Error for ShardError {}
/// Routes byte-encoded keys to workers using half-open key ranges.
///
/// Ranges are stored in a [`BTreeMap`] keyed by their inclusive start key; each
/// entry records the exclusive end key and the owning worker.
#[derive(Debug, Clone)]
pub struct RangeAssigner<WorkerId> {
    ranges: BTreeMap<Vec<u8>, RangeAssignment<WorkerId>>,
}

// Implemented by hand: `#[derive(Default)]` would add a needless
// `WorkerId: Default` bound, hiding `default()` for most worker ID types.
impl<WorkerId> Default for RangeAssigner<WorkerId> {
    fn default() -> Self {
        Self {
            ranges: BTreeMap::new(),
        }
    }
}

/// The part of a range stored alongside its start key in a [`RangeAssigner`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeAssignment<WorkerId> {
    /// Exclusive end key of the range.
    pub end: Vec<u8>,
    /// Worker that owns keys in the range.
    pub worker: WorkerId,
}
/// Load rates observed for a key range; [`LoadSample::score`] folds them into one number.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct LoadSample {
    /// Write operations per second.
    pub writes_per_second: f64,
    /// Bytes per second.
    pub bytes_per_second: f64,
    /// Queries per second.
    pub queries_per_second: f64,
}

impl LoadSample {
    /// Collapses the sample into a single comparable load figure.
    ///
    /// Each write and each query per second counts as one point. Bytes are
    /// divided by 1024, so every KiB per second also counts as one point.
    pub fn score(self) -> f64 {
        let Self {
            writes_per_second,
            bytes_per_second,
            queries_per_second,
        } = self;
        let kib_per_second = bytes_per_second / 1024.0;
        writes_per_second + queries_per_second + kib_per_second
    }
}
/// A key range, the worker associated with it, and the load attributed to it.
#[derive(Clone, Debug, PartialEq)]
pub struct RangeLoad<WorkerId> {
    /// Key range this sample describes.
    pub range: Range<Vec<u8>>,
    /// Worker associated with `range`.
    pub worker: WorkerId,
    /// Load sample attributed to `range`.
    pub load: LoadSample,
}
impl<WorkerId: Clone> RangeAssigner<WorkerId> {
    /// Creates an assigner with no ranges.
    pub fn new() -> Self {
        Self {
            ranges: BTreeMap::new(),
        }
    }

    /// Assigns the half-open key range `range.start..range.end` to `worker`.
    ///
    /// Ranges are keyed by their start, so assigning a range whose start is
    /// already present replaces that earlier assignment, including its end.
    ///
    /// NOTE(review): no overlap check is made against other stored ranges.
    /// Lookups consult only the range with the greatest start `<=` the key, so
    /// overlapping assignments can leave some keys unroutable.
    ///
    /// # Errors
    ///
    /// Returns [`ShardError::EmptyRange`] if `range.start >= range.end`.
    pub fn assign(&mut self, range: Range<Vec<u8>>, worker: WorkerId) -> Result<(), ShardError> {
        if range.start >= range.end {
            return Err(ShardError::EmptyRange);
        }
        self.ranges.insert(
            range.start,
            RangeAssignment {
                end: range.end,
                worker,
            },
        );
        Ok(())
    }

    /// Fallible assignment entry point; currently identical to [`Self::assign`].
    ///
    /// # Errors
    ///
    /// Returns [`ShardError::EmptyRange`] if `range.start >= range.end`.
    pub fn try_assign(
        &mut self,
        range: Range<Vec<u8>>,
        worker: WorkerId,
    ) -> Result<(), ShardError> {
        self.assign(range, worker)
    }

    /// Hands the range that starts exactly at `start` to `worker`.
    ///
    /// `start` must be a range's start key. A key that merely falls inside a
    /// range is not matched.
    ///
    /// # Errors
    ///
    /// Returns [`ShardError::RangeNotFound`] if no range starts at `start`.
    pub fn reassign_start(&mut self, start: &[u8], worker: WorkerId) -> Result<(), ShardError> {
        let Some(assignment) = self.ranges.get_mut(start) else {
            return Err(ShardError::RangeNotFound);
        };
        assignment.worker = worker;
        Ok(())
    }

    /// Splits the range starting at `range_start` at `split_key`.
    ///
    /// Afterwards `range_start..split_key` keeps its current worker, and
    /// `split_key..end` is owned by `right_worker`.
    ///
    /// # Errors
    ///
    /// - [`ShardError::RangeNotFound`] if no range starts at `range_start`.
    /// - [`ShardError::SplitOutsideRange`] unless `range_start < split_key < end`.
    ///   Splitting at either boundary would produce an empty range.
    pub fn split_at(
        &mut self,
        range_start: &[u8],
        split_key: Vec<u8>,
        right_worker: WorkerId,
    ) -> Result<(), ShardError> {
        let Some(left) = self.ranges.get_mut(range_start) else {
            return Err(ShardError::RangeNotFound);
        };
        if range_start >= split_key.as_slice() || split_key >= left.end {
            return Err(ShardError::SplitOutsideRange);
        }
        // Move the old end out rather than cloning it; the left half now stops at the split.
        let right_end = std::mem::replace(&mut left.end, split_key.clone());
        self.ranges.insert(
            split_key,
            RangeAssignment {
                end: right_end,
                worker: right_worker,
            },
        );
        Ok(())
    }

    /// Returns the worker owning `key`, or `None` if no range contains it.
    ///
    /// The key is encoded with [`LexicographicKey::encoded_key`] and then routed
    /// by [`Self::worker_for_encoded`].
    pub fn worker_for<K: LexicographicKey + ?Sized>(&self, key: &K) -> Option<&WorkerId> {
        let encoded = key.encoded_key();
        self.worker_for_encoded(&encoded)
    }

    /// Returns the worker whose range contains the already-encoded key.
    ///
    /// Ranges are half-open: a start key is included and an end key excluded.
    /// Returns `None` when `encoded` falls before the first range or in a gap
    /// between ranges.
    pub fn worker_for_encoded(&self, encoded: &[u8]) -> Option<&WorkerId> {
        // Bound the search with a borrowed `[u8]` instead of `..=encoded.to_vec()`
        // so a lookup does not allocate. The last entry yielded is the range with
        // the greatest start `<=` encoded, so only its exclusive end needs checking.
        let up_to_key = (
            std::ops::Bound::<&[u8]>::Unbounded,
            std::ops::Bound::Included(encoded),
        );
        let (_, assignment) = self.ranges.range::<[u8], _>(up_to_key).next_back()?;
        (encoded < assignment.end.as_slice()).then_some(&assignment.worker)
    }

    /// Iterates over all assignments as `(start, assignment)` pairs, in
    /// ascending start-key order.
    pub fn assignments(&self) -> impl Iterator<Item = (&Vec<u8>, &RangeAssignment<WorkerId>)> {
        self.ranges.iter()
    }
}
/// Returns the range with the highest [`LoadSample::score`], or `None` when
/// `ranges` is empty.
///
/// Scores are compared with `f64::total_cmp`, which gives NaN a fixed position
/// in the ordering. When several ranges tie for the top score, the last one in
/// `ranges` is returned.
pub fn hottest_range<WorkerId>(ranges: &[RangeLoad<WorkerId>]) -> Option<&RangeLoad<WorkerId>> {
    ranges.iter().reduce(|hottest, candidate| {
        // Keep the incumbent only when it is strictly hotter. On a tie the later
        // candidate wins, exactly as `Iterator::max_by` behaves.
        let incumbent_wins = hottest
            .load
            .score()
            .total_cmp(&candidate.load.score())
            .is_gt();
        if incumbent_wins {
            hottest
        } else {
            candidate
        }
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn range_assigner_finds_matching_worker() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"a".to_vec()..b"m".to_vec(), "leaf-1")
            .unwrap();
        assigner
            .assign(b"m".to_vec()..b"z".to_vec(), "leaf-2")
            .unwrap();
        assert_eq!(assigner.worker_for("beta"), Some(&"leaf-1"));
        assert_eq!(assigner.worker_for("omega"), Some(&"leaf-2"));
    }

    #[test]
    fn hottest_range_uses_combined_load_score() {
        let ranges = vec![
            RangeLoad {
                range: b"a".to_vec()..b"m".to_vec(),
                worker: "leaf-1",
                load: LoadSample {
                    writes_per_second: 1.0,
                    bytes_per_second: 0.0,
                    queries_per_second: 1.0,
                },
            },
            RangeLoad {
                range: b"m".to_vec()..b"z".to_vec(),
                worker: "leaf-2",
                load: LoadSample {
                    writes_per_second: 2.0,
                    bytes_per_second: 4096.0,
                    queries_per_second: 1.0,
                },
            },
        ];
        assert_eq!(hottest_range(&ranges).unwrap().worker, "leaf-2");
    }

    #[test]
    fn split_at_creates_two_routable_ranges() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"a".to_vec()..b"z".to_vec(), "leaf-1")
            .unwrap();
        assigner
            .split_at(b"a", b"m".to_vec(), "leaf-2")
            .expect("split should be valid");
        assert_eq!(assigner.worker_for("beta"), Some(&"leaf-1"));
        assert_eq!(assigner.worker_for("omega"), Some(&"leaf-2"));
    }

    #[test]
    fn assign_rejects_empty_and_inverted_ranges() {
        let mut assigner = RangeAssigner::new();
        assert_eq!(
            assigner.assign(b"m".to_vec()..b"m".to_vec(), "leaf-1"),
            Err(ShardError::EmptyRange)
        );
        assert_eq!(
            assigner.assign(b"z".to_vec()..b"a".to_vec(), "leaf-1"),
            Err(ShardError::EmptyRange)
        );
        assert_eq!(assigner.assignments().count(), 0);
    }

    #[test]
    fn lookups_respect_half_open_bounds_and_gaps() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"c".to_vec()..b"f".to_vec(), "leaf-1")
            .unwrap();
        assigner
            .assign(b"h".to_vec()..b"k".to_vec(), "leaf-2")
            .unwrap();
        // Before the first range.
        assert_eq!(assigner.worker_for_encoded(b"a"), None);
        // Start is inclusive.
        assert_eq!(assigner.worker_for_encoded(b"c"), Some(&"leaf-1"));
        assert_eq!(assigner.worker_for_encoded(b"e"), Some(&"leaf-1"));
        // End is exclusive, and the gap between ranges is unowned.
        assert_eq!(assigner.worker_for_encoded(b"f"), None);
        assert_eq!(assigner.worker_for_encoded(b"g"), None);
        assert_eq!(assigner.worker_for_encoded(b"h"), Some(&"leaf-2"));
        // After the last range.
        assert_eq!(assigner.worker_for_encoded(b"z"), None);
    }

    #[test]
    fn split_at_validates_inputs() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"a".to_vec()..b"z".to_vec(), "leaf-1")
            .unwrap();
        assert_eq!(
            assigner.split_at(b"b", b"m".to_vec(), "leaf-2"),
            Err(ShardError::RangeNotFound)
        );
        assert_eq!(
            assigner.split_at(b"a", b"a".to_vec(), "leaf-2"),
            Err(ShardError::SplitOutsideRange)
        );
        assert_eq!(
            assigner.split_at(b"a", b"z".to_vec(), "leaf-2"),
            Err(ShardError::SplitOutsideRange)
        );
        // Failed splits must leave the original range untouched.
        assert_eq!(assigner.assignments().count(), 1);
        assert_eq!(assigner.worker_for_encoded(b"y"), Some(&"leaf-1"));
    }

    #[test]
    fn reassign_start_moves_only_exact_start_matches() {
        let mut assigner = RangeAssigner::new();
        assigner
            .assign(b"a".to_vec()..b"m".to_vec(), "leaf-1")
            .unwrap();
        assigner.reassign_start(b"a", "leaf-2").unwrap();
        assert_eq!(assigner.worker_for_encoded(b"b"), Some(&"leaf-2"));
        // A key inside the range is not a start key.
        assert_eq!(
            assigner.reassign_start(b"b", "leaf-3"),
            Err(ShardError::RangeNotFound)
        );
    }
}