use std::net::SocketAddr;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
use crate::bootstrap::handle_join_request;
use crate::conf_change::{ConfChange, ConfChangeType};
use crate::error::{ClusterError, Result};
use crate::forward::PlanExecutor;
use crate::health;
use crate::multi_raft::GroupStatus;
use crate::routing::RoutingTable;
use crate::rpc_codec::{JoinRequest, JoinResponse, LEADER_REDIRECT_PREFIX};
use super::handle_rpc::{JoinDecision, TOPOLOGY_GROUP_ID, decide_join};
use super::loop_core::{CommitApplier, RaftLoop};
/// Upper bound on how long `join_flow` waits for its proposed `AddLearner`
/// changes to appear in the routing table. A single deadline derived from this
/// value is shared by all groups.
const CONF_CHANGE_COMMIT_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Delay between successive routing-table polls while waiting for an
/// `AddLearner` change to show up.
const CONF_CHANGE_POLL_INTERVAL: Duration = Duration::from_millis(20);
impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
pub(super) async fn join_flow(&self, req: JoinRequest) -> JoinResponse {
let (group0_leader, routing): (u64, RoutingTable) = {
let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
let routing = mr.routing().clone();
let leader_id = mr
.group_statuses()
.into_iter()
.find(|s: &GroupStatus| s.group_id == TOPOLOGY_GROUP_ID)
.map(|s| s.leader_id)
.unwrap_or(0);
(leader_id, routing)
};
let leader_addr_hint = if group0_leader != 0 && group0_leader != self.node_id {
self.topology
.read()
.unwrap_or_else(|p| p.into_inner())
.get_node(group0_leader)
.map(|n| n.addr.clone())
} else {
None
};
if let JoinDecision::Redirect { leader_addr } =
decide_join(group0_leader, self.node_id, leader_addr_hint)
{
warn!(
joining_node = req.node_id,
leader_id = group0_leader,
leader_addr = %leader_addr,
"JoinRequest received on non-leader; redirecting"
);
return reject(format!("{LEADER_REDIRECT_PREFIX}{leader_addr}"));
}
let new_addr: SocketAddr = match req.listen_addr.parse() {
Ok(a) => a,
Err(e) => {
return reject(format!("invalid listen_addr '{}': {e}", req.listen_addr));
}
};
let existing = self
.topology
.read()
.unwrap_or_else(|p| p.into_inner())
.get_node(req.node_id)
.cloned();
if let Some(existing) = existing {
if existing.addr != req.listen_addr {
return reject(format!(
"node_id {} already registered with different address {} (request: {})",
req.node_id, existing.addr, req.listen_addr
));
}
debug!(
joining_node = req.node_id,
"idempotent re-join; returning current cluster state"
);
return self.build_current_response(&req);
}
self.transport.register_peer(req.node_id, new_addr);
let cluster_id = match self.catalog.as_ref() {
Some(catalog) => match catalog.load_cluster_id() {
Ok(Some(id)) => id,
Ok(None) => {
return reject(
"server catalog is attached but has no cluster_id — refusing to \
issue a JoinResponse without a real cluster identity"
.to_string(),
);
}
Err(e) => {
return reject(format!("failed to read cluster_id from catalog: {e}"));
}
},
None => self.node_id,
};
{
let mut topo = self.topology.write().unwrap_or_else(|p| p.into_inner());
let initial_resp = handle_join_request(&req, &mut topo, &routing, cluster_id);
if !initial_resp.success {
return initial_resp;
}
}
let group_ids: Vec<u64> = {
let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
mr.routing().group_ids()
};
let mut pending: Vec<(u64, u64)> = Vec::with_capacity(group_ids.len()); for gid in &group_ids {
let change = ConfChange {
change_type: ConfChangeType::AddLearner,
node_id: req.node_id,
};
let propose_result = {
let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
mr.propose_conf_change(*gid, &change)
};
match propose_result {
Ok((_, log_index)) => pending.push((*gid, log_index)),
Err(ClusterError::Transport { detail }) => {
return reject(format!(
"failed to propose AddLearner on group {gid}: {detail}"
));
}
Err(e) => {
return reject(format!("failed to propose AddLearner on group {gid}: {e}"));
}
}
}
let deadline = Instant::now() + CONF_CHANGE_COMMIT_TIMEOUT;
for (gid, log_index) in &pending {
if let Err(err) = self
.wait_for_learner_applied(*gid, req.node_id, *log_index, deadline)
.await
{
return reject(err.to_string());
}
}
if let Some(catalog) = self.catalog.as_ref() {
let topo_snapshot = self
.topology
.read()
.unwrap_or_else(|p| p.into_inner())
.clone();
let routing_snapshot = {
let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
mr.routing().clone()
};
if let Err(e) = catalog.save_topology(&topo_snapshot) {
warn!(error = %e, "failed to persist topology after join");
return reject(format!("catalog save_topology failed: {e}"));
}
if let Err(e) = catalog.save_routing(&routing_snapshot) {
warn!(error = %e, "failed to persist routing after join");
return reject(format!("catalog save_routing failed: {e}"));
}
}
health::broadcast_topology(self.node_id, &self.topology, &self.transport);
info!(
joining_node = req.node_id,
groups = pending.len(),
"join accepted; learner AddLearner commits complete"
);
self.build_current_response(&req)
}
async fn wait_for_learner_applied(
&self,
group_id: u64,
learner_id: u64,
log_index: u64,
deadline: Instant,
) -> Result<()> {
loop {
let applied = {
let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
mr.routing()
.group_info(group_id)
.map(|info| info.learners.contains(&learner_id))
};
match applied {
Some(true) => return Ok(()),
Some(false) => {}
None => return Err(ClusterError::JoinGroupDisappeared { group_id }),
}
if Instant::now() >= deadline {
return Err(ClusterError::JoinCommitTimeout {
group_id,
log_index,
});
}
tokio::time::sleep(CONF_CHANGE_POLL_INTERVAL).await;
}
}
fn build_current_response(&self, req: &JoinRequest) -> JoinResponse {
let cluster_id = match self.catalog.as_ref() {
Some(catalog) => match catalog.load_cluster_id() {
Ok(Some(id)) => id,
Ok(None) => {
return reject(
"server catalog is attached but has no cluster_id — refusing to \
issue a JoinResponse without a real cluster identity"
.to_string(),
);
}
Err(e) => {
return reject(format!("failed to read cluster_id from catalog: {e}"));
}
},
None => self.node_id,
};
let topology_clone = self
.topology
.read()
.unwrap_or_else(|p| p.into_inner())
.clone();
let routing_clone = {
let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
mr.routing().clone()
};
let mut topo = topology_clone;
handle_join_request(req, &mut topo, &routing_clone, cluster_id)
}
}
/// Builds an unsuccessful `JoinResponse` whose `error` field carries the
/// rejection reason.
///
/// Every cluster-state field is left empty and `cluster_id` is `0`. Callers
/// should check `success` before reading any cluster state from the response.
fn reject(error: String) -> JoinResponse {
    JoinResponse {
        error,
        success: false,
        cluster_id: 0,
        groups: Vec::new(),
        nodes: Vec::new(),
        vshard_to_group: Vec::new(),
    }
}