use anyhow::{anyhow, Context as _};
use boring::pkey::PKey;
use boring::ssl::{SslConnector, SslMethod};
use boring::x509::store::X509StoreBuilder;
use boring::x509::X509;
use client::ProxyClient;
use futures_util::future::BoxFuture;
use http::header::HOST;
use hyper::{upgrade, Version};
use hyper_boring::v1::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::rt::TokioIo;
use socks5_server::connection::connect::state::NeedReply;
use socks5_server::connection::state::NeedAuthenticate;
use socks5_server::{Connect, IncomingConnection};
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt as _;
use tokio::net::TcpListener;
use tokio::task;
use tracing::field::Empty;
use tracing::{info_span, Instrument};
use url::Url;
mod client;
pub struct Config {
pub proxy: Url,
pub geohash: String,
pub request_timeout: Option<u64>,
pub masque_preshared_key: Option<String>,
pub proxy_ca: Option<String>,
pub client_cert: Option<String>,
pub client_key: Option<String>,
}
pub async fn start(
config: Config,
listen_addr: &str,
) -> anyhow::Result<BoxFuture<'static, anyhow::Result<()>>> {
let listener = TcpListener::bind(listen_addr).await?;
start_with_listener(config, listener)
}
pub fn start_with_listener(
mut config: Config,
listener: TcpListener,
) -> anyhow::Result<BoxFuture<'static, anyhow::Result<()>>> {
tracing::info!(
"Listen for socks connections @ {}",
listener.local_addr().unwrap()
);
let connector = {
let mut http = HttpConnector::new();
http.enforce_http(false);
let mut ssl = SslConnector::builder(SslMethod::tls())?;
ssl.set_alpn_protos(b"\x02h2")?;
if let Some(proxy_ca) = &config.proxy_ca {
let mut builder = X509StoreBuilder::new()?;
builder.add_cert(X509::from_pem(&load_file(proxy_ca)?)?)?;
ssl.set_verify_cert_store(builder.build())?;
}
match (config.client_cert.take(), config.client_key.take()) {
(None, None) => {}
(None, Some(_)) => anyhow::bail!("client cert is missing"),
(Some(_), None) => anyhow::bail!("client key is missing"),
(Some(client_cert), Some(client_key)) => {
ssl.set_certificate(&*X509::from_pem(client_cert.as_ref())?)?;
ssl.set_private_key(&*PKey::private_key_from_pem(client_key.as_ref())?)?;
}
}
HttpsConnector::with_connector(http, ssl)?
};
let client = ProxyClient::new(connector, config.proxy.as_str().parse().unwrap());
let server = socks5_server::Server::new(listener, Arc::new(socks5_server::auth::NoAuth));
Ok(Box::pin(serve(Arc::new(config), client, server)))
}
async fn serve(
opt: Arc<Config>,
client: ProxyClient,
server: socks5_server::Server<()>,
) -> anyhow::Result<()> {
let mut id = 0;
while let Ok((conn, peer)) = server.accept().await {
let opt = Arc::clone(&opt);
let client = client.clone();
task::spawn(
async move {
match serve_socks5(id, conn, opt, client).await {
Ok(()) => {}
Err(err) => tracing::error!("failed to serve socks5 connect {:#}", &err),
}
}
.instrument(info_span!("connection", ?peer)),
);
id += 1;
}
Ok(())
}
#[tracing::instrument(skip(socket, opt, client), fields(geohash, target, scid))]
async fn serve_socks5(
id: usize,
socket: IncomingConnection<(), NeedAuthenticate>,
opt: Arc<Config>,
client: ProxyClient,
) -> anyhow::Result<()> {
let (socket, ()) = socket.authenticate().await.map_err(fst)?;
let command = socket.wait().await.map_err(fst)?;
let (connect, address) = match command {
socks5_server::Command::Connect(connect, address) => (connect, address),
socks5_server::Command::Associate(associate, address) => {
return associate
.reply(socks5_proto::Reply::CommandNotSupported, address)
.await
.map(|_| ())
.map_err(fst)
.context("failed to reply");
}
socks5_server::Command::Bind(bind, address) => {
return bind
.reply(socks5_proto::Reply::CommandNotSupported, address)
.await
.map(|_| ())
.map_err(fst)
.context("failed to reply");
}
};
let target = match &address {
socks5_proto::Address::SocketAddress(socket_addr) => format!("{socket_addr}"),
socks5_proto::Address::DomainAddress(vec, port) => {
format!("{}:{port}", std::str::from_utf8(vec)?)
}
};
tracing::Span::current()
.record("geohash", &opt.geohash)
.record("target", &target);
tracing::debug!("proxying over H2");
proxy_h2(opt, client, connect, address, &target).await?;
Ok(())
}
async fn proxy_h2(
config: Arc<Config>,
client: ProxyClient,
connect: Connect<NeedReply>,
address: socks5_proto::Address,
target: &str,
) -> Result<(), anyhow::Error> {
let proxy = async {
let mut request = hyper::Request::connect(target)
.version(Version::HTTP_11)
.header(HOST.as_str(), target)
.header("sec-ch-geohash", &config.geohash);
if let Some(preshared_key) = &config.masque_preshared_key {
request = request.header(
"Proxy-Authorization", format!("Preshared {preshared_key}"),
);
}
let request = request
.body(<http_body_util::Empty<&[u8]>>::new())
.unwrap();
tracing::debug!("sending H2 CONNECT request");
let response = tokio::time::timeout(
Duration::from_secs(config.request_timeout.unwrap_or(u64::MAX)),
client.request(request)
)
.await.inspect_err(|err| {
tracing::error!("CONNECT request timed out: {err}");
})??;
tracing::info!(headers = ?response.headers(), status = %response.status(), "connected to proxy");
anyhow::ensure!(
response.status().is_success(),
"proxy connection failed with status: {}",
response.status()
);
tracing::debug!("upgrading connection");
let stream = upgrade::on(response).await?;
Ok(stream)
}
.instrument(info_span!("connecting to proxy", "scid" = Empty))
.await;
let mut stream = match proxy {
Ok(stream) => TokioIo::new(stream),
Err(e) => {
tracing::error!(error = ?e, "failed to connect to proxy");
return connect
.reply(socks5_proto::Reply::GeneralFailure, address)
.await
.map_err(fst)
.map(|_| ())
.context("failed to reply");
}
};
tracing::trace!("sending socks5 success response");
let mut ready = connect
.reply(socks5_proto::Reply::Succeeded, address)
.await
.map_err(fst)?;
tracing::debug!("copying bytes between socks5 connection and H3 CONNECT");
let (body_read, ready_read) =
tokio::io::copy_bidirectional(&mut stream, ready.get_mut()).await?;
tracing::debug!(
bytes_sent_upstream = ready_read,
bytes_send_downstream = body_read,
"shutting down proxy task"
);
async move { stream.shutdown().await.map_err(|e| anyhow!("{e}")) }
.in_current_span()
.await?;
Ok(())
}
fn fst<A, B>((a, _): (A, B)) -> A {
a
}
fn load_file<P: AsRef<Path> + Copy>(path: P) -> io::Result<Vec<u8>> {
let mut buf = vec![];
File::open(path).and_then(|mut f| f.read_to_end(&mut buf))?;
Ok(buf)
}