athena_rs 2.9.1

Database gateway API
Documentation
//! HTTP and WebSocket upgrade handlers for the CDC server.

use super::connection::handle_socket;
use super::events::EventMessage;
use super::state::AppState;
use axum::{
    Json,
    extract::{State, ws::WebSocketUpgrade},
    http::{HeaderMap, StatusCode},
    response::IntoResponse,
};
use serde_json::json;
use std::collections::HashSet;
use tokio::sync::MutexGuard;
use tracing::warn;

/// WebSocket upgrade handler.
///
/// Requires `X-Athena-Client` header (case-insensitive). Returns 400 with a JSON body if missing.
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    headers: HeaderMap,
    State(state): State<AppState>,
) -> impl IntoResponse {
    let client_id = headers
        .get("x-athena-client")
        .or_else(|| headers.get("X-Athena-Client"))
        .and_then(|h| h.to_str().ok())
        .unwrap_or("")
        .trim()
        .to_string();

    if client_id.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({
                "error": "Missing X-Athena-Client header",
                "message": "Pass X-Athena-Client when connecting to subscribe to CDC events",
                "hint": "Add request header: X-Athena-Client: <your_client_name> (use ws:// not wss:// if the server has no TLS)"
            })),
        )
            .into_response();
    }

    ws.on_upgrade(move |socket| handle_socket(socket, state, client_id))
}

/// POST /events — publish an event.
///
/// Returns 200 with `status: "voided"` if no WebSocket subscriber for `organization_id`;
/// otherwise broadcasts and returns 200 with `status: "delivered"`.
pub async fn publish_event(
    State(state): State<AppState>,
    Json(event): Json<EventMessage>,
) -> impl IntoResponse {
    let organization_id = event.organization_id.clone();

    let has_subscribers = {
        let subscribers: MutexGuard<'_, HashSet<String>> = state.active_subscribers.lock().await;
        subscribers.contains(&organization_id)
    };

    if !has_subscribers {
        return (
            StatusCode::OK,
            Json(json!({
                "message": "there was no subscriber to this organization_id channel so the message has been voided",
                "organization_id": organization_id,
                "status": "voided",
                "success": true
            })),
        )
            .into_response();
    }

    let event_json = match serde_json::to_string(&event) {
        Ok(s) => s,
        Err(e) => {
            warn!("Failed to serialize event for broadcast: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to serialize event",
            )
                .into_response();
        }
    };
    if let Err(err) = state.tx.send(event_json) {
        warn!("Failed to broadcast event: {}", err);
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed to broadcast event",
        )
            .into_response();
    }

    (
        StatusCode::OK,
        Json(json!({
            "status": "delivered",
            "success": true,
            "organization_id": organization_id
        })),
    )
        .into_response()
}

/// GET /status — server health; returns JSON with `status: "ok"`.
pub async fn status() -> impl IntoResponse {
    (
        StatusCode::OK,
        Json(json!({
            "status": "ok",
            "success": true,
            "server": "dms-server-api"
        })),
    )
        .into_response()
}