use std::time::Duration;
use actix_cors::Cors;
use actix_web::middleware::DefaultHeaders;
use actix_web::{App, HttpResponse, HttpServer, Responder, get};
use anyhow::Result;
use serde_json::{Value, json};
use tracing::{info, warn};
/// Port the gateway falls back to when neither the `ATHENA_WSS_PORT` nor the
/// `PORT` environment variable yields a usable value.
const DEFAULT_WSS_GATEWAY_PORT: u16 = 7070;
/// Worker name included in the list handed to the runtime registry heartbeat;
/// presumably identifies the HTTP/WebSocket server itself (verify against the registry).
const WSS_GATEWAY_SERVER_WORKER: &str = "wss_gateway_server";
/// Worker name included in the list handed to the runtime registry heartbeat;
/// presumably identifies the chat outbox worker spawned at startup (verify against the registry).
const CHAT_OUTBOX_WORKER: &str = "chat_outbox_worker";
/// `GET /` — answers `200 OK` with the runtime payload, using status `"ok"`.
#[get("/")]
async fn wss_root() -> impl Responder {
    let payload = runtime_payload("ok");
    HttpResponse::Ok().json(payload)
}
/// `GET /health` — answers `200 OK` with the runtime payload, using status `"ok"`.
#[get("/health")]
async fn wss_health() -> impl Responder {
    let payload = runtime_payload("ok");
    HttpResponse::Ok().json(payload)
}
/// `GET /ready` — answers `200 OK` with the runtime payload, using status `"ready"`.
///
/// NOTE(review): the handler performs no dependency checks, so it reports
/// ready unconditionally — confirm that is the intended readiness contract.
#[get("/ready")]
async fn wss_ready() -> impl Responder {
    let payload = runtime_payload("ready");
    HttpResponse::Ok().json(payload)
}
/// `GET /ping` — answers `200 OK` with the plain-text body `pong`.
#[get("/ping")]
async fn wss_ping() -> impl Responder {
    let mut response = HttpResponse::Ok();
    response.content_type("text/plain; charset=utf-8");
    response.body("pong")
}
/// Boots the dedicated Athena WSS/chat HTTP host and serves until the server stops.
///
/// In order, this:
/// 1. installs the process daemon identity for `athena_wss_gateway`;
/// 2. resolves the listen port (environment overrides, then the default) and
///    the HTTP tuning values from `context.config`;
/// 3. spawns the chat outbox worker and the runtime registry heartbeat;
/// 4. builds the actix-web application and serves it on `0.0.0.0:<port>`.
///
/// # Errors
///
/// Returns an error when installing the daemon identity fails, when the listen
/// socket cannot be bound (the message names the address that was attempted),
/// or when the running server terminates with an error.
pub async fn run_wss_gateway_runtime(context: super::RuntimeBootstrapContext) -> Result<()> {
    let state = context.bootstrap.app_state.clone();
    let identity = super::install_process_daemon_identity("athena_wss_gateway", None)?;
    let port = resolve_wss_gateway_port();
    // A configured worker count of zero is bumped to one.
    let worker_count = context.config.get_http_worker_count().max(1);
    let keep_alive = Duration::from_secs(context.config.get_http_keep_alive_timeout_secs());
    let client_disconnect_timeout =
        Duration::from_secs(context.config.get_client_disconnect_timeout_value_secs());
    let client_request_timeout =
        Duration::from_secs(context.config.get_client_request_timeout_value_secs());

    // Background workers are started before the listener is bound; the order
    // is kept exactly as it was.
    crate::api::chat::runtime::spawn_chat_outbox_worker(state.clone());
    crate::daemon::spawn_runtime_registry_heartbeat(
        state.clone(),
        "athena_wss_gateway",
        Some(identity.daemon_id),
        vec![
            WSS_GATEWAY_SERVER_WORKER.to_string(),
            CHAT_OUTBOX_WORKER.to_string(),
        ],
        json!({
            "management_mode": "dedicated_realtime_host",
            "runtime": "athena_wss_gateway",
            "port": port,
            "mounts": ["/chat/*", "/wss/*", "/openapi-wss.yaml", "/health", "/ready"],
        }),
    );

    info!(
        port,
        worker_count,
        config_path = %context.resolved_config_path.display(),
        pipelines_path = %context.pipelines_path,
        seeded_default_config = context.runtime_config_metadata.seeded_default,
        "Starting dedicated Athena WSS/chat host"
    );

    HttpServer::new(move || {
        // NOTE(review): CORS accepts any origin, method and header — confirm
        // this is intentional for the dedicated realtime host.
        let cors = Cors::default()
            .allow_any_origin()
            .allow_any_method()
            .allow_any_header();
        App::new()
            .wrap(DefaultHeaders::new().add(("X-Athena-Version", env!("CARGO_PKG_VERSION"))))
            .wrap(cors)
            .app_data(state.clone())
            .service(wss_root)
            .service(wss_health)
            .service(wss_ready)
            .service(wss_ping)
            .service(crate::api::athena_wss_openapi_host)
            .service(crate::wss::gateway_wss_info)
            .service(crate::wss::gateway_wss_route)
            .configure(crate::api::chat::services)
    })
    .workers(worker_count)
    .keep_alive(keep_alive)
    .client_disconnect_timeout(client_disconnect_timeout)
    .client_request_timeout(client_request_timeout)
    .bind(("0.0.0.0", port))
    // A bare bind error ("Address already in use") does not say which address
    // was attempted; re-wrap it with the target while keeping the error kind.
    .map_err(|error| {
        std::io::Error::new(
            error.kind(),
            format!(
                "failed to bind WSS gateway listener on 0.0.0.0:{}: {}",
                port, error
            ),
        )
    })?
    .run()
    .await?;

    Ok(())
}
/// Builds the JSON document returned by the root, health and readiness endpoints.
///
/// `status` is echoed under the `"status"` key. The rest of the document is
/// static: the runtime name, the crate version captured at compile time, the
/// paths of the exposed surfaces, and operator notes about routing.
fn runtime_payload(status: &'static str) -> Value {
    let surfaces = json!({
        "websocket": "/wss/gateway",
        "info": "/wss/info",
        "openapi": "/openapi-wss.yaml",
        "chat_http": "/chat/*"
    });
    let notes = json!([
        "Route /chat/* and /wss/* to this same process when using the dedicated WSS host.",
        "The current chat fanout publisher is process-local, so durable chat writes and websocket subscriptions need the same runtime."
    ]);

    json!({
        "status": status,
        "runtime": "athena_wss_gateway",
        "version": env!("CARGO_PKG_VERSION"),
        "surfaces": surfaces,
        "notes": notes
    })
}
/// Picks the port the gateway listens on.
///
/// The environment variables are consulted in precedence order —
/// `ATHENA_WSS_PORT` first, then `PORT` — and the first one that
/// `parse_port_candidate` accepts wins. A later variable is only read when the
/// earlier one did not yield a port. Falls back to
/// `DEFAULT_WSS_GATEWAY_PORT` when neither does.
fn resolve_wss_gateway_port() -> u16 {
    ["ATHENA_WSS_PORT", "PORT"]
        .iter()
        .copied()
        .find_map(|variable| {
            let raw = std::env::var(variable).ok();
            parse_port_candidate(variable, raw.as_deref())
        })
        .unwrap_or(DEFAULT_WSS_GATEWAY_PORT)
}
/// Parses a port override taken from an environment variable.
///
/// * `name` — the variable the value came from; only used in the warning log.
/// * `raw` — the variable's value, or `None` when it is unset.
///
/// Returns `Some(port)` when the trimmed value is a decimal number in
/// `1..=65535`. Returns `None` silently when the value is absent or blank.
/// Returns `None` after logging a warning when the value is not a number,
/// does not fit in a `u16`, or is `0`.
///
/// Port `0` is rejected because binding to it makes the OS pick an arbitrary
/// ephemeral port, so the port the gateway logs and advertises would not match
/// the one it actually listens on.
fn parse_port_candidate(name: &str, raw: Option<&str>) -> Option<u16> {
    let value = raw.map(str::trim).filter(|trimmed| !trimmed.is_empty())?;
    // Parsing as `NonZeroU16` sends "0" down the same error path as any other
    // malformed value, with a descriptive parse error in the log.
    match value.parse::<std::num::NonZeroU16>() {
        Ok(port) => Some(port.get()),
        Err(error) => {
            warn!(
                env_var = name,
                value,
                %error,
                "Ignoring invalid WSS gateway port override"
            );
            None
        }
    }
}
#[cfg(test)]
mod tests {
    use super::parse_port_candidate;

    /// Well-formed values parse, including ones padded with whitespace.
    #[test]
    fn parse_port_candidate_accepts_valid_ports() {
        let accepted: [(&str, &str, u16); 2] = [
            ("ATHENA_WSS_PORT", "7070", 7070),
            ("PORT", " 8080 ", 8080),
        ];
        for &(variable, raw, expected) in accepted.iter() {
            assert_eq!(parse_port_candidate(variable, Some(raw)), Some(expected));
        }
    }

    /// Blank, non-numeric, out-of-range and missing values all yield `None`.
    #[test]
    fn parse_port_candidate_rejects_empty_or_invalid_values() {
        let rejected: [Option<&str>; 4] = [Some(""), Some("abc"), Some("70000"), None];
        for raw in rejected.iter().copied() {
            assert_eq!(parse_port_candidate("ATHENA_WSS_PORT", raw), None);
        }
    }
}