Skip to main content

alpine_protocol_sdk/session/
session.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use alpine::control::{ControlClient, ControlCrypto};
7use alpine::crypto::identity::NodeCredentials;
8use alpine::crypto::X25519KeyExchange;
9use alpine::handshake::keepalive;
10use alpine::handshake::transport::{CborUdpTransport, ReliableControlChannel, TimeoutTransport};
11use alpine::handshake::{HandshakeContext, HandshakeError, HandshakeMessage, HandshakeTransport};
12use alpine::messages::{
13    Acknowledge, CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity,
14};
15use alpine::profile::StreamProfile;
16use alpine::session::{AlnpSession, Ed25519Authenticator};
17use alpine::stream::AlnpStream;
18use async_trait::async_trait;
19use serde::de::DeserializeOwned;
20use serde::Deserialize;
21use serde_cbor;
22use serde_json::{json, Value};
23use tokio::sync::Mutex;
24use tokio::task::JoinHandle;
25use tokio::time::{sleep, timeout};
26use tracing::{info, warn};
27use uuid::Uuid;
28
29use crate::transport::UdpFrameTransport;
30use crate::{
31    discovery::DeviceTrustState,
32    environment::ensure_supported_environment,
33    error::AlpineSdkError,
34    phase::{self, Phase},
35    self_check::run_sdk_self_check,
36    vendor::VendorExtensionRegistry,
37};
38use tokio::net::UdpSocket;
39
40type WireTransport = InstrumentedTransport<TimeoutTransport<CborUdpTransport>>;
41
42/// High-level client that wraps the ALPINE protocol primitives.
43#[derive(Debug)]
44pub struct AlpineClient {
45    session: AlnpSession,
46    _transport: Arc<Mutex<WireTransport>>,
47    udp_socket: Arc<UdpSocket>,
48    local_addr: SocketAddr,
49    remote_addr: SocketAddr,
50    stream: Option<AlnpStream<UdpFrameTransport>>,
51    control: ControlClient,
52    keepalive_handle: Option<JoinHandle<()>>,
53    session_capabilities: Option<CapabilitySet>,
54    run_id: Option<String>,
55    vendor_registry: Option<VendorExtensionRegistry>,
56}
57
58/// Typed control response returned by `AlpineClient` helpers.
59#[derive(Debug, Clone)]
60pub struct ControlReply<T> {
61    pub ack: Acknowledge,
62    pub payload: Option<T>,
63}
64
65impl<T> ControlReply<T> {
66    pub fn ok(&self) -> bool {
67        self.ack.ok
68    }
69
70    pub fn detail(&self) -> Option<&str> {
71        self.ack.detail.as_deref()
72    }
73}
74
/// Identifies which stage of a probe an error belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeStep {
    /// The ping request.
    Ping,
    /// The status request.
    Status,
    /// The health request.
    Health,
}
81
/// Normalized outcome of a probe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeState {
    /// Reported as "online" in labels and badges.
    Online,
    /// Reported as "degraded"; still counted as online by `to_device_state`.
    Degraded,
    /// Reported as "offline".
    Offline,
}
88
89#[derive(Debug, Clone)]
90pub struct ProbeError {
91    pub step: ProbeStep,
92    pub message: String,
93}
94
95#[derive(Debug, Clone)]
96pub struct ProbeResult {
97    pub state: ProbeState,
98    pub ping: Option<PingReply>,
99    pub status: Option<StatusReply>,
100    pub health: Option<HealthReply>,
101    pub ping_rtt_ms: Option<u128>,
102    pub status_rtt_ms: Option<u128>,
103    pub health_rtt_ms: Option<u128>,
104    pub detail: Option<String>,
105    pub errors: Vec<ProbeError>,
106}
107
/// Severity level attached to a [`UiBadge`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UiSeverity {
    /// Nothing to report.
    Ok,
    /// Needs attention.
    Warn,
    /// Needs immediate attention.
    Critical,
}
114
115#[derive(Debug, Clone)]
116pub struct UiBadge {
117    pub label: String,
118    pub severity: UiSeverity,
119    pub color: &'static str,
120}
121
122impl ProbeResult {
123    fn record_error(&mut self, step: ProbeStep, message: String) {
124        self.errors.push(ProbeError { step, message });
125    }
126
127    pub fn to_device_state(&self, trust: DeviceTrustState) -> DeviceState {
128        let healthy = self
129            .status
130            .as_ref()
131            .and_then(|payload| payload.healthy)
132            .or_else(|| self.health.as_ref().and_then(|payload| payload.healthy));
133        let online = matches!(self.state, ProbeState::Online | ProbeState::Degraded);
134        let label = if trust != DeviceTrustState::Trusted {
135            "untrusted".to_string()
136        } else {
137            match self.state {
138                ProbeState::Online => "online".to_string(),
139                ProbeState::Degraded => "degraded".to_string(),
140                ProbeState::Offline => "offline".to_string(),
141            }
142        };
143        let last_error = self.errors.first().map(|err| err.message.clone());
144        DeviceState {
145            state: self.state,
146            online,
147            healthy,
148            trust,
149            last_rtt_ms: self.ping_rtt_ms,
150            detail: self.detail.clone(),
151            last_error,
152            label,
153        }
154    }
155
156    pub fn delta_summary(&self, previous: &ProbeResult) -> Option<String> {
157        if self.state != previous.state {
158            return Some(format!(
159                "state changed from {:?} to {:?}",
160                previous.state, self.state
161            ));
162        }
163        if self.detail != previous.detail {
164            if let Some(detail) = &self.detail {
165                return Some(format!("detail changed to '{}'", detail));
166            }
167            return Some("detail cleared".to_string());
168        }
169        None
170    }
171
172    pub fn to_ui_badge(&self, trust: DeviceTrustState) -> UiBadge {
173        if trust != DeviceTrustState::Trusted {
174            return UiBadge {
175                label: "untrusted".to_string(),
176                severity: UiSeverity::Warn,
177                color: "yellow",
178            };
179        }
180        match self.state {
181            ProbeState::Online => UiBadge {
182                label: "online".to_string(),
183                severity: UiSeverity::Ok,
184                color: "green",
185            },
186            ProbeState::Degraded => UiBadge {
187                label: "degraded".to_string(),
188                severity: UiSeverity::Warn,
189                color: "orange",
190            },
191            ProbeState::Offline => UiBadge {
192                label: "offline".to_string(),
193                severity: UiSeverity::Critical,
194                color: "red",
195            },
196        }
197    }
198}
199
/// Control commands that can be issued through `AlpineClient::control`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlCommand {
    Ping,
    Status,
    GetStatus,
    Health,
    Identity,
    Metadata,
}

impl ControlCommand {
    /// Indicates whether the command should only run against trusted devices.
    pub fn requires_trust(&self) -> bool {
        match self {
            ControlCommand::Identity | ControlCommand::Metadata => true,
            ControlCommand::Ping
            | ControlCommand::Status
            | ControlCommand::GetStatus
            | ControlCommand::Health => false,
        }
    }

