alpine_protocol_sdk/discovery/
client.rs

1use std::{
2    fmt, io,
3    net::{IpAddr, SocketAddr, UdpSocket},
4    time::Duration,
5};
6
7use alpine::messages::{DiscoveryReply, DiscoveryRequest};
8use rand::{rngs::OsRng, RngCore};
9use serde_cbor;
10use tracing::{info, warn};
11
12use socket2::{Domain, Protocol, Socket, Type};
13
14use crate::phase::{current_phase, Phase};
15
/// IPv4 multicast group and port tried first when multicast discovery is preferred.
const DEFAULT_MULTICAST_IPV4: &str = "239.255.255.250:19455";
/// IPv6 multicast group and port used instead of the IPv4 group by IPv6 clients.
const DEFAULT_MULTICAST_IPV6: &str = "[ff12::1]:19455";
/// IPv4 limited-broadcast address and port appended as the last discovery target.
const DEFAULT_BROADCAST_IPV4: &str = "255.255.255.255:19455";
19
/// Options used to configure the blocking discovery helper.
///
/// All fields are public, so callers may either use the builder-style helpers
/// below or assign fields directly before handing the options to the client.
#[derive(Debug, Clone)]
pub struct DiscoveryClientOptions {
    /// Configured destination for discovery requests; always one of the send targets.
    pub remote_addr: SocketAddr,
    /// Local address the discovery socket is bound to.
    pub local_addr: SocketAddr,
    /// Read timeout applied to the socket while waiting for a reply.
    pub timeout: Duration,
    /// When `true`, the default multicast group is tried before `remote_addr`.
    pub prefer_multicast: bool,
    /// When `true`, IPv4 clients enable broadcast on the socket and also try the
    /// limited-broadcast address after `remote_addr`.
    pub allow_broadcast: bool,
    /// Optional interface name; the client copies it into its logs and outcome.
    pub interface: Option<String>,
}

impl DiscoveryClientOptions {
    /// Creates options from the given remote address, local bind address and reply timeout.
    ///
    /// The remaining settings start as: multicast preference off, broadcast
    /// allowed, and no interface name.
    pub fn new(remote_addr: SocketAddr, local_addr: SocketAddr, timeout: Duration) -> Self {
        Self {
            remote_addr,
            local_addr,
            timeout,
            prefer_multicast: false,
            allow_broadcast: true,
            interface: None,
        }
    }

    /// Clears the multicast preference and returns the updated options.
    #[must_use]
    pub fn disable_multicast(mut self) -> Self {
        self.prefer_multicast = false;
        self
    }

