use std::panic;
use std::cell::RefCell;
use std::rc::Rc;
use std::env;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, IpAddr};
use std::net::Shutdown;
use std::str;
use std::time::Duration;
use libc;
use futures;
use futures::{Future, Poll, Async};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::reactor::{Core, Handle, Timeout};
use tokio_core::net::{TcpStream, TcpListener};
use tokio_core::io::{Io, read_exact, write_all, Window};
use std::sync::{Arc, Mutex, RwLock};
lazy_static! {
static ref ERROR: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
}
#[derive(Clone, Debug)]
pub struct AsyncMeterProxy {
pub back_address: String,
pub back_port: u16,
pub front_address: String,
pub front_port: u16,
pub num_bytes: Arc<Mutex<f64>>,
pub num_resp: Arc<Mutex<f64>>,
}
impl AsyncMeterProxy {
pub fn new(b_addr: String, b_port: u16, f_addr: String, f_port: u16) -> AsyncMeterProxy {
AsyncMeterProxy {
back_address: b_addr,
back_port: b_port,
front_address: f_addr,
front_port: f_port,
num_bytes: Arc::new(Mutex::new(0.0)),
num_resp: Arc::new(Mutex::new(0.0)),
}
}
pub fn start(&self) {
let rlim = libc::rlimit {
rlim_cur: 4096,
rlim_max: 4096,
};
unsafe {
libc::setrlimit(libc::RLIMIT_NOFILE, &rlim);
}
let mut core = Core::new().unwrap();
let mut lp = Core::new().unwrap();
let pool = CpuPool::new(4);
let buffer = Rc::new(RefCell::new(vec![0; 64 * 1024]));
let handle = lp.handle();
let f_addr_c = self.front_address.clone();
let b_addr_c = self.back_address.clone();
let front_address =
(f_addr_c + ":" + &self.front_port.to_string()).parse::<SocketAddr>().unwrap();
let back_address =
(b_addr_c + ":" + &self.back_port.to_string()).parse::<SocketAddr>().unwrap();
let listener = TcpListener::bind(&front_address, &handle).unwrap();
let clients = listener.incoming().map(move |(socket, addr)| {
(Client {
buffer: buffer.clone(),
pool: pool.clone(),
handle: handle.clone(),
num_bytes: self.num_bytes.clone(),
num_resp: self.num_resp.clone(),
}
.serve(socket, back_address),
addr)
});
let handle = lp.handle();
let server = clients.for_each(|(client, addr)| {
handle.spawn(client.then(move |res| {
match res {
Ok((a, b)) => println!("proxied {}/{} bytes for {}", a, b, addr),
Err(e) => {;
}
}
futures::finished(())
}));
Ok(())
});
lp.run(server).unwrap();
}
pub fn reset(&self) {
{
let mut n_bytes = self.num_bytes.lock().unwrap();
*n_bytes = 0.0;
}
{
let mut n_resp = self.num_resp.lock().unwrap();
*n_resp = 0.0;
}
}
pub fn get_num_kbytes_rcvd(&self) -> f64 {
let n_bytes = self.num_bytes.lock().unwrap();
return *n_bytes as f64 / 1024.0f64;
}
pub fn get_latency(&self) -> f64 {
let n_resp = self.num_resp.lock().unwrap();
return *n_resp as f64 / 1000000.0f64;
}
}
struct Client {
buffer: Rc<RefCell<Vec<u8>>>,
pool: CpuPool,
handle: Handle,
num_bytes: Arc<Mutex<f64>>,
num_resp: Arc<Mutex<f64>>,
}
impl Client {
fn serve(self,
front_socket: TcpStream,
back_addr: SocketAddr)
-> Box<Future<Item = (u64, u64), Error = io::Error>> {
let pool = self.pool.clone();
let handle = self.handle.clone();
let pair = TcpStream::connect(&back_addr, &handle)
.and_then(|back_socket| futures::lazy(|| Ok((back_socket, front_socket))));
let buffer = self.buffer.clone();
let n_bytes = self.num_bytes.clone();
let n_resp = self.num_resp.clone();
mybox(pair.and_then(|(back, front)| {
let back = Rc::new(back);
let front = Rc::new(front);
let half1 = TransferFrontBack::new(back.clone(),
front.clone(),
buffer.clone(),
n_bytes.clone(),
n_resp.clone());
let half2 = TransferBackFront::new(front, back, buffer, n_bytes, n_resp);
half1.join(half2)
}))
}
}
fn mybox<F: Future + 'static>(f: F) -> Box<Future<Item = F::Item, Error = F::Error>> {
Box::new(f)
}
struct TransferBackFront {
reader: Rc<TcpStream>,
writer: Rc<TcpStream>,
buf: Rc<RefCell<Vec<u8>>>,
amt: u64,
num_bytes: Arc<Mutex<f64>>,
num_resp: Arc<Mutex<f64>>,
}
impl TransferBackFront {
fn new(reader: Rc<TcpStream>,
writer: Rc<TcpStream>,
buffer: Rc<RefCell<Vec<u8>>>,
n_bytes: Arc<Mutex<f64>>,
n_resp: Arc<Mutex<f64>>)
-> TransferBackFront {
TransferBackFront {
reader: reader,
writer: writer,
buf: buffer,
amt: 0,
num_bytes: n_bytes,
num_resp: n_resp,
}
}
}
impl Future for TransferBackFront {
type Item = u64;
type Error = io::Error;
fn poll(&mut self) -> Poll<u64, io::Error> {
let mut buffer = self.buf.borrow_mut();
loop {
let read_ready = self.reader.poll_read().is_ready();
let write_ready = self.writer.poll_write().is_ready();
if !read_ready || !write_ready {
return Ok(Async::NotReady);
}
let n = try_nb!((&*self.reader).read(&mut buffer));
if n == 0 {
try!(self.writer.shutdown(Shutdown::Write));
return Ok(self.amt.into());
}
self.amt += n as u64;
let m = try!((&*self.writer).write(&buffer[..n]));
assert_eq!(n, m);
}
}
}
struct TransferFrontBack {
reader: Rc<TcpStream>,
writer: Rc<TcpStream>,
buf: Rc<RefCell<Vec<u8>>>,
amt: u64,
num_bytes: Arc<Mutex<f64>>,
num_resp: Arc<Mutex<f64>>,
}
impl TransferFrontBack {
fn new(reader: Rc<TcpStream>,
writer: Rc<TcpStream>,
buffer: Rc<RefCell<Vec<u8>>>,
n_bytes: Arc<Mutex<f64>>,
n_resp: Arc<Mutex<f64>>)
-> TransferFrontBack {
TransferFrontBack {
reader: reader,
writer: writer,
buf: buffer,
amt: 0,
num_bytes: n_bytes,
num_resp: n_resp,
}
}
}
impl Future for TransferFrontBack {
type Item = u64;
type Error = io::Error;
fn poll(&mut self) -> Poll<u64, io::Error> {
let mut buffer = self.buf.borrow_mut();
loop {
let read_ready = self.reader.poll_read().is_ready();
let write_ready = self.writer.poll_write().is_ready();
if !read_ready || !write_ready {
return Ok(Async::NotReady);
}
let n = try_nb!((&*self.reader).read(&mut buffer));
if n == 0 {
try!(self.writer.shutdown(Shutdown::Write));
return Ok(self.amt.into());
}
self.amt += n as u64;
{
let mut n_bytes = self.num_bytes.lock().unwrap();
*n_bytes += n as f64;
}
{
let mut n_resp = self.num_resp.lock().unwrap();
*n_resp += 1.0;
}
let m = try!((&*self.writer).write(&buffer[..n]));
assert_eq!(n, m);
}
}
}
fn other(desc: &str) -> io::Error {
io::Error::new(io::ErrorKind::Other, desc)
}