    /// Returns `true` for the ping, status, get-status and health commands.
    pub fn is_observe_safe(&self) -> bool {
        match self {
            ControlCommand::Ping
            | ControlCommand::Status
            | ControlCommand::GetStatus
            | ControlCommand::Health => true,
            ControlCommand::Identity | ControlCommand::Metadata => false,
        }
    }
}
226
/// Error describing a rejected standard status request.
#[derive(Debug)]
pub struct StatusMismatchError {
    /// Full human-readable description; this is exactly what `Display` prints.
    pub detail: String,
}

impl std::fmt::Display for StatusMismatchError {
    /// Writes the detail text verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.detail)
    }
}

impl std::error::Error for StatusMismatchError {}
239
/// Control commands that are only sent after an explicit opt-in.
#[derive(Debug, Clone)]
pub enum DangerousControlCommand {
    FirmwareUpdate,
    Reboot,
    IdentityReset,
    /// Vendor-defined command, identified by its command string.
    Vendor(String),
}

impl DangerousControlCommand {
    /// Returns the command string for this variant; vendor commands yield
    /// the wrapped name unchanged.
    fn as_command(&self) -> &str {
        match self {
            Self::Vendor(custom) => custom.as_str(),
            Self::FirmwareUpdate => "firmware_update",
            Self::Reboot => "reboot",
            Self::IdentityReset => "identity_reset",
        }
    }
}
258
259#[derive(Debug, Clone)]
260pub enum ControlResponse {
261    Ping(ControlReply<PingReply>),
262    Status(ControlReply<StatusReply>),
263    Health(ControlReply<HealthReply>),
264    Identity(ControlReply<IdentityReply>),
265    Metadata(ControlReply<MetadataReply>),
266}
267
/// Report produced by the dry-run helpers.
#[derive(Debug, Clone)]
pub struct ControlDryRun {
    /// Debug rendering of the command that was evaluated.
    pub command: String,
    /// Whether the command would be allowed to run.
    pub allowed: bool,
    /// Advisory notes collected while evaluating the command.
    pub warnings: Vec<String>,
    /// Explanation accompanying the `allowed` verdict.
    pub reason: String,
    /// Run identifier copied from the client, if one is set.
    pub run_id: Option<String>,
    /// Peer address the command would target.
    pub remote_addr: SocketAddr,
}
277
/// Retry settings for control commands.
#[derive(Debug, Clone)]
pub struct ControlRetryPolicy {
    /// Total number of attempts (values below 1 are treated as 1 by callers).
    pub max_attempts: usize,
    /// Base backoff in milliseconds.
    pub backoff_base_ms: u64,
    /// Upper bound for the backoff in milliseconds.
    pub backoff_max_ms: u64,
}

