Skip to main content

avassa_client/
app_connect.rs

1use crate::client::URLExt;
2use bytes::Bytes;
3use futures_util::SinkExt;
4use tokio_tungstenite::tungstenite::Message as WSMessage;
5
6/// Protocol for service instance connections
7#[derive(serde::Serialize)]
8pub enum Protocol {
9    Tcp,
10    Udp,
11}
12
13impl std::fmt::Display for Protocol {
14    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15        match self {
16            Protocol::Tcp => write!(f, "tcp"),
17            Protocol::Udp => write!(f, "udp"),
18        }
19    }
20}
21
22pub type Connection = (
23    tokio::sync::mpsc::Sender<Bytes>,
24    tokio::sync::mpsc::Receiver<Bytes>,
25);
26
27pub(crate) async fn connect(
28    client: &crate::Client,
29    application: &str,
30    service_instance: &str,
31    site: &str,
32    protocol: crate::app_connect::Protocol,
33    port: u16,
34    ip_address: Option<std::net::IpAddr>,
35) -> crate::Result<Connection> {
36    let mut ws_url = url::Url::parse(&format!(
37        "wss://{}/v1/state/applications/{application}/service-instances/{service_instance}/connect",
38        client.base_url.host_port()?,
39    ))?;
40
41    ws_url
42        .query_pairs_mut()
43        .append_pair("site", site)
44        .append_pair("protocol", &protocol.to_string())
45        .append_pair("port", &port.to_string());
46    if let Some(ip_address) = ip_address {
47        ws_url
48            .query_pairs_mut()
49            .append_pair("ip-address", &ip_address.to_string());
50    }
51
52    let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(
53        ws_url.to_string().parse()?,
54    )
55    .with_header(
56        "Authorization",
57        format!("Bearer {}", client.bearer_token().await),
58    )
59    .with_sub_protocol("v1.connect.supd");
60    let tls = client.open_tls_stream().await?;
61    let (ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
62
63    let (to_ws_tx, to_ws_rx) = tokio::sync::mpsc::channel(2);
64    let (to_client_tx, to_client_rx) = tokio::sync::mpsc::channel(2);
65    tokio::spawn(connect_loop(ws, to_ws_rx, to_client_tx));
66    Ok((to_ws_tx, to_client_rx))
67}
68
69async fn connect_loop(
70    mut ws: crate::volga::WebSocketStream,
71    mut to_ws_rx: tokio::sync::mpsc::Receiver<Bytes>,
72    to_client_tx: tokio::sync::mpsc::Sender<Bytes>,
73) {
74    loop {
75        tokio::select! {
76            to_ws = to_ws_rx.recv() => {
77                let Some(to_ws) = to_ws else {
78                    // Other side closed the connection
79                    tracing::debug!("Client closed");
80                    break;
81                };
82                if let Err(e) = ws.send(WSMessage::Binary(to_ws)).await {
83                    tracing::error!("Failed to send data to websocket: {e}");
84                    break;
85                }
86            }
87
88            from_ws = crate::volga::get_binary_response(&mut ws) => {
89                let from_ws = match from_ws {
90                    Ok(v) => v,
91                    Err(e) => {
92                    tracing::error!("WS error: {e}");
93                    break;
94                    }
95                };
96                if let Err(e) = to_client_tx.send(from_ws).await {
97                    tracing::debug!("Failed to send data to client: {e}");
98                    break;
99                }
100
101            }
102        };
103    }
104    let _ = ws.close(None).await;
105}