#![deny(warnings)]
extern crate futures;
extern crate tokio;
use tokio::io;
use tokio::net::TcpListener;
use tokio::prelude::*;
use std::collections::HashMap;
use std::env;
use std::io::BufReader;
use std::iter;
use std::sync::{Arc, Mutex};
fn main() -> Result<(), Box<std::error::Error>> {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse()?;
let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr);
let connections = Arc::new(Mutex::new(HashMap::new()));
let srv = socket
.incoming()
.map_err(|e| {
println!("failed to accept socket; error = {:?}", e);
e
})
.for_each(move |stream| {
let addr = stream.peer_addr()?;
println!("New Connection: {}", addr);
let (reader, writer) = stream.split();
let (tx, rx) = futures::sync::mpsc::unbounded();
connections.lock().unwrap().insert(addr, tx);
let connections_inner = connections.clone();
let reader = BufReader::new(reader);
let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));
let socket_reader = iter.fold(reader, move |reader, _| {
let line = io::read_until(reader, b'\n', Vec::new());
let line = line.and_then(|(reader, vec)| {
if vec.len() == 0 {
Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
} else {
Ok((reader, vec))
}
});
let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec)));
let connections = connections_inner.clone();
line.map(move |(reader, message)| {
println!("{}: {:?}", addr, message);
let mut conns = connections.lock().unwrap();
if let Ok(msg) = message {
let iter = conns
.iter_mut()
.filter(|&(&k, _)| k != addr)
.map(|(_, v)| v);
for tx in iter {
tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
}
} else {
let tx = conns.get_mut(&addr).unwrap();
tx.unbounded_send("You didn't send valid UTF-8.".to_string())
.unwrap();
}
reader
})
});
let socket_writer = rx.fold(writer, |writer, msg| {
let amt = io::write_all(writer, msg.into_bytes());
let amt = amt.map(|(writer, _)| writer);
amt.map_err(|_| ())
});
let connections = connections.clone();
let socket_reader = socket_reader.map_err(|_| ());
let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
tokio::spawn(connection.then(move |_| {
connections.lock().unwrap().remove(&addr);
println!("Connection {} closed.", addr);
Ok(())
}));
Ok(())
})
.map_err(|err| println!("error occurred: {:?}", err));
tokio::run(srv);
Ok(())
}