alpine_protocol_sdk/discovery/
runner.rs

1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2use std::time::Duration;
3
4use alpine::attestation::AttesterRegistry;
5use thiserror::Error;
6use tokio::time::{sleep, Duration as TokioDuration};
7use tracing::{info, warn};
8
9use crate::discovery::{DiscoveryClient, DiscoveryClientOptions, DiscoveryError, DiscoveryOutcome};
10use crate::phase::claim_discovery;
11
12#[derive(Debug, Clone)]
13pub struct DiscoveryRunOptions {
14    pub local_addr: Option<SocketAddr>,
15    pub prefer_multicast: bool,
16    pub allow_broadcast: bool,
17    pub cached_targets: Vec<SocketAddr>,
18    pub scan_subnets: bool,
19    pub scan_rate_per_sec: u32,
20    pub scan_timeout_ms: u64,
21    pub scan_max_hosts: u32,
22    pub attester_registry: Option<AttesterRegistry>,
23}
24
25impl Default for DiscoveryRunOptions {
26    fn default() -> Self {
27        Self {
28            local_addr: None,
29            prefer_multicast: false,
30            allow_broadcast: true,
31            cached_targets: Vec::new(),
32            scan_subnets: false,
33            scan_rate_per_sec: 200,
34            scan_timeout_ms: 500,
35            scan_max_hosts: 1024,
36            attester_registry: None,
37        }
38    }
39}
40
41#[derive(Debug, Error)]
42pub enum DiscoveryRunError {
43    #[error("unicast discovery failed: {0}")]
44    Unicast(DiscoveryError),
45    #[error("broadcast discovery failed: {0}")]
46    Broadcast(DiscoveryError),
47    #[error("no viable interfaces for broadcast discovery (need a non-loopback IPv4 address)")]
48    NoInterfaces,
49    #[error("cached unicast discovery failed")]
50    CachedUnicastFailed,
51    #[error("subnet scan discovery failed")]
52    SubnetScanFailed,
53    #[error("subnet scan disabled (scan_max_hosts = 0)")]
54    SubnetScanDisabled,
55}
56
/// Returns the IPv4 wildcard bind address `0.0.0.0:0` (any interface, OS-chosen port).
fn default_local_addr() -> SocketAddr {
    SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
}
60
61fn resolve_local_addr(remote_addr: SocketAddr, override_addr: Option<SocketAddr>) -> SocketAddr {
62    if let Some(addr) = override_addr {
63        return addr;
64    }
65    if matches!(remote_addr.ip(), IpAddr::V4(v4) if v4.is_broadcast()) {
66        return default_local_addr();
67    }
68    let bind_addr = if remote_addr.is_ipv4() {
69        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)
70    } else {
71        SocketAddr::new(IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED), 0)
72    };
73    if let Ok(sock) = std::net::UdpSocket::bind(bind_addr) {
74        if sock.connect(remote_addr).is_ok() {
75            if let Ok(local) = sock.local_addr() {
76                return SocketAddr::new(local.ip(), 0);
77            }
78        }
79    }
80    default_local_addr()
81}
82
83pub async fn run_discovery(remote_addr: SocketAddr) -> Result<DiscoveryOutcome, DiscoveryRunError> {
84    run_discovery_with_options(remote_addr, DiscoveryRunOptions::default()).await
85}
86
87pub async fn run_discovery_with_options(
88    remote_addr: SocketAddr,
89    opts: DiscoveryRunOptions,
90) -> Result<DiscoveryOutcome, DiscoveryRunError> {
91    let _phase_guard = claim_discovery().map_err(|_| DiscoveryRunError::Unicast(DiscoveryError::PermissionDenied))?;
92    let is_broadcast_target = matches!(remote_addr.ip(), IpAddr::V4(v4) if v4.is_broadcast());
93
94    if !is_broadcast_target {
95        let local_addr = resolve_local_addr(remote_addr, opts.local_addr);
96        let mut options = DiscoveryClientOptions::new(
97            remote_addr,
98            local_addr,
99            Duration::from_secs(3),
100        );
101        options.attester_registry = opts.attester_registry.clone();
102        options = options.disable_multicast().disable_broadcast();
103        info!(
104            "[ALPINE][DISCOVERY] route_hint remote={} local_bind={}",
105            remote_addr, options.local_addr
106        );
107        let local_bind = options.local_addr;
108        let client = DiscoveryClient::new(options).map_err(DiscoveryRunError::Unicast)?;
109        match client.discover(&["alpine-control".to_string()]) {
110            Ok(outcome) => return Ok(outcome),
111            Err(err) => {
112                if opts.allow_broadcast && remote_addr.is_ipv4() {
113                    warn!(
114                        "[ALPINE][DISCOVERY][WARN] unicast discovery failed ({}); falling back to broadcast",
115                        err
116                    );
117                    match attempt_broadcast(
118                        remote_addr.port(),
119                        &opts,
120                        Some(local_bind.ip()),
121                    ) {
122                        Ok(outcome) => return Ok(outcome),
123                        Err(broadcast_err) => {
124                            if let Ok(outcome) = attempt_cached_unicast(&opts) {
125                                return Ok(outcome);
126                            }
127                            if opts.scan_subnets {
128                                return attempt_subnet_scan(
129                                    remote_addr.port(),
130                                    &opts,
131                                    Some(local_bind.ip()),
132                                )
133                                .await;
134                            }
135                            return Err(broadcast_err);
136                        }
137                    }
138                }
139                return Err(DiscoveryRunError::Unicast(err));
140            }
141        }
142    }
143
144    match attempt_broadcast(remote_addr.port(), &opts, None) {
145        Ok(outcome) => Ok(outcome),
146        Err(broadcast_err) => {
147            if let Ok(outcome) = attempt_cached_unicast(&opts) {
148                return Ok(outcome);
149            }
150            if opts.scan_subnets {
151                return attempt_subnet_scan(remote_addr.port(), &opts, None).await;
152            }
153            Err(broadcast_err)
154        }
155    }
156}
157
158fn attempt_broadcast(
159    port: u16,
160    opts: &DiscoveryRunOptions,
161    preferred_ip: Option<IpAddr>,
162) -> Result<DiscoveryOutcome, DiscoveryRunError> {
163    let mut attempts = collect_interfaces()?;
164    if attempts.is_empty() {
165        return Err(DiscoveryRunError::NoInterfaces);
166    }
167
168    if let Some(pref) = preferred_ip {
169        if let Some(idx) = attempts.iter().position(|a| IpAddr::V4(a.local_ip) == pref) {
170            let preferred = attempts.remove(idx);
171            attempts.insert(0, preferred);
172        }
173    }
174
175    let mut last_err: Option<DiscoveryError> = None;
176    for attempt in attempts.iter() {
177        let mut options = DiscoveryClientOptions::new(
178            SocketAddr::new(IpAddr::V4(attempt.broadcast), port),
179            SocketAddr::new(IpAddr::V4(attempt.local_ip), 0),
180            Duration::from_secs(3),
181        );
182        options.interface = Some(attempt.iface.clone());
183        options.attester_registry = opts.attester_registry.clone();
184        if !opts.prefer_multicast {
185            options = options.disable_multicast();
186        }
187        if !opts.allow_broadcast {
188            options = options.disable_broadcast();
189        }
190        info!(
191            "[ALPINE][DISCOVERY] iface={} local_ip={} netmask={} broadcast={} bound={}:0 so_broadcast={}",
192            attempt.iface,
193            attempt.local_ip,
194            attempt.netmask,
195            attempt.broadcast,
196            attempt.local_ip,
197            options.allow_broadcast
198        );
199
200        match DiscoveryClient::new(options) {
201            Ok(client) => match client.discover(&["alpine-control".to_string()]) {
202                Ok(outcome) => return Ok(outcome),
203                Err(err) => {
204                    warn!(
205                        "[ALPINE][DISCOVERY][WARN] iface={} error={}",
206                        attempt.iface, err
207                    );
208                    last_err = Some(err);
209                    continue;
210                }
211            },
212            Err(err) => {
213                warn!(
214                    "[ALPINE][DISCOVERY][WARN] iface={} error={}",
215                    attempt.iface, err
216                );
217                last_err = Some(err);
218                continue;
219            }
220        }
221    }
222
223    Err(DiscoveryRunError::Broadcast(
224        last_err.unwrap_or(DiscoveryError::Timeout),
225    ))
226}
227
228fn attempt_cached_unicast(opts: &DiscoveryRunOptions) -> Result<DiscoveryOutcome, DiscoveryRunError> {
229    if opts.cached_targets.is_empty() {
230        return Err(DiscoveryRunError::CachedUnicastFailed);
231    }
232    for target in opts.cached_targets.iter() {
233        let local_addr = resolve_local_addr(*target, None);
234        let mut options = DiscoveryClientOptions::new(
235            *target,
236            local_addr,
237            Duration::from_millis(opts.scan_timeout_ms),
238        );
239        options.attester_registry = opts.attester_registry.clone();
240        options = options.disable_multicast().disable_broadcast();
241        info!(
242            "[ALPINE][DISCOVERY] cached_unicast target={} local_bind={}",
243            target, options.local_addr
244        );
245        if let Ok(client) = DiscoveryClient::new(options) {
246            match client.discover(&["alpine-control".to_string()]) {
247                Ok(outcome) => return Ok(outcome),
248                Err(err) => {
249                    warn!(
250                        "[ALPINE][DISCOVERY][WARN] cached_unicast target={} error={}",
251                        target, err
252                    );
253                }
254            }
255        }
256    }
257    Err(DiscoveryRunError::CachedUnicastFailed)
258}
259
260async fn attempt_subnet_scan(
261    port: u16,
262    opts: &DiscoveryRunOptions,
263    preferred_ip: Option<IpAddr>,
264) -> Result<DiscoveryOutcome, DiscoveryRunError> {
265    if opts.scan_max_hosts == 0 {
266        return Err(DiscoveryRunError::SubnetScanDisabled);
267    }
268    let mut attempts = collect_interfaces()?;
269    if attempts.is_empty() {
270        return Err(DiscoveryRunError::NoInterfaces);
271    }
272    if let Some(pref) = preferred_ip {
273        if let Some(idx) = attempts.iter().position(|a| IpAddr::V4(a.local_ip) == pref) {
274            let preferred = attempts.remove(idx);
275            attempts.insert(0, preferred);
276        }
277    }
278
279    let rate = opts.scan_rate_per_sec.max(1);
280    let delay_ms = (1000u64 / rate as u64).max(1);
281    let timeout_ms = opts.scan_timeout_ms.max(100);
282
283    for attempt in attempts.iter() {
284        let mut scanned = 0u32;
285        let (network, broadcast) = network_bounds(attempt.local_ip, attempt.netmask);
286        let start = network.saturating_add(1);
287        let end = broadcast.saturating_sub(1);
288        info!(
289            "[ALPINE][DISCOVERY] subnet_scan iface={} local_ip={} netmask={} range={}.{} timeout_ms={} rate_per_sec={} max_hosts={}",
290            attempt.iface,
291            attempt.local_ip,
292            attempt.netmask,
293            std::net::Ipv4Addr::from(network),
294            std::net::Ipv4Addr::from(broadcast),
295            timeout_ms,
296            rate,
297            opts.scan_max_hosts
298        );
299        let mut ip = start;
300        while ip <= end && scanned < opts.scan_max_hosts {
301            let target_ip = std::net::Ipv4Addr::from(ip);
302            if target_ip != attempt.local_ip {
303                let target = SocketAddr::new(IpAddr::V4(target_ip), port);
304                let local_addr = SocketAddr::new(IpAddr::V4(attempt.local_ip), 0);
305                let mut options = DiscoveryClientOptions::new(
306                    target,
307                    local_addr,
308                    Duration::from_millis(timeout_ms),
309                );
310                options.attester_registry = opts.attester_registry.clone();
311                options = options.disable_multicast().disable_broadcast();
312                if let Ok(client) = DiscoveryClient::new(options) {
313                    if let Ok(outcome) = client.discover(&["alpine-control".to_string()]) {
314                        return Ok(outcome);
315                    }
316                }
317                scanned += 1;
318                sleep(TokioDuration::from_millis(delay_ms)).await;
319            }
320            ip = ip.saturating_add(1);
321        }
322    }
323    Err(DiscoveryRunError::SubnetScanFailed)
324}
325
/// One non-loopback IPv4 interface that discovery can be attempted on.
struct IfaceAttempt {
    /// Interface name as reported by the interface enumeration.
    iface: String,
    /// IPv4 address assigned to the interface; used as the local bind address.
    local_ip: Ipv4Addr,
    /// Netmask of the interface's subnet.
    netmask: Ipv4Addr,
    /// Directed broadcast address computed from `local_ip` and `netmask`.
    broadcast: Ipv4Addr,
}
332
333fn collect_interfaces() -> Result<Vec<IfaceAttempt>, DiscoveryRunError> {
334    let mut attempts = Vec::new();
335    let ifaces = get_if_addrs::get_if_addrs().map_err(|_| DiscoveryRunError::NoInterfaces)?;
336    for iface in ifaces {
337        if iface.is_loopback() {
338            continue;
339        }
340        if let get_if_addrs::IfAddr::V4(v4) = iface.addr {
341            let ipv4 = v4.ip;
342            let maskv4 = v4.netmask;
343            let ip_u32 = u32::from_be_bytes(ipv4.octets());
344            let mask_u32 = u32::from_be_bytes(maskv4.octets());
345            let bcast = ip_u32 | (!mask_u32);
346            let bcast_ip = std::net::Ipv4Addr::from(bcast.to_be_bytes());
347            attempts.push(IfaceAttempt {
348                iface: iface.name,
349                local_ip: ipv4,
350                netmask: maskv4,
351                broadcast: bcast_ip,
352            });
353        }
354    }
355    Ok(attempts)
356}
357
/// Returns `(network, broadcast)` for the subnet containing `ip`, both as
/// host-order `u32` values (most significant octet first).
///
/// `network` is `ip & netmask`; `broadcast` is `network` with every host bit set.
fn network_bounds(ip: std::net::Ipv4Addr, netmask: std::net::Ipv4Addr) -> (u32, u32) {
    let mask = u32::from(netmask);
    let network = u32::from(ip) & mask;
    (network, network | !mask)
}