nodedb_cluster/
routing.rs1use std::collections::HashMap;
2
3use crate::error::{ClusterError, Result};
4
/// Number of vShard slots: 2^10 = 1024.
///
/// [`RoutingTable::uniform`] creates exactly this many vShard-to-group entries,
/// and [`vshard_for_collection`] reduces its hash modulo this value.
pub const VSHARD_COUNT: u16 = 1 << 10;
7
8#[derive(
19 Debug,
20 Clone,
21 serde::Serialize,
22 serde::Deserialize,
23 zerompk::ToMessagePack,
24 zerompk::FromMessagePack,
25)]
26pub struct RoutingTable {
27 vshard_to_group: Vec<u64>,
29 group_members: HashMap<u64, GroupInfo>,
31}
32
33#[derive(
34 Debug,
35 Clone,
36 Default,
37 serde::Serialize,
38 serde::Deserialize,
39 zerompk::ToMessagePack,
40 zerompk::FromMessagePack,
41)]
42pub struct GroupInfo {
43 pub leader: u64,
45 pub members: Vec<u64>,
47 #[serde(default)]
54 pub learners: Vec<u64>,
55}
56
57impl RoutingTable {
58 pub fn uniform(num_groups: u64, nodes: &[u64], replication_factor: usize) -> Self {
63 assert!(!nodes.is_empty(), "need at least one node");
64 assert!(replication_factor > 0, "need at least RF=1");
65
66 let mut vshard_to_group = Vec::with_capacity(VSHARD_COUNT as usize);
67 for i in 0..VSHARD_COUNT {
68 vshard_to_group.push((i as u64) % num_groups);
69 }
70
71 let mut group_members = HashMap::new();
72 for group_id in 0..num_groups {
73 let rf = replication_factor.min(nodes.len());
74 let start = (group_id as usize * rf) % nodes.len();
75 let members: Vec<u64> = (0..rf).map(|i| nodes[(start + i) % nodes.len()]).collect();
76 let leader = members[0];
77 group_members.insert(
78 group_id,
79 GroupInfo {
80 leader,
81 members,
82 learners: Vec::new(),
83 },
84 );
85 }
86
87 Self {
88 vshard_to_group,
89 group_members,
90 }
91 }
92
93 pub fn group_for_vshard(&self, vshard_id: u16) -> Result<u64> {
95 self.vshard_to_group
96 .get(vshard_id as usize)
97 .copied()
98 .ok_or(ClusterError::VShardNotMapped { vshard_id })
99 }
100
101 pub fn leader_for_vshard(&self, vshard_id: u16) -> Result<u64> {
103 let group_id = self.group_for_vshard(vshard_id)?;
104 let info = self
105 .group_members
106 .get(&group_id)
107 .ok_or(ClusterError::GroupNotFound { group_id })?;
108 Ok(info.leader)
109 }
110
111 pub fn group_info(&self, group_id: u64) -> Option<&GroupInfo> {
113 self.group_members.get(&group_id)
114 }
115
116 pub fn set_leader(&mut self, group_id: u64, leader: u64) {
118 if let Some(info) = self.group_members.get_mut(&group_id) {
119 info.leader = leader;
120 }
121 }
122
123 pub fn reassign_vshard(&mut self, vshard_id: u16, new_group_id: u64) {
126 if (vshard_id as usize) < self.vshard_to_group.len() {
127 self.vshard_to_group[vshard_id as usize] = new_group_id;
128 }
129 }
130
131 pub fn vshards_for_group(&self, group_id: u64) -> Vec<u16> {
133 self.vshard_to_group
134 .iter()
135 .enumerate()
136 .filter(|(_, gid)| **gid == group_id)
137 .map(|(i, _)| i as u16)
138 .collect()
139 }
140
141 pub fn num_groups(&self) -> usize {
143 self.group_members.len()
144 }
145
146 pub fn group_ids(&self) -> Vec<u64> {
148 self.group_members.keys().copied().collect()
149 }
150
151 pub fn set_group_members(&mut self, group_id: u64, members: Vec<u64>) {
153 if let Some(info) = self.group_members.get_mut(&group_id) {
154 info.members = members;
155 }
156 }
157
158 pub fn set_group_learners(&mut self, group_id: u64, learners: Vec<u64>) {
160 if let Some(info) = self.group_members.get_mut(&group_id) {
161 info.learners = learners;
162 }
163 }
164
165 pub fn add_group_learner(&mut self, group_id: u64, peer: u64) {
168 if let Some(info) = self.group_members.get_mut(&group_id)
169 && !info.members.contains(&peer)
170 && !info.learners.contains(&peer)
171 {
172 info.learners.push(peer);
173 }
174 }
175
176 pub fn promote_group_learner(&mut self, group_id: u64, peer: u64) -> bool {
179 if let Some(info) = self.group_members.get_mut(&group_id)
180 && let Some(pos) = info.learners.iter().position(|&id| id == peer)
181 {
182 info.learners.remove(pos);
183 if !info.members.contains(&peer) {
184 info.members.push(peer);
185 }
186 return true;
187 }
188 false
189 }
190
191 pub fn vshard_to_group(&self) -> &[u64] {
193 &self.vshard_to_group
194 }
195
196 pub fn group_members(&self) -> &HashMap<u64, GroupInfo> {
198 &self.group_members
199 }
200
201 pub fn from_parts(vshard_to_group: Vec<u64>, group_members: HashMap<u64, GroupInfo>) -> Self {
203 Self {
204 vshard_to_group,
205 group_members,
206 }
207 }
208}
209
210pub fn vshard_for_collection(collection: &str) -> u16 {
217 let hash = collection
218 .as_bytes()
219 .iter()
220 .fold(0u16, |h, &b| h.wrapping_mul(31).wrapping_add(b as u16));
221 hash % VSHARD_COUNT
222}
223
/// Computes the 64-bit FNV-1a hash of `key`'s UTF-8 bytes.
///
/// Each byte is XOR-ed into the running hash, which is then multiplied by
/// the FNV prime with wrapping arithmetic. The empty string hashes to the
/// offset basis.
pub fn fnv1a_hash(key: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    key.bytes()
        .fold(OFFSET_BASIS, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME))
}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// 1024 vShards spread over 16 groups must give every group exactly 64.
    #[test]
    fn uniform_distribution() {
        let rt = RoutingTable::uniform(16, &[1, 2, 3], 3);
        assert_eq!(rt.num_groups(), 16);

        for gid in 0..16 {
            let shards = rt.vshards_for_group(gid);
            assert_eq!(shards.len(), 64);
        }
    }

    /// vShard 0 belongs to group 0, whose first member is its initial leader.
    #[test]
    fn leader_lookup() {
        let rt = RoutingTable::uniform(4, &[10, 20, 30], 3);
        assert_eq!(rt.leader_for_vshard(0).unwrap(), 10);
    }

    /// Reassigning a vShard changes the group returned for it.
    #[test]
    fn reassign_vshard() {
        let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
        let old_group = rt.group_for_vshard(0).unwrap();
        let new_group = (old_group + 1) % 4;
        rt.reassign_vshard(0, new_group);
        assert_eq!(rt.group_for_vshard(0).unwrap(), new_group);
    }

    /// A leader set on a group is visible through vShard lookups.
    #[test]
    fn set_leader() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.set_leader(0, 99);
        assert_eq!(rt.leader_for_vshard(0).unwrap(), 99);
    }

    /// The last valid vShard resolves; the first ID past the table does not.
    #[test]
    fn vshard_not_mapped() {
        let rt = RoutingTable::uniform(2, &[1, 2], 2);
        assert!(rt.group_for_vshard(VSHARD_COUNT - 1).is_ok());
        assert!(rt.group_for_vshard(VSHARD_COUNT).is_err());
    }
}