use crate::client::URLExt;
use bytes::Bytes;
use futures_util::SinkExt;
use tokio_tungstenite::tungstenite::Message as WSMessage;
/// Transport protocol requested for a tunnelled connection to a service instance.
///
/// The value is sent to the server as the `protocol` query parameter, rendered
/// through its `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum Protocol {
    /// Connect to a TCP port.
    Tcp,
    /// Connect to a UDP port.
    Udp,
}
impl std::fmt::Display for Protocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Protocol::Tcp => write!(f, "tcp"),
Protocol::Udp => write!(f, "udp"),
}
}
}
/// Channel pair handed back to the caller of `connect`.
///
/// * The `Sender` half carries bytes from the caller towards the WebSocket;
///   each item is forwarded as one binary WebSocket message.
/// * The `Receiver` half yields the binary payloads read from the WebSocket.
///
/// Dropping the `Sender` makes the background forwarding task stop and close
/// the WebSocket.
pub type Connection = (
tokio::sync::mpsc::Sender<Bytes>,
tokio::sync::mpsc::Receiver<Bytes>,
);
/// Opens a WebSocket to the `connect` endpoint of a service instance and
/// returns a channel pair bridged to it.
///
/// The request goes to
/// `wss://<host:port>/v1/state/applications/<application>/service-instances/<service_instance>/connect`
/// with `site`, `protocol`, `port` and, when provided, `ip-address` as query
/// parameters. It carries a bearer `Authorization` header and the
/// `v1.connect.supd` sub-protocol.
///
/// `application` and `service_instance` are appended as individual,
/// percent-encoded path segments, so characters such as `/`, `?`, `#` or `%`
/// in a name cannot change the structure of the URL. Pass the raw,
/// un-encoded names.
///
/// On success a background task is spawned that shuttles data between the
/// returned channels and the WebSocket until either side closes or fails.
///
/// # Errors
///
/// Returns an error if the host/port cannot be derived from the client's base
/// URL, the URL or request URI cannot be built, the TLS stream cannot be
/// opened, or the WebSocket handshake fails.
pub(crate) async fn connect(
    client: &crate::Client,
    application: &str,
    service_instance: &str,
    site: &str,
    protocol: crate::app_connect::Protocol,
    port: u16,
    ip_address: Option<std::net::IpAddr>,
) -> crate::Result<Connection> {
    // Buffer size of each direction's in-memory channel.
    const CHANNEL_CAPACITY: usize = 2;

    let mut ws_url = url::Url::parse(&format!("wss://{}", client.base_url.host_port()?))?;

    // Build the path segment by segment instead of via string interpolation:
    // every segment is percent-encoded, so caller-supplied names cannot inject
    // extra path components, a query string or a fragment.
    ws_url
        .path_segments_mut()
        .expect("a wss:// URL with an authority can always be a base")
        .extend([
            "v1",
            "state",
            "applications",
            application,
            "service-instances",
            service_instance,
            "connect",
        ]);

    ws_url
        .query_pairs_mut()
        .append_pair("site", site)
        .append_pair("protocol", &protocol.to_string())
        .append_pair("port", &port.to_string());
    if let Some(ip_address) = ip_address {
        ws_url
            .query_pairs_mut()
            .append_pair("ip-address", &ip_address.to_string());
    }

    let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(
        ws_url.to_string().parse()?,
    )
    .with_header(
        "Authorization",
        format!("Bearer {}", client.bearer_token().await),
    )
    .with_sub_protocol("v1.connect.supd");

    let tls = client.open_tls_stream().await?;
    let (ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;

    let (to_ws_tx, to_ws_rx) = tokio::sync::mpsc::channel(CHANNEL_CAPACITY);
    let (to_client_tx, to_client_rx) = tokio::sync::mpsc::channel(CHANNEL_CAPACITY);

    // The forwarding task is detached on purpose; it ends on its own when the
    // caller drops its channel ends or the WebSocket fails.
    tokio::spawn(connect_loop(ws, to_ws_rx, to_client_tx));

    Ok((to_ws_tx, to_client_rx))
}
/// Pumps data between the caller's channels and the WebSocket until one side
/// closes or an error occurs, then closes the WebSocket.
///
/// * Items received on `to_ws_rx` are sent as binary WebSocket messages.
/// * Binary payloads obtained from the WebSocket are forwarded on
///   `to_client_tx`.
///
/// The loop ends when the caller's sender is dropped, when the caller's
/// receiver is gone, or when reading from or writing to the WebSocket fails.
async fn connect_loop(
    mut ws: crate::volga::WebSocketStream,
    mut to_ws_rx: tokio::sync::mpsc::Receiver<Bytes>,
    to_client_tx: tokio::sync::mpsc::Sender<Bytes>,
) {
    loop {
        tokio::select! {
            // Caller -> WebSocket.
            outbound = to_ws_rx.recv() => {
                match outbound {
                    Some(payload) => {
                        if let Err(e) = ws.send(WSMessage::Binary(payload)).await {
                            tracing::error!("Failed to send data to websocket: {e}");
                            break;
                        }
                    }
                    None => {
                        tracing::debug!("Client closed");
                        break;
                    }
                }
            }
            // WebSocket -> caller.
            inbound = crate::volga::get_binary_response(&mut ws) => {
                match inbound {
                    Ok(payload) => {
                        if let Err(e) = to_client_tx.send(payload).await {
                            tracing::debug!("Failed to send data to client: {e}");
                            break;
                        }
                    }
                    Err(e) => {
                        tracing::error!("WS error: {e}");
                        break;
                    }
                }
            }
        }
    }

    // Best-effort close; the connection is being torn down regardless.
    let _ = ws.close(None).await;
}