// nodedb_cluster/raft_loop/handle_rpc.rs

use crate::error::{ClusterError, Result};
use crate::forward::PlanExecutor;
use crate::health;
use crate::rpc_codec::RaftRpc;
use crate::transport::RaftRpcHandler;

use super::loop_core::{CommitApplier, RaftLoop};

16pub(super) const TOPOLOGY_GROUP_ID: u64 = 0;
23
24#[derive(Debug, PartialEq, Eq)]
29pub(super) enum JoinDecision {
30 Admit,
33 Redirect { leader_addr: String },
36}
37
38pub(super) fn decide_join(
51 group0_leader: u64,
52 self_node_id: u64,
53 leader_addr: Option<String>,
54) -> JoinDecision {
55 if group0_leader == 0 || group0_leader == self_node_id {
56 JoinDecision::Admit
57 } else {
58 JoinDecision::Redirect {
59 leader_addr: leader_addr.unwrap_or_default(),
60 }
61 }
62}
63
64impl<A: CommitApplier, P: PlanExecutor> RaftRpcHandler for RaftLoop<A, P> {
65 async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
66 match rpc {
67 RaftRpc::AppendEntriesRequest(req) => {
69 let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
70 let resp = mr.handle_append_entries(&req)?;
71 Ok(RaftRpc::AppendEntriesResponse(resp))
72 }
73 RaftRpc::RequestVoteRequest(req) => {
74 let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
75 let resp = mr.handle_request_vote(&req)?;
76 Ok(RaftRpc::RequestVoteResponse(resp))
77 }
78 RaftRpc::InstallSnapshotRequest(req) => {
79 let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
80 let resp = mr.handle_install_snapshot(&req)?;
81 Ok(RaftRpc::InstallSnapshotResponse(resp))
82 }
83 RaftRpc::JoinRequest(req) => Ok(RaftRpc::JoinResponse(self.join_flow(req).await)),
85 RaftRpc::Ping(req) => {
87 let topo_version = {
88 let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
89 topo.version()
90 };
91 Ok(health::handle_ping(self.node_id, topo_version, &req))
92 }
93 RaftRpc::TopologyUpdate(update) => {
95 let (updated, ack) =
96 health::handle_topology_update(self.node_id, &self.topology, &update);
97 if updated {
98 for node in &update.nodes {
106 if node.node_id == self.node_id {
107 continue;
108 }
109 match node.addr.parse::<std::net::SocketAddr>() {
110 Ok(addr) => self.transport.register_peer(node.node_id, addr),
111 Err(e) => tracing::warn!(
112 node_id = node.node_id,
113 addr = %node.addr,
114 error = %e,
115 "topology update contains unparseable peer address; skipping register_peer"
116 ),
117 }
118 }
119 if let Some(catalog) = self.catalog.as_ref() {
126 let snap = self
127 .topology
128 .read()
129 .unwrap_or_else(|p| p.into_inner())
130 .clone();
131 if let Err(e) = catalog.save_topology(&snap) {
132 tracing::warn!(error = %e, "failed to persist topology update to catalog");
133 }
134 }
135 }
136 Ok(ack)
137 }
138 RaftRpc::ExecuteRequest(req) => {
141 let resp = self.plan_executor.execute_plan(req).await;
142 Ok(RaftRpc::ExecuteResponse(resp))
143 }
144 RaftRpc::MetadataProposeRequest(req) => {
149 let resp = match self.propose_to_metadata_group(req.bytes) {
150 Ok(log_index) => crate::rpc_codec::MetadataProposeResponse::ok(log_index),
151 Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
152 leader_hint,
153 })) => {
154 crate::rpc_codec::MetadataProposeResponse::err("not leader", leader_hint)
155 }
156 Err(e) => crate::rpc_codec::MetadataProposeResponse::err(e.to_string(), None),
157 };
158 Ok(RaftRpc::MetadataProposeResponse(resp))
159 }
160 RaftRpc::VShardEnvelope(bytes) => {
162 if let Some(ref handler) = self.vshard_handler {
163 let response_bytes = handler(bytes).await?;
164 Ok(RaftRpc::VShardEnvelope(response_bytes))
165 } else {
166 Err(ClusterError::Transport {
167 detail: "VShardEnvelope handler not configured".into(),
168 })
169 }
170 }
171 other => Err(ClusterError::Transport {
172 detail: format!("unexpected request type in RPC handler: {other:?}"),
173 }),
174 }
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::multi_raft::MultiRaft;
    use crate::routing::RoutingTable;
    use crate::topology::{ClusterTopology, NodeInfo, NodeState};
    use crate::transport::NexarTransport;
    use nodedb_raft::message::LogEntry;
    use std::sync::{Arc, RwLock};
    use std::time::{Duration, Instant};

    /// Applier stub: performs no work and reports the index of the last entry
    /// it was handed (or `0` for an empty batch).
    struct NoopApplier;

    impl CommitApplier for NoopApplier {
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            entries.last().map(|e| e.index).unwrap_or(0)
        }
    }

    /// Build a transport for `node_id` bound to `127.0.0.1:0` with insecure
    /// credentials.
    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(
            NexarTransport::new(
                node_id,
                "127.0.0.1:0".parse().unwrap(),
                crate::transport::credentials::TransportCredentials::Insecure,
            )
            .unwrap(),
        )
    }

    /// An `AppendEntriesRequest` is routed to the Raft layer and answered with
    /// an `AppendEntriesResponse`.
    #[tokio::test]
    async fn rpc_handler_routes_append_entries() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        // Put every election deadline in the past — presumably so the tick
        // below triggers an election immediately instead of waiting out the
        // timeout (TODO confirm against `do_tick`).
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);

        raft_loop.do_tick();
        tokio::time::sleep(Duration::from_millis(20)).await;

        // Heartbeat-style request (no entries) from node 2 at term 99.
        let req = RaftRpc::AppendEntriesRequest(nodedb_raft::AppendEntriesRequest {
            term: 99,
            leader_id: 2,
            prev_log_index: 0,
            prev_log_term: 0,
            entries: vec![],
            leader_commit: 0,
            group_id: 0,
        });

        let resp = raft_loop.handle_rpc(req).await.unwrap();
        match resp {
            RaftRpc::AppendEntriesResponse(r) => {
                assert!(r.success);
                assert_eq!(r.term, 99);
            }
            other => panic!("expected AppendEntriesResponse, got {other:?}"),
        }
    }

    /// A `RequestVoteRequest` is routed to the Raft layer and the vote is
    /// granted to the candidate.
    #[tokio::test]
    async fn rpc_handler_routes_request_vote() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![2, 3]).unwrap();

        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);

        let req = RaftRpc::RequestVoteRequest(nodedb_raft::RequestVoteRequest {
            term: 1,
            candidate_id: 2,
            last_log_index: 0,
            last_log_term: 0,
            group_id: 0,
        });

        let resp = raft_loop.handle_rpc(req).await.unwrap();
        match resp {
            RaftRpc::RequestVoteResponse(r) => {
                assert!(r.vote_granted);
                assert_eq!(r.term, 1);
            }
            other => panic!("expected RequestVoteResponse, got {other:?}"),
        }
    }

    /// A single-node seed admits a joining node: the response lists both nodes
    /// and both groups, and the joiner appears as a learner in every group.
    #[tokio::test]
    async fn rpc_handler_accepts_join_on_bootstrap_seed() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(2, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();
        mr.add_group(1, vec![]).unwrap();
        // Put every election deadline in the past — presumably so the tick
        // below makes this node leader of both groups before the join arrives
        // (TODO confirm against `do_tick`).
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let mut topology = ClusterTopology::new();
        topology.add_node(NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        ));
        let topo = Arc::new(RwLock::new(topology));

        let raft_loop = RaftLoop::new(mr, transport, topo.clone(), NoopApplier);
        raft_loop.do_tick();
        tokio::time::sleep(Duration::from_millis(20)).await;

        let req = RaftRpc::JoinRequest(crate::rpc_codec::JoinRequest {
            node_id: 2,
            listen_addr: "127.0.0.1:9401".into(),
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
        });

        let resp = raft_loop.handle_rpc(req).await.unwrap();
        match resp {
            RaftRpc::JoinResponse(r) => {
                assert!(
                    r.success,
                    "join should succeed on bootstrap seed: {}",
                    r.error
                );
                assert_eq!(r.nodes.len(), 2);
                assert_eq!(r.groups.len(), 2);
                // NOTE(review): 1024 is presumably the fixed vshard count of
                // the routing table — confirm against `RoutingTable`.
                assert_eq!(r.vshard_to_group.len(), 1024);
                for g in &r.groups {
                    assert!(
                        g.learners.contains(&2),
                        "expected node 2 as learner in group {}, got learners={:?} members={:?}",
                        g.group_id,
                        g.learners,
                        g.members
                    );
                }
            }
            other => panic!("expected JoinResponse, got {other:?}"),
        }

        // The shared topology handle must also reflect the new node.
        let topo_guard = topo.read().unwrap();
        assert_eq!(topo_guard.node_count(), 2);
        assert!(topo_guard.contains(2));
    }

    #[test]
    fn decide_join_self_leader_admits() {
        assert_eq!(
            decide_join(7, 7, Some("10.0.0.7:9400".into())),
            JoinDecision::Admit
        );
    }

    #[test]
    fn decide_join_no_leader_yet_admits() {
        assert_eq!(decide_join(0, 7, None), JoinDecision::Admit);
    }

    #[test]
    fn decide_join_other_leader_redirects() {
        assert_eq!(
            decide_join(1, 7, Some("10.0.0.1:9400".into())),
            JoinDecision::Redirect {
                leader_addr: "10.0.0.1:9400".into()
            }
        );
    }

    #[test]
    fn decide_join_other_leader_unknown_addr_still_redirects() {
        assert_eq!(
            decide_join(1, 7, None),
            JoinDecision::Redirect {
                leader_addr: String::new()
            }
        );
    }
}