use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use alpine::control::{ControlClient, ControlCrypto};
use alpine::crypto::identity::NodeCredentials;
use alpine::crypto::X25519KeyExchange;
use alpine::handshake::keepalive;
use alpine::handshake::transport::{CborUdpTransport, ReliableControlChannel, TimeoutTransport};
use alpine::handshake::{HandshakeContext, HandshakeError, HandshakeMessage, HandshakeTransport};
use alpine::messages::{
Acknowledge, CapabilitySet, ChannelFormat, ControlEnvelope, ControlOp, DeviceIdentity,
};
use alpine::profile::StreamProfile;
use alpine::session::{AlnpSession, Ed25519Authenticator};
use alpine::stream::AlnpStream;
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde_cbor;
use serde_json::{json, Value};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{info, warn};
use uuid::Uuid;
use crate::transport::UdpFrameTransport;
use crate::{
error::AlpineSdkError,
phase::{self, Phase},
};
use tokio::net::UdpSocket;
/// Concrete control-plane transport stack used by [`AlpineClient`]: a
/// `CborUdpTransport` wrapped in a `TimeoutTransport`, wrapped in the logging
/// `InstrumentedTransport` defined in this file.
type WireTransport = InstrumentedTransport<TimeoutTransport<CborUdpTransport>>;
/// A connected client: the handshaken session plus the transports and helpers
/// needed to issue control requests and (optionally) stream frames.
///
/// NOTE(review): in the visible code the keepalive task is only aborted by
/// `close`; no `Drop` impl is visible here, so confirm what should happen when a
/// client is dropped without calling `close`.
#[derive(Debug)]
pub struct AlpineClient {
/// Session returned by `AlnpSession::connect` after the handshake.
session: AlnpSession,
/// Shared control transport; handed to the keepalive task and wrapped in a
/// `SharedTransport` for every control request.
_transport: Arc<Mutex<WireTransport>>,
/// Socket handle obtained from the base transport at connect time.
udp_socket: Arc<UdpSocket>,
/// Local address reported by the base transport.
local_addr: SocketAddr,
/// Peer address reported by the base transport.
remote_addr: SocketAddr,
/// Frame stream; `None` until `start_stream` succeeds.
stream: Option<AlnpStream<UdpFrameTransport>>,
/// Builds and sends control envelopes for this session.
control: ControlClient,
/// Handle of the spawned keepalive task; taken and aborted in `close`.
keepalive_handle: Option<JoinHandle<()>>,
}
/// Result of a control request: the raw acknowledgement plus its payload decoded
/// into `T` (when `ControlCrypto::decode_ack_payload` yields one).
#[derive(Debug)]
pub struct ControlReply<T> {
/// Acknowledgement returned by the control channel.
pub ack: Acknowledge,
/// Typed payload decoded from `ack.payload`, if any.
pub payload: Option<T>,
}
impl<T> ControlReply<T> {
    /// Returns the `ok` flag carried by the acknowledgement.
    pub fn ok(&self) -> bool {
        let ack = &self.ack;
        ack.ok
    }

