use std::time::Duration;
use actix_ws::Message;
use futures_util::{
StreamExt as _,
future::{Either, select},
};
use switchy_async::sync::mpsc;
use tokio::{pin, time::interval};
use crate::ws::{ConnId, server::WsServerHandle};
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
#[cfg_attr(feature = "profiling", profiling::function)]
#[allow(clippy::future_not_send)]
pub async fn handle_ws(
ws_server: WsServerHandle,
profile: String,
mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream,
) {
log::debug!("connected");
let mut name = None;
let mut last_heartbeat = switchy_time::instant_now();
let mut interval = interval(HEARTBEAT_INTERVAL);
let (conn_tx, mut conn_rx) = mpsc::unbounded();
let conn_id = ws_server.connect(profile, conn_tx).await;
let close_reason = loop {
#[cfg(feature = "profiling")]
profiling::function_scope!("loop");
let tick = interval.tick();
pin!(tick);
let msg_rx = conn_rx.recv_async();
pin!(msg_rx);
let messages = select(msg_stream.next(), msg_rx);
pin!(messages);
match select(messages, tick).await {
Either::Left((Either::Left((Some(Ok(msg)), _)), _)) => match msg {
Message::Ping(bytes) => {
last_heartbeat = switchy_time::instant_now();
session.pong(&bytes).await.unwrap();
}
Message::Pong(_) => {
last_heartbeat = switchy_time::instant_now();
}
Message::Text(text) => {
process_text_msg(&ws_server, &text, conn_id, &mut name).await;
}
Message::Binary(bytes) => match String::from_utf8(bytes.to_vec()) {
Ok(text) => {
process_text_msg(&ws_server, &text, conn_id, &mut name).await;
}
Err(e) => {
log::warn!("unexpected binary message: {e:?}");
}
},
Message::Close(reason) => break reason,
_ => {
break None;
}
},
Either::Left((Either::Left((Some(Err(err)), _)), _)) => {
log::error!("{err}");
break None;
}
Either::Left((Either::Left((None, _)), _)) => break None,
Either::Left((Either::Right((Ok(ws_msg), _)), _)) => {
if let Err(err) = session.text(ws_msg).await {
log::error!("Failed to send text message: {err:?}");
}
}
Either::Left((Either::Right((Err(_), _)), _)) => unreachable!(
"all connection message senders were dropped; ws server may have panicked"
),
Either::Right((_inst, _)) => {
if switchy_time::instant_now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!(
"client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
);
break None;
}
let _ = session.ping(b"").await;
}
}
};
ws_server.disconnect(conn_id).await;
let _ = session.close(close_reason).await;
}
async fn process_text_msg(
ws_server: &WsServerHandle,
text: &str,
conn: ConnId,
name: &mut Option<String>,
) {
let msg = text.trim();
let msg = name
.as_mut()
.map_or_else(|| msg.to_owned(), |name| format!("{name}: {msg}"));
ws_server.send_message(conn, msg).await;
}