athena_rs 3.26.4

Hyper performant polyglot Database driver
Documentation
use std::time::Duration;

use actix_cors::Cors;
use actix_web::middleware::DefaultHeaders;
use actix_web::{App, HttpResponse, HttpServer, Responder, get};
use anyhow::Result;
use serde_json::{Value, json};
use tracing::{info, warn};

/// Port the gateway binds to when neither `ATHENA_WSS_PORT` nor `PORT` yields a valid value.
const DEFAULT_WSS_GATEWAY_PORT: u16 = 7070;
/// Worker name listed in the runtime registry heartbeat sent by this runtime.
const WSS_GATEWAY_SERVER_WORKER: &str = "wss_gateway_server";
/// Worker name listed in the runtime registry heartbeat alongside the gateway server worker.
const CHAT_OUTBOX_WORKER: &str = "chat_outbox_worker";

/// `GET /` — responds 200 with the runtime descriptor JSON and status `"ok"`.
#[get("/")]
async fn wss_root() -> impl Responder {
    let descriptor = runtime_payload("ok");
    HttpResponse::Ok().json(descriptor)
}

/// `GET /health` — responds 200 with the runtime descriptor JSON and status `"ok"`.
#[get("/health")]
async fn wss_health() -> impl Responder {
    let descriptor = runtime_payload("ok");
    HttpResponse::Ok().json(descriptor)
}

/// `GET /ready` — responds 200 with the runtime descriptor JSON and status `"ready"`.
///
/// The handler itself performs no dependency checks; it answers as soon as the
/// server is able to dispatch requests.
#[get("/ready")]
async fn wss_ready() -> impl Responder {
    let descriptor = runtime_payload("ready");
    HttpResponse::Ok().json(descriptor)
}

/// `GET /ping` — responds 200 with the plain-text body `pong`.
#[get("/ping")]
async fn wss_ping() -> impl Responder {
    let mut reply = HttpResponse::Ok();
    reply.content_type("text/plain; charset=utf-8");
    reply.body("pong")
}

/// Runs the dedicated Athena WSS/chat host until its HTTP server stops.
///
/// Steps, in order:
/// 1. install the process daemon identity for `athena_wss_gateway`;
/// 2. resolve the listen port (see `resolve_wss_gateway_port`) and read the
///    HTTP worker/timeout settings from the runtime configuration;
/// 3. spawn the chat outbox worker and the runtime registry heartbeat;
/// 4. bind an actix-web server on `0.0.0.0:<port>` exposing the status
///    endpoints, the WSS OpenAPI document, the gateway routes and the chat
///    services, then serve until it exits.
///
/// # Errors
///
/// Returns an error when installing the daemon identity fails, when the
/// listen address cannot be bound, or when the running server reports an error.
pub async fn run_wss_gateway_runtime(context: super::RuntimeBootstrapContext) -> Result<()> {
    const RUNTIME_NAME: &str = "athena_wss_gateway";

    let app_state = context.bootstrap.app_state.clone();
    let identity = super::install_process_daemon_identity(RUNTIME_NAME, None)?;
    let port = resolve_wss_gateway_port();

    // At least one HTTP worker is always started, whatever the configuration says.
    let worker_count = context.config.get_http_worker_count().max(1);
    let keep_alive_timeout =
        Duration::from_secs(context.config.get_http_keep_alive_timeout_secs());
    let disconnect_timeout =
        Duration::from_secs(context.config.get_client_disconnect_timeout_value_secs());
    let request_timeout =
        Duration::from_secs(context.config.get_client_request_timeout_value_secs());

    crate::api::chat::runtime::spawn_chat_outbox_worker(app_state.clone());

    // Worker names and descriptive metadata reported through the registry heartbeat.
    let registered_workers: Vec<String> = [WSS_GATEWAY_SERVER_WORKER, CHAT_OUTBOX_WORKER]
        .iter()
        .map(|worker| worker.to_string())
        .collect();
    let registry_metadata = json!({
        "management_mode": "dedicated_realtime_host",
        "runtime": RUNTIME_NAME,
        "port": port,
        "mounts": ["/chat/*", "/wss/*", "/openapi-wss.yaml", "/health", "/ready"],
    });
    crate::daemon::spawn_runtime_registry_heartbeat(
        app_state.clone(),
        RUNTIME_NAME,
        Some(identity.daemon_id),
        registered_workers,
        registry_metadata,
    );

    info!(
        port,
        worker_count,
        config_path = %context.resolved_config_path.display(),
        pipelines_path = %context.pipelines_path,
        seeded_default_config = context.runtime_config_metadata.seeded_default,
        "Starting dedicated Athena WSS/chat host"
    );

    // The factory closure runs once per worker, so each worker builds its own
    // middleware stack and receives its own clone of the shared state handle.
    let server = HttpServer::new(move || {
        let version_header =
            DefaultHeaders::new().add(("X-Athena-Version", env!("CARGO_PKG_VERSION")));
        let permissive_cors = Cors::default()
            .allow_any_origin()
            .allow_any_method()
            .allow_any_header();

        App::new()
            .wrap(version_header)
            .wrap(permissive_cors)
            .app_data(app_state.clone())
            .service(wss_root)
            .service(wss_health)
            .service(wss_ready)
            .service(wss_ping)
            .service(crate::api::athena_wss_openapi_host)
            .service(crate::wss::gateway_wss_info)
            .service(crate::wss::gateway_wss_route)
            .configure(crate::api::chat::services)
    })
    .workers(worker_count)
    .keep_alive(keep_alive_timeout)
    .client_disconnect_timeout(disconnect_timeout)
    .client_request_timeout(request_timeout)
    .bind(("0.0.0.0", port))?;

    server.run().await?;

    Ok(())
}

