alpine_protocol_sdk/
client.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use alpine::control::{ControlClient, ControlCrypto};
7use alpine::crypto::identity::NodeCredentials;
8use alpine::crypto::X25519KeyExchange;
9use alpine::handshake::keepalive;
10use alpine::handshake::transport::{CborUdpTransport, TimeoutTransport};
11use alpine::handshake::{HandshakeContext, HandshakeError};
12use alpine::messages::{CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity};
13use alpine::profile::StreamProfile;
14use alpine::session::{AlnpSession, Ed25519Authenticator};
15use alpine::stream::AlnpStream;
16use serde_json::Value;
17use tokio::sync::Mutex;
18use tokio::task::JoinHandle;
19use uuid::Uuid;
20
21use crate::error::AlpineSdkError;
22use crate::transport::UdpFrameTransport;
23
24/// High-level client that wraps the ALPINE protocol primitives.
25#[derive(Debug)]
26pub struct AlpineClient {
27    session: AlnpSession,
28    _transport: Arc<Mutex<TimeoutTransport<CborUdpTransport>>>,
29    local_addr: SocketAddr,
30    remote_addr: SocketAddr,
31    stream: Option<AlnpStream<UdpFrameTransport>>,
32    control: ControlClient,
33    keepalive_handle: Option<JoinHandle<()>>,
34}
35
36impl AlpineClient {
37    /// Opens a session with the provided device identity and capabilities.
38    pub async fn connect(
39        local_addr: SocketAddr,
40        remote_addr: SocketAddr,
41        identity: DeviceIdentity,
42        capabilities: CapabilitySet,
43        credentials: NodeCredentials,
44    ) -> Result<Self, AlpineSdkError> {
45        let key_exchange = X25519KeyExchange::new();
46        let authenticator = Ed25519Authenticator::new(credentials.clone());
47
48        let mut transport = TimeoutTransport::new(
49            CborUdpTransport::bind(local_addr, remote_addr, 2048).await?,
50            Duration::from_secs(3),
51        );
52        let session = AlnpSession::connect(
53            identity,
54            capabilities.clone(),
55            authenticator,
56            key_exchange,
57            HandshakeContext::default(),
58            &mut transport,
59        )
60        .await?;
61
62        let transport = Arc::new(Mutex::new(transport));
63        let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
64            transport.clone(),
65            Duration::from_secs(5),
66            session
67                .established()
68                .ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?
69                .session_id,
70        ));
71
72        let established = session
73            .established()
74            .ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?;
75        let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
76            .unwrap_or_else(|_| Uuid::new_v4());
77        let control_crypto = ControlCrypto::new(
78            session
79                .keys()
80                .ok_or_else(|| AlpineSdkError::Io("session keys missing".into()))?,
81        );
82        let control = ControlClient::new(device_uuid, established.session_id, control_crypto);
83
84        Ok(Self {
85            session,
86            _transport: transport,
87            local_addr,
88            remote_addr,
89            stream: None,
90            control,
91            keepalive_handle: Some(keepalive_handle),
92        })
93    }
94
95    /// Starts streaming with the supplied profile and returns the generated config id.
96    pub fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
97        let compiled = profile
98            .compile()
99            .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
100        self.session
101            .set_stream_profile(compiled.clone())
102            .map_err(AlpineSdkError::Handshake)?;
103        self.session.mark_streaming();
104
105        let stream_socket = UdpFrameTransport::new(self.local_addr, self.remote_addr)?;
106        let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
107        self.stream = Some(stream);
108        Ok(compiled.config_id().to_string())
109    }
110
111    /// Sends a streaming frame over the active session.
112    pub fn send_frame(
113        &self,
114        channel_format: ChannelFormat,
115        channels: Vec<u16>,
116        priority: u8,
117        groups: Option<HashMap<String, Vec<u16>>>,
118        metadata: Option<HashMap<String, Value>>,
119    ) -> Result<(), AlpineSdkError> {
120        let stream = self
121            .stream
122            .as_ref()
123            .ok_or_else(|| AlpineSdkError::Io("stream not started".into()))?;
124        stream
125            .send(channel_format, channels, priority, groups, metadata)
126            .map_err(AlpineSdkError::from)
127    }
128
129    /// Stops keep-alive and shuts down the session.
130    pub async fn close(mut self) {
131        self.session.close();
132        if let Some(handle) = self.keepalive_handle.take() {
133            handle.abort();
134        }
135    }
136
137    /// Builds a signed control envelope for the active session.
138    pub fn control_envelope(
139        &self,
140        seq: u64,
141        op: ControlOp,
142        payload: Value,
143    ) -> Result<ControlEnvelope, HandshakeError> {
144        self.control.envelope(seq, op, payload)
145    }
146}