extern crate futures;
extern crate tokio;
extern crate websocket;
use std::fmt::Debug;
use websocket::async::Server;
use websocket::message::{Message, OwnedMessage};
use websocket::server::InvalidConnection;
use futures::{future, Future, Sink, Stream};
use tokio::runtime::TaskExecutor;
fn main() {
let mut runtime = tokio::runtime::Builder::new().build().unwrap();
let executor = runtime.executor();
let server = Server::bind("127.0.0.1:2794", &tokio::reactor::Handle::default()).unwrap();
let f = server
.incoming()
.then(future::ok) .filter(|event| {
match event {
Ok(_) => true, Err(InvalidConnection { ref error, .. }) => {
println!("Bad client: {}", error);
false }
}
})
.and_then(|event| event) .map_err(|_| ()) .for_each(move |(upgrade, addr)| {
println!("Got a connection from: {}", addr);
if !upgrade.protocols().iter().any(|s| s == "rust-websocket") {
spawn_future(upgrade.reject(), "Upgrade Rejection", &executor);
return Ok(());
}
let f = upgrade
.use_protocol("rust-websocket")
.accept()
.and_then(|(s, _)| s.send(Message::text("Hello World!").into()))
.and_then(|s| {
let (sink, stream) = s.split();
stream
.take_while(|m| Ok(!m.is_close()))
.filter_map(|m| {
println!("Message from Client: {:?}", m);
match m {
OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
OwnedMessage::Pong(_) => None,
_ => Some(m),
}
})
.forward(sink)
.and_then(|(_, sink)| sink.send(OwnedMessage::Close(None)))
});
spawn_future(f, "Client Status", &executor);
Ok(())
});
runtime.block_on(f).unwrap();
}
/// Spawns `f` on `executor`, printing its outcome to stdout when it completes.
///
/// * `f` - the future to run; its item is discarded and its error is printed
///   with `{:?}`.
/// * `desc` - label prefixed to the printed line.
/// * `executor` - the executor the wrapped future is spawned on.
///
/// On success the line `"<desc>: Finished."` is printed; on failure
/// `"<desc>: '<error>'"` is printed instead.
fn spawn_future<F, I, E>(f: F, desc: &'static str, executor: &TaskExecutor)
where
    F: Future<Item = I, Error = E> + 'static + Send,
    E: Debug,
{
    // `desc` is a `Copy` reference, so each `move` closure gets its own copy.
    let report_failure = move |err: E| println!("{}: '{:?}'", desc, err);
    let report_success = move |_: I| println!("{}: Finished.", desc);

    // Both callbacks return `()`, so `task` has item and error type `()`.
    let task = f.map_err(report_failure).map(report_success);
    executor.spawn(task);
}