extern crate clap;
extern crate fibers;
extern crate futures;
extern crate handy_async;
use clap::{App, Arg};
use fibers::{Executor, Spawn, ThreadPoolExecutor};
use futures::{Future, Stream};
use handy_async::io::{AsyncWrite, ReadFrom};
use handy_async::pattern::AllowPartial;
/// Entry point of the `tcp_echo_cli` client.
///
/// Parses the server host (`-h`, default `127.0.0.1`) and port (`-p`, default
/// `3000`) from the command line, connects to that address over TCP and then
/// runs two concurrent fibers:
///
/// * a writer fiber that forwards everything read from standard input to the
///   socket, and
/// * the monitored reader fiber that prints every chunk received from the
///   socket to standard output.
///
/// # Panics
///
/// Panics if `host:port` is not a valid socket address, if the executor
/// cannot be created, or if running the executor itself fails.
fn main() {
    let matches = App::new("tcp_echo_cli")
        .arg(
            Arg::with_name("SERVER_HOST")
                .short("h")
                .takes_value(true)
                .default_value("127.0.0.1"),
        )
        .arg(
            Arg::with_name("SERVER_PORT")
                .short("p")
                .takes_value(true)
                .default_value("3000"),
        )
        .get_matches();

    // Both arguments declare a default value, so `value_of` always yields `Some`.
    let host = matches.value_of("SERVER_HOST").unwrap();
    let port = matches.value_of("SERVER_PORT").unwrap();
    let addr: std::net::SocketAddr = format!("{}:{}", host, port)
        .parse()
        .expect("Invalid TCP address");

    let mut executor = ThreadPoolExecutor::new().expect("Cannot create Executor");
    // The handle is moved into the connect continuation so that it can spawn
    // the writer fiber once the connection has been established.
    let handle = executor.handle();
    let monitor = executor.spawn_monitor(fibers::net::TcpStream::connect(addr).and_then(
        move |stream| {
            println!("# CONNECTED: {}", addr);

            // One handle of the stream is used for reading, the other for writing.
            let (reader, writer) = (stream.clone(), stream);

            // Writer fiber: read stdin in chunks of up to 1024 bytes and send
            // each chunk to the server. Every stream item carries the buffer
            // and the number of bytes actually read into it.
            let stdin_stream = vec![0; 1024]
                .allow_partial()
                .into_stream(fibers::io::stdin());
            handle.spawn(
                stdin_stream
                    .map_err(|e| e.into_error())
                    .fold(writer, |writer, (mut buf, size)| {
                        // Send only the bytes that were read, not the whole buffer.
                        buf.truncate(size);
                        writer
                            .async_write_all(buf)
                            .map(|(w, _)| w)
                            .map_err(|e| e.into_error())
                    })
                    .then(|r| {
                        println!("# Writer finished: {:?}", r);
                        Ok(())
                    }),
            );

            // Reader (this fiber): print every chunk received from the server.
            let stream = vec![0; 1024].allow_partial().into_stream(reader);
            stream
                .map_err(|e| e.into_error())
                .for_each(|(mut buf, len)| {
                    buf.truncate(len);
                    // The peer may send arbitrary bytes, and a chunk boundary
                    // may fall in the middle of a multi-byte character, so
                    // decode lossily instead of panicking on invalid UTF-8.
                    println!("{}", String::from_utf8_lossy(&buf));
                    Ok(())
                })
        },
    ));

    // Drive the executor until the reader fiber completes.
    let result = executor.run_fiber(monitor).expect("Execution failed");
    println!("# Disconnected: {:?}", result);
}