alpine-protocol-sdk 0.1.15

High-level SDK on top of the ALPINE protocol layer.
Documentation
use std::{
    fmt, io,
    net::{IpAddr, SocketAddr, UdpSocket},
    time::Duration,
};

use alpine::messages::{DiscoveryReply, DiscoveryRequest};
use rand::{rngs::OsRng, RngCore};
use serde_cbor;
use tracing::{info, warn};

use socket2::{Domain, Protocol, Socket, Type};

use crate::phase::{current_phase, Phase};

const DEFAULT_MULTICAST_IPV4: &str = "239.255.255.250:19455";
const DEFAULT_MULTICAST_IPV6: &str = "[ff12::1]:19455";
const DEFAULT_BROADCAST_IPV4: &str = "255.255.255.255:19455";

/// Options used to configure the blocking discovery helper.
pub struct DiscoveryClientOptions {
    pub remote_addr: SocketAddr,
    pub local_addr: SocketAddr,
    pub timeout: Duration,
    pub prefer_multicast: bool,
    pub allow_broadcast: bool,
    pub interface: Option<String>,
}

impl DiscoveryClientOptions {
    /// Creates options with the provided remote socket and a default timeout.
    pub fn new(remote_addr: SocketAddr, local_addr: SocketAddr, timeout: Duration) -> Self {
        Self {
            remote_addr,
            local_addr,
            timeout,
            prefer_multicast: false,
            allow_broadcast: true,
            interface: None,
        }
    }

    pub fn disable_multicast(mut self) -> Self {
        self.prefer_multicast = false;
        self
    }

    pub fn disable_broadcast(mut self) -> Self {
        self.allow_broadcast = false;
        self
    }
}

/// Errors that can happen while sending or receiving discovery payloads.
#[derive(Debug)]
pub enum DiscoveryError {
    Io(io::Error),
    Decode(serde_cbor::Error),
    Timeout,
    PermissionDenied,
    MulticastUnavailable,
    BroadcastBlocked,
}

impl fmt::Display for DiscoveryError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DiscoveryError::Io(err) => write!(f, "io error: {}", err),
            DiscoveryError::Decode(err) => write!(f, "cbors serialization error: {}", err),
            DiscoveryError::Timeout => write!(f, "discovery timed out"),
            DiscoveryError::PermissionDenied => {
                write!(f, "discovery channel permission denied")
            }
            DiscoveryError::MulticastUnavailable => {
                write!(f, "multicast discovery unavailable")
            }
            DiscoveryError::BroadcastBlocked => write!(f, "broadcast discovery blocked"),
        }
    }
}

impl std::error::Error for DiscoveryError {}

impl From<io::Error> for DiscoveryError {
    fn from(err: io::Error) -> Self {
        match err.kind() {
            io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
            _ => DiscoveryError::Io(err),
        }
    }
}

impl From<serde_cbor::Error> for DiscoveryError {
    fn from(err: serde_cbor::Error) -> Self {
        DiscoveryError::Decode(err)
    }
}

/// The outcome of a discovery request.
pub struct DiscoveryOutcome {
    pub reply: DiscoveryReply,
    pub peer: SocketAddr,
    pub client_nonce: Vec<u8>,
    pub local_addr: SocketAddr,
    pub device_identity_pubkey: Option<Vec<u8>>,
    pub interface: Option<String>,
}

/// Stateless discovery helper that wraps the protocol request/response models.
pub struct DiscoveryClient {
    socket: UdpSocket,
    remote_addr: SocketAddr,
    prefer_multicast: bool,
    allow_broadcast: bool,
    ipv6: bool,
    interface: Option<String>,
}

