#![deny(warnings)]
extern crate tokio;
extern crate tokio_io;
extern crate futures;
extern crate bytes;
use std::env;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::thread;
use tokio::prelude::*;
use futures::sync::mpsc;
/// Entry point: bridges this process's stdin/stdout to a remote peer.
///
/// Command line: `[--udp] <addr>`, where `<addr>` must parse as a
/// `SocketAddr`. TCP is used unless `--udp` is present anywhere in the
/// arguments.
///
/// Bytes read from stdin (on a dedicated thread, see `read_stdin`) are sent to
/// the peer; bytes received from the peer are written to stdout.
///
/// # Errors
///
/// Returns an error when no address argument is given, when the address does
/// not parse, or when the selected transport's `connect` function fails.
fn main() -> Result<(), Box<std::error::Error>> {
    let mut args = env::args().skip(1).collect::<Vec<_>>();

    // Strip the `--udp` flag (if any) so that the first remaining argument is
    // the address to connect to.
    let tcp = match args.iter().position(|a| a == "--udp") {
        Some(i) => {
            args.remove(i);
            false
        }
        None => true,
    };

    let addr = args
        .first()
        .ok_or("this program requires at least one argument")?
        .parse::<SocketAddr>()?;

    // Stdin is read with blocking calls on a separate thread and handed over
    // through this channel as a stream of byte chunks.
    let (stdin_tx, stdin_rx) = mpsc::channel(0);
    thread::spawn(|| read_stdin(stdin_tx));
    let stdin_rx = stdin_rx.map_err(|_| panic!("errors not possible on rx"));

    // Both transports take the stdin stream and hand back the stream of bytes
    // received from the peer.
    let stdout = if tcp {
        tcp::connect(&addr, Box::new(stdin_rx))?
    } else {
        udp::connect(&addr, Box::new(stdin_rx))?
    };

    let mut out = io::stdout();
    tokio::run(
        stdout
            .for_each(move |chunk| {
                // Flush after every chunk so that data which does not end in a
                // newline is not held back in the stdout buffer.
                out.write_all(&chunk).and_then(|_| out.flush())
            })
            // Diagnostics go to stderr: stdout carries the peer's data.
            .map_err(|e| eprintln!("error reading stdout; error = {:?}", e)),
    );
    Ok(())
}
/// A pass-through codec that moves raw bytes without any framing.
mod codec {
    use std::io;
    use bytes::{BufMut, BytesMut};
    use tokio::codec::{Encoder, Decoder};

    /// Codec whose frames are simply "whatever bytes are available".
    ///
    /// Decoding yields everything currently buffered as a single `BytesMut`;
    /// encoding appends a `Vec<u8>` verbatim.
    pub struct Bytes;

    impl Decoder for Bytes {
        type Item = BytesMut;
        type Error = io::Error;

        /// Drains the whole of `buf` into one frame, or returns `Ok(None)`
        /// when `buf` is empty. Never returns an error.
        fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
            if buf.is_empty() {
                return Ok(None);
            }
            let len = buf.len();
            Ok(Some(buf.split_to(len)))
        }
    }

    impl Encoder for Bytes {
        type Item = Vec<u8>;
        type Error = io::Error;

        /// Appends `data` to `buf` unchanged. Never returns an error.
        fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
            // Make room first: depending on the `bytes` version, `put` does not
            // grow the buffer and panics when the remaining capacity is too
            // small for `data`.
            buf.reserve(data.len());
            buf.put(&data[..]);
            Ok(())
        }
    }
}
/// TCP transport: forwards a stream of byte chunks to a TCP peer and exposes
/// the bytes received from it.
mod tcp {
    use tokio;
    use tokio::net::TcpStream;
    use tokio::prelude::*;
    use tokio::codec::Decoder;
    use bytes::BytesMut;
    use codec::Bytes;
    use std::error::Error;
    use std::io;
    use std::net::SocketAddr;

