nodedb_cluster/
routing.rs1use std::collections::HashMap;
2
3use crate::error::{ClusterError, Result};
4
5pub const VSHARD_COUNT: u16 = 1024;
7
8#[derive(
19 Debug,
20 Clone,
21 serde::Serialize,
22 serde::Deserialize,
23 zerompk::ToMessagePack,
24 zerompk::FromMessagePack,
25)]
26pub struct RoutingTable {
27 vshard_to_group: Vec<u64>,
29 group_members: HashMap<u64, GroupInfo>,
31}
32
33#[derive(
34 Debug,
35 Clone,
36 serde::Serialize,
37 serde::Deserialize,
38 zerompk::ToMessagePack,
39 zerompk::FromMessagePack,
40)]
41pub struct GroupInfo {
42 pub leader: u64,
44 pub members: Vec<u64>,
46}
47
48impl RoutingTable {
49 pub fn uniform(num_groups: u64, nodes: &[u64], replication_factor: usize) -> Self {
54 assert!(!nodes.is_empty(), "need at least one node");
55 assert!(replication_factor > 0, "need at least RF=1");
56
57 let mut vshard_to_group = Vec::with_capacity(VSHARD_COUNT as usize);
58 for i in 0..VSHARD_COUNT {
59 vshard_to_group.push((i as u64) % num_groups);
60 }
61
62 let mut group_members = HashMap::new();
63 for group_id in 0..num_groups {
64 let rf = replication_factor.min(nodes.len());
65 let start = (group_id as usize * rf) % nodes.len();
66 let members: Vec<u64> = (0..rf).map(|i| nodes[(start + i) % nodes.len()]).collect();
67 let leader = members[0];
68 group_members.insert(group_id, GroupInfo { leader, members });
69 }
70
71 Self {
72 vshard_to_group,
73 group_members,
74 }
75 }
76
77 pub fn group_for_vshard(&self, vshard_id: u16) -> Result<u64> {
79 self.vshard_to_group
80 .get(vshard_id as usize)
81 .copied()
82 .ok_or(ClusterError::VShardNotMapped { vshard_id })
83 }
84
85 pub fn leader_for_vshard(&self, vshard_id: u16) -> Result<u64> {
87 let group_id = self.group_for_vshard(vshard_id)?;
88 let info = self
89 .group_members
90 .get(&group_id)
91 .ok_or(ClusterError::GroupNotFound { group_id })?;
92 Ok(info.leader)
93 }
94
95 pub fn group_info(&self, group_id: u64) -> Option<&GroupInfo> {
97 self.group_members.get(&group_id)
98 }
99
100 pub fn set_leader(&mut self, group_id: u64, leader: u64) {
102 if let Some(info) = self.group_members.get_mut(&group_id) {
103 info.leader = leader;
104 }
105 }
106
107 pub fn reassign_vshard(&mut self, vshard_id: u16, new_group_id: u64) {
110 if (vshard_id as usize) < self.vshard_to_group.len() {
111 self.vshard_to_group[vshard_id as usize] = new_group_id;
112 }
113 }
114
115 pub fn vshards_for_group(&self, group_id: u64) -> Vec<u16> {
117 self.vshard_to_group
118 .iter()
119 .enumerate()
120 .filter(|(_, gid)| **gid == group_id)
121 .map(|(i, _)| i as u16)
122 .collect()
123 }
124
125 pub fn num_groups(&self) -> usize {
127 self.group_members.len()
128 }
129
130 pub fn group_ids(&self) -> Vec<u64> {
132 self.group_members.keys().copied().collect()
133 }
134
135 pub fn set_group_members(&mut self, group_id: u64, members: Vec<u64>) {
137 if let Some(info) = self.group_members.get_mut(&group_id) {
138 info.members = members;
139 }
140 }
141
142 pub fn vshard_to_group(&self) -> &[u64] {
144 &self.vshard_to_group
145 }
146
147 pub fn group_members(&self) -> &HashMap<u64, GroupInfo> {
149 &self.group_members
150 }
151
152 pub fn from_parts(vshard_to_group: Vec<u64>, group_members: HashMap<u64, GroupInfo>) -> Self {
154 Self {
155 vshard_to_group,
156 group_members,
157 }
158 }
159}
160
161pub fn vshard_for_collection(collection: &str) -> u16 {
168 let hash = collection
169 .as_bytes()
170 .iter()
171 .fold(0u16, |h, &b| h.wrapping_mul(31).wrapping_add(b as u16));
172 hash % VSHARD_COUNT
173}
174
175pub fn fnv1a_hash(key: &str) -> u64 {
181 let mut hash: u64 = 0xcbf29ce484222325;
182 for byte in key.as_bytes() {
183 hash ^= *byte as u64;
184 hash = hash.wrapping_mul(0x100000001b3);
185 }
186 hash
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192
193 #[test]
194 fn uniform_distribution() {
195 let rt = RoutingTable::uniform(16, &[1, 2, 3], 3);
196 assert_eq!(rt.num_groups(), 16);
197
198 for gid in 0..16 {
200 let shards = rt.vshards_for_group(gid);
201 assert_eq!(shards.len(), 64);
202 }
203 }
204
205 #[test]
206 fn leader_lookup() {
207 let rt = RoutingTable::uniform(4, &[10, 20, 30], 3);
208 let leader = rt.leader_for_vshard(0).unwrap();
209 assert!(leader > 0);
210 }
211
212 #[test]
213 fn reassign_vshard() {
214 let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
215 let old_group = rt.group_for_vshard(0).unwrap();
216 let new_group = (old_group + 1) % 4;
217 rt.reassign_vshard(0, new_group);
218 assert_eq!(rt.group_for_vshard(0).unwrap(), new_group);
219 }
220
221 #[test]
222 fn set_leader() {
223 let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
224 rt.set_leader(0, 99);
225 assert_eq!(rt.leader_for_vshard(0).unwrap(), 99);
226 }
227
228 #[test]
229 fn vshard_not_mapped() {
230 let rt = RoutingTable::uniform(2, &[1, 2], 2);
231 assert!(rt.group_for_vshard(1023).is_ok());
233 }
234}