alpine/sdk/
client.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::net::SocketAddr;
4use std::net::UdpSocket as StdUdpSocket;
5use std::sync::Arc;
6use std::time::Duration;
7
8use tokio::sync::Mutex;
9use tokio::task::JoinHandle;
10
11use crate::crypto::identity::NodeCredentials;
12use crate::crypto::X25519KeyExchange;
13use crate::control::{ControlClient, ControlCrypto};
14use crate::handshake::keepalive;
15use crate::handshake::transport::{CborUdpTransport, TimeoutTransport};
16use crate::handshake::{HandshakeContext, HandshakeError};
17use crate::messages::{CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity};
18use crate::profile::{CompiledStreamProfile, StreamProfile};
19use crate::session::AlnpSession;
20use crate::stream::{AlnpStream, FrameTransport, StreamError};
21use serde_json::Value;
22use uuid::Uuid;
23
24/// Errors emitted by the high-level SDK client.
25///
26/// These variants indicate what happened during discovery/handshake, streaming,
27/// or UDP transport. They correspond to the guarantees documented on each method.
28#[derive(Debug)]
29#[non_exhaustive]
30pub enum ClientError {
31    /// OS-level failures such as socket bind/send errors or missing session data.
32    Io(String),
33    /// Handshake or session establishment failures propagated from `AlnpSession`.
34    Handshake(HandshakeError),
35    /// Streaming transport errors (e.g., `AlnpStream::send`).
36    Stream(StreamError),
37}
38
39
40impl fmt::Display for ClientError {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        match self {
43            ClientError::Io(err) => write!(f, "io error: {}", err),
44            ClientError::Handshake(err) => write!(f, "handshake error: {}", err),
45            ClientError::Stream(err) => write!(f, "stream error: {}", err),
46        }
47    }
48}
49
50impl From<HandshakeError> for ClientError {
51    fn from(err: HandshakeError) -> Self {
52        ClientError::Handshake(err)
53    }
54}
55
56impl From<StreamError> for ClientError {
57    fn from(err: StreamError) -> Self {
58        ClientError::Stream(err)
59    }
60}
61
62impl From<std::io::Error> for ClientError {
63    fn from(err: std::io::Error) -> Self {
64        ClientError::Io(err.to_string())
65    }
66}
67
68/// Thin UDP transport for the ALPINE streaming layer.
69#[derive(Debug)]
70struct UdpFrameTransport {
71    socket: StdUdpSocket,
72    peer: SocketAddr,
73}
74
75impl UdpFrameTransport {
76    fn new(local: SocketAddr, peer: SocketAddr) -> Result<Self, std::io::Error> {
77        let socket = StdUdpSocket::bind(local)?;
78        socket.connect(peer)?;
79        Ok(Self { socket, peer })
80    }
81}
82
83impl FrameTransport for UdpFrameTransport {
84    fn send_frame(&self, bytes: &[u8]) -> Result<(), String> {
85        self.socket
86            .send(bytes)
87            .map_err(|e| format!("udp stream send: {}", e))?;
88        Ok(())
89    }
90}
91
92/// High-level controller client that orchestrates discovery, handshake, streaming,
93/// control, and keepalive flows.
94///
95/// # Guarantees
96/// * Handshake runs over `TimeoutTransport<CborUdpTransport>` and fails fast.
97/// * Streaming uses a compiled `StreamProfile` and cannot change behavior once active.
98/// * Keepalive tasks start after handshake and abort on `close()`.
99#[derive(Debug)]
100pub struct AlpineClient {
101    session: AlnpSession,
102    transport: Arc<Mutex<TimeoutTransport<CborUdpTransport>>>,
103    local_addr: SocketAddr,
104    remote_addr: SocketAddr,
105    stream: Option<AlnpStream<UdpFrameTransport>>,
106    control: ControlClient,
107    keepalive_handle: Option<JoinHandle<()>>,
108}
109
110impl AlpineClient {
111    /// Connects to a remote ALPINE device using the provided credentials.
112    ///
113    /// # Behavior
114    /// * Executes discovery/handshake via `CborUdpTransport` and `TimeoutTransport`.
115    /// * Spins up a keepalive future that ticks every 5 seconds.
116    /// * Builds `ControlClient` once keys are derived so `control_envelope` works.
117    ///
118    /// # Errors
119    /// Returns `ClientError::Io` for socket failures or missing session material,
120    /// `ClientError::Handshake` for protocol errors, and `ClientError::Stream` for
121    /// transport issues.
122   pub async fn connect(
123        local_addr: SocketAddr,
124        remote_addr: SocketAddr,
125        identity: DeviceIdentity,
126        capabilities: CapabilitySet,
127        credentials: NodeCredentials,
128    ) -> Result<Self, ClientError> {
129        let key_exchange = X25519KeyExchange::new();
130        let authenticator = crate::session::Ed25519Authenticator::new(credentials.clone());
131
132        let mut transport =
133            TimeoutTransport::new(CborUdpTransport::bind(local_addr, remote_addr, 2048).await?, Duration::from_secs(3));
134        let session = AlnpSession::connect(
135            identity,
136            capabilities.clone(),
137            authenticator,
138            key_exchange,
139            HandshakeContext::default(),
140            &mut transport,
141        )
142        .await?;
143
144        let transport = Arc::new(Mutex::new(transport));
145        let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
146            transport.clone(),
147            Duration::from_secs(5),
148            session
149                .established()
150                .ok_or_else(|| ClientError::Io("session missing after handshake".into()))?
151                .session_id,
152        ));
153
154        let established = session
155            .established()
156            .ok_or_else(|| ClientError::Io("session missing after handshake".into()))?;
157        let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
158            .unwrap_or_else(|_| Uuid::new_v4());
159        let control_crypto = ControlCrypto::new(
160            session
161                .keys()
162                .ok_or_else(|| ClientError::Io("session keys missing".into()))?,
163        );
164        let control = ControlClient::new(device_uuid, established.session_id, control_crypto);
165
166        Ok(Self {
167            session,
168            transport,
169            local_addr,
170            remote_addr,
171            stream: None,
172            control,
173            keepalive_handle: Some(keepalive_handle),
174        })
175    }
176
177    /// Starts streaming with the selected profile; `Auto` is the default.
178    ///
179    /// # Guarantees
180    /// * Profiles are validated/normalized; invalid combinations return explicit errors.
181    /// * `config_id` is bound to the session and can't change once streaming begins.
182    /// * Streaming transport is built after the profile is locked.
183    ///
184    /// # Errors
185    /// Returns `ClientError::Io` for socket issues or session material that is missing.
186    /// Returns `ClientError::Handshake` if the profile cannot be bound or the session rejects it.
187    #[must_use]
188    pub async fn start_stream(
189        &mut self,
190        profile: StreamProfile,
191    ) -> Result<String, ClientError> {
192        let compiled = profile
193            .compile()
194            .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
195        self.session
196            .set_stream_profile(compiled.clone())
197            .map_err(ClientError::Handshake)?;
198        self.session.mark_streaming();
199
200        let stream_socket = UdpFrameTransport::new(self.local_addr, self.remote_addr)?;
201        let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
202        self.stream = Some(stream);
203        Ok(compiled.config_id().to_string())
204    }
205
206    /// Sends a streaming frame via the high-level helper.
207    ///
208    /// # Guarantees
209    /// * Validation reuses `AlnpStream`, so it refuses to send when the session is not ready.
210    /// * Applies jitter strategy before encoding.
211    /// * Requires `start_stream` to have bound a profile before calling.
212    ///
213    /// # Errors
214    /// Returns `StreamError` wrapped in `ClientError::Stream`.
215    #[must_use]
216    pub fn send_frame(
217        &self,
218        channel_format: ChannelFormat,
219        channels: Vec<u16>,
220        priority: u8,
221        groups: Option<HashMap<String, Vec<u16>>>,
222        metadata: Option<HashMap<String, serde_json::Value>>,
223    ) -> Result<(), ClientError> {
224        let stream = self
225            .stream
226            .as_ref()
227            .ok_or_else(|| ClientError::Io("stream not started".into()))?;
228        stream
229            .send(channel_format, channels, priority, groups, metadata)
230            .map_err(ClientError::from)
231    }
232
233    /// Gracefully closes the client, stopping keepalive tasks.
234    ///
235    /// # Behavior
236    /// * Transitions the session state to closed.
237    /// * Aborts the keepalive background job immediately.
238    pub async fn close(mut self) {
239        self.session.close();
240        if let Some(handle) = self.keepalive_handle.take() {
241            handle.abort();
242        }
243    }
244
245    /// Builds an authenticated control envelope ready for transport.
246    ///
247    /// # Guarantees
248    /// * Seals the payload with a MAC derived from the session keys.
249    /// * Does not mutate transport state.
250    ///
251    /// # Errors
252    /// Propagates the underlying `HandshakeError` returned while computing MACs.
253    #[must_use]
254    pub fn control_envelope(
255        &self,
256        seq: u64,
257        op: ControlOp,
258        payload: Value,
259    ) -> Result<ControlEnvelope, HandshakeError> {
260        self.control.envelope(seq, op, payload)
261    }
262}