    /// Connects to `addr` over TCP.
    ///
    /// * `addr`  - remote address to connect to.
    /// * `stdin` - chunks to send to the peer.
    ///
    /// Returns a stream of the byte chunks received from the peer. Once the
    /// connect future resolves, the socket is framed with the `Bytes` codec
    /// and split; forwarding `stdin` into the write half is spawned as its own
    /// task, and the read half becomes the returned stream.
    ///
    /// This function itself always returns `Ok`; connection and I/O failures
    /// are reported through the returned stream, or on stderr for the
    /// forwarding task.
    pub fn connect(addr: &SocketAddr,
                   stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
        -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>
    {
        let tcp = TcpStream::connect(addr);

        let stream = Box::new(tcp.map(move |stream| {
            let (sink, stream) = Bytes.framed(stream).split();

            // Writing runs as an independent task so that reading from the
            // socket does not depend on stdin making progress.
            tokio::spawn(stdin.forward(sink).then(|result| {
                if let Err(e) = result {
                    // stderr, not stdout: stdout carries the peer's data.
                    eprintln!("failed to write to socket: {}", e)
                }
                Ok(())
            }));

            stream
        }).flatten_stream());

        Ok(stream)
    }
}
/// UDP transport: sends byte chunks as datagrams to a peer and exposes the
/// datagrams received back from that same peer.
mod udp {
    use std::error::Error;
    use std::io;
    use std::net::SocketAddr;
    use tokio;
    use tokio::net::{UdpSocket, UdpFramed};
    use tokio::prelude::*;
    use bytes::BytesMut;
    use codec::Bytes;

    /// Binds a local UDP socket and exchanges datagrams with `addr`.
    ///
    /// * `addr`  - remote address every chunk is sent to.
    /// * `stdin` - chunks to send; each one is paired with `addr` and handed
    ///   to the framed socket's sink.
    ///
    /// Returns a stream of the payloads received from `addr`; datagrams whose
    /// source address differs from `addr` are dropped.
    ///
    /// # Errors
    ///
    /// Fails when the local wildcard address cannot be parsed or the socket
    /// cannot be bound; the bind error message includes the underlying cause.
    pub fn connect(&addr: &SocketAddr,
                   stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
        -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>
    {
        // Bind to an OS-chosen port on the wildcard address of the same
        // address family as the peer.
        let addr_to_bind: SocketAddr = if addr.ip().is_ipv4() {
            "0.0.0.0:0".parse()?
        } else {
            "[::]:0".parse()?
        };
        let udp = UdpSocket::bind(&addr_to_bind)
            .map_err(|e| format!("failed to bind socket: {}", e))?;

        let (sink, stream) = UdpFramed::new(udp, Bytes).split();

        let forward_stdin = stdin.map(move |chunk| {
            (chunk, addr)
        }).forward(sink).then(|result| {
            if let Err(e) = result {
                // stderr, not stdout: stdout carries the peer's data.
                eprintln!("failed to write to socket: {}", e)
            }
            Ok(())
        });

        // Only pass through datagrams that come from the peer we talk to.
        let receive = stream.filter_map(move |(chunk, src)| {
            if src == addr {
                Some(chunk)
            } else {
                None
            }
        });

        // `future::lazy` postpones `tokio::spawn` until the returned stream is
        // polled instead of spawning while this function runs.
        let stream = Box::new(future::lazy(|| {
            tokio::spawn(forward_stdin);
            future::ok(receive)
        }).flatten_stream());

        Ok(stream)
    }
}
/// Pumps stdin into `tx` using blocking reads; meant to run on its own thread.
///
/// Each successful read of up to 1024 bytes is sent as one `Vec<u8>` chunk,
/// waiting until the channel accepts it. The loop ends on end of input, on a
/// read error other than `Interrupted`, or when sending fails.
fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) {
    let mut stdin = io::stdin();
    loop {
        // A fresh buffer per iteration: ownership of it moves into the channel.
        let mut buf = vec![0; 1024];
        let n = match stdin.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => n,
            // An interrupted read is transient; retry instead of silently
            // ending stdin forwarding.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(_) => break,
        };
        buf.truncate(n);
        // `send` consumes the sender and returns it once the chunk is accepted.
        tx = match tx.send(buf).wait() {
            Ok(tx) => tx,
            Err(_) => break,
        };
    }
}