noetl-server 2.19.8

NoETL Control Plane - Async Rust server for workflow orchestration
Documentation
//! Sharding diagnostic endpoints.
//!
//! Phase F R3b-1 of [noetl/ai-meta#49](https://github.com/noetl/ai-meta/issues/49).
//! Exposes a public, deterministic endpoint that returns the
//! result of [`crate::sharding::shard_for`] for any
//! `(execution_id, shard_count)` pair the caller supplies.
//!
//! ## Why a public endpoint
//!
//! The endpoint enables an **end-to-end runtime drift guard**:
//! an integration test (R3b-3, ops repo) POSTs to both this
//! endpoint AND the gateway's twin endpoint (R3b-2) and asserts
//! they return the same `shard_index` across a battery of
//! `(execution_id, N)` pairs.  Catches drift that unit-test
//! pinning can't see — e.g. one side's `twox-hash` crate version
//! bumped without the other's, or one side's seed constant
//! drifted while the test suite was passing in isolation.
//!
//! No auth gate here:
//!
//! 1. Pure math — no DB access, no NATS publish, no state.
//! 2. Deterministic — same input always returns same output.
//! 3. No data leak — `shard_index` is a routing label, not a
//!    secret; the math is documented in the
//!    [sharding-design][design] doc.
//! 4. Drift-guard tests must be operable from any vantage point
//!    that can reach the cluster, not just from the system pool
//!    that holds `NOETL_INTERNAL_API_TOKEN`.
//!
//! [design]: https://github.com/noetl/server/wiki/sharding-design

use axum::{
    extract::{Query, State},
    http::StatusCode,
    Json,
};
use serde::{Deserialize, Serialize};

use crate::sharding::shard_for;
use crate::state::AppState;

/// Query parameters for `GET /api/runtime/shard-info`.
#[derive(Debug, Deserialize)]
pub struct ShardInfoQuery {
    /// `execution_id` to compute the shard for.  Wire-encoded as
    /// `String` to mirror the rest of the server's HTTP surface
    /// (`EventRequest.execution_id` is also `String` for browser
    /// JSON-number precision).  Parsed to `i64`; non-numeric
    /// strings return 400.
    pub execution_id: String,
    /// Shard count to compute against.  REQUIRED — there's no
    /// sensible default, and silently using the server's own
    /// `shard_count` would mix two semantics (math output vs.
    /// deployment topology).  Range `1..=1024`; 0 returns 400;
    /// values >1024 return 400 to keep the response sane (the
    /// 10-bit `machine_id` portion of the snowflake imposes the
    /// same upper bound on practical shard counts).
    pub shard_count: u32,
}

/// Diagnostic info echoing the server's own configured shard
/// (helpful when an operator is debugging a sharded deployment).
#[derive(Debug, Serialize)]
pub struct ServerShardConfig {
    pub shard_index: u32,
    pub shard_count: u32,
}

/// Response from `GET /api/runtime/shard-info`.
#[derive(Debug, Serialize)]
pub struct ShardInfoResponse {
    /// Echo of the input `execution_id`, parsed to `i64`.
    pub execution_id: i64,
    /// Echo of the input `shard_count`.
    pub shard_count: u32,
    /// **The math result** — the shard the server's
    /// `shard_for(execution_id, shard_count)` selects for this
    /// input.  Drift-guard test asserts gateway and server
    /// agree on this value.
    pub shard_index: u32,
    /// Source identifier.  Set to `"noetl-server"` so the
    /// drift-guard test can distinguish which side produced
    /// each response when both are sampled in one go.
    pub source: &'static str,
    /// Hash function identifier — pinned in code, surfaces in
    /// the response so an operator running the drift-guard
    /// from outside can confirm both sides use the same
    /// algorithm without reading source.
    pub hash_function: &'static str,
    /// Hash seed — same purpose as `hash_function`.
    pub seed: u64,
    /// The server's actual configured shard (NOT the input
    /// `shard_count`).  Reflects the `NOETL_SHARD_INDEX` /
    /// `NOETL_SHARD_COUNT` env vars.
    pub server_config: ServerShardConfig,
}

