cellos-server 0.5.2

HTTP control plane for CellOS — admission, projection over JetStream, WebSocket fan-out of CloudEvents. Pure event-sourced architecture.
Documentation
//! RFC 9457 Problem Details for HTTP APIs.
//!
//! Every error path in the server returns `application/problem+json` so
//! that `cellctl` (and the web UI) can render structured diagnostics
//! without parsing free-form strings. The `type` field is a stable
//! identifier — clients may switch on it; the `title`/`detail` fields are
//! human-readable and may change.

use axum::body::{to_bytes, Body};
use axum::http::{header, HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::Json;
use serde::Serialize;

/// Stable error identifier. Adding a variant is a non-breaking change;
/// renaming one IS breaking (clients pin on `type`).
///
/// Each variant maps to exactly one HTTP status, one `type` URI and one
/// `title` (see the inherent methods on this type). The enum is a
/// fieldless `Copy` value, so it also derives `PartialEq`/`Eq`/`Hash`:
/// callers and tests can compare kinds with `==` / `assert_eq!` and use
/// them as map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AppErrorKind {
    /// 401 — `/problems/unauthorized`.
    Unauthorized,
    /// 400 — `/problems/bad-request`.
    BadRequest,
    /// 404 — `/problems/not-found`.
    NotFound,
    /// 409 — `/problems/conflict`.
    Conflict,
    /// 500 — `/problems/internal`.
    Internal,
    /// 413 — `/problems/payload-too-large`.
    ///
    /// FUZZ-WAVE-1 MED-1: axum's built-in extractors (Json, Path, Query)
    /// reject malformed input with `text/plain`. We catch those rejections
    /// in the response-mapping middleware and re-emit them as
    /// problem+json under stable `type` URIs; this variant,
    /// `UnsupportedMediaType` and `MethodNotAllowed` exist for that.
    PayloadTooLarge,
    /// 415 — `/problems/unsupported-media-type` (FUZZ-WAVE-1 MED-1, see
    /// `PayloadTooLarge`).
    UnsupportedMediaType,
    /// 405 — `/problems/method-not-allowed` (FUZZ-WAVE-1 MED-1, see
    /// `PayloadTooLarge`).
    MethodNotAllowed,
    /// 503 — `/problems/service-unavailable`.
    ///
    /// FUZZ-CRIT-1: the upstream event store (JetStream/NATS) is
    /// unreachable or timing out. Distinct from `Internal` because the
    /// HTTP control plane itself is healthy — only the data tier behind
    /// `/v1/events` is degraded.
    ServiceUnavailable,
    /// 400 — `/problems/formation/cycle`.
    ///
    /// The four `Formation*` discriminants come from ADR-0010
    /// §Enforcement: cellos-server admission gate rejection reasons.
    /// Surfaced via `application/problem+json` so cellctl can switch on
    /// `type` without parsing `detail`.
    FormationCycle,
    /// 400 — `/problems/formation/multiple-coordinators` (ADR-0010
    /// admission rejection).
    FormationMultipleCoordinators,
    /// 400 — `/problems/formation/no-coordinator` (ADR-0010 admission
    /// rejection).
    FormationNoCoordinator,
    /// 400 — `/problems/formation/authority-not-narrowing` (ADR-0010
    /// admission rejection).
    FormationAuthorityNotNarrowing,
}

impl AppErrorKind {
    pub fn status(self) -> StatusCode {
        match self {
            AppErrorKind::Unauthorized => StatusCode::UNAUTHORIZED,
            AppErrorKind::BadRequest
            | AppErrorKind::FormationCycle
            | AppErrorKind::FormationMultipleCoordinators
            | AppErrorKind::FormationNoCoordinator
            | AppErrorKind::FormationAuthorityNotNarrowing => StatusCode::BAD_REQUEST,
            AppErrorKind::NotFound => StatusCode::NOT_FOUND,
            AppErrorKind::MethodNotAllowed => StatusCode::METHOD_NOT_ALLOWED,
            AppErrorKind::Conflict => StatusCode::CONFLICT,
            AppErrorKind::PayloadTooLarge => StatusCode::PAYLOAD_TOO_LARGE,
            AppErrorKind::UnsupportedMediaType => StatusCode::UNSUPPORTED_MEDIA_TYPE,
            AppErrorKind::Internal => StatusCode::INTERNAL_SERVER_ERROR,
            AppErrorKind::ServiceUnavailable => StatusCode::SERVICE_UNAVAILABLE,
        }
    }

