extern crate mles_utils;
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate bytes;
mod ws;
use std::{env, process};
use std::io::{self, Read, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs};
use std::thread;
use std::io::{Error, ErrorKind};
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
use std::time::Duration;
use bytes::{BufMut, BytesMut};
use futures::{Sink, Future, Stream};
use futures::sync::mpsc;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use tokio_io::AsyncRead;
use tokio_io::codec::{Encoder, Decoder};
use mles_utils::*;
use ws::*;
/// Port suffix (including the leading colon) appended to the server address argument.
const SRVPORT: &str = ":8077";
/// Usage text printed before exiting when the command line arguments are not accepted.
const USAGE: &str = "Usage: mles-client <server-address> [--use-websockets]";
/// TCP keepalive value, in seconds, applied to the server connection.
const KEEPALIVE: u64 = 5;
fn main() {
let mut ws_enabled: Option<bool> = None;
let addr = env::args()
.nth(1)
.unwrap_or_else(|| {
println!("{}", USAGE);
process::exit(1);
});
let raddr = addr + SRVPORT;
let raddr: Vec<_> = raddr.to_socket_addrs()
.unwrap_or_else(|_| vec![SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)].into_iter())
.collect();
let raddr = *raddr.first().unwrap();
let raddr = Some(raddr).unwrap();
if 0 == raddr.port() {
println!("{}", USAGE);
process::exit(1);
}
if let Some(ws) = env::args().nth(2) {
if ws == "--use-websockets" {
ws_enabled = Some(true);
} else {
println!("{}", USAGE);
process::exit(1);
}
}
let keyval = match env::var("MLES_KEY") {
Ok(val) => val,
Err(_) => "".to_string(),
};
let keyaddr = match env::var("MLES_ADDR_KEY") {
Ok(val) => val,
Err(_) => "".to_string(),
};
if ws_enabled.is_some() {
process_ws_proxy(raddr, keyval, keyaddr);
} else {
let (stdin_tx, stdin_rx) = mpsc::channel(16);
let mut core = Core::new().unwrap();
let handle = core.handle();
let tcp = TcpStream::connect(&raddr, &handle);
let mut cid: Option<u32> = None;
let mut key: Option<u64> = None;
let mut keys = Vec::new();
thread::spawn(|| read_stdin(stdin_tx));
let stdin_rx = stdin_rx.map_err(|_| panic!());
let mut stdout = io::stdout();
let client = tcp.and_then(|stream| {
let _val = stream.set_nodelay(true)
.map_err(|_| panic!("Cannot set to no delay"));
let _val = stream.set_keepalive(Some(Duration::new(KEEPALIVE, 0)))
.map_err(|_| panic!("Cannot set keepalive"));
let laddr = match stream.local_addr() {
Ok(laddr) => laddr,
Err(_) => {
let addr = "0.0.0.0:0";
addr.parse::<SocketAddr>().unwrap()
}
};
if !keyval.is_empty() {
keys.push(keyval.clone());
} else {
keys.push(addr2str(&laddr));
if !keyaddr.is_empty() {
keys.push(keyaddr.clone());
}
}
let (sink, stream) = stream.framed(Bytes).split();
let stdin_rx = stdin_rx.and_then(|buf| {
if None == key {
let decoded_message = message_decode(buf.as_slice());
keys.push(decoded_message.get_uid().to_string());
keys.push(decoded_message.get_channel().to_string());
key = Some(do_hash(&keys));
cid = Some(select_cid());
}
let mut cidv = write_selected_cid(cid.unwrap());
let keyv = write_key(key.unwrap());
cidv.extend(keyv);
cidv.extend(buf);
Ok(cidv)
});
let send_stdin = stdin_rx.forward(sink);
let write_stdout = stream.for_each(move |buf| {
let decoded = message_decode(buf.to_vec().as_slice());
let mut msg = "".to_string();
if !decoded.get_message().is_empty() {
msg.push_str(decoded.get_uid());
msg.push_str(":");
msg.push_str(String::from_utf8_lossy(decoded.get_message().as_slice())
.into_owned()
.as_str());
}
stdout.write_all(&msg.into_bytes())
});
send_stdin
.map(|_| ())
.select(write_stdout.map(|_| ()))
.then(|_| Ok(()))
});
match core.run(client) {
Ok(_) => {}
Err(err) => {
println!("Error: {}", err);
process::exit(1);
}
};
}
}
/// Codec used on the server connection (see the `Decoder` and `Encoder` implementations).
struct Bytes;

