alpine_protocol_sdk/discovery/
runner.rs1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2use std::time::Duration;
3
4use alpine::attestation::AttesterRegistry;
5use thiserror::Error;
6use tokio::time::{sleep, Duration as TokioDuration};
7use tracing::{info, warn};
8
9use crate::discovery::{DiscoveryClient, DiscoveryClientOptions, DiscoveryError, DiscoveryOutcome};
10use crate::phase::claim_discovery;
11
12#[derive(Debug, Clone)]
13pub struct DiscoveryRunOptions {
14 pub local_addr: Option<SocketAddr>,
15 pub prefer_multicast: bool,
16 pub allow_broadcast: bool,
17 pub cached_targets: Vec<SocketAddr>,
18 pub scan_subnets: bool,
19 pub scan_rate_per_sec: u32,
20 pub scan_timeout_ms: u64,
21 pub scan_max_hosts: u32,
22 pub attester_registry: Option<AttesterRegistry>,
23}
24
25impl Default for DiscoveryRunOptions {
26 fn default() -> Self {
27 Self {
28 local_addr: None,
29 prefer_multicast: false,
30 allow_broadcast: true,
31 cached_targets: Vec::new(),
32 scan_subnets: false,
33 scan_rate_per_sec: 200,
34 scan_timeout_ms: 500,
35 scan_max_hosts: 1024,
36 attester_registry: None,
37 }
38 }
39}
40
41#[derive(Debug, Error)]
42pub enum DiscoveryRunError {
43 #[error("unicast discovery failed: {0}")]
44 Unicast(DiscoveryError),
45 #[error("broadcast discovery failed: {0}")]
46 Broadcast(DiscoveryError),
47 #[error("no viable interfaces for broadcast discovery (need a non-loopback IPv4 address)")]
48 NoInterfaces,
49 #[error("cached unicast discovery failed")]
50 CachedUnicastFailed,
51 #[error("subnet scan discovery failed")]
52 SubnetScanFailed,
53 #[error("subnet scan disabled (scan_max_hosts = 0)")]
54 SubnetScanDisabled,
55}
56
57fn default_local_addr() -> SocketAddr {
58 SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)
59}
60
61fn resolve_local_addr(remote_addr: SocketAddr, override_addr: Option<SocketAddr>) -> SocketAddr {
62 if let Some(addr) = override_addr {
63 return addr;
64 }
65 if matches!(remote_addr.ip(), IpAddr::V4(v4) if v4.is_broadcast()) {
66 return default_local_addr();
67 }
68 let bind_addr = if remote_addr.is_ipv4() {
69 SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)
70 } else {
71 SocketAddr::new(IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED), 0)
72 };
73 if let Ok(sock) = std::net::UdpSocket::bind(bind_addr) {
74 if sock.connect(remote_addr).is_ok() {
75 if let Ok(local) = sock.local_addr() {
76 return SocketAddr::new(local.ip(), 0);
77 }
78 }
79 }
80 default_local_addr()
81}
82
83pub async fn run_discovery(remote_addr: SocketAddr) -> Result<DiscoveryOutcome, DiscoveryRunError> {
84 run_discovery_with_options(remote_addr, DiscoveryRunOptions::default()).await
85}
86
87pub async fn run_discovery_with_options(
88 remote_addr: SocketAddr,
89 opts: DiscoveryRunOptions,
90) -> Result<DiscoveryOutcome, DiscoveryRunError> {
91 let _phase_guard = claim_discovery().map_err(|_| DiscoveryRunError::Unicast(DiscoveryError::PermissionDenied))?;
92 let is_broadcast_target = matches!(remote_addr.ip(), IpAddr::V4(v4) if v4.is_broadcast());
93
94 if !is_broadcast_target {
95 let local_addr = resolve_local_addr(remote_addr, opts.local_addr);
96 let mut options = DiscoveryClientOptions::new(
97 remote_addr,
98 local_addr,
99 Duration::from_secs(3),
100 );
101 options.attester_registry = opts.attester_registry.clone();
102 options = options.disable_multicast().disable_broadcast();
103 info!(
104 "[ALPINE][DISCOVERY] route_hint remote={} local_bind={}",
105 remote_addr, options.local_addr
106 );
107 let local_bind = options.local_addr;
108 let client = DiscoveryClient::new(options).map_err(DiscoveryRunError::Unicast)?;
109 match client.discover(&["alpine-control".to_string()]) {
110 Ok(outcome) => return Ok(outcome),
111 Err(err) => {
112 if opts.allow_broadcast && remote_addr.is_ipv4() {
113 warn!(
114 "[ALPINE][DISCOVERY][WARN] unicast discovery failed ({}); falling back to broadcast",
115 err
116 );
117 match attempt_broadcast(
118 remote_addr.port(),
119 &opts,
120 Some(local_bind.ip()),
121 ) {
122 Ok(outcome) => return Ok(outcome),
123 Err(broadcast_err) => {
124 if let Ok(outcome) = attempt_cached_unicast(&opts) {
125 return Ok(outcome);
126 }
127 if opts.scan_subnets {
128 return attempt_subnet_scan(
129 remote_addr.port(),
130 &opts,
131 Some(local_bind.ip()),
132 )
133 .await;
134 }
135 return Err(broadcast_err);
136 }
137 }
138 }
139 return Err(DiscoveryRunError::Unicast(err));
140 }
141 }
142 }
143
144 match attempt_broadcast(remote_addr.port(), &opts, None) {
145 Ok(outcome) => Ok(outcome),
146 Err(broadcast_err) => {
147 if let Ok(outcome) = attempt_cached_unicast(&opts) {
148 return Ok(outcome);
149 }
150 if opts.scan_subnets {
151 return attempt_subnet_scan(remote_addr.port(), &opts, None).await;
152 }
153 Err(broadcast_err)
154 }
155 }
156}
157
158fn attempt_broadcast(
159 port: u16,
160 opts: &DiscoveryRunOptions,
161 preferred_ip: Option<IpAddr>,
162) -> Result<DiscoveryOutcome, DiscoveryRunError> {
163 let mut attempts = collect_interfaces()?;
164 if attempts.is_empty() {
165 return Err(DiscoveryRunError::NoInterfaces);
166 }
167
168 if let Some(pref) = preferred_ip {
169 if let Some(idx) = attempts.iter().position(|a| IpAddr::V4(a.local_ip) == pref) {
170 let preferred = attempts.remove(idx);
171 attempts.insert(0, preferred);
172 }
173 }
174
175 let mut last_err: Option<DiscoveryError> = None;
176 for attempt in attempts.iter() {
177 let mut options = DiscoveryClientOptions::new(
178 SocketAddr::new(IpAddr::V4(attempt.broadcast), port),
179 SocketAddr::new(IpAddr::V4(attempt.local_ip), 0),
180 Duration::from_secs(3),
181 );
182 options.interface = Some(attempt.iface.clone());
183 options.attester_registry = opts.attester_registry.clone();
184 if !opts.prefer_multicast {
185 options = options.disable_multicast();
186 }
187 if !opts.allow_broadcast {
188 options = options.disable_broadcast();
189 }
190 info!(
191 "[ALPINE][DISCOVERY] iface={} local_ip={} netmask={} broadcast={} bound={}:0 so_broadcast={}",
192 attempt.iface,
193 attempt.local_ip,
194 attempt.netmask,
195 attempt.broadcast,
196 attempt.local_ip,
197 options.allow_broadcast
198 );
199
200 match DiscoveryClient::new(options) {
201 Ok(client) => match client.discover(&["alpine-control".to_string()]) {
202 Ok(outcome) => return Ok(outcome),
203 Err(err) => {
204 warn!(
205 "[ALPINE][DISCOVERY][WARN] iface={} error={}",
206 attempt.iface, err
207 );
208 last_err = Some(err);
209 continue;
210 }
211 },
212 Err(err) => {
213 warn!(
214 "[ALPINE][DISCOVERY][WARN] iface={} error={}",
215 attempt.iface, err
216 );
217 last_err = Some(err);
218 continue;
219 }
220 }
221 }
222
223 Err(DiscoveryRunError::Broadcast(
224 last_err.unwrap_or(DiscoveryError::Timeout),
225 ))
226}
227
228fn attempt_cached_unicast(opts: &DiscoveryRunOptions) -> Result<DiscoveryOutcome, DiscoveryRunError> {
229 if opts.cached_targets.is_empty() {
230 return Err(DiscoveryRunError::CachedUnicastFailed);
231 }
232 for target in opts.cached_targets.iter() {
233 let local_addr = resolve_local_addr(*target, None);
234 let mut options = DiscoveryClientOptions::new(
235 *target,
236 local_addr,
237 Duration::from_millis(opts.scan_timeout_ms),
238 );
239 options.attester_registry = opts.attester_registry.clone();
240 options = options.disable_multicast().disable_broadcast();
241 info!(
242 "[ALPINE][DISCOVERY] cached_unicast target={} local_bind={}",
243 target, options.local_addr
244 );
245 if let Ok(client) = DiscoveryClient::new(options) {
246 match client.discover(&["alpine-control".to_string()]) {
247 Ok(outcome) => return Ok(outcome),
248 Err(err) => {
249 warn!(
250 "[ALPINE][DISCOVERY][WARN] cached_unicast target={} error={}",
251 target, err
252 );
253 }
254 }
255 }
256 }
257 Err(DiscoveryRunError::CachedUnicastFailed)
258}
259
260async fn attempt_subnet_scan(
261 port: u16,
262 opts: &DiscoveryRunOptions,
263 preferred_ip: Option<IpAddr>,
264) -> Result<DiscoveryOutcome, DiscoveryRunError> {
265 if opts.scan_max_hosts == 0 {
266 return Err(DiscoveryRunError::SubnetScanDisabled);
267 }
268 let mut attempts = collect_interfaces()?;
269 if attempts.is_empty() {
270 return Err(DiscoveryRunError::NoInterfaces);
271 }
272 if let Some(pref) = preferred_ip {
273 if let Some(idx) = attempts.iter().position(|a| IpAddr::V4(a.local_ip) == pref) {
274 let preferred = attempts.remove(idx);
275 attempts.insert(0, preferred);
276 }
277 }
278
279 let rate = opts.scan_rate_per_sec.max(1);
280 let delay_ms = (1000u64 / rate as u64).max(1);
281 let timeout_ms = opts.scan_timeout_ms.max(100);
282
283 for attempt in attempts.iter() {
284 let mut scanned = 0u32;
285 let (network, broadcast) = network_bounds(attempt.local_ip, attempt.netmask);
286 let start = network.saturating_add(1);
287 let end = broadcast.saturating_sub(1);
288 info!(
289 "[ALPINE][DISCOVERY] subnet_scan iface={} local_ip={} netmask={} range={}.{} timeout_ms={} rate_per_sec={} max_hosts={}",
290 attempt.iface,
291 attempt.local_ip,
292 attempt.netmask,
293 std::net::Ipv4Addr::from(network),
294 std::net::Ipv4Addr::from(broadcast),
295 timeout_ms,
296 rate,
297 opts.scan_max_hosts
298 );
299 let mut ip = start;
300 while ip <= end && scanned < opts.scan_max_hosts {
301 let target_ip = std::net::Ipv4Addr::from(ip);
302 if target_ip != attempt.local_ip {
303 let target = SocketAddr::new(IpAddr::V4(target_ip), port);
304 let local_addr = SocketAddr::new(IpAddr::V4(attempt.local_ip), 0);
305 let mut options = DiscoveryClientOptions::new(
306 target,
307 local_addr,
308 Duration::from_millis(timeout_ms),
309 );
310 options.attester_registry = opts.attester_registry.clone();
311 options = options.disable_multicast().disable_broadcast();
312 if let Ok(client) = DiscoveryClient::new(options) {
313 if let Ok(outcome) = client.discover(&["alpine-control".to_string()]) {
314 return Ok(outcome);
315 }
316 }
317 scanned += 1;
318 sleep(TokioDuration::from_millis(delay_ms)).await;
319 }
320 ip = ip.saturating_add(1);
321 }
322 }
323 Err(DiscoveryRunError::SubnetScanFailed)
324}
325
326struct IfaceAttempt {
327 iface: String,
328 local_ip: std::net::Ipv4Addr,
329 netmask: std::net::Ipv4Addr,
330 broadcast: std::net::Ipv4Addr,
331}
332
333fn collect_interfaces() -> Result<Vec<IfaceAttempt>, DiscoveryRunError> {
334 let mut attempts = Vec::new();
335 let ifaces = get_if_addrs::get_if_addrs().map_err(|_| DiscoveryRunError::NoInterfaces)?;
336 for iface in ifaces {
337 if iface.is_loopback() {
338 continue;
339 }
340 if let get_if_addrs::IfAddr::V4(v4) = iface.addr {
341 let ipv4 = v4.ip;
342 let maskv4 = v4.netmask;
343 let ip_u32 = u32::from_be_bytes(ipv4.octets());
344 let mask_u32 = u32::from_be_bytes(maskv4.octets());
345 let bcast = ip_u32 | (!mask_u32);
346 let bcast_ip = std::net::Ipv4Addr::from(bcast.to_be_bytes());
347 attempts.push(IfaceAttempt {
348 iface: iface.name,
349 local_ip: ipv4,
350 netmask: maskv4,
351 broadcast: bcast_ip,
352 });
353 }
354 }
355 Ok(attempts)
356}
357
358fn network_bounds(ip: std::net::Ipv4Addr, netmask: std::net::Ipv4Addr) -> (u32, u32) {
359 let ip_u32 = u32::from_be_bytes(ip.octets());
360 let mask_u32 = u32::from_be_bytes(netmask.octets());
361 let network = ip_u32 & mask_u32;
362 let broadcast = network | (!mask_u32);
363 (network, broadcast)
364}