use std::collections::HashMap;
/// Computes the 64-bit FNV-1a hash of `input`.
///
/// The running state starts at the FNV offset basis. For every UTF-8 byte of
/// `input` the byte is XORed into the state, which is then multiplied by the
/// FNV prime using wrapping (mod 2^64) arithmetic. An empty string therefore
/// hashes to the offset basis itself.
pub fn fnv1a_hash(input: &str) -> u64 {
    // Same constants as the decimal forms 14_695_981_039_346_656_037 and 1_099_511_628_211.
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    input
        .as_bytes()
        .iter()
        .fold(FNV_OFFSET_BASIS, |state, &octet| {
            (state ^ u64::from(octet)).wrapping_mul(FNV_PRIME)
        })
}
/// One virtual-node entry on a [`ConsistentHashRing`].
#[derive(Debug, Clone)]
pub struct VNode {
/// Position of this entry on the ring. `ConsistentHashRing::add_node` fills it with
/// `fnv1a_hash` of the string `"{node_id}#{i}"`, where `i` is the replica index.
pub hash: u64,
/// Identifier of the node that this entry belongs to.
pub node_id: String,
}
/// Consistent-hash ring mapping string keys to node identifiers.
///
/// Each node is represented by `replicas` virtual nodes whose positions are the
/// FNV-1a hashes of `"{node_id}#{index}"`. A key is owned by the first virtual
/// node whose hash is greater than or equal to the key's hash, wrapping around
/// to the lowest position when the key hashes past the last entry.
#[derive(Debug)]
pub struct ConsistentHashRing {
    /// Virtual nodes, kept sorted ascending by `(hash, node_id)`.
    vnodes: Vec<VNode>,
    /// Number of virtual nodes created per node; always at least 1.
    replicas: usize,
}

impl ConsistentHashRing {
    /// Creates an empty ring that places `replicas` virtual nodes per node.
    ///
    /// A `replicas` value of 0 is raised to 1 so that every added node is
    /// present on the ring.
    pub fn new(replicas: usize) -> Self {
        Self {
            vnodes: Vec::new(),
            replicas: replicas.max(1),
        }
    }

    /// Adds `node_id` to the ring.
    ///
    /// Adding a node that is already on the ring is a no-op. The virtual-node
    /// positions depend only on `node_id` and the replica count, so a second
    /// insertion could only create exact duplicates.
    pub fn add_node(&mut self, node_id: &str) {
        if self.vnodes.iter().any(|v| v.node_id == node_id) {
            return;
        }

        let replicas = self.replicas;
        self.vnodes.reserve(replicas);
        self.vnodes.extend((0..replicas).map(|i| VNode {
            hash: fnv1a_hash(&format!("{}#{}", node_id, i)),
            node_id: node_id.to_string(),
        }));

        // Break hash ties by node id so the resulting order (and therefore key
        // ownership) does not depend on the order in which nodes were added.
        self.vnodes
            .sort_unstable_by(|a, b| a.hash.cmp(&b.hash).then_with(|| a.node_id.cmp(&b.node_id)));
    }

    /// Removes every virtual node belonging to `node_id`.
    ///
    /// Does nothing if the node is not on the ring. The remaining entries keep
    /// their sorted order.
    pub fn remove_node(&mut self, node_id: &str) {
        self.vnodes.retain(|v| v.node_id != node_id);
    }

    /// Returns the node that owns `key`, or `None` if the ring is empty.
    pub fn get_node(&self, key: &str) -> Option<&str> {
        let idx = self.successor_index(fnv1a_hash(key))?;
        Some(self.vnodes[idx].node_id.as_str())
    }

