alpine_protocol_sdk/discovery/
client.rs1use std::{
2 fmt, io,
3 net::{IpAddr, SocketAddr, UdpSocket},
4 time::Duration,
5};
6
7use alpine::attestation::{verify_device_identity_attestation, AttesterRegistry};
8use alpine::messages::{DiscoveryReply, DiscoveryRequest};
9use rand::{rngs::OsRng, RngCore};
10use serde_cbor;
11use tracing::{info, warn};
12
13use socket2::{Domain, Protocol, Socket, Type};
14
15use crate::phase::{current_phase, Phase};
16
17const DEFAULT_MULTICAST_IPV4: &str = "239.255.255.250:19455";
18const DEFAULT_MULTICAST_IPV6: &str = "[ff12::1]:19455";
19const DEFAULT_BROADCAST_IPV4: &str = "255.255.255.255:19455";
20
21pub struct DiscoveryClientOptions {
23 pub remote_addr: SocketAddr,
24 pub local_addr: SocketAddr,
25 pub timeout: Duration,
26 pub prefer_multicast: bool,
27 pub allow_broadcast: bool,
28 pub interface: Option<String>,
29 pub attester_registry: Option<AttesterRegistry>,
30}
31
32impl DiscoveryClientOptions {
33 pub fn new(remote_addr: SocketAddr, local_addr: SocketAddr, timeout: Duration) -> Self {
35 Self {
36 remote_addr,
37 local_addr,
38 timeout,
39 prefer_multicast: false,
40 allow_broadcast: true,
41 interface: None,
42 attester_registry: None,
43 }
44 }
45
46 pub fn disable_multicast(mut self) -> Self {
47 self.prefer_multicast = false;
48 self
49 }
50
51 pub fn disable_broadcast(mut self) -> Self {
52 self.allow_broadcast = false;
53 self
54 }
55
56 pub fn with_attester_registry(mut self, registry: AttesterRegistry) -> Self {
57 self.attester_registry = Some(registry);
58 self
59 }
60}
61
62#[derive(Debug)]
64pub enum DiscoveryError {
65 Io(io::Error),
66 Decode(serde_cbor::Error),
67 Timeout,
68 PermissionDenied,
69 MulticastUnavailable,
70 BroadcastBlocked,
71}
72
73impl fmt::Display for DiscoveryError {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 match self {
76 DiscoveryError::Io(err) => write!(f, "io error: {}", err),
77 DiscoveryError::Decode(err) => write!(f, "cbors serialization error: {}", err),
78 DiscoveryError::Timeout => write!(f, "discovery timed out"),
79 DiscoveryError::PermissionDenied => {
80 write!(f, "discovery channel permission denied")
81 }
82 DiscoveryError::MulticastUnavailable => {
83 write!(f, "multicast discovery unavailable")
84 }
85 DiscoveryError::BroadcastBlocked => write!(f, "broadcast discovery blocked"),
86 }
87 }
88}
89
90impl std::error::Error for DiscoveryError {}
91
92impl From<io::Error> for DiscoveryError {
93 fn from(err: io::Error) -> Self {
94 match err.kind() {
95 io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
96 _ => DiscoveryError::Io(err),
97 }
98 }
99}
100
101impl From<serde_cbor::Error> for DiscoveryError {
102 fn from(err: serde_cbor::Error) -> Self {
103 DiscoveryError::Decode(err)
104 }
105}
106
107pub struct DiscoveryOutcome {
109 pub reply: DiscoveryReply,
110 pub peer: SocketAddr,
111 pub client_nonce: Vec<u8>,
112 pub local_addr: SocketAddr,
113 pub device_identity_pubkey: Option<Vec<u8>>,
114 pub device_identity_trusted: bool,
115 pub device_identity_attestation_error: Option<String>,
116 pub interface: Option<String>,
117}
118
119pub struct DiscoveryClient {
121 socket: UdpSocket,
122 remote_addr: SocketAddr,
123 prefer_multicast: bool,
124 allow_broadcast: bool,
125 ipv6: bool,
126 interface: Option<String>,
127 attester_registry: Option<AttesterRegistry>,
128}
129
130impl DiscoveryClient {
131 pub fn new(options: DiscoveryClientOptions) -> Result<Self, DiscoveryError> {
133 let domain = if options.remote_addr.is_ipv4() {
134 Domain::IPV4
135 } else {
136 Domain::IPV6
137 };
138 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
139 if options.allow_broadcast && options.remote_addr.is_ipv4() {
140 socket.set_broadcast(true)?;
141 }
142 socket.bind(&options.local_addr.into())?;
143
144 let socket: UdpSocket = socket.into();
145 socket.set_read_timeout(Some(options.timeout))?;
146 if options.prefer_multicast || options.remote_addr.ip().is_multicast() {
148 let _ = socket.set_multicast_ttl_v4(4);
149 }
150 let local_addr = socket.local_addr().unwrap_or(options.local_addr);
151 info!(
152 "[ALPINE][DISCOVERY][SOCKET] discovery socket created local_addr={} remote_addr={} prefer_multicast={}",
153 local_addr,
154 options.remote_addr,
155 options.prefer_multicast
156 );
157 Ok(Self {
158 socket,
159 remote_addr: options.remote_addr,
160 prefer_multicast: options.prefer_multicast,
161 allow_broadcast: options.allow_broadcast,
162 ipv6: options.remote_addr.is_ipv6() || options.local_addr.is_ipv6(),
163 interface: options.interface.clone(),
164 attester_registry: options.attester_registry.clone(),
165 })
166 }
167
168 pub fn discover(&self, requested: &[String]) -> Result<DiscoveryOutcome, DiscoveryError> {
170 let phase = current_phase();
171 if phase == Phase::Handshake {
172 warn!(
173 "[ALPINE][BUG] discovery attempted during handshake phase (no sends expected); local_addr={}",
174 self.socket
175 .local_addr()
176 .unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 0)))
177 );
178 }
179
180 let mut nonce = vec![0u8; 32];
181 OsRng.fill_bytes(&mut nonce);
182 let request = DiscoveryRequest::new(requested.to_vec(), nonce.clone());
183 let payload = serde_cbor::to_vec(&request)?;
184 let payload_len = payload.len();
185 debug_assert!(
186 payload_len > 8,
187 "discovery payload unexpectedly small; framing may have drifted"
188 );
189
190 let mut send_error: Option<DiscoveryError> = None;
191 let mut sent = false;
192 for target in self.discovery_targets() {
193 let mode = classify_target(target, self.remote_addr);
194 let local_bind = self
195 .socket
196 .local_addr()
197 .map(|addr| addr.to_string())
198 .unwrap_or_else(|_| "unknown".into());
199 info!(
200 "[ALPINE][DISCOVERY][TX][attempt] target={} mode={} local_bind={} payload_len={}",
201 target,
202 mode,
203 local_bind,
204 payload.len()
205 );
206 match self.socket.send_to(&payload, target) {
207 Ok(bytes) => {
208 info!(
209 "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} bytes_sent={}",
210 target,
211 mode,
212 local_bind,
213 bytes
214 );
215 sent = true;
216 }
217 Err(err) => {
218 let kind = err.kind();
219 let mapped = self.map_send_error(err, target);
220 warn!(
221 "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} phase={} error={}",
222 target,
223 mode,
224 local_bind,
225 phase.label(),
226 mapped
227 );
228 send_error = Some(mapped);
229 if !self.should_continue_after_error(&kind) {
230 return Err(send_error.unwrap());
231 }
232 }
233 }
234 }
235
236 if !sent {
237 return Err(send_error.unwrap_or_else(|| DiscoveryError::PermissionDenied));
238 }
239
240 let timeout_ms = self
241 .socket
242 .read_timeout()
243 .ok()
244 .flatten()
245 .map(|d| d.as_millis())
246 .unwrap_or_default();
247 let local_port = self
248 .socket
249 .local_addr()
250 .map(|addr| addr.port())
251 .unwrap_or_default();
252 info!(
253 "[ALPINE][DISCOVERY] awaiting reply timeout_ms={} local_port={} remote_hint={}",
254 timeout_ms, local_port, self.remote_addr
255 );
256
257 let mut buf = vec![0u8; 2048];
258 let (len, peer) = match self.socket.recv_from(&mut buf) {
259 Ok(res) => res,
260 Err(err) => return Err(self.map_recv_error(err)),
261 };
262 let reply: DiscoveryReply = serde_cbor::from_slice(&buf[..len])?;
263 let local_addr = self.socket.local_addr().map_err(DiscoveryError::from)?;
264 info!(
265 "[ALPINE][DISCOVERY][RX] reply received peer={} local_addr={} iface={:?} bytes={}",
266 peer, local_addr, self.interface, len
267 );
268 let (device_identity_trusted, device_identity_attestation_error) =
269 self.verify_attestation(&reply);
270 let outcome = DiscoveryOutcome {
271 reply: reply.clone(),
272 peer,
273 client_nonce: nonce,
274 local_addr,
275 device_identity_pubkey: if reply.device_identity_pubkey.is_empty() {
276 None
277 } else {
278 Some(reply.device_identity_pubkey.clone())
279 },
280 device_identity_trusted,
281 device_identity_attestation_error,
282 interface: self.interface.clone(),
283 };
284 Ok(outcome)
285 }
286
287 fn discovery_targets(&self) -> Vec<SocketAddr> {
288 let mut targets = Vec::new();
289
290 if self.prefer_multicast {
291 if !self.ipv6 {
292 if let Ok(addr) = DEFAULT_MULTICAST_IPV4.parse() {
293 push_if_unique(&mut targets, addr);
294 }
295 }
296 if self.ipv6 {
297 if let Ok(addr) = DEFAULT_MULTICAST_IPV6.parse() {
298 push_if_unique(&mut targets, addr);
299 }
300 }
301 }
302
303 push_if_unique(&mut targets, self.remote_addr);
304
305 if self.allow_broadcast && self.remote_addr.ip().is_ipv4() && !self.ipv6 {
306 if let Ok(addr) = DEFAULT_BROADCAST_IPV4.parse() {
307 push_if_unique(&mut targets, addr);
308 }
309 }
310
311 targets
312 }
313
314 fn map_send_error(&self, err: io::Error, target: SocketAddr) -> DiscoveryError {
315 match err.kind() {
316 io::ErrorKind::PermissionDenied => {
317 if target.ip().is_multicast() {
318 DiscoveryError::MulticastUnavailable
319 } else if is_broadcast_addr(target.ip()) {
320 DiscoveryError::BroadcastBlocked
321 } else {
322 DiscoveryError::PermissionDenied
323 }
324 }
325 io::ErrorKind::ConnectionReset | io::ErrorKind::WouldBlock => {
326 DiscoveryError::PermissionDenied
327 }
328 _ => DiscoveryError::Io(err),
329 }
330 }
331
332 fn map_recv_error(&self, err: io::Error) -> DiscoveryError {
333 match err.kind() {
334 io::ErrorKind::TimedOut => DiscoveryError::Timeout,
335 io::ErrorKind::PermissionDenied | io::ErrorKind::ConnectionReset => {
336 DiscoveryError::PermissionDenied
337 }
338 io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
339 _ => DiscoveryError::Io(err),
340 }
341 }
342
343 fn should_continue_after_error(&self, kind: &io::ErrorKind) -> bool {
344 matches!(
345 kind,
346 io::ErrorKind::PermissionDenied
347 | io::ErrorKind::WouldBlock
348 | io::ErrorKind::ConnectionReset
349 )
350 }
351
352 fn verify_attestation(
353 &self,
354 reply: &DiscoveryReply,
355 ) -> (bool, Option<String>) {
356 if reply.device_identity_attestation.is_empty() {
357 return (false, Some("device identity attestation missing".into()));
358 }
359 let Some(registry) = &self.attester_registry else {
360 return (
361 false,
362 Some("attester registry not configured".into()),
363 );
364 };
365 match verify_device_identity_attestation(reply, registry, std::time::SystemTime::now()) {
366 Ok(_) => (true, None),
367 Err(err) => {
368 warn!(
369 "[ALPINE][DISCOVERY][TRUST] attestation verification failed device_id={} err={}",
370 reply.device_id,
371 err
372 );
373 (false, Some(err.to_string()))
374 }
375 }
376 }
377}
378
379impl Drop for DiscoveryClient {
380 fn drop(&mut self) {
381 if let Ok(local) = self.socket.local_addr() {
382 info!(
383 "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped local_addr={} remote_addr={}",
384 local, self.remote_addr
385 );
386 } else {
387 info!(
388 "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped remote_addr={}",
389 self.remote_addr
390 );
391 }
392 }
393}
394
395fn is_broadcast_addr(ip: IpAddr) -> bool {
396 matches!(ip, IpAddr::V4(addr) if addr.is_broadcast())
397}
398
399fn push_if_unique(targets: &mut Vec<SocketAddr>, candidate: SocketAddr) {
400 if !targets.contains(&candidate) {
401 targets.push(candidate);
402 }
403}
404
405fn classify_target(target: SocketAddr, configured: SocketAddr) -> &'static str {
406 if target.ip().is_multicast() {
407 "multicast"
408 } else if is_broadcast_addr(target.ip()) {
409 "broadcast"
410 } else if target == configured {
411 "unicast-configured"
412 } else {
413 "unicast"
414 }
415}