Skip to main content

nodedb_cluster/bootstrap/
probe.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Bootstrap probe: deterministic "who should bootstrap" decision and
4//! the live Ping probe used to confirm the elected bootstrapper is up.
5//!
6//! # The rule
7//!
8//! To eliminate the `should_bootstrap` race where multiple seeds each
9//! saw "no other seed is up" and all bootstrapped disjoint clusters,
10//! we use a **deterministic elected-bootstrapper** rule:
11//!
12//! > The seed whose `SocketAddr` is lexicographically smallest is the
13//! > designated bootstrapper. Every other seed calls `join()`.
14//!
//! `SocketAddr` has a total ordering through its `Ord` implementation
//! (the IP address is compared first, with IPv4 sorting before IPv6,
//! and the port breaks ties — this is not a comparison of the textual
//! form), so every node given the same seed list agrees on the same
//! bootstrapper without any network round-trips — no race is possible.
19//!
20//! # The Ping probe
21//!
//! When this node is **not** the designated bootstrapper, we still
//! want to give the elected seed a short window to come up before
//! entering the retry-backoff loop in `join()`. `should_bootstrap`
//! calls `ping_probe` — one cheap, side-effect-free `RaftRpc::Ping`
//! per call — up to `MAX_PROBE_ATTEMPTS` times, bounding each attempt
//! by `PROBE_TIMEOUT` and pausing `PROBE_INTERVAL` between attempts.
//! Any successful `Pong` response means the bootstrapper is alive — we
//! immediately return `false` so the caller falls through to `join()`.
//! If every attempt fails we still return `false` (the caller's join
//! loop has its own retry schedule and will handle the slow-start
//! case).
32//!
33//! # The force flag
34//!
35//! `ClusterConfig.force_bootstrap` is an operator escape hatch for
36//! disaster recovery — the designated bootstrapper has been lost
37//! permanently and the operator wants a different seed to take over.
38//! When set, `should_bootstrap` returns `true` without probing.
39
40use std::net::SocketAddr;
41use std::time::Duration;
42
43use tracing::{debug, info};
44
45use crate::rpc_codec::{PingRequest, RaftRpc};
46use crate::transport::NexarTransport;
47
48use super::config::ClusterConfig;
49
/// Maximum number of Ping attempts made against the designated
/// bootstrapper before giving up and handing off to the caller's join
/// loop.
const MAX_PROBE_ATTEMPTS: u32 = 10;

/// Delay between consecutive Ping attempts when the previous one
/// failed or timed out; no delay follows the final attempt. Tunes the
/// probe cadence for the common case of "designated bootstrapper has
/// not finished its first election yet".
const PROBE_INTERVAL: Duration = Duration::from_millis(300);