    /// Returns up to `count` distinct nodes for `key`, in ring order.
    ///
    /// The first element is the same node [`get_node`](Self::get_node) returns;
    /// the following ones are the next distinct nodes met while walking the ring
    /// once. Fewer than `count` entries are returned when the ring holds fewer
    /// distinct nodes. An empty ring or `count == 0` yields an empty vector.
    pub fn get_nodes(&self, key: &str, count: usize) -> Vec<&str> {
        use std::collections::HashSet;

        if count == 0 {
            return Vec::new();
        }
        let start = match self.successor_index(fnv1a_hash(key)) {
            Some(idx) => idx,
            None => return Vec::new(),
        };

        // There can never be more distinct nodes than virtual nodes. Capping the
        // capacity also keeps a very large `count` (e.g. `usize::MAX`, meaning
        // "all nodes") from overflowing the allocation size.
        let cap = count.min(self.vnodes.len());
        let mut result: Vec<&str> = Vec::with_capacity(cap);
        let mut seen: HashSet<&str> = HashSet::with_capacity(cap);

        // Walk the ring exactly once: from `start` to the end, then wrap around.
        let (head, tail) = self.vnodes.split_at(start);
        for vnode in tail.iter().chain(head.iter()) {
            let node_id = vnode.node_id.as_str();
            // `insert` returns true only the first time an id is seen.
            if seen.insert(node_id) {
                result.push(node_id);
                if result.len() >= count {
                    break;
                }
            }
        }
        result
    }

    /// Returns the number of distinct nodes on the ring.
    pub fn node_count(&self) -> usize {
        use std::collections::HashSet;

        self.vnodes
            .iter()
            .map(|v| v.node_id.as_str())
            .collect::<HashSet<&str>>()
            .len()
    }

    /// Returns the total number of virtual nodes on the ring.
    pub fn vnode_count(&self) -> usize {
        self.vnodes.len()
    }

    /// Index of the first virtual node whose hash is `>= hash`, wrapping to 0
    /// when `hash` is greater than every stored hash. `None` on an empty ring.
    fn successor_index(&self, hash: u64) -> Option<usize> {
        if self.vnodes.is_empty() {
            return None;
        }
        Some(self.vnodes.partition_point(|v| v.hash < hash) % self.vnodes.len())
    }
}
/// Registry record describing a single cluster node.
#[derive(Clone, Debug)]
pub struct NodeInfo {
    /// Node identifier; used as the registry key and as the ring node id.
    pub id: String,
    /// Address string for the node; this is what `DistributedCoordinator::route` hands back.
    pub addr: String,
    /// Whether the node is currently eligible for routing.
    pub healthy: bool,
    /// Load figure for the node; `NodeRegistry::update_load` clamps it to `0.0..=1.0`.
    pub load: f32,
    /// Milliseconds since the Unix epoch at which the record was created or last
    /// touched by a health or load update.
    pub last_seen_ms: u64,
}

impl NodeInfo {
    /// Builds the record for a freshly seen node.
    ///
    /// The node starts out healthy, with zero load, and with `last_seen_ms` set
    /// to the current wall-clock time.
    pub fn new(id: impl Into<String>, addr: impl Into<String>) -> Self {
        let id = id.into();
        let addr = addr.into();
        let last_seen_ms = current_time_ms();
        Self {
            id,
            addr,
            healthy: true,
            load: 0.0,
            last_seen_ms,
        }
    }
}
/// Tracks the known cluster nodes and routes request keys to them.
///
/// Node records live in a map keyed by node id, while a [`ConsistentHashRing`]
/// with 150 virtual nodes per node decides which node owns a given key.
/// `register` and `deregister` keep the two structures in step.
pub struct NodeRegistry {
    /// Node records keyed by node id.
    nodes: HashMap<String, NodeInfo>,
    /// Ring holding the ids of all registered nodes.
    ring: ConsistentHashRing,
}

