use crate::{
messages::{Messages, ServerSignalMessage, ServerSignalUpdate},
server_signals::ServerSignals,
};
use axum::extract::ws::Message;
use futures::{future::BoxFuture, stream::SplitSink, SinkExt, StreamExt};
use leptos::logging::error;
use std::sync::Arc;
use tokio::{
spawn,
sync::{broadcast::Receiver, RwLock},
};
async fn handle_broadcasts(
mut receiver: Receiver<ServerSignalUpdate>,
sink: Arc<RwLock<SplitSink<axum::extract::ws::WebSocket, axum::extract::ws::Message>>>,
) {
while let Ok(message) = receiver.recv().await {
if sink
.write()
.await
.send(Message::Text(
serde_json::to_string(&Messages::ServerSignal(ServerSignalMessage::Update(
message,
)))
.unwrap()
.into(),
))
.await
.is_err()
{
break;
};
}
}
use axum::extract::WebSocketUpgrade;
use axum::response::Response;
/// Builds a cloneable handler that upgrades a request to a WebSocket.
///
/// Each invocation of the returned closure clones `server_signals` and hands
/// that clone, together with the upgraded socket, to `handle_socket`. The
/// upgrade response is produced inside a boxed future.
pub fn websocket(
    server_signals: ServerSignals,
) -> impl Fn(WebSocketUpgrade) -> BoxFuture<'static, Response> + Clone + Send + 'static {
    move |upgrade: WebSocketUpgrade| {
        // Every connection gets its own handle to the signal registry.
        let signals = server_signals.clone();
        let response = async move {
            upgrade.on_upgrade(move |socket| handle_socket(socket, signals))
        };
        Box::pin(response)
    }
}
async fn handle_socket(socket: axum::extract::ws::WebSocket, server_signals: ServerSignals) {
let (send, mut recv) = socket.split();
let send = Arc::new(RwLock::new(send));
let _ = spawn(async move {
while let Some(message) = recv.next().await {
if let Ok(msg) = message {
match msg {
Message::Text(text) => {
if let Ok(message) = serde_json::from_str::<Messages>(&text) {
match message {
Messages::ServerSignal(server_msg) => match server_msg {
ServerSignalMessage::Establish(name) => {
let recv = server_signals
.add_observer(name.clone())
.await
.unwrap();
send.clone()
.write()
.await
.send(Message::Text(
serde_json::to_string(&Messages::ServerSignal(
ServerSignalMessage::EstablishResponse((
name.clone(),
server_signals
.json(name.clone())
.await
.unwrap()
.unwrap(),
)),
))
.unwrap()
.into(),
))
.await
.unwrap();
spawn(handle_broadcasts(recv, send.clone()));
}
_ => error!("Unexpected server signal message from client"),
},
}
} else {
leptos::logging::error!("Error transmitting message")
}
}
Message::Binary(_) => todo!(),
Message::Ping(_) => send
.clone()
.write()
.await
.send(Message::Pong(vec![1, 2, 3].into()))
.await
.unwrap(),
Message::Pong(_) => todo!(),
Message::Close(_) => {}
}
} else {
break;
}
}
})
.await;
}