#![deny(warnings)]
use std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
};
use futures::{future, Future, FutureExt, StreamExt};
use rweb::{
ws::{Message, WebSocket},
Filter,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// Global counter used to hand out ids to chat users; the first id issued is 1.
/// `user_connected` bumps it once per new websocket connection.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
/// Shared table of currently connected users.
///
/// Maps a user id to the sending half of an unbounded channel. Whatever is
/// pushed into that channel is forwarded to the user's websocket by the task
/// spawned in `user_connected`.
type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Result<Message, rweb::Error>>>>>;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
let users = Arc::new(Mutex::new(HashMap::new()));
let users = rweb::any().map(move || users.clone());
let chat = rweb::path("chat")
.and(rweb::ws())
.and(users)
.map(|ws: rweb::ws::Ws, users| {
ws.on_upgrade(move |socket| user_connected(socket, users).map(|result| result.unwrap()))
});
let index = rweb::path::end().map(|| rweb::reply::html(INDEX_HTML));
let routes = index.or(chat);
rweb::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
/// Registers a freshly upgraded websocket as a chat user and returns a future
/// that services the connection until it ends.
///
/// The setup happens eagerly, when this function is called:
/// * a new user id is taken from `NEXT_USER_ID`;
/// * a task is spawned that forwards everything sent on the user's channel to
///   the websocket sink, logging a send error if forwarding fails;
/// * the channel's sender is stored in `users` under the new id.
///
/// The returned future reads incoming websocket messages and passes each one
/// to `user_message`. Reading stops when the stream ends or yields an error;
/// a receive error is logged rather than unwrapped, so a misbehaving client
/// cannot panic the connection task and skip the cleanup. In both cases
/// `user_disconnected` is called afterwards and the future resolves to `Ok(())`.
fn user_connected(ws: WebSocket, users: Users) -> impl Future<Output = Result<(), ()>> {
    // Only uniqueness of the id matters here, so relaxed ordering is sufficient.
    let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
    eprintln!("new chat user: {}", my_id);

    // Split the socket so that writing and reading can be driven independently.
    let (user_ws_tx, user_ws_rx) = ws.split();

    // Other users push messages into `tx`; the spawned task drains `rx` into the socket.
    let (tx, rx) = mpsc::unbounded_channel();
    tokio::task::spawn(
        UnboundedReceiverStream::new(rx)
            .forward(user_ws_tx)
            .map(|result| {
                if let Err(e) = result {
                    eprintln!("websocket send error: {}", e);
                }
            }),
    );

    users.lock().unwrap().insert(my_id, tx);

    // Second handle for the cleanup closure; the first one moves into the read loop.
    let users2 = users.clone();

    user_ws_rx
        // Stop reading at the first receive error instead of panicking on it.
        .take_while(move |incoming| {
            if let Err(e) = incoming {
                eprintln!("websocket error(uid={}): {}", my_id, e);
            }
            future::ready(incoming.is_ok())
        })
        .for_each(move |msg| {
            // `take_while` above only lets `Ok` items through.
            if let Ok(msg) = msg {
                user_message(my_id, msg, &users);
            }
            future::ready(())
        })
        .then(move |result| {
            user_disconnected(my_id, &users2);
            future::ok(result)
        })
}
/// Relays a message from user `my_id` to every other connected user.
///
/// The message is forwarded as text in the form `<User#ID>: body`. Messages
/// for which `to_str` fails (presumably non-text frames) are ignored. The
/// sender never receives a copy of its own message.
fn user_message(my_id: usize, msg: Message, users: &Users) {
    let body = match msg.to_str() {
        Ok(body) => body,
        Err(_) => return,
    };
    let outgoing = format!("<User#{}>: {}", my_id, body);

    // The lock is held for the whole broadcast, exactly as long as the loop runs.
    let peers = users.lock().unwrap();
    for (uid, tx) in peers.iter() {
        if *uid == my_id {
            continue;
        }
        // A failed send means the peer's receiving side is gone; nothing is
        // done about it here, the entry is removed by `user_disconnected`.
        let _ = tx.send(Ok(Message::text(outgoing.clone())));
    }
}
/// Logs that user `my_id` has left and removes its entry from the shared user table.
fn user_disconnected(my_id: usize, users: &Users) {
    eprintln!("good bye user: {}", my_id);

    let mut registry = users.lock().unwrap();
    registry.remove(&my_id);
}
/// HTML page returned for the site root.
///
/// Its inline script opens a websocket to `/chat` on the host the page was
/// loaded from, appends every received message to the `chat` element, and on
/// clicking `send` transmits the content of the `text` input and echoes it
/// locally prefixed with `<You>: `.
static INDEX_HTML: &str = r#"
<!DOCTYPE html>
<html>
<head>
<title>Warp Chat</title>
</head>
<body>
<h1>warp chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="send">Send</button>
<script type="text/javascript">
var uri = 'ws://' + location.host + '/chat';
var ws = new WebSocket(uri);
function message(data) {
var line = document.createElement('p');
line.innerText = data;
chat.appendChild(line);
}
ws.onopen = function() {
chat.innerHTML = "<p><em>Connected!</em></p>";
}
ws.onmessage = function(msg) {
message(msg.data);
};
send.onclick = function() {
var msg = text.value;
ws.send(msg);
text.value = '';
message('<You>: ' + msg);
};
</script>
</body>
</html>
"#;