use std::collections::BTreeMap;
use thiserror::Error;
use crate::cluster::peer::Peer;
#[derive(Debug, Error, PartialEq, Eq)]
pub enum TokenCoverageError {
#[error("rack {dc}/{rack} has no tokens")]
EmptyRack {
dc: String,
rack: String,
},
#[error(
"rack {dc}/{rack} has overlapping token {token} claimed by peer {peer_a} and peer {peer_b}"
)]
Overlap {
dc: String,
rack: String,
token: u32,
peer_a: u32,
peer_b: u32,
},
}
/// Checks the token assignment of every rack represented in `peers`.
///
/// Peers are grouped by their `(dc, rack)` pair and each group is inspected
/// on its own, so the same token value may appear in different racks or
/// datacenters. An empty `peers` slice is accepted.
///
/// Only emptiness and duplicates are checked; nothing here inspects how the
/// tokens are distributed over the `u32` range.
///
/// # Errors
///
/// Racks are visited in ascending `(dc, rack)` order and the first problem
/// found is returned:
///
/// * [`TokenCoverageError::EmptyRack`] if the peers of a rack hold no tokens.
/// * [`TokenCoverageError::Overlap`] if a token value occurs more than once in
///   a rack, including twice in the token list of a single peer. The lowest
///   duplicated token of that rack is the one reported.
pub fn validate_token_coverage(peers: &[Peer]) -> Result<(), TokenCoverageError> {
    // An ordered map makes the visiting order of racks, and therefore which
    // error is reported first, independent of the order of `peers`.
    let mut racks: BTreeMap<(&str, &str), Vec<&Peer>> = BTreeMap::new();
    for peer in peers {
        let members = racks.entry((peer.dc(), peer.rack())).or_default();
        members.push(peer);
    }

    for (&(dc, rack), members) in &racks {
        // Flatten the rack into (token value, owning peer index) claims.
        let mut claims: Vec<(u32, u32)> = Vec::new();
        for peer in members {
            for token in peer.tokens() {
                claims.push((token.get_int(), peer.idx()));
            }
        }

        if claims.is_empty() {
            return Err(TokenCoverageError::EmptyRack {
                dc: String::from(dc),
                rack: String::from(rack),
            });
        }

        // Stable sort on the token alone: equal tokens become adjacent while
        // keeping the order in which their claims were collected.
        claims.sort_by_key(|claim| claim.0);

        // Compare every claim with its successor; the first equal pair is the
        // overlap to report.
        let clash = claims
            .iter()
            .zip(claims.iter().skip(1))
            .find(|(prev, next)| prev.0 == next.0);
        if let Some((&(token, peer_a), &(_, peer_b))) = clash {
            return Err(TokenCoverageError::Overlap {
                dc: String::from(dc),
                rack: String::from(rack),
                token,
                peer_a,
                peer_b,
            });
        }
    }

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::PeerEndpoint;
    use crate::hashkit::DynToken;

    /// Builds a peer with the given index, placement and raw token values.
    ///
    /// Each peer listens on `127.0.0.1` at port `8101 + idx` (the offset falls
    /// back to 0 when `idx` does not fit in a `u16`).
    fn mk(idx: u32, dc: &str, rack: &str, tokens: &[u32]) -> Peer {
        let port = 8101 + u16::try_from(idx).unwrap_or(0);
        let endpoint = PeerEndpoint::tcp("127.0.0.1".into(), port);
        let ring_tokens = tokens.iter().copied().map(DynToken::from_u32).collect();
        // Note the argument order expected by `Peer::new`: rack first, then dc.
        Peer::new(
            idx,
            endpoint,
            rack.into(),
            dc.into(),
            ring_tokens,
            idx == 0,
            true,
            false,
        )
    }

    /// No peers means no racks, so there is nothing to reject.
    #[test]
    fn empty_input_is_ok() {
        let no_peers: [Peer; 0] = [];
        assert_eq!(validate_token_coverage(&no_peers), Ok(()));
    }

    /// A rack whose only peer has no tokens is reported as empty.
    #[test]
    fn empty_rack_rejected() {
        let peers = [mk(0, "dc1", "r1", &[])];
        let expected = TokenCoverageError::EmptyRack {
            dc: "dc1".into(),
            rack: "r1".into(),
        };
        assert_eq!(validate_token_coverage(&peers), Err(expected));
    }

    /// Two peers of the same rack claiming the same token is an overlap.
    #[test]
    fn duplicate_token_rejected() {
        let peers = [
            mk(0, "dc1", "r1", &[1_234_567]),
            mk(1, "dc1", "r1", &[1_234_567]),
        ];
        let err = validate_token_coverage(&peers).unwrap_err();
        let (dc, rack, token, mut owners) = match err {
            TokenCoverageError::Overlap {
                dc,
                rack,
                token,
                peer_a,
                peer_b,
            } => (dc, rack, token, [peer_a, peer_b]),
            TokenCoverageError::EmptyRack { .. } => panic!("expected Overlap, got EmptyRack"),
        };
        assert_eq!(dc, "dc1");
        assert_eq!(rack, "r1");
        assert_eq!(token, 1_234_567);
        // Which peer is reported as `peer_a` is deliberately not pinned.
        owners.sort_unstable();
        assert_eq!(owners, [0, 1]);
    }

    /// The same token in two racks of one datacenter does not collide.
    #[test]
    fn duplicate_across_racks_is_fine() {
        let peers = [mk(0, "dc1", "r1", &[42]), mk(1, "dc1", "r2", &[42])];
        assert_eq!(validate_token_coverage(&peers), Ok(()));
    }

    /// The same token in equally named racks of two datacenters does not collide.
    #[test]
    fn duplicate_across_dcs_is_fine() {
        let peers = [mk(0, "dc1", "r1", &[42]), mk(1, "dc2", "r1", &[42])];
        assert_eq!(validate_token_coverage(&peers), Ok(()));
    }

    /// Three peers with distinct tokens in one rack are accepted.
    #[test]
    fn valid_3_peer_ring() {
        let peers = [
            mk(0, "dc1", "r1", &[0]),
            mk(1, "dc1", "r1", &[1_431_655_765]),
            mk(2, "dc1", "r1", &[2_863_311_530]),
        ];
        assert_eq!(validate_token_coverage(&peers), Ok(()));
    }

    /// Four peers with distinct tokens in one rack are accepted.
    #[test]
    fn valid_4_peer_ring() {
        let peers = [
            mk(0, "dc1", "r1", &[0]),
            mk(1, "dc1", "r1", &[1_073_741_824]),
            mk(2, "dc1", "r1", &[2_147_483_648]),
            mk(3, "dc1", "r1", &[3_221_225_472]),
        ];
        assert_eq!(validate_token_coverage(&peers), Ok(()));
    }

    /// The first three peers of the four-peer layout are still a valid config.
    #[test]
    fn pass3_3_peer_subset_is_valid_config() {
        let peers = [
            mk(0, "dc1", "r1", &[0]),
            mk(1, "dc1", "r1", &[1_073_741_824]),
            mk(2, "dc1", "r1", &[2_147_483_648]),
        ];
        assert_eq!(validate_token_coverage(&peers), Ok(()));
    }

    /// A peer may own several tokens as long as all of them are distinct.
    #[test]
    fn multi_token_per_peer_is_fine() {
        let peers = [
            mk(0, "dc1", "r1", &[0, 1_073_741_824]),
            mk(1, "dc1", "r1", &[2_147_483_648, 3_221_225_472]),
        ];
        assert_eq!(validate_token_coverage(&peers), Ok(()));
    }

    /// A token repeated inside one peer's own list counts as an overlap too.
    #[test]
    fn duplicate_within_single_peer_token_list_rejected() {
        let peers = [mk(0, "dc1", "r1", &[5, 5])];
        let err = validate_token_coverage(&peers).unwrap_err();
        let is_overlap_on_5 = matches!(err, TokenCoverageError::Overlap { token: 5, .. });
        assert!(is_overlap_on_5);
    }

    /// A healthy first datacenter does not hide an overlap in the second one.
    #[test]
    fn fault_in_second_dc_still_rejects() {
        let peers = [
            mk(0, "dc1", "r1", &[10]),
            mk(1, "dc1", "r1", &[20]),
            mk(2, "dc2", "r1", &[100]),
            mk(3, "dc2", "r1", &[100]),
        ];
        let err = validate_token_coverage(&peers).unwrap_err();
        let offending_dc = match err {
            TokenCoverageError::Overlap { dc, .. } => dc,
            TokenCoverageError::EmptyRack { .. } => {
                panic!("expected Overlap in dc2, got EmptyRack")
            }
        };
        assert_eq!(offending_dc, "dc2");
    }

    /// A healthy first datacenter does not hide an empty rack in the second one.
    #[test]
    fn empty_rack_in_second_dc_still_rejects() {
        let peers = [mk(0, "dc1", "r1", &[10]), mk(1, "dc2", "r1", &[])];
        let expected = TokenCoverageError::EmptyRack {
            dc: "dc2".into(),
            rack: "r1".into(),
        };
        assert_eq!(validate_token_coverage(&peers), Err(expected));
    }
}