impl NodeRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self {
            nodes: HashMap::new(),
            ring: ConsistentHashRing::new(150),
        }
    }

    /// Registers `info`, replacing any existing record with the same id.
    pub fn register(&mut self, info: NodeInfo) {
        let id = info.id.clone();
        // Drop the old ring entries first so a re-registration never leaves
        // duplicate virtual nodes behind.
        if self.nodes.contains_key(&id) {
            self.ring.remove_node(&id);
        }
        self.ring.add_node(&id);
        self.nodes.insert(id, info);
    }

    /// Removes `node_id` from both the ring and the record map.
    ///
    /// Unknown ids are ignored.
    pub fn deregister(&mut self, node_id: &str) {
        self.ring.remove_node(node_id);
        self.nodes.remove(node_id);
    }

    /// Sets the health flag of `node_id` and refreshes its `last_seen_ms`.
    ///
    /// Unknown ids are ignored.
    pub fn mark_healthy(&mut self, node_id: &str, healthy: bool) {
        if let Some(node) = self.nodes.get_mut(node_id) {
            node.healthy = healthy;
            node.last_seen_ms = current_time_ms();
        }
    }

    /// Records a new load figure for `node_id` and refreshes its `last_seen_ms`.
    ///
    /// The value is clamped to `0.0..=1.0`. A NaN report carries no usable
    /// information, so the previously stored load is kept in that case; the
    /// timestamp is still refreshed. Unknown ids are ignored.
    pub fn update_load(&mut self, node_id: &str, load: f32) {
        if let Some(node) = self.nodes.get_mut(node_id) {
            // `f32::clamp` returns NaN for a NaN input, which would break the
            // `0.0..=1.0` invariant of `NodeInfo::load`.
            if !load.is_nan() {
                node.load = load.clamp(0.0, 1.0);
            }
            node.last_seen_ms = current_time_ms();
        }
    }

    /// Returns the node that should serve `request_key`.
    ///
    /// This is the first healthy node met when walking the ring from the key's
    /// position. Returns `None` when the registry is empty or no node is healthy.
    pub fn route_request(&self, request_key: &str) -> Option<&NodeInfo> {
        // Fast path: the key's primary owner is healthy, so there is no need to
        // build the full fallback list.
        let primary = self
            .ring
            .get_node(request_key)
            .and_then(|id| self.nodes.get(id));
        if let Some(info) = primary {
            if info.healthy {
                return Some(info);
            }
        }

        // Slow path: consider every distinct node in ring order.
        self.ring
            .get_nodes(request_key, self.nodes.len().max(1))
            .into_iter()
            .filter_map(|id| self.nodes.get(id))
            .find(|info| info.healthy)
    }

    /// Returns the records of all nodes currently marked healthy, in no particular order.
    pub fn healthy_nodes(&self) -> Vec<&NodeInfo> {
        self.nodes.values().filter(|n| n.healthy).collect()
    }

    /// Returns the records of all registered nodes, in no particular order.
    pub fn all_nodes(&self) -> Vec<&NodeInfo> {
        self.nodes.values().collect()
    }

    /// Read-only access to the underlying hash ring.
    pub fn ring(&self) -> &ConsistentHashRing {
        &self.ring
    }
}

