use std::io::{self, Read, Write};
use std::net::SocketAddr;
use tokio_core::reactor::Handle;
use tokio_core::net::TcpStream;
use tokio_io::io::flush;
use tokio_io::{AsyncRead, AsyncWrite};
use futures::{self, Future, Poll, Async};
use relay::socks5::{self, HandshakeRequest, HandshakeResponse, Address, TcpRequestHeader, TcpResponseHeader, Command,
Reply};
use relay::{BoxIoFuture, boxed_future};
/// Client side of a connection to a SOCKS5 proxy.
///
/// Values are produced by `Socks5Client::connect` and `Socks5Client::udp_associate`,
/// which perform the handshake and the request exchange before handing the
/// stream over. The `Read` / `Write` implementations forward directly to the
/// wrapped stream.
pub struct Socks5Client {
// TCP connection to the proxy on which the handshake and request were exchanged.
stream: TcpStream,
}
impl Socks5Client {
pub fn connect<A>(addr: A, proxy: SocketAddr, handle: Handle) -> BoxIoFuture<Socks5Client>
where Address: From<A>,
A: 'static
{
let fut = futures::lazy(move || TcpStream::connect(&proxy, &handle))
.and_then(move |s| {
let hs = HandshakeRequest::new(vec![socks5::SOCKS5_AUTH_METHOD_NONE]);
trace!("Client connected, going to send handshake: {:?}", hs);
hs.write_to(s)
.and_then(flush)
.and_then(HandshakeResponse::read_from)
.and_then(|(s, rsp)| {
trace!("Got handshake response: {:?}", rsp);
assert_eq!(rsp.chosen_method, socks5::SOCKS5_AUTH_METHOD_NONE);
Ok(s)
})
})
.and_then(move |s| {
let h = TcpRequestHeader::new(Command::TcpConnect, From::from(addr));
trace!("Going to connect, req: {:?}", h);
h.write_to(s)
.and_then(flush)
.and_then(|s| TcpResponseHeader::read_from(s).map_err(From::from))
.and_then(|(s, rsp)| {
trace!("Got response: {:?}", rsp);
match rsp.reply {
Reply::Succeeded => Ok(s),
r => {
let err = io::Error::new(io::ErrorKind::Other, format!("{}", r));
Err(err)
}
}
})
})
.map(|s| Socks5Client { stream: s });
boxed_future(fut)
}
pub fn udp_associate<A>(addr: A, proxy: SocketAddr, handle: Handle) -> BoxIoFuture<(Socks5Client, Address)>
where Address: From<A>,
A: 'static
{
let fut = futures::lazy(move || TcpStream::connect(&proxy, &handle))
.and_then(move |s| {
let hs = HandshakeRequest::new(vec![socks5::SOCKS5_AUTH_METHOD_NONE]);
trace!("Client connected, going to send handshake: {:?}", hs);
hs.write_to(s)
.and_then(flush)
.and_then(HandshakeResponse::read_from)
.and_then(|(s, rsp)| {
trace!("Got handshake response: {:?}", rsp);
assert_eq!(rsp.chosen_method, socks5::SOCKS5_AUTH_METHOD_NONE);
Ok(s)
})
})
.and_then(move |s| {
let h = TcpRequestHeader::new(Command::UdpAssociate, From::from(addr));
trace!("Going to connect, req: {:?}", h);
h.write_to(s)
.and_then(flush)
.and_then(|s| TcpResponseHeader::read_from(s).map_err(From::from))
.and_then(|(s, rsp)| {
trace!("Got response: {:?}", rsp);
match rsp.reply {
Reply::Succeeded => Ok((s, rsp.address)),
r => {
let err = io::Error::new(io::ErrorKind::Other, format!("{}", r));
Err(err)
}
}
})
})
.map(|(s, a)| (Socks5Client { stream: s }, a));
boxed_future(fut)
}
}
/// Blocking-style reads are forwarded unchanged to the wrapped stream.
impl Read for Socks5Client {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let inner = &mut self.stream;
        Read::read(inner, buf)
    }
}
/// Writes and flushes are forwarded unchanged to the wrapped stream.
impl Write for Socks5Client {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let inner = &mut self.stream;
        Write::write(inner, buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        let inner = &mut self.stream;
        Write::flush(inner)
    }
}
// Marks `Socks5Client` as usable with tokio-io's async read combinators.
// No methods are overridden here, so the trait's provided defaults apply.
impl AsyncRead for Socks5Client {}
impl AsyncWrite for Socks5Client {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(Async::Ready(()))
}
}