    /// Returns the acknowledgement's detail text, if the device supplied one.
    pub fn detail(&self) -> Option<&str> {
        let ack = &self.ack;
        ack.detail.as_deref()
    }
}
/// Payload type returned by [`AlpineClient::ping`].
///
/// Both fields are optional and deserialize to `None` when absent.
#[derive(Debug, Deserialize)]
pub struct PingReply {
/// Presumably a device timestamp in milliseconds — epoch/units are not
/// defined in this file; confirm against the device protocol.
#[serde(default)]
pub timestamp_ms: Option<u64>,
/// Optional free-form message.
#[serde(default)]
pub message: Option<String>,
}
/// Payload type returned by [`AlpineClient::status`].
///
/// Every field is optional and deserializes to `None` when absent.
#[derive(Debug, Deserialize)]
pub struct StatusReply {
/// Optional health flag reported by the device.
#[serde(default)]
pub healthy: Option<bool>,
/// Optional free-form detail text.
#[serde(default)]
pub detail: Option<String>,
/// Presumably the device uptime in seconds (per the field name) — confirm.
#[serde(default)]
pub uptime_secs: Option<u64>,
}
/// Payload type returned by [`AlpineClient::health`].
///
/// Every field is optional and deserializes to `None` when absent.
#[derive(Debug, Deserialize)]
pub struct HealthReply {
/// Optional health flag reported by the device.
#[serde(default)]
pub healthy: Option<bool>,
/// Optional free-form detail text.
#[serde(default)]
pub detail: Option<String>,
/// Optional map of named metrics; values are arbitrary JSON.
#[serde(default)]
pub metrics: Option<HashMap<String, Value>>,
}
/// Payload type returned by [`AlpineClient::identity`]; an alias for `DeviceIdentity`.
pub type IdentityReply = DeviceIdentity;
/// Payload type returned by [`AlpineClient::metadata`].
#[derive(Debug, Deserialize)]
pub struct MetadataReply {
/// Key/value metadata with arbitrary JSON values; empty when the field is absent.
#[serde(default)]
pub metadata: HashMap<String, Value>,
}
impl AlpineClient {
/// Connects from `local_addr` to `remote_addr` using `HandshakeContext::default()`.
///
/// Delegates to `connect_with_context`.
///
/// # Errors
/// Returns whatever error `connect_with_context` reports.
pub async fn connect(
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
) -> Result<Self, AlpineSdkError> {
    let context = HandshakeContext::default();
    let client = Self::connect_with_context(
        local_addr,
        remote_addr,
        identity,
        capabilities,
        credentials,
        context,
    )
    .await?;
    Ok(client)
}
/// Like `connect`, but installs `client_nonce` on the default handshake context
/// via `with_client_nonce` before connecting.
///
/// # Errors
/// Returns whatever error `connect_with_context` reports.
pub async fn connect_with_nonce(
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    client_nonce: Vec<u8>,
) -> Result<Self, AlpineSdkError> {
    let context = HandshakeContext::default().with_client_nonce(client_nonce);
    let client = Self::connect_with_context(
        local_addr,
        remote_addr,
        identity,
        capabilities,
        credentials,
        context,
    )
    .await?;
    Ok(client)
}
/// Connects with a caller-supplied handshake `context`, after installing
/// `client_nonce` on it via `with_client_nonce`.
///
/// # Errors
/// Returns whatever error `connect_with_context` reports.
pub async fn connect_with_context_and_nonce(
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    client_nonce: Vec<u8>,
    context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
    let context = context.with_client_nonce(client_nonce);
    let client = Self::connect_with_context(
        local_addr,
        remote_addr,
        identity,
        capabilities,
        credentials,
        context,
    )
    .await?;
    Ok(client)
}
/// Connects over an already-created `socket`, using `context` with
/// `client_nonce` installed via `with_client_nonce`.
///
/// # Errors
/// Returns whatever error `connect_with_socket` reports.
pub async fn connect_with_socket_and_nonce(
    socket: UdpSocket,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    client_nonce: Vec<u8>,
    context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
    let context = context.with_client_nonce(client_nonce);
    let client = Self::connect_with_socket(
        socket,
        remote_addr,
        identity,
        capabilities,
        credentials,
        context,
    )
    .await?;
    Ok(client)
}
/// Claims the handshake phase, binds a `CborUdpTransport` from `local_addr` to
/// `remote_addr`, and hands it to `connect_with_transport`.
///
/// The value returned by `phase::claim_handshake` is kept alive until this
/// function returns.
///
/// # Errors
/// Fails if the phase claim, the transport bind, or the handshake fails.
async fn connect_with_context(
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
    let _phase_claim = phase::claim_handshake()?;
    let debug_cbor = context.debug_cbor;
    let bound = CborUdpTransport::bind(local_addr, remote_addr, 4096, debug_cbor).await?;
    let client =
        Self::connect_with_transport(bound, identity, capabilities, credentials, context)
            .await?;
    Ok(client)
}
/// Wraps an existing `socket` in a `CborUdpTransport` aimed at `remote_addr`
/// and hands it to `connect_with_transport`.
///
/// NOTE(review): unlike `connect_with_context`, this path does not call
/// `phase::claim_handshake` — confirm whether callers are expected to have
/// claimed the phase already.
///
/// # Errors
/// Fails if the transport cannot be built from the socket or the handshake fails.
async fn connect_with_socket(
    socket: UdpSocket,
    remote_addr: SocketAddr,
    identity: DeviceIdentity,
    capabilities: CapabilitySet,
    credentials: NodeCredentials,
    context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
    let debug_cbor = context.debug_cbor;
    let wrapped = CborUdpTransport::from_socket(socket, remote_addr, 4096, debug_cbor)
        .map_err(AlpineSdkError::from)?;
    let client =
        Self::connect_with_transport(wrapped, identity, capabilities, credentials, context)
            .await?;
    Ok(client)
}
async fn connect_with_transport(
base_transport: CborUdpTransport,
identity: DeviceIdentity,
capabilities: CapabilitySet,
credentials: NodeCredentials,
context: HandshakeContext,
) -> Result<Self, AlpineSdkError> {
let udp_socket = base_transport.socket();
let bound_local_addr = base_transport.local_addr();
let peer_addr = base_transport.peer_addr();
info!(
"[ALPINE][HANDSHAKE] starting handshake for device {} local_addr={} remote_addr={} phase={}",
identity.device_id,
bound_local_addr,
peer_addr,
phase::current_phase().label()
);
let mut transport = InstrumentedTransport::new(
TimeoutTransport::new(base_transport, context.recv_timeout),
bound_local_addr,
peer_addr,
);
info!(
"[ALPINE][HANDSHAKE] transport ready recv_timeout_ms={}",
context.recv_timeout.as_millis()
);
let session = AlnpSession::connect(
identity,
capabilities.clone(),
Ed25519Authenticator::new(credentials.clone()),
X25519KeyExchange::new(),
context,
&mut transport,
)
.await?;
let transport = Arc::new(Mutex::new(transport));
let established = session
.established()
.ok_or_else(|| AlpineSdkError::Io("session missing after handshake".into()))?
.clone();
let keepalive_handle = tokio::spawn(keepalive::spawn_keepalive(
transport.clone(),
Duration::from_secs(5),
established.session_id.clone(),
));
let device_uuid = Uuid::parse_str(&established.device_identity.device_id)
.unwrap_or_else(|_| Uuid::new_v4());
let control_crypto = ControlCrypto::new(
session
.keys()
.ok_or_else(|| AlpineSdkError::Io("session keys missing".into()))?,
);
let control =
ControlClient::new(device_uuid, established.session_id.clone(), control_crypto);
info!(
"[ALPINE][HANDSHAKE] session established session_id={} local_addr={} remote_addr={}",
established.session_id, bound_local_addr, peer_addr
);
Ok(Self {
session,
_transport: transport,
udp_socket,
local_addr: bound_local_addr,
remote_addr: peer_addr,
stream: None,
control,
keepalive_handle: Some(keepalive_handle),
})
}
/// Compiles `profile`, opens a dedicated stream socket, and switches the
/// session into streaming mode.
///
/// The stream socket is bound on the client's local IP with port `0` and
/// targets `remote_addr`. It is created *before* the session is mutated so
/// that a bind failure does not leave the session marked as streaming with no
/// stream attached.
///
/// # Returns
/// The compiled profile's `config_id` as a string.
///
/// # Errors
/// Fails if the profile does not compile, the socket cannot be created, or
/// the session rejects the profile.
pub fn start_stream(&mut self, profile: StreamProfile) -> Result<String, AlpineSdkError> {
    let compiled = profile
        .compile()
        .map_err(|err| HandshakeError::Protocol(err.to_string()))?;
    let stream_local = SocketAddr::new(self.local_addr.ip(), 0);
    let stream_socket =
        UdpFrameTransport::new(stream_local, self.remote_addr).map_err(AlpineSdkError::from)?;
    self.session
        .set_stream_profile(compiled.clone())
        .map_err(|err| AlpineSdkError::HandshakeFailed(err.to_string()))?;
    self.session.mark_streaming();
    // Capture the id first so `compiled` can be moved into the stream.
    let config_id = compiled.config_id().to_string();
    self.stream = Some(AlnpStream::new(self.session.clone(), stream_socket, compiled));
    Ok(config_id)
}
/// Sends one frame on the active stream.
///
/// # Errors
/// Returns `AlpineSdkError::Io("stream not started")` when `start_stream` has
/// not produced a stream yet; otherwise converts any error from the stream's
/// `send` into `AlpineSdkError`.
pub fn send_frame(
    &self,
    channel_format: ChannelFormat,
    channels: Vec<u16>,
    priority: u8,
    groups: Option<HashMap<String, Vec<u16>>>,
    metadata: Option<HashMap<String, Value>>,
) -> Result<(), AlpineSdkError> {
    match self.stream.as_ref() {
        Some(active) => active
            .send(channel_format, channels, priority, groups, metadata)
            .map_err(AlpineSdkError::from),
        None => Err(AlpineSdkError::Io("stream not started".into())),
    }
}
/// Closes the session and aborts the keepalive task, consuming the client.
pub async fn close(mut self) {
    self.session.close();
    let keepalive = self.keepalive_handle.take();
    if let Some(task) = keepalive {
        task.abort();
    }
}
/// Builds a control envelope for `op` with sequence number `seq` and the given
/// JSON `payload`, by delegating to the session's `ControlClient`.
///
/// # Errors
/// Returns the `HandshakeError` reported by `ControlClient::envelope`.
pub fn control_envelope(
    &self,
    seq: u64,
    op: ControlOp,
    payload: Value,
) -> Result<ControlEnvelope, HandshakeError> {
    let client = &self.control;
    client.envelope(seq, op, payload)
}
/// Issues the `"ping"` vendor control command and decodes a [`PingReply`].
pub async fn ping(&self) -> Result<ControlReply<PingReply>, AlpineSdkError> {
    self.control_command::<PingReply>("ping").await
}
/// Issues the `"status"` vendor control command and decodes a [`StatusReply`].
pub async fn status(&self) -> Result<ControlReply<StatusReply>, AlpineSdkError> {
    self.control_command::<StatusReply>("status").await
}
/// Issues the `"health"` vendor control command and decodes a [`HealthReply`].
pub async fn health(&self) -> Result<ControlReply<HealthReply>, AlpineSdkError> {
    self.control_command::<HealthReply>("health").await
}
/// Issues the `"identity"` vendor control command and decodes an [`IdentityReply`].
pub async fn identity(&self) -> Result<ControlReply<IdentityReply>, AlpineSdkError> {
    self.control_command::<IdentityReply>("identity").await
}
/// Issues the `"metadata"` vendor control command and decodes a [`MetadataReply`].
pub async fn metadata(&self) -> Result<ControlReply<MetadataReply>, AlpineSdkError> {
    self.control_command::<MetadataReply>("metadata").await
}
/// Sends `{"command": <command>}` as a `ControlOp::Vendor` request and decodes
/// the acknowledgement payload into `T`.
async fn control_command<T>(&self, command: &str) -> Result<ControlReply<T>, AlpineSdkError>
where
    T: DeserializeOwned,
{
    let request = json!({ "command": command });
    self.control_request::<T>(ControlOp::Vendor, request).await
}
async fn control_request<T>(
&self,
op: ControlOp,
payload: Value,
) -> Result<ControlReply<T>, AlpineSdkError>
where
T: DeserializeOwned,
{
let transport = SharedTransport::new(self._transport.clone());
let mut channel = ReliableControlChannel::new(transport);
let ack = self.control.send(&mut channel, op, payload).await?;
let parsed = ControlCrypto::decode_ack_payload::<T>(ack.payload.as_deref())
.map_err(AlpineSdkError::from)?;
Ok(ControlReply {
ack,
payload: parsed,
})
}
/// Returns the local address the base transport reported at connect time.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
/// Returns the peer address the base transport reported at connect time.
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}
/// Returns another handle to the socket obtained from the base transport.
pub fn udp_socket(&self) -> Arc<UdpSocket> {
    Arc::clone(&self.udp_socket)
}
/// Returns a copy of the established session's id, or `None` when the session
/// reports no established state.
pub fn session_id(&self) -> Option<String> {
    let established = self.session.established()?;
    Some(established.session_id.clone())
}
}
/// Adapter that lets several users drive one transport through a shared,
/// mutex-guarded handle.
struct SharedTransport<T> {
    /// The shared transport; locked for the duration of each send/recv call.
    inner: Arc<Mutex<T>>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would require
// `T: Clone`, but cloning this wrapper only duplicates the `Arc` handle.
impl<T> Clone for SharedTransport<T> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
impl<T> SharedTransport<T> {
fn new(inner: Arc<Mutex<T>>) -> Self {
Self { inner }
}
}
/// Forwards each call to the wrapped transport while holding its mutex, so the
/// lock stays held until the inner `send`/`recv` future completes.
#[async_trait]
impl<T> HandshakeTransport for SharedTransport<T>
where
    T: HandshakeTransport + Send,
{
    /// Locks the shared transport and sends `msg` through it.
    async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
        let mut transport = self.inner.lock().await;
        transport.send(msg).await
    }

