mod auto;
mod context;
mod http;
mod socks;
use std::{net::SocketAddr, time::Duration};
use tokio::net::{TcpListener, TcpStream};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use self::{auto::AutoDetectServer, context::Context, http::HttpServer, socks::Socks5Server};
use crate::{AuthMode, BootArgs, Proxy, Result, connect::Connector};
/// Handler for a single inbound TCP connection.
///
/// Implementors are consumed by the call (`self` is taken by value).
pub trait Acceptor {
/// Handles one connection, given as the stream together with a socket
/// address.
///
/// NOTE(review): `conn` has the same shape as the pair yielded by
/// `TcpListener::accept`, so the address is presumably the peer address —
/// confirm against callers.
async fn accept(self, conn: (TcpStream, SocketAddr));
}
/// A proxy server that can be started and driven to completion.
pub trait Server {
    /// Consumes the server and runs it, resolving with the I/O result of the
    /// run.
    async fn start(self) -> std::io::Result<()>;

    /// Waits for the next connection on `listener`.
    ///
    /// Accept errors are never surfaced to the caller: each failure is logged
    /// at trace level and the accept is retried after a 50 ms pause, so this
    /// only resolves once a connection has actually been accepted.
    #[inline]
    async fn incoming(listener: &mut TcpListener) -> (TcpStream, SocketAddr) {
        loop {
            let err = match listener.accept().await {
                Ok(accepted) => break accepted,
                Err(err) => err,
            };

            tracing::trace!("Failed to accept connection: {err}");
            // Brief back-off so a persistently failing accept does not spin.
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }
}
/// Boots the proxy described by `args` and blocks the calling thread until the
/// server stops or a Ctrl-C signal is received.
///
/// In order, this:
/// 1. installs a global `tracing` subscriber filtered by the environment, the
///    `args.log` level and a fixed `netlink_proto=error` directive;
/// 2. resolves the worker-thread count (`args.workers`, falling back to
///    `std::thread::available_parallelism`);
/// 3. logs the effective settings;
/// 4. builds a multi-threaded Tokio runtime and, inside it, starts the server
///    selected by `args.proxy`, racing it against Ctrl-C.
///
/// # Errors
///
/// Returns an error when the filter directive cannot be parsed, installing
/// the global subscriber fails, the available parallelism cannot be queried
/// (only attempted when `args.workers` is `None`), the runtime cannot be
/// built, or server construction / execution fails.
pub fn run(args: BootArgs) -> Result<()> {
    let filter = EnvFilter::from_default_env()
        .add_directive(args.log.into())
        .add_directive("netlink_proto=error".parse()?);

    tracing::subscriber::set_global_default(
        FmtSubscriber::builder()
            .with_max_level(args.log)
            .with_env_filter(filter)
            .finish(),
    )?;

    // Probe the machine's parallelism only when no explicit worker count was
    // given. `unwrap_or` would evaluate the probe (and its `?`) eagerly, so an
    // explicit `workers` value could still abort startup on platforms where
    // the probe fails.
    let workers = match args.workers {
        Some(workers) => workers,
        None => std::thread::available_parallelism()?.get(),
    };

    tracing::info!("OS: {}", std::env::consts::OS);
    tracing::info!("Arch: {}", std::env::consts::ARCH);
    tracing::info!("Version: {}", env!("CARGO_PKG_VERSION"));
    tracing::info!("Concurrent: {}", args.concurrent);
    tracing::info!("Worker threads: {}", workers);
    tracing::info!("Connect timeout: {:?}s", args.connect_timeout);

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(workers)
        .build()?
        .block_on(async {
            // Linux only: when a CIDR is configured, run the route/sysctl
            // helpers for it before any server starts.
            // NOTE(review): presumably this makes addresses inside the CIDR
            // usable as outbound source addresses — confirm in `crate::route`.
            #[cfg(target_os = "linux")]
            if let Some(cidr) = &args.cidr {
                crate::route::sysctl_ipv6_no_local_bind(cidr);
                crate::route::sysctl_ipv6_all_enable_ipv6(cidr);
                crate::route::sysctl_route_add_cidr(cidr).await;
            }

            // Builds the server context from the boot arguments plus the auth
            // mode carried by the selected proxy variant.
            let context = move |auth: AuthMode| Context {
                auth,
                bind: args.bind,
                concurrent: args.concurrent,
                connect_timeout: args.connect_timeout,
                connector: Connector::new(
                    args.cidr,
                    args.cidr_range,
                    args.fallback,
                    args.connect_timeout,
                    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
                    args.tcp_user_timeout,
                    args.reuseaddr,
                ),
            };

            // Run the selected server until it finishes on its own or Ctrl-C
            // fires, whichever happens first.
            tokio::select! {
                result = async {
                    match args.proxy {
                        Proxy::Http { auth } => {
                            HttpServer::new(context(auth))?.start().await
                        }
                        Proxy::Https { auth, tls_cert, tls_key } => {
                            HttpServer::new(context(auth))?
                                .with_https(tls_cert, tls_key)?
                                .start()
                                .await
                        }
                        Proxy::Socks5 { auth } => {
                            Socks5Server::new(context(auth))?.start().await
                        }
                        Proxy::Auto { auth, tls_cert, tls_key } => {
                            AutoDetectServer::new(context(auth), tls_cert, tls_key)?
                                .start()
                                .await
                        }
                    }
                } => result,
                // Any completion of `ctrl_c()` takes this branch; its result
                // is discarded by the `_` pattern.
                _ = tokio::signal::ctrl_c() => {
                    tracing::info!("Shutdown signal received, shutting down gracefully...");
                    Ok(())
                },
            }
        })?;

    Ok(())
}