shadowsocks-rust 1.3.2

shadowsocks is a fast tunnel proxy that helps you bypass firewalls.
Documentation
//! Local server that accepts SOCKS 5 protocol

use std::io;
use std::net::SocketAddr;
use std::rc::Rc;

use futures::{self, Future};
use futures::stream::Stream;

use tokio_core::net::{TcpStream, TcpListener};
use tokio_core::reactor::Handle;
use tokio_io::AsyncRead;
use tokio_io::io::{ReadHalf, WriteHalf};
use tokio_io::io::flush;

use net2::TcpBuilder;

use config::{Config, ServerConfig};

use relay::socks5::{self, HandshakeRequest, HandshakeResponse, Address};
use relay::socks5::{TcpRequestHeader, TcpResponseHeader};
use relay::loadbalancing::server::RoundRobin;
use relay::loadbalancing::server::LoadBalancer;
use relay::{BoxIoFuture, boxed_future};
use relay::tcprelay::crypto_io::{EncryptedWrite, DecryptedRead};

use super::{tunnel, ignore_until_end, try_timeout};

/// UDP-related settings handed to every SOCKS5 client handler.
#[derive(Clone, Debug)]
struct UdpConfig {
    /// Whether `UDP ASSOCIATE` requests are accepted; when `false` the client
    /// handler answers them with `CommandNotSupported`.
    enable_udp: bool,
    /// Address sent back to the client in a successful `UDP ASSOCIATE` reply.
    /// Kept behind an `Rc` so cloning the config per connection is cheap.
    client_addr: Rc<SocketAddr>,
}

fn handle_socks5_connect(handle: &Handle,
                         (r, w): (ReadHalf<TcpStream>, WriteHalf<TcpStream>),
                         client_addr: SocketAddr,
                         addr: Address,
                         svr_cfg: Rc<ServerConfig>)
                         -> BoxIoFuture<()> {
    let cloned_addr = addr.clone();
    let cloned_svr_cfg = svr_cfg.clone();
    let cloned_handle = handle.clone();
    let cloned_handle2 = handle.clone();
    let timeout = *svr_cfg.timeout();
    let fut = super::connect_proxy_server(handle, svr_cfg)
        .then(move |res| {
            let handle = cloned_handle;
            match res {
                Ok(svr_s) => {
                    trace!("Proxy server connected");

                    // Tell the client that we are ready
                    let header = TcpResponseHeader::new(socks5::Reply::Succeeded,
                                                        Address::SocketAddress(client_addr));
                    trace!("Send header: {:?}", header);

                    let fut = try_timeout(header.write_to(w), timeout, &handle);
                    let fut = try_timeout(fut.and_then(flush), timeout, &handle);
                    boxed_future(fut.map(move |w| (svr_s, w)))
                }
                Err(err) => {
                    use std::io::ErrorKind;
                    use relay::socks5::Reply;

                    error!("Failed to connect remote server, {:?}", err);

                    let reply = match err.kind() {
                        ErrorKind::ConnectionRefused => Reply::ConnectionRefused,
                        ErrorKind::ConnectionAborted => Reply::HostUnreachable,
                        _ => Reply::NetworkUnreachable,
                    };

                    let header = TcpResponseHeader::new(reply, Address::SocketAddress(client_addr));
                    trace!("Send header: {:?}", header);
                    let fut = try_timeout(header.write_to(w), timeout, &handle);
                    let fut = try_timeout(fut.and_then(flush), timeout, &handle);
                    boxed_future(fut.and_then(|_| Err(err)))
                }
            }
        })
        .and_then(move |(svr_s, w)| {
            let handle = cloned_handle2;
            let svr_cfg = cloned_svr_cfg;
            let timeout = *svr_cfg.timeout();
            super::proxy_server_handshake(svr_s, svr_cfg, addr, handle.clone()).and_then(move |(svr_r, svr_w)| {
                let cloned_handle = handle.clone();
                let cloned_timeout = timeout;

                let rhalf = svr_r.and_then(move |svr_r| svr_r.copy_timeout_opt(w, timeout, handle));
                let whalf = svr_w.and_then(move |svr_w| svr_w.copy_timeout_opt(r, cloned_timeout, cloned_handle));

                tunnel(cloned_addr, whalf, rhalf)
            })
        });

    Box::new(fut)
}

