// ff_server/api.rs
1//! REST API layer — thin axum handlers over Server methods.
2
3use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use axum::{
8    extract::{DefaultBodyLimit, MatchedPath, Path, Query, Request, State},
9    http::StatusCode,
10    middleware,
11    response::{IntoResponse, Response},
12    routing::{get, post, put, MethodRouter},
13    Json, Router,
14};
15use serde::{Deserialize, Serialize};
16use axum::http::Method;
17use tower_http::cors::{AllowOrigin, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20use ff_core::contracts::*;
21use ff_core::state::PublicState;
22use ff_core::types::*;
23
24use crate::config::ConfigError;
25use crate::server::{Server, ServerError};
26
27// ── Per-route body-size limits (#97) ──
28//
29// Axum's out-of-the-box `DefaultBodyLimit` is 2 MiB and was applied silently
30// with no per-route override, meaning a misrouted multi-GB JSON would still
31// be buffered up to 2 MiB before rejection and mega-byte payloads on
32// control-plane endpoints (cancel, priority, claim, …) were accepted with
33// no protection beyond that global cap.
34//
35// We apply three categories, ordered by generosity:
36//
37//   * `BODY_LIMIT_LARGE_PAYLOAD` (1 MiB) — carries an application
38//     `input_payload` (`POST /v1/executions`). FlowFabric has no Lua-side
39//     cap on `input_payload` (`execution.lua` SETs whatever is passed);
40//     1 MiB matches common workflow-engine conventions (e.g. AWS Step
41//     Functions input size limit is 256 KB, Temporal's default is 2 MiB)
42//     and is an order of magnitude above typical JSON control envelopes.
43//
44//   * `BODY_LIMIT_MEDIUM_PAYLOAD` (256 KiB) — signal delivery
45//     (`POST /v1/executions/{id}/signal`) carries a `DeliverSignalArgs`
46//     with an optional `Vec<u8> payload`. Signals are notification-shaped
47//     events (webhooks, human approvals), not bulk-data transfers; 256 KiB
48//     is the same ceiling Step Functions imposes on task inputs and is
49//     comfortably above any real correlation-id / approval-blob.
50//
51//   * `BODY_LIMIT_CONTROL` (64 KiB) — everything else. Control-plane JSON
52//     bodies (cancel args, priority changes, claim requests, flow
53//     members, budget/quota definitions, rotate-secret keys) are all tiny
54//     fixed-shape records; 64 KiB leaves plenty of headroom for
55//     metadata-heavy bodies while capping DoS amplification by ~32× vs
56//     the previous axum default.
57//
58// Cross-checked against `lua/*.lua` — the only payload-size caps in the
59// Lua layer are `CAPS_MAX_BYTES=4096` / `CAPS_MAX_TOKENS=256` for
60// capability lists (`lua/helpers.lua`). No other Lua path enforces a
61// byte-length limit on incoming payloads, so HTTP is the right layer to
62// draw the line.
63//
64// Future admin routes that need larger bodies (e.g. bulk import) should
65// introduce a new `BODY_LIMIT_ADMIN_BULK` const and opt in per-route
66// rather than bumping `BODY_LIMIT_CONTROL`.
67
/// Cap for routes carrying an application `input_payload`
/// (`POST /v1/executions`). See the rationale block above.
const BODY_LIMIT_LARGE_PAYLOAD: usize = 1024 * 1024;     // 1 MiB
/// Cap for signal delivery (`POST /v1/executions/{id}/signal`).
const BODY_LIMIT_MEDIUM_PAYLOAD: usize = 256 * 1024;     // 256 KiB
/// Cap for all other (control-plane) JSON bodies.
const BODY_LIMIT_CONTROL: usize = 64 * 1024;             // 64 KiB
71
/// Limit metadata attached to a request via extension so the 413 error body
/// can report the route's configured cap. The `DefaultBodyLimit::max(bytes)`
/// layer enforces; this struct only exists for response shape.
#[derive(Clone, Copy)]
struct BodyLimit {
    /// Per-route cap in bytes, as passed to `with_body_limit`.
    bytes: usize,
}
79
80/// Apply per-route body-size cap + attach [`BodyLimit`] extension so the
81/// JSON rejection handler can report `limit_bytes` in the 413 response.
82///
83/// Three layers, each guarding a different failure mode:
84///
85///   1. `from_fn(enforce_body_limit)` — inspects `Content-Length` up front
86///      and rejects with our structured 413 body BEFORE the handler runs.
87///      This is the primary enforcement path and works regardless of
88///      whether the handler consumes the body. Also attaches the
89///      [`BodyLimit`] extension so `AppJson`'s 413 branch (below, case 2)
90///      can report `limit_bytes` even when rejection happens during
91///      streaming.
92///   2. `DefaultBodyLimit::max(bytes)` — hint consumed by `AppJson`
93///      (`Bytes::from_request` internally) for the case where a client
94///      lies in `Content-Length` or uses `Transfer-Encoding: chunked`:
95///      the body extractor trips on a streaming-length check and we
96///      rewrite the rejection into the structured 413 shape in
97///      `AppJson::from_request`.
98///   3. `tower_http::limit::RequestBodyLimitLayer::new(bytes)` —
99///      transport-level body cap that also fires on chunked transfers
100///      even when the handler never consumes the body. Returns a plain
101///      413 (no JSON body) in that edge case; the structured path in
102///      (1) covers the overwhelmingly common case (clients sending
103///      Content-Length).
104fn with_body_limit(route: MethodRouter<Arc<Server>>, bytes: usize) -> MethodRouter<Arc<Server>> {
105    let r: MethodRouter<Arc<Server>> =
106        route.layer(tower_http::limit::RequestBodyLimitLayer::new(bytes));
107    let r: MethodRouter<Arc<Server>> = r.layer(DefaultBodyLimit::max(bytes));
108    r.layer(middleware::from_fn(
109        move |req: Request, next: middleware::Next| enforce_body_limit(bytes, req, next),
110    ))
111}
112
113/// Content-Length-based body-size enforcement + [`BodyLimit`] extension.
114///
115/// Runs before the handler so routes whose handlers never consume the
116/// request body (`replay_execution`, `revoke_lease`, `reset_budget` — see
117/// Copilot review on PR#100) are still protected against large uploads.
118async fn enforce_body_limit(bytes: usize, mut req: Request, next: middleware::Next) -> Response {
119    if let Some(content_length) = req
120        .headers()
121        .get(axum::http::header::CONTENT_LENGTH)
122        .and_then(|v| v.to_str().ok())
123        .and_then(|s| s.parse::<usize>().ok())
124        && content_length > bytes
125    {
126        let route = req
127            .extensions()
128            .get::<MatchedPath>()
129            .map(|m| m.as_str().to_owned())
130            .unwrap_or_default();
131        let body = PayloadTooLargeBody {
132            error: "payload_too_large",
133            limit_bytes: bytes,
134            route,
135        };
136        return (StatusCode::PAYLOAD_TOO_LARGE, Json(body)).into_response();
137    }
138    req.extensions_mut().insert(BodyLimit { bytes });
139    next.run(req).await
140}
141
/// Shape for the 413 response body.
#[derive(Serialize)]
struct PayloadTooLargeBody {
    // Always the literal "payload_too_large".
    error: &'static str,
    // The route's configured cap in bytes (0 when no BodyLimit extension
    // was found on the request).
    limit_bytes: usize,
    // Matched route template; empty when no MatchedPath extension is
    // present.
    route: String,
}
149
150// ── Custom JSON extractor (uniform JSON error on malformed body) ──
151
152struct AppJson<T>(T);
153
154impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
155where
156    T: serde::de::DeserializeOwned + Send,
157    S: Send + Sync,
158{
159    type Rejection = Response;
160
161    async fn from_request(
162        req: axum::extract::Request,
163        state: &S,
164    ) -> Result<Self, Self::Rejection> {
165        // Snapshot metadata for a potential 413 before `Json::from_request`
166        // consumes the request. `BodyLimit` is `Copy` and `MatchedPath` is
167        // cheap to clone (its payload is an `Arc<str>`), so the non-413
168        // path pays at most two extension lookups plus a ref-count bump.
169        let limit = req.extensions().get::<BodyLimit>().copied();
170        let matched = req.extensions().get::<MatchedPath>().cloned();
171
172        match Json::<T>::from_request(req, state).await {
173            Ok(Json(value)) => Ok(AppJson(value)),
174            Err(rejection) => {
175                let status = rejection.status();
176                tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
177                if status == StatusCode::PAYLOAD_TOO_LARGE {
178                    let limit_bytes = limit.map(|l| l.bytes).unwrap_or(0);
179                    let route = matched
180                        .as_ref()
181                        .map(|m| m.as_str().to_owned())
182                        .unwrap_or_default();
183                    let body = PayloadTooLargeBody {
184                        error: "payload_too_large",
185                        limit_bytes,
186                        route,
187                    };
188                    return Err((status, Json(body)).into_response());
189                }
190                let body = ErrorBody::plain(format!(
191                    "invalid JSON: {}",
192                    status.canonical_reason().unwrap_or("bad request"),
193                ));
194                Err((status, Json(body)).into_response())
195            }
196        }
197    }
198}
199
200// ── Error handling ──
201
202struct ApiError(ServerError);
203
204impl From<ServerError> for ApiError {
205    fn from(e: ServerError) -> Self {
206        Self(e)
207    }
208}
209
210/// RFC-017 Stage C: handlers that dispatch through the backend trait
211/// surface `EngineError` directly. Wrap into the existing
212/// `ServerError::Engine(Box<EngineError>)` lane so the
213/// `IntoResponse` mapping downstream keeps one code path for
214/// `EngineError`-rooted responses.
215impl From<ff_core::engine_error::EngineError> for ApiError {
216    fn from(e: ff_core::engine_error::EngineError) -> Self {
217        Self(ServerError::Engine(Box::new(e)))
218    }
219}
220
221/// HTTP error body. `kind`/`retryable` are populated for 500s backed by
222/// a backend transport fault (see `ff_core::BackendErrorKind`) so HTTP
223/// clients (e.g. cairn-fabric) can make retry decisions without parsing
224/// the `error` string.
225#[derive(Serialize)]
226struct ErrorBody {
227    error: String,
228    #[serde(skip_serializing_if = "Option::is_none")]
229    kind: Option<String>,
230    #[serde(skip_serializing_if = "Option::is_none")]
231    retryable: Option<bool>,
232}
233
234impl ErrorBody {
235    fn plain(error: String) -> Self {
236        Self { error, kind: None, retryable: None }
237    }
238}
239
/// Single source of truth for the HTTP contract: maps every `ServerError`
/// variant (including nested `EngineError` roots) onto a status code plus
/// JSON [`ErrorBody`] with optional `kind`/`retryable` retry hints.
impl IntoResponse for ApiError {
    fn into_response(self) -> Response {
        let (status, body) = match &self.0 {
            // Message-only mappings: lookup miss → 404, caller-input
            // rejection / failed operation → 400.
            ServerError::NotFound(msg) => {
                (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
            }
            ServerError::InvalidInput(msg) => {
                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
            }
            ServerError::OperationFailed(msg) => {
                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
            }
            ServerError::ConcurrencyLimitExceeded(source, max) => (
                StatusCode::TOO_MANY_REQUESTS,
                ErrorBody {
                    error: format!(
                        "too many concurrent {source} calls (server max: {max}); retry with backoff"
                    ),
                    kind: None,
                    retryable: Some(true),
                },
            ),
            // Backend transport fault — log server-side, surface a 500 with
            // the stable kind string so clients can make retry decisions.
            ServerError::Backend(be) => {
                let kind_str = be.kind().as_stable_str();
                tracing::error!(
                    kind = kind_str,
                    message = be.message(),
                    "backend error"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: self.0.to_string(),
                        kind: Some(kind_str.to_owned()),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            // Same as `Backend`, with the caller-supplied context string
            // included in the structured log line.
            ServerError::BackendContext { source, context } => {
                let kind_str = source.kind().as_stable_str();
                tracing::error!(
                    kind = kind_str,
                    message = source.message(),
                    context = %context,
                    "backend error"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: self.0.to_string(),
                        kind: Some(kind_str.to_owned()),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            ServerError::LibraryLoad(load_err) => {
                // `kind` is best-effort here: only populated when the load
                // error carries an underlying Valkey error to classify.
                let kind_str = load_err
                    .valkey_kind()
                    .map(ff_backend_valkey::classify_ferriskey_kind)
                    .map(|k| k.as_stable_str());
                tracing::error!(
                    kind = kind_str.unwrap_or(""),
                    error = %load_err,
                    "library load failure"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: format!("library load: {load_err}"),
                        kind: kind_str.map(str::to_owned),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            // RFC-017 Stage B: trait-dispatched handlers surface
            // backend-pool pressure + shutdown races as typed
            // EngineError variants. Map them to their REST status so
            // the 429/503 contract preserved from the pre-Stage-B
            // `ConcurrencyLimitExceeded` arm keeps holding across
            // migrated handlers (`read_attempt_stream`,
            // `tail_attempt_stream`, …).
            ServerError::Engine(boxed) => {
                use ff_core::engine_error::EngineError as EE;
                // Peel context wrappers to find the root cause so the
                // status mapping is stable regardless of nesting.
                fn root(e: &EE) -> &EE {
                    match e {
                        EE::Contextual { source, .. } => root(source),
                        other => other,
                    }
                }
                match root(boxed) {
                    EE::ResourceExhausted { pool, max, .. } => (
                        StatusCode::TOO_MANY_REQUESTS,
                        ErrorBody {
                            error: format!(
                                "too many concurrent {pool} calls (server max: {max}); retry with backoff"
                            ),
                            kind: Some("resource_exhausted".into()),
                            retryable: Some(true),
                        },
                    ),
                    EE::Unavailable { op } => (
                        StatusCode::SERVICE_UNAVAILABLE,
                        ErrorBody {
                            error: format!("backend op unavailable: {op}"),
                            kind: Some("unavailable".into()),
                            retryable: Some(false),
                        },
                    ),
                    EE::NotFound { entity } => (
                        StatusCode::NOT_FOUND,
                        ErrorBody::plain(format!("not found: {entity}")),
                    ),
                    // `EngineError::Validation` is caller-supplied-input
                    // rejection (invalid waitpoint HMAC, oversize payload,
                    // malformed capabilities, …). Before Stage A these
                    // surfaced as `ServerError::InvalidInput` → 400; once
                    // `deliver_signal` and friends migrated to trait
                    // dispatch the same-semantics errors arrived as
                    // `EngineError::Validation`. Preserve the 400.
                    EE::Validation { kind, detail } => {
                        use ff_core::engine_error::ValidationKind as VK;
                        let code = match kind {
                            VK::InvalidInput => "invalid_input",
                            VK::CapabilityMismatch => "capability_mismatch",
                            VK::InvalidCapabilities => "invalid_capabilities",
                            VK::InvalidPolicyJson => "invalid_policy_json",
                            VK::PayloadTooLarge => "payload_too_large",
                            VK::SignalLimitExceeded => "signal_limit_exceeded",
                            VK::InvalidWaitpointKey => "invalid_waitpoint_key",
                            VK::InvalidToken => "invalid_token",
                            VK::WaitpointNotTokenBound => "waitpoint_not_token_bound",
                            VK::RetentionLimitExceeded => "retention_limit_exceeded",
                            VK::InvalidLeaseForSuspend => "invalid_lease_for_suspend",
                            VK::InvalidDependency => "invalid_dependency",
                            VK::InvalidWaitpointForExecution => "invalid_waitpoint_for_execution",
                            VK::InvalidBlockingReason => "invalid_blocking_reason",
                            VK::InvalidOffset => "invalid_offset",
                            VK::Unauthorized => "unauthorized",
                            VK::InvalidBudgetScope => "invalid_budget_scope",
                            VK::BudgetOverrideNotAllowed => "budget_override_not_allowed",
                            VK::InvalidQuotaSpec => "invalid_quota_spec",
                            VK::InvalidKid => "invalid_kid",
                            VK::InvalidSecretHex => "invalid_secret_hex",
                            VK::InvalidGraceMs => "invalid_grace_ms",
                            VK::InvalidTagKey => "invalid_tag_key",
                            VK::InvalidFrameType => "invalid_frame_type",
                            _ => "validation_error",
                        };
                        let msg = if detail.is_empty() {
                            code.to_string()
                        } else {
                            format!("{code}: {detail}")
                        };
                        (StatusCode::BAD_REQUEST, ErrorBody::plain(msg))
                    }
                    // RFC-017 Stage C: operator-control + budget
                    // paths surface domain-level conflicts (cycle,
                    // dep-already-exists, rotation-kid-clash). 409
                    // Conflict per RFC-010 §10.7.
                    EE::Conflict(kind) => {
                        use ff_core::engine_error::ConflictKind as CK;
                        let code = match kind {
                            CK::DependencyAlreadyExists { .. } => "dependency_already_exists",
                            CK::CycleDetected => "cycle_detected",
                            CK::SelfReferencingEdge => "self_referencing_edge",
                            CK::ExecutionAlreadyInFlow => "execution_already_in_flow",
                            CK::WaitpointAlreadyExists => "waitpoint_already_exists",
                            CK::BudgetAttachConflict => "budget_attach_conflict",
                            CK::QuotaAttachConflict => "quota_attach_conflict",
                            CK::RotationConflict(_) => "rotation_conflict",
                            CK::ActiveAttemptExists => "active_attempt_exists",
                            _ => "conflict",
                        };
                        (
                            StatusCode::CONFLICT,
                            ErrorBody {
                                error: format!("{code}: {kind:?}"),
                                kind: Some(code.into()),
                                retryable: Some(false),
                            },
                        )
                    }
                    // RFC-017 Stage C: retryable contention (lease
                    // conflict, rate-limit, stale-grant). 409 preserves
                    // the pre-migration `ServerError::OperationFailed
                    // → 400` for most of these; we upgrade to 409 so
                    // clients can distinguish domain-retryable from
                    // input-validation 400s.
                    EE::Contention(ck) => {
                        use ff_core::engine_error::ContentionKind as CK;
                        let (status, code, retryable) = match ck {
                            CK::RetryExhausted => (
                                StatusCode::INTERNAL_SERVER_ERROR,
                                "retry_exhausted",
                                false,
                            ),
                            CK::RateLimitExceeded => (
                                StatusCode::TOO_MANY_REQUESTS,
                                "rate_limit_exceeded",
                                true,
                            ),
                            CK::ConcurrencyLimitExceeded => (
                                StatusCode::TOO_MANY_REQUESTS,
                                "concurrency_limit_exceeded",
                                true,
                            ),
                            _ => (StatusCode::CONFLICT, "contention", true),
                        };
                        (
                            status,
                            ErrorBody {
                                error: format!("{code}: {ck:?}"),
                                kind: Some(code.into()),
                                retryable: Some(retryable),
                            },
                        )
                    }
                    // RFC-017 Stage C: legal-but-surprising state
                    // transitions surfaced by the migrated operator-
                    // control handlers. Most are benign no-ops that
                    // the client should swallow; a handful are true
                    // caller errors (ReplayNotAllowed, NotRunnable,
                    // ExecutionNotTerminal).
                    EE::State(sk) => {
                        use ff_core::engine_error::StateKind as SK;
                        let (status, code) = match sk {
                            // Replay / non-terminal gating — caller
                            // input gating error, 409.
                            SK::ExecutionNotTerminal => {
                                (StatusCode::CONFLICT, "execution_not_terminal")
                            }
                            SK::MaxReplaysExhausted => {
                                (StatusCode::CONFLICT, "max_replays_exhausted")
                            }
                            SK::ReplayNotAllowed => {
                                (StatusCode::CONFLICT, "replay_not_allowed")
                            }
                            SK::NotRunnable => (StatusCode::CONFLICT, "not_runnable"),
                            SK::Terminal => (StatusCode::CONFLICT, "terminal"),
                            SK::FlowAlreadyTerminal => {
                                (StatusCode::CONFLICT, "flow_already_terminal")
                            }
                            // Budget / admission — 409 (breach) vs 200
                            // (soft; kept client-returnable). Stage C
                            // handlers don't currently emit these as
                            // Errs (soft-breach arrives as
                            // `ReportUsageResult::SoftBreach`), so
                            // these arms are defensive.
                            SK::BudgetExceeded => (StatusCode::CONFLICT, "budget_exceeded"),
                            SK::BudgetSoftExceeded => {
                                (StatusCode::CONFLICT, "budget_soft_exceeded")
                            }
                            // Benign no-ops — caller retried something
                            // that already completed. Pre-migration
                            // this surfaced as
                            // `ServerError::OperationFailed` → 400;
                            // 409 is the correct status for "you tried
                            // to X but the system is already past X".
                            SK::AlreadySatisfied
                            | SK::DuplicateSignal
                            | SK::OkAlreadyApplied
                            | SK::AttemptAlreadyTerminal
                            | SK::StreamAlreadyClosed
                            | SK::LeaseExpired
                            | SK::LeaseRevoked => (StatusCode::CONFLICT, "already_satisfied"),
                            _ => (StatusCode::CONFLICT, "state_conflict"),
                        };
                        (
                            status,
                            ErrorBody {
                                error: format!("{code}: {sk:?}"),
                                kind: Some(code.into()),
                                retryable: Some(false),
                            },
                        )
                    }
                    // Any other EngineError root — opaque 500 with the
                    // self-reported retryability.
                    _ => (
                        StatusCode::INTERNAL_SERVER_ERROR,
                        ErrorBody {
                            error: self.0.to_string(),
                            kind: None,
                            retryable: Some(self.0.is_retryable()),
                        },
                    ),
                }
            }
            // Script / Config / PartitionMismatch — developer or deployment
            // errors. No Valkey ErrorKind to surface, but retryable=false is
            // informative: a client-side retry won't change the outcome.
            other => (
                StatusCode::INTERNAL_SERVER_ERROR,
                ErrorBody {
                    error: other.to_string(),
                    kind: None,
                    retryable: Some(false),
                },
            ),
        };
        (status, Json(body)).into_response()
    }
}
543
544// ── Router ──
545
/// Build the API router without `/metrics` or HTTP-metrics middleware.
///
/// Thin convenience wrapper over [`router_with_metrics`] with
/// `metrics = None`. `cors_origins` feeds the CORS layer; a `Some`
/// `api_token` enables the auth middleware on all routes.
pub fn router(
    server: Arc<Server>,
    cors_origins: &[String],
    api_token: Option<String>,
) -> Result<Router, ConfigError> {
    router_with_metrics(server, cors_origins, api_token, None)
}
553
/// Router entry point that also mounts `/metrics` and the HTTP metrics
/// middleware. Used by `main.rs` when the `observability` feature is on.
///
/// When `metrics` is `Some` AND the `observability` feature is compiled
/// in, `/metrics` is mounted on an un-authenticated nested router (auth
/// middleware does not apply — Prometheus convention). When the
/// feature is off OR `metrics` is `None`, no `/metrics` route exists
/// (returns 404) and no HTTP metrics middleware is installed.
///
/// Returns `Err` only if `build_cors_layer` rejects `cors_origins`.
pub fn router_with_metrics(
    server: Arc<Server>,
    cors_origins: &[String],
    api_token: Option<String>,
    #[cfg_attr(not(feature = "observability"), allow(unused_variables))]
    metrics: Option<Arc<crate::Metrics>>,
) -> Result<Router, ConfigError> {
    let auth_enabled = api_token.is_some();
    let cors = build_cors_layer(cors_origins, auth_enabled)?;

    // Per-route body-size caps (#97). See module-level `BODY_LIMIT_*` consts
    // for the category rationale; each route picks the tightest cap that
    // still accommodates expected real traffic. Routes registered without
    // `with_body_limit` are GET-only (no request body expected) and keep
    // axum's default limit.
    let mut app = Router::new()
        // Executions
        .route(
            "/v1/executions",
            with_body_limit(
                get(list_executions).post(create_execution),
                BODY_LIMIT_LARGE_PAYLOAD,
            ),
        )
        .route("/v1/executions/{id}", get(get_execution))
        .route("/v1/executions/{id}/state", get(get_execution_state))
        .route(
            "/v1/executions/{id}/pending-waitpoints",
            get(list_pending_waitpoints),
        )
        .route("/v1/executions/{id}/result", get(get_execution_result))
        .route(
            "/v1/executions/{id}/cancel",
            with_body_limit(post(cancel_execution), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/signal",
            with_body_limit(post(deliver_signal), BODY_LIMIT_MEDIUM_PAYLOAD),
        )
        .route(
            "/v1/executions/{id}/priority",
            with_body_limit(put(change_priority), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/replay",
            with_body_limit(post(replay_execution), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/revoke-lease",
            with_body_limit(post(revoke_lease), BODY_LIMIT_CONTROL),
        )
        // Scheduler-routed claim (Batch C item 2). Worker POSTs lane +
        // identity + capabilities; server runs budget/quota/capability
        // admission via ff-scheduler and returns a ClaimGrant on
        // success (204 No Content when no eligible execution).
        .route(
            "/v1/workers/{worker_id}/claim",
            with_body_limit(post(claim_for_worker), BODY_LIMIT_CONTROL),
        )
        // Stream read + tail (RFC-006 #2)
        .route(
            "/v1/executions/{id}/attempts/{idx}/stream",
            get(read_attempt_stream),
        )
        .route(
            "/v1/executions/{id}/attempts/{idx}/stream/tail",
            get(tail_attempt_stream),
        )
        // Flows
        .route(
            "/v1/flows",
            with_body_limit(post(create_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/members",
            with_body_limit(post(add_execution_to_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/cancel",
            with_body_limit(post(cancel_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/edges",
            with_body_limit(post(stage_dependency_edge), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/edges/apply",
            with_body_limit(post(apply_dependency_to_child), BODY_LIMIT_CONTROL),
        )
        // Budgets
        .route(
            "/v1/budgets",
            with_body_limit(post(create_budget), BODY_LIMIT_CONTROL),
        )
        .route("/v1/budgets/{id}", get(get_budget_status))
        .route(
            "/v1/budgets/{id}/usage",
            with_body_limit(post(report_usage), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/budgets/{id}/reset",
            with_body_limit(post(reset_budget), BODY_LIMIT_CONTROL),
        )
        // Quotas
        .route(
            "/v1/quotas",
            with_body_limit(post(create_quota_policy), BODY_LIMIT_CONTROL),
        )
        // Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security)
        .route(
            "/v1/admin/rotate-waitpoint-secret",
            with_body_limit(post(rotate_waitpoint_secret), BODY_LIMIT_CONTROL),
        )
        // Health (always unauthenticated)
        .route("/healthz", get(healthz));

    if let Some(token) = api_token {
        let token = Arc::new(token);
        app = app.layer(middleware::from_fn(move |req, next| {
            let token = token.clone();
            auth_middleware(token, req, next)
        }));
    }

    // PR-94: HTTP metrics middleware. Recorded AFTER the auth layer
    // above so unauthorized 401s are still counted under their route,
    // and BEFORE trace/cors so the metric captures handler time
    // including the auth check itself. No-op when the
    // `observability` feature is off.
    #[cfg(feature = "observability")]
    if let Some(m) = metrics.as_ref() {
        let m = m.clone();
        app = app.layer(middleware::from_fn_with_state(
            m,
            crate::metrics::http_middleware,
        ));
    }

    #[cfg_attr(not(feature = "observability"), allow(unused_mut))]
    let mut app = app
        .layer(TraceLayer::new_for_http())
        .layer(cors)
        .with_state(server);

    // PR-94: `/metrics` — intentionally unauthenticated. Mounted on a
    // separate router with its own State so the main app's auth
    // middleware does not run for scrapes (Prometheus convention).
    // Network-layer auth (ingress ACL, service-mesh policy, or
    // metrics-only listen address) is the expected gate; FF does not
    // own auth for scrape endpoints.
    #[cfg(feature = "observability")]
    if let Some(m) = metrics {
        let metrics_router: Router = Router::new()
            .route("/metrics", get(crate::metrics::metrics_handler))
            .with_state(m);
        app = app.merge(metrics_router);
    }

    Ok(app)
}
720
721async fn auth_middleware(
722    token: Arc<String>,
723    req: Request,
724    next: middleware::Next,
725) -> Response {
726    if req.uri().path() == "/healthz" {
727        return next.run(req).await;
728    }
729
730    let auth_header = req
731        .headers()
732        .get("authorization")
733        .and_then(|v| v.to_str().ok());
734
735    let authorized = auth_header
736        .and_then(|v| v.strip_prefix("Bearer "))
737        .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
738
739    if authorized {
740        next.run(req).await
741    } else {
742        (
743            StatusCode::UNAUTHORIZED,
744            Json(ErrorBody::plain(
745                "missing or invalid Authorization header".to_owned(),
746            )),
747        )
748            .into_response()
749    }
750}
751
/// Byte-slice equality whose running time does not depend on where
/// the first mismatch occurs.
///
/// Length is compared up front (a length difference is observable
/// regardless); for equal-length inputs every byte pair is XOR-folded
/// into one accumulator so the comparison never short-circuits.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
762
763/// Build the CORS layer from the configured `FF_CORS_ORIGINS` list.
764///
765/// Fails closed (#71): if every entry fails to parse as a `HeaderValue`,
766/// return `ConfigError` so startup aborts instead of silently falling back
767/// to `CorsLayer::permissive()` (a typo would otherwise broaden browser
768/// access to "allow any origin").
769///
770/// When `auth_enabled` is true (i.e. `FF_API_TOKEN` is set), `Authorization`
771/// is added to `allow_headers` (#66) so browser preflights for
772/// cross-origin authenticated requests succeed.
773fn build_cors_layer(origins: &[String], auth_enabled: bool) -> Result<CorsLayer, ConfigError> {
774    if origins.iter().any(|o| o == "*") {
775        return Ok(CorsLayer::permissive());
776    }
777
778    let mut parsed = Vec::with_capacity(origins.len());
779    let mut accepted = Vec::with_capacity(origins.len());
780    let mut invalid = Vec::new();
781    for o in origins {
782        match o.parse() {
783            Ok(v) => {
784                parsed.push(v);
785                accepted.push(o.as_str());
786            }
787            Err(_) => invalid.push(o.clone()),
788        }
789    }
790
791    if parsed.is_empty() && !origins.is_empty() {
792        return Err(ConfigError::InvalidValue {
793            var: "FF_CORS_ORIGINS".to_owned(),
794            message: format!(
795                "all configured origins failed to parse as valid HTTP header values: {:?}; \
796                 refusing to fall back to permissive CORS",
797                origins
798            ),
799        });
800    }
801
802    if !invalid.is_empty() {
803        // Collected `accepted` list during the single parse loop above to
804        // avoid an O(N*M) filter-contains scan per log event.
805        tracing::warn!(
806            ?invalid,
807            ?accepted,
808            "some FF_CORS_ORIGINS entries failed to parse and were dropped"
809        );
810    }
811
812    // Prefer typed `header::*` constants over stringly-typed `from_static`
813    // so typos fail to compile rather than fail at runtime.
814    let mut headers = vec![axum::http::header::CONTENT_TYPE];
815    if auth_enabled {
816        headers.push(axum::http::header::AUTHORIZATION);
817    }
818
819    Ok(CorsLayer::new()
820        .allow_origin(AllowOrigin::list(parsed))
821        .allow_methods([Method::GET, Method::POST, Method::PUT])
822        .allow_headers(headers))
823}
824
825// ── Execution handlers ──
826
/// Query parameters for `GET /v1/executions` (partition-scoped,
/// cursor-paginated listing).
#[derive(Deserialize)]
struct ListExecutionsParams {
    /// Partition index (`u16`) to enumerate. Serves as the partition
    /// key for the forward-only cursor listing.
    partition: u16,
    /// Exclusive cursor: start listing strictly after this execution
    /// id. Omit for the first page.
    #[serde(default)]
    cursor: Option<String>,
    /// Page size; defaults to 50 and is clamped to 1000 by the handler.
    #[serde(default = "default_limit")]
    limit: u64,
}
839
/// Serde default for `ListExecutionsParams::limit`.
fn default_limit() -> u64 {
    50
}
841
842/// List executions in one partition with forward-only cursor
843/// pagination.
844///
845/// **Breaking change (unreleased HTTP surface, not on crates.io):** as
846/// of issue #182 this endpoint is a thin forwarder onto
847/// [`ff_core::engine_backend::EngineBackend::list_executions`]. The
848/// previous offset + lane + state filter query parameters were
849/// dropped; the endpoint now returns partition-scoped execution ids
850/// with an opaque `next_cursor`.
851///
852/// Request: `GET /v1/executions?partition=<u16>&cursor=<eid>&limit=<usize>`.
853/// Response: `{ "executions": ["<eid>", ...], "next_cursor": "<eid>" | null }`.
854async fn list_executions(
855    State(server): State<Arc<Server>>,
856    Query(params): Query<ListExecutionsParams>,
857) -> Result<Json<ListExecutionsPage>, ApiError> {
858    let limit = params.limit.min(1000) as usize;
859    let cursor = match params.cursor {
860        Some(raw) if !raw.is_empty() => Some(
861            ff_core::types::ExecutionId::parse(&raw).map_err(|e| {
862                ApiError::from(ServerError::InvalidInput(format!(
863                    "invalid cursor: {e}"
864                )))
865            })?,
866        ),
867        _ => None,
868    };
869    let result = server
870        .list_executions_page(params.partition, cursor, limit)
871        .await?;
872    Ok(Json(result))
873}
874
875async fn create_execution(
876    State(server): State<Arc<Server>>,
877    AppJson(args): AppJson<CreateExecutionArgs>,
878) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
879    // RFC-017 Stage D1 (§4 row 6): dispatch through the backend trait.
880    // `ValkeyBackend::create_execution` wraps the same FCALL with the
881    // same 24h idempotency TTL default — zero wire impact.
882    let result = server.backend().create_execution(args).await?;
883    let status = match &result {
884        CreateExecutionResult::Created { .. } => StatusCode::CREATED,
885        CreateExecutionResult::Duplicate { .. } => StatusCode::OK,
886    };
887    Ok((status, Json(result)))
888}
889
890async fn get_execution(
891    State(server): State<Arc<Server>>,
892    Path(id): Path<String>,
893) -> Result<Json<ExecutionInfo>, ApiError> {
894    let eid = parse_execution_id(&id)?;
895    Ok(Json(server.get_execution(&eid).await?))
896}
897
898async fn get_execution_state(
899    State(server): State<Arc<Server>>,
900    Path(id): Path<String>,
901) -> Result<Json<PublicState>, ApiError> {
902    let eid = parse_execution_id(&id)?;
903    Ok(Json(server.get_execution_state(&eid).await?))
904}
905
906/// Returns the actionable (`pending`/`active`) waitpoints for an
907/// execution as the sanitised `PendingWaitpointInfo` shape
908/// (6 original fields + `token_kid` + `token_fingerprint` +
909/// `execution_id`). The raw HMAC `waitpoint_token` is NOT returned —
910/// callers correlate waitpoints via `(token_kid, token_fingerprint)`
911/// and obtain the delivery credential through the worker's
912/// `SuspendOutcome` at suspend time.
913///
914/// (RFC-017 Stage E4 / v0.8.0 §8: the legacy `waitpoint_token` wire
915/// field was removed; the one-release deprecation window closed with
916/// this release.)
917async fn list_pending_waitpoints(
918    State(server): State<Arc<Server>>,
919    Path(id): Path<String>,
920) -> Result<Response, ApiError> {
921    // RFC-017 Stage E4 (v0.8.0 §8): wire response serializes the
922    // sanitised `PendingWaitpointInfo` directly — no raw HMAC
923    // `waitpoint_token`, no `Deprecation: ff-017` header. Consumers
924    // correlate via `(token_kid, token_fingerprint)`.
925    let eid = parse_execution_id(&id)?;
926    let args = ff_core::contracts::ListPendingWaitpointsArgs::new(eid);
927    let page = server.backend().list_pending_waitpoints(args).await?;
928    Ok(Json(page.entries).into_response())
929}
930
931/// Returns the raw result payload bytes written by the worker's
932/// `ff_complete_execution` call. 404 when the execution has no stored
933/// result (missing entirely, still in-flight, or trimmed by retention —
934/// see below).
935///
936/// # Ordering (required)
937///
938/// Callers MUST poll `GET /v1/executions/{id}/state` until it returns
939/// `completed` before fetching `/result`. Early polls may return 404
940/// because completion writes `public_state = completed` and the result
941/// `SET` in the same atomic Lua; in the normal path the window is
942/// effectively zero, but network round-trip ordering between a state
943/// poll and a result fetch can make the result appear briefly absent
944/// during replay (`ff_replay_execution`).
945///
946/// # Retention / 404 after completed
947///
948/// `get_execution_state == completed` is authoritative for completion.
949/// This endpoint additionally depends on the result bytes not having
950/// been trimmed — v1 sets no retention policy, so
951/// `state = completed` should always pair with a 200 here. Any
952/// future retention-policy feature must call this contract out in its
953/// own docs.
954///
955/// CONTENT-TYPE: `application/octet-stream`. The server is payload-format
956/// agnostic — workers choose the encoding via the SDK's `complete(bytes)`
957/// call, and callers must know the contract. The media-pipeline example
958/// uses JSON by convention (`serde_json::to_vec(&Result)`); adapters can
959/// pick any binary format.
960///
961/// SECURITY: completion payloads can contain PII (e.g. LLM summaries of
962/// user audio). Treat this endpoint like any other read — gate behind
963/// `FF_API_TOKEN` in any deployment reachable from untrusted networks.
964/// The auth middleware only mounts when `FF_API_TOKEN` is set.
965async fn get_execution_result(
966    State(server): State<Arc<Server>>,
967    Path(id): Path<String>,
968) -> Result<Response, ApiError> {
969    // RFC-017 Stage D1 (§4 row 2): trait-dispatched execution result read.
970    let eid = parse_execution_id(&id)?;
971    match server.backend().get_execution_result(&eid).await? {
972        Some(bytes) => Ok((
973            StatusCode::OK,
974            [(axum::http::header::CONTENT_TYPE, "application/octet-stream")],
975            bytes,
976        )
977            .into_response()),
978        None => Err(ApiError(ServerError::NotFound(format!(
979            "execution result not found: {eid}"
980        )))),
981    }
982}
983
984async fn cancel_execution(
985    State(server): State<Arc<Server>>,
986    Path(id): Path<String>,
987    AppJson(mut args): AppJson<CancelExecutionArgs>,
988) -> Result<Json<CancelExecutionResult>, ApiError> {
989    // RFC-017 Stage C migration (§4 row 2): dispatch through the
990    // backend trait. Pre-read + FCALL + parse live inside
991    // `ValkeyBackend::cancel_execution`. The inherent
992    // `Server::cancel_execution` was deleted with this migration;
993    // `cancel_flow_inner`'s internal dispatch keeps its own path.
994    let path_eid = parse_execution_id(&id)?;
995    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
996    args.execution_id = path_eid;
997    Ok(Json(server.backend().cancel_execution(args).await?))
998}
999
1000async fn deliver_signal(
1001    State(server): State<Arc<Server>>,
1002    Path(id): Path<String>,
1003    AppJson(mut args): AppJson<DeliverSignalArgs>,
1004) -> Result<Json<DeliverSignalResult>, ApiError> {
1005    let path_eid = parse_execution_id(&id)?;
1006    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
1007    args.execution_id = path_eid;
1008    Ok(Json(server.deliver_signal(&args).await?))
1009}
1010
1011// ── Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security) ──
1012
/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
#[derive(Deserialize)]
struct RotateWaitpointSecretBody {
    /// New key id to rotate to; echoed in the handler's audit log.
    new_kid: String,
    /// Hex-encoded new secret. Even-length, 0-9a-fA-F.
    new_secret_hex: String,
}
1019
/// Hard ceiling on how long the rotate endpoint runs before the HTTP
/// handler bails. Rotation touches every execution partition (up to 256)
/// with HGET+HMGET+HDEL+HSET per partition; 6 round-trips × 30ms cross-AZ
/// × 256 partitions ≈ 46s worst-case. The internal SETNX lock is 10s TTL
/// per partition, so 120s gives ample margin for contention + slow RTTs
/// while staying below common LB idle timeouts (ALB 60s default, but
/// typically bumped to 120s+ for admin endpoints).
///
/// On timeout: returns HTTP 504 immediately. Valkey-side work may still
/// finish (the per-partition locks and HSETs are already in flight). The
/// operator observes a 504 and retries; retry is SAFE — rotation is
/// idempotent per-partition (same new_kid + same secret → no-op on
/// already-rotated partitions).
///
/// Enforced via `tokio::time::timeout` in [`rotate_waitpoint_secret`].
const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
1034
1035async fn rotate_waitpoint_secret(
1036    State(server): State<Arc<Server>>,
1037    AppJson(body): AppJson<RotateWaitpointSecretBody>,
1038) -> Result<Response, ApiError> {
1039    // Cap the whole endpoint end-to-end. If this trips, the caller's
1040    // retry is SAFE — per-partition rotation is idempotent on the same
1041    // (new_kid, secret_hex) and the per-partition SETNX lock prevents
1042    // double-rotation under concurrent retries.
1043    let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
1044    let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
1045        Ok(r) => r?,
1046        Err(_) => {
1047            tracing::error!(
1048                target: "audit",
1049                new_kid = %body.new_kid,
1050                timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
1051                "waitpoint_hmac_rotation_timeout_http_504"
1052            );
1053            let body = ErrorBody::plain(format!(
1054                "rotation exceeded {}s server-side timeout; retry is safe \
1055                 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
1056                ROTATE_HTTP_TIMEOUT.as_secs()
1057            ));
1058            return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
1059        }
1060    };
1061    // Operator action log at audit target (per-partition detail logged inside
1062    // rotate_waitpoint_secret). Returns 400 only on actionable fault states.
1063    //
1064    // Two distinct rotated==0 cases:
1065    //   - failed.is_empty() → no partitions at all (num_flow_partitions == 0;
1066    //     env_u16_positive rejects this at boot so this is mostly dead code
1067    //     for library/Default callers).
1068    //   - !failed.is_empty() → every partition attempt raised a real error.
1069    //     Operator investigates Valkey/auth/cluster health before retrying.
1070    if result.rotated == 0 && result.failed.is_empty() {
1071        return Err(ApiError::from(ServerError::OperationFailed(
1072            "rotation had no partitions to operate on \
1073             (num_flow_partitions is 0 — server misconfigured)"
1074                .to_owned(),
1075        )));
1076    }
1077    if result.rotated == 0 && !result.failed.is_empty() {
1078        return Err(ApiError::from(ServerError::OperationFailed(
1079            "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
1080        )));
1081    }
1082    Ok(Json(result).into_response())
1083}
1084
/// Request body for the change-priority endpoint. Carries only the
/// replacement priority — no lane field (see `change_priority`).
#[derive(Deserialize)]
struct ChangePriorityBody {
    new_priority: i32,
}
1089
1090async fn change_priority(
1091    State(server): State<Arc<Server>>,
1092    Path(id): Path<String>,
1093    AppJson(body): AppJson<ChangePriorityBody>,
1094) -> Result<Json<ChangePriorityResult>, ApiError> {
1095    // RFC-017 Stage C migration (§4 row 17): dispatch through trait.
1096    let eid = parse_execution_id(&id)?;
1097    let args = ff_core::contracts::ChangePriorityArgs {
1098        execution_id: eid,
1099        new_priority: body.new_priority,
1100        // Empty lane triggers backend-internal HGET pre-read; wire
1101        // format carries no lane field today (the ChangePriorityBody
1102        // REST shape only has `new_priority`). Matches legacy
1103        // `Server::change_priority` behaviour.
1104        lane_id: LaneId::new(""),
1105        now: ff_core::types::TimestampMs::now(),
1106    };
1107    Ok(Json(server.backend().change_priority(args).await?))
1108}
1109
1110async fn replay_execution(
1111    State(server): State<Arc<Server>>,
1112    Path(id): Path<String>,
1113) -> Result<Json<ReplayExecutionResult>, ApiError> {
1114    // RFC-017 Stage C migration (§4 row 3 Hard — variadic KEYS
1115    // pre-read lives inside `ValkeyBackend::replay_execution`).
1116    let eid = parse_execution_id(&id)?;
1117    let args = ff_core::contracts::ReplayExecutionArgs {
1118        execution_id: eid,
1119        now: ff_core::types::TimestampMs::now(),
1120    };
1121    Ok(Json(server.backend().replay_execution(args).await?))
1122}
1123
1124async fn revoke_lease(
1125    State(server): State<Arc<Server>>,
1126    Path(id): Path<String>,
1127) -> Result<Json<RevokeLeaseResult>, ApiError> {
1128    // RFC-017 Stage C migration (§4 row 19).
1129    let eid = parse_execution_id(&id)?;
1130    let args = ff_core::contracts::RevokeLeaseArgs {
1131        execution_id: eid,
1132        expected_lease_id: None,
1133        // Empty WIID → backend HGET pre-read. Matches pre-migration
1134        // shape where `Server::revoke_lease` read `current_worker_
1135        // instance_id` off exec_core itself.
1136        worker_instance_id: WorkerInstanceId::new(""),
1137        reason: "operator_revoke".to_owned(),
1138    };
1139    Ok(Json(server.backend().revoke_lease(args).await?))
1140}
1141
1142// ── Scheduler-routed claim (Batch C item 2 PR-B) ──
1143//
1144// The server exposes the scheduler's `claim_for_worker` cycle via
1145// HTTP so ff-sdk workers can acquire claim grants without enabling
1146// the `direct-valkey-claim` feature. The request body carries lane +
1147// identity + capabilities; the server returns a serialized
1148// `ClaimGrant` (or 204 No Content when no eligible execution exists).
1149
/// Request body for `POST /v1/workers/{worker_id}/claim`.
#[derive(Deserialize)]
struct ClaimForWorkerBody {
    /// Lane to claim from; validated via `LaneId::try_new` in the
    /// handler (invalid lanes surface as 400).
    lane_id: String,
    /// Identity of the worker process instance; checked by
    /// `validate_identifier` (non-empty, ≤256 bytes, no whitespace or
    /// control characters).
    worker_instance_id: String,
    /// Capability tokens this worker advertises. Sorted + validated
    /// on the scheduler side; any non-printable/CSV-breaking token
    /// surfaces as 400.
    #[serde(default)]
    capabilities: Vec<String>,
    /// Grant TTL in milliseconds. Bounded so a worker can't request a
    /// multi-hour grant and squat the execution.
    grant_ttl_ms: u64,
}
1164
/// Wire shape for `ff_core::contracts::ClaimGrant`. Carries the
/// opaque [`ff_core::partition::PartitionKey`] on the wire; consumers
/// never see the internal `PartitionFamily` enum (issue #91).
#[derive(Serialize)]
struct ClaimGrantDto {
    /// `ClaimGrant::execution_id` rendered via `to_string`.
    execution_id: String,
    partition_key: ff_core::partition::PartitionKey,
    grant_key: String,
    /// Copied verbatim from `ClaimGrant::expires_at_ms`.
    expires_at_ms: u64,
}
1175
1176impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
1177    fn from(g: ff_core::contracts::ClaimGrant) -> Self {
1178        Self {
1179            execution_id: g.execution_id.to_string(),
1180            partition_key: g.partition_key,
1181            grant_key: g.grant_key,
1182            expires_at_ms: g.expires_at_ms,
1183        }
1184    }
1185}
1186
/// Maximum grant TTL accepted via HTTP (60 seconds). Mirrors the
/// scheduler's internal ceiling so a misconfigured worker can't squat
/// an execution on a multi-hour grant. Enforced in
/// [`claim_for_worker`] before the backend is consulted.
const CLAIM_GRANT_TTL_MS_MAX: u64 = 60_000;
1191
1192/// Reject empty / whitespace / non-printable identifiers the way
1193/// [`LaneId::try_new`] does for lanes. WorkerId + WorkerInstanceId
1194/// feed into scheduler scan jitter + Valkey key construction; silent
1195/// acceptance of "" or "w\nork" would either mis-key or mis-hash.
1196fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
1197    if value.is_empty() {
1198        return Err(ApiError(ServerError::InvalidInput(format!(
1199            "{field}: must not be empty"
1200        ))));
1201    }
1202    if value.len() > 256 {
1203        return Err(ApiError(ServerError::InvalidInput(format!(
1204            "{field}: exceeds 256 bytes (got {})",
1205            value.len()
1206        ))));
1207    }
1208    if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
1209        return Err(ApiError(ServerError::InvalidInput(format!(
1210            "{field}: must not contain whitespace or control characters"
1211        ))));
1212    }
1213    Ok(())
1214}
1215
1216async fn claim_for_worker(
1217    State(server): State<Arc<Server>>,
1218    Path(worker_id): Path<String>,
1219    AppJson(body): AppJson<ClaimForWorkerBody>,
1220) -> Result<Response, ApiError> {
1221    validate_identifier("worker_id", &worker_id)?;
1222    validate_identifier("worker_instance_id", &body.worker_instance_id)?;
1223    let worker_id = WorkerId::new(worker_id);
1224    let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
1225    let lane = LaneId::try_new(body.lane_id).map_err(|e| {
1226        ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
1227    })?;
1228    if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
1229        return Err(ApiError(ServerError::InvalidInput(format!(
1230            "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
1231        ))));
1232    }
1233    let caps: std::collections::BTreeSet<String> =
1234        body.capabilities.into_iter().collect();
1235
1236    // RFC-017 Stage C migration (§4 row 9 / §7): dispatch through
1237    // the backend trait — `ValkeyBackend::claim_for_worker` forwards
1238    // to its wired `ff_scheduler::Scheduler` handle.
1239    let args = ff_core::contracts::ClaimForWorkerArgs::new(
1240        lane,
1241        worker_id,
1242        worker_instance_id,
1243        caps,
1244        body.grant_ttl_ms,
1245    );
1246    match server.backend().claim_for_worker(args).await? {
1247        ff_core::contracts::ClaimForWorkerOutcome::Granted(grant) => {
1248            Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response())
1249        }
1250        ff_core::contracts::ClaimForWorkerOutcome::NoWork => {
1251            Ok(StatusCode::NO_CONTENT.into_response())
1252        }
1253        // `ClaimForWorkerOutcome` is `#[non_exhaustive]` for additive
1254        // variants (e.g. `BackPressured { retry_after_ms }`). Until
1255        // such a variant lands, surface any new shape as 503 with a
1256        // clear message pointing at the client-library version
1257        // mismatch.
1258        _ => Ok((
1259            StatusCode::SERVICE_UNAVAILABLE,
1260            Json(ErrorBody::plain(
1261                "claim_for_worker: backend returned a non-exhaustive outcome this server build does not understand".to_owned(),
1262            )),
1263        )
1264            .into_response()),
1265    }
1266}
1267
1268// ── Stream read + tail ──
1269
/// Query parameters for the bounded stream-read endpoint.
#[derive(Deserialize)]
struct ReadStreamParams {
    /// Range start cursor; defaults to `StreamCursor::start`.
    #[serde(default = "ff_core::contracts::StreamCursor::start")]
    from: ff_core::contracts::StreamCursor,
    /// Range end cursor; defaults to `StreamCursor::end`.
    #[serde(default = "ff_core::contracts::StreamCursor::end")]
    to: ff_core::contracts::StreamCursor,
    /// Max frames returned; defaults to 100 and is capped at
    /// `REST_STREAM_LIMIT_CEILING` by the handler.
    #[serde(default = "default_read_limit")]
    limit: u64,
}
1279
/// Serde default for `ReadStreamParams::limit`.
fn default_read_limit() -> u64 {
    100
}
1281
/// JSON body shared by the stream read and tail endpoints.
#[derive(Serialize)]
struct ReadStreamResponse {
    frames: Vec<StreamFrame>,
    /// Convenience count for clients; always equals `frames.len()`.
    count: usize,
    /// When set, the producer has closed this stream — consumer should
    /// stop polling. Absent when the stream is still open (or never
    /// existed, which is indistinguishable from "still open" at this
    /// layer).
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_at: Option<i64>,
    /// Reason from the closing writer: `attempt_success`, `attempt_failure`,
    /// `attempt_cancelled`, `attempt_interrupted`. Absent iff still open.
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_reason: Option<String>,
}
1297
1298impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
1299    fn from(sf: ff_core::contracts::StreamFrames) -> Self {
1300        let count = sf.frames.len();
1301        Self {
1302            frames: sf.frames,
1303            count,
1304            closed_at: sf.closed_at.map(|t| t.0),
1305            closed_reason: sf.closed_reason,
1306        }
1307    }
1308}
1309
/// REST-layer ceiling on `limit` for stream read/tail responses. Lower
/// than the internal `STREAM_READ_HARD_CAP` (10_000) because an HTTP
/// response buffers the whole JSON body in memory in axum — a
/// `10_000 × max_payload_bytes (65_536)` body is ~640MB per call, which
/// is a DoS vector from a single client. Internal callers using FCALL or
/// the SDK directly still get the full 10_000 ceiling; REST clients must
/// paginate through `from`/`to` for larger spans.
///
/// Enforced in both [`read_attempt_stream`] and [`tail_attempt_stream`].
///
/// v2 candidate: chunked-transfer / SSE when the caller wants > this bound.
const REST_STREAM_LIMIT_CEILING: u64 = 1_000;
1320
1321async fn read_attempt_stream(
1322    State(server): State<Arc<Server>>,
1323    Path((id, idx)): Path<(String, u32)>,
1324    Query(params): Query<ReadStreamParams>,
1325) -> Result<Json<ReadStreamResponse>, ApiError> {
1326    if params.limit == 0 {
1327        return Err(ApiError(ServerError::InvalidInput(
1328            "limit must be >= 1".to_owned(),
1329        )));
1330    }
1331    if params.limit > REST_STREAM_LIMIT_CEILING {
1332        return Err(ApiError(ServerError::InvalidInput(format!(
1333            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
1334        ))));
1335    }
1336    let eid = parse_execution_id(&id)?;
1337    let attempt_index = AttemptIndex::new(idx);
1338    let result = server
1339        .read_attempt_stream(
1340            &eid,
1341            attempt_index,
1342            params.from.to_wire(),
1343            params.to.to_wire(),
1344            params.limit,
1345        )
1346        .await?;
1347    Ok(Json(result.into()))
1348}
1349
/// Query parameters for the stream tail endpoint.
#[derive(Deserialize)]
struct TailStreamParams {
    /// Cursor to resume after; the handler requires a concrete entry
    /// id (`StreamCursor::is_concrete`). Defaults to
    /// `StreamCursor::beginning`.
    #[serde(default = "ff_core::contracts::StreamCursor::beginning")]
    after: ff_core::contracts::StreamCursor,
    /// XREAD BLOCK duration in milliseconds (defaults to 0). Capped
    /// at `MAX_TAIL_BLOCK_MS` by the handler.
    #[serde(default)]
    block_ms: u64,
    /// Max frames returned; defaults to 50 and is capped at
    /// `REST_STREAM_LIMIT_CEILING` by the handler.
    #[serde(default = "default_tail_limit")]
    limit: u64,
}
1359
/// Serde default for `TailStreamParams::limit`.
fn default_tail_limit() -> u64 {
    50
}
1361
/// Ceiling on BLOCK duration for the tail endpoint (30 seconds). Kept
/// below common LB idle timeouts (ALB 60s, nginx 60s, Cloudflare 100s)
/// so the HTTP response can't be cut mid-block. Enforced at the top of
/// [`tail_attempt_stream`].
///
/// Note: ferriskey's client auto-extends its `request_timeout` for XREAD
/// BLOCK to `block_ms + 500ms`, so a blocking call with the full ceiling
/// never produces a spurious transport timeout. See
/// `ff_script::stream_tail` module docs for the exact ferriskey code path.
const MAX_TAIL_BLOCK_MS: u64 = 30_000;
1371
1372async fn tail_attempt_stream(
1373    State(server): State<Arc<Server>>,
1374    Path((id, idx)): Path<(String, u32)>,
1375    Query(params): Query<TailStreamParams>,
1376) -> Result<Json<ReadStreamResponse>, ApiError> {
1377    if params.block_ms > MAX_TAIL_BLOCK_MS {
1378        return Err(ApiError(ServerError::InvalidInput(format!(
1379            "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
1380        ))));
1381    }
1382    if params.limit == 0 {
1383        return Err(ApiError(ServerError::InvalidInput(
1384            "limit must be >= 1".to_owned(),
1385        )));
1386    }
1387    if params.limit > REST_STREAM_LIMIT_CEILING {
1388        return Err(ApiError(ServerError::InvalidInput(format!(
1389            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
1390        ))));
1391    }
1392    // XREAD cursor must be a concrete ID — `Start`/`End` are
1393    // XRANGE-only. The opaque-cursor deserializer already rejects
1394    // the bare `-`/`+` wire tokens; this boundary also rejects the
1395    // structured `start`/`end` keywords because XREAD treats them
1396    // as invalid ids. Uses [`StreamCursor::is_concrete`] so the
1397    // SDK + REST guards stay in lock-step.
1398    if !params.after.is_concrete() {
1399        return Err(ApiError(ServerError::InvalidInput(
1400            "after: XREAD cursor must be a concrete entry id; pass '0-0' to start from the beginning"
1401                .to_owned(),
1402        )));
1403    }
1404
1405    let eid = parse_execution_id(&id)?;
1406    let attempt_index = AttemptIndex::new(idx);
1407    let result = server
1408        .tail_attempt_stream(
1409            &eid,
1410            attempt_index,
1411            params.after.to_wire(),
1412            params.block_ms,
1413            params.limit,
1414        )
1415        .await?;
1416    Ok(Json(result.into()))
1417}
1418
1419// ── Flow handlers ──
1420
1421async fn create_flow(
1422    State(server): State<Arc<Server>>,
1423    AppJson(args): AppJson<CreateFlowArgs>,
1424) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
1425    // RFC-017 Stage D1 (§4 row 5): trait-dispatched ingress.
1426    let result = server.backend().create_flow(args).await?;
1427    let status = match &result {
1428        CreateFlowResult::Created { .. } => StatusCode::CREATED,
1429        CreateFlowResult::AlreadySatisfied { .. } => StatusCode::OK,
1430    };
1431    Ok((status, Json(result)))
1432}
1433
1434async fn add_execution_to_flow(
1435    State(server): State<Arc<Server>>,
1436    Path(id): Path<String>,
1437    AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
1438) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
1439    let path_fid = parse_flow_id(&id)?;
1440    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1441    args.flow_id = path_fid;
1442    // RFC-017 Stage D1 (§4 row 5): trait-dispatched ingress.
1443    let result = server.backend().add_execution_to_flow(args).await?;
1444    let status = match &result {
1445        AddExecutionToFlowResult::Added { .. } => StatusCode::CREATED,
1446        AddExecutionToFlowResult::AlreadyMember { .. } => StatusCode::OK,
1447    };
1448    Ok((status, Json(result)))
1449}
1450
1451/// Cancel a flow.
1452///
1453/// By default the handler returns immediately with
1454/// [`CancelFlowResult::CancellationScheduled`] (or `Cancelled` for flows
1455/// with no members / non-cancel_all policies), and the individual member
1456/// execution cancellations run in a background task on the server.
1457/// Clients can track per-member progress by polling
1458/// `GET /v1/executions/{id}/state` for each id in `member_execution_ids`.
1459///
1460/// Pass `?wait=true` to run the dispatch loop inline; the handler will not
1461/// return until every member has been cancelled. Useful for tests and
1462/// callers that need synchronous completion.
1463async fn cancel_flow(
1464    State(server): State<Arc<Server>>,
1465    Path(id): Path<String>,
1466    Query(params): Query<HashMap<String, String>>,
1467    AppJson(mut args): AppJson<CancelFlowArgs>,
1468) -> Result<Json<CancelFlowResult>, ApiError> {
1469    let path_fid = parse_flow_id(&id)?;
1470    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1471    args.flow_id = path_fid;
1472    let wait = params.get("wait").is_some_and(|v| v == "true" || v == "1");
1473    // RFC-017 Stage D1 (§4 row 5): header-only dispatch via the trait.
1474    // The synchronous `?wait=true` path retains the inherent
1475    // `Server::cancel_flow_wait` route because the trait's
1476    // `cancel_flow(id, policy, wait)` signature rejects `wait` variants
1477    // other than `NoWait` (Valkey impl returns `Unavailable` for timed
1478    // waits — `ff_cancel_flow` FCALL is NoWait by construction). The
1479    // async member-cancel dispatcher inside `Server::cancel_flow_inner`
1480    // continues to service `?wait=false` through the inherent path
1481    // until D2 relocates it into `ValkeyBackend::cancel_flow_with_args`.
1482    let result = if wait {
1483        server.cancel_flow_wait(&args).await?
1484    } else {
1485        server.cancel_flow(&args).await?
1486    };
1487    Ok(Json(result))
1488}
1489
1490async fn stage_dependency_edge(
1491    State(server): State<Arc<Server>>,
1492    Path(id): Path<String>,
1493    AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
1494) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
1495    let path_fid = parse_flow_id(&id)?;
1496    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1497    args.flow_id = path_fid;
1498    // RFC-017 Stage D1 (§4 row 5): trait-dispatched ingress.
1499    let result = server.backend().stage_dependency_edge(args).await?;
1500    Ok((StatusCode::CREATED, Json(result)))
1501}
1502
1503async fn apply_dependency_to_child(
1504    State(server): State<Arc<Server>>,
1505    Path(id): Path<String>,
1506    AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
1507) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
1508    let path_fid = parse_flow_id(&id)?;
1509    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1510    args.flow_id = path_fid;
1511    // RFC-017 Stage D1 (§4 row 5): trait-dispatched ingress.
1512    Ok(Json(server.backend().apply_dependency_to_child(args).await?))
1513}
1514
1515// ── Budget / Quota handlers ──
1516
1517async fn create_budget(
1518    State(server): State<Arc<Server>>,
1519    AppJson(args): AppJson<CreateBudgetArgs>,
1520) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
1521    // RFC-017 Stage C migration (§4 row 7).
1522    let result = server.backend().create_budget(args).await?;
1523    let status = match &result {
1524        CreateBudgetResult::Created { .. } => StatusCode::CREATED,
1525        CreateBudgetResult::AlreadySatisfied { .. } => StatusCode::OK,
1526    };
1527    Ok((status, Json(result)))
1528}
1529
1530async fn get_budget_status(
1531    State(server): State<Arc<Server>>,
1532    Path(id): Path<String>,
1533) -> Result<Json<BudgetStatus>, ApiError> {
1534    // RFC-017 Stage C migration (§4 row 7 — budget read).
1535    let bid = parse_budget_id(&id)?;
1536    Ok(Json(server.backend().get_budget_status(&bid).await?))
1537}
1538
// Request body for the budget usage-report endpoint (see
// `report_usage` below).
#[derive(Deserialize)]
struct ReportUsageBody {
    // Dimension name → usage delta; `report_usage` flattens this map
    // into parallel dims/deltas vectors for `ReportUsageAdminArgs`.
    dimensions: HashMap<String, u64>,
    // Client-supplied report timestamp (`TimestampMs`).
    now: ff_core::types::TimestampMs,
    // Optional idempotency key; an omitted field deserializes to
    // `None` (then no `with_dedup_key` is applied).
    #[serde(default)]
    dedup_key: Option<String>,
}
1546
1547async fn report_usage(
1548    State(server): State<Arc<Server>>,
1549    Path(id): Path<String>,
1550    AppJson(body): AppJson<ReportUsageBody>,
1551) -> Result<Json<ReportUsageResult>, ApiError> {
1552    // RFC-017 Stage C migration (§4 row 7 — admin path via
1553    // `report_usage_admin`; no worker handle consumed).
1554    let bid = parse_budget_id(&id)?;
1555    let dims: Vec<String> = body.dimensions.keys().cloned().collect();
1556    let deltas: Vec<u64> = dims.iter().map(|d| body.dimensions[d]).collect();
1557    let mut args = ff_core::contracts::ReportUsageAdminArgs::new(dims, deltas, body.now);
1558    if let Some(k) = body.dedup_key {
1559        args = args.with_dedup_key(k);
1560    }
1561    Ok(Json(server.backend().report_usage_admin(&bid, args).await?))
1562}
1563
1564async fn reset_budget(
1565    State(server): State<Arc<Server>>,
1566    Path(id): Path<String>,
1567) -> Result<Json<ResetBudgetResult>, ApiError> {
1568    // RFC-017 Stage C migration (§4 row 7).
1569    let bid = parse_budget_id(&id)?;
1570    let args = ff_core::contracts::ResetBudgetArgs {
1571        budget_id: bid,
1572        now: ff_core::types::TimestampMs::now(),
1573    };
1574    Ok(Json(server.backend().reset_budget(args).await?))
1575}
1576
1577async fn create_quota_policy(
1578    State(server): State<Arc<Server>>,
1579    AppJson(args): AppJson<CreateQuotaPolicyArgs>,
1580) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
1581    // RFC-017 Stage C migration (§4 row 7).
1582    let result = server.backend().create_quota_policy(args).await?;
1583    let status = match &result {
1584        CreateQuotaPolicyResult::Created { .. } => StatusCode::CREATED,
1585        CreateQuotaPolicyResult::AlreadySatisfied { .. } => StatusCode::OK,
1586    };
1587    Ok((status, Json(result)))
1588}
1589
1590// ── Health check ──
1591
// Success body for the health endpoint (`healthz` below): serialized
// as `{"status": "ok"}`. Failures surface as `ApiError` instead.
#[derive(Serialize)]
struct HealthResponse {
    // Static marker string; `healthz` only ever sets "ok".
    status: &'static str,
}
1596
1597async fn healthz(
1598    State(server): State<Arc<Server>>,
1599) -> Result<Json<HealthResponse>, ApiError> {
1600    // RFC-017 Stage D2 (§4 row 1): dispatch through the backend trait's
1601    // `ping` so the healthz probe remains backend-agnostic. Valkey
1602    // impl issues `PING`; Postgres impl (Wave 4) runs `SELECT 1`.
1603    server
1604        .backend()
1605        .ping()
1606        .await
1607        .map_err(|e| ApiError(ServerError::Engine(Box::new(e))))?;
1608    Ok(Json(HealthResponse { status: "ok" }))
1609}
1610
1611// ── ID parsing helpers ──
1612
1613/// Return 400 if the body contains an ID that differs from the path ID.
1614fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
1615    if body_id != path_id {
1616        return Err(ApiError(ServerError::InvalidInput(format!(
1617            "path {id_name} does not match body {id_name}"
1618        ))));
1619    }
1620    Ok(())
1621}
1622
1623fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
1624    ExecutionId::parse(s)
1625        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
1626}
1627
1628fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
1629    FlowId::parse(s)
1630        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid flow_id: {e}"))))
1631}
1632
1633fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
1634    BudgetId::parse(s)
1635        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid budget_id: {e}"))))
1636}
1637
#[cfg(test)]
mod cors_tests {
    //! CORS + auth preflight tests (#66, #71).
    //!
    //! Driven through `tower::ServiceExt::oneshot` against a minimal
    //! router whose only layer is CORS, over a noop `/healthz`-style
    //! route. The `CorsLayer` is terminal for OPTIONS preflights, so
    //! this matches what browsers actually see without a live `Server`.
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::get;
    use tower::ServiceExt;

    fn app_with_cors(origins: &[String], auth_enabled: bool) -> Router {
        let cors = build_cors_layer(origins, auth_enabled)
            .expect("build_cors_layer succeeds for valid inputs");
        Router::new().route("/noop", get(|| async { "ok" })).layer(cors)
    }

    /// Fire one OPTIONS preflight from `https://client.example.com`
    /// requesting method GET and the given headers; return the response.
    async fn preflight(router: Router, requested_headers: &str) -> Response {
        let req = Request::builder()
            .method("OPTIONS")
            .uri("/noop")
            .header("origin", "https://client.example.com")
            .header("access-control-request-method", "GET")
            .header("access-control-request-headers", requested_headers)
            .body(Body::empty())
            .unwrap();
        router.oneshot(req).await.unwrap()
    }

    /// Extract `Access-Control-Allow-Headers` as a lowercase string.
    fn allow_headers_lower(resp: &Response) -> String {
        resp.headers()
            .get("access-control-allow-headers")
            .expect("access-control-allow-headers present")
            .to_str()
            .unwrap()
            .to_ascii_lowercase()
    }

    #[test]
    fn all_origins_invalid_returns_config_error_instead_of_permissive() {
        // #71: an origin that cannot become a `HeaderValue` must fail
        // closed rather than fall back to permissive CORS.
        //
        // `HeaderValue` parsing is intentionally lax — nearly any
        // printable ASCII parses, so "not a url" would succeed. An
        // embedded NUL byte exercises the genuine parse-failure path.
        let err = build_cors_layer(&["bad\0origin".to_owned()], false).unwrap_err();
        let ConfigError::InvalidValue { var, message } = err;
        assert_eq!(var, "FF_CORS_ORIGINS");
        assert!(
            message.contains("all configured origins failed to parse"),
            "message was: {message}"
        );
    }

    #[test]
    fn wildcard_still_returns_permissive() {
        // Explicit `*` remains the documented permissive path.
        assert!(build_cors_layer(&["*".to_owned()], false).is_ok());
    }

    #[test]
    fn empty_origins_returns_empty_allowlist_ok() {
        // No configured origins → empty allowlist; no browser origin
        // matches. This is NOT the fail-closed case, which requires
        // configured origins that are all parse-invalid.
        assert!(build_cors_layer(&[], false).is_ok());
    }

    #[test]
    fn mixed_valid_and_invalid_keeps_valid_entries() {
        // One typo must not abort the whole boot — warn and keep the
        // parsable entries.
        let origins = ["https://ok.example.com".to_owned(), "bad\0origin".to_owned()];
        assert!(build_cors_layer(&origins, false).is_ok());
    }

    #[tokio::test]
    async fn preflight_allows_authorization_when_auth_enabled() {
        // #66: with token auth on, browsers need `Authorization` echoed
        // in the preflight's `Access-Control-Allow-Headers`.
        let router = app_with_cors(&["https://client.example.com".to_owned()], true);
        let resp = preflight(router, "authorization,content-type").await;
        assert_eq!(resp.status(), StatusCode::OK);

        let allow_headers = allow_headers_lower(&resp);
        assert!(
            allow_headers.contains("authorization"),
            "expected `authorization` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
        assert!(
            allow_headers.contains("content-type"),
            "expected `content-type` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
    }

    #[tokio::test]
    async fn preflight_omits_authorization_when_auth_disabled() {
        // Negative case (least privilege): without auth, Authorization
        // is unneeded and must not be advertised.
        let router = app_with_cors(&["https://client.example.com".to_owned()], false);
        let resp = preflight(router, "content-type").await;
        assert_eq!(resp.status(), StatusCode::OK);

        let allow_headers = allow_headers_lower(&resp);
        assert!(
            !allow_headers.contains("authorization"),
            "Authorization should not be advertised when auth is off; got: {allow_headers}"
        );
        assert!(allow_headers.contains("content-type"));
    }
}
1770
#[cfg(test)]
mod claim_grant_dto_tests {
    //! Wire-shape tests for `ClaimGrantDto` (issue #91).
    //!
    //! Pins the exact JSON the server emits on the
    //! `POST /v1/workers/{id}/claim` 200 response so any drift is
    //! caught at compile-adjacent test time instead of on a live SDK.
    use super::*;
    use ff_core::contracts::ClaimGrant;
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{ExecutionId, FlowId};

    fn sample_grant(family: PartitionFamily) -> ClaimGrant {
        let config = ff_core::partition::PartitionConfig::default();
        let flow_id = FlowId::new();
        let partition = Partition { family, index: 7 };
        ClaimGrant {
            execution_id: ExecutionId::for_flow(&flow_id, &config),
            partition_key: PartitionKey::from(&partition),
            grant_key: "ff:exec:{fp:7}:deadbeef:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    /// Build the DTO for `family` and serialise it to a JSON value.
    fn dto_json(family: PartitionFamily) -> serde_json::Value {
        serde_json::to_value(ClaimGrantDto::from(sample_grant(family))).unwrap()
    }

    #[test]
    fn claim_grant_dto_emits_opaque_partition_key() {
        let json = dto_json(PartitionFamily::Flow);
        // `partition_key` serialises transparent: a bare string, not
        // an object, not a `partition_family`/`partition_index` pair.
        assert_eq!(json["partition_key"], serde_json::json!("{fp:7}"));
        assert!(json.get("partition_family").is_none());
        assert!(json.get("partition_index").is_none());
        assert_eq!(json["expires_at_ms"], serde_json::json!(1_700_000_000_000u64));
    }

    #[test]
    fn claim_grant_dto_collapses_execution_alias_on_wire() {
        // RFC-011 §11 alias: Execution-family grants emit the same
        // `{fp:N}` hash tag as Flow — pin that wire-level equivalence.
        let flow = dto_json(PartitionFamily::Flow);
        let exec = dto_json(PartitionFamily::Execution);
        assert_eq!(flow["partition_key"], exec["partition_key"]);
    }
}