// nodedb_cluster/raft_loop/handle_rpc.rs

// SPDX-License-Identifier: BUSL-1.1

//! Inbound Raft RPC dispatch — `impl RaftRpcHandler for RaftLoop`.
//!
//! Each RPC variant is either handled inline (Raft consensus RPCs that
//! just lock `MultiRaft`) or delegated to a helper module — health,
//! forwarding, VShard envelopes, or (for `JoinRequest`) the async
//! orchestration in [`super::join`].
10use crate::error::{ClusterError, Result};
11use crate::forward::PlanExecutor;
12use crate::health;
13use crate::rpc_codec::RaftRpc;
14use crate::transport::RaftRpcHandler;
15
16use super::loop_core::{CommitApplier, RaftLoop};
17
/// The Raft group that owns cluster topology / membership.
///
/// Group 0 is the "metadata" group and is the authoritative source of
/// truth for who is in the cluster. Joins must be processed by its
/// leader; this constant is also used by the join orchestration in
/// [`super::join`] and by the snapshot watcher-bump logic in
/// `handle_rpc` below.
pub(super) const TOPOLOGY_GROUP_ID: u64 = 0;
25
26/// Outcome of the leader-check phase of the join flow.
27///
28/// Extracted as a pure enum so the decision logic can be unit-tested
29/// without spinning up a real `MultiRaft` just to observe its leader id.
30#[derive(Debug, PartialEq, Eq)]
31pub(super) enum JoinDecision {
32    /// This node is the group-0 leader (or the founding seed with no leader
33    /// elected yet). Admit the join locally.
34    Admit,
35    /// Another node is the group-0 leader. The client should retry at
36    /// `leader_addr`.
37    Redirect { leader_addr: String },
38}
39
40/// Pure decision: given the observed group-0 leader, this node's id, and
41/// the leader's address (as known to the local topology), should we
42/// admit the join or redirect?
43///
44/// - `group0_leader == 0` means "no elected leader yet". On a freshly
45///   bootstrapped single-seed cluster this is normal — the founding node
46///   is the only possible leader, so we accept.
47/// - `group0_leader == self_node_id` means we are the leader — accept.
48/// - Otherwise redirect. If the leader's address is unknown to topology
49///   (an operator error that shouldn't happen in practice), we still
50///   redirect with an empty string so the client at least sees the
51///   `"not leader"` prefix and can decide to try the next seed.
52pub(super) fn decide_join(
53    group0_leader: u64,
54    self_node_id: u64,
55    leader_addr: Option<String>,
56) -> JoinDecision {
57    if group0_leader == 0 || group0_leader == self_node_id {
58        JoinDecision::Admit
59    } else {
60        JoinDecision::Redirect {
61            leader_addr: leader_addr.unwrap_or_default(),
62        }
63    }
64}
65
66impl<A: CommitApplier, P: PlanExecutor> RaftRpcHandler for RaftLoop<A, P> {
67    async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
68        match rpc {
69            // Raft consensus RPCs — lock MultiRaft (sync, never across await).
70            RaftRpc::AppendEntriesRequest(req) => {
71                let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
72                let resp = mr.handle_append_entries(&req)?;
73                Ok(RaftRpc::AppendEntriesResponse(resp))
74            }
75            RaftRpc::RequestVoteRequest(req) => {
76                let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
77                let resp = mr.handle_request_vote(&req)?;
78                Ok(RaftRpc::RequestVoteResponse(resp))
79            }
80            RaftRpc::InstallSnapshotRequest(req) => {
81                // Validate snapshot framing for any non-empty chunk.
82                // Empty data is the bootstrap stub (no engine data yet); skip
83                // framing in that case. When a real engine ships data it calls
84                // `encode_snapshot_chunk` on the sender side and enforcement
85                // happens here automatically.
86                if !req.data.is_empty() {
87                    // Short-circuit immediately if this chunk has already been
88                    // quarantined after two consecutive CRC failures. Without
89                    // this check a quarantined chunk would re-attempt the
90                    // (always-failing) decode on every incoming RPC and never
91                    // surface a stable, operator-visible error.
92                    if let Some(ref hook) = self.snapshot_quarantine_hook
93                        && hook.is_quarantined(req.group_id, req.last_included_index)
94                    {
95                        return Err(crate::error::ClusterError::Codec {
96                            detail: format!(
97                                "InstallSnapshot chunk quarantined: group={} index={}",
98                                req.group_id, req.last_included_index
99                            ),
100                        });
101                    }
102
103                    match nodedb_raft::decode_snapshot_chunk(&req.data) {
104                        Ok(_) => {
105                            // Successful decode — reset any prior strike so a
106                            // single transient CRC error does not permanently
107                            // count against a healthy peer.
108                            if let Some(ref hook) = self.snapshot_quarantine_hook {
109                                hook.record_success(req.group_id, req.last_included_index);
110                            }
111                        }
112                        Err(e) => {
113                            let is_crc_class = matches!(
114                                e,
115                                nodedb_raft::snapshot_framing::SnapshotFramingError::CrcMismatch {
116                                    ..
117                                }
118                                    | nodedb_raft::snapshot_framing::SnapshotFramingError::Truncated(
119                                        _
120                                    )
121                            );
122                            if is_crc_class && let Some(ref hook) = self.snapshot_quarantine_hook {
123                                hook.record_failure(
124                                    req.group_id,
125                                    req.last_included_index,
126                                    &e.to_string(),
127                                );
128                            }
129                            return Err(crate::error::ClusterError::Codec {
130                                detail: format!("InstallSnapshot framing: {e}"),
131                            });
132                        }
133                    }
134                }
135
136                let last_included_index = req.last_included_index;
137                let group_id = req.group_id;
138
139                // Route through the chunk accumulator when a data directory is
140                // configured. The accumulator writes chunks to a `.partial` file,
141                // validates the full CRC on the final chunk, and then calls
142                // `mr.handle_install_snapshot` after atomic rename.
143                //
144                // When `data_dir` is `None` (unit tests that don't set a data
145                // directory) fall through to the original direct call so test
146                // coverage for Raft state-machine transitions is unaffected.
147                //
148                // Quarantine accounting for offset regression and CRC errors is
149                // preserved: the `SnapshotOffsetRegression` and
150                // `SnapshotCrcMismatch` error paths in the receiver both surface
151                // as `ClusterError` variants that are propagated here.
152                if let Some(ref data_dir) = self.data_dir {
153                    match crate::install_snapshot::receiver::handle_chunk(
154                        &req,
155                        &self.partial_snapshots,
156                        data_dir,
157                        &self.multi_raft,
158                    )
159                    .await
160                    {
161                        Ok(crate::install_snapshot::ChunkOutcome::Committed(snap_resp)) => {
162                            // Final chunk committed — bump watcher for metadata group.
163                            if group_id == TOPOLOGY_GROUP_ID {
164                                self.group_watchers.bump(group_id, last_included_index);
165                            }
166                            return Ok(RaftRpc::InstallSnapshotResponse(snap_resp));
167                        }
168                        Ok(crate::install_snapshot::ChunkOutcome::Pending) => {
169                            // Non-final chunk — pass a done=false stub to MultiRaft so
170                            // it resets its election timeout and returns the current term.
171                            let pending_req = nodedb_raft::InstallSnapshotRequest {
172                                term: req.term,
173                                leader_id: req.leader_id,
174                                last_included_index: req.last_included_index,
175                                last_included_term: req.last_included_term,
176                                offset: req.offset,
177                                data: vec![],
178                                done: false,
179                                group_id,
180                                total_size: 0,
181                            };
182                            let resp = {
183                                let mut mr =
184                                    self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
185                                mr.handle_install_snapshot(&pending_req)?
186                            };
187                            return Ok(RaftRpc::InstallSnapshotResponse(resp));
188                        }
189                        Err(e @ crate::error::ClusterError::SnapshotOffsetRegression { .. }) => {
190                            // Record the regression as a quarantine strike so the
191                            // sender knows to retransmit from offset 0.
192                            if let Some(ref hook) = self.snapshot_quarantine_hook {
193                                hook.record_failure(group_id, last_included_index, &e.to_string());
194                            }
195                            // Reset partial state so the next offset-0 chunk starts fresh.
196                            self.partial_snapshots
197                                .lock()
198                                .unwrap_or_else(|p| p.into_inner())
199                                .remove(&group_id);
200                            return Err(e);
201                        }
202                        Err(e @ crate::error::ClusterError::SnapshotCrcMismatch { .. }) => {
203                            if let Some(ref hook) = self.snapshot_quarantine_hook {
204                                hook.record_failure(group_id, last_included_index, &e.to_string());
205                            }
206                            return Err(e);
207                        }
208                        Err(e) => return Err(e),
209                    }
210                }
211
212                // Fallback: no data_dir — direct call (unit test path).
213                let resp = {
214                    let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
215                    mr.handle_install_snapshot(&req)?
216                };
217                // Watcher contract: `applied_index` means "state visible
218                // on this node up to index N", NOT "raft has advanced to
219                // N". Bumping the watcher must therefore mirror actual
220                // state-machine progress.
221                //
222                // - Metadata group: `mr.handle_install_snapshot` restores
223                //   the metadata state machine synchronously before
224                //   returning, so the watcher can be bumped here — state
225                //   IS visible at `last_included_index`.
226                //
227                // - Data groups: snapshot install fast-forwards raft's
228                //   `last_applied` but does NOT restore the data-plane
229                //   state machine (no committed entries are produced for
230                //   `run_apply_loop`, and there is currently no
231                //   data-group state-machine snapshot restore path).
232                //   Bumping the watcher here would wake waiters that
233                //   then read missing state — silent data-loss-shaped
234                //   bug. The data-group watcher is bumped only by the
235                //   host crate's apply loop after the SPSC round-trip
236                //   completes; that path is the single source of truth
237                //   for "state visible".
238                //
239                // When data-group state-machine snapshots are
240                // implemented, the restore path must bump the watcher
241                // itself — not this handler.
242                if group_id == TOPOLOGY_GROUP_ID {
243                    self.group_watchers.bump(group_id, last_included_index);
244                }
245                Ok(RaftRpc::InstallSnapshotResponse(resp))
246            }
247            // Cluster join — full orchestration in `super::join`.
248            RaftRpc::JoinRequest(req) => Ok(RaftRpc::JoinResponse(self.join_flow(req).await)),
249            // Health check.
250            RaftRpc::Ping(req) => {
251                let topo_version = {
252                    let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
253                    topo.version()
254                };
255                Ok(health::handle_ping(self.node_id, topo_version, &req))
256            }
257            // Topology broadcast.
258            RaftRpc::TopologyUpdate(update) => {
259                let (updated, ack) =
260                    health::handle_topology_update(self.node_id, &self.topology, &update);
261                if updated {
262                    // Register every member's address with the transport
263                    // so raft RPCs to newly-learned peers actually have
264                    // a destination. Without this, a node that joined
265                    // early and then learned about a later joiner via
266                    // broadcast would hold a stale peer set in its
267                    // transport and AppendEntries to the new peer would
268                    // fail until the circuit breaker opened permanently.
269                    for node in &update.nodes {
270                        if node.node_id == self.node_id {
271                            continue;
272                        }
273                        match node.addr.parse::<std::net::SocketAddr>() {
274                            Ok(addr) => self.transport.register_peer(node.node_id, addr),
275                            Err(e) => tracing::warn!(
276                                node_id = node.node_id,
277                                addr = %node.addr,
278                                error = %e,
279                                "topology update contains unparseable peer address; skipping register_peer"
280                            ),
281                        }
282                    }
283                    // Persist the adopted topology so a subsequent
284                    // restart reads the latest member set from catalog
285                    // rather than the stale snapshot taken at join
286                    // time. Persist only when a catalog is attached;
287                    // failures are logged but never propagate — the
288                    // next TopologyUpdate will retry.
289                    if let Some(catalog) = self.catalog.as_ref() {
290                        let snap = self
291                            .topology
292                            .read()
293                            .unwrap_or_else(|p| p.into_inner())
294                            .clone();
295                        if let Err(e) = catalog.save_topology(&snap) {
296                            tracing::warn!(error = %e, "failed to persist topology update to catalog");
297                        }
298                    }
299                }
300                Ok(ack)
301            }
302            // Physical-plan execution (C-β) — execute locally via the PlanExecutor,
303            // skipping SQL re-planning entirely.
304            RaftRpc::ExecuteRequest(req) => {
305                let resp = self.plan_executor.execute_plan(req).await;
306                Ok(RaftRpc::ExecuteResponse(resp))
307            }
308            // Metadata-group proposal forwarding — apply locally if
309            // we're the metadata leader, otherwise return a
310            // NotLeader response with a leader hint so the
311            // forwarder can chase the redirect.
312            RaftRpc::MetadataProposeRequest(req) => {
313                let resp = match self.propose_to_metadata_group(req.bytes) {
314                    Ok(log_index) => crate::rpc_codec::MetadataProposeResponse::ok(log_index),
315                    Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
316                        leader_hint,
317                    })) => {
318                        crate::rpc_codec::MetadataProposeResponse::err("not leader", leader_hint)
319                    }
320                    Err(e) => crate::rpc_codec::MetadataProposeResponse::err(e.to_string(), None),
321                };
322                Ok(RaftRpc::MetadataProposeResponse(resp))
323            }
324            // Data-group proposal forwarding — apply locally if we are the
325            // data-group leader for the given vshard, otherwise return
326            // NotLeader with a hint so the forwarder can chase the redirect.
327            RaftRpc::DataProposeRequest(req) => {
328                let resp = {
329                    let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
330                    match mr.propose(req.vshard_id, req.bytes) {
331                        Ok((group_id, log_index)) => {
332                            crate::rpc_codec::DataProposeResponse::ok(group_id, log_index)
333                        }
334                        Err(crate::error::ClusterError::Raft(
335                            nodedb_raft::RaftError::NotLeader { leader_hint },
336                        )) => crate::rpc_codec::DataProposeResponse::err("not leader", leader_hint),
337                        Err(e) => crate::rpc_codec::DataProposeResponse::err(e.to_string(), None),
338                    }
339                };
340                Ok(RaftRpc::DataProposeResponse(resp))
341            }
342            // VShardEnvelope — dispatch to registered handler (Event Plane, etc.).
343            RaftRpc::VShardEnvelope(bytes) => {
344                if let Some(ref handler) = self.vshard_handler {
345                    let response_bytes = handler(bytes).await?;
346                    Ok(RaftRpc::VShardEnvelope(response_bytes))
347                } else {
348                    Err(ClusterError::Transport {
349                        detail: "VShardEnvelope handler not configured".into(),
350                    })
351                }
352            }
353            other => Err(ClusterError::Transport {
354                detail: format!("unexpected request type in RPC handler: {other:?}"),
355            }),
356        }
357    }
358}
359
#[cfg(test)]
mod tests {
    use super::*;
    use crate::multi_raft::MultiRaft;
    use crate::routing::RoutingTable;
    use crate::topology::{ClusterTopology, NodeInfo, NodeState};
    use crate::transport::NexarTransport;
    use nodedb_raft::message::LogEntry;
    use std::sync::{Arc, RwLock};
    use std::time::{Duration, Instant};

