use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use alpine::attestation::AttesterRegistry;
use thiserror::Error;
use tokio::time::{sleep, Duration as TokioDuration};
use tracing::{info, warn};
use crate::discovery::{DiscoveryClient, DiscoveryClientOptions, DiscoveryError, DiscoveryOutcome};
use crate::phase::claim_discovery;
/// Tunables for [`run_discovery_with_options`].
///
/// The `Default` implementation gives the values used by [`run_discovery`].
#[derive(Debug, Clone)]
pub struct DiscoveryRunOptions {
/// Explicit local bind address for the initial unicast attempt. When `None`,
/// a bind address is derived from the route towards the target.
pub local_addr: Option<SocketAddr>,
/// When `false`, multicast is disabled on the per-interface broadcast attempts.
pub prefer_multicast: bool,
/// Permits falling back to broadcast after a failed unicast attempt to an
/// IPv4 target. When `false`, broadcast is also disabled on the client
/// options built for the per-interface attempts.
pub allow_broadcast: bool,
/// Previously known device addresses, tried one by one (unicast) after
/// broadcast discovery has failed.
pub cached_targets: Vec<SocketAddr>,
/// Enables the subnet scan, the last fallback after broadcast and cached
/// unicast have both failed.
pub scan_subnets: bool,
/// Probe rate for the subnet scan. Values below 1 are treated as 1; the
/// pause between probes is `1000 / rate` ms and never less than 1 ms.
pub scan_rate_per_sec: u32,
/// Per-target timeout in milliseconds. Used as-is for cached unicast targets
/// and raised to at least 100 ms for subnet-scan probes.
pub scan_timeout_ms: u64,
/// Maximum number of hosts probed on each interface during a subnet scan.
/// `0` makes the scan fail with `DiscoveryRunError::SubnetScanDisabled`.
pub scan_max_hosts: u32,
/// Optional attester registry, cloned into the options of every discovery
/// client that gets created.
pub attester_registry: Option<AttesterRegistry>,
}
impl Default for DiscoveryRunOptions {
fn default() -> Self {
Self {
local_addr: None,
prefer_multicast: false,
allow_broadcast: true,
cached_targets: Vec::new(),
scan_subnets: false,
scan_rate_per_sec: 200,
scan_timeout_ms: 500,
scan_max_hosts: 1024,
attester_registry: None,
}
}
}
/// Errors returned by [`run_discovery`] and [`run_discovery_with_options`].
#[derive(Debug, Error)]
pub enum DiscoveryRunError {
/// The direct unicast attempt failed (client creation or discovery). Also
/// used, wrapping `DiscoveryError::PermissionDenied`, when the discovery
/// phase cannot be claimed.
#[error("unicast discovery failed: {0}")]
Unicast(DiscoveryError),
/// Every per-interface broadcast attempt failed; carries the error from the
/// last interface that was tried.
#[error("broadcast discovery failed: {0}")]
Broadcast(DiscoveryError),
/// Interface enumeration failed or produced no non-loopback IPv4 address.
#[error("no viable interfaces for broadcast discovery (need a non-loopback IPv4 address)")]
NoInterfaces,
/// No cached targets were configured, or none of them answered.
#[error("cached unicast discovery failed")]
CachedUnicastFailed,
/// The subnet scan ran to completion without finding a device.
#[error("subnet scan discovery failed")]
SubnetScanFailed,
/// A subnet scan was requested while `scan_max_hosts` is `0`.
#[error("subnet scan disabled (scan_max_hosts = 0)")]
SubnetScanDisabled,
}
/// IPv4 wildcard bind address (`0.0.0.0`, port 0 so the OS picks the port).
fn default_local_addr() -> SocketAddr {
    SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)
}

