use std::time::{Duration, Instant};
use actix_ws::Message;
use futures_util::{
future::{select, Either},
StreamExt as _,
};
use tokio::{pin, sync::mpsc, time::interval};
use super::ChatServerHandle;
/// Period of the heartbeat timer: on every tick the handler checks client liveness and, if the
/// client is still considered alive, sends it a ping.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Maximum time allowed since the last ping/pong received from the client; once exceeded, the
/// next heartbeat tick closes the connection.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// Drives a single chat WebSocket connection until it terminates.
///
/// After greeting the client and registering the connection with `chat_server`, the handler
/// waits on three event sources at once:
///
/// * frames arriving from the client on `msg_stream`,
/// * chat messages the server pushes to this connection through its channel,
/// * the heartbeat timer, which pings the client and enforces [`CLIENT_TIMEOUT`].
///
/// When the loop ends (close frame, stream error or end, heartbeat timeout, or a failed send
/// because the session is already closed) the connection is unregistered from the chat server
/// and the session is closed with the close reason received from the client, if any.
///
/// # Panics
///
/// Panics if every sender for this connection's message channel has been dropped, which
/// indicates the chat server is no longer running.
pub async fn chat_ws(
    chat_server: ChatServerHandle,
    mut session: actix_ws::Session,
    mut msg_stream: actix_ws::MessageStream,
) {
    log::info!("connected");

    // A failed send only means the peer is already gone; nothing has been registered with the
    // chat server yet, so there is nothing to clean up.
    if session
        .text("Ok, Websocket connection established ~")
        .await
        .is_err()
    {
        log::info!("session closed before the greeting could be sent");
        return;
    }

    let mut name = None;
    let mut last_heartbeat = Instant::now();
    let mut interval = interval(HEARTBEAT_INTERVAL);

    let (conn_tx, mut conn_rx) = mpsc::unbounded_channel();

    // From here on every exit path must go through `chat_server.disconnect` below, so failures
    // inside the loop `break` instead of panicking or returning.
    let conn_id = chat_server.connect(conn_tx).await;

    let close_reason = loop {
        // The futures are recreated on each iteration because `select` consumes them.
        let tick = interval.tick();
        pin!(tick);

        let msg_rx = conn_rx.recv();
        pin!(msg_rx);

        let messages = select(msg_stream.next(), msg_rx);
        pin!(messages);

        match select(messages, tick).await {
            // Frame received from the client.
            Either::Left((Either::Left((Some(Ok(msg)), _)), _)) => {
                log::debug!("msg: {msg:?}");

                match msg {
                    Message::Ping(bytes) => {
                        last_heartbeat = Instant::now();
                        if session.pong(&bytes).await.is_err() {
                            log::info!("session closed while replying to ping");
                            break None;
                        }
                    }

                    Message::Pong(_) => {
                        last_heartbeat = Instant::now();
                    }

                    Message::Text(text) => {
                        super::process_text_msg(
                            &chat_server,
                            &mut session,
                            &text,
                            conn_id,
                            &mut name,
                        )
                        .await;
                    }

                    Message::Binary(_bin) => {
                        log::warn!("unexpected binary message");
                    }

                    Message::Close(reason) => break reason,

                    // Any other frame type is not handled; end the connection.
                    _ => {
                        break None;
                    }
                }
            }

            // Client stream yielded an error.
            Either::Left((Either::Left((Some(Err(err)), _)), _)) => {
                log::error!("{}", err);
                break None;
            }

            // Client stream ended.
            Either::Left((Either::Left((None, _)), _)) => break None,

            // Chat message pushed by the server for this connection.
            Either::Left((Either::Right((Some(chat_msg), _)), _)) => {
                if session.text(chat_msg).await.is_err() {
                    log::info!("session closed while forwarding a chat message");
                    break None;
                }
            }

            Either::Left((Either::Right((None, _)), _)) => unreachable!(
                "all connection message senders were dropped; chat server may have panicked"
            ),

            // Heartbeat tick.
            Either::Right((_inst, _)) => {
                if last_heartbeat.elapsed() > CLIENT_TIMEOUT {
                    log::info!(
                        "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
                    );
                    break None;
                }

                // Best effort: a dead peer is detected by the timeout check above.
                let _ = session.ping(b"Are you there?").await;
            }
        };
    };

    chat_server.disconnect(conn_id);

    // The session may already be closed; there is nothing further to do in that case.
    let _ = session.close(close_reason).await;
}