// arcly-http 0.1.2
//
// Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface.
// Documentation
//! WebSocket boundary: upgrade, per-socket read/write pumps, event dispatch.
//!
//! This is the *only* module that touches `axum::extract::ws` — the analogue of
//! [`crate::web::boundary`] for the real-time layer. Everything above it speaks
//! arcly types ([`WsClient`], [`WsMessage`], [`GatewayRuntime`]).
//!
//! ## Per-connection model (no hot-path locks)
//!
//! ```text
//!            ┌─────────────────── handle_socket task ───────────────────┐
//!  socket ──>│ reader: stream.next() ─> dispatch(event) ─> handler fut  │
//!            │ writer: rx.recv()     ─> sink.send(frame)                │
//!            └───────────────────────────────────────────────────────────┘
//! ```
//! The reader and writer run as independent halves of the split socket: the
//! reader is driven inline by `handle_socket`, the writer by a task that
//! `handle_socket` spawns. Inbound frames are parsed and routed through the
//! gateway's `dispatch` table (an immutable `&HashMap` — lock-free read).
//! Outbound frames are produced by any task via the registry's sharded
//! channels and drained by this socket's writer.

use std::sync::Arc;

use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::http::HeaderMap;
use axum::routing::{get, MethodRouter};
use futures::{SinkExt, StreamExt};
use tokio::sync::{mpsc, oneshot};

use crate::core::engine::FrozenDiContainer;
use crate::realtime::connection::{ConnectionRegistry, WsClient, WsMessage};
use crate::realtime::gateway::GatewayRuntime;
use crate::web::context::Claims;

/// Per-gateway runtime tuning, sourced from `LaunchConfig` at mount time.
///
/// The value is `Copy`; it is passed by value into the upgrade handler and on
/// to each per-socket task.
#[derive(Clone, Copy, Debug)]
pub struct WsTuning {
    /// Outbound queue depth per socket — the slow-client memory ceiling.
    /// Used as the capacity of the socket's bounded outbound channel; a value
    /// of `0` is raised to `1` when that channel is created.
    pub outbound_buffer: usize,
    /// Hard cap on concurrent sockets across all gateways (`0` = unlimited);
    /// beyond it upgrades are refused with `503` before any socket exists.
    /// The cap is compared against the registry's connection count during
    /// the handshake.
    pub max_connections: usize,
    /// Server→client Ping cadence (`ZERO` disables). Pings provoke pongs,
    /// which feed the idle sweeper's `last_seen`.
    pub ping_interval: std::time::Duration,
}

/// Construct the `GET` route that performs the HTTP→WebSocket upgrade for a
/// single gateway.
///
/// Every handshake passes through two stages before a socket exists:
///
/// 1. **Admission** — when `tuning.max_connections` is non-zero and the
///    registry already holds at least that many connections, the handshake is
///    answered with `503` plus `retry-after: 5` and counted in
///    `ws_upgrades_refused_total`. No upgrade takes place.
/// 2. **Provenance** — trace identity, tenant and credentials are extracted
///    from the handshake headers in one pass via `Provenance::from_headers`.
///    The resulting claims and tenant are handed to the per-socket task, which
///    attaches them to the connection's [`WsClient`].
pub fn ws_route(
    runtime: &'static GatewayRuntime,
    registry: &'static ConnectionRegistry,
    container: &'static FrozenDiContainer,
    tuning: WsTuning,
) -> MethodRouter {
    get(move |ws: WebSocketUpgrade, headers: HeaderMap| async move {
        // Admission control runs ahead of the upgrade, so a refused handshake
        // never allocates a socket, an outbound queue, or a registry entry.
        let limit = tuning.max_connections;
        if limit > 0 && registry.connection_count() >= limit {
            metrics::counter!("ws_upgrades_refused_total").increment(1);
            axum::http::Response::builder()
                .status(axum::http::StatusCode::SERVICE_UNAVAILABLE)
                .header("retry-after", "5")
                .body(axum::body::Body::from("websocket capacity reached"))
                .expect("static refusal")
        } else {
            // Same unified extraction as the HTTP boundary: the handshake is
            // authenticated once, and the connection carries the caller's
            // claims, resolved tenant, and trace identity from here on.
            let provenance =
                crate::pipeline::Provenance::from_headers(&headers, container).await;
            tracing::debug!(
                trace_id = %crate::observability::lean_telemetry::hex_encode(&provenance.trace.trace_id),
                tenant = provenance.tenant.as_deref().map_or("", |t| t.id.as_str()),
                "WS handshake provenance"
            );
            ws.on_upgrade(move |socket| {
                handle_socket(
                    socket,
                    runtime,
                    registry,
                    provenance.claims,
                    provenance.tenant,
                    tuning,
                )
            })
        }
    })
}

