nodedb_cluster/bootstrap/
join.rs1use std::collections::HashSet;
22use std::net::SocketAddr;
23
24use tracing::{debug, info, warn};
25
26use crate::catalog::ClusterCatalog;
27use crate::error::{ClusterError, Result};
28use crate::lifecycle_state::ClusterLifecycleTracker;
29use crate::multi_raft::MultiRaft;
30use crate::routing::{GroupInfo, RoutingTable};
31use crate::rpc_codec::{JoinRequest, JoinResponse, LEADER_REDIRECT_PREFIX, RaftRpc};
32use crate::topology::{ClusterTopology, NodeInfo, NodeState};
33use crate::transport::NexarTransport;
34
35use super::config::{ClusterConfig, ClusterState};
36
/// Maximum number of leader redirects followed during a single join attempt.
///
/// The counter is local to one pass over the seed list, so it starts from zero
/// again on every retry; the total number of redirects across all attempts can
/// therefore exceed this value.
const MAX_REDIRECTS_PER_ATTEMPT: u32 = 3;
41
42pub(crate) fn parse_leader_hint(error: &str) -> Option<SocketAddr> {
55 error
56 .strip_prefix(LEADER_REDIRECT_PREFIX)
57 .and_then(|s| s.trim().parse().ok())
58}
59
60pub(super) async fn join(
72 config: &ClusterConfig,
73 catalog: &ClusterCatalog,
74 transport: &NexarTransport,
75 lifecycle: &ClusterLifecycleTracker,
76) -> Result<ClusterState> {
77 info!(
78 node_id = config.node_id,
79 seeds = ?config.seed_nodes,
80 "joining existing cluster"
81 );
82
83 if config.seed_nodes.is_empty() {
84 let err = ClusterError::Transport {
85 detail: "no seed nodes configured".into(),
86 };
87 lifecycle.to_failed(err.to_string());
88 return Err(err);
89 }
90
91 let req_template = JoinRequest {
92 node_id: config.node_id,
93 listen_addr: config.listen_addr.to_string(),
94 wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
95 };
96
97 let policy = config.join_retry;
98 let mut last_err: Option<ClusterError> = None;
99
100 for attempt in 0..policy.max_attempts {
101 lifecycle.to_joining(attempt);
102
103 let delay = policy.backoff_for(attempt);
104 if !delay.is_zero() {
105 debug!(
106 node_id = config.node_id,
107 attempt,
108 delay_ms = delay.as_millis() as u64,
109 "backing off before next join attempt"
110 );
111 tokio::time::sleep(delay).await;
112 }
113
114 match try_join_once(config, catalog, transport, &req_template).await {
115 Ok(state) => return Ok(state),
116 Err(e) => {
117 warn!(
118 node_id = config.node_id,
119 attempt,
120 error = %e,
121 "join attempt failed; will retry"
122 );
123 last_err = Some(e);
124 }
125 }
126 }
127
128 let max_attempts = policy.max_attempts;
129 let err = last_err.unwrap_or_else(|| ClusterError::Transport {
130 detail: format!("join exhausted {max_attempts} attempts with no concrete error"),
131 });
132 lifecycle.to_failed(err.to_string());
133 Err(err)
134}
135
136async fn try_join_once(
141 config: &ClusterConfig,
142 catalog: &ClusterCatalog,
143 transport: &NexarTransport,
144 req_template: &JoinRequest,
145) -> Result<ClusterState> {
146 let mut work: std::collections::VecDeque<SocketAddr> =
156 config.seed_nodes.iter().copied().collect();
157 {
158 let mut sorted: Vec<SocketAddr> = work.drain(..).collect();
162 sorted.sort();
163 work.extend(sorted);
164 }
165 let mut visited: HashSet<SocketAddr> = HashSet::new();
166 let mut redirects: u32 = 0;
167 let mut last_err: Option<ClusterError> = None;
168
169 while let Some(addr) = work.pop_front() {
170 if !visited.insert(addr) {
171 continue;
172 }
173
174 let rpc = RaftRpc::JoinRequest(req_template.clone());
175 match transport.send_rpc_to_addr(addr, rpc).await {
176 Ok(RaftRpc::JoinResponse(resp)) => {
177 if resp.success {
178 return apply_join_response(config, catalog, transport, &resp);
179 }
180 if let Some(leader) = parse_leader_hint(&resp.error) {
182 if redirects < MAX_REDIRECTS_PER_ATTEMPT && !visited.contains(&leader) {
183 info!(
184 node_id = config.node_id,
185 from = %addr,
186 to = %leader,
187 "following leader redirect"
188 );
189 redirects += 1;
190 work.push_front(leader);
191 continue;
192 }
193 debug!(
194 node_id = config.node_id,
195 from = %addr,
196 leader = %leader,
197 redirects,
198 "redirect cap reached or loop detected; falling through"
199 );
200 }
201 last_err = Some(ClusterError::Transport {
202 detail: format!("join rejected by {addr}: {}", resp.error),
203 });
204 }
205 Ok(other) => {
206 last_err = Some(ClusterError::Transport {
207 detail: format!("unexpected response from {addr}: {other:?}"),
208 });
209 }
210 Err(e) => {
211 debug!(%addr, error = %e, "seed unreachable");
212 last_err = Some(e);
213 }
214 }
215 }
216
217 Err(last_err.unwrap_or_else(|| ClusterError::Transport {
218 detail: "no seed nodes produced a response".into(),
219 }))
220}
221
222fn apply_join_response(
238 config: &ClusterConfig,
239 catalog: &ClusterCatalog,
240 transport: &NexarTransport,
241 resp: &JoinResponse,
242) -> Result<ClusterState> {
243 let mut topology = ClusterTopology::new();
245 for node in &resp.nodes {
246 let state = NodeState::from_u8(node.state).unwrap_or(NodeState::Active);
247 let mut info = NodeInfo {
248 node_id: node.node_id,
249 addr: node.addr.clone(),
250 state,
251 raft_groups: node.raft_groups.clone(),
252 wire_version: node.wire_version,
253 };
254 if node.node_id == config.node_id {
255 info.state = NodeState::Active;
256 }
257 topology.add_node(info);
258 }
259
260 let mut group_members = std::collections::HashMap::new();
262 for g in &resp.groups {
263 group_members.insert(
264 g.group_id,
265 GroupInfo {
266 leader: g.leader,
267 members: g.members.clone(),
268 learners: g.learners.clone(),
269 },
270 );
271 }
272 let routing = RoutingTable::from_parts(resp.vshard_to_group.clone(), group_members);
273
274 catalog.save_cluster_id(resp.cluster_id)?;
283 catalog.save_topology(&topology)?;
284 catalog.save_routing(&routing)?;
285
286 let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone())
292 .with_election_timeout(config.election_timeout_min, config.election_timeout_max);
293 for g in &resp.groups {
294 let is_voter = g.members.contains(&config.node_id);
295 let is_learner = g.learners.contains(&config.node_id);
296
297 if is_voter {
298 let peers: Vec<u64> = g
299 .members
300 .iter()
301 .copied()
302 .filter(|&id| id != config.node_id)
303 .collect();
304 multi_raft.add_group(g.group_id, peers)?;
305 } else if is_learner {
306 let voters = g.members.clone();
307 let other_learners: Vec<u64> = g
308 .learners
309 .iter()
310 .copied()
311 .filter(|&id| id != config.node_id)
312 .collect();
313 multi_raft.add_group_as_learner(g.group_id, voters, other_learners)?;
314 }
315 }
316
317 for node in &resp.nodes {
319 if node.node_id != config.node_id
320 && let Ok(addr) = node.addr.parse::<SocketAddr>()
321 {
322 transport.register_peer(node.node_id, addr);
323 }
324 }
325
326 info!(
327 node_id = config.node_id,
328 nodes = topology.node_count(),
329 groups = routing.num_groups(),
330 "joined cluster"
331 );
332
333 Ok(ClusterState {
334 topology,
335 routing,
336 multi_raft,
337 })
338}
339
#[cfg(test)]
mod tests {
    use super::super::bootstrap_fn::bootstrap;
    use super::super::config::JoinRetryPolicy;
    use super::super::handle_join::handle_join_request;
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Opens a fresh `ClusterCatalog` at `cluster.redb` inside a new temp dir.
    ///
    /// The `TempDir` is returned alongside the catalog; the caller must keep it
    /// alive for as long as the catalog is used, since dropping it removes the
    /// directory.
    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        let catalog = ClusterCatalog::open(&path).unwrap();
        (dir, catalog)
    }

    /// A well-formed redirect message yields the address that follows the prefix.
    #[test]
    fn parse_leader_hint_extracts_valid_addr() {
        assert_eq!(
            parse_leader_hint("not leader; retry at 10.0.0.1:9400"),
            Some("10.0.0.1:9400".parse().unwrap())
        );
        assert_eq!(
            parse_leader_hint("not leader; retry at 127.0.0.1:65535"),
            Some("127.0.0.1:65535".parse().unwrap())
        );
    }

    /// Messages without the redirect prefix are never treated as hints, even
    /// when they contain an address.
    #[test]
    fn parse_leader_hint_rejects_unrelated_error() {
        assert_eq!(
            parse_leader_hint("node_id 2 already registered with different address 10.0.0.2:9400"),
            None
        );
        assert_eq!(parse_leader_hint(""), None);
        assert_eq!(
            parse_leader_hint("conf change commit timeout on group 0"),
            None
        );
    }

    /// Redirect messages whose address is garbage, empty, or missing a port
    /// yield no hint.
    #[test]
    fn parse_leader_hint_rejects_malformed_addr() {
        assert_eq!(parse_leader_hint("not leader; retry at notanaddress"), None);
        assert_eq!(parse_leader_hint("not leader; retry at "), None);
        assert_eq!(parse_leader_hint("not leader; retry at 10.0.0.1"), None);
    }

    /// Pins the default backoff schedule:
    /// - no delay before attempt 0;
    /// - 250 ms before attempt 1, doubling up to 32 s at attempt 8;
    /// - zero again at attempt 9.
    #[test]
    fn join_retry_policy_default_schedule() {
        let policy = JoinRetryPolicy::default();
        assert_eq!(policy.backoff_for(0), Duration::ZERO);
        assert_eq!(policy.backoff_for(1), Duration::from_millis(250));
        assert_eq!(policy.backoff_for(2), Duration::from_millis(500));
        assert_eq!(policy.backoff_for(3), Duration::from_secs(1));
        assert_eq!(policy.backoff_for(4), Duration::from_secs(2));
        assert_eq!(policy.backoff_for(5), Duration::from_secs(4));
        assert_eq!(policy.backoff_for(6), Duration::from_secs(8));
        assert_eq!(policy.backoff_for(7), Duration::from_secs(16));
        assert_eq!(policy.backoff_for(8), Duration::from_secs(32));
        assert_eq!(policy.backoff_for(9), Duration::ZERO);
    }

    /// A reduced policy (8 attempts, 2 s cap) keeps the summed backoff small
    /// while still reaching the cap on the last attempt.
    ///
    /// NOTE(review): despite the name, the assertion bound is 5 s, not 1 s;
    /// consider renaming the test or tightening the bound.
    #[test]
    fn join_retry_policy_test_schedule_is_subsecond() {
        let policy = JoinRetryPolicy {
            max_attempts: 8,
            max_backoff_secs: 2,
        };
        // Sum the delay before every attempt index, including `max_attempts` itself.
        let total: Duration = (0..=policy.max_attempts)
            .map(|a| policy.backoff_for(a))
            .sum();
        assert!(
            total < Duration::from_secs(5),
            "test schedule too slow: {total:?}"
        );
        assert_eq!(policy.backoff_for(8), Duration::from_secs(2));
    }

    /// End-to-end flow over real loopback transports:
    /// - node 1 bootstraps a 2-group cluster and serves join requests;
    /// - node 2 joins using node 1 as its only seed;
    /// - both sides' views of the result are checked.
    #[tokio::test]
    async fn full_bootstrap_join_flow() {
        use crate::transport::credentials::TransportCredentials;
        // Both transports bind to an ephemeral loopback port.
        let t1 = Arc::new(
            NexarTransport::new(
                1,
                "127.0.0.1:0".parse().unwrap(),
                TransportCredentials::Insecure,
            )
            .unwrap(),
        );
        let t2 = Arc::new(
            NexarTransport::new(
                2,
                "127.0.0.1:0".parse().unwrap(),
                TransportCredentials::Insecure,
            )
            .unwrap(),
        );

        let (_dir1, catalog1) = temp_catalog();
        let (_dir2, catalog2) = temp_catalog();

        let addr1 = t1.local_addr();
        let addr2 = t2.local_addr();

        let config1 = ClusterConfig {
            node_id: 1,
            listen_addr: addr1,
            seed_nodes: vec![addr1],
            num_groups: 2,
            replication_factor: 1,
            data_dir: _dir1.path().to_path_buf(),
            force_bootstrap: false,
            join_retry: Default::default(),
            swim_udp_addr: None,
            election_timeout_min: Duration::from_millis(150),
            election_timeout_max: Duration::from_millis(300),
        };
        let state1 = bootstrap(&config1, &catalog1).unwrap();

        // Shared with the RPC handler so the test can inspect node 1's
        // topology after the join.
        let topology1 = Arc::new(Mutex::new(state1.topology));
        let routing1 = Arc::new(state1.routing);

        /// Minimal RPC handler for node 1 that answers only `JoinRequest`s.
        struct JoinHandler {
            topology: Arc<Mutex<ClusterTopology>>,
            routing: Arc<RoutingTable>,
        }

        impl crate::transport::RaftRpcHandler for JoinHandler {
            async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
                match rpc {
                    RaftRpc::JoinRequest(req) => {
                        // The std mutex guard is never held across an `.await`.
                        let mut topo = self.topology.lock().unwrap();
                        let resp = handle_join_request(&req, &mut topo, &self.routing, 99);
                        Ok(RaftRpc::JoinResponse(resp))
                    }
                    other => Err(ClusterError::Transport {
                        detail: format!("unexpected: {other:?}"),
                    }),
                }
            }
        }

        let handler = Arc::new(JoinHandler {
            topology: topology1.clone(),
            routing: routing1.clone(),
        });

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
        let t1c = t1.clone();
        tokio::spawn(async move {
            t1c.serve(handler, shutdown_rx).await.unwrap();
        });

        // Give the spawned server a moment to start accepting.
        // NOTE(review): timing-based; may be flaky on a heavily loaded machine.
        tokio::time::sleep(Duration::from_millis(30)).await;

        let config2 = ClusterConfig {
            node_id: 2,
            listen_addr: addr2,
            seed_nodes: vec![addr1],
            num_groups: 2,
            replication_factor: 1,
            data_dir: _dir2.path().to_path_buf(),
            force_bootstrap: false,
            join_retry: Default::default(),
            swim_udp_addr: None,
            election_timeout_min: Duration::from_millis(150),
            election_timeout_max: Duration::from_millis(300),
        };

        let lifecycle = ClusterLifecycleTracker::new();
        let state2 = join(&config2, &catalog2, &t2, &lifecycle).await.unwrap();
        // A successful join leaves the tracker in Joining; advancing it is the
        // caller's job.
        assert!(matches!(
            lifecycle.current(),
            crate::lifecycle_state::ClusterLifecycleState::Joining { .. }
        ));

        assert_eq!(state2.topology.node_count(), 2);
        assert_eq!(state2.routing.num_groups(), 2);

        // The joiner must have persisted what it received.
        assert!(catalog2.load_topology().unwrap().is_some());
        assert!(catalog2.load_routing().unwrap().is_some());

        // The seed side must have recorded node 2 through the handler.
        let topo1 = topology1.lock().unwrap();
        assert_eq!(topo1.node_count(), 2);
        assert!(topo1.contains(2));

        shutdown_tx.send(true).unwrap();
    }
}