use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use alpine::control::{ControlClient, ControlCrypto};
use alpine::crypto::identity::NodeCredentials;
use alpine::crypto::X25519KeyExchange;
use alpine::handshake::keepalive;
use alpine::handshake::transport::{CborUdpTransport, ReliableControlChannel, TimeoutTransport};
use alpine::handshake::{HandshakeContext, HandshakeError, HandshakeMessage, HandshakeTransport};
use alpine::messages::{
Acknowledge, CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity,
};
use alpine::profile::StreamProfile;
use alpine::session::{AlnpSession, Ed25519Authenticator};
use alpine::stream::AlnpStream;
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde_cbor;
use serde_json::{json, Value};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use tracing::{info, warn};
use uuid::Uuid;
use crate::transport::UdpFrameTransport;
use crate::{
discovery::DeviceTrustState,
environment::ensure_supported_environment,
error::AlpineSdkError,
phase::{self, Phase},
self_check::run_sdk_self_check,
vendor::VendorExtensionRegistry,
};
use tokio::net::UdpSocket;
/// Full control-plane transport stack used by [`AlpineClient`]: CBOR-over-UDP,
/// wrapped with a receive timeout and then with TX/RX logging / SessionInit guarding.
type WireTransport = InstrumentedTransport<TimeoutTransport<CborUdpTransport>>;
/// Connected ALPINE client: an established handshake session plus the control
/// channel, an optional frame stream, and a background keepalive task.
///
/// NOTE(review): only `close` aborts the keepalive task; dropping the client
/// without calling `close` presumably leaves the spawned task running (a dropped
/// tokio `JoinHandle` detaches) — consider a `Drop` impl.
#[derive(Debug)]
pub struct AlpineClient {
/// Handshake session; queried for session id, capabilities and keys.
session: AlnpSession,
/// Control-plane transport, shared with the keepalive task and wrapped in a
/// fresh `SharedTransport` for every control request.
_transport: Arc<Mutex<WireTransport>>,
/// Socket underlying the control transport, exposed through `udp_socket()`.
udp_socket: Arc<UdpSocket>,
/// Address the control transport is actually bound to.
local_addr: SocketAddr,
/// Peer address of the control transport.
remote_addr: SocketAddr,
/// Frame stream created by `start_stream`; `None` until then.
stream: Option<AlnpStream<UdpFrameTransport>>,
/// Builds and sends control envelopes using the session-derived `ControlCrypto`.
control: ControlClient,
/// Handle of the spawned keepalive task; taken and aborted by `close`.
keepalive_handle: Option<JoinHandle<()>>,
/// Caller-narrowed capability subset set through `narrow_capabilities`.
session_capabilities: Option<CapabilitySet>,
/// Optional run identifier copied into dry-run reports.
run_id: Option<String>,
/// Allow-list consulted for non-builtin vendor control commands.
vendor_registry: Option<VendorExtensionRegistry>,
}
/// Result of a control request: the device acknowledgement plus the payload
/// decoded from it as `T`.
#[derive(Debug, Clone)]
pub struct ControlReply<T> {
/// Raw acknowledgement returned by the control channel.
pub ack: Acknowledge,
/// Payload decoded from `ack.payload` by `ControlCrypto::decode_ack_payload`;
/// `None` when the decoder yields nothing.
pub payload: Option<T>,
}
impl<T> ControlReply<T> {
    /// Reports whether the device acknowledged the request successfully.
    pub fn ok(&self) -> bool {
        let ack = &self.ack;
        ack.ok
    }

    /// Returns the optional free-form detail attached to the acknowledgement.
    pub fn detail(&self) -> Option<&str> {
        let ack = &self.ack;
        ack.detail.as_deref()
    }
}
/// Step of a liveness probe at which a [`ProbeError`] was recorded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeStep {
/// The initial vendor `ping` command.
Ping,
/// The vendor `status` command.
Status,
/// The vendor `health` command (sent only when status produced no payload).
Health,
}
/// Coarse reachability verdict produced by `AlpineClient::probe_status_with_options`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeState {
/// Ping succeeded and the device did not report `healthy == false`.
Online,
/// Device reported `healthy == false`, or no status/health payload arrived and
/// at least one later probe step failed.
Degraded,
/// Ping failed; also the initial value before a probe completes.
Offline,
}
/// A failure recorded while probing a device.
#[derive(Debug, Clone)]
pub struct ProbeError {
/// Probe step that failed.
pub step: ProbeStep,
/// Human-readable error text (the SDK error's `Display` output or an
/// "unexpected ... response" description).
pub message: String,
}
/// Outcome of a liveness probe (ping, then status, then optionally health).
///
/// The `*_rtt_ms` fields are measured from just before each
/// `control_with_options` call until it succeeds, so they include any retries
/// and backoff configured in `ProbeOptions::control`.
#[derive(Debug, Clone)]
pub struct ProbeResult {
/// Overall verdict; `Offline` until the probe decides otherwise.
pub state: ProbeState,
/// Decoded ping payload, if any.
pub ping: Option<PingReply>,
/// Decoded status payload, if any.
pub status: Option<StatusReply>,
/// Decoded health payload, if the health step ran and produced one.
pub health: Option<HealthReply>,
/// Milliseconds spent on the ping step when it succeeded.
pub ping_rtt_ms: Option<u128>,
/// Milliseconds spent on the status step when it succeeded.
pub status_rtt_ms: Option<u128>,
/// Milliseconds spent on the health step when it succeeded.
pub health_rtt_ms: Option<u128>,
/// First detail string found in an ack or payload during the probe.
pub detail: Option<String>,
/// Failures in the order they were recorded.
pub errors: Vec<ProbeError>,
}
/// Severity level attached to a [`UiBadge`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UiSeverity {
/// Nothing to report.
Ok,
/// Needs attention (degraded or untrusted device).
Warn,
/// Device is offline.
Critical,
}
/// Display-ready summary of a probe, produced by `ProbeResult::to_ui_badge`.
#[derive(Debug, Clone)]
pub struct UiBadge {
/// Short lowercase label such as "online" or "untrusted".
pub label: String,
/// Severity bucket for the label.
pub severity: UiSeverity,
/// Plain color name ("green", "orange", "red" or "yellow").
pub color: &'static str,
}
impl ProbeResult {
    /// Appends a failure for `step`, keeping errors in the order they occurred.
    fn record_error(&mut self, step: ProbeStep, message: String) {
        let entry = ProbeError { step, message };
        self.errors.push(entry);
    }