    /// Disallows the broadcast fallback and returns the updated options.
    #[must_use]
    pub fn disable_broadcast(mut self) -> Self {
        self.allow_broadcast = false;
        self
    }
}
53
54/// Errors that can happen while sending or receiving discovery payloads.
55#[derive(Debug)]
56pub enum DiscoveryError {
57    Io(io::Error),
58    Decode(serde_cbor::Error),
59    Timeout,
60    PermissionDenied,
61    MulticastUnavailable,
62    BroadcastBlocked,
63}
64
65impl fmt::Display for DiscoveryError {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        match self {
68            DiscoveryError::Io(err) => write!(f, "io error: {}", err),
69            DiscoveryError::Decode(err) => write!(f, "cbors serialization error: {}", err),
70            DiscoveryError::Timeout => write!(f, "discovery timed out"),
71            DiscoveryError::PermissionDenied => {
72                write!(f, "discovery channel permission denied")
73            }
74            DiscoveryError::MulticastUnavailable => {
75                write!(f, "multicast discovery unavailable")
76            }
77            DiscoveryError::BroadcastBlocked => write!(f, "broadcast discovery blocked"),
78        }
79    }
80}
81
82impl std::error::Error for DiscoveryError {}
83
84impl From<io::Error> for DiscoveryError {
85    fn from(err: io::Error) -> Self {
86        match err.kind() {
87            io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
88            _ => DiscoveryError::Io(err),
89        }
90    }
91}
92
93impl From<serde_cbor::Error> for DiscoveryError {
94    fn from(err: serde_cbor::Error) -> Self {
95        DiscoveryError::Decode(err)
96    }
97}
98
99/// The outcome of a discovery request.
100pub struct DiscoveryOutcome {
101    pub reply: DiscoveryReply,
102    pub peer: SocketAddr,
103    pub client_nonce: Vec<u8>,
104    pub local_addr: SocketAddr,
105    pub device_identity_pubkey: Option<Vec<u8>>,
106    pub interface: Option<String>,
107}
108
/// Stateless discovery helper that wraps the protocol request/response models.
pub struct DiscoveryClient {
    /// Bound UDP socket used for both sending requests and receiving the reply.
    socket: UdpSocket,
    /// Configured destination; always included in the list of send targets.
    remote_addr: SocketAddr,
    /// Whether the default multicast group is tried ahead of `remote_addr`.
    prefer_multicast: bool,
    /// Whether the IPv4 broadcast address may be used as an extra target.
    allow_broadcast: bool,
    /// `true` when either the remote or the local address is IPv6.
    ipv6: bool,
    /// Interface name taken from the options; echoed in logs and the outcome.
    interface: Option<String>,
}
118
119impl DiscoveryClient {
120    /// Creates a client that will send discovery packets to `remote_addr`.
121    pub fn new(options: DiscoveryClientOptions) -> Result<Self, DiscoveryError> {
122        let domain = if options.remote_addr.is_ipv4() {
123            Domain::IPV4
124        } else {
125            Domain::IPV6
126        };
127        let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
128        if options.allow_broadcast && options.remote_addr.is_ipv4() {
129            socket.set_broadcast(true)?;
130        }
131        socket.bind(&options.local_addr.into())?;
132
133        let socket: UdpSocket = socket.into();
134        socket.set_read_timeout(Some(options.timeout))?;
135        // TTL of 4 keeps traffic on the local segment but makes sure it leaves the host.
136        if options.prefer_multicast || options.remote_addr.ip().is_multicast() {
137            let _ = socket.set_multicast_ttl_v4(4);
138        }
139        let local_addr = socket.local_addr().unwrap_or(options.local_addr);
140        info!(
141            "[ALPINE][DISCOVERY][SOCKET] discovery socket created local_addr={} remote_addr={} prefer_multicast={}",
142            local_addr,
143            options.remote_addr,
144            options.prefer_multicast
145        );
146        Ok(Self {
147            socket,
148            remote_addr: options.remote_addr,
149            prefer_multicast: options.prefer_multicast,
150            allow_broadcast: options.allow_broadcast,
151            ipv6: options.remote_addr.is_ipv6() || options.local_addr.is_ipv6(),
152            interface: options.interface.clone(),
153        })
154    }
155
156    /// Sends a discovery payload with the requested capability names and waits for a reply.
157    pub fn discover(&self, requested: &[String]) -> Result<DiscoveryOutcome, DiscoveryError> {
158        let phase = current_phase();
159        if phase == Phase::Handshake {
160            warn!(
161                "[ALPINE][BUG] discovery attempted during handshake phase (no sends expected); local_addr={}",
162                self.socket
163                    .local_addr()
164                    .unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 0)))
165            );
166        }
167
168        let mut nonce = vec![0u8; 32];
169        OsRng.fill_bytes(&mut nonce);
170        let request = DiscoveryRequest::new(requested.to_vec(), nonce.clone());
171        let payload = serde_cbor::to_vec(&request)?;
172        let payload_len = payload.len();
173        debug_assert!(
174            payload_len > 8,
175            "discovery payload unexpectedly small; framing may have drifted"
176        );
177
178        let mut send_error: Option<DiscoveryError> = None;
179        let mut sent = false;
180        for target in self.discovery_targets() {
181            let mode = classify_target(target, self.remote_addr);
182            let local_bind = self
183                .socket
184                .local_addr()
185                .map(|addr| addr.to_string())
186                .unwrap_or_else(|_| "unknown".into());
187            info!(
188                "[ALPINE][DISCOVERY][TX][attempt] target={} mode={} local_bind={} payload_len={}",
189                target,
190                mode,
191                local_bind,
192                payload.len()
193            );
194            match self.socket.send_to(&payload, target) {
195                Ok(bytes) => {
196                    info!(
197                        "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} bytes_sent={}",
198                        target,
199                        mode,
200                        local_bind,
201                        bytes
202                    );
203                    sent = true;
204                }
205                Err(err) => {
206                    let kind = err.kind();
207                    let mapped = self.map_send_error(err, target);
208                    warn!(
209                        "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} phase={} error={}",
210                        target,
211                        mode,
212                        local_bind,
213                        phase.label(),
214                        mapped
215                    );
216                    send_error = Some(mapped);
217                    if !self.should_continue_after_error(&kind) {
218                        return Err(send_error.unwrap());
219                    }
220                }
221            }
222        }
223
224        if !sent {
225            return Err(send_error.unwrap_or_else(|| DiscoveryError::PermissionDenied));
226        }
227
228        let timeout_ms = self
229            .socket
230            .read_timeout()
231            .ok()
232            .flatten()
233            .map(|d| d.as_millis())
234            .unwrap_or_default();
235        let local_port = self
236            .socket
237            .local_addr()
238            .map(|addr| addr.port())
239            .unwrap_or_default();
240        info!(
241            "[ALPINE][DISCOVERY] awaiting reply timeout_ms={} local_port={} remote_hint={}",
242            timeout_ms, local_port, self.remote_addr
243        );
244
245        let mut buf = vec![0u8; 2048];
246        let (len, peer) = match self.socket.recv_from(&mut buf) {
247            Ok(res) => res,
248            Err(err) => return Err(self.map_recv_error(err)),
249        };
250        let reply: DiscoveryReply = serde_cbor::from_slice(&buf[..len])?;
251        let local_addr = self.socket.local_addr().map_err(DiscoveryError::from)?;
252        info!(
253            "[ALPINE][DISCOVERY][RX] reply received peer={} local_addr={} iface={:?} bytes={}",
254            peer, local_addr, self.interface, len
255        );
256        let outcome = DiscoveryOutcome {
257            reply: reply.clone(),
258            peer,
259            client_nonce: nonce,
260            local_addr,
261            device_identity_pubkey: if reply.device_identity_pubkey.is_empty() {
262                None
263            } else {
264                Some(reply.device_identity_pubkey.clone())
265            },
266            interface: self.interface.clone(),
267        };
268        Ok(outcome)
269    }
270
271    fn discovery_targets(&self) -> Vec<SocketAddr> {
272        let mut targets = Vec::new();
273
274        if self.prefer_multicast {
275            if !self.ipv6 {
276                if let Ok(addr) = DEFAULT_MULTICAST_IPV4.parse() {
277                    push_if_unique(&mut targets, addr);
278                }
279            }
280            if self.ipv6 {
281                if let Ok(addr) = DEFAULT_MULTICAST_IPV6.parse() {
282                    push_if_unique(&mut targets, addr);
283                }
284            }
285        }
286
287        push_if_unique(&mut targets, self.remote_addr);
288
289        if self.allow_broadcast && self.remote_addr.ip().is_ipv4() && !self.ipv6 {
290            if let Ok(addr) = DEFAULT_BROADCAST_IPV4.parse() {
291                push_if_unique(&mut targets, addr);
292            }
293        }
294
295        targets
296    }
297
298    fn map_send_error(&self, err: io::Error, target: SocketAddr) -> DiscoveryError {
299        match err.kind() {
300            io::ErrorKind::PermissionDenied => {
301                if target.ip().is_multicast() {
302                    DiscoveryError::MulticastUnavailable
303                } else if is_broadcast_addr(target.ip()) {
304                    DiscoveryError::BroadcastBlocked
305                } else {
306                    DiscoveryError::PermissionDenied
307                }
308            }
309            io::ErrorKind::ConnectionReset | io::ErrorKind::WouldBlock => {
310                DiscoveryError::PermissionDenied
311            }
312            _ => DiscoveryError::Io(err),
313        }
314    }
315
316    fn map_recv_error(&self, err: io::Error) -> DiscoveryError {
317        match err.kind() {
318            io::ErrorKind::TimedOut => DiscoveryError::Timeout,
319            io::ErrorKind::PermissionDenied | io::ErrorKind::ConnectionReset => {
320                DiscoveryError::PermissionDenied
321            }
322            io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
323            _ => DiscoveryError::Io(err),
324        }
325    }
326
327    fn should_continue_after_error(&self, kind: &io::ErrorKind) -> bool {
328        matches!(
329            kind,
330            io::ErrorKind::PermissionDenied
331                | io::ErrorKind::WouldBlock
332                | io::ErrorKind::ConnectionReset
333        )
334    }
335}
336
337impl Drop for DiscoveryClient {
338    fn drop(&mut self) {
339        if let Ok(local) = self.socket.local_addr() {
340            info!(
341                "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped local_addr={} remote_addr={}",
342                local, self.remote_addr
343            );
344        } else {
345            info!(
346                "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped remote_addr={}",
347                self.remote_addr
348            );
349        }
350    }
351}
352
/// Returns `true` only for the IPv4 limited-broadcast address; IPv6 addresses
/// are never considered broadcast.
fn is_broadcast_addr(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => v4.is_broadcast(),
        IpAddr::V6(_) => false,
    }
}
356
/// Appends `candidate` to `targets` unless an equal address is already present,
/// preserving the order in which addresses were first added.
fn push_if_unique(targets: &mut Vec<SocketAddr>, candidate: SocketAddr) {
    let already_listed = targets.iter().any(|existing| *existing == candidate);
    if already_listed {
        return;
    }
    targets.push(candidate);
}
362
363fn classify_target(target: SocketAddr, configured: SocketAddr) -> &'static str {
364    if target.ip().is_multicast() {
365        "multicast"
366    } else if is_broadcast_addr(target.ip()) {
367        "broadcast"
368    } else if target == configured {
369        "unicast-configured"
370    } else {
371        "unicast"
372    }
373}