impl Default for ControlRetryPolicy {
    /// Three attempts, 150 ms base backoff, capped at 2 000 ms.
    fn default() -> Self {
        ControlRetryPolicy {
            backoff_max_ms: 2000,
            backoff_base_ms: 150,
            max_attempts: 3,
        }
    }
}
294
295#[derive(Debug, Clone)]
296pub struct ControlOptions {
297    pub timeout: Option<Duration>,
298    pub retry: Option<ControlRetryPolicy>,
299    pub allow_dangerous: bool,
300}
301
302impl Default for ControlOptions {
303    fn default() -> Self {
304        Self {
305            timeout: None,
306            retry: None,
307            allow_dangerous: false,
308        }
309    }
310}
311
312impl ControlOptions {
313    pub fn allow_dangerous(mut self, allow: bool) -> Self {
314        self.allow_dangerous = allow;
315        self
316    }
317
318    pub fn defaults_for_ui() -> Self {
319        Self {
320            timeout: Some(Duration::from_millis(500)),
321            retry: Some(ControlRetryPolicy {
322                max_attempts: 2,
323                backoff_base_ms: 150,
324                backoff_max_ms: 300,
325            }),
326            allow_dangerous: false,
327        }
328    }
329}
330
331#[derive(Debug, Clone)]
332pub struct ProbeOptions {
333    pub include_health: bool,
334    pub control: ControlOptions,
335}
336
337#[derive(Debug, Clone)]
338pub struct DeviceState {
339    pub state: ProbeState,
340    pub online: bool,
341    pub healthy: Option<bool>,
342    pub trust: DeviceTrustState,
343    pub last_rtt_ms: Option<u128>,
344    pub detail: Option<String>,
345    pub last_error: Option<String>,
346    pub label: String,
347}
348
349impl Default for ProbeOptions {
350    fn default() -> Self {
351        Self {
352            include_health: true,
353            control: ControlOptions::default(),
354        }
355    }
356}
357
358/// Ping reply payload (may be partial depending on device support).
359#[derive(Debug, Deserialize, Clone)]
360pub struct PingReply {
361    #[serde(default)]
362    pub timestamp_ms: Option<u64>,
363    #[serde(default)]
364    pub message: Option<String>,
365}
366
367/// Status reply payload returned by the `status` helper.
368#[derive(Debug, Deserialize, Clone)]
369pub struct StatusReply {
370    #[serde(default)]
371    pub healthy: Option<bool>,
372    #[serde(default)]
373    pub detail: Option<String>,
374    #[serde(default)]
375    pub uptime_secs: Option<u64>,
376}
377
378/// Health reply payload, including optional metrics metadata.
379#[derive(Debug, Deserialize, Clone)]
380pub struct HealthReply {
381    #[serde(default)]
382    pub healthy: Option<bool>,
383    #[serde(default)]
384    pub detail: Option<String>,
385    #[serde(default)]
386    pub metrics: Option<HashMap<String, Value>>,
387}
388
389/// Alias for the fetched device identity.
390pub type IdentityReply = DeviceIdentity;
391
392/// Metadata reply payload is an arbitrary map of CBOR values.
393#[derive(Debug, Deserialize, Clone)]
394pub struct MetadataReply {
395    #[serde(default)]
396    pub metadata: HashMap<String, Value>,
397}
398
399impl AlpineClient {
400    /// Opens a session with the provided device identity and capabilities.
401    pub async fn connect(
402        local_addr: SocketAddr,
403        remote_addr: SocketAddr,
404        identity: DeviceIdentity,
405        capabilities: CapabilitySet,
406        credentials: NodeCredentials,
407    ) -> Result<Self, AlpineSdkError> {
408        Self::connect_with_context(
409            local_addr,
410            remote_addr,
411            identity,
412            capabilities,
413            credentials,
414            HandshakeContext::default(),
415        )
416        .await
417    }
418
419    pub async fn connect_checked(
420        outcome: crate::discovery::DiscoveryOutcome,
421        requested: CapabilitySet,
422        credentials: NodeCredentials,
423    ) -> Result<Self, AlpineSdkError> {
424        outcome.require_capabilities(&requested)?;
425        let identity = DeviceIdentity {
426            device_id: outcome.reply.device_id.clone(),
427            manufacturer_id: outcome.reply.manufacturer_id.clone(),
428            model_id: outcome.reply.model_id.clone(),
429            hardware_rev: outcome.reply.hardware_rev.clone(),
430            firmware_rev: outcome.reply.firmware_rev.clone(),
431        };
432        Self::connect(
433            outcome.local_addr,
434            outcome.peer,
435            identity,
436            requested,
437            credentials,
438        )
439        .await
440    }
441
442    pub async fn connect_with_nonce(
443        local_addr: SocketAddr,
444        remote_addr: SocketAddr,
445        identity: DeviceIdentity,
446        capabilities: CapabilitySet,
447        credentials: NodeCredentials,
448        client_nonce: Vec<u8>,
449    ) -> Result<Self, AlpineSdkError> {
450        Self::connect_with_context(
451            local_addr,
452            remote_addr,
453            identity,
454            capabilities,
455            credentials,
456            HandshakeContext::default().with_client_nonce(client_nonce),
457        )
458        .await
459    }
460
461    pub async fn connect_with_context_and_nonce(
462        local_addr: SocketAddr,
463        remote_addr: SocketAddr,
464        identity: DeviceIdentity,
465        capabilities: CapabilitySet,
466        credentials: NodeCredentials,
467        client_nonce: Vec<u8>,
468        context: HandshakeContext,
469    ) -> Result<Self, AlpineSdkError> {
470        Self::connect_with_context(
471            local_addr,
472            remote_addr,
473            identity,
474            capabilities,
475            credentials,
476            context.with_client_nonce(client_nonce),
477        )
478        .await
479    }
480
481    pub async fn connect_with_socket_and_nonce(
482        socket: UdpSocket,
483        remote_addr: SocketAddr,
484        identity: DeviceIdentity,
485        capabilities: CapabilitySet,
486        credentials: NodeCredentials,
487        client_nonce: Vec<u8>,
488        context: HandshakeContext,
489    ) -> Result<Self, AlpineSdkError> {
490        Self::connect_with_socket(
491            socket,
492            remote_addr,
493            identity,
494            capabilities,
495            credentials,
496            context.with_client_nonce(client_nonce),
497        )
498        .await
499    }
500
501    async fn connect_with_context(
502        local_addr: SocketAddr,
503        remote_addr: SocketAddr,
504        identity: DeviceIdentity,
505        capabilities: CapabilitySet,
506        credentials: NodeCredentials,
507        context: HandshakeContext,
508    ) -> Result<Self, AlpineSdkError> {
509        run_sdk_self_check();
510        ensure_supported_environment()?;
511        let _handshake_phase = phase::claim_handshake()?;
512        let base_transport =
513            CborUdpTransport::bind(local_addr, remote_addr, 4096, context.debug_cbor).await?;
514        Self::connect_with_transport(base_transport, identity, capabilities, credentials, context)
515            .await
516    }
517
518    async fn connect_with_socket(
519        socket: UdpSocket,
520        remote_addr: SocketAddr,
521        identity: DeviceIdentity,
522        capabilities: CapabilitySet,
523        credentials: NodeCredentials,
524        context: HandshakeContext,
525    ) -> Result<Self, AlpineSdkError> {
526        let base_transport =
527            CborUdpTransport::from_socket(socket, remote_addr, 4096, context.debug_cbor)
528                .map_err(AlpineSdkError::from)?;
529        Self::connect_with_transport(base_transport, identity, capabilities, credentials, context)
530            .await
531    }
532
533    async fn connect_with_transport(
534        base_transport: CborUdpTransport,
535        identity: DeviceIdentity,
536        capabilities: CapabilitySet,
537        credentials: NodeCredentials,
538        context: HandshakeContext,
539    ) -> Result<Self, AlpineSdkError> {
540        let udp_socket = base_transport.socket();
541        let bound_local_addr = base_transport.local_addr();
542        let peer_addr = base_transport.peer_addr();
543        info!(
544            "[ALPINE][HANDSHAKE] starting handshake for device {} local_addr={} remote_addr={} phase={}",
545            identity.device_id,
546            bound_local_addr,
547            peer_addr,
548            phase::current_phase().label()
549        );
550        if bound_local_addr.is_ipv4() != peer_addr.is_ipv4() {
551            warn!(
552                "[ALPINE][HANDSHAKE][WARN] IPv4/IPv6 mismatch local_addr={} remote_addr={}",
553                bound_local_addr, peer_addr
554            );
555        }
556        let mut transport = InstrumentedTransport::new(
557            TimeoutTransport::new(base_transport, context.recv_timeout),
558            bound_local_addr,
559            peer_addr,
560        );
561        info!(
562            "[ALPINE][HANDSHAKE] transport ready recv_timeout_ms={}",
563            context.recv_timeout.as_millis()
564        );
565        let session = AlnpSession::connect(
566            identity,
567            capabilities.clone(),
568            Ed25519Authenticator::new(credentials.clone()),
569            X25519KeyExchange::new(),
570            context,
571            &mut transport,
572        )
573        .await?;
574
575        let transport = Arc::new(Mutex::new(transport));
576        let established = session
577            .established()
578            .ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?
579            .clone();
580        let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
581            transport.clone(),
582            Duration::from_secs(5),
583            established.session_id.clone(),
584        ));
585        let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
586            .unwrap_or_else(|_| Uuid::new_v4());
587        let control_crypto = ControlCrypto::new(
588            session
589                .keys()
590                .ok_or_else(|| AlpineSdkError::Io("session keys missing".into()))?,
591        );
592        let control =
593            ControlClient::new(device_uuid, established.session_id.clone(), control_crypto);
594        info!(
595            "[ALPINE][HANDSHAKE] session established session_id={} local_addr={} remote_addr={}",
596            established.session_id, bound_local_addr, peer_addr
597        );
598
599        Ok(Self {
600            session,
601            _transport: transport,
602            udp_socket,
603            local_addr: bound_local_addr,
604            remote_addr: peer_addr,
605            stream: None,
606            control,
607            keepalive_handle: Some(keepalive_handle),
608            session_capabilities: None,
609            run_id: None,
610            vendor_registry: None,
611        })
612    }
613
614    /// Starts streaming with the supplied profile and returns the generated config id.
615    pub fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
616        let compiled = profile
617            .compile()
618            .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
619        self.session
620            .set_stream_profile(compiled.clone())
621            .map_err(|err| AlpineSdkError::HandshakeFailed(err.to_string()))?;
622        self.session.mark_streaming();
623
624        // Use an ephemeral port for streaming to avoid rebinding the handshake/control socket.
625        let stream_local = SocketAddr::new(self.local_addr.ip(), 0);
626        let stream_socket =
627            UdpFrameTransport::new(stream_local, self.remote_addr).map_err(AlpineSdkError::from)?;
628        let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
629        self.stream = Some(stream);
630        Ok(compiled.config_id().to_string())
631    }
632
633    /// Sends a streaming frame over the active session.
634    pub fn send_frame(
635        &self,
636        channel_format: ChannelFormat,
637        channels: Vec<u16>,
638        priority: u8,
639        groups: Option<HashMap<String, Vec<u16>>>,
640        metadata: Option<HashMap<String, Value>>,
641    ) -> Result<(), AlpineSdkError> {
642        let stream = self
643            .stream
644            .as_ref()
645            .ok_or_else(|| AlpineSdkError::Io("stream not started".into()))?;
646        stream
647            .send(channel_format, channels, priority, groups, metadata)
648            .map_err(AlpineSdkError::from)
649    }
650
651    /// Stops keep-alive and shuts down the session.
652    pub async fn close(mut self) {
653        self.session.close();
654        if let Some(handle) = self.keepalive_handle.take() {
655            handle.abort();
656        }
657    }
658
659    /// Builds a signed control envelope for the active session.
660    pub fn control_envelope(
661        &self,
662        seq: u64,
663        op: ControlOp,
664        payload: Value,
665    ) -> Result<ControlEnvelope, HandshakeError> {
666        self.control.envelope(seq, op, payload)
667    }
668
669    /// Sends a ping command and returns the parsed reply (CBOR payload optional).
670    pub async fn ping(&self) -> Result<ControlReply<PingReply>, AlpineSdkError> {
671        self.control_command("ping").await
672    }
673
674    /// Returns the status payload the node publishes for callers (vendor command).
675    pub async fn status(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
676        self.control_command("status").await
677    }
678
679    /// Returns the status payload via the standard control op.
680    pub async fn status_standard(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
681        self.control_request(ControlOp::GetStatus, json!({})).await
682    }
683
684    /// Returns the standard status payload or an explicit mismatch error.
685    pub async fn get_status(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
686        let reply = self.status_standard().await?;
687        if !reply.ack.ok {
688            let detail = reply
689                .detail()
690                .unwrap_or("device rejected standard get_status")
691                .to_string();
692            let mismatch = StatusMismatchError {
693                detail: format!(
694                    "standard get_status rejected; vendor status may be required ({})",
695                    detail
696                ),
697            };
698            return Err(AlpineSdkError::StatusMismatch(mismatch.to_string()));
699        }
700        Ok(reply)
701    }
702
703    /// Returns the status payload via the vendor command.
704    pub async fn status_vendor(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
705        self.status().await
706    }
707
708    /// Reads the health payload, including optional metrics (vendor command).
709    pub async fn health(&self) -> Result<ControlReply<HealthReply>, AlpineSdkError> {
710        self.control_command("health").await
711    }
712
713    /// Requests the device identity through the control channel.
714    pub async fn identity(&self) -> Result<ControlReply<IdentityReply>, AlpineSdkError> {
715        self.control_command("identity").await
716    }
717
718    /// Fetches metadata that the device publishes in CBOR.
719    pub async fn metadata(&self) -> Result<ControlReply<MetadataReply>, AlpineSdkError> {
720        self.control_command("metadata").await
721    }
722
723    /// Sends a control command and returns a typed response.
724    pub async fn control(
725        &self,
726        command: ControlCommand,
727    ) -> Result<ControlResponse, AlpineSdkError> {
728        self.control_with_options(command, ControlOptions::default())
729            .await
730    }
731
732    pub fn control_dry_run(&self, command: ControlCommand) -> ControlDryRun {
733        let mut warnings = Vec::new();
734        if command.requires_trust() {
735            warnings.push("requires trusted device identity".to_string());
736        }
737        ControlDryRun {
738            command: format!("{:?}", command),
739            allowed: true,
740            warnings,
741            reason: "no network calls made".to_string(),
742            run_id: self.run_id.clone(),
743            remote_addr: self.remote_addr,
744        }
745    }
746
747    /// Sends a control command with timeout and retry options.
748    pub async fn control_with_options(
749        &self,
750        command: ControlCommand,
751        options: ControlOptions,
752    ) -> Result<ControlResponse, AlpineSdkError> {
753        let attempts = options
754            .retry
755            .as_ref()
756            .map(|policy| policy.max_attempts.max(1))
757            .unwrap_or(1);
758        let mut attempt = 0usize;
759        loop {
760            attempt = attempt.saturating_add(1);
761            let run = async { self.control_once(command).await };
762            let result = if let Some(timeout_duration) = options.timeout {
763                match timeout(timeout_duration, run).await {
764                    Ok(result) => result,
765                    Err(_) => Err(AlpineSdkError::Timeout),
766                }
767            } else {
768                run.await
769            };
770
771            match result {
772                Ok(reply) => {
773                    if attempt > 1 {
774                        warn!(
775                            "[ALPINE][CONTROL][WARN] command={:?} succeeded after {} retries",
776                            command,
777                            attempt - 1
778                        );
779                    }
780                    return Ok(reply);
781                }
782                Err(err) => {
783                    if attempt >= attempts {
784                        return Err(err);
785                    }
786                    if let Some(policy) = options.retry.as_ref() {
787                        sleep(control_backoff(policy, attempt)).await;
788                    }
789                }
790            }
791        }
792    }
793
794    /// Sends a dangerous control command after explicit opt-in.
795    pub async fn control_dangerous(
796        &self,
797        command: DangerousControlCommand,
798    ) -> Result<ControlReply<Value>, AlpineSdkError> {
799        self.control_dangerous_with_options(command, ControlOptions::default())
800            .await
801    }
802
803    pub fn control_dangerous_dry_run(
804        &self,
805        command: DangerousControlCommand,
806        options: ControlOptions,
807    ) -> ControlDryRun {
808        let mut warnings = Vec::new();
809        let mut allowed = true;
810        let mut reason = "no network calls made".to_string();
811        if !options.allow_dangerous {
812            allowed = false;
813            reason = "dangerous control blocked; allow_dangerous not set".to_string();
814        }
815        let command_label = format!("{:?}", command);
816        if let DangerousControlCommand::Vendor(name) = &command {
817            if !is_builtin_vendor_command(name) {
818                match &self.vendor_registry {
819                    Some(registry) => {
820                        if !registry.is_allowed(name) {
821                            allowed = false;
822                            reason = "vendor extension not registered".to_string();
823                        }
824                    }
825                    None => {
826                        allowed = false;
827                        reason = "vendor extension registry not configured".to_string();
828                    }
829                }
830            }
831        } else {
832            warnings.push("dangerous command requires explicit allow_dangerous".to_string());
833        }
834        ControlDryRun {
835            command: command_label,
836            allowed,
837            warnings,
838            reason,
839            run_id: self.run_id.clone(),
840            remote_addr: self.remote_addr,
841        }
842    }
843
844    /// Sends a dangerous control command with timeout and retry options.
845    pub async fn control_dangerous_with_options(
846        &self,
847        command: DangerousControlCommand,
848        options: ControlOptions,
849    ) -> Result<ControlReply<Value>, AlpineSdkError> {
850        if !options.allow_dangerous {
851            return Err(AlpineSdkError::DangerousControlDisallowed);
852        }
853        let attempts = options
854            .retry
855            .as_ref()
856            .map(|policy| policy.max_attempts.max(1))
857            .unwrap_or(1);
858        let mut attempt = 0usize;
859        loop {
860            attempt = attempt.saturating_add(1);
861            let run = async { self.control_vendor_once(command.as_command()).await };
862            let result = if let Some(timeout_duration) = options.timeout {
863                match timeout(timeout_duration, run).await {
864                    Ok(result) => result,
865                    Err(_) => Err(AlpineSdkError::Timeout),
866                }
867            } else {
868                run.await
869            };
870
871            match result {
872                Ok(reply) => {
873                    if attempt > 1 {
874                        warn!(
875                            "[ALPINE][CONTROL][WARN] dangerous_command={:?} succeeded after {} retries",
876                            command,
877                            attempt - 1
878                        );
879                    }
880                    return Ok(reply);
881                }
882                Err(err) => {
883                    if attempt >= attempts {
884                        return Err(err);
885                    }
886                    if let Some(policy) = options.retry.as_ref() {
887                        sleep(control_backoff(policy, attempt)).await;
888                    }
889                }
890            }
891        }
892    }
893
894    /// Runs a single liveness + health probe using ping and status (health optional).
895    pub async fn probe_status(&self) -> ProbeResult {
896        self.probe_status_with_options(ProbeOptions::default())
897            .await
898    }
899
900    /// Runs a configurable liveness probe and returns a normalized status result.
901    pub async fn probe_status_with_options(&self, options: ProbeOptions) -> ProbeResult {
902        let mut result = ProbeResult {
903            state: ProbeState::Offline,
904            ping: None,
905            status: None,
906            health: None,
907            ping_rtt_ms: None,
908            status_rtt_ms: None,
909            health_rtt_ms: None,
910            detail: None,
911            errors: Vec::new(),
912        };
913
914        let ping_start = std::time::Instant::now();
915        let ping_reply = self
916            .control_with_options(ControlCommand::Ping, options.control.clone())
917            .await;
918        match ping_reply {
919            Ok(ControlResponse::Ping(reply)) => {
920                result.ping_rtt_ms = Some(ping_start.elapsed().as_millis());
921                result.ping = reply.payload.clone();
922                if !reply.ok() {
923                    if let Some(detail) = reply.detail() {
924                        result.detail = Some(detail.to_string());
925                    }
926                }
927            }
928            Ok(other) => {
929                result.record_error(
930                    ProbeStep::Ping,
931                    format!("unexpected ping response: {:?}", other),
932                );
933                return result;
934            }
935            Err(err) => {
936                result.record_error(ProbeStep::Ping, err.to_string());
937                return result;
938            }
939        }
940
941        let status_start = std::time::Instant::now();
942        let status_reply = self
943            .control_with_options(ControlCommand::Status, options.control.clone())
944            .await;
945        match status_reply {
946            Ok(ControlResponse::Status(reply)) => {
947                result.status_rtt_ms = Some(status_start.elapsed().as_millis());
948                result.status = reply.payload.clone();
949                if result.detail.is_none() {
950                    if let Some(detail) = reply.detail() {
951                        result.detail = Some(detail.to_string());
952                    } else if let Some(payload) = reply.payload.as_ref() {
953                        result.detail = payload.detail.clone();
954                    }
955                }
956            }
957            Ok(other) => {
958                result.record_error(
959                    ProbeStep::Status,
960                    format!("unexpected status response: {:?}", other),
961                );
962            }
963            Err(err) => {
964                result.record_error(ProbeStep::Status, err.to_string());
965            }
966        }
967
968        if result.status.is_none() && options.include_health {
969            let health_start = std::time::Instant::now();
970            let health_reply = self
971                .control_with_options(ControlCommand::Health, options.control.clone())
972                .await;
973            match health_reply {
974                Ok(ControlResponse::Health(reply)) => {
975                    result.health_rtt_ms = Some(health_start.elapsed().as_millis());
976                    result.health = reply.payload.clone();
977                    if result.detail.is_none() {
978                        if let Some(detail) = reply.detail() {
979                            result.detail = Some(detail.to_string());
980                        } else if let Some(payload) = reply.payload.as_ref() {
981                            result.detail = payload.detail.clone();
982                        }
983                    }
984                }
985                Ok(other) => {
986                    result.record_error(
987                        ProbeStep::Health,
988                        format!("unexpected health response: {:?}", other),
989                    );
990                }
991                Err(err) => {
992                    result.record_error(ProbeStep::Health, err.to_string());
993                }
994            }
995        }
996
997        let healthy = result
998            .status
999            .as_ref()
1000            .and_then(|payload| payload.healthy)
1001            .or_else(|| result.health.as_ref().and_then(|payload| payload.healthy));
1002
1003        result.state = if healthy == Some(false) {
1004            ProbeState::Degraded
1005        } else if result.status.is_some() || result.health.is_some() {
1006            ProbeState::Online
1007        } else if result.errors.is_empty() {
1008            ProbeState::Online
1009        } else {
1010            ProbeState::Degraded
1011        };
1012
1013        result
1014    }
1015
1016    async fn control_once(
1017        &self,
1018        command: ControlCommand,
1019    ) -> Result<ControlResponse, AlpineSdkError> {
1020        match command {
1021            ControlCommand::Ping => Ok(ControlResponse::Ping(self.ping().await?)),
1022            ControlCommand::Status => Ok(ControlResponse::Status(self.status().await?)),
1023            ControlCommand::GetStatus => Ok(ControlResponse::Status(self.get_status().await?)),
1024            ControlCommand::Health => Ok(ControlResponse::Health(self.health().await?)),
1025            ControlCommand::Identity => Ok(ControlResponse::Identity(self.identity().await?)),
1026            ControlCommand::Metadata => Ok(ControlResponse::Metadata(self.metadata().await?)),
1027        }
1028    }
1029
1030    async fn control_command<T>(&self, command: &str) -> Result<ControlReply<T>, AlpineSdkError>
1031    where
1032        T: DeserializeOwned,
1033    {
1034        // These helpers use the vendor control op with a string command payload.
1035        let payload = json!({ "command": command });
1036        self.control_request(ControlOp::Vendor, payload).await
1037    }
1038
1039    async fn control_vendor_once(
1040        &self,
1041        command: &str,
1042    ) -> Result<ControlReply<Value>, AlpineSdkError> {
1043        if !is_builtin_vendor_command(command) {
1044            match &self.vendor_registry {
1045                Some(registry) => {
1046                    if !registry.is_allowed(command) {
1047                        return Err(AlpineSdkError::VendorExtensionNotRegistered(
1048                            command.to_string(),
1049                        ));
1050                    }
1051                }
1052                None => {
1053                    return Err(AlpineSdkError::VendorExtensionNotRegistered(
1054                        command.to_string(),
1055                    ));
1056                }
1057            }
1058        }
1059        let payload = json!({ "command": command });
1060        self.control_request(ControlOp::Vendor, payload).await
1061    }
1062
1063    async fn control_request<T>(
1064        &self,
1065        op: ControlOp,
1066        payload: Value,
1067    ) -> Result<ControlReply<T>, AlpineSdkError>
1068    where
1069        T: DeserializeOwned,
1070    {
1071        let transport = SharedTransport::new(self._transport.clone());
1072        let mut channel = ReliableControlChannel::new(transport);
1073        let ack = self.control.send(&mut channel, op, payload).await?;
1074        let parsed = ControlCrypto::decode_ack_payload::<T>(ack.payload.as_deref())
1075            .map_err(AlpineSdkError::from)?;
1076        Ok(ControlReply {
1077            ack,
1078            payload: parsed,
1079        })
1080    }
1081
    /// Returns the local socket address stored on this client.
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }
1086
    /// Returns the remote device's socket address stored on this client.
    pub fn remote_addr(&self) -> SocketAddr {
        self.remote_addr
    }