    /// Locks the shared transport and waits for the next message from it.
    async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
        let mut transport = self.inner.lock().await;
        transport.recv().await
    }
}
/// Transport wrapper that logs every message sent/received on `inner` and
/// rejects a second `SessionInit` send.
#[derive(Debug)]
struct InstrumentedTransport<T> {
/// The wrapped transport that actually performs I/O.
inner: T,
/// Local address, used only for log output (its port is logged).
local_addr: SocketAddr,
/// Peer address, used only for log output.
remote_addr: SocketAddr,
/// Set once a `SessionInit` send has been attempted; never cleared in this file.
session_init_sent: bool,
}
impl<T> InstrumentedTransport<T> {
fn new(inner: T, local_addr: SocketAddr, remote_addr: SocketAddr) -> Self {
Self {
inner,
local_addr,
remote_addr,
session_init_sent: false,
}
}
}
#[async_trait]
impl<T> HandshakeTransport for InstrumentedTransport<T>
where
T: HandshakeTransport + Send,
{
/// Logs and forwards `msg` to the wrapped transport.
///
/// * A second `SessionInit` is rejected with `HandshakeError::Protocol`. The
///   flag is set before the message is forwarded, so a `SessionInit` whose
///   inner send fails still counts as sent.
/// * A warning is logged when a non-handshake message is sent while the
///   current phase is `Phase::Handshake` (the message is still forwarded).
///
/// # Errors
/// Returns the duplicate-`SessionInit` protocol error, or whatever the inner
/// transport's `send` returns.
async fn send(&mut self, msg: HandshakeMessage) -> Result<(), HandshakeError> {
    let category = message_category(&msg);
    let phase_state = phase::current_phase();
    if matches!(msg, HandshakeMessage::SessionInit(_)) {
        if self.session_init_sent {
            warn!(
                "[ALPINE][HANDSHAKE][WARN] duplicate SessionInit send attempt detected local_port={} remote_addr={} phase={}",
                self.local_addr.port(),
                self.remote_addr,
                phase_state.label()
            );
            return Err(HandshakeError::Protocol(
                "duplicate SessionInit blocked; reset handshake first".into(),
            ));
        }
        self.session_init_sent = true;
    }
    if phase_state == Phase::Handshake && category != MessageCategory::Handshake {
        warn!(
            "[ALPINE][BUG] non-handshake packet emitted during handshake msg_type={} local_port={} remote_addr={}",
            category.as_str(),
            self.local_addr.port(),
            self.remote_addr
        );
    }
    // The CBOR length is computed only for this log line. Keeping the encode
    // inside the macro arguments means the extra serialization is skipped
    // when this event is disabled (tracing evaluates event arguments only for
    // enabled callsites).
    info!(
        "[ALPINE][TX] msg_type={} variant={} local_port={} remote_addr={} phase={} len={}",
        category.as_str(),
        message_label(&msg),
        self.local_addr.port(),
        self.remote_addr,
        phase_state.label(),
        serde_cbor::to_vec(&msg).map_or(0, |buf| buf.len())
    );
    self.inner.send(msg).await
}
/// Receives the next message from the wrapped transport and logs the outcome.
///
/// The phase is sampled before waiting, so the logged phase is the one in
/// effect when the receive started. The inner result is returned unchanged.
async fn recv(&mut self) -> Result<HandshakeMessage, HandshakeError> {
    let phase_state = phase::current_phase();
    let outcome = self.inner.recv().await;
    match outcome.as_ref() {
        Err(err) => {
            warn!(
                "[ALPINE][RX] recv error local_port={} remote_addr={} phase={} error={}",
                self.local_addr.port(),
                self.remote_addr,
                phase_state.label(),
                err
            );
        }
        Ok(msg) => {
            let category = message_category(msg);
            info!(
                "[ALPINE][RX] msg_type={} variant={} local_port={} remote_addr={} phase={}",
                category.as_str(),
                message_label(msg),
                self.local_addr.port(),
                self.remote_addr,
                phase_state.label()
            );
        }
    }
    outcome
}
}
/// Coarse classification of a handshake-layer message, used for log output and
/// for the "non-handshake packet during handshake" check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
enum MessageCategory {
    Handshake,
    Control,
    Keepalive,
    Ack,
    Unknown,
}