    /// `type` URI identifier per RFC 9457 §3.1. We use relative URI
    /// references rooted at `/problems/` so the server's deployment URL
    /// does not affect the stable identifier.
    pub fn type_uri(self) -> &'static str {
        match self {
            AppErrorKind::Unauthorized => "/problems/unauthorized",
            AppErrorKind::BadRequest => "/problems/bad-request",
            AppErrorKind::NotFound => "/problems/not-found",
            AppErrorKind::Conflict => "/problems/conflict",
            AppErrorKind::Internal => "/problems/internal",
            AppErrorKind::PayloadTooLarge => "/problems/payload-too-large",
            AppErrorKind::UnsupportedMediaType => "/problems/unsupported-media-type",
            AppErrorKind::MethodNotAllowed => "/problems/method-not-allowed",
            AppErrorKind::ServiceUnavailable => "/problems/service-unavailable",
            AppErrorKind::FormationCycle => "/problems/formation/cycle",
            AppErrorKind::FormationMultipleCoordinators => {
                "/problems/formation/multiple-coordinators"
            }
            AppErrorKind::FormationNoCoordinator => "/problems/formation/no-coordinator",
            AppErrorKind::FormationAuthorityNotNarrowing => {
                "/problems/formation/authority-not-narrowing"
            }
        }
    }

    pub fn title(self) -> &'static str {
        match self {
            AppErrorKind::Unauthorized => "Unauthorized",
            AppErrorKind::BadRequest => "Bad Request",
            AppErrorKind::NotFound => "Not Found",
            AppErrorKind::Conflict => "Conflict",
            AppErrorKind::Internal => "Internal Server Error",
            AppErrorKind::PayloadTooLarge => "Payload Too Large",
            AppErrorKind::UnsupportedMediaType => "Unsupported Media Type",
            AppErrorKind::MethodNotAllowed => "Method Not Allowed",
            AppErrorKind::ServiceUnavailable => "Event store unavailable",
            AppErrorKind::FormationCycle => "Formation rejected: authority cycle",
            AppErrorKind::FormationMultipleCoordinators => {
                "Formation rejected: multiple coordinators"
            }
            AppErrorKind::FormationNoCoordinator => "Formation rejected: no coordinator",
            AppErrorKind::FormationAuthorityNotNarrowing => {
                "Formation rejected: authority does not narrow"
            }
        }
    }
}

/// Application-level error that renders as an RFC 9457 problem document
/// through its `IntoResponse` impl.
#[derive(Debug, Clone)]
pub struct AppError {
    /// Stable classification; determines the HTTP status, the `type` URI
    /// and the `title` of the rendered problem document.
    pub kind: AppErrorKind,
    /// Human-readable explanation, serialised verbatim as the problem
    /// document's `detail` member.
    pub detail: String,
}

impl AppError {
    /// Builds an error of the given `kind` with a caller-supplied
    /// `detail` message.
    pub fn new(kind: AppErrorKind, detail: impl Into<String>) -> Self {
        let detail = detail.into();
        Self { kind, detail }
    }

    /// Shorthand for a `BadRequest` error.
    pub fn bad_request(detail: impl Into<String>) -> Self {
        Self {
            kind: AppErrorKind::BadRequest,
            detail: detail.into(),
        }
    }

    /// Shorthand for an `Unauthorized` error.
    pub fn unauthorized(detail: impl Into<String>) -> Self {
        Self {
            kind: AppErrorKind::Unauthorized,
            detail: detail.into(),
        }
    }

    /// Shorthand for a `NotFound` error.
    pub fn not_found(detail: impl Into<String>) -> Self {
        Self {
            kind: AppErrorKind::NotFound,
            detail: detail.into(),
        }
    }

    /// Shorthand for an `Internal` error.
    pub fn internal(detail: impl Into<String>) -> Self {
        Self {
            kind: AppErrorKind::Internal,
            detail: detail.into(),
        }
    }