    /// Trust-aware label shared by [`ProbeResult::to_device_state`] and
    /// [`ProbeResult::to_ui_badge`]: untrusted devices are always "untrusted",
    /// otherwise the label mirrors the probe state.
    fn badge_label(&self, trust: &DeviceTrustState) -> &'static str {
        if *trust != DeviceTrustState::Trusted {
            return "untrusted";
        }
        match self.state {
            ProbeState::Online => "online",
            ProbeState::Degraded => "degraded",
            ProbeState::Offline => "offline",
        }
    }

    /// Flattens this probe into a [`DeviceState`] for the given trust level.
    ///
    /// `healthy` prefers the status payload and falls back to the health payload;
    /// `last_error` carries the first recorded error message.
    pub fn to_device_state(&self, trust: DeviceTrustState) -> DeviceState {
        let label = self.badge_label(&trust).to_string();
        let status_healthy = self.status.as_ref().and_then(|reply| reply.healthy);
        let healthy =
            status_healthy.or_else(|| self.health.as_ref().and_then(|reply| reply.healthy));
        DeviceState {
            state: self.state,
            // Anything that answered the ping (Online or Degraded) counts as online.
            online: self.state != ProbeState::Offline,
            healthy,
            trust,
            last_rtt_ms: self.ping_rtt_ms,
            detail: self.detail.clone(),
            last_error: self.errors.first().map(|failure| failure.message.clone()),
            label,
        }
    }

    /// Describes how this probe differs from `previous`.
    ///
    /// A state change wins over a detail change; returns `None` when neither
    /// changed.
    pub fn delta_summary(&self, previous: &ProbeResult) -> Option<String> {
        if self.state != previous.state {
            return Some(format!(
                "state changed from {:?} to {:?}",
                previous.state, self.state
            ));
        }
        if self.detail == previous.detail {
            return None;
        }
        let summary = match &self.detail {
            Some(detail) => format!("detail changed to '{}'", detail),
            None => "detail cleared".to_string(),
        };
        Some(summary)
    }

    /// Maps this probe to a colored UI badge; untrusted devices always get a
    /// yellow warning badge regardless of probe state.
    pub fn to_ui_badge(&self, trust: DeviceTrustState) -> UiBadge {
        let (severity, color) = if trust != DeviceTrustState::Trusted {
            (UiSeverity::Warn, "yellow")
        } else {
            match self.state {
                ProbeState::Online => (UiSeverity::Ok, "green"),
                ProbeState::Degraded => (UiSeverity::Warn, "orange"),
                ProbeState::Offline => (UiSeverity::Critical, "red"),
            }
        };
        UiBadge {
            label: self.badge_label(&trust).to_string(),
            severity,
            color,
        }
    }
}
/// Read-only control commands understood by `AlpineClient::control`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlCommand {
/// Vendor `ping` command.
Ping,
/// Vendor-specific `status` command.
Status,
/// Standard `ControlOp::GetStatus`; a rejected ack becomes `StatusMismatch`.
GetStatus,
/// Vendor `health` command.
Health,
/// Vendor `identity` command.
Identity,
/// Vendor `metadata` command.
Metadata,
}
impl ControlCommand {
    /// Commands that reveal identity or metadata and therefore need a trusted
    /// device identity.
    pub fn requires_trust(&self) -> bool {
        match *self {
            ControlCommand::Identity | ControlCommand::Metadata => true,
            ControlCommand::Ping
            | ControlCommand::Status
            | ControlCommand::GetStatus
            | ControlCommand::Health => false,
        }
    }

    /// Commands that only observe device state and are safe to issue freely.
    pub fn is_observe_safe(&self) -> bool {
        match *self {
            ControlCommand::Ping
            | ControlCommand::Status
            | ControlCommand::GetStatus
            | ControlCommand::Health => true,
            ControlCommand::Identity | ControlCommand::Metadata => false,
        }
    }
}
/// Raised when the device rejects the standard `GetStatus` operation, which
/// hints that a vendor-specific status command is required instead.
#[derive(Debug)]
pub struct StatusMismatchError {
/// Human-readable explanation; used verbatim as the `Display` output.
pub detail: String,
}
impl std::fmt::Display for StatusMismatchError {
    /// Writes the mismatch detail verbatim, with no prefix or padding.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str(&self.detail)
    }
}
// Marker impl: relies on the Display and Debug impls so the mismatch can be
// handled as a standard error.
impl std::error::Error for StatusMismatchError {}
/// State-changing control commands that require `ControlOptions::allow_dangerous`.
#[derive(Debug, Clone)]
pub enum DangerousControlCommand {
/// Sent as the `firmware_update` vendor command.
FirmwareUpdate,
/// Sent as the `reboot` vendor command.
Reboot,
/// Sent as the `identity_reset` vendor command.
IdentityReset,
/// Arbitrary vendor command name; non-builtin names must be allowed by the
/// client's vendor extension registry.
Vendor(String),
}
impl DangerousControlCommand {
    /// Returns the wire-level command name placed in the vendor control payload.
    fn as_command(&self) -> &str {
        match self {
            DangerousControlCommand::Vendor(name) => &name[..],
            DangerousControlCommand::FirmwareUpdate => "firmware_update",
            DangerousControlCommand::Reboot => "reboot",
            DangerousControlCommand::IdentityReset => "identity_reset",
        }
    }
}
/// Typed reply to a [`ControlCommand`]; the variant matches the command sent
/// (`Status` and `GetStatus` both yield `Status`).
#[derive(Debug, Clone)]
pub enum ControlResponse {
Ping(ControlReply<PingReply>),
Status(ControlReply<StatusReply>),
Health(ControlReply<HealthReply>),
Identity(ControlReply<IdentityReply>),
Metadata(ControlReply<MetadataReply>),
}
/// Report describing what a control call would do, produced without touching
/// the network.
#[derive(Debug, Clone)]
pub struct ControlDryRun {
/// `Debug` rendering of the command.
pub command: String,
/// Whether the real call would be permitted by local policy.
pub allowed: bool,
/// Non-blocking cautions about the command.
pub warnings: Vec<String>,
/// Why the call is blocked, or "no network calls made" when it is allowed.
pub reason: String,
/// Client run identifier at the time of the dry run.
pub run_id: Option<String>,
/// Device the command would target.
pub remote_addr: SocketAddr,
}
/// Retry schedule for control requests.
#[derive(Debug, Clone)]
pub struct ControlRetryPolicy {
/// Total attempts including the first; values below 1 are treated as 1.
pub max_attempts: usize,
/// Base backoff in milliseconds, passed to `control_backoff` (defined
/// elsewhere — presumably grows per attempt; confirm).
pub backoff_base_ms: u64,
/// Upper bound for the backoff in milliseconds (as interpreted by
/// `control_backoff`).
pub backoff_max_ms: u64,
}
impl Default for ControlRetryPolicy {
fn default() -> Self {
Self {
max_attempts: 3,
backoff_base_ms: 150,
backoff_max_ms: 2_000,
}
}
}
/// Per-call knobs for control requests.
#[derive(Debug, Clone)]
pub struct ControlOptions {
/// Deadline applied to each individual attempt (not to the whole retry loop).
pub timeout: Option<Duration>,
/// Retry schedule; `None` means a single attempt with no backoff.
pub retry: Option<ControlRetryPolicy>,
/// Must be `true` for `control_dangerous*` calls to proceed.
pub allow_dangerous: bool,
}
impl Default for ControlOptions {
fn default() -> Self {
Self {
timeout: None,
retry: None,
allow_dangerous: false,
}
}
}
impl ControlOptions {
    /// Builder-style setter for [`ControlOptions::allow_dangerous`].
    pub fn allow_dangerous(self, allow: bool) -> Self {
        Self {
            allow_dangerous: allow,
            ..self
        }
    }