    /// No-op applier for tests that don't care about state machine output.
    struct NoopApplier;
    impl CommitApplier for NoopApplier {
        // Report the last entry's index as "applied" so commit bookkeeping
        // advances without doing any real state-machine work.
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            entries.last().map(|e| e.index).unwrap_or(0)
        }
    }

    /// Build an insecure loopback transport on an ephemeral port for `node_id`.
    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(
            NexarTransport::new(
                node_id,
                "127.0.0.1:0".parse().unwrap(),
                crate::transport::credentials::TransportCredentials::Insecure,
            )
            .unwrap(),
        )
    }

    /// AppendEntries from a higher-term leader is routed to `MultiRaft`
    /// and acknowledged with that leader's term.
    #[tokio::test]
    async fn rpc_handler_routes_append_entries() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        // Expire the election deadline so the next tick starts an election
        // immediately instead of waiting out the randomized timeout.
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);

        raft_loop.do_tick();
        tokio::time::sleep(Duration::from_millis(20)).await;

        // Heartbeat from node 2 at a term (99) above our own.
        let req = RaftRpc::AppendEntriesRequest(nodedb_raft::AppendEntriesRequest {
            term: 99,
            leader_id: 2,
            prev_log_index: 0,
            prev_log_term: 0,
            entries: vec![],
            leader_commit: 0,
            group_id: 0,
        });

        let resp = raft_loop.handle_rpc(req).await.unwrap();
        match resp {
            RaftRpc::AppendEntriesResponse(r) => {
                assert!(r.success);
                assert_eq!(r.term, 99);
            }
            other => panic!("expected AppendEntriesResponse, got {other:?}"),
        }
    }

    /// A first-term RequestVote from an up-to-date candidate is granted.
    #[tokio::test]
    async fn rpc_handler_routes_request_vote() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![2, 3]).unwrap();

        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);

        let req = RaftRpc::RequestVoteRequest(nodedb_raft::RequestVoteRequest {
            term: 1,
            candidate_id: 2,
            last_log_index: 0,
            last_log_term: 0,
            group_id: 0,
        });

        let resp = raft_loop.handle_rpc(req).await.unwrap();
        match resp {
            RaftRpc::RequestVoteResponse(r) => {
                assert!(r.vote_granted);
                assert_eq!(r.term, 1);
            }
            other => panic!("expected RequestVoteResponse, got {other:?}"),
        }
    }

    /// JoinRequest on a freshly-bootstrapped single-seed RaftLoop is
    /// admitted locally: this node is leader of every group, so
    /// `AddLearner` conf-changes are proposed and (because the groups
    /// are single-voter) commit instantly.
    #[tokio::test]
    async fn rpc_handler_accepts_join_on_bootstrap_seed() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        // uniform(2, ...) creates metadata group 0 + data groups 1 and 2.
        let rt = RoutingTable::uniform(2, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();
        mr.add_group(1, vec![]).unwrap();
        mr.add_group(2, vec![]).unwrap();
        // Force immediate election so both groups reach Leader before
        // the join flow proposes AddLearner.
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let mut topology = ClusterTopology::new();
        topology.add_node(NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        ));
        let topo = Arc::new(RwLock::new(topology));

        let raft_loop = RaftLoop::new(mr, transport, topo.clone(), NoopApplier);
        raft_loop.do_tick();
        tokio::time::sleep(Duration::from_millis(20)).await;

        let req = RaftRpc::JoinRequest(crate::rpc_codec::JoinRequest {
            node_id: 2,
            listen_addr: "127.0.0.1:9401".into(),
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
            spiffe_id: None,
            spki_pin: None,
        });

        let resp = raft_loop.handle_rpc(req).await.unwrap();
        match resp {
            RaftRpc::JoinResponse(r) => {
                assert!(
                    r.success,
                    "join should succeed on bootstrap seed: {}",
                    r.error
                );
                assert_eq!(r.nodes.len(), 2);
                // uniform(2, ...) creates 3 groups (metadata + 2 data).
                assert_eq!(r.groups.len(), 3);
                assert_eq!(r.vshard_to_group.len(), 1024);
                // The new node should appear as a learner on every group,
                // not as a voter — voter promotion happens asynchronously
                // via the tick loop's promotion phase.
                for g in &r.groups {
                    assert!(
                        g.learners.contains(&2),
                        "expected node 2 as learner in group {}, got learners={:?} members={:?}",
                        g.group_id,
                        g.learners,
                        g.members
                    );
                }
            }
            other => panic!("expected JoinResponse, got {other:?}"),
        }

        // The joiner must also be visible in the shared topology.
        let topo_guard = topo.read().unwrap();
        assert_eq!(topo_guard.node_count(), 2);
        assert!(topo_guard.contains(2));
    }

    /// Being the observed leader admits the join regardless of address.
    #[test]
    fn decide_join_self_leader_admits() {
        assert_eq!(
            decide_join(7, 7, Some("10.0.0.7:9400".into())),
            JoinDecision::Admit
        );
    }

    /// Leader id 0 ("no leader elected yet") admits — founding-seed case.
    #[test]
    fn decide_join_no_leader_yet_admits() {
        assert_eq!(decide_join(0, 7, None), JoinDecision::Admit);
    }

    /// A different leader redirects the caller to that leader's address.
    #[test]
    fn decide_join_other_leader_redirects() {
        assert_eq!(
            decide_join(1, 7, Some("10.0.0.1:9400".into())),
            JoinDecision::Redirect {
                leader_addr: "10.0.0.1:9400".into()
            }
        );
    }

    /// An unknown leader address still redirects, with an empty string.
    #[test]
    fn decide_join_other_leader_unknown_addr_still_redirects() {
        assert_eq!(
            decide_join(1, 7, None),
            JoinDecision::Redirect {
                leader_addr: String::new()
            }
        );
    }
}