    /// Shorthand for a `PayloadTooLarge` error.
    pub fn payload_too_large(detail: impl Into<String>) -> Self {
        Self {
            kind: AppErrorKind::PayloadTooLarge,
            detail: detail.into(),
        }
    }

    /// Shorthand for an `UnsupportedMediaType` error.
    pub fn unsupported_media_type(detail: impl Into<String>) -> Self {
        Self {
            kind: AppErrorKind::UnsupportedMediaType,
            detail: detail.into(),
        }
    }

    /// Shorthand for a `MethodNotAllowed` error.
    pub fn method_not_allowed(detail: impl Into<String>) -> Self {
        Self {
            kind: AppErrorKind::MethodNotAllowed,
            detail: detail.into(),
        }
    }

    /// Redacted 503 for failures of the upstream data tier.
    ///
    /// Deliberately takes no `detail` argument: the message is fixed so
    /// a call site cannot splice internal stream/subject names into the
    /// user-visible body (the FUZZ-CRIT-1 leak). The real cause belongs
    /// in the WARN log emitted where the failure is observed.
    pub fn service_unavailable() -> Self {
        Self {
            kind: AppErrorKind::ServiceUnavailable,
            detail: String::from("Event store is temporarily unreachable; retry later"),
        }
    }
}

/// Media type identifier per RFC 9457 §3.
///
/// Used by the 4xx-normalising middleware both to recognise responses
/// that are already problem documents and as the `Content-Type` value it
/// stamps on rewritten ones.
pub const PROBLEM_JSON_CT: &str = "application/problem+json";

/// Build a problem+json response from a kind + detail string, bypassing
/// the full `AppError` construction path. Used by fallbacks and the
/// rejection-normalising middleware where we already know the status.
pub fn problem_response(kind: AppErrorKind, detail: impl Into<String>) -> Response {
    AppError::new(kind, detail).into_response()
}

/// Wire shape of the problem document (RFC 9457 §3.1).
///
/// A borrowed, serialise-only view: every string field points into data
/// owned by the caller, so building one does not allocate.
#[derive(Debug, Serialize)]
struct ProblemDetails<'a> {
    /// Stable problem identifier; emitted under the JSON key `type`
    /// (renamed because `type` is a Rust keyword).
    #[serde(rename = "type")]
    type_uri: &'a str,
    /// Short human-readable summary of the problem kind.
    title: &'a str,
    /// HTTP status code, repeated in the body as a plain number.
    status: u16,
    /// Human-readable explanation specific to this occurrence.
    detail: &'a str,
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let status = self.kind.status();
        let body = ProblemDetails {
            type_uri: self.kind.type_uri(),
            title: self.kind.title(),
            status: status.as_u16(),
            detail: &self.detail,
        };
        let mut resp = (status, Json(body)).into_response();
        // RFC 9457 §3 — the media type is `application/problem+json`.
        resp.headers_mut().insert(
            axum::http::header::CONTENT_TYPE,
            axum::http::HeaderValue::from_static("application/problem+json"),
        );
        resp
    }
}

impl From<anyhow::Error> for AppError {
    fn from(e: anyhow::Error) -> Self {
        AppError::internal(format!("{e:#}"))
    }
}

impl From<serde_json::Error> for AppError {
    fn from(e: serde_json::Error) -> Self {
        AppError::bad_request(format!("invalid json: {e}"))
    }
}

