Skip to main content

orlando_cluster/
hash_ring.rs

1use std::collections::{BTreeMap, HashSet};
2
3#[derive(Clone, Debug, PartialEq, Eq, Hash)]
4pub struct SiloAddress {
5    pub host: String,
6    pub port: u16,
7    pub silo_id: String,
8}
9
10impl SiloAddress {
11    pub fn endpoint(&self) -> String {
12        format!("{}:{}", self.host, self.port)
13    }
14}
15
16#[derive(Clone)]
17pub struct HashRing {
18    ring: BTreeMap<u64, SiloAddress>,
19    virtual_nodes: u32,
20}
21
22impl HashRing {
23    pub fn new(virtual_nodes: u32) -> Self {
24        Self {
25            ring: BTreeMap::new(),
26            virtual_nodes,
27        }
28    }
29
30    pub fn add(&mut self, silo: SiloAddress) {
31        for i in 0..self.virtual_nodes {
32            let key = hash_key(&format!("{}:{}", silo.silo_id, i));
33            self.ring.insert(key, silo.clone());
34        }
35    }
36
37    pub fn remove(&mut self, silo: &SiloAddress) {
38        for i in 0..self.virtual_nodes {
39            let key = hash_key(&format!("{}:{}", silo.silo_id, i));
40            self.ring.remove(&key);
41        }
42    }
43
44    pub fn get(&self, grain_key: &str) -> Option<&SiloAddress> {
45        if self.ring.is_empty() {
46            return None;
47        }
48        let key = hash_key(grain_key);
49        self.ring
50            .range(key..)
51            .next()
52            .or_else(|| self.ring.iter().next())
53            .map(|(_, v)| v)
54    }
55
56    pub fn members(&self) -> Vec<SiloAddress> {
57        let mut seen_ids = HashSet::new();
58        let mut result = Vec::new();
59        for silo in self.ring.values() {
60            if seen_ids.insert(&silo.silo_id) {
61                result.push(silo.clone());
62            }
63        }
64        result
65    }
66
67    pub fn is_empty(&self) -> bool {
68        self.ring.is_empty()
69    }
70}
71
72/// FNV-1a hash — deterministic across Rust versions and platforms.
73/// DefaultHasher (SipHash) uses random seeds, which means two silos compiled
74/// with different Rust versions could disagree on grain placement.
75fn hash_key(key: &str) -> u64 {
76    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
77    for byte in key.as_bytes() {
78        hash ^= *byte as u64;
79        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
80    }
81    hash
82}
83
84#[cfg(test)]
85mod tests {
86    use super::*;
87
88    #[test]
89    fn consistent_placement() {
90        let mut ring = HashRing::new(150);
91        ring.add(SiloAddress {
92            host: "127.0.0.1".into(),
93            port: 5001,
94            silo_id: "silo-a".into(),
95        });
96        ring.add(SiloAddress {
97            host: "127.0.0.1".into(),
98            port: 5002,
99            silo_id: "silo-b".into(),
100        });
101
102        // Same key always maps to same silo
103        let first = ring.get("my-grain/key-1").unwrap().silo_id.clone();
104        let second = ring.get("my-grain/key-1").unwrap().silo_id.clone();
105        assert_eq!(first, second);
106    }
107
108    #[test]
109    fn distributes_across_silos() {
110        let mut ring = HashRing::new(150);
111        ring.add(SiloAddress {
112            host: "127.0.0.1".into(),
113            port: 5001,
114            silo_id: "silo-a".into(),
115        });
116        ring.add(SiloAddress {
117            host: "127.0.0.1".into(),
118            port: 5002,
119            silo_id: "silo-b".into(),
120        });
121
122        let mut a_count = 0;
123        let mut b_count = 0;
124        for i in 0..100 {
125            let target = ring.get(&format!("grain/{}", i)).unwrap();
126            if target.silo_id == "silo-a" {
127                a_count += 1;
128            } else {
129                b_count += 1;
130            }
131        }
132
133        // Both silos should get a meaningful share (not all on one)
134        assert!(a_count > 5, "silo-a got {a_count} grains, expected > 5");
135        assert!(b_count > 5, "silo-b got {b_count} grains, expected > 5");
136    }
137
138    #[test]
139    fn remove_silo() {
140        let mut ring = HashRing::new(150);
141        let silo_a = SiloAddress {
142            host: "127.0.0.1".into(),
143            port: 5001,
144            silo_id: "silo-a".into(),
145        };
146        let silo_b = SiloAddress {
147            host: "127.0.0.1".into(),
148            port: 5002,
149            silo_id: "silo-b".into(),
150        };
151        ring.add(silo_a.clone());
152        ring.add(silo_b);
153
154        ring.remove(&silo_a);
155
156        // All grains now go to silo-b
157        for i in 0..20 {
158            let target = ring.get(&format!("grain/{}", i)).unwrap();
159            assert_eq!(target.silo_id, "silo-b");
160        }
161    }
162}