Skip to main content

nodedb_cluster/bootstrap/
handle_join.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Server-side `JoinRequest` handler: build a `JoinResponse` from current cluster state.
4//!
5//! Called by [`crate::raft_loop::handle_rpc`] on the group-0 leader when a
6//! `JoinRequest` arrives. This function is the single source of truth for how
7//! a new node is admitted into the topology wire response; the Raft conf-change
8//! that actually replicates membership across groups is driven separately from
9//! the RPC arm.
10//!
11//! Semantics:
12//!
13//! - **New node**: added to topology as `Active`, full wire response returned.
//! - **Known node, same address**: idempotent — no mutation if the node is already `Active` (a non-`Active` entry is normalized back to `Active`); full wire response returned.
15//! - **Known node, different address**: rejected with `success: false`. This
16//!   catches node-id reuse (operator error or a ghost node coming back with a
17//!   stale id on a new address).
18//! - **Invalid `listen_addr` in the request**: rejected with `success: false`.
19
20use std::net::SocketAddr;
21
22use tracing::warn;
23
24use crate::routing::RoutingTable;
25use crate::rpc_codec::{JoinGroupInfo, JoinNodeInfo, JoinRequest, JoinResponse};
26use crate::topology::{CLUSTER_WIRE_FORMAT_VERSION, ClusterTopology, NodeInfo, NodeState};
27
28/// Build a `JoinResponse` for an incoming `JoinRequest`.
29///
30/// See module docs for semantics. Mutates `topology` only when the node is
31/// newly admitted; idempotent for re-joins with the same address.
32///
33/// `cluster_id` is the id of the cluster this node belongs to — the
34/// join flow reads it from the local catalog and threads it through so
35/// the joining node can persist it and take the `restart()` path on a
36/// subsequent boot. Zero is a valid placeholder when the server's
37/// catalog has not yet been populated; rejection responses also carry
38/// zero.
39pub fn handle_join_request(
40    req: &JoinRequest,
41    topology: &mut ClusterTopology,
42    routing: &RoutingTable,
43    cluster_id: u64,
44) -> JoinResponse {
45    // Validate the wire version carried in the JOIN payload (belt-and-suspenders
46    // check; the transport-level handshake already negotiated a compatible version
47    // before this RPC was dispatched). The `wire_version` field here is the
48    // cluster-wide schema version (`CLUSTER_WIRE_FORMAT_VERSION`), distinct from
49    // the transport-level RPC frame version. We require an exact match because
50    // this build uses floor == ceiling (no backward-compat window in the schema).
51    if req.wire_version != CLUSTER_WIRE_FORMAT_VERSION {
52        warn!(
53            node_id = req.node_id,
54            joiner_wire_version = req.wire_version,
55            expected_wire_version = CLUSTER_WIRE_FORMAT_VERSION,
56            "join request rejected: joiner cluster wire_version mismatch"
57        );
58        return reject(format!(
59            "joiner wire_version {} does not match this cluster's wire_version {} — \
60             rolling upgrade is required before this node can join",
61            req.wire_version, CLUSTER_WIRE_FORMAT_VERSION
62        ));
63    }
64
65    // Validate the listen address early.
66    let addr: SocketAddr = match req.listen_addr.parse() {
67        Ok(a) => a,
68        Err(e) => {
69            return reject(format!("invalid listen_addr '{}': {e}", req.listen_addr));
70        }
71    };
72
73    // Collision / idempotency check — both require reading the existing entry.
74    if let Some(existing) = topology.get_node(req.node_id) {
75        let existing_addr = existing.addr.clone();
76        if existing_addr != req.listen_addr {
77            // Same id, different address — reject.
78            return reject(format!(
79                "node_id {} already registered with different address {} (request: {})",
80                req.node_id, existing_addr, req.listen_addr
81            ));
82        }
83        // Same id, same address. If already Active we short-circuit —
84        // no topology mutation at all, just rebuild the wire response. If the
85        // node was in a non-Active state (Joining/Draining), normalize to
86        // Active now because it's clearly back online.
87        if existing.state != NodeState::Active
88            && let Some(entry) = topology.get_node_mut(req.node_id)
89        {
90            entry.state = NodeState::Active;
91        }
92        return build_response(topology, routing, cluster_id);
93    }
94
95    // Brand new node — admit as Active. Stamp the joiner's own
96    // wire version and identity fields onto its NodeInfo so every
97    // peer that replays this topology has the correct version and
98    // identity pins.
99    let spki_pin: Option<[u8; 32]> = req.spki_pin.as_deref().and_then(|b| {
100        if b.len() == 32 {
101            let mut arr = [0u8; 32];
102            arr.copy_from_slice(b);
103            Some(arr)
104        } else {
105            None
106        }
107    });
108    topology.add_node(
109        NodeInfo::new(req.node_id, addr, NodeState::Active)
110            .with_wire_version(req.wire_version)
111            .with_spiffe_id(req.spiffe_id.clone())
112            .with_spki_pin(spki_pin),
113    );
114    build_response(topology, routing, cluster_id)
115}
116
117/// Build a successful `JoinResponse` from the current topology and routing.
118fn build_response(
119    topology: &ClusterTopology,
120    routing: &RoutingTable,
121    cluster_id: u64,
122) -> JoinResponse {
123    let nodes: Vec<JoinNodeInfo> = topology
124        .all_nodes()
125        .map(|n| JoinNodeInfo {
126            node_id: n.node_id,
127            addr: n.addr.clone(),
128            state: n.state.as_u8(),
129            raft_groups: n.raft_groups.clone(),
130            wire_version: n.wire_version,
131            spiffe_id: n.spiffe_id.clone(),
132            spki_pin: n.spki_pin.map(|arr| arr.to_vec()),
133        })
134        .collect();
135
136    let groups: Vec<JoinGroupInfo> = routing
137        .group_members()
138        .iter()
139        .map(|(&gid, info)| JoinGroupInfo {
140            group_id: gid,
141            leader: info.leader,
142            members: info.members.clone(),
143            learners: info.learners.clone(),
144        })
145        .collect();
146
147    JoinResponse {
148        success: true,
149        error: String::new(),
150        cluster_id,
151        nodes,
152        vshard_to_group: routing.vshard_to_group().to_vec(),
153        groups,
154    }
155}
156
157/// Build a rejection response with the given error message.
158fn reject(error: String) -> JoinResponse {
159    JoinResponse {
160        success: false,
161        error,
162        cluster_id: 0,
163        nodes: vec![],
164        vshard_to_group: vec![],
165        groups: vec![],
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Topology seeded with a single `Active` node: id 1 at `10.0.0.1:9400`.
    fn topo_with_one_node() -> ClusterTopology {
        let seed = NodeInfo::new(1, "10.0.0.1:9400".parse().unwrap(), NodeState::Active);
        let mut topology = ClusterTopology::new();
        topology.add_node(seed);
        topology
    }

    /// Join request for node id 2 at `listen_addr`, carrying the current
    /// cluster wire version and no identity pins.
    fn node2_join(listen_addr: &str) -> JoinRequest {
        JoinRequest {
            node_id: 2,
            listen_addr: listen_addr.into(),
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
            spiffe_id: None,
            spki_pin: None,
        }
    }

    /// A fresh node id is admitted and shows up in both the response and
    /// the topology.
    #[test]
    fn handle_join_request_adds_node() {
        let routing = RoutingTable::uniform(2, &[1], 1);
        let mut topology = topo_with_one_node();
        let join = node2_join("10.0.0.2:9400");

        let resp = handle_join_request(&join, &mut topology, &routing, 42);

        assert!(resp.success);
        assert_eq!(resp.nodes.len(), 2);
        assert_eq!(resp.vshard_to_group.len(), 1024);
        // uniform(2, ...) creates 2 data groups + 1 metadata group = 3 total.
        assert_eq!(resp.groups.len(), 3);

        assert!(topology.contains(2));
        assert_eq!(topology.node_count(), 2);
    }

    /// Repeating the same join does not add a second entry.
    #[test]
    fn handle_join_request_idempotent() {
        let routing = RoutingTable::uniform(1, &[1], 1);
        let mut topology = topo_with_one_node();
        let join = node2_join("10.0.0.2:9400");

        let _ = handle_join_request(&join, &mut topology, &routing, 42);
        let resp = handle_join_request(&join, &mut topology, &routing, 42);

        assert!(resp.success);
        assert_eq!(resp.nodes.len(), 2); // Still 2, not 3.
        assert_eq!(topology.node_count(), 2);
    }

    /// A second join with the same id+addr must not mutate topology at all
    /// (no duplicate entries, no state reset). Verify by capturing
    /// `node_count` and the node ordering between calls.
    #[test]
    fn handle_join_request_idempotent_no_mutation() {
        let node_ids = |t: &ClusterTopology| -> Vec<u64> {
            t.all_nodes().map(|n| n.node_id).collect()
        };

        let routing = RoutingTable::uniform(1, &[1], 1);
        let mut topology = topo_with_one_node();
        let join = node2_join("10.0.0.2:9400");

        let first = handle_join_request(&join, &mut topology, &routing, 7);
        let ids_before = node_ids(&topology);
        let count_before = topology.node_count();

        let second = handle_join_request(&join, &mut topology, &routing, 7);
        assert_eq!(first.cluster_id, 7);
        assert_eq!(second.cluster_id, 7);
        let ids_after = node_ids(&topology);

        assert!(first.success && second.success);
        assert_eq!(count_before, topology.node_count());
        assert_eq!(ids_before, ids_after);
        assert_eq!(second.nodes.len(), 2);
        // Node 2 must still be Active.
        let joined = topology.get_node(2).unwrap();
        assert_eq!(joined.state, NodeState::Active);
    }

    /// Same id, different address → reject.
    #[test]
    fn handle_join_request_rejects_id_collision() {
        let routing = RoutingTable::uniform(1, &[1], 1);
        let mut topology = topo_with_one_node();

        // First join: node 2 at 10.0.0.2:9400.
        let original = node2_join("10.0.0.2:9400");
        let admitted = handle_join_request(&original, &mut topology, &routing, 11);
        assert!(admitted.success);

        // Second join: same id, different address — must be rejected.
        let impostor = node2_join("10.0.0.99:9400");
        let refused = handle_join_request(&impostor, &mut topology, &routing, 11);

        assert!(!refused.success);
        assert!(
            refused.error.contains("already registered"),
            "error should mention collision: {}",
            refused.error
        );
        // Topology must not be clobbered.
        assert_eq!(topology.node_count(), 2);
        let kept = topology.get_node(2).unwrap();
        assert_eq!(kept.addr, "10.0.0.2:9400");
    }

    /// An unparseable `listen_addr` is rejected with a non-empty error.
    #[test]
    fn handle_join_invalid_addr() {
        let routing = RoutingTable::uniform(1, &[1], 1);
        let mut topology = ClusterTopology::new();
        let join = node2_join("not-a-valid-address");

        let resp = handle_join_request(&join, &mut topology, &routing, 42);
        assert!(!resp.success);
        assert!(!resp.error.is_empty());
    }
}