extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use std::sync::Arc;
use std::env;
use std::net::{Shutdown, SocketAddr};
use std::io::{self, Read, Write};
use futures::stream::Stream;
use futures::{Future, Poll};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::Core;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{copy, shutdown};
fn main() {
let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
let listen_addr = listen_addr.parse::<SocketAddr>().unwrap();
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>().unwrap();
let mut l = Core::new().unwrap();
let handle = l.handle();
let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap();
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
let done = socket.incoming().for_each(move |(client, client_addr)| {
let server = TcpStream::connect(&server_addr, &handle);
let amounts = server.and_then(move |server| {
let client_reader = MyTcpStream(Arc::new(client));
let client_writer = client_reader.clone();
let server_reader = MyTcpStream(Arc::new(server));
let server_writer = server_reader.clone();
let client_to_server = copy(client_reader, server_writer)
.and_then(|(n, _, server_writer)| {
shutdown(server_writer).map(move |_| n)
});
let server_to_client = copy(server_reader, client_writer)
.and_then(|(n, _, client_writer)| {
shutdown(client_writer).map(move |_| n)
});
client_to_server.join(server_to_client)
});
let msg = amounts.map(move |(from_client, from_server)| {
println!("client at {} wrote {} bytes and received {} bytes",
client_addr, from_client, from_server);
}).map_err(|e| {
println!("error: {}", e);
});
handle.spawn(msg);
Ok(())
});
l.run(done).unwrap();
}
#[derive(Clone)]
struct MyTcpStream(Arc<TcpStream>);
impl Read for MyTcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&*self.0).read(buf)
}
}
impl Write for MyTcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&*self.0).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl AsyncRead for MyTcpStream {}
impl AsyncWrite for MyTcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try!(self.0.shutdown(Shutdown::Write));
Ok(().into())
}
}