1091
1092    /// Returns a std UDP socket cloned from the handshake transport (same local port).
1093    pub fn udp_socket(&self) -> Arc<UdpSocket> {
1094        self.udp_socket.clone()
1095    }
1096
1097    /// Returns the active session ID if available.
1098    pub fn session_id(&self) -> Option<String> {
1099        self.session
1100            .established()
1101            .map(|established| established.session_id.clone())
1102    }
1103
1104    /// Returns the device capabilities captured at handshake time.
1105    pub fn device_capabilities(&self) -> Option<CapabilitySet> {
1106        self.session
1107            .established()
1108            .map(|established| established.capabilities.clone())
1109    }
1110
    /// Returns the narrowed session capabilities, or `None` when no narrowing
    /// has been stored (see `narrow_capabilities`).
    pub fn session_capabilities(&self) -> Option<&CapabilitySet> {
        self.session_capabilities.as_ref()
    }
1115
1116    /// Returns the effective capabilities (session override if present).
1117    pub fn effective_capabilities(&self) -> Option<CapabilitySet> {
1118        self.session_capabilities
1119            .clone()
1120            .or_else(|| self.device_capabilities())
1121    }
1122
1123    /// Narrows the session capabilities to a subset of the device capabilities.
1124    pub fn narrow_capabilities(&mut self, caps: CapabilitySet) -> Result<(), AlpineSdkError> {
1125        let device_caps = self.device_capabilities().ok_or_else(|| {
1126            AlpineSdkError::InvalidCapabilities("device capabilities missing".into())
1127        })?;
1128        validate_capability_subset(&caps, &device_caps)?;
1129        self.session_capabilities = Some(caps);
1130        Ok(())
1131    }
1132
    /// Returns the correlation ID tied to the discovery run, or `None` when no
    /// run ID has been stored on this client.
    pub fn run_id(&self) -> Option<&str> {
        self.run_id.as_deref()
    }
