nodedb_cluster/raft_loop/join.rs

// SPDX-License-Identifier: BUSL-1.1

//! Server-side `JoinRequest` orchestration.
//!
//! This is the async flow invoked by the `RaftRpc::JoinRequest` arm in
//! [`super::handle_rpc`]. It turns a remote node's desire to join the
//! cluster into a series of durable Raft conf-changes and returns a
//! `JoinResponse` containing everything the joining node needs to
//! reconstruct its local `MultiRaft` in the `Learner` role.
//!
//! ## Flow
//!
//! 1. **Leader check.** Snapshot the group-0 leader id and clone the
//!    routing table under a single `MultiRaft` lock. If another node is
//!    the leader, return a redirect response with that node's address
//!    (see the client-side sketch at the end of these docs).
//! 2. **Validate address.** Parse `req.listen_addr`. On failure, return
//!    an error response.
//! 3. **Idempotency / collision check.** If the node id is already in
//!    topology with the same address and is Active, rebuild and return
//!    the current response without any further Raft activity. If the
//!    node id exists with a different address, reject.
//! 4. **Register transport peer.** Add the new peer address to the
//!    local transport so the leader can immediately send AppendEntries
//!    to the learner-to-be.
//! 5. **Admit into topology.** Under a short `topology.write()` guard,
//!    call `bootstrap::handle_join_request` — the only side effect is
//!    inserting the new `NodeInfo`. The routing-table clone we took in
//!    step 1 is intentionally *not* reused for the final response; a
//!    fresh clone is taken when the response is built (step 10), so it
//!    reflects the post-AddLearner routing state.
//! 6. **Propose AddLearner on every group.** For each Raft group, take
//!    the `MultiRaft` lock, propose an `AddLearner` conf-change for the
//!    new node id, and record the resulting log index. Drop the lock
//!    between groups. If this node is not the leader of a particular
//!    group, the proposal fails with `NotLeader` and we surface that as
//!    a failure response. (In the 3-node bootstrap case covered by the
//!    integration test the bootstrap seed leads every group, so this
//!    path is exercised end-to-end.)
//! 7. **Wait for each conf-change to apply.** Poll the routing table on
//!    each group every 20 ms with a 5-second deadline until the new
//!    node appears as a learner. A single-voter group (the bootstrap
//!    seed before any voters have been added) applies the change on the
//!    first poll. Multi-voter groups wait for quorum to commit and for
//!    the tick loop to apply the entry. On timeout, return an error
//!    response — the joining node will retry the whole flow.
//! 8. **Persist topology + routing to catalog** (when a catalog is
//!    attached). Order matters: Raft log → catalog → response.
//! 9. **Broadcast TopologyUpdate** to every currently-active peer so
//!    followers learn the new node's address. Fire-and-forget.
//! 10. **Build and return JoinResponse** with the updated routing
//!     (which now includes the new node as a learner on every group).
//!
//! The Raft-level promotion from learner to voter happens asynchronously
//! in the tick loop (`super::tick::promote_ready_learners`) once the
//! learner's `match_index` catches up. That avoids blocking the join
//! handler on replication progress while still completing the
//! two-phase single-server add.
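//!
//! ## Redirect handling on the joining side (sketch)
//!
//! The redirect from step 1 travels in the `error` field of a failed
//! `JoinResponse`, prefixed with [`LEADER_REDIRECT_PREFIX`]. The sketch
//! below shows one way a joining node could consume that contract; the
//! `send_join` helper is illustrative only and not an API of this crate.
//!
//! ```ignore
//! let mut resp: JoinResponse = send_join(seed_addr, &req).await?;
//! if !resp.success {
//!     if let Some(leader) = resp.error.strip_prefix(LEADER_REDIRECT_PREFIX) {
//!         // We hit a non-leader; retry the whole join against the
//!         // advertised group-0 leader address.
//!         resp = send_join(leader.to_string(), &req).await?;
//!     } else {
//!         // Any other failure (bad listen_addr, id collision, commit
//!         // timeout) is surfaced verbatim; retry or give up.
//!         return Err(resp.error.into());
//!     }
//! }
//! // On success the response carries the cluster_id, the node list, and
//! // the post-AddLearner routing needed to start locally as a Learner.
//! ```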

use std::net::SocketAddr;
use std::time::{Duration, Instant};

use tracing::{debug, info, warn};

use crate::bootstrap::handle_join_request;
use crate::conf_change::{ConfChange, ConfChangeType};
use crate::error::{ClusterError, Result};
use crate::forward::PlanExecutor;
use crate::health;
use crate::multi_raft::GroupStatus;
use crate::routing::RoutingTable;
use crate::rpc_codec::{JoinRequest, JoinResponse, LEADER_REDIRECT_PREFIX};

use super::handle_rpc::{JoinDecision, TOPOLOGY_GROUP_ID, decide_join};
use super::loop_core::{CommitApplier, RaftLoop};

/// Maximum time we wait for any one `AddLearner` conf-change to commit
/// before giving up and returning a failure response to the joining
/// node.
const CONF_CHANGE_COMMIT_TIMEOUT: Duration = Duration::from_secs(5);

/// Polling interval for the commit-wait loop.
const CONF_CHANGE_POLL_INTERVAL: Duration = Duration::from_millis(20);

impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
    /// Full server-side `JoinRequest` handler. See module docs for the
    /// phase-by-phase description.
    pub(super) async fn join_flow(&self, req: JoinRequest) -> JoinResponse {
        // 1. Snapshot group-0 leader + clone routing under one lock.
        let (group0_leader, routing): (u64, RoutingTable) = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            let routing = mr.routing().clone();
            let leader_id = mr
                .group_statuses()
                .into_iter()
                .find(|s: &GroupStatus| s.group_id == TOPOLOGY_GROUP_ID)
                .map(|s| s.leader_id)
                .unwrap_or(0);
            (leader_id, routing)
        };

        // Leader check.
        let leader_addr_hint = if group0_leader != 0 && group0_leader != self.node_id {
            self.topology
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .get_node(group0_leader)
                .map(|n| n.addr.clone())
        } else {
            None
        };
        if let JoinDecision::Redirect { leader_addr } =
            decide_join(group0_leader, self.node_id, leader_addr_hint)
        {
            warn!(
                joining_node = req.node_id,
                leader_id = group0_leader,
                leader_addr = %leader_addr,
                "JoinRequest received on non-leader; redirecting"
            );
            return reject(format!("{LEADER_REDIRECT_PREFIX}{leader_addr}"));
        }

        // 2. Validate the address.
        let new_addr: SocketAddr = match req.listen_addr.parse() {
            Ok(a) => a,
            Err(e) => {
                return reject(format!("invalid listen_addr '{}': {e}", req.listen_addr));
            }
        };

        // 3. Idempotency / collision check against topology.
        //    `handle_join_request` in step 5 handles the fine-grained
        //    semantics, but we check here first so idempotent re-joins
        //    short-circuit *before* we propose any Raft conf changes.
        let existing = self
            .topology
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .get_node(req.node_id)
            .cloned();
        if let Some(existing) = existing {
            if existing.addr != req.listen_addr {
                return reject(format!(
                    "node_id {} already registered with different address {} (request: {})",
                    req.node_id, existing.addr, req.listen_addr
                ));
            }
            // Same id + same addr → idempotent replay. Just rebuild the
            // current response from the latest routing state without
            // proposing any conf changes.
            debug!(
                joining_node = req.node_id,
                "idempotent re-join; returning current cluster state"
            );
            return self.build_current_response(&req);
        }

        // 4. Register transport peer so the leader can reach it.
        self.transport.register_peer(req.node_id, new_addr);

        // Read the local cluster id from the catalog and echo it
        // on every successful `JoinResponse`. The joining node
        // persists this value so its next boot takes the
        // `restart()` path instead of re-bootstrapping.
        //
        // Strict contract:
        //
        // - If a catalog is attached and is missing a cluster_id,
        //   the server is lying about being bootstrapped — this
        //   is an invariant violation, so we reject the join
        //   loudly instead of papering over it with a sentinel
        //   zero that would silently collapse two different
        //   clusters into one "cluster 0".
        // - If a catalog is not attached (unit-test path), we
        //   fall back to `self.node_id`. This is a test-only
        //   affordance: it keeps the response well-formed without
        //   inventing a cross-cluster identity, because in tests
        //   every node id is locally unique by construction.
        let cluster_id = match self.catalog.as_ref() {
            Some(catalog) => match catalog.load_cluster_id() {
                Ok(Some(id)) => id,
                Ok(None) => {
                    return reject(
                        "server catalog is attached but has no cluster_id — refusing to \
                         issue a JoinResponse without a real cluster identity"
                            .to_string(),
                    );
                }
                Err(e) => {
                    return reject(format!("failed to read cluster_id from catalog: {e}"));
                }
            },
            None => self.node_id,
        };

        // 5. Admit into topology.
        {
            let mut topo = self.topology.write().unwrap_or_else(|p| p.into_inner());
            let initial_resp = handle_join_request(&req, &mut topo, &routing, cluster_id);
            if !initial_resp.success {
                // A rejection bubbled up from the shared function (e.g.,
                // the collision check we just did, repeated under the
                // write guard in case something raced).
                return initial_resp;
            }
        }

        // 6. Propose AddLearner on every group.
        let group_ids: Vec<u64> = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            mr.routing().group_ids()
        };

        let mut pending: Vec<(u64, u64)> = Vec::with_capacity(group_ids.len()); // (group_id, log_index)
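        // Per the module docs (step 6), take the `MultiRaft` lock once per
        // proposal and drop it between groups instead of holding it across
        // the whole loop.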
        for gid in &group_ids {
            let change = ConfChange {
                change_type: ConfChangeType::AddLearner,
                node_id: req.node_id,
            };
            let propose_result = {
                let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                mr.propose_conf_change(*gid, &change)
            };
            match propose_result {
                Ok((_, log_index)) => pending.push((*gid, log_index)),
                Err(ClusterError::Transport { detail }) => {
                    return reject(format!(
                        "failed to propose AddLearner on group {gid}: {detail}"
                    ));
                }
                Err(e) => {
                    return reject(format!("failed to propose AddLearner on group {gid}: {e}"));
                }
            }
        }

        // 7. Wait for every conf change to actually *apply* to
        //    routing. Earlier versions of this flow polled
        //    `commit_index_for` and relied on an unconditional
        //    inline apply inside `propose_conf_change` — which
        //    was racy for multi-voter groups where the commit
        //    can be deferred until quorum replicates the log
        //    entry. The correct semantic signal is "the new node
        //    appears in `routing.group_info(gid).learners`",
        //    because that's what `apply_conf_change` writes after
        //    the commit lands. Polling this also works cleanly
        //    for single-voter groups (the inline apply makes the
        //    condition true on the first poll) and multi-voter
        //    groups (the tick loop runs concurrently with this
        //    `await`, drains `committed_entries`, and calls
        //    `apply_conf_change` → routing update → condition
        //    flips).
        let deadline = Instant::now() + CONF_CHANGE_COMMIT_TIMEOUT;
        for (gid, log_index) in &pending {
            if let Err(err) = self
                .wait_for_learner_applied(*gid, req.node_id, *log_index, deadline)
                .await
            {
                return reject(err.to_string());
            }
        }

        // 8. Persist catalog (topology + post-AddLearner routing).
        if let Some(catalog) = self.catalog.as_ref() {
            let topo_snapshot = self
                .topology
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .clone();
            let routing_snapshot = {
                let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                mr.routing().clone()
            };
            if let Err(e) = catalog.save_topology(&topo_snapshot) {
                warn!(error = %e, "failed to persist topology after join");
                return reject(format!("catalog save_topology failed: {e}"));
            }
            if let Err(e) = catalog.save_routing(&routing_snapshot) {
                warn!(error = %e, "failed to persist routing after join");
                return reject(format!("catalog save_routing failed: {e}"));
            }
        }

        // 9. Broadcast topology to everyone so peers learn the new addr.
        health::broadcast_topology(self.node_id, &self.topology, &self.transport);

        // 10. Build the final response from the post-AddLearner state.
        info!(
            joining_node = req.node_id,
            groups = pending.len(),
            "join accepted; AddLearner applied on every group"
        );
        self.build_current_response(&req)
    }

    /// Wait for the semantic goal of "learner is now tracked in
    /// `routing.group_info(group_id).learners`", polling every
    /// [`CONF_CHANGE_POLL_INTERVAL`] up to `deadline`.
    ///
    /// This is the post-apply condition that `apply_conf_change`
    /// writes once a committed `AddLearner` entry has been
    /// applied to the local state. Polling this rather than the
    /// raw `commit_index` is what lets the join flow stay
    /// correct on multi-voter groups where the commit is
    /// deferred until quorum replicates.
    ///
    /// `log_index` is carried into the error enum for debugging
    /// only; the condition is not gated on it.
    ///
    /// Surfaces failure through [`ClusterError::JoinCommitTimeout`]
    /// and [`ClusterError::JoinGroupDisappeared`] so the join
    /// flow can match the cause and so the crate's central
    /// error enum owns the human-readable rendering.
    async fn wait_for_learner_applied(
        &self,
        group_id: u64,
        learner_id: u64,
        log_index: u64,
        deadline: Instant,
    ) -> Result<()> {
        loop {
            let applied = {
                let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                mr.routing()
                    .group_info(group_id)
                    .map(|info| info.learners.contains(&learner_id))
            };
            match applied {
                Some(true) => return Ok(()),
                Some(false) => {}
                None => return Err(ClusterError::JoinGroupDisappeared { group_id }),
            }
            if Instant::now() >= deadline {
                return Err(ClusterError::JoinCommitTimeout {
                    group_id,
                    log_index,
                });
            }
            tokio::time::sleep(CONF_CHANGE_POLL_INTERVAL).await;
        }
    }

    /// Build a `JoinResponse` snapshotting the current topology
    /// and routing. Used both by the happy-path return and by the
    /// idempotent re-join short-circuit. The strict cluster_id
    /// check mirrors the one performed earlier in `join_flow` —
    /// a catalog-attached server with no stamped cluster_id is an
    /// invariant violation and we reject the join rather than
    /// synthesise a sentinel identity.
    fn build_current_response(&self, req: &JoinRequest) -> JoinResponse {
        let cluster_id = match self.catalog.as_ref() {
            Some(catalog) => match catalog.load_cluster_id() {
                Ok(Some(id)) => id,
                Ok(None) => {
                    return reject(
                        "server catalog is attached but has no cluster_id — refusing to \
                         issue a JoinResponse without a real cluster identity"
                            .to_string(),
                    );
                }
                Err(e) => {
                    return reject(format!("failed to read cluster_id from catalog: {e}"));
                }
            },
            None => self.node_id,
        };

        let topology_clone = self
            .topology
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .clone();
        let routing_clone = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            mr.routing().clone()
        };
        // Re-use the pure builder from `bootstrap/handle_join.rs`.
        // `handle_join_request` is idempotent against the same
        // (id, addr) — at this point the topology already
        // contains the new node, so this call only rebuilds the
        // wire response.
        let mut topo = topology_clone;
        handle_join_request(req, &mut topo, &routing_clone, cluster_id)
    }
}

/// Build a failure `JoinResponse` with the given error message.
fn reject(error: String) -> JoinResponse {
    JoinResponse {
        success: false,
        error,
        cluster_id: 0,
        nodes: vec![],
        vshard_to_group: vec![],
        groups: vec![],
    }
}