/// Hard per-attempt timeout. The underlying QUIC transport can block
/// for seconds on an unreachable endpoint while it retries the
/// handshake; bounding each attempt explicitly keeps the total
/// `should_bootstrap` window predictable regardless of the transport's
/// internal schedule.
const PROBE_TIMEOUT: Duration = Duration::from_millis(200);
65
66/// Decide whether this node should bootstrap a new cluster.
67///
68/// Returns `true` iff any of the following hold:
69///
70/// 1. `config.force_bootstrap` is set (operator escape hatch).
71/// 2. This node's `listen_addr` is the lexicographically smallest
72///    entry in `config.seed_nodes` — the deterministic elected
73///    bootstrapper.
74///
75/// Returns `false` in every other case, including when the designated
76/// bootstrapper is currently unreachable: the caller's `join()` loop
77/// owns its own retry schedule and is the correct place to wait for
78/// the bootstrapper to come up.
79///
80/// This function performs a live Ping probe **only** when it is about
81/// to return `false` anyway — the probe is best-effort observability
82/// and logging, not a decision input. With a deterministic rule there
83/// is nothing to race on.
84pub(super) async fn should_bootstrap(config: &ClusterConfig, transport: &NexarTransport) -> bool {
85    if config.force_bootstrap {
86        info!(
87            node_id = config.node_id,
88            listen_addr = %config.listen_addr,
89            "force_bootstrap flag set — bootstrapping unconditionally"
90        );
91        return true;
92    }
93
94    let designated = match designated_bootstrapper(&config.seed_nodes) {
95        Some(addr) => addr,
96        None => {
97            // Empty seed list — caller already treats this as an
98            // implicit single-node bootstrap (`seed_nodes = [self]`
99            // fallback in `TestNode::spawn` / the main binary's
100            // config layer). Bootstrap is the only reasonable choice.
101            return true;
102        }
103    };
104
105    if designated == config.listen_addr {
106        info!(
107            node_id = config.node_id,
108            listen_addr = %config.listen_addr,
109            "this node is the designated bootstrapper"
110        );
111        return true;
112    }
113
114    info!(
115        node_id = config.node_id,
116        listen_addr = %config.listen_addr,
117        %designated,
118        "deferring to designated bootstrapper; probing for liveness"
119    );
120
121    // Non-blocking best-effort probe — each attempt is bounded by
122    // PROBE_TIMEOUT, so the total window is at most
123    // MAX_PROBE_ATTEMPTS * (PROBE_TIMEOUT + PROBE_INTERVAL). Exits
124    // early as soon as the bootstrapper answers, so the common case is
125    // a single sub-second round trip.
126    for attempt in 0..MAX_PROBE_ATTEMPTS {
127        let probe_result = tokio::time::timeout(
128            PROBE_TIMEOUT,
129            ping_probe(designated, transport, config.node_id),
130        )
131        .await;
132
133        match probe_result {
134            Ok(Ok(())) => {
135                info!(
136                    node_id = config.node_id,
137                    %designated,
138                    attempt,
139                    "designated bootstrapper is up"
140                );
141                return false;
142            }
143            Ok(Err(e)) => {
144                debug!(
145                    node_id = config.node_id,
146                    %designated,
147                    attempt,
148                    error = %e,
149                    "ping probe failed"
150                );
151            }
152            Err(_elapsed) => {
153                debug!(
154                    node_id = config.node_id,
155                    %designated,
156                    attempt,
157                    timeout_ms = PROBE_TIMEOUT.as_millis() as u64,
158                    "ping probe timed out"
159                );
160            }
161        }
162        if attempt + 1 < MAX_PROBE_ATTEMPTS {
163            tokio::time::sleep(PROBE_INTERVAL).await;
164        }
165    }
166
167    info!(
168        node_id = config.node_id,
169        %designated,
170        "designated bootstrapper did not respond; proceeding to join loop"
171    );
172    false
173}
174
175/// Look up the lexicographically smallest seed address, which is the
176/// designated bootstrapper under the single-elected-bootstrapper rule.
177///
178/// Pure function — exported for unit testing.
179pub(super) fn designated_bootstrapper(seed_nodes: &[SocketAddr]) -> Option<SocketAddr> {
180    seed_nodes.iter().min().copied()
181}
182
183/// Send a single Ping RPC to `addr` and wait for the response.
184///
185/// Returns `Ok(())` on any successful `Pong` reply, or an error
186/// describing the failure. Unlike the old `JoinRequest`-as-probe, Ping
187/// is idempotent and has no state-mutation intent.
188async fn ping_probe(
189    addr: SocketAddr,
190    transport: &NexarTransport,
191    self_node_id: u64,
192) -> crate::error::Result<()> {
193    let rpc = RaftRpc::Ping(PingRequest {
194        sender_id: self_node_id,
195        topology_version: 0,
196    });
197    match transport.send_rpc_to_addr(addr, rpc).await {
198        Ok(RaftRpc::Pong(_)) => Ok(()),
199        Ok(other) => Err(crate::error::ClusterError::Transport {
200            detail: format!("unexpected response to Ping from {addr}: {other:?}"),
201        }),
202        Err(e) => Err(e),
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::credentials::TransportCredentials;
    use std::sync::Arc;

    /// Parse a socket-address literal; panics on malformed test input.
    fn addr(s: &str) -> SocketAddr {
        s.parse().unwrap()
    }

    /// Build a `ClusterConfig` for node `node_id` listening on `listen`
    /// with the given seed list and `force_bootstrap` unset.
    fn cfg_with_seeds(node_id: u64, listen: &str, seeds: &[&str]) -> ClusterConfig {
        // `data_dir` is not touched by `should_bootstrap` — the probe
        // phase is network-only. A placeholder path is fine.
        ClusterConfig {
            node_id,
            listen_addr: addr(listen),
            seed_nodes: seeds.iter().map(|s| addr(s)).collect(),
            num_groups: 2,
            replication_factor: 1,
            data_dir: std::env::temp_dir(),
            force_bootstrap: false,
            join_retry: Default::default(),
            swim_udp_addr: None,
            election_timeout_min: Duration::from_millis(150),
            election_timeout_max: Duration::from_millis(300),
            install_snapshot_chunk_bytes: 4 * 1024 * 1024,
            orphan_partial_max_age_secs: 300,
        }
    }

    /// Build an insecure transport for `node_id` bound to an
    /// OS-assigned loopback port. Shared by every async test below so
    /// the construction is written once.
    fn loopback_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(
            NexarTransport::new(
                node_id,
                "127.0.0.1:0".parse().unwrap(),
                TransportCredentials::Insecure,
            )
            .unwrap(),
        )
    }

    #[test]
    fn designated_bootstrapper_picks_smallest() {
        let seeds = vec![
            addr("10.0.0.3:9400"),
            addr("10.0.0.1:9400"),
            addr("10.0.0.2:9400"),
        ];
        assert_eq!(designated_bootstrapper(&seeds), Some(addr("10.0.0.1:9400")));
    }

    #[test]
    fn designated_bootstrapper_empty_is_none() {
        assert!(designated_bootstrapper(&[]).is_none());
    }

    #[test]
    fn designated_bootstrapper_tie_break_by_port() {
        // Same IP, different ports — smaller port wins.
        let seeds = vec![addr("10.0.0.1:9401"), addr("10.0.0.1:9400")];
        assert_eq!(designated_bootstrapper(&seeds), Some(addr("10.0.0.1:9400")));
    }

    #[tokio::test]
    async fn should_bootstrap_when_self_is_lowest_seed() {
        let cfg = cfg_with_seeds(
            1,
            "10.0.0.1:9400",
            &["10.0.0.1:9400", "10.0.0.2:9400", "10.0.0.3:9400"],
        );
        let transport = loopback_transport(1);
        assert!(should_bootstrap(&cfg, &transport).await);
    }

    #[tokio::test]
    async fn force_bootstrap_overrides_rule() {
        // Not the lowest seed, but force flag is set.
        let mut cfg = cfg_with_seeds(
            3,
            "10.0.0.3:9400",
            &["10.0.0.1:9400", "10.0.0.2:9400", "10.0.0.3:9400"],
        );
        cfg.force_bootstrap = true;
        let transport = loopback_transport(3);
        assert!(should_bootstrap(&cfg, &transport).await);
    }

    #[tokio::test]
    async fn should_bootstrap_false_when_designated_unreachable() {
        // Not the lowest seed and the force flag is unset. The
        // designated bootstrapper is `127.0.0.1:1`: loopback keeps the
        // probe local, and port 1 lies inside the privileged range
        // (1-1023), so nothing is expected to be listening there
        // unless a root-owned process bound it. Every probe attempt
        // should therefore fail or time out, and `should_bootstrap`
        // must return `false` so the caller proceeds to the join loop.
        // If the probe quietly succeeds that's a bug.
        let cfg = cfg_with_seeds(2, "127.0.0.1:9400", &["127.0.0.1:1", "127.0.0.1:9400"]);
        let transport = loopback_transport(2);
        // Worst case is MAX_PROBE_ATTEMPTS * PROBE_TIMEOUT plus
        // (MAX_PROBE_ATTEMPTS - 1) * PROBE_INTERVAL ~= 4.7 s. Wrap in
        // a timeout so a regression fails this test instead of
        // stalling the whole test suite.
        let result =
            tokio::time::timeout(Duration::from_secs(8), should_bootstrap(&cfg, &transport))
                .await
                .expect("should_bootstrap should not hang");
        assert!(!result);
    }
}