Skip to main content

ff_server/
api.rs

1//! REST API layer — thin axum handlers over Server methods.
2
3use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use axum::{
8    extract::{Path, Query, Request, State},
9    http::StatusCode,
10    middleware,
11    response::{IntoResponse, Response},
12    routing::{get, post, put},
13    Json, Router,
14};
15use serde::{Deserialize, Serialize};
16use axum::http::{HeaderName, Method};
17use tower_http::cors::{AllowOrigin, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20use ff_core::contracts::*;
21use ff_core::state::PublicState;
22use ff_core::types::*;
23
24use crate::server::{Server, ServerError};
25
26// ── Custom JSON extractor (uniform JSON error on malformed body) ──
27
28struct AppJson<T>(T);
29
30impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
31where
32    T: serde::de::DeserializeOwned + Send,
33    S: Send + Sync,
34{
35    type Rejection = Response;
36
37    async fn from_request(
38        req: axum::extract::Request,
39        state: &S,
40    ) -> Result<Self, Self::Rejection> {
41        match Json::<T>::from_request(req, state).await {
42            Ok(Json(value)) => Ok(AppJson(value)),
43            Err(rejection) => {
44                let status = rejection.status();
45                tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
46                let body = ErrorBody::plain(format!(
47                    "invalid JSON: {}",
48                    status.canonical_reason().unwrap_or("bad request"),
49                ));
50                Err((status, Json(body)).into_response())
51            }
52        }
53    }
54}
55
56// ── Error handling ──
57
58struct ApiError(ServerError);
59
60impl From<ServerError> for ApiError {
61    fn from(e: ServerError) -> Self {
62        Self(e)
63    }
64}
65
66/// HTTP error body. `kind`/`retryable` are populated for 500s backed by a
67/// `ferriskey::Error` so HTTP clients (e.g. cairn-fabric) can make retry
68/// decisions without parsing the `error` string.
69#[derive(Serialize)]
70struct ErrorBody {
71    error: String,
72    #[serde(skip_serializing_if = "Option::is_none")]
73    kind: Option<String>,
74    #[serde(skip_serializing_if = "Option::is_none")]
75    retryable: Option<bool>,
76}
77
78impl ErrorBody {
79    fn plain(error: String) -> Self {
80        Self { error, kind: None, retryable: None }
81    }
82}
83
84impl IntoResponse for ApiError {
85    fn into_response(self) -> Response {
86        use ff_script::retry::kind_to_stable_str;
87
88        let (status, body) = match &self.0 {
89            ServerError::NotFound(msg) => {
90                (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
91            }
92            ServerError::InvalidInput(msg) => {
93                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
94            }
95            ServerError::OperationFailed(msg) => {
96                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
97            }
98            ServerError::ConcurrencyLimitExceeded(source, max) => (
99                StatusCode::TOO_MANY_REQUESTS,
100                ErrorBody {
101                    error: format!(
102                        "too many concurrent {source} calls (server max: {max}); retry with backoff"
103                    ),
104                    kind: None,
105                    retryable: Some(true),
106                },
107            ),
108            ServerError::Valkey(e) => {
109                let kind_str = kind_to_stable_str(e.kind());
110                tracing::error!(
111                    kind = kind_str,
112                    code = e.code().unwrap_or(""),
113                    detail = e.detail().unwrap_or(""),
114                    "valkey error"
115                );
116                (
117                    StatusCode::INTERNAL_SERVER_ERROR,
118                    ErrorBody {
119                        error: self.0.to_string(),
120                        kind: Some(kind_str.to_owned()),
121                        retryable: Some(self.0.is_retryable()),
122                    },
123                )
124            }
125            ServerError::ValkeyContext { source, context } => {
126                let kind_str = kind_to_stable_str(source.kind());
127                tracing::error!(
128                    kind = kind_str,
129                    code = source.code().unwrap_or(""),
130                    detail = source.detail().unwrap_or(""),
131                    context = %context,
132                    "valkey error"
133                );
134                (
135                    StatusCode::INTERNAL_SERVER_ERROR,
136                    ErrorBody {
137                        error: self.0.to_string(),
138                        kind: Some(kind_str.to_owned()),
139                        retryable: Some(self.0.is_retryable()),
140                    },
141                )
142            }
143            ServerError::LibraryLoad(load_err) => {
144                let kind_str = load_err.valkey_kind().map(kind_to_stable_str);
145                tracing::error!(
146                    kind = kind_str.unwrap_or(""),
147                    error = %load_err,
148                    "library load failure"
149                );
150                (
151                    StatusCode::INTERNAL_SERVER_ERROR,
152                    ErrorBody {
153                        error: format!("library load: {load_err}"),
154                        kind: kind_str.map(str::to_owned),
155                        retryable: Some(self.0.is_retryable()),
156                    },
157                )
158            }
159            // Script / Config / PartitionMismatch — developer or deployment
160            // errors. No Valkey ErrorKind to surface, but retryable=false is
161            // informative: a client-side retry won't change the outcome.
162            other => (
163                StatusCode::INTERNAL_SERVER_ERROR,
164                ErrorBody {
165                    error: other.to_string(),
166                    kind: None,
167                    retryable: Some(false),
168                },
169            ),
170        };
171        (status, Json(body)).into_response()
172    }
173}
174
175// ── Router ──
176
177pub fn router(server: Arc<Server>, cors_origins: &[String], api_token: Option<String>) -> Router {
178    let cors = build_cors_layer(cors_origins);
179
180    let mut app = Router::new()
181        // Executions
182        .route("/v1/executions", get(list_executions).post(create_execution))
183        .route("/v1/executions/{id}", get(get_execution))
184        .route("/v1/executions/{id}/state", get(get_execution_state))
185        .route(
186            "/v1/executions/{id}/pending-waitpoints",
187            get(list_pending_waitpoints),
188        )
189        .route("/v1/executions/{id}/result", get(get_execution_result))
190        .route("/v1/executions/{id}/cancel", post(cancel_execution))
191        .route("/v1/executions/{id}/signal", post(deliver_signal))
192        .route("/v1/executions/{id}/priority", put(change_priority))
193        .route("/v1/executions/{id}/replay", post(replay_execution))
194        .route("/v1/executions/{id}/revoke-lease", post(revoke_lease))
195        // Scheduler-routed claim (Batch C item 2). Worker POSTs lane +
196        // identity + capabilities; server runs budget/quota/capability
197        // admission via ff-scheduler and returns a ClaimGrant on
198        // success (204 No Content when no eligible execution).
199        .route("/v1/workers/{worker_id}/claim", post(claim_for_worker))
200        // Stream read + tail (RFC-006 #2)
201        .route(
202            "/v1/executions/{id}/attempts/{idx}/stream",
203            get(read_attempt_stream),
204        )
205        .route(
206            "/v1/executions/{id}/attempts/{idx}/stream/tail",
207            get(tail_attempt_stream),
208        )
209        // Flows
210        .route("/v1/flows", post(create_flow))
211        .route("/v1/flows/{id}/members", post(add_execution_to_flow))
212        .route("/v1/flows/{id}/cancel", post(cancel_flow))
213        .route("/v1/flows/{id}/edges", post(stage_dependency_edge))
214        .route("/v1/flows/{id}/edges/apply", post(apply_dependency_to_child))
215        // Budgets
216        .route("/v1/budgets", post(create_budget))
217        .route("/v1/budgets/{id}", get(get_budget_status))
218        .route("/v1/budgets/{id}/usage", post(report_usage))
219        .route("/v1/budgets/{id}/reset", post(reset_budget))
220        // Quotas
221        .route("/v1/quotas", post(create_quota_policy))
222        // Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security)
223        .route("/v1/admin/rotate-waitpoint-secret", post(rotate_waitpoint_secret))
224        // Health (always unauthenticated)
225        .route("/healthz", get(healthz));
226
227    if let Some(token) = api_token {
228        let token = Arc::new(token);
229        app = app.layer(middleware::from_fn(move |req, next| {
230            let token = token.clone();
231            auth_middleware(token, req, next)
232        }));
233    }
234
235    app.layer(TraceLayer::new_for_http())
236        .layer(cors)
237        .with_state(server)
238}
239
240async fn auth_middleware(
241    token: Arc<String>,
242    req: Request,
243    next: middleware::Next,
244) -> Response {
245    if req.uri().path() == "/healthz" {
246        return next.run(req).await;
247    }
248
249    let auth_header = req
250        .headers()
251        .get("authorization")
252        .and_then(|v| v.to_str().ok());
253
254    let authorized = auth_header
255        .and_then(|v| v.strip_prefix("Bearer "))
256        .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
257
258    if authorized {
259        next.run(req).await
260    } else {
261        (
262            StatusCode::UNAUTHORIZED,
263            Json(ErrorBody::plain(
264                "missing or invalid Authorization header".to_owned(),
265            )),
266        )
267            .into_response()
268    }
269}
270
/// Compares two byte strings without short-circuiting on the first
/// differing byte, so timing does not reveal the length of a matching
/// prefix. Inputs of different lengths are rejected up front, which does
/// reveal whether the lengths match.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let accumulated = a
        .iter()
        .zip(b)
        .fold(0u8, |acc, (x, y)| acc | (x ^ y));
    accumulated == 0
}
281
282fn build_cors_layer(origins: &[String]) -> CorsLayer {
283    if origins.iter().any(|o| o == "*") {
284        return CorsLayer::permissive();
285    }
286    let parsed: Vec<_> = origins
287        .iter()
288        .filter_map(|o| o.parse().ok())
289        .collect();
290    if parsed.is_empty() && !origins.is_empty() {
291        tracing::warn!(
292            configured = ?origins,
293            "all configured CORS origins failed to parse, falling back to permissive"
294        );
295        return CorsLayer::permissive();
296    }
297    CorsLayer::new()
298        .allow_origin(AllowOrigin::list(parsed))
299        .allow_methods([Method::GET, Method::POST, Method::PUT])
300        .allow_headers([HeaderName::from_static("content-type")])
301}
302
303// ── Execution handlers ──
304
305#[derive(Deserialize)]
306struct ListExecutionsParams {
307    partition: u16,
308    #[serde(default = "default_lane")]
309    lane: String,
310    #[serde(default = "default_state_filter")]
311    state: String,
312    #[serde(default = "default_limit")]
313    limit: u64,
314    #[serde(default)]
315    offset: u64,
316}
317
/// Lane used when `?lane=` is omitted.
fn default_lane() -> String {
    String::from("default")
}

/// State filter used when `?state=` is omitted.
fn default_state_filter() -> String {
    String::from("eligible")
}

/// Page size used when `?limit=` is omitted.
fn default_limit() -> u64 {
    50
}
321
322async fn list_executions(
323    State(server): State<Arc<Server>>,
324    Query(params): Query<ListExecutionsParams>,
325) -> Result<Json<ListExecutionsResult>, ApiError> {
326    let lane = ff_core::types::LaneId::try_new(params.lane.clone())
327        .map_err(|e| ApiError::from(ServerError::InvalidInput(format!("invalid lane: {e}"))))?;
328    let limit = params.limit.min(1000);
329    let result = server
330        .list_executions(params.partition, &lane, &params.state, params.offset, limit)
331        .await?;
332    Ok(Json(result))
333}
334
335async fn create_execution(
336    State(server): State<Arc<Server>>,
337    AppJson(args): AppJson<CreateExecutionArgs>,
338) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
339    let result = server.create_execution(&args).await?;
340    let status = match &result {
341        CreateExecutionResult::Created { .. } => StatusCode::CREATED,
342        CreateExecutionResult::Duplicate { .. } => StatusCode::OK,
343    };
344    Ok((status, Json(result)))
345}
346
347async fn get_execution(
348    State(server): State<Arc<Server>>,
349    Path(id): Path<String>,
350) -> Result<Json<ExecutionInfo>, ApiError> {
351    let eid = parse_execution_id(&id)?;
352    Ok(Json(server.get_execution(&eid).await?))
353}
354
355async fn get_execution_state(
356    State(server): State<Arc<Server>>,
357    Path(id): Path<String>,
358) -> Result<Json<PublicState>, ApiError> {
359    let eid = parse_execution_id(&id)?;
360    Ok(Json(server.get_execution_state(&eid).await?))
361}
362
363/// Returns the actionable (`pending`/`active`) waitpoints for an
364/// execution, including the HMAC `waitpoint_token` required to deliver
365/// signals. Human reviewers use this to look up the token originally
366/// returned only to the suspending worker's `SuspendOutcome`.
367///
368/// SECURITY: `waitpoint_token` is a bearer credential for signal
369/// delivery; leaking it lets a third party forge authority to resume or
370/// influence the execution. Gate the endpoint behind `FF_API_TOKEN` in
371/// any deployment reachable from untrusted networks. The auth middleware
372/// only mounts when `FF_API_TOKEN` is set; this endpoint is
373/// unauthenticated without it, and the server logs a loud warning at
374/// startup so operators notice.
375async fn list_pending_waitpoints(
376    State(server): State<Arc<Server>>,
377    Path(id): Path<String>,
378) -> Result<Json<Vec<PendingWaitpointInfo>>, ApiError> {
379    let eid = parse_execution_id(&id)?;
380    Ok(Json(server.list_pending_waitpoints(&eid).await?))
381}
382
383/// Returns the raw result payload bytes written by the worker's
384/// `ff_complete_execution` call. 404 when the execution has no stored
385/// result (missing entirely, still in-flight, or trimmed by retention —
386/// see below).
387///
388/// # Ordering (required)
389///
390/// Callers MUST poll `GET /v1/executions/{id}/state` until it returns
391/// `completed` before fetching `/result`. Early polls may return 404
392/// because completion writes `public_state = completed` and the result
393/// `SET` in the same atomic Lua; in the normal path the window is
394/// effectively zero, but network round-trip ordering between a state
395/// poll and a result fetch can make the result appear briefly absent
396/// during replay (`ff_replay_execution`).
397///
398/// # Retention / 404 after completed
399///
400/// `get_execution_state == completed` is authoritative for completion.
401/// This endpoint additionally depends on the result bytes not having
402/// been trimmed — v1 sets no retention policy, so
403/// `state = completed` should always pair with a 200 here. Any
404/// future retention-policy feature must call this contract out in its
405/// own docs.
406///
407/// CONTENT-TYPE: `application/octet-stream`. The server is payload-format
408/// agnostic — workers choose the encoding via the SDK's `complete(bytes)`
409/// call, and callers must know the contract. The media-pipeline example
410/// uses JSON by convention (`serde_json::to_vec(&Result)`); adapters can
411/// pick any binary format.
412///
413/// SECURITY: completion payloads can contain PII (e.g. LLM summaries of
414/// user audio). Treat this endpoint like any other read — gate behind
415/// `FF_API_TOKEN` in any deployment reachable from untrusted networks.
416/// The auth middleware only mounts when `FF_API_TOKEN` is set.
417async fn get_execution_result(
418    State(server): State<Arc<Server>>,
419    Path(id): Path<String>,
420) -> Result<Response, ApiError> {
421    let eid = parse_execution_id(&id)?;
422    match server.get_execution_result(&eid).await? {
423        Some(bytes) => Ok((
424            StatusCode::OK,
425            [(axum::http::header::CONTENT_TYPE, "application/octet-stream")],
426            bytes,
427        )
428            .into_response()),
429        None => Err(ApiError(ServerError::NotFound(format!(
430            "execution result not found: {eid}"
431        )))),
432    }
433}
434
435async fn cancel_execution(
436    State(server): State<Arc<Server>>,
437    Path(id): Path<String>,
438    AppJson(mut args): AppJson<CancelExecutionArgs>,
439) -> Result<Json<CancelExecutionResult>, ApiError> {
440    let path_eid = parse_execution_id(&id)?;
441    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
442    args.execution_id = path_eid;
443    Ok(Json(server.cancel_execution(&args).await?))
444}
445
446async fn deliver_signal(
447    State(server): State<Arc<Server>>,
448    Path(id): Path<String>,
449    AppJson(mut args): AppJson<DeliverSignalArgs>,
450) -> Result<Json<DeliverSignalResult>, ApiError> {
451    let path_eid = parse_execution_id(&id)?;
452    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
453    args.execution_id = path_eid;
454    Ok(Json(server.deliver_signal(&args).await?))
455}
456
457// ── Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security) ──
458
459#[derive(Deserialize)]
460struct RotateWaitpointSecretBody {
461    new_kid: String,
462    /// Hex-encoded new secret. Even-length, 0-9a-fA-F.
463    new_secret_hex: String,
464}
465
/// End-to-end budget for the rotate endpoint before the HTTP handler gives up.
///
/// Rotation touches every partition (up to 256) with HGET+HMGET+HDEL+HSET
/// per partition. At roughly 6 round-trips × 30 ms cross-AZ × 256 partitions,
/// that is ≈ 46 s worst case. The internal SETNX lock has a 10 s TTL per
/// partition, so 120 s leaves ample margin for contention and slow RTTs.
/// Note that this exceeds the ALB default idle timeout (60 s). Load
/// balancers in front of admin endpoints need their idle timeout raised to
/// at least 120 s, as is common practice.
///
/// On timeout the handler returns HTTP 504 immediately, though Valkey-side
/// work already in flight (per-partition locks and HSETs) may still finish.
/// Retrying is SAFE: rotation is idempotent per partition (same new_kid +
/// same secret → no-op on partitions that were already rotated).
const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2 * 60);
480
481async fn rotate_waitpoint_secret(
482    State(server): State<Arc<Server>>,
483    AppJson(body): AppJson<RotateWaitpointSecretBody>,
484) -> Result<Response, ApiError> {
485    // Cap the whole endpoint end-to-end. If this trips, the caller's
486    // retry is SAFE — per-partition rotation is idempotent on the same
487    // (new_kid, secret_hex) and the per-partition SETNX lock prevents
488    // double-rotation under concurrent retries.
489    let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
490    let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
491        Ok(r) => r?,
492        Err(_) => {
493            tracing::error!(
494                target: "audit",
495                new_kid = %body.new_kid,
496                timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
497                "waitpoint_hmac_rotation_timeout_http_504"
498            );
499            let body = ErrorBody::plain(format!(
500                "rotation exceeded {}s server-side timeout; retry is safe \
501                 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
502                ROTATE_HTTP_TIMEOUT.as_secs()
503            ));
504            return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
505        }
506    };
507    // Operator action log at audit target (per-partition detail logged inside
508    // rotate_waitpoint_secret). Returns 400 only on actionable fault states.
509    //
510    // Two distinct rotated==0 cases:
511    //   - failed.is_empty() → no partitions at all (num_flow_partitions == 0;
512    //     env_u16_positive rejects this at boot so this is mostly dead code
513    //     for library/Default callers).
514    //   - !failed.is_empty() → every partition attempt raised a real error.
515    //     Operator investigates Valkey/auth/cluster health before retrying.
516    if result.rotated == 0 && result.failed.is_empty() {
517        return Err(ApiError::from(ServerError::OperationFailed(
518            "rotation had no partitions to operate on \
519             (num_flow_partitions is 0 — server misconfigured)"
520                .to_owned(),
521        )));
522    }
523    if result.rotated == 0 && !result.failed.is_empty() {
524        return Err(ApiError::from(ServerError::OperationFailed(
525            "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
526        )));
527    }
528    Ok(Json(result).into_response())
529}
530
531#[derive(Deserialize)]
532struct ChangePriorityBody {
533    new_priority: i32,
534}
535
536async fn change_priority(
537    State(server): State<Arc<Server>>,
538    Path(id): Path<String>,
539    AppJson(body): AppJson<ChangePriorityBody>,
540) -> Result<Json<ChangePriorityResult>, ApiError> {
541    let eid = parse_execution_id(&id)?;
542    Ok(Json(server.change_priority(&eid, body.new_priority).await?))
543}
544
545async fn replay_execution(
546    State(server): State<Arc<Server>>,
547    Path(id): Path<String>,
548) -> Result<Json<ReplayExecutionResult>, ApiError> {
549    let eid = parse_execution_id(&id)?;
550    Ok(Json(server.replay_execution(&eid).await?))
551}
552
553async fn revoke_lease(
554    State(server): State<Arc<Server>>,
555    Path(id): Path<String>,
556) -> Result<Json<RevokeLeaseResult>, ApiError> {
557    let eid = parse_execution_id(&id)?;
558    Ok(Json(server.revoke_lease(&eid).await?))
559}
560
561// ── Scheduler-routed claim (Batch C item 2 PR-B) ──
562//
563// The server exposes the scheduler's `claim_for_worker` cycle via
564// HTTP so ff-sdk workers can acquire claim grants without enabling
565// the `direct-valkey-claim` feature. The request body carries lane +
566// identity + capabilities; the server returns a serialized
567// `ClaimGrant` (or 204 No Content when no eligible execution exists).
568
569/// Request body for `POST /v1/workers/{worker_id}/claim`.
570#[derive(Deserialize)]
571struct ClaimForWorkerBody {
572    lane_id: String,
573    worker_instance_id: String,
574    /// Capability tokens this worker advertises. Sorted + validated
575    /// on the scheduler side; any non-printable/CSV-breaking token
576    /// surfaces as 400.
577    #[serde(default)]
578    capabilities: Vec<String>,
579    /// Grant TTL in milliseconds. Bounded so a worker can't request a
580    /// multi-hour grant and squat the execution.
581    grant_ttl_ms: u64,
582}
583
584/// Wire shape for `ff_core::contracts::ClaimGrant`. Core type is not
585/// serde-derived (it carries a `Partition` with a non-scalar family
586/// enum); keep a DTO here so we own the HTTP shape without touching
587/// the core contract.
588#[derive(Serialize)]
589struct ClaimGrantDto {
590    execution_id: String,
591    partition_family: &'static str,
592    partition_index: u16,
593    grant_key: String,
594    expires_at_ms: u64,
595}
596
597impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
598    fn from(g: ff_core::contracts::ClaimGrant) -> Self {
599        let family = match g.partition.family {
600            ff_core::partition::PartitionFamily::Flow => "flow",
601            ff_core::partition::PartitionFamily::Execution => "execution",
602            ff_core::partition::PartitionFamily::Budget => "budget",
603            ff_core::partition::PartitionFamily::Quota => "quota",
604        };
605        Self {
606            execution_id: g.execution_id.to_string(),
607            partition_family: family,
608            partition_index: g.partition.index,
609            grant_key: g.grant_key,
610            expires_at_ms: g.expires_at_ms,
611        }
612    }
613}
614
/// Largest `grant_ttl_ms` accepted over HTTP (one minute). It mirrors the
/// scheduler's internal ceiling so a misconfigured worker can't squat an
/// execution on a multi-hour grant.
const CLAIM_GRANT_TTL_MS_MAX: u64 = 60 * 1_000;
619
620/// Reject empty / whitespace / non-printable identifiers the way
621/// [`LaneId::try_new`] does for lanes. WorkerId + WorkerInstanceId
622/// feed into scheduler scan jitter + Valkey key construction; silent
623/// acceptance of "" or "w\nork" would either mis-key or mis-hash.
624fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
625    if value.is_empty() {
626        return Err(ApiError(ServerError::InvalidInput(format!(
627            "{field}: must not be empty"
628        ))));
629    }
630    if value.len() > 256 {
631        return Err(ApiError(ServerError::InvalidInput(format!(
632            "{field}: exceeds 256 bytes (got {})",
633            value.len()
634        ))));
635    }
636    if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
637        return Err(ApiError(ServerError::InvalidInput(format!(
638            "{field}: must not contain whitespace or control characters"
639        ))));
640    }
641    Ok(())
642}
643
644async fn claim_for_worker(
645    State(server): State<Arc<Server>>,
646    Path(worker_id): Path<String>,
647    AppJson(body): AppJson<ClaimForWorkerBody>,
648) -> Result<Response, ApiError> {
649    validate_identifier("worker_id", &worker_id)?;
650    validate_identifier("worker_instance_id", &body.worker_instance_id)?;
651    let worker_id = WorkerId::new(worker_id);
652    let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
653    let lane = LaneId::try_new(body.lane_id).map_err(|e| {
654        ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
655    })?;
656    if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
657        return Err(ApiError(ServerError::InvalidInput(format!(
658            "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
659        ))));
660    }
661    let caps: std::collections::BTreeSet<String> =
662        body.capabilities.into_iter().collect();
663
664    match server
665        .claim_for_worker(
666            &lane,
667            &worker_id,
668            &worker_instance_id,
669            &caps,
670            body.grant_ttl_ms,
671        )
672        .await?
673    {
674        Some(grant) => Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response()),
675        None => Ok(StatusCode::NO_CONTENT.into_response()),
676    }
677}
678
679// ── Stream read + tail ──
680
681#[derive(Deserialize)]
682struct ReadStreamParams {
683    #[serde(default = "default_from_id")]
684    from: String,
685    #[serde(default = "default_to_id")]
686    to: String,
687    #[serde(default = "default_read_limit")]
688    limit: u64,
689}
690
/// Default `from` bound: the start of the stream.
fn default_from_id() -> String {
    String::from("-")
}

