use super::connection::handle_socket;
use super::events::EventMessage;
use super::state::AppState;
use axum::{
Json,
extract::{State, ws::WebSocketUpgrade},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use serde_json::json;
use std::collections::HashSet;
use tokio::sync::MutexGuard;
use tracing::warn;
pub async fn ws_handler(
ws: WebSocketUpgrade,
headers: HeaderMap,
State(state): State<AppState>,
) -> impl IntoResponse {
let client_id = headers
.get("x-athena-client")
.or_else(|| headers.get("X-Athena-Client"))
.and_then(|h| h.to_str().ok())
.unwrap_or("")
.trim()
.to_string();
if client_id.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": "Missing X-Athena-Client header",
"message": "Pass X-Athena-Client when connecting to subscribe to CDC events",
"hint": "Add request header: X-Athena-Client: <your_client_name> (use ws:// not wss:// if the server has no TLS)"
})),
)
.into_response();
}
ws.on_upgrade(move |socket| handle_socket(socket, state, client_id))
}
pub async fn publish_event(
State(state): State<AppState>,
Json(event): Json<EventMessage>,
) -> impl IntoResponse {
let organization_id = event.organization_id.clone();
let has_subscribers = {
let subscribers: MutexGuard<'_, HashSet<String>> = state.active_subscribers.lock().await;
subscribers.contains(&organization_id)
};
if !has_subscribers {
return (
StatusCode::OK,
Json(json!({
"message": "there was no subscriber to this organization_id channel so the message has been voided",
"organization_id": organization_id,
"status": "voided",
"success": true
})),
)
.into_response();
}
let event_json = match serde_json::to_string(&event) {
Ok(s) => s,
Err(e) => {
warn!("Failed to serialize event for broadcast: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to serialize event",
)
.into_response();
}
};
if let Err(err) = state.tx.send(event_json) {
warn!("Failed to broadcast event: {}", err);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to broadcast event",
)
.into_response();
}
(
StatusCode::OK,
Json(json!({
"status": "delivered",
"success": true,
"organization_id": organization_id
})),
)
.into_response()
}
pub async fn status() -> impl IntoResponse {
(
StatusCode::OK,
Json(json!({
"status": "ok",
"success": true,
"server": "dms-server-api"
})),
)
.into_response()
}