// nodedb_cluster/bootstrap/join.rs

//! Join path: contact seeds, receive full cluster state, apply locally.
//!
//! The join loop is deliberately robust against two realistic cluster
//! startup failure modes:
//!
//! 1. **Slow start**: the designated bootstrapper has not yet
//!    completed its first Raft election when this node first calls
//!    `join()`. Every seed may return "unreachable" or "not leader"
//!    for a brief window. We retry the whole loop with exponential
//!    backoff so the join eventually succeeds without operator
//!    intervention.
//!
//! 2. **Leader redirect**: the seed we contacted is alive but isn't
//!    the group-0 leader. It returns
//!    `JoinResponse { success: false, error: "not leader; retry at <addr>" }`
//!    and we follow the hint up to a small number of hops before
//!    falling through to the next seed. The string format is the
//!    contract set by `raft_loop::join::join_flow` — keep this parser
//!    in lock-step with that producer.
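//!
//! For illustration (addresses hypothetical), a redirected join
//! exchange between a joining node and two seeds looks like:
//!
//! ```text
//! -> 10.0.0.3:9400  JoinRequest  { node_id: 2, .. }
//! <- 10.0.0.3:9400  JoinResponse { success: false,
//!                                  error: "not leader; retry at 10.0.0.1:9400" }
//! -> 10.0.0.1:9400  JoinRequest  { node_id: 2, .. }
//! <- 10.0.0.1:9400  JoinResponse { success: true, .. }
//! ```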

use std::collections::HashSet;
use std::net::SocketAddr;

use tracing::{debug, info, warn};

use crate::catalog::ClusterCatalog;
use crate::error::{ClusterError, Result};
use crate::lifecycle_state::ClusterLifecycleTracker;
use crate::multi_raft::MultiRaft;
use crate::routing::{GroupInfo, RoutingTable};
use crate::rpc_codec::{JoinRequest, JoinResponse, LEADER_REDIRECT_PREFIX, RaftRpc};
use crate::topology::{ClusterTopology, NodeInfo, NodeState};
use crate::transport::NexarTransport;

use super::config::{ClusterConfig, ClusterState};

/// Maximum number of leader-redirect hops inside a single join
/// attempt. The redirect chain starts at whichever seed we first
/// contact; each hop costs a round-trip, so keep this small.
const MAX_REDIRECTS_PER_ATTEMPT: u32 = 3;

/// Parse a `JoinResponse::error` string as a leader redirect hint.
///
/// The prefix is defined as a shared constant in `rpc_codec`
/// (`LEADER_REDIRECT_PREFIX`) so the producer side
/// (`raft_loop::join::join_flow`) and this consumer can never
/// drift. Any other kind of rejection (collision, parse error,
/// catalog persist failure, commit timeout, etc.) is treated as
/// a hard failure that bubbles through the normal error path.
///
/// Returns `None` for any string that doesn't start with the
/// expected prefix, or where the address portion does not parse
/// as a valid `SocketAddr`.
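///
/// A quick illustration, mirroring the unit tests at the bottom of
/// this file (the exact prefix string is owned by `rpc_codec`):
///
/// ```ignore
/// assert_eq!(
///     parse_leader_hint("not leader; retry at 10.0.0.1:9400"),
///     Some("10.0.0.1:9400".parse().unwrap()),
/// );
/// assert_eq!(parse_leader_hint("conf change commit timeout on group 0"), None);
/// ```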
pub(crate) fn parse_leader_hint(error: &str) -> Option<SocketAddr> {
    error
        .strip_prefix(LEADER_REDIRECT_PREFIX)
        .and_then(|s| s.trim().parse().ok())
}

/// Join an existing cluster by contacting seed nodes.
///
/// The loop has two layers:
///
/// - **Outer**: retry passes with exponential backoff per
///   `config.join_retry`. Handles the "bootstrapper not up yet"
///   startup race.
/// - **Inner**: walk the seed list plus any leader-redirect hops for
///   this attempt. A successful `JoinResponse` short-circuits the
///   whole function; failures on one candidate fall through to the
///   next.
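///
/// Call shape, as a sketch (see `full_bootstrap_join_flow` in the
/// tests below for a complete, runnable example):
///
/// ```ignore
/// let state = join(&config, &catalog, &transport, &lifecycle).await?;
/// // On success the lifecycle is still `Joining`; promoting it
/// // further is the caller's responsibility.
/// ```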
pub(super) async fn join(
    config: &ClusterConfig,
    catalog: &ClusterCatalog,
    transport: &NexarTransport,
    lifecycle: &ClusterLifecycleTracker,
) -> Result<ClusterState> {
    info!(
        node_id = config.node_id,
        seeds = ?config.seed_nodes,
        "joining existing cluster"
    );

    if config.seed_nodes.is_empty() {
        let err = ClusterError::Transport {
            detail: "no seed nodes configured".into(),
        };
        lifecycle.to_failed(err.to_string());
        return Err(err);
    }

    let req_template = JoinRequest {
        node_id: config.node_id,
        listen_addr: config.listen_addr.to_string(),
        wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
    };

    let policy = config.join_retry;
    let mut last_err: Option<ClusterError> = None;

    for attempt in 0..policy.max_attempts {
        lifecycle.to_joining(attempt);

        let delay = policy.backoff_for(attempt);
        if !delay.is_zero() {
            debug!(
                node_id = config.node_id,
                attempt,
                delay_ms = delay.as_millis() as u64,
                "backing off before next join attempt"
            );
            tokio::time::sleep(delay).await;
        }

        match try_join_once(config, catalog, transport, &req_template).await {
            Ok(state) => return Ok(state),
            Err(e) => {
                warn!(
                    node_id = config.node_id,
                    attempt,
                    error = %e,
                    "join attempt failed; will retry"
                );
                last_err = Some(e);
            }
        }
    }

    let max_attempts = policy.max_attempts;
    let err = last_err.unwrap_or_else(|| ClusterError::Transport {
        detail: format!("join exhausted {max_attempts} attempts with no concrete error"),
    });
    lifecycle.to_failed(err.to_string());
    Err(err)
}

/// One pass over the seed list plus up to `MAX_REDIRECTS_PER_ATTEMPT`
/// leader-redirect hops. Returns `Ok(state)` on the first successful
/// `JoinResponse` or an error describing the last failure in this
/// attempt.
async fn try_join_once(
    config: &ClusterConfig,
    catalog: &ClusterCatalog,
    transport: &NexarTransport,
    req_template: &JoinRequest,
) -> Result<ClusterState> {
    // Work list: try seeds in sorted order so the smallest address
    // under `SocketAddr` ordering — the designated bootstrapper under
    // the single-elected-bootstrapper rule — is contacted first. This
    // is critical during the initial 5-node race: every other seed
    // points at a node that is itself still joining, so asking them
    // first eats the full RPC timeout per non-bootstrapper before we
    // reach the one peer that can actually answer. The `visited` set
    // below deduplicates, so a redirect loop can't burn the whole
    // attempt on a single address.
    let mut work: std::collections::VecDeque<SocketAddr> =
        config.seed_nodes.iter().copied().collect();
    {
        // Sort so the designated bootstrapper surfaces first. Leader
        // redirects get prepended with `push_front` below, keeping the
        // "most likely to answer" candidate at the head.
        let mut sorted: Vec<SocketAddr> = work.drain(..).collect();
        sorted.sort();
        work.extend(sorted);
    }
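    // E.g. seeds [10.0.0.3:9400, 10.0.0.2:9400, 10.0.0.1:9400] are
    // tried as .1, then .2, then .3; a leader redirect jumps the queue
    // via `push_front`. (Addresses illustrative.)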
    let mut visited: HashSet<SocketAddr> = HashSet::new();
    let mut redirects: u32 = 0;
    let mut last_err: Option<ClusterError> = None;

    while let Some(addr) = work.pop_front() {
        if !visited.insert(addr) {
            continue;
        }

        let rpc = RaftRpc::JoinRequest(req_template.clone());
        match transport.send_rpc_to_addr(addr, rpc).await {
            Ok(RaftRpc::JoinResponse(resp)) => {
                if resp.success {
                    return apply_join_response(config, catalog, transport, &resp);
                }
                // Rejected — is it a leader redirect we can follow?
                if let Some(leader) = parse_leader_hint(&resp.error) {
                    if redirects < MAX_REDIRECTS_PER_ATTEMPT && !visited.contains(&leader) {
                        info!(
                            node_id = config.node_id,
                            from = %addr,
                            to = %leader,
                            "following leader redirect"
                        );
                        redirects += 1;
                        work.push_front(leader);
                        continue;
                    }
                    debug!(
                        node_id = config.node_id,
                        from = %addr,
                        leader = %leader,
                        redirects,
                        "redirect cap reached or loop detected; falling through"
                    );
                }
                last_err = Some(ClusterError::Transport {
                    detail: format!("join rejected by {addr}: {}", resp.error),
                });
            }
            Ok(other) => {
                last_err = Some(ClusterError::Transport {
                    detail: format!("unexpected response from {addr}: {other:?}"),
                });
            }
            Err(e) => {
                debug!(%addr, error = %e, "seed unreachable");
                last_err = Some(e);
            }
        }
    }

    Err(last_err.unwrap_or_else(|| ClusterError::Transport {
        detail: "no seed nodes produced a response".into(),
    }))
}

/// Apply a `JoinResponse`: reconstruct topology, routing, and
/// `MultiRaft` from wire data.
///
/// Order of operations is load-bearing for crash safety:
///
/// 1. Reconstruct the `ClusterTopology` and `RoutingTable` in memory.
/// 2. Persist topology + routing to the catalog **first**, before any
///    on-disk side effects. If we crash after this step, the next
///    boot sees `catalog.is_bootstrapped() == true` and takes the
///    `restart()` path, which reconstructs cleanly from the catalog.
/// 3. Create the `MultiRaft` and add groups. `add_group` opens redb
///    files on disk per group; these are idempotent per group id, so
///    a crash mid-way leaves a recoverable state.
/// 4. Register every peer address in the transport before returning
///    so the first outgoing AppendEntries has a known destination.
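///
/// As a sketch, the boot-time dispatch this ordering protects (the
/// real dispatcher lives outside this file; `bootstrap_or_join` is a
/// hypothetical name for the cold path):
///
/// ```ignore
/// if catalog.is_bootstrapped() {
///     restart(..)           // reconstructs cleanly from the catalog
/// } else {
///     bootstrap_or_join(..) // first boot: bootstrap or join via seeds
/// }
/// ```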
fn apply_join_response(
    config: &ClusterConfig,
    catalog: &ClusterCatalog,
    transport: &NexarTransport,
    resp: &JoinResponse,
) -> Result<ClusterState> {
    // 1. Reconstruct topology.
    let mut topology = ClusterTopology::new();
    for node in &resp.nodes {
        let state = NodeState::from_u8(node.state).unwrap_or(NodeState::Active);
        let mut info = NodeInfo {
            node_id: node.node_id,
            addr: node.addr.clone(),
            state,
            raft_groups: node.raft_groups.clone(),
            wire_version: node.wire_version,
        };
        if node.node_id == config.node_id {
            info.state = NodeState::Active;
        }
        topology.add_node(info);
    }

    // 1 (continued). Reconstruct the routing table.
    let mut group_members = std::collections::HashMap::new();
    for g in &resp.groups {
        group_members.insert(
            g.group_id,
            GroupInfo {
                leader: g.leader,
                members: g.members.clone(),
                learners: g.learners.clone(),
            },
        );
    }
    let routing = RoutingTable::from_parts(resp.vshard_to_group.clone(), group_members);

    // 2. Persist to catalog before any on-disk Raft side effects.
    //    Cluster id is written first so `is_bootstrapped()` returns
    //    `true` on any subsequent boot — without this, a joined node
    //    that restarts would re-enter the bootstrap/join path
    //    instead of taking `restart()`. Zero is a valid marker: the
    //    joining node's catalog now carries `Some(0)` for
    //    `load_cluster_id`, which is enough for the restart
    //    dispatcher.
    catalog.save_cluster_id(resp.cluster_id)?;
    catalog.save_topology(&topology)?;
    catalog.save_routing(&routing)?;

    // 3. Create MultiRaft — join any group that includes this node,
    //    either as a voter (group members) or as a learner (group
    //    learners). A learner-started group boots in the `Learner`
    //    role and will not run an election until a subsequent
    //    `PromoteLearner` conf change is applied.
    let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone());
    for g in &resp.groups {
        let is_voter = g.members.contains(&config.node_id);
        let is_learner = g.learners.contains(&config.node_id);

        if is_voter {
            let peers: Vec<u64> = g
                .members
                .iter()
                .copied()
                .filter(|&id| id != config.node_id)
                .collect();
            multi_raft.add_group(g.group_id, peers)?;
        } else if is_learner {
            let voters = g.members.clone();
            let other_learners: Vec<u64> = g
                .learners
                .iter()
                .copied()
                .filter(|&id| id != config.node_id)
                .collect();
            multi_raft.add_group_as_learner(g.group_id, voters, other_learners)?;
        }
    }

    // 4. Register peer addresses in the transport.
    for node in &resp.nodes {
        if node.node_id != config.node_id
            && let Ok(addr) = node.addr.parse::<SocketAddr>()
        {
            transport.register_peer(node.node_id, addr);
        }
    }

    info!(
        node_id = config.node_id,
        nodes = topology.node_count(),
        groups = routing.num_groups(),
        "joined cluster"
    );

    Ok(ClusterState {
        topology,
        routing,
        multi_raft,
    })
}

#[cfg(test)]
mod tests {
    use super::super::bootstrap_fn::bootstrap;
    use super::super::config::JoinRetryPolicy;
    use super::super::handle_join::handle_join_request;
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        let catalog = ClusterCatalog::open(&path).unwrap();
        (dir, catalog)
    }

    // ── Pure-function tests ───────────────────────────────────────

    #[test]
    fn parse_leader_hint_extracts_valid_addr() {
        assert_eq!(
            parse_leader_hint("not leader; retry at 10.0.0.1:9400"),
            Some("10.0.0.1:9400".parse().unwrap())
        );
        assert_eq!(
            parse_leader_hint("not leader; retry at 127.0.0.1:65535"),
            Some("127.0.0.1:65535".parse().unwrap())
        );
    }

    #[test]
    fn parse_leader_hint_rejects_unrelated_error() {
        assert_eq!(
            parse_leader_hint("node_id 2 already registered with different address 10.0.0.2:9400"),
            None
        );
        assert_eq!(parse_leader_hint(""), None);
        assert_eq!(
            parse_leader_hint("conf change commit timeout on group 0"),
            None
        );
    }

    #[test]
    fn parse_leader_hint_rejects_malformed_addr() {
        assert_eq!(parse_leader_hint("not leader; retry at notanaddress"), None);
        assert_eq!(parse_leader_hint("not leader; retry at "), None);
        assert_eq!(parse_leader_hint("not leader; retry at 10.0.0.1"), None);
    }

    #[test]
    fn join_retry_policy_default_schedule() {
        // Production default: 8 attempts with a 32 s ceiling. Attempt 0
        // sleeps nothing; each later delay is `32 s >> (8 - attempt)`,
        // so the schedule halves down from the ceiling toward the
        // first retry.
        let policy = JoinRetryPolicy::default();
        assert_eq!(policy.backoff_for(0), Duration::ZERO);
        assert_eq!(policy.backoff_for(1), Duration::from_millis(250));
        assert_eq!(policy.backoff_for(2), Duration::from_millis(500));
        assert_eq!(policy.backoff_for(3), Duration::from_secs(1));
        assert_eq!(policy.backoff_for(4), Duration::from_secs(2));
        assert_eq!(policy.backoff_for(5), Duration::from_secs(4));
        assert_eq!(policy.backoff_for(6), Duration::from_secs(8));
        assert_eq!(policy.backoff_for(7), Duration::from_secs(16));
        assert_eq!(policy.backoff_for(8), Duration::from_secs(32));
        // Out-of-range attempt → no backoff.
        assert_eq!(policy.backoff_for(9), Duration::ZERO);
    }
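
    // A minimal sketch of the shape `backoff_for` needs in order to
    // reproduce the schedule asserted above. Illustrative only: the
    // real implementation lives in `super::config` and may differ in
    // detail (e.g. flooring of tiny delays).
    #[allow(dead_code)]
    fn backoff_sketch(max_attempts: u32, max_backoff_secs: u64, attempt: u32) -> Duration {
        if attempt == 0 || attempt > max_attempts {
            // No sleep before the first attempt; out-of-range gets none.
            return Duration::ZERO;
        }
        // Halve down from the ceiling: at attempt 1, 32_000 ms >> 7 = 250 ms.
        Duration::from_millis((max_backoff_secs * 1000) >> (max_attempts - attempt))
    }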

    #[test]
    fn join_retry_policy_test_schedule_is_subsecond() {
        // A typical test config: still 8 attempts, but a 2 s ceiling
        // produces a sub-5-second total backoff window.
        let policy = JoinRetryPolicy {
            max_attempts: 8,
            max_backoff_secs: 2,
        };
        // Under the 2 s ceiling the early delays shrink to a few
        // milliseconds, so the summed schedule stays small.
        let total: Duration = (0..=policy.max_attempts)
            .map(|a| policy.backoff_for(a))
            .sum();
        assert!(
            total < Duration::from_secs(5),
            "test schedule too slow: {total:?}"
        );
        // Final attempt sleeps the full ceiling.
        assert_eq!(policy.backoff_for(8), Duration::from_secs(2));
    }

    // ── End-to-end bootstrap + join flow over QUIC ────────────────

    #[tokio::test]
    async fn full_bootstrap_join_flow() {
        // Node 1 bootstraps; node 2 joins via QUIC.
        let t1 = Arc::new(NexarTransport::new(1, "127.0.0.1:0".parse().unwrap()).unwrap());
        let t2 = Arc::new(NexarTransport::new(2, "127.0.0.1:0".parse().unwrap()).unwrap());

        let (_dir1, catalog1) = temp_catalog();
        let (_dir2, catalog2) = temp_catalog();

        let addr1 = t1.local_addr();
        let addr2 = t2.local_addr();

        let config1 = ClusterConfig {
            node_id: 1,
            listen_addr: addr1,
            seed_nodes: vec![addr1],
            num_groups: 2,
            replication_factor: 1,
            data_dir: _dir1.path().to_path_buf(),
            force_bootstrap: false,
            join_retry: Default::default(),
        };
        let state1 = bootstrap(&config1, &catalog1).unwrap();

        let topology1 = Arc::new(Mutex::new(state1.topology));
        let routing1 = Arc::new(state1.routing);

        struct JoinHandler {
            topology: Arc<Mutex<ClusterTopology>>,
            routing: Arc<RoutingTable>,
        }

        impl crate::transport::RaftRpcHandler for JoinHandler {
            async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
                match rpc {
                    RaftRpc::JoinRequest(req) => {
                        let mut topo = self.topology.lock().unwrap();
                        let resp = handle_join_request(&req, &mut topo, &self.routing, 99);
                        Ok(RaftRpc::JoinResponse(resp))
                    }
                    other => Err(ClusterError::Transport {
                        detail: format!("unexpected: {other:?}"),
                    }),
                }
            }
        }

        let handler = Arc::new(JoinHandler {
            topology: topology1.clone(),
            routing: routing1.clone(),
        });

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
        let t1c = t1.clone();
        tokio::spawn(async move {
            t1c.serve(handler, shutdown_rx).await.unwrap();
        });

        tokio::time::sleep(Duration::from_millis(30)).await;

        let config2 = ClusterConfig {
            node_id: 2,
            listen_addr: addr2,
            seed_nodes: vec![addr1],
            num_groups: 2,
            replication_factor: 1,
            data_dir: _dir2.path().to_path_buf(),
            force_bootstrap: false,
            join_retry: Default::default(),
        };

        let lifecycle = ClusterLifecycleTracker::new();
        let state2 = join(&config2, &catalog2, &t2, &lifecycle).await.unwrap();
        // The lifecycle should still read `Joining` here: `join()`
        // never promotes past it, and calling `to_ready` is the
        // caller's responsibility.
        assert!(matches!(
            lifecycle.current(),
            crate::lifecycle_state::ClusterLifecycleState::Joining { .. }
        ));

        assert_eq!(state2.topology.node_count(), 2);
        assert_eq!(state2.routing.num_groups(), 2);

        // Verify node 2's state was persisted (ordering check: the
        // catalog is saved before any MultiRaft files are touched).
        assert!(catalog2.load_topology().unwrap().is_some());
        assert!(catalog2.load_routing().unwrap().is_some());

        // Verify node 1's topology was updated.
        let topo1 = topology1.lock().unwrap();
        assert_eq!(topo1.node_count(), 2);
        assert!(topo1.contains(2));

        shutdown_tx.send(true).unwrap();
    }
}