nodedb_cluster/raft_loop/join.rs
// SPDX-License-Identifier: BUSL-1.1

//! Server-side `JoinRequest` orchestration.
//!
//! This is the async flow invoked by the `RaftRpc::JoinRequest` arm in
//! [`super::handle_rpc`]. It turns a remote node's desire to join the
//! cluster into a series of durable Raft conf-changes and returns a
//! `JoinResponse` containing everything the joining node needs to
//! reconstruct its local `MultiRaft` in the `Learner` role.
//!
//! ## Flow
//!
//! 1. **Leader check.** Snapshot the group-0 leader id and clone the
//!    routing table under a single `MultiRaft` lock. If another node is
//!    the leader, return a redirect response with that node's address.
//! 2. **Validate address.** Parse `req.listen_addr`. On failure, return
//!    an error response.
//! 3. **Idempotency / collision check.** If the node id is already in
//!    topology with the same address and is Active, rebuild and return
//!    the current response without any further Raft activity. If the
//!    node id exists with a different address, reject.
//! 4. **Register transport peer.** Add the new peer address to the
//!    local transport so the leader can immediately send AppendEntries
//!    to the learner-to-be.
//! 5. **Admit into topology.** Under a short `topology.write()` guard,
//!    call `bootstrap::handle_join_request` — the only side effect is
//!    inserting the new `NodeInfo`. The routing-table clone we took in
//!    step 1 is intentionally *not* reused for the final response; a
//!    fresh clone is taken after step 7 so the response reflects the
//!    post-AddLearner routing state.
//! 6. **Propose AddLearner on every group.** For each Raft group, take
//!    the `MultiRaft` lock, propose
//!    `ConfChange::AddLearner(new_node_id)`, and record the resulting
//!    log index. Drop the lock between groups. If this node is not the
//!    leader of a particular group the propose will fail with
//!    `NotLeader` — we surface that as a failure response. (For the
//!    3-node bootstrap case in the integration test the bootstrap seed
//!    leads every group, so this path is exercised end-to-end.)
//! 7. **Wait for each conf-change to apply.** Poll each group's routing
//!    state every 20 ms with a 5-second deadline until the new node
//!    appears as a learner (see `wait_for_learner_applied`). A
//!    single-voter group (the bootstrap seed before any voters have
//!    been added) applies on the first poll; multi-voter groups wait
//!    for quorum to commit and for the tick loop to apply the entry.
//!    On timeout, return an error response — the joining node will
//!    retry the whole flow.
//! 8. **Persist topology + routing to catalog** (when a catalog is
//!    attached). Order matters: Raft log → catalog → response.
//! 9. **Broadcast TopologyUpdate** to every currently-active peer so
//!    followers learn the new node's address. Fire-and-forget.
//! 10. **Build and return JoinResponse** with the updated routing
//!     (which now includes the new node as a learner on every group).
//!
//! The Raft-level promotion from learner to voter happens asynchronously
//! in the tick loop (`super::tick::promote_ready_learners`) once the
//! learner's `match_index` catches up. That avoids blocking the join
//! handler on replication progress while still completing the
//! two-phase single-server add.
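//!
//! A rough sketch of where this flow is driven from; the real dispatch
//! lives in [`super::handle_rpc`], and `rpc` / `raft_loop` below are
//! placeholder names, so treat this as illustrative only:
//!
//! ```ignore
//! match rpc {
//!     RaftRpc::JoinRequest(req) => {
//!         // `join_flow` always returns a `JoinResponse`, including for
//!         // rejections and leader redirects, so the arm can reply
//!         // unconditionally.
//!         let resp = raft_loop.join_flow(req).await;
//!         // ... encode `resp` and send it back to the joining node ...
//!     }
//!     // ... other RPC arms ...
//! }
//! ```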

use std::net::SocketAddr;
use std::time::{Duration, Instant};

use tracing::{debug, info, warn};

use crate::bootstrap::handle_join_request;
use crate::conf_change::{ConfChange, ConfChangeType};
use crate::error::{ClusterError, Result};
use crate::forward::PlanExecutor;
use crate::health;
use crate::multi_raft::GroupStatus;
use crate::routing::RoutingTable;
use crate::rpc_codec::{JoinRequest, JoinResponse, LEADER_REDIRECT_PREFIX};

use super::handle_rpc::{JoinDecision, TOPOLOGY_GROUP_ID, decide_join};
use super::loop_core::{CommitApplier, RaftLoop};

/// Maximum time we wait for any one `AddLearner` conf-change to commit
/// before giving up and returning a failure response to the joining
/// node.
const CONF_CHANGE_COMMIT_TIMEOUT: Duration = Duration::from_secs(5);

/// Polling interval for the commit-wait loop.
const CONF_CHANGE_POLL_INTERVAL: Duration = Duration::from_millis(20);

impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
    /// Full server-side `JoinRequest` handler. See module docs for the
    /// phase-by-phase description.
    pub(super) async fn join_flow(&self, req: JoinRequest) -> JoinResponse {
        // 1. Snapshot group-0 leader + clone routing under one lock.
        let (group0_leader, routing): (u64, RoutingTable) = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            let routing = mr.routing().clone();
            let leader_id = mr
                .group_statuses()
                .into_iter()
                .find(|s: &GroupStatus| s.group_id == TOPOLOGY_GROUP_ID)
                .map(|s| s.leader_id)
                .unwrap_or(0);
            (leader_id, routing)
        };

        // Leader check.
        let leader_addr_hint = if group0_leader != 0 && group0_leader != self.node_id {
            self.topology
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .get_node(group0_leader)
                .map(|n| n.addr.clone())
        } else {
            None
        };
        if let JoinDecision::Redirect { leader_addr } =
            decide_join(group0_leader, self.node_id, leader_addr_hint)
        {
            warn!(
                joining_node = req.node_id,
                leader_id = group0_leader,
                leader_addr = %leader_addr,
                "JoinRequest received on non-leader; redirecting"
            );
            return reject(format!("{LEADER_REDIRECT_PREFIX}{leader_addr}"));
        }

        // 2. Validate the address.
        let new_addr: SocketAddr = match req.listen_addr.parse() {
            Ok(a) => a,
            Err(e) => {
                return reject(format!("invalid listen_addr '{}': {e}", req.listen_addr));
            }
        };

        // 3. Idempotency / collision check against topology.
        // `handle_join_request` in step 5 handles the fine-grained
        // semantics, but we check here first so idempotent re-joins
        // short-circuit *before* we propose any Raft conf changes.
        let existing = self
            .topology
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .get_node(req.node_id)
            .cloned();
        if let Some(existing) = existing {
            if existing.addr != req.listen_addr {
                return reject(format!(
                    "node_id {} already registered with different address {} (request: {})",
                    req.node_id, existing.addr, req.listen_addr
                ));
            }
            // Same id + same addr → idempotent replay. Just rebuild the
            // current response from the latest routing state without
            // proposing any conf changes.
            debug!(
                joining_node = req.node_id,
                "idempotent re-join; returning current cluster state"
            );
            return self.build_current_response(&req);
        }

        // 4. Register transport peer so the leader can reach it.
        self.transport.register_peer(req.node_id, new_addr);

        // Read the local cluster id from the catalog and echo it
        // on every successful `JoinResponse`. The joining node
        // persists this value so its next boot takes the
        // `restart()` path instead of re-bootstrapping.
        //
        // Strict contract:
        //
        // - If a catalog is attached and is missing a cluster_id,
        //   the server is lying about being bootstrapped — this
        //   is an invariant violation, so we reject the join
        //   loudly instead of papering over it with a sentinel
        //   zero that would silently collapse two different
        //   clusters into one "cluster 0".
        // - If a catalog is not attached (unit-test path), we
        //   fall back to `self.node_id`. This is a test-only
        //   affordance: it keeps the response well-formed without
        //   inventing a cross-cluster identity, because in tests
        //   every node id is locally unique by construction.
        let cluster_id = match self.catalog.as_ref() {
            Some(catalog) => match catalog.load_cluster_id() {
                Ok(Some(id)) => id,
                Ok(None) => {
                    return reject(
                        "server catalog is attached but has no cluster_id — refusing to \
                         issue a JoinResponse without a real cluster identity"
                            .to_string(),
                    );
                }
                Err(e) => {
                    return reject(format!("failed to read cluster_id from catalog: {e}"));
                }
            },
            None => self.node_id,
        };

        // 5. Admit into topology.
        {
            let mut topo = self.topology.write().unwrap_or_else(|p| p.into_inner());
            let initial_resp = handle_join_request(&req, &mut topo, &routing, cluster_id);
            if !initial_resp.success {
                // Reject bubbled up from the shared function (e.g., the
                // collision check we just did, repeated under the write
                // guard in case something raced).
                return initial_resp;
            }
        }

        // 6. Propose AddLearner on every group.
        let group_ids: Vec<u64> = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            mr.routing().group_ids()
        };

        let mut pending: Vec<(u64, u64)> = Vec::with_capacity(group_ids.len()); // (group_id, log_index)
        for gid in &group_ids {
            let change = ConfChange {
                change_type: ConfChangeType::AddLearner,
                node_id: req.node_id,
            };
            let propose_result = {
                let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                mr.propose_conf_change(*gid, &change)
            };
            match propose_result {
                Ok((_, log_index)) => pending.push((*gid, log_index)),
                Err(ClusterError::Transport { detail }) => {
                    return reject(format!(
                        "failed to propose AddLearner on group {gid}: {detail}"
                    ));
                }
                Err(e) => {
                    return reject(format!("failed to propose AddLearner on group {gid}: {e}"));
                }
            }
        }

        // 7. Wait for every conf change to actually *apply* to
        // routing. Earlier versions of this flow polled
        // `commit_index_for` and relied on an unconditional
        // inline apply inside `propose_conf_change` — which
        // was racy for multi-voter groups where the commit
        // can be deferred until quorum replicates the log
        // entry. The correct semantic signal is "the new node
        // appears in `routing.group_info(gid).learners`",
        // because that's what `apply_conf_change` writes after
        // the commit lands. Polling this also works cleanly
        // for single-voter groups (the inline apply makes the
        // condition true on the first poll) and multi-voter
        // groups (the tick loop runs concurrently with this
        // `await`, drains `committed_entries`, and calls
        // `apply_conf_change` → routing update → condition
        // flips).
        let deadline = Instant::now() + CONF_CHANGE_COMMIT_TIMEOUT;
        for (gid, log_index) in &pending {
            if let Err(err) = self
                .wait_for_learner_applied(*gid, req.node_id, *log_index, deadline)
                .await
            {
                return reject(err.to_string());
            }
        }

        // 8. Persist catalog (topology + post-AddLearner routing).
        if let Some(catalog) = self.catalog.as_ref() {
            let topo_snapshot = self
                .topology
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .clone();
            let routing_snapshot = {
                let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                mr.routing().clone()
            };
            if let Err(e) = catalog.save_topology(&topo_snapshot) {
                warn!(error = %e, "failed to persist topology after join");
                return reject(format!("catalog save_topology failed: {e}"));
            }
            if let Err(e) = catalog.save_routing(&routing_snapshot) {
                warn!(error = %e, "failed to persist routing after join");
                return reject(format!("catalog save_routing failed: {e}"));
            }
        }

        // 9. Broadcast topology to everyone so peers learn the new addr.
        health::broadcast_topology(self.node_id, &self.topology, &self.transport);

        // 10. Build the final response from the post-AddLearner state.
        info!(
            joining_node = req.node_id,
            groups = pending.len(),
            "join accepted; learner AddLearner commits complete"
        );
        self.build_current_response(&req)
    }

    /// Wait for the semantic goal of "learner is now tracked in
    /// `routing.group_info(group_id).learners`", polling every
    /// [`CONF_CHANGE_POLL_INTERVAL`] up to `deadline`.
    ///
    /// This is the post-apply condition that `apply_conf_change`
    /// writes once a committed `AddLearner` entry has been
    /// applied to the local state. Polling this rather than the
    /// raw `commit_index` is what lets the join flow stay
    /// correct on multi-voter groups where the commit is
    /// deferred until quorum replicates.
    ///
    /// `log_index` is carried into the error enum for debugging
    /// only; the condition is not gated on it.
    ///
    /// Surfaces failure through [`ClusterError::JoinCommitTimeout`]
    /// and [`ClusterError::JoinGroupDisappeared`] so the join
    /// flow can match the cause and so the crate's central
    /// error enum owns the human-readable rendering.
    async fn wait_for_learner_applied(
        &self,
        group_id: u64,
        learner_id: u64,
        log_index: u64,
        deadline: Instant,
    ) -> Result<()> {
        loop {
            let applied = {
                let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                mr.routing()
                    .group_info(group_id)
                    .map(|info| info.learners.contains(&learner_id))
            };
            match applied {
                Some(true) => return Ok(()),
                Some(false) => {}
                None => return Err(ClusterError::JoinGroupDisappeared { group_id }),
            }
            if Instant::now() >= deadline {
                return Err(ClusterError::JoinCommitTimeout {
                    group_id,
                    log_index,
                });
            }
            tokio::time::sleep(CONF_CHANGE_POLL_INTERVAL).await;
        }
    }

    /// Build a `JoinResponse` snapshotting the current topology
    /// and routing. Used both by the happy-path return and by the
    /// idempotent re-join short-circuit. The strict cluster_id
    /// check is the same as the one at the top of `join_flow` —
    /// a catalog-attached server with no stamped cluster_id is an
    /// invariant violation and we reject the join rather than
    /// synthesise a sentinel identity.
    fn build_current_response(&self, req: &JoinRequest) -> JoinResponse {
        let cluster_id = match self.catalog.as_ref() {
            Some(catalog) => match catalog.load_cluster_id() {
                Ok(Some(id)) => id,
                Ok(None) => {
                    return reject(
                        "server catalog is attached but has no cluster_id — refusing to \
                         issue a JoinResponse without a real cluster identity"
                            .to_string(),
                    );
                }
                Err(e) => {
                    return reject(format!("failed to read cluster_id from catalog: {e}"));
                }
            },
            None => self.node_id,
        };

        let topology_clone = self
            .topology
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .clone();
        let routing_clone = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            mr.routing().clone()
        };
        // Re-use the pure builder from `bootstrap/handle_join.rs`.
        // `handle_join_request` is idempotent against the same
        // (id, addr) — at this point the topology already
        // contains the new node, so this call only rebuilds the
        // wire response.
        let mut topo = topology_clone;
        handle_join_request(req, &mut topo, &routing_clone, cluster_id)
    }
}

/// Build a failure `JoinResponse` with the given error message.
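/// A rejection carries only the error string; every payload field is left
/// empty or zero. Illustrative shape (not compiled as a doctest):
///
/// ```ignore
/// let resp = reject("invalid listen_addr".to_string());
/// assert!(!resp.success);
/// assert_eq!(resp.cluster_id, 0);
/// assert!(resp.nodes.is_empty());
/// ```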
fn reject(error: String) -> JoinResponse {
    JoinResponse {
        success: false,
        error,
        cluster_id: 0,
        nodes: vec![],
        vshard_to_group: vec![],
        groups: vec![],
    }
}