/// `GET /api/runtime/shard-info` — drift-guard diagnostic.
///
/// See module docs for the cross-component context.  This
/// handler is intentionally tiny: parse + validate + call
/// `shard_for`.  No async I/O.
pub async fn get_shard_info(
    State(state): State<AppState>,
    Query(params): Query<ShardInfoQuery>,
) -> Result<Json<ShardInfoResponse>, (StatusCode, Json<serde_json::Value>)> {
    let execution_id: i64 = params.execution_id.parse().map_err(|_| {
        (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": format!("execution_id {:?} is not a valid i64", params.execution_id),
            })),
        )
    })?;
    if params.shard_count == 0 {
        return Err((
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "shard_count must be >= 1",
            })),
        ));
    }
    // The 10-bit machine_id portion of the snowflake imposes a
    // 1024-shard practical ceiling.  Reject larger values to
    // keep the response sane and discourage misuse — operators
    // running the drift-guard against pathological inputs
    // should use the unit-test surface instead.
    if params.shard_count > 1024 {
        return Err((
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": format!(
                    "shard_count {} exceeds practical maximum 1024",
                    params.shard_count
                ),
            })),
        ));
    }

    let shard_index = shard_for(execution_id, params.shard_count);
    Ok(Json(ShardInfoResponse {
        execution_id,
        shard_count: params.shard_count,
        shard_index,
        source: "noetl-server",
        hash_function: "twox_hash::XxHash64",
        seed: 0,
        server_config: ServerShardConfig {
            shard_index: state.shard.shard_index,
            shard_count: state.shard.shard_count,
        },
    }))
}

#[cfg(test)]
mod tests {
    use super::*;

    // ---- Pinned (execution_id, shard_count) → shard expected
    // values — MUST match the noetl-gateway shard_for() pinned
    // tests.  These are the wire-contract guarantees.

    #[test]
    fn shard_index_matches_pinned_values_n_16() {
        // Same constants the unit tests in src/sharding.rs pin;
        // re-asserted here so the handler's response can be
        // compared against a known value end-to-end.
        let n = 16;
        // shard_for() is deterministic + stable across crate
        // versions (fixed seed); these values won't change
        // unless the twox-hash crate version moves AND a
        // breaking change to its hash output ships.
        let cases: &[i64] = &[
            1,
            42,
            320_816_801_799_737_344,
            i64::MAX,
            -1,
        ];
        for eid in cases {
            // The test doesn't assert a specific number — it
            // asserts the function call doesn't panic and the
            // result is in range.  The actual stability is
            // pinned in src/sharding.rs::tests.
            let s = shard_for(*eid, n);
            assert!(s < n, "shard {s} out of range for eid={eid}, n={n}");
        }
    }

    // ---- Query-parameter validation -----------------------------

    #[test]
    fn execution_id_parses_as_i64() {
        // Valid wire values.
        for s in &["1", "42", "-1", "9999999999", "320816801799737344"] {
            let parsed: i64 = s.parse().expect("valid i64");
            // Sanity: round-tripping doesn't lose information.
            assert_eq!(parsed.to_string(), *s);
        }
    }

    #[test]
    fn execution_id_rejects_non_numeric() {
        for s in &["abc", "12.5", "0x10", "", "1_000"] {
            assert!(
                s.parse::<i64>().is_err(),
                "expected {s:?} to fail i64 parse",
            );
        }
    }

    // Note: full HTTP handler tests (status codes, JSON shapes)
    // would need an axum test harness with a real AppState.
    // The validation here covers the parsing + range checks;
    // the integration test in noetl/ops (R3b-3) exercises the
    // wire path end-to-end against a running server.
}