// ff_server/api.rs
1//! REST API layer — thin axum handlers over Server methods.
2
3use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use axum::{
8    extract::{DefaultBodyLimit, MatchedPath, Path, Query, Request, State},
9    http::StatusCode,
10    middleware,
11    response::{IntoResponse, Response},
12    routing::{get, post, put, MethodRouter},
13    Json, Router,
14};
15use serde::{Deserialize, Serialize};
16use axum::http::Method;
17use tower_http::cors::{AllowOrigin, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20use ff_core::contracts::*;
21use ff_core::state::PublicState;
22use ff_core::types::*;
23
24use crate::config::ConfigError;
25use crate::server::{Server, ServerError};
26
27// ── Per-route body-size limits (#97) ──
28//
29// Axum's out-of-the-box `DefaultBodyLimit` is 2 MiB and was applied silently
30// with no per-route override, meaning a misrouted multi-GB JSON would still
31// be buffered up to 2 MiB before rejection and mega-byte payloads on
32// control-plane endpoints (cancel, priority, claim, …) were accepted with
33// no protection beyond that global cap.
34//
35// We apply three categories, ordered by generosity:
36//
37//   * `BODY_LIMIT_LARGE_PAYLOAD` (1 MiB) — carries an application
38//     `input_payload` (`POST /v1/executions`). FlowFabric has no Lua-side
39//     cap on `input_payload` (`execution.lua` SETs whatever is passed);
40//     1 MiB matches common workflow-engine conventions (e.g. AWS Step
41//     Functions input size limit is 256 KB, Temporal's default is 2 MiB)
42//     and is an order of magnitude above typical JSON control envelopes.
43//
44//   * `BODY_LIMIT_MEDIUM_PAYLOAD` (256 KiB) — signal delivery
45//     (`POST /v1/executions/{id}/signal`) carries a `DeliverSignalArgs`
46//     with an optional `Vec<u8> payload`. Signals are notification-shaped
47//     events (webhooks, human approvals), not bulk-data transfers; 256 KiB
48//     is the same ceiling Step Functions imposes on task inputs and is
49//     comfortably above any real correlation-id / approval-blob.
50//
51//   * `BODY_LIMIT_CONTROL` (64 KiB) — everything else. Control-plane JSON
52//     bodies (cancel args, priority changes, claim requests, flow
53//     members, budget/quota definitions, rotate-secret keys) are all tiny
54//     fixed-shape records; 64 KiB leaves plenty of headroom for
55//     metadata-heavy bodies while capping DoS amplification by ~32× vs
56//     the previous axum default.
57//
58// Cross-checked against `lua/*.lua` — the only payload-size caps in the
59// Lua layer are `CAPS_MAX_BYTES=4096` / `CAPS_MAX_TOKENS=256` for
60// capability lists (`lua/helpers.lua`). No other Lua path enforces a
61// byte-length limit on incoming payloads, so HTTP is the right layer to
62// draw the line.
63//
64// Future admin routes that need larger bodies (e.g. bulk import) should
65// introduce a new `BODY_LIMIT_ADMIN_BULK` const and opt in per-route
66// rather than bumping `BODY_LIMIT_CONTROL`.
67
/// Cap for routes carrying an application `input_payload` (see rationale above).
const BODY_LIMIT_LARGE_PAYLOAD: usize = 1024 * 1024;     // 1 MiB
/// Cap for signal delivery — notification-shaped payloads, not bulk data.
const BODY_LIMIT_MEDIUM_PAYLOAD: usize = 256 * 1024;     // 256 KiB
/// Cap for all other control-plane JSON bodies (tiny fixed-shape records).
const BODY_LIMIT_CONTROL: usize = 64 * 1024;             // 64 KiB
71
/// Limit metadata attached to a request via extension so the 413 error body
/// can report the route's configured cap. The `DefaultBodyLimit::max(bytes)`
/// layer enforces; this struct only exists for response shape.
#[derive(Clone, Copy)]
struct BodyLimit {
    /// The byte cap configured for the matched route.
    bytes: usize,
}
79
80/// Apply per-route body-size cap + attach [`BodyLimit`] extension so the
81/// JSON rejection handler can report `limit_bytes` in the 413 response.
82///
83/// Three layers, each guarding a different failure mode:
84///
85///   1. `from_fn(enforce_body_limit)` — inspects `Content-Length` up front
86///      and rejects with our structured 413 body BEFORE the handler runs.
87///      This is the primary enforcement path and works regardless of
88///      whether the handler consumes the body. Also attaches the
89///      [`BodyLimit`] extension so `AppJson`'s 413 branch (below, case 2)
90///      can report `limit_bytes` even when rejection happens during
91///      streaming.
92///   2. `DefaultBodyLimit::max(bytes)` — hint consumed by `AppJson`
93///      (`Bytes::from_request` internally) for the case where a client
94///      lies in `Content-Length` or uses `Transfer-Encoding: chunked`:
95///      the body extractor trips on a streaming-length check and we
96///      rewrite the rejection into the structured 413 shape in
97///      `AppJson::from_request`.
98///   3. `tower_http::limit::RequestBodyLimitLayer::new(bytes)` —
99///      transport-level body cap that also fires on chunked transfers
100///      even when the handler never consumes the body. Returns a plain
101///      413 (no JSON body) in that edge case; the structured path in
102///      (1) covers the overwhelmingly common case (clients sending
103///      Content-Length).
104fn with_body_limit(route: MethodRouter<Arc<Server>>, bytes: usize) -> MethodRouter<Arc<Server>> {
105    let r: MethodRouter<Arc<Server>> =
106        route.layer(tower_http::limit::RequestBodyLimitLayer::new(bytes));
107    let r: MethodRouter<Arc<Server>> = r.layer(DefaultBodyLimit::max(bytes));
108    r.layer(middleware::from_fn(
109        move |req: Request, next: middleware::Next| enforce_body_limit(bytes, req, next),
110    ))
111}
112
113/// Content-Length-based body-size enforcement + [`BodyLimit`] extension.
114///
115/// Runs before the handler so routes whose handlers never consume the
116/// request body (`replay_execution`, `revoke_lease`, `reset_budget` — see
117/// Copilot review on PR#100) are still protected against large uploads.
118async fn enforce_body_limit(bytes: usize, mut req: Request, next: middleware::Next) -> Response {
119    if let Some(content_length) = req
120        .headers()
121        .get(axum::http::header::CONTENT_LENGTH)
122        .and_then(|v| v.to_str().ok())
123        .and_then(|s| s.parse::<usize>().ok())
124        && content_length > bytes
125    {
126        let route = req
127            .extensions()
128            .get::<MatchedPath>()
129            .map(|m| m.as_str().to_owned())
130            .unwrap_or_default();
131        let body = PayloadTooLargeBody {
132            error: "payload_too_large",
133            limit_bytes: bytes,
134            route,
135        };
136        return (StatusCode::PAYLOAD_TOO_LARGE, Json(body)).into_response();
137    }
138    req.extensions_mut().insert(BodyLimit { bytes });
139    next.run(req).await
140}
141
/// Shape for the 413 response body.
#[derive(Serialize)]
struct PayloadTooLargeBody {
    /// Stable machine-readable discriminator (always `"payload_too_large"`).
    error: &'static str,
    /// The route's configured byte cap that was exceeded.
    limit_bytes: usize,
    /// Matched route pattern (empty string when unavailable).
    route: String,
}
149
150// ── Custom JSON extractor (uniform JSON error on malformed body) ──
151
152struct AppJson<T>(T);
153
154impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
155where
156    T: serde::de::DeserializeOwned + Send,
157    S: Send + Sync,
158{
159    type Rejection = Response;
160
161    async fn from_request(
162        req: axum::extract::Request,
163        state: &S,
164    ) -> Result<Self, Self::Rejection> {
165        // Snapshot metadata for a potential 413 before `Json::from_request`
166        // consumes the request. `BodyLimit` is `Copy` and `MatchedPath` is
167        // cheap to clone (its payload is an `Arc<str>`), so the non-413
168        // path pays at most two extension lookups plus a ref-count bump.
169        let limit = req.extensions().get::<BodyLimit>().copied();
170        let matched = req.extensions().get::<MatchedPath>().cloned();
171
172        match Json::<T>::from_request(req, state).await {
173            Ok(Json(value)) => Ok(AppJson(value)),
174            Err(rejection) => {
175                let status = rejection.status();
176                tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
177                if status == StatusCode::PAYLOAD_TOO_LARGE {
178                    let limit_bytes = limit.map(|l| l.bytes).unwrap_or(0);
179                    let route = matched
180                        .as_ref()
181                        .map(|m| m.as_str().to_owned())
182                        .unwrap_or_default();
183                    let body = PayloadTooLargeBody {
184                        error: "payload_too_large",
185                        limit_bytes,
186                        route,
187                    };
188                    return Err((status, Json(body)).into_response());
189                }
190                let body = ErrorBody::plain(format!(
191                    "invalid JSON: {}",
192                    status.canonical_reason().unwrap_or("bad request"),
193                ));
194                Err((status, Json(body)).into_response())
195            }
196        }
197    }
198}
199
200// ── Error handling ──
201
202struct ApiError(ServerError);
203
204impl From<ServerError> for ApiError {
205    fn from(e: ServerError) -> Self {
206        Self(e)
207    }
208}
209
/// HTTP error body. `kind`/`retryable` are populated for 500s backed by a
/// `ferriskey::Error` so HTTP clients (e.g. cairn-fabric) can make retry
/// decisions without parsing the `error` string.
#[derive(Serialize)]
struct ErrorBody {
    // Human-readable description; not intended for machine parsing.
    error: String,
    // Stable error-kind string (from `kind_to_stable_str`); omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    kind: Option<String>,
    // Whether a client-side retry could plausibly succeed; omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    retryable: Option<bool>,
}
221
222impl ErrorBody {
223    fn plain(error: String) -> Self {
224        Self { error, kind: None, retryable: None }
225    }
226}
227
impl IntoResponse for ApiError {
    /// Map each `ServerError` variant to an HTTP status + `ErrorBody`.
    ///
    /// Caller-fault variants (not-found, invalid input, failed operation)
    /// become 4xx with a plain message; Valkey-backed faults become 500s
    /// carrying a stable `kind` string and a `retryable` hint so clients
    /// can decide to retry without parsing the message text.
    fn into_response(self) -> Response {
        use ff_script::retry::kind_to_stable_str;

        let (status, body) = match &self.0 {
            ServerError::NotFound(msg) => {
                (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
            }
            ServerError::InvalidInput(msg) => {
                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
            }
            ServerError::OperationFailed(msg) => {
                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
            }
            // 429 with retryable=true: the condition is transient by definition.
            ServerError::ConcurrencyLimitExceeded(source, max) => (
                StatusCode::TOO_MANY_REQUESTS,
                ErrorBody {
                    error: format!(
                        "too many concurrent {source} calls (server max: {max}); retry with backoff"
                    ),
                    kind: None,
                    retryable: Some(true),
                },
            ),
            ServerError::Valkey(e) => {
                // Log server-side detail (code/detail) that is deliberately
                // NOT echoed to the client; the response carries only the
                // stable kind string.
                let kind_str = kind_to_stable_str(e.kind());
                tracing::error!(
                    kind = kind_str,
                    code = e.code().unwrap_or(""),
                    detail = e.detail().unwrap_or(""),
                    "valkey error"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: self.0.to_string(),
                        kind: Some(kind_str.to_owned()),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            // Same as `Valkey` but with the operation context included in the log.
            ServerError::ValkeyContext { source, context } => {
                let kind_str = kind_to_stable_str(source.kind());
                tracing::error!(
                    kind = kind_str,
                    code = source.code().unwrap_or(""),
                    detail = source.detail().unwrap_or(""),
                    context = %context,
                    "valkey error"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: self.0.to_string(),
                        kind: Some(kind_str.to_owned()),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            ServerError::LibraryLoad(load_err) => {
                // `valkey_kind()` is optional here — the load may have failed
                // before any Valkey round-trip.
                let kind_str = load_err.valkey_kind().map(kind_to_stable_str);
                tracing::error!(
                    kind = kind_str.unwrap_or(""),
                    error = %load_err,
                    "library load failure"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: format!("library load: {load_err}"),
                        kind: kind_str.map(str::to_owned),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            // Script / Config / PartitionMismatch — developer or deployment
            // errors. No Valkey ErrorKind to surface, but retryable=false is
            // informative: a client-side retry won't change the outcome.
            other => (
                StatusCode::INTERNAL_SERVER_ERROR,
                ErrorBody {
                    error: other.to_string(),
                    kind: None,
                    retryable: Some(false),
                },
            ),
        };
        (status, Json(body)).into_response()
    }
}
318
319// ── Router ──
320
/// Build the API router with no `/metrics` endpoint and no HTTP metrics
/// middleware. Thin convenience wrapper over [`router_with_metrics`] with
/// `metrics = None`.
///
/// # Errors
/// Returns `ConfigError` when every configured CORS origin fails to parse
/// (fail-closed, see `build_cors_layer`).
pub fn router(
    server: Arc<Server>,
    cors_origins: &[String],
    api_token: Option<String>,
) -> Result<Router, ConfigError> {
    router_with_metrics(server, cors_origins, api_token, None)
}
328
329/// Router entry point that also mounts `/metrics` and the HTTP metrics
330/// middleware. Used by `main.rs` when the `observability` feature is on.
331///
332/// When `metrics` is `Some` AND the `observability` feature is compiled
333/// in, `/metrics` is mounted on an un-authenticated nested router (auth
334/// middleware does not apply — Prometheus convention). When the
335/// feature is off OR `metrics` is `None`, no `/metrics` route exists
336/// (returns 404) and no HTTP metrics middleware is installed.
pub fn router_with_metrics(
    server: Arc<Server>,
    cors_origins: &[String],
    api_token: Option<String>,
    #[cfg_attr(not(feature = "observability"), allow(unused_variables))]
    metrics: Option<Arc<crate::Metrics>>,
) -> Result<Router, ConfigError> {
    let auth_enabled = api_token.is_some();
    // Fail-closed: a fully-unparseable origin list aborts startup here.
    let cors = build_cors_layer(cors_origins, auth_enabled)?;

    // Per-route body-size caps (#97). See module-level `BODY_LIMIT_*` consts
    // for the category rationale; each route picks the tightest cap that
    // still accommodates expected real traffic. GET-only routes take no
    // `with_body_limit` wrapper — they carry no request body.
    let mut app = Router::new()
        // Executions
        .route(
            "/v1/executions",
            with_body_limit(
                get(list_executions).post(create_execution),
                BODY_LIMIT_LARGE_PAYLOAD,
            ),
        )
        .route("/v1/executions/{id}", get(get_execution))
        .route("/v1/executions/{id}/state", get(get_execution_state))
        .route(
            "/v1/executions/{id}/pending-waitpoints",
            get(list_pending_waitpoints),
        )
        .route("/v1/executions/{id}/result", get(get_execution_result))
        .route(
            "/v1/executions/{id}/cancel",
            with_body_limit(post(cancel_execution), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/signal",
            with_body_limit(post(deliver_signal), BODY_LIMIT_MEDIUM_PAYLOAD),
        )
        .route(
            "/v1/executions/{id}/priority",
            with_body_limit(put(change_priority), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/replay",
            with_body_limit(post(replay_execution), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/revoke-lease",
            with_body_limit(post(revoke_lease), BODY_LIMIT_CONTROL),
        )
        // Scheduler-routed claim (Batch C item 2). Worker POSTs lane +
        // identity + capabilities; server runs budget/quota/capability
        // admission via ff-scheduler and returns a ClaimGrant on
        // success (204 No Content when no eligible execution).
        .route(
            "/v1/workers/{worker_id}/claim",
            with_body_limit(post(claim_for_worker), BODY_LIMIT_CONTROL),
        )
        // Stream read + tail (RFC-006 #2)
        .route(
            "/v1/executions/{id}/attempts/{idx}/stream",
            get(read_attempt_stream),
        )
        .route(
            "/v1/executions/{id}/attempts/{idx}/stream/tail",
            get(tail_attempt_stream),
        )
        // Flows
        .route(
            "/v1/flows",
            with_body_limit(post(create_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/members",
            with_body_limit(post(add_execution_to_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/cancel",
            with_body_limit(post(cancel_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/edges",
            with_body_limit(post(stage_dependency_edge), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/edges/apply",
            with_body_limit(post(apply_dependency_to_child), BODY_LIMIT_CONTROL),
        )
        // Budgets
        .route(
            "/v1/budgets",
            with_body_limit(post(create_budget), BODY_LIMIT_CONTROL),
        )
        .route("/v1/budgets/{id}", get(get_budget_status))
        .route(
            "/v1/budgets/{id}/usage",
            with_body_limit(post(report_usage), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/budgets/{id}/reset",
            with_body_limit(post(reset_budget), BODY_LIMIT_CONTROL),
        )
        // Quotas
        .route(
            "/v1/quotas",
            with_body_limit(post(create_quota_policy), BODY_LIMIT_CONTROL),
        )
        // Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security)
        .route(
            "/v1/admin/rotate-waitpoint-secret",
            with_body_limit(post(rotate_waitpoint_secret), BODY_LIMIT_CONTROL),
        )
        // Health (always unauthenticated)
        .route("/healthz", get(healthz));

    // Auth is opt-in: only mounted when FF_API_TOKEN is configured. The
    // middleware itself exempts /healthz (see `auth_middleware`).
    if let Some(token) = api_token {
        let token = Arc::new(token);
        app = app.layer(middleware::from_fn(move |req, next| {
            let token = token.clone();
            auth_middleware(token, req, next)
        }));
    }

    // PR-94: HTTP metrics middleware. Recorded AFTER the auth layer
    // above so unauthorized 401s are still counted under their route,
    // and BEFORE trace/cors so the metric captures handler time
    // including the auth check itself. No-op when the
    // `observability` feature is off.
    #[cfg(feature = "observability")]
    if let Some(m) = metrics.as_ref() {
        let m = m.clone();
        app = app.layer(middleware::from_fn_with_state(
            m,
            crate::metrics::http_middleware,
        ));
    }

    #[cfg_attr(not(feature = "observability"), allow(unused_mut))]
    let mut app = app
        .layer(TraceLayer::new_for_http())
        .layer(cors)
        .with_state(server);

    // PR-94: `/metrics` — intentionally unauthenticated. Mounted on a
    // separate router with its own State so the main app's auth
    // middleware does not run for scrapes (Prometheus convention).
    // Network-layer auth (ingress ACL, service-mesh policy, or
    // metrics-only listen address) is the expected gate; FF does not
    // own auth for scrape endpoints.
    #[cfg(feature = "observability")]
    if let Some(m) = metrics {
        let metrics_router: Router = Router::new()
            .route("/metrics", get(crate::metrics::metrics_handler))
            .with_state(m);
        app = app.merge(metrics_router);
    }

    Ok(app)
}
495
496async fn auth_middleware(
497    token: Arc<String>,
498    req: Request,
499    next: middleware::Next,
500) -> Response {
501    if req.uri().path() == "/healthz" {
502        return next.run(req).await;
503    }
504
505    let auth_header = req
506        .headers()
507        .get("authorization")
508        .and_then(|v| v.to_str().ok());
509
510    let authorized = auth_header
511        .and_then(|v| v.strip_prefix("Bearer "))
512        .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
513
514    if authorized {
515        next.run(req).await
516    } else {
517        (
518            StatusCode::UNAUTHORIZED,
519            Json(ErrorBody::plain(
520                "missing or invalid Authorization header".to_owned(),
521            )),
522        )
523            .into_response()
524    }
525}
526
/// Timing-safe byte comparison: XOR-accumulates every byte pair so the
/// comparison time does not depend on where the first mismatch occurs.
/// (The length check short-circuits; only the length, not content, leaks.)
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter()
        .zip(b.iter())
        .fold(0u8, |acc, (x, y)| acc | (x ^ y))
        == 0
}
537
538/// Build the CORS layer from the configured `FF_CORS_ORIGINS` list.
539///
540/// Fails closed (#71): if every entry fails to parse as a `HeaderValue`,
541/// return `ConfigError` so startup aborts instead of silently falling back
542/// to `CorsLayer::permissive()` (a typo would otherwise broaden browser
543/// access to "allow any origin").
544///
545/// When `auth_enabled` is true (i.e. `FF_API_TOKEN` is set), `Authorization`
546/// is added to `allow_headers` (#66) so browser preflights for
547/// cross-origin authenticated requests succeed.
548fn build_cors_layer(origins: &[String], auth_enabled: bool) -> Result<CorsLayer, ConfigError> {
549    if origins.iter().any(|o| o == "*") {
550        return Ok(CorsLayer::permissive());
551    }
552
553    let mut parsed = Vec::with_capacity(origins.len());
554    let mut accepted = Vec::with_capacity(origins.len());
555    let mut invalid = Vec::new();
556    for o in origins {
557        match o.parse() {
558            Ok(v) => {
559                parsed.push(v);
560                accepted.push(o.as_str());
561            }
562            Err(_) => invalid.push(o.clone()),
563        }
564    }
565
566    if parsed.is_empty() && !origins.is_empty() {
567        return Err(ConfigError::InvalidValue {
568            var: "FF_CORS_ORIGINS".to_owned(),
569            message: format!(
570                "all configured origins failed to parse as valid HTTP header values: {:?}; \
571                 refusing to fall back to permissive CORS",
572                origins
573            ),
574        });
575    }
576
577    if !invalid.is_empty() {
578        // Collected `accepted` list during the single parse loop above to
579        // avoid an O(N*M) filter-contains scan per log event.
580        tracing::warn!(
581            ?invalid,
582            ?accepted,
583            "some FF_CORS_ORIGINS entries failed to parse and were dropped"
584        );
585    }
586
587    // Prefer typed `header::*` constants over stringly-typed `from_static`
588    // so typos fail to compile rather than fail at runtime.
589    let mut headers = vec![axum::http::header::CONTENT_TYPE];
590    if auth_enabled {
591        headers.push(axum::http::header::AUTHORIZATION);
592    }
593
594    Ok(CorsLayer::new()
595        .allow_origin(AllowOrigin::list(parsed))
596        .allow_methods([Method::GET, Method::POST, Method::PUT])
597        .allow_headers(headers))
598}
599
600// ── Execution handlers ──
601
/// Query parameters for `GET /v1/executions`.
#[derive(Deserialize)]
struct ListExecutionsParams {
    // Required: which flow partition to scan.
    partition: u16,
    // Lane name; defaults to "default".
    #[serde(default = "default_lane")]
    lane: String,
    // State filter; defaults to "eligible".
    #[serde(default = "default_state_filter")]
    state: String,
    // Page size; defaults to 50, clamped to 1000 in the handler.
    #[serde(default = "default_limit")]
    limit: u64,
    // Page offset; defaults to 0.
    #[serde(default)]
    offset: u64,
}
614
// serde default providers for `ListExecutionsParams`.
fn default_lane() -> String {
    String::from("default")
}

fn default_state_filter() -> String {
    String::from("eligible")
}

fn default_limit() -> u64 {
    50
}
618
619async fn list_executions(
620    State(server): State<Arc<Server>>,
621    Query(params): Query<ListExecutionsParams>,
622) -> Result<Json<ListExecutionsResult>, ApiError> {
623    let lane = ff_core::types::LaneId::try_new(params.lane.clone())
624        .map_err(|e| ApiError::from(ServerError::InvalidInput(format!("invalid lane: {e}"))))?;
625    let limit = params.limit.min(1000);
626    let result = server
627        .list_executions(params.partition, &lane, &params.state, params.offset, limit)
628        .await?;
629    Ok(Json(result))
630}
631
632async fn create_execution(
633    State(server): State<Arc<Server>>,
634    AppJson(args): AppJson<CreateExecutionArgs>,
635) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
636    let result = server.create_execution(&args).await?;
637    let status = match &result {
638        CreateExecutionResult::Created { .. } => StatusCode::CREATED,
639        CreateExecutionResult::Duplicate { .. } => StatusCode::OK,
640    };
641    Ok((status, Json(result)))
642}
643
644async fn get_execution(
645    State(server): State<Arc<Server>>,
646    Path(id): Path<String>,
647) -> Result<Json<ExecutionInfo>, ApiError> {
648    let eid = parse_execution_id(&id)?;
649    Ok(Json(server.get_execution(&eid).await?))
650}
651
652async fn get_execution_state(
653    State(server): State<Arc<Server>>,
654    Path(id): Path<String>,
655) -> Result<Json<PublicState>, ApiError> {
656    let eid = parse_execution_id(&id)?;
657    Ok(Json(server.get_execution_state(&eid).await?))
658}
659
660/// Returns the actionable (`pending`/`active`) waitpoints for an
661/// execution, including the HMAC `waitpoint_token` required to deliver
662/// signals. Human reviewers use this to look up the token originally
663/// returned only to the suspending worker's `SuspendOutcome`.
664///
665/// SECURITY: `waitpoint_token` is a bearer credential for signal
666/// delivery; leaking it lets a third party forge authority to resume or
667/// influence the execution. Gate the endpoint behind `FF_API_TOKEN` in
668/// any deployment reachable from untrusted networks. The auth middleware
669/// only mounts when `FF_API_TOKEN` is set; this endpoint is
670/// unauthenticated without it, and the server logs a loud warning at
671/// startup so operators notice.
672async fn list_pending_waitpoints(
673    State(server): State<Arc<Server>>,
674    Path(id): Path<String>,
675) -> Result<Json<Vec<PendingWaitpointInfo>>, ApiError> {
676    let eid = parse_execution_id(&id)?;
677    Ok(Json(server.list_pending_waitpoints(&eid).await?))
678}
679
680/// Returns the raw result payload bytes written by the worker's
681/// `ff_complete_execution` call. 404 when the execution has no stored
682/// result (missing entirely, still in-flight, or trimmed by retention —
683/// see below).
684///
685/// # Ordering (required)
686///
687/// Callers MUST poll `GET /v1/executions/{id}/state` until it returns
688/// `completed` before fetching `/result`. Early polls may return 404
689/// because completion writes `public_state = completed` and the result
690/// `SET` in the same atomic Lua; in the normal path the window is
691/// effectively zero, but network round-trip ordering between a state
692/// poll and a result fetch can make the result appear briefly absent
693/// during replay (`ff_replay_execution`).
694///
695/// # Retention / 404 after completed
696///
697/// `get_execution_state == completed` is authoritative for completion.
698/// This endpoint additionally depends on the result bytes not having
699/// been trimmed — v1 sets no retention policy, so
700/// `state = completed` should always pair with a 200 here. Any
701/// future retention-policy feature must call this contract out in its
702/// own docs.
703///
704/// CONTENT-TYPE: `application/octet-stream`. The server is payload-format
705/// agnostic — workers choose the encoding via the SDK's `complete(bytes)`
706/// call, and callers must know the contract. The media-pipeline example
707/// uses JSON by convention (`serde_json::to_vec(&Result)`); adapters can
708/// pick any binary format.
709///
710/// SECURITY: completion payloads can contain PII (e.g. LLM summaries of
711/// user audio). Treat this endpoint like any other read — gate behind
712/// `FF_API_TOKEN` in any deployment reachable from untrusted networks.
713/// The auth middleware only mounts when `FF_API_TOKEN` is set.
714async fn get_execution_result(
715    State(server): State<Arc<Server>>,
716    Path(id): Path<String>,
717) -> Result<Response, ApiError> {
718    let eid = parse_execution_id(&id)?;
719    match server.get_execution_result(&eid).await? {
720        Some(bytes) => Ok((
721            StatusCode::OK,
722            [(axum::http::header::CONTENT_TYPE, "application/octet-stream")],
723            bytes,
724        )
725            .into_response()),
726        None => Err(ApiError(ServerError::NotFound(format!(
727            "execution result not found: {eid}"
728        )))),
729    }
730}
731
732async fn cancel_execution(
733    State(server): State<Arc<Server>>,
734    Path(id): Path<String>,
735    AppJson(mut args): AppJson<CancelExecutionArgs>,
736) -> Result<Json<CancelExecutionResult>, ApiError> {
737    let path_eid = parse_execution_id(&id)?;
738    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
739    args.execution_id = path_eid;
740    Ok(Json(server.cancel_execution(&args).await?))
741}
742
743async fn deliver_signal(
744    State(server): State<Arc<Server>>,
745    Path(id): Path<String>,
746    AppJson(mut args): AppJson<DeliverSignalArgs>,
747) -> Result<Json<DeliverSignalResult>, ApiError> {
748    let path_eid = parse_execution_id(&id)?;
749    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
750    args.execution_id = path_eid;
751    Ok(Json(server.deliver_signal(&args).await?))
752}
753
754// ── Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security) ──
755
/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
#[derive(Deserialize)]
struct RotateWaitpointSecretBody {
    // Key id the new secret rotates in under.
    new_kid: String,
    /// Hex-encoded new secret. Even-length, 0-9a-fA-F.
    new_secret_hex: String,
}
762
763/// Hard ceiling on how long the rotate endpoint runs before the HTTP
764/// handler bails. Rotation touches every execution partition (up to 256)
765/// with HGET+HMGET+HDEL+HSET per partition; 6 round-trips × 30ms cross-AZ
766/// × 256 partitions ≈ 46s worst-case. The internal SETNX lock is 10s TTL
767/// per partition, so 120s gives ample margin for contention + slow RTTs
768/// while staying below common LB idle timeouts (ALB 60s default, but
769/// typically bumped to 120s+ for admin endpoints).
770///
771/// On timeout: returns HTTP 504 immediately. Valkey-side work may still
772/// finish (the per-partition locks and HSETs are already in flight). The
773/// operator observes a 504 and retries; retry is SAFE — rotation is
774/// idempotent per-partition (same new_kid + same secret → no-op on
775/// already-rotated partitions).
776const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
777
778async fn rotate_waitpoint_secret(
779    State(server): State<Arc<Server>>,
780    AppJson(body): AppJson<RotateWaitpointSecretBody>,
781) -> Result<Response, ApiError> {
782    // Cap the whole endpoint end-to-end. If this trips, the caller's
783    // retry is SAFE — per-partition rotation is idempotent on the same
784    // (new_kid, secret_hex) and the per-partition SETNX lock prevents
785    // double-rotation under concurrent retries.
786    let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
787    let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
788        Ok(r) => r?,
789        Err(_) => {
790            tracing::error!(
791                target: "audit",
792                new_kid = %body.new_kid,
793                timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
794                "waitpoint_hmac_rotation_timeout_http_504"
795            );
796            let body = ErrorBody::plain(format!(
797                "rotation exceeded {}s server-side timeout; retry is safe \
798                 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
799                ROTATE_HTTP_TIMEOUT.as_secs()
800            ));
801            return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
802        }
803    };
804    // Operator action log at audit target (per-partition detail logged inside
805    // rotate_waitpoint_secret). Returns 400 only on actionable fault states.
806    //
807    // Two distinct rotated==0 cases:
808    //   - failed.is_empty() → no partitions at all (num_flow_partitions == 0;
809    //     env_u16_positive rejects this at boot so this is mostly dead code
810    //     for library/Default callers).
811    //   - !failed.is_empty() → every partition attempt raised a real error.
812    //     Operator investigates Valkey/auth/cluster health before retrying.
813    if result.rotated == 0 && result.failed.is_empty() {
814        return Err(ApiError::from(ServerError::OperationFailed(
815            "rotation had no partitions to operate on \
816             (num_flow_partitions is 0 — server misconfigured)"
817                .to_owned(),
818        )));
819    }
820    if result.rotated == 0 && !result.failed.is_empty() {
821        return Err(ApiError::from(ServerError::OperationFailed(
822            "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
823        )));
824    }
825    Ok(Json(result).into_response())
826}
827
/// JSON body for the change-priority endpoint (see [`change_priority`]).
#[derive(Deserialize)]
struct ChangePriorityBody {
    // Forwarded verbatim to `Server::change_priority`; any range
    // validation happens there, not in this layer.
    new_priority: i32,
}
832
833async fn change_priority(
834    State(server): State<Arc<Server>>,
835    Path(id): Path<String>,
836    AppJson(body): AppJson<ChangePriorityBody>,
837) -> Result<Json<ChangePriorityResult>, ApiError> {
838    let eid = parse_execution_id(&id)?;
839    Ok(Json(server.change_priority(&eid, body.new_priority).await?))
840}
841
842async fn replay_execution(
843    State(server): State<Arc<Server>>,
844    Path(id): Path<String>,
845) -> Result<Json<ReplayExecutionResult>, ApiError> {
846    let eid = parse_execution_id(&id)?;
847    Ok(Json(server.replay_execution(&eid).await?))
848}
849
850async fn revoke_lease(
851    State(server): State<Arc<Server>>,
852    Path(id): Path<String>,
853) -> Result<Json<RevokeLeaseResult>, ApiError> {
854    let eid = parse_execution_id(&id)?;
855    Ok(Json(server.revoke_lease(&eid).await?))
856}
857
858// ── Scheduler-routed claim (Batch C item 2 PR-B) ──
859//
860// The server exposes the scheduler's `claim_for_worker` cycle via
861// HTTP so ff-sdk workers can acquire claim grants without enabling
862// the `direct-valkey-claim` feature. The request body carries lane +
863// identity + capabilities; the server returns a serialized
864// `ClaimGrant` (or 204 No Content when no eligible execution exists).
865
/// Request body for `POST /v1/workers/{worker_id}/claim`.
#[derive(Deserialize)]
struct ClaimForWorkerBody {
    // Lane to claim from; parsed via `LaneId::try_new` in the handler —
    // invalid lanes surface as 400.
    lane_id: String,
    // Identity of this worker instance; checked by `validate_identifier`
    // (non-empty, <= 256 bytes, no whitespace/control chars).
    worker_instance_id: String,
    /// Capability tokens this worker advertises. Sorted + validated
    /// on the scheduler side; any non-printable/CSV-breaking token
    /// surfaces as 400.
    #[serde(default)]
    capabilities: Vec<String>,
    /// Grant TTL in milliseconds. Bounded so a worker can't request a
    /// multi-hour grant and squat the execution.
    grant_ttl_ms: u64,
}
880
/// Wire shape for `ff_core::contracts::ClaimGrant`. Carries the
/// opaque [`ff_core::partition::PartitionKey`] on the wire; consumers
/// never see the internal `PartitionFamily` enum (issue #91).
#[derive(Serialize)]
struct ClaimGrantDto {
    // Stringified execution id (see the `From<ClaimGrant>` impl below).
    execution_id: String,
    // Opaque partition key; serializes as a bare string (pinned by
    // `claim_grant_dto_tests`).
    partition_key: ff_core::partition::PartitionKey,
    // Passed through unchanged from the grant.
    grant_key: String,
    // Grant expiry in milliseconds, passed through unchanged.
    expires_at_ms: u64,
}
891
892impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
893    fn from(g: ff_core::contracts::ClaimGrant) -> Self {
894        Self {
895            execution_id: g.execution_id.to_string(),
896            partition_key: g.partition_key,
897            grant_key: g.grant_key,
898            expires_at_ms: g.expires_at_ms,
899        }
900    }
901}
902
/// Maximum grant TTL accepted via HTTP. Mirrors the scheduler's
/// internal ceiling so a misconfigured worker can't squat an
/// execution on a multi-hour grant.
///
/// Enforced in [`claim_for_worker`]: `grant_ttl_ms` outside
/// `1..=CLAIM_GRANT_TTL_MS_MAX` is rejected with 400.
const CLAIM_GRANT_TTL_MS_MAX: u64 = 60_000;
907
908/// Reject empty / whitespace / non-printable identifiers the way
909/// [`LaneId::try_new`] does for lanes. WorkerId + WorkerInstanceId
910/// feed into scheduler scan jitter + Valkey key construction; silent
911/// acceptance of "" or "w\nork" would either mis-key or mis-hash.
912fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
913    if value.is_empty() {
914        return Err(ApiError(ServerError::InvalidInput(format!(
915            "{field}: must not be empty"
916        ))));
917    }
918    if value.len() > 256 {
919        return Err(ApiError(ServerError::InvalidInput(format!(
920            "{field}: exceeds 256 bytes (got {})",
921            value.len()
922        ))));
923    }
924    if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
925        return Err(ApiError(ServerError::InvalidInput(format!(
926            "{field}: must not contain whitespace or control characters"
927        ))));
928    }
929    Ok(())
930}
931
932async fn claim_for_worker(
933    State(server): State<Arc<Server>>,
934    Path(worker_id): Path<String>,
935    AppJson(body): AppJson<ClaimForWorkerBody>,
936) -> Result<Response, ApiError> {
937    validate_identifier("worker_id", &worker_id)?;
938    validate_identifier("worker_instance_id", &body.worker_instance_id)?;
939    let worker_id = WorkerId::new(worker_id);
940    let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
941    let lane = LaneId::try_new(body.lane_id).map_err(|e| {
942        ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
943    })?;
944    if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
945        return Err(ApiError(ServerError::InvalidInput(format!(
946            "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
947        ))));
948    }
949    let caps: std::collections::BTreeSet<String> =
950        body.capabilities.into_iter().collect();
951
952    match server
953        .claim_for_worker(
954            &lane,
955            &worker_id,
956            &worker_instance_id,
957            &caps,
958            body.grant_ttl_ms,
959        )
960        .await?
961    {
962        Some(grant) => Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response()),
963        None => Ok(StatusCode::NO_CONTENT.into_response()),
964    }
965}
966
967// ── Stream read + tail ──
968
/// Query parameters for the bounded stream-read endpoint
/// ([`read_attempt_stream`]).
#[derive(Deserialize)]
struct ReadStreamParams {
    // Lower cursor of the span; defaults to `StreamCursor::start`.
    #[serde(default = "ff_core::contracts::StreamCursor::start")]
    from: ff_core::contracts::StreamCursor,
    // Upper cursor of the span; defaults to `StreamCursor::end`.
    #[serde(default = "ff_core::contracts::StreamCursor::end")]
    to: ff_core::contracts::StreamCursor,
    // Max frames per response; defaults to 100 (`default_read_limit`)
    // and is capped at `REST_STREAM_LIMIT_CEILING` by the handler.
    #[serde(default = "default_read_limit")]
    limit: u64,
}
978
/// Serde default for `ReadStreamParams::limit` when the query string
/// omits `limit`.
fn default_read_limit() -> u64 {
    100
}
980
/// JSON response shape shared by the stream read and tail endpoints.
#[derive(Serialize)]
struct ReadStreamResponse {
    // Frames in the requested span, as returned by the server.
    frames: Vec<StreamFrame>,
    // Convenience count; always equals `frames.len()` (see the `From`
    // impl below).
    count: usize,
    /// When set, the producer has closed this stream — consumer should
    /// stop polling. Absent when the stream is still open (or never
    /// existed, which is indistinguishable from "still open" at this
    /// layer).
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_at: Option<i64>,
    /// Reason from the closing writer: `attempt_success`, `attempt_failure`,
    /// `attempt_cancelled`, `attempt_interrupted`. Absent iff still open.
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_reason: Option<String>,
}
996
997impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
998    fn from(sf: ff_core::contracts::StreamFrames) -> Self {
999        let count = sf.frames.len();
1000        Self {
1001            frames: sf.frames,
1002            count,
1003            closed_at: sf.closed_at.map(|t| t.0),
1004            closed_reason: sf.closed_reason,
1005        }
1006    }
1007}
1008
/// REST-layer ceiling on `limit` for stream read/tail responses. Lower
/// than the internal `STREAM_READ_HARD_CAP` (10_000) because an HTTP
/// response buffers the whole JSON body in memory in axum — a
/// `10_000 × max_payload_bytes (65_536)` body is ~640MB per call, which
/// is a DoS vector from a single client. Internal callers using FCALL or
/// the SDK directly still get the full 10_000 ceiling; REST clients must
/// paginate through `from`/`to` for larger spans.
///
/// Enforced by both [`read_attempt_stream`] and [`tail_attempt_stream`].
///
/// v2 candidate: chunked-transfer / SSE when the caller wants > this bound.
const REST_STREAM_LIMIT_CEILING: u64 = 1_000;
1019
1020async fn read_attempt_stream(
1021    State(server): State<Arc<Server>>,
1022    Path((id, idx)): Path<(String, u32)>,
1023    Query(params): Query<ReadStreamParams>,
1024) -> Result<Json<ReadStreamResponse>, ApiError> {
1025    if params.limit == 0 {
1026        return Err(ApiError(ServerError::InvalidInput(
1027            "limit must be >= 1".to_owned(),
1028        )));
1029    }
1030    if params.limit > REST_STREAM_LIMIT_CEILING {
1031        return Err(ApiError(ServerError::InvalidInput(format!(
1032            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
1033        ))));
1034    }
1035    let eid = parse_execution_id(&id)?;
1036    let attempt_index = AttemptIndex::new(idx);
1037    let result = server
1038        .read_attempt_stream(
1039            &eid,
1040            attempt_index,
1041            params.from.to_wire(),
1042            params.to.to_wire(),
1043            params.limit,
1044        )
1045        .await?;
1046    Ok(Json(result.into()))
1047}
1048
/// Query parameters for the blocking tail endpoint
/// ([`tail_attempt_stream`]).
#[derive(Deserialize)]
struct TailStreamParams {
    // Cursor to resume after. Must be a concrete entry id — the handler
    // rejects non-concrete cursors. Defaults to
    // `StreamCursor::beginning`.
    #[serde(default = "ff_core::contracts::StreamCursor::beginning")]
    after: ff_core::contracts::StreamCursor,
    // Block duration in milliseconds; defaults to 0 and is capped at
    // `MAX_TAIL_BLOCK_MS` by the handler.
    #[serde(default)]
    block_ms: u64,
    // Max frames per response; defaults to 50 (`default_tail_limit`)
    // and is capped at `REST_STREAM_LIMIT_CEILING` by the handler.
    #[serde(default = "default_tail_limit")]
    limit: u64,
}
1058
/// Serde default for `TailStreamParams::limit` when the query string
/// omits `limit`.
fn default_tail_limit() -> u64 {
    50
}
1060
/// Ceiling on BLOCK duration for the tail endpoint. Kept below common LB
/// idle timeouts (ALB 60s, nginx 60s, Cloudflare 100s) so the HTTP response
/// can't be cut mid-block.
///
/// Note: ferriskey's client auto-extends its `request_timeout` for XREAD
/// BLOCK to `block_ms + 500ms`, so a blocking call with the full ceiling
/// never produces a spurious transport timeout. See
/// `ff_script::stream_tail` module docs for the exact ferriskey code path.
///
/// Enforced in [`tail_attempt_stream`]; larger `block_ms` values get 400.
const MAX_TAIL_BLOCK_MS: u64 = 30_000;
1070
1071async fn tail_attempt_stream(
1072    State(server): State<Arc<Server>>,
1073    Path((id, idx)): Path<(String, u32)>,
1074    Query(params): Query<TailStreamParams>,
1075) -> Result<Json<ReadStreamResponse>, ApiError> {
1076    if params.block_ms > MAX_TAIL_BLOCK_MS {
1077        return Err(ApiError(ServerError::InvalidInput(format!(
1078            "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
1079        ))));
1080    }
1081    if params.limit == 0 {
1082        return Err(ApiError(ServerError::InvalidInput(
1083            "limit must be >= 1".to_owned(),
1084        )));
1085    }
1086    if params.limit > REST_STREAM_LIMIT_CEILING {
1087        return Err(ApiError(ServerError::InvalidInput(format!(
1088            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
1089        ))));
1090    }
1091    // XREAD cursor must be a concrete ID — `Start`/`End` are
1092    // XRANGE-only. The opaque-cursor deserializer already rejects
1093    // the bare `-`/`+` wire tokens; this boundary also rejects the
1094    // structured `start`/`end` keywords because XREAD treats them
1095    // as invalid ids. Uses [`StreamCursor::is_concrete`] so the
1096    // SDK + REST guards stay in lock-step.
1097    if !params.after.is_concrete() {
1098        return Err(ApiError(ServerError::InvalidInput(
1099            "after: XREAD cursor must be a concrete entry id; pass '0-0' to start from the beginning"
1100                .to_owned(),
1101        )));
1102    }
1103
1104    let eid = parse_execution_id(&id)?;
1105    let attempt_index = AttemptIndex::new(idx);
1106    let result = server
1107        .tail_attempt_stream(
1108            &eid,
1109            attempt_index,
1110            params.after.to_wire(),
1111            params.block_ms,
1112            params.limit,
1113        )
1114        .await?;
1115    Ok(Json(result.into()))
1116}
1117
1118// ── Flow handlers ──
1119
1120async fn create_flow(
1121    State(server): State<Arc<Server>>,
1122    AppJson(args): AppJson<CreateFlowArgs>,
1123) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
1124    let result = server.create_flow(&args).await?;
1125    let status = match &result {
1126        CreateFlowResult::Created { .. } => StatusCode::CREATED,
1127        CreateFlowResult::AlreadySatisfied { .. } => StatusCode::OK,
1128    };
1129    Ok((status, Json(result)))
1130}
1131
1132async fn add_execution_to_flow(
1133    State(server): State<Arc<Server>>,
1134    Path(id): Path<String>,
1135    AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
1136) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
1137    let path_fid = parse_flow_id(&id)?;
1138    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1139    args.flow_id = path_fid;
1140    let result = server.add_execution_to_flow(&args).await?;
1141    let status = match &result {
1142        AddExecutionToFlowResult::Added { .. } => StatusCode::CREATED,
1143        AddExecutionToFlowResult::AlreadyMember { .. } => StatusCode::OK,
1144    };
1145    Ok((status, Json(result)))
1146}
1147
1148/// Cancel a flow.
1149///
1150/// By default the handler returns immediately with
1151/// [`CancelFlowResult::CancellationScheduled`] (or `Cancelled` for flows
1152/// with no members / non-cancel_all policies), and the individual member
1153/// execution cancellations run in a background task on the server.
1154/// Clients can track per-member progress by polling
1155/// `GET /v1/executions/{id}/state` for each id in `member_execution_ids`.
1156///
1157/// Pass `?wait=true` to run the dispatch loop inline; the handler will not
1158/// return until every member has been cancelled. Useful for tests and
1159/// callers that need synchronous completion.
1160async fn cancel_flow(
1161    State(server): State<Arc<Server>>,
1162    Path(id): Path<String>,
1163    Query(params): Query<HashMap<String, String>>,
1164    AppJson(mut args): AppJson<CancelFlowArgs>,
1165) -> Result<Json<CancelFlowResult>, ApiError> {
1166    let path_fid = parse_flow_id(&id)?;
1167    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1168    args.flow_id = path_fid;
1169    let wait = params.get("wait").is_some_and(|v| v == "true" || v == "1");
1170    let result = if wait {
1171        server.cancel_flow_wait(&args).await?
1172    } else {
1173        server.cancel_flow(&args).await?
1174    };
1175    Ok(Json(result))
1176}
1177
1178async fn stage_dependency_edge(
1179    State(server): State<Arc<Server>>,
1180    Path(id): Path<String>,
1181    AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
1182) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
1183    let path_fid = parse_flow_id(&id)?;
1184    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1185    args.flow_id = path_fid;
1186    let result = server.stage_dependency_edge(&args).await?;
1187    Ok((StatusCode::CREATED, Json(result)))
1188}
1189
1190async fn apply_dependency_to_child(
1191    State(server): State<Arc<Server>>,
1192    Path(id): Path<String>,
1193    AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
1194) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
1195    let path_fid = parse_flow_id(&id)?;
1196    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1197    args.flow_id = path_fid;
1198    Ok(Json(server.apply_dependency_to_child(&args).await?))
1199}
1200
1201// ── Budget / Quota handlers ──
1202
1203async fn create_budget(
1204    State(server): State<Arc<Server>>,
1205    AppJson(args): AppJson<CreateBudgetArgs>,
1206) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
1207    let result = server.create_budget(&args).await?;
1208    let status = match &result {
1209        CreateBudgetResult::Created { .. } => StatusCode::CREATED,
1210        CreateBudgetResult::AlreadySatisfied { .. } => StatusCode::OK,
1211    };
1212    Ok((status, Json(result)))
1213}
1214
1215async fn get_budget_status(
1216    State(server): State<Arc<Server>>,
1217    Path(id): Path<String>,
1218) -> Result<Json<BudgetStatus>, ApiError> {
1219    let bid = parse_budget_id(&id)?;
1220    Ok(Json(server.get_budget_status(&bid).await?))
1221}
1222
/// JSON body for the report-usage endpoint. The handler flattens the
/// `dimensions` map into the parallel `dimensions`/`deltas` vectors of
/// `ReportUsageArgs`.
#[derive(Deserialize)]
struct ReportUsageBody {
    // Usage deltas keyed by dimension name.
    dimensions: HashMap<String, u64>,
    // Caller-supplied timestamp, forwarded verbatim to the server.
    now: ff_core::types::TimestampMs,
    // Optional deduplication key, forwarded verbatim to the server.
    #[serde(default)]
    dedup_key: Option<String>,
}
1230
1231async fn report_usage(
1232    State(server): State<Arc<Server>>,
1233    Path(id): Path<String>,
1234    AppJson(body): AppJson<ReportUsageBody>,
1235) -> Result<Json<ReportUsageResult>, ApiError> {
1236    let bid = parse_budget_id(&id)?;
1237    let dims: Vec<String> = body.dimensions.keys().cloned().collect();
1238    let deltas: Vec<u64> = dims.iter().map(|d| body.dimensions[d]).collect();
1239    let args = ReportUsageArgs {
1240        dimensions: dims,
1241        deltas,
1242        now: body.now,
1243        dedup_key: body.dedup_key,
1244    };
1245    Ok(Json(server.report_usage(&bid, &args).await?))
1246}
1247
1248async fn reset_budget(
1249    State(server): State<Arc<Server>>,
1250    Path(id): Path<String>,
1251) -> Result<Json<ResetBudgetResult>, ApiError> {
1252    let bid = parse_budget_id(&id)?;
1253    Ok(Json(server.reset_budget(&bid).await?))
1254}
1255
1256async fn create_quota_policy(
1257    State(server): State<Arc<Server>>,
1258    AppJson(args): AppJson<CreateQuotaPolicyArgs>,
1259) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
1260    let result = server.create_quota_policy(&args).await?;
1261    let status = match &result {
1262        CreateQuotaPolicyResult::Created { .. } => StatusCode::CREATED,
1263        CreateQuotaPolicyResult::AlreadySatisfied { .. } => StatusCode::OK,
1264    };
1265    Ok((status, Json(result)))
1266}
1267
1268// ── Health check ──
1269
/// Response body for the health endpoint; [`healthz`] only ever emits
/// `{"status":"ok"}` — failures surface as an `ApiError` instead.
#[derive(Serialize)]
struct HealthResponse {
    status: &'static str,
}
1274
1275async fn healthz(
1276    State(server): State<Arc<Server>>,
1277) -> Result<Json<HealthResponse>, ApiError> {
1278    let _: String = server
1279        .client()
1280        .cmd("PING")
1281        .execute()
1282        .await
1283        .map_err(|e| ApiError(ServerError::ValkeyContext { source: e, context: "healthz PING".into() }))?;
1284    Ok(Json(HealthResponse { status: "ok" }))
1285}
1286
1287// ── ID parsing helpers ──
1288
1289/// Return 400 if the body contains an ID that differs from the path ID.
1290fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
1291    if body_id != path_id {
1292        return Err(ApiError(ServerError::InvalidInput(format!(
1293            "path {id_name} does not match body {id_name}"
1294        ))));
1295    }
1296    Ok(())
1297}
1298
1299fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
1300    ExecutionId::parse(s)
1301        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
1302}
1303
1304fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
1305    FlowId::parse(s)
1306        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid flow_id: {e}"))))
1307}
1308
1309fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
1310    BudgetId::parse(s)
1311        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid budget_id: {e}"))))
1312}
1313
#[cfg(test)]
mod cors_tests {
    //! CORS + auth preflight tests (#66, #71).
    //!
    //! Each test drives a minimal router — a noop GET route with only the
    //! CORS layer applied — through `tower::ServiceExt::oneshot`. That is
    //! exactly the surface a browser sees (the `CorsLayer` is terminal
    //! for OPTIONS preflights), so no live `Server` is needed.
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::get;
    use tower::ServiceExt;