1137
    /// Stores `run_id` as this client's correlation ID, replacing any
    /// previously stored value.
    pub fn set_run_id(&mut self, run_id: String) {
        self.run_id = Some(run_id);
    }
1141
    /// Returns the installed vendor extension registry, or `None` when none has
    /// been set.
    pub fn vendor_extension_registry(&self) -> Option<&VendorExtensionRegistry> {
        self.vendor_registry.as_ref()
    }
1145
    /// Installs `registry`, replacing any previously installed one.
    ///
    /// `control_vendor_once` consults the registry to decide whether a vendor
    /// command that is not built in may be sent.
    pub fn set_vendor_extension_registry(&mut self, registry: VendorExtensionRegistry) {
        self.vendor_registry = Some(registry);
    }
1149}
1150
/// Owns an [`AlpineClient`] together with idle-tracking state.
#[derive(Debug)]
pub struct SessionGuard {
    /// The wrapped client.
    client: AlpineClient,
    /// Idle duration at or beyond which `is_expired` reports `true`.
    idle_timeout: Duration,
    /// Instant of construction or of the most recent `touch`.
    last_used: Instant,
    /// Optional caller-supplied reason text set through `with_idle_reason`.
    idle_reason: Option<String>,
}
1158
1159impl SessionGuard {
1160    pub fn new(client: AlpineClient, idle_timeout: Duration) -> Self {
1161        Self {
1162            client,
1163            idle_timeout,
1164            last_used: Instant::now(),
1165            idle_reason: None,
1166        }
1167    }
1168
1169    pub fn with_idle_reason(mut self, reason: impl Into<String>) -> Self {
1170        self.idle_reason = Some(reason.into());
1171        self
1172    }
1173
1174    pub fn client(&self) -> &AlpineClient {
1175        &self.client
1176    }
1177
1178    pub fn client_mut(&mut self) -> &mut AlpineClient {
1179        self.touch();
1180        &mut self.client
1181    }
1182
1183    pub fn touch(&mut self) {
1184        self.last_used = Instant::now();
1185    }
1186
1187    pub fn idle_for(&self) -> Duration {
1188        self.last_used.elapsed()
1189    }
1190
1191    pub fn is_expired(&self) -> bool {
1192        self.last_used.elapsed() >= self.idle_timeout
1193    }
1194
1195    pub fn idle_reason(&self) -> Option<&str> {
1196        self.idle_reason.as_deref()
1197    }
1198
1199    pub async fn close(self) {
1200        self.client.close().await;
1201    }
1202
1203    pub fn into_inner(self) -> AlpineClient {
1204        self.client
1205    }
1206}
1207
/// Policy knobs for [`SafeClient`].
#[derive(Debug, Clone)]
pub struct SafeClientOptions {
    /// When `true`, `SafeClient` re-probes before control, stream-start and
    /// frame-send operations, and `refresh_probe` fails on an unacceptable
    /// probe state.
    pub require_online: bool,
    /// When `true`, a `ProbeState::Degraded` result counts as acceptable.
    pub allow_degraded: bool,
    /// When `true`, `SafeClient::from_client` rejects any trust state other
    /// than `DeviceTrustState::Trusted`.
    pub require_trusted: bool,
    /// Options passed to each probe; its `control` member is also used for
    /// `SafeClient::control` requests.
    pub probe_options: ProbeOptions,
}
1215
/// Defaults: the device must be online, degraded devices are not accepted,
/// trust is not required, and probes use `ProbeOptions::default()`.
impl Default for SafeClientOptions {
    fn default() -> Self {
        Self {
            require_online: true,
            allow_degraded: false,
            require_trusted: false,
            probe_options: ProbeOptions::default(),
        }
    }
}
1226
1227impl SafeClientOptions {
1228    pub fn require_trusted(mut self, require: bool) -> Self {
1229        self.require_trusted = require;
1230        self
1231    }
1232}
1233
/// An [`AlpineClient`] wrapper that gates operations on probe results and on
/// the device's trust state, according to [`SafeClientOptions`].
#[derive(Debug)]
pub struct SafeClient {
    /// The wrapped client.
    client: AlpineClient,
    /// Result of the most recent probe, acceptable or not.
    last_probe: ProbeResult,
    /// Most recent probe result that was judged acceptable, if any.
    last_known_good: Option<ProbeResult>,
    /// Trust state supplied when the wrapper was created.
    trust: DeviceTrustState,
    /// Policy controlling probing and acceptance.
    options: SafeClientOptions,
}
1242
1243impl SafeClient {
1244    pub async fn from_client(
1245        client: AlpineClient,
1246        trust: DeviceTrustState,
1247        options: SafeClientOptions,
1248    ) -> Result<Self, AlpineSdkError> {
1249        if options.require_trusted && trust != DeviceTrustState::Trusted {
1250            return Err(AlpineSdkError::UntrustedDevice(
1251                "trusted identity required".into(),
1252            ));
1253        }
1254        let mut wrapper = Self {
1255            client,
1256            last_probe: ProbeResult {
1257                state: ProbeState::Offline,
1258                ping: None,
1259                status: None,
1260                health: None,
1261                ping_rtt_ms: None,
1262                status_rtt_ms: None,
1263                health_rtt_ms: None,
1264                detail: None,
1265                errors: Vec::new(),
1266            },
1267            last_known_good: None,
1268            trust,
1269            options,
1270        };
1271        wrapper.refresh_probe().await?;
1272        Ok(wrapper)
1273    }
1274
1275    pub async fn from_discovery(
1276        client: AlpineClient,
1277        outcome: &crate::discovery::DiscoveryOutcome,
1278        options: SafeClientOptions,
1279    ) -> Result<Self, AlpineSdkError> {
1280        let trust = outcome.trust_state();
1281        let mut wrapped = Self::from_client(client, trust, options).await?;
1282        wrapped.client.set_run_id(outcome.run_id.clone());
1283        Ok(wrapped)
1284    }
1285
1286    pub fn client(&self) -> &AlpineClient {
1287        &self.client
1288    }
1289
1290    pub fn client_mut(&mut self) -> &mut AlpineClient {
1291        &mut self.client
1292    }
1293
1294    pub fn last_probe(&self) -> &ProbeResult {
1295        &self.last_probe
1296    }
1297
1298    pub fn last_known_good(&self) -> Option<&ProbeResult> {
1299        self.last_known_good.as_ref()
1300    }
1301
1302    pub fn last_known_good_delta(&self) -> Option<String> {
1303        self.last_known_good
1304            .as_ref()
1305            .and_then(|previous| self.last_probe.delta_summary(previous))
1306    }
1307
1308    pub fn trust_state(&self) -> DeviceTrustState {
1309        self.trust.clone()
1310    }
1311
1312    pub async fn refresh_probe(&mut self) -> Result<&ProbeResult, AlpineSdkError> {
1313        let probe = self
1314            .client
1315            .probe_status_with_options(self.options.probe_options.clone())
1316            .await;
1317        let acceptable = self.acceptable_probe_state(&probe);
1318        let failure_detail = if self.options.require_online && !acceptable {
1319            Some(
1320                probe
1321                    .detail
1322                    .clone()
1323                    .or_else(|| probe.errors.first().map(|err| err.message.clone()))
1324                    .unwrap_or_else(|| format!("probe state {:?}", probe.state)),
1325            )
1326        } else {
1327            None
1328        };
1329        self.last_probe = probe;
1330        if acceptable {
1331            self.last_known_good = Some(self.last_probe.clone());
1332        }
1333        if let Some(detail) = failure_detail {
1334            return Err(AlpineSdkError::ProbeFailed(detail));
1335        }
1336        Ok(&self.last_probe)
1337    }
1338
1339    pub async fn control(
1340        &mut self,
1341        command: ControlCommand,
1342    ) -> Result<ControlResponse, AlpineSdkError> {
1343        if self.options.require_online {
1344            self.refresh_probe().await?;
1345        }
1346        self.client
1347            .control_with_options(command, self.options.probe_options.control.clone())
1348            .await
1349    }
1350
1351    pub async fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
1352        if self.options.require_online {
1353            self.refresh_probe().await?;
1354        }
1355        self.client.start_stream(profile)
1356    }
1357
1358    pub async fn send_frame(
1359        &mut self,
1360        channel_format: ChannelFormat,
1361        channels: Vec<u16>,
1362        priority: u8,
1363        groups: Option<HashMap<String, Vec<u16>>>,
1364        metadata: Option<HashMap<String, Value>>,
1365    ) -> Result<(), AlpineSdkError> {
1366        if self.options.require_online {
1367            self.refresh_probe().await?;
1368        }
1369        self.client
1370            .send_frame(channel_format, channels, priority, groups, metadata)
1371    }
1372
1373    pub async fn close(self) {
1374        self.client.close().await;
1375    }
1376
1377    pub fn into_inner(self) -> AlpineClient {
1378        self.client
1379    }
1380
1381    fn acceptable_probe_state(&self, probe: &ProbeResult) -> bool {
1382        match probe.state {
1383            ProbeState::Online => true,
1384            ProbeState::Degraded => self.options.allow_degraded,
1385            ProbeState::Offline => false,
1386        }
1387    }
1388}
1389
/// Handle to a transport shared behind `Arc<Mutex<_>>`.
///
/// NOTE(review): the derived `Clone` carries an implicit `T: Clone` bound even
/// though only the `Arc` is cloned — confirm whether a manual impl is wanted.
#[derive(Clone)]
struct SharedTransport<T> {
    /// The shared, mutex-protected transport.
    inner: Arc<Mutex<T>>,
}
1394
impl<T> SharedTransport<T> {
    /// Wraps an existing shared transport handle.
    fn new(inner: Arc<Mutex<T>>) -> Self {
        Self { inner }
    }
}
1400
1401#[async_trait]
1402impl<T> HandshakeTransport for SharedTransport<T>
1403where
1404    T: HandshakeTransport + Send,
1405{
1406    async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
1407        let mut guard = self.inner.lock().await;
1408        guard.send(msg).await
1409    }
1410
1411    async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
1412        let mut guard = self.inner.lock().await;
1413        guard.recv().await
1414    }
1415}
1416
/// Transport decorator that logs every message sent or received and rejects a
/// second `SessionInit` send.
#[derive(Debug)]
struct InstrumentedTransport<T> {
    /// The wrapped transport that performs the actual I/O.
    inner: T,
    /// Local address; its port is included in log lines.
    local_addr: SocketAddr,
    /// Remote address; included in log lines.
    remote_addr: SocketAddr,
    /// Set once a `SessionInit` has been passed to `send`.
    session_init_sent: bool,
}
1424
impl<T> InstrumentedTransport<T> {
    /// Wraps `inner`, recording the addresses used in log output. The
    /// `SessionInit` guard starts out unset.
    fn new(inner: T, local_addr: SocketAddr, remote_addr: SocketAddr) -> Self {
        Self {
            inner,
            local_addr,
            remote_addr,
            session_init_sent: false,
        }
    }
}
1435
1436#[async_trait]
1437impl<T> HandshakeTransport for InstrumentedTransport<T>
1438where
1439    T: HandshakeTransport + Send,
1440{
1441    async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
1442        let category = message_category(&msg);
1443        let phase_state = phase::current_phase();
1444        if matches!(msg, HandshakeMessage::SessionInit(_)) {
1445            if self.session_init_sent {
1446                warn!(
1447                    "[ALPINE][HANDSHAKE][WARN] duplicate SessionInit send attempt detected local_port={} remote_addr={} phase={}",
1448                    self.local_addr.port(),
1449                    self.remote_addr,
1450                    phase_state.label()
1451                );
1452                return Err(HandshakeError::Protocol(
1453                    "duplicate SessionInit blocked; reset handshake first".into(),
1454                ));
1455            }
1456            self.session_init_sent = true;
1457        }
1458        if phase_state == Phase::Handshake && category != MessageCategory::Handshake {
1459            warn!(
1460                "[ALPINE][BUG] non-handshake packet emitted during handshake msg_type={} local_port={} remote_addr={}",
1461                category.as_str(),
1462                self.local_addr.port(),
1463                self.remote_addr
1464            );
1465        }
1466        let encoded_len = serde_cbor::to_vec(&msg).map(|buf| buf.len()).unwrap_or(0);
1467        info!(
1468            "[ALPINE][TX] msg_type={} variant={} local_port={} remote_addr={} phase={} len={}",
1469            category.as_str(),
1470            message_label(&msg),
1471            self.local_addr.port(),
1472            self.remote_addr,
1473            phase_state.label(),
1474            encoded_len
1475        );
1476        self.inner.send(msg).await
1477    }
1478
1479    async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
1480        let phase_state = phase::current_phase();
1481        let result = self.inner.recv().await;
1482        match &result {
1483            Ok(msg) => {
1484                let category = message_category(msg);
1485                info!(
1486                    "[ALPINE][RX] msg_type={} variant={} local_port={} remote_addr={} phase={}",
1487                    category.as_str(),
1488                    message_label(msg),
1489                    self.local_addr.port(),
1490                    self.remote_addr,
1491                    phase_state.label()
1492                );
1493            }
1494            Err(err) => {
1495                warn!(
1496                    "[ALPINE][RX] recv error local_port={} remote_addr={} phase={} error={}",
1497                    self.local_addr.port(),
1498                    self.remote_addr,
1499                    phase_state.label(),
1500                    err
1501                );
1502            }
1503        }
1504        result
1505    }
1506}
1507
/// Coarse classification of a `HandshakeMessage`, used in log output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// `Unknown` is not constructed by the code visible alongside this enum, hence
// the allowance.
#[allow(dead_code)]
enum MessageCategory {
    Handshake,
    Control,
    Keepalive,
    Ack,
    Unknown,
}