    /// Tight settings for interactive UIs: 500 ms per attempt and at most two
    /// attempts with a 150–300 ms backoff; dangerous commands stay disallowed.
    pub fn defaults_for_ui() -> Self {
        let retry = ControlRetryPolicy {
            max_attempts: 2,
            backoff_base_ms: 150,
            backoff_max_ms: 300,
        };
        Self {
            timeout: Some(Duration::from_millis(500)),
            retry: Some(retry),
            allow_dangerous: false,
        }
    }
}
/// Settings for `AlpineClient::probe_status_with_options`.
#[derive(Debug, Clone)]
pub struct ProbeOptions {
/// Query `health` when the status step yielded no payload.
pub include_health: bool,
/// Control options applied to every probe step.
pub control: ControlOptions,
}
/// Flattened device view built by `ProbeResult::to_device_state`.
#[derive(Debug, Clone)]
pub struct DeviceState {
/// Probe verdict.
pub state: ProbeState,
/// `true` for `Online` or `Degraded`.
pub online: bool,
/// Health flag from the status payload, falling back to the health payload.
pub healthy: Option<bool>,
/// Trust level supplied by the caller.
pub trust: DeviceTrustState,
/// Ping round-trip time in milliseconds.
pub last_rtt_ms: Option<u128>,
/// Probe detail string, if any.
pub detail: Option<String>,
/// Message of the first error recorded during the probe.
pub last_error: Option<String>,
/// "untrusted" for non-trusted devices, otherwise the lowercase state name.
pub label: String,
}
impl Default for ProbeOptions {
fn default() -> Self {
Self {
include_health: true,
control: ControlOptions::default(),
}
}
}
/// Payload of the vendor `ping` command; missing fields decode as `None`.
#[derive(Debug, Deserialize, Clone)]
pub struct PingReply {
#[serde(default)]
pub timestamp_ms: Option<u64>,
#[serde(default)]
pub message: Option<String>,
}
/// Payload of the vendor `status` / standard `GetStatus` request; missing
/// fields decode as `None`.
#[derive(Debug, Deserialize, Clone)]
pub struct StatusReply {
#[serde(default)]
pub healthy: Option<bool>,
#[serde(default)]
pub detail: Option<String>,
#[serde(default)]
pub uptime_secs: Option<u64>,
}
/// Payload of the vendor `health` command; missing fields decode as `None`.
#[derive(Debug, Deserialize, Clone)]
pub struct HealthReply {
#[serde(default)]
pub healthy: Option<bool>,
#[serde(default)]
pub detail: Option<String>,
/// Free-form metric values keyed by name.
#[serde(default)]
pub metrics: Option<HashMap<String, Value>>,
}
/// Payload of the vendor `identity` command: the device identity record itself.
pub type IdentityReply = DeviceIdentity;
/// Payload of the vendor `metadata` command.
#[derive(Debug, Deserialize, Clone)]
pub struct MetadataReply {
/// Arbitrary key/value metadata; empty when the field is absent.
#[serde(default)]
pub metadata: HashMap<String, Value>,
}
impl AlpineClient {
/// Binds a control socket at `local_addr` and handshakes with `remote_addr`
/// using a default [`HandshakeContext`].
///
/// # Errors
/// Environment, phase, bind and handshake failures from `connect_with_context`.
pub async fn connect(
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
) -> Result<Self, AlpineSdkError> {
    let context = HandshakeContext::default();
    Self::connect_with_context(
        local_addr,
        remote_addr,
        identity,
        capabilities,
        credentials,
        context,
    )
    .await
}

/// Connects to a discovered device after verifying it advertises `requested`.
///
/// The device identity is rebuilt from the discovery reply and the connection
/// uses the discovery outcome's local and peer addresses.
///
/// # Errors
/// The capability check from `require_capabilities`, then any error from
/// [`AlpineClient::connect`].
pub async fn connect_checked(
    outcome: crate::discovery::DiscoveryOutcome,
    requested: CapabilitySet,
    credentials: NodeCredentials,
) -> Result<Self, AlpineSdkError> {
    outcome.require_capabilities(&requested)?;
    let reply = &outcome.reply;
    let identity = DeviceIdentity {
        device_id: reply.device_id.clone(),
        manufacturer_id: reply.manufacturer_id.clone(),
        model_id: reply.model_id.clone(),
        hardware_rev: reply.hardware_rev.clone(),
        firmware_rev: reply.firmware_rev.clone(),
    };
    let (local, peer) = (outcome.local_addr, outcome.peer);
    Self::connect(local, peer, identity, requested, credentials).await
}
/// Like [`AlpineClient::connect`] but pins the handshake client nonce.
pub async fn connect_with_nonce(
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    client_nonce: Vec<u8>,
) -> Result<Self, AlpineSdkError> {
    let context = HandshakeContext::default().with_client_nonce(client_nonce);
    Self::connect_with_context(
        local_addr,
        remote_addr,
        identity,
        capabilities,
        credentials,
        context,
    )
    .await
}

/// Binds at `local_addr` and handshakes using `context` with the client nonce
/// overridden by `client_nonce`.
pub async fn connect_with_context_and_nonce(
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    client_nonce: Vec<u8>,
    context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
    let context = context.with_client_nonce(client_nonce);
    Self::connect_with_context(
        local_addr,
        remote_addr,
        identity,
        capabilities,
        credentials,
        context,
    )
    .await
}

/// Handshakes over an already-bound `socket` using `context` with the client
/// nonce overridden by `client_nonce`.
pub async fn connect_with_socket_and_nonce(
    socket: UdpSocket,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    client_nonce: Vec<u8>,
    context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
    let context = context.with_client_nonce(client_nonce);
    Self::connect_with_socket(
        socket,
        remote_addr,
        identity,
        capabilities,
        credentials,
        context,
    )
    .await
}
/// Runs SDK self-checks, claims the handshake phase, binds a CBOR/UDP transport
/// at `local_addr`, and performs the handshake.
///
/// The phase guard is held until the handshake finishes.
async fn connect_with_context(
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
    run_sdk_self_check();
    ensure_supported_environment()?;
    // Named binding (not `_`) so the guard lives until the end of this function.
    let _phase_guard = phase::claim_handshake()?;
    let transport =
        CborUdpTransport::bind(local_addr, remote_addr, 4096, context.debug_cbor).await?;
    Self::connect_with_transport(transport, identity, capabilities, credentials, context).await
}

/// Wraps an already-bound `socket` in a CBOR/UDP transport and performs the
/// handshake.
///
/// NOTE(review): unlike `connect_with_context`, this path calls neither
/// `run_sdk_self_check`, `ensure_supported_environment` nor
/// `phase::claim_handshake`; confirm that is intentional (e.g. the caller
/// already holds a phase guard).
async fn connect_with_socket(
    socket: UdpSocket,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
    let wrapped = CborUdpTransport::from_socket(socket, remote_addr, 4096, context.debug_cbor);
    let transport = wrapped.map_err(AlpineSdkError::from)?;
    Self::connect_with_transport(transport, identity, capabilities, credentials, context).await
}
/// Performs the ALPINE handshake over `base_transport` and assembles a
/// connected [`AlpineClient`].
///
/// Steps:
/// 1. Wrap the transport in a receive timeout and TX/RX logging.
/// 2. Run the session handshake.
/// 3. Derive the control-channel crypto from the session keys.
/// 4. Start the keepalive task, only after every fallible step has succeeded.
///
/// # Errors
/// Propagates handshake failures from `AlnpSession::connect`. Returns
/// `AlpineSdkError::Io` when the session reports no established state or no
/// keys after the handshake.
async fn connect_with_transport(
    base_transport: CborUdpTransport,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
    let udp_socket = base_transport.socket();
    let bound_local_addr = base_transport.local_addr();
    let peer_addr = base_transport.peer_addr();
    info!(
        "[ALPINE][HANDSHAKE] starting handshake for device {} local_addr={} remote_addr={} phase={}",
        identity.device_id,
        bound_local_addr,
        peer_addr,
        phase::current_phase().label()
    );
    if bound_local_addr.is_ipv4() != peer_addr.is_ipv4() {
        warn!(
            "[ALPINE][HANDSHAKE][WARN] IPv4/IPv6 mismatch local_addr={} remote_addr={}",
            bound_local_addr, peer_addr
        );
    }
    let mut transport = InstrumentedTransport::new(
        TimeoutTransport::new(base_transport, context.recv_timeout),
        bound_local_addr,
        peer_addr,
    );
    info!(
        "[ALPINE][HANDSHAKE] transport ready recv_timeout_ms={}",
        context.recv_timeout.as_millis()
    );
    let session = AlnpSession::connect(
        identity,
        capabilities.clone(),
        Ed25519Authenticator::new(credentials.clone()),
        X25519KeyExchange::new(),
        context,
        &mut transport,
    )
    .await?;
    let established = session
        .established()
        .ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?
        .clone();
    // Finish every fallible step BEFORE spawning the keepalive. Dropping a tokio
    // JoinHandle detaches the task rather than cancelling it, so an early `?`
    // return after the spawn would leak a keepalive loop that keeps sending on
    // this transport.
    let control_crypto = ControlCrypto::new(
        session
            .keys()
            .ok_or_else(|| AlpineSdkError::Io("session keys missing".into()))?,
    );
    let device_uuid = match Uuid::parse_str(&established.device_identity.device_id) {
        Ok(uuid) => uuid,
        Err(err) => {
            // Fall back to a random id (as before), but make the substitution visible.
            warn!(
                "[ALPINE][HANDSHAKE][WARN] device_id {} is not a UUID ({}); using a random control id",
                established.device_identity.device_id, err
            );
            Uuid::new_v4()
        }
    };
    let control =
        ControlClient::new(device_uuid, established.session_id.clone(), control_crypto);
    let transport = Arc::new(Mutex::new(transport));
    let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
        transport.clone(),
        Duration::from_secs(5),
        established.session_id.clone(),
    ));
    info!(
        "[ALPINE][HANDSHAKE] session established session_id={} local_addr={} remote_addr={}",
        established.session_id, bound_local_addr, peer_addr
    );
    Ok(Self {
        session,
        _transport: transport,
        udp_socket,
        local_addr: bound_local_addr,
        remote_addr: peer_addr,
        stream: None,
        control,
        keepalive_handle: Some(keepalive_handle),
        session_capabilities: None,
        run_id: None,
        vendor_registry: None,
    })
}
/// Compiles `profile`, opens a frame stream to the device and marks the session
/// as streaming.
///
/// Returns the compiled profile's config id.
///
/// # Errors
/// - Profile compilation errors, reported as a protocol error.
/// - Socket bind failures.
/// - `AlpineSdkError::HandshakeFailed` when the session rejects the profile.
///
/// The stream socket is bound before any session state changes. A bind
/// failure therefore leaves the session untouched instead of marked streaming
/// with no stream behind it.
pub fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
    let compiled = profile
        .compile()
        .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
    // Ephemeral port on the same local IP as the control socket.
    let stream_local = SocketAddr::new(self.local_addr.ip(), 0);
    let stream_socket =
        UdpFrameTransport::new(stream_local, self.remote_addr).map_err(AlpineSdkError::from)?;
    self.session
        .set_stream_profile(compiled.clone())
        .map_err(|err| AlpineSdkError::HandshakeFailed(err.to_string()))?;
    self.session.mark_streaming();
    let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
    self.stream = Some(stream);
    Ok(compiled.config_id().to_string())
}
/// Sends one frame on the stream opened by `start_stream`.
///
/// # Errors
/// `AlpineSdkError::Io("stream not started")` before `start_stream`, otherwise
/// whatever the stream's `send` reports.
pub fn send_frame(
    &self,
    channel_format: ChannelFormat,
    channels: Vec<u16>,
    priority: u8,
    groups: Option<HashMap<String, Vec<u16>>>,
    metadata: Option<HashMap<String, Value>>,
) -> Result<(), AlpineSdkError> {
    match self.stream.as_ref() {
        Some(active) => active
            .send(channel_format, channels, priority, groups, metadata)
            .map_err(AlpineSdkError::from),
        None => Err(AlpineSdkError::Io("stream not started".into())),
    }
}

