extern crate bytes;
extern crate corona;
extern crate futures;
extern crate tokio;
use std::cell::RefCell;
use std::io::{BufRead, BufReader, Error as IoError};
use std::iter;
use std::panic::AssertUnwindSafe;
use std::rc::Rc;
use bytes::BytesMut;
use corona::Coroutine;
use corona::io::BlockingWrapper;
use corona::prelude::*;
use corona::wrappers::SinkSender;
use futures::{future, Future};
use futures::unsync::mpsc::{self, Sender, Receiver};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncRead, WriteHalf};
use tokio::codec::{Encoder, FramedWrite};
struct LineEncoder;
impl Encoder for LineEncoder {
type Item = Rc<String>;
type Error = IoError;
fn encode(&mut self, item: Rc<String>, dst: &mut BytesMut) -> Result<(), IoError> {
dst.extend_from_slice(item.as_bytes());
dst.extend_from_slice(b"\n");
Ok(())
}
}
type Client = FramedWrite<WriteHalf<TcpStream>, LineEncoder>;
type Clients = Rc<RefCell<Vec<Client>>>;
fn handle_connection(connection: TcpStream,
clients: &Clients,
mut msgs: Sender<String>)
{
let (input, output) = connection.split();
let writer = FramedWrite::new(output, LineEncoder);
clients.borrow_mut().push(writer);
let input = BufReader::new(BlockingWrapper::new(input));
Coroutine::from_thread_local().spawn_catch_panic(AssertUnwindSafe(move || {
for line in input.lines() {
let line = line.expect("Broken line on input");
msgs.coro_send(line).expect("The broadcaster suddenly disappeared");
}
eprintln!("A connection terminated");
})).expect("Wrong stack size");
}
fn broadcaster(msgs: Receiver<String>, clients: &Clients) {
let mut extracted = Vec::new();
for msg in msgs.iter_ok() {
{ let mut borrowed = clients.borrow_mut();
extracted.extend(borrowed.drain(..));
}
let broken_idxs = {
let msg = Rc::new(msg);
let all_sent = extracted.iter_mut()
.map(|client| SinkSender::new(client, iter::once(Rc::clone(&msg))))
.map(|send_future| send_future.then(|res| Ok::<_, IoError>(res.is_ok())));
future::join_all(all_sent) .coro_wait() .unwrap() .into_iter()
.enumerate()
.filter_map(|(idx, success)| if success {
None
} else {
Some(idx)
})
.collect::<Vec<_>>()
};
for idx in broken_idxs.into_iter().rev() {
extracted.swap_remove(idx);
}
}
}
fn acceptor(clients: &Clients, sender: &Sender<String>) {
let listener = TcpListener::bind(&"[::]:1234".parse().unwrap()).unwrap();
for attempt in listener.incoming().iter_result() {
match attempt {
Ok(connection) => {
eprintln!("Received a connection");
handle_connection(connection, clients, sender.clone());
},
Err(e) => eprintln!("An error accepting a connection: {}", e),
}
}
}
fn main() {
Coroutine::new().stack_size(32_768).run(|| {
let (sender, receiver) = mpsc::channel(100);
let clients = Clients::default();
let clients_rc = Rc::clone(&clients);
corona::spawn(move || broadcaster(receiver, &clients_rc));
acceptor(&clients, &sender);
}).expect("Wrong stack size");
}