extern crate futures;
extern crate futures_cpupool;
extern crate flate2;
extern crate tokio_core;
extern crate tokio_io;
use std::io;
use std::env;
use std::net::SocketAddr;
use futures::{Future, Stream, Poll};
use futures_cpupool::CpuPool;
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::Core;
use tokio_io::{AsyncRead, AsyncWrite};
use flate2::write::GzEncoder;
/// Runs a TCP server that gzip-compresses whatever each client sends.
///
/// The listen address is taken from the first command-line argument and
/// defaults to `127.0.0.1:8080`. Every accepted connection is handed to
/// `compress`, and a one-line summary (or the error) is printed when that
/// connection finishes.
///
/// # Panics
///
/// Panics if the address does not parse as a socket address, if the event
/// loop cannot be created, if the listener cannot be bound, or if the accept
/// loop itself fails.
fn main() {
    // Build the default lazily so no `String` is allocated when the caller
    // supplied an address.
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8080".to_string());
    // The address comes from the user, so say what was wrong instead of
    // panicking with a bare `unwrap`.
    let addr = addr
        .parse::<SocketAddr>()
        .expect("listen address must be a valid IP:PORT socket address");

    let mut core = Core::new().expect("failed to create the event loop");
    let handle = core.handle();
    let socket = TcpListener::bind(&addr, &handle).expect("failed to bind the listening socket");
    println!("Listening on: {}", addr);

    // Pool that `compress` spawns its work onto.
    let pool = CpuPool::new_num_cpus();

    let server = socket.incoming().for_each(move |(socket, addr)| {
        // Each connection becomes its own spawned task; its outcome is only
        // reported, never propagated, so one failing client cannot stop the
        // accept loop.
        handle.spawn(compress(socket, &pool).then(move |result| {
            match result {
                Ok((r, w)) => println!("{}: compressed {} bytes to {}", addr, r, w),
                Err(e) => println!("{}: failed when compressing: {}", addr, e),
            }
            Ok(())
        }));
        Ok(())
    });

    core.run(server).expect("accept loop failed");
}
/// Gzip-compresses everything read from `socket` and writes the compressed
/// stream back to the same socket.
///
/// The socket is split into a read half and a write half. The write half is
/// wrapped in a `Count` (to tally the bytes that actually reach the socket)
/// and then in a `GzEncoder` using the best compression level. The copy and
/// the final shutdown of the encoder are spawned onto `pool`.
///
/// The returned future resolves to `(bytes_in, bytes_out)`: the amount
/// reported by the copy, and the amount recorded by the `Count` wrapper.
fn compress(socket: TcpStream, pool: &CpuPool)
    -> Box<Future<Item = (u64, u64), Error = io::Error>>
{
    // Inside the body `io` refers to the async helpers; the signature above
    // still names `std::io::Error`.
    use tokio_io::io;

    let (reader, writer) = socket.split();
    let counted = Count { io: writer, amt: 0 };
    let encoder = GzEncoder::new(counted, flate2::Compression::best());

    let pipeline = io::copy(reader, encoder).and_then(|(bytes_in, _reader, encoder)| {
        // Shut the encoder down once the copy is done, then read the tally
        // from the `Count` it wraps.
        io::shutdown(encoder).map(move |encoder| {
            let bytes_out = encoder.get_ref().amt;
            (bytes_in, bytes_out)
        })
    });

    Box::new(pool.spawn(pipeline))
}
/// Wrapper around an I/O object that keeps a running byte tally next to it.
struct Count<T> {
    /// The wrapped I/O object.
    io: T,
    /// Running byte count associated with `io`.
    amt: u64,
}
/// Forwards writes to the wrapped object while tallying the accepted bytes.
impl<T> io::Write for Count<T>
where
    T: io::Write,
{
    /// Writes `buf` to the inner writer and adds the number of bytes it
    /// accepted to `amt`. On error the tally is left unchanged.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.io.write(buf).map(|written| {
            // Count what the inner writer took, not `buf.len()`: a write may
            // be partial.
            self.amt += written as u64;
            written
        })
    }

    /// Flushes the inner writer; the tally is not affected.
    fn flush(&mut self) -> io::Result<()> {
        self.io.flush()
    }
}
impl<T: AsyncWrite> AsyncWrite for Count<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.io.shutdown()
}
}