/// Closes the session, then aborts the keepalive task if it is still owned.
pub async fn close(mut self) {
    self.session.close();
    let keepalive = self.keepalive_handle.take();
    if let Some(task) = keepalive {
        task.abort();
    }
}

/// Builds a control envelope for `op` with sequence number `seq` without
/// sending it.
pub fn control_envelope(
    &self,
    seq: u64,
    op: ControlOp,
    payload: Value,
) -> Result<ControlEnvelope, HandshakeError> {
    let envelope = self.control.envelope(seq, op, payload)?;
    Ok(envelope)
}
/// Sends the vendor `ping` command.
pub async fn ping(&self) -> Result<ControlReply<PingReply>, AlpineSdkError> {
    self.control_command::<PingReply>("ping").await
}

/// Sends the vendor `status` command.
pub async fn status(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
    self.control_command::<StatusReply>("status").await
}

/// Sends the standard `GetStatus` operation with an empty payload.
pub async fn status_standard(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
    self.control_request::<StatusReply>(ControlOp::GetStatus, json!({}))
        .await
}

/// Standard `GetStatus` that treats a rejected ack as an error.
///
/// # Errors
/// `AlpineSdkError::StatusMismatch` when the device rejects the standard
/// operation; this hints that `status_vendor` is needed instead.
pub async fn get_status(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
    let reply = self.status_standard().await?;
    if reply.ack.ok {
        return Ok(reply);
    }
    let reason = reply
        .detail()
        .unwrap_or("device rejected standard get_status");
    let mismatch = StatusMismatchError {
        detail: format!(
            "standard get_status rejected; vendor status may be required ({})",
            reason
        ),
    };
    Err(AlpineSdkError::StatusMismatch(mismatch.to_string()))
}

/// Vendor-specific status; currently the same request as `status`.
pub async fn status_vendor(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
    self.control_command::<StatusReply>("status").await
}

/// Sends the vendor `health` command.
pub async fn health(&self) -> Result<ControlReply<HealthReply>, AlpineSdkError> {
    self.control_command::<HealthReply>("health").await
}

/// Sends the vendor `identity` command.
pub async fn identity(&self) -> Result<ControlReply<IdentityReply>, AlpineSdkError> {
    self.control_command::<IdentityReply>("identity").await
}

/// Sends the vendor `metadata` command.
pub async fn metadata(&self) -> Result<ControlReply<MetadataReply>, AlpineSdkError> {
    self.control_command::<MetadataReply>("metadata").await
}
/// Runs `command` once, using default [`ControlOptions`] (no timeout, no retry).
pub async fn control(
    &self,
    command: ControlCommand,
) -> Result<ControlResponse, AlpineSdkError> {
    let options = ControlOptions::default();
    self.control_with_options(command, options).await
}

