use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use url::Url;
use wsrx::proxy;
use crate::cli::logger::init_logger;
pub async fn launch(
address: String, host: Option<String>, port: Option<u16>, log_json: Option<bool>,
) {
let log_json = log_json.unwrap_or(false);
init_logger(log_json);
let port = port.unwrap_or(0);
let host = host.unwrap_or(String::from("127.0.0.1"));
let listener = TcpListener::bind(format!("{host}:{port}"))
.await
.expect("failed to bind port");
let Ok(url) = Url::parse(&address) else {
error!("Invalid url, please check your input.");
return;
};
if url.scheme() != "ws" && url.scheme() != "wss" {
error!("Invalid url scheme, only `ws` and `wss` are supported.");
return;
}
let url = url.as_ref().to_string();
info!(
"Hi, I am not RX, RX is here -> {}",
listener.local_addr().unwrap()
);
warn!(
"wsrx will not report non-critical errors by default, you can set `RUST_LOG=wsrx=debug` to see more details."
);
let token = CancellationToken::new();
let url = Arc::new(url);
loop {
let Ok((tcp, _)) = listener.accept().await else {
error!("Failed to accept tcp connection, exiting.");
token.cancel();
return;
};
if token.is_cancelled() {
return;
}
let url = url.clone();
let peer_addr = tcp.peer_addr().unwrap();
info!("CREATE remote <-wsrx-> {}", peer_addr);
let token = token.clone();
tokio::spawn(async move {
match proxy_ws_addr(url.as_ref(), tcp, token).await {
Ok(_) => {}
Err(e) => {
info!("REMOVE remote <-wsrx-> {} with error", peer_addr);
debug!("TCP connection closed: {}", e);
}
}
});
}
}
async fn proxy_ws_addr(
addr: impl AsRef<str>, tcp: TcpStream, token: CancellationToken,
) -> Result<(), wsrx::Error> {
let peer_addr = tcp.peer_addr().unwrap();
let (ws, _) = tokio_tungstenite::connect_async(addr.as_ref()).await?;
proxy(ws.into(), tcp, token).await?;
info!("REMOVE remote <-wsrx-> {}", peer_addr);
Ok(())
}