use std::io;
use std::net::SocketAddr;
use std::rc::Rc;
use futures::{self, Future};
use futures::stream::Stream;
use tokio_core::net::{TcpStream, TcpListener};
use tokio_core::reactor::Handle;
use tokio_io::AsyncRead;
use tokio_io::io::{ReadHalf, WriteHalf};
use tokio_io::io::flush;
use net2::TcpBuilder;
use config::{Config, ServerConfig};
use relay::socks5::{self, HandshakeRequest, HandshakeResponse, Address};
use relay::socks5::{TcpRequestHeader, TcpResponseHeader};
use relay::loadbalancing::server::RoundRobin;
use relay::loadbalancing::server::LoadBalancer;
use relay::{BoxIoFuture, boxed_future};
use relay::tcprelay::crypto_io::{EncryptedWrite, DecryptedRead};
use super::{tunnel, ignore_until_end, try_timeout};
/// UDP-related settings handed to every SOCKS5 client handler.
///
/// Cloning is cheap: the address sits behind an `Rc`, so clones share one allocation.
#[derive(Clone, Debug)]
struct UdpConfig {
    /// Whether `UDP ASSOCIATE` requests are accepted.
    enable_udp: bool,
    /// Address written into the reply header of a successful `UDP ASSOCIATE`.
    client_addr: Rc<SocketAddr>,
}
fn handle_socks5_connect(handle: &Handle,
(r, w): (ReadHalf<TcpStream>, WriteHalf<TcpStream>),
client_addr: SocketAddr,
addr: Address,
svr_cfg: Rc<ServerConfig>)
-> BoxIoFuture<()> {
let cloned_addr = addr.clone();
let cloned_svr_cfg = svr_cfg.clone();
let cloned_handle = handle.clone();
let cloned_handle2 = handle.clone();
let timeout = *svr_cfg.timeout();
let fut = super::connect_proxy_server(handle, svr_cfg)
.then(move |res| {
let handle = cloned_handle;
match res {
Ok(svr_s) => {
trace!("Proxy server connected");
let header = TcpResponseHeader::new(socks5::Reply::Succeeded,
Address::SocketAddress(client_addr));
trace!("Send header: {:?}", header);
let fut = try_timeout(header.write_to(w), timeout, &handle);
let fut = try_timeout(fut.and_then(flush), timeout, &handle);
boxed_future(fut.map(move |w| (svr_s, w)))
}
Err(err) => {
use std::io::ErrorKind;
use relay::socks5::Reply;
error!("Failed to connect remote server, {:?}", err);
let reply = match err.kind() {
ErrorKind::ConnectionRefused => Reply::ConnectionRefused,
ErrorKind::ConnectionAborted => Reply::HostUnreachable,
_ => Reply::NetworkUnreachable,
};
let header = TcpResponseHeader::new(reply, Address::SocketAddress(client_addr));
trace!("Send header: {:?}", header);
let fut = try_timeout(header.write_to(w), timeout, &handle);
let fut = try_timeout(fut.and_then(flush), timeout, &handle);
boxed_future(fut.and_then(|_| Err(err)))
}
}
})
.and_then(move |(svr_s, w)| {
let handle = cloned_handle2;
let svr_cfg = cloned_svr_cfg;
let timeout = *svr_cfg.timeout();
super::proxy_server_handshake(svr_s, svr_cfg, addr, handle.clone()).and_then(move |(svr_r, svr_w)| {
let cloned_handle = handle.clone();
let cloned_timeout = timeout;
let rhalf = svr_r.and_then(move |svr_r| svr_r.copy_timeout_opt(w, timeout, handle));
let whalf = svr_w.and_then(move |svr_w| svr_w.copy_timeout_opt(r, cloned_timeout, cloned_handle));
tunnel(cloned_addr, whalf, rhalf)
})
});
Box::new(fut)
}
fn handle_socks5_client(handle: &Handle, s: TcpStream, conf: Rc<ServerConfig>, udp_conf: UdpConfig) -> io::Result<()> {
let cloned_handle = handle.clone();
let client_addr = try!(s.peer_addr());
let cloned_client_addr = client_addr;
let fut = futures::lazy(|| Ok(s.split()))
.and_then(|(r, w)| {
HandshakeRequest::read_from(r).and_then(move |(r, req)| {
trace!("Socks5 {:?}", req);
if !req.methods.contains(&socks5::SOCKS5_AUTH_METHOD_NONE) {
let resp = HandshakeResponse::new(socks5::SOCKS5_AUTH_METHOD_NOT_ACCEPTABLE);
let fut = resp.write_to(w)
.then(|_| {
warn!("Currently shadowsocks-rust does not support authentication");
Err(io::Error::new(io::ErrorKind::Other,
"Currently shadowsocks-rust does not support authentication"))
});
boxed_future(fut)
} else {
let resp = HandshakeResponse::new(socks5::SOCKS5_AUTH_METHOD_NONE);
trace!("Reply handshake {:?}", resp);
let fut = resp.write_to(w).and_then(flush).and_then(|w| Ok((r, w)));
boxed_future(fut)
}
})
})
.and_then(move |(r, w)| {
TcpRequestHeader::read_from(r).then(move |res| {
match res {
Ok((r, h)) => boxed_future(futures::finished((r, w, h))),
Err(err) => {
error!("Failed to get TcpRequestHeader: {}", err);
let fut = TcpResponseHeader::new(err.reply, Address::SocketAddress(client_addr))
.write_to(w)
.then(|_| Err(From::from(err)));
boxed_future(fut)
}
}
})
})
.and_then(move |(r, w, header)| {
trace!("Socks5 {:?}", header);
let addr = header.address;
match header.command {
socks5::Command::TcpConnect => {
info!("CONNECT {}", addr);
handle_socks5_connect(&cloned_handle, (r, w), cloned_client_addr, addr, conf)
}
socks5::Command::TcpBind => {
warn!("BIND is not supported");
let fut = TcpResponseHeader::new(socks5::Reply::CommandNotSupported, addr)
.write_to(w)
.map(|_| ());
boxed_future(fut)
}
socks5::Command::UdpAssociate => {
if udp_conf.enable_udp {
info!("UDP ASSOCIATE {}", addr);
let fut = TcpResponseHeader::new(socks5::Reply::Succeeded, From::from(*udp_conf.client_addr))
.write_to(w)
.and_then(flush)
.and_then(|_| {
ignore_until_end(r).map(|_| ())
});
boxed_future(fut)
} else {
warn!("UDP Associate is not enabled");
let fut = TcpResponseHeader::new(socks5::Reply::CommandNotSupported, addr)
.write_to(w)
.map(|_| ());
boxed_future(fut)
}
}
}
});
handle.spawn(fut.then(|res| {
match res {
Ok(..) => Ok(()),
Err(err) => {
if err.kind() != io::ErrorKind::BrokenPipe {
error!("Failed to handle client: {}", err);
}
Err(())
}
}
}));
Ok(())
}
pub fn run(config: Rc<Config>, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
let (listener, local_addr) = {
let local_addr = config.local.as_ref().unwrap();
let tcp_builder = match *local_addr {
SocketAddr::V4(..) => TcpBuilder::new_v4(),
SocketAddr::V6(..) => TcpBuilder::new_v6(),
}
.unwrap_or_else(|err| panic!("Failed to create listener, {}", err));
super::reuse_port(&tcp_builder)
.and_then(|builder| builder.reuse_address(true))
.and_then(|builder| builder.bind(local_addr))
.unwrap_or_else(|err| panic!("Failed to bind {}, {}", local_addr, err));
let listener = tcp_builder.listen(1024)
.and_then(|l| TcpListener::from_listener(l, local_addr, &handle))
.unwrap_or_else(|err| panic!("Failed to listen, {}", err));
info!("ShadowSocks TCP Listening on {}", local_addr);
(listener, *local_addr)
};
let udp_conf = UdpConfig {
enable_udp: config.enable_udp,
client_addr: Rc::new(local_addr),
};
let mut servers = RoundRobin::new(&*config);
let listening = listener.incoming()
.for_each(move |(socket, addr)| {
let server_cfg = servers.pick_server();
trace!("Got connection, addr: {}", addr);
trace!("Picked proxy server: {:?}", server_cfg);
handle_socks5_client(&handle, socket, server_cfg, udp_conf.clone())
});
Box::new(listening.map_err(|err| {
error!("Socks5 server run failed: {}", err);
err
}))
}