/// Describes what `control(command)` would do without sending anything.
///
/// Read-only commands are always allowed. Trust-sensitive ones only carry a
/// warning.
pub fn control_dry_run(&self, command: ControlCommand) -> ControlDryRun {
    let warnings: Vec<String> = command
        .requires_trust()
        .then(|| "requires trusted device identity".to_string())
        .into_iter()
        .collect();
    ControlDryRun {
        command: format!("{:?}", command),
        allowed: true,
        warnings,
        reason: "no network calls made".to_string(),
        run_id: self.run_id.clone(),
        remote_addr: self.remote_addr,
    }
}
/// Runs `command`, applying the per-attempt timeout and retry policy from
/// `options`.
///
/// Every error, including timeouts, is retried until the attempt budget is
/// spent. The last error is then returned.
pub async fn control_with_options(
    &self,
    command: ControlCommand,
    options: ControlOptions,
) -> Result<ControlResponse, AlpineSdkError> {
    let max_attempts = options
        .retry
        .as_ref()
        .map_or(1, |policy| policy.max_attempts.max(1));
    let mut attempt: usize = 0;
    loop {
        attempt = attempt.saturating_add(1);
        let outcome = match options.timeout {
            Some(limit) => timeout(limit, self.control_once(command))
                .await
                .unwrap_or_else(|_| Err(AlpineSdkError::Timeout)),
            None => self.control_once(command).await,
        };
        match outcome {
            Ok(reply) => {
                if attempt > 1 {
                    warn!(
                        "[ALPINE][CONTROL][WARN] command={:?} succeeded after {} retries",
                        command,
                        attempt - 1
                    );
                }
                return Ok(reply);
            }
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                if let Some(policy) = &options.retry {
                    sleep(control_backoff(policy, attempt)).await;
                }
            }
        }
    }
}
/// Sends a dangerous command with default options.
///
/// Default options do not set `allow_dangerous`, so this always fails with
/// `DangerousControlDisallowed`.
pub async fn control_dangerous(
    &self,
    command: DangerousControlCommand,
) -> Result<ControlReply<Value>, AlpineSdkError> {
    let options = ControlOptions::default();
    self.control_dangerous_with_options(command, options).await
}

/// Reports whether `control_dangerous_with_options(command, options)` would be
/// permitted, without any network I/O.
///
/// A vendor-registry problem takes precedence over the `allow_dangerous`
/// reason.
pub fn control_dangerous_dry_run(
    &self,
    command: DangerousControlCommand,
    options: ControlOptions,
) -> ControlDryRun {
    // Reason a non-builtin vendor command would be refused, if any.
    let vendor_block: Option<&str> = match &command {
        DangerousControlCommand::Vendor(name) if !is_builtin_vendor_command(name) => {
            match &self.vendor_registry {
                Some(registry) if registry.is_allowed(name) => None,
                Some(_) => Some("vendor extension not registered"),
                None => Some("vendor extension registry not configured"),
            }
        }
        _ => None,
    };
    let mut warnings = Vec::new();
    if !matches!(command, DangerousControlCommand::Vendor(_)) {
        warnings.push("dangerous command requires explicit allow_dangerous".to_string());
    }
    let (allowed, reason) = match (vendor_block, options.allow_dangerous) {
        (Some(why), _) => (false, why.to_string()),
        (None, false) => (
            false,
            "dangerous control blocked; allow_dangerous not set".to_string(),
        ),
        (None, true) => (true, "no network calls made".to_string()),
    };
    ControlDryRun {
        command: format!("{:?}", command),
        allowed,
        warnings,
        reason,
        run_id: self.run_id.clone(),
        remote_addr: self.remote_addr,
    }
}
/// Sends a state-changing vendor command, honoring the timeout and retry
/// policy in `options`.
///
/// NOTE(review): retries re-send the command after any error, including a
/// timeout. For non-idempotent commands (reboot, firmware update) the device
/// may already have acted on the first attempt.
///
/// # Errors
/// - `DangerousControlDisallowed` unless `options.allow_dangerous` is set.
/// - `VendorExtensionNotRegistered` for unapproved vendor commands.
/// - Otherwise the last attempt's error.
pub async fn control_dangerous_with_options(
    &self,
    command: DangerousControlCommand,
    options: ControlOptions,
) -> Result<ControlReply<Value>, AlpineSdkError> {
    if !options.allow_dangerous {
        return Err(AlpineSdkError::DangerousControlDisallowed);
    }
    let name = command.as_command();
    let max_attempts = options
        .retry
        .as_ref()
        .map_or(1, |policy| policy.max_attempts.max(1));
    let mut attempt: usize = 0;
    loop {
        attempt = attempt.saturating_add(1);
        let outcome = match options.timeout {
            Some(limit) => timeout(limit, self.control_vendor_once(name))
                .await
                .unwrap_or_else(|_| Err(AlpineSdkError::Timeout)),
            None => self.control_vendor_once(name).await,
        };
        match outcome {
            Ok(reply) => {
                if attempt > 1 {
                    warn!(
                        "[ALPINE][CONTROL][WARN] dangerous_command={:?} succeeded after {} retries",
                        command,
                        attempt - 1
                    );
                }
                return Ok(reply);
            }
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                if let Some(policy) = &options.retry {
                    sleep(control_backoff(policy, attempt)).await;
                }
            }
        }
    }
}
pub async fn probe_status(&self) -> ProbeResult {
self.probe_status_with_options(ProbeOptions::default())
.await
}
pub async fn probe_status_with_options(&self, options: ProbeOptions) -> ProbeResult {
let mut result = ProbeResult {
state: ProbeState::Offline,
ping: None,
status: None,
health: None,
ping_rtt_ms: None,
status_rtt_ms: None,
health_rtt_ms: None,
detail: None,
errors: Vec::new(),
};
let ping_start = std::time::Instant::now();
let ping_reply = self
.control_with_options(ControlCommand::Ping, options.control.clone())
.await;
match ping_reply {
Ok(ControlResponse::Ping(reply)) => {
result.ping_rtt_ms = Some(ping_start.elapsed().as_millis());
result.ping = reply.payload.clone();
if !reply.ok() {
if let Some(detail) = reply.detail() {
result.detail = Some(detail.to_string());
}
}
}
Ok(other) => {
result.record_error(
ProbeStep::Ping,
format!("unexpected ping response: {:?}", other),
);
return result;
}
Err(err) => {
result.record_error(ProbeStep::Ping, err.to_string());
return result;
}
}
let status_start = std::time::Instant::now();
let status_reply = self
.control_with_options(ControlCommand::Status, options.control.clone())
.await;
match status_reply {
Ok(ControlResponse::Status(reply)) => {
result.status_rtt_ms = Some(status_start.elapsed().as_millis());
result.status = reply.payload.clone();
if result.detail.is_none() {
if let Some(detail) = reply.detail() {
result.detail = Some(detail.to_string());
} else if let Some(payload) = reply.payload.as_ref() {
result.detail = payload.detail.clone();
}
}
}
Ok(other) => {
result.record_error(
ProbeStep::Status,
format!("unexpected status response: {:?}", other),
);
}
Err(err) => {
result.record_error(ProbeStep::Status, err.to_string());
}
}
if result.status.is_none() && options.include_health {
let health_start = std::time::Instant::now();
let health_reply = self
.control_with_options(ControlCommand::Health, options.control.clone())
.await;
match health_reply {
Ok(ControlResponse::Health(reply)) => {
result.health_rtt_ms = Some(health_start.elapsed().as_millis());
result.health = reply.payload.clone();
if result.detail.is_none() {
if let Some(detail) = reply.detail() {
result.detail = Some(detail.to_string());
} else if let Some(payload) = reply.payload.as_ref() {
result.detail = payload.detail.clone();
}
}
}
Ok(other) => {
result.record_error(
ProbeStep::Health,
format!("unexpected health response: {:?}", other),
);
}
Err(err) => {
result.record_error(ProbeStep::Health, err.to_string());
}
}
}
let healthy = result
.status
.as_ref()
.and_then(|payload| payload.healthy)
.or_else(|| result.health.as_ref().and_then(|payload| payload.healthy));
result.state = if healthy == Some(false) {
ProbeState::Degraded
} else if result.status.is_some() || result.health.is_some() {
ProbeState::Online
} else if result.errors.is_empty() {
ProbeState::Online
} else {
ProbeState::Degraded
};
result
}
/// Dispatches a single attempt of `command` to its typed helper.
async fn control_once(
    &self,
    command: ControlCommand,
) -> Result<ControlResponse, AlpineSdkError> {
    let response = match command {
        ControlCommand::Ping => ControlResponse::Ping(self.ping().await?),
        ControlCommand::Status => ControlResponse::Status(self.status().await?),
        ControlCommand::GetStatus => ControlResponse::Status(self.get_status().await?),
        ControlCommand::Health => ControlResponse::Health(self.health().await?),
        ControlCommand::Identity => ControlResponse::Identity(self.identity().await?),
        ControlCommand::Metadata => ControlResponse::Metadata(self.metadata().await?),
    };
    Ok(response)
}

