alpine_protocol_sdk/
client.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use alpine::control::{ControlClient, ControlCrypto};
7use alpine::crypto::identity::NodeCredentials;
8use alpine::crypto::X25519KeyExchange;
9use alpine::handshake::keepalive;
10use alpine::handshake::transport::{CborUdpTransport, ReliableControlChannel, TimeoutTransport};
11use alpine::handshake::{HandshakeContext, HandshakeError, HandshakeMessage, HandshakeTransport};
12use alpine::messages::{
13    Acknowledge, CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity,
14};
15use alpine::profile::StreamProfile;
16use alpine::session::{AlnpSession, Ed25519Authenticator};
17use alpine::stream::AlnpStream;
18use async_trait::async_trait;
19use serde::de::DeserializeOwned;
20use serde::Deserialize;
21use serde_json::{json, Value};
22use tokio::sync::Mutex;
23use tokio::task::JoinHandle;
24use uuid::Uuid;
25
26use crate::error::AlpineSdkError;
27use crate::transport::UdpFrameTransport;
28
29/// High-level client that wraps the ALPINE protocol primitives.
30#[derive(Debug)]
31pub struct AlpineClient {
32    session: AlnpSession,
33    _transport: Arc<Mutex<TimeoutTransport<CborUdpTransport>>>,
34    local_addr: SocketAddr,
35    remote_addr: SocketAddr,
36    stream: Option<AlnpStream<UdpFrameTransport>>,
37    control: ControlClient,
38    keepalive_handle: Option<JoinHandle<()>>,
39}
40
41/// Typed control response returned by `AlpineClient` helpers.
42#[derive(Debug)]
43pub struct ControlReply<T> {
44    pub ack: Acknowledge,
45    pub payload: Option<T>,
46}
47
48impl<T> ControlReply<T> {
49    pub fn ok(&self) -> bool {
50        self.ack.ok
51    }
52
53    pub fn detail(&self) -> Option<&str> {
54        self.ack.detail.as_deref()
55    }
56}
57
58/// Ping reply payload (may be partial depending on device support).
59#[derive(Debug, Deserialize)]
60pub struct PingReply {
61    #[serde(default)]
62    pub timestamp_ms: Option<u64>,
63    #[serde(default)]
64    pub message: Option<String>,
65}
66
67/// Status reply payload returned by the `status` helper.
68#[derive(Debug, Deserialize)]
69pub struct StatusReply {
70    #[serde(default)]
71    pub healthy: Option<bool>,
72    #[serde(default)]
73    pub detail: Option<String>,
74    #[serde(default)]
75    pub uptime_secs: Option<u64>,
76}
77
78/// Health reply payload, including optional metrics metadata.
79#[derive(Debug, Deserialize)]
80pub struct HealthReply {
81    #[serde(default)]
82    pub healthy: Option<bool>,
83    #[serde(default)]
84    pub detail: Option<String>,
85    #[serde(default)]
86    pub metrics: Option<HashMap<String, Value>>,
87}
88
89/// Alias for the fetched device identity.
90pub type IdentityReply = DeviceIdentity;
91
92/// Metadata reply payload is an arbitrary map of CBOR values.
93#[derive(Debug, Deserialize)]
94pub struct MetadataReply {
95    #[serde(default)]
96    pub metadata: HashMap<String, Value>,
97}
98
99impl AlpineClient {
100    /// Opens a session with the provided device identity and capabilities.
101    pub async fn connect(
102        local_addr: SocketAddr,
103        remote_addr: SocketAddr,
104        identity: DeviceIdentity,
105        capabilities: CapabilitySet,
106        credentials: NodeCredentials,
107    ) -> Result<Self, AlpineSdkError> {
108        let key_exchange = X25519KeyExchange::new();
109        let authenticator = Ed25519Authenticator::new(credentials.clone());
110
111        let mut transport = TimeoutTransport::new(
112            CborUdpTransport::bind(local_addr, remote_addr, 2048).await?,
113            Duration::from_secs(3),
114        );
115        let session = AlnpSession::connect(
116            identity,
117            capabilities.clone(),
118            authenticator,
119            key_exchange,
120            HandshakeContext::default(),
121            &mut transport,
122        )
123        .await?;
124
125        let transport = Arc::new(Mutex::new(transport));
126        let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
127            transport.clone(),
128            Duration::from_secs(5),
129            session
130                .established()
131                .ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?
132                .session_id,
133        ));
134
135        let established = session
136            .established()
137            .ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?;
138        let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
139            .unwrap_or_else(|_| Uuid::new_v4());
140        let control_crypto = ControlCrypto::new(
141            session
142                .keys()
143                .ok_or_else(|| AlpineSdkError::Io("session keys missing".into()))?,
144        );
145        let control = ControlClient::new(device_uuid, established.session_id, control_crypto);
146
147        Ok(Self {
148            session,
149            _transport: transport,
150            local_addr,
151            remote_addr,
152            stream: None,
153            control,
154            keepalive_handle: Some(keepalive_handle),
155        })
156    }
157
158    /// Starts streaming with the supplied profile and returns the generated config id.
159    pub fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
160        let compiled = profile
161            .compile()
162            .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
163        self.session
164            .set_stream_profile(compiled.clone())
165            .map_err(AlpineSdkError::Handshake)?;
166        self.session.mark_streaming();
167
168        let stream_socket = UdpFrameTransport::new(self.local_addr, self.remote_addr)?;
169        let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
170        self.stream = Some(stream);
171        Ok(compiled.config_id().to_string())
172    }
173
174    /// Sends a streaming frame over the active session.
175    pub fn send_frame(
176        &self,
177        channel_format: ChannelFormat,
178        channels: Vec<u16>,
179        priority: u8,
180        groups: Option<HashMap<String, Vec<u16>>>,
181        metadata: Option<HashMap<String, Value>>,
182    ) -> Result<(), AlpineSdkError> {
183        let stream = self
184            .stream
185            .as_ref()
186            .ok_or_else(|| AlpineSdkError::Io("stream not started".into()))?;
187        stream
188            .send(channel_format, channels, priority, groups, metadata)
189            .map_err(AlpineSdkError::from)
190    }
191
192    /// Stops keep-alive and shuts down the session.
193    pub async fn close(mut self) {
194        self.session.close();
195        if let Some(handle) = self.keepalive_handle.take() {
196            handle.abort();
197        }
198    }
199
200    /// Builds a signed control envelope for the active session.
201    pub fn control_envelope(
202        &self,
203        seq: u64,
204        op: ControlOp,
205        payload: Value,
206    ) -> Result<ControlEnvelope, HandshakeError> {
207        self.control.envelope(seq, op, payload)
208    }
209
210    /// Sends a ping command and returns the parsed reply (CBOR payload optional).
211    pub async fn ping(&self) -> Result<ControlReply<PingReply>, AlpineSdkError> {
212        self.control_command("ping").await
213    }
214
215    /// Returns the status payload the node publishes for callers.
216    pub async fn status(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
217        self.control_command("status").await
218    }
219
220    /// Reads the health payload, including optional metrics.
221    pub async fn health(&self) -> Result<ControlReply<HealthReply>, AlpineSdkError> {
222        self.control_command("health").await
223    }
224
225    /// Requests the device identity through the control channel.
226    pub async fn identity(&self) -> Result<ControlReply<IdentityReply>, AlpineSdkError> {
227        self.control_command("identity").await
228    }
229
230    /// Fetches metadata that the device publishes in CBOR.
231    pub async fn metadata(&self) -> Result<ControlReply<MetadataReply>, AlpineSdkError> {
232        self.control_command("metadata").await
233    }
234
235    async fn control_command<T>(&self, command: &str) -> Result<ControlReply<T>, AlpineSdkError>
236    where
237        T: DeserializeOwned,
238    {
239        let payload = json!({ "command": command });
240        self.control_request(ControlOp::Vendor, payload).await
241    }
242
243    async fn control_request<T>(
244        &self,
245        op: ControlOp,
246        payload: Value,
247    ) -> Result<ControlReply<T>, AlpineSdkError>
248    where
249        T: DeserializeOwned,
250    {
251        let transport = SharedTransport::new(self._transport.clone());
252        let mut channel = ReliableControlChannel::new(transport);
253        let ack = self.control.send(&mut channel, op, payload).await?;
254        let parsed = ControlCrypto::decode_ack_payload::<T>(ack.payload.as_deref())
255            .map_err(AlpineSdkError::from)?;
256        Ok(ControlReply {
257            ack,
258            payload: parsed,
259        })
260    }
261}
262
/// Cheaply clonable handle to a transport shared behind an async mutex.
struct SharedTransport<T> {
    /// The shared, mutex-guarded transport.
    inner: Arc<Mutex<T>>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a `T: Clone`
// bound, although cloning only bumps the `Arc` reference count and never clones `T`.
impl<T> Clone for SharedTransport<T> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
267
268impl<T> SharedTransport<T> {
269    fn new(inner: Arc<Mutex<T>>) -> Self {
270        Self { inner }
271    }
272}
273
274#[async_trait]
275impl<T> HandshakeTransport for SharedTransport<T>
276where
277    T: HandshakeTransport + Send,
278{
279    async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
280        let mut guard = self.inner.lock().await;
281        guard.send(msg).await
282    }
283
284    async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
285        let mut guard = self.inner.lock().await;
286        guard.recv().await
287    }
288}