impl DiscoveryClient {
    /// Creates a client that will send discovery packets to `remote_addr`.
    pub fn new(options: DiscoveryClientOptions) -> Result<Self, DiscoveryError> {
        let domain = if options.remote_addr.is_ipv4() {
            Domain::IPV4
        } else {
            Domain::IPV6
        };
        let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
        if options.allow_broadcast && options.remote_addr.is_ipv4() {
            socket.set_broadcast(true)?;
        }
        socket.bind(&options.local_addr.into())?;

        let socket: UdpSocket = socket.into();
        socket.set_read_timeout(Some(options.timeout))?;
        // TTL of 4 keeps traffic on the local segment but makes sure it leaves the host.
        if options.prefer_multicast || options.remote_addr.ip().is_multicast() {
            let _ = socket.set_multicast_ttl_v4(4);
        }
        let local_addr = socket.local_addr().unwrap_or(options.local_addr);
        info!(
            "[ALPINE][DISCOVERY][SOCKET] discovery socket created local_addr={} remote_addr={} prefer_multicast={}",
            local_addr,
            options.remote_addr,
            options.prefer_multicast
        );
        Ok(Self {
            socket,
            remote_addr: options.remote_addr,
            prefer_multicast: options.prefer_multicast,
            allow_broadcast: options.allow_broadcast,
            ipv6: options.remote_addr.is_ipv6() || options.local_addr.is_ipv6(),
            interface: options.interface.clone(),
        })
    }