/// Drive one upgraded socket to completion: register, pump, dispatch, drain.
///
/// Sequence, in the order the body executes it:
///
/// 1. Split the socket into a sink (writes) and a stream (reads).
/// 2. Create the bounded outbound channel and register its sender with
///    `registry`, obtaining the connection id used for the [`WsClient`].
/// 3. Spawn the writer task, which owns the sink and the channel receiver.
/// 4. Await `runtime.on_connect`.
/// 5. Run the reader loop inline until the peer closes, the stream errors, or
///    the writer exits.
/// 6. Await `runtime.on_disconnect`, unregister the id, abort the writer.
///
/// `claims` and `tenant` come from the handshake and are attached to the
/// `WsClient` handed to every gateway callback.
///
/// NOTE(review): the teardown in step 6 is plain straight-line code with no
/// drop guard, so it only runs when the function reaches its end — confirm
/// how a panicking gateway callback is expected to be cleaned up.
async fn handle_socket(
    socket: WebSocket,
    runtime: &'static GatewayRuntime,
    registry: &'static ConnectionRegistry,
    claims: Option<Arc<Claims>>,
    tenant: Option<Arc<crate::web::tenant::TenantConfig>>,
    tuning: WsTuning,
) {
    let (mut sink, mut stream) = socket.split();

    // Outbound queue: any task enqueues, this socket's writer drains.
    // **Bounded** — the depth is the per-socket memory ceiling; a client
    // that can't drain it gets evicted by the registry, never buffered
    // without limit. `.max(1)` turns a configured depth of 0 into a
    // capacity of 1.
    let (tx, mut rx) = mpsc::channel::<WsMessage>(tuning.outbound_buffer.max(1));
    // The registry receives the sender half; `claims` is cloned because it is
    // also moved into the client handle on the next line.
    let id = registry.register(tx, claims.clone());
    let client = WsClient::__new(id, registry, claims, tenant);

    // One-shot signal: when the writer exits for *any* reason (peer error,
    // server-initiated Close, or channel closed), the reader is unblocked so it
    // stops polling the stream and runs on_disconnect + unregister. Without this,
    // a server-initiated close would leave the reader blocked on stream.next()
    // indefinitely if the peer never sends a Close echo.
    // Nothing is ever sent on it: the receiver resolves when the sender is
    // dropped at the end of the writer task.
    let (close_tx, mut close_rx) = oneshot::channel::<()>();

    // Writer half — owns the sink; exits when the queue closes or the peer
    // dies. A periodic Ping (when configured) keeps NATs/proxies open and
    // provokes pongs that feed the idle sweeper's `last_seen`.
    let ping_every = tuning.ping_interval;
    let writer = tokio::spawn(async move {
        // `None` when pings are disabled (zero interval). `Skip` means ticks
        // missed while the task was busy are dropped, not fired in a burst.
        // NOTE(review): `tokio::time::interval` completes its first tick
        // immediately, so the first Ping is sent as soon as the writer starts
        // rather than one interval later — confirm that is intended.
        let mut ping = (!ping_every.is_zero()).then(|| {
            let mut t = tokio::time::interval(ping_every);
            t.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            t
        });
        loop {
            // With pings enabled, wait for whichever comes first: a queued
            // outbound message or the next ping tick. Otherwise just wait for
            // the queue.
            let msg = if let Some(t) = ping.as_mut() {
                tokio::select! {
                    m = rx.recv() => m,
                    _ = t.tick() => {
                        // Tick branch: emit an empty Ping and go back to
                        // waiting; a failed write ends the writer.
                        if sink.send(Message::Ping(Vec::new())).await.is_err() {
                            break;
                        }
                        continue;
                    }
                }
            } else {
                rx.recv().await
            };
            // `None` from `recv` means the channel is closed and drained.
            let Some(msg) = msg else { break };
            // Translate the arcly message into an axum frame.
            let frame = match msg {
                // The text is copied into an owned `String` for the frame.
                WsMessage::Text(arc) => Message::Text(String::from(arc.as_ref())),
                WsMessage::Ping => Message::Ping(Vec::new()),
                WsMessage::Close => {
                    // Send close frame then exit immediately — do NOT loop back
                    // to rx.recv(), which would keep the sink open indefinitely.
                    // A failed write is ignored: the writer is exiting anyway.
                    let _ = sink.send(Message::Close(None)).await;
                    break;
                }
            };
            // A write error means the peer is gone; stop pumping.
            if sink.send(frame).await.is_err() {
                break;
            }
        }
        // Dropping close_tx signals the reader regardless of why we exited.
        drop(close_tx);
    });

    // The connect hook runs after the writer is spawned and before any
    // inbound frame is read.
    (runtime.on_connect)(client.clone()).await;

    // Reader half — routes inbound frames to subscribed handlers.
    // Also watches for the writer-exit signal so a server-initiated close
    // (WsMessage::Close enqueued by a handler) terminates the reader promptly.
    loop {
        tokio::select! {
            // `biased` polls the branches in source order, so the writer-exit
            // signal is checked before the stream on every iteration.
            biased;
            // Writer exited (server-initiated close or peer write error).
            _ = &mut close_rx => break,
            frame = stream.next() => match frame {
                // Stream ended: the connection is closed.
                None => break,
                // Read error: treated the same as a closed connection.
                Some(Err(_)) => break,
                Some(Ok(frame)) => {
                    // Any inbound frame (including pongs from our pings)
                    // proves the link is alive for the idle sweeper.
                    registry.touch(id);
                    match frame {
                        // The handler is awaited inline: the next frame (and
                        // the writer-exit signal) is not polled until it
                        // completes.
                        Message::Text(text) => dispatch_event(runtime, &client, &text).await,
                        Message::Binary(_) => { /* binary multiplexing not enabled */ }
                        Message::Close(_) => break,
                        Message::Ping(_) | Message::Pong(_) => { /* axum auto-replies to pings */ }
                    }
                }
            }
        }
    }

    // Teardown: the disconnect hook runs while the connection is still
    // registered; only afterwards is the id removed and the writer task
    // cancelled (a no-op if it has already finished).
    (runtime.on_disconnect)(client.clone()).await;
    registry.unregister(id);
    writer.abort();
}

