cellos-server 0.5.1

HTTP control plane for CellOS — admission, projection over JetStream, WebSocket fan-out of CloudEvents. Pure event-sourced architecture.
Documentation
//! `cellos-server` binary entrypoint.
//!
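//! Reads these env vars at startup:
//! - `CELLOS_SERVER_BIND` (default `127.0.0.1:8080`) — TCP listen addr.
//! - `CELLOS_NATS_URL` (default `nats://127.0.0.1:4222`) — broker URL.
//! - `CELLOS_SERVER_API_TOKEN` (REQUIRED) — bearer token clients must
//!   present. Server refuses to start if unset or blank — fail-closed over
//!   fail-open is a doctrine non-negotiable.
//! - `CELLOS_SERVER_SKIP_REPLAY` (optional; `1` or `true`, case-insensitive)
//!   — skip the startup JetStream projection replay.
//! - `RUST_LOG` (default `info`) — tracing filter directives.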

use std::process::ExitCode;

use anyhow::Context;
use tokio::net::TcpListener;
use tracing::{info, warn};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};

use cellos_server::jetstream::{ensure_stream, replay_projection};
use cellos_server::{router, AppState};

#[tokio::main]
async fn main() -> ExitCode {
    // HIGH-B5 (red-team wave-1, fixed wave-2.5): attach the redacted filter
    // to the fmt layer so reqwest/hyper TRACE events carrying bearer
    // tokens are suppressed before they reach stderr — even when the
    // operator sets `RUST_LOG=trace`. See
    // `cellos_core::observability::redacted_filter` for the policy.
    let fmt_layer = tracing_subscriber::fmt::layer()
        .json()
        .with_filter(cellos_core::observability::redacted_filter());

    tracing_subscriber::registry()
        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
        .with(fmt_layer)
        .init();

    match run().await {
        Ok(()) => ExitCode::SUCCESS,
        Err(e) => {
            // `{:#}` walks the anyhow chain so the operator sees the full
            // root cause, not just the top-level message.
            tracing::error!(error = format!("{e:#}"), "cellos-server fatal");
            ExitCode::FAILURE
        }
    }
}

/// Read `CELLOS_SERVER_API_TOKEN` and enforce the fail-closed token policy.
///
/// # Errors
///
/// Fails when the variable is unset, is not valid UTF-8, or is empty /
/// whitespace-only. The returned error never contains the variable's value.
fn read_api_token() -> anyhow::Result<String> {
    use std::env::VarError;

    // Deliberately NOT `.context(..)?` on the `VarError`: the `NotUnicode`
    // variant's `Display` prints the raw variable value, and `main` logs the
    // whole anyhow chain with `{:#}` — that would write the secret to stderr.
    let token = match std::env::var("CELLOS_SERVER_API_TOKEN") {
        Ok(token) => token,
        Err(VarError::NotPresent) => {
            anyhow::bail!("CELLOS_SERVER_API_TOKEN must be set (fail-closed)")
        }
        Err(VarError::NotUnicode(_)) => {
            anyhow::bail!("CELLOS_SERVER_API_TOKEN must be valid UTF-8 (fail-closed)")
        }
    };

    if token.trim().is_empty() {
        anyhow::bail!("CELLOS_SERVER_API_TOKEN must not be empty");
    }

    Ok(token)
}

/// Start the control plane and serve HTTP until a shutdown signal arrives.
///
/// Startup order:
/// 1. Read configuration from the environment (bind address, NATS URL, API token).
/// 2. Make one best-effort NATS connection attempt.
/// 3. If connected, call `ensure_stream` and (unless
///    `CELLOS_SERVER_SKIP_REPLAY` is set) replay the projection into the cache.
/// 4. Bind the TCP listener and serve the router with graceful shutdown.
///
/// # Errors
///
/// Returns an error if the API token is missing or invalid, the listen
/// address cannot be bound, or `axum::serve` fails. NATS / JetStream
/// failures are logged as warnings and tolerated.
async fn run() -> anyhow::Result<()> {
    let bind = std::env::var("CELLOS_SERVER_BIND").unwrap_or_else(|_| "127.0.0.1:8080".to_owned());
    let nats_url =
        std::env::var("CELLOS_NATS_URL").unwrap_or_else(|_| "nats://127.0.0.1:4222".to_owned());
    let token = read_api_token()?;

    // NATS connection is best-effort at startup. A broker outage MUST
    // NOT prevent the HTTP query interface from serving cached state —
    // operators need to be able to inspect the system precisely when the
    // event log is unhealthy. WebSocket clients will see an immediate
    // close with `no upstream broker configured`. Only this one connection
    // attempt is made here; `AppState` is not given the URL to retry with.
    let nats = match async_nats::connect(&nats_url).await {
        Ok(c) => {
            info!(%nats_url, "connected to NATS");
            Some(c)
        }
        Err(e) => {
            warn!(%nats_url, error = %e, "NATS unreachable at startup; serving cached state only");
            None
        }
    };

    // ADR-0011 SEAM resolution: rebuild the projection cache by replaying
    // JetStream from sequence 1 before opening the HTTP listener. If NATS
    // is unavailable (test mode, broker down, replay disabled via env),
    // the server starts with an empty cache and serves what it has.
    let mut state = AppState::new(nats.clone(), token);
    if let Some(client) = &nats {
        let skip_replay = std::env::var("CELLOS_SERVER_SKIP_REPLAY")
            .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
            .unwrap_or(false);
        match ensure_stream(client).await {
            Ok(ctx) => {
                if skip_replay {
                    info!("CELLOS_SERVER_SKIP_REPLAY set; skipping projection replay");
                } else if let Err(e) = replay_projection(&state, &ctx).await {
                    warn!(error = %format!("{e:#}"), "projection replay failed; starting with empty cache");
                }
                state.jetstream = Some(ctx);
            }
            Err(e) => {
                warn!(error = %format!("{e:#}"), "ensure_stream failed; WebSocket will reject until JetStream becomes available");
            }
        }
    }

    let app = router(state);

    let listener = TcpListener::bind(&bind)
        .await
        .with_context(|| format!("binding {bind}"))?;
    info!(%bind, "cellos-server listening");

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await
        .context("axum::serve")?;

    Ok(())
}

/// Resolve once the process is asked to stop, so `axum::serve` can drain
/// in-flight requests before returning.
///
/// Ctrl-C (SIGINT) is honoured on every platform. On Unix, SIGTERM (what
/// k8s and systemd send) is honoured too. On other targets, only Ctrl-C can
/// trigger shutdown.
///
/// # Panics
///
/// Panics if a signal handler cannot be installed.
async fn shutdown_signal() {
    #[cfg(unix)]
    let sigterm = async {
        use tokio::signal::unix::{signal, SignalKind};
        let mut stream = signal(SignalKind::terminate()).expect("install SIGTERM handler");
        stream.recv().await;
    };
    // No SIGTERM equivalent here: a future that never completes leaves
    // Ctrl-C as the only trigger.
    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    let interrupt = async {
        tokio::signal::ctrl_c()
            .await
            .expect("install Ctrl-C handler");
    };

    // Whichever signal arrives first wins.
    tokio::select! {
        () = interrupt => {}
        () = sigterm => {}
    }
    info!("shutdown signal received; draining");
}