Skip to main content

ff_server/
api.rs

1//! REST API layer — thin axum handlers over Server methods.
2
3use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use axum::{
8    extract::{DefaultBodyLimit, MatchedPath, Path, Query, Request, State},
9    http::StatusCode,
10    middleware,
11    response::{IntoResponse, Response},
12    routing::{get, post, put, MethodRouter},
13    Json, Router,
14};
15use serde::{Deserialize, Serialize};
16use axum::http::Method;
17use tower_http::cors::{AllowOrigin, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20use ff_core::contracts::*;
21use ff_core::state::PublicState;
22use ff_core::types::*;
23
24use crate::config::ConfigError;
25use crate::server::{Server, ServerError};
26
27// ── Per-route body-size limits (#97) ──
28//
29// Axum's out-of-the-box `DefaultBodyLimit` is 2 MiB and was applied silently
30// with no per-route override, meaning a misrouted multi-GB JSON would still
31// be buffered up to 2 MiB before rejection and mega-byte payloads on
32// control-plane endpoints (cancel, priority, claim, …) were accepted with
33// no protection beyond that global cap.
34//
35// We apply three categories, ordered by generosity:
36//
37//   * `BODY_LIMIT_LARGE_PAYLOAD` (1 MiB) — carries an application
38//     `input_payload` (`POST /v1/executions`). FlowFabric has no Lua-side
39//     cap on `input_payload` (`execution.lua` SETs whatever is passed);
40//     1 MiB matches common workflow-engine conventions (e.g. AWS Step
41//     Functions input size limit is 256 KB, Temporal's default is 2 MiB)
42//     and is an order of magnitude above typical JSON control envelopes.
43//
44//   * `BODY_LIMIT_MEDIUM_PAYLOAD` (256 KiB) — signal delivery
45//     (`POST /v1/executions/{id}/signal`) carries a `DeliverSignalArgs`
46//     with an optional `Vec<u8> payload`. Signals are notification-shaped
47//     events (webhooks, human approvals), not bulk-data transfers; 256 KiB
48//     is the same ceiling Step Functions imposes on task inputs and is
49//     comfortably above any real correlation-id / approval-blob.
50//
51//   * `BODY_LIMIT_CONTROL` (64 KiB) — everything else. Control-plane JSON
52//     bodies (cancel args, priority changes, claim requests, flow
53//     members, budget/quota definitions, rotate-secret keys) are all tiny
54//     fixed-shape records; 64 KiB leaves plenty of headroom for
55//     metadata-heavy bodies while capping DoS amplification by ~32× vs
56//     the previous axum default.
57//
58// Cross-checked against `lua/*.lua` — the only payload-size caps in the
59// Lua layer are `CAPS_MAX_BYTES=4096` / `CAPS_MAX_TOKENS=256` for
60// capability lists (`lua/helpers.lua`). No other Lua path enforces a
61// byte-length limit on incoming payloads, so HTTP is the right layer to
62// draw the line.
63//
64// Future admin routes that need larger bodies (e.g. bulk import) should
65// introduce a new `BODY_LIMIT_ADMIN_BULK` const and opt in per-route
66// rather than bumping `BODY_LIMIT_CONTROL`.
67
/// Cap for routes that carry an application `input_payload`: 1 MiB.
const BODY_LIMIT_LARGE_PAYLOAD: usize = 1 << 20;
/// Cap for signal-delivery bodies: 256 KiB.
const BODY_LIMIT_MEDIUM_PAYLOAD: usize = 256 << 10;
/// Cap for every other (control-plane) JSON body: 64 KiB.
const BODY_LIMIT_CONTROL: usize = 64 << 10;
71
/// Per-route body cap stored in the request extensions.
///
/// Enforcement is done by the body-limit layers; this value is only read
/// back when a 413 response is built so the body can report the cap that
/// applied to the route.
#[derive(Clone, Copy)]
struct BodyLimit {
    /// Maximum accepted request-body size for the route, in bytes.
    bytes: usize,
}
79
80/// Apply per-route body-size cap + attach [`BodyLimit`] extension so the
81/// JSON rejection handler can report `limit_bytes` in the 413 response.
82///
83/// Three layers, each guarding a different failure mode:
84///
85///   1. `from_fn(enforce_body_limit)` — inspects `Content-Length` up front
86///      and rejects with our structured 413 body BEFORE the handler runs.
87///      This is the primary enforcement path and works regardless of
88///      whether the handler consumes the body. Also attaches the
89///      [`BodyLimit`] extension so `AppJson`'s 413 branch (below, case 2)
90///      can report `limit_bytes` even when rejection happens during
91///      streaming.
92///   2. `DefaultBodyLimit::max(bytes)` — hint consumed by `AppJson`
93///      (`Bytes::from_request` internally) for the case where a client
94///      lies in `Content-Length` or uses `Transfer-Encoding: chunked`:
95///      the body extractor trips on a streaming-length check and we
96///      rewrite the rejection into the structured 413 shape in
97///      `AppJson::from_request`.
98///   3. `tower_http::limit::RequestBodyLimitLayer::new(bytes)` —
99///      transport-level body cap that also fires on chunked transfers
100///      even when the handler never consumes the body. Returns a plain
101///      413 (no JSON body) in that edge case; the structured path in
102///      (1) covers the overwhelmingly common case (clients sending
103///      Content-Length).
104fn with_body_limit(route: MethodRouter<Arc<Server>>, bytes: usize) -> MethodRouter<Arc<Server>> {
105    let r: MethodRouter<Arc<Server>> =
106        route.layer(tower_http::limit::RequestBodyLimitLayer::new(bytes));
107    let r: MethodRouter<Arc<Server>> = r.layer(DefaultBodyLimit::max(bytes));
108    r.layer(middleware::from_fn(
109        move |req: Request, next: middleware::Next| enforce_body_limit(bytes, req, next),
110    ))
111}
112
113/// Content-Length-based body-size enforcement + [`BodyLimit`] extension.
114///
115/// Runs before the handler so routes whose handlers never consume the
116/// request body (`replay_execution`, `revoke_lease`, `reset_budget` — see
117/// Copilot review on PR#100) are still protected against large uploads.
118async fn enforce_body_limit(bytes: usize, mut req: Request, next: middleware::Next) -> Response {
119    if let Some(content_length) = req
120        .headers()
121        .get(axum::http::header::CONTENT_LENGTH)
122        .and_then(|v| v.to_str().ok())
123        .and_then(|s| s.parse::<usize>().ok())
124        && content_length > bytes
125    {
126        let route = req
127            .extensions()
128            .get::<MatchedPath>()
129            .map(|m| m.as_str().to_owned())
130            .unwrap_or_default();
131        let body = PayloadTooLargeBody {
132            error: "payload_too_large",
133            limit_bytes: bytes,
134            route,
135        };
136        return (StatusCode::PAYLOAD_TOO_LARGE, Json(body)).into_response();
137    }
138    req.extensions_mut().insert(BodyLimit { bytes });
139    next.run(req).await
140}
141
142/// Shape for the 413 response body.
143#[derive(Serialize)]
144struct PayloadTooLargeBody {
145    error: &'static str,
146    limit_bytes: usize,
147    route: String,
148}
149
150// ── Custom JSON extractor (uniform JSON error on malformed body) ──
151
152struct AppJson<T>(T);
153
154impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
155where
156    T: serde::de::DeserializeOwned + Send,
157    S: Send + Sync,
158{
159    type Rejection = Response;
160
161    async fn from_request(
162        req: axum::extract::Request,
163        state: &S,
164    ) -> Result<Self, Self::Rejection> {
165        // Snapshot metadata for a potential 413 before `Json::from_request`
166        // consumes the request. `BodyLimit` is `Copy` and `MatchedPath` is
167        // cheap to clone (its payload is an `Arc<str>`), so the non-413
168        // path pays at most two extension lookups plus a ref-count bump.
169        let limit = req.extensions().get::<BodyLimit>().copied();
170        let matched = req.extensions().get::<MatchedPath>().cloned();
171
172        match Json::<T>::from_request(req, state).await {
173            Ok(Json(value)) => Ok(AppJson(value)),
174            Err(rejection) => {
175                let status = rejection.status();
176                tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
177                if status == StatusCode::PAYLOAD_TOO_LARGE {
178                    let limit_bytes = limit.map(|l| l.bytes).unwrap_or(0);
179                    let route = matched
180                        .as_ref()
181                        .map(|m| m.as_str().to_owned())
182                        .unwrap_or_default();
183                    let body = PayloadTooLargeBody {
184                        error: "payload_too_large",
185                        limit_bytes,
186                        route,
187                    };
188                    return Err((status, Json(body)).into_response());
189                }
190                let body = ErrorBody::plain(format!(
191                    "invalid JSON: {}",
192                    status.canonical_reason().unwrap_or("bad request"),
193                ));
194                Err((status, Json(body)).into_response())
195            }
196        }
197    }
198}
199
200// ── Error handling ──
201
202struct ApiError(ServerError);
203
204impl From<ServerError> for ApiError {
205    fn from(e: ServerError) -> Self {
206        Self(e)
207    }
208}
209
210/// HTTP error body. `kind`/`retryable` are populated for 500s backed by
211/// a backend transport fault (see `ff_core::BackendErrorKind`) so HTTP
212/// clients (e.g. cairn-fabric) can make retry decisions without parsing
213/// the `error` string.
214#[derive(Serialize)]
215struct ErrorBody {
216    error: String,
217    #[serde(skip_serializing_if = "Option::is_none")]
218    kind: Option<String>,
219    #[serde(skip_serializing_if = "Option::is_none")]
220    retryable: Option<bool>,
221}
222
223impl ErrorBody {
224    fn plain(error: String) -> Self {
225        Self { error, kind: None, retryable: None }
226    }
227}
228
229impl IntoResponse for ApiError {
230    fn into_response(self) -> Response {
231        let (status, body) = match &self.0 {
232            ServerError::NotFound(msg) => {
233                (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
234            }
235            ServerError::InvalidInput(msg) => {
236                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
237            }
238            ServerError::OperationFailed(msg) => {
239                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
240            }
241            ServerError::ConcurrencyLimitExceeded(source, max) => (
242                StatusCode::TOO_MANY_REQUESTS,
243                ErrorBody {
244                    error: format!(
245                        "too many concurrent {source} calls (server max: {max}); retry with backoff"
246                    ),
247                    kind: None,
248                    retryable: Some(true),
249                },
250            ),
251            ServerError::Backend(be) => {
252                let kind_str = be.kind().as_stable_str();
253                tracing::error!(
254                    kind = kind_str,
255                    message = be.message(),
256                    "backend error"
257                );
258                (
259                    StatusCode::INTERNAL_SERVER_ERROR,
260                    ErrorBody {
261                        error: self.0.to_string(),
262                        kind: Some(kind_str.to_owned()),
263                        retryable: Some(self.0.is_retryable()),
264                    },
265                )
266            }
267            ServerError::BackendContext { source, context } => {
268                let kind_str = source.kind().as_stable_str();
269                tracing::error!(
270                    kind = kind_str,
271                    message = source.message(),
272                    context = %context,
273                    "backend error"
274                );
275                (
276                    StatusCode::INTERNAL_SERVER_ERROR,
277                    ErrorBody {
278                        error: self.0.to_string(),
279                        kind: Some(kind_str.to_owned()),
280                        retryable: Some(self.0.is_retryable()),
281                    },
282                )
283            }
284            ServerError::LibraryLoad(load_err) => {
285                let kind_str = load_err
286                    .valkey_kind()
287                    .map(ff_backend_valkey::classify_ferriskey_kind)
288                    .map(|k| k.as_stable_str());
289                tracing::error!(
290                    kind = kind_str.unwrap_or(""),
291                    error = %load_err,
292                    "library load failure"
293                );
294                (
295                    StatusCode::INTERNAL_SERVER_ERROR,
296                    ErrorBody {
297                        error: format!("library load: {load_err}"),
298                        kind: kind_str.map(str::to_owned),
299                        retryable: Some(self.0.is_retryable()),
300                    },
301                )
302            }
303            // Script / Config / PartitionMismatch — developer or deployment
304            // errors. No Valkey ErrorKind to surface, but retryable=false is
305            // informative: a client-side retry won't change the outcome.
306            other => (
307                StatusCode::INTERNAL_SERVER_ERROR,
308                ErrorBody {
309                    error: other.to_string(),
310                    kind: None,
311                    retryable: Some(false),
312                },
313            ),
314        };
315        (status, Json(body)).into_response()
316    }
317}
318
319// ── Router ──
320
321pub fn router(
322    server: Arc<Server>,
323    cors_origins: &[String],
324    api_token: Option<String>,
325) -> Result<Router, ConfigError> {
326    router_with_metrics(server, cors_origins, api_token, None)
327}
328
329/// Router entry point that also mounts `/metrics` and the HTTP metrics
330/// middleware. Used by `main.rs` when the `observability` feature is on.
331///
332/// When `metrics` is `Some` AND the `observability` feature is compiled
333/// in, `/metrics` is mounted on an un-authenticated nested router (auth
334/// middleware does not apply — Prometheus convention). When the
335/// feature is off OR `metrics` is `None`, no `/metrics` route exists
336/// (returns 404) and no HTTP metrics middleware is installed.
337pub fn router_with_metrics(
338    server: Arc<Server>,
339    cors_origins: &[String],
340    api_token: Option<String>,
341    #[cfg_attr(not(feature = "observability"), allow(unused_variables))]
342    metrics: Option<Arc<crate::Metrics>>,
343) -> Result<Router, ConfigError> {
344    let auth_enabled = api_token.is_some();
345    let cors = build_cors_layer(cors_origins, auth_enabled)?;
346
347    // Per-route body-size caps (#97). See module-level `BODY_LIMIT_*` consts
348    // for the category rationale; each route picks the tightest cap that
349    // still accommodates expected real traffic.
350    let mut app = Router::new()
351        // Executions
352        .route(
353            "/v1/executions",
354            with_body_limit(
355                get(list_executions).post(create_execution),
356                BODY_LIMIT_LARGE_PAYLOAD,
357            ),
358        )
359        .route("/v1/executions/{id}", get(get_execution))
360        .route("/v1/executions/{id}/state", get(get_execution_state))
361        .route(
362            "/v1/executions/{id}/pending-waitpoints",
363            get(list_pending_waitpoints),
364        )
365        .route("/v1/executions/{id}/result", get(get_execution_result))
366        .route(
367            "/v1/executions/{id}/cancel",
368            with_body_limit(post(cancel_execution), BODY_LIMIT_CONTROL),
369        )
370        .route(
371            "/v1/executions/{id}/signal",
372            with_body_limit(post(deliver_signal), BODY_LIMIT_MEDIUM_PAYLOAD),
373        )
374        .route(
375            "/v1/executions/{id}/priority",
376            with_body_limit(put(change_priority), BODY_LIMIT_CONTROL),
377        )
378        .route(
379            "/v1/executions/{id}/replay",
380            with_body_limit(post(replay_execution), BODY_LIMIT_CONTROL),
381        )
382        .route(
383            "/v1/executions/{id}/revoke-lease",
384            with_body_limit(post(revoke_lease), BODY_LIMIT_CONTROL),
385        )
386        // Scheduler-routed claim (Batch C item 2). Worker POSTs lane +
387        // identity + capabilities; server runs budget/quota/capability
388        // admission via ff-scheduler and returns a ClaimGrant on
389        // success (204 No Content when no eligible execution).
390        .route(
391            "/v1/workers/{worker_id}/claim",
392            with_body_limit(post(claim_for_worker), BODY_LIMIT_CONTROL),
393        )
394        // Stream read + tail (RFC-006 #2)
395        .route(
396            "/v1/executions/{id}/attempts/{idx}/stream",
397            get(read_attempt_stream),
398        )
399        .route(
400            "/v1/executions/{id}/attempts/{idx}/stream/tail",
401            get(tail_attempt_stream),
402        )
403        // Flows
404        .route(
405            "/v1/flows",
406            with_body_limit(post(create_flow), BODY_LIMIT_CONTROL),
407        )
408        .route(
409            "/v1/flows/{id}/members",
410            with_body_limit(post(add_execution_to_flow), BODY_LIMIT_CONTROL),
411        )
412        .route(
413            "/v1/flows/{id}/cancel",
414            with_body_limit(post(cancel_flow), BODY_LIMIT_CONTROL),
415        )
416        .route(
417            "/v1/flows/{id}/edges",
418            with_body_limit(post(stage_dependency_edge), BODY_LIMIT_CONTROL),
419        )
420        .route(
421            "/v1/flows/{id}/edges/apply",
422            with_body_limit(post(apply_dependency_to_child), BODY_LIMIT_CONTROL),
423        )
424        // Budgets
425        .route(
426            "/v1/budgets",
427            with_body_limit(post(create_budget), BODY_LIMIT_CONTROL),
428        )
429        .route("/v1/budgets/{id}", get(get_budget_status))
430        .route(
431            "/v1/budgets/{id}/usage",
432            with_body_limit(post(report_usage), BODY_LIMIT_CONTROL),
433        )
434        .route(
435            "/v1/budgets/{id}/reset",
436            with_body_limit(post(reset_budget), BODY_LIMIT_CONTROL),
437        )
438        // Quotas
439        .route(
440            "/v1/quotas",
441            with_body_limit(post(create_quota_policy), BODY_LIMIT_CONTROL),
442        )
443        // Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security)
444        .route(
445            "/v1/admin/rotate-waitpoint-secret",
446            with_body_limit(post(rotate_waitpoint_secret), BODY_LIMIT_CONTROL),
447        )
448        // Health (always unauthenticated)
449        .route("/healthz", get(healthz));
450
451    if let Some(token) = api_token {
452        let token = Arc::new(token);
453        app = app.layer(middleware::from_fn(move |req, next| {
454            let token = token.clone();
455            auth_middleware(token, req, next)
456        }));
457    }
458
459    // PR-94: HTTP metrics middleware. Recorded AFTER the auth layer
460    // above so unauthorized 401s are still counted under their route,
461    // and BEFORE trace/cors so the metric captures handler time
462    // including the auth check itself. No-op when the
463    // `observability` feature is off.
464    #[cfg(feature = "observability")]
465    if let Some(m) = metrics.as_ref() {
466        let m = m.clone();
467        app = app.layer(middleware::from_fn_with_state(
468            m,
469            crate::metrics::http_middleware,
470        ));
471    }
472
473    #[cfg_attr(not(feature = "observability"), allow(unused_mut))]
474    let mut app = app
475        .layer(TraceLayer::new_for_http())
476        .layer(cors)
477        .with_state(server);
478
479    // PR-94: `/metrics` — intentionally unauthenticated. Mounted on a
480    // separate router with its own State so the main app's auth
481    // middleware does not run for scrapes (Prometheus convention).
482    // Network-layer auth (ingress ACL, service-mesh policy, or
483    // metrics-only listen address) is the expected gate; FF does not
484    // own auth for scrape endpoints.
485    #[cfg(feature = "observability")]
486    if let Some(m) = metrics {
487        let metrics_router: Router = Router::new()
488            .route("/metrics", get(crate::metrics::metrics_handler))
489            .with_state(m);
490        app = app.merge(metrics_router);
491    }
492
493    Ok(app)
494}
495
496async fn auth_middleware(
497    token: Arc<String>,
498    req: Request,
499    next: middleware::Next,
500) -> Response {
501    if req.uri().path() == "/healthz" {
502        return next.run(req).await;
503    }
504
505    let auth_header = req
506        .headers()
507        .get("authorization")
508        .and_then(|v| v.to_str().ok());
509
510    let authorized = auth_header
511        .and_then(|v| v.strip_prefix("Bearer "))
512        .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
513
514    if authorized {
515        next.run(req).await
516    } else {
517        (
518            StatusCode::UNAUTHORIZED,
519            Json(ErrorBody::plain(
520                "missing or invalid Authorization header".to_owned(),
521            )),
522        )
523            .into_response()
524    }
525}
526
/// Compare two byte slices without stopping at the first differing byte.
///
/// Slices of different lengths compare unequal immediately. For
/// equal-length inputs every byte pair is XOR-ed into an accumulator, so
/// the amount of work does not depend on where the inputs differ.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mismatch = a
        .iter()
        .zip(b)
        .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs));
    mismatch == 0
}
537
538/// Build the CORS layer from the configured `FF_CORS_ORIGINS` list.
539///
540/// Fails closed (#71): if every entry fails to parse as a `HeaderValue`,
541/// return `ConfigError` so startup aborts instead of silently falling back
542/// to `CorsLayer::permissive()` (a typo would otherwise broaden browser
543/// access to "allow any origin").
544///
545/// When `auth_enabled` is true (i.e. `FF_API_TOKEN` is set), `Authorization`
546/// is added to `allow_headers` (#66) so browser preflights for
547/// cross-origin authenticated requests succeed.
548fn build_cors_layer(origins: &[String], auth_enabled: bool) -> Result<CorsLayer, ConfigError> {
549    if origins.iter().any(|o| o == "*") {
550        return Ok(CorsLayer::permissive());
551    }
552
553    let mut parsed = Vec::with_capacity(origins.len());
554    let mut accepted = Vec::with_capacity(origins.len());
555    let mut invalid = Vec::new();
556    for o in origins {
557        match o.parse() {
558            Ok(v) => {
559                parsed.push(v);
560                accepted.push(o.as_str());
561            }
562            Err(_) => invalid.push(o.clone()),
563        }
564    }
565
566    if parsed.is_empty() && !origins.is_empty() {
567        return Err(ConfigError::InvalidValue {
568            var: "FF_CORS_ORIGINS".to_owned(),
569            message: format!(
570                "all configured origins failed to parse as valid HTTP header values: {:?}; \
571                 refusing to fall back to permissive CORS",
572                origins
573            ),
574        });
575    }
576
577    if !invalid.is_empty() {
578        // Collected `accepted` list during the single parse loop above to
579        // avoid an O(N*M) filter-contains scan per log event.
580        tracing::warn!(
581            ?invalid,
582            ?accepted,
583            "some FF_CORS_ORIGINS entries failed to parse and were dropped"
584        );
585    }
586
587    // Prefer typed `header::*` constants over stringly-typed `from_static`
588    // so typos fail to compile rather than fail at runtime.
589    let mut headers = vec![axum::http::header::CONTENT_TYPE];
590    if auth_enabled {
591        headers.push(axum::http::header::AUTHORIZATION);
592    }
593
594    Ok(CorsLayer::new()
595        .allow_origin(AllowOrigin::list(parsed))
596        .allow_methods([Method::GET, Method::POST, Method::PUT])
597        .allow_headers(headers))
598}
599
600// ── Execution handlers ──
601
602#[derive(Deserialize)]
603struct ListExecutionsParams {
604    /// Partition index (`u16`) to enumerate. Serves as the partition
605    /// key for the forward-only cursor listing.
606    partition: u16,
607    /// Exclusive cursor: start listing strictly after this execution
608    /// id. Omit for the first page.
609    #[serde(default)]
610    cursor: Option<String>,
611    #[serde(default = "default_limit")]
612    limit: u64,
613}
614
/// Page size used when the `limit` query parameter is omitted.
fn default_limit() -> u64 {
    50
}
616
617/// List executions in one partition with forward-only cursor
618/// pagination.
619///
620/// **Breaking change (unreleased HTTP surface, not on crates.io):** as
621/// of issue #182 this endpoint is a thin forwarder onto
622/// [`ff_core::engine_backend::EngineBackend::list_executions`]. The
623/// previous offset + lane + state filter query parameters were
624/// dropped; the endpoint now returns partition-scoped execution ids
625/// with an opaque `next_cursor`.
626///
627/// Request: `GET /v1/executions?partition=<u16>&cursor=<eid>&limit=<usize>`.
628/// Response: `{ "executions": ["<eid>", ...], "next_cursor": "<eid>" | null }`.
629async fn list_executions(
630    State(server): State<Arc<Server>>,
631    Query(params): Query<ListExecutionsParams>,
632) -> Result<Json<ListExecutionsPage>, ApiError> {
633    let limit = params.limit.min(1000) as usize;
634    let cursor = match params.cursor {
635        Some(raw) if !raw.is_empty() => Some(
636            ff_core::types::ExecutionId::parse(&raw).map_err(|e| {
637                ApiError::from(ServerError::InvalidInput(format!(
638                    "invalid cursor: {e}"
639                )))
640            })?,
641        ),
642        _ => None,
643    };
644    let result = server
645        .list_executions_page(params.partition, cursor, limit)
646        .await?;
647    Ok(Json(result))
648}
649
650async fn create_execution(
651    State(server): State<Arc<Server>>,
652    AppJson(args): AppJson<CreateExecutionArgs>,
653) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
654    let result = server.create_execution(&args).await?;
655    let status = match &result {
656        CreateExecutionResult::Created { .. } => StatusCode::CREATED,
657        CreateExecutionResult::Duplicate { .. } => StatusCode::OK,
658    };
659    Ok((status, Json(result)))
660}
661
662async fn get_execution(
663    State(server): State<Arc<Server>>,
664    Path(id): Path<String>,
665) -> Result<Json<ExecutionInfo>, ApiError> {
666    let eid = parse_execution_id(&id)?;
667    Ok(Json(server.get_execution(&eid).await?))
668}
669
670async fn get_execution_state(
671    State(server): State<Arc<Server>>,
672    Path(id): Path<String>,
673) -> Result<Json<PublicState>, ApiError> {
674    let eid = parse_execution_id(&id)?;
675    Ok(Json(server.get_execution_state(&eid).await?))
676}
677
678/// Returns the actionable (`pending`/`active`) waitpoints for an
679/// execution, including the HMAC `waitpoint_token` required to deliver
680/// signals. Human reviewers use this to look up the token originally
681/// returned only to the suspending worker's `SuspendOutcome`.
682///
683/// SECURITY: `waitpoint_token` is a bearer credential for signal
684/// delivery; leaking it lets a third party forge authority to resume or
685/// influence the execution. Gate the endpoint behind `FF_API_TOKEN` in
686/// any deployment reachable from untrusted networks. The auth middleware
687/// only mounts when `FF_API_TOKEN` is set; this endpoint is
688/// unauthenticated without it, and the server logs a loud warning at
689/// startup so operators notice.
690async fn list_pending_waitpoints(
691    State(server): State<Arc<Server>>,
692    Path(id): Path<String>,
693) -> Result<Json<Vec<PendingWaitpointInfo>>, ApiError> {
694    let eid = parse_execution_id(&id)?;
695    Ok(Json(server.list_pending_waitpoints(&eid).await?))
696}
697
698/// Returns the raw result payload bytes written by the worker's
699/// `ff_complete_execution` call. 404 when the execution has no stored
700/// result (missing entirely, still in-flight, or trimmed by retention —
701/// see below).
702///
703/// # Ordering (required)
704///
705/// Callers MUST poll `GET /v1/executions/{id}/state` until it returns
706/// `completed` before fetching `/result`. Early polls may return 404
707/// because completion writes `public_state = completed` and the result
708/// `SET` in the same atomic Lua; in the normal path the window is
709/// effectively zero, but network round-trip ordering between a state
710/// poll and a result fetch can make the result appear briefly absent
711/// during replay (`ff_replay_execution`).
712///
713/// # Retention / 404 after completed
714///
715/// `get_execution_state == completed` is authoritative for completion.
716/// This endpoint additionally depends on the result bytes not having
717/// been trimmed — v1 sets no retention policy, so
718/// `state = completed` should always pair with a 200 here. Any
719/// future retention-policy feature must call this contract out in its
720/// own docs.
721///
722/// CONTENT-TYPE: `application/octet-stream`. The server is payload-format
723/// agnostic — workers choose the encoding via the SDK's `complete(bytes)`
724/// call, and callers must know the contract. The media-pipeline example
725/// uses JSON by convention (`serde_json::to_vec(&Result)`); adapters can
726/// pick any binary format.
727///
728/// SECURITY: completion payloads can contain PII (e.g. LLM summaries of
729/// user audio). Treat this endpoint like any other read — gate behind
730/// `FF_API_TOKEN` in any deployment reachable from untrusted networks.
731/// The auth middleware only mounts when `FF_API_TOKEN` is set.
732async fn get_execution_result(
733    State(server): State<Arc<Server>>,
734    Path(id): Path<String>,
735) -> Result<Response, ApiError> {
736    let eid = parse_execution_id(&id)?;
737    match server.get_execution_result(&eid).await? {
738        Some(bytes) => Ok((
739            StatusCode::OK,
740            [(axum::http::header::CONTENT_TYPE, "application/octet-stream")],
741            bytes,
742        )
743            .into_response()),
744        None => Err(ApiError(ServerError::NotFound(format!(
745            "execution result not found: {eid}"
746        )))),
747    }
748}
749
750async fn cancel_execution(
751    State(server): State<Arc<Server>>,
752    Path(id): Path<String>,
753    AppJson(mut args): AppJson<CancelExecutionArgs>,
754) -> Result<Json<CancelExecutionResult>, ApiError> {
755    let path_eid = parse_execution_id(&id)?;
756    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
757    args.execution_id = path_eid;
758    Ok(Json(server.cancel_execution(&args).await?))
759}
760
761async fn deliver_signal(
762    State(server): State<Arc<Server>>,
763    Path(id): Path<String>,
764    AppJson(mut args): AppJson<DeliverSignalArgs>,
765) -> Result<Json<DeliverSignalResult>, ApiError> {
766    let path_eid = parse_execution_id(&id)?;
767    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
768    args.execution_id = path_eid;
769    Ok(Json(server.deliver_signal(&args).await?))
770}
771
772// ── Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security) ──
773
774#[derive(Deserialize)]
775struct RotateWaitpointSecretBody {
776    new_kid: String,
777    /// Hex-encoded new secret. Even-length, 0-9a-fA-F.
778    new_secret_hex: String,
779}
780
/// Upper bound on how long the rotate endpoint may run before the HTTP
/// handler gives up.
///
/// Rotation touches every execution partition (up to 256) with
/// HGET+HMGET+HDEL+HSET per partition; 6 round-trips × 30ms cross-AZ × 256
/// partitions ≈ 46s worst-case. The internal SETNX lock has a 10s TTL per
/// partition, so 120s leaves ample margin for contention and slow RTTs
/// while staying below common LB idle timeouts (ALB defaults to 60s, but
/// is typically bumped to 120s+ for admin endpoints).
///
/// On timeout the handler returns HTTP 504 immediately. Valkey-side work
/// may still finish (the per-partition locks and HSETs are already in
/// flight). The operator sees a 504 and retries; the retry is SAFE because
/// rotation is idempotent per partition (same new_kid + same secret →
/// no-op on partitions that were already rotated).
const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2 * 60);
795
796async fn rotate_waitpoint_secret(
797    State(server): State<Arc<Server>>,
798    AppJson(body): AppJson<RotateWaitpointSecretBody>,
799) -> Result<Response, ApiError> {
800    // Cap the whole endpoint end-to-end. If this trips, the caller's
801    // retry is SAFE — per-partition rotation is idempotent on the same
802    // (new_kid, secret_hex) and the per-partition SETNX lock prevents
803    // double-rotation under concurrent retries.
804    let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
805    let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
806        Ok(r) => r?,
807        Err(_) => {
808            tracing::error!(
809                target: "audit",
810                new_kid = %body.new_kid,
811                timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
812                "waitpoint_hmac_rotation_timeout_http_504"
813            );
814            let body = ErrorBody::plain(format!(
815                "rotation exceeded {}s server-side timeout; retry is safe \
816                 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
817                ROTATE_HTTP_TIMEOUT.as_secs()
818            ));
819            return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
820        }
821    };
822    // Operator action log at audit target (per-partition detail logged inside
823    // rotate_waitpoint_secret). Returns 400 only on actionable fault states.
824    //
825    // Two distinct rotated==0 cases:
826    //   - failed.is_empty() → no partitions at all (num_flow_partitions == 0;
827    //     env_u16_positive rejects this at boot so this is mostly dead code
828    //     for library/Default callers).
829    //   - !failed.is_empty() → every partition attempt raised a real error.
830    //     Operator investigates Valkey/auth/cluster health before retrying.
831    if result.rotated == 0 && result.failed.is_empty() {
832        return Err(ApiError::from(ServerError::OperationFailed(
833            "rotation had no partitions to operate on \
834             (num_flow_partitions is 0 — server misconfigured)"
835                .to_owned(),
836        )));
837    }
838    if result.rotated == 0 && !result.failed.is_empty() {
839        return Err(ApiError::from(ServerError::OperationFailed(
840            "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
841        )));
842    }
843    Ok(Json(result).into_response())
844}
845
/// JSON request body accepted by the `change_priority` handler.
#[derive(Deserialize)]
struct ChangePriorityBody {
    /// Priority value forwarded as-is to `Server::change_priority`; this
    /// layer performs no range validation on it.
    new_priority: i32,
}
850
851async fn change_priority(
852    State(server): State<Arc<Server>>,
853    Path(id): Path<String>,
854    AppJson(body): AppJson<ChangePriorityBody>,
855) -> Result<Json<ChangePriorityResult>, ApiError> {
856    let eid = parse_execution_id(&id)?;
857    Ok(Json(server.change_priority(&eid, body.new_priority).await?))
858}
859
860async fn replay_execution(
861    State(server): State<Arc<Server>>,
862    Path(id): Path<String>,
863) -> Result<Json<ReplayExecutionResult>, ApiError> {
864    let eid = parse_execution_id(&id)?;
865    Ok(Json(server.replay_execution(&eid).await?))
866}
867
868async fn revoke_lease(
869    State(server): State<Arc<Server>>,
870    Path(id): Path<String>,
871) -> Result<Json<RevokeLeaseResult>, ApiError> {
872    let eid = parse_execution_id(&id)?;
873    Ok(Json(server.revoke_lease(&eid).await?))
874}
875
876// ── Scheduler-routed claim (Batch C item 2 PR-B) ──
877//
878// The server exposes the scheduler's `claim_for_worker` cycle via
879// HTTP so ff-sdk workers can acquire claim grants without enabling
880// the `direct-valkey-claim` feature. The request body carries lane +
881// identity + capabilities; the server returns a serialized
882// `ClaimGrant` (or 204 No Content when no eligible execution exists).
883
/// Request body for `POST /v1/workers/{worker_id}/claim`.
///
/// The worker id itself comes from the path, not from this body.
#[derive(Deserialize)]
struct ClaimForWorkerBody {
    /// Lane to claim from. Converted with `LaneId::try_new` by the handler;
    /// a conversion failure is reported as an invalid-input error prefixed
    /// with `lane_id:`.
    lane_id: String,
    /// Identifier of the worker instance. Checked by `validate_identifier`
    /// (non-empty, at most 256 bytes, no whitespace or control characters)
    /// before being wrapped in `WorkerInstanceId`.
    worker_instance_id: String,
    /// Capability tokens this worker advertises. Sorted + validated
    /// on the scheduler side; any non-printable/CSV-breaking token
    /// surfaces as 400. Defaults to an empty list when omitted.
    #[serde(default)]
    capabilities: Vec<String>,
    /// Grant TTL in milliseconds. Bounded so a worker can't request a
    /// multi-hour grant and squat the execution: the handler accepts only
    /// values in `1..=CLAIM_GRANT_TTL_MS_MAX`.
    grant_ttl_ms: u64,
}
898
/// Wire shape for `ff_core::contracts::ClaimGrant`. Carries the
/// opaque [`ff_core::partition::PartitionKey`] on the wire; consumers
/// never see the internal `PartitionFamily` enum (issue #91).
///
/// Built from a `ClaimGrant` through the `From` impl in this file.
#[derive(Serialize)]
struct ClaimGrantDto {
    /// String rendering (`to_string()`) of the grant's execution id.
    execution_id: String,
    /// Partition key copied unchanged from the grant.
    partition_key: ff_core::partition::PartitionKey,
    /// Grant key copied unchanged from the grant.
    grant_key: String,
    /// Grant expiry copied unchanged from the grant; milliseconds per the
    /// field name (epoch-based presumably — confirm against `ClaimGrant`).
    expires_at_ms: u64,
}
909
910impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
911    fn from(g: ff_core::contracts::ClaimGrant) -> Self {
912        Self {
913            execution_id: g.execution_id.to_string(),
914            partition_key: g.partition_key,
915            grant_key: g.grant_key,
916            expires_at_ms: g.expires_at_ms,
917        }
918    }
919}
920
/// Maximum grant TTL accepted via HTTP, in milliseconds (60 seconds).
/// Mirrors the scheduler's internal ceiling so a misconfigured worker
/// can't squat an execution on a multi-hour grant. `claim_for_worker`
/// rejects any `grant_ttl_ms` of 0 or above this value.
const CLAIM_GRANT_TTL_MS_MAX: u64 = 60_000;
925
926/// Reject empty / whitespace / non-printable identifiers the way
927/// [`LaneId::try_new`] does for lanes. WorkerId + WorkerInstanceId
928/// feed into scheduler scan jitter + Valkey key construction; silent
929/// acceptance of "" or "w\nork" would either mis-key or mis-hash.
930fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
931    if value.is_empty() {
932        return Err(ApiError(ServerError::InvalidInput(format!(
933            "{field}: must not be empty"
934        ))));
935    }
936    if value.len() > 256 {
937        return Err(ApiError(ServerError::InvalidInput(format!(
938            "{field}: exceeds 256 bytes (got {})",
939            value.len()
940        ))));
941    }
942    if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
943        return Err(ApiError(ServerError::InvalidInput(format!(
944            "{field}: must not contain whitespace or control characters"
945        ))));
946    }
947    Ok(())
948}
949
950async fn claim_for_worker(
951    State(server): State<Arc<Server>>,
952    Path(worker_id): Path<String>,
953    AppJson(body): AppJson<ClaimForWorkerBody>,
954) -> Result<Response, ApiError> {
955    validate_identifier("worker_id", &worker_id)?;
956    validate_identifier("worker_instance_id", &body.worker_instance_id)?;
957    let worker_id = WorkerId::new(worker_id);
958    let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
959    let lane = LaneId::try_new(body.lane_id).map_err(|e| {
960        ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
961    })?;
962    if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
963        return Err(ApiError(ServerError::InvalidInput(format!(
964            "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
965        ))));
966    }
967    let caps: std::collections::BTreeSet<String> =
968        body.capabilities.into_iter().collect();
969
970    match server
971        .claim_for_worker(
972            &lane,
973            &worker_id,
974            &worker_instance_id,
975            &caps,
976            body.grant_ttl_ms,
977        )
978        .await?
979    {
980        Some(grant) => Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response()),
981        None => Ok(StatusCode::NO_CONTENT.into_response()),
982    }
983}
984
985// ── Stream read + tail ──
986
/// Query-string parameters accepted by `read_attempt_stream`.
#[derive(Deserialize)]
struct ReadStreamParams {
    /// Lower bound of the range to read; defaults to
    /// `StreamCursor::start()` when omitted.
    #[serde(default = "ff_core::contracts::StreamCursor::start")]
    from: ff_core::contracts::StreamCursor,
    /// Upper bound of the range to read; defaults to
    /// `StreamCursor::end()` when omitted.
    #[serde(default = "ff_core::contracts::StreamCursor::end")]
    to: ff_core::contracts::StreamCursor,
    /// Maximum number of frames to return; defaults to
    /// `default_read_limit()`. The handler rejects 0 and anything above
    /// `REST_STREAM_LIMIT_CEILING`.
    #[serde(default = "default_read_limit")]
    limit: u64,
}
996
/// Serde default for the `limit` query parameter of the stream *read*
/// endpoint: 100 frames.
fn default_read_limit() -> u64 {
    const DEFAULT_READ_LIMIT: u64 = 100;
    DEFAULT_READ_LIMIT
}
998
/// JSON response body shared by the stream read and tail handlers. Built
/// from `ff_core::contracts::StreamFrames` through the `From` impl in this
/// file.
#[derive(Serialize)]
struct ReadStreamResponse {
    /// Frames returned by the server call, in the order received.
    frames: Vec<StreamFrame>,
    /// Number of entries in `frames` (set to `frames.len()` on conversion).
    count: usize,
    /// When set, the producer has closed this stream — consumer should
    /// stop polling. Absent when the stream is still open (or never
    /// existed, which is indistinguishable from "still open" at this
    /// layer). Omitted from the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_at: Option<i64>,
    /// Reason from the closing writer: `attempt_success`, `attempt_failure`,
    /// `attempt_cancelled`, `attempt_interrupted`. Absent iff still open.
    /// Omitted from the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_reason: Option<String>,
}
1014
1015impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
1016    fn from(sf: ff_core::contracts::StreamFrames) -> Self {
1017        let count = sf.frames.len();
1018        Self {
1019            frames: sf.frames,
1020            count,
1021            closed_at: sf.closed_at.map(|t| t.0),
1022            closed_reason: sf.closed_reason,
1023        }
1024    }
1025}
1026
/// REST-layer ceiling on `limit` for stream read/tail responses. Lower
/// than the internal `STREAM_READ_HARD_CAP` (10_000) because an HTTP
/// response buffers the whole JSON body in memory in axum — a
/// `10_000 × max_payload_bytes (65_536)` body is ~655 MB per call, which
/// is a DoS vector from a single client. Internal callers using FCALL or
/// the SDK directly still get the full 10_000 ceiling; REST clients must
/// paginate through `from`/`to` (read) or `after` (tail) for larger spans.
///
/// v2 candidate: chunked-transfer / SSE when the caller wants > this bound.
const REST_STREAM_LIMIT_CEILING: u64 = 1_000;
1037
1038async fn read_attempt_stream(
1039    State(server): State<Arc<Server>>,
1040    Path((id, idx)): Path<(String, u32)>,
1041    Query(params): Query<ReadStreamParams>,
1042) -> Result<Json<ReadStreamResponse>, ApiError> {
1043    if params.limit == 0 {
1044        return Err(ApiError(ServerError::InvalidInput(
1045            "limit must be >= 1".to_owned(),
1046        )));
1047    }
1048    if params.limit > REST_STREAM_LIMIT_CEILING {
1049        return Err(ApiError(ServerError::InvalidInput(format!(
1050            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
1051        ))));
1052    }
1053    let eid = parse_execution_id(&id)?;
1054    let attempt_index = AttemptIndex::new(idx);
1055    let result = server
1056        .read_attempt_stream(
1057            &eid,
1058            attempt_index,
1059            params.from.to_wire(),
1060            params.to.to_wire(),
1061            params.limit,
1062        )
1063        .await?;
1064    Ok(Json(result.into()))
1065}
1066
/// Query-string parameters accepted by `tail_attempt_stream`.
#[derive(Deserialize)]
struct TailStreamParams {
    /// Cursor after which to read; defaults to
    /// `StreamCursor::beginning()` when omitted. The handler requires it
    /// to be a concrete entry id (`StreamCursor::is_concrete`).
    #[serde(default = "ff_core::contracts::StreamCursor::beginning")]
    after: ff_core::contracts::StreamCursor,
    /// Blocking duration in milliseconds, passed through to
    /// `Server::tail_attempt_stream`; defaults to 0 when omitted. The
    /// handler rejects values above `MAX_TAIL_BLOCK_MS`.
    #[serde(default)]
    block_ms: u64,
    /// Maximum number of frames to return; defaults to
    /// `default_tail_limit()`. The handler rejects 0 and anything above
    /// `REST_STREAM_LIMIT_CEILING`.
    #[serde(default = "default_tail_limit")]
    limit: u64,
}
1076
/// Serde default for the `limit` query parameter of the stream *tail*
/// endpoint: 50 frames.
fn default_tail_limit() -> u64 {
    const DEFAULT_TAIL_LIMIT: u64 = 50;
    DEFAULT_TAIL_LIMIT
}
1078
/// Ceiling on BLOCK duration for the tail endpoint, in milliseconds
/// (30 seconds). Kept below common LB idle timeouts (ALB 60s, nginx 60s,
/// Cloudflare 100s) so the HTTP response can't be cut mid-block.
/// `tail_attempt_stream` rejects any `block_ms` above this value.
///
/// Note: ferriskey's client auto-extends its `request_timeout` for XREAD
/// BLOCK to `block_ms + 500ms`, so a blocking call with the full ceiling
/// never produces a spurious transport timeout. See
/// `ff_script::stream_tail` module docs for the exact ferriskey code path.
const MAX_TAIL_BLOCK_MS: u64 = 30_000;
1088
1089async fn tail_attempt_stream(
1090    State(server): State<Arc<Server>>,
1091    Path((id, idx)): Path<(String, u32)>,
1092    Query(params): Query<TailStreamParams>,
1093) -> Result<Json<ReadStreamResponse>, ApiError> {
1094    if params.block_ms > MAX_TAIL_BLOCK_MS {
1095        return Err(ApiError(ServerError::InvalidInput(format!(
1096            "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
1097        ))));
1098    }
1099    if params.limit == 0 {
1100        return Err(ApiError(ServerError::InvalidInput(
1101            "limit must be >= 1".to_owned(),
1102        )));
1103    }
1104    if params.limit > REST_STREAM_LIMIT_CEILING {
1105        return Err(ApiError(ServerError::InvalidInput(format!(
1106            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
1107        ))));
1108    }
1109    // XREAD cursor must be a concrete ID — `Start`/`End` are
1110    // XRANGE-only. The opaque-cursor deserializer already rejects
1111    // the bare `-`/`+` wire tokens; this boundary also rejects the
1112    // structured `start`/`end` keywords because XREAD treats them
1113    // as invalid ids. Uses [`StreamCursor::is_concrete`] so the
1114    // SDK + REST guards stay in lock-step.
1115    if !params.after.is_concrete() {
1116        return Err(ApiError(ServerError::InvalidInput(
1117            "after: XREAD cursor must be a concrete entry id; pass '0-0' to start from the beginning"
1118                .to_owned(),
1119        )));
1120    }
1121
1122    let eid = parse_execution_id(&id)?;
1123    let attempt_index = AttemptIndex::new(idx);
1124    let result = server
1125        .tail_attempt_stream(
1126            &eid,
1127            attempt_index,
1128            params.after.to_wire(),
1129            params.block_ms,
1130            params.limit,
1131        )
1132        .await?;
1133    Ok(Json(result.into()))
1134}
1135
1136// ── Flow handlers ──
1137
1138async fn create_flow(
1139    State(server): State<Arc<Server>>,
1140    AppJson(args): AppJson<CreateFlowArgs>,
1141) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
1142    let result = server.create_flow(&args).await?;
1143    let status = match &result {
1144        CreateFlowResult::Created { .. } => StatusCode::CREATED,
1145        CreateFlowResult::AlreadySatisfied { .. } => StatusCode::OK,
1146    };
1147    Ok((status, Json(result)))
1148}
1149
1150async fn add_execution_to_flow(
1151    State(server): State<Arc<Server>>,
1152    Path(id): Path<String>,
1153    AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
1154) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
1155    let path_fid = parse_flow_id(&id)?;
1156    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1157    args.flow_id = path_fid;
1158    let result = server.add_execution_to_flow(&args).await?;
1159    let status = match &result {
1160        AddExecutionToFlowResult::Added { .. } => StatusCode::CREATED,
1161        AddExecutionToFlowResult::AlreadyMember { .. } => StatusCode::OK,
1162    };
1163    Ok((status, Json(result)))
1164}
1165
1166/// Cancel a flow.
1167///
1168/// By default the handler returns immediately with
1169/// [`CancelFlowResult::CancellationScheduled`] (or `Cancelled` for flows
1170/// with no members / non-cancel_all policies), and the individual member
1171/// execution cancellations run in a background task on the server.
1172/// Clients can track per-member progress by polling
1173/// `GET /v1/executions/{id}/state` for each id in `member_execution_ids`.
1174///
1175/// Pass `?wait=true` to run the dispatch loop inline; the handler will not
1176/// return until every member has been cancelled. Useful for tests and
1177/// callers that need synchronous completion.
1178async fn cancel_flow(
1179    State(server): State<Arc<Server>>,
1180    Path(id): Path<String>,
1181    Query(params): Query<HashMap<String, String>>,
1182    AppJson(mut args): AppJson<CancelFlowArgs>,
1183) -> Result<Json<CancelFlowResult>, ApiError> {
1184    let path_fid = parse_flow_id(&id)?;
1185    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1186    args.flow_id = path_fid;
1187    let wait = params.get("wait").is_some_and(|v| v == "true" || v == "1");
1188    let result = if wait {
1189        server.cancel_flow_wait(&args).await?
1190    } else {
1191        server.cancel_flow(&args).await?
1192    };
1193    Ok(Json(result))
1194}
1195
1196async fn stage_dependency_edge(
1197    State(server): State<Arc<Server>>,
1198    Path(id): Path<String>,
1199    AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
1200) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
1201    let path_fid = parse_flow_id(&id)?;
1202    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1203    args.flow_id = path_fid;
1204    let result = server.stage_dependency_edge(&args).await?;
1205    Ok((StatusCode::CREATED, Json(result)))
1206}
1207
1208async fn apply_dependency_to_child(
1209    State(server): State<Arc<Server>>,
1210    Path(id): Path<String>,
1211    AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
1212) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
1213    let path_fid = parse_flow_id(&id)?;
1214    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1215    args.flow_id = path_fid;
1216    Ok(Json(server.apply_dependency_to_child(&args).await?))
1217}
1218
1219// ── Budget / Quota handlers ──
1220
1221async fn create_budget(
1222    State(server): State<Arc<Server>>,
1223    AppJson(args): AppJson<CreateBudgetArgs>,
1224) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
1225    let result = server.create_budget(&args).await?;
1226    let status = match &result {
1227        CreateBudgetResult::Created { .. } => StatusCode::CREATED,
1228        CreateBudgetResult::AlreadySatisfied { .. } => StatusCode::OK,
1229    };
1230    Ok((status, Json(result)))
1231}
1232
1233async fn get_budget_status(
1234    State(server): State<Arc<Server>>,
1235    Path(id): Path<String>,
1236) -> Result<Json<BudgetStatus>, ApiError> {
1237    let bid = parse_budget_id(&id)?;
1238    Ok(Json(server.get_budget_status(&bid).await?))
1239}
1240
/// JSON request body accepted by the `report_usage` handler.
#[derive(Deserialize)]
struct ReportUsageBody {
    /// Usage deltas keyed by dimension name. The handler splits this map
    /// into the parallel `dimensions` / `deltas` vectors of
    /// `ReportUsageArgs`.
    dimensions: HashMap<String, u64>,
    /// Caller-supplied timestamp, forwarded unchanged as
    /// `ReportUsageArgs::now`.
    now: ff_core::types::TimestampMs,
    /// Optional key forwarded unchanged as `ReportUsageArgs::dedup_key`
    /// (presumably used for de-duplicating reports — confirm against the
    /// server implementation). Defaults to `None` when omitted.
    #[serde(default)]
    dedup_key: Option<String>,
}
1248
1249async fn report_usage(
1250    State(server): State<Arc<Server>>,
1251    Path(id): Path<String>,
1252    AppJson(body): AppJson<ReportUsageBody>,
1253) -> Result<Json<ReportUsageResult>, ApiError> {
1254    let bid = parse_budget_id(&id)?;
1255    let dims: Vec<String> = body.dimensions.keys().cloned().collect();
1256    let deltas: Vec<u64> = dims.iter().map(|d| body.dimensions[d]).collect();
1257    let args = ReportUsageArgs {
1258        dimensions: dims,
1259        deltas,
1260        now: body.now,
1261        dedup_key: body.dedup_key,
1262    };
1263    Ok(Json(server.report_usage(&bid, &args).await?))
1264}
1265
1266async fn reset_budget(
1267    State(server): State<Arc<Server>>,
1268    Path(id): Path<String>,
1269) -> Result<Json<ResetBudgetResult>, ApiError> {
1270    let bid = parse_budget_id(&id)?;
1271    Ok(Json(server.reset_budget(&bid).await?))
1272}
1273
1274async fn create_quota_policy(
1275    State(server): State<Arc<Server>>,
1276    AppJson(args): AppJson<CreateQuotaPolicyArgs>,
1277) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
1278    let result = server.create_quota_policy(&args).await?;
1279    let status = match &result {
1280        CreateQuotaPolicyResult::Created { .. } => StatusCode::CREATED,
1281        CreateQuotaPolicyResult::AlreadySatisfied { .. } => StatusCode::OK,
1282    };
1283    Ok((status, Json(result)))
1284}
1285
1286// ── Health check ──
1287
/// JSON body returned by the `healthz` handler.
#[derive(Serialize)]
struct HealthResponse {
    /// Health indicator; `healthz` only ever sets this to `"ok"` (failures
    /// are reported through `ApiError` instead of this body).
    status: &'static str,
}
1292
1293async fn healthz(
1294    State(server): State<Arc<Server>>,
1295) -> Result<Json<HealthResponse>, ApiError> {
1296    let _: String = server
1297        .client()
1298        .cmd("PING")
1299        .execute()
1300        .await
1301        .map_err(|e| ApiError(crate::server::backend_context(e, "healthz PING")))?;
1302    Ok(Json(HealthResponse { status: "ok" }))
1303}
1304
1305// ── ID parsing helpers ──
1306
1307/// Return 400 if the body contains an ID that differs from the path ID.
1308fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
1309    if body_id != path_id {
1310        return Err(ApiError(ServerError::InvalidInput(format!(
1311            "path {id_name} does not match body {id_name}"
1312        ))));
1313    }
1314    Ok(())
1315}
1316
1317fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
1318    ExecutionId::parse(s)
1319        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
1320}
1321
1322fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
1323    FlowId::parse(s)
1324        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid flow_id: {e}"))))
1325}
1326
1327fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
1328    BudgetId::parse(s)
1329        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid budget_id: {e}"))))
1330}
1331
#[cfg(test)]
mod cors_tests {
    //! CORS + auth preflight tests (#66, #71).
    //!
    //! Everything runs through `tower::ServiceExt::oneshot` against a tiny
    //! router that has nothing but the CORS layer on top of a noop route.
    //! That matches what a browser observes (the `CorsLayer` answers
    //! OPTIONS preflights itself) and avoids needing a live `Server`.
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::get;
    use tower::ServiceExt;