/// Chooses the local address a discovery socket should bind to in order to
/// reach `remote_addr`.
///
/// * `override_addr`, when `Some`, is returned unchanged.
/// * For the IPv4 limited-broadcast address the IPv4 wildcard is returned.
/// * Otherwise a throw-away UDP socket is connected to `remote_addr` and the
///   local IP the OS selected for it is returned with port 0.
///
/// If that probe fails at any step, the wildcard address of the *same address
/// family as `remote_addr`* is returned. An IPv4 wildcard would be useless for
/// an IPv6 target, so the fallback must not cross families.
fn resolve_local_addr(remote_addr: SocketAddr, override_addr: Option<SocketAddr>) -> SocketAddr {
    if let Some(addr) = override_addr {
        return addr;
    }
    if matches!(remote_addr.ip(), IpAddr::V4(v4) if v4.is_broadcast()) {
        return default_local_addr();
    }
    // Wildcard of the remote's family: used both to bind the probe socket and
    // as the fallback result when the probe fails.
    let wildcard = match remote_addr {
        SocketAddr::V4(_) => default_local_addr(),
        SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED), 0),
    };
    std::net::UdpSocket::bind(wildcard)
        .and_then(|sock| {
            sock.connect(remote_addr)?;
            sock.local_addr()
        })
        // Keep only the IP; the caller binds its own socket on a fresh port.
        .map(|local| SocketAddr::new(local.ip(), 0))
        .unwrap_or(wildcard)
}
/// Discovers a device at `remote_addr` using `DiscoveryRunOptions::default()`.
///
/// Convenience wrapper around [`run_discovery_with_options`]; see that
/// function for the attempt order and the errors that can be returned.
pub async fn run_discovery(remote_addr: SocketAddr) -> Result<DiscoveryOutcome, DiscoveryRunError> {
    let defaults = DiscoveryRunOptions::default();
    run_discovery_with_options(remote_addr, defaults).await
}
/// Runs device discovery against `remote_addr`, trying progressively broader
/// strategies.
///
/// Order of attempts:
/// 1. Unless `remote_addr` is the IPv4 limited-broadcast address, a direct
///    unicast attempt (multicast and broadcast disabled, 3 s timeout).
/// 2. If that fails and `opts.allow_broadcast` is set and the target is IPv4:
///    per-interface broadcast, then cached unicast targets, then (when
///    `opts.scan_subnets` is set) a subnet scan.
///
/// A broadcast `remote_addr` skips step 1 and goes straight to the chain in
/// step 2.
///
/// # Errors
/// * `Unicast(PermissionDenied)` when the discovery phase cannot be claimed.
/// * `Unicast(_)` when the unicast client cannot be created, or when unicast
///   discovery fails and no fallback is permitted.
/// * Otherwise whatever the fallback chain reports (see
///   `broadcast_with_fallbacks`).
pub async fn run_discovery_with_options(
    remote_addr: SocketAddr,
    opts: DiscoveryRunOptions,
) -> Result<DiscoveryOutcome, DiscoveryRunError> {
    // Held until the function returns so the claim covers the whole run.
    let _phase_guard = claim_discovery()
        .map_err(|_| DiscoveryRunError::Unicast(DiscoveryError::PermissionDenied))?;
    let port = remote_addr.port();

    let is_broadcast_target = matches!(remote_addr.ip(), IpAddr::V4(v4) if v4.is_broadcast());
    if is_broadcast_target {
        return broadcast_with_fallbacks(port, &opts, None).await;
    }

    let local_addr = resolve_local_addr(remote_addr, opts.local_addr);
    let mut options = DiscoveryClientOptions::new(remote_addr, local_addr, Duration::from_secs(3));
    options.attester_registry = opts.attester_registry.clone();
    options = options.disable_multicast().disable_broadcast();
    info!(
        "[ALPINE][DISCOVERY] route_hint remote={} local_bind={}",
        remote_addr, options.local_addr
    );
    // Remember the bind IP so the broadcast fallback can try that interface first.
    let local_bind = options.local_addr;
    let client = DiscoveryClient::new(options).map_err(DiscoveryRunError::Unicast)?;

    // NOTE(review): `discover` is called without `.await`, so it appears to be a
    // synchronous call made from async code — confirm it cannot stall the executor.
    let unicast_result = client.discover(&["alpine-control".to_string()]);
    match unicast_result {
        Ok(outcome) => Ok(outcome),
        Err(err) if opts.allow_broadcast && remote_addr.is_ipv4() => {
            warn!(
                "[ALPINE][DISCOVERY][WARN] unicast discovery failed ({}); falling back to broadcast",
                err
            );
            broadcast_with_fallbacks(port, &opts, Some(local_bind.ip())).await
        }
        Err(err) => Err(DiscoveryRunError::Unicast(err)),
    }
}