/// Default `to` bound: the end of the stream.
fn default_to_id() -> String {
    String::from("+")
}

/// Default page size for stream reads.
fn default_read_limit() -> u64 {
    100
}
694
695/// Reject malformed `from`/`to`/`after` stream IDs at the REST boundary so
696/// Valkey doesn't have to. Accepts `"-"`, `"+"`, and `<ms>` or `<ms>-<seq>`
697/// decimal IDs as Valkey XRANGE/XREAD does.
698///
699/// Without this, a request like `?from=abc` would round-trip to Valkey,
700/// surface as a script error, and return HTTP 500 — which is wrong for
701/// caller-input validation.
702fn validate_stream_id(s: &str, field: &str, allow_open_markers: bool) -> Result<(), ApiError> {
703    if allow_open_markers && (s == "-" || s == "+") {
704        return Ok(());
705    }
706    // Allowed: `<digits>` or `<digits>-<digits>`.
707    let (ms_part, seq_part) = match s.split_once('-') {
708        Some((ms, seq)) => (ms, Some(seq)),
709        None => (s, None),
710    };
711    let ms_valid = !ms_part.is_empty() && ms_part.chars().all(|c| c.is_ascii_digit());
712    let seq_valid = seq_part
713        .map(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit()))
714        .unwrap_or(true);
715    if ms_valid && seq_valid {
716        Ok(())
717    } else {
718        Err(ApiError(ServerError::InvalidInput(format!(
719            "{field}: invalid stream ID '{s}' (expected '-', '+', '<ms>', or '<ms>-<seq>')"
720        ))))
721    }
722}
723
724#[derive(Serialize)]
725struct ReadStreamResponse {
726    frames: Vec<StreamFrame>,
727    count: usize,
728    /// When set, the producer has closed this stream — consumer should
729    /// stop polling. Absent when the stream is still open (or never
730    /// existed, which is indistinguishable from "still open" at this
731    /// layer).
732    #[serde(skip_serializing_if = "Option::is_none")]
733    closed_at: Option<i64>,
734    /// Reason from the closing writer: `attempt_success`, `attempt_failure`,
735    /// `attempt_cancelled`, `attempt_interrupted`. Absent iff still open.
736    #[serde(skip_serializing_if = "Option::is_none")]
737    closed_reason: Option<String>,
738}
739
740impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
741    fn from(sf: ff_core::contracts::StreamFrames) -> Self {
742        let count = sf.frames.len();
743        Self {
744            frames: sf.frames,
745            count,
746            closed_at: sf.closed_at.map(|t| t.0),
747            closed_reason: sf.closed_reason,
748        }
749    }
750}
751
/// REST-layer ceiling on `limit` for stream read/tail responses.
///
/// This is deliberately lower than the internal `STREAM_READ_HARD_CAP`
/// (10_000). axum buffers the whole JSON body in memory, and
/// `10_000 × max_payload_bytes (65_536)` is ≈ 655 MB per call before JSON
/// overhead, which would be a DoS vector from a single client. Internal
/// callers using FCALL or the SDK directly keep the 10_000 ceiling; REST
/// clients paginate via `from`/`to` (or `after` for tail).
///
/// v2 candidate: chunked transfer / SSE when a caller wants more than this bound.
const REST_STREAM_LIMIT_CEILING: u64 = 1000;
762
763async fn read_attempt_stream(
764    State(server): State<Arc<Server>>,
765    Path((id, idx)): Path<(String, u32)>,
766    Query(params): Query<ReadStreamParams>,
767) -> Result<Json<ReadStreamResponse>, ApiError> {
768    if params.limit == 0 {
769        return Err(ApiError(ServerError::InvalidInput(
770            "limit must be >= 1".to_owned(),
771        )));
772    }
773    if params.limit > REST_STREAM_LIMIT_CEILING {
774        return Err(ApiError(ServerError::InvalidInput(format!(
775            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
776        ))));
777    }
778    validate_stream_id(&params.from, "from", true)?;
779    validate_stream_id(&params.to, "to", true)?;
780
781    let eid = parse_execution_id(&id)?;
782    let attempt_index = AttemptIndex::new(idx);
783    let result = server
784        .read_attempt_stream(&eid, attempt_index, &params.from, &params.to, params.limit)
785        .await?;
786    Ok(Json(result.into()))
787}
788
789#[derive(Deserialize)]
790struct TailStreamParams {
791    #[serde(default = "default_tail_after")]
792    after: String,
793    #[serde(default)]
794    block_ms: u64,
795    #[serde(default = "default_tail_limit")]
796    limit: u64,
797}
798
/// Default tail cursor: the lowest possible stream ID.
fn default_tail_after() -> String {
    String::from("0-0")
}

