use std::collections::HashSet;
use crate::embed::events::PeerId;
/// Identifier of a virtual node, i.e. one point on the ring.
///
/// The walk functions in this file fill it with the point's index in the
/// token-sorted ring, saturating at `u32::MAX` when the index does not fit.
pub type VnodeId = u32;
/// Role assigned to a peer in the list produced by [`get_apl_ann`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum NodeRole {
/// An alive peer found among the first `n` distinct peers of the ring walk.
Primary,
/// An alive peer taken from further along the walk to stand in for one of
/// the first `n` distinct peers that is not alive.
Fallback,
}
/// One entry of an annotated preference list: a peer, the vnode it was
/// reached through, and the role it was given.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AnnotatedPeer {
/// Peer selected for this entry.
pub peer_id: PeerId,
/// Vnode of the ring point at which the peer was first encountered;
/// [`get_apl_ann`] sets it from that point's index in the sorted ring.
pub vnode: VnodeId,
/// Whether the peer is one of the first `n` on the walk or a stand-in.
pub role: NodeRole,
}
/// A single point on the ring: a token paired with a peer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RingPoint {
/// Position of the point; [`ClusterState::new`] sorts points by this value.
pub token: u64,
/// Peer associated with this point. Several points may name the same peer.
pub peer_id: PeerId,
}
impl RingPoint {
    /// Creates a ring point that places `peer_id` at `token`.
    ///
    /// The constructor is `const`, so ring points can also be built in
    /// `const` and `static` initialisers; both fields are plain `Copy` values.
    #[must_use]
    pub const fn new(token: u64, peer_id: PeerId) -> Self {
        Self { token, peer_id }
    }
}
/// Ring layout together with the set of peers marked alive.
#[derive(Clone, Debug)]
pub struct ClusterState {
// Ring points; `ClusterState::new` sorts them by token, then by peer id.
ring: Vec<RingPoint>,
// Peers for which `is_alive` returns `true`.
alive: HashSet<PeerId>,
}
impl ClusterState {
    /// Builds a cluster view from ring points and the set of alive peers.
    ///
    /// `ring` may arrive in any order; it is sorted here by token, with ties
    /// broken by peer id, so lookups can rely on that ordering.
    #[must_use]
    pub fn new(mut ring: Vec<RingPoint>, alive: HashSet<PeerId>) -> Self {
        // Tuples compare lexicographically: token first, then peer id.
        ring.sort_by_key(|point| (point.token, point.peer_id));
        Self { ring, alive }
    }

    /// Returns the ring points in the order established by [`ClusterState::new`].
    #[must_use]
    pub fn ring(&self) -> &[RingPoint] {
        self.ring.as_slice()
    }

