nodedb_cluster/bootstrap/probe.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Bootstrap probe: deterministic "who should bootstrap" decision and
4//! the live Ping probe used to confirm the elected bootstrapper is up.
5//!
6//! # The rule
7//!
8//! To eliminate the `should_bootstrap` race where multiple seeds each
9//! saw "no other seed is up" and all bootstrapped disjoint clusters,
10//! we use a **deterministic elected-bootstrapper** rule:
11//!
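//! > The seed whose `SocketAddr` is smallest under `SocketAddr`'s `Ord`
//! > implementation (numeric order, not string order — `10.0.0.9:9400`
//! > sorts before `10.0.0.10:9400`) is the designated bootstrapper.
//! > Every other seed calls `join()`.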
14//!
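//! `SocketAddr` has a total ordering (every IPv4 address sorts before
//! every IPv6 address; within a family, addresses compare by IP and then
//! by port), so every node given the same seed list agrees on the same
//! bootstrapper without any network round-trips — no race is possible.
//! This relies on every node being configured with the same seed
//! addresses.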
19//!
20//! # The Ping probe
21//!
22//! When this node is **not** the designated bootstrapper, we still
23//! want to give the elected seed a short window to come up before
//! entering the retry-backoff loop in `join()`. `should_bootstrap`
//! sends a cheap, side-effect-free `RaftRpc::Ping` (via `ping_probe`) to
//! the elected seed up to `MAX_PROBE_ATTEMPTS` times, bounding each
//! attempt by `PROBE_TIMEOUT` and pausing `PROBE_INTERVAL` between
//! attempts. Any
27//! successful `Pong` response means the bootstrapper is alive — we
28//! immediately return `false` so the caller falls through to `join()`.
29//! If every attempt fails we still return `false` (the caller's join
30//! loop has its own retry schedule and will handle the slow-start
31//! case).
32//!
33//! # The force flag
34//!
35//! `ClusterConfig.force_bootstrap` is an operator escape hatch for
36//! disaster recovery — the designated bootstrapper has been lost
37//! permanently and the operator wants a different seed to take over.
38//! When set, `should_bootstrap` returns `true` without probing.
39
40use std::net::SocketAddr;
41use std::time::Duration;
42
43use tracing::{debug, info};
44
45use crate::rpc_codec::{PingRequest, RaftRpc};
46use crate::transport::NexarTransport;
47
48use super::config::ClusterConfig;
49
/// Upper bound on Ping attempts made against the designated
/// bootstrapper before the probe gives up and control passes to the
/// caller's join loop.
const MAX_PROBE_ATTEMPTS: u32 = 10;

/// Deadline applied to each individual Ping attempt. The underlying QUIC
/// transport can block for seconds on an unreachable endpoint while it
/// retries the handshake; capping every attempt explicitly keeps the
/// total `should_bootstrap` window predictable no matter what the
/// transport's internal retry schedule looks like.
const PROBE_TIMEOUT: Duration = Duration::from_millis(200);

/// Pause between consecutive Ping attempts after one has failed. Chosen
/// for the common case where the designated bootstrapper simply has not
/// finished its first election yet.
const PROBE_INTERVAL: Duration = Duration::from_millis(300);
65
66/// Decide whether this node should bootstrap a new cluster.
67///
68/// Returns `true` iff any of the following hold:
69///
70/// 1. `config.force_bootstrap` is set (operator escape hatch).
71/// 2. This node's `listen_addr` is the lexicographically smallest
72/// entry in `config.seed_nodes` — the deterministic elected
73/// bootstrapper.
74///
75/// Returns `false` in every other case, including when the designated
76/// bootstrapper is currently unreachable: the caller's `join()` loop
77/// owns its own retry schedule and is the correct place to wait for
78/// the bootstrapper to come up.
79///
80/// This function performs a live Ping probe **only** when it is about
81/// to return `false` anyway — the probe is best-effort observability
82/// and logging, not a decision input. With a deterministic rule there
83/// is nothing to race on.
84pub(super) async fn should_bootstrap(config: &ClusterConfig, transport: &NexarTransport) -> bool {
85 if config.force_bootstrap {
86 info!(
87 node_id = config.node_id,
88 listen_addr = %config.listen_addr,
89 "force_bootstrap flag set — bootstrapping unconditionally"
90 );
91 return true;
92 }
93
94 let designated = match designated_bootstrapper(&config.seed_nodes) {
95 Some(addr) => addr,
96 None => {
97 // Empty seed list — caller already treats this as an
98 // implicit single-node bootstrap (`seed_nodes = [self]`
99 // fallback in `TestNode::spawn` / the main binary's
100 // config layer). Bootstrap is the only reasonable choice.
101 return true;
102 }
103 };
104
105 if designated == config.listen_addr {
106 info!(
107 node_id = config.node_id,
108 listen_addr = %config.listen_addr,
109 "this node is the designated bootstrapper"
110 );
111 return true;
112 }
113
114 info!(
115 node_id = config.node_id,
116 listen_addr = %config.listen_addr,
117 %designated,
118 "deferring to designated bootstrapper; probing for liveness"
119 );
120
121 // Non-blocking best-effort probe — each attempt is bounded by
122 // PROBE_TIMEOUT, so the total window is at most
123 // MAX_PROBE_ATTEMPTS * (PROBE_TIMEOUT + PROBE_INTERVAL). Exits
124 // early as soon as the bootstrapper answers, so the common case is
125 // a single sub-second round trip.
126 for attempt in 0..MAX_PROBE_ATTEMPTS {
127 let probe_result = tokio::time::timeout(
128 PROBE_TIMEOUT,
129 ping_probe(designated, transport, config.node_id),
130 )
131 .await;
132
133 match probe_result {
134 Ok(Ok(())) => {
135 info!(
136 node_id = config.node_id,
137 %designated,
138 attempt,
139 "designated bootstrapper is up"
140 );
141 return false;
142 }
143 Ok(Err(e)) => {
144 debug!(
145 node_id = config.node_id,
146 %designated,
147 attempt,
148 error = %e,
149 "ping probe failed"
150 );
151 }
152 Err(_elapsed) => {
153 debug!(
154 node_id = config.node_id,
155 %designated,
156 attempt,
157 timeout_ms = PROBE_TIMEOUT.as_millis() as u64,
158 "ping probe timed out"
159 );
160 }
161 }
162 if attempt + 1 < MAX_PROBE_ATTEMPTS {
163 tokio::time::sleep(PROBE_INTERVAL).await;
164 }
165 }
166
167 info!(
168 node_id = config.node_id,
169 %designated,
170 "designated bootstrapper did not respond; proceeding to join loop"
171 );
172 false
173}
174
175/// Look up the lexicographically smallest seed address, which is the
176/// designated bootstrapper under the single-elected-bootstrapper rule.
177///
178/// Pure function — exported for unit testing.
179pub(super) fn designated_bootstrapper(seed_nodes: &[SocketAddr]) -> Option<SocketAddr> {
180 seed_nodes.iter().min().copied()
181}
182
183/// Send a single Ping RPC to `addr` and wait for the response.
184///
185/// Returns `Ok(())` on any successful `Pong` reply, or an error
186/// describing the failure. Unlike the old `JoinRequest`-as-probe, Ping
187/// is idempotent and has no state-mutation intent.
188async fn ping_probe(
189 addr: SocketAddr,
190 transport: &NexarTransport,
191 self_node_id: u64,
192) -> crate::error::Result<()> {
193 let rpc = RaftRpc::Ping(PingRequest {
194 sender_id: self_node_id,
195 topology_version: 0,
196 });
197 match transport.send_rpc_to_addr(addr, rpc).await {
198 Ok(RaftRpc::Pong(_)) => Ok(()),
199 Ok(other) => Err(crate::error::ClusterError::Transport {
200 detail: format!("unexpected response to Ping from {addr}: {other:?}"),
201 }),
202 Err(e) => Err(e),
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::credentials::TransportCredentials;
    use std::sync::Arc;

    fn addr(s: &str) -> SocketAddr {
        s.parse().unwrap()
    }

    fn cfg_with_seeds(node_id: u64, listen: &str, seeds: &[&str]) -> ClusterConfig {
        // `data_dir` is not touched by `should_bootstrap` — the probe
        // phase is network-only. A placeholder path is fine.
        ClusterConfig {
            node_id,
            listen_addr: addr(listen),
            seed_nodes: seeds.iter().map(|s| addr(s)).collect(),
            num_groups: 2,
            replication_factor: 1,
            data_dir: std::env::temp_dir(),
            force_bootstrap: false,
            join_retry: Default::default(),
            swim_udp_addr: None,
            election_timeout_min: Duration::from_millis(150),
            election_timeout_max: Duration::from_millis(300),
            install_snapshot_chunk_bytes: 4 * 1024 * 1024,
            orphan_partial_max_age_secs: 300,
        }
    }

    /// Build an insecure transport bound to an ephemeral loopback port.
    ///
    /// The config's `listen_addr` is independent of this bind address;
    /// `should_bootstrap` only uses the transport to dial out.
    fn test_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(
            NexarTransport::new(
                node_id,
                "127.0.0.1:0".parse().unwrap(),
                TransportCredentials::Insecure,
            )
            .unwrap(),
        )
    }

    #[test]
    fn designated_bootstrapper_picks_smallest() {
        let seeds = vec![
            addr("10.0.0.3:9400"),
            addr("10.0.0.1:9400"),
            addr("10.0.0.2:9400"),
        ];
        assert_eq!(designated_bootstrapper(&seeds), Some(addr("10.0.0.1:9400")));
    }

    #[test]
    fn designated_bootstrapper_empty_is_none() {
        assert!(designated_bootstrapper(&[]).is_none());
    }

    #[test]
    fn designated_bootstrapper_tie_break_by_port() {
        // Same IP, different ports — smaller port wins.
        let seeds = vec![addr("10.0.0.1:9401"), addr("10.0.0.1:9400")];
        assert_eq!(designated_bootstrapper(&seeds), Some(addr("10.0.0.1:9400")));
    }

    #[tokio::test]
    async fn should_bootstrap_when_self_is_lowest_seed() {
        let cfg = cfg_with_seeds(
            1,
            "10.0.0.1:9400",
            &["10.0.0.1:9400", "10.0.0.2:9400", "10.0.0.3:9400"],
        );
        let transport = test_transport(1);
        assert!(should_bootstrap(&cfg, &transport).await);
    }

    #[tokio::test]
    async fn should_bootstrap_when_seed_list_empty() {
        // An empty seed list is treated as an implicit single-node
        // cluster: bootstrapping is the only option, and no probe runs.
        let cfg = cfg_with_seeds(1, "10.0.0.1:9400", &[]);
        let transport = test_transport(1);
        assert!(should_bootstrap(&cfg, &transport).await);
    }

    #[tokio::test]
    async fn force_bootstrap_overrides_rule() {
        // Not the lowest seed, but force flag is set.
        let mut cfg = cfg_with_seeds(
            3,
            "10.0.0.3:9400",
            &["10.0.0.1:9400", "10.0.0.2:9400", "10.0.0.3:9400"],
        );
        cfg.force_bootstrap = true;
        let transport = test_transport(3);
        assert!(should_bootstrap(&cfg, &transport).await);
    }

    #[tokio::test]
    async fn should_bootstrap_false_when_designated_unreachable() {
        // Not the lowest seed, force flag unset, and the designated
        // bootstrapper (`127.0.0.1:1`) is unreachable: every probe
        // attempt should fail or time out, and `should_bootstrap` must
        // still return `false` so the caller proceeds to its join loop.
        //
        // The designated seed is a loopback address on port 1, a
        // privileged port nothing in the test environment is expected to
        // listen on. If the probe quietly succeeds, that's a bug.
        let cfg = cfg_with_seeds(2, "127.0.0.1:9400", &["127.0.0.1:1", "127.0.0.1:9400"]);
        let transport = test_transport(2);

        // Worst case the probe makes MAX_PROBE_ATTEMPTS attempts of up to
        // PROBE_TIMEOUT each, separated by PROBE_INTERVAL pauses. Derive
        // the outer deadline from those constants (plus slack) so the
        // test tracks them. A hang regression then fails this test
        // instead of stalling the whole suite.
        let budget = (PROBE_TIMEOUT + PROBE_INTERVAL) * MAX_PROBE_ATTEMPTS + Duration::from_secs(3);
        let result = tokio::time::timeout(budget, should_bootstrap(&cfg, &transport))
            .await
            .expect("should_bootstrap should not hang");
        assert!(!result);
    }
}