use std::cmp::Ordering;
use crate::cluster::datacenter::Continuum;
use crate::hashkit::DynToken;
/// Picks the peer responsible for `token` on a ring of continuum points.
///
/// The lookup returns the `peer_idx` of the first entry whose token is greater
/// than or equal to `token`. A `token` above the last entry wraps around to the
/// first entry, and a `token` at or below the first entry also resolves to it.
///
/// Callers are expected to pass `continuums` sorted ascending by token: the
/// boundary checks and the binary search below rely on that ordering.
///
/// Returns `None` only when `continuums` is empty.
#[must_use]
pub fn dispatch(continuums: &[Continuum], token: &DynToken) -> Option<u32> {
    let (Some(head), Some(tail)) = (continuums.first(), continuums.last()) else {
        return None;
    };

    // Outside the (head, tail] span: either past the end (wrap around) or at /
    // before the first point. Both cases belong to the first entry.
    if tail.token.cmp(token) == Ordering::Less || head.token.cmp(token) != Ordering::Less {
        return Some(head.peer_idx);
    }

    // Lower-bound binary search; `hi` always points at an entry >= `token`.
    let (mut lo, mut hi) = (0usize, continuums.len() - 1);
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let probe = &continuums[mid];
        match probe.token.cmp(token) {
            Ordering::Less => lo = mid + 1,
            Ordering::Greater => hi = mid,
            // Exact hit: no need to narrow any further.
            Ordering::Equal => return Some(probe.peer_idx),
        }
    }
    Some(continuums[hi].peer_idx)
}
/// Borrowed view of one peer's placement and tokens, as consumed by
/// `rebuild_continuums`.
#[derive(Clone, Debug)]
pub struct PeerTokens<'a> {
/// Peer index forwarded to the rack's `add_peer_tokens`.
pub peer_idx: u32,
/// Datacenter name; `rebuild_continuums` matches it against `Datacenter::name()`.
pub dc: &'a str,
/// Rack name; resolved through `rack_idx` on the matching datacenter.
pub rack: &'a str,
/// Tokens handed to the rack's `add_peer_tokens` for this peer.
pub tokens: &'a [DynToken],
}
/// Rebuilds the continuums of every rack in `dcs` from `peers`.
///
/// Steps, in order:
/// 1. every rack of every datacenter has its continuums cleared;
/// 2. each peer whose `dc` and `rack` names resolve has its tokens added to
///    that rack via `add_peer_tokens`;
/// 3. every rack that received at least one peer is sorted once via
///    `sort_continuums`.
///
/// Peers that name an unknown datacenter or rack are skipped silently.
///
/// Returns the number of peers whose tokens were applied.
pub fn rebuild_continuums(
    dcs: &mut [crate::cluster::datacenter::Datacenter],
    peers: &[PeerTokens<'_>],
) -> usize {
    // Start from a clean slate so stale entries never survive a rebuild.
    for dc in dcs.iter_mut() {
        dc.racks_mut().iter_mut().for_each(|rack| {
            rack.clear_continuums();
        });
    }

    let mut applied = 0usize;
    // (datacenter index, rack index) pairs that need sorting afterwards,
    // kept in first-seen order and without duplicates.
    let mut dirty: Vec<(usize, usize)> = Vec::new();

    for peer in peers {
        let located = dcs
            .iter()
            .position(|d| d.name() == peer.dc)
            .and_then(|di| dcs[di].rack_idx(peer.rack).map(|ri| (di, ri)));
        let Some(slot) = located else {
            continue;
        };

        dcs[slot.0].racks_mut()[slot.1].add_peer_tokens(peer.peer_idx, peer.tokens);
        applied += 1;

        if !dirty.contains(&slot) {
            dirty.push(slot);
        }
    }

    // Sort each touched rack exactly once, after all of its peers are in.
    for &(di, ri) in &dirty {
        dcs[di].racks_mut()[ri].sort_continuums();
    }

    applied
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::datacenter::Datacenter;

    /// Shorthand for a token built from a raw `u32`.
    fn tok(raw: u32) -> DynToken {
        DynToken::from_u32(raw)
    }

    /// Builds a continuum list from `(peer_idx, token)` pairs, in the given order.
    fn ring(pairs: &[(u32, u32)]) -> Vec<Continuum> {
        let mut points = Vec::with_capacity(pairs.len());
        for &(peer_idx, raw) in pairs {
            points.push(Continuum::new(tok(raw), peer_idx));
        }
        points
    }

    /// A cluster made of a single datacenter `"d"` holding a single rack `"r"`.
    fn single_rack_cluster() -> Vec<Datacenter> {
        let mut dc = Datacenter::new("d".into());
        dc.upsert_rack("r".into());
        vec![dc]
    }

    #[test]
    fn empty_ring_returns_none() {
        let empty: Vec<Continuum> = Vec::new();
        assert_eq!(dispatch(&empty, &tok(0)), None);
    }

    #[test]
    fn single_token_always_resolves() {
        let cs = ring(&[(7, 100)]);
        // Below, exactly at, and above the only point on the ring.
        for raw in [0u32, 100, 101] {
            assert_eq!(dispatch(&cs, &tok(raw)), Some(7));
        }
    }

    #[test]
    fn dispatch_wraps_on_overflow() {
        let cs = ring(&[(0, 10), (1, 20), (2, 30)]);
        let owner_of = |raw: u32| dispatch(&cs, &tok(raw));
        assert_eq!(owner_of(35), Some(0));
        assert_eq!(owner_of(0), Some(0));
    }

    #[test]
    fn dispatch_finds_upper_bound() {
        let cs = ring(&[(0, 10), (1, 20), (2, 30)]);
        let owner_of = |raw: u32| dispatch(&cs, &tok(raw));
        assert_eq!(owner_of(11), Some(1));
        assert_eq!(owner_of(20), Some(1));
        assert_eq!(owner_of(21), Some(2));
        assert_eq!(owner_of(30), Some(2));
    }

    #[test]
    fn rebuild_skips_unknown_dc() {
        let mut dcs = single_rack_cluster();
        let toks = [tok(1)];
        let peers = [
            PeerTokens {
                peer_idx: 0,
                dc: "d",
                rack: "r",
                tokens: &toks,
            },
            PeerTokens {
                peer_idx: 1,
                dc: "ghost",
                rack: "r",
                tokens: &toks,
            },
        ];
        assert_eq!(rebuild_continuums(&mut dcs, &peers), 1);
    }

    #[test]
    fn rebuild_clears_before_repopulating() {
        let mut dcs = single_rack_cluster();
        let toks = [tok(1)];
        let peers = [PeerTokens {
            peer_idx: 0,
            dc: "d",
            rack: "r",
            tokens: &toks,
        }];
        // Rebuilding twice with the same input must not accumulate entries.
        rebuild_continuums(&mut dcs, &peers);
        rebuild_continuums(&mut dcs, &peers);
        assert_eq!(dcs[0].racks()[0].ncontinuum(), 1);
    }
}