    /// Minimal `/noop` router with only the CORS layer under test.
    fn app_with_cors(origins: &[String], auth_enabled: bool) -> Router {
        let cors = build_cors_layer(origins, auth_enabled)
            .expect("build_cors_layer succeeds for valid inputs");
        Router::new()
            .route("/noop", get(|| async { "ok" }))
            .layer(cors)
    }

    /// OPTIONS preflight from `https://client.example.com` requesting the
    /// given comma-joined headers for a GET.
    fn preflight_request(requested_headers: &str) -> Request<Body> {
        Request::builder()
            .method("OPTIONS")
            .uri("/noop")
            .header("origin", "https://client.example.com")
            .header("access-control-request-method", "GET")
            .header("access-control-request-headers", requested_headers)
            .body(Body::empty())
            .unwrap()
    }

    /// Lower-cased `Access-Control-Allow-Headers` from a preflight response.
    fn allow_headers_lowercase(resp: &Response) -> String {
        resp.headers()
            .get("access-control-allow-headers")
            .expect("access-control-allow-headers present")
            .to_str()
            .unwrap()
            .to_ascii_lowercase()
    }

    #[test]
    fn all_origins_invalid_returns_config_error_instead_of_permissive() {
        // #71: when every configured origin fails `HeaderValue` parsing,
        // the builder must fail closed rather than fall back to
        // permissive CORS.
        //
        // `HeaderValue` parsing is intentionally lax — nearly any
        // printable ASCII (even "not a url") parses — so an embedded NUL
        // byte is used to force the real parse-failure path.
        let err = build_cors_layer(&["bad\0origin".to_owned()], false).unwrap_err();
        let ConfigError::InvalidValue { var, message } = err;
        assert_eq!(var, "FF_CORS_ORIGINS");
        assert!(
            message.contains("all configured origins failed to parse"),
            "message was: {message}"
        );
    }

