Skip to main content

ff_server/
api.rs

1//! REST API layer — thin axum handlers over Server methods.
2
3use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use axum::{
8    extract::{Path, Query, Request, State},
9    http::StatusCode,
10    middleware,
11    response::{IntoResponse, Response},
12    routing::{get, post, put},
13    Json, Router,
14};
15use serde::{Deserialize, Serialize};
16use axum::http::{HeaderName, Method};
17use tower_http::cors::{AllowOrigin, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20use ff_core::contracts::*;
21use ff_core::state::PublicState;
22use ff_core::types::*;
23
24use crate::server::{Server, ServerError};
25
26// ── Custom JSON extractor (uniform JSON error on malformed body) ──
27
28struct AppJson<T>(T);
29
30impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
31where
32    T: serde::de::DeserializeOwned + Send,
33    S: Send + Sync,
34{
35    type Rejection = Response;
36
37    async fn from_request(
38        req: axum::extract::Request,
39        state: &S,
40    ) -> Result<Self, Self::Rejection> {
41        match Json::<T>::from_request(req, state).await {
42            Ok(Json(value)) => Ok(AppJson(value)),
43            Err(rejection) => {
44                let status = rejection.status();
45                tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
46                let body = ErrorBody::plain(format!(
47                    "invalid JSON: {}",
48                    status.canonical_reason().unwrap_or("bad request"),
49                ));
50                Err((status, Json(body)).into_response())
51            }
52        }
53    }
54}
55
56// ── Error handling ──
57
58struct ApiError(ServerError);
59
60impl From<ServerError> for ApiError {
61    fn from(e: ServerError) -> Self {
62        Self(e)
63    }
64}
65
66/// HTTP error body. `kind`/`retryable` are populated for 500s backed by a
67/// `ferriskey::Error` so HTTP clients (e.g. cairn-fabric) can make retry
68/// decisions without parsing the `error` string.
69#[derive(Serialize)]
70struct ErrorBody {
71    error: String,
72    #[serde(skip_serializing_if = "Option::is_none")]
73    kind: Option<String>,
74    #[serde(skip_serializing_if = "Option::is_none")]
75    retryable: Option<bool>,
76}
77
78impl ErrorBody {
79    fn plain(error: String) -> Self {
80        Self { error, kind: None, retryable: None }
81    }
82}
83
84impl IntoResponse for ApiError {
85    fn into_response(self) -> Response {
86        use ff_script::retry::kind_to_stable_str;
87
88        let (status, body) = match &self.0 {
89            ServerError::NotFound(msg) => {
90                (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
91            }
92            ServerError::InvalidInput(msg) => {
93                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
94            }
95            ServerError::OperationFailed(msg) => {
96                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
97            }
98            ServerError::ConcurrencyLimitExceeded(source, max) => (
99                StatusCode::TOO_MANY_REQUESTS,
100                ErrorBody {
101                    error: format!(
102                        "too many concurrent {source} calls (server max: {max}); retry with backoff"
103                    ),
104                    kind: None,
105                    retryable: Some(true),
106                },
107            ),
108            ServerError::Valkey(e) => {
109                let kind_str = kind_to_stable_str(e.kind());
110                tracing::error!(
111                    kind = kind_str,
112                    code = e.code().unwrap_or(""),
113                    detail = e.detail().unwrap_or(""),
114                    "valkey error"
115                );
116                (
117                    StatusCode::INTERNAL_SERVER_ERROR,
118                    ErrorBody {
119                        error: self.0.to_string(),
120                        kind: Some(kind_str.to_owned()),
121                        retryable: Some(self.0.is_retryable()),
122                    },
123                )
124            }
125            ServerError::ValkeyContext { source, context } => {
126                let kind_str = kind_to_stable_str(source.kind());
127                tracing::error!(
128                    kind = kind_str,
129                    code = source.code().unwrap_or(""),
130                    detail = source.detail().unwrap_or(""),
131                    context = %context,
132                    "valkey error"
133                );
134                (
135                    StatusCode::INTERNAL_SERVER_ERROR,
136                    ErrorBody {
137                        error: self.0.to_string(),
138                        kind: Some(kind_str.to_owned()),
139                        retryable: Some(self.0.is_retryable()),
140                    },
141                )
142            }
143            ServerError::LibraryLoad(load_err) => {
144                let kind_str = load_err.valkey_kind().map(kind_to_stable_str);
145                tracing::error!(
146                    kind = kind_str.unwrap_or(""),
147                    error = %load_err,
148                    "library load failure"
149                );
150                (
151                    StatusCode::INTERNAL_SERVER_ERROR,
152                    ErrorBody {
153                        error: format!("library load: {load_err}"),
154                        kind: kind_str.map(str::to_owned),
155                        retryable: Some(self.0.is_retryable()),
156                    },
157                )
158            }
159            // Script / Config / PartitionMismatch — developer or deployment
160            // errors. No Valkey ErrorKind to surface, but retryable=false is
161            // informative: a client-side retry won't change the outcome.
162            other => (
163                StatusCode::INTERNAL_SERVER_ERROR,
164                ErrorBody {
165                    error: other.to_string(),
166                    kind: None,
167                    retryable: Some(false),
168                },
169            ),
170        };
171        (status, Json(body)).into_response()
172    }
173}
174
175// ── Router ──
176
177pub fn router(server: Arc<Server>, cors_origins: &[String], api_token: Option<String>) -> Router {
178    let cors = build_cors_layer(cors_origins);
179
180    let mut app = Router::new()
181        // Executions
182        .route("/v1/executions", get(list_executions).post(create_execution))
183        .route("/v1/executions/{id}", get(get_execution))
184        .route("/v1/executions/{id}/state", get(get_execution_state))
185        .route(
186            "/v1/executions/{id}/pending-waitpoints",
187            get(list_pending_waitpoints),
188        )
189        .route("/v1/executions/{id}/result", get(get_execution_result))
190        .route("/v1/executions/{id}/cancel", post(cancel_execution))
191        .route("/v1/executions/{id}/signal", post(deliver_signal))
192        .route("/v1/executions/{id}/priority", put(change_priority))
193        .route("/v1/executions/{id}/replay", post(replay_execution))
194        .route("/v1/executions/{id}/revoke-lease", post(revoke_lease))
195        // Scheduler-routed claim (Batch C item 2). Worker POSTs lane +
196        // identity + capabilities; server runs budget/quota/capability
197        // admission via ff-scheduler and returns a ClaimGrant on
198        // success (204 No Content when no eligible execution).
199        .route("/v1/workers/{worker_id}/claim", post(claim_for_worker))
200        // Stream read + tail (RFC-006 #2)
201        .route(
202            "/v1/executions/{id}/attempts/{idx}/stream",
203            get(read_attempt_stream),
204        )
205        .route(
206            "/v1/executions/{id}/attempts/{idx}/stream/tail",
207            get(tail_attempt_stream),
208        )
209        // Flows
210        .route("/v1/flows", post(create_flow))
211        .route("/v1/flows/{id}/members", post(add_execution_to_flow))
212        .route("/v1/flows/{id}/cancel", post(cancel_flow))
213        .route("/v1/flows/{id}/edges", post(stage_dependency_edge))
214        .route("/v1/flows/{id}/edges/apply", post(apply_dependency_to_child))
215        // Budgets
216        .route("/v1/budgets", post(create_budget))
217        .route("/v1/budgets/{id}", get(get_budget_status))
218        .route("/v1/budgets/{id}/usage", post(report_usage))
219        .route("/v1/budgets/{id}/reset", post(reset_budget))
220        // Quotas
221        .route("/v1/quotas", post(create_quota_policy))
222        // Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security)
223        .route("/v1/admin/rotate-waitpoint-secret", post(rotate_waitpoint_secret))
224        // Health (always unauthenticated)
225        .route("/healthz", get(healthz));
226
227    if let Some(token) = api_token {
228        let token = Arc::new(token);
229        app = app.layer(middleware::from_fn(move |req, next| {
230            let token = token.clone();
231            auth_middleware(token, req, next)
232        }));
233    }
234
235    app.layer(TraceLayer::new_for_http())
236        .layer(cors)
237        .with_state(server)
238}
239
240async fn auth_middleware(
241    token: Arc<String>,
242    req: Request,
243    next: middleware::Next,
244) -> Response {
245    if req.uri().path() == "/healthz" {
246        return next.run(req).await;
247    }
248
249    let auth_header = req
250        .headers()
251        .get("authorization")
252        .and_then(|v| v.to_str().ok());
253
254    let authorized = auth_header
255        .and_then(|v| v.strip_prefix("Bearer "))
256        .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
257
258    if authorized {
259        next.run(req).await
260    } else {
261        (
262            StatusCode::UNAUTHORIZED,
263            Json(ErrorBody::plain(
264                "missing or invalid Authorization header".to_owned(),
265            )),
266        )
267            .into_response()
268    }
269}
270
/// Compares two byte strings without stopping at the first differing byte.
///
/// Slices of different lengths are rejected immediately, so only the
/// contents of equal-length inputs are compared in a data-independent way.
/// For equal lengths, every byte pair is XOR-ed and OR-ed into one
/// accumulator, which is zero only when all pairs match.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs))
            == 0
}
281
282fn build_cors_layer(origins: &[String]) -> CorsLayer {
283    if origins.iter().any(|o| o == "*") {
284        return CorsLayer::permissive();
285    }
286    let parsed: Vec<_> = origins
287        .iter()
288        .filter_map(|o| o.parse().ok())
289        .collect();
290    if parsed.is_empty() && !origins.is_empty() {
291        tracing::warn!(
292            configured = ?origins,
293            "all configured CORS origins failed to parse, falling back to permissive"
294        );
295        return CorsLayer::permissive();
296    }
297    CorsLayer::new()
298        .allow_origin(AllowOrigin::list(parsed))
299        .allow_methods([Method::GET, Method::POST, Method::PUT])
300        .allow_headers([HeaderName::from_static("content-type")])
301}
302
303// ── Execution handlers ──
304
305#[derive(Deserialize)]
306struct ListExecutionsParams {
307    partition: u16,
308    #[serde(default = "default_lane")]
309    lane: String,
310    #[serde(default = "default_state_filter")]
311    state: String,
312    #[serde(default = "default_limit")]
313    limit: u64,
314    #[serde(default)]
315    offset: u64,
316}
317
318fn default_lane() -> String { "default".to_owned() }
319fn default_state_filter() -> String { "eligible".to_owned() }
320fn default_limit() -> u64 { 50 }
321
322async fn list_executions(
323    State(server): State<Arc<Server>>,
324    Query(params): Query<ListExecutionsParams>,
325) -> Result<Json<ListExecutionsResult>, ApiError> {
326    let lane = ff_core::types::LaneId::try_new(params.lane.clone())
327        .map_err(|e| ApiError::from(ServerError::InvalidInput(format!("invalid lane: {e}"))))?;
328    let limit = params.limit.min(1000);
329    let result = server
330        .list_executions(params.partition, &lane, &params.state, params.offset, limit)
331        .await?;
332    Ok(Json(result))
333}
334
335async fn create_execution(
336    State(server): State<Arc<Server>>,
337    AppJson(args): AppJson<CreateExecutionArgs>,
338) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
339    let result = server.create_execution(&args).await?;
340    let status = match &result {
341        CreateExecutionResult::Created { .. } => StatusCode::CREATED,
342        CreateExecutionResult::Duplicate { .. } => StatusCode::OK,
343    };
344    Ok((status, Json(result)))
345}
346
347async fn get_execution(
348    State(server): State<Arc<Server>>,
349    Path(id): Path<String>,
350) -> Result<Json<ExecutionInfo>, ApiError> {
351    let eid = parse_execution_id(&id)?;
352    Ok(Json(server.get_execution(&eid).await?))
353}
354
355async fn get_execution_state(
356    State(server): State<Arc<Server>>,
357    Path(id): Path<String>,
358) -> Result<Json<PublicState>, ApiError> {
359    let eid = parse_execution_id(&id)?;
360    Ok(Json(server.get_execution_state(&eid).await?))
361}
362
363/// Returns the actionable (`pending`/`active`) waitpoints for an
364/// execution, including the HMAC `waitpoint_token` required to deliver
365/// signals. Human reviewers use this to look up the token originally
366/// returned only to the suspending worker's `SuspendOutcome`.
367///
368/// SECURITY: `waitpoint_token` is a bearer credential for signal
369/// delivery; leaking it lets a third party forge authority to resume or
370/// influence the execution. Gate the endpoint behind `FF_API_TOKEN` in
371/// any deployment reachable from untrusted networks. The auth middleware
372/// only mounts when `FF_API_TOKEN` is set; this endpoint is
373/// unauthenticated without it, and the server logs a loud warning at
374/// startup so operators notice.
375async fn list_pending_waitpoints(
376    State(server): State<Arc<Server>>,
377    Path(id): Path<String>,
378) -> Result<Json<Vec<PendingWaitpointInfo>>, ApiError> {
379    let eid = parse_execution_id(&id)?;
380    Ok(Json(server.list_pending_waitpoints(&eid).await?))
381}
382
383/// Returns the raw result payload bytes written by the worker's
384/// `ff_complete_execution` call. 404 when the execution has no stored
385/// result (missing entirely, still in-flight, or trimmed by retention —
386/// see below).
387///
388/// # Ordering (required)
389///
390/// Callers MUST poll `GET /v1/executions/{id}/state` until it returns
391/// `completed` before fetching `/result`. Early polls may return 404
392/// because completion writes `public_state = completed` and the result
393/// `SET` in the same atomic Lua; in the normal path the window is
394/// effectively zero, but network round-trip ordering between a state
395/// poll and a result fetch can make the result appear briefly absent
396/// during replay (`ff_replay_execution`).
397///
398/// # Retention / 404 after completed
399///
400/// `get_execution_state == completed` is authoritative for completion.
401/// This endpoint additionally depends on the result bytes not having
402/// been trimmed — v1 sets no retention policy, so
403/// `state = completed` should always pair with a 200 here. Any
404/// future retention-policy feature must call this contract out in its
405/// own docs.
406///
407/// CONTENT-TYPE: `application/octet-stream`. The server is payload-format
408/// agnostic — workers choose the encoding via the SDK's `complete(bytes)`
409/// call, and callers must know the contract. The media-pipeline example
410/// uses JSON by convention (`serde_json::to_vec(&Result)`); adapters can
411/// pick any binary format.
412///
413/// SECURITY: completion payloads can contain PII (e.g. LLM summaries of
414/// user audio). Treat this endpoint like any other read — gate behind
415/// `FF_API_TOKEN` in any deployment reachable from untrusted networks.
416/// The auth middleware only mounts when `FF_API_TOKEN` is set.
417async fn get_execution_result(
418    State(server): State<Arc<Server>>,
419    Path(id): Path<String>,
420) -> Result<Response, ApiError> {
421    let eid = parse_execution_id(&id)?;
422    match server.get_execution_result(&eid).await? {
423        Some(bytes) => Ok((
424            StatusCode::OK,
425            [(axum::http::header::CONTENT_TYPE, "application/octet-stream")],
426            bytes,
427        )
428            .into_response()),
429        None => Err(ApiError(ServerError::NotFound(format!(
430            "execution result not found: {eid}"
431        )))),
432    }
433}
434
435async fn cancel_execution(
436    State(server): State<Arc<Server>>,
437    Path(id): Path<String>,
438    AppJson(mut args): AppJson<CancelExecutionArgs>,
439) -> Result<Json<CancelExecutionResult>, ApiError> {
440    let path_eid = parse_execution_id(&id)?;
441    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
442    args.execution_id = path_eid;
443    Ok(Json(server.cancel_execution(&args).await?))
444}
445
446async fn deliver_signal(
447    State(server): State<Arc<Server>>,
448    Path(id): Path<String>,
449    AppJson(mut args): AppJson<DeliverSignalArgs>,
450) -> Result<Json<DeliverSignalResult>, ApiError> {
451    let path_eid = parse_execution_id(&id)?;
452    check_id_match(&path_eid, &args.execution_id, "execution_id")?;
453    args.execution_id = path_eid;
454    Ok(Json(server.deliver_signal(&args).await?))
455}
456
457// ── Admin: rotate waitpoint HMAC secret (RFC-004 §Waitpoint Security) ──
458
/// Request body for `POST /v1/admin/rotate-waitpoint-secret`. Both fields
/// are passed by reference to `Server::rotate_waitpoint_secret`.
#[derive(Deserialize)]
struct RotateWaitpointSecretBody {
    /// Key id for the new secret.
    new_kid: String,
    /// Hex-encoded new secret. Even-length, 0-9a-fA-F.
    new_secret_hex: String,
}
465
/// Hard ceiling on how long the rotate endpoint runs before the HTTP
/// handler bails. Rotation touches every execution partition (up to 256)
/// with HGET+HMGET+HDEL+HSET per partition; 6 round-trips × 30ms cross-AZ
/// × 256 partitions ≈ 46s worst-case. The internal SETNX lock is 10s TTL
/// per partition, so 120s gives ample margin for contention + slow RTTs.
///
/// Load balancers: 120s is above the ALB default idle timeout of 60s. The
/// value assumes the idle timeout has been raised to 120s+ for admin
/// endpoints, as is typical.
///
/// On timeout: returns HTTP 504 immediately. Valkey-side work may still
/// finish (the per-partition locks and HSETs are already in flight). The
/// operator observes a 504 and retries; retry is SAFE — rotation is
/// idempotent per-partition (same new_kid + same secret → no-op on
/// already-rotated partitions).
const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
480
481async fn rotate_waitpoint_secret(
482    State(server): State<Arc<Server>>,
483    AppJson(body): AppJson<RotateWaitpointSecretBody>,
484) -> Result<Response, ApiError> {
485    // Cap the whole endpoint end-to-end. If this trips, the caller's
486    // retry is SAFE — per-partition rotation is idempotent on the same
487    // (new_kid, secret_hex) and the per-partition SETNX lock prevents
488    // double-rotation under concurrent retries.
489    let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
490    let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
491        Ok(r) => r?,
492        Err(_) => {
493            tracing::error!(
494                target: "audit",
495                new_kid = %body.new_kid,
496                timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
497                "waitpoint_hmac_rotation_timeout_http_504"
498            );
499            let body = ErrorBody::plain(format!(
500                "rotation exceeded {}s server-side timeout; retry is safe \
501                 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
502                ROTATE_HTTP_TIMEOUT.as_secs()
503            ));
504            return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
505        }
506    };
507    // Operator action log at audit target (per-partition detail logged inside
508    // rotate_waitpoint_secret). Returns 200 if at least one partition rotated
509    // OR at least one partition was already in-flight under a concurrent
510    // rotation. Returns 400 only on actionable fault states.
511    //
512    // Three distinct rotated==0 cases:
513    //   - all in_progress → concurrent rotation handling it, NOT a failure.
514    //     200 OK with the structured result so operators see what's happening.
515    //   - failed.is_empty() && in_progress.is_empty() → no partitions at all
516    //     (num_flow_partitions == 0; env_u16_positive rejects this at
517    //     boot so this is mostly dead code for library/Default callers).
518    //   - !failed.is_empty() → every partition attempt raised a real error.
519    //     Operator investigates Valkey/auth/cluster health before retrying.
520    if result.rotated == 0 && result.failed.is_empty() && result.in_progress.is_empty() {
521        return Err(ApiError::from(ServerError::OperationFailed(
522            "rotation had no partitions to operate on \
523             (num_flow_partitions is 0 — server misconfigured)"
524                .to_owned(),
525        )));
526    }
527    if result.rotated == 0 && !result.failed.is_empty() {
528        return Err(ApiError::from(ServerError::OperationFailed(
529            "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
530        )));
531    }
532    // rotated==0 but some in_progress → 200 OK; caller polls / re-runs.
533    Ok(Json(result).into_response())
534}
535
/// Request body for `PUT /v1/executions/{id}/priority`.
#[derive(Deserialize)]
struct ChangePriorityBody {
    /// Priority value forwarded unchanged to `Server::change_priority`.
    new_priority: i32,
}
540
541async fn change_priority(
542    State(server): State<Arc<Server>>,
543    Path(id): Path<String>,
544    AppJson(body): AppJson<ChangePriorityBody>,
545) -> Result<Json<ChangePriorityResult>, ApiError> {
546    let eid = parse_execution_id(&id)?;
547    Ok(Json(server.change_priority(&eid, body.new_priority).await?))
548}
549
550async fn replay_execution(
551    State(server): State<Arc<Server>>,
552    Path(id): Path<String>,
553) -> Result<Json<ReplayExecutionResult>, ApiError> {
554    let eid = parse_execution_id(&id)?;
555    Ok(Json(server.replay_execution(&eid).await?))
556}
557
558async fn revoke_lease(
559    State(server): State<Arc<Server>>,
560    Path(id): Path<String>,
561) -> Result<Json<RevokeLeaseResult>, ApiError> {
562    let eid = parse_execution_id(&id)?;
563    Ok(Json(server.revoke_lease(&eid).await?))
564}
565
566// ── Scheduler-routed claim (Batch C item 2 PR-B) ──
567//
568// The server exposes the scheduler's `claim_for_worker` cycle via
569// HTTP so ff-sdk workers can acquire claim grants without enabling
570// the `direct-valkey-claim` feature. The request body carries lane +
571// identity + capabilities; the server returns a serialized
572// `ClaimGrant` (or 204 No Content when no eligible execution exists).
573
/// Request body for `POST /v1/workers/{worker_id}/claim`. The worker id
/// itself comes from the path, not from this body.
#[derive(Deserialize)]
struct ClaimForWorkerBody {
    /// Lane to claim from; validated with `LaneId::try_new` in the handler.
    lane_id: String,
    /// Instance identity of the calling worker; checked with
    /// `validate_identifier` in the handler.
    worker_instance_id: String,
    /// Capability tokens this worker advertises. Sorted + validated
    /// on the scheduler side; any non-printable/CSV-breaking token
    /// surfaces as 400. Defaults to empty when omitted.
    #[serde(default)]
    capabilities: Vec<String>,
    /// Grant TTL in milliseconds. Bounded so a worker can't request a
    /// multi-hour grant and squat the execution. Required; the handler
    /// accepts `1..=CLAIM_GRANT_TTL_MS_MAX`.
    grant_ttl_ms: u64,
}
588
589/// Wire shape for `ff_core::contracts::ClaimGrant`. Core type is not
590/// serde-derived (it carries a `Partition` with a non-scalar family
591/// enum); keep a DTO here so we own the HTTP shape without touching
592/// the core contract.
593#[derive(Serialize)]
594struct ClaimGrantDto {
595    execution_id: String,
596    partition_family: &'static str,
597    partition_index: u16,
598    grant_key: String,
599    expires_at_ms: u64,
600}
601
602impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
603    fn from(g: ff_core::contracts::ClaimGrant) -> Self {
604        let family = match g.partition.family {
605            ff_core::partition::PartitionFamily::Flow => "flow",
606            ff_core::partition::PartitionFamily::Execution => "execution",
607            ff_core::partition::PartitionFamily::Budget => "budget",
608            ff_core::partition::PartitionFamily::Quota => "quota",
609        };
610        Self {
611            execution_id: g.execution_id.to_string(),
612            partition_family: family,
613            partition_index: g.partition.index,
614            grant_key: g.grant_key,
615            expires_at_ms: g.expires_at_ms,
616        }
617    }
618}
619
/// Maximum grant TTL accepted via HTTP, in milliseconds (60 seconds).
/// Mirrors the scheduler's internal ceiling so a misconfigured worker
/// can't squat an execution on a multi-hour grant. `claim_for_worker`
/// rejects requests above this value, and a TTL of zero, with a 400.
const CLAIM_GRANT_TTL_MS_MAX: u64 = 60_000;
624
625/// Reject empty / whitespace / non-printable identifiers the way
626/// [`LaneId::try_new`] does for lanes. WorkerId + WorkerInstanceId
627/// feed into scheduler scan jitter + Valkey key construction; silent
628/// acceptance of "" or "w\nork" would either mis-key or mis-hash.
629fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
630    if value.is_empty() {
631        return Err(ApiError(ServerError::InvalidInput(format!(
632            "{field}: must not be empty"
633        ))));
634    }
635    if value.len() > 256 {
636        return Err(ApiError(ServerError::InvalidInput(format!(
637            "{field}: exceeds 256 bytes (got {})",
638            value.len()
639        ))));
640    }
641    if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
642        return Err(ApiError(ServerError::InvalidInput(format!(
643            "{field}: must not contain whitespace or control characters"
644        ))));
645    }
646    Ok(())
647}
648
649async fn claim_for_worker(
650    State(server): State<Arc<Server>>,
651    Path(worker_id): Path<String>,
652    AppJson(body): AppJson<ClaimForWorkerBody>,
653) -> Result<Response, ApiError> {
654    validate_identifier("worker_id", &worker_id)?;
655    validate_identifier("worker_instance_id", &body.worker_instance_id)?;
656    let worker_id = WorkerId::new(worker_id);
657    let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
658    let lane = LaneId::try_new(body.lane_id).map_err(|e| {
659        ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
660    })?;
661    if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
662        return Err(ApiError(ServerError::InvalidInput(format!(
663            "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
664        ))));
665    }
666    let caps: std::collections::BTreeSet<String> =
667        body.capabilities.into_iter().collect();
668
669    match server
670        .claim_for_worker(
671            &lane,
672            &worker_id,
673            &worker_instance_id,
674            &caps,
675            body.grant_ttl_ms,
676        )
677        .await?
678    {
679        Some(grant) => Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response()),
680        None => Ok(StatusCode::NO_CONTENT.into_response()),
681    }
682}
683
684// ── Stream read + tail ──
685
686#[derive(Deserialize)]
687struct ReadStreamParams {
688    #[serde(default = "default_from_id")]
689    from: String,
690    #[serde(default = "default_to_id")]
691    to: String,
692    #[serde(default = "default_read_limit")]
693    limit: u64,
694}
695
696fn default_from_id() -> String { "-".to_owned() }
697fn default_to_id() -> String { "+".to_owned() }
698fn default_read_limit() -> u64 { 100 }
699
700/// Reject malformed `from`/`to`/`after` stream IDs at the REST boundary so
701/// Valkey doesn't have to. Accepts `"-"`, `"+"`, and `<ms>` or `<ms>-<seq>`
702/// decimal IDs as Valkey XRANGE/XREAD does.
703///
704/// Without this, a request like `?from=abc` would round-trip to Valkey,
705/// surface as a script error, and return HTTP 500 — which is wrong for
706/// caller-input validation.
707fn validate_stream_id(s: &str, field: &str, allow_open_markers: bool) -> Result<(), ApiError> {
708    if allow_open_markers && (s == "-" || s == "+") {
709        return Ok(());
710    }
711    // Allowed: `<digits>` or `<digits>-<digits>`.
712    let (ms_part, seq_part) = match s.split_once('-') {
713        Some((ms, seq)) => (ms, Some(seq)),
714        None => (s, None),
715    };
716    let ms_valid = !ms_part.is_empty() && ms_part.chars().all(|c| c.is_ascii_digit());
717    let seq_valid = seq_part
718        .map(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit()))
719        .unwrap_or(true);
720    if ms_valid && seq_valid {
721        Ok(())
722    } else {
723        Err(ApiError(ServerError::InvalidInput(format!(
724            "{field}: invalid stream ID '{s}' (expected '-', '+', '<ms>', or '<ms>-<seq>')"
725        ))))
726    }
727}
728
729#[derive(Serialize)]
730struct ReadStreamResponse {
731    frames: Vec<StreamFrame>,
732    count: usize,
733    /// When set, the producer has closed this stream — consumer should
734    /// stop polling. Absent when the stream is still open (or never
735    /// existed, which is indistinguishable from "still open" at this
736    /// layer).
737    #[serde(skip_serializing_if = "Option::is_none")]
738    closed_at: Option<i64>,
739    /// Reason from the closing writer: `attempt_success`, `attempt_failure`,
740    /// `attempt_cancelled`, `attempt_interrupted`. Absent iff still open.
741    #[serde(skip_serializing_if = "Option::is_none")]
742    closed_reason: Option<String>,
743}
744
745impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
746    fn from(sf: ff_core::contracts::StreamFrames) -> Self {
747        let count = sf.frames.len();
748        Self {
749            frames: sf.frames,
750            count,
751            closed_at: sf.closed_at.map(|t| t.0),
752            closed_reason: sf.closed_reason,
753        }
754    }
755}
756
/// REST-layer ceiling on `limit` for stream read/tail responses. Requests
/// above it are rejected with a 400 rather than clamped.
///
/// Lower than the internal `STREAM_READ_HARD_CAP` (10_000) because an HTTP
/// response buffers the whole JSON body in memory in axum — a
/// `10_000 × max_payload_bytes (65_536)` body is ~640MB per call, which
/// is a DoS vector from a single client. Internal callers using FCALL or
/// the SDK directly still get the full 10_000 ceiling; REST clients must
/// paginate through `from`/`to` for larger spans.
///
/// v2 candidate: chunked-transfer / SSE when the caller wants > this bound.
const REST_STREAM_LIMIT_CEILING: u64 = 1_000;
767
768async fn read_attempt_stream(
769    State(server): State<Arc<Server>>,
770    Path((id, idx)): Path<(String, u32)>,
771    Query(params): Query<ReadStreamParams>,
772) -> Result<Json<ReadStreamResponse>, ApiError> {
773    if params.limit == 0 {
774        return Err(ApiError(ServerError::InvalidInput(
775            "limit must be >= 1".to_owned(),
776        )));
777    }
778    if params.limit > REST_STREAM_LIMIT_CEILING {
779        return Err(ApiError(ServerError::InvalidInput(format!(
780            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
781        ))));
782    }
783    validate_stream_id(&params.from, "from", true)?;
784    validate_stream_id(&params.to, "to", true)?;
785
786    let eid = parse_execution_id(&id)?;
787    let attempt_index = AttemptIndex::new(idx);
788    let result = server
789        .read_attempt_stream(&eid, attempt_index, &params.from, &params.to, params.limit)
790        .await?;
791    Ok(Json(result.into()))
792}
793
794#[derive(Deserialize)]
795struct TailStreamParams {
796    #[serde(default = "default_tail_after")]
797    after: String,
798    #[serde(default)]
799    block_ms: u64,
800    #[serde(default = "default_tail_limit")]
801    limit: u64,
802}
803
/// Serde default for `TailStreamParams::after`: the stream ID `"0-0"`.
fn default_tail_after() -> String {
    String::from("0-0")
}
/// Serde default for `TailStreamParams::limit`.
fn default_tail_limit() -> u64 {
    50
}
806
/// Upper bound, in milliseconds, on the tail endpoint's BLOCK duration.
///
/// Chosen to sit under common load-balancer idle timeouts (ALB 60s,
/// nginx 60s, Cloudflare 100s) so a blocked HTTP response is not severed
/// partway through the wait.
///
/// Note: ferriskey's client automatically stretches its `request_timeout`
/// for XREAD BLOCK to `block_ms + 500ms`, so blocking for the full ceiling
/// never yields a spurious transport timeout. The `ff_script::stream_tail`
/// module docs describe the exact ferriskey code path.
const MAX_TAIL_BLOCK_MS: u64 = 30_000;
816
817async fn tail_attempt_stream(
818    State(server): State<Arc<Server>>,
819    Path((id, idx)): Path<(String, u32)>,
820    Query(params): Query<TailStreamParams>,
821) -> Result<Json<ReadStreamResponse>, ApiError> {
822    if params.block_ms > MAX_TAIL_BLOCK_MS {
823        return Err(ApiError(ServerError::InvalidInput(format!(
824            "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
825        ))));
826    }
827    if params.limit == 0 {
828        return Err(ApiError(ServerError::InvalidInput(
829            "limit must be >= 1".to_owned(),
830        )));
831    }
832    if params.limit > REST_STREAM_LIMIT_CEILING {
833        return Err(ApiError(ServerError::InvalidInput(format!(
834            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
835        ))));
836    }
837    // XREAD cursor must be a concrete ID — "-"/"+" are XRANGE-only.
838    validate_stream_id(&params.after, "after", false)?;
839
840    let eid = parse_execution_id(&id)?;
841    let attempt_index = AttemptIndex::new(idx);
842    let result = server
843        .tail_attempt_stream(&eid, attempt_index, &params.after, params.block_ms, params.limit)
844        .await?;
845    Ok(Json(result.into()))
846}
847
848// ── Flow handlers ──
849
850async fn create_flow(
851    State(server): State<Arc<Server>>,
852    AppJson(args): AppJson<CreateFlowArgs>,
853) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
854    let result = server.create_flow(&args).await?;
855    let status = match &result {
856        CreateFlowResult::Created { .. } => StatusCode::CREATED,
857        CreateFlowResult::AlreadySatisfied { .. } => StatusCode::OK,
858    };
859    Ok((status, Json(result)))
860}
861
862async fn add_execution_to_flow(
863    State(server): State<Arc<Server>>,
864    Path(id): Path<String>,
865    AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
866) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
867    let path_fid = parse_flow_id(&id)?;
868    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
869    args.flow_id = path_fid;
870    let result = server.add_execution_to_flow(&args).await?;
871    let status = match &result {
872        AddExecutionToFlowResult::Added { .. } => StatusCode::CREATED,
873        AddExecutionToFlowResult::AlreadyMember { .. } => StatusCode::OK,
874    };
875    Ok((status, Json(result)))
876}
877
878/// Cancel a flow.
879///
880/// By default the handler returns immediately with
881/// [`CancelFlowResult::CancellationScheduled`] (or `Cancelled` for flows
882/// with no members / non-cancel_all policies), and the individual member
883/// execution cancellations run in a background task on the server.
884/// Clients can track per-member progress by polling
885/// `GET /v1/executions/{id}/state` for each id in `member_execution_ids`.
886///
887/// Pass `?wait=true` to run the dispatch loop inline; the handler will not
888/// return until every member has been cancelled. Useful for tests and
889/// callers that need synchronous completion.
890async fn cancel_flow(
891    State(server): State<Arc<Server>>,
892    Path(id): Path<String>,
893    Query(params): Query<HashMap<String, String>>,
894    AppJson(mut args): AppJson<CancelFlowArgs>,
895) -> Result<Json<CancelFlowResult>, ApiError> {
896    let path_fid = parse_flow_id(&id)?;
897    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
898    args.flow_id = path_fid;
899    let wait = params.get("wait").is_some_and(|v| v == "true" || v == "1");
900    let result = if wait {
901        server.cancel_flow_wait(&args).await?
902    } else {
903        server.cancel_flow(&args).await?
904    };
905    Ok(Json(result))
906}
907
908async fn stage_dependency_edge(
909    State(server): State<Arc<Server>>,
910    Path(id): Path<String>,
911    AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
912) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
913    let path_fid = parse_flow_id(&id)?;
914    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
915    args.flow_id = path_fid;
916    let result = server.stage_dependency_edge(&args).await?;
917    Ok((StatusCode::CREATED, Json(result)))
918}
919
920async fn apply_dependency_to_child(
921    State(server): State<Arc<Server>>,
922    Path(id): Path<String>,
923    AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
924) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
925    let path_fid = parse_flow_id(&id)?;
926    check_id_match(&path_fid, &args.flow_id, "flow_id")?;
927    args.flow_id = path_fid;
928    Ok(Json(server.apply_dependency_to_child(&args).await?))
929}
930
931// ── Budget / Quota handlers ──
932
933async fn create_budget(
934    State(server): State<Arc<Server>>,
935    AppJson(args): AppJson<CreateBudgetArgs>,
936) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
937    let result = server.create_budget(&args).await?;
938    let status = match &result {
939        CreateBudgetResult::Created { .. } => StatusCode::CREATED,
940        CreateBudgetResult::AlreadySatisfied { .. } => StatusCode::OK,
941    };
942    Ok((status, Json(result)))
943}
944
945async fn get_budget_status(
946    State(server): State<Arc<Server>>,
947    Path(id): Path<String>,
948) -> Result<Json<BudgetStatus>, ApiError> {
949    let bid = parse_budget_id(&id)?;
950    Ok(Json(server.get_budget_status(&bid).await?))
951}
952
953#[derive(Deserialize)]
954struct ReportUsageBody {
955    dimensions: HashMap<String, u64>,
956    now: ff_core::types::TimestampMs,
957    #[serde(default)]
958    dedup_key: Option<String>,
959}
960
961async fn report_usage(
962    State(server): State<Arc<Server>>,
963    Path(id): Path<String>,
964    AppJson(body): AppJson<ReportUsageBody>,
965) -> Result<Json<ReportUsageResult>, ApiError> {
966    let bid = parse_budget_id(&id)?;
967    let dims: Vec<String> = body.dimensions.keys().cloned().collect();
968    let deltas: Vec<u64> = dims.iter().map(|d| body.dimensions[d]).collect();
969    let args = ReportUsageArgs {
970        dimensions: dims,
971        deltas,
972        now: body.now,
973        dedup_key: body.dedup_key,
974    };
975    Ok(Json(server.report_usage(&bid, &args).await?))
976}
977
978async fn reset_budget(
979    State(server): State<Arc<Server>>,
980    Path(id): Path<String>,
981) -> Result<Json<ResetBudgetResult>, ApiError> {
982    let bid = parse_budget_id(&id)?;
983    Ok(Json(server.reset_budget(&bid).await?))
984}
985
986async fn create_quota_policy(
987    State(server): State<Arc<Server>>,
988    AppJson(args): AppJson<CreateQuotaPolicyArgs>,
989) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
990    let result = server.create_quota_policy(&args).await?;
991    let status = match &result {
992        CreateQuotaPolicyResult::Created { .. } => StatusCode::CREATED,
993        CreateQuotaPolicyResult::AlreadySatisfied { .. } => StatusCode::OK,
994    };
995    Ok((status, Json(result)))
996}
997
998// ── Health check ──
999
1000#[derive(Serialize)]
1001struct HealthResponse {
1002    status: &'static str,
1003}
1004
1005async fn healthz(
1006    State(server): State<Arc<Server>>,
1007) -> Result<Json<HealthResponse>, ApiError> {
1008    let _: String = server
1009        .client()
1010        .cmd("PING")
1011        .execute()
1012        .await
1013        .map_err(|e| ApiError(ServerError::ValkeyContext { source: e, context: "healthz PING".into() }))?;
1014    Ok(Json(HealthResponse { status: "ok" }))
1015}
1016
1017// ── ID parsing helpers ──
1018
1019/// Return 400 if the body contains an ID that differs from the path ID.
1020fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
1021    if body_id != path_id {
1022        return Err(ApiError(ServerError::InvalidInput(format!(
1023            "path {id_name} does not match body {id_name}"
1024        ))));
1025    }
1026    Ok(())
1027}
1028
1029fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
1030    ExecutionId::parse(s)
1031        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
1032}
1033
1034fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
1035    FlowId::parse(s)
1036        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid flow_id: {e}"))))
1037}
1038
1039fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
1040    BudgetId::parse(s)
1041        .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid budget_id: {e}"))))
1042}