1use std::collections::HashMap;
2
3use crate::error::{ClusterError, Result};
4
5pub const VSHARD_COUNT: u16 = 1024;
7
8#[derive(
19 Debug,
20 Clone,
21 serde::Serialize,
22 serde::Deserialize,
23 zerompk::ToMessagePack,
24 zerompk::FromMessagePack,
25)]
26pub struct RoutingTable {
27 vshard_to_group: Vec<u64>,
29 group_members: HashMap<u64, GroupInfo>,
31}
32
33#[derive(
34 Debug,
35 Clone,
36 Default,
37 serde::Serialize,
38 serde::Deserialize,
39 zerompk::ToMessagePack,
40 zerompk::FromMessagePack,
41)]
42pub struct GroupInfo {
43 pub leader: u64,
45 pub members: Vec<u64>,
47 #[serde(default)]
54 pub learners: Vec<u64>,
55}
56
57impl RoutingTable {
58 pub fn uniform(num_groups: u64, nodes: &[u64], replication_factor: usize) -> Self {
63 assert!(!nodes.is_empty(), "need at least one node");
64 assert!(replication_factor > 0, "need at least RF=1");
65
66 let mut vshard_to_group = Vec::with_capacity(VSHARD_COUNT as usize);
67 for i in 0..VSHARD_COUNT {
68 vshard_to_group.push((i as u64) % num_groups);
69 }
70
71 let mut group_members = HashMap::new();
72 for group_id in 0..num_groups {
73 let rf = replication_factor.min(nodes.len());
74 let start = (group_id as usize * rf) % nodes.len();
75 let members: Vec<u64> = (0..rf).map(|i| nodes[(start + i) % nodes.len()]).collect();
76 let leader = members[0];
77 group_members.insert(
78 group_id,
79 GroupInfo {
80 leader,
81 members,
82 learners: Vec::new(),
83 },
84 );
85 }
86
87 Self {
88 vshard_to_group,
89 group_members,
90 }
91 }
92
93 pub fn group_for_vshard(&self, vshard_id: u16) -> Result<u64> {
95 self.vshard_to_group
96 .get(vshard_id as usize)
97 .copied()
98 .ok_or(ClusterError::VShardNotMapped { vshard_id })
99 }
100
101 pub fn leader_for_vshard(&self, vshard_id: u16) -> Result<u64> {
103 let group_id = self.group_for_vshard(vshard_id)?;
104 let info = self
105 .group_members
106 .get(&group_id)
107 .ok_or(ClusterError::GroupNotFound { group_id })?;
108 Ok(info.leader)
109 }
110
111 pub fn group_info(&self, group_id: u64) -> Option<&GroupInfo> {
113 self.group_members.get(&group_id)
114 }
115
116 pub fn set_leader(&mut self, group_id: u64, leader: u64) {
118 if let Some(info) = self.group_members.get_mut(&group_id) {
119 info.leader = leader;
120 }
121 }
122
123 pub fn reassign_vshard(&mut self, vshard_id: u16, new_group_id: u64) {
126 if (vshard_id as usize) < self.vshard_to_group.len() {
127 self.vshard_to_group[vshard_id as usize] = new_group_id;
128 }
129 }
130
131 pub fn vshards_for_group(&self, group_id: u64) -> Vec<u16> {
133 self.vshard_to_group
134 .iter()
135 .enumerate()
136 .filter(|(_, gid)| **gid == group_id)
137 .map(|(i, _)| i as u16)
138 .collect()
139 }
140
141 pub fn num_groups(&self) -> usize {
143 self.group_members.len()
144 }
145
146 pub fn group_ids(&self) -> Vec<u64> {
148 self.group_members.keys().copied().collect()
149 }
150
151 pub fn set_group_members(&mut self, group_id: u64, members: Vec<u64>) {
153 if let Some(info) = self.group_members.get_mut(&group_id) {
154 info.members = members;
155 }
156 }
157
158 pub fn remove_group_member(&mut self, group_id: u64, node_id: u64) -> bool {
167 let Some(info) = self.group_members.get_mut(&group_id) else {
168 return false;
169 };
170 let before_members = info.members.len();
171 let before_learners = info.learners.len();
172 info.members.retain(|&id| id != node_id);
173 info.learners.retain(|&id| id != node_id);
174 if info.leader == node_id {
175 info.leader = 0;
176 }
177 info.members.len() != before_members || info.learners.len() != before_learners
178 }
179
180 pub fn set_group_learners(&mut self, group_id: u64, learners: Vec<u64>) {
182 if let Some(info) = self.group_members.get_mut(&group_id) {
183 info.learners = learners;
184 }
185 }
186
187 pub fn add_group_learner(&mut self, group_id: u64, peer: u64) {
190 if let Some(info) = self.group_members.get_mut(&group_id)
191 && !info.members.contains(&peer)
192 && !info.learners.contains(&peer)
193 {
194 info.learners.push(peer);
195 }
196 }
197
198 pub fn promote_group_learner(&mut self, group_id: u64, peer: u64) -> bool {
201 if let Some(info) = self.group_members.get_mut(&group_id)
202 && let Some(pos) = info.learners.iter().position(|&id| id == peer)
203 {
204 info.learners.remove(pos);
205 if !info.members.contains(&peer) {
206 info.members.push(peer);
207 }
208 return true;
209 }
210 false
211 }
212
213 pub fn vshard_to_group(&self) -> &[u64] {
215 &self.vshard_to_group
216 }
217
218 pub fn group_members(&self) -> &HashMap<u64, GroupInfo> {
220 &self.group_members
221 }
222
223 pub fn from_parts(vshard_to_group: Vec<u64>, group_members: HashMap<u64, GroupInfo>) -> Self {
225 Self {
226 vshard_to_group,
227 group_members,
228 }
229 }
230}
231
232pub fn vshard_for_collection(collection: &str) -> u16 {
239 let hash = collection
240 .as_bytes()
241 .iter()
242 .fold(0u16, |h, &b| h.wrapping_mul(31).wrapping_add(b as u16));
243 hash % VSHARD_COUNT
244}
245
246pub fn fnv1a_hash(key: &str) -> u64 {
252 let mut hash: u64 = 0xcbf29ce484222325;
253 for byte in key.as_bytes() {
254 hash ^= *byte as u64;
255 hash = hash.wrapping_mul(0x100000001b3);
256 }
257 hash
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263
264 #[test]
265 fn uniform_distribution() {
266 let rt = RoutingTable::uniform(16, &[1, 2, 3], 3);
267 assert_eq!(rt.num_groups(), 16);
268
269 for gid in 0..16 {
271 let shards = rt.vshards_for_group(gid);
272 assert_eq!(shards.len(), 64);
273 }
274 }
275
276 #[test]
277 fn leader_lookup() {
278 let rt = RoutingTable::uniform(4, &[10, 20, 30], 3);
279 let leader = rt.leader_for_vshard(0).unwrap();
280 assert!(leader > 0);
281 }
282
283 #[test]
284 fn reassign_vshard() {
285 let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
286 let old_group = rt.group_for_vshard(0).unwrap();
287 let new_group = (old_group + 1) % 4;
288 rt.reassign_vshard(0, new_group);
289 assert_eq!(rt.group_for_vshard(0).unwrap(), new_group);
290 }
291
292 #[test]
293 fn set_leader() {
294 let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
295 rt.set_leader(0, 99);
296 assert_eq!(rt.leader_for_vshard(0).unwrap(), 99);
297 }
298
299 #[test]
300 fn remove_group_member_strips_voter_and_clears_leader() {
301 let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
302 rt.set_leader(0, 2);
303 assert!(rt.remove_group_member(0, 2));
304 let info = rt.group_info(0).unwrap();
305 assert!(!info.members.contains(&2));
306 assert_eq!(info.leader, 0, "leader hint should be cleared");
307 }
308
309 #[test]
310 fn remove_group_member_strips_learner_only() {
311 let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
312 rt.add_group_learner(0, 9);
313 assert!(rt.remove_group_member(0, 9));
314 let info = rt.group_info(0).unwrap();
315 assert!(!info.learners.contains(&9));
316 }
317
318 #[test]
319 fn remove_group_member_unknown_group_returns_false() {
320 let mut rt = RoutingTable::uniform(1, &[1, 2], 2);
321 assert!(!rt.remove_group_member(99, 1));
322 }
323
324 #[test]
325 fn vshard_not_mapped() {
326 let rt = RoutingTable::uniform(2, &[1, 2], 2);
327 assert!(rt.group_for_vshard(1023).is_ok());
329 }
330}