    /// Origin used by the preflight tests, both as the allowed origin and
    /// as the `origin` request header.
    const CLIENT_ORIGIN: &str = "https://client.example.com";

    /// Router with a single `/noop` GET route wrapped in the CORS layer
    /// produced by `build_cors_layer` for the given settings.
    fn app_with_cors(origins: &[String], auth_enabled: bool) -> Router {
        let cors = build_cors_layer(origins, auth_enabled)
            .expect("build_cors_layer succeeds for valid inputs");
        let noop = get(|| async { "ok" });
        Router::new().route("/noop", noop).layer(cors)
    }

    /// Sends an OPTIONS preflight for `GET /noop` from [`CLIENT_ORIGIN`],
    /// asking for `requested_headers`, asserts a 200 response, and returns
    /// the lower-cased `Access-Control-Allow-Headers` value.
    async fn preflight_allow_headers(app: Router, requested_headers: &str) -> String {
        let req = Request::builder()
            .method("OPTIONS")
            .uri("/noop")
            .header("origin", CLIENT_ORIGIN)
            .header("access-control-request-method", "GET")
            .header("access-control-request-headers", requested_headers)
            .body(Body::empty())
            .unwrap();

        let resp = app.oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        resp.headers()
            .get("access-control-allow-headers")
            .expect("access-control-allow-headers present")
            .to_str()
            .unwrap()
            .to_ascii_lowercase()
    }