impl Default for NodeRegistry {
    /// Equivalent to [`NodeRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Static settings for one coordinator instance.
#[derive(Debug, Clone)]
pub struct CoordinatorConfig {
    /// Identifier of the local node.
    pub node_id: String,
    /// Address string of the local node; `DistributedCoordinator::register_self`
    /// registers the local node under this address.
    pub bind_addr: String,
    /// Peer entries. Not read anywhere in this file; presumably peer addresses (TODO confirm).
    pub peers: Vec<String>,
    /// Heartbeat interval in milliseconds. Not read anywhere in this file.
    pub heartbeat_interval_ms: u64,
    /// Health timeout in milliseconds. Not read anywhere in this file.
    pub health_timeout_ms: u64,
}

impl CoordinatorConfig {
    /// Returns a configuration for a node on the local machine.
    ///
    /// The address is `127.0.0.1:8080`, the peer list is empty, the heartbeat
    /// interval is 1000 ms and the health timeout is 5000 ms.
    pub fn local_default(node_id: impl Into<String>) -> Self {
        const LOCAL_BIND_ADDR: &str = "127.0.0.1:8080";
        const HEARTBEAT_INTERVAL_MS: u64 = 1_000;
        const HEALTH_TIMEOUT_MS: u64 = 5_000;

        Self {
            node_id: node_id.into(),
            bind_addr: String::from(LOCAL_BIND_ADDR),
            peers: vec![],
            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
            health_timeout_ms: HEALTH_TIMEOUT_MS,
        }
    }
}
pub struct DistributedCoordinator {
config: CoordinatorConfig,
registry: NodeRegistry,
}
impl DistributedCoordinator {
pub fn new(config: CoordinatorConfig) -> Self {
Self {
config,
registry: NodeRegistry::new(),
}
}
pub fn register_self(&mut self) {
let info = NodeInfo::new(self.config.node_id.clone(), self.config.bind_addr.clone());
self.registry.register(info);
}
pub fn add_peer(&mut self, addr: &str, node_id: &str) {
let info = NodeInfo::new(node_id, addr);
self.registry.register(info);
}
pub fn route(&self, request_key: &str) -> Option<String> {
self.registry
.route_request(request_key)
.map(|n| n.addr.clone())
}
pub fn cluster_size(&self) -> usize {
self.registry.all_nodes().len()
}
pub fn healthy_count(&self) -> usize {
self.registry.healthy_nodes().len()
}
pub fn topology_summary(&self) -> String {
let total = self.cluster_size();
let healthy = self.healthy_count();
let vnodes = self.registry.ring().vnode_count();
let self_id = &self.config.node_id;
format!("cluster[nodes={total} healthy={healthy} vnodes={vnodes} self={self_id}]")
}
pub fn registry(&self) -> &NodeRegistry {
&self.registry
}
pub fn config(&self) -> &CoordinatorConfig {
&self.config
}
pub fn set_peer_health(&mut self, node_id: &str, healthy: bool) {
self.registry.mark_healthy(node_id, healthy);
}
pub fn update_peer_load(&mut self, node_id: &str, load: f32) {
self.registry.update_load(node_id, load);
}
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch. The
/// millisecond count is narrowed from `u128` to `u64` with an `as` cast.
fn current_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock is set before 1970-01-01: fall back to zero instead of failing.
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Hashing the same input twice yields the same value, including for "".
    #[test]
    fn fnv1a_deterministic() {
        for input in ["hello", ""] {
            assert_eq!(fnv1a_hash(input), fnv1a_hash(input));
        }
    }

    /// Distinct inputs used in this suite hash to distinct values.
    #[test]
    fn fnv1a_different_inputs() {
        let pairs = [("foo", "bar"), ("node-0", "node-1")];
        for (left, right) in pairs {
            assert_ne!(fnv1a_hash(left), fnv1a_hash(right));
        }
    }

    /// A ring without nodes has no owner for any key.
    #[test]
    fn hash_ring_empty_returns_none() {
        let ring = ConsistentHashRing::new(10);
        assert_eq!(ring.get_node("any-key"), None);
    }

    /// With a single node on the ring, every key maps to that node.
    #[test]
    fn hash_ring_single_node_always_routes_there() {
        let mut ring = ConsistentHashRing::new(10);
        ring.add_node("solo");
        let keys = ["a", "b", "c", "hello", "world", "12345"];
        assert!(keys.iter().all(|key| ring.get_node(key) == Some("solo")));
    }

    /// Each added node contributes exactly `replicas` virtual nodes.
    #[test]
    fn hash_ring_vnode_count_equals_replicas_times_nodes() {
        let mut ring = ConsistentHashRing::new(50);
        for (already_added, name) in ["n1", "n2", "n3"].iter().enumerate() {
            ring.add_node(name);
            assert_eq!(ring.vnode_count(), 50 * (already_added + 1));
        }
    }

    /// `node_count` reports distinct nodes, not virtual nodes.
    #[test]
    fn hash_ring_node_count() {
        let mut ring = ConsistentHashRing::new(10);
        assert_eq!(ring.node_count(), 0);
        for (already_added, name) in ["a", "b"].iter().enumerate() {
            ring.add_node(name);
            assert_eq!(ring.node_count(), already_added + 1);
        }
    }
}