Skip to main content

nodedb_cluster/
routing.rs

1use std::collections::HashMap;
2
3use crate::error::{ClusterError, Result};
4
/// Total number of virtual shards (vShards) in the cluster: 1024.
///
/// Valid vShard IDs are `0..VSHARD_COUNT`.
pub const VSHARD_COUNT: u16 = 1 << 10;
7
8/// Maps vShards to Raft groups and Raft groups to nodes.
9///
10/// The 1024 vShards are divided into distinct Raft Groups
11/// (e.g., vShards 0-63 managed by Raft Group 1 across Nodes A, B, and C).
12///
13/// This table is the authoritative routing source. It is updated atomically
14/// via Raft state machine when:
15/// - A shard migration completes (Phase 3 atomic cut-over)
16/// - A Raft group membership changes
17/// - A node joins or decommissions
18#[derive(
19    Debug,
20    Clone,
21    serde::Serialize,
22    serde::Deserialize,
23    zerompk::ToMessagePack,
24    zerompk::FromMessagePack,
25)]
26pub struct RoutingTable {
27    /// vshard_id → raft_group_id.
28    vshard_to_group: Vec<u64>,
29    /// raft_group_id → (leader_node, [replica_nodes]).
30    group_members: HashMap<u64, GroupInfo>,
31}
32
33#[derive(
34    Debug,
35    Clone,
36    serde::Serialize,
37    serde::Deserialize,
38    zerompk::ToMessagePack,
39    zerompk::FromMessagePack,
40)]
41pub struct GroupInfo {
42    /// Current leader node ID (0 = no leader known).
43    pub leader: u64,
44    /// All voting members (including leader).
45    pub members: Vec<u64>,
46}
47
48impl RoutingTable {
49    /// Create a routing table with uniform distribution of vShards across groups.
50    ///
51    /// `num_groups` Raft groups are created. vShards are distributed round-robin.
52    /// Each group initially contains `nodes_per_group` nodes from the `nodes` list.
53    pub fn uniform(num_groups: u64, nodes: &[u64], replication_factor: usize) -> Self {
54        assert!(!nodes.is_empty(), "need at least one node");
55        assert!(replication_factor > 0, "need at least RF=1");
56
57        let mut vshard_to_group = Vec::with_capacity(VSHARD_COUNT as usize);
58        for i in 0..VSHARD_COUNT {
59            vshard_to_group.push((i as u64) % num_groups);
60        }
61
62        let mut group_members = HashMap::new();
63        for group_id in 0..num_groups {
64            let rf = replication_factor.min(nodes.len());
65            let start = (group_id as usize * rf) % nodes.len();
66            let members: Vec<u64> = (0..rf).map(|i| nodes[(start + i) % nodes.len()]).collect();
67            let leader = members[0];
68            group_members.insert(group_id, GroupInfo { leader, members });
69        }
70
71        Self {
72            vshard_to_group,
73            group_members,
74        }
75    }
76
77    /// Look up which Raft group owns a vShard.
78    pub fn group_for_vshard(&self, vshard_id: u16) -> Result<u64> {
79        self.vshard_to_group
80            .get(vshard_id as usize)
81            .copied()
82            .ok_or(ClusterError::VShardNotMapped { vshard_id })
83    }
84
85    /// Look up the leader node for a vShard.
86    pub fn leader_for_vshard(&self, vshard_id: u16) -> Result<u64> {
87        let group_id = self.group_for_vshard(vshard_id)?;
88        let info = self
89            .group_members
90            .get(&group_id)
91            .ok_or(ClusterError::GroupNotFound { group_id })?;
92        Ok(info.leader)
93    }
94
95    /// Get group info.
96    pub fn group_info(&self, group_id: u64) -> Option<&GroupInfo> {
97        self.group_members.get(&group_id)
98    }
99
100    /// Update the leader for a Raft group.
101    pub fn set_leader(&mut self, group_id: u64, leader: u64) {
102        if let Some(info) = self.group_members.get_mut(&group_id) {
103            info.leader = leader;
104        }
105    }
106
107    /// Atomically reassign a vShard to a different Raft group.
108    /// Used during Phase 3 (atomic cut-over) of shard migration.
109    pub fn reassign_vshard(&mut self, vshard_id: u16, new_group_id: u64) {
110        if (vshard_id as usize) < self.vshard_to_group.len() {
111            self.vshard_to_group[vshard_id as usize] = new_group_id;
112        }
113    }
114
115    /// All vShards assigned to a given group.
116    pub fn vshards_for_group(&self, group_id: u64) -> Vec<u16> {
117        self.vshard_to_group
118            .iter()
119            .enumerate()
120            .filter(|(_, gid)| **gid == group_id)
121            .map(|(i, _)| i as u16)
122            .collect()
123    }
124
125    /// Number of Raft groups.
126    pub fn num_groups(&self) -> usize {
127        self.group_members.len()
128    }
129
130    /// All group IDs.
131    pub fn group_ids(&self) -> Vec<u64> {
132        self.group_members.keys().copied().collect()
133    }
134
135    /// Update the members of a Raft group (for membership changes).
136    pub fn set_group_members(&mut self, group_id: u64, members: Vec<u64>) {
137        if let Some(info) = self.group_members.get_mut(&group_id) {
138            info.members = members;
139        }
140    }
141
142    /// Access the vshard-to-group mapping (for persistence / wire transfer).
143    pub fn vshard_to_group(&self) -> &[u64] {
144        &self.vshard_to_group
145    }
146
147    /// Access all group members (for persistence / wire transfer).
148    pub fn group_members(&self) -> &HashMap<u64, GroupInfo> {
149        &self.group_members
150    }
151
152    /// Reconstruct a RoutingTable from persisted data.
153    pub fn from_parts(vshard_to_group: Vec<u64>, group_members: HashMap<u64, GroupInfo>) -> Self {
154        Self {
155            vshard_to_group,
156            group_members,
157        }
158    }
159}
160
161/// Compute the primary vShard for a collection name.
162///
163/// Maps a collection name to its vShard ID.
164///
165/// Must match `VShardId::from_collection()` in the nodedb types module
166/// exactly — uses u16 accumulator with multiplier 31.
167pub fn vshard_for_collection(collection: &str) -> u16 {
168    let hash = collection
169        .as_bytes()
170        .iter()
171        .fold(0u16, |h, &b| h.wrapping_mul(31).wrapping_add(b as u16));
172    hash % VSHARD_COUNT
173}
174
/// 64-bit FNV-1a hash of `key`'s UTF-8 bytes, for deterministic key partitioning.
///
/// Distributed join shuffle and shard split use this to assign keys to
/// partitions. It is NOT the vShard routing hash — use
/// `vshard_for_collection` for that.
pub fn fnv1a_hash(key: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a: XOR the byte in first, then multiply by the prime.
    key.bytes().fold(OFFSET_BASIS, |state, byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// 1024 vShards over 16 groups must give every group exactly 64 vShards.
    #[test]
    fn uniform_distribution() {
        let rt = RoutingTable::uniform(16, &[1, 2, 3], 3);
        assert_eq!(rt.num_groups(), 16);

        for gid in 0..16 {
            assert_eq!(rt.vshards_for_group(gid).len(), 64);
        }
    }

    /// Zero groups cannot host any vShard, so construction must panic.
    #[test]
    #[should_panic]
    fn uniform_rejects_zero_groups() {
        let _ = RoutingTable::uniform(0, &[1, 2, 3], 3);
    }

    /// vShard 0 belongs to group 0, whose initial leader is the first node.
    #[test]
    fn leader_lookup() {
        let rt = RoutingTable::uniform(4, &[10, 20, 30], 3);
        assert_eq!(rt.leader_for_vshard(0).unwrap(), 10);
    }

    #[test]
    fn reassign_vshard() {
        let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
        let old_group = rt.group_for_vshard(0).unwrap();
        let new_group = (old_group + 1) % 4;
        rt.reassign_vshard(0, new_group);
        assert_eq!(rt.group_for_vshard(0).unwrap(), new_group);
    }

    /// A vShard pointed at a group without a membership entry has no leader.
    #[test]
    fn leader_lookup_unknown_group() {
        let mut rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
        rt.reassign_vshard(0, 99);
        assert!(matches!(
            rt.leader_for_vshard(0),
            Err(ClusterError::GroupNotFound { group_id: 99 })
        ));
    }

    #[test]
    fn set_leader() {
        let mut rt = RoutingTable::uniform(2, &[1, 2, 3], 3);
        rt.set_leader(0, 99);
        assert_eq!(rt.leader_for_vshard(0).unwrap(), 99);
    }

    /// The last valid vShard ID resolves; the first ID past the table does not.
    #[test]
    fn vshard_not_mapped() {
        let rt = RoutingTable::uniform(2, &[1, 2], 2);
        assert!(rt.group_for_vshard(VSHARD_COUNT - 1).is_ok());
        assert!(matches!(
            rt.group_for_vshard(VSHARD_COUNT),
            Err(ClusterError::VShardNotMapped {
                vshard_id: VSHARD_COUNT
            })
        ));
    }
}
234}