avassa_client/
app_connect.rs1use crate::client::URLExt;
2use bytes::Bytes;
3use futures_util::SinkExt;
4use tokio_tungstenite::tungstenite::Message as WSMessage;
5
6#[derive(serde::Serialize)]
8pub enum Protocol {
9 Tcp,
10 Udp,
11}
12
13impl std::fmt::Display for Protocol {
14 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15 match self {
16 Protocol::Tcp => write!(f, "tcp"),
17 Protocol::Udp => write!(f, "udp"),
18 }
19 }
20}
21
22pub type Connection = (
23 tokio::sync::mpsc::Sender<Bytes>,
24 tokio::sync::mpsc::Receiver<Bytes>,
25);
26
27pub(crate) async fn connect(
28 client: &crate::Client,
29 application: &str,
30 service_instance: &str,
31 site: &str,
32 protocol: crate::app_connect::Protocol,
33 port: u16,
34 ip_address: Option<std::net::IpAddr>,
35) -> crate::Result<Connection> {
36 let mut ws_url = url::Url::parse(&format!(
37 "wss://{}/v1/state/applications/{application}/service-instances/{service_instance}/connect",
38 client.base_url.host_port()?,
39 ))?;
40
41 ws_url
42 .query_pairs_mut()
43 .append_pair("site", site)
44 .append_pair("protocol", &protocol.to_string())
45 .append_pair("port", &port.to_string());
46 if let Some(ip_address) = ip_address {
47 ws_url
48 .query_pairs_mut()
49 .append_pair("ip-address", &ip_address.to_string());
50 }
51
52 let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(
53 ws_url.to_string().parse()?,
54 )
55 .with_header(
56 "Authorization",
57 format!("Bearer {}", client.bearer_token().await),
58 )
59 .with_sub_protocol("v1.connect.supd");
60 let tls = client.open_tls_stream().await?;
61 let (ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
62
63 let (to_ws_tx, to_ws_rx) = tokio::sync::mpsc::channel(2);
64 let (to_client_tx, to_client_rx) = tokio::sync::mpsc::channel(2);
65 tokio::spawn(connect_loop(ws, to_ws_rx, to_client_tx));
66 Ok((to_ws_tx, to_client_rx))
67}
68
69async fn connect_loop(
70 mut ws: crate::volga::WebSocketStream,
71 mut to_ws_rx: tokio::sync::mpsc::Receiver<Bytes>,
72 to_client_tx: tokio::sync::mpsc::Sender<Bytes>,
73) {
74 loop {
75 tokio::select! {
76 to_ws = to_ws_rx.recv() => {
77 let Some(to_ws) = to_ws else {
78 tracing::debug!("Client closed");
80 break;
81 };
82 if let Err(e) = ws.send(WSMessage::Binary(to_ws)).await {
83 tracing::error!("Failed to send data to websocket: {e}");
84 break;
85 }
86 }
87
88 from_ws = crate::volga::get_binary_response(&mut ws) => {
89 let from_ws = match from_ws {
90 Ok(v) => v,
91 Err(e) => {
92 tracing::error!("WS error: {e}");
93 break;
94 }
95 };
96 if let Err(e) = to_client_tx.send(from_ws).await {
97 tracing::debug!("Failed to send data to client: {e}");
98 break;
99 }
100
101 }
102 };
103 }
104 let _ = ws.close(None).await;
105}