alpine/sdk/
client.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::net::SocketAddr;
4use std::net::UdpSocket as StdUdpSocket;
5use std::sync::Arc;
6use std::time::Duration;
7
8use tokio::sync::Mutex;
9use tokio::task::JoinHandle;
10
11use crate::control::{ControlClient, ControlCrypto};
12use crate::crypto::identity::NodeCredentials;
13use crate::crypto::X25519KeyExchange;
14use crate::handshake::keepalive;
15use crate::handshake::transport::{CborUdpTransport, TimeoutTransport};
16use crate::handshake::{HandshakeContext, HandshakeError};
17use crate::messages::{CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity};
18use crate::profile::{CompiledStreamProfile, StreamProfile};
19use crate::session::AlnpSession;
20use crate::stream::{AlnpStream, FrameTransport, StreamError};
21use serde_json::Value;
22use uuid::Uuid;
23
24/// Errors emitted by the high-level SDK client.
25///
26/// These variants indicate what happened during discovery/handshake, streaming,
27/// or UDP transport. They correspond to the guarantees documented on each method.
28#[derive(Debug)]
29#[non_exhaustive]
30pub enum ClientError {
31    /// OS-level failures such as socket bind/send errors or missing session data.
32    Io(String),
33    /// Handshake or session establishment failures propagated from `AlnpSession`.
34    Handshake(HandshakeError),
35    /// Streaming transport errors (e.g., `AlnpStream::send`).
36    Stream(StreamError),
37}
38
39impl fmt::Display for ClientError {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        match self {
42            ClientError::Io(err) => write!(f, "io error: {}", err),
43            ClientError::Handshake(err) => write!(f, "handshake error: {}", err),
44            ClientError::Stream(err) => write!(f, "stream error: {}", err),
45        }
46    }
47}
48
49impl From<HandshakeError> for ClientError {
50    fn from(err: HandshakeError) -> Self {
51        ClientError::Handshake(err)
52    }
53}
54
55impl From<StreamError> for ClientError {
56    fn from(err: StreamError) -> Self {
57        ClientError::Stream(err)
58    }
59}
60
61impl From<std::io::Error> for ClientError {
62    fn from(err: std::io::Error) -> Self {
63        ClientError::Io(err.to_string())
64    }
65}
66
67/// Thin UDP transport for the ALPINE streaming layer.
68#[derive(Debug)]
69struct UdpFrameTransport {
70    socket: StdUdpSocket,
71    peer: SocketAddr,
72}
73
74impl UdpFrameTransport {
75    fn new(local: SocketAddr, peer: SocketAddr) -> Result<Self, std::io::Error> {
76        let socket = StdUdpSocket::bind(local)?;
77        socket.connect(peer)?;
78        Ok(Self { socket, peer })
79    }
80}
81
82impl FrameTransport for UdpFrameTransport {
83    fn send_frame(&self, bytes: &[u8]) -> Result<(), String> {
84        self.socket
85            .send(bytes)
86            .map_err(|e| format!("udp stream send: {}", e))?;
87        Ok(())
88    }
89}
90
91/// High-level controller client that orchestrates discovery, handshake, streaming,
92/// control, and keepalive flows.
93///
94/// # Guarantees
95/// * Handshake runs over `TimeoutTransport<CborUdpTransport>` and fails fast.
96/// * Streaming uses a compiled `StreamProfile` and cannot change behavior once active.
97/// * Keepalive tasks start after handshake and abort on `close()`.
98#[derive(Debug)]
99pub struct AlpineClient {
100    session: AlnpSession,
101    transport: Arc<Mutex<TimeoutTransport<CborUdpTransport>>>,
102    local_addr: SocketAddr,
103    remote_addr: SocketAddr,
104    stream: Option<AlnpStream<UdpFrameTransport>>,
105    control: ControlClient,
106    keepalive_handle: Option<JoinHandle<()>>,
107}
108
109impl AlpineClient {
110    /// Connects to a remote ALPINE device using the provided credentials.
111    ///
112    /// # Behavior
113    /// * Executes discovery/handshake via `CborUdpTransport` and `TimeoutTransport`.
114    /// * Spins up a keepalive future that ticks every 5 seconds.
115    /// * Builds `ControlClient` once keys are derived so `control_envelope` works.
116    ///
117    /// # Errors
118    /// Returns `ClientError::Io` for socket failures or missing session material,
119    /// `ClientError::Handshake` for protocol errors, and `ClientError::Stream` for
120    /// transport issues.
121    pub async fn connect(
122        local_addr: SocketAddr,
123        remote_addr: SocketAddr,
124        identity: DeviceIdentity,
125        capabilities: CapabilitySet,
126        credentials: NodeCredentials,
127    ) -> Result<Self, ClientError> {
128        let key_exchange = X25519KeyExchange::new();
129        let authenticator = crate::session::Ed25519Authenticator::new(credentials.clone());
130
131        let mut transport = TimeoutTransport::new(
132            CborUdpTransport::bind(local_addr, remote_addr, 2048).await?,
133            Duration::from_secs(3),
134        );
135        let session = AlnpSession::connect(
136            identity,
137            capabilities.clone(),
138            authenticator,
139            key_exchange,
140            HandshakeContext::default(),
141            &mut transport,
142        )
143        .await?;
144
145        let transport = Arc::new(Mutex::new(transport));
146        let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
147            transport.clone(),
148            Duration::from_secs(5),
149            session
150                .established()
151                .ok_or_else(|| ClientError::Io("session missing after handshake".into()))?
152                .session_id,
153        ));
154
155        let established = session
156            .established()
157            .ok_or_else(|| ClientError::Io("session missing after handshake".into()))?;
158        let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
159            .unwrap_or_else(|_| Uuid::new_v4());
160        let control_crypto = ControlCrypto::new(
161            session
162                .keys()
163                .ok_or_else(|| ClientError::Io("session keys missing".into()))?,
164        );
165        let control = ControlClient::new(device_uuid, established.session_id, control_crypto);
166
167        Ok(Self {
168            session,
169            transport,
170            local_addr,
171            remote_addr,
172            stream: None,
173            control,
174            keepalive_handle: Some(keepalive_handle),
175        })
176    }
177
178    /// Starts streaming with the selected profile; `Auto` is the default.
179    ///
180    /// # Guarantees
181    /// * Profiles are validated/normalized; invalid combinations return explicit errors.
182    /// * `config_id` is bound to the session and can't change once streaming begins.
183    /// * Streaming transport is built after the profile is locked.
184    ///
185    /// # Errors
186    /// Returns `ClientError::Io` for socket issues or session material that is missing.
187    /// Returns `ClientError::Handshake` if the profile cannot be bound or the session rejects it.
188    #[must_use]
189    pub async fn start_stream(&mut self, profile: StreamProfile) -> Result<String, ClientError> {
190        let compiled = profile
191            .compile()
192            .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
193        self.session
194            .set_stream_profile(compiled.clone())
195            .map_err(ClientError::Handshake)?;
196        self.session.mark_streaming();
197
198        let stream_socket = UdpFrameTransport::new(self.local_addr, self.remote_addr)?;
199        let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
200        self.stream = Some(stream);
201        Ok(compiled.config_id().to_string())
202    }
203
204    /// Sends a streaming frame via the high-level helper.
205    ///
206    /// # Guarantees
207    /// * Validation reuses `AlnpStream`, so it refuses to send when the session is not ready.
208    /// * Applies jitter strategy before encoding.
209    /// * Requires `start_stream` to have bound a profile before calling.
210    ///
211    /// # Errors
212    /// Returns `StreamError` wrapped in `ClientError::Stream`.
213    #[must_use]
214    pub fn send_frame(
215        &self,
216        channel_format: ChannelFormat,
217        channels: Vec<u16>,
218        priority: u8,
219        groups: Option<HashMap<String, Vec<u16>>>,
220        metadata: Option<HashMap<String, serde_json::Value>>,
221    ) -> Result<(), ClientError> {
222        let stream = self
223            .stream
224            .as_ref()
225            .ok_or_else(|| ClientError::Io("stream not started".into()))?;
226        stream
227            .send(channel_format, channels, priority, groups, metadata)
228            .map_err(ClientError::from)
229    }
230
231    /// Gracefully closes the client, stopping keepalive tasks.
232    ///
233    /// # Behavior
234    /// * Transitions the session state to closed.
235    /// * Aborts the keepalive background job immediately.
236    pub async fn close(mut self) {
237        self.session.close();
238        if let Some(handle) = self.keepalive_handle.take() {
239            handle.abort();
240        }
241    }
242
243    /// Builds an authenticated control envelope ready for transport.
244    ///
245    /// # Guarantees
246    /// * Seals the payload with a MAC derived from the session keys.
247    /// * Does not mutate transport state.
248    ///
249    /// # Errors
250    /// Propagates the underlying `HandshakeError` returned while computing MACs.
251    #[must_use]
252    pub fn control_envelope(
253        &self,
254        seq: u64,
255        op: ControlOp,
256        payload: Value,
257    ) -> Result<ControlEnvelope, HandshakeError> {
258        self.control.envelope(seq, op, payload)
259    }
260}