    /// Reports whether `peer_id` is a member of the alive set.
    #[must_use]
    pub fn is_alive(&self, peer_id: PeerId) -> bool {
        self.alive.contains(&peer_id)
    }
}
/// Collects up to `n` distinct peers by walking the ring from the point
/// selected for `key_token`.
///
/// The walk visits ring points in ascending index order starting at the index
/// returned by `primary_index`, wrapping around to the start of the ring. A
/// peer that appears at several points is reported once, at the first point
/// where it is met. Each entry pairs that point's ring index (as a
/// [`VnodeId`]) with the peer id.
///
/// Returns an empty vector when the ring is empty or `n` is zero. Fewer than
/// `n` entries are returned when the ring holds fewer than `n` distinct peers,
/// so any `n` up to `usize::MAX` is accepted.
#[must_use]
pub fn walk_n_successors(
    cluster: &ClusterState,
    key_token: u64,
    n: usize,
) -> Vec<(VnodeId, PeerId)> {
    let ring = cluster.ring();
    if ring.is_empty() || n == 0 {
        return Vec::new();
    }
    let start = primary_index(ring, key_token);
    // The walk can never yield more entries than there are ring points, so
    // bound the reservation by the ring size. Reserving `n` directly panics
    // with a capacity overflow when a caller passes a very large `n`.
    let mut out: Vec<(VnodeId, PeerId)> = Vec::with_capacity(n.min(ring.len()));
    // Visit `start..len` first, then wrap around to `0..start`.
    for idx in (start..ring.len()).chain(0..start) {
        if out.len() >= n {
            break;
        }
        let peer_id = ring[idx].peer_id;
        // A peer may own several ring points; only its first one counts.
        if out.iter().any(|&(_, seen)| seen == peer_id) {
            continue;
        }
        // Ring indices that do not fit in a `u32` saturate instead of wrapping.
        let vnode = u32::try_from(idx).unwrap_or(u32::MAX);
        out.push((vnode, peer_id));
    }
    out
}
#[must_use]
pub fn get_apl_ann(cluster: &ClusterState, key_token: u64, n: usize) -> Vec<AnnotatedPeer> {
let ring = cluster.ring();
if ring.is_empty() || n == 0 {
return Vec::new();
}
let start = primary_index(ring, key_token);
let len = ring.len();
let mut walk: Vec<(VnodeId, PeerId)> = Vec::new();
for step in 0..len {
let idx = (start + step) % len;
let pid = ring[idx].peer_id;
if walk.iter().any(|(_, p)| *p == pid) {
continue;
}
let vnode = u32::try_from(idx).unwrap_or(u32::MAX);
walk.push((vnode, pid));
}
let canonical_len = walk.len().min(n);
let mut result: Vec<AnnotatedPeer> = Vec::with_capacity(canonical_len);
let mut next_fallback = canonical_len;
for slot in 0..canonical_len {
let (vnode, peer_id) = walk[slot];
if cluster.is_alive(peer_id) {
result.push(AnnotatedPeer {
peer_id,
vnode,
role: NodeRole::Primary,
});
continue;
}
while next_fallback < walk.len() {
let (fb_vnode, fb_peer) = walk[next_fallback];
next_fallback += 1;
if !cluster.is_alive(fb_peer) {
continue;
}
if result.iter().any(|p| p.peer_id == fb_peer) {
continue;
}
result.push(AnnotatedPeer {
peer_id: fb_peer,
vnode: fb_vnode,
role: NodeRole::Fallback,
});
break;
}
}
result
}
/// Returns references to the entries of `annotated` whose role is
/// [`NodeRole::Primary`], preserving their order in the input.
#[must_use]
pub fn primaries(annotated: &[AnnotatedPeer]) -> Vec<&AnnotatedPeer> {
    annotated
        .iter()
        .filter(|entry| matches!(entry.role, NodeRole::Primary))
        .collect()
}
/// Returns references to the entries of `annotated` whose role is
/// [`NodeRole::Fallback`], preserving their order in the input.
#[must_use]
pub fn fallbacks(annotated: &[AnnotatedPeer]) -> Vec<&AnnotatedPeer> {
    annotated
        .iter()
        .filter(|entry| matches!(entry.role, NodeRole::Fallback))
        .collect()
}
/// Returns the index of the ring point at which a walk for `key_token` starts.
///
/// That is the first point whose token is greater than or equal to
/// `key_token`, or index `0` (wrap-around) when every token is smaller.
///
/// `ring` must be sorted by token and must not be empty. When several points
/// share the matching token, the first of them is always chosen. A plain
/// binary search may stop at any of the equal tokens, which would make the
/// start of the walk depend on the search implementation.
fn primary_index(ring: &[RingPoint], key_token: u64) -> usize {
    debug_assert!(!ring.is_empty(), "primary_index requires a non-empty ring");
    // Count of points strictly below the key == index of the first point at
    // or above it.
    let first_at_or_after = ring.partition_point(|point| point.token < key_token);
    if first_at_or_after == ring.len() {
        // The key lies past the last token: wrap to the start of the ring.
        0
    } else {
        first_at_or_after
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Builds ring points from `(token, peer)` pairs.
    fn ring(pairs: &[(u64, PeerId)]) -> Vec<RingPoint> {
        pairs.iter().map(|&(t, p)| RingPoint::new(t, p)).collect()
    }

    // Peer ids of the given annotated entries, in order.
    fn ids(entries: &[&AnnotatedPeer]) -> Vec<PeerId> {
        entries.iter().map(|e| e.peer_id).collect()
    }

    #[test]
    fn empty_ring_returns_empty_apl() {
        let cs = ClusterState::new(Vec::new(), HashSet::new());
        assert!(get_apl_ann(&cs, 100, 3).is_empty());
        assert!(walk_n_successors(&cs, 100, 3).is_empty());
    }

    #[test]
    fn n_zero_returns_empty_apl() {
        let cs = ClusterState::new(ring(&[(10, 0)]), [0u32].into_iter().collect());
        assert!(get_apl_ann(&cs, 10, 0).is_empty());
        assert!(walk_n_successors(&cs, 10, 0).is_empty());
    }

    #[test]
    fn walk_wraps_on_overflow() {
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2)]),
            [0u32, 1, 2].into_iter().collect(),
        );
        // A key beyond the largest token wraps to the first ring point.
        let pre = walk_n_successors(&cs, 50, 3);
        assert_eq!(pre.iter().map(|p| p.1).collect::<Vec<_>>(), vec![0, 1, 2]);
    }

    #[test]
    fn walk_dedups_peers_with_multiple_vnodes() {
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 0), (30, 1)]),
            [0u32, 1].into_iter().collect(),
        );
        let pre = walk_n_successors(&cs, 0, 2);
        assert_eq!(pre.iter().map(|p| p.1).collect::<Vec<_>>(), vec![0, 1]);
    }

    #[test]
    fn walk_starts_at_exact_token_match() {
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2)]),
            [0u32, 1, 2].into_iter().collect(),
        );
        assert_eq!(walk_n_successors(&cs, 20, 1), vec![(1, 1)]);
    }

    #[test]
    fn primaries_subset_of_canonical_walk() {
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2), (40, 3)]),
            [0u32, 1, 2, 3].into_iter().collect(),
        );
        let canonical: Vec<PeerId> = walk_n_successors(&cs, 0, 3)
            .into_iter()
            .map(|p| p.1)
            .collect();
        let apl = get_apl_ann(&cs, 0, 3);
        for entry in primaries(&apl) {
            assert!(canonical.contains(&entry.peer_id));
        }
    }

    #[test]
    fn dead_primary_is_replaced_by_next_alive_peer() {
        // Peer 1 is down, so the second slot is filled by peer 2.
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2), (40, 3)]),
            [0u32, 2, 3].into_iter().collect(),
        );
        let apl = get_apl_ann(&cs, 0, 2);
        assert_eq!(ids(&primaries(&apl)), vec![0]);
        assert_eq!(ids(&fallbacks(&apl)), vec![2]);
    }

    #[test]
    fn dead_fallback_candidates_are_skipped() {
        // Peers 1 and 2 are down; the stand-in for peer 1 is peer 3.
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2), (40, 3)]),
            [0u32, 3].into_iter().collect(),
        );
        let apl = get_apl_ann(&cs, 0, 2);
        assert_eq!(ids(&primaries(&apl)), vec![0]);
        assert_eq!(ids(&fallbacks(&apl)), vec![3]);
    }

    #[test]
    fn list_shrinks_when_no_alive_fallback_remains() {
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1)]),
            [0u32].into_iter().collect(),
        );
        let apl = get_apl_ann(&cs, 0, 2);
        assert_eq!(ids(&primaries(&apl)), vec![0]);
        assert!(fallbacks(&apl).is_empty());
    }
}