avassa-client 0.14.0

Library for integrating with the Avassa APIs
Documentation
use crate::client::URLExt as _;
use bytes::Bytes;
use futures_util::SinkExt as _;
use tokio_tungstenite::tungstenite::Message as WSMessage;

/// Protocol for service instance connections.
///
/// Selects the transport used when connecting to a port on a service
/// instance; it is sent to the API as the `protocol` query parameter.
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[expect(clippy::exhaustive_enums, reason="All protocols")]
pub enum Protocol {
    /// Connect using TCP.
    Tcp,
    /// Connect using UDP.
    Udp,
}

/// Renders the protocol as its lowercase name (`tcp` or `udp`).
impl core::fmt::Display for Protocol {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Pick the literal first, then emit it with a single write.
        let name = match *self {
            Protocol::Tcp => "tcp",
            Protocol::Udp => "udp",
        };
        f.write_str(name)
    }
}

/// Channel pair representing an established service instance connection.
///
/// The sender accepts bytes that are written to the websocket as binary
/// messages; the receiver yields the binary payloads read from the websocket.
pub type Connection = (
    tokio::sync::mpsc::Sender<Bytes>,
    tokio::sync::mpsc::Receiver<Bytes>,
);

/// Open a websocket connection to a port on a service instance
/// (site -> application -> service instance).
///
/// Builds the `wss://` connect URL for `application` / `service_instance` on
/// the client's base host, adds `site`, `protocol`, `port` and, when given,
/// `ip-address` as query parameters, then performs the websocket handshake
/// over a TLS stream obtained from `client`, authenticating with a bearer
/// token and requesting the `v1.connect.supd` sub-protocol. A background task
/// is spawned to move data between the websocket and the returned channels.
///
/// # Errors
///
/// Returns an error if the URL cannot be built or parsed, the TLS stream
/// cannot be opened, or the websocket handshake fails.
#[expect(clippy::single_call_fn, reason="Don't want to inline")]
pub(crate) async fn connect(
    client: &crate::Client,
    application: &str,
    service_instance: &str,
    site: &str,
    protocol: crate::app_connect::Protocol,
    port: u16,
    ip_address: Option<core::net::IpAddr>,
) -> crate::Result<Connection> {
    let endpoint = format!(
        "wss://{}/v1/state/applications/{application}/service-instances/{service_instance}/connect",
        client.get_base_url().host_port()?,
    );
    let mut ws_url = url::Url::parse(&endpoint)?;

    {
        // One serializer for all parameters; it is released at the end of
        // this scope, before the URL is read again.
        let mut query = ws_url.query_pairs_mut();
        query.append_pair("site", site);
        query.append_pair("protocol", &protocol.to_string());
        query.append_pair("port", &port.to_string());
        if let Some(ip) = ip_address {
            query.append_pair("ip-address", &ip.to_string());
        }
    }

    let builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(
        ws_url.to_string().parse()?,
    );
    let authorization = format!("Bearer {}", client.bearer_token().await);
    let request = builder
        .with_header("Authorization", authorization)
        .with_sub_protocol("v1.connect.supd");

    let tls_stream = client.open_tls_stream().await?;
    let (ws, _) = tokio_tungstenite::client_async(request, tls_stream).await?;

    // Bounded channels in each direction; the spawned task owns the websocket
    // together with the inner ends, the caller gets the outer ends.
    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(2);
    let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(2);
    tokio::spawn(connect_loop(ws, outbound_rx, inbound_tx));

    Ok((outbound_tx, inbound_rx))
}

/// Shuttle data between the websocket and the client channels.
///
/// Bytes received on `to_ws_rx` are sent to the websocket as binary messages,
/// and binary responses read from the websocket are forwarded on
/// `to_client_tx`. The loop stops when `to_ws_rx` is closed, when forwarding
/// to the client fails, or when a websocket read or write fails. The
/// websocket is then closed; the result of the close is ignored.
#[expect(clippy::let_underscore_must_use, reason="ws.close")]
#[expect(clippy::single_call_fn, reason="Don't want to inline")]
async fn connect_loop(
    mut ws: crate::volga::WebSocketStream,
    mut to_ws_rx: tokio::sync::mpsc::Receiver<Bytes>,
    to_client_tx: tokio::sync::mpsc::Sender<Bytes>,
) {
    loop {
        tokio::select! {
            outbound = to_ws_rx.recv() => {
                if let Some(payload) = outbound {
                    if let Err(err) = ws.send(WSMessage::Binary(payload)).await {
                        tracing::error!("Failed to send data to websocket: {err}");
                        break;
                    }
                } else {
                    // The channel feeding the websocket has been closed.
                    tracing::debug!("Client closed");
                    break;
                }
            }

            inbound = crate::volga::get_binary_response(&mut ws) => {
                match inbound {
                    Ok(payload) => {
                        if let Err(err) = to_client_tx.send(payload).await {
                            tracing::debug!("Failed to send data to client: {err}");
                            break;
                        }
                    }
                    Err(err) => {
                        tracing::error!("WS error: {err}");
                        break;
                    }
                }
            }
        };
    }
    // Best-effort shutdown: nothing useful can be done if closing fails.
    let _: core::result::Result<_,_> = ws.close(None).await;
}