1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use alpine::control::{ControlClient, ControlCrypto};
7use alpine::crypto::identity::NodeCredentials;
8use alpine::crypto::X25519KeyExchange;
9use alpine::handshake::keepalive;
10use alpine::handshake::transport::{CborUdpTransport, ReliableControlChannel, TimeoutTransport};
11use alpine::handshake::{HandshakeContext, HandshakeError, HandshakeMessage, HandshakeTransport};
12use alpine::messages::{
13 Acknowledge, CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity,
14};
15use alpine::profile::StreamProfile;
16use alpine::session::{AlnpSession, Ed25519Authenticator};
17use alpine::stream::AlnpStream;
18use async_trait::async_trait;
19use serde::de::DeserializeOwned;
20use serde::Deserialize;
21use serde_json::{json, Value};
22use tokio::sync::Mutex;
23use tokio::task::JoinHandle;
24use uuid::Uuid;
25
26use crate::error::AlpineSdkError;
27use crate::transport::UdpFrameTransport;
28
29#[derive(Debug)]
31pub struct AlpineClient {
32 session: AlnpSession,
33 _transport: Arc<Mutex<TimeoutTransport<CborUdpTransport>>>,
34 local_addr: SocketAddr,
35 remote_addr: SocketAddr,
36 stream: Option<AlnpStream<UdpFrameTransport>>,
37 control: ControlClient,
38 keepalive_handle: Option<JoinHandle<()>>,
39}
40
41#[derive(Debug)]
43pub struct ControlReply<T> {
44 pub ack: Acknowledge,
45 pub payload: Option<T>,
46}
47
48impl<T> ControlReply<T> {
49 pub fn ok(&self) -> bool {
50 self.ack.ok
51 }
52
53 pub fn detail(&self) -> Option<&str> {
54 self.ack.detail.as_deref()
55 }
56}
57
58#[derive(Debug, Deserialize)]
60pub struct PingReply {
61 #[serde(default)]
62 pub timestamp_ms: Option<u64>,
63 #[serde(default)]
64 pub message: Option<String>,
65}
66
67#[derive(Debug, Deserialize)]
69pub struct StatusReply {
70 #[serde(default)]
71 pub healthy: Option<bool>,
72 #[serde(default)]
73 pub detail: Option<String>,
74 #[serde(default)]
75 pub uptime_secs: Option<u64>,
76}
77
78#[derive(Debug, Deserialize)]
80pub struct HealthReply {
81 #[serde(default)]
82 pub healthy: Option<bool>,
83 #[serde(default)]
84 pub detail: Option<String>,
85 #[serde(default)]
86 pub metrics: Option<HashMap<String, Value>>,
87}
88
89pub type IdentityReply = DeviceIdentity;
91
92#[derive(Debug, Deserialize)]
94pub struct MetadataReply {
95 #[serde(default)]
96 pub metadata: HashMap<String, Value>,
97}
98
99impl AlpineClient {
100 pub async fn connect(
102 local_addr: SocketAddr,
103 remote_addr: SocketAddr,
104 identity: DeviceIdentity,
105 capabilities: CapabilitySet,
106 credentials: NodeCredentials,
107 ) -> Result<Self, AlpineSdkError> {
108 let key_exchange = X25519KeyExchange::new();
109 let authenticator = Ed25519Authenticator::new(credentials.clone());
110
111 let mut transport = TimeoutTransport::new(
112 CborUdpTransport::bind(local_addr, remote_addr, 2048).await?,
113 Duration::from_secs(3),
114 );
115 let session = AlnpSession::connect(
116 identity,
117 capabilities.clone(),
118 authenticator,
119 key_exchange,
120 HandshakeContext::default(),
121 &mut transport,
122 )
123 .await?;
124
125 let transport = Arc::new(Mutex::new(transport));
126 let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
127 transport.clone(),
128 Duration::from_secs(5),
129 session
130 .established()
131 .ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?
132 .session_id,
133 ));
134
135 let established = session
136 .established()
137 .ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?;
138 let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
139 .unwrap_or_else(|_| Uuid::new_v4());
140 let control_crypto = ControlCrypto::new(
141 session
142 .keys()
143 .ok_or_else(|| AlpineSdkError::Io("session keys missing".into()))?,
144 );
145 let control = ControlClient::new(device_uuid, established.session_id, control_crypto);
146
147 Ok(Self {
148 session,
149 _transport: transport,
150 local_addr,
151 remote_addr,
152 stream: None,
153 control,
154 keepalive_handle: Some(keepalive_handle),
155 })
156 }
157
158 pub fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
160 let compiled = profile
161 .compile()
162 .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
163 self.session
164 .set_stream_profile(compiled.clone())
165 .map_err(AlpineSdkError::Handshake)?;
166 self.session.mark_streaming();
167
168 let stream_socket = UdpFrameTransport::new(self.local_addr, self.remote_addr)?;
169 let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
170 self.stream = Some(stream);
171 Ok(compiled.config_id().to_string())
172 }
173
174 pub fn send_frame(
176 &self,
177 channel_format: ChannelFormat,
178 channels: Vec<u16>,
179 priority: u8,
180 groups: Option<HashMap<String, Vec<u16>>>,
181 metadata: Option<HashMap<String, Value>>,
182 ) -> Result<(), AlpineSdkError> {
183 let stream = self
184 .stream
185 .as_ref()
186 .ok_or_else(|| AlpineSdkError::Io("stream not started".into()))?;
187 stream
188 .send(channel_format, channels, priority, groups, metadata)
189 .map_err(AlpineSdkError::from)
190 }
191
192 pub async fn close(mut self) {
194 self.session.close();
195 if let Some(handle) = self.keepalive_handle.take() {
196 handle.abort();
197 }
198 }
199
200 pub fn control_envelope(
202 &self,
203 seq: u64,
204 op: ControlOp,
205 payload: Value,
206 ) -> Result<ControlEnvelope, HandshakeError> {
207 self.control.envelope(seq, op, payload)
208 }
209
210 pub async fn ping(&self) -> Result<ControlReply<PingReply>, AlpineSdkError> {
212 self.control_command("ping").await
213 }
214
215 pub async fn status(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
217 self.control_command("status").await
218 }
219
220 pub async fn health(&self) -> Result<ControlReply<HealthReply>, AlpineSdkError> {
222 self.control_command("health").await
223 }
224
225 pub async fn identity(&self) -> Result<ControlReply<IdentityReply>, AlpineSdkError> {
227 self.control_command("identity").await
228 }
229
230 pub async fn metadata(&self) -> Result<ControlReply<MetadataReply>, AlpineSdkError> {
232 self.control_command("metadata").await
233 }
234
235 async fn control_command<T>(&self, command: &str) -> Result<ControlReply<T>, AlpineSdkError>
236 where
237 T: DeserializeOwned,
238 {
239 let payload = json!({ "command": command });
240 self.control_request(ControlOp::Vendor, payload).await
241 }
242
243 async fn control_request<T>(
244 &self,
245 op: ControlOp,
246 payload: Value,
247 ) -> Result<ControlReply<T>, AlpineSdkError>
248 where
249 T: DeserializeOwned,
250 {
251 let transport = SharedTransport::new(self._transport.clone());
252 let mut channel = ReliableControlChannel::new(transport);
253 let ack = self.control.send(&mut channel, op, payload).await?;
254 let parsed = ControlCrypto::decode_ack_payload::<T>(ack.payload.as_deref())
255 .map_err(AlpineSdkError::from)?;
256 Ok(ControlReply {
257 ack,
258 payload: parsed,
259 })
260 }
261}
262
/// Handle to a transport shared behind an `Arc<Mutex<_>>`.
///
/// Cloning the handle only clones the `Arc`, so every clone refers to the
/// same underlying transport.
struct SharedTransport<T> {
    /// The shared, mutex-guarded transport.
    inner: Arc<Mutex<T>>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would require
// `T: Clone`, although only the `Arc` is cloned and the transport itself
// never is.
impl<T> Clone for SharedTransport<T> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
267
268impl<T> SharedTransport<T> {
269 fn new(inner: Arc<Mutex<T>>) -> Self {
270 Self { inner }
271 }
272}
273
274#[async_trait]
275impl<T> HandshakeTransport for SharedTransport<T>
276where
277 T: HandshakeTransport + Send,
278{
279 async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
280 let mut guard = self.inner.lock().await;
281 guard.send(msg).await
282 }
283
284 async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
285 let mut guard = self.inner.lock().await;
286 guard.recv().await
287 }
288}