/// Default page size for stream tails.
fn default_tail_limit() -> u64 {
    50
}
801
/// Ceiling on the tail endpoint's BLOCK duration (30 s).
///
/// This keeps the response below common load-balancer idle timeouts (ALB
/// 60 s, nginx 60 s, Cloudflare 100 s), so the HTTP response can't be cut
/// mid-block.
///
/// ferriskey's client auto-extends its `request_timeout` for XREAD BLOCK to
/// `block_ms + 500ms`, so even a call at the full ceiling never produces a
/// spurious transport timeout. The exact code path is described in the
/// `ff_script::stream_tail` module docs.
const MAX_TAIL_BLOCK_MS: u64 = 30 * 1_000;
811
812async fn tail_attempt_stream(
813    State(server): State<Arc<Server>>,
814    Path((id, idx)): Path<(String, u32)>,
815    Query(params): Query<TailStreamParams>,
816) -> Result<Json<ReadStreamResponse>, ApiError> {
817    if params.block_ms > MAX_TAIL_BLOCK_MS {
818        return Err(ApiError(ServerError::InvalidInput(format!(
819            "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
820        ))));
821    }
822    if params.limit == 0 {
823        return Err(ApiError(ServerError::InvalidInput(
824            "limit must be >= 1".to_owned(),
825        )));
826    }
827    if params.limit > REST_STREAM_LIMIT_CEILING {
828        return Err(ApiError(ServerError::InvalidInput(format!(
829            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
830        ))));
831    }
832    // XREAD cursor must be a concrete ID — "-"/"+" are XRANGE-only.
833    validate_stream_id(&params.after, "after", false)?;
834
835    let eid = parse_execution_id(&id)?;
836    let attempt_index = AttemptIndex::new(idx);
837    let result = server
838        .tail_attempt_stream(&eid, attempt_index, &params.after, params.block_ms, params.limit)
839        .await?;
840    Ok(Json(result.into()))
841}
842
843// ── Flow handlers ──
844
845async fn create_flow(
846    State(server): State<Arc<Server>>,
847    AppJson(args): AppJson<CreateFlowArgs>,
848) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
849    let result = server.create_flow(&args).await?;
850    let status = match &result {
851        CreateFlowResult::Created { .. } => StatusCode::CREATED,
852        CreateFlowResult::AlreadySatisfied { .. } => StatusCode::OK,
853    };
854    Ok((status, Json(result)))
855}
856
857async fn add_execution_to_flow(
858    State(server): State<Arc<Server>>,
859    Path(id): Path<String>,
860    AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
861) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
862    let path_fid = parse_flow_id(&id)?;
863    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
864    args.flow_id = path_fid;
865    let result = server.add_execution_to_flow(&args).await?;
866    let status = match &result {
867        AddExecutionToFlowResult::Added { .. } => StatusCode::CREATED,
868        AddExecutionToFlowResult::AlreadyMember { .. } => StatusCode::OK,
869    };
870    Ok((status, Json(result)))
871}
872
873/// Cancel a flow.
874///
875/// By default the handler returns immediately with
876/// [`CancelFlowResult::CancellationScheduled`] (or `Cancelled` for flows
877/// with no members / non-cancel_all policies), and the individual member
878/// execution cancellations run in a background task on the server.
879/// Clients can track per-member progress by polling
880/// `GET /v1/executions/{id}/state` for each id in `member_execution_ids`.
881///
882/// Pass `?wait=true` to run the dispatch loop inline; the handler will not
883/// return until every member has been cancelled. Useful for tests and
884/// callers that need synchronous completion.
885async fn cancel_flow(
886    State(server): State<Arc<Server>>,
887    Path(id): Path<String>,
888    Query(params): Query<HashMap<String, String>>,
889    AppJson(mut args): AppJson<CancelFlowArgs>,
890) -> Result<Json<CancelFlowResult>, ApiError> {
891    let path_fid = parse_flow_id(&id)?;
892    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
893    args.flow_id = path_fid;
894    let wait = params.get("wait").is_some_and(|v| v == "true" || v == "1");
895    let result = if wait {
896        server.cancel_flow_wait(&args).await?
897    } else {
898        server.cancel_flow(&args).await?
899    };
900    Ok(Json(result))
901}
902
903async fn stage_dependency_edge(
904    State(server): State<Arc<Server>>,
905    Path(id): Path<String>,
906    AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
907) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
908    let path_fid = parse_flow_id(&id)?;
909    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
910    args.flow_id = path_fid;
911    let result = server.stage_dependency_edge(&args).await?;
912    Ok((StatusCode::CREATED, Json(result)))
913}
914
915async fn apply_dependency_to_child(
916    State(server): State<Arc<Server>>,
917    Path(id): Path<String>,
918    AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
919) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
920    let path_fid = parse_flow_id(&id)?;
921    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
922    args.flow_id = path_fid;
923    Ok(Json(server.apply_dependency_to_child(&args).await?))
924}
925
926// ── Budget / Quota handlers ──
927
928async fn create_budget(
929    State(server): State<Arc<Server>>,
930    AppJson(args): AppJson<CreateBudgetArgs>,
931) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
932    let result = server.create_budget(&args).await?;
933    let status = match &result {
934        CreateBudgetResult::Created { .. } => StatusCode::CREATED,
935        CreateBudgetResult::AlreadySatisfied { .. } => StatusCode::OK,
936    };
937    Ok((status, Json(result)))
938}
939
940async fn get_budget_status(
941    State(server): State<Arc<Server>>,
942    Path(id): Path<String>,
943) -> Result<Json<BudgetStatus>, ApiError> {
944    let bid = parse_budget_id(&id)?;
945    Ok(Json(server.get_budget_status(&bid).await?))
946}
947
/// JSON body accepted by the report-usage endpoint.
#[derive(Deserialize)]
struct ReportUsageBody {
    /// Dimension name -> usage delta. The handler flattens this map into the
    /// parallel `dimensions` / `deltas` vectors of `ReportUsageArgs`.
    dimensions: HashMap<String, u64>,
    /// Caller-supplied timestamp, forwarded unchanged as `ReportUsageArgs::now`.
    now: ff_core::types::TimestampMs,
    /// Optional key forwarded as `ReportUsageArgs::dedup_key`; omitted -> `None`.
    /// Presumably lets the server ignore retried reports — confirm server-side.
    #[serde(default)]
    dedup_key: Option<String>,
}
955
956async fn report_usage(
957    State(server): State<Arc<Server>>,
958    Path(id): Path<String>,
959    AppJson(body): AppJson<ReportUsageBody>,
960) -> Result<Json<ReportUsageResult>, ApiError> {
961    let bid = parse_budget_id(&id)?;
962    let dims: Vec<String> = body.dimensions.keys().cloned().collect();
963    let deltas: Vec<u64> = dims.iter().map(|d| body.dimensions[d]).collect();
964    let args = ReportUsageArgs {
965        dimensions: dims,
966        deltas,
967        now: body.now,
968        dedup_key: body.dedup_key,
969    };
970    Ok(Json(server.report_usage(&bid, &args).await?))
971}
972
973async fn reset_budget(
974    State(server): State<Arc<Server>>,
975    Path(id): Path<String>,
976) -> Result<Json<ResetBudgetResult>, ApiError> {
977    let bid = parse_budget_id(&id)?;
978    Ok(Json(server.reset_budget(&bid).await?))
979}
980
981async fn create_quota_policy(
982    State(server): State<Arc<Server>>,
983    AppJson(args): AppJson<CreateQuotaPolicyArgs>,
984) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
985    let result = server.create_quota_policy(&args).await?;
986    let status = match &result {
987        CreateQuotaPolicyResult::Created { .. } => StatusCode::CREATED,
988        CreateQuotaPolicyResult::AlreadySatisfied { .. } => StatusCode::OK,
989    };
990    Ok((status, Json(result)))
991}
992
993// ── Health check ──
994
995#[derive(Serialize)]
996struct HealthResponse {
997    status: &'static str,
998}
999
1000async fn healthz(
1001    State(server): State<Arc<Server>>,
1002) -> Result<Json<HealthResponse>, ApiError> {
1003    let _: String = server
1004        .client()
1005        .cmd("PING")
1006        .execute()
1007        .await
1008        .map_err(|e| ApiError(ServerError::ValkeyContext { source: e, context: "healthz PING".into() }))?;
1009    Ok(Json(HealthResponse { status: "ok" }))
1010}
1011
1012// ── ID parsing helpers ──
1013
1014/// Return 400 if the body contains an ID that differs from the path ID.
1015fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
1016    if body_id != path_id {
1017        return Err(ApiError(ServerError::InvalidInput(format!(
1018            "path {id_name} does not match body {id_name}"
1019        ))));
1020    }
1021    Ok(())
1022}
1023
1024fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
1025    ExecutionId::parse(s)
1026        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
1027}
1028
1029fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
1030    FlowId::parse(s)
1031        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid flow_id: {e}"))))
1032}
1033
1034fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
1035    BudgetId::parse(s)
1036        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid budget_id: {e}"))))
1037}