alpine_protocol_sdk/discovery/
client.rs

1use std::{
2    fmt, io,
3    net::{IpAddr, SocketAddr, UdpSocket},
4    time::Duration,
5};
6
7use alpine::attestation::{verify_device_identity_attestation, AttesterRegistry};
8use alpine::messages::{DiscoveryReply, DiscoveryRequest};
9use rand::{rngs::OsRng, RngCore};
10use serde_cbor;
11use tracing::{info, warn};
12
13use socket2::{Domain, Protocol, Socket, Type};
14
15use crate::phase::{current_phase, Phase};
16
17const DEFAULT_MULTICAST_IPV4: &str = "239.255.255.250:19455";
18const DEFAULT_MULTICAST_IPV6: &str = "[ff12::1]:19455";
19const DEFAULT_BROADCAST_IPV4: &str = "255.255.255.255:19455";
20
21/// Options used to configure the blocking discovery helper.
22pub struct DiscoveryClientOptions {
23    pub remote_addr: SocketAddr,
24    pub local_addr: SocketAddr,
25    pub timeout: Duration,
26    pub prefer_multicast: bool,
27    pub allow_broadcast: bool,
28    pub interface: Option<String>,
29    pub attester_registry: Option<AttesterRegistry>,
30}
31
32impl DiscoveryClientOptions {
33    /// Creates options with the provided remote socket and a default timeout.
34    pub fn new(remote_addr: SocketAddr, local_addr: SocketAddr, timeout: Duration) -> Self {
35        Self {
36            remote_addr,
37            local_addr,
38            timeout,
39            prefer_multicast: false,
40            allow_broadcast: true,
41            interface: None,
42            attester_registry: None,
43        }
44    }
45
46    pub fn disable_multicast(mut self) -> Self {
47        self.prefer_multicast = false;
48        self
49    }
50
51    pub fn disable_broadcast(mut self) -> Self {
52        self.allow_broadcast = false;
53        self
54    }
55
56    pub fn with_attester_registry(mut self, registry: AttesterRegistry) -> Self {
57        self.attester_registry = Some(registry);
58        self
59    }
60}
61
62/// Errors that can happen while sending or receiving discovery payloads.
63#[derive(Debug)]
64pub enum DiscoveryError {
65    Io(io::Error),
66    Decode(serde_cbor::Error),
67    Timeout,
68    PermissionDenied,
69    MulticastUnavailable,
70    BroadcastBlocked,
71}
72
73impl fmt::Display for DiscoveryError {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        match self {
76            DiscoveryError::Io(err) => write!(f, "io error: {}", err),
77            DiscoveryError::Decode(err) => write!(f, "cbors serialization error: {}", err),
78            DiscoveryError::Timeout => write!(f, "discovery timed out"),
79            DiscoveryError::PermissionDenied => {
80                write!(f, "discovery channel permission denied")
81            }
82            DiscoveryError::MulticastUnavailable => {
83                write!(f, "multicast discovery unavailable")
84            }
85            DiscoveryError::BroadcastBlocked => write!(f, "broadcast discovery blocked"),
86        }
87    }
88}
89
90impl std::error::Error for DiscoveryError {}
91
92impl From<io::Error> for DiscoveryError {
93    fn from(err: io::Error) -> Self {
94        match err.kind() {
95            io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
96            _ => DiscoveryError::Io(err),
97        }
98    }
99}
100
101impl From<serde_cbor::Error> for DiscoveryError {
102    fn from(err: serde_cbor::Error) -> Self {
103        DiscoveryError::Decode(err)
104    }
105}
106
107/// The outcome of a discovery request.
108pub struct DiscoveryOutcome {
109    pub reply: DiscoveryReply,
110    pub peer: SocketAddr,
111    pub client_nonce: Vec<u8>,
112    pub local_addr: SocketAddr,
113    pub device_identity_pubkey: Option<Vec<u8>>,
114    pub device_identity_trusted: bool,
115    pub device_identity_attestation_error: Option<String>,
116    pub interface: Option<String>,
117}
118
119/// Stateless discovery helper that wraps the protocol request/response models.
120pub struct DiscoveryClient {
121    socket: UdpSocket,
122    remote_addr: SocketAddr,
123    prefer_multicast: bool,
124    allow_broadcast: bool,
125    ipv6: bool,
126    interface: Option<String>,
127    attester_registry: Option<AttesterRegistry>,
128}
129
130impl DiscoveryClient {
131    /// Creates a client that will send discovery packets to `remote_addr`.
132    pub fn new(options: DiscoveryClientOptions) -> Result<Self, DiscoveryError> {
133        let domain = if options.remote_addr.is_ipv4() {
134            Domain::IPV4
135        } else {
136            Domain::IPV6
137        };
138        let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
139        if options.allow_broadcast && options.remote_addr.is_ipv4() {
140            socket.set_broadcast(true)?;
141        }
142        socket.bind(&options.local_addr.into())?;
143
144        let socket: UdpSocket = socket.into();
145        socket.set_read_timeout(Some(options.timeout))?;
146        // TTL of 4 keeps traffic on the local segment but makes sure it leaves the host.
147        if options.prefer_multicast || options.remote_addr.ip().is_multicast() {
148            let _ = socket.set_multicast_ttl_v4(4);
149        }
150        let local_addr = socket.local_addr().unwrap_or(options.local_addr);
151        info!(
152            "[ALPINE][DISCOVERY][SOCKET] discovery socket created local_addr={} remote_addr={} prefer_multicast={}",
153            local_addr,
154            options.remote_addr,
155            options.prefer_multicast
156        );
157        Ok(Self {
158            socket,
159            remote_addr: options.remote_addr,
160            prefer_multicast: options.prefer_multicast,
161            allow_broadcast: options.allow_broadcast,
162            ipv6: options.remote_addr.is_ipv6() || options.local_addr.is_ipv6(),
163            interface: options.interface.clone(),
164            attester_registry: options.attester_registry.clone(),
165        })
166    }
167
168    /// Sends a discovery payload with the requested capability names and waits for a reply.
169    pub fn discover(&self, requested: &[String]) -> Result<DiscoveryOutcome, DiscoveryError> {
170        let phase = current_phase();
171        if phase == Phase::Handshake {
172            warn!(
173                "[ALPINE][BUG] discovery attempted during handshake phase (no sends expected); local_addr={}",
174                self.socket
175                    .local_addr()
176                    .unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 0)))
177            );
178        }
179
180        let mut nonce = vec![0u8; 32];
181        OsRng.fill_bytes(&mut nonce);
182        let request = DiscoveryRequest::new(requested.to_vec(), nonce.clone());
183        let payload = serde_cbor::to_vec(&request)?;
184        let payload_len = payload.len();
185        debug_assert!(
186            payload_len > 8,
187            "discovery payload unexpectedly small; framing may have drifted"
188        );
189
190        let mut send_error: Option<DiscoveryError> = None;
191        let mut sent = false;
192        for target in self.discovery_targets() {
193            let mode = classify_target(target, self.remote_addr);
194            let local_bind = self
195                .socket
196                .local_addr()
197                .map(|addr| addr.to_string())
198                .unwrap_or_else(|_| "unknown".into());
199            info!(
200                "[ALPINE][DISCOVERY][TX][attempt] target={} mode={} local_bind={} payload_len={}",
201                target,
202                mode,
203                local_bind,
204                payload.len()
205            );
206            match self.socket.send_to(&payload, target) {
207                Ok(bytes) => {
208                    info!(
209                        "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} bytes_sent={}",
210                        target,
211                        mode,
212                        local_bind,
213                        bytes
214                    );
215                    sent = true;
216                }
217                Err(err) => {
218                    let kind = err.kind();
219                    let mapped = self.map_send_error(err, target);
220                    warn!(
221                        "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} phase={} error={}",
222                        target,
223                        mode,
224                        local_bind,
225                        phase.label(),
226                        mapped
227                    );
228                    send_error = Some(mapped);
229                    if !self.should_continue_after_error(&kind) {
230                        return Err(send_error.unwrap());
231                    }
232                }
233            }
234        }
235
236        if !sent {
237            return Err(send_error.unwrap_or_else(|| DiscoveryError::PermissionDenied));
238        }
239
240        let timeout_ms = self
241            .socket
242            .read_timeout()
243            .ok()
244            .flatten()
245            .map(|d| d.as_millis())
246            .unwrap_or_default();
247        let local_port = self
248            .socket
249            .local_addr()
250            .map(|addr| addr.port())
251            .unwrap_or_default();
252        info!(
253            "[ALPINE][DISCOVERY] awaiting reply timeout_ms={} local_port={} remote_hint={}",
254            timeout_ms, local_port, self.remote_addr
255        );
256
257        let mut buf = vec![0u8; 2048];
258        let (len, peer) = match self.socket.recv_from(&mut buf) {
259            Ok(res) => res,
260            Err(err) => return Err(self.map_recv_error(err)),
261        };
262        let reply: DiscoveryReply = serde_cbor::from_slice(&buf[..len])?;
263        let local_addr = self.socket.local_addr().map_err(DiscoveryError::from)?;
264        info!(
265            "[ALPINE][DISCOVERY][RX] reply received peer={} local_addr={} iface={:?} bytes={}",
266            peer, local_addr, self.interface, len
267        );
268        let (device_identity_trusted, device_identity_attestation_error) =
269            self.verify_attestation(&reply);
270        let outcome = DiscoveryOutcome {
271            reply: reply.clone(),
272            peer,
273            client_nonce: nonce,
274            local_addr,
275            device_identity_pubkey: if reply.device_identity_pubkey.is_empty() {
276                None
277            } else {
278                Some(reply.device_identity_pubkey.clone())
279            },
280            device_identity_trusted,
281            device_identity_attestation_error,
282            interface: self.interface.clone(),
283        };
284        Ok(outcome)
285    }
286
287    fn discovery_targets(&self) -> Vec<SocketAddr> {
288        let mut targets = Vec::new();
289
290        if self.prefer_multicast {
291            if !self.ipv6 {
292                if let Ok(addr) = DEFAULT_MULTICAST_IPV4.parse() {
293                    push_if_unique(&mut targets, addr);
294                }
295            }
296            if self.ipv6 {
297                if let Ok(addr) = DEFAULT_MULTICAST_IPV6.parse() {
298                    push_if_unique(&mut targets, addr);
299                }
300            }
301        }
302
303        push_if_unique(&mut targets, self.remote_addr);
304
305        if self.allow_broadcast && self.remote_addr.ip().is_ipv4() && !self.ipv6 {
306            if let Ok(addr) = DEFAULT_BROADCAST_IPV4.parse() {
307                push_if_unique(&mut targets, addr);
308            }
309        }
310
311        targets
312    }
313
314    fn map_send_error(&self, err: io::Error, target: SocketAddr) -> DiscoveryError {
315        match err.kind() {
316            io::ErrorKind::PermissionDenied => {
317                if target.ip().is_multicast() {
318                    DiscoveryError::MulticastUnavailable
319                } else if is_broadcast_addr(target.ip()) {
320                    DiscoveryError::BroadcastBlocked
321                } else {
322                    DiscoveryError::PermissionDenied
323                }
324            }
325            io::ErrorKind::ConnectionReset | io::ErrorKind::WouldBlock => {
326                DiscoveryError::PermissionDenied
327            }
328            _ => DiscoveryError::Io(err),
329        }
330    }
331
332    fn map_recv_error(&self, err: io::Error) -> DiscoveryError {
333        match err.kind() {
334            io::ErrorKind::TimedOut => DiscoveryError::Timeout,
335            io::ErrorKind::PermissionDenied | io::ErrorKind::ConnectionReset => {
336                DiscoveryError::PermissionDenied
337            }
338            io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
339            _ => DiscoveryError::Io(err),
340        }
341    }
342
343    fn should_continue_after_error(&self, kind: &io::ErrorKind) -> bool {
344        matches!(
345            kind,
346            io::ErrorKind::PermissionDenied
347                | io::ErrorKind::WouldBlock
348                | io::ErrorKind::ConnectionReset
349        )
350    }
351
352    fn verify_attestation(
353        &self,
354        reply: &DiscoveryReply,
355    ) -> (bool, Option<String>) {
356        if reply.device_identity_attestation.is_empty() {
357            return (false, Some("device identity attestation missing".into()));
358        }
359        let Some(registry) = &self.attester_registry else {
360            return (
361                false,
362                Some("attester registry not configured".into()),
363            );
364        };
365        match verify_device_identity_attestation(reply, registry, std::time::SystemTime::now()) {
366            Ok(_) => (true, None),
367            Err(err) => {
368                warn!(
369                    "[ALPINE][DISCOVERY][TRUST] attestation verification failed device_id={} err={}",
370                    reply.device_id,
371                    err
372                );
373                (false, Some(err.to_string()))
374            }
375        }
376    }
377}
378
379impl Drop for DiscoveryClient {
380    fn drop(&mut self) {
381        if let Ok(local) = self.socket.local_addr() {
382            info!(
383                "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped local_addr={} remote_addr={}",
384                local, self.remote_addr
385            );
386        } else {
387            info!(
388                "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped remote_addr={}",
389                self.remote_addr
390            );
391        }
392    }
393}
394
395fn is_broadcast_addr(ip: IpAddr) -> bool {
396    matches!(ip, IpAddr::V4(addr) if addr.is_broadcast())
397}
398
399fn push_if_unique(targets: &mut Vec<SocketAddr>, candidate: SocketAddr) {
400    if !targets.contains(&candidate) {
401        targets.push(candidate);
402    }
403}
404
405fn classify_target(target: SocketAddr, configured: SocketAddr) -> &'static str {
406    if target.ip().is_multicast() {
407        "multicast"
408    } else if is_broadcast_addr(target.ip()) {
409        "broadcast"
410    } else if target == configured {
411        "unicast-configured"
412    } else {
413        "unicast"
414    }
415}