use std::collections::HashMap;
use crate::error::{ClusterError, Result};
pub const VSHARD_COUNT: u16 = 1024;
#[derive(
Debug,
Clone,
serde::Serialize,
serde::Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct RoutingTable {
vshard_to_group: Vec<u64>,
group_members: HashMap<u64, GroupInfo>,
}
#[derive(
Debug,
Clone,
serde::Serialize,
serde::Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct GroupInfo {
pub leader: u64,
pub members: Vec<u64>,
}
impl RoutingTable {
pub fn uniform(num_groups: u64, nodes: &[u64], replication_factor: usize) -> Self {
assert!(!nodes.is_empty(), "need at least one node");
assert!(replication_factor > 0, "need at least RF=1");
let mut vshard_to_group = Vec::with_capacity(VSHARD_COUNT as usize);
for i in 0..VSHARD_COUNT {
vshard_to_group.push((i as u64) % num_groups);
}
let mut group_members = HashMap::new();
for group_id in 0..num_groups {
let rf = replication_factor.min(nodes.len());
let start = (group_id as usize * rf) % nodes.len();
let members: Vec<u64> = (0..rf).map(|i| nodes[(start + i) % nodes.len()]).collect();
let leader = members[0];
group_members.insert(group_id, GroupInfo { leader, members });
}
Self {
vshard_to_group,
group_members,
}
}
pub fn group_for_vshard(&self, vshard_id: u16) -> Result<u64> {
self.vshard_to_group
.get(vshard_id as usize)
.copied()
.ok_or(ClusterError::VShardNotMapped { vshard_id })
}
pub fn leader_for_vshard(&self, vshard_id: u16) -> Result<u64> {
let group_id = self.group_for_vshard(vshard_id)?;
let info = self
.group_members
.get(&group_id)
.ok_or(ClusterError::GroupNotFound { group_id })?;
Ok(info.leader)
}
pub fn group_info(&self, group_id: u64) -> Option<&GroupInfo> {
self.group_members.get(&group_id)
}
pub fn set_leader(&mut self, group_id: u64, leader: u64) {
if let Some(info) = self.group_members.get_mut(&group_id) {
info.leader = leader;
}
}
pub fn reassign_vshard(&mut self, vshard_id: u16, new_group_id: u64) {
if (vshard_id as usize) < self.vshard_to_group.len() {
self.vshard_to_group[vshard_id as usize] = new_group_id;
}
}
pub fn vshards_for_group(&self, group_id: u64) -> Vec<u16> {
self.vshard_to_group
.iter()
.enumerate()
.filter(|(_, gid)| **gid == group_id)
.map(|(i, _)| i as u16)
.collect()
}
pub fn num_groups(&self) -> usize {
self.group_members.len()
}
pub fn group_ids(&self) -> Vec<u64> {
self.group_members.keys().copied().collect()
}
pub fn set_group_members(&mut self, group_id: u64, members: Vec<u64>) {
if let Some(info) = self.group_members.get_mut(&group_id) {
info.members = members;
}
}
pub fn vshard_to_group(&self) -> &[u64] {
&self.vshard_to_group
}
pub fn group_members(&self) -> &HashMap<u64, GroupInfo> {
&self.group_members
}
pub fn from_parts(vshard_to_group: Vec<u64>, group_members: HashMap<u64, GroupInfo>) -> Self {
Self {
vshard_to_group,
group_members,
}
}
}
pub fn vshard_for_collection(collection: &str) -> u16 {
let hash = collection
.as_bytes()
.iter()
.fold(0u16, |h, &b| h.wrapping_mul(31).wrapping_add(b as u16));
hash % VSHARD_COUNT
}
pub fn fnv1a_hash(key: &str) -> u64 {
let mut hash: u64 = 0xcbf29ce484222325;
for byte in key.as_bytes() {
hash ^= *byte as u64;
hash = hash.wrapping_mul(0x100000001b3);
}
hash
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn uniform_distribution() {
let rt = RoutingTable::uniform(16, &[1, 2, 3], 3);
assert_eq!(rt.num_groups(), 16);
for gid in 0..16 {
let shards = rt.vshards_for_group(gid);
assert_eq!(shards.len(), 64);
}
}
#[test]
fn leader_lookup() {
let rt = RoutingTable::uniform(4, &[10, 20, 30], 3);
let leader = rt.leader_for_vshard(0).unwrap();
assert!(leader > 0);
}
#[test]
fn reassign_vshard() {
let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
let old_group = rt.group_for_vshard(0).unwrap();
let new_group = (old_group + 1) % 4;
rt.reassign_vshard(0, new_group);
assert_eq!(rt.group_for_vshard(0).unwrap(), new_group);
}
#[test]
fn set_leader() {
let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
rt.set_leader(0, 99);
assert_eq!(rt.leader_for_vshard(0).unwrap(), 99);
}
#[test]
fn vshard_not_mapped() {
let rt = RoutingTable::uniform(2, &[1, 2], 2);
assert!(rt.group_for_vshard(1023).is_ok());
}
}