/// Sends `{"command": command}` as a `ControlOp::Vendor` request and decodes
/// the ack payload as `T`.
async fn control_command<T>(&self, command: &str) -> Result<ControlReply<T>, AlpineSdkError>
where
    T: DeserializeOwned,
{
    self.control_request(ControlOp::Vendor, json!({ "command": command }))
        .await
}

/// Sends one vendor command after checking it against the builtin list or the
/// configured vendor extension registry.
///
/// # Errors
/// `VendorExtensionNotRegistered` when the command is neither builtin nor
/// allowed by the registry (including when no registry is set).
async fn control_vendor_once(
    &self,
    command: &str,
) -> Result<ControlReply<Value>, AlpineSdkError> {
    let permitted = is_builtin_vendor_command(command)
        || self
            .vendor_registry
            .as_ref()
            .map_or(false, |registry| registry.is_allowed(command));
    if !permitted {
        return Err(AlpineSdkError::VendorExtensionNotRegistered(
            command.to_string(),
        ));
    }
    self.control_request(ControlOp::Vendor, json!({ "command": command }))
        .await
}
async fn control_request<T>(
&self,
op: ControlOp,
payload: Value,
) -> Result<ControlReply<T>, AlpineSdkError>
where
T: DeserializeOwned,
{
let transport = SharedTransport::new(self._transport.clone());
let mut channel = ReliableControlChannel::new(transport);
let ack = self.control.send(&mut channel, op, payload).await?;
let parsed = ControlCrypto::decode_ack_payload::<T>(ack.payload.as_deref())
.map_err(AlpineSdkError::from)?;
Ok(ControlReply {
ack,
payload: parsed,
})
}
/// Local address of the control socket.
pub fn local_addr(&self) -> SocketAddr {
    self.local_addr
}

/// Device address of the control socket.
pub fn remote_addr(&self) -> SocketAddr {
    self.remote_addr
}

/// Shared handle to the control UDP socket.
pub fn udp_socket(&self) -> Arc<UdpSocket> {
    Arc::clone(&self.udp_socket)
}

/// Session id negotiated during the handshake, if the session is established.
pub fn session_id(&self) -> Option<String> {
    let established = self.session.established()?;
    Some(established.session_id.clone())
}

/// Capabilities the device advertised during the handshake.
pub fn device_capabilities(&self) -> Option<CapabilitySet> {
    let established = self.session.established()?;
    Some(established.capabilities.clone())
}

/// Caller-narrowed capabilities, if `narrow_capabilities` was called.
pub fn session_capabilities(&self) -> Option<&CapabilitySet> {
    self.session_capabilities.as_ref()
}

/// Narrowed capabilities when set, otherwise the device's full set.
pub fn effective_capabilities(&self) -> Option<CapabilitySet> {
    match &self.session_capabilities {
        Some(narrowed) => Some(narrowed.clone()),
        None => self.device_capabilities(),
    }
}

/// Restricts this client to `caps`, which must be a subset of the device's
/// capabilities.
///
/// # Errors
/// `InvalidCapabilities` when device capabilities are unknown, or whatever
/// `validate_capability_subset` reports.
pub fn narrow_capabilities(&mut self, caps: CapabilitySet) -> Result<(), AlpineSdkError> {
    let device_caps = match self.device_capabilities() {
        Some(found) => found,
        None => {
            return Err(AlpineSdkError::InvalidCapabilities(
                "device capabilities missing".into(),
            ))
        }
    };
    validate_capability_subset(&caps, &device_caps)?;
    self.session_capabilities = Some(caps);
    Ok(())
}

/// Current run identifier, if any.
pub fn run_id(&self) -> Option<&str> {
    self.run_id.as_deref()
}

/// Sets the run identifier reported in dry runs.
pub fn set_run_id(&mut self, run_id: String) {
    self.run_id.replace(run_id);
}

/// Vendor extension allow-list, if configured.
pub fn vendor_extension_registry(&self) -> Option<&VendorExtensionRegistry> {
    self.vendor_registry.as_ref()
}