impl Decoder for Bytes {
    type Item = BytesMut;
    type Error = io::Error;

    /// Tries to extract one frame payload from `buf`.
    ///
    /// Returns `Ok(None)` while fewer than `HDRKEYL` bytes, or fewer bytes than the
    /// header announces, are buffered. If the header type is not `'M'` or the announced
    /// length is zero, everything buffered so far is discarded. Otherwise the first
    /// `HDRKEYL` bytes are dropped and the following `hdr_len` bytes are returned.
    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
        let buffered = buf.len();
        if buffered < mles_utils::HDRKEYL {
            return Ok(None);
        }

        // One snapshot serves both header queries.
        let snapshot = buf.to_vec();
        if read_hdr_type(&snapshot) != 'M' as u32 {
            let _ = buf.split_to(buffered);
            return Ok(None);
        }
        let hdr_len = read_hdr_len(&snapshot);
        if hdr_len == 0 {
            let _ = buf.split_to(buffered);
            return Ok(None);
        }

        // Wait for the rest of the frame.
        if buffered < mles_utils::HDRKEYL + hdr_len {
            return Ok(None);
        }

        let _ = buf.split_to(mles_utils::HDRKEYL);
        Ok(Some(buf.split_to(hdr_len)))
    }

    /// At end of stream the remaining bytes are handled exactly like in `decode`.
    fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
        self.decode(buf)
    }
}
impl Encoder for Bytes {
    type Item = Vec<u8>;
    type Error = io::Error;

