wsrx 0.5.16

Controlled TCP-over-WebSocket forwarding tunnel.
Documentation
use std::sync::Arc;

use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use url::Url;
use wsrx::proxy;

use crate::cli::logger::init_logger;

pub async fn launch(
    address: String, host: Option<String>, port: Option<u16>, log_json: Option<bool>,
) {
    let log_json = log_json.unwrap_or(false);
    init_logger(log_json);
    let port = port.unwrap_or(0);
    let host = host.unwrap_or(String::from("127.0.0.1"));
    let listener = TcpListener::bind(format!("{host}:{port}"))
        .await
        .expect("failed to bind port");
    let Ok(url) = Url::parse(&address) else {
        error!("Invalid url, please check your input.");
        return;
    };
    if url.scheme() != "ws" && url.scheme() != "wss" {
        error!("Invalid url scheme, only `ws` and `wss` are supported.");
        return;
    }
    let url = url.as_ref().to_string();
    info!(
        "Hi, I am not RX, RX is here -> {}",
        listener.local_addr().unwrap()
    );
    warn!(
        "wsrx will not report non-critical errors by default, you can set `RUST_LOG=wsrx=debug` to see more details."
    );

    let token = CancellationToken::new();
    let url = Arc::new(url);

    // This loop will "run forever"
    loop {
        let Ok((tcp, _)) = listener.accept().await else {
            error!("Failed to accept tcp connection, exiting.");
            token.cancel();
            return;
        };

        if token.is_cancelled() {
            return;
        }

        let url = url.clone();
        let peer_addr = tcp.peer_addr().unwrap();

        info!("CREATE remote <-wsrx-> {}", peer_addr);

        let token = token.clone();
        tokio::spawn(async move {
            match proxy_ws_addr(url.as_ref(), tcp, token).await {
                Ok(_) => {}
                Err(e) => {
                    info!("REMOVE remote <-wsrx-> {} with error", peer_addr);
                    debug!("TCP connection closed: {}", e);
                }
            }
        });
    }
}

async fn proxy_ws_addr(
    addr: impl AsRef<str>, tcp: TcpStream, token: CancellationToken,
) -> Result<(), wsrx::Error> {
    let peer_addr = tcp.peer_addr().unwrap();
    let (ws, _) = tokio_tungstenite::connect_async(addr.as_ref()).await?;
    proxy(ws.into(), tcp, token).await?;
    info!("REMOVE remote <-wsrx-> {}", peer_addr);
    Ok(())
}