nodedb_cluster/multi_raft/
core.rs1use std::collections::HashMap;
4use std::path::PathBuf;
5use std::time::Duration;
6
7use tracing::info;
8
9use nodedb_raft::node::RaftConfig;
10use nodedb_raft::{RaftNode, Ready};
11
12use crate::error::{ClusterError, Result};
13use crate::raft_storage::RedbLogStorage;
14use crate::routing::RoutingTable;
15
16#[derive(Debug, Clone, serde::Serialize)]
18pub struct GroupStatus {
19 pub group_id: u64,
20 pub role: String,
22 pub leader_id: u64,
23 pub term: u64,
24 pub commit_index: u64,
25 pub last_applied: u64,
26 pub member_count: usize,
27 pub vshard_count: usize,
28}
29
30pub struct MultiRaft {
38 pub(super) node_id: u64,
40 pub(super) groups: HashMap<u64, RaftNode<RedbLogStorage>>,
42 pub(super) routing: RoutingTable,
44 pub(super) election_timeout_min: Duration,
46 pub(super) election_timeout_max: Duration,
47 pub(super) heartbeat_interval: Duration,
49 pub(super) data_dir: PathBuf,
51}
52
53#[derive(Debug, Default)]
55pub struct MultiRaftReady {
56 pub groups: Vec<(u64, Ready)>,
58}
59
60impl MultiRaftReady {
61 pub fn is_empty(&self) -> bool {
62 self.groups.iter().all(|(_gid, r)| r.is_empty())
63 }
64
65 pub fn total_committed(&self) -> usize {
67 self.groups
68 .iter()
69 .map(|(_, r)| r.committed_entries.len())
70 .sum()
71 }
72}
73
74impl MultiRaft {
75 pub fn new(node_id: u64, routing: RoutingTable, data_dir: PathBuf) -> Self {
76 Self {
77 node_id,
78 groups: HashMap::new(),
79 routing,
80 election_timeout_min: Duration::from_secs(2),
81 election_timeout_max: Duration::from_secs(5),
82 heartbeat_interval: Duration::from_millis(50),
83 data_dir,
84 }
85 }
86
87 pub fn with_election_timeout(mut self, min: Duration, max: Duration) -> Self {
89 self.election_timeout_min = min;
90 self.election_timeout_max = max;
91 self
92 }
93
94 pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
96 self.heartbeat_interval = interval;
97 self
98 }
99
100 pub fn add_group(&mut self, group_id: u64, peers: Vec<u64>) -> Result<()> {
105 self.add_group_inner(group_id, peers, vec![], false)
106 }
107
108 pub fn add_group_as_learner(
117 &mut self,
118 group_id: u64,
119 voters: Vec<u64>,
120 learners: Vec<u64>,
121 ) -> Result<()> {
122 self.add_group_inner(group_id, voters, learners, true)
123 }
124
125 fn add_group_inner(
126 &mut self,
127 group_id: u64,
128 peers: Vec<u64>,
129 learners: Vec<u64>,
130 starts_as_learner: bool,
131 ) -> Result<()> {
132 let config = RaftConfig {
133 node_id: self.node_id,
134 group_id,
135 peers,
136 learners,
137 starts_as_learner,
138 election_timeout_min: self.election_timeout_min,
139 election_timeout_max: self.election_timeout_max,
140 heartbeat_interval: self.heartbeat_interval,
141 };
142
143 let storage_path = self.data_dir.join(format!("raft/group-{group_id}.redb"));
144 let storage = RedbLogStorage::open(&storage_path).map_err(|e| ClusterError::Transport {
145 detail: format!("failed to open raft storage for group {group_id}: {e}"),
146 })?;
147 let node = RaftNode::new(config, storage);
148 self.groups.insert(group_id, node);
149
150 info!(
151 node = self.node_id,
152 group = group_id,
153 as_learner = starts_as_learner,
154 path = %storage_path.display(),
155 "added raft group with persistent storage"
156 );
157 Ok(())
158 }
159
160 pub fn tick(&mut self) -> MultiRaftReady {
162 let mut ready = MultiRaftReady::default();
163
164 for (&group_id, node) in &mut self.groups {
165 node.tick();
166 let r = node.take_ready();
167 if !r.is_empty() {
168 ready.groups.push((group_id, r));
169 }
170 }
171
172 ready
173 }
174
175 pub fn routing(&self) -> &RoutingTable {
176 &self.routing
177 }
178
179 pub fn routing_mut(&mut self) -> &mut RoutingTable {
180 &mut self.routing
181 }
182
183 pub fn node_id(&self) -> u64 {
184 self.node_id
185 }
186
187 pub fn group_count(&self) -> usize {
188 self.groups.len()
189 }
190
191 pub fn groups_mut(&mut self) -> &mut HashMap<u64, RaftNode<RedbLogStorage>> {
193 &mut self.groups
194 }
195
196 pub fn group_statuses(&self) -> Vec<GroupStatus> {
198 let mut statuses = Vec::with_capacity(self.groups.len());
199 for (&group_id, node) in &self.groups {
200 let vshard_count = self.routing.vshards_for_group(group_id).len();
201 let members = self
202 .routing
203 .group_info(group_id)
204 .map(|info| info.members.clone())
205 .unwrap_or_default();
206
207 statuses.push(GroupStatus {
208 group_id,
209 role: format!("{:?}", node.role()),
210 leader_id: node.leader_id(),
211 term: node.current_term(),
212 commit_index: node.commit_index(),
213 last_applied: node.last_applied(),
214 member_count: members.len(),
215 vshard_count,
216 });
217 }
218 statuses.sort_by_key(|s| s.group_id);
219 statuses
220 }
221
222 pub fn leader_for_vshard(&self, vshard_id: u16) -> Result<Option<u64>> {
224 let group_id = self.routing.group_for_vshard(vshard_id)?;
225 let node = self
226 .groups
227 .get(&group_id)
228 .ok_or(ClusterError::GroupNotFound { group_id })?;
229 let lid = node.leader_id();
230 Ok(if lid == 0 { None } else { Some(lid) })
231 }
232
233 pub fn propose(&mut self, vshard_id: u16, data: Vec<u8>) -> Result<(u64, u64)> {
237 let group_id = self.routing.group_for_vshard(vshard_id)?;
238 let node = self
239 .groups
240 .get_mut(&group_id)
241 .ok_or(ClusterError::GroupNotFound { group_id })?;
242 let log_index = node.propose(data)?;
243 Ok((group_id, log_index))
244 }
245
246 pub fn propose_to_group(&mut self, group_id: u64, data: Vec<u8>) -> Result<u64> {
251 let node = self
252 .groups
253 .get_mut(&group_id)
254 .ok_or(ClusterError::GroupNotFound { group_id })?;
255 Ok(node.propose(data)?)
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// Pushes every group's election deadline into the past so the next
    /// `tick` triggers an election.
    fn expire_election_deadlines(mr: &mut MultiRaft) {
        let past = Instant::now() - Duration::from_millis(1);
        for node in mr.groups.values_mut() {
            node.election_deadline_override(past);
        }
    }

    #[test]
    fn single_node_multi_raft() {
        let dir = tempfile::tempdir().unwrap();
        let rt = RoutingTable::uniform(4, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());

        for gid in 0..4 {
            mr.add_group(gid, vec![]).unwrap();
        }
        assert_eq!(mr.group_count(), 4);

        expire_election_deadlines(&mut mr);

        let ready = mr.tick();
        assert_eq!(ready.groups.len(), 4);
    }

    #[test]
    fn propose_routes_to_correct_group() {
        let dir = tempfile::tempdir().unwrap();
        let rt = RoutingTable::uniform(4, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());

        for gid in 0..4 {
            mr.add_group(gid, vec![]).unwrap();
        }
        expire_election_deadlines(&mut mr);
        mr.tick();
        for (gid, ready) in mr.tick().groups {
            if let Some(last) = ready.committed_entries.last() {
                mr.advance_applied(gid, last.index).unwrap();
            }
        }

        // The single voter should now lead its groups.
        assert_eq!(mr.leader_for_vshard(0).unwrap(), Some(1));

        for vshard in [0u16, 256] {
            let expected = mr.routing().group_for_vshard(vshard).unwrap();
            let (gid, idx) = mr
                .propose(vshard, format!("cmd-shard-{vshard}").into_bytes())
                .unwrap();
            assert_eq!(gid, expected, "vshard {vshard} proposed to the wrong group");
            assert!(idx > 0);
        }
    }

    #[test]
    fn group_statuses_are_sorted_by_group_id() {
        let dir = tempfile::tempdir().unwrap();
        let rt = RoutingTable::uniform(4, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());

        for gid in [3, 1, 0, 2] {
            mr.add_group(gid, vec![]).unwrap();
        }

        let ids: Vec<u64> = mr.group_statuses().iter().map(|s| s.group_id).collect();
        assert_eq!(ids, vec![0, 1, 2, 3]);
    }

    #[test]
    fn add_group_as_learner_starts_in_learner_role() {
        use nodedb_raft::NodeRole;
        let dir = tempfile::tempdir().unwrap();
        let rt = RoutingTable::uniform(1, &[1, 2], 2);
        let mut mr = MultiRaft::new(2, rt, dir.path().to_path_buf());

        mr.add_group_as_learner(0, vec![1], vec![]).unwrap();

        let node = mr.groups.get(&0).unwrap();
        assert_eq!(node.role(), NodeRole::Learner);
        assert_eq!(node.voters(), &[1]);
    }
}