    /// Writes `data` to `buf`, preceded by the header built by `write_hdr_without_cid`.
    ///
    /// `data` is expected to start with the connection id (`CIDL` bytes) and the key
    /// (`KEYL` bytes); the length passed to the header excludes those two fields.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidInput` when `data` is shorter than `KEYL + CIDL`
    /// bytes, instead of underflowing the length computation.
    fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
        let payload_len = data
            .len()
            .checked_sub(mles_utils::KEYL + mles_utils::CIDL)
            .ok_or_else(|| {
                io::Error::new(io::ErrorKind::InvalidInput,
                               "message is shorter than the connection id and key prefix")
            })?;
        let mut msgv = write_hdr_without_cid(payload_len);
        msgv.extend(data);
        // Make room first: the output buffer is not guaranteed to grow on `put`.
        buf.reserve(msgv.len());
        buf.put(&msgv[..]);
        Ok(())
    }
}
fn read_stdin(mut rx: mpsc::Sender<Vec<u8>>) {
let mut stdin = io::stdin();
let mut stdout = io::stdout();
let _val = stdout.write_all(b"User name?\n");
let mut buf = vec![0; 80];
let n = match stdin.read(&mut buf) {
Err(_) | Ok(0) => return,
Ok(n) => n,
};
buf.truncate(n - 1);
let mut userstr = String::from_utf8_lossy(buf.clone().as_slice()).into_owned();
if userstr.ends_with('\r') {
let len = userstr.len();
userstr.truncate(len - 1);
}
let _val = stdout.write_all(b"Channel?\n");
let mut buf = vec![0; 80];
let n = match stdin.read(&mut buf) {
Err(_) | Ok(0) => return,
Ok(n) => n,
};
buf.truncate(n - 1);
let mut channelstr = String::from_utf8_lossy(buf.clone().as_slice()).into_owned();
if channelstr.ends_with('\r') {
let len = channelstr.len();
channelstr.truncate(len - 1);
}
let msg = message_encode(&Msg::new(userstr.clone(), channelstr.clone(), Vec::new()));
rx = rx.send(msg).wait().unwrap();
let mut msg = userstr.clone();
msg += " to ";
msg += channelstr.as_str();
let mut welcome = "Welcome ".to_string();
welcome += msg.as_str();
welcome += "!\n";
let _val = stdout.write_all(welcome.as_bytes());
loop {
let mut buf = vec![0;80];
let n = match stdin.read(&mut buf) {
Err(_) | Ok(0) => break,
Ok(n) => n,
};
buf.truncate(n);
let str = String::from_utf8_lossy(buf.as_slice()).into_owned();
let msg = Msg::new(userstr.clone(), channelstr.clone(), str.into_bytes());
let msg = message_encode(&msg);
rx = rx.send(msg).wait().unwrap();
}
}
/// Runs one Mles server connection on behalf of the WebSocket side.
///
/// Connects to `raddr`, then forwards every buffer received from `mles_rx` to the server
/// (prefixed with the connection id and key) and passes every frame received from the
/// server to `ws_tx`. An empty buffer from `mles_rx` ends the forwarding with a
/// "broken pipe" error. The function returns when either direction finishes; a
/// connection error is printed as `Error: ...`.
///
/// `keyval` and `keyaddr` contribute to the hashed key in the same way as in `main`:
/// a non-empty `keyval` is used alone, otherwise the local address and, if non-empty,
/// `keyaddr` are used.
pub fn process_mles_client(raddr: SocketAddr, keyval: String, keyaddr: String,
                           ws_tx: UnboundedSender<Vec<u8>>, mles_rx: UnboundedReceiver<Vec<u8>>) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let connect = TcpStream::connect(&raddr, &handle);

    // Chosen once, when the first buffer is forwarded.
    let mut cid: Option<u32> = None;
    let mut key: Option<u64> = None;
    let mut keys = Vec::new();

    let session = connect.and_then(|socket| {
        if socket.set_nodelay(true).is_err() {
            panic!("Cannot set to no delay");
        }
        if socket.set_keepalive(Some(Duration::new(KEEPALIVE, 0))).is_err() {
            panic!("Cannot set keepalive");
        }

        let laddr = socket
            .local_addr()
            .unwrap_or_else(|_| "0.0.0.0:0".parse::<SocketAddr>().unwrap());

        if keyval.is_empty() {
            keys.push(addr2str(&laddr));
            if !keyaddr.is_empty() {
                keys.push(keyaddr);
            }
        } else {
            keys.push(keyval);
        }

        let (to_server, from_server) = socket.framed(Bytes).split();

        let mles_rx = mles_rx.map_err(|_| panic!());
        let outgoing = mles_rx.and_then(|frame| {
            // An empty buffer is the signal to stop.
            if frame.is_empty() {
                return Err(Error::new(ErrorKind::BrokenPipe, "broken pipe"));
            }
            if key.is_none() {
                let first = message_decode(frame.as_slice());
                keys.push(first.get_uid().to_string());
                keys.push(first.get_channel().to_string());
                key = Some(do_hash(&keys));
                cid = Some(select_cid());
            }
            let mut prefixed = write_selected_cid(cid.unwrap());
            prefixed.extend(write_key(key.unwrap()));
            prefixed.extend(frame);
            Ok(prefixed)
        });
        let send_wsrx = outgoing.forward(to_server);

        // A failed hand-over to the WebSocket side is ignored.
        let write_wstx = from_server.for_each(move |frame| {
            let _ = ws_tx.clone().send(frame.to_vec()).wait();
            Ok(())
        });

        send_wsrx
            .map(|_| ())
            .select(write_wstx.map(|_| ()))
            .then(|_| Ok(()))
    });

    if let Err(err) = core.run(session) {
        println!("Error: {}", err);
    }
}