nodedb_cluster/raft_loop/handle_rpc.rs
// SPDX-License-Identifier: BUSL-1.1

//! Inbound Raft RPC dispatch — `impl RaftRpcHandler for RaftLoop`.
//!
//! Each RPC variant is either handled inline (Raft consensus RPCs that
//! just lock `MultiRaft`) or delegated to a helper module — health,
//! forwarding, VShard envelopes, or (for `JoinRequest`) the async
//! orchestration in [`super::join`].

use crate::error::{ClusterError, Result};
use crate::forward::PlanExecutor;
use crate::health;
use crate::rpc_codec::RaftRpc;
use crate::transport::RaftRpcHandler;

use super::loop_core::{CommitApplier, RaftLoop};

/// The Raft group that owns cluster topology / membership.
///
/// Group 0 is the "metadata" group and is the authoritative source of
/// truth for who is in the cluster. Joins must be processed by its
/// leader; this constant is also used by the join orchestration in
/// [`super::join`].
pub(super) const TOPOLOGY_GROUP_ID: u64 = 0;

/// Outcome of the leader-check phase of the join flow.
///
/// Extracted as a pure enum so the decision logic can be unit-tested
/// without spinning up a real `MultiRaft` just to observe its leader id.
#[derive(Debug, PartialEq, Eq)]
pub(super) enum JoinDecision {
    /// This node is the group-0 leader (or the founding seed with no leader
    /// elected yet). Admit the join locally.
    Admit,
    /// Another node is the group-0 leader. The client should retry at
    /// `leader_addr`.
    Redirect { leader_addr: String },
}

/// Pure decision: given the observed group-0 leader, this node's id, and
/// the leader's address (as known to the local topology), should we
/// admit the join or redirect?
///
/// - `group0_leader == 0` means "no elected leader yet". On a freshly
///   bootstrapped single-seed cluster this is normal — the founding node
///   is the only possible leader, so we accept.
/// - `group0_leader == self_node_id` means we are the leader — accept.
/// - Otherwise redirect. If the leader's address is unknown to topology
///   (an operator error that shouldn't happen in practice), we still
///   redirect with an empty string so the client at least sees the
///   `"not leader"` prefix and can decide to try the next seed.
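///
/// A quick sketch of the three cases (mirrored by the unit tests at the
/// bottom of this file):
///
/// ```ignore
/// // No elected leader yet (bootstrap seed), or we are the leader: admit.
/// assert_eq!(decide_join(0, 7, None), JoinDecision::Admit);
/// assert_eq!(decide_join(7, 7, None), JoinDecision::Admit);
/// // Node 1 is the group-0 leader: redirect the joiner to its address.
/// assert_eq!(
///     decide_join(1, 7, Some("10.0.0.1:9400".into())),
///     JoinDecision::Redirect { leader_addr: "10.0.0.1:9400".into() }
/// );
/// ```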
pub(super) fn decide_join(
    group0_leader: u64,
    self_node_id: u64,
    leader_addr: Option<String>,
) -> JoinDecision {
    if group0_leader == 0 || group0_leader == self_node_id {
        JoinDecision::Admit
    } else {
        JoinDecision::Redirect {
            leader_addr: leader_addr.unwrap_or_default(),
        }
    }
}

impl<A: CommitApplier, P: PlanExecutor> RaftRpcHandler for RaftLoop<A, P> {
    async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
        match rpc {
            // Raft consensus RPCs — lock MultiRaft (sync, never across await).
            RaftRpc::AppendEntriesRequest(req) => {
                let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                let resp = mr.handle_append_entries(&req)?;
                Ok(RaftRpc::AppendEntriesResponse(resp))
            }
            RaftRpc::RequestVoteRequest(req) => {
                let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                let resp = mr.handle_request_vote(&req)?;
                Ok(RaftRpc::RequestVoteResponse(resp))
            }
            RaftRpc::InstallSnapshotRequest(req) => {
                // Validate snapshot framing for any non-empty chunk.
                // Empty data is the bootstrap stub (no engine data yet); skip
                // framing in that case. When a real engine ships data it calls
                // `encode_snapshot_chunk` on the sender side and enforcement
                // happens here automatically.
                if !req.data.is_empty() {
                    // Short-circuit immediately if this chunk has already been
                    // quarantined after two consecutive CRC failures. Without
                    // this check a quarantined chunk would re-attempt the
                    // (always-failing) decode on every incoming RPC and never
                    // surface a stable, operator-visible error.
                    if let Some(ref hook) = self.snapshot_quarantine_hook
                        && hook.is_quarantined(req.group_id, req.last_included_index)
                    {
                        return Err(crate::error::ClusterError::Codec {
                            detail: format!(
                                "InstallSnapshot chunk quarantined: group={} index={}",
                                req.group_id, req.last_included_index
                            ),
                        });
                    }

                    match nodedb_raft::decode_snapshot_chunk(&req.data) {
                        Ok(_) => {
                            // Successful decode — reset any prior strike so a
                            // single transient CRC error does not permanently
                            // count against a healthy peer.
                            if let Some(ref hook) = self.snapshot_quarantine_hook {
                                hook.record_success(req.group_id, req.last_included_index);
                            }
                        }
                        Err(e) => {
                            let is_crc_class = matches!(
                                e,
                                nodedb_raft::snapshot_framing::SnapshotFramingError::CrcMismatch {
                                    ..
                                }
                                | nodedb_raft::snapshot_framing::SnapshotFramingError::Truncated(
                                    _
                                )
                            );
                            if is_crc_class && let Some(ref hook) = self.snapshot_quarantine_hook {
                                hook.record_failure(
                                    req.group_id,
                                    req.last_included_index,
                                    &e.to_string(),
                                );
                            }
                            return Err(crate::error::ClusterError::Codec {
                                detail: format!("InstallSnapshot framing: {e}"),
                            });
                        }
                    }
                }

                let last_included_index = req.last_included_index;
                let group_id = req.group_id;

                // Route through the chunk accumulator when a data directory is
                // configured. The accumulator writes chunks to a `.partial` file,
                // validates the full CRC on the final chunk, and then calls
                // `mr.handle_install_snapshot` after atomic rename.
                //
                // When `data_dir` is `None` (unit tests that don't set a data
                // directory), fall through to the original direct call so test
                // coverage for Raft state-machine transitions is unaffected.
                //
                // Quarantine accounting for offset regression and CRC errors is
                // preserved: the `SnapshotOffsetRegression` and
                // `SnapshotCrcMismatch` error paths in the receiver both surface
                // as `ClusterError` variants that are propagated here.
                if let Some(ref data_dir) = self.data_dir {
                    match crate::install_snapshot::receiver::handle_chunk(
                        &req,
                        &self.partial_snapshots,
                        data_dir,
                        &self.multi_raft,
                    )
                    .await
                    {
                        Ok(crate::install_snapshot::ChunkOutcome::Committed(snap_resp)) => {
                            // Final chunk committed — bump watcher for metadata group.
                            if group_id == TOPOLOGY_GROUP_ID {
                                self.group_watchers.bump(group_id, last_included_index);
                            }
                            return Ok(RaftRpc::InstallSnapshotResponse(snap_resp));
                        }
                        Ok(crate::install_snapshot::ChunkOutcome::Pending) => {
                            // Non-final chunk — pass a done=false stub to MultiRaft so
                            // it resets its election timeout and returns the current term.
                            let pending_req = nodedb_raft::InstallSnapshotRequest {
                                term: req.term,
                                leader_id: req.leader_id,
                                last_included_index: req.last_included_index,
                                last_included_term: req.last_included_term,
                                offset: req.offset,
                                data: vec![],
                                done: false,
                                group_id,
                                total_size: 0,
                            };
                            let resp = {
                                let mut mr =
                                    self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                                mr.handle_install_snapshot(&pending_req)?
                            };
                            return Ok(RaftRpc::InstallSnapshotResponse(resp));
                        }
                        Err(e @ crate::error::ClusterError::SnapshotOffsetRegression { .. }) => {
                            // Record the regression as a quarantine strike so the
                            // sender knows to retransmit from offset 0.
                            if let Some(ref hook) = self.snapshot_quarantine_hook {
                                hook.record_failure(group_id, last_included_index, &e.to_string());
                            }
                            // Reset partial state so the next offset-0 chunk starts fresh.
                            self.partial_snapshots
                                .lock()
                                .unwrap_or_else(|p| p.into_inner())
                                .remove(&group_id);
                            return Err(e);
                        }
                        Err(e @ crate::error::ClusterError::SnapshotCrcMismatch { .. }) => {
                            if let Some(ref hook) = self.snapshot_quarantine_hook {
                                hook.record_failure(group_id, last_included_index, &e.to_string());
                            }
                            return Err(e);
                        }
                        Err(e) => return Err(e),
                    }
                }

                // Fallback: no data_dir — direct call (unit test path).
                let resp = {
                    let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                    mr.handle_install_snapshot(&req)?
                };
                // Watcher contract: `applied_index` means "state visible
                // on this node up to index N", NOT "raft has advanced to
                // N". Bumping the watcher must therefore mirror actual
                // state-machine progress.
                //
                // - Metadata group: `mr.handle_install_snapshot` restores
                //   the metadata state machine synchronously before
                //   returning, so the watcher can be bumped here — state
                //   IS visible at `last_included_index`.
                //
                // - Data groups: snapshot install fast-forwards raft's
                //   `last_applied` but does NOT restore the data-plane
                //   state machine (no committed entries are produced for
                //   `run_apply_loop`, and there is currently no
                //   data-group state-machine snapshot restore path).
                //   Bumping the watcher here would wake waiters that
                //   then read missing state — a silent, data-loss-shaped
                //   bug. The data-group watcher is bumped only by the
                //   host crate's apply loop after the SPSC round-trip
                //   completes; that path is the single source of truth
                //   for "state visible".
                //
                // When data-group state-machine snapshots are
                // implemented, the restore path must bump the watcher
                // itself — not this handler.
                if group_id == TOPOLOGY_GROUP_ID {
                    self.group_watchers.bump(group_id, last_included_index);
                }
                Ok(RaftRpc::InstallSnapshotResponse(resp))
            }
            // Cluster join — full orchestration in `super::join`.
            RaftRpc::JoinRequest(req) => Ok(RaftRpc::JoinResponse(self.join_flow(req).await)),
            // Health check.
            RaftRpc::Ping(req) => {
                let topo_version = {
                    let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
                    topo.version()
                };
                Ok(health::handle_ping(self.node_id, topo_version, &req))
            }
            // Topology broadcast.
            RaftRpc::TopologyUpdate(update) => {
                let (updated, ack) =
                    health::handle_topology_update(self.node_id, &self.topology, &update);
                if updated {
                    // Register every member's address with the transport
                    // so raft RPCs to newly-learned peers actually have
                    // a destination. Without this, a node that joined
                    // early and then learned about a later joiner via
                    // broadcast would hold a stale peer set in its
                    // transport, and AppendEntries to the new peer would
                    // fail until the circuit breaker opened permanently.
                    for node in &update.nodes {
                        if node.node_id == self.node_id {
                            continue;
                        }
                        match node.addr.parse::<std::net::SocketAddr>() {
                            Ok(addr) => self.transport.register_peer(node.node_id, addr),
                            Err(e) => tracing::warn!(
                                node_id = node.node_id,
                                addr = %node.addr,
                                error = %e,
                                "topology update contains unparseable peer address; skipping register_peer"
                            ),
                        }
                    }
                    // Persist the adopted topology so a subsequent
                    // restart reads the latest member set from the
                    // catalog rather than the stale snapshot taken at
                    // join time. Persist only when a catalog is attached;
                    // failures are logged but never propagate — the
                    // next TopologyUpdate will retry.
                    if let Some(catalog) = self.catalog.as_ref() {
                        let snap = self
                            .topology
                            .read()
                            .unwrap_or_else(|p| p.into_inner())
                            .clone();
                        if let Err(e) = catalog.save_topology(&snap) {
                            tracing::warn!(error = %e, "failed to persist topology update to catalog");
                        }
                    }
                }
                Ok(ack)
            }
            // Physical-plan execution (C-β) — execute locally via the PlanExecutor,
            // skipping SQL re-planning entirely.
            RaftRpc::ExecuteRequest(req) => {
                let resp = self.plan_executor.execute_plan(req).await;
                Ok(RaftRpc::ExecuteResponse(resp))
            }
            // Metadata-group proposal forwarding — apply locally if
            // we're the metadata leader, otherwise return a
            // NotLeader response with a leader hint so the
            // forwarder can chase the redirect.
            RaftRpc::MetadataProposeRequest(req) => {
                let resp = match self.propose_to_metadata_group(req.bytes) {
                    Ok(log_index) => crate::rpc_codec::MetadataProposeResponse::ok(log_index),
                    Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
                        leader_hint,
                    })) => {
                        crate::rpc_codec::MetadataProposeResponse::err("not leader", leader_hint)
                    }
                    Err(e) => crate::rpc_codec::MetadataProposeResponse::err(e.to_string(), None),
                };
                Ok(RaftRpc::MetadataProposeResponse(resp))
            }
            // Data-group proposal forwarding — apply locally if we are the
            // data-group leader for the given vshard, otherwise return
            // NotLeader with a hint so the forwarder can chase the redirect.
            RaftRpc::DataProposeRequest(req) => {
                let resp = {
                    let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                    match mr.propose(req.vshard_id, req.bytes) {
                        Ok((group_id, log_index)) => {
                            crate::rpc_codec::DataProposeResponse::ok(group_id, log_index)
                        }
                        Err(crate::error::ClusterError::Raft(
                            nodedb_raft::RaftError::NotLeader { leader_hint },
                        )) => crate::rpc_codec::DataProposeResponse::err("not leader", leader_hint),
                        Err(e) => crate::rpc_codec::DataProposeResponse::err(e.to_string(), None),
                    }
                };
                Ok(RaftRpc::DataProposeResponse(resp))
            }
            // VShardEnvelope — dispatch to registered handler (Event Plane, etc.).
            RaftRpc::VShardEnvelope(bytes) => {
                if let Some(ref handler) = self.vshard_handler {
                    let response_bytes = handler(bytes).await?;
                    Ok(RaftRpc::VShardEnvelope(response_bytes))
                } else {
                    Err(ClusterError::Transport {
                        detail: "VShardEnvelope handler not configured".into(),
                    })
                }
            }
            other => Err(ClusterError::Transport {
                detail: format!("unexpected request type in RPC handler: {other:?}"),
            }),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::multi_raft::MultiRaft;
    use crate::routing::RoutingTable;
    use crate::topology::{ClusterTopology, NodeInfo, NodeState};
    use crate::transport::NexarTransport;
    use nodedb_raft::message::LogEntry;
    use std::sync::{Arc, RwLock};
    use std::time::{Duration, Instant};

    /// No-op applier for tests that don't care about state machine output.
    struct NoopApplier;
    impl CommitApplier for NoopApplier {
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            entries.last().map(|e| e.index).unwrap_or(0)
        }
    }

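    /// Loopback transport on an ephemeral port with insecure credentials,
    /// which is all the handler-routing tests below need.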
    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(
            NexarTransport::new(
                node_id,
                "127.0.0.1:0".parse().unwrap(),
                crate::transport::credentials::TransportCredentials::Insecure,
            )
            .unwrap(),
        )
    }

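    /// An AppendEntries RPC for group 0 is dispatched to `MultiRaft` and
    /// answered with an `AppendEntriesResponse` carrying the sender's
    /// (higher) term.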
    #[tokio::test]
    async fn rpc_handler_routes_append_entries() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);

        raft_loop.do_tick();
        tokio::time::sleep(Duration::from_millis(20)).await;

        let req = RaftRpc::AppendEntriesRequest(nodedb_raft::AppendEntriesRequest {
            term: 99,
            leader_id: 2,
            prev_log_index: 0,
            prev_log_term: 0,
            entries: vec![],
            leader_commit: 0,
            group_id: 0,
        });

        let resp = raft_loop.handle_rpc(req).await.unwrap();
        match resp {
            RaftRpc::AppendEntriesResponse(r) => {
                assert!(r.success);
                assert_eq!(r.term, 99);
            }
            other => panic!("expected AppendEntriesResponse, got {other:?}"),
        }
    }

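    /// A RequestVote RPC for group 0 is dispatched to `MultiRaft`; a fresh
    /// follower that has not voted in this term grants the vote.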
    #[tokio::test]
    async fn rpc_handler_routes_request_vote() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![2, 3]).unwrap();

        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);

        let req = RaftRpc::RequestVoteRequest(nodedb_raft::RequestVoteRequest {
            term: 1,
            candidate_id: 2,
            last_log_index: 0,
            last_log_term: 0,
            group_id: 0,
        });

        let resp = raft_loop.handle_rpc(req).await.unwrap();
        match resp {
            RaftRpc::RequestVoteResponse(r) => {
                assert!(r.vote_granted);
                assert_eq!(r.term, 1);
            }
            other => panic!("expected RequestVoteResponse, got {other:?}"),
        }
    }

    /// JoinRequest on a freshly-bootstrapped single-seed RaftLoop is
    /// admitted locally: this node is leader of every group, so
    /// `AddLearner` conf-changes are proposed and (because the groups
    /// are single-voter) commit instantly.
    #[tokio::test]
    async fn rpc_handler_accepts_join_on_bootstrap_seed() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        // uniform(2, ...) creates metadata group 0 + data groups 1 and 2.
        let rt = RoutingTable::uniform(2, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();
        mr.add_group(1, vec![]).unwrap();
        mr.add_group(2, vec![]).unwrap();
        // Force immediate election so all three groups reach Leader before
        // the join flow proposes AddLearner.
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let mut topology = ClusterTopology::new();
        topology.add_node(NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        ));
        let topo = Arc::new(RwLock::new(topology));

        let raft_loop = RaftLoop::new(mr, transport, topo.clone(), NoopApplier);
        raft_loop.do_tick();
        tokio::time::sleep(Duration::from_millis(20)).await;

        let req = RaftRpc::JoinRequest(crate::rpc_codec::JoinRequest {
            node_id: 2,
            listen_addr: "127.0.0.1:9401".into(),
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
            spiffe_id: None,
            spki_pin: None,
        });

        let resp = raft_loop.handle_rpc(req).await.unwrap();
        match resp {
            RaftRpc::JoinResponse(r) => {
                assert!(
                    r.success,
                    "join should succeed on bootstrap seed: {}",
                    r.error
                );
                assert_eq!(r.nodes.len(), 2);
                // uniform(2, ...) creates 3 groups (metadata + 2 data).
                assert_eq!(r.groups.len(), 3);
                assert_eq!(r.vshard_to_group.len(), 1024);
                // The new node should appear as a learner on every group,
                // not as a voter — voter promotion happens asynchronously
                // via the tick loop's promotion phase.
                for g in &r.groups {
                    assert!(
                        g.learners.contains(&2),
                        "expected node 2 as learner in group {}, got learners={:?} members={:?}",
                        g.group_id,
                        g.learners,
                        g.members
                    );
                }
            }
            other => panic!("expected JoinResponse, got {other:?}"),
        }

        let topo_guard = topo.read().unwrap();
        assert_eq!(topo_guard.node_count(), 2);
        assert!(topo_guard.contains(2));
    }

    #[test]
    fn decide_join_self_leader_admits() {
        assert_eq!(
            decide_join(7, 7, Some("10.0.0.7:9400".into())),
            JoinDecision::Admit
        );
    }

    #[test]
    fn decide_join_no_leader_yet_admits() {
        assert_eq!(decide_join(0, 7, None), JoinDecision::Admit);
    }

    #[test]
    fn decide_join_other_leader_redirects() {
        assert_eq!(
            decide_join(1, 7, Some("10.0.0.1:9400".into())),
            JoinDecision::Redirect {
                leader_addr: "10.0.0.1:9400".into()
            }
        );
    }

    #[test]
    fn decide_join_other_leader_unknown_addr_still_redirects() {
        assert_eq!(
            decide_join(1, 7, None),
            JoinDecision::Redirect {
                leader_addr: String::new()
            }
        );
    }
}