    #[test]
    fn wildcard_still_returns_permissive() {
        // An explicit `*` remains the documented permissive path.
        assert!(build_cors_layer(&["*".to_owned()], false).is_ok());
    }

    #[test]
    fn empty_origins_returns_empty_allowlist_ok() {
        // No configured origins → an empty allowlist that matches no
        // browser origin. Distinct from the fail-closed case above, which
        // requires origins that are configured but all parse-invalid.
        assert!(build_cors_layer(&[], false).is_ok());
    }

    #[test]
    fn mixed_valid_and_invalid_keeps_valid_entries() {
        // A partial typo must not fail the whole boot: a warning is
        // logged and the parsable entries are kept.
        let built = build_cors_layer(
            &["https://ok.example.com".to_owned(), "bad\0origin".to_owned()],
            false,
        );
        assert!(built.is_ok());
    }

    #[tokio::test]
    async fn preflight_allows_authorization_when_auth_enabled() {
        // #66: with token auth on, browsers must see `Authorization` in
        // the preflight's `Access-Control-Allow-Headers` response.
        let app = app_with_cors(&["https://client.example.com".to_owned()], true);

        let resp = app
            .oneshot(preflight_request("authorization,content-type"))
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let allow_headers = allow_headers_lowercase(&resp);
        assert!(
            allow_headers.contains("authorization"),
            "expected `authorization` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
        assert!(
            allow_headers.contains("content-type"),
            "expected `content-type` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
    }

    #[tokio::test]
    async fn preflight_omits_authorization_when_auth_disabled() {
        // Negative case: with auth off, `Authorization` is unnecessary
        // and must not be advertised (principle of least privilege).
        let app = app_with_cors(&["https://client.example.com".to_owned()], false);

        let resp = app.oneshot(preflight_request("content-type")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let allow_headers = allow_headers_lowercase(&resp);
        assert!(
            !allow_headers.contains("authorization"),
            "Authorization should not be advertised when auth is off; got: {allow_headers}"
        );
        assert!(allow_headers.contains("content-type"));
    }
}
1446
#[cfg(test)]
mod claim_grant_dto_tests {
    //! Wire-shape tests for `ClaimGrantDto` (issue #91).
    //!
    //! Locks down the exact JSON emitted on the
    //! `POST /v1/workers/{id}/claim` 200 response so any drift is caught
    //! at test time instead of by a live SDK consumer.
    use super::*;
    use ff_core::contracts::ClaimGrant;
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{ExecutionId, FlowId};

    /// Grant fixture for the given partition family, pinned at index 7.
    fn sample_grant(family: PartitionFamily) -> ClaimGrant {
        let config = ff_core::partition::PartitionConfig::default();
        let flow_id = FlowId::new();
        let partition = Partition { family, index: 7 };
        ClaimGrant {
            execution_id: ExecutionId::for_flow(&flow_id, &config),
            partition_key: PartitionKey::from(&partition),
            grant_key: "ff:exec:{fp:7}:deadbeef:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn claim_grant_dto_emits_opaque_partition_key() {
        let dto = ClaimGrantDto::from(sample_grant(PartitionFamily::Flow));
        let wire = serde_json::to_value(&dto).unwrap();
        // `partition_key` must serialize transparently: a bare string,
        // never an object and never a family/index field pair.
        assert_eq!(wire["partition_key"], serde_json::json!("{fp:7}"));
        assert!(wire.get("partition_family").is_none());
        assert!(wire.get("partition_index").is_none());
        assert_eq!(wire["expires_at_ms"], serde_json::json!(1_700_000_000_000u64));
    }

    #[test]
    fn claim_grant_dto_collapses_execution_alias_on_wire() {
        // RFC-011 §11 alias: Execution-family grants emit the same
        // `{fp:N}` hash tag as Flow — pin that equivalence on the wire.
        let flow_wire =
            serde_json::to_value(ClaimGrantDto::from(sample_grant(PartitionFamily::Flow)))
                .unwrap();
        let exec_wire =
            serde_json::to_value(ClaimGrantDto::from(sample_grant(PartitionFamily::Execution)))
                .unwrap();
        assert_eq!(flow_wire["partition_key"], exec_wire["partition_key"]);
    }
}
1495}