alpine_protocol_sdk/discovery/
client.rs1use std::{
2 fmt, io,
3 net::{IpAddr, SocketAddr, UdpSocket},
4 time::Duration,
5};
6
7use alpine::messages::{DiscoveryReply, DiscoveryRequest};
8use rand::{rngs::OsRng, RngCore};
9use serde_cbor;
10use tracing::{info, warn};
11
12use socket2::{Domain, Protocol, Socket, Type};
13
14use crate::phase::{current_phase, Phase};
15
/// IPv4 multicast group and port tried first when multicast is preferred on an IPv4 client.
const DEFAULT_MULTICAST_IPV4: &str = "239.255.255.250:19455";
/// IPv6 multicast group and port tried first when multicast is preferred on an IPv6 client.
const DEFAULT_MULTICAST_IPV6: &str = "[ff12::1]:19455";
/// IPv4 limited-broadcast address and port used as the last discovery target when broadcast
/// is allowed.
const DEFAULT_BROADCAST_IPV4: &str = "255.255.255.255:19455";
19
/// Settings consumed by `DiscoveryClient::new`.
pub struct DiscoveryClientOptions {
    /// Address the discovery request is sent to; its family also selects the socket family.
    pub remote_addr: SocketAddr,
    /// Local address the discovery socket is bound to.
    pub local_addr: SocketAddr,
    /// Read timeout applied to the socket while waiting for a reply.
    pub timeout: Duration,
    /// When `true`, the default multicast group is tried before `remote_addr`.
    pub prefer_multicast: bool,
    /// When `true`, IPv4 clients enable broadcast on the socket and also send to the
    /// limited-broadcast address.
    pub allow_broadcast: bool,
    /// Optional interface name; the client stores it and reports it in logs and outcomes.
    pub interface: Option<String>,
}

impl DiscoveryClientOptions {
    /// Builds options with the defaults: multicast not preferred, broadcast allowed,
    /// no interface name.
    pub fn new(remote_addr: SocketAddr, local_addr: SocketAddr, timeout: Duration) -> Self {
        Self {
            interface: None,
            allow_broadcast: true,
            prefer_multicast: false,
            timeout,
            local_addr,
            remote_addr,
        }
    }

    /// Returns the options with `prefer_multicast` cleared.
    pub fn disable_multicast(self) -> Self {
        Self {
            prefer_multicast: false,
            ..self
        }
    }

