use moltendb_auth as auth;
use moltendb_core::engine;
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
extract::ws::Utf8Bytes,
};
use futures::{sink::SinkExt, stream::StreamExt};
use tracing::warn;
/// HTTP entry point that upgrades the request to a WebSocket connection.
///
/// Only the database handle is taken out of the shared application state;
/// the remaining state fields are not used by this handler. Once the upgrade
/// completes, the socket is handed to `handle_socket`.
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(app_state): State<(engine::Db, auth::UserStore, usize, String)>,
) -> impl axum::response::IntoResponse {
    // First tuple field is the database handle; the rest are ignored here.
    let (db, ..) = app_state;
    ws.on_upgrade(move |upgraded| handle_socket(upgraded, db))
}
/// Drives a single WebSocket connection: authenticates it, then streams
/// database change notifications to the client until either side goes away.
///
/// Protocol, as implemented here:
/// 1. The very first frame must be a text frame containing JSON of the form
///    `{"action":"AUTH","token":"<jwt>"}` whose token passes
///    `auth::verify_token`. Anything else gets an error payload and the
///    socket is closed.
/// 2. On success a confirmation payload is sent and every message produced by
///    `db.subscribe()` is forwarded to the client as a text frame.
/// 3. Inbound frames after authentication are read and discarded; they only
///    serve to detect that the client has disconnected.
async fn handle_socket(mut socket: WebSocket, db: engine::Db) {
    // Authentication handshake: inspect exactly one inbound frame.
    let is_authenticated = match socket.next().await {
        Some(Ok(Message::Text(text))) => serde_json::from_str::<serde_json::Value>(&text)
            .ok()
            .filter(|payload| payload["action"].as_str() == Some("AUTH"))
            .and_then(|payload| {
                payload["token"]
                    .as_str()
                    .map(|token| auth::verify_token(token).is_ok())
            })
            .unwrap_or(false),
        _ => false,
    };

    if !is_authenticated {
        // Best effort: the peer may already be gone, so send/close errors are ignored.
        let _ = socket
            .send(Message::Text(Utf8Bytes::from(
                r#"{"error":"Authentication required. Send {\"action\":\"AUTH\",\"token\":\"<jwt>\"} as the first message."}"#,
            )))
            .await;
        let _ = socket.close().await;
        warn!("🔒 Rejected unauthenticated WebSocket connection.");
        return;
    }

    // Best effort as well: a failed send surfaces later when the send task errors out.
    let _ = socket
        .send(Message::Text(Utf8Bytes::from(
            r#"{"status":"authenticated","message":"Connected to MoltenDB real-time feed. Use HTTP endpoints for CRUD. Send {\"action\":\"SUBSCRIBE\",\"collection\":\"<name>\"} to register interest."}"#,
        )))
        .await;

    let (mut sender, mut receiver) = socket.split();
    let mut rx = db.subscribe();

    // Inbound half: drain frames until the peer closes, the transport errors,
    // or the stream ends. Non-text frames (Ping/Pong/Binary) must NOT end this
    // loop; previously any such frame terminated the task and thereby tore
    // down the whole feed for clients that send keep-alive pings.
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(frame)) = receiver.next().await {
            if matches!(frame, Message::Close(_)) {
                break;
            }
        }
    });

    // Outbound half: forward every notification until the subscription yields
    // an error or the client can no longer be written to.
    // NOTE(review): any receive error ends the feed. If `rx` is a broadcast
    // receiver that reports lag as an error, a slow client is disconnected
    // rather than skipped ahead; confirm that this is intended.
    let mut send_task = tokio::spawn(async move {
        while let Ok(msg) = rx.recv().await {
            if sender
                .send(Message::Text(Utf8Bytes::from(msg)))
                .await
                .is_err()
            {
                break;
            }
        }
    });

    // Whichever half finishes first cancels the other so no task outlives the connection.
    tokio::select! {
        _ = (&mut recv_task) => send_task.abort(),
        _ = (&mut send_task) => recv_task.abort(),
    };
}