1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use alpine::control::{ControlClient, ControlCrypto};
7use alpine::crypto::identity::NodeCredentials;
8use alpine::crypto::X25519KeyExchange;
9use alpine::handshake::keepalive;
10use alpine::handshake::transport::{CborUdpTransport, ReliableControlChannel, TimeoutTransport};
11use alpine::handshake::{HandshakeContext, HandshakeError, HandshakeMessage, HandshakeTransport};
12use alpine::messages::{
13 Acknowledge, CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity,
14};
15use alpine::profile::StreamProfile;
16use alpine::session::{AlnpSession, Ed25519Authenticator};
17use alpine::stream::AlnpStream;
18use async_trait::async_trait;
19use serde::de::DeserializeOwned;
20use serde::Deserialize;
21use serde_cbor;
22use serde_json::{json, Value};
23use tokio::sync::Mutex;
24use tokio::task::JoinHandle;
25use tokio::time::{sleep, timeout};
26use tracing::{info, warn};
27use uuid::Uuid;
28
29use crate::transport::UdpFrameTransport;
30use crate::{
31 discovery::DeviceTrustState,
32 environment::ensure_supported_environment,
33 error::AlpineSdkError,
34 phase::{self, Phase},
35 self_check::run_sdk_self_check,
36 vendor::VendorExtensionRegistry,
37};
38use tokio::net::UdpSocket;
39
40type WireTransport = InstrumentedTransport<TimeoutTransport<CborUdpTransport>>;
41
42#[derive(Debug)]
44pub struct AlpineClient {
45 session: AlnpSession,
46 _transport: Arc<Mutex<WireTransport>>,
47 udp_socket: Arc<UdpSocket>,
48 local_addr: SocketAddr,
49 remote_addr: SocketAddr,
50 stream: Option<AlnpStream<UdpFrameTransport>>,
51 control: ControlClient,
52 keepalive_handle: Option<JoinHandle<()>>,
53 session_capabilities: Option<CapabilitySet>,
54 run_id: Option<String>,
55 vendor_registry: Option<VendorExtensionRegistry>,
56}
57
58#[derive(Debug, Clone)]
60pub struct ControlReply<T> {
61 pub ack: Acknowledge,
62 pub payload: Option<T>,
63}
64
65impl<T> ControlReply<T> {
66 pub fn ok(&self) -> bool {
67 self.ack.ok
68 }
69
70 pub fn detail(&self) -> Option<&str> {
71 self.ack.detail.as_deref()
72 }
73}
74
75#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76pub enum ProbeStep {
77 Ping,
78 Status,
79 Health,
80}
81
82#[derive(Debug, Clone, Copy, PartialEq, Eq)]
83pub enum ProbeState {
84 Online,
85 Degraded,
86 Offline,
87}
88
89#[derive(Debug, Clone)]
90pub struct ProbeError {
91 pub step: ProbeStep,
92 pub message: String,
93}
94
95#[derive(Debug, Clone)]
96pub struct ProbeResult {
97 pub state: ProbeState,
98 pub ping: Option<PingReply>,
99 pub status: Option<StatusReply>,
100 pub health: Option<HealthReply>,
101 pub ping_rtt_ms: Option<u128>,
102 pub status_rtt_ms: Option<u128>,
103 pub health_rtt_ms: Option<u128>,
104 pub detail: Option<String>,
105 pub errors: Vec<ProbeError>,
106}
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub enum UiSeverity {
110 Ok,
111 Warn,
112 Critical,
113}
114
115#[derive(Debug, Clone)]
116pub struct UiBadge {
117 pub label: String,
118 pub severity: UiSeverity,
119 pub color: &'static str,
120}
121
122impl ProbeResult {
123 fn record_error(&mut self, step: ProbeStep, message: String) {
124 self.errors.push(ProbeError { step, message });
125 }
126
127 pub fn to_device_state(&self, trust: DeviceTrustState) -> DeviceState {
128 let healthy = self
129 .status
130 .as_ref()
131 .and_then(|payload| payload.healthy)
132 .or_else(|| self.health.as_ref().and_then(|payload| payload.healthy));
133 let online = matches!(self.state, ProbeState::Online | ProbeState::Degraded);
134 let label = if trust != DeviceTrustState::Trusted {
135 "untrusted".to_string()
136 } else {
137 match self.state {
138 ProbeState::Online => "online".to_string(),
139 ProbeState::Degraded => "degraded".to_string(),
140 ProbeState::Offline => "offline".to_string(),
141 }
142 };
143 let last_error = self.errors.first().map(|err| err.message.clone());
144 DeviceState {
145 state: self.state,
146 online,
147 healthy,
148 trust,
149 last_rtt_ms: self.ping_rtt_ms,
150 detail: self.detail.clone(),
151 last_error,
152 label,
153 }
154 }
155
156 pub fn delta_summary(&self, previous: &ProbeResult) -> Option<String> {
157 if self.state != previous.state {
158 return Some(format!(
159 "state changed from {:?} to {:?}",
160 previous.state, self.state
161 ));
162 }
163 if self.detail != previous.detail {
164 if let Some(detail) = &self.detail {
165 return Some(format!("detail changed to '{}'", detail));
166 }
167 return Some("detail cleared".to_string());
168 }
169 None
170 }
171
172 pub fn to_ui_badge(&self, trust: DeviceTrustState) -> UiBadge {
173 if trust != DeviceTrustState::Trusted {
174 return UiBadge {
175 label: "untrusted".to_string(),
176 severity: UiSeverity::Warn,
177 color: "yellow",
178 };
179 }
180 match self.state {
181 ProbeState::Online => UiBadge {
182 label: "online".to_string(),
183 severity: UiSeverity::Ok,
184 color: "green",
185 },
186 ProbeState::Degraded => UiBadge {
187 label: "degraded".to_string(),
188 severity: UiSeverity::Warn,
189 color: "orange",
190 },
191 ProbeState::Offline => UiBadge {
192 label: "offline".to_string(),
193 severity: UiSeverity::Critical,
194 color: "red",
195 },
196 }
197 }
198}
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq)]
201pub enum ControlCommand {
202 Ping,
203 Status,
204 GetStatus,
205 Health,
206 Identity,
207 Metadata,
208}
209
210impl ControlCommand {
211 pub fn requires_trust(&self) -> bool {
213 matches!(self, ControlCommand::Identity | ControlCommand::Metadata)
214 }
215
216 pub fn is_observe_safe(&self) -> bool {
217 matches!(
218 self,
219 ControlCommand::Ping
220 | ControlCommand::Status
221 | ControlCommand::GetStatus
222 | ControlCommand::Health
223 )
224 }
225}
226
227#[derive(Debug)]
228pub struct StatusMismatchError {
229 pub detail: String,
230}
231
232impl std::fmt::Display for StatusMismatchError {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 write!(f, "{}", self.detail)
235 }
236}
237
238impl std::error::Error for StatusMismatchError {}
239
240#[derive(Debug, Clone)]
241pub enum DangerousControlCommand {
242 FirmwareUpdate,
243 Reboot,
244 IdentityReset,
245 Vendor(String),
246}
247
248impl DangerousControlCommand {
249 fn as_command(&self) -> &str {
250 match self {
251 DangerousControlCommand::FirmwareUpdate => "firmware_update",
252 DangerousControlCommand::Reboot => "reboot",
253 DangerousControlCommand::IdentityReset => "identity_reset",
254 DangerousControlCommand::Vendor(name) => name.as_str(),
255 }
256 }
257}
258
259#[derive(Debug, Clone)]
260pub enum ControlResponse {
261 Ping(ControlReply<PingReply>),
262 Status(ControlReply<StatusReply>),
263 Health(ControlReply<HealthReply>),
264 Identity(ControlReply<IdentityReply>),
265 Metadata(ControlReply<MetadataReply>),
266}
267
268#[derive(Debug, Clone)]
269pub struct ControlDryRun {
270 pub command: String,
271 pub allowed: bool,
272 pub warnings: Vec<String>,
273 pub reason: String,
274 pub run_id: Option<String>,
275 pub remote_addr: SocketAddr,
276}
277
278#[derive(Debug, Clone)]
279pub struct ControlRetryPolicy {
280 pub max_attempts: usize,
281 pub backoff_base_ms: u64,
282 pub backoff_max_ms: u64,
283}
284
285impl Default for ControlRetryPolicy {
286 fn default() -> Self {
287 Self {
288 max_attempts: 3,
289 backoff_base_ms: 150,
290 backoff_max_ms: 2_000,
291 }
292 }
293}
294
295#[derive(Debug, Clone)]
296pub struct ControlOptions {
297 pub timeout: Option<Duration>,
298 pub retry: Option<ControlRetryPolicy>,
299 pub allow_dangerous: bool,
300}
301
302impl Default for ControlOptions {
303 fn default() -> Self {
304 Self {
305 timeout: None,
306 retry: None,
307 allow_dangerous: false,
308 }
309 }
310}
311
312impl ControlOptions {
313 pub fn allow_dangerous(mut self, allow: bool) -> Self {
314 self.allow_dangerous = allow;
315 self
316 }
317
318 pub fn defaults_for_ui() -> Self {
319 Self {
320 timeout: Some(Duration::from_millis(500)),
321 retry: Some(ControlRetryPolicy {
322 max_attempts: 2,
323 backoff_base_ms: 150,
324 backoff_max_ms: 300,
325 }),
326 allow_dangerous: false,
327 }
328 }
329}
330
331#[derive(Debug, Clone)]
332pub struct ProbeOptions {
333 pub include_health: bool,
334 pub control: ControlOptions,
335}
336
337#[derive(Debug, Clone)]
338pub struct DeviceState {
339 pub state: ProbeState,
340 pub online: bool,
341 pub healthy: Option<bool>,
342 pub trust: DeviceTrustState,
343 pub last_rtt_ms: Option<u128>,
344 pub detail: Option<String>,
345 pub last_error: Option<String>,
346 pub label: String,
347}
348
349impl Default for ProbeOptions {
350 fn default() -> Self {
351 Self {
352 include_health: true,
353 control: ControlOptions::default(),
354 }
355 }
356}
357
358#[derive(Debug, Deserialize, Clone)]
360pub struct PingReply {
361 #[serde(default)]
362 pub timestamp_ms: Option<u64>,
363 #[serde(default)]
364 pub message: Option<String>,
365}
366
367#[derive(Debug, Deserialize, Clone)]
369pub struct StatusReply {
370 #[serde(default)]
371 pub healthy: Option<bool>,
372 #[serde(default)]
373 pub detail: Option<String>,
374 #[serde(default)]
375 pub uptime_secs: Option<u64>,
376}
377
378#[derive(Debug, Deserialize, Clone)]
380pub struct HealthReply {
381 #[serde(default)]
382 pub healthy: Option<bool>,
383 #[serde(default)]
384 pub detail: Option<String>,
385 #[serde(default)]
386 pub metrics: Option<HashMap<String, Value>>,
387}
388
389pub type IdentityReply = DeviceIdentity;
391
392#[derive(Debug, Deserialize, Clone)]
394pub struct MetadataReply {
395 #[serde(default)]
396 pub metadata: HashMap<String, Value>,
397}
398
399impl AlpineClient {
400 pub async fn connect(
402 local_addr: SocketAddr,
403 remote_addr: SocketAddr,
404 identity: DeviceIdentity,
405 capabilities: CapabilitySet,
406 credentials: NodeCredentials,
407 ) -> Result<Self, AlpineSdkError> {
408 Self::connect_with_context(
409 local_addr,
410 remote_addr,
411 identity,
412 capabilities,
413 credentials,
414 HandshakeContext::default(),
415 )
416 .await
417 }
418
419 pub async fn connect_checked(
420 outcome: crate::discovery::DiscoveryOutcome,
421 requested: CapabilitySet,
422 credentials: NodeCredentials,
423 ) -> Result<Self, AlpineSdkError> {
424 outcome.require_capabilities(&requested)?;
425 let identity = DeviceIdentity {
426 device_id: outcome.reply.device_id.clone(),
427 manufacturer_id: outcome.reply.manufacturer_id.clone(),
428 model_id: outcome.reply.model_id.clone(),
429 hardware_rev: outcome.reply.hardware_rev.clone(),
430 firmware_rev: outcome.reply.firmware_rev.clone(),
431 };
432 Self::connect(
433 outcome.local_addr,
434 outcome.peer,
435 identity,
436 requested,
437 credentials,
438 )
439 .await
440 }
441
442 pub async fn connect_with_nonce(
443 local_addr: SocketAddr,
444 remote_addr: SocketAddr,
445 identity: DeviceIdentity,
446 capabilities: CapabilitySet,
447 credentials: NodeCredentials,
448 client_nonce: Vec<u8>,
449 ) -> Result<Self, AlpineSdkError> {
450 Self::connect_with_context(
451 local_addr,
452 remote_addr,
453 identity,
454 capabilities,
455 credentials,
456 HandshakeContext::default().with_client_nonce(client_nonce),
457 )
458 .await
459 }
460
461 pub async fn connect_with_context_and_nonce(
462 local_addr: SocketAddr,
463 remote_addr: SocketAddr,
464 identity: DeviceIdentity,
465 capabilities: CapabilitySet,
466 credentials: NodeCredentials,
467 client_nonce: Vec<u8>,
468 context: HandshakeContext,
469 ) -> Result<Self, AlpineSdkError> {
470 Self::connect_with_context(
471 local_addr,
472 remote_addr,
473 identity,
474 capabilities,
475 credentials,
476 context.with_client_nonce(client_nonce),
477 )
478 .await
479 }
480
481 pub async fn connect_with_socket_and_nonce(
482 socket: UdpSocket,
483 remote_addr: SocketAddr,
484 identity: DeviceIdentity,
485 capabilities: CapabilitySet,
486 credentials: NodeCredentials,
487 client_nonce: Vec<u8>,
488 context: HandshakeContext,
489 ) -> Result<Self, AlpineSdkError> {
490 Self::connect_with_socket(
491 socket,
492 remote_addr,
493 identity,
494 capabilities,
495 credentials,
496 context.with_client_nonce(client_nonce),
497 )
498 .await
499 }
500
501 async fn connect_with_context(
502 local_addr: SocketAddr,
503 remote_addr: SocketAddr,
504 identity: DeviceIdentity,
505 capabilities: CapabilitySet,
506 credentials: NodeCredentials,
507 context: HandshakeContext,
508 ) -> Result<Self, AlpineSdkError> {
509 run_sdk_self_check();
510 ensure_supported_environment()?;
511 let _handshake_phase = phase::claim_handshake()?;
512 let base_transport =
513 CborUdpTransport::bind(local_addr, remote_addr, 4096, context.debug_cbor).await?;
514 Self::connect_with_transport(base_transport, identity, capabilities, credentials, context)
515 .await
516 }
517
518 async fn connect_with_socket(
519 socket: UdpSocket,
520 remote_addr: SocketAddr,
521 identity: DeviceIdentity,
522 capabilities: CapabilitySet,
523 credentials: NodeCredentials,
524 context: HandshakeContext,
525 ) -> Result<Self, AlpineSdkError> {
526 let base_transport =
527 CborUdpTransport::from_socket(socket, remote_addr, 4096, context.debug_cbor)
528 .map_err(AlpineSdkError::from)?;
529 Self::connect_with_transport(base_transport, identity, capabilities, credentials, context)
530 .await
531 }
532
533 async fn connect_with_transport(
534 base_transport: CborUdpTransport,
535 identity: DeviceIdentity,
536 capabilities: CapabilitySet,
537 credentials: NodeCredentials,
538 context: HandshakeContext,
539 ) -> Result<Self, AlpineSdkError> {
540 let udp_socket = base_transport.socket();
541 let bound_local_addr = base_transport.local_addr();
542 let peer_addr = base_transport.peer_addr();
543 info!(
544 "[ALPINE][HANDSHAKE] starting handshake for device {} local_addr={} remote_addr={} phase={}",
545 identity.device_id,
546 bound_local_addr,
547 peer_addr,
548 phase::current_phase().label()
549 );
550 if bound_local_addr.is_ipv4() != peer_addr.is_ipv4() {
551 warn!(
552 "[ALPINE][HANDSHAKE][WARN] IPv4/IPv6 mismatch local_addr={} remote_addr={}",
553 bound_local_addr, peer_addr
554 );
555 }
556 let mut transport = InstrumentedTransport::new(
557 TimeoutTransport::new(base_transport, context.recv_timeout),
558 bound_local_addr,
559 peer_addr,
560 );
561 info!(
562 "[ALPINE][HANDSHAKE] transport ready recv_timeout_ms={}",
563 context.recv_timeout.as_millis()
564 );
565 let session = AlnpSession::connect(
566 identity,
567 capabilities.clone(),
568 Ed25519Authenticator::new(credentials.clone()),
569 X25519KeyExchange::new(),
570 context,
571 &mut transport,
572 )
573 .await?;
574
575 let transport = Arc::new(Mutex::new(transport));
576 let established = session
577 .established()
578 .ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?
579 .clone();
580 let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
581 transport.clone(),
582 Duration::from_secs(5),
583 established.session_id.clone(),
584 ));
585 let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
586 .unwrap_or_else(|_| Uuid::new_v4());
587 let control_crypto = ControlCrypto::new(
588 session
589 .keys()
590 .ok_or_else(|| AlpineSdkError::Io("session keys missing".into()))?,
591 );
592 let control =
593 ControlClient::new(device_uuid, established.session_id.clone(), control_crypto);
594 info!(
595 "[ALPINE][HANDSHAKE] session established session_id={} local_addr={} remote_addr={}",
596 established.session_id, bound_local_addr, peer_addr
597 );
598
599 Ok(Self {
600 session,
601 _transport: transport,
602 udp_socket,
603 local_addr: bound_local_addr,
604 remote_addr: peer_addr,
605 stream: None,
606 control,
607 keepalive_handle: Some(keepalive_handle),
608 session_capabilities: None,
609 run_id: None,
610 vendor_registry: None,
611 })
612 }
613
614 pub fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
616 let compiled = profile
617 .compile()
618 .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
619 self.session
620 .set_stream_profile(compiled.clone())
621 .map_err(|err| AlpineSdkError::HandshakeFailed(err.to_string()))?;
622 self.session.mark_streaming();
623
624 let stream_local = SocketAddr::new(self.local_addr.ip(), 0);
626 let stream_socket =
627 UdpFrameTransport::new(stream_local, self.remote_addr).map_err(AlpineSdkError::from)?;
628 let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
629 self.stream = Some(stream);
630 Ok(compiled.config_id().to_string())
631 }
632
633 pub fn send_frame(
635 &self,
636 channel_format: ChannelFormat,
637 channels: Vec<u16>,
638 priority: u8,
639 groups: Option<HashMap<String, Vec<u16>>>,
640 metadata: Option<HashMap<String, Value>>,
641 ) -> Result<(), AlpineSdkError> {
642 let stream = self
643 .stream
644 .as_ref()
645 .ok_or_else(|| AlpineSdkError::Io("stream not started".into()))?;
646 stream
647 .send(channel_format, channels, priority, groups, metadata)
648 .map_err(AlpineSdkError::from)
649 }
650
651 pub async fn close(mut self) {
653 self.session.close();
654 if let Some(handle) = self.keepalive_handle.take() {
655 handle.abort();
656 }
657 }
658
659 pub fn control_envelope(
661 &self,
662 seq: u64,
663 op: ControlOp,
664 payload: Value,
665 ) -> Result<ControlEnvelope, HandshakeError> {
666 self.control.envelope(seq, op, payload)
667 }
668
669 pub async fn ping(&self) -> Result<ControlReply<PingReply>, AlpineSdkError> {
671 self.control_command("ping").await
672 }
673
674 pub async fn status(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
676 self.control_command("status").await
677 }
678
679 pub async fn status_standard(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
681 self.control_request(ControlOp::GetStatus, json!({})).await
682 }
683
684 pub async fn get_status(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
686 let reply = self.status_standard().await?;
687 if !reply.ack.ok {
688 let detail = reply
689 .detail()
690 .unwrap_or("device rejected standard get_status")
691 .to_string();
692 let mismatch = StatusMismatchError {
693 detail: format!(
694 "standard get_status rejected; vendor status may be required ({})",
695 detail
696 ),
697 };
698 return Err(AlpineSdkError::StatusMismatch(mismatch.to_string()));
699 }
700 Ok(reply)
701 }
702
703 pub async fn status_vendor(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
705 self.status().await
706 }
707
708 pub async fn health(&self) -> Result<ControlReply<HealthReply>, AlpineSdkError> {
710 self.control_command("health").await
711 }
712
713 pub async fn identity(&self) -> Result<ControlReply<IdentityReply>, AlpineSdkError> {
715 self.control_command("identity").await
716 }
717
718 pub async fn metadata(&self) -> Result<ControlReply<MetadataReply>, AlpineSdkError> {
720 self.control_command("metadata").await
721 }
722
723 pub async fn control(
725 &self,
726 command: ControlCommand,
727 ) -> Result<ControlResponse, AlpineSdkError> {
728 self.control_with_options(command, ControlOptions::default())
729 .await
730 }
731
732 pub fn control_dry_run(&self, command: ControlCommand) -> ControlDryRun {
733 let mut warnings = Vec::new();
734 if command.requires_trust() {
735 warnings.push("requires trusted device identity".to_string());
736 }
737 ControlDryRun {
738 command: format!("{:?}", command),
739 allowed: true,
740 warnings,
741 reason: "no network calls made".to_string(),
742 run_id: self.run_id.clone(),
743 remote_addr: self.remote_addr,
744 }
745 }
746
747 pub async fn control_with_options(
749 &self,
750 command: ControlCommand,
751 options: ControlOptions,
752 ) -> Result<ControlResponse, AlpineSdkError> {
753 let attempts = options
754 .retry
755 .as_ref()
756 .map(|policy| policy.max_attempts.max(1))
757 .unwrap_or(1);
758 let mut attempt = 0usize;
759 loop {
760 attempt = attempt.saturating_add(1);
761 let run = async { self.control_once(command).await };
762 let result = if let Some(timeout_duration) = options.timeout {
763 match timeout(timeout_duration, run).await {
764 Ok(result) => result,
765 Err(_) => Err(AlpineSdkError::Timeout),
766 }
767 } else {
768 run.await
769 };
770
771 match result {
772 Ok(reply) => {
773 if attempt > 1 {
774 warn!(
775 "[ALPINE][CONTROL][WARN] command={:?} succeeded after {} retries",
776 command,
777 attempt - 1
778 );
779 }
780 return Ok(reply);
781 }
782 Err(err) => {
783 if attempt >= attempts {
784 return Err(err);
785 }
786 if let Some(policy) = options.retry.as_ref() {
787 sleep(control_backoff(policy, attempt)).await;
788 }
789 }
790 }
791 }
792 }
793
794 pub async fn control_dangerous(
796 &self,
797 command: DangerousControlCommand,
798 ) -> Result<ControlReply<Value>, AlpineSdkError> {
799 self.control_dangerous_with_options(command, ControlOptions::default())
800 .await
801 }
802
803 pub fn control_dangerous_dry_run(
804 &self,
805 command: DangerousControlCommand,
806 options: ControlOptions,
807 ) -> ControlDryRun {
808 let mut warnings = Vec::new();
809 let mut allowed = true;
810 let mut reason = "no network calls made".to_string();
811 if !options.allow_dangerous {
812 allowed = false;
813 reason = "dangerous control blocked; allow_dangerous not set".to_string();
814 }
815 let command_label = format!("{:?}", command);
816 if let DangerousControlCommand::Vendor(name) = &command {
817 if !is_builtin_vendor_command(name) {
818 match &self.vendor_registry {
819 Some(registry) => {
820 if !registry.is_allowed(name) {
821 allowed = false;
822 reason = "vendor extension not registered".to_string();
823 }
824 }
825 None => {
826 allowed = false;
827 reason = "vendor extension registry not configured".to_string();
828 }
829 }
830 }
831 } else {
832 warnings.push("dangerous command requires explicit allow_dangerous".to_string());
833 }
834 ControlDryRun {
835 command: command_label,
836 allowed,
837 warnings,
838 reason,
839 run_id: self.run_id.clone(),
840 remote_addr: self.remote_addr,
841 }
842 }
843
844 pub async fn control_dangerous_with_options(
846 &self,
847 command: DangerousControlCommand,
848 options: ControlOptions,
849 ) -> Result<ControlReply<Value>, AlpineSdkError> {
850 if !options.allow_dangerous {
851 return Err(AlpineSdkError::DangerousControlDisallowed);
852 }
853 let attempts = options
854 .retry
855 .as_ref()
856 .map(|policy| policy.max_attempts.max(1))
857 .unwrap_or(1);
858 let mut attempt = 0usize;
859 loop {
860 attempt = attempt.saturating_add(1);
861 let run = async { self.control_vendor_once(command.as_command()).await };
862 let result = if let Some(timeout_duration) = options.timeout {
863 match timeout(timeout_duration, run).await {
864 Ok(result) => result,
865 Err(_) => Err(AlpineSdkError::Timeout),
866 }
867 } else {
868 run.await
869 };
870
871 match result {
872 Ok(reply) => {
873 if attempt > 1 {
874 warn!(
875 "[ALPINE][CONTROL][WARN] dangerous_command={:?} succeeded after {} retries",
876 command,
877 attempt - 1
878 );
879 }
880 return Ok(reply);
881 }
882 Err(err) => {
883 if attempt >= attempts {
884 return Err(err);
885 }
886 if let Some(policy) = options.retry.as_ref() {
887 sleep(control_backoff(policy, attempt)).await;
888 }
889 }
890 }
891 }
892 }
893
894 pub async fn probe_status(&self) -> ProbeResult {
896 self.probe_status_with_options(ProbeOptions::default())
897 .await
898 }
899
900 pub async fn probe_status_with_options(&self, options: ProbeOptions) -> ProbeResult {
902 let mut result = ProbeResult {
903 state: ProbeState::Offline,
904 ping: None,
905 status: None,
906 health: None,
907 ping_rtt_ms: None,
908 status_rtt_ms: None,
909 health_rtt_ms: None,
910 detail: None,
911 errors: Vec::new(),
912 };
913
914 let ping_start = std::time::Instant::now();
915 let ping_reply = self
916 .control_with_options(ControlCommand::Ping, options.control.clone())
917 .await;
918 match ping_reply {
919 Ok(ControlResponse::Ping(reply)) => {
920 result.ping_rtt_ms = Some(ping_start.elapsed().as_millis());
921 result.ping = reply.payload.clone();
922 if !reply.ok() {
923 if let Some(detail) = reply.detail() {
924 result.detail = Some(detail.to_string());
925 }
926 }
927 }
928 Ok(other) => {
929 result.record_error(
930 ProbeStep::Ping,
931 format!("unexpected ping response: {:?}", other),
932 );
933 return result;
934 }
935 Err(err) => {
936 result.record_error(ProbeStep::Ping, err.to_string());
937 return result;
938 }
939 }
940
941 let status_start = std::time::Instant::now();
942 let status_reply = self
943 .control_with_options(ControlCommand::Status, options.control.clone())
944 .await;
945 match status_reply {
946 Ok(ControlResponse::Status(reply)) => {
947 result.status_rtt_ms = Some(status_start.elapsed().as_millis());
948 result.status = reply.payload.clone();
949 if result.detail.is_none() {
950 if let Some(detail) = reply.detail() {
951 result.detail = Some(detail.to_string());
952 } else if let Some(payload) = reply.payload.as_ref() {
953 result.detail = payload.detail.clone();
954 }
955 }
956 }
957 Ok(other) => {
958 result.record_error(
959 ProbeStep::Status,
960 format!("unexpected status response: {:?}", other),
961 );
962 }
963 Err(err) => {
964 result.record_error(ProbeStep::Status, err.to_string());
965 }
966 }
967
968 if result.status.is_none() && options.include_health {
969 let health_start = std::time::Instant::now();
970 let health_reply = self
971 .control_with_options(ControlCommand::Health, options.control.clone())
972 .await;
973 match health_reply {
974 Ok(ControlResponse::Health(reply)) => {
975 result.health_rtt_ms = Some(health_start.elapsed().as_millis());
976 result.health = reply.payload.clone();
977 if result.detail.is_none() {
978 if let Some(detail) = reply.detail() {
979 result.detail = Some(detail.to_string());
980 } else if let Some(payload) = reply.payload.as_ref() {
981 result.detail = payload.detail.clone();
982 }
983 }
984 }
985 Ok(other) => {
986 result.record_error(
987 ProbeStep::Health,
988 format!("unexpected health response: {:?}", other),
989 );
990 }
991 Err(err) => {
992 result.record_error(ProbeStep::Health, err.to_string());
993 }
994 }
995 }
996
997 let healthy = result
998 .status
999 .as_ref()
1000 .and_then(|payload| payload.healthy)
1001 .or_else(|| result.health.as_ref().and_then(|payload| payload.healthy));
1002
1003 result.state = if healthy == Some(false) {
1004 ProbeState::Degraded
1005 } else if result.status.is_some() || result.health.is_some() {
1006 ProbeState::Online
1007 } else if result.errors.is_empty() {
1008 ProbeState::Online
1009 } else {
1010 ProbeState::Degraded
1011 };
1012
1013 result
1014 }
1015
1016 async fn control_once(
1017 &self,
1018 command: ControlCommand,
1019 ) -> Result<ControlResponse, AlpineSdkError> {
1020 match command {
1021 ControlCommand::Ping => Ok(ControlResponse::Ping(self.ping().await?)),
1022 ControlCommand::Status => Ok(ControlResponse::Status(self.status().await?)),
1023 ControlCommand::GetStatus => Ok(ControlResponse::Status(self.get_status().await?)),
1024 ControlCommand::Health => Ok(ControlResponse::Health(self.health().await?)),
1025 ControlCommand::Identity => Ok(ControlResponse::Identity(self.identity().await?)),
1026 ControlCommand::Metadata => Ok(ControlResponse::Metadata(self.metadata().await?)),
1027 }
1028 }
1029
1030 async fn control_command<T>(&self, command: &str) -> Result<ControlReply<T>, AlpineSdkError>
1031 where
1032 T: DeserializeOwned,
1033 {
1034 let payload = json!({ "command": command });
1036 self.control_request(ControlOp::Vendor, payload).await
1037 }
1038
1039 async fn control_vendor_once(
1040 &self,
1041 command: &str,
1042 ) -> Result<ControlReply<Value>, AlpineSdkError> {
1043 if !is_builtin_vendor_command(command) {
1044 match &self.vendor_registry {
1045 Some(registry) => {
1046 if !registry.is_allowed(command) {
1047 return Err(AlpineSdkError::VendorExtensionNotRegistered(
1048 command.to_string(),
1049 ));
1050 }
1051 }
1052 None => {
1053 return Err(AlpineSdkError::VendorExtensionNotRegistered(
1054 command.to_string(),
1055 ));
1056 }
1057 }
1058 }
1059 let payload = json!({ "command": command });
1060 self.control_request(ControlOp::Vendor, payload).await
1061 }
1062
1063 async fn control_request<T>(
1064 &self,
1065 op: ControlOp,
1066 payload: Value,
1067 ) -> Result<ControlReply<T>, AlpineSdkError>
1068 where
1069 T: DeserializeOwned,
1070 {
1071 let transport = SharedTransport::new(self._transport.clone());
1072 let mut channel = ReliableControlChannel::new(transport);
1073 let ack = self.control.send(&mut channel, op, payload).await?;
1074 let parsed = ControlCrypto::decode_ack_payload::<T>(ack.payload.as_deref())
1075 .map_err(AlpineSdkError::from)?;
1076 Ok(ControlReply {
1077 ack,
1078 payload: parsed,
1079 })
1080 }
1081
1082 pub fn local_addr(&self) -> SocketAddr {
1084 self.local_addr
1085 }
1086
1087 pub fn remote_addr(&self) -> SocketAddr {
1089 self.remote_addr
1090 }
1091
1092 pub fn udp_socket(&self) -> Arc<UdpSocket> {
1094 self.udp_socket.clone()
1095 }
1096
1097 pub fn session_id(&self) -> Option<String> {
1099 self.session
1100 .established()
1101 .map(|established| established.session_id.clone())
1102 }
1103
1104 pub fn device_capabilities(&self) -> Option<CapabilitySet> {
1106 self.session
1107 .established()
1108 .map(|established| established.capabilities.clone())
1109 }
1110
1111 pub fn session_capabilities(&self) -> Option<&CapabilitySet> {
1113 self.session_capabilities.as_ref()
1114 }
1115
1116 pub fn effective_capabilities(&self) -> Option<CapabilitySet> {
1118 self.session_capabilities
1119 .clone()
1120 .or_else(|| self.device_capabilities())
1121 }
1122
1123 pub fn narrow_capabilities(&mut self, caps: CapabilitySet) -> Result<(), AlpineSdkError> {
1125 let device_caps = self.device_capabilities().ok_or_else(|| {
1126 AlpineSdkError::InvalidCapabilities("device capabilities missing".into())
1127 })?;
1128 validate_capability_subset(&caps, &device_caps)?;
1129 self.session_capabilities = Some(caps);
1130 Ok(())
1131 }
1132
1133 pub fn run_id(&self) -> Option<&str> {
1135 self.run_id.as_deref()
1136 }
1137
1138 pub fn set_run_id(&mut self, run_id: String) {
1139 self.run_id = Some(run_id);
1140 }
1141
1142 pub fn vendor_extension_registry(&self) -> Option<&VendorExtensionRegistry> {
1143 self.vendor_registry.as_ref()
1144 }
1145
1146 pub fn set_vendor_extension_registry(&mut self, registry: VendorExtensionRegistry) {
1147 self.vendor_registry = Some(registry);
1148 }
1149}
1150
1151#[derive(Debug)]
1152pub struct SessionGuard {
1153 client: AlpineClient,
1154 idle_timeout: Duration,
1155 last_used: Instant,
1156 idle_reason: Option<String>,
1157}
1158
1159impl SessionGuard {
1160 pub fn new(client: AlpineClient, idle_timeout: Duration) -> Self {
1161 Self {
1162 client,
1163 idle_timeout,
1164 last_used: Instant::now(),
1165 idle_reason: None,
1166 }
1167 }
1168
1169 pub fn with_idle_reason(mut self, reason: impl Into<String>) -> Self {
1170 self.idle_reason = Some(reason.into());
1171 self
1172 }
1173
1174 pub fn client(&self) -> &AlpineClient {
1175 &self.client
1176 }
1177
1178 pub fn client_mut(&mut self) -> &mut AlpineClient {
1179 self.touch();
1180 &mut self.client
1181 }
1182
1183 pub fn touch(&mut self) {
1184 self.last_used = Instant::now();
1185 }
1186
1187 pub fn idle_for(&self) -> Duration {
1188 self.last_used.elapsed()
1189 }
1190
1191 pub fn is_expired(&self) -> bool {
1192 self.last_used.elapsed() >= self.idle_timeout
1193 }
1194
1195 pub fn idle_reason(&self) -> Option<&str> {
1196 self.idle_reason.as_deref()
1197 }
1198
1199 pub async fn close(self) {
1200 self.client.close().await;
1201 }
1202
1203 pub fn into_inner(self) -> AlpineClient {
1204 self.client
1205 }
1206}
1207
1208#[derive(Debug, Clone)]
1209pub struct SafeClientOptions {
1210 pub require_online: bool,
1211 pub allow_degraded: bool,
1212 pub require_trusted: bool,
1213 pub probe_options: ProbeOptions,
1214}
1215
1216impl Default for SafeClientOptions {
1217 fn default() -> Self {
1218 Self {
1219 require_online: true,
1220 allow_degraded: false,
1221 require_trusted: false,
1222 probe_options: ProbeOptions::default(),
1223 }
1224 }
1225}
1226
1227impl SafeClientOptions {
1228 pub fn require_trusted(mut self, require: bool) -> Self {
1229 self.require_trusted = require;
1230 self
1231 }
1232}
1233
1234#[derive(Debug)]
1235pub struct SafeClient {
1236 client: AlpineClient,
1237 last_probe: ProbeResult,
1238 last_known_good: Option<ProbeResult>,
1239 trust: DeviceTrustState,
1240 options: SafeClientOptions,
1241}
1242
1243impl SafeClient {
1244 pub async fn from_client(
1245 client: AlpineClient,
1246 trust: DeviceTrustState,
1247 options: SafeClientOptions,
1248 ) -> Result<Self, AlpineSdkError> {
1249 if options.require_trusted && trust != DeviceTrustState::Trusted {
1250 return Err(AlpineSdkError::UntrustedDevice(
1251 "trusted identity required".into(),
1252 ));
1253 }
1254 let mut wrapper = Self {
1255 client,
1256 last_probe: ProbeResult {
1257 state: ProbeState::Offline,
1258 ping: None,
1259 status: None,
1260 health: None,
1261 ping_rtt_ms: None,
1262 status_rtt_ms: None,
1263 health_rtt_ms: None,
1264 detail: None,
1265 errors: Vec::new(),
1266 },
1267 last_known_good: None,
1268 trust,
1269 options,
1270 };
1271 wrapper.refresh_probe().await?;
1272 Ok(wrapper)
1273 }
1274
1275 pub async fn from_discovery(
1276 client: AlpineClient,
1277 outcome: &crate::discovery::DiscoveryOutcome,
1278 options: SafeClientOptions,
1279 ) -> Result<Self, AlpineSdkError> {
1280 let trust = outcome.trust_state();
1281 let mut wrapped = Self::from_client(client, trust, options).await?;
1282 wrapped.client.set_run_id(outcome.run_id.clone());
1283 Ok(wrapped)
1284 }
1285
1286 pub fn client(&self) -> &AlpineClient {
1287 &self.client
1288 }
1289
1290 pub fn client_mut(&mut self) -> &mut AlpineClient {
1291 &mut self.client
1292 }
1293
1294 pub fn last_probe(&self) -> &ProbeResult {
1295 &self.last_probe
1296 }
1297
1298 pub fn last_known_good(&self) -> Option<&ProbeResult> {
1299 self.last_known_good.as_ref()
1300 }
1301
1302 pub fn last_known_good_delta(&self) -> Option<String> {
1303 self.last_known_good
1304 .as_ref()
1305 .and_then(|previous| self.last_probe.delta_summary(previous))
1306 }
1307
1308 pub fn trust_state(&self) -> DeviceTrustState {
1309 self.trust.clone()
1310 }
1311
1312 pub async fn refresh_probe(&mut self) -> Result<&ProbeResult, AlpineSdkError> {
1313 let probe = self
1314 .client
1315 .probe_status_with_options(self.options.probe_options.clone())
1316 .await;
1317 let acceptable = self.acceptable_probe_state(&probe);
1318 let failure_detail = if self.options.require_online && !acceptable {
1319 Some(
1320 probe
1321 .detail
1322 .clone()
1323 .or_else(|| probe.errors.first().map(|err| err.message.clone()))
1324 .unwrap_or_else(|| format!("probe state {:?}", probe.state)),
1325 )
1326 } else {
1327 None
1328 };
1329 self.last_probe = probe;
1330 if acceptable {
1331 self.last_known_good = Some(self.last_probe.clone());
1332 }
1333 if let Some(detail) = failure_detail {
1334 return Err(AlpineSdkError::ProbeFailed(detail));
1335 }
1336 Ok(&self.last_probe)
1337 }
1338
1339 pub async fn control(
1340 &mut self,
1341 command: ControlCommand,
1342 ) -> Result<ControlResponse, AlpineSdkError> {
1343 if self.options.require_online {
1344 self.refresh_probe().await?;
1345 }
1346 self.client
1347 .control_with_options(command, self.options.probe_options.control.clone())
1348 .await
1349 }
1350
1351 pub async fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
1352 if self.options.require_online {
1353 self.refresh_probe().await?;
1354 }
1355 self.client.start_stream(profile)
1356 }
1357
1358 pub async fn send_frame(
1359 &mut self,
1360 channel_format: ChannelFormat,
1361 channels: Vec<u16>,
1362 priority: u8,
1363 groups: Option<HashMap<String, Vec<u16>>>,
1364 metadata: Option<HashMap<String, Value>>,
1365 ) -> Result<(), AlpineSdkError> {
1366 if self.options.require_online {
1367 self.refresh_probe().await?;
1368 }
1369 self.client
1370 .send_frame(channel_format, channels, priority, groups, metadata)
1371 }
1372
1373 pub async fn close(self) {
1374 self.client.close().await;
1375 }
1376
1377 pub fn into_inner(self) -> AlpineClient {
1378 self.client
1379 }
1380
1381 fn acceptable_probe_state(&self, probe: &ProbeResult) -> bool {
1382 match probe.state {
1383 ProbeState::Online => true,
1384 ProbeState::Degraded => self.options.allow_degraded,
1385 ProbeState::Offline => false,
1386 }
1387 }
1388}
1389
1390#[derive(Clone)]
1391struct SharedTransport<T> {
1392 inner: Arc<Mutex<T>>,
1393}
1394
1395impl<T> SharedTransport<T> {
1396 fn new(inner: Arc<Mutex<T>>) -> Self {
1397 Self { inner }
1398 }
1399}
1400
1401#[async_trait]
1402impl<T> HandshakeTransport for SharedTransport<T>
1403where
1404 T: HandshakeTransport + Send,
1405{
1406 async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
1407 let mut guard = self.inner.lock().await;
1408 guard.send(msg).await
1409 }
1410
1411 async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
1412 let mut guard = self.inner.lock().await;
1413 guard.recv().await
1414 }
1415}
1416
1417#[derive(Debug)]
1418struct InstrumentedTransport<T> {
1419 inner: T,
1420 local_addr: SocketAddr,
1421 remote_addr: SocketAddr,
1422 session_init_sent: bool,
1423}
1424
1425impl<T> InstrumentedTransport<T> {
1426 fn new(inner: T, local_addr: SocketAddr, remote_addr: SocketAddr) -> Self {
1427 Self {
1428 inner,
1429 local_addr,
1430 remote_addr,
1431 session_init_sent: false,
1432 }
1433 }
1434}
1435
1436#[async_trait]
1437impl<T> HandshakeTransport for InstrumentedTransport<T>
1438where
1439 T: HandshakeTransport + Send,
1440{
1441 async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
1442 let category = message_category(&msg);
1443 let phase_state = phase::current_phase();
1444 if matches!(msg, HandshakeMessage::SessionInit(_)) {
1445 if self.session_init_sent {
1446 warn!(
1447 "[ALPINE][HANDSHAKE][WARN] duplicate SessionInit send attempt detected local_port={} remote_addr={} phase={}",
1448 self.local_addr.port(),
1449 self.remote_addr,
1450 phase_state.label()
1451 );
1452 return Err(HandshakeError::Protocol(
1453 "duplicate SessionInit blocked; reset handshake first".into(),
1454 ));
1455 }
1456 self.session_init_sent = true;
1457 }
1458 if phase_state == Phase::Handshake && category != MessageCategory::Handshake {
1459 warn!(
1460 "[ALPINE][BUG] non-handshake packet emitted during handshake msg_type={} local_port={} remote_addr={}",
1461 category.as_str(),
1462 self.local_addr.port(),
1463 self.remote_addr
1464 );
1465 }
1466 let encoded_len = serde_cbor::to_vec(&msg).map(|buf| buf.len()).unwrap_or(0);
1467 info!(
1468 "[ALPINE][TX] msg_type={} variant={} local_port={} remote_addr={} phase={} len={}",
1469 category.as_str(),
1470 message_label(&msg),
1471 self.local_addr.port(),
1472 self.remote_addr,
1473 phase_state.label(),
1474 encoded_len
1475 );
1476 self.inner.send(msg).await
1477 }
1478
1479 async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
1480 let phase_state = phase::current_phase();
1481 let result = self.inner.recv().await;
1482 match &result {
1483 Ok(msg) => {
1484 let category = message_category(msg);
1485 info!(
1486 "[ALPINE][RX] msg_type={} variant={} local_port={} remote_addr={} phase={}",
1487 category.as_str(),
1488 message_label(msg),
1489 self.local_addr.port(),
1490 self.remote_addr,
1491 phase_state.label()
1492 );
1493 }
1494 Err(err) => {
1495 warn!(
1496 "[ALPINE][RX] recv error local_port={} remote_addr={} phase={} error={}",
1497 self.local_addr.port(),
1498 self.remote_addr,
1499 phase_state.label(),
1500 err
1501 );
1502 }
1503 }
1504 result
1505 }
1506}
1507
1508#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1509#[allow(dead_code)]
1510enum MessageCategory {
1511 Handshake,
1512 Control,
1513 Keepalive,
1514 Ack,
1515 Unknown,
1516}
1517
1518impl MessageCategory {
1519 fn as_str(&self) -> &'static str {
1520 match self {
1521 MessageCategory::Handshake => "Handshake",
1522 MessageCategory::Control => "Control",
1523 MessageCategory::Keepalive => "Keepalive",
1524 MessageCategory::Ack => "Ack",
1525 MessageCategory::Unknown => "Unknown",
1526 }
1527 }
1528}
1529
1530fn message_category(msg: &HandshakeMessage) -> MessageCategory {
1531 match msg {
1532 HandshakeMessage::SessionInit(_)
1533 | HandshakeMessage::SessionAck(_)
1534 | HandshakeMessage::SessionReady(_)
1535 | HandshakeMessage::SessionComplete(_)
1536 | HandshakeMessage::SessionEstablished(_) => MessageCategory::Handshake,
1537 HandshakeMessage::Control(_) => MessageCategory::Control,
1538 HandshakeMessage::Keepalive(_) => MessageCategory::Keepalive,
1539 HandshakeMessage::Ack(_) => MessageCategory::Ack,
1540 }
1541}
1542
1543fn message_label(msg: &HandshakeMessage) -> &'static str {
1544 match msg {
1545 HandshakeMessage::SessionInit(_) => "SessionInit",
1546 HandshakeMessage::SessionAck(_) => "SessionAck",
1547 HandshakeMessage::SessionReady(_) => "SessionReady",
1548 HandshakeMessage::SessionComplete(_) => "SessionComplete",
1549 HandshakeMessage::SessionEstablished(_) => "SessionEstablished",
1550 HandshakeMessage::Control(_) => "Control",
1551 HandshakeMessage::Keepalive(_) => "Keepalive",
1552 HandshakeMessage::Ack(_) => "Ack",
1553 }
1554}
1555
1556fn control_backoff(policy: &ControlRetryPolicy, attempt: usize) -> Duration {
1557 let exponent = attempt.saturating_sub(1) as u32;
1558 let factor = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
1559 let delay = policy.backoff_base_ms.saturating_mul(factor);
1560 Duration::from_millis(delay.min(policy.backoff_max_ms))
1561}
1562
1563fn is_builtin_vendor_command(command: &str) -> bool {
1564 matches!(
1565 command,
1566 "ping" | "status" | "health" | "identity" | "metadata"
1567 )
1568}
1569
1570fn validate_capability_subset(
1571 requested: &CapabilitySet,
1572 device: &CapabilitySet,
1573) -> Result<(), AlpineSdkError> {
1574 for format in requested.channel_formats.iter() {
1575 if !device.channel_formats.contains(format) {
1576 return Err(AlpineSdkError::InvalidCapabilities(format!(
1577 "channel format {:?} not supported by device",
1578 format
1579 )));
1580 }
1581 }
1582 if requested.max_channels > device.max_channels {
1583 return Err(AlpineSdkError::InvalidCapabilities(format!(
1584 "max channels {} exceeds device max {}",
1585 requested.max_channels, device.max_channels
1586 )));
1587 }
1588 if requested.grouping_supported && !device.grouping_supported {
1589 return Err(AlpineSdkError::InvalidCapabilities(
1590 "grouping not supported by device".into(),
1591 ));
1592 }
1593 if requested.streaming_supported && !device.streaming_supported {
1594 return Err(AlpineSdkError::InvalidCapabilities(
1595 "streaming not supported by device".into(),
1596 ));
1597 }
1598 if requested.encryption_supported && !device.encryption_supported {
1599 return Err(AlpineSdkError::InvalidCapabilities(
1600 "encryption not supported by device".into(),
1601 ));
1602 }
1603 if let Some(extensions) = requested.vendor_extensions.as_ref() {
1604 if let Some(device_extensions) = device.vendor_extensions.as_ref() {
1605 for key in extensions.keys() {
1606 if !device_extensions.contains_key(key) {
1607 return Err(AlpineSdkError::InvalidCapabilities(format!(
1608 "vendor extension {} not supported by device",
1609 key
1610 )));
1611 }
1612 }
1613 } else {
1614 return Err(AlpineSdkError::InvalidCapabilities(
1615 "vendor extensions not supported by device".into(),
1616 ));
1617 }
1618 }
1619 Ok(())
1620}