extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use std::collections::HashMap;
use std::rc::Rc;
use std::cell::RefCell;
use std::iter;
use std::env;
use std::io::{Error, ErrorKind, BufReader};
use std::net::SocketAddr;
use std::thread;
use rustc_serialize::{Encodable, Decodable};
use bincode::SizeLimit;
use bincode::rustc_serialize::{encode, decode, DecodingResult};
use futures::Future;
use futures::stream::{self, Stream};
use futures::sync::mpsc;
use tokio_io::codec::length_delimited;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_io::io;
use tokio_io::AsyncRead;
fn transport<T>
(raw_addr: String)
-> (mpsc::UnboundedSender<(SocketAddr, T)>, mpsc::UnboundedReceiver<(SocketAddr, T)>)
where T: Encodable + Decodable + Send + 'static
{
let addr = raw_addr.parse::<SocketAddr>().unwrap();
let (incoming_tx, incoming_rx) = mpsc::unbounded();
let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
thread::spawn(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap(); info!("Listening on: {}", addr);
let connections = Rc::new(RefCell::new(HashMap::new()));
let srv = socket.incoming().for_each(move |(stream, addr)| {
debug!("New Connection: {}", addr);
let (reader, writer) = stream.split();
let framed_reader = length_delimited::FramedRead::new(reader);
let framed_writer = length_delimited::FramedWrite::new(writer);
let (tx, rx) = mpsc::unbounded();
connections.borrow_mut().insert(addr, tx);
let connections_inner = connections.clone();
let iter = stream::iter(iter::repeat(()).map(Ok::<(), Error>));
let socket_reader = iter.fold(framed_reader, move |reader, _| {
let msg = io::read_to_end(reader, Vec::new());
let msg = msg.and_then(|(reader, vec)| {
if vec.len() == 0 {
Err(Error::new(ErrorKind::BrokenPipe, "broken pipe"))
} else {
Ok((reader, vec))
}
});
let msg = msg.map(|(reader, vec)| (reader, decode(&vec[..]).map_err(|_| ())));
let connections = connections_inner.clone();
msg.map(move |(reader, message)| {
trace!("{}: {:?}", addr, message);
let mut conns = connections.borrow_mut();
if let Ok(msg) = message {
let iter = conns.iter_mut()
.filter(|&(&k, _)| k != addr)
.map(|(_, v)| v);
for tx in iter {
tx.send(format!("{}: {}", addr, msg)).unwrap();
}
} else {
let tx = conns.get_mut(&addr).unwrap();
tx.send("You didn't send valid UTF-8.".to_string()).unwrap();
}
reader
})
});
let socket_writer = rx.fold(writer, |writer, msg| {
let amt = io::write_all(writer, msg.into_bytes());
let amt = amt.map(|(writer, _)| writer);
amt.map_err(|_| ())
});
let connections = connections.clone();
let socket_reader = socket_reader.map_err(|_| ());
let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
handle.spawn(connection.then(move |_| {
connections.borrow_mut().remove(&addr);
debug!("Connection {} closed.", addr);
Ok(())
}));
Ok(())
});
core.run(srv).unwrap();
});
(outgoing_tx, incoming_rx)
}