1use std::collections::HashMap;
2use std::fmt;
3use std::net::SocketAddr;
4use std::net::UdpSocket as StdUdpSocket;
5use std::sync::Arc;
6use std::time::Duration;
7
8use tokio::sync::Mutex;
9use tokio::task::JoinHandle;
10
11use crate::control::{ControlClient, ControlCrypto};
12use crate::crypto::identity::NodeCredentials;
13use crate::crypto::X25519KeyExchange;
14use crate::handshake::keepalive;
15use crate::handshake::transport::{CborUdpTransport, TimeoutTransport};
16use crate::handshake::{HandshakeContext, HandshakeError};
17use crate::messages::{CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity};
18use crate::profile::{CompiledStreamProfile, StreamProfile};
19use crate::session::AlnpSession;
20use crate::stream::{AlnpStream, FrameTransport, StreamError};
21use serde_json::Value;
22use uuid::Uuid;
23
24#[derive(Debug)]
29#[non_exhaustive]
30pub enum ClientError {
31 Io(String),
33 Handshake(HandshakeError),
35 Stream(StreamError),
37}
38
39impl fmt::Display for ClientError {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 ClientError::Io(err) => write!(f, "io error: {}", err),
43 ClientError::Handshake(err) => write!(f, "handshake error: {}", err),
44 ClientError::Stream(err) => write!(f, "stream error: {}", err),
45 }
46 }
47}
48
49impl From<HandshakeError> for ClientError {
50 fn from(err: HandshakeError) -> Self {
51 ClientError::Handshake(err)
52 }
53}
54
55impl From<StreamError> for ClientError {
56 fn from(err: StreamError) -> Self {
57 ClientError::Stream(err)
58 }
59}
60
61impl From<std::io::Error> for ClientError {
62 fn from(err: std::io::Error) -> Self {
63 ClientError::Io(err.to_string())
64 }
65}
66
/// Frame transport backed by a blocking `std::net::UdpSocket`.
#[derive(Debug)]
struct UdpFrameTransport {
    /// Socket that has been bound locally and connected to `peer`.
    socket: StdUdpSocket,
    /// Remote address the socket was connected to at construction time.
    peer: SocketAddr,
}
73
74impl UdpFrameTransport {
75 fn new(local: SocketAddr, peer: SocketAddr) -> Result<Self, std::io::Error> {
76 let socket = StdUdpSocket::bind(local)?;
77 socket.connect(peer)?;
78 Ok(Self { socket, peer })
79 }
80}
81
82impl FrameTransport for UdpFrameTransport {
83 fn send_frame(&self, bytes: &[u8]) -> Result<(), String> {
84 self.socket
85 .send(bytes)
86 .map_err(|e| format!("udp stream send: {}", e))?;
87 Ok(())
88 }
89}
90
91#[derive(Debug)]
99pub struct AlpineClient {
100 session: AlnpSession,
101 transport: Arc<Mutex<TimeoutTransport<CborUdpTransport>>>,
102 local_addr: SocketAddr,
103 remote_addr: SocketAddr,
104 stream: Option<AlnpStream<UdpFrameTransport>>,
105 control: ControlClient,
106 keepalive_handle: Option<JoinHandle<()>>,
107}
108
109impl AlpineClient {
110 pub async fn connect(
122 local_addr: SocketAddr,
123 remote_addr: SocketAddr,
124 identity: DeviceIdentity,
125 capabilities: CapabilitySet,
126 credentials: NodeCredentials,
127 ) -> Result<Self, ClientError> {
128 let key_exchange = X25519KeyExchange::new();
129 let authenticator = crate::session::Ed25519Authenticator::new(credentials.clone());
130
131 let mut transport = TimeoutTransport::new(
132 CborUdpTransport::bind(local_addr, remote_addr, 2048).await?,
133 Duration::from_secs(3),
134 );
135 let session = AlnpSession::connect(
136 identity,
137 capabilities.clone(),
138 authenticator,
139 key_exchange,
140 HandshakeContext::default(),
141 &mut transport,
142 )
143 .await?;
144
145 let transport = Arc::new(Mutex::new(transport));
146 let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
147 transport.clone(),
148 Duration::from_secs(5),
149 session
150 .established()
151 .ok_or_else(|| ClientError::Io("session missing after handshake".into()))?
152 .session_id,
153 ));
154
155 let established = session
156 .established()
157 .ok_or_else(|| ClientError::Io("session missing after handshake".into()))?;
158 let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
159 .unwrap_or_else(|_| Uuid::new_v4());
160 let control_crypto = ControlCrypto::new(
161 session
162 .keys()
163 .ok_or_else(|| ClientError::Io("session keys missing".into()))?,
164 );
165 let control = ControlClient::new(device_uuid, established.session_id, control_crypto);
166
167 Ok(Self {
168 session,
169 transport,
170 local_addr,
171 remote_addr,
172 stream: None,
173 control,
174 keepalive_handle: Some(keepalive_handle),
175 })
176 }
177
178 #[must_use]
189 pub async fn start_stream(&mut self, profile: StreamProfile) -> Result<String, ClientError> {
190 let compiled = profile
191 .compile()
192 .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
193 self.session
194 .set_stream_profile(compiled.clone())
195 .map_err(ClientError::Handshake)?;
196 self.session.mark_streaming();
197
198 let stream_socket = UdpFrameTransport::new(self.local_addr, self.remote_addr)?;
199 let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
200 self.stream = Some(stream);
201 Ok(compiled.config_id().to_string())
202 }
203
204 #[must_use]
214 pub fn send_frame(
215 &self,
216 channel_format: ChannelFormat,
217 channels: Vec<u16>,
218 priority: u8,
219 groups: Option<HashMap<String, Vec<u16>>>,
220 metadata: Option<HashMap<String, serde_json::Value>>,
221 ) -> Result<(), ClientError> {
222 let stream = self
223 .stream
224 .as_ref()
225 .ok_or_else(|| ClientError::Io("stream not started".into()))?;
226 stream
227 .send(channel_format, channels, priority, groups, metadata)
228 .map_err(ClientError::from)
229 }
230
231 pub async fn close(mut self) {
237 self.session.close();
238 if let Some(handle) = self.keepalive_handle.take() {
239 handle.abort();
240 }
241 }
242
243 #[must_use]
252 pub fn control_envelope(
253 &self,
254 seq: u64,
255 op: ControlOp,
256 payload: Value,
257 ) -> Result<ControlEnvelope, HandshakeError> {
258 self.control.envelope(seq, op, payload)
259 }
260}