/// Parse one `{ "event": ..., "data": ... }` envelope and invoke its handler.
///
/// Frames are dropped silently — without touching any metric — when they are
/// not valid JSON, when `event` is missing or not a string, or when no handler
/// is registered for the event. A hostile client therefore cannot crash the
/// dispatcher with malformed input.
///
/// # Parameters
/// - `runtime`: gateway whose handler table is consulted for the event name.
/// - `client`: the connection the frame arrived on; a clone is passed to the
///   handler.
/// - `raw`: the text frame exactly as received.
///
/// The handler receives `data` re-serialized as a compact JSON string. A
/// missing `data` field is delivered as the literal `null`.
async fn dispatch_event(runtime: &'static GatewayRuntime, client: &WsClient, raw: &str) {
    let Ok(value) = serde_json::from_str::<serde_json::Value>(raw) else {
        return;
    };
    let Some(event) = value.get("event").and_then(|e| e.as_str()) else {
        return;
    };
    let Some(handler) = runtime.handler(event) else {
        return;
    };

    // Serialize the payload straight from the parsed envelope. Borrowing the
    // subtree avoids deep-cloning a client-controlled JSON value on every
    // inbound message just to turn it back into a string.
    let data_str: Arc<str> = match value.get("data") {
        Some(data) => Arc::from(serde_json::to_string(data).unwrap_or_default()),
        // Same text that serializing `Value::Null` produces.
        None => Arc::from("null"),
    };

    // Counted only once a handler has been resolved, i.e. for routable frames.
    metrics::counter!("ws_messages_in_total").increment(1);
    // Handler errors stay at the transport edge — gateways own their own
    // error-to-client signalling — but they are counted and logged so a
    // misbehaving event handler is visible on dashboards.
    if let Err(e) = handler(client.clone(), data_str).await {
        metrics::counter!("ws_handler_errors_total").increment(1);
        tracing::debug!(conn = client.id(), event, error = %e, "gateway handler error");
    }
}