impl MessageCategory {
    /// Returns the category name as it appears in log lines.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Handshake => "Handshake",
            Self::Control => "Control",
            Self::Keepalive => "Keepalive",
            Self::Ack => "Ack",
            Self::Unknown => "Unknown",
        }
    }
}
1529
1530fn message_category(msg: &HandshakeMessage) -> MessageCategory {
1531    match msg {
1532        HandshakeMessage::SessionInit(_)
1533        | HandshakeMessage::SessionAck(_)
1534        | HandshakeMessage::SessionReady(_)
1535        | HandshakeMessage::SessionComplete(_)
1536        | HandshakeMessage::SessionEstablished(_) => MessageCategory::Handshake,
1537        HandshakeMessage::Control(_) => MessageCategory::Control,
1538        HandshakeMessage::Keepalive(_) => MessageCategory::Keepalive,
1539        HandshakeMessage::Ack(_) => MessageCategory::Ack,
1540    }
1541}
1542
1543fn message_label(msg: &HandshakeMessage) -> &'static str {
1544    match msg {
1545        HandshakeMessage::SessionInit(_) => "SessionInit",
1546        HandshakeMessage::SessionAck(_) => "SessionAck",
1547        HandshakeMessage::SessionReady(_) => "SessionReady",
1548        HandshakeMessage::SessionComplete(_) => "SessionComplete",
1549        HandshakeMessage::SessionEstablished(_) => "SessionEstablished",
1550        HandshakeMessage::Control(_) => "Control",
1551        HandshakeMessage::Keepalive(_) => "Keepalive",
1552        HandshakeMessage::Ack(_) => "Ack",
1553    }
1554}
1555
1556fn control_backoff(policy: &ControlRetryPolicy, attempt: usize) -> Duration {
1557    let exponent = attempt.saturating_sub(1) as u32;
1558    let factor = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
1559    let delay = policy.backoff_base_ms.saturating_mul(factor);
1560    Duration::from_millis(delay.min(policy.backoff_max_ms))
1561}
1562
/// Returns `true` when `command` is one of the built-in vendor commands
/// (`ping`, `status`, `health`, `identity`, `metadata`). The comparison is
/// exact and case-sensitive.
fn is_builtin_vendor_command(command: &str) -> bool {
    const BUILTIN_COMMANDS: [&str; 5] = ["ping", "status", "health", "identity", "metadata"];
    BUILTIN_COMMANDS.contains(&command)
}
1569
1570fn validate_capability_subset(
1571    requested: &CapabilitySet,
1572    device: &CapabilitySet,
1573) -> Result<(), AlpineSdkError> {
1574    for format in requested.channel_formats.iter() {
1575        if !device.channel_formats.contains(format) {
1576            return Err(AlpineSdkError::InvalidCapabilities(format!(
1577                "channel format {:?} not supported by device",
1578                format
1579            )));
1580        }
1581    }
1582    if requested.max_channels > device.max_channels {
1583        return Err(AlpineSdkError::InvalidCapabilities(format!(
1584            "max channels {} exceeds device max {}",
1585            requested.max_channels, device.max_channels
1586        )));
1587    }
1588    if requested.grouping_supported && !device.grouping_supported {
1589        return Err(AlpineSdkError::InvalidCapabilities(
1590            "grouping not supported by device".into(),
1591        ));
1592    }
1593    if requested.streaming_supported && !device.streaming_supported {
1594        return Err(AlpineSdkError::InvalidCapabilities(
1595            "streaming not supported by device".into(),
1596        ));
1597    }
1598    if requested.encryption_supported && !device.encryption_supported {
1599        return Err(AlpineSdkError::InvalidCapabilities(
1600            "encryption not supported by device".into(),
1601        ));
1602    }
1603    if let Some(extensions) = requested.vendor_extensions.as_ref() {
1604        if let Some(device_extensions) = device.vendor_extensions.as_ref() {
1605            for key in extensions.keys() {
1606                if !device_extensions.contains_key(key) {
1607                    return Err(AlpineSdkError::InvalidCapabilities(format!(
1608                        "vendor extension {} not supported by device",
1609                        key
1610                    )));
1611                }
1612            }
1613        } else {
1614            return Err(AlpineSdkError::InvalidCapabilities(
1615                "vendor extensions not supported by device".into(),
1616            ));
1617        }
1618    }
1619    Ok(())
1620}