use crate::app_state::GlobalAppState;
use crate::yrs_axum::{AxumSink, AxumStream, YrsDoc};
use axum::{
extract::{
ws::{WebSocket, WebSocketUpgrade},
Path, State,
},
response::IntoResponse,
};
use futures::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Upgrades the request to a WebSocket and serves it from the `"__default"` room.
///
/// Once the upgrade completes, the socket is handed to `yjs_socket_handler`
/// together with the shared application state.
pub async fn get_yjs_default_room_handler(
    ws: WebSocketUpgrade,
    State(state): State<GlobalAppState>,
) -> impl IntoResponse {
    // Requests without a room name all land in this fixed room.
    let default_room = "__default".to_owned();
    ws.on_upgrade(move |socket| yjs_socket_handler(default_room, socket, state))
}
pub async fn get_yjs_named_room_handler(
ws: WebSocketUpgrade,
Path(room_name): Path<String>,
State(state): State<GlobalAppState>,
) -> impl IntoResponse {
println!("Got request for room {}", room_name);
ws.on_upgrade(|socket| yjs_socket_handler(room_name, socket, state))
}
/// Serves a single WebSocket connection for the document room `room_name`.
///
/// Steps:
/// 1. Make sure the shared docs map in `state` has a document for `room_name`,
///    creating one with `YrsDoc::new()` if it is absent.
/// 2. Split the socket and wrap both halves in the `AxumSink` / `AxumStream`
///    adapters.
/// 3. Subscribe the connection to the document's broadcast and wait until the
///    subscription completes, printing the outcome to stdout/stderr.
///
/// If the room is no longer in the map when it is looked up, an error is
/// printed and the connection is dropped instead of panicking.
///
/// NOTE(review): the docs map is assumed to be a DashMap-style concurrent map
/// (its `get` returns a guard exposing `.value()`) — confirm against
/// `GlobalAppState`.
async fn yjs_socket_handler(
    room_name: String,
    ws: WebSocket,
    state: GlobalAppState,
) {
    if !state.get_docs().contains_key(&room_name) {
        // Build the document before touching the map so nothing is borrowed
        // from the map while awaiting.
        let fresh_doc = YrsDoc::new().await;
        // Another connection may have created the room while we awaited.
        // `or_insert` keeps the existing document in that case, so every
        // client of the room shares the same one instead of being overwritten.
        state
            .get_docs()
            .entry(room_name.clone())
            .or_insert(fresh_doc);
    }

    let (sink, stream) = ws.split();
    let sink = Arc::new(Mutex::new(AxumSink::from(sink)));
    let stream = AxumStream::from(stream);

    let yrs_doc_item = match state.get_docs().get(&room_name) {
        Some(item) => item,
        None => {
            eprintln!("room {} disappeared before subscribing", room_name);
            return;
        }
    };
    let bsub = yrs_doc_item.value().get_broadcast().subscribe(sink, stream);
    // Release the map guard before the long-lived await below. Keeping it for
    // the whole connection would keep part of the map locked and could block
    // other connections that need to insert a room.
    // NOTE(review): assumes the subscription does not borrow from the map
    // entry — confirm against the broadcast type.
    drop(yrs_doc_item);

    match bsub.completed().await {
        Ok(_) => println!("broadcasting finished successfully"),
        Err(e) => {
            eprintln!("broadcasting finished abruptly: {}", e)
        }
    }
}