/// Builds the JSON descriptor returned by the status endpoints.
///
/// The document carries the supplied `status`, the runtime name, the crate
/// version captured at compile time from `CARGO_PKG_VERSION`, the paths of the
/// exposed surfaces, and operator notes about routing chat and WSS traffic.
fn runtime_payload(status: &'static str) -> Value {
    let surfaces = json!({
        "websocket": "/wss/gateway",
        "info": "/wss/info",
        "openapi": "/openapi-wss.yaml",
        "chat_http": "/chat/*"
    });
    let notes = json!([
        "Route /chat/* and /wss/* to this same process when using the dedicated WSS host.",
        "The current chat fanout publisher is process-local, so durable chat writes and websocket subscriptions need the same runtime."
    ]);

    json!({
        "status": status,
        "runtime": "athena_wss_gateway",
        "version": env!("CARGO_PKG_VERSION"),
        "surfaces": surfaces,
        "notes": notes
    })
}

/// Picks the port the gateway listens on.
///
/// `ATHENA_WSS_PORT` is consulted first, then `PORT`; the first variable that
/// holds a valid port wins. Unset, non-Unicode, empty or unparsable values are
/// skipped, and `DEFAULT_WSS_GATEWAY_PORT` is used when neither variable helps.
fn resolve_wss_gateway_port() -> u16 {
    for env_name in ["ATHENA_WSS_PORT", "PORT"] {
        let raw = std::env::var(env_name).ok();
        if let Some(port) = parse_port_candidate(env_name, raw.as_deref()) {
            return port;
        }
    }

    DEFAULT_WSS_GATEWAY_PORT
}

/// Parses an optional raw port override into a `u16`.
///
/// * `name` — the environment variable the value came from; used only in the
///   warning emitted for an invalid value.
/// * `raw` — the raw value, if the variable was set.
///
/// Surrounding whitespace is ignored. Returns `None` when `raw` is absent or
/// blank, and also — after logging a warning — when the trimmed text is not a
/// valid `u16`.
fn parse_port_candidate(name: &str, raw: Option<&str>) -> Option<u16> {
    // Absent and blank values are silently treated as "no override".
    let candidate = raw.map(str::trim).filter(|text| !text.is_empty())?;

    candidate
        .parse::<u16>()
        .map_err(|error| {
            warn!(
                env_var = name,
                value = candidate,
                %error,
                "Ignoring invalid WSS gateway port override"
            );
        })
        .ok()
}

#[cfg(test)]
mod tests {
    use super::parse_port_candidate;

    /// Well-formed port strings parse, including ones padded with whitespace.
    #[test]
    fn parse_port_candidate_accepts_valid_ports() {
        let accepted = [
            ("ATHENA_WSS_PORT", "7070", 7070_u16),
            ("PORT", " 8080 ", 8080),
        ];

        for (env_var, raw, expected) in accepted {
            assert_eq!(parse_port_candidate(env_var, Some(raw)), Some(expected));
        }
    }

    /// Empty, non-numeric, out-of-range and missing values all yield `None`.
    #[test]
    fn parse_port_candidate_rejects_empty_or_invalid_values() {
        let rejected = [Some(""), Some("abc"), Some("70000"), None];

        for raw in rejected {
            assert_eq!(parse_port_candidate("ATHENA_WSS_PORT", raw), None);
        }
    }
}