use std::io;
use std::net::SocketAddr;
use std::rc::Rc;
use futures::{self, Future};
use futures::stream::Stream;
use tokio_core::net::{TcpStream, TcpListener};
use tokio_io::AsyncRead;
use tokio_io::io::{ReadHalf, WriteHalf};
use tokio_io::io::flush;
use net2::TcpBuilder;
use config::ServerConfig;
use relay::socks5::{self, HandshakeRequest, HandshakeResponse, Address};
use relay::socks5::{TcpRequestHeader, TcpResponseHeader};
use relay::loadbalancing::server::RoundRobin;
use relay::loadbalancing::server::LoadBalancer;
use relay::{BoxIoFuture, boxed_future};
use relay::tcprelay::crypto_io::{EncryptedWrite, DecryptedRead};
use relay::Context;
use super::{tunnel, ignore_until_end, try_timeout};
#[derive(Debug, Clone)]
struct UdpConfig {
enable_udp: bool,
client_addr: Rc<SocketAddr>,
}
fn handle_socks5_connect((r, w): (ReadHalf<TcpStream>, WriteHalf<TcpStream>), client_addr: SocketAddr, addr: Address, svr_cfg: Rc<ServerConfig>) -> BoxIoFuture<()> {
let cloned_addr = addr.clone();
let cloned_svr_cfg = svr_cfg.clone();
let timeout = *svr_cfg.timeout();
let fut = super::connect_proxy_server(svr_cfg)
.then(move |res| {
match res {
Ok(svr_s) => {
trace!("Proxy server connected");
let header = TcpResponseHeader::new(socks5::Reply::Succeeded,
Address::SocketAddress(client_addr));
trace!("Send header: {:?}", header);
let fut = Context::with(|ctx| {
let handle = ctx.handle();
let fut = try_timeout(header.write_to(w), timeout, &handle);
try_timeout(fut.and_then(flush), timeout, &handle)
});
boxed_future(fut.map(move |w| (svr_s, w)))
}
Err(err) => {
use std::io::ErrorKind;
use relay::socks5::Reply;
error!("Failed to connect remote server, {:?}", err);
let reply = match err.kind() {
ErrorKind::ConnectionRefused => Reply::ConnectionRefused,
ErrorKind::ConnectionAborted => Reply::HostUnreachable,
_ => Reply::NetworkUnreachable,
};
let header = TcpResponseHeader::new(reply, Address::SocketAddress(client_addr));
trace!("Send header: {:?}", header);
let fut = Context::with(|ctx| {
let handle = ctx.handle();
let fut = try_timeout(header.write_to(w), timeout, &handle);
try_timeout(fut.and_then(flush), timeout, &handle)
});
boxed_future(fut.and_then(|_| Err(err)))
}
}
})
.and_then(move |(svr_s, w)| {
let svr_cfg = cloned_svr_cfg;
let timeout = *svr_cfg.timeout();
super::proxy_server_handshake(svr_s, svr_cfg, addr).and_then(move |(svr_r, svr_w)| {
let cloned_timeout = timeout;
let rhalf = svr_r.and_then(move |svr_r| svr_r.copy_timeout_opt(w, timeout));
let whalf = svr_w.and_then(move |svr_w| svr_w.copy_timeout_opt(r, cloned_timeout));
tunnel(cloned_addr, whalf, rhalf)
})
});
Box::new(fut)
}
fn handle_socks5_client(s: TcpStream, conf: Rc<ServerConfig>, udp_conf: UdpConfig) -> io::Result<()> {
let client_addr = try!(s.peer_addr());
let cloned_client_addr = client_addr;
let fut = futures::lazy(|| Ok(s.split()))
.and_then(|(r, w)| {
HandshakeRequest::read_from(r).and_then(move |(r, req)| {
trace!("Socks5 {:?}", req);
if !req.methods.contains(&socks5::SOCKS5_AUTH_METHOD_NONE) {
let resp = HandshakeResponse::new(socks5::SOCKS5_AUTH_METHOD_NOT_ACCEPTABLE);
let fut = resp.write_to(w)
.then(|_| {
warn!("Currently shadowsocks-rust does not support authentication");
Err(io::Error::new(io::ErrorKind::Other,
"Currently shadowsocks-rust does not support authentication"))
});
boxed_future(fut)
} else {
let resp = HandshakeResponse::new(socks5::SOCKS5_AUTH_METHOD_NONE);
trace!("Reply handshake {:?}", resp);
let fut = resp.write_to(w).and_then(flush).and_then(|w| Ok((r, w)));
boxed_future(fut)
}
})
})
.and_then(move |(r, w)| {
TcpRequestHeader::read_from(r).then(move |res| match res {
Ok((r, h)) => boxed_future(futures::finished((r, w, h))),
Err(err) => {
error!("Failed to get TcpRequestHeader: {}", err);
let fut = TcpResponseHeader::new(err.reply, Address::SocketAddress(client_addr))
.write_to(w)
.then(|_| Err(From::from(err)));
boxed_future(fut)
}
})
})
.and_then(move |(r, w, header)| {
trace!("Socks5 {:?}", header);
let addr = header.address;
match header.command {
socks5::Command::TcpConnect => {
info!("CONNECT {}", addr);
handle_socks5_connect((r, w), cloned_client_addr, addr, conf)
}
socks5::Command::TcpBind => {
warn!("BIND is not supported");
let fut = TcpResponseHeader::new(socks5::Reply::CommandNotSupported, addr)
.write_to(w)
.map(|_| ());
boxed_future(fut)
}
socks5::Command::UdpAssociate => {
if udp_conf.enable_udp {
info!("UDP ASSOCIATE {}", addr);
let fut = TcpResponseHeader::new(socks5::Reply::Succeeded, From::from(*udp_conf.client_addr))
.write_to(w)
.and_then(flush)
.and_then(|_| {
ignore_until_end(r).map(|_| ())
});
boxed_future(fut)
} else {
warn!("UDP Associate is not enabled");
let fut = TcpResponseHeader::new(socks5::Reply::CommandNotSupported, addr)
.write_to(w)
.map(|_| ());
boxed_future(fut)
}
}
}
});
Context::with(|ctx| {
let handle = &ctx.handle;
handle.spawn(fut.then(|res| match res {
Ok(..) => Ok(()),
Err(err) => {
if err.kind() != io::ErrorKind::BrokenPipe {
error!("Failed to handle client: {}", err);
}
Err(())
}
}));
});
Ok(())
}
pub fn run() -> Box<Future<Item = (), Error = io::Error>> {
let (listener, local_addr) = Context::with(|ctx| {
let config = &ctx.config;
let handle = &ctx.handle;
let local_addr = config.local.as_ref().unwrap();
let tcp_builder = match *local_addr {
SocketAddr::V4(..) => TcpBuilder::new_v4(),
SocketAddr::V6(..) => TcpBuilder::new_v6(),
}
.unwrap_or_else(|err| panic!("Failed to create listener, {}", err));
super::reuse_port(&tcp_builder)
.and_then(|builder| builder.reuse_address(true))
.and_then(|builder| builder.bind(local_addr))
.unwrap_or_else(|err| panic!("Failed to bind {}, {}", local_addr, err));
let listener = tcp_builder
.listen(1024)
.and_then(|l| TcpListener::from_listener(l, local_addr, &handle))
.unwrap_or_else(|err| panic!("Failed to listen, {}", err));
info!("ShadowSocks TCP Listening on {}", local_addr);
(listener, *local_addr)
});
let udp_conf = UdpConfig {
enable_udp: Context::with(|ctx| ctx.config.enable_udp),
client_addr: Rc::new(local_addr),
};
let mut servers = Context::with(|ctx| RoundRobin::new(ctx.config()));
let listening = listener
.incoming()
.for_each(move |(socket, addr)| {
let server_cfg = servers.pick_server();
trace!("Got connection, addr: {}", addr);
trace!("Picked proxy server: {:?}", server_cfg);
handle_socks5_client(socket, server_cfg, udp_conf.clone())
});
Box::new(listening.map_err(|err| {
error!("Socks5 server run failed: {}", err);
err
}))
}