Skip to main content

nodedb_cluster/bootstrap/
join.rs

1//! Join path: contact seeds, receive full cluster state, apply locally.
2//!
3//! The join loop is deliberately robust against two realistic cluster
4//! startup failure modes:
5//!
6//! 1. **Slow start**: the designated bootstrapper has not yet
7//!    completed its first Raft election when this node first calls
8//!    `join()`. Every seed may return "unreachable" or "not leader"
9//!    for a brief window. We retry the whole loop with exponential
10//!    backoff so the join eventually succeeds without operator
11//!    intervention.
12//!
13//! 2. **Leader redirect**: the seed we contacted is alive but isn't
14//!    the group-0 leader. It returns
15//!    `JoinResponse { success: false, error: "not leader; retry at <addr>" }`
16//!    and we follow the hint up to a small number of hops before
17//!    falling through to the next seed. The string format is the
18//!    contract set by `raft_loop::join::join_flow` — keep this parser
19//!    in lock-step with that producer.
20
21use std::collections::HashSet;
22use std::net::SocketAddr;
23
24use tracing::{debug, info, warn};
25
26use crate::catalog::ClusterCatalog;
27use crate::error::{ClusterError, Result};
28use crate::lifecycle_state::ClusterLifecycleTracker;
29use crate::multi_raft::MultiRaft;
30use crate::routing::{GroupInfo, RoutingTable};
31use crate::rpc_codec::{JoinRequest, JoinResponse, LEADER_REDIRECT_PREFIX, RaftRpc};
32use crate::topology::{ClusterTopology, NodeInfo, NodeState};
33use crate::transport::NexarTransport;
34
35use super::config::{ClusterConfig, ClusterState};
36
/// Upper bound on leader-redirect hops followed within a single join
/// attempt (one call to `try_join_once`).
///
/// The redirect chain starts at whichever seed is contacted first.
/// Every hop costs a full RPC round-trip, so the cap is kept small;
/// once it is reached, further redirect hints are ignored and the
/// attempt falls through to the remaining seeds.
const MAX_REDIRECTS_PER_ATTEMPT: u32 = 3;
41
42/// Parse a `JoinResponse::error` string as a leader redirect hint.
43///
44/// The prefix is defined as a shared constant in `rpc_codec`
45/// (`LEADER_REDIRECT_PREFIX`) so the producer side
46/// (`raft_loop::join::join_flow`) and this consumer can never
47/// drift. Any other kind of rejection (collision, parse error,
48/// catalog persist failure, commit timeout, etc.) is treated as
49/// a hard failure that bubbles through the normal error path.
50///
51/// Returns `None` for any string that doesn't start with the
52/// expected prefix, or where the address portion does not parse
53/// as a valid `SocketAddr`.
54pub(crate) fn parse_leader_hint(error: &str) -> Option<SocketAddr> {
55    error
56        .strip_prefix(LEADER_REDIRECT_PREFIX)
57        .and_then(|s| s.trim().parse().ok())
58}
59
60/// Join an existing cluster by contacting seed nodes.
61///
62/// The loop has two layers:
63///
64/// - **Outer**: retry passes with exponential backoff per
65///   `config.join_retry`. Handles the "bootstrapper not up yet"
66///   startup race.
67/// - **Inner**: walk the seed list plus any leader-redirect hops for
68///   this attempt. A successful `JoinResponse` short-circuits the
69///   whole function; failures on one candidate fall through to the
70///   next.
71pub(super) async fn join(
72    config: &ClusterConfig,
73    catalog: &ClusterCatalog,
74    transport: &NexarTransport,
75    lifecycle: &ClusterLifecycleTracker,
76) -> Result<ClusterState> {
77    info!(
78        node_id = config.node_id,
79        seeds = ?config.seed_nodes,
80        "joining existing cluster"
81    );
82
83    if config.seed_nodes.is_empty() {
84        let err = ClusterError::Transport {
85            detail: "no seed nodes configured".into(),
86        };
87        lifecycle.to_failed(err.to_string());
88        return Err(err);
89    }
90
91    let req_template = JoinRequest {
92        node_id: config.node_id,
93        listen_addr: config.listen_addr.to_string(),
94        wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
95    };
96
97    let policy = config.join_retry;
98    let mut last_err: Option<ClusterError> = None;
99
100    for attempt in 0..policy.max_attempts {
101        lifecycle.to_joining(attempt);
102
103        let delay = policy.backoff_for(attempt);
104        if !delay.is_zero() {
105            debug!(
106                node_id = config.node_id,
107                attempt,
108                delay_ms = delay.as_millis() as u64,
109                "backing off before next join attempt"
110            );
111            tokio::time::sleep(delay).await;
112        }
113
114        match try_join_once(config, catalog, transport, &req_template).await {
115            Ok(state) => return Ok(state),
116            Err(e) => {
117                warn!(
118                    node_id = config.node_id,
119                    attempt,
120                    error = %e,
121                    "join attempt failed; will retry"
122                );
123                last_err = Some(e);
124            }
125        }
126    }
127
128    let max_attempts = policy.max_attempts;
129    let err = last_err.unwrap_or_else(|| ClusterError::Transport {
130        detail: format!("join exhausted {max_attempts} attempts with no concrete error"),
131    });
132    lifecycle.to_failed(err.to_string());
133    Err(err)
134}
135
136/// One pass over the seed list plus up to `MAX_REDIRECTS_PER_ATTEMPT`
137/// leader-redirect hops. Returns `Ok(state)` on the first successful
138/// `JoinResponse` or an error describing the last failure in this
139/// attempt.
140async fn try_join_once(
141    config: &ClusterConfig,
142    catalog: &ClusterCatalog,
143    transport: &NexarTransport,
144    req_template: &JoinRequest,
145) -> Result<ClusterState> {
146    // Work list: try seeds in sorted order so the lexicographically
147    // smallest address — the designated bootstrapper under the
148    // single-elected-bootstrapper rule — is contacted first. This is
149    // critical during the initial 5-node race: every other seed points
150    // at a node that is itself still joining, so asking them first
151    // eats the full RPC timeout per non-bootstrapper before we reach
152    // the one peer that can actually answer. `HashSet` deduplicates
153    // so a redirect loop can't consume all attempts against the same
154    // address.
155    let mut work: std::collections::VecDeque<SocketAddr> =
156        config.seed_nodes.iter().copied().collect();
157    {
158        // Sort so the designated bootstrapper surfaces first. Leader
159        // redirects get prepended with push_front below, keeping the
160        // "most likely to answer" candidate at the head.
161        let mut sorted: Vec<SocketAddr> = work.drain(..).collect();
162        sorted.sort();
163        work.extend(sorted);
164    }
165    let mut visited: HashSet<SocketAddr> = HashSet::new();
166    let mut redirects: u32 = 0;
167    let mut last_err: Option<ClusterError> = None;
168
169    while let Some(addr) = work.pop_front() {
170        if !visited.insert(addr) {
171            continue;
172        }
173
174        let rpc = RaftRpc::JoinRequest(req_template.clone());
175        match transport.send_rpc_to_addr(addr, rpc).await {
176            Ok(RaftRpc::JoinResponse(resp)) => {
177                if resp.success {
178                    return apply_join_response(config, catalog, transport, &resp);
179                }
180                // Rejected — is it a leader redirect we can follow?
181                if let Some(leader) = parse_leader_hint(&resp.error) {
182                    if redirects < MAX_REDIRECTS_PER_ATTEMPT && !visited.contains(&leader) {
183                        info!(
184                            node_id = config.node_id,
185                            from = %addr,
186                            to = %leader,
187                            "following leader redirect"
188                        );
189                        redirects += 1;
190                        work.push_front(leader);
191                        continue;
192                    }
193                    debug!(
194                        node_id = config.node_id,
195                        from = %addr,
196                        leader = %leader,
197                        redirects,
198                        "redirect cap reached or loop detected; falling through"
199                    );
200                }
201                last_err = Some(ClusterError::Transport {
202                    detail: format!("join rejected by {addr}: {}", resp.error),
203                });
204            }
205            Ok(other) => {
206                last_err = Some(ClusterError::Transport {
207                    detail: format!("unexpected response from {addr}: {other:?}"),
208                });
209            }
210            Err(e) => {
211                debug!(%addr, error = %e, "seed unreachable");
212                last_err = Some(e);
213            }
214        }
215    }
216
217    Err(last_err.unwrap_or_else(|| ClusterError::Transport {
218        detail: "no seed nodes produced a response".into(),
219    }))
220}
221
222/// Apply a JoinResponse: reconstruct topology, routing, and MultiRaft
223/// from wire data.
224///
225/// Order of operations is load-bearing for crash safety:
226///
227/// 1. Reconstruct the `ClusterTopology` and `RoutingTable` in memory.
228/// 2. Persist topology + routing to the catalog **first**, before any
229///    on-disk side effects. If we crash after this step, the next
230///    boot sees `catalog.is_bootstrapped() == true` and takes the
231///    `restart()` path, which reconstructs cleanly from the catalog.
232/// 3. Create the `MultiRaft` and add groups. `add_group` opens redb
233///    files on disk per group; these are idempotent per group id, so
234///    a crash mid-way leaves a recoverable state.
235/// 4. Register every peer address in the transport before returning
236///    so the first outgoing AppendEntries has a known destination.
237fn apply_join_response(
238    config: &ClusterConfig,
239    catalog: &ClusterCatalog,
240    transport: &NexarTransport,
241    resp: &JoinResponse,
242) -> Result<ClusterState> {
243    // 1. Reconstruct topology.
244    let mut topology = ClusterTopology::new();
245    for node in &resp.nodes {
246        let state = NodeState::from_u8(node.state).unwrap_or(NodeState::Active);
247        let mut info = NodeInfo {
248            node_id: node.node_id,
249            addr: node.addr.clone(),
250            state,
251            raft_groups: node.raft_groups.clone(),
252            wire_version: node.wire_version,
253        };
254        if node.node_id == config.node_id {
255            info.state = NodeState::Active;
256        }
257        topology.add_node(info);
258    }
259
260    // 1. Reconstruct routing table.
261    let mut group_members = std::collections::HashMap::new();
262    for g in &resp.groups {
263        group_members.insert(
264            g.group_id,
265            GroupInfo {
266                leader: g.leader,
267                members: g.members.clone(),
268                learners: g.learners.clone(),
269            },
270        );
271    }
272    let routing = RoutingTable::from_parts(resp.vshard_to_group.clone(), group_members);
273
274    // 2. Persist to catalog before any on-disk Raft side effects.
275    //    Cluster id is written first so `is_bootstrapped()` returns
276    //    `true` on any subsequent boot — without this, a joined node
277    //    that restarts would re-enter the bootstrap/join path
278    //    instead of taking `restart()`. Zero is a valid marker: the
279    //    joining node's catalog now carries `Some(0)` for
280    //    `load_cluster_id`, which is enough for the restart
281    //    dispatcher.
282    catalog.save_cluster_id(resp.cluster_id)?;
283    catalog.save_topology(&topology)?;
284    catalog.save_routing(&routing)?;
285
286    // 3. Create MultiRaft — join any group that includes this node,
287    //    either as a voter (group members) or as a learner (group
288    //    learners). A learner-started group boots in the `Learner`
289    //    role and will not run an election until a subsequent
290    //    `PromoteLearner` conf change is applied.
291    let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone())
292        .with_election_timeout(config.election_timeout_min, config.election_timeout_max);
293    for g in &resp.groups {
294        let is_voter = g.members.contains(&config.node_id);
295        let is_learner = g.learners.contains(&config.node_id);
296
297        if is_voter {
298            let peers: Vec<u64> = g
299                .members
300                .iter()
301                .copied()
302                .filter(|&id| id != config.node_id)
303                .collect();
304            multi_raft.add_group(g.group_id, peers)?;
305        } else if is_learner {
306            let voters = g.members.clone();
307            let other_learners: Vec<u64> = g
308                .learners
309                .iter()
310                .copied()
311                .filter(|&id| id != config.node_id)
312                .collect();
313            multi_raft.add_group_as_learner(g.group_id, voters, other_learners)?;
314        }
315    }
316
317    // 4. Register peer addresses in the transport.
318    for node in &resp.nodes {
319        if node.node_id != config.node_id
320            && let Ok(addr) = node.addr.parse::<SocketAddr>()
321        {
322            transport.register_peer(node.node_id, addr);
323        }
324    }
325
326    info!(
327        node_id = config.node_id,
328        nodes = topology.node_count(),
329        groups = routing.num_groups(),
330        "joined cluster"
331    );
332
333    Ok(ClusterState {
334        topology,
335        routing,
336        multi_raft,
337    })
338}
339
340#[cfg(test)]
341mod tests {
342    use super::super::bootstrap_fn::bootstrap;
343    use super::super::config::JoinRetryPolicy;
344    use super::super::handle_join::handle_join_request;
345    use super::*;
346    use std::sync::{Arc, Mutex};
347    use std::time::Duration;
348
349    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
350        let dir = tempfile::tempdir().unwrap();
351        let path = dir.path().join("cluster.redb");
352        let catalog = ClusterCatalog::open(&path).unwrap();
353        (dir, catalog)
354    }
355
356    // ── Pure-function tests ───────────────────────────────────────
357
358    #[test]
359    fn parse_leader_hint_extracts_valid_addr() {
360        assert_eq!(
361            parse_leader_hint("not leader; retry at 10.0.0.1:9400"),
362            Some("10.0.0.1:9400".parse().unwrap())
363        );
364        assert_eq!(
365            parse_leader_hint("not leader; retry at 127.0.0.1:65535"),
366            Some("127.0.0.1:65535".parse().unwrap())
367        );
368    }
369
370    #[test]
371    fn parse_leader_hint_rejects_unrelated_error() {
372        assert_eq!(
373            parse_leader_hint("node_id 2 already registered with different address 10.0.0.2:9400"),
374            None
375        );
376        assert_eq!(parse_leader_hint(""), None);
377        assert_eq!(
378            parse_leader_hint("conf change commit timeout on group 0"),
379            None
380        );
381    }
382
383    #[test]
384    fn parse_leader_hint_rejects_malformed_addr() {
385        assert_eq!(parse_leader_hint("not leader; retry at notanaddress"), None);
386        assert_eq!(parse_leader_hint("not leader; retry at "), None);
387        assert_eq!(parse_leader_hint("not leader; retry at 10.0.0.1"), None);
388    }
389
390    #[test]
391    fn join_retry_policy_default_schedule() {
392        // Production default: 8 attempts, ceiling 32 s. Each delay is
393        // `32 s >> (8 - attempt)`, so the schedule halves down from
394        // the ceiling toward the first attempt.
395        let policy = JoinRetryPolicy::default();
396        assert_eq!(policy.backoff_for(0), Duration::ZERO);
397        assert_eq!(policy.backoff_for(1), Duration::from_millis(250));
398        assert_eq!(policy.backoff_for(2), Duration::from_millis(500));
399        assert_eq!(policy.backoff_for(3), Duration::from_secs(1));
400        assert_eq!(policy.backoff_for(4), Duration::from_secs(2));
401        assert_eq!(policy.backoff_for(5), Duration::from_secs(4));
402        assert_eq!(policy.backoff_for(6), Duration::from_secs(8));
403        assert_eq!(policy.backoff_for(7), Duration::from_secs(16));
404        assert_eq!(policy.backoff_for(8), Duration::from_secs(32));
405        // Out-of-range attempt → no backoff.
406        assert_eq!(policy.backoff_for(9), Duration::ZERO);
407    }
408
409    #[test]
410    fn join_retry_policy_test_schedule_is_subsecond() {
411        // A typical test config: still 8 attempts, but a 2 s ceiling
412        // produces a sub-5-second total backoff window.
413        let policy = JoinRetryPolicy {
414            max_attempts: 8,
415            max_backoff_secs: 2,
416        };
417        // First few attempts are floored to 1 ms (they round down
418        // below a millisecond in raw shifts).
419        let total: Duration = (0..=policy.max_attempts)
420            .map(|a| policy.backoff_for(a))
421            .sum();
422        assert!(
423            total < Duration::from_secs(5),
424            "test schedule too slow: {total:?}"
425        );
426        // Final attempt sleeps the full ceiling.
427        assert_eq!(policy.backoff_for(8), Duration::from_secs(2));
428    }
429
430    // ── End-to-end bootstrap + join flow over QUIC ────────────────
431
432    #[tokio::test]
433    async fn full_bootstrap_join_flow() {
434        // Node 1 bootstraps, Node 2 joins via QUIC.
435        use crate::transport::credentials::TransportCredentials;
436        let t1 = Arc::new(
437            NexarTransport::new(
438                1,
439                "127.0.0.1:0".parse().unwrap(),
440                TransportCredentials::Insecure,
441            )
442            .unwrap(),
443        );
444        let t2 = Arc::new(
445            NexarTransport::new(
446                2,
447                "127.0.0.1:0".parse().unwrap(),
448                TransportCredentials::Insecure,
449            )
450            .unwrap(),
451        );
452
453        let (_dir1, catalog1) = temp_catalog();
454        let (_dir2, catalog2) = temp_catalog();
455
456        let addr1 = t1.local_addr();
457        let addr2 = t2.local_addr();
458
459        let config1 = ClusterConfig {
460            node_id: 1,
461            listen_addr: addr1,
462            seed_nodes: vec![addr1],
463            num_groups: 2,
464            replication_factor: 1,
465            data_dir: _dir1.path().to_path_buf(),
466            force_bootstrap: false,
467            join_retry: Default::default(),
468            swim_udp_addr: None,
469            election_timeout_min: Duration::from_millis(150),
470            election_timeout_max: Duration::from_millis(300),
471        };
472        let state1 = bootstrap(&config1, &catalog1).unwrap();
473
474        let topology1 = Arc::new(Mutex::new(state1.topology));
475        let routing1 = Arc::new(state1.routing);
476
477        struct JoinHandler {
478            topology: Arc<Mutex<ClusterTopology>>,
479            routing: Arc<RoutingTable>,
480        }
481
482        impl crate::transport::RaftRpcHandler for JoinHandler {
483            async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
484                match rpc {
485                    RaftRpc::JoinRequest(req) => {
486                        let mut topo = self.topology.lock().unwrap();
487                        let resp = handle_join_request(&req, &mut topo, &self.routing, 99);
488                        Ok(RaftRpc::JoinResponse(resp))
489                    }
490                    other => Err(ClusterError::Transport {
491                        detail: format!("unexpected: {other:?}"),
492                    }),
493                }
494            }
495        }
496
497        let handler = Arc::new(JoinHandler {
498            topology: topology1.clone(),
499            routing: routing1.clone(),
500        });
501
502        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
503        let t1c = t1.clone();
504        tokio::spawn(async move {
505            t1c.serve(handler, shutdown_rx).await.unwrap();
506        });
507
508        tokio::time::sleep(Duration::from_millis(30)).await;
509
510        let config2 = ClusterConfig {
511            node_id: 2,
512            listen_addr: addr2,
513            seed_nodes: vec![addr1],
514            num_groups: 2,
515            replication_factor: 1,
516            data_dir: _dir2.path().to_path_buf(),
517            force_bootstrap: false,
518            join_retry: Default::default(),
519            swim_udp_addr: None,
520            election_timeout_min: Duration::from_millis(150),
521            election_timeout_max: Duration::from_millis(300),
522        };
523
524        let lifecycle = ClusterLifecycleTracker::new();
525        let state2 = join(&config2, &catalog2, &t2, &lifecycle).await.unwrap();
526        // Lifecycle should have walked Joining{0} → [settled before
527        // `to_ready` which is the caller's responsibility].
528        assert!(matches!(
529            lifecycle.current(),
530            crate::lifecycle_state::ClusterLifecycleState::Joining { .. }
531        ));
532
533        assert_eq!(state2.topology.node_count(), 2);
534        assert_eq!(state2.routing.num_groups(), 2);
535
536        // Verify node 2's state was persisted (reorder check: catalog
537        // is saved before MultiRaft files are touched).
538        assert!(catalog2.load_topology().unwrap().is_some());
539        assert!(catalog2.load_routing().unwrap().is_some());
540
541        // Verify node 1's topology was updated.
542        let topo1 = topology1.lock().unwrap();
543        assert_eq!(topo1.node_count(), 2);
544        assert!(topo1.contains(2));
545
546        shutdown_tx.send(true).unwrap();
547    }
548}