#![allow(unused_variables)]
#![allow(dead_code)]
use crate::adapter::ConnectionHandler;
use axum::extract::{Path, Query, State};
use axum::http::HeaderMap;
use axum::response::IntoResponse;
use fastwebsockets::upgrade;
use serde::Deserialize;
use std::sync::Arc;
use tracing::log::error;
#[derive(Debug, Deserialize)]
pub struct ConnectionQuery {
protocol: Option<u8>,
client: Option<String>,
version: Option<String>,
}
pub async fn handle_ws_upgrade(
Path(app_key): Path<String>,
Query(_params): Query<ConnectionQuery>,
headers: HeaderMap,
ws: upgrade::IncomingUpgrade,
State(handler): State<Arc<ConnectionHandler>>,
) -> impl IntoResponse {
let (response, fut) = match ws.upgrade() {
Ok((response, fut)) => (response, fut),
Err(e) => {
error!("WebSocket upgrade failed: {e}");
if let Some(ref metrics) = handler.metrics {
let metrics_locked = metrics.lock().await;
metrics_locked.mark_connection_error(&app_key, "websocket_upgrade_failed");
}
return (http::StatusCode::BAD_REQUEST, "WebSocket upgrade failed").into_response();
}
};
let origin = headers
.get(axum::http::header::ORIGIN)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string());
tokio::task::spawn(async move {
if let Err(e) = handler.handle_socket(fut, app_key.clone(), origin).await {
error!("Error handling socket: {e}");
if let Some(ref metrics) = handler.metrics {
let metrics_locked = metrics.lock().await;
match &e {
crate::error::Error::ApplicationNotFound
| crate::error::Error::ApplicationDisabled
| crate::error::Error::OriginNotAllowed
| crate::error::Error::Auth(_)
| crate::error::Error::InvalidMessageFormat(_)
| crate::error::Error::InvalidEventName(_) => {
}
_ => {
metrics_locked.mark_connection_error(&app_key, "socket_handling_failed");
}
}
}
}
});
response.into_response()
}