use bytes::Bytes;
use futures::Stream;
use rweb::{get, post, sse::Event, Filter, Rejection, Reply};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
/// Source of user ids. `user_connected` takes the next value with `fetch_add`,
/// so the first connected user receives id 1.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
/// Items carried on a user's channel; `user_connected` converts each one into
/// a Server-Sent Event.
enum Message {
/// The id assigned to the user; sent to the client as an event named `user`.
UserId(usize),
/// A chat line; sent to the client as an unnamed event whose data is the text.
Reply(String),
}
/// Rejection cause produced by `send_chat` when the request body is not valid UTF-8.
#[derive(Debug)]
struct NotUtf8;
// Implemented so `send_chat` can hand `NotUtf8` to `rweb::reject::custom`.
impl rweb::reject::Reject for NotUtf8 {}
/// Shared registry of connected users: user id -> sending half of that user's
/// message channel. Cloned into each route and guarded by a mutex.
type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
/// `POST /chat/{my_id}`: relays the request body as a chat line from user `my_id`.
///
/// The body must be valid UTF-8; otherwise the request is rejected with
/// `NotUtf8`. On success the text is passed to `user_message` and an empty
/// reply is returned.
#[post("/chat/{my_id}")]
async fn send_chat(
    my_id: usize,
    #[body] msg: Bytes,
    #[data] users: Users,
) -> Result<impl Reply, Rejection> {
    // Validate the raw bytes first; only copy them into a String once they are known-good.
    let text = match std::str::from_utf8(&msg) {
        Ok(valid) => String::from(valid),
        Err(_) => return Err(rweb::reject::custom(NotUtf8)),
    };

    user_message(my_id, text, &users);

    Ok(rweb::reply())
}
/// `GET /chat`: registers the caller as a new chat user and replies with their
/// Server-Sent Events stream, wrapped with rweb's `keep_alive`.
#[get("/chat")]
fn recv_chat(#[data] users: Users) -> impl Reply {
    let events = user_connected(users);
    let keep_alive = rweb::sse::keep_alive();
    rweb::sse::reply(keep_alive.stream(events))
}
/// `GET /`: serves the embedded chat page (`INDEX_HTML`) as UTF-8 HTML.
#[get("/")]
fn index() -> impl Reply {
    let page = rweb::http::Response::builder();
    let page = page.header("content-type", "text/html; charset=utf-8");
    page.body(INDEX_HTML)
}
/// Entry point: initialises logging, creates the shared user registry, wires
/// the three routes together and serves them on 127.0.0.1:3030.
#[tokio::main]
async fn main() {
    pretty_env_logger::init();

    // One registry, shared by the receiving and the sending route.
    let registry = Arc::new(Mutex::new(HashMap::new()));

    let routes = index()
        .or(recv_chat(registry.clone()))
        .or(send_chat(registry));

    rweb::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
/// Registers a new chat user and returns the Server-Sent Events stream that
/// feeds them.
///
/// A fresh id is taken from `NEXT_USER_ID`, an unbounded channel is created
/// for the user, and its sending half is stored in `users` so that
/// `user_message` can reach them. The first item on the returned stream is an
/// event named `user` carrying the assigned id; every later item is an unnamed
/// event carrying one chat line.
///
/// When the returned stream is dropped, a background task removes the user
/// from `users` via `user_disconnected`.
///
/// # Panics
///
/// Panics if the `users` mutex is poisoned.
fn user_connected(users: Users) -> impl Stream<Item = Result<Event, rweb::Error>> + Send + 'static {
    // Relaxed is enough: the counter only has to hand out distinct values.
    let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
    eprintln!("new chat user: {}", my_id);

    let (tx, rx) = mpsc::unbounded_channel();
    // `rx` is still alive in this scope, so this first send cannot fail.
    let _ = tx.send(Message::UserId(my_id));

    users.lock().unwrap().insert(my_id, tx);

    // Disconnect detection: `drx` is owned by the returned stream (see the
    // closure below) and `dtx` by the watcher task. Dropping the stream drops
    // `drx`, which resolves `dtx.closed()` and triggers the clean-up.
    //
    // The receiver must NOT be moved into the watcher task: the task would
    // then keep the channel open itself and `closed()` would never resolve.
    let (mut dtx, drx) = oneshot::channel::<()>();
    tokio::task::spawn(async move {
        dtx.closed().await;
        user_disconnected(my_id, &users);
    });

    UnboundedReceiverStream::new(rx).map(move |msg| {
        // Referencing `drx` makes this `move` closure own it, tying the
        // receiver's lifetime to the stream's.
        let _keep_alive = &drx;
        match msg {
            Message::UserId(my_id) => Ok(rweb::sse::Event::default()
                .event("user")
                .data(my_id.to_string())),
            Message::Reply(reply) => Ok(rweb::sse::Event::default().data(reply)),
        }
    })
}
/// Broadcasts a chat line from user `my_id` to every other connected user.
///
/// The line is formatted as `<User#{my_id}>: {msg}` and sent on each
/// recipient's channel. The sender's own entry is skipped (and kept). A
/// recipient whose channel rejects the send has dropped its receiving half,
/// so its entry is removed from `users` instead of being retried on every
/// later broadcast.
///
/// # Panics
///
/// Panics if the `users` mutex is poisoned.
fn user_message(my_id: usize, msg: String, users: &Users) {
    let new_msg = format!("<User#{}>: {}", my_id, msg);

    // `retain` lets us deliver and reap dead recipients in a single pass.
    users.lock().unwrap().retain(|&uid, tx| {
        // Never echo to the sender, but always keep their entry.
        uid == my_id || tx.send(Message::Reply(new_msg.clone())).is_ok()
    });
}
/// Logs the departure of user `my_id` and removes their sender from `users`.
///
/// Removing an id that is not present is a no-op. Panics if the mutex is poisoned.
fn user_disconnected(my_id: usize, users: &Users) {
    eprintln!("good bye user: {}", my_id);

    let mut registry = users.lock().unwrap();
    registry.remove(&my_id);
}
/// HTML page served by `index`.
///
/// Its inline script opens an `EventSource` on `/chat`, stores the id received
/// in the `user` event, appends every unnamed event's data to the chat log, and
/// POSTs the text box contents to `/chat/<id>` when "Send" is clicked.
static INDEX_HTML: &str = r#"
<!DOCTYPE html>
<html>
<head>
<title>Warp Chat</title>
</head>
<body>
<h1>warp chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="send">Send</button>
<script type="text/javascript">
var uri = 'http://' + location.host + '/chat';
var sse = new EventSource(uri);
function message(data) {
var line = document.createElement('p');
line.innerText = data;
chat.appendChild(line);
}
sse.onopen = function() {
chat.innerHTML = "<p><em>Connected!</em></p>";
}
var user_id;
sse.addEventListener("user", function(msg) {
user_id = msg.data;
});
sse.onmessage = function(msg) {
message(msg.data);
};
send.onclick = function() {
var msg = text.value;
var xhr = new XMLHttpRequest();
xhr.open("POST", uri + '/' + user_id, true);
xhr.send(msg);
text.value = '';
message('<You>: ' + msg);
};
</script>
</body>
</html>
"#;