1use std::collections::HashMap;
2use std::fmt;
3use std::net::SocketAddr;
4use std::net::UdpSocket as StdUdpSocket;
5use std::sync::Arc;
6use std::time::Duration;
7
8use tokio::sync::Mutex;
9use tokio::task::JoinHandle;
10
11use crate::crypto::identity::NodeCredentials;
12use crate::crypto::X25519KeyExchange;
13use crate::control::{ControlClient, ControlCrypto};
14use crate::handshake::keepalive;
15use crate::handshake::transport::{CborUdpTransport, TimeoutTransport};
16use crate::handshake::{HandshakeContext, HandshakeError};
17use crate::messages::{CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity};
18use crate::profile::{CompiledStreamProfile, StreamProfile};
19use crate::session::AlnpSession;
20use crate::stream::{AlnpStream, FrameTransport, StreamError};
21use serde_json::Value;
22use uuid::Uuid;
23
24#[derive(Debug)]
29#[non_exhaustive]
30pub enum ClientError {
31 Io(String),
33 Handshake(HandshakeError),
35 Stream(StreamError),
37}
38
39
40impl fmt::Display for ClientError {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 match self {
43 ClientError::Io(err) => write!(f, "io error: {}", err),
44 ClientError::Handshake(err) => write!(f, "handshake error: {}", err),
45 ClientError::Stream(err) => write!(f, "stream error: {}", err),
46 }
47 }
48}
49
50impl From<HandshakeError> for ClientError {
51 fn from(err: HandshakeError) -> Self {
52 ClientError::Handshake(err)
53 }
54}
55
56impl From<StreamError> for ClientError {
57 fn from(err: StreamError) -> Self {
58 ClientError::Stream(err)
59 }
60}
61
62impl From<std::io::Error> for ClientError {
63 fn from(err: std::io::Error) -> Self {
64 ClientError::Io(err.to_string())
65 }
66}
67
68#[derive(Debug)]
70struct UdpFrameTransport {
71 socket: StdUdpSocket,
72 peer: SocketAddr,
73}
74
75impl UdpFrameTransport {
76 fn new(local: SocketAddr, peer: SocketAddr) -> Result<Self, std::io::Error> {
77 let socket = StdUdpSocket::bind(local)?;
78 socket.connect(peer)?;
79 Ok(Self { socket, peer })
80 }
81}
82
83impl FrameTransport for UdpFrameTransport {
84 fn send_frame(&self, bytes: &[u8]) -> Result<(), String> {
85 self.socket
86 .send(bytes)
87 .map_err(|e| format!("udp stream send: {}", e))?;
88 Ok(())
89 }
90}
91
92#[derive(Debug)]
100pub struct AlpineClient {
101 session: AlnpSession,
102 transport: Arc<Mutex<TimeoutTransport<CborUdpTransport>>>,
103 local_addr: SocketAddr,
104 remote_addr: SocketAddr,
105 stream: Option<AlnpStream<UdpFrameTransport>>,
106 control: ControlClient,
107 keepalive_handle: Option<JoinHandle<()>>,
108}
109
110impl AlpineClient {
111 pub async fn connect(
123 local_addr: SocketAddr,
124 remote_addr: SocketAddr,
125 identity: DeviceIdentity,
126 capabilities: CapabilitySet,
127 credentials: NodeCredentials,
128 ) -> Result<Self, ClientError> {
129 let key_exchange = X25519KeyExchange::new();
130 let authenticator = crate::session::Ed25519Authenticator::new(credentials.clone());
131
132 let mut transport =
133 TimeoutTransport::new(CborUdpTransport::bind(local_addr, remote_addr, 2048).await?, Duration::from_secs(3));
134 let session = AlnpSession::connect(
135 identity,
136 capabilities.clone(),
137 authenticator,
138 key_exchange,
139 HandshakeContext::default(),
140 &mut transport,
141 )
142 .await?;
143
144 let transport = Arc::new(Mutex::new(transport));
145 let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
146 transport.clone(),
147 Duration::from_secs(5),
148 session
149 .established()
150 .ok_or_else(|| ClientError::Io("session missing after handshake".into()))?
151 .session_id,
152 ));
153
154 let established = session
155 .established()
156 .ok_or_else(|| ClientError::Io("session missing after handshake".into()))?;
157 let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
158 .unwrap_or_else(|_| Uuid::new_v4());
159 let control_crypto = ControlCrypto::new(
160 session
161 .keys()
162 .ok_or_else(|| ClientError::Io("session keys missing".into()))?,
163 );
164 let control = ControlClient::new(device_uuid, established.session_id, control_crypto);
165
166 Ok(Self {
167 session,
168 transport,
169 local_addr,
170 remote_addr,
171 stream: None,
172 control,
173 keepalive_handle: Some(keepalive_handle),
174 })
175 }
176
177 #[must_use]
188 pub async fn start_stream(
189 &mut self,
190 profile: StreamProfile,
191 ) -> Result<String, ClientError> {
192 let compiled = profile
193 .compile()
194 .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
195 self.session
196 .set_stream_profile(compiled.clone())
197 .map_err(ClientError::Handshake)?;
198 self.session.mark_streaming();
199
200 let stream_socket = UdpFrameTransport::new(self.local_addr, self.remote_addr)?;
201 let stream = AlnpStream::new(self.session.clone(), stream_socket, compiled.clone());
202 self.stream = Some(stream);
203 Ok(compiled.config_id().to_string())
204 }
205
206 #[must_use]
216 pub fn send_frame(
217 &self,
218 channel_format: ChannelFormat,
219 channels: Vec<u16>,
220 priority: u8,
221 groups: Option<HashMap<String, Vec<u16>>>,
222 metadata: Option<HashMap<String, serde_json::Value>>,
223 ) -> Result<(), ClientError> {
224 let stream = self
225 .stream
226 .as_ref()
227 .ok_or_else(|| ClientError::Io("stream not started".into()))?;
228 stream
229 .send(channel_format, channels, priority, groups, metadata)
230 .map_err(ClientError::from)
231 }
232
233 pub async fn close(mut self) {
239 self.session.close();
240 if let Some(handle) = self.keepalive_handle.take() {
241 handle.abort();
242 }
243 }
244
245 #[must_use]
254 pub fn control_envelope(
255 &self,
256 seq: u64,
257 op: ControlOp,
258 payload: Value,
259 ) -> Result<ControlEnvelope, HandshakeError> {
260 self.control.envelope(seq, op, payload)
261 }
262}