nodedb_cluster/multi_raft/
core.rs1use std::collections::HashMap;
6use std::path::PathBuf;
7use std::time::Duration;
8
9use tracing::info;
10
11use nodedb_raft::node::RaftConfig;
12use nodedb_raft::{RaftNode, Ready};
13
14use crate::error::{ClusterError, Result};
15use crate::raft_storage::RedbLogStorage;
16use crate::routing::RoutingTable;
17
18#[derive(Debug, Clone, serde::Serialize)]
20pub struct GroupStatus {
21 pub group_id: u64,
22 pub role: String,
24 pub leader_id: u64,
25 pub term: u64,
26 pub commit_index: u64,
27 pub last_applied: u64,
28 pub member_count: usize,
29 pub vshard_count: usize,
30}
31
32pub struct MultiRaft {
40 pub(super) node_id: u64,
42 pub(super) groups: HashMap<u64, RaftNode<RedbLogStorage>>,
44 pub(super) routing: RoutingTable,
46 pub(super) election_timeout_min: Duration,
48 pub(super) election_timeout_max: Duration,
49 pub(super) heartbeat_interval: Duration,
51 pub(super) data_dir: PathBuf,
53}
54
55#[derive(Debug, Default)]
57pub struct MultiRaftReady {
58 pub groups: Vec<(u64, Ready)>,
60}
61
62impl MultiRaftReady {
63 pub fn is_empty(&self) -> bool {
64 self.groups.iter().all(|(_gid, r)| r.is_empty())
65 }
66
67 pub fn total_committed(&self) -> usize {
69 self.groups
70 .iter()
71 .map(|(_, r)| r.committed_entries.len())
72 .sum()
73 }
74}
75
76impl MultiRaft {
77 pub fn new(node_id: u64, routing: RoutingTable, data_dir: PathBuf) -> Self {
78 Self {
79 node_id,
80 groups: HashMap::new(),
81 routing,
82 election_timeout_min: Duration::from_secs(2),
83 election_timeout_max: Duration::from_secs(5),
84 heartbeat_interval: Duration::from_millis(50),
85 data_dir,
86 }
87 }
88
89 pub fn with_election_timeout(mut self, min: Duration, max: Duration) -> Self {
91 self.election_timeout_min = min;
92 self.election_timeout_max = max;
93 self
94 }
95
96 pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
98 self.heartbeat_interval = interval;
99 self
100 }
101
102 pub fn add_group(&mut self, group_id: u64, peers: Vec<u64>) -> Result<()> {
107 self.add_group_inner(group_id, peers, vec![], false)
108 }
109
110 pub fn add_group_as_learner(
119 &mut self,
120 group_id: u64,
121 voters: Vec<u64>,
122 learners: Vec<u64>,
123 ) -> Result<()> {
124 self.add_group_inner(group_id, voters, learners, true)
125 }
126
127 fn add_group_inner(
128 &mut self,
129 group_id: u64,
130 peers: Vec<u64>,
131 learners: Vec<u64>,
132 starts_as_learner: bool,
133 ) -> Result<()> {
134 let config = RaftConfig {
135 node_id: self.node_id,
136 group_id,
137 peers,
138 learners,
139 observers: vec![],
140 starts_as_learner,
141 starts_as_observer: false,
142 election_timeout_min: self.election_timeout_min,
143 election_timeout_max: self.election_timeout_max,
144 heartbeat_interval: self.heartbeat_interval,
145 };
146
147 let storage_path = self.data_dir.join(format!("raft/group-{group_id}.redb"));
148 let storage = RedbLogStorage::open(&storage_path).map_err(|e| ClusterError::Transport {
149 detail: format!("failed to open raft storage for group {group_id}: {e}"),
150 })?;
151 let node = RaftNode::new(config, storage);
152 self.groups.insert(group_id, node);
153
154 info!(
155 node = self.node_id,
156 group = group_id,
157 as_learner = starts_as_learner,
158 path = %storage_path.display(),
159 "added raft group with persistent storage"
160 );
161 Ok(())
162 }
163
164 pub fn tick(&mut self) -> MultiRaftReady {
166 let mut ready = MultiRaftReady::default();
167
168 for (&group_id, node) in &mut self.groups {
169 node.tick();
170 let r = node.take_ready();
171 if !r.is_empty() {
172 ready.groups.push((group_id, r));
173 }
174 }
175
176 ready
177 }
178
179 pub fn routing(&self) -> &RoutingTable {
180 &self.routing
181 }
182
183 pub fn routing_mut(&mut self) -> &mut RoutingTable {
184 &mut self.routing
185 }
186
187 pub fn node_id(&self) -> u64 {
188 self.node_id
189 }
190
191 pub fn group_count(&self) -> usize {
192 self.groups.len()
193 }
194
195 pub fn groups_mut(&mut self) -> &mut HashMap<u64, RaftNode<RedbLogStorage>> {
197 &mut self.groups
198 }
199
200 pub fn group_statuses(&self) -> Vec<GroupStatus> {
202 let mut statuses = Vec::with_capacity(self.groups.len());
203 for (&group_id, node) in &self.groups {
204 let vshard_count = self.routing.vshards_for_group(group_id).len();
205 let members = self
206 .routing
207 .group_info(group_id)
208 .map(|info| info.members.clone())
209 .unwrap_or_default();
210
211 statuses.push(GroupStatus {
212 group_id,
213 role: format!("{:?}", node.role()),
214 leader_id: node.leader_id(),
215 term: node.current_term(),
216 commit_index: node.commit_index(),
217 last_applied: node.last_applied(),
218 member_count: members.len(),
219 vshard_count,
220 });
221 }
222 statuses.sort_by_key(|s| s.group_id);
223 statuses
224 }
225
226 pub fn leader_for_vshard(&self, vshard_id: u32) -> Result<Option<u64>> {
228 let group_id = self.routing.group_for_vshard(vshard_id)?;
229 let node = self
230 .groups
231 .get(&group_id)
232 .ok_or(ClusterError::GroupNotFound { group_id })?;
233 let lid = node.leader_id();
234 Ok(if lid == 0 { None } else { Some(lid) })
235 }
236
237 pub fn propose(&mut self, vshard_id: u32, data: Vec<u8>) -> Result<(u64, u64)> {
241 let group_id = self.routing.group_for_vshard(vshard_id)?;
242 let node = self
243 .groups
244 .get_mut(&group_id)
245 .ok_or(ClusterError::GroupNotFound { group_id })?;
246 let log_index = node.propose(data)?;
247 Ok((group_id, log_index))
248 }
249
250 pub fn is_group_leader(&self, group_id: u64) -> bool {
255 use nodedb_raft::state::NodeRole;
256 self.groups
257 .get(&group_id)
258 .map(|n| n.role() == NodeRole::Leader)
259 .unwrap_or(false)
260 }
261
262 pub fn propose_to_group(&mut self, group_id: u64, data: Vec<u8>) -> Result<u64> {
267 let node = self
268 .groups
269 .get_mut(&group_id)
270 .ok_or(ClusterError::GroupNotFound { group_id })?;
271 Ok(node.propose(data)?)
272 }
273
274 pub fn read_committed_entries(
287 &self,
288 group_id: u64,
289 lo: u64,
290 hi: u64,
291 ) -> Result<Vec<nodedb_raft::message::LogEntry>> {
292 let node = self
293 .groups
294 .get(&group_id)
295 .ok_or(ClusterError::GroupNotFound { group_id })?;
296 let entries = node.log_entries_range(lo, hi)?;
297 Ok(entries.to_vec())
298 }
299}
300
301pub use nodedb_raft::LogEntry;
303
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// Builds a `MultiRaft` for node 1 that hosts every group of a uniform
    /// single-node routing table, with logs stored under `data_dir`.
    fn single_node_cluster(data_dir: PathBuf) -> MultiRaft {
        let rt = RoutingTable::uniform(4, &[1], 1);
        let mut mr = MultiRaft::new(1, rt.clone(), data_dir);
        for gid in rt.group_ids() {
            mr.add_group(gid, vec![]).unwrap();
        }
        mr
    }

    /// Moves every group's election deadline into the past so the next
    /// tick sees it as already expired.
    fn expire_election_deadlines(mr: &mut MultiRaft) {
        for node in mr.groups.values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }
    }

    #[test]
    fn single_node_multi_raft() {
        let dir = tempfile::tempdir().unwrap();
        let mut mr = single_node_cluster(dir.path().to_path_buf());

        // NOTE(review): `uniform` was asked for 4 yet 5 groups are expected;
        // presumably `group_ids()` includes one extra group — confirm
        // against `RoutingTable`.
        assert_eq!(mr.group_count(), 5);

        expire_election_deadlines(&mut mr);

        // Every group should have output after its deadline expired.
        let ready = mr.tick();
        assert_eq!(ready.groups.len(), 5);
    }

    #[test]
    fn propose_routes_to_correct_group() {
        let dir = tempfile::tempdir().unwrap();
        let mut mr = single_node_cluster(dir.path().to_path_buf());

        expire_election_deadlines(&mut mr);
        mr.tick();

        // Acknowledge whatever the second tick reports as committed.
        let second = mr.tick();
        for (gid, ready) in second.groups {
            if let Some(last) = ready.committed_entries.last() {
                mr.advance_applied(gid, last.index).unwrap();
            }
        }

        let (_gid, idx) = mr.propose(0, b"cmd-shard-0".to_vec()).unwrap();
        assert!(idx > 0);

        let (_gid, idx) = mr.propose(256, b"cmd-shard-256".to_vec()).unwrap();
        assert!(idx > 0);
    }

    #[test]
    fn add_group_as_learner_starts_in_learner_role() {
        use nodedb_raft::NodeRole;

        let dir = tempfile::tempdir().unwrap();
        let rt = RoutingTable::uniform(1, &[1, 2], 2);
        // Local node is 2; node 1 is the only voter of group 1.
        let mut mr = MultiRaft::new(2, rt, dir.path().to_path_buf());

        mr.add_group_as_learner(1, vec![1], vec![]).unwrap();

        let node = mr.groups.get(&1).unwrap();
        assert_eq!(node.role(), NodeRole::Learner);
        assert_eq!(node.voters(), &[1]);
    }
}