use super::*;
use thiserror::Error;
#[cfg(feature = "gossip-bootstrap")]
#[derive(Debug, Error)]
pub(crate) enum GossipRuntimeSwitchError {
#[error("all runtime switch candidates failed: {reason}")]
AllCandidatesFailed { reason: String },
#[error("runtime switch exhausted candidate list")]
ExhaustedCandidateList,
}
#[cfg(feature = "gossip-bootstrap")]
pub(crate) async fn maybe_switch_gossip_runtime(
runtime: &mut ReceiverRuntime,
packet_ingest_tx: &ingest::RawPacketBatchSender,
entrypoints: &[String],
entrypoint_bias: Option<&GossipEntrypointBias>,
) -> Result<Option<String>, GossipRuntimeSwitchError> {
let candidate_pool = collect_runtime_switch_entrypoints(
runtime,
entrypoints,
read_gossip_runtime_switch_peer_candidates(),
);
if candidate_pool.len() <= 1 {
return Ok(None);
}
let previous_entrypoint = runtime.active_gossip_entrypoint.clone();
let prioritized_candidates_max = read_gossip_runtime_switch_prioritized_candidates_max();
let mut alternate_candidates: Vec<String> = candidate_pool
.iter()
.filter(|entrypoint| Some(entrypoint.as_str()) != previous_entrypoint.as_deref())
.cloned()
.collect();
if let Some(bias) = entrypoint_bias {
alternate_candidates.sort_unstable_by(|left, right| {
bias.rank_for_entrypoint(left)
.unwrap_or(usize::MAX)
.cmp(&bias.rank_for_entrypoint(right).unwrap_or(usize::MAX))
.then_with(|| left.cmp(right))
});
}
if alternate_candidates.len() > prioritized_candidates_max {
alternate_candidates.truncate(prioritized_candidates_max);
}
let prioritized = prioritize_gossip_entrypoints(&alternate_candidates, entrypoint_bias).await;
let mut candidates = Vec::new();
let probe_enabled = read_gossip_entrypoint_probe_enabled();
if probe_enabled {
for entrypoint in prioritized {
if probe_gossip_entrypoint_live(&entrypoint).await {
candidates.push(entrypoint);
}
}
} else {
candidates.extend(prioritized);
}
if candidates.is_empty() {
tracing::debug!(
previous_entrypoint = previous_entrypoint.as_deref().unwrap_or(""),
"no alternate gossip entrypoint passed preflight; skipping runtime switch"
);
return Ok(None);
}
if let Some(active_entrypoint) = previous_entrypoint.as_ref() {
candidates.push(active_entrypoint.clone());
}
let configured_overlap = Duration::from_millis(read_gossip_runtime_switch_overlap_ms());
let overlap = if configured_overlap > Duration::ZERO
&& runtime.gossip_runtime_secondary_port_range.is_some()
&& runtime.gossip_runtime_primary_port_range.is_some()
{
configured_overlap
} else {
Duration::ZERO
};
if configured_overlap > Duration::ZERO && overlap == Duration::ZERO {
tracing::debug!(
"gossip runtime overlap disabled because SOF_GOSSIP_RUNTIME_SWITCH_PORT_RANGE is unset"
);
}
if overlap == Duration::ZERO {
let selected_port_range = runtime
.gossip_runtime_active_port_range
.or(runtime.gossip_runtime_primary_port_range);
runtime.stop_gossip_runtime().await;
tokio::time::sleep(Duration::from_millis(50)).await;
let mut attempts = 0_usize;
let mut last_error: Option<String> = None;
for entrypoint in candidates {
attempts = attempts.saturating_add(1);
match start_gossip_bootstrapped_receiver_guarded(
&entrypoint,
packet_ingest_tx.clone(),
runtime.gossip_identity.clone(),
selected_port_range,
)
.await
{
Ok((receiver_handles, gossip_runtime, repair_client)) => {
runtime.replace_gossip_runtime(
receiver_handles,
gossip_runtime,
repair_client,
Some(entrypoint.clone()),
selected_port_range,
);
tracing::info!(
previous_entrypoint = previous_entrypoint.as_deref().unwrap_or(""),
new_entrypoint = %entrypoint,
overlap_ms = 0_u64,
"gossip runtime handoff complete"
);
return Ok(Some(entrypoint));
}
Err(error) => {
tracing::warn!(
entrypoint = %entrypoint,
error = %error,
"runtime switch candidate failed"
);
last_error = Some(error.to_string());
}
}
}
if attempts == 0 {
return Ok(None);
}
if let Some(error) = last_error {
return Err(GossipRuntimeSwitchError::AllCandidatesFailed { reason: error });
}
return Err(GossipRuntimeSwitchError::ExhaustedCandidateList);
}
let mut old_receiver_handles = Some(std::mem::take(&mut runtime.gossip_receiver_handles));
let mut old_gossip_runtime = runtime.gossip_runtime.take();
let mut old_repair_client = runtime.repair_client.take();
let mut old_entrypoint = runtime.active_gossip_entrypoint.take();
let mut old_port_range = runtime.gossip_runtime_active_port_range.take();
let Some(primary_port_range) = runtime.gossip_runtime_primary_port_range else {
return Ok(None);
};
let Some(secondary_port_range) = runtime.gossip_runtime_secondary_port_range else {
return Ok(None);
};
let target_port_range = if old_port_range == Some(primary_port_range) {
secondary_port_range
} else {
primary_port_range
};
let mut attempts = 0_usize;
let mut last_error: Option<String> = None;
let stabilize_min_packets = read_gossip_runtime_switch_stabilize_min_packets();
let stabilize_sustain = Duration::from_millis(read_gossip_runtime_switch_stabilize_ms());
let stabilize_max_wait =
Duration::from_millis(read_gossip_runtime_switch_stabilize_max_wait_ms());
let stabilize_min_peers = read_gossip_runtime_switch_stabilize_min_peers();
let stabilize_effective_max_wait = stabilize_max_wait.max(
overlap
.saturating_add(Duration::from_millis(500))
.max(Duration::from_millis(1_500)),
);
for entrypoint in candidates {
attempts = attempts.saturating_add(1);
match start_gossip_bootstrapped_receiver_guarded(
&entrypoint,
packet_ingest_tx.clone(),
runtime.gossip_identity.clone(),
Some(target_port_range),
)
.await
{
Ok((receiver_handles, new_gossip_runtime, repair_client)) => {
let new_ingest_telemetry = new_gossip_runtime.ingest_telemetry.clone();
runtime.replace_gossip_runtime(
receiver_handles,
new_gossip_runtime,
repair_client,
Some(entrypoint.clone()),
Some(target_port_range),
);
let stabilization = wait_for_runtime_stabilization(
new_ingest_telemetry,
stabilize_sustain,
stabilize_min_packets,
stabilize_effective_max_wait,
)
.await;
let discovered_peers = runtime
.gossip_runtime
.as_ref()
.map(|candidate| candidate.cluster_info.all_peers().len())
.unwrap_or_default();
let stabilized_by_peers =
stabilization.packets_seen > 0 && discovered_peers >= stabilize_min_peers;
if !stabilization.stabilized && !stabilized_by_peers {
tracing::warn!(
previous_entrypoint = previous_entrypoint.as_deref().unwrap_or(""),
rejected_entrypoint = %entrypoint,
old_port_range = old_port_range
.map(format_port_range)
.unwrap_or_else(|| "-".to_owned()),
rejected_port_range = format_port_range(target_port_range),
waited_ms = duration_to_ms_u64(stabilization.elapsed),
packets_seen = stabilization.packets_seen,
peers_discovered = discovered_peers,
sustain_ms = duration_to_ms_u64(stabilize_sustain),
min_packets = stabilize_min_packets,
min_peers = stabilize_min_peers,
configured_max_wait_ms = duration_to_ms_u64(stabilize_max_wait),
effective_max_wait_ms = duration_to_ms_u64(stabilize_effective_max_wait),
"new gossip runtime did not stabilize during overlap; reverting to previous runtime"
);
let new_receiver_handles = std::mem::take(&mut runtime.gossip_receiver_handles);
let new_runtime = runtime.gossip_runtime.take();
let _ = runtime.repair_client.take();
stop_gossip_runtime_components(new_receiver_handles, new_runtime).await;
if let Some(old_gossip_runtime) = old_gossip_runtime.take() {
runtime.replace_gossip_runtime(
old_receiver_handles.take().unwrap_or_default(),
old_gossip_runtime,
old_repair_client.take(),
old_entrypoint.take(),
old_port_range.take(),
);
} else {
runtime.gossip_receiver_handles =
old_receiver_handles.take().unwrap_or_default();
runtime.repair_client = old_repair_client.take();
runtime.active_gossip_entrypoint = old_entrypoint.take();
runtime.gossip_runtime_active_port_range = old_port_range.take();
}
return Ok(None);
}
if !stabilization.stabilized && stabilized_by_peers {
tracing::warn!(
previous_entrypoint = previous_entrypoint.as_deref().unwrap_or(""),
accepted_entrypoint = %entrypoint,
packets_seen = stabilization.packets_seen,
peers_discovered = discovered_peers,
min_peers = stabilize_min_peers,
"accepting new gossip runtime via peer discovery despite low packet flow"
);
}
stop_gossip_runtime_components(
old_receiver_handles.take().unwrap_or_default(),
old_gossip_runtime.take(),
)
.await;
drop(old_repair_client.take());
tracing::info!(
previous_entrypoint = previous_entrypoint.as_deref().unwrap_or(""),
new_entrypoint = %entrypoint,
overlap_ms = duration_to_ms_u64(overlap),
old_port_range = old_port_range
.map(format_port_range)
.unwrap_or_else(|| "-".to_owned()),
new_port_range = format_port_range(target_port_range),
stabilization_wait_ms = duration_to_ms_u64(stabilization.elapsed),
stabilization_packets = stabilization.packets_seen,
stabilization_peers = discovered_peers,
"gossip runtime handoff complete"
);
return Ok(Some(entrypoint));
}
Err(error) => {
if overlap > Duration::ZERO && is_bind_conflict_error(&error) {
tracing::warn!(
entrypoint = %entrypoint,
overlap_ms = duration_to_ms_u64(overlap),
error = %error,
"runtime switch overlap bind conflict; keeping current gossip runtime"
);
if let Some(old_gossip_runtime) = old_gossip_runtime.take() {
runtime.replace_gossip_runtime(
old_receiver_handles.take().unwrap_or_default(),
old_gossip_runtime,
old_repair_client.take(),
old_entrypoint.take(),
old_port_range.take(),
);
} else {
runtime.gossip_receiver_handles =
old_receiver_handles.take().unwrap_or_default();
runtime.repair_client = old_repair_client.take();
runtime.active_gossip_entrypoint = old_entrypoint.take();
runtime.gossip_runtime_active_port_range = old_port_range.take();
}
return Ok(None);
}
tracing::warn!(
entrypoint = %entrypoint,
error = %error,
"runtime switch candidate failed"
);
last_error = Some(error.to_string());
}
}
}
if let Some(old_gossip_runtime) = old_gossip_runtime.take() {
runtime.replace_gossip_runtime(
old_receiver_handles.take().unwrap_or_default(),
old_gossip_runtime,
old_repair_client.take(),
old_entrypoint.take(),
old_port_range.take(),
);
} else {
runtime.gossip_receiver_handles = old_receiver_handles.take().unwrap_or_default();
runtime.repair_client = old_repair_client.take();
runtime.active_gossip_entrypoint = old_entrypoint.take();
runtime.gossip_runtime_active_port_range = old_port_range.take();
}
if attempts == 0 {
return Ok(None);
}
if let Some(error) = last_error {
return Err(GossipRuntimeSwitchError::AllCandidatesFailed { reason: error });
}
Err(GossipRuntimeSwitchError::ExhaustedCandidateList)
}