    #[test]
    fn all_origins_invalid_returns_config_error_instead_of_permissive() {
        // #71: when every configured origin is an invalid HeaderValue the
        // builder must fail closed, not fall back to permissive CORS.
        //
        // `HeaderValue` parsing is deliberately lax — nearly any printable
        // ASCII is accepted, so "not a url" would parse fine. An embedded
        // NUL byte is what actually triggers the parse-failure path.
        let origins = ["bad\0origin".to_owned()];
        let err = build_cors_layer(&origins, false).unwrap_err();
        let ConfigError::InvalidValue { var, message } = err;
        assert_eq!(var, "FF_CORS_ORIGINS");
        assert!(
            message.contains("all configured origins failed to parse"),
            "message was: {message}"
        );
    }

    #[test]
    fn wildcard_still_returns_permissive() {
        // An explicit `*` remains the documented permissive path.
        assert!(build_cors_layer(&["*".to_owned()], false).is_ok());
    }

    #[test]
    fn empty_origins_returns_empty_allowlist_ok() {
        // No origins configured → empty allowlist, so no browser origin
        // matches. This is NOT the fail-open case; that one only arises
        // when origins are configured but every one fails to parse.
        assert!(build_cors_layer(&[], false).is_ok());
    }

    #[test]
    fn mixed_valid_and_invalid_keeps_valid_entries() {
        // One typo must not abort the whole boot — a warning is logged and
        // the parsable entries are kept.
        let origins = [
            "https://ok.example.com".to_owned(),
            "bad\0origin".to_owned(),
        ];
        assert!(build_cors_layer(&origins, false).is_ok());
    }