/// Shared fallback chain: per-interface broadcast, then cached unicast
/// targets, then (if `opts.scan_subnets`) a subnet scan.
///
/// Returns the first successful outcome. When the subnet scan runs, its result
/// is returned as-is; otherwise the broadcast error is reported.
async fn broadcast_with_fallbacks(
    port: u16,
    opts: &DiscoveryRunOptions,
    preferred_ip: Option<IpAddr>,
) -> Result<DiscoveryOutcome, DiscoveryRunError> {
    let broadcast_err = match attempt_broadcast(port, opts, preferred_ip) {
        Ok(outcome) => return Ok(outcome),
        Err(err) => err,
    };
    if let Ok(outcome) = attempt_cached_unicast(opts) {
        return Ok(outcome);
    }
    if opts.scan_subnets {
        return attempt_subnet_scan(port, opts, preferred_ip).await;
    }
    Err(broadcast_err)
}
/// Tries discovery once per non-loopback IPv4 interface, targeting that
/// interface's directed broadcast address on `port`.
///
/// If `preferred_ip` matches an interface's local IP, that interface is tried
/// first; the relative order of the others is unchanged. Multicast is disabled
/// unless `opts.prefer_multicast`, and broadcast is disabled on the client
/// options when `opts.allow_broadcast` is false.
///
/// # Errors
/// * Whatever `collect_interfaces` returns on failure.
/// * `NoInterfaces` when no candidate interface exists.
/// * `Broadcast(_)` with the error of the last interface tried when every
///   attempt fails.
fn attempt_broadcast(
    port: u16,
    opts: &DiscoveryRunOptions,
    preferred_ip: Option<IpAddr>,
) -> Result<DiscoveryOutcome, DiscoveryRunError> {
    let mut candidates = collect_interfaces()?;
    if candidates.is_empty() {
        return Err(DiscoveryRunError::NoInterfaces);
    }

    // Move the preferred interface (if present) to the front, keeping the rest in order.
    if let Some(wanted) = preferred_ip {
        let found = candidates
            .iter()
            .position(|c| IpAddr::V4(c.local_ip) == wanted);
        if let Some(pos) = found {
            candidates[..=pos].rotate_right(1);
        }
    }

    let mut last_err: Option<DiscoveryError> = None;
    for candidate in &candidates {
        let remote = SocketAddr::new(IpAddr::V4(candidate.broadcast), port);
        let local = SocketAddr::new(IpAddr::V4(candidate.local_ip), 0);
        let mut options = DiscoveryClientOptions::new(remote, local, Duration::from_secs(3));
        options.interface = Some(candidate.iface.clone());
        options.attester_registry = opts.attester_registry.clone();
        if !opts.prefer_multicast {
            options = options.disable_multicast();
        }
        if !opts.allow_broadcast {
            options = options.disable_broadcast();
        }
        info!(
            "[ALPINE][DISCOVERY] iface={} local_ip={} netmask={} broadcast={} bound={}:0 so_broadcast={}",
            candidate.iface,
            candidate.local_ip,
            candidate.netmask,
            candidate.broadcast,
            candidate.local_ip,
            options.allow_broadcast
        );

        // Client creation and discovery failures are reported identically.
        let result = DiscoveryClient::new(options)
            .and_then(|client| client.discover(&["alpine-control".to_string()]));
        match result {
            Ok(outcome) => return Ok(outcome),
            Err(err) => {
                warn!(
                    "[ALPINE][DISCOVERY][WARN] iface={} error={}",
                    candidate.iface, err
                );
                last_err = Some(err);
            }
        }
    }

    Err(DiscoveryRunError::Broadcast(
        last_err.unwrap_or(DiscoveryError::Timeout),
    ))
}
/// Tries unicast discovery against each address in `opts.cached_targets`, in
/// order, and returns the first successful outcome.
///
/// Each attempt binds to the address chosen by `resolve_local_addr` (no
/// override), uses `opts.scan_timeout_ms` as its timeout, and has multicast
/// and broadcast disabled. Per-target failures are logged and skipped; this
/// includes failing to create the client, which was previously dropped
/// without any trace.
///
/// # Errors
/// `CachedUnicastFailed` when the target list is empty or no target answered.
fn attempt_cached_unicast(opts: &DiscoveryRunOptions) -> Result<DiscoveryOutcome, DiscoveryRunError> {
    for target in &opts.cached_targets {
        let local_addr = resolve_local_addr(*target, None);
        let mut options = DiscoveryClientOptions::new(
            *target,
            local_addr,
            Duration::from_millis(opts.scan_timeout_ms),
        );
        options.attester_registry = opts.attester_registry.clone();
        options = options.disable_multicast().disable_broadcast();
        info!(
            "[ALPINE][DISCOVERY] cached_unicast target={} local_bind={}",
            target, options.local_addr
        );

        // Surface client-creation errors the same way as discovery errors so a
        // bad bind address is visible in the logs instead of being swallowed.
        let attempt = DiscoveryClient::new(options)
            .and_then(|client| client.discover(&["alpine-control".to_string()]));
        match attempt {
            Ok(outcome) => return Ok(outcome),
            Err(err) => {
                warn!(
                    "[ALPINE][DISCOVERY][WARN] cached_unicast target={} error={}",
                    target, err
                );
            }
        }
    }
    // Reached for an empty list as well as when every target failed.
    Err(DiscoveryRunError::CachedUnicastFailed)
}
async fn attempt_subnet_scan(
port: u16,
opts: &DiscoveryRunOptions,
preferred_ip: Option<IpAddr>,
) -> Result<DiscoveryOutcome, DiscoveryRunError> {
if opts.scan_max_hosts == 0 {
return Err(DiscoveryRunError::SubnetScanDisabled);
}
let mut attempts = collect_interfaces()?;
if attempts.is_empty() {
return Err(DiscoveryRunError::NoInterfaces);
}
if let Some(pref) = preferred_ip {
if let Some(idx) = attempts.iter().position(|a| IpAddr::V4(a.local_ip) == pref) {
let preferred = attempts.remove(idx);
attempts.insert(0, preferred);
}
}
let rate = opts.scan_rate_per_sec.max(1);
let delay_ms = (1000u64 / rate as u64).max(1);
let timeout_ms = opts.scan_timeout_ms.max(100);
for attempt in attempts.iter() {
let mut scanned = 0u32;
let (network, broadcast) = network_bounds(attempt.local_ip, attempt.netmask);
let start = network.saturating_add(1);
let end = broadcast.saturating_sub(1);
info!(
"[ALPINE][DISCOVERY] subnet_scan iface={} local_ip={} netmask={} range={}.{} timeout_ms={} rate_per_sec={} max_hosts={}",
attempt.iface,
attempt.local_ip,
attempt.netmask,
std::net::Ipv4Addr::from(network),
std::net::Ipv4Addr::from(broadcast),
timeout_ms,
rate,
opts.scan_max_hosts
);
let mut ip = start;
while ip <= end && scanned < opts.scan_max_hosts {
let target_ip = std::net::Ipv4Addr::from(ip);
if target_ip != attempt.local_ip {
let target = SocketAddr::new(IpAddr::V4(target_ip), port);
let local_addr = SocketAddr::new(IpAddr::V4(attempt.local_ip), 0);
let mut options = DiscoveryClientOptions::new(
target,
local_addr,
Duration::from_millis(timeout_ms),
);
options.attester_registry = opts.attester_registry.clone();
options = options.disable_multicast().disable_broadcast();
if let Ok(client) = DiscoveryClient::new(options) {
if let Ok(outcome) = client.discover(&["alpine-control".to_string()]) {
return Ok(outcome);
}
}
scanned += 1;
sleep(TokioDuration::from_millis(delay_ms)).await;
}
ip = ip.saturating_add(1);
}
}
Err(DiscoveryRunError::SubnetScanFailed)
}
/// A non-loopback IPv4 interface that discovery can be attempted on.
struct IfaceAttempt {
    /// Interface name as reported by the OS interface listing.
    iface: String,
    /// IPv4 address assigned to the interface; used as the local bind IP.
    local_ip: Ipv4Addr,
    /// Netmask of that address.
    netmask: Ipv4Addr,
    /// Directed broadcast address derived from `local_ip` and `netmask`.
    broadcast: Ipv4Addr,
}
fn collect_interfaces() -> Result<Vec<IfaceAttempt>, DiscoveryRunError> {
let mut attempts = Vec::new();
let ifaces = get_if_addrs::get_if_addrs().map_err(|_| DiscoveryRunError::NoInterfaces)?;
for iface in ifaces {
if iface.is_loopback() {
continue;
}
if let get_if_addrs::IfAddr::V4(v4) = iface.addr {
let ipv4 = v4.ip;
let maskv4 = v4.netmask;
let ip_u32 = u32::from_be_bytes(ipv4.octets());
let mask_u32 = u32::from_be_bytes(maskv4.octets());
let bcast = ip_u32 | (!mask_u32);
let bcast_ip = std::net::Ipv4Addr::from(bcast.to_be_bytes());
attempts.push(IfaceAttempt {
iface: iface.name,
local_ip: ipv4,
netmask: maskv4,
broadcast: bcast_ip,
});
}
}
Ok(attempts)
}
/// Returns `(network, broadcast)` for the subnet containing `ip`, both as
/// host-order `u32` values (first octet in the most significant byte).
///
/// `network` is `ip & netmask`; `broadcast` is `network` with every host bit set.
fn network_bounds(ip: Ipv4Addr, netmask: Ipv4Addr) -> (u32, u32) {
    let mask = u32::from(netmask);
    let base = u32::from(ip) & mask;
    (base, base | !mask)
}