    /// Sends a discovery payload with the requested capability names and waits for a reply.
    pub fn discover(&self, requested: &[String]) -> Result<DiscoveryOutcome, DiscoveryError> {
        let phase = current_phase();
        if phase == Phase::Handshake {
            warn!(
                "[ALPINE][BUG] discovery attempted during handshake phase (no sends expected); local_addr={}",
                self.socket
                    .local_addr()
                    .unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 0)))
            );
        }

        let mut nonce = vec![0u8; 32];
        OsRng.fill_bytes(&mut nonce);
        let request = DiscoveryRequest::new(requested.to_vec(), nonce.clone());
        let payload = serde_cbor::to_vec(&request)?;
        let payload_len = payload.len();
        debug_assert!(
            payload_len > 8,
            "discovery payload unexpectedly small; framing may have drifted"
        );

        let mut send_error: Option<DiscoveryError> = None;
        let mut sent = false;
        for target in self.discovery_targets() {
            let mode = classify_target(target, self.remote_addr);
            let local_bind = self
                .socket
                .local_addr()
                .map(|addr| addr.to_string())
                .unwrap_or_else(|_| "unknown".into());
            info!(
                "[ALPINE][DISCOVERY][TX][attempt] target={} mode={} local_bind={} payload_len={}",
                target,
                mode,
                local_bind,
                payload.len()
            );
            match self.socket.send_to(&payload, target) {
                Ok(bytes) => {
                    info!(
                        "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} bytes_sent={}",
                        target,
                        mode,
                        local_bind,
                        bytes
                    );
                    sent = true;
                }
                Err(err) => {
                    let kind = err.kind();
                    let mapped = self.map_send_error(err, target);
                    warn!(
                        "[ALPINE][DISCOVERY][TX][result] target={} mode={} local_bind={} phase={} error={}",
                        target,
                        mode,
                        local_bind,
                        phase.label(),
                        mapped
                    );
                    send_error = Some(mapped);
                    if !self.should_continue_after_error(&kind) {
                        return Err(send_error.unwrap());
                    }
                }
            }
        }

        if !sent {
            return Err(send_error.unwrap_or_else(|| DiscoveryError::PermissionDenied));
        }

        let timeout_ms = self
            .socket
            .read_timeout()
            .ok()
            .flatten()
            .map(|d| d.as_millis())
            .unwrap_or_default();
        let local_port = self
            .socket
            .local_addr()
            .map(|addr| addr.port())
            .unwrap_or_default();
        info!(
            "[ALPINE][DISCOVERY] awaiting reply timeout_ms={} local_port={} remote_hint={}",
            timeout_ms, local_port, self.remote_addr
        );

        let mut buf = vec![0u8; 2048];
        let (len, peer) = match self.socket.recv_from(&mut buf) {
            Ok(res) => res,
            Err(err) => return Err(self.map_recv_error(err)),
        };
        let reply: DiscoveryReply = serde_cbor::from_slice(&buf[..len])?;
        let local_addr = self.socket.local_addr().map_err(DiscoveryError::from)?;
        info!(
            "[ALPINE][DISCOVERY][RX] reply received peer={} local_addr={} iface={:?} bytes={}",
            peer, local_addr, self.interface, len
        );
        let outcome = DiscoveryOutcome {
            reply: reply.clone(),
            peer,
            client_nonce: nonce,
            local_addr,
            device_identity_pubkey: if reply.device_identity_pubkey.is_empty() {
                None
            } else {
                Some(reply.device_identity_pubkey.clone())
            },
            interface: self.interface.clone(),
        };
        Ok(outcome)
    }

    fn discovery_targets(&self) -> Vec<SocketAddr> {
        let mut targets = Vec::new();

        if self.prefer_multicast {
            if !self.ipv6 {
                if let Ok(addr) = DEFAULT_MULTICAST_IPV4.parse() {
                    push_if_unique(&mut targets, addr);
                }
            }
            if self.ipv6 {
                if let Ok(addr) = DEFAULT_MULTICAST_IPV6.parse() {
                    push_if_unique(&mut targets, addr);
                }
            }
        }

        push_if_unique(&mut targets, self.remote_addr);

        if self.allow_broadcast && self.remote_addr.ip().is_ipv4() && !self.ipv6 {
            if let Ok(addr) = DEFAULT_BROADCAST_IPV4.parse() {
                push_if_unique(&mut targets, addr);
            }
        }

        targets
    }

    fn map_send_error(&self, err: io::Error, target: SocketAddr) -> DiscoveryError {
        match err.kind() {
            io::ErrorKind::PermissionDenied => {
                if target.ip().is_multicast() {
                    DiscoveryError::MulticastUnavailable
                } else if is_broadcast_addr(target.ip()) {
                    DiscoveryError::BroadcastBlocked
                } else {
                    DiscoveryError::PermissionDenied
                }
            }
            io::ErrorKind::ConnectionReset | io::ErrorKind::WouldBlock => {
                DiscoveryError::PermissionDenied
            }
            _ => DiscoveryError::Io(err),
        }
    }

    fn map_recv_error(&self, err: io::Error) -> DiscoveryError {
        match err.kind() {
            io::ErrorKind::TimedOut => DiscoveryError::Timeout,
            io::ErrorKind::PermissionDenied | io::ErrorKind::ConnectionReset => {
                DiscoveryError::PermissionDenied
            }
            io::ErrorKind::WouldBlock => DiscoveryError::Timeout,
            _ => DiscoveryError::Io(err),
        }
    }

    fn should_continue_after_error(&self, kind: &io::ErrorKind) -> bool {
        matches!(
            kind,
            io::ErrorKind::PermissionDenied
                | io::ErrorKind::WouldBlock
                | io::ErrorKind::ConnectionReset
        )
    }
}

impl Drop for DiscoveryClient {
    fn drop(&mut self) {
        if let Ok(local) = self.socket.local_addr() {
            info!(
                "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped local_addr={} remote_addr={}",
                local, self.remote_addr
            );
        } else {
            info!(
                "[ALPINE][DISCOVERY][SOCKET] discovery socket dropped remote_addr={}",
                self.remote_addr
            );
        }
    }
}

fn is_broadcast_addr(ip: IpAddr) -> bool {
    matches!(ip, IpAddr::V4(addr) if addr.is_broadcast())
}

fn push_if_unique(targets: &mut Vec<SocketAddr>, candidate: SocketAddr) {
    if !targets.contains(&candidate) {
        targets.push(candidate);
    }
}

fn classify_target(target: SocketAddr, configured: SocketAddr) -> &'static str {
    if target.ip().is_multicast() {
        "multicast"
    } else if is_broadcast_addr(target.ip()) {
        "broadcast"
    } else if target == configured {
        "unicast-configured"
    } else {
        "unicast"
    }
}