/// Installs the allow-list used for non-builtin vendor commands.
pub fn set_vendor_extension_registry(&mut self, registry: VendorExtensionRegistry) {
    self.vendor_registry.replace(registry);
}
}
/// Wraps an [`AlpineClient`] and tracks when it was last used, so callers can
/// detect idle sessions.
#[derive(Debug)]
pub struct SessionGuard {
client: AlpineClient,
/// Idle time after which `is_expired` reports `true`.
idle_timeout: Duration,
/// Refreshed by `touch` and by `client_mut` (not by `client`).
last_used: Instant,
/// Caller-supplied explanation, stored and returned as-is.
idle_reason: Option<String>,
}
impl SessionGuard {
pub fn new(client: AlpineClient, idle_timeout: Duration) -> Self {
Self {
client,
idle_timeout,
last_used: Instant::now(),
idle_reason: None,
}
}
pub fn with_idle_reason(mut self, reason: impl Into<String>) -> Self {
self.idle_reason = Some(reason.into());
self
}
pub fn client(&self) -> &AlpineClient {
&self.client
}
pub fn client_mut(&mut self) -> &mut AlpineClient {
self.touch();
&mut self.client
}
pub fn touch(&mut self) {
self.last_used = Instant::now();
}
pub fn idle_for(&self) -> Duration {
self.last_used.elapsed()
}
pub fn is_expired(&self) -> bool {
self.last_used.elapsed() >= self.idle_timeout
}
pub fn idle_reason(&self) -> Option<&str> {
self.idle_reason.as_deref()
}
pub async fn close(self) {
self.client.close().await;
}
pub fn into_inner(self) -> AlpineClient {
self.client
}
}
/// Policy for [`SafeClient`].
#[derive(Debug, Clone)]
pub struct SafeClientOptions {
/// Re-probe before each control/stream/frame call and fail when the probe is
/// not acceptable.
pub require_online: bool,
/// Treat `ProbeState::Degraded` as acceptable.
pub allow_degraded: bool,
/// Refuse to wrap devices whose trust state is not `Trusted`.
pub require_trusted: bool,
/// Options used for every probe; `probe_options.control` is also used for
/// `SafeClient::control`.
pub probe_options: ProbeOptions,
}
impl Default for SafeClientOptions {
fn default() -> Self {
Self {
require_online: true,
allow_degraded: false,
require_trusted: false,
probe_options: ProbeOptions::default(),
}
}
}
impl SafeClientOptions {
pub fn require_trusted(mut self, require: bool) -> Self {
self.require_trusted = require;
self
}
}
/// [`AlpineClient`] wrapper that gates operations on probe results and trust.
#[derive(Debug)]
pub struct SafeClient {
client: AlpineClient,
/// Most recent probe, acceptable or not.
last_probe: ProbeResult,
/// Most recent probe that met the acceptance policy.
last_known_good: Option<ProbeResult>,
/// Trust level captured at construction.
trust: DeviceTrustState,
options: SafeClientOptions,
}
impl SafeClient {
pub async fn from_client(
client: AlpineClient,
trust: DeviceTrustState,
options: SafeClientOptions,
) -> Result<Self, AlpineSdkError> {
if options.require_trusted && trust != DeviceTrustState::Trusted {
return Err(AlpineSdkError::UntrustedDevice(
"trusted identity required".into(),
));
}
let mut wrapper = Self {
client,
last_probe: ProbeResult {
state: ProbeState::Offline,
ping: None,
status: None,
health: None,
ping_rtt_ms: None,
status_rtt_ms: None,
health_rtt_ms: None,
detail: None,
errors: Vec::new(),
},
last_known_good: None,
trust,
options,
};
wrapper.refresh_probe().await?;
Ok(wrapper)
}
pub async fn from_discovery(
client: AlpineClient,
outcome: &crate::discovery::DiscoveryOutcome,
options: SafeClientOptions,
) -> Result<Self, AlpineSdkError> {
let trust = outcome.trust_state();
let mut wrapped = Self::from_client(client, trust, options).await?;
wrapped.client.set_run_id(outcome.run_id.clone());
Ok(wrapped)
}
pub fn client(&self) -> &AlpineClient {
&self.client
}
pub fn client_mut(&mut self) -> &mut AlpineClient {
&mut self.client
}
pub fn last_probe(&self) -> &ProbeResult {
&self.last_probe
}
pub fn last_known_good(&self) -> Option<&ProbeResult> {
self.last_known_good.as_ref()
}
pub fn last_known_good_delta(&self) -> Option<String> {
self.last_known_good
.as_ref()
.and_then(|previous| self.last_probe.delta_summary(previous))
}
pub fn trust_state(&self) -> DeviceTrustState {
self.trust.clone()
}
/// Re-probes the device, records the result, and updates the last known good
/// probe when acceptable.
///
/// # Errors
/// `ProbeFailed` when `require_online` is set and the probe is unacceptable.
/// The message is the probe detail, else the first error, else the state.
/// The result is recorded even in that case.
pub async fn refresh_probe(&mut self) -> Result<&ProbeResult, AlpineSdkError> {
    let fresh = self
        .client
        .probe_status_with_options(self.options.probe_options.clone())
        .await;
    let acceptable = self.acceptable_probe_state(&fresh);
    let failure = (self.options.require_online && !acceptable).then(|| {
        fresh
            .detail
            .clone()
            .or_else(|| fresh.errors.first().map(|failure| failure.message.clone()))
            .unwrap_or_else(|| format!("probe state {:?}", fresh.state))
    });
    self.last_probe = fresh;
    if acceptable {
        self.last_known_good = Some(self.last_probe.clone());
    }
    match failure {
        Some(detail) => Err(AlpineSdkError::ProbeFailed(detail)),
        None => Ok(&self.last_probe),
    }
}
pub async fn control(
&mut self,
command: ControlCommand,
) -> Result<ControlResponse, AlpineSdkError> {
if self.options.require_online {
self.refresh_probe().await?;
}
self.client
.control_with_options(command, self.options.probe_options.control.clone())
.await
}
pub async fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
if self.options.require_online {
self.refresh_probe().await?;
}
self.client.start_stream(profile)
}
pub async fn send_frame(
&mut self,
channel_format: ChannelFormat,
channels: Vec<u16>,
priority: u8,
groups: Option<HashMap<String, Vec<u16>>>,
metadata: Option<HashMap<String, Value>>,
) -> Result<(), AlpineSdkError> {
if self.options.require_online {
self.refresh_probe().await?;
}
self.client
.send_frame(channel_format, channels, priority, groups, metadata)
}
pub async fn close(self) {
self.client.close().await;
}
pub fn into_inner(self) -> AlpineClient {
self.client
}
/// `Online` is always acceptable; `Degraded` only when `allow_degraded` is set;
/// `Offline` never is.
fn acceptable_probe_state(&self, probe: &ProbeResult) -> bool {
    probe.state == ProbeState::Online
        || (probe.state == ProbeState::Degraded && self.options.allow_degraded)
}
}
/// Cloneable handle to a transport shared behind a mutex.
///
/// The `HandshakeTransport` impl for this type locks the mutex only for the
/// duration of a single `send` or `recv`.
struct SharedTransport<T> {
    inner: Arc<Mutex<T>>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive adds a
// `T: Clone` bound, but cloning only bumps the `Arc` refcount. It must therefore
// work for non-Clone transports such as the wire transport stack.
impl<T> Clone for SharedTransport<T> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
impl<T> SharedTransport<T> {
fn new(inner: Arc<Mutex<T>>) -> Self {
Self { inner }
}
}
#[async_trait]
impl<T> HandshakeTransport for SharedTransport<T>
where
    T: HandshakeTransport + Send,
{
    /// Locks the shared transport and sends `msg`, holding the lock until the
    /// send completes.
    async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
        let mut locked = self.inner.lock().await;
        locked.send(msg).await
    }

    /// Locks the shared transport and waits for one message, holding the lock
    /// until it arrives or fails.
    async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
        let mut locked = self.inner.lock().await;
        locked.recv().await
    }
}
/// Transport decorator that logs every TX/RX message and blocks duplicate
/// `SessionInit` sends.
#[derive(Debug)]
struct InstrumentedTransport<T> {
inner: T,
/// Local address, used only in log lines.
local_addr: SocketAddr,
/// Peer address, used only in log lines.
remote_addr: SocketAddr,
/// Set once a `SessionInit` has been sent; later ones are rejected.
session_init_sent: bool,
}
impl<T> InstrumentedTransport<T> {
fn new(inner: T, local_addr: SocketAddr, remote_addr: SocketAddr) -> Self {
Self {
inner,
local_addr,
remote_addr,
session_init_sent: false,
}
}
}
#[async_trait]
impl<T> HandshakeTransport for InstrumentedTransport<T>
where
    T: HandshakeTransport + Send,
{
    /// Logs and forwards an outgoing message to the inner transport.
    ///
    /// Rejects a second `SessionInit` once one has been sent successfully. Warns,
    /// without blocking, when a non-handshake message is emitted while the global
    /// phase is `Phase::Handshake`.
    ///
    /// # Errors
    /// `HandshakeError::Protocol` for a duplicate `SessionInit`; otherwise the
    /// inner transport's result.
    async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
        let category = message_category(&msg);
        let phase_state = phase::current_phase();
        let is_session_init = matches!(msg, HandshakeMessage::SessionInit(_));
        if is_session_init && self.session_init_sent {
            warn!(
                "[ALPINE][HANDSHAKE][WARN] duplicate SessionInit send attempt detected local_port={} remote_addr={} phase={}",
                self.local_addr.port(),
                self.remote_addr,
                phase_state.label()
            );
            return Err(HandshakeError::Protocol(
                "duplicate SessionInit blocked; reset handshake first".into(),
            ));
        }
        if phase_state == Phase::Handshake && category != MessageCategory::Handshake {
            warn!(
                "[ALPINE][BUG] non-handshake packet emitted during handshake msg_type={} local_port={} remote_addr={}",
                category.as_str(),
                self.local_addr.port(),
                self.remote_addr
            );
        }
        // Encoded only to report the size in the TX log; an encode failure is
        // logged as len=0.
        let encoded_len = serde_cbor::to_vec(&msg).map(|buf| buf.len()).unwrap_or(0);
        info!(
            "[ALPINE][TX] msg_type={} variant={} local_port={} remote_addr={} phase={} len={}",
            category.as_str(),
            message_label(&msg),
            self.local_addr.port(),
            self.remote_addr,
            phase_state.label(),
            encoded_len
        );
        let outcome = self.inner.send(msg).await;
        // Record the SessionInit only after it actually went out. Setting the flag
        // before the send made a retry after a failed send look like a duplicate,
        // permanently blocking the handshake.
        if is_session_init && outcome.is_ok() {
            self.session_init_sent = true;
        }
        outcome
    }

    /// Receives one message from the inner transport and logs it (or the error).
    ///
    /// The phase label in the log is the phase observed when `recv` was called.
    async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
        let phase_state = phase::current_phase();
        let result = self.inner.recv().await;
        match &result {
            Ok(msg) => {
                let category = message_category(msg);
                info!(
                    "[ALPINE][RX] msg_type={} variant={} local_port={} remote_addr={} phase={}",
                    category.as_str(),
                    message_label(msg),
                    self.local_addr.port(),
                    self.remote_addr,
                    phase_state.label()
                );
            }
            Err(err) => {
                warn!(
                    "[ALPINE][RX] recv error local_port={} remote_addr={} phase={} error={}",
                    self.local_addr.port(),
                    self.remote_addr,
                    phase_state.label(),
                    err
                );
            }
        }
        result
    }
}
/// Coarse classification of a handshake-layer message, used for the `msg_type=` log
/// field and for the "non-handshake packet during handshake" check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// `Unknown` is never returned by `message_category`, so without this attribute it would
// trip the dead-code lint.
#[allow(dead_code)]
enum MessageCategory {
    Handshake,
    Control,
    Keepalive,
    Ack,
    Unknown,
}