    #[tokio::test]
    async fn preflight_allows_authorization_when_auth_enabled() {
        // #66: with token auth enabled, the preflight response's
        // `Access-Control-Allow-Headers` has to include `Authorization`.
        let app = app_with_cors(&[CLIENT_ORIGIN.to_owned()], true);

        let allow_headers = preflight_allow_headers(app, "authorization,content-type").await;

        assert!(
            allow_headers.contains("authorization"),
            "expected `authorization` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
        assert!(
            allow_headers.contains("content-type"),
            "expected `content-type` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
    }

    #[tokio::test]
    async fn preflight_omits_authorization_when_auth_disabled() {
        // Negative case: with auth off, Authorization is unnecessary and
        // must not be advertised (principle of least privilege).
        let app = app_with_cors(&[CLIENT_ORIGIN.to_owned()], false);

        let allow_headers = preflight_allow_headers(app, "content-type").await;

        assert!(
            !allow_headers.contains("authorization"),
            "Authorization should not be advertised when auth is off; got: {allow_headers}"
        );
        assert!(allow_headers.contains("content-type"));
    }
}
1464
#[cfg(test)]
mod claim_grant_dto_tests {
    //! Wire-shape tests for `ClaimGrantDto` (issue #91).
    //!
    //! These pin the exact JSON emitted on the 200 response of
    //! `POST /v1/workers/{id}/claim`, so that any drift is caught by a
    //! unit test rather than by a live SDK.
    use super::*;
    use ff_core::contracts::ClaimGrant;
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{ExecutionId, FlowId};

    /// Builds a grant on partition index 7 of the given family, with a
    /// fresh flow-derived execution id and fixed key / expiry values.
    fn sample_grant(family: PartitionFamily) -> ClaimGrant {
        let partition = Partition { family, index: 7 };
        let execution_id = ExecutionId::for_flow(
            &FlowId::new(),
            &ff_core::partition::PartitionConfig::default(),
        );
        ClaimGrant {
            execution_id,
            partition_key: PartitionKey::from(&partition),
            grant_key: "ff:exec:{fp:7}:deadbeef:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    /// Serialises the DTO for a sample grant of `family` to a JSON value.
    fn sample_json(family: PartitionFamily) -> serde_json::Value {
        let dto = ClaimGrantDto::from(sample_grant(family));
        serde_json::to_value(&dto).unwrap()
    }

    #[test]
    fn claim_grant_dto_emits_opaque_partition_key() {
        let json = sample_json(PartitionFamily::Flow);
        // `partition_key` serialises transparently: a bare string, neither
        // an object nor a `partition_family`/`partition_index` pair.
        assert_eq!(json["partition_key"], serde_json::json!("{fp:7}"));
        for absent in ["partition_family", "partition_index"] {
            assert!(json.get(absent).is_none());
        }
        assert_eq!(json["expires_at_ms"], serde_json::json!(1_700_000_000_000u64));
    }

    #[test]
    fn claim_grant_dto_collapses_execution_alias_on_wire() {
        // RFC-011 §11 alias: an Execution-family grant emits the same
        // `{fp:N}` hash tag as a Flow-family one. This pins that
        // wire-level equivalence.
        let flow_json = sample_json(PartitionFamily::Flow);
        let exec_json = sample_json(PartitionFamily::Execution);
        assert_eq!(flow_json["partition_key"], exec_json["partition_key"]);
    }
}
1513}