Skip to main content

nodedb_cluster/multi_raft/
core.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! `MultiRaft` struct, constructors, group lifecycle, tick, observability.
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::time::Duration;
8
9use tracing::info;
10
11use nodedb_raft::node::RaftConfig;
12use nodedb_raft::{RaftNode, Ready};
13
14use crate::error::{ClusterError, Result};
15use crate::raft_storage::RedbLogStorage;
16use crate::routing::RoutingTable;
17
18/// Snapshot of a single Raft group's state for observability.
19#[derive(Debug, Clone, serde::Serialize)]
20pub struct GroupStatus {
21    pub group_id: u64,
22    /// Role as a human-readable string ("Leader", "Follower", "Candidate", "Learner").
23    pub role: String,
24    pub leader_id: u64,
25    pub term: u64,
26    pub commit_index: u64,
27    pub last_applied: u64,
28    pub member_count: usize,
29    pub vshard_count: usize,
30}
31
32/// Multi-Raft coordinator managing multiple Raft groups on a single node.
33///
34/// This coordinator:
35/// - Manages all Raft groups hosted on this node
36/// - Batches heartbeats across groups sharing the same leader
37/// - Routes incoming RPCs to the correct group
38/// - Collects `Ready` output from all groups for the caller to execute
39pub struct MultiRaft {
40    /// This node's ID.
41    pub(super) node_id: u64,
42    /// Raft groups hosted on this node (group_id → RaftNode).
43    pub(super) groups: HashMap<u64, RaftNode<RedbLogStorage>>,
44    /// Routing table (vShard → group mapping).
45    pub(super) routing: RoutingTable,
46    /// Default election timeout range.
47    pub(super) election_timeout_min: Duration,
48    pub(super) election_timeout_max: Duration,
49    /// Heartbeat interval.
50    pub(super) heartbeat_interval: Duration,
51    /// Data directory for persistent Raft log storage.
52    pub(super) data_dir: PathBuf,
53}
54
55/// Aggregated output from all Raft groups after a tick.
56#[derive(Debug, Default)]
57pub struct MultiRaftReady {
58    /// Per-group ready output: (group_id, Ready).
59    pub groups: Vec<(u64, Ready)>,
60}
61
62impl MultiRaftReady {
63    pub fn is_empty(&self) -> bool {
64        self.groups.iter().all(|(_gid, r)| r.is_empty())
65    }
66
67    /// Total committed entries across all groups.
68    pub fn total_committed(&self) -> usize {
69        self.groups
70            .iter()
71            .map(|(_, r)| r.committed_entries.len())
72            .sum()
73    }
74}
75
76impl MultiRaft {
77    pub fn new(node_id: u64, routing: RoutingTable, data_dir: PathBuf) -> Self {
78        Self {
79            node_id,
80            groups: HashMap::new(),
81            routing,
82            election_timeout_min: Duration::from_secs(2),
83            election_timeout_max: Duration::from_secs(5),
84            heartbeat_interval: Duration::from_millis(50),
85            data_dir,
86        }
87    }
88
89    /// Configure election timeout range.
90    pub fn with_election_timeout(mut self, min: Duration, max: Duration) -> Self {
91        self.election_timeout_min = min;
92        self.election_timeout_max = max;
93        self
94    }
95
96    /// Configure heartbeat interval.
97    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
98        self.heartbeat_interval = interval;
99        self
100    }
101
102    /// Initialize a Raft group on this node as a voting member.
103    ///
104    /// `peers` is the list of other voters in the group (excluding self).
105    /// For a learner-start group, use `add_group_as_learner` instead.
106    pub fn add_group(&mut self, group_id: u64, peers: Vec<u64>) -> Result<()> {
107        self.add_group_inner(group_id, peers, vec![], false)
108    }
109
110    /// Initialize a Raft group on this node as a non-voting learner.
111    ///
112    /// The local node boots in the `Learner` role and will not stand for
113    /// election until it is promoted by a `PromoteLearner` conf change.
114    ///
115    /// `voters` is the full voter set of the group (excluding self).
116    /// `learners` is the learner set of the group excluding self — usually
117    /// empty unless multiple learners are being admitted in the same round.
118    pub fn add_group_as_learner(
119        &mut self,
120        group_id: u64,
121        voters: Vec<u64>,
122        learners: Vec<u64>,
123    ) -> Result<()> {
124        self.add_group_inner(group_id, voters, learners, true)
125    }
126
127    fn add_group_inner(
128        &mut self,
129        group_id: u64,
130        peers: Vec<u64>,
131        learners: Vec<u64>,
132        starts_as_learner: bool,
133    ) -> Result<()> {
134        let config = RaftConfig {
135            node_id: self.node_id,
136            group_id,
137            peers,
138            learners,
139            observers: vec![],
140            starts_as_learner,
141            starts_as_observer: false,
142            election_timeout_min: self.election_timeout_min,
143            election_timeout_max: self.election_timeout_max,
144            heartbeat_interval: self.heartbeat_interval,
145        };
146
147        let storage_path = self.data_dir.join(format!("raft/group-{group_id}.redb"));
148        let storage = RedbLogStorage::open(&storage_path).map_err(|e| ClusterError::Transport {
149            detail: format!("failed to open raft storage for group {group_id}: {e}"),
150        })?;
151        let node = RaftNode::new(config, storage);
152        self.groups.insert(group_id, node);
153
154        info!(
155            node = self.node_id,
156            group = group_id,
157            as_learner = starts_as_learner,
158            path = %storage_path.display(),
159            "added raft group with persistent storage"
160        );
161        Ok(())
162    }
163
164    /// Tick all Raft groups. Returns aggregated ready output.
165    pub fn tick(&mut self) -> MultiRaftReady {
166        let mut ready = MultiRaftReady::default();
167
168        for (&group_id, node) in &mut self.groups {
169            node.tick();
170            let r = node.take_ready();
171            if !r.is_empty() {
172                ready.groups.push((group_id, r));
173            }
174        }
175
176        ready
177    }
178
179    pub fn routing(&self) -> &RoutingTable {
180        &self.routing
181    }
182
183    pub fn routing_mut(&mut self) -> &mut RoutingTable {
184        &mut self.routing
185    }
186
187    pub fn node_id(&self) -> u64 {
188        self.node_id
189    }
190
191    pub fn group_count(&self) -> usize {
192        self.groups.len()
193    }
194
195    /// Mutable access to the underlying Raft groups (for testing / bootstrap).
196    pub fn groups_mut(&mut self) -> &mut HashMap<u64, RaftNode<RedbLogStorage>> {
197        &mut self.groups
198    }
199
200    /// Snapshot of all Raft group states for observability.
201    pub fn group_statuses(&self) -> Vec<GroupStatus> {
202        let mut statuses = Vec::with_capacity(self.groups.len());
203        for (&group_id, node) in &self.groups {
204            let vshard_count = self.routing.vshards_for_group(group_id).len();
205            let members = self
206                .routing
207                .group_info(group_id)
208                .map(|info| info.members.clone())
209                .unwrap_or_default();
210
211            statuses.push(GroupStatus {
212                group_id,
213                role: format!("{:?}", node.role()),
214                leader_id: node.leader_id(),
215                term: node.current_term(),
216                commit_index: node.commit_index(),
217                last_applied: node.last_applied(),
218                member_count: members.len(),
219                vshard_count,
220            });
221        }
222        statuses.sort_by_key(|s| s.group_id);
223        statuses
224    }
225
226    /// Get the leader for a given vShard (from local group state).
227    pub fn leader_for_vshard(&self, vshard_id: u32) -> Result<Option<u64>> {
228        let group_id = self.routing.group_for_vshard(vshard_id)?;
229        let node = self
230            .groups
231            .get(&group_id)
232            .ok_or(ClusterError::GroupNotFound { group_id })?;
233        let lid = node.leader_id();
234        Ok(if lid == 0 { None } else { Some(lid) })
235    }
236
237    /// Propose a command to the Raft group that owns the given vShard.
238    ///
239    /// Returns `(group_id, log_index)` on success.
240    pub fn propose(&mut self, vshard_id: u32, data: Vec<u8>) -> Result<(u64, u64)> {
241        let group_id = self.routing.group_for_vshard(vshard_id)?;
242        let node = self
243            .groups
244            .get_mut(&group_id)
245            .ok_or(ClusterError::GroupNotFound { group_id })?;
246        let log_index = node.propose(data)?;
247        Ok((group_id, log_index))
248    }
249
250    /// Returns `true` if this node is currently the leader of `group_id`.
251    ///
252    /// Returns `false` when the group does not exist on this node or when the
253    /// node is a follower, candidate, or learner in the group.
254    pub fn is_group_leader(&self, group_id: u64) -> bool {
255        use nodedb_raft::state::NodeRole;
256        self.groups
257            .get(&group_id)
258            .map(|n| n.role() == NodeRole::Leader)
259            .unwrap_or(false)
260    }
261
262    /// Propose a command directly to a specific Raft group (e.g. the
263    /// metadata group, which has no vShard mapping).
264    ///
265    /// Returns the committed log index on success.
266    pub fn propose_to_group(&mut self, group_id: u64, data: Vec<u8>) -> Result<u64> {
267        let node = self
268            .groups
269            .get_mut(&group_id)
270            .ok_or(ClusterError::GroupNotFound { group_id })?;
271        Ok(node.propose(data)?)
272    }
273
274    /// Read committed log entries for a Raft group in the inclusive index
275    /// range `[lo, hi]`.
276    ///
277    /// `hi` is clamped to the group's `commit_index` so callers that pass
278    /// `u64::MAX` never read uncommitted entries.
279    ///
280    /// Used by the Calvin scheduler's rebuild path to replay sequenced
281    /// transactions from the sequencer Raft log after a restart.
282    ///
283    /// Returns `Err(ClusterError::Raft(RaftError::LogCompacted))` if `lo`
284    /// has been compacted into a snapshot (caller must install a snapshot
285    /// instead of replaying from log).
286    pub fn read_committed_entries(
287        &self,
288        group_id: u64,
289        lo: u64,
290        hi: u64,
291    ) -> Result<Vec<nodedb_raft::message::LogEntry>> {
292        let node = self
293            .groups
294            .get(&group_id)
295            .ok_or(ClusterError::GroupNotFound { group_id })?;
296        let entries = node.log_entries_range(lo, hi)?;
297        Ok(entries.to_vec())
298    }
299}
300
301// Re-export LogEntry so callers of `read_committed_entries` can name the type.
302pub use nodedb_raft::LogEntry;
303
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    #[test]
    fn single_node_multi_raft() {
        let dir = tempfile::tempdir().unwrap();
        // uniform(4, ...) creates 4 data groups (1..=4) plus metadata group 0.
        let rt = RoutingTable::uniform(4, &[1], 1);
        let mut mr = MultiRaft::new(1, rt.clone(), dir.path().to_path_buf());

        for gid in rt.group_ids() {
            mr.add_group(gid, vec![]).unwrap();
        }
        // 4 data groups + 1 metadata group.
        assert_eq!(mr.group_count(), 5);

        for node in mr.groups.values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let ready = mr.tick();
        assert_eq!(ready.groups.len(), 5);
    }

    #[test]
    fn propose_routes_to_correct_group() {
        let dir = tempfile::tempdir().unwrap();
        let rt = RoutingTable::uniform(4, &[1], 1);
        let mut mr = MultiRaft::new(1, rt.clone(), dir.path().to_path_buf());

        for gid in rt.group_ids() {
            mr.add_group(gid, vec![]).unwrap();
        }
        // Expire every election deadline so the first tick starts elections.
        for node in mr.groups.values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }
        mr.tick();
        // Mark any committed entries applied before proposing.
        for (gid, ready) in mr.tick().groups {
            if let Some(last) = ready.committed_entries.last() {
                mr.advance_applied(gid, last.index).unwrap();
            }
        }

        // vshard 0 maps to data group 1, vshard 256 also maps to group 1 (256 % 4 + 1 = 1).
        // Check the returned group against the routing table instead of
        // discarding it, so the test actually verifies routing.
        let expected_0 = rt.group_for_vshard(0).unwrap();
        let (gid, idx) = mr.propose(0, b"cmd-shard-0".to_vec()).unwrap();
        assert_eq!(gid, expected_0);
        assert!(idx > 0);

        let expected_256 = rt.group_for_vshard(256).unwrap();
        let (gid, idx) = mr.propose(256, b"cmd-shard-256".to_vec()).unwrap();
        assert_eq!(gid, expected_256);
        assert!(idx > 0);
    }

    #[test]
    fn add_group_as_learner_starts_in_learner_role() {
        use nodedb_raft::NodeRole;
        let dir = tempfile::tempdir().unwrap();
        // uniform(1, ...) creates data group 1 plus metadata group 0.
        let rt = RoutingTable::uniform(1, &[1, 2], 2);
        let mut mr = MultiRaft::new(2, rt, dir.path().to_path_buf());

        // Data group 1: join as learner (node 1 is the voter, we're node 2 = learner).
        mr.add_group_as_learner(1, vec![1], vec![]).unwrap();

        let node = mr.groups.get(&1).unwrap();
        assert_eq!(node.role(), NodeRole::Learner);
        assert_eq!(node.voters(), &[1]);
    }
}