    /// Returns the options with `allow_broadcast` cleared.
    pub fn disable_broadcast(self) -> Self {
        Self {
            allow_broadcast: false,
            ..self
        }
    }
}
53
54#[derive(Debug)]
56pub enum DiscoveryError {
57 Io(io::Error),
58 Decode(serde_cbor::Error),
59 Timeout,
60 PermissionDenied,
61 MulticastUnavailable,
62 BroadcastBlocked,
63}
64
65impl fmt::Display for DiscoveryError {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 match self {
68 DiscoveryError::Io(err) => write!(f, "io error: {}", err),
69 DiscoveryError::Decode(err) => write!(f, "cbors serialization error: {}", err),
70 DiscoveryError::Timeout => write!(f, "discovery timed out"),
71 DiscoveryError::PermissionDenied => {
72 write!(f, "discovery channel permission denied")
73 }
74 DiscoveryError::MulticastUnavailable => {
75 write!(f, "multicast discovery unavailable")
76 }
77 DiscoveryError::BroadcastBlocked => write!(f, "broadcast discovery blocked"),
78 }
79 }
80}
81
82impl std::error::Error for DiscoveryError {}
83
84impl From<io::Error> for DiscoveryError {
85 fn from(err: io::Error) -> Self {
86 match err.kind() {
87 io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
88 _ => DiscoveryError::Io(err),
89 }
90 }
91}
92
93impl From<serde_cbor::Error> for DiscoveryError {
94 fn from(err: serde_cbor::Error) -> Self {
95 DiscoveryError::Decode(err)
96 }
97}
98
99pub struct DiscoveryOutcome {
101 pub reply: DiscoveryReply,
102 pub peer: SocketAddr,
103 pub client_nonce: Vec<u8>,
104 pub local_addr: SocketAddr,
105 pub device_identity_pubkey: Option<Vec<u8>>,
106 pub interface: Option<String>,
107}
108
/// Blocking UDP client that sends a discovery request and waits for one reply.
pub struct DiscoveryClient {
    /// Bound UDP socket with the read timeout already applied.
    socket: UdpSocket,
    /// Configured peer address; always included in the send targets.
    remote_addr: SocketAddr,
    /// Whether the default multicast group is tried before `remote_addr`.
    prefer_multicast: bool,
    /// Whether the IPv4 limited-broadcast address is added as a target.
    allow_broadcast: bool,
    /// `true` when either the remote or the local address given at construction was IPv6.
    ipv6: bool,
    /// Interface name taken from the options; reported in logs and outcomes.
    interface: Option<String>,
}
118
119impl DiscoveryClient {
120 pub fn new(options: DiscoveryClientOptions) -> Result<Self, DiscoveryError> {
122 let domain = if options.remote_addr.is_ipv4() {
123 Domain::IPV4
124 } else {
125 Domain::IPV6
126 };
127 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
128 if options.allow_broadcast && options.remote_addr.is_ipv4() {
129 socket.set_broadcast(true)?;
130 }
131 socket.bind(&options.local_addr.into())?;
132
133 let socket: UdpSocket = socket.into();
134 socket.set_read_timeout(Some(options.timeout))?;
135 if options.prefer_multicast || options.remote_addr.ip().is_multicast() {
137 let _ = socket.set_multicast_ttl_v4(4);
138 }
139 let local_addr = socket.local_addr().unwrap_or(options.local_addr);
140 info!(
141 "[ALPINE][DISCOVERY][SOCKET] discovery socket created local_addr={} remote_addr={} prefer_multicast={}",
142 local_addr,
143 options.remote_addr,
144 options.prefer_multicast
145 );
146 Ok(Self {
147 socket,
148 remote_addr: options.remote_addr,
149 prefer_multicast: options.prefer_multicast,
150 allow_broadcast: options.allow_broadcast,
151 ipv6: options.remote_addr.is_ipv6() || options.local_addr.is_ipv6(),
152 interface: options.interface.clone(),
153 })
154 }
155
156 pub fn discover(&self, requested: &[String]) -> Result<DiscoveryOutcome, DiscoveryError> {
158 let phase = current_phase();
159 if phase == Phase::Handshake {
160 warn!(
161 "[ALPINE][BUG] discovery attempted during handshake phase (no sends expected); local_addr={}",
162 self.socket
163 .local_addr()
164 .unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 0)))
165 );
166 }
167
168 let mut nonce = vec![0u8; 32];
169 OsRng.fill_bytes(&mut nonce);
170 let request = DiscoveryRequest::new(requested.to_vec(), nonce.clone());
171 let payload = serde_cbor::to_vec(&request)?;
172 let payload_len = payload.len();
173 debug_assert!(
174 payload_len > 8,
175 "discovery payload unexpectedly small; framing may have drifted"
176 );
177
178 let mut send_error: Option<DiscoveryError> = None;
179 let mut sent = false;
180 for target in self.discovery_targets() {
181 let mode = classify_target(target, self.remote_addr);
182 let local_bind = self
183 .socket
184 .local_addr()
185 .map(|addr| addr.to_string())
186 .unwrap_or_else(|_| "unknown".into());
187 info!(
188 "[ALPINE][DISCOVERY][TX][attempt] target={} mode={} local_bind={} payload_len={}",
189 target,
190 mode,
191 local_bind,
192 payload.len()
193 );
194 match self.socket.send_to(&payload, target) {
195 Ok(bytes) => {
196 info!(
197 "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} bytes_sent={}",
198 target,
199 mode,
200 local_bind,
201 bytes
202 );
203 sent = true;
204 }
205 Err(err) => {
206 let kind = err.kind();
207 let mapped = self.map_send_error(err, target);
208 warn!(
209 "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} phase={} error={}",
210 target,
211 mode,
212 local_bind,
213 phase.label(),
214 mapped
215 );
216 send_error = Some(mapped);
217 if !self.should_continue_after_error(&kind) {
218 return Err(send_error.unwrap());
219 }
220 }
221 }
222 }
223
224 if !sent {
225 return Err(send_error.unwrap_or_else(|| DiscoveryError::PermissionDenied));
226 }
227
228 let timeout_ms = self
229 .socket
230 .read_timeout()
231 .ok()
232 .flatten()
233 .map(|d| d.as_millis())
234 .unwrap_or_default();
235 let local_port = self
236 .socket
237 .local_addr()
238 .map(|addr| addr.port())
239 .unwrap_or_default();
240 info!(
241 "[ALPINE][DISCOVERY] awaiting reply timeout_ms={} local_port={} remote_hint={}",
242 timeout_ms, local_port, self.remote_addr
243 );
244
245 let mut buf = vec![0u8; 2048];
246 let (len, peer) = match self.socket.recv_from(&mut buf) {
247 Ok(res) => res,
248 Err(err) => return Err(self.map_recv_error(err)),
249 };
250 let reply: DiscoveryReply = serde_cbor::from_slice(&buf[..len])?;
251 let local_addr = self.socket.local_addr().map_err(DiscoveryError::from)?;
252 info!(
253 "[ALPINE][DISCOVERY][RX] reply received peer={} local_addr={} iface={:?} bytes={}",
254 peer, local_addr, self.interface, len
255 );
256 let outcome = DiscoveryOutcome {
257 reply: reply.clone(),
258 peer,
259 client_nonce: nonce,
260 local_addr,
261 device_identity_pubkey: if reply.device_identity_pubkey.is_empty() {
262 None
263 } else {
264 Some(reply.device_identity_pubkey.clone())
265 },
266 interface: self.interface.clone(),
267 };
268 Ok(outcome)
269 }
270
271 fn discovery_targets(&self) -> Vec<SocketAddr> {
272 let mut targets = Vec::new();
273
274 if self.prefer_multicast {
275 if !self.ipv6 {
276 if let Ok(addr) = DEFAULT_MULTICAST_IPV4.parse() {
277 push_if_unique(&mut targets, addr);
278 }
279 }
280 if self.ipv6 {
281 if let Ok(addr) = DEFAULT_MULTICAST_IPV6.parse() {
282 push_if_unique(&mut targets, addr);
283 }
284 }
285 }
286
287 push_if_unique(&mut targets, self.remote_addr);
288
289 if self.allow_broadcast && self.remote_addr.ip().is_ipv4() && !self.ipv6 {
290 if let Ok(addr) = DEFAULT_BROADCAST_IPV4.parse() {
291 push_if_unique(&mut targets, addr);
292 }
293 }
294
295 targets
296 }
297
298 fn map_send_error(&self, err: io::Error, target: SocketAddr) -> DiscoveryError {
299 match err.kind() {
300 io::ErrorKind::PermissionDenied => {
301 if target.ip().is_multicast() {
302 DiscoveryError::MulticastUnavailable
303 } else if is_broadcast_addr(target.ip()) {
304 DiscoveryError::BroadcastBlocked
305 } else {
306 DiscoveryError::PermissionDenied
307 }
308 }
309 io::ErrorKind::ConnectionReset | io::ErrorKind::WouldBlock => {
310 DiscoveryError::PermissionDenied
311 }
312 _ => DiscoveryError::Io(err),
313 }
314 }
315
316 fn map_recv_error(&self, err: io::Error) -> DiscoveryError {
317 match err.kind() {
318 io::ErrorKind::TimedOut => DiscoveryError::Timeout,
319 io::ErrorKind::PermissionDenied | io::ErrorKind::ConnectionReset => {
320 DiscoveryError::PermissionDenied
321 }
322 io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
323 _ => DiscoveryError::Io(err),
324 }
325 }
326
327 fn should_continue_after_error(&self, kind: &io::ErrorKind) -> bool {
328 matches!(
329 kind,
330 io::ErrorKind::PermissionDenied
331 | io::ErrorKind::WouldBlock
332 | io::ErrorKind::ConnectionReset
333 )
334 }
335}
336
337impl Drop for DiscoveryClient {
338 fn drop(&mut self) {
339 if let Ok(local) = self.socket.local_addr() {
340 info!(
341 "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped local_addr={} remote_addr={}",
342 local, self.remote_addr
343 );
344 } else {
345 info!(
346 "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped remote_addr={}",
347 self.remote_addr
348 );
349 }
350 }
351}
352
/// Returns `true` only for the IPv4 limited-broadcast address `255.255.255.255`.
fn is_broadcast_addr(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => v4.is_broadcast(),
        IpAddr::V6(_) => false,
    }
}
356
/// Appends `candidate` to `targets` unless an equal address is already present.
fn push_if_unique(targets: &mut Vec<SocketAddr>, candidate: SocketAddr) {
    let already_listed = targets.iter().any(|existing| *existing == candidate);
    if already_listed {
        return;
    }
    targets.push(candidate);
}
362
363fn classify_target(target: SocketAddr, configured: SocketAddr) -> &'static str {
364 if target.ip().is_multicast() {
365 "multicast"
366 } else if is_broadcast_addr(target.ip()) {
367 "broadcast"
368 } else if target == configured {
369 "unicast-configured"
370 } else {
371 "unicast"
372 }
373}