/// FUZZ-WAVE-1 MED-1 / MED-2: response-mapping middleware that
/// guarantees every 4xx leaving the server carries
/// `Content-Type: application/problem+json` (RFC 9457 §3).
///
/// axum's built-in extractors (`Json`, `Path`, `Query`,
/// `DefaultBodyLimit`) reject malformed input by returning a bare
/// `text/plain` body with the error string. The application-level
/// `AppError` path is already problem+json; this layer brings axum's
/// built-in rejections — plus the 404/405 fallbacks below — into the
/// same wire shape.
///
/// Strategy: inspect the outgoing response. If status is 4xx **and**
/// the existing Content-Type is **not** `application/problem+json`,
/// drain the body, pick a kind from the status, and re-emit. Headers
/// other than Content-Type/Content-Length are preserved verbatim — this
/// matters for 405 where axum already set `Allow:`.
///
/// 2xx, 3xx, and 5xx responses pass through unchanged. The Critical
/// finding in the wave-1 report (5xx leak) is out of scope for this
/// fix; this middleware only normalises 4xx content-type.
pub async fn normalize_problem_response(resp: Response) -> Response {
    let status = resp.status();

    if !status.is_client_error() {
        return resp;
    }

    let is_problem_json = resp
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .map(|ct| ct.starts_with(PROBLEM_JSON_CT))
        .unwrap_or(false);

    if is_problem_json {
        return resp;
    }

    // Preserve headers we want to carry across the body rewrite. The
    // `Allow` header on a 405 is the most important — RFC 9110 §15.5.6
    // requires it and operators rely on it to discover the valid verbs.
    let allow_header = resp.headers().get(header::ALLOW).cloned();

    let (parts, body) = resp.into_parts();
    // 64 KiB is more than enough for an axum rejection string. If a
    // hostile upstream layer ever attaches a giant body to a 4xx we
    // drop it on the floor and fall back to a generic detail.
    let detail_bytes = to_bytes(body, 64 * 1024).await.unwrap_or_default();
    let detail = std::str::from_utf8(&detail_bytes)
        .unwrap_or("")
        .trim()
        .to_string();

    let kind = match status {
        StatusCode::BAD_REQUEST => AppErrorKind::BadRequest,
        StatusCode::UNAUTHORIZED => AppErrorKind::Unauthorized,
        StatusCode::NOT_FOUND => AppErrorKind::NotFound,
        StatusCode::METHOD_NOT_ALLOWED => AppErrorKind::MethodNotAllowed,
        StatusCode::CONFLICT => AppErrorKind::Conflict,
        StatusCode::PAYLOAD_TOO_LARGE => AppErrorKind::PayloadTooLarge,
        StatusCode::UNSUPPORTED_MEDIA_TYPE => AppErrorKind::UnsupportedMediaType,
        // Other 4xx (422, 415, 429, ...) — fall back to a generic
        // bad-request shape but keep the original status code below.
        _ => AppErrorKind::BadRequest,
    };

    // Empty body (e.g. axum 0.7's built-in 404/405) produces a useless
    // detail. Synthesize a sensible one so adopters see *something*
    // structured.
    let detail = if detail.is_empty() {
        match status {
            StatusCode::NOT_FOUND => "no route matched the request path".to_string(),
            StatusCode::METHOD_NOT_ALLOWED => "HTTP method not allowed for this path".to_string(),
            StatusCode::PAYLOAD_TOO_LARGE => "request body exceeds the per-route cap".to_string(),
            _ => parts
                .status
                .canonical_reason()
                .unwrap_or("client error")
                .to_string(),
        }
    } else {
        detail
    };

    let body = ProblemDetails {
        type_uri: kind.type_uri(),
        title: kind.title(),
        status: status.as_u16(),
        detail: &detail,
    };
    let body_bytes = serde_json::to_vec(&body)
        .unwrap_or_else(|_| br#"{"type":"/problems/internal","title":"Internal Server Error","status":500,"detail":"failed to serialise problem document"}"#.to_vec());

    let mut new = Response::builder()
        .status(status)
        .body(Body::from(body_bytes))
        .expect("problem+json response build");

    // Copy through every original header except those that no longer
    // describe the rewritten body.
    for (name, value) in parts.headers.iter() {
        if name == header::CONTENT_TYPE || name == header::CONTENT_LENGTH {
            continue;
        }
        new.headers_mut().append(name.clone(), value.clone());
    }
    new.headers_mut().insert(
        header::CONTENT_TYPE,
        HeaderValue::from_static(PROBLEM_JSON_CT),
    );
    // Preserve Allow if axum's method router set it and we didn't catch
    // it in the loop above (header iteration is the canonical source,
    // but this is a belt-and-braces guarantee for the 405 contract).
    if let Some(v) = allow_header {
        new.headers_mut().insert(header::ALLOW, v);
    }

    new
}