#[forbid(unsafe_code)]
#[macro_use]
extern crate log;
use fast_socks5::{
client,
server::{transfer, Socks5ServerProtocol},
util::target_addr::TargetAddr,
ReplyError, Result, Socks5Command, SocksError,
};
use std::{
collections::HashSet,
future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use structopt::StructOpt;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt},
net::TcpListener,
sync::RwLock,
task,
};
#[derive(Debug, StructOpt)]
#[structopt(
name = "socks5-router",
about = "A socks5 demo 'router' proxying requests to further downstream socks5 servers."
)]
struct Opt {
#[structopt(short, long)]
pub listen_addr: String,
#[structopt(subcommand, name = "auth")] pub auth: AuthMode,
}
#[derive(StructOpt, Debug, PartialEq)]
enum AuthMode {
NoAuth,
Password {
#[structopt(short, long)]
username: String,
#[structopt(short, long)]
password: String,
},
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
spawn_socks_server().await
}
async fn spawn_socks_server() -> Result<()> {
let opt: &'static Opt = Box::leak(Box::new(Opt::from_args()));
let backends = Arc::new(RwLock::new(HashSet::new()));
let listener = TcpListener::bind(&opt.listen_addr).await?;
info!("Listen for socks connections @ {}", &opt.listen_addr);
loop {
match listener.accept().await {
Ok((socket, _client_addr)) => {
spawn_and_log_error(serve_socks5(opt, backends.clone(), socket));
}
Err(err) => {
error!("accept error = {:?}", err);
}
}
}
}
static CONN_NUM: AtomicUsize = AtomicUsize::new(0);
async fn serve_socks5(
opt: &Opt,
backends: Arc<RwLock<HashSet<String>>>,
socket: tokio::net::TcpStream,
) -> Result<(), SocksError> {
let (proto, cmd, target_addr) = match &opt.auth {
AuthMode::NoAuth => Socks5ServerProtocol::accept_no_auth(socket).await?,
AuthMode::Password { username, password } => {
Socks5ServerProtocol::accept_password_auth(socket, |user, pass| {
user == *username && pass == *password
})
.await?
.0
}
}
.read_command()
.await?;
if cmd != Socks5Command::TCPConnect {
proto.reply_error(&ReplyError::CommandNotSupported).await?;
return Err(ReplyError::CommandNotSupported.into());
}
if let TargetAddr::Domain(ref domain, _) = target_addr {
if domain == "admin.internal" {
let inner = proto
.reply_success(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0))
.await?;
return serve_admin_console(backends, inner).await;
}
}
let (target_addr, target_port) = target_addr.into_string_and_port();
let backends = backends.read().await;
let backends: Vec<_> = backends.iter().collect(); if backends.is_empty() {
warn!("No backends! Go add one using the console");
proto.reply_error(&ReplyError::NetworkUnreachable).await?;
return Ok(());
}
let n = CONN_NUM.fetch_add(1, Ordering::SeqCst);
let mut config = client::Config::default();
config.set_skip_auth(true);
let client = client::Socks5Stream::connect(
backends[n % backends.len()],
target_addr,
target_port,
config,
)
.await?;
drop(backends);
let inner = proto
.reply_success(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0))
.await?;
transfer(inner, client).await;
Ok(())
}
async fn serve_admin_console(
backends: Arc<RwLock<HashSet<String>>>,
socket: tokio::net::TcpStream,
) -> Result<(), SocksError> {
let mut stream = tokio::io::BufReader::new(socket);
stream.write_all(b"Welcome to the router admin console! Use LIST, ADD, or REMOVE commands to manage proxies.\n").await?;
let mut buf = String::with_capacity(128);
while let Ok(_) = stream.read_line(&mut buf).await {
if buf.starts_with("LIST") {
let backends = backends.read().await;
for addr in backends.iter() {
stream.write_all(addr.as_bytes()).await?;
stream.write_all(b"\n").await?;
}
} else if buf.starts_with("ADD ") {
let mut backends = backends.write().await;
if let Some(adr) = buf.strip_prefix("ADD ") {
backends.insert(adr.trim().to_owned());
}
} else if buf.starts_with("REMOVE ") {
let mut backends = backends.write().await;
if let Some(adr) = buf.strip_prefix("REMOVE ") {
backends.remove(adr.trim());
}
}
buf.clear();
}
Ok(())
}
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
F: Future<Output = Result<()>> + Send + 'static,
{
task::spawn(async move {
match fut.await {
Ok(()) => {}
Err(err) => error!("{:#}", &err),
}
})
}