impl MessageCategory {
    /// Returns the fixed tag printed for this category in log lines.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Unknown => "Unknown",
            Self::Ack => "Ack",
            Self::Keepalive => "Keepalive",
            Self::Control => "Control",
            Self::Handshake => "Handshake",
        }
    }
}
/// Maps a handshake-layer message to its logging category.
///
/// All session-setup variants (`SessionInit` through `SessionEstablished`) are grouped
/// as `Handshake`. `Control`, `Keepalive` and `Ack` each get their own category.
fn message_category(msg: &HandshakeMessage) -> MessageCategory {
    match msg {
        HandshakeMessage::Control(_) => MessageCategory::Control,
        HandshakeMessage::Keepalive(_) => MessageCategory::Keepalive,
        HandshakeMessage::Ack(_) => MessageCategory::Ack,
        HandshakeMessage::SessionInit(_)
        | HandshakeMessage::SessionAck(_)
        | HandshakeMessage::SessionReady(_)
        | HandshakeMessage::SessionComplete(_)
        | HandshakeMessage::SessionEstablished(_) => MessageCategory::Handshake,
    }
}
/// Returns the variant name of `msg`, printed as the `variant=` field of TX/RX log lines.
fn message_label(msg: &HandshakeMessage) -> &'static str {
    match msg {
        HandshakeMessage::Ack(_) => "Ack",
        HandshakeMessage::Keepalive(_) => "Keepalive",
        HandshakeMessage::Control(_) => "Control",
        HandshakeMessage::SessionEstablished(_) => "SessionEstablished",
        HandshakeMessage::SessionComplete(_) => "SessionComplete",
        HandshakeMessage::SessionReady(_) => "SessionReady",
        HandshakeMessage::SessionAck(_) => "SessionAck",
        HandshakeMessage::SessionInit(_) => "SessionInit",
    }
}
/// Computes how long to wait before control retry number `attempt`.
///
/// The delay is `backoff_base_ms * 2^(attempt - 1)` milliseconds, saturating on overflow
/// and clamped to `backoff_max_ms`. `attempt` is 1-based; `0` and `1` both yield the base
/// delay.
fn control_backoff(policy: &ControlRetryPolicy, attempt: usize) -> Duration {
    // Clamp before narrowing: a bare `as u32` would truncate huge attempt counts on 64-bit
    // targets, wrapping them to small exponents and shrinking the delay below the cap.
    // Any exponent >= 64 already overflows `u64`, so clamping at 64 loses nothing.
    let exponent = attempt.saturating_sub(1).min(64) as u32;
    // `checked_shl` fails for shifts of 64+ bits; treat those as an unbounded factor.
    let factor = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
    let delay = policy.backoff_base_ms.saturating_mul(factor);
    Duration::from_millis(delay.min(policy.backoff_max_ms))
}
/// Reports whether `command` is one of the built-in command names
/// (`ping`, `status`, `health`, `identity`, `metadata`).
///
/// Matching is exact and case-sensitive.
fn is_builtin_vendor_command(command: &str) -> bool {
    const BUILTIN_COMMANDS: [&str; 5] = ["ping", "status", "health", "identity", "metadata"];
    BUILTIN_COMMANDS.contains(&command)
}
/// Checks that every capability in `requested` is also offered by `device`.
///
/// Checks run in a fixed order and the first mismatch is reported:
/// 1. channel formats,
/// 2. `max_channels`,
/// 3. grouping, streaming, and encryption support,
/// 4. vendor extensions.
///
/// # Errors
///
/// Returns `AlpineSdkError::InvalidCapabilities` describing the first unsupported
/// capability. If `requested` lists vendor extensions at all while `device` lists none,
/// that is an error even when the requested map is empty.
fn validate_capability_subset(
    requested: &CapabilitySet,
    device: &CapabilitySet,
) -> Result<(), AlpineSdkError> {
    let invalid = |reason: String| AlpineSdkError::InvalidCapabilities(reason);

    if let Some(missing) = requested
        .channel_formats
        .iter()
        .find(|fmt| !device.channel_formats.contains(*fmt))
    {
        return Err(invalid(format!(
            "channel format {:?} not supported by device",
            missing
        )));
    }

    if requested.max_channels > device.max_channels {
        return Err(invalid(format!(
            "max channels {} exceeds device max {}",
            requested.max_channels, device.max_channels
        )));
    }

    // (requested?, offered?, error message), checked in this order.
    let feature_checks = [
        (
            requested.grouping_supported,
            device.grouping_supported,
            "grouping not supported by device",
        ),
        (
            requested.streaming_supported,
            device.streaming_supported,
            "streaming not supported by device",
        ),
        (
            requested.encryption_supported,
            device.encryption_supported,
            "encryption not supported by device",
        ),
    ];
    for &(wanted, offered, reason) in feature_checks.iter() {
        if wanted && !offered {
            return Err(invalid(reason.to_string()));
        }
    }

    match (
        requested.vendor_extensions.as_ref(),
        device.vendor_extensions.as_ref(),
    ) {
        (None, _) => Ok(()),
        (Some(_), None) => Err(invalid(
            "vendor extensions not supported by device".to_string(),
        )),
        (Some(wanted), Some(offered)) => {
            match wanted.keys().find(|key| !offered.contains_key(*key)) {
                Some(key) => Err(invalid(format!(
                    "vendor extension {} not supported by device",
                    key
                ))),
                None => Ok(()),
            }
        }
    }
}