impl MessageCategory {
    /// Returns the label written to the logs for this category.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Handshake => "Handshake",
            Self::Control => "Control",
            Self::Keepalive => "Keepalive",
            Self::Ack => "Ack",
            Self::Unknown => "Unknown",
        }
    }
}
/// Maps a message to its [`MessageCategory`].
///
/// The match is deliberately exhaustive (no wildcard arm) so that a new
/// `HandshakeMessage` variant forces this function to be updated.
fn message_category(msg: &HandshakeMessage) -> MessageCategory {
    match msg {
        HandshakeMessage::Control(_) => MessageCategory::Control,
        HandshakeMessage::Keepalive(_) => MessageCategory::Keepalive,
        HandshakeMessage::Ack(_) => MessageCategory::Ack,
        HandshakeMessage::SessionInit(_)
        | HandshakeMessage::SessionAck(_)
        | HandshakeMessage::SessionReady(_)
        | HandshakeMessage::SessionComplete(_)
        | HandshakeMessage::SessionEstablished(_) => MessageCategory::Handshake,
    }
}
/// Returns the variant name of `msg` as a static string for log output.
///
/// Exhaustive on purpose: adding a `HandshakeMessage` variant must add a label.
fn message_label(msg: &HandshakeMessage) -> &'static str {
    match msg {
        HandshakeMessage::Ack(_) => "Ack",
        HandshakeMessage::Keepalive(_) => "Keepalive",
        HandshakeMessage::Control(_) => "Control",
        HandshakeMessage::SessionEstablished(_) => "SessionEstablished",
        HandshakeMessage::SessionComplete(_) => "SessionComplete",
        HandshakeMessage::SessionReady(_) => "SessionReady",
        HandshakeMessage::SessionAck(_) => "SessionAck",
        HandshakeMessage::SessionInit(_) => "SessionInit",
    }
}