use crate::client::URLExt as _;
use bytes::Bytes;
use futures_util::SinkExt as _;
use tokio_tungstenite::tungstenite::Message as WSMessage;
/// Protocol selector handed to `connect`.
///
/// The value is sent to the server as the `protocol` query parameter using
/// its `Display` rendering (`tcp` / `udp`).
#[derive(Clone, Copy, serde::Serialize)]
#[expect(clippy::exhaustive_enums, reason = "All protocols")]
pub enum Protocol {
    /// Rendered as `tcp`.
    Tcp,
    /// Rendered as `udp`.
    Udp,
}
impl core::fmt::Display for Protocol {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match *self {
Protocol::Tcp => write!(f, "tcp"),
Protocol::Udp => write!(f, "udp"),
}
}
}
/// Channel endpoints returned by `connect`.
///
/// The first element is the sender whose `Bytes` are forwarded to the
/// websocket as binary messages; the second element is the receiver that
/// yields the `Bytes` read from the websocket.
pub type Connection = (
tokio::sync::mpsc::Sender<Bytes>,
tokio::sync::mpsc::Receiver<Bytes>,
);
/// Opens a websocket to a service instance's `connect` endpoint and bridges
/// it to a pair of in-process channels.
///
/// The target is
/// `wss://<host>/v1/state/applications/{application}/service-instances/{service_instance}/connect`,
/// with the host part taken from the client's base URL. `site`, `protocol`
/// and `port` are always sent as query parameters; `ip-address` is appended
/// only when `ip_address` is `Some`. The handshake carries the client's bearer
/// token in the `Authorization` header and requests the `v1.connect.supd`
/// sub-protocol over a TLS stream obtained from the client.
///
/// On success a detached task running `connect_loop` is spawned, and the
/// returned pair is (sender of bytes going to the websocket, receiver of bytes
/// coming from the websocket). Both channels are bounded with capacity 2.
///
/// # Errors
///
/// Propagates failures from resolving the host/port of the base URL, parsing
/// the URL, converting it into the websocket request, opening the TLS stream,
/// and the websocket handshake itself.
#[expect(clippy::single_call_fn, reason = "Don't want to inline")]
pub(crate) async fn connect(
    client: &crate::Client,
    application: &str,
    service_instance: &str,
    site: &str,
    protocol: crate::app_connect::Protocol,
    port: u16,
    ip_address: Option<core::net::IpAddr>,
) -> crate::Result<Connection> {
    let endpoint = format!(
        "wss://{}/v1/state/applications/{application}/service-instances/{service_instance}/connect",
        client.get_base_url().host_port()?,
    );
    let mut ws_url = url::Url::parse(&endpoint)?;

    // One serializer for all pairs; it is scoped so the mutable borrow of
    // `ws_url` ends before the URL is read again.
    {
        let mut query = ws_url.query_pairs_mut();
        query.append_pair("site", site);
        query.append_pair("protocol", &protocol.to_string());
        query.append_pair("port", &port.to_string());
        if let Some(addr) = ip_address {
            query.append_pair("ip-address", &addr.to_string());
        }
    }

    // The URI is parsed before the token is awaited, so a conversion error
    // surfaces first.
    let request = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(
        ws_url.as_str().parse()?,
    );
    let authorization = format!("Bearer {}", client.bearer_token().await);
    let request = request
        .with_header("Authorization", authorization)
        .with_sub_protocol("v1.connect.supd");

    let tls_stream = client.open_tls_stream().await?;
    let (socket, _) = tokio_tungstenite::client_async(request, tls_stream).await?;

    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(2);
    let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(2);
    // The pump task is detached; it ends on its own when either side stops.
    tokio::spawn(connect_loop(socket, outbound_rx, inbound_tx));

    Ok((outbound_tx, inbound_rx))
}
/// Shuttles data between the websocket and the two channels until one side
/// stops.
///
/// Each iteration waits for whichever comes first:
/// * a payload on `to_ws_rx`, which is sent to the websocket as a binary
///   message, or
/// * a binary response from the websocket, which is forwarded on
///   `to_client_tx`.
///
/// The loop ends when `to_ws_rx` yields `None`, when sending to the websocket
/// fails, when reading from the websocket fails, or when forwarding to
/// `to_client_tx` fails. Afterwards the websocket is closed and the result of
/// that close is deliberately discarded.
#[expect(clippy::let_underscore_must_use, reason = "ws.close")]
#[expect(clippy::single_call_fn, reason = "Don't want to inline")]
async fn connect_loop(
    mut ws: crate::volga::WebSocketStream,
    mut to_ws_rx: tokio::sync::mpsc::Receiver<Bytes>,
    to_client_tx: tokio::sync::mpsc::Sender<Bytes>,
) {
    loop {
        tokio::select! {
            outbound = to_ws_rx.recv() => {
                match outbound {
                    None => {
                        tracing::debug!("Client closed");
                        break;
                    }
                    Some(payload) => {
                        if let Err(err) = ws.send(WSMessage::Binary(payload)).await {
                            tracing::error!("Failed to send data to websocket: {err}");
                            break;
                        }
                    }
                }
            }
            inbound = crate::volga::get_binary_response(&mut ws) => {
                match inbound {
                    Err(err) => {
                        tracing::error!("WS error: {err}");
                        break;
                    }
                    Ok(payload) => {
                        if let Err(err) = to_client_tx.send(payload).await {
                            tracing::debug!("Failed to send data to client: {err}");
                            break;
                        }
                    }
                }
            }
        };
    }

    // Best-effort close: nothing useful can be done if it fails at this point.
    let _: core::result::Result<_, _> = ws.close(None).await;
}