Skip to main content

nodedb_cluster/multi_raft/
core.rs

1//! `MultiRaft` struct, constructors, group lifecycle, tick, observability.
2
3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::time::Duration;
6
7use tracing::info;
8
9use nodedb_raft::node::RaftConfig;
10use nodedb_raft::{RaftNode, Ready};
11
12use crate::error::{ClusterError, Result};
13use crate::raft_storage::RedbLogStorage;
14use crate::routing::RoutingTable;
15
16/// Snapshot of a single Raft group's state for observability.
17#[derive(Debug, Clone, serde::Serialize)]
18pub struct GroupStatus {
19    pub group_id: u64,
20    /// Role as a human-readable string ("Leader", "Follower", "Candidate", "Learner").
21    pub role: String,
22    pub leader_id: u64,
23    pub term: u64,
24    pub commit_index: u64,
25    pub last_applied: u64,
26    pub member_count: usize,
27    pub vshard_count: usize,
28}
29
30/// Multi-Raft coordinator managing multiple Raft groups on a single node.
31///
32/// This coordinator:
33/// - Manages all Raft groups hosted on this node
34/// - Batches heartbeats across groups sharing the same leader
35/// - Routes incoming RPCs to the correct group
36/// - Collects `Ready` output from all groups for the caller to execute
37pub struct MultiRaft {
38    /// This node's ID.
39    pub(super) node_id: u64,
40    /// Raft groups hosted on this node (group_id → RaftNode).
41    pub(super) groups: HashMap<u64, RaftNode<RedbLogStorage>>,
42    /// Routing table (vShard → group mapping).
43    pub(super) routing: RoutingTable,
44    /// Default election timeout range.
45    pub(super) election_timeout_min: Duration,
46    pub(super) election_timeout_max: Duration,
47    /// Heartbeat interval.
48    pub(super) heartbeat_interval: Duration,
49    /// Data directory for persistent Raft log storage.
50    pub(super) data_dir: PathBuf,
51}
52
53/// Aggregated output from all Raft groups after a tick.
54#[derive(Debug, Default)]
55pub struct MultiRaftReady {
56    /// Per-group ready output: (group_id, Ready).
57    pub groups: Vec<(u64, Ready)>,
58}
59
60impl MultiRaftReady {
61    pub fn is_empty(&self) -> bool {
62        self.groups.iter().all(|(_gid, r)| r.is_empty())
63    }
64
65    /// Total committed entries across all groups.
66    pub fn total_committed(&self) -> usize {
67        self.groups
68            .iter()
69            .map(|(_, r)| r.committed_entries.len())
70            .sum()
71    }
72}
73
74impl MultiRaft {
75    pub fn new(node_id: u64, routing: RoutingTable, data_dir: PathBuf) -> Self {
76        Self {
77            node_id,
78            groups: HashMap::new(),
79            routing,
80            election_timeout_min: Duration::from_secs(2),
81            election_timeout_max: Duration::from_secs(5),
82            heartbeat_interval: Duration::from_millis(50),
83            data_dir,
84        }
85    }
86
87    /// Configure election timeout range.
88    pub fn with_election_timeout(mut self, min: Duration, max: Duration) -> Self {
89        self.election_timeout_min = min;
90        self.election_timeout_max = max;
91        self
92    }
93
94    /// Configure heartbeat interval.
95    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
96        self.heartbeat_interval = interval;
97        self
98    }
99
100    /// Initialize a Raft group on this node as a voting member.
101    ///
102    /// `peers` is the list of other voters in the group (excluding self).
103    /// For a learner-start group, use `add_group_as_learner` instead.
104    pub fn add_group(&mut self, group_id: u64, peers: Vec<u64>) -> Result<()> {
105        self.add_group_inner(group_id, peers, vec![], false)
106    }
107
108    /// Initialize a Raft group on this node as a non-voting learner.
109    ///
110    /// The local node boots in the `Learner` role and will not stand for
111    /// election until it is promoted by a `PromoteLearner` conf change.
112    ///
113    /// `voters` is the full voter set of the group (excluding self).
114    /// `learners` is the learner set of the group excluding self — usually
115    /// empty unless multiple learners are being admitted in the same round.
116    pub fn add_group_as_learner(
117        &mut self,
118        group_id: u64,
119        voters: Vec<u64>,
120        learners: Vec<u64>,
121    ) -> Result<()> {
122        self.add_group_inner(group_id, voters, learners, true)
123    }
124
125    fn add_group_inner(
126        &mut self,
127        group_id: u64,
128        peers: Vec<u64>,
129        learners: Vec<u64>,
130        starts_as_learner: bool,
131    ) -> Result<()> {
132        let config = RaftConfig {
133            node_id: self.node_id,
134            group_id,
135            peers,
136            learners,
137            starts_as_learner,
138            election_timeout_min: self.election_timeout_min,
139            election_timeout_max: self.election_timeout_max,
140            heartbeat_interval: self.heartbeat_interval,
141        };
142
143        let storage_path = self.data_dir.join(format!("raft/group-{group_id}.redb"));
144        let storage = RedbLogStorage::open(&storage_path).map_err(|e| ClusterError::Transport {
145            detail: format!("failed to open raft storage for group {group_id}: {e}"),
146        })?;
147        let node = RaftNode::new(config, storage);
148        self.groups.insert(group_id, node);
149
150        info!(
151            node = self.node_id,
152            group = group_id,
153            as_learner = starts_as_learner,
154            path = %storage_path.display(),
155            "added raft group with persistent storage"
156        );
157        Ok(())
158    }
159
160    /// Tick all Raft groups. Returns aggregated ready output.
161    pub fn tick(&mut self) -> MultiRaftReady {
162        let mut ready = MultiRaftReady::default();
163
164        for (&group_id, node) in &mut self.groups {
165            node.tick();
166            let r = node.take_ready();
167            if !r.is_empty() {
168                ready.groups.push((group_id, r));
169            }
170        }
171
172        ready
173    }
174
175    pub fn routing(&self) -> &RoutingTable {
176        &self.routing
177    }
178
179    pub fn routing_mut(&mut self) -> &mut RoutingTable {
180        &mut self.routing
181    }
182
183    pub fn node_id(&self) -> u64 {
184        self.node_id
185    }
186
187    pub fn group_count(&self) -> usize {
188        self.groups.len()
189    }
190
191    /// Mutable access to the underlying Raft groups (for testing / bootstrap).
192    pub fn groups_mut(&mut self) -> &mut HashMap<u64, RaftNode<RedbLogStorage>> {
193        &mut self.groups
194    }
195
196    /// Snapshot of all Raft group states for observability.
197    pub fn group_statuses(&self) -> Vec<GroupStatus> {
198        let mut statuses = Vec::with_capacity(self.groups.len());
199        for (&group_id, node) in &self.groups {
200            let vshard_count = self.routing.vshards_for_group(group_id).len();
201            let members = self
202                .routing
203                .group_info(group_id)
204                .map(|info| info.members.clone())
205                .unwrap_or_default();
206
207            statuses.push(GroupStatus {
208                group_id,
209                role: format!("{:?}", node.role()),
210                leader_id: node.leader_id(),
211                term: node.current_term(),
212                commit_index: node.commit_index(),
213                last_applied: node.last_applied(),
214                member_count: members.len(),
215                vshard_count,
216            });
217        }
218        statuses.sort_by_key(|s| s.group_id);
219        statuses
220    }
221
222    /// Get the leader for a given vShard (from local group state).
223    pub fn leader_for_vshard(&self, vshard_id: u16) -> Result<Option<u64>> {
224        let group_id = self.routing.group_for_vshard(vshard_id)?;
225        let node = self
226            .groups
227            .get(&group_id)
228            .ok_or(ClusterError::GroupNotFound { group_id })?;
229        let lid = node.leader_id();
230        Ok(if lid == 0 { None } else { Some(lid) })
231    }
232
233    /// Propose a command to the Raft group that owns the given vShard.
234    ///
235    /// Returns `(group_id, log_index)` on success.
236    pub fn propose(&mut self, vshard_id: u16, data: Vec<u8>) -> Result<(u64, u64)> {
237        let group_id = self.routing.group_for_vshard(vshard_id)?;
238        let node = self
239            .groups
240            .get_mut(&group_id)
241            .ok_or(ClusterError::GroupNotFound { group_id })?;
242        let log_index = node.propose(data)?;
243        Ok((group_id, log_index))
244    }
245
246    /// Propose a command directly to a specific Raft group (e.g. the
247    /// metadata group, which has no vShard mapping).
248    ///
249    /// Returns the committed log index on success.
250    pub fn propose_to_group(&mut self, group_id: u64, data: Vec<u8>) -> Result<u64> {
251        let node = self
252            .groups
253            .get_mut(&group_id)
254            .ok_or(ClusterError::GroupNotFound { group_id })?;
255        Ok(node.propose(data)?)
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// Builds a coordinator for node 1 over a uniform 4-group routing table
    /// and adds groups 0..4 with no peers.
    fn four_group_coordinator(data_dir: &std::path::Path) -> MultiRaft {
        let routing = RoutingTable::uniform(4, &[1], 1);
        let mut multi = MultiRaft::new(1, routing, data_dir.to_path_buf());
        (0..4).for_each(|group_id| multi.add_group(group_id, vec![]).unwrap());
        multi
    }

    /// Overrides every group's election deadline with an instant 1 ms in the past.
    fn expire_election_deadlines(multi: &mut MultiRaft) {
        multi.groups.values_mut().for_each(|raft_node| {
            raft_node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        });
    }

    #[test]
    fn single_node_multi_raft() {
        let tmp = tempfile::tempdir().unwrap();
        let mut multi = four_group_coordinator(tmp.path());
        assert_eq!(multi.group_count(), 4);

        expire_election_deadlines(&mut multi);

        // Every one of the four groups is expected to report output.
        let output = multi.tick();
        assert_eq!(output.groups.len(), 4);
    }

    #[test]
    fn propose_routes_to_correct_group() {
        let tmp = tempfile::tempdir().unwrap();
        let mut multi = four_group_coordinator(tmp.path());
        expire_election_deadlines(&mut multi);

        // NOTE(review): the output of this first tick is discarded, so only
        // entries surfaced by the second tick reach `advance_applied` —
        // confirm this is intended.
        multi.tick();
        let second_tick = multi.tick();
        for (group_id, group_ready) in second_tick.groups {
            if let Some(newest) = group_ready.committed_entries.last() {
                multi.advance_applied(group_id, newest.index).unwrap();
            }
        }

        let (_, first_index) = multi.propose(0, b"cmd-shard-0".to_vec()).unwrap();
        assert!(first_index > 0);

        let (_, second_index) = multi.propose(256, b"cmd-shard-256".to_vec()).unwrap();
        assert!(second_index > 0);
    }

    #[test]
    fn add_group_as_learner_starts_in_learner_role() {
        use nodedb_raft::NodeRole;

        let tmp = tempfile::tempdir().unwrap();
        let routing = RoutingTable::uniform(1, &[1, 2], 2);
        let mut multi = MultiRaft::new(2, routing, tmp.path().to_path_buf());

        // Node 2 joins group 0 as a learner; node 1 is the only voter.
        multi.add_group_as_learner(0, vec![1], vec![]).unwrap();

        let learner = multi.groups.get(&0).unwrap();
        assert_eq!(learner.role(), NodeRole::Learner);
        assert_eq!(learner.voters(), &[1]);
    }
}
322}