use time;
use libc;
use ansi_term::Colour::{Red, Yellow};
use std::net::{TcpListener, TcpStream, Shutdown, SocketAddr,IpAddr};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, str};
use std::time::Duration;
use std::io::prelude::*;
use libc::setrlimit;
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender, Receiver};
struct SharedCounter(Arc<Mutex<usize>>);
impl SharedCounter {
fn new() -> Self {
SharedCounter(Arc::new(Mutex::new(0)))
}
fn increment(&self, quantity: usize) {
let mut counter = self.0.lock().unwrap();
*counter = *counter + quantity;
}
fn get(&self) -> usize {
let counter = self.0.lock().unwrap();
*counter
}
fn reset(&self) {
let mut counter = self.0.lock().unwrap();
*counter = 0;
}
}
struct SharedTimeVec(Arc<Mutex<Vec<u64>>>);
impl SharedTimeVec {
fn new() -> Self {
SharedTimeVec(Arc::new(Mutex::new(Vec::new())))
}
fn insert(&self, value: u64) {
let mut time_vec = self.0.lock().unwrap();
time_vec.push(value);
}
fn get_avg_value(&self) -> f64 {
let mut time_vec = self.0.lock().unwrap();
let sum: u64= time_vec.iter().sum();
return sum as f64/time_vec.len() as f64;
}
fn reset(&self) {
let mut time_vec = self.0.lock().unwrap();
time_vec.clear();
}
}
lazy_static! {
static ref TIME_TABLE: SharedTimeVec = {SharedTimeVec::new()};
static ref NUM_BYTES : SharedCounter = {SharedCounter::new()};
static ref ERROR: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
}
#[derive(Clone)]
pub struct SyncMeterProxy {
pub back_address : String,
pub back_port : u16,
pub front_address: String,
pub front_port : u16,
pub reset_lock_flag: Arc<RwLock<bool>>,
}
impl SyncMeterProxy {
pub fn new(b_addr: String, b_port: u16, f_addr: String, f_port: u16) -> SyncMeterProxy {
SyncMeterProxy {
back_address : b_addr,
back_port : b_port,
front_address: f_addr,
front_port : f_port,
reset_lock_flag: Arc::new(RwLock::new(false)),
}
}
pub fn start(&self) {
let rlim = libc::rlimit {
rlim_cur: 4096,
rlim_max: 4096,
};
unsafe {
libc::setrlimit(libc::RLIMIT_NOFILE, &rlim);
}
let targ_addr: IpAddr = self.front_address.parse()
.expect("Unable to parse FRONT-side Address");
let acceptor = TcpListener::bind((targ_addr, self.front_port)).unwrap();
let mut children = vec![];
for stream in acceptor.incoming() {
let reset_lock_flag_c = self.reset_lock_flag.clone();
let back_addr_c = self.clone().back_address;
let back_port_c = self.back_port;
if *reset_lock_flag_c.read().unwrap() == true {
break;
}
match stream {
Err(e) => println!("Strange connection broken: {}", e),
Ok(stream) => {
children.push(thread::spawn(move || {
let mut stream_c = stream.try_clone().unwrap();
let stream_c2 = stream.try_clone().unwrap();
stream_c2.set_read_timeout(Some(Duration::new(3, 0)));
SyncMeterProxy::start_pipe(stream_c, back_addr_c, back_port_c);
drop(stream);
}));
}
}
}
for child in children {
let _ = child.join();
}
drop(acceptor);
return;
}
pub fn stop_and_reset(&self) {
*self.reset_lock_flag.write().unwrap() = true;
NUM_BYTES.reset();
TIME_TABLE.reset();
let targ_addr: IpAddr = self.front_address.parse()
.expect("Unable to parse FRONT-side Address");
TcpStream::connect((targ_addr, self.front_port));
}
pub fn get_num_bytes_rcvd(&self) -> usize {
return NUM_BYTES.get();
}
pub fn get_latency_ms(&self) -> f64 {
return TIME_TABLE.get_avg_value() / 1000000.0f64;
}
fn start_pipe(front: TcpStream, target_addr: String, target_port: u16) {
let targ_addr: IpAddr = target_addr.parse()
.expect("Unable to parse BACK-side Address");
let mut back = match TcpStream::connect((targ_addr, target_port)) {
Err(e) => {
let mut err = ERROR.lock().unwrap();
if *err == false {
println!("{} Unable to connect to the Target Application. Maybe a bad \
configuration: {}",
Red.paint("*****ERROR***** --> "),
e);
};
*err = true;
front.shutdown(Shutdown::Both);
drop(front);
return;
}
Ok(b) => b,
};
let front_c = front.try_clone().unwrap();
let back_c = back.try_clone().unwrap();
let timedOut = Arc::new(AtomicBool::new(false));
let timedOut_c = timedOut.clone();
let latency_mutex: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
let (tx, rx) = channel();
let latency_mutex_c = latency_mutex.clone();
thread::spawn(move || {
SyncMeterProxy::keep_copying_bench_2_targ(front, back, timedOut, latency_mutex, tx);
});
thread::spawn(move || {
SyncMeterProxy::keep_copying_targ_2_bench(back_c, front_c, timedOut_c, latency_mutex_c, rx);
});
}
fn keep_copying_bench_2_targ(mut front: TcpStream,
mut back: TcpStream,
timedOut: Arc<AtomicBool>,
time_mutex: Arc<Mutex<u64>>, tx: Sender<u8>) {
front.set_read_timeout(Some(Duration::new(1000, 0)));
let mut buf = [0; 1024];
loop {
let read = match front.read(&mut buf) {
Err(ref err) => {
let other = timedOut.swap(true, Ordering::AcqRel);
if other {
drop(front);
drop(back);
return;
}
front.shutdown(Shutdown::Both);
back.shutdown(Shutdown::Both);
drop(front);
drop(back);
return; }
Ok(r) => r,
};
let mut start_time = time_mutex.lock().unwrap();
*start_time=time::precise_time_ns();
timedOut.store(false, Ordering::Release);
match back.write(&buf[0..read]) {
Err(..) => {
timedOut.store(true, Ordering::Release);
front.shutdown(Shutdown::Both);
back.shutdown(Shutdown::Both);
drop(front);
drop(back);
return;
}
Ok(..) => (),
};
tx.send(1).unwrap();
}
}
fn keep_copying_targ_2_bench(mut back: TcpStream,
mut front: TcpStream,
timedOut: Arc<AtomicBool>,
time_mutex: Arc<Mutex<u64>>, rx: Receiver<u8>) {
back.set_read_timeout(Some(Duration::new(1000, 0)));
let mut buf = [0; 1024];
let mut seq_number = 0;
loop {
let read = match back.read(&mut buf) {
Err(ref err) => {
let other = timedOut.swap(true, Ordering::AcqRel);
if other {
drop(back);
drop(front);
return;
}
front.shutdown(Shutdown::Both);
back.shutdown(Shutdown::Both);
drop(back);
drop(front);
return; }
Ok(r) => r,
};
match rx.try_recv(){
Ok(r) => {
let res = *(time_mutex.lock().unwrap());
TIME_TABLE.insert(time::precise_time_ns()-res);
},
RecvError => {},
};
NUM_BYTES.increment(read);
timedOut.store(false, Ordering::Release);
match front.write(&buf[0..read]) {
Err(..) => {
timedOut.store(true, Ordering::Release);
front.shutdown(Shutdown::Both);
back.shutdown(Shutdown::Both);
drop(back);
drop(front);
return;
}
Ok(..) => (),
};
}
drop(back);
drop(front);
}
}