use super::*;
use thiserror::Error;
/// Errors that abort receiver bootstrap in `start_receiver`.
///
/// Every payload-carrying variant stores the underlying failure as its
/// rendered text (`reason`), not as a typed source error.
#[derive(Debug, Error)]
pub(crate) enum ReceiverBootstrapError {
    /// `read_bind_addr()` failed while setting up the direct UDP listener.
    #[error("bind address configuration failed: {reason}")]
    BindAddress { reason: String },
    /// `build_gossip_runtime_port_plan()` failed before any entrypoint was tried.
    #[cfg(feature = "gossip-bootstrap")]
    #[error("gossip runtime port plan failed: {reason}")]
    GossipPortPlan { reason: String },
    /// No gossip entrypoint produced an accepted runtime; `reason` is the
    /// error recorded for the last attempted entrypoint.
    #[cfg(feature = "gossip-bootstrap")]
    #[error("failed to bootstrap from all SOF_GOSSIP_ENTRYPOINT values: {reason}")]
    GossipBootstrapExhausted { reason: String },
    /// Gossip entrypoints were configured, but this build does not include the
    /// `gossip-bootstrap` feature. Only exists in builds without that feature.
    #[cfg(not(feature = "gossip-bootstrap"))]
    #[error(
        "SOF_GOSSIP_ENTRYPOINT set but this binary was built without `gossip-bootstrap` feature"
    )]
    GossipFeatureDisabled,
}
/// Starts the packet receivers and assembles the resulting [`ReceiverRuntime`].
///
/// Flow:
/// 1. If `read_gossip_entrypoints()` returns any entrypoints, a gossip bootstrap
///    is attempted (requires the `gossip-bootstrap` feature). Entrypoints are
///    tried in the order returned by `prioritize_gossip_entrypoints`; the first
///    one whose runtime is accepted wins and the loop stops.
/// 2. A candidate runtime is accepted when `wait_for_runtime_stabilization`
///    reports `stabilized`, or when at least one packet was seen and the number
///    of discovered peers reaches the configured minimum.
/// 3. If no receiver handle exists after step 1, a single UDP receiver is
///    spawned on the address returned by `read_bind_addr()`.
/// 4. With `gossip-bootstrap`, if `read_gossip_bootstrap_only()` is set and a
///    gossip runtime is active, `detach_gossip_control_plane()` is called on the
///    runtime before it is returned.
///
/// # Parameters
/// - `tx`: packet batch sender; a clone is handed to every gossip bootstrap
///   attempt and the original to the direct UDP receiver.
/// - `tx_event_rx`: stored unchanged in the returned runtime.
/// - `_control_plane_only_bootstrap` (`kernel-bypass` only): when `true`, the
///   peer-count threshold is forced to `1` and acceptance via peer discovery is
///   logged at `info` instead of `warn`.
///
/// # Errors
/// - [`ReceiverBootstrapError::GossipFeatureDisabled`]: entrypoints are
///   configured but the build lacks `gossip-bootstrap`.
/// - [`ReceiverBootstrapError::GossipPortPlan`]: the gossip port plan could not
///   be built.
/// - [`ReceiverBootstrapError::GossipBootstrapExhausted`]: no entrypoint was
///   accepted and at least one attempt recorded an error.
/// - [`ReceiverBootstrapError::BindAddress`]: the direct listener bind address
///   could not be read.
pub(crate) async fn start_receiver(
    tx: ingest::RawPacketBatchSender,
    tx_event_rx: mpsc::Receiver<TxObservedEvent>,
    #[cfg(feature = "kernel-bypass")] _control_plane_only_bootstrap: bool,
) -> Result<ReceiverRuntime, ReceiverBootstrapError> {
    let log_startup_steps = read_log_startup_steps();
    let mut static_receiver_handles = Vec::new();
    // Without `gossip-bootstrap` this vector is never filled, so it needs no `mut`.
    #[cfg(feature = "gossip-bootstrap")]
    let mut gossip_receiver_handles = Vec::new();
    #[cfg(not(feature = "gossip-bootstrap"))]
    let gossip_receiver_handles = Vec::new();
    #[cfg(feature = "gossip-bootstrap")]
    let mut active_gossip_entrypoint: Option<String> = None;
    #[cfg(feature = "gossip-bootstrap")]
    let mut gossip_runtime: Option<GossipRuntime> = None;
    #[cfg(feature = "gossip-bootstrap")]
    let mut repair_client: Option<crate::repair::GossipRepairClient> = None;
    #[cfg(feature = "gossip-bootstrap")]
    let mut gossip_runtime_active_port_range: Option<PortRange> = None;
    #[cfg(feature = "gossip-bootstrap")]
    let mut gossip_runtime_primary_port_range: Option<PortRange> = None;
    #[cfg(feature = "gossip-bootstrap")]
    let mut gossip_runtime_secondary_port_range: Option<PortRange> = None;
    // A fresh keypair is generated on every call; it is shared with each
    // bootstrap attempt and stored in the returned runtime.
    #[cfg(feature = "gossip-bootstrap")]
    let gossip_identity = Arc::new(Keypair::new());

    let gossip_entrypoints = read_gossip_entrypoints();
    if !gossip_entrypoints.is_empty() {
        if log_startup_steps {
            tracing::info!(
                step = "gossip_bootstrap_config",
                configured_entrypoints = gossip_entrypoints.len(),
                "gossip bootstrap requested"
            );
        }
        #[cfg(not(feature = "gossip-bootstrap"))]
        {
            return Err(ReceiverBootstrapError::GossipFeatureDisabled);
        }
        #[cfg(feature = "gossip-bootstrap")]
        {
            let port_plan = build_gossip_runtime_port_plan().map_err(|source| {
                ReceiverBootstrapError::GossipPortPlan {
                    reason: source.to_string(),
                }
            })?;
            gossip_runtime_primary_port_range = Some(port_plan.primary);
            gossip_runtime_secondary_port_range = port_plan.secondary;
            tracing::info!(
                primary_range = %format_port_range(port_plan.primary),
                secondary_range = port_plan
                    .secondary
                    .map(format_port_range)
                    .unwrap_or_else(|| "-".to_owned()),
                "gossip runtime port plan initialized"
            );
            let prioritized_entrypoints =
                prioritize_gossip_entrypoints(&gossip_entrypoints, None).await;
            if log_startup_steps {
                tracing::info!(
                    step = "gossip_bootstrap_prioritized",
                    total_candidates = prioritized_entrypoints.len(),
                    "gossip entrypoint ordering computed"
                );
            }
            let mut last_error: Option<String> = None;
            let bootstrap_stabilize_min_packets =
                read_gossip_runtime_switch_stabilize_min_packets();
            let bootstrap_stabilize_sustain =
                Duration::from_millis(read_gossip_runtime_switch_stabilize_ms());
            let bootstrap_stabilize_max_wait =
                Duration::from_millis(read_gossip_bootstrap_stabilize_max_wait_ms());
            // Resolve the feature-gated flag once so the logic below does not
            // have to be duplicated per `kernel-bypass` cfg arm.
            #[cfg(feature = "kernel-bypass")]
            let control_plane_only = _control_plane_only_bootstrap;
            #[cfg(not(feature = "kernel-bypass"))]
            let control_plane_only = false;
            let bootstrap_stabilize_min_peers = {
                // The configured value is always read, even when it is overridden.
                let configured = read_gossip_bootstrap_stabilize_min_peers();
                if control_plane_only { 1 } else { configured }
            };

            for (attempt, entrypoint) in prioritized_entrypoints.iter().enumerate() {
                if log_startup_steps {
                    tracing::info!(
                        step = "gossip_bootstrap_attempt",
                        attempt = attempt.saturating_add(1),
                        entrypoint = %entrypoint,
                        "attempting gossip bootstrap entrypoint"
                    );
                }
                match start_gossip_bootstrapped_receiver_guarded(
                    entrypoint,
                    tx.clone(),
                    gossip_identity.clone(),
                    Some(port_plan.primary),
                )
                .await
                {
                    Ok((gossip_receivers, runtime, client)) => {
                        let stabilization = wait_for_runtime_stabilization(
                            runtime.ingest_telemetry.clone(),
                            bootstrap_stabilize_sustain,
                            bootstrap_stabilize_min_packets,
                            bootstrap_stabilize_max_wait,
                        )
                        .await;
                        let discovered_peers = runtime.cluster_info.all_peers().len();
                        // Secondary acceptance path: some traffic plus enough peers.
                        let stabilized_by_peers = stabilization.packets_seen > 0
                            && discovered_peers >= bootstrap_stabilize_min_peers;
                        let accepted = stabilization.stabilized || stabilized_by_peers;
                        if !accepted {
                            // Capture the count before the handles are moved into
                            // the shutdown call.
                            let candidate_receivers = gossip_receivers.len();
                            tracing::warn!(
                                entrypoint = %entrypoint,
                                waited_ms = duration_to_ms_u64(stabilization.elapsed),
                                packets_seen = stabilization.packets_seen,
                                peers_discovered = discovered_peers,
                                sustain_ms = duration_to_ms_u64(bootstrap_stabilize_sustain),
                                min_packets = bootstrap_stabilize_min_packets,
                                min_peers = bootstrap_stabilize_min_peers,
                                max_wait_ms = duration_to_ms_u64(bootstrap_stabilize_max_wait),
                                "gossip bootstrap runtime did not stabilize; trying next entrypoint"
                            );
                            stop_gossip_runtime_components(gossip_receivers, Some(runtime)).await;
                            tracing::warn!(
                                entrypoint = %entrypoint,
                                receiver_tasks_stopped = candidate_receivers,
                                "stopped unstable gossip bootstrap runtime; continuing with next entrypoint"
                            );
                            last_error = Some(format!(
                                "entrypoint {entrypoint} did not receive packets during bootstrap stabilization"
                            ));
                            continue;
                        }
                        if !stabilization.stabilized && stabilized_by_peers {
                            if control_plane_only {
                                tracing::info!(
                                    entrypoint = %entrypoint,
                                    packets_seen = stabilization.packets_seen,
                                    peers_discovered = discovered_peers,
                                    min_peers = bootstrap_stabilize_min_peers,
                                    "accepting gossip bootstrap runtime via peer discovery for external kernel-bypass ingress"
                                );
                            } else {
                                tracing::warn!(
                                    entrypoint = %entrypoint,
                                    packets_seen = stabilization.packets_seen,
                                    peers_discovered = discovered_peers,
                                    min_peers = bootstrap_stabilize_min_peers,
                                    "accepting gossip bootstrap runtime via peer discovery despite low packet flow"
                                );
                            }
                        }
                        let mut gossip_receivers = gossip_receivers;
                        if attempt > 0 {
                            tracing::info!(
                                entrypoint = %entrypoint,
                                attempt = attempt.saturating_add(1),
                                "gossip bootstrap succeeded after fallback"
                            );
                        }
                        gossip_receiver_handles.append(&mut gossip_receivers);
                        gossip_runtime = Some(runtime);
                        repair_client = client;
                        active_gossip_entrypoint = Some(entrypoint.clone());
                        gossip_runtime_active_port_range = Some(port_plan.primary);
                        if log_startup_steps {
                            tracing::info!(
                                step = "gossip_bootstrap_active",
                                entrypoint = %entrypoint,
                                "gossip bootstrap entrypoint activated"
                            );
                        }
                        break;
                    }
                    Err(error) => {
                        tracing::warn!(
                            entrypoint = %entrypoint,
                            attempt = attempt.saturating_add(1),
                            error = %error,
                            "failed gossip bootstrap entrypoint; trying next"
                        );
                        last_error = Some(error.to_string());
                    }
                }
            }
            // NOTE(review): when no attempt recorded an error (for example if the
            // prioritized list is empty), execution falls through to the direct
            // listener below without reporting a failure — confirm this is intended.
            if gossip_runtime.is_none()
                && let Some(error) = last_error
            {
                return Err(ReceiverBootstrapError::GossipBootstrapExhausted { reason: error });
            }
        }
    }

    // Direct listener mode: used whenever no receiver was started above.
    if static_receiver_handles.is_empty() && gossip_receiver_handles.is_empty() {
        let bind_addr = read_bind_addr().map_err(|source| ReceiverBootstrapError::BindAddress {
            reason: source.to_string(),
        })?;
        tracing::info!(
            %bind_addr,
            "starting direct listener mode (recommended for proxy/tunnel feeds)"
        );
        static_receiver_handles.push(ingest::spawn_udp_receiver(bind_addr, tx));
    }
    tracing::info!(
        static_receivers = static_receiver_handles.len(),
        gossip_receivers = gossip_receiver_handles.len(),
        "receiver bootstrap complete; waiting for traffic"
    );
    // Clone the telemetry handle before `gossip_runtime` is moved into the struct.
    #[cfg(feature = "gossip-bootstrap")]
    let gossip_ingest_telemetry = gossip_runtime
        .as_ref()
        .map(|runtime| runtime.ingest_telemetry.clone());
    let runtime = ReceiverRuntime {
        static_receiver_handles,
        gossip_receiver_handles,
        #[cfg(feature = "gossip-bootstrap")]
        gossip_ingest_telemetry,
        #[cfg(feature = "gossip-bootstrap")]
        gossip_runtime,
        #[cfg(feature = "gossip-bootstrap")]
        gossip_identity,
        #[cfg(feature = "gossip-bootstrap")]
        active_gossip_entrypoint,
        #[cfg(feature = "gossip-bootstrap")]
        gossip_runtime_primary_port_range,
        #[cfg(feature = "gossip-bootstrap")]
        gossip_runtime_secondary_port_range,
        #[cfg(feature = "gossip-bootstrap")]
        gossip_runtime_active_port_range,
        #[cfg(feature = "gossip-bootstrap")]
        repair_client,
        tx_event_rx,
    };
    // Rebind mutably only in builds where the runtime may be modified below.
    #[cfg(feature = "gossip-bootstrap")]
    let mut runtime = runtime;
    #[cfg(feature = "gossip-bootstrap")]
    if read_gossip_bootstrap_only() && runtime.gossip_runtime.is_some() {
        // Both values are captured before detaching so the log line reflects
        // the state prior to the detach call.
        let active_entrypoint = runtime.active_gossip_entrypoint.clone().unwrap_or_default();
        let gossip_receivers = runtime.gossip_receiver_handles.len();
        runtime.detach_gossip_control_plane();
        tracing::info!(
            entrypoint = %active_entrypoint,
            gossip_receivers,
            "gossip bootstrap-only mode enabled; detached gossip control-plane and kept direct receivers"
        );
    }
    Ok(runtime)
}
/// Builds a [`ReceiverRuntime`] through `ReceiverRuntime::external`, passing
/// along the transaction-event receiver.
///
/// Only compiled with the `kernel-bypass` feature. Unlike `start_receiver`,
/// this function reads no configuration and spawns nothing itself.
// NOTE(review): presumably the returned runtime has no locally spawned
// receivers because ingress is handled externally — confirm against
// `ReceiverRuntime::external`.
#[cfg(feature = "kernel-bypass")]
#[must_use]
pub(crate) fn start_external_receiver(
    tx_event_rx: mpsc::Receiver<TxObservedEvent>,
) -> ReceiverRuntime {
    ReceiverRuntime::external(tx_event_rx)
}