Skip to main content

nodedb_cluster/raft_loop/
handle_rpc.rs

1//! Inbound Raft RPC dispatch — `impl RaftRpcHandler for RaftLoop`.
2//!
3//! Each RPC variant is either handled inline (Raft consensus RPCs that
4//! just lock `MultiRaft`) or delegated to a helper module — health,
5//! forwarding, VShard envelopes, or (for `JoinRequest`) the async
6//! orchestration in [`super::join`].
7
8use crate::error::{ClusterError, Result};
9use crate::forward::PlanExecutor;
10use crate::health;
11use crate::rpc_codec::RaftRpc;
12use crate::transport::RaftRpcHandler;
13
14use super::loop_core::{CommitApplier, RaftLoop};
15
16/// The Raft group that owns cluster topology / membership.
17///
18/// Group 0 is the "metadata" group and is the authoritative source of
19/// truth for who is in the cluster. Joins must be processed by its
20/// leader; this constant is also used by the join orchestration in
21/// [`super::join`].
22pub(super) const TOPOLOGY_GROUP_ID: u64 = 0;
23
24/// Outcome of the leader-check phase of the join flow.
25///
26/// Extracted as a pure enum so the decision logic can be unit-tested
27/// without spinning up a real `MultiRaft` just to observe its leader id.
28#[derive(Debug, PartialEq, Eq)]
29pub(super) enum JoinDecision {
30    /// This node is the group-0 leader (or the founding seed with no leader
31    /// elected yet). Admit the join locally.
32    Admit,
33    /// Another node is the group-0 leader. The client should retry at
34    /// `leader_addr`.
35    Redirect { leader_addr: String },
36}
37
38/// Pure decision: given the observed group-0 leader, this node's id, and
39/// the leader's address (as known to the local topology), should we
40/// admit the join or redirect?
41///
42/// - `group0_leader == 0` means "no elected leader yet". On a freshly
43///   bootstrapped single-seed cluster this is normal — the founding node
44///   is the only possible leader, so we accept.
45/// - `group0_leader == self_node_id` means we are the leader — accept.
46/// - Otherwise redirect. If the leader's address is unknown to topology
47///   (an operator error that shouldn't happen in practice), we still
48///   redirect with an empty string so the client at least sees the
49///   `"not leader"` prefix and can decide to try the next seed.
50pub(super) fn decide_join(
51    group0_leader: u64,
52    self_node_id: u64,
53    leader_addr: Option<String>,
54) -> JoinDecision {
55    if group0_leader == 0 || group0_leader == self_node_id {
56        JoinDecision::Admit
57    } else {
58        JoinDecision::Redirect {
59            leader_addr: leader_addr.unwrap_or_default(),
60        }
61    }
62}
63
64impl<A: CommitApplier, P: PlanExecutor> RaftRpcHandler for RaftLoop<A, P> {
65    async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
66        match rpc {
67            // Raft consensus RPCs — lock MultiRaft (sync, never across await).
68            RaftRpc::AppendEntriesRequest(req) => {
69                let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
70                let resp = mr.handle_append_entries(&req)?;
71                Ok(RaftRpc::AppendEntriesResponse(resp))
72            }
73            RaftRpc::RequestVoteRequest(req) => {
74                let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
75                let resp = mr.handle_request_vote(&req)?;
76                Ok(RaftRpc::RequestVoteResponse(resp))
77            }
78            RaftRpc::InstallSnapshotRequest(req) => {
79                let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
80                let resp = mr.handle_install_snapshot(&req)?;
81                Ok(RaftRpc::InstallSnapshotResponse(resp))
82            }
83            // Cluster join — full orchestration in `super::join`.
84            RaftRpc::JoinRequest(req) => Ok(RaftRpc::JoinResponse(self.join_flow(req).await)),
85            // Health check.
86            RaftRpc::Ping(req) => {
87                let topo_version = {
88                    let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
89                    topo.version()
90                };
91                Ok(health::handle_ping(self.node_id, topo_version, &req))
92            }
93            // Topology broadcast.
94            RaftRpc::TopologyUpdate(update) => {
95                let (updated, ack) =
96                    health::handle_topology_update(self.node_id, &self.topology, &update);
97                if updated {
98                    // Register every member's address with the transport
99                    // so raft RPCs to newly-learned peers actually have
100                    // a destination. Without this, a node that joined
101                    // early and then learned about a later joiner via
102                    // broadcast would hold a stale peer set in its
103                    // transport and AppendEntries to the new peer would
104                    // fail until the circuit breaker opened permanently.
105                    for node in &update.nodes {
106                        if node.node_id == self.node_id {
107                            continue;
108                        }
109                        match node.addr.parse::<std::net::SocketAddr>() {
110                            Ok(addr) => self.transport.register_peer(node.node_id, addr),
111                            Err(e) => tracing::warn!(
112                                node_id = node.node_id,
113                                addr = %node.addr,
114                                error = %e,
115                                "topology update contains unparseable peer address; skipping register_peer"
116                            ),
117                        }
118                    }
119                    // Persist the adopted topology so a subsequent
120                    // restart reads the latest member set from catalog
121                    // rather than the stale snapshot taken at join
122                    // time. Persist only when a catalog is attached;
123                    // failures are logged but never propagate — the
124                    // next TopologyUpdate will retry.
125                    if let Some(catalog) = self.catalog.as_ref() {
126                        let snap = self
127                            .topology
128                            .read()
129                            .unwrap_or_else(|p| p.into_inner())
130                            .clone();
131                        if let Err(e) = catalog.save_topology(&snap) {
132                            tracing::warn!(error = %e, "failed to persist topology update to catalog");
133                        }
134                    }
135                }
136                Ok(ack)
137            }
138            // Physical-plan execution (C-β) — execute locally via the PlanExecutor,
139            // skipping SQL re-planning entirely.
140            RaftRpc::ExecuteRequest(req) => {
141                let resp = self.plan_executor.execute_plan(req).await;
142                Ok(RaftRpc::ExecuteResponse(resp))
143            }
144            // Metadata-group proposal forwarding — apply locally if
145            // we're the metadata leader, otherwise return a
146            // NotLeader response with a leader hint so the
147            // forwarder can chase the redirect.
148            RaftRpc::MetadataProposeRequest(req) => {
149                let resp = match self.propose_to_metadata_group(req.bytes) {
150                    Ok(log_index) => crate::rpc_codec::MetadataProposeResponse::ok(log_index),
151                    Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
152                        leader_hint,
153                    })) => {
154                        crate::rpc_codec::MetadataProposeResponse::err("not leader", leader_hint)
155                    }
156                    Err(e) => crate::rpc_codec::MetadataProposeResponse::err(e.to_string(), None),
157                };
158                Ok(RaftRpc::MetadataProposeResponse(resp))
159            }
160            // VShardEnvelope — dispatch to registered handler (Event Plane, etc.).
161            RaftRpc::VShardEnvelope(bytes) => {
162                if let Some(ref handler) = self.vshard_handler {
163                    let response_bytes = handler(bytes).await?;
164                    Ok(RaftRpc::VShardEnvelope(response_bytes))
165                } else {
166                    Err(ClusterError::Transport {
167                        detail: "VShardEnvelope handler not configured".into(),
168                    })
169                }
170            }
171            other => Err(ClusterError::Transport {
172                detail: format!("unexpected request type in RPC handler: {other:?}"),
173            }),
174        }
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::multi_raft::MultiRaft;
    use crate::routing::RoutingTable;
    use crate::topology::{ClusterTopology, NodeInfo, NodeState};
    use crate::transport::NexarTransport;
    use nodedb_raft::message::LogEntry;
    use std::sync::{Arc, RwLock};
    use std::time::{Duration, Instant};

    /// Applier stub for tests that ignore state machine output: it only
    /// reports the index of the last entry it was handed (0 when empty).
    struct NoopApplier;

    impl CommitApplier for NoopApplier {
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            entries.last().map_or(0, |entry| entry.index)
        }
    }

    /// Builds an insecure transport for `node_id` bound to `127.0.0.1:0`.
    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        let bind_addr = "127.0.0.1:0".parse().unwrap();
        let credentials = crate::transport::credentials::TransportCredentials::Insecure;
        let transport = NexarTransport::new(node_id, bind_addr, credentials).unwrap();
        Arc::new(transport)
    }

    /// Moves the election deadline of every group 1 ms into the past so
    /// the next tick starts an election immediately.
    fn expire_election_deadlines(mr: &mut MultiRaft) {
        for group in mr.groups_mut().values_mut() {
            group.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }
    }

    /// An AppendEntries request is routed to `MultiRaft` and answered
    /// with a successful AppendEntries response carrying the same term.
    #[tokio::test]
    async fn rpc_handler_routes_append_entries() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let routing = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, routing, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();
        expire_election_deadlines(&mut mr);

        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);

        raft_loop.do_tick();
        tokio::time::sleep(Duration::from_millis(20)).await;

        let append = nodedb_raft::AppendEntriesRequest {
            term: 99,
            leader_id: 2,
            prev_log_index: 0,
            prev_log_term: 0,
            entries: vec![],
            leader_commit: 0,
            group_id: 0,
        };

        let reply = raft_loop
            .handle_rpc(RaftRpc::AppendEntriesRequest(append))
            .await
            .unwrap();
        let ack = match reply {
            RaftRpc::AppendEntriesResponse(r) => r,
            other => panic!("expected AppendEntriesResponse, got {other:?}"),
        };
        assert!(ack.success);
        assert_eq!(ack.term, 99);
    }

    /// A RequestVote request is routed to `MultiRaft` and the vote is
    /// granted at the candidate's term.
    #[tokio::test]
    async fn rpc_handler_routes_request_vote() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let routing = RoutingTable::uniform(1, &[1, 2, 3], 3);
        let mut mr = MultiRaft::new(1, routing, dir.path().to_path_buf());
        mr.add_group(0, vec![2, 3]).unwrap();

        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);

        let vote = nodedb_raft::RequestVoteRequest {
            term: 1,
            candidate_id: 2,
            last_log_index: 0,
            last_log_term: 0,
            group_id: 0,
        };

        let reply = raft_loop
            .handle_rpc(RaftRpc::RequestVoteRequest(vote))
            .await
            .unwrap();
        let ballot = match reply {
            RaftRpc::RequestVoteResponse(r) => r,
            other => panic!("expected RequestVoteResponse, got {other:?}"),
        };
        assert!(ballot.vote_granted);
        assert_eq!(ballot.term, 1);
    }

    /// A JoinRequest sent to a freshly bootstrapped single-seed RaftLoop
    /// is admitted locally: the seed leads every group, so the
    /// `AddLearner` conf-changes it proposes commit at once (each group
    /// has a single voter).
    #[tokio::test]
    async fn rpc_handler_accepts_join_on_bootstrap_seed() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let routing = RoutingTable::uniform(2, &[1], 1);
        let mut mr = MultiRaft::new(1, routing, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();
        mr.add_group(1, vec![]).unwrap();
        // Both groups must already be Leader when the join flow proposes
        // AddLearner, so trigger their elections right away.
        expire_election_deadlines(&mut mr);

        let mut topology = ClusterTopology::new();
        let seed = NodeInfo::new(1, "127.0.0.1:9400".parse().unwrap(), NodeState::Active);
        topology.add_node(seed);
        let topo = Arc::new(RwLock::new(topology));

        let raft_loop = RaftLoop::new(mr, transport, topo.clone(), NoopApplier);
        raft_loop.do_tick();
        tokio::time::sleep(Duration::from_millis(20)).await;

        let join = crate::rpc_codec::JoinRequest {
            node_id: 2,
            listen_addr: "127.0.0.1:9401".into(),
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
        };

        let reply = raft_loop
            .handle_rpc(RaftRpc::JoinRequest(join))
            .await
            .unwrap();
        let joined = match reply {
            RaftRpc::JoinResponse(r) => r,
            other => panic!("expected JoinResponse, got {other:?}"),
        };

        assert!(
            joined.success,
            "join should succeed on bootstrap seed: {}",
            joined.error
        );
        assert_eq!(joined.nodes.len(), 2);
        assert_eq!(joined.groups.len(), 2);
        assert_eq!(joined.vshard_to_group.len(), 1024);
        // The joiner must show up as a learner in every group rather than
        // as a voter; promotion to voter happens later, asynchronously,
        // in the tick loop's promotion phase.
        for g in &joined.groups {
            assert!(
                g.learners.contains(&2),
                "expected node 2 as learner in group {}, got learners={:?} members={:?}",
                g.group_id,
                g.learners,
                g.members
            );
        }

        let topo_guard = topo.read().unwrap();
        assert_eq!(topo_guard.node_count(), 2);
        assert!(topo_guard.contains(2));
    }

    /// The node that leads group 0 admits the join itself.
    #[test]
    fn decide_join_self_leader_admits() {
        let decision = decide_join(7, 7, Some("10.0.0.7:9400".into()));
        assert_eq!(decision, JoinDecision::Admit);
    }

    /// With no leader elected yet (leader id 0) the join is admitted.
    #[test]
    fn decide_join_no_leader_yet_admits() {
        let decision = decide_join(0, 7, None);
        assert_eq!(decision, JoinDecision::Admit);
    }

    /// When another node leads group 0 the client is redirected to it.
    #[test]
    fn decide_join_other_leader_redirects() {
        let decision = decide_join(1, 7, Some("10.0.0.1:9400".into()));
        let expected = JoinDecision::Redirect {
            leader_addr: "10.0.0.1:9400".into(),
        };
        assert_eq!(decision, expected);
    }

    /// An unknown leader address still yields a redirect, carrying an
    /// empty address.
    #[test]
    fn decide_join_other_leader_unknown_addr_still_redirects() {
        let decision = decide_join(1, 7, None);
        let expected = JoinDecision::Redirect {
            leader_addr: String::new(),
        };
        assert_eq!(decision, expected);
    }
}
375}