avassa-client 0.11.3

Library for integrating with the Avassa APIs
Documentation
use crate::client::URLExt;
use bytes::Bytes;
use futures_util::SinkExt;
use tokio_tungstenite::tungstenite::Message as WSMessage;

/// Protocol for service instance connections.
///
/// The `Display` form of a variant is its lowercase name (`tcp` / `udp`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum Protocol {
    /// TCP; displayed as `tcp`.
    Tcp,
    /// UDP; displayed as `udp`.
    Udp,
}

impl std::fmt::Display for Protocol {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Protocol::Tcp => write!(f, "tcp"),
            Protocol::Udp => write!(f, "udp"),
        }
    }
}

/// Channel pair representing an established service instance connection.
///
/// The `Sender` half accepts bytes that are forwarded to the websocket as
/// binary messages; the `Receiver` half yields the bytes read back from the
/// websocket.
pub type Connection = (
    // Outgoing direction: bytes pushed here are written to the websocket.
    tokio::sync::mpsc::Sender<Bytes>,
    // Incoming direction: bytes received from the websocket arrive here.
    tokio::sync::mpsc::Receiver<Bytes>,
);

/// Opens a websocket connection to a port of a service instance.
///
/// The request goes to
/// `wss://<host>/v1/state/applications/<application>/service-instances/<service_instance>/connect`
/// on the client's base URL host, with `site`, `protocol`, `port` and
/// (when given) `ip-address` as query parameters. The request carries the
/// client's bearer token and the `v1.connect.supd` websocket sub-protocol.
///
/// `application` and `service_instance` are appended as individual,
/// percent-encoded path segments, so characters such as `/`, `?`, `#` or `%`
/// inside a name cannot change the structure of the request URL.
///
/// On success a background task is spawned that shuttles data between the
/// websocket and the returned [`Connection`] channel pair.
///
/// # Errors
///
/// Returns an error if the host/port cannot be obtained from the base URL,
/// if the URL or request URI cannot be built, if the TLS stream cannot be
/// opened, or if the websocket handshake fails.
pub(crate) async fn connect(
    client: &crate::Client,
    application: &str,
    service_instance: &str,
    site: &str,
    protocol: crate::app_connect::Protocol,
    port: u16,
    ip_address: Option<std::net::IpAddr>,
) -> crate::Result<Connection> {
    // Number of in-flight buffers allowed per direction before back-pressure.
    const CHANNEL_CAPACITY: usize = 2;

    let mut ws_url = url::Url::parse(&format!("wss://{}", client.base_url.host_port()?))?;

    {
        // Caller-supplied names are pushed segment by segment so that they are
        // percent-encoded instead of being spliced raw into the path.
        let mut segments = ws_url
            .path_segments_mut()
            .map_err(|()| url::ParseError::RelativeUrlWithCannotBeABaseBase)?;
        segments.extend([
            "v1",
            "state",
            "applications",
            application,
            "service-instances",
            service_instance,
            "connect",
        ]);
    }

    {
        let mut query = ws_url.query_pairs_mut();
        query
            .append_pair("site", site)
            .append_pair("protocol", &protocol.to_string())
            .append_pair("port", &port.to_string());
        if let Some(ip_address) = ip_address {
            query.append_pair("ip-address", &ip_address.to_string());
        }
    }

    let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(
        ws_url.to_string().parse()?,
    )
    .with_header(
        "Authorization",
        format!("Bearer {}", client.bearer_token().await),
    )
    .with_sub_protocol("v1.connect.supd");
    let tls = client.open_tls_stream().await?;
    let (ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;

    // One channel per direction; the spawned loop owns the websocket and the
    // inner ends, the caller gets the outer ends.
    let (to_ws_tx, to_ws_rx) = tokio::sync::mpsc::channel(CHANNEL_CAPACITY);
    let (to_client_tx, to_client_rx) = tokio::sync::mpsc::channel(CHANNEL_CAPACITY);
    tokio::spawn(connect_loop(ws, to_ws_rx, to_client_tx));
    Ok((to_ws_tx, to_client_rx))
}

/// Relays data between the caller's channels and the websocket.
///
/// Bytes received on `to_ws_rx` are sent to the websocket as binary messages,
/// and binary responses read from the websocket are forwarded on
/// `to_client_tx`. The loop stops when the caller's sender is gone, when the
/// caller's receiver no longer accepts data, or on any websocket error; the
/// websocket is then closed (a failure to close is ignored).
async fn connect_loop(
    mut ws: crate::volga::WebSocketStream,
    mut to_ws_rx: tokio::sync::mpsc::Receiver<Bytes>,
    to_client_tx: tokio::sync::mpsc::Sender<Bytes>,
) {
    loop {
        tokio::select! {
            outbound = to_ws_rx.recv() => {
                match outbound {
                    // All senders dropped: the caller is done with the connection.
                    None => {
                        tracing::debug!("Client closed");
                        break;
                    }
                    Some(payload) => {
                        if let Err(e) = ws.send(WSMessage::Binary(payload)).await {
                            tracing::error!("Failed to send data to websocket: {e}");
                            break;
                        }
                    }
                }
            }

            inbound = crate::volga::get_binary_response(&mut ws) => {
                match inbound {
                    Err(e) => {
                        tracing::error!("WS error: {e}");
                        break;
                    }
                    Ok(payload) => {
                        // Fails only when the caller dropped its receiver.
                        if let Err(e) = to_client_tx.send(payload).await {
                            tracing::debug!("Failed to send data to client: {e}");
                            break;
                        }
                    }
                }
            }
        };
    }

    // Best-effort shutdown; nothing useful can be done if closing fails.
    let _ = ws.close(None).await;
}