/// Drives one accepted SOCKS5 client connection.
///
/// Builds a future that
///
/// 1. reads the SOCKS5 handshake and accepts it only when the client offers the
///    "no authentication" method,
/// 2. reads the request header, replying with the error's reply code if that fails,
/// 3. dispatches on the command: `CONNECT` is relayed via `handle_socks5_connect`,
///    `BIND` is rejected, and `UDP ASSOCIATE` is acknowledged (and the TCP connection
///    held open) only when `udp_conf.enable_udp` is set.
///
/// The future is spawned on `handle`; its errors are logged, except `BrokenPipe`.
///
/// # Errors
///
/// Returns an error only when the peer address of `s` cannot be obtained. Failures
/// that happen later occur inside the spawned future and are not returned.
fn handle_socks5_client(handle: &Handle, s: TcpStream, conf: Rc<ServerConfig>, udp_conf: UdpConfig) -> io::Result<()> {
    let client_addr = s.peer_addr()?;
    let connect_handle = handle.clone();

    // Stage 1: method negotiation.
    let negotiated = futures::lazy(|| Ok(s.split())).and_then(|(r, w)| {
        // Socks5 handshakes
        HandshakeRequest::read_from(r).and_then(move |(r, req)| {
            trace!("Socks5 {:?}", req);

            if req.methods.contains(&socks5::SOCKS5_AUTH_METHOD_NONE) {
                // Reply to client
                let resp = HandshakeResponse::new(socks5::SOCKS5_AUTH_METHOD_NONE);
                trace!("Reply handshake {:?}", resp);
                boxed_future(resp.write_to(w).and_then(flush).and_then(|w| Ok((r, w))))
            } else {
                // No acceptable method offered: tell the client, then fail regardless
                // of whether that write succeeded.
                let resp = HandshakeResponse::new(socks5::SOCKS5_AUTH_METHOD_NOT_ACCEPTABLE);
                let rejected = resp.write_to(w)
                    .then(|_| {
                        warn!("Currently shadowsocks-rust does not support authentication");
                        Err(io::Error::new(io::ErrorKind::Other,
                                           "Currently shadowsocks-rust does not support authentication"))
                    });
                boxed_future(rejected)
            }
        })
    });

    // Stage 2: read the request header.
    let requested = negotiated.and_then(move |(r, w)| {
        // Fetch headers
        TcpRequestHeader::read_from(r).then(move |parsed| {
            match parsed {
                Ok((r, header)) => boxed_future(futures::finished((r, w, header))),
                Err(err) => {
                    error!("Failed to get TcpRequestHeader: {}", err);
                    // Report the failure to the client; the parse error is what the
                    // future resolves to, whatever the write result was.
                    let notified = TcpResponseHeader::new(err.reply, Address::SocketAddress(client_addr))
                        .write_to(w)
                        .then(|_| Err(From::from(err)));
                    boxed_future(notified)
                }
            }
        })
    });

    // Stage 3: dispatch on the requested command.
    let session = requested.and_then(move |(r, w, header)| {
        trace!("Socks5 {:?}", header);

        let addr = header.address;
        match header.command {
            socks5::Command::TcpConnect => {
                info!("CONNECT {}", addr);
                handle_socks5_connect(&connect_handle, (r, w), client_addr, addr, conf)
            }
            socks5::Command::UdpAssociate if udp_conf.enable_udp => {
                info!("UDP ASSOCIATE {}", addr);
                let associated = TcpResponseHeader::new(socks5::Reply::Succeeded,
                                                        From::from(*udp_conf.client_addr))
                    .write_to(w)
                    .and_then(flush)
                    .and_then(|_| {
                        // Hold the connection until it ends by its own
                        ignore_until_end(r).map(|_| ())
                    });

                boxed_future(associated)
            }
            socks5::Command::TcpBind => {
                warn!("BIND is not supported");
                let refused = TcpResponseHeader::new(socks5::Reply::CommandNotSupported, addr)
                    .write_to(w)
                    .map(|_| ());
                boxed_future(refused)
            }
            socks5::Command::UdpAssociate => {
                warn!("UDP Associate is not enabled");
                let refused = TcpResponseHeader::new(socks5::Reply::CommandNotSupported, addr)
                    .write_to(w)
                    .map(|_| ());
                boxed_future(refused)
            }
        }
    });

    // Runs in Tokio
    let task = session.map_err(|err| {
        // A broken pipe is not worth an error log entry; everything else is.
        if err.kind() != io::ErrorKind::BrokenPipe {
            error!("Failed to handle client: {}", err);
        }
    });
    handle.spawn(task);

    Ok(())
}

/// Starts a TCP local server with Socks5 proxy protocol
///
/// Binds a listening socket on `config.local` and returns a future that accepts
/// connections. Each accepted connection is handed to `handle_socks5_client` together
/// with a proxy server picked from a `RoundRobin` balancer built from `config`.
/// An error from the accept loop is logged and then propagated through the
/// returned future.
///
/// # Panics
///
/// Panics if `config.local` is `None`, or if creating, binding or listening on the
/// socket fails.
pub fn run(config: Rc<Config>, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
    let local_addr: SocketAddr = *config.local.as_ref().unwrap();

    let listener = {
        // Pick the socket family that matches the configured address.
        let created = match local_addr {
            SocketAddr::V4(..) => TcpBuilder::new_v4(),
            SocketAddr::V6(..) => TcpBuilder::new_v6(),
        };
        let tcp_builder = created.unwrap_or_else(|err| panic!("Failed to create listener, {}", err));

        super::reuse_port(&tcp_builder)
            .and_then(|builder| builder.reuse_address(true))
            .and_then(|builder| builder.bind(&local_addr))
            .unwrap_or_else(|err| panic!("Failed to bind {}, {}", local_addr, err));

        let bound = tcp_builder.listen(1024)
            .and_then(|std_listener| TcpListener::from_listener(std_listener, &local_addr, &handle))
            .unwrap_or_else(|err| panic!("Failed to listen, {}", err));

        info!("ShadowSocks TCP Listening on {}", local_addr);
        bound
    };

    let udp_conf = UdpConfig {
        enable_udp: config.enable_udp,
        client_addr: Rc::new(local_addr),
    };

    let mut balancer = RoundRobin::new(&*config);
    let accept_loop = listener.incoming().for_each(move |(stream, peer)| {
        let picked = balancer.pick_server();
        trace!("Got connection, addr: {}", peer);
        trace!("Picked proxy server: {:?}", picked);
        handle_socks5_client(&handle, stream, picked, udp_conf.clone())
    });

    let logged = accept_loop.map_err(|err| {
        error!("Socks5 server run failed: {}", err);
        err
    });

    Box::new(logged)
}