// nodedb_cluster/bootstrap/join.rs
use std::collections::HashSet;
22use std::net::SocketAddr;
23
24use tracing::{debug, info, warn};
25
26use crate::catalog::ClusterCatalog;
27use crate::error::{ClusterError, Result};
28use crate::lifecycle_state::ClusterLifecycleTracker;
29use crate::multi_raft::MultiRaft;
30use crate::routing::{GroupInfo, RoutingTable};
31use crate::rpc_codec::{JoinRequest, JoinResponse, LEADER_REDIRECT_PREFIX, RaftRpc};
32use crate::topology::{ClusterTopology, NodeInfo, NodeState};
33use crate::transport::NexarTransport;
34
35use super::config::{ClusterConfig, ClusterState};
36
/// Upper bound on leader redirects followed within a single join attempt,
/// so a ring of stale leader hints cannot stall the attempt.
const MAX_REDIRECTS_PER_ATTEMPT: u32 = 3;
41
42pub(crate) fn parse_leader_hint(error: &str) -> Option<SocketAddr> {
55 error
56 .strip_prefix(LEADER_REDIRECT_PREFIX)
57 .and_then(|s| s.trim().parse().ok())
58}
59
60pub(super) async fn join(
72 config: &ClusterConfig,
73 catalog: &ClusterCatalog,
74 transport: &NexarTransport,
75 lifecycle: &ClusterLifecycleTracker,
76) -> Result<ClusterState> {
77 info!(
78 node_id = config.node_id,
79 seeds = ?config.seed_nodes,
80 "joining existing cluster"
81 );
82
83 if config.seed_nodes.is_empty() {
84 let err = ClusterError::Transport {
85 detail: "no seed nodes configured".into(),
86 };
87 lifecycle.to_failed(err.to_string());
88 return Err(err);
89 }
90
91 let req_template = JoinRequest {
92 node_id: config.node_id,
93 listen_addr: config.listen_addr.to_string(),
94 wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
95 };
96
97 let policy = config.join_retry;
98 let mut last_err: Option<ClusterError> = None;
99
100 for attempt in 0..policy.max_attempts {
101 lifecycle.to_joining(attempt);
102
103 let delay = policy.backoff_for(attempt);
104 if !delay.is_zero() {
105 debug!(
106 node_id = config.node_id,
107 attempt,
108 delay_ms = delay.as_millis() as u64,
109 "backing off before next join attempt"
110 );
111 tokio::time::sleep(delay).await;
112 }
113
114 match try_join_once(config, catalog, transport, &req_template).await {
115 Ok(state) => return Ok(state),
116 Err(e) => {
117 warn!(
118 node_id = config.node_id,
119 attempt,
120 error = %e,
121 "join attempt failed; will retry"
122 );
123 last_err = Some(e);
124 }
125 }
126 }
127
128 let max_attempts = policy.max_attempts;
129 let err = last_err.unwrap_or_else(|| ClusterError::Transport {
130 detail: format!("join exhausted {max_attempts} attempts with no concrete error"),
131 });
132 lifecycle.to_failed(err.to_string());
133 Err(err)
134}
135
136async fn try_join_once(
141 config: &ClusterConfig,
142 catalog: &ClusterCatalog,
143 transport: &NexarTransport,
144 req_template: &JoinRequest,
145) -> Result<ClusterState> {
146 let mut work: std::collections::VecDeque<SocketAddr> =
156 config.seed_nodes.iter().copied().collect();
157 {
158 let mut sorted: Vec<SocketAddr> = work.drain(..).collect();
162 sorted.sort();
163 work.extend(sorted);
164 }
165 let mut visited: HashSet<SocketAddr> = HashSet::new();
166 let mut redirects: u32 = 0;
167 let mut last_err: Option<ClusterError> = None;
168
169 while let Some(addr) = work.pop_front() {
170 if !visited.insert(addr) {
171 continue;
172 }
173
174 let rpc = RaftRpc::JoinRequest(req_template.clone());
175 match transport.send_rpc_to_addr(addr, rpc).await {
176 Ok(RaftRpc::JoinResponse(resp)) => {
177 if resp.success {
178 return apply_join_response(config, catalog, transport, &resp);
179 }
180 if let Some(leader) = parse_leader_hint(&resp.error) {
182 if redirects < MAX_REDIRECTS_PER_ATTEMPT && !visited.contains(&leader) {
183 info!(
184 node_id = config.node_id,
185 from = %addr,
186 to = %leader,
187 "following leader redirect"
188 );
189 redirects += 1;
190 work.push_front(leader);
191 continue;
192 }
193 debug!(
194 node_id = config.node_id,
195 from = %addr,
196 leader = %leader,
197 redirects,
198 "redirect cap reached or loop detected; falling through"
199 );
200 }
201 last_err = Some(ClusterError::Transport {
202 detail: format!("join rejected by {addr}: {}", resp.error),
203 });
204 }
205 Ok(other) => {
206 last_err = Some(ClusterError::Transport {
207 detail: format!("unexpected response from {addr}: {other:?}"),
208 });
209 }
210 Err(e) => {
211 debug!(%addr, error = %e, "seed unreachable");
212 last_err = Some(e);
213 }
214 }
215 }
216
217 Err(last_err.unwrap_or_else(|| ClusterError::Transport {
218 detail: "no seed nodes produced a response".into(),
219 }))
220}
221
/// Materialize local cluster state from a successful [`JoinResponse`].
///
/// Rebuilds the topology and routing table from the response, persists
/// the cluster id, topology, and routing to the catalog, starts the
/// local Raft groups this node belongs to (as voter or learner), and
/// registers transport routes for every other node.
///
/// # Errors
/// Propagates catalog persistence failures and Raft group creation
/// failures.
fn apply_join_response(
    config: &ClusterConfig,
    catalog: &ClusterCatalog,
    transport: &NexarTransport,
    resp: &JoinResponse,
) -> Result<ClusterState> {
    // Rebuild the topology from the coordinator's view of the cluster.
    let mut topology = ClusterTopology::new();
    for node in &resp.nodes {
        // Unrecognized state bytes degrade to Active — NOTE(review):
        // confirm this is the intended fallback for wire-format drift.
        let state = NodeState::from_u8(node.state).unwrap_or(NodeState::Active);
        let mut info = NodeInfo {
            node_id: node.node_id,
            addr: node.addr.clone(),
            state,
            raft_groups: node.raft_groups.clone(),
            wire_version: node.wire_version,
        };
        // Force our own entry to Active: the responder may still have us
        // recorded in a pre-join state.
        if node.node_id == config.node_id {
            info.state = NodeState::Active;
        }
        topology.add_node(info);
    }

    // Mirror group membership from the response into the routing table.
    let mut group_members = std::collections::HashMap::new();
    for g in &resp.groups {
        group_members.insert(
            g.group_id,
            GroupInfo {
                leader: g.leader,
                members: g.members.clone(),
                learners: g.learners.clone(),
            },
        );
    }
    let routing = RoutingTable::from_parts(resp.vshard_to_group.clone(), group_members);

    // Persist before starting Raft so a crash after this point can
    // recover the same cluster identity and layout from the catalog.
    catalog.save_cluster_id(resp.cluster_id)?;
    catalog.save_topology(&topology)?;
    catalog.save_routing(&routing)?;

    // Instantiate only the groups this node participates in.
    let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone());
    for g in &resp.groups {
        let is_voter = g.members.contains(&config.node_id);
        let is_learner = g.learners.contains(&config.node_id);

        if is_voter {
            // Peers passed to add_group are the *other* voters; this
            // node is excluded from its own peer list.
            let peers: Vec<u64> = g
                .members
                .iter()
                .copied()
                .filter(|&id| id != config.node_id)
                .collect();
            multi_raft.add_group(g.group_id, peers)?;
        } else if is_learner {
            // Learners get the full voter set plus the other learners.
            let voters = g.members.clone();
            let other_learners: Vec<u64> = g
                .learners
                .iter()
                .copied()
                .filter(|&id| id != config.node_id)
                .collect();
            multi_raft.add_group_as_learner(g.group_id, voters, other_learners)?;
        }
    }

    // Register transport routes for every peer whose address parses;
    // unparseable addresses are skipped silently (best-effort).
    for node in &resp.nodes {
        if node.node_id != config.node_id
            && let Ok(addr) = node.addr.parse::<SocketAddr>()
        {
            transport.register_peer(node.node_id, addr);
        }
    }

    info!(
        node_id = config.node_id,
        nodes = topology.node_count(),
        groups = routing.num_groups(),
        "joined cluster"
    );

    Ok(ClusterState {
        topology,
        routing,
        multi_raft,
    })
}
338
#[cfg(test)]
mod tests {
    use super::super::bootstrap_fn::bootstrap;
    use super::super::config::JoinRetryPolicy;
    use super::super::handle_join::handle_join_request;
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    // Open a fresh catalog backed by a temp directory. The TempDir is
    // returned so the caller keeps it alive for the catalog's lifetime.
    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        let catalog = ClusterCatalog::open(&path).unwrap();
        (dir, catalog)
    }

    // A well-formed redirect hint yields the embedded socket address.
    #[test]
    fn parse_leader_hint_extracts_valid_addr() {
        assert_eq!(
            parse_leader_hint("not leader; retry at 10.0.0.1:9400"),
            Some("10.0.0.1:9400".parse().unwrap())
        );
        assert_eq!(
            parse_leader_hint("not leader; retry at 127.0.0.1:65535"),
            Some("127.0.0.1:65535".parse().unwrap())
        );
    }

    // Errors that don't start with the redirect prefix never produce a
    // hint, even when they happen to contain an address.
    #[test]
    fn parse_leader_hint_rejects_unrelated_error() {
        assert_eq!(
            parse_leader_hint("node_id 2 already registered with different address 10.0.0.2:9400"),
            None
        );
        assert_eq!(parse_leader_hint(""), None);
        assert_eq!(
            parse_leader_hint("conf change commit timeout on group 0"),
            None
        );
    }

    // Prefix present but the remainder doesn't parse as a SocketAddr.
    #[test]
    fn parse_leader_hint_rejects_malformed_addr() {
        assert_eq!(parse_leader_hint("not leader; retry at notanaddress"), None);
        assert_eq!(parse_leader_hint("not leader; retry at "), None);
        assert_eq!(parse_leader_hint("not leader; retry at 10.0.0.1"), None);
    }

    // Pin the default backoff schedule: immediate first attempt, then
    // doubling from 250ms up to 32s.
    // NOTE(review): backoff_for(9) returning ZERO looks like the schedule
    // wrapping or saturating past its end — confirm against the policy's
    // documented contract.
    #[test]
    fn join_retry_policy_default_schedule() {
        let policy = JoinRetryPolicy::default();
        assert_eq!(policy.backoff_for(0), Duration::ZERO);
        assert_eq!(policy.backoff_for(1), Duration::from_millis(250));
        assert_eq!(policy.backoff_for(2), Duration::from_millis(500));
        assert_eq!(policy.backoff_for(3), Duration::from_secs(1));
        assert_eq!(policy.backoff_for(4), Duration::from_secs(2));
        assert_eq!(policy.backoff_for(5), Duration::from_secs(4));
        assert_eq!(policy.backoff_for(6), Duration::from_secs(8));
        assert_eq!(policy.backoff_for(7), Duration::from_secs(16));
        assert_eq!(policy.backoff_for(8), Duration::from_secs(32));
        assert_eq!(policy.backoff_for(9), Duration::ZERO);
    }

    // A test-oriented policy (small max backoff) must keep the cumulative
    // schedule fast enough for CI, while still capping at max_backoff_secs.
    #[test]
    fn join_retry_policy_test_schedule_is_subsecond() {
        let policy = JoinRetryPolicy {
            max_attempts: 8,
            max_backoff_secs: 2,
        };
        let total: Duration = (0..=policy.max_attempts)
            .map(|a| policy.backoff_for(a))
            .sum();
        assert!(
            total < Duration::from_secs(5),
            "test schedule too slow: {total:?}"
        );
        assert_eq!(policy.backoff_for(8), Duration::from_secs(2));
    }

    // End-to-end flow: node 1 bootstraps a single-replica cluster and
    // serves join RPCs; node 2 joins through the real transport; both
    // sides must converge on a two-node topology.
    #[tokio::test]
    async fn full_bootstrap_join_flow() {
        // Port 0: OS-assigned ports so parallel test runs don't collide.
        let t1 = Arc::new(NexarTransport::new(1, "127.0.0.1:0".parse().unwrap()).unwrap());
        let t2 = Arc::new(NexarTransport::new(2, "127.0.0.1:0".parse().unwrap()).unwrap());

        let (_dir1, catalog1) = temp_catalog();
        let (_dir2, catalog2) = temp_catalog();

        let addr1 = t1.local_addr();
        let addr2 = t2.local_addr();

        let config1 = ClusterConfig {
            node_id: 1,
            listen_addr: addr1,
            seed_nodes: vec![addr1],
            num_groups: 2,
            replication_factor: 1,
            data_dir: _dir1.path().to_path_buf(),
            force_bootstrap: false,
            join_retry: Default::default(),
        };
        let state1 = bootstrap(&config1, &catalog1).unwrap();

        let topology1 = Arc::new(Mutex::new(state1.topology));
        let routing1 = Arc::new(state1.routing);

        // Minimal RPC handler standing in for node 1's join service:
        // answers JoinRequest via handle_join_request, rejects all else.
        struct JoinHandler {
            topology: Arc<Mutex<ClusterTopology>>,
            routing: Arc<RoutingTable>,
        }

        impl crate::transport::RaftRpcHandler for JoinHandler {
            async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
                match rpc {
                    RaftRpc::JoinRequest(req) => {
                        let mut topo = self.topology.lock().unwrap();
                        // 99 is an arbitrary cluster id for the test.
                        let resp = handle_join_request(&req, &mut topo, &self.routing, 99);
                        Ok(RaftRpc::JoinResponse(resp))
                    }
                    other => Err(ClusterError::Transport {
                        detail: format!("unexpected: {other:?}"),
                    }),
                }
            }
        }

        let handler = Arc::new(JoinHandler {
            topology: topology1.clone(),
            routing: routing1.clone(),
        });

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
        let t1c = t1.clone();
        tokio::spawn(async move {
            t1c.serve(handler, shutdown_rx).await.unwrap();
        });

        // Give the serve task a moment to start accepting.
        // NOTE(review): sleep-based readiness is racy in principle; a
        // readiness signal from serve() would be more robust.
        tokio::time::sleep(Duration::from_millis(30)).await;

        let config2 = ClusterConfig {
            node_id: 2,
            listen_addr: addr2,
            seed_nodes: vec![addr1],
            num_groups: 2,
            replication_factor: 1,
            data_dir: _dir2.path().to_path_buf(),
            force_bootstrap: false,
            join_retry: Default::default(),
        };

        let lifecycle = ClusterLifecycleTracker::new();
        let state2 = join(&config2, &catalog2, &t2, &lifecycle).await.unwrap();
        // NOTE(review): the tracker still reads Joining after a successful
        // join — presumably a later transition (e.g. to Running) happens
        // in the caller; confirm that's intentional.
        assert!(matches!(
            lifecycle.current(),
            crate::lifecycle_state::ClusterLifecycleState::Joining { .. }
        ));

        assert_eq!(state2.topology.node_count(), 2);
        assert_eq!(state2.routing.num_groups(), 2);

        // Joining must have persisted topology and routing locally.
        assert!(catalog2.load_topology().unwrap().is_some());
        assert!(catalog2.load_routing().unwrap().is_some());

        // The serving side's topology must now include the joiner.
        let topo1 = topology1.lock().unwrap();
        assert_eq!(topo1.node_count(), 2);
        assert!(topo1.contains(2));

        shutdown_tx.send(true).unwrap();
    }
}