1use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use axum::{
8 extract::{DefaultBodyLimit, MatchedPath, Path, Query, Request, State},
9 http::StatusCode,
10 middleware,
11 response::{IntoResponse, Response},
12 routing::{get, post, put, MethodRouter},
13 Json, Router,
14};
15use serde::{Deserialize, Serialize};
16use axum::http::Method;
17use tower_http::cors::{AllowOrigin, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20use ff_core::contracts::*;
21use ff_core::state::PublicState;
22use ff_core::types::*;
23
24use crate::config::ConfigError;
25use crate::server::{Server, ServerError};
26
/// Request-body cap of 1 MiB; the router in this file applies it to
/// `/v1/executions`.
const BODY_LIMIT_LARGE_PAYLOAD: usize = 1024 * 1024;
/// Request-body cap of 256 KiB; the router in this file applies it to the
/// signal-delivery route.
const BODY_LIMIT_MEDIUM_PAYLOAD: usize = 256 * 1024;
/// Request-body cap of 64 KiB used for the remaining body-limited routes.
const BODY_LIMIT_CONTROL: usize = 64 * 1024;

/// Body limit of the matched route, stored in the request extensions by
/// `enforce_body_limit` and read back by `AppJson` when it reports an
/// oversized payload.
#[derive(Clone, Copy)]
struct BodyLimit {
    /// Maximum accepted request-body size, in bytes.
    bytes: usize,
}
79
80fn with_body_limit(route: MethodRouter<Arc<Server>>, bytes: usize) -> MethodRouter<Arc<Server>> {
105 let r: MethodRouter<Arc<Server>> =
106 route.layer(tower_http::limit::RequestBodyLimitLayer::new(bytes));
107 let r: MethodRouter<Arc<Server>> = r.layer(DefaultBodyLimit::max(bytes));
108 r.layer(middleware::from_fn(
109 move |req: Request, next: middleware::Next| enforce_body_limit(bytes, req, next),
110 ))
111}
112
113async fn enforce_body_limit(bytes: usize, mut req: Request, next: middleware::Next) -> Response {
119 if let Some(content_length) = req
120 .headers()
121 .get(axum::http::header::CONTENT_LENGTH)
122 .and_then(|v| v.to_str().ok())
123 .and_then(|s| s.parse::<usize>().ok())
124 && content_length > bytes
125 {
126 let route = req
127 .extensions()
128 .get::<MatchedPath>()
129 .map(|m| m.as_str().to_owned())
130 .unwrap_or_default();
131 let body = PayloadTooLargeBody {
132 error: "payload_too_large",
133 limit_bytes: bytes,
134 route,
135 };
136 return (StatusCode::PAYLOAD_TOO_LARGE, Json(body)).into_response();
137 }
138 req.extensions_mut().insert(BodyLimit { bytes });
139 next.run(req).await
140}
141
/// JSON body returned alongside `413 Payload Too Large`.
#[derive(Serialize)]
struct PayloadTooLargeBody {
    /// Error code; callers in this file always set it to `"payload_too_large"`.
    error: &'static str,
    /// Body limit, in bytes, that the request exceeded (0 when unknown).
    limit_bytes: usize,
    /// Matched route template, or an empty string when none was recorded.
    route: String,
}
149
150struct AppJson<T>(T);
153
154impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
155where
156 T: serde::de::DeserializeOwned + Send,
157 S: Send + Sync,
158{
159 type Rejection = Response;
160
161 async fn from_request(
162 req: axum::extract::Request,
163 state: &S,
164 ) -> Result<Self, Self::Rejection> {
165 let limit = req.extensions().get::<BodyLimit>().copied();
170 let matched = req.extensions().get::<MatchedPath>().cloned();
171
172 match Json::<T>::from_request(req, state).await {
173 Ok(Json(value)) => Ok(AppJson(value)),
174 Err(rejection) => {
175 let status = rejection.status();
176 tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
177 if status == StatusCode::PAYLOAD_TOO_LARGE {
178 let limit_bytes = limit.map(|l| l.bytes).unwrap_or(0);
179 let route = matched
180 .as_ref()
181 .map(|m| m.as_str().to_owned())
182 .unwrap_or_default();
183 let body = PayloadTooLargeBody {
184 error: "payload_too_large",
185 limit_bytes,
186 route,
187 };
188 return Err((status, Json(body)).into_response());
189 }
190 let body = ErrorBody::plain(format!(
191 "invalid JSON: {}",
192 status.canonical_reason().unwrap_or("bad request"),
193 ));
194 Err((status, Json(body)).into_response())
195 }
196 }
197 }
198}
199
/// Handler error type: a newtype over `ServerError` that this file turns into
/// an HTTP response via its `IntoResponse` impl.
struct ApiError(ServerError);

impl From<ServerError> for ApiError {
    /// Wraps the server error unchanged, so handlers can use `?` on
    /// `Result<_, ServerError>`.
    fn from(e: ServerError) -> Self {
        Self(e)
    }
}

impl From<ff_core::engine_error::EngineError> for ApiError {
    /// Boxes the engine error into `ServerError::Engine`, so handlers can use
    /// `?` on results whose error type is `EngineError`.
    fn from(e: ff_core::engine_error::EngineError) -> Self {
        Self(ServerError::Engine(Box::new(e)))
    }
}
220
/// JSON error payload returned by this API.
///
/// `kind` and `retryable` are left out of the serialized output when `None`.
#[derive(Serialize)]
struct ErrorBody {
    /// Human-readable error message.
    error: String,
    /// Optional machine-readable error code.
    #[serde(skip_serializing_if = "Option::is_none")]
    kind: Option<String>,
    /// Optional hint telling the client whether retrying may succeed.
    #[serde(skip_serializing_if = "Option::is_none")]
    retryable: Option<bool>,
}

impl ErrorBody {
    /// Builds a body that carries only a message (no `kind`, no `retryable`).
    fn plain(error: String) -> Self {
        Self { error, kind: None, retryable: None }
    }
}
239
/// Renders an `ApiError` as an HTTP status code plus a JSON `ErrorBody`.
impl IntoResponse for ApiError {
    /// Maps the wrapped `ServerError` to a `(status, body)` pair and serializes
    /// the body as JSON.
    ///
    /// Backend and library-load failures are also logged at error level before
    /// the response is built.
    fn into_response(self) -> Response {
        let (status, body) = match &self.0 {
            ServerError::NotFound(msg) => {
                (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
            }
            ServerError::InvalidInput(msg) => {
                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
            }
            ServerError::OperationFailed(msg) => {
                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
            }
            ServerError::ConcurrencyLimitExceeded(source, max) => (
                StatusCode::TOO_MANY_REQUESTS,
                ErrorBody {
                    error: format!(
                        "too many concurrent {source} calls (server max: {max}); retry with backoff"
                    ),
                    kind: None,
                    retryable: Some(true),
                },
            ),
            // Backend failures: 500, with the backend's stable kind string
            // exposed to the client and the full message logged.
            ServerError::Backend(be) => {
                let kind_str = be.kind().as_stable_str();
                tracing::error!(
                    kind = kind_str,
                    message = be.message(),
                    "backend error"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: self.0.to_string(),
                        kind: Some(kind_str.to_owned()),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            // Same as `Backend`, with the extra context included in the log.
            ServerError::BackendContext { source, context } => {
                let kind_str = source.kind().as_stable_str();
                tracing::error!(
                    kind = kind_str,
                    message = source.message(),
                    context = %context,
                    "backend error"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: self.0.to_string(),
                        kind: Some(kind_str.to_owned()),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            ServerError::LibraryLoad(load_err) => {
                // `kind` is only available when the load error exposes a
                // Valkey error kind; otherwise it is omitted from the body
                // and logged as an empty string.
                let kind_str = load_err
                    .valkey_kind()
                    .map(ff_backend_valkey::classify_ferriskey_kind)
                    .map(|k| k.as_stable_str());
                tracing::error!(
                    kind = kind_str.unwrap_or(""),
                    error = %load_err,
                    "library load failure"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: format!("library load: {load_err}"),
                        kind: kind_str.map(str::to_owned),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            ServerError::Engine(boxed) => {
                use ff_core::engine_error::EngineError as EE;
                // Peels nested `Contextual` wrappers so the classification
                // below looks at the innermost engine error.
                fn root(e: &EE) -> &EE {
                    match e {
                        EE::Contextual { source, .. } => root(source),
                        other => other,
                    }
                }
                match root(boxed) {
                    EE::ResourceExhausted { pool, max, .. } => (
                        StatusCode::TOO_MANY_REQUESTS,
                        ErrorBody {
                            error: format!(
                                "too many concurrent {pool} calls (server max: {max}); retry with backoff"
                            ),
                            kind: Some("resource_exhausted".into()),
                            retryable: Some(true),
                        },
                    ),
                    EE::Unavailable { op } => (
                        StatusCode::SERVICE_UNAVAILABLE,
                        ErrorBody {
                            error: format!("backend op unavailable: {op}"),
                            kind: Some("unavailable".into()),
                            retryable: Some(false),
                        },
                    ),
                    EE::NotFound { entity } => (
                        StatusCode::NOT_FOUND,
                        ErrorBody::plain(format!("not found: {entity}")),
                    ),
                    // Validation failures: 400 with a plain body whose message
                    // is the code, optionally followed by the detail text.
                    // Note that `kind` is not set on this path.
                    EE::Validation { kind, detail } => {
                        use ff_core::engine_error::ValidationKind as VK;
                        let code = match kind {
                            VK::InvalidInput => "invalid_input",
                            VK::CapabilityMismatch => "capability_mismatch",
                            VK::InvalidCapabilities => "invalid_capabilities",
                            VK::InvalidPolicyJson => "invalid_policy_json",
                            VK::PayloadTooLarge => "payload_too_large",
                            VK::SignalLimitExceeded => "signal_limit_exceeded",
                            VK::InvalidWaitpointKey => "invalid_waitpoint_key",
                            VK::InvalidToken => "invalid_token",
                            VK::WaitpointNotTokenBound => "waitpoint_not_token_bound",
                            VK::RetentionLimitExceeded => "retention_limit_exceeded",
                            VK::InvalidLeaseForSuspend => "invalid_lease_for_suspend",
                            VK::InvalidDependency => "invalid_dependency",
                            VK::InvalidWaitpointForExecution => "invalid_waitpoint_for_execution",
                            VK::InvalidBlockingReason => "invalid_blocking_reason",
                            VK::InvalidOffset => "invalid_offset",
                            VK::Unauthorized => "unauthorized",
                            VK::InvalidBudgetScope => "invalid_budget_scope",
                            VK::BudgetOverrideNotAllowed => "budget_override_not_allowed",
                            VK::InvalidQuotaSpec => "invalid_quota_spec",
                            VK::InvalidKid => "invalid_kid",
                            VK::InvalidSecretHex => "invalid_secret_hex",
                            VK::InvalidGraceMs => "invalid_grace_ms",
                            VK::InvalidTagKey => "invalid_tag_key",
                            VK::InvalidFrameType => "invalid_frame_type",
                            // Fallback for kinds not listed above.
                            _ => "validation_error",
                        };
                        let msg = if detail.is_empty() {
                            code.to_string()
                        } else {
                            format!("{code}: {detail}")
                        };
                        (StatusCode::BAD_REQUEST, ErrorBody::plain(msg))
                    }
                    // Conflicts: always 409, never retryable; the message
                    // includes the `Debug` form of the conflict kind.
                    EE::Conflict(kind) => {
                        use ff_core::engine_error::ConflictKind as CK;
                        let code = match kind {
                            CK::DependencyAlreadyExists { .. } => "dependency_already_exists",
                            CK::CycleDetected => "cycle_detected",
                            CK::SelfReferencingEdge => "self_referencing_edge",
                            CK::ExecutionAlreadyInFlow => "execution_already_in_flow",
                            CK::WaitpointAlreadyExists => "waitpoint_already_exists",
                            CK::BudgetAttachConflict => "budget_attach_conflict",
                            CK::QuotaAttachConflict => "quota_attach_conflict",
                            CK::RotationConflict(_) => "rotation_conflict",
                            CK::ActiveAttemptExists => "active_attempt_exists",
                            // Fallback for kinds not listed above.
                            _ => "conflict",
                        };
                        (
                            StatusCode::CONFLICT,
                            ErrorBody {
                                error: format!("{code}: {kind:?}"),
                                kind: Some(code.into()),
                                retryable: Some(false),
                            },
                        )
                    }
                    // Contention: status and retryability depend on the kind.
                    EE::Contention(ck) => {
                        use ff_core::engine_error::ContentionKind as CK;
                        let (status, code, retryable) = match ck {
                            CK::RetryExhausted => (
                                StatusCode::INTERNAL_SERVER_ERROR,
                                "retry_exhausted",
                                false,
                            ),
                            CK::RateLimitExceeded => (
                                StatusCode::TOO_MANY_REQUESTS,
                                "rate_limit_exceeded",
                                true,
                            ),
                            CK::ConcurrencyLimitExceeded => (
                                StatusCode::TOO_MANY_REQUESTS,
                                "concurrency_limit_exceeded",
                                true,
                            ),
                            // Any other contention kind: 409, retryable.
                            _ => (StatusCode::CONFLICT, "contention", true),
                        };
                        (
                            status,
                            ErrorBody {
                                error: format!("{code}: {ck:?}"),
                                kind: Some(code.into()),
                                retryable: Some(retryable),
                            },
                        )
                    }
                    // State errors: every arm currently maps to 409 and is
                    // reported as non-retryable; only the code differs.
                    EE::State(sk) => {
                        use ff_core::engine_error::StateKind as SK;
                        let (status, code) = match sk {
                            SK::ExecutionNotTerminal => {
                                (StatusCode::CONFLICT, "execution_not_terminal")
                            }
                            SK::MaxReplaysExhausted => {
                                (StatusCode::CONFLICT, "max_replays_exhausted")
                            }
                            SK::ReplayNotAllowed => {
                                (StatusCode::CONFLICT, "replay_not_allowed")
                            }
                            SK::NotRunnable => (StatusCode::CONFLICT, "not_runnable"),
                            SK::Terminal => (StatusCode::CONFLICT, "terminal"),
                            SK::FlowAlreadyTerminal => {
                                (StatusCode::CONFLICT, "flow_already_terminal")
                            }
                            SK::BudgetExceeded => (StatusCode::CONFLICT, "budget_exceeded"),
                            SK::BudgetSoftExceeded => {
                                (StatusCode::CONFLICT, "budget_soft_exceeded")
                            }
                            // NOTE(review): these kinds all share the
                            // "already_satisfied" code, including the two
                            // lease kinds — confirm that grouping is intended.
                            SK::AlreadySatisfied
                            | SK::DuplicateSignal
                            | SK::OkAlreadyApplied
                            | SK::AttemptAlreadyTerminal
                            | SK::StreamAlreadyClosed
                            | SK::LeaseExpired
                            | SK::LeaseRevoked => (StatusCode::CONFLICT, "already_satisfied"),
                            // Fallback for kinds not listed above.
                            _ => (StatusCode::CONFLICT, "state_conflict"),
                        };
                        (
                            status,
                            ErrorBody {
                                error: format!("{code}: {sk:?}"),
                                kind: Some(code.into()),
                                retryable: Some(false),
                            },
                        )
                    }
                    // Any other engine error: 500 with the server error's
                    // display text and its own retryability verdict.
                    _ => (
                        StatusCode::INTERNAL_SERVER_ERROR,
                        ErrorBody {
                            error: self.0.to_string(),
                            kind: None,
                            retryable: Some(self.0.is_retryable()),
                        },
                    ),
                }
            }
            // Any other `ServerError` variant: 500, reported as non-retryable.
            other => (
                StatusCode::INTERNAL_SERVER_ERROR,
                ErrorBody {
                    error: other.to_string(),
                    kind: None,
                    retryable: Some(false),
                },
            ),
        };
        (status, Json(body)).into_response()
    }
}
543
544pub fn router(
547 server: Arc<Server>,
548 cors_origins: &[String],
549 api_token: Option<String>,
550) -> Result<Router, ConfigError> {
551 router_with_metrics(server, cors_origins, api_token, None)
552}
553
/// Builds the full HTTP router: the route table, optional bearer-token auth,
/// optional metrics middleware, request tracing and CORS.
///
/// * `server` — shared application state handed to every handler.
/// * `cors_origins` — allowed CORS origins, passed to `build_cors_layer`.
/// * `api_token` — when `Some`, `auth_middleware` is installed with it.
/// * `metrics` — only used when the `observability` feature is enabled; when
///   `Some`, the HTTP metrics middleware and a `/metrics` route are added.
///
/// # Errors
///
/// Returns the `ConfigError` produced by `build_cors_layer`.
pub fn router_with_metrics(
    server: Arc<Server>,
    cors_origins: &[String],
    api_token: Option<String>,
    #[cfg_attr(not(feature = "observability"), allow(unused_variables))]
    metrics: Option<Arc<crate::Metrics>>,
) -> Result<Router, ConfigError> {
    let auth_enabled = api_token.is_some();
    let cors = build_cors_layer(cors_origins, auth_enabled)?;

    let mut app = Router::new()
        // Executions
        .route(
            "/v1/executions",
            with_body_limit(
                get(list_executions).post(create_execution),
                BODY_LIMIT_LARGE_PAYLOAD,
            ),
        )
        .route("/v1/executions/{id}", get(get_execution))
        .route("/v1/executions/{id}/state", get(get_execution_state))
        .route(
            "/v1/executions/{id}/pending-waitpoints",
            get(list_pending_waitpoints),
        )
        .route("/v1/executions/{id}/result", get(get_execution_result))
        .route(
            "/v1/executions/{id}/cancel",
            with_body_limit(post(cancel_execution), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/signal",
            with_body_limit(post(deliver_signal), BODY_LIMIT_MEDIUM_PAYLOAD),
        )
        .route(
            "/v1/executions/{id}/priority",
            with_body_limit(put(change_priority), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/replay",
            with_body_limit(post(replay_execution), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/revoke-lease",
            with_body_limit(post(revoke_lease), BODY_LIMIT_CONTROL),
        )
        // Worker claims
        .route(
            "/v1/workers/{worker_id}/claim",
            with_body_limit(post(claim_for_worker), BODY_LIMIT_CONTROL),
        )
        // Attempt streams
        .route(
            "/v1/executions/{id}/attempts/{idx}/stream",
            get(read_attempt_stream),
        )
        .route(
            "/v1/executions/{id}/attempts/{idx}/stream/tail",
            get(tail_attempt_stream),
        )
        // Flows
        .route(
            "/v1/flows",
            with_body_limit(post(create_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/members",
            with_body_limit(post(add_execution_to_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/cancel",
            with_body_limit(post(cancel_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/edges",
            with_body_limit(post(stage_dependency_edge), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/edges/apply",
            with_body_limit(post(apply_dependency_to_child), BODY_LIMIT_CONTROL),
        )
        // Budgets
        .route(
            "/v1/budgets",
            with_body_limit(post(create_budget), BODY_LIMIT_CONTROL),
        )
        .route("/v1/budgets/{id}", get(get_budget_status))
        .route(
            "/v1/budgets/{id}/usage",
            with_body_limit(post(report_usage), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/budgets/{id}/reset",
            with_body_limit(post(reset_budget), BODY_LIMIT_CONTROL),
        )
        // Quotas
        .route(
            "/v1/quotas",
            with_body_limit(post(create_quota_policy), BODY_LIMIT_CONTROL),
        )
        // Admin
        .route(
            "/v1/admin/rotate-waitpoint-secret",
            with_body_limit(post(rotate_waitpoint_secret), BODY_LIMIT_CONTROL),
        )
        // Health probe; `auth_middleware` lets this path through unauthenticated.
        .route("/healthz", get(healthz));

    // Bearer-token auth is installed only when a token is configured.
    if let Some(token) = api_token {
        let token = Arc::new(token);
        app = app.layer(middleware::from_fn(move |req, next| {
            let token = token.clone();
            auth_middleware(token, req, next)
        }));
    }

    // HTTP metrics middleware, added after the auth layer.
    #[cfg(feature = "observability")]
    if let Some(m) = metrics.as_ref() {
        let m = m.clone();
        app = app.layer(middleware::from_fn_with_state(
            m,
            crate::metrics::http_middleware,
        ));
    }

    // `mut` is only needed for the feature-gated merge below.
    #[cfg_attr(not(feature = "observability"), allow(unused_mut))]
    let mut app = app
        .layer(TraceLayer::new_for_http())
        .layer(cors)
        .with_state(server);

    // The `/metrics` router is merged after every `.layer(...)` call above.
    // NOTE(review): presumably so that auth and the other layers do not wrap
    // the scrape endpoint — confirm.
    #[cfg(feature = "observability")]
    if let Some(m) = metrics {
        let metrics_router: Router = Router::new()
            .route("/metrics", get(crate::metrics::metrics_handler))
            .with_state(m);
        app = app.merge(metrics_router);
    }

    Ok(app)
}
720
721async fn auth_middleware(
722 token: Arc<String>,
723 req: Request,
724 next: middleware::Next,
725) -> Response {
726 if req.uri().path() == "/healthz" {
727 return next.run(req).await;
728 }
729
730 let auth_header = req
731 .headers()
732 .get("authorization")
733 .and_then(|v| v.to_str().ok());
734
735 let authorized = auth_header
736 .and_then(|v| v.strip_prefix("Bearer "))
737 .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
738
739 if authorized {
740 next.run(req).await
741 } else {
742 (
743 StatusCode::UNAUTHORIZED,
744 Json(ErrorBody::plain(
745 "missing or invalid Authorization header".to_owned(),
746 )),
747 )
748 .into_response()
749 }
750}
751
/// Compares two byte slices without short-circuiting on the first mismatch.
///
/// Slices of different length compare unequal immediately. For equal
/// lengths, every byte pair is XOR-ed and the differences are OR-ed into one
/// accumulator, so the whole input is always visited.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs))
            == 0
}
762
763fn build_cors_layer(origins: &[String], auth_enabled: bool) -> Result<CorsLayer, ConfigError> {
774 if origins.iter().any(|o| o == "*") {
775 return Ok(CorsLayer::permissive());
776 }
777
778 let mut parsed = Vec::with_capacity(origins.len());
779 let mut accepted = Vec::with_capacity(origins.len());
780 let mut invalid = Vec::new();
781 for o in origins {
782 match o.parse() {
783 Ok(v) => {
784 parsed.push(v);
785 accepted.push(o.as_str());
786 }
787 Err(_) => invalid.push(o.clone()),
788 }
789 }
790
791 if parsed.is_empty() && !origins.is_empty() {
792 return Err(ConfigError::InvalidValue {
793 var: "FF_CORS_ORIGINS".to_owned(),
794 message: format!(
795 "all configured origins failed to parse as valid HTTP header values: {:?}; \
796 refusing to fall back to permissive CORS",
797 origins
798 ),
799 });
800 }
801
802 if !invalid.is_empty() {
803 tracing::warn!(
806 ?invalid,
807 ?accepted,
808 "some FF_CORS_ORIGINS entries failed to parse and were dropped"
809 );
810 }
811
812 let mut headers = vec![axum::http::header::CONTENT_TYPE];
815 if auth_enabled {
816 headers.push(axum::http::header::AUTHORIZATION);
817 }
818
819 Ok(CorsLayer::new()
820 .allow_origin(AllowOrigin::list(parsed))
821 .allow_methods([Method::GET, Method::POST, Method::PUT])
822 .allow_headers(headers))
823}
824
/// Query parameters accepted by `GET /v1/executions`.
#[derive(Deserialize)]
struct ListExecutionsParams {
    /// Partition to list; required (no serde default).
    partition: u16,
    /// Optional pagination cursor; the handler parses it as an execution id
    /// and treats an empty string like an absent cursor.
    #[serde(default)]
    cursor: Option<String>,
    /// Page size; defaults to `default_limit()` when omitted.
    #[serde(default = "default_limit")]
    limit: u64,
}

/// Default page size for `ListExecutionsParams::limit`.
fn default_limit() -> u64 { 50 }
841
842async fn list_executions(
855 State(server): State<Arc<Server>>,
856 Query(params): Query<ListExecutionsParams>,
857) -> Result<Json<ListExecutionsPage>, ApiError> {
858 let limit = params.limit.min(1000) as usize;
859 let cursor = match params.cursor {
860 Some(raw) if !raw.is_empty() => Some(
861 ff_core::types::ExecutionId::parse(&raw).map_err(|e| {
862 ApiError::from(ServerError::InvalidInput(format!(
863 "invalid cursor: {e}"
864 )))
865 })?,
866 ),
867 _ => None,
868 };
869 let result = server
870 .list_executions_page(params.partition, cursor, limit)
871 .await?;
872 Ok(Json(result))
873}
874
875async fn create_execution(
876 State(server): State<Arc<Server>>,
877 AppJson(args): AppJson<CreateExecutionArgs>,
878) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
879 let result = server.backend().create_execution(args).await?;
883 let status = match &result {
884 CreateExecutionResult::Created { .. } => StatusCode::CREATED,
885 CreateExecutionResult::Duplicate { .. } => StatusCode::OK,
886 };
887 Ok((status, Json(result)))
888}
889
890async fn get_execution(
891 State(server): State<Arc<Server>>,
892 Path(id): Path<String>,
893) -> Result<Json<ExecutionInfo>, ApiError> {
894 let eid = parse_execution_id(&id)?;
895 Ok(Json(server.get_execution(&eid).await?))
896}
897
898async fn get_execution_state(
899 State(server): State<Arc<Server>>,
900 Path(id): Path<String>,
901) -> Result<Json<PublicState>, ApiError> {
902 let eid = parse_execution_id(&id)?;
903 Ok(Json(server.get_execution_state(&eid).await?))
904}
905
906async fn list_pending_waitpoints(
918 State(server): State<Arc<Server>>,
919 Path(id): Path<String>,
920) -> Result<Response, ApiError> {
921 let eid = parse_execution_id(&id)?;
926 let args = ff_core::contracts::ListPendingWaitpointsArgs::new(eid);
927 let page = server.backend().list_pending_waitpoints(args).await?;
928 Ok(Json(page.entries).into_response())
929}
930
931async fn get_execution_result(
966 State(server): State<Arc<Server>>,
967 Path(id): Path<String>,
968) -> Result<Response, ApiError> {
969 let eid = parse_execution_id(&id)?;
971 match server.backend().get_execution_result(&eid).await? {
972 Some(bytes) => Ok((
973 StatusCode::OK,
974 [(axum::http::header::CONTENT_TYPE, "application/octet-stream")],
975 bytes,
976 )
977 .into_response()),
978 None => Err(ApiError(ServerError::NotFound(format!(
979 "execution result not found: {eid}"
980 )))),
981 }
982}
983
984async fn cancel_execution(
985 State(server): State<Arc<Server>>,
986 Path(id): Path<String>,
987 AppJson(mut args): AppJson<CancelExecutionArgs>,
988) -> Result<Json<CancelExecutionResult>, ApiError> {
989 let path_eid = parse_execution_id(&id)?;
995 check_id_match(&path_eid, &args.execution_id, "execution_id")?;
996 args.execution_id = path_eid;
997 Ok(Json(server.backend().cancel_execution(args).await?))
998}
999
1000async fn deliver_signal(
1001 State(server): State<Arc<Server>>,
1002 Path(id): Path<String>,
1003 AppJson(mut args): AppJson<DeliverSignalArgs>,
1004) -> Result<Json<DeliverSignalResult>, ApiError> {
1005 let path_eid = parse_execution_id(&id)?;
1006 check_id_match(&path_eid, &args.execution_id, "execution_id")?;
1007 args.execution_id = path_eid;
1008 Ok(Json(server.deliver_signal(&args).await?))
1009}
1010
/// JSON body of `POST /v1/admin/rotate-waitpoint-secret`.
#[derive(Deserialize)]
struct RotateWaitpointSecretBody {
    /// Key id of the new secret; passed through to the server unchanged.
    new_kid: String,
    /// New secret, presumably hex-encoded as the name suggests; this handler
    /// does not validate it.
    new_secret_hex: String,
}

/// Server-side cap (120 s) on how long the rotation handler waits before
/// answering `504 Gateway Timeout`.
const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
1034
1035async fn rotate_waitpoint_secret(
1036 State(server): State<Arc<Server>>,
1037 AppJson(body): AppJson<RotateWaitpointSecretBody>,
1038) -> Result<Response, ApiError> {
1039 let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
1044 let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
1045 Ok(r) => r?,
1046 Err(_) => {
1047 tracing::error!(
1048 target: "audit",
1049 new_kid = %body.new_kid,
1050 timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
1051 "waitpoint_hmac_rotation_timeout_http_504"
1052 );
1053 let body = ErrorBody::plain(format!(
1054 "rotation exceeded {}s server-side timeout; retry is safe \
1055 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
1056 ROTATE_HTTP_TIMEOUT.as_secs()
1057 ));
1058 return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
1059 }
1060 };
1061 if result.rotated == 0 && result.failed.is_empty() {
1071 return Err(ApiError::from(ServerError::OperationFailed(
1072 "rotation had no partitions to operate on \
1073 (num_flow_partitions is 0 — server misconfigured)"
1074 .to_owned(),
1075 )));
1076 }
1077 if result.rotated == 0 && !result.failed.is_empty() {
1078 return Err(ApiError::from(ServerError::OperationFailed(
1079 "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
1080 )));
1081 }
1082 Ok(Json(result).into_response())
1083}
1084
/// JSON body of `PUT /v1/executions/{id}/priority`.
#[derive(Deserialize)]
struct ChangePriorityBody {
    /// Priority value to assign to the execution.
    new_priority: i32,
}
1089
1090async fn change_priority(
1091 State(server): State<Arc<Server>>,
1092 Path(id): Path<String>,
1093 AppJson(body): AppJson<ChangePriorityBody>,
1094) -> Result<Json<ChangePriorityResult>, ApiError> {
1095 let eid = parse_execution_id(&id)?;
1097 let args = ff_core::contracts::ChangePriorityArgs {
1098 execution_id: eid,
1099 new_priority: body.new_priority,
1100 lane_id: LaneId::new(""),
1105 now: ff_core::types::TimestampMs::now(),
1106 };
1107 Ok(Json(server.backend().change_priority(args).await?))
1108}
1109
1110async fn replay_execution(
1111 State(server): State<Arc<Server>>,
1112 Path(id): Path<String>,
1113) -> Result<Json<ReplayExecutionResult>, ApiError> {
1114 let eid = parse_execution_id(&id)?;
1117 let args = ff_core::contracts::ReplayExecutionArgs {
1118 execution_id: eid,
1119 now: ff_core::types::TimestampMs::now(),
1120 };
1121 Ok(Json(server.backend().replay_execution(args).await?))
1122}
1123
1124async fn revoke_lease(
1125 State(server): State<Arc<Server>>,
1126 Path(id): Path<String>,
1127) -> Result<Json<RevokeLeaseResult>, ApiError> {
1128 let eid = parse_execution_id(&id)?;
1130 let args = ff_core::contracts::RevokeLeaseArgs {
1131 execution_id: eid,
1132 expected_lease_id: None,
1133 worker_instance_id: WorkerInstanceId::new(""),
1137 reason: "operator_revoke".to_owned(),
1138 };
1139 Ok(Json(server.backend().revoke_lease(args).await?))
1140}
1141
/// JSON body of `POST /v1/workers/{worker_id}/claim`.
#[derive(Deserialize)]
struct ClaimForWorkerBody {
    /// Lane to claim work from; validated by the handler via `LaneId::try_new`.
    lane_id: String,
    /// Identifier of the claiming worker instance; validated by the handler
    /// with `validate_identifier`.
    worker_instance_id: String,
    /// Capabilities advertised by the worker; empty when omitted.
    #[serde(default)]
    capabilities: Vec<String>,
    /// Requested grant lifetime in milliseconds; the handler requires it to be
    /// in `1..=CLAIM_GRANT_TTL_MS_MAX`.
    grant_ttl_ms: u64,
}
1164
1165#[derive(Serialize)]
1169struct ClaimGrantDto {
1170 execution_id: String,
1171 partition_key: ff_core::partition::PartitionKey,
1172 grant_key: String,
1173 expires_at_ms: u64,
1174}
1175
1176impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
1177 fn from(g: ff_core::contracts::ClaimGrant) -> Self {
1178 Self {
1179 execution_id: g.execution_id.to_string(),
1180 partition_key: g.partition_key,
1181 grant_key: g.grant_key,
1182 expires_at_ms: g.expires_at_ms,
1183 }
1184 }
1185}
1186
/// Largest `grant_ttl_ms` (60 s) that `claim_for_worker` accepts.
const CLAIM_GRANT_TTL_MS_MAX: u64 = 60_000;
1191
1192fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
1197 if value.is_empty() {
1198 return Err(ApiError(ServerError::InvalidInput(format!(
1199 "{field}: must not be empty"
1200 ))));
1201 }
1202 if value.len() > 256 {
1203 return Err(ApiError(ServerError::InvalidInput(format!(
1204 "{field}: exceeds 256 bytes (got {})",
1205 value.len()
1206 ))));
1207 }
1208 if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
1209 return Err(ApiError(ServerError::InvalidInput(format!(
1210 "{field}: must not contain whitespace or control characters"
1211 ))));
1212 }
1213 Ok(())
1214}
1215
1216async fn claim_for_worker(
1217 State(server): State<Arc<Server>>,
1218 Path(worker_id): Path<String>,
1219 AppJson(body): AppJson<ClaimForWorkerBody>,
1220) -> Result<Response, ApiError> {
1221 validate_identifier("worker_id", &worker_id)?;
1222 validate_identifier("worker_instance_id", &body.worker_instance_id)?;
1223 let worker_id = WorkerId::new(worker_id);
1224 let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
1225 let lane = LaneId::try_new(body.lane_id).map_err(|e| {
1226 ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
1227 })?;
1228 if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
1229 return Err(ApiError(ServerError::InvalidInput(format!(
1230 "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
1231 ))));
1232 }
1233 let caps: std::collections::BTreeSet<String> =
1234 body.capabilities.into_iter().collect();
1235
1236 let args = ff_core::contracts::ClaimForWorkerArgs::new(
1240 lane,
1241 worker_id,
1242 worker_instance_id,
1243 caps,
1244 body.grant_ttl_ms,
1245 );
1246 match server.backend().claim_for_worker(args).await? {
1247 ff_core::contracts::ClaimForWorkerOutcome::Granted(grant) => {
1248 Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response())
1249 }
1250 ff_core::contracts::ClaimForWorkerOutcome::NoWork => {
1251 Ok(StatusCode::NO_CONTENT.into_response())
1252 }
1253 _ => Ok((
1259 StatusCode::SERVICE_UNAVAILABLE,
1260 Json(ErrorBody::plain(
1261 "claim_for_worker: backend returned a non-exhaustive outcome this server build does not understand".to_owned(),
1262 )),
1263 )
1264 .into_response()),
1265 }
1266}
1267
/// Query parameters of `GET /v1/executions/{id}/attempts/{idx}/stream`.
#[derive(Deserialize)]
struct ReadStreamParams {
    /// Lower cursor of the range; defaults to `StreamCursor::start()`.
    #[serde(default = "ff_core::contracts::StreamCursor::start")]
    from: ff_core::contracts::StreamCursor,
    /// Upper cursor of the range; defaults to `StreamCursor::end()`.
    #[serde(default = "ff_core::contracts::StreamCursor::end")]
    to: ff_core::contracts::StreamCursor,
    /// Maximum number of frames to return; defaults to `default_read_limit()`.
    #[serde(default = "default_read_limit")]
    limit: u64,
}

/// Default value of `ReadStreamParams::limit`.
fn default_read_limit() -> u64 { 100 }
1281
1282#[derive(Serialize)]
1283struct ReadStreamResponse {
1284 frames: Vec<StreamFrame>,
1285 count: usize,
1286 #[serde(skip_serializing_if = "Option::is_none")]
1291 closed_at: Option<i64>,
1292 #[serde(skip_serializing_if = "Option::is_none")]
1295 closed_reason: Option<String>,
1296}
1297
1298impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
1299 fn from(sf: ff_core::contracts::StreamFrames) -> Self {
1300 let count = sf.frames.len();
1301 Self {
1302 frames: sf.frames,
1303 count,
1304 closed_at: sf.closed_at.map(|t| t.0),
1305 closed_reason: sf.closed_reason,
1306 }
1307 }
1308}
1309
/// Largest `limit` accepted by the stream read and tail endpoints; larger
/// values are rejected rather than clamped.
const REST_STREAM_LIMIT_CEILING: u64 = 1_000;
1320
1321async fn read_attempt_stream(
1322 State(server): State<Arc<Server>>,
1323 Path((id, idx)): Path<(String, u32)>,
1324 Query(params): Query<ReadStreamParams>,
1325) -> Result<Json<ReadStreamResponse>, ApiError> {
1326 if params.limit == 0 {
1327 return Err(ApiError(ServerError::InvalidInput(
1328 "limit must be >= 1".to_owned(),
1329 )));
1330 }
1331 if params.limit > REST_STREAM_LIMIT_CEILING {
1332 return Err(ApiError(ServerError::InvalidInput(format!(
1333 "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
1334 ))));
1335 }
1336 let eid = parse_execution_id(&id)?;
1337 let attempt_index = AttemptIndex::new(idx);
1338 let result = server
1339 .read_attempt_stream(
1340 &eid,
1341 attempt_index,
1342 params.from.to_wire(),
1343 params.to.to_wire(),
1344 params.limit,
1345 )
1346 .await?;
1347 Ok(Json(result.into()))
1348}
1349
/// Query parameters of `GET /v1/executions/{id}/attempts/{idx}/stream/tail`.
#[derive(Deserialize)]
struct TailStreamParams {
    /// Cursor to read after; defaults to `StreamCursor::beginning()`. The
    /// handler rejects cursors that are not concrete.
    #[serde(default = "ff_core::contracts::StreamCursor::beginning")]
    after: ff_core::contracts::StreamCursor,
    /// Blocking time in milliseconds, passed through to the server; defaults
    /// to 0 and is capped by `MAX_TAIL_BLOCK_MS`.
    #[serde(default)]
    block_ms: u64,
    /// Maximum number of frames to return; defaults to `default_tail_limit()`.
    #[serde(default = "default_tail_limit")]
    limit: u64,
}

/// Default value of `TailStreamParams::limit`.
fn default_tail_limit() -> u64 { 50 }

/// Largest `block_ms` (30 s) accepted by the tail endpoint.
const MAX_TAIL_BLOCK_MS: u64 = 30_000;
1371
1372async fn tail_attempt_stream(
1373 State(server): State<Arc<Server>>,
1374 Path((id, idx)): Path<(String, u32)>,
1375 Query(params): Query<TailStreamParams>,
1376) -> Result<Json<ReadStreamResponse>, ApiError> {
1377 if params.block_ms > MAX_TAIL_BLOCK_MS {
1378 return Err(ApiError(ServerError::InvalidInput(format!(
1379 "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
1380 ))));
1381 }
1382 if params.limit == 0 {
1383 return Err(ApiError(ServerError::InvalidInput(
1384 "limit must be >= 1".to_owned(),
1385 )));
1386 }
1387 if params.limit > REST_STREAM_LIMIT_CEILING {
1388 return Err(ApiError(ServerError::InvalidInput(format!(
1389 "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
1390 ))));
1391 }
1392 if !params.after.is_concrete() {
1399 return Err(ApiError(ServerError::InvalidInput(
1400 "after: XREAD cursor must be a concrete entry id; pass '0-0' to start from the beginning"
1401 .to_owned(),
1402 )));
1403 }
1404
1405 let eid = parse_execution_id(&id)?;
1406 let attempt_index = AttemptIndex::new(idx);
1407 let result = server
1408 .tail_attempt_stream(
1409 &eid,
1410 attempt_index,
1411 params.after.to_wire(),
1412 params.block_ms,
1413 params.limit,
1414 )
1415 .await?;
1416 Ok(Json(result.into()))
1417}
1418
1419async fn create_flow(
1422 State(server): State<Arc<Server>>,
1423 AppJson(args): AppJson<CreateFlowArgs>,
1424) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
1425 let result = server.backend().create_flow(args).await?;
1427 let status = match &result {
1428 CreateFlowResult::Created { .. } => StatusCode::CREATED,
1429 CreateFlowResult::AlreadySatisfied { .. } => StatusCode::OK,
1430 };
1431 Ok((status, Json(result)))
1432}
1433
1434async fn add_execution_to_flow(
1435 State(server): State<Arc<Server>>,
1436 Path(id): Path<String>,
1437 AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
1438) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
1439 let path_fid = parse_flow_id(&id)?;
1440 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1441 args.flow_id = path_fid;
1442 let result = server.backend().add_execution_to_flow(args).await?;
1444 let status = match &result {
1445 AddExecutionToFlowResult::Added { .. } => StatusCode::CREATED,
1446 AddExecutionToFlowResult::AlreadyMember { .. } => StatusCode::OK,
1447 };
1448 Ok((status, Json(result)))
1449}
1450
1451async fn cancel_flow(
1464 State(server): State<Arc<Server>>,
1465 Path(id): Path<String>,
1466 Query(params): Query<HashMap<String, String>>,
1467 AppJson(mut args): AppJson<CancelFlowArgs>,
1468) -> Result<Json<CancelFlowResult>, ApiError> {
1469 let path_fid = parse_flow_id(&id)?;
1470 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1471 args.flow_id = path_fid;
1472 let wait = params.get("wait").is_some_and(|v| v == "true" || v == "1");
1473 let result = if wait {
1483 server.cancel_flow_wait(&args).await?
1484 } else {
1485 server.cancel_flow(&args).await?
1486 };
1487 Ok(Json(result))
1488}
1489
1490async fn stage_dependency_edge(
1491 State(server): State<Arc<Server>>,
1492 Path(id): Path<String>,
1493 AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
1494) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
1495 let path_fid = parse_flow_id(&id)?;
1496 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1497 args.flow_id = path_fid;
1498 let result = server.backend().stage_dependency_edge(args).await?;
1500 Ok((StatusCode::CREATED, Json(result)))
1501}
1502
1503async fn apply_dependency_to_child(
1504 State(server): State<Arc<Server>>,
1505 Path(id): Path<String>,
1506 AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
1507) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
1508 let path_fid = parse_flow_id(&id)?;
1509 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1510 args.flow_id = path_fid;
1511 Ok(Json(server.backend().apply_dependency_to_child(args).await?))
1513}
1514
1515async fn create_budget(
1518 State(server): State<Arc<Server>>,
1519 AppJson(args): AppJson<CreateBudgetArgs>,
1520) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
1521 let result = server.backend().create_budget(args).await?;
1523 let status = match &result {
1524 CreateBudgetResult::Created { .. } => StatusCode::CREATED,
1525 CreateBudgetResult::AlreadySatisfied { .. } => StatusCode::OK,
1526 };
1527 Ok((status, Json(result)))
1528}
1529
1530async fn get_budget_status(
1531 State(server): State<Arc<Server>>,
1532 Path(id): Path<String>,
1533) -> Result<Json<BudgetStatus>, ApiError> {
1534 let bid = parse_budget_id(&id)?;
1536 Ok(Json(server.backend().get_budget_status(&bid).await?))
1537}
1538
/// Request body accepted by the `report_usage` handler.
#[derive(Deserialize)]
struct ReportUsageBody {
    /// Per-dimension `u64` values keyed by dimension name; `report_usage`
    /// forwards the keys and values to the backend as parallel lists.
    dimensions: HashMap<String, u64>,
    /// Caller-supplied timestamp, passed through unchanged to
    /// `ReportUsageAdminArgs::new`.
    now: ff_core::types::TimestampMs,
    /// Optional dedup key; may be omitted from the body. When present it is
    /// attached to the backend args via `with_dedup_key`.
    #[serde(default)]
    dedup_key: Option<String>,
}
1546
1547async fn report_usage(
1548 State(server): State<Arc<Server>>,
1549 Path(id): Path<String>,
1550 AppJson(body): AppJson<ReportUsageBody>,
1551) -> Result<Json<ReportUsageResult>, ApiError> {
1552 let bid = parse_budget_id(&id)?;
1555 let dims: Vec<String> = body.dimensions.keys().cloned().collect();
1556 let deltas: Vec<u64> = dims.iter().map(|d| body.dimensions[d]).collect();
1557 let mut args = ff_core::contracts::ReportUsageAdminArgs::new(dims, deltas, body.now);
1558 if let Some(k) = body.dedup_key {
1559 args = args.with_dedup_key(k);
1560 }
1561 Ok(Json(server.backend().report_usage_admin(&bid, args).await?))
1562}
1563
1564async fn reset_budget(
1565 State(server): State<Arc<Server>>,
1566 Path(id): Path<String>,
1567) -> Result<Json<ResetBudgetResult>, ApiError> {
1568 let bid = parse_budget_id(&id)?;
1570 let args = ff_core::contracts::ResetBudgetArgs {
1571 budget_id: bid,
1572 now: ff_core::types::TimestampMs::now(),
1573 };
1574 Ok(Json(server.backend().reset_budget(args).await?))
1575}
1576
1577async fn create_quota_policy(
1578 State(server): State<Arc<Server>>,
1579 AppJson(args): AppJson<CreateQuotaPolicyArgs>,
1580) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
1581 let result = server.backend().create_quota_policy(args).await?;
1583 let status = match &result {
1584 CreateQuotaPolicyResult::Created { .. } => StatusCode::CREATED,
1585 CreateQuotaPolicyResult::AlreadySatisfied { .. } => StatusCode::OK,
1586 };
1587 Ok((status, Json(result)))
1588}
1589
/// JSON body returned by the `healthz` handler.
#[derive(Serialize)]
struct HealthResponse {
    /// Static status string; `healthz` sets it to `"ok"` on success.
    status: &'static str,
}
1596
1597async fn healthz(
1598 State(server): State<Arc<Server>>,
1599) -> Result<Json<HealthResponse>, ApiError> {
1600 server
1604 .backend()
1605 .ping()
1606 .await
1607 .map_err(|e| ApiError(ServerError::Engine(Box::new(e))))?;
1608 Ok(Json(HealthResponse { status: "ok" }))
1609}
1610
1611fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
1615 if body_id != path_id {
1616 return Err(ApiError(ServerError::InvalidInput(format!(
1617 "path {id_name} does not match body {id_name}"
1618 ))));
1619 }
1620 Ok(())
1621}
1622
1623fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
1624 ExecutionId::parse(s)
1625 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
1626}
1627
1628fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
1629 FlowId::parse(s)
1630 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid flow_id: {e}"))))
1631}
1632
1633fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
1634 BudgetId::parse(s)
1635 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid budget_id: {e}"))))
1636}
1637
/// Tests for `build_cors_layer`: how origin lists are accepted or rejected,
/// and which request headers a preflight response advertises depending on
/// whether auth is enabled.
#[cfg(test)]
mod cors_tests {
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::get;
    use tower::ServiceExt;

    /// Origin used by the preflight tests, both as the allowed origin and as
    /// the request's `origin` header.
    const CLIENT_ORIGIN: &str = "https://client.example.com";

    /// Builds a one-route router (`GET /noop`) wrapped in the CORS layer
    /// produced for `origins` / `auth_enabled`.
    fn app_with_cors(origins: &[String], auth_enabled: bool) -> Router {
        let cors = build_cors_layer(origins, auth_enabled)
            .expect("build_cors_layer succeeds for valid inputs");
        Router::new().route("/noop", get(|| async { "ok" })).layer(cors)
    }

    /// Sends an `OPTIONS /noop` preflight from `CLIENT_ORIGIN` asking for
    /// `requested_headers`, asserts a 200 response, and returns the
    /// lower-cased `access-control-allow-headers` value.
    async fn preflight_allow_headers(auth_enabled: bool, requested_headers: &str) -> String {
        let app = app_with_cors(&[CLIENT_ORIGIN.to_owned()], auth_enabled);

        let req = Request::builder()
            .method("OPTIONS")
            .uri("/noop")
            .header("origin", CLIENT_ORIGIN)
            .header("access-control-request-method", "GET")
            .header("access-control-request-headers", requested_headers)
            .body(Body::empty())
            .unwrap();

        let resp = app.oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        resp.headers()
            .get("access-control-allow-headers")
            .expect("access-control-allow-headers present")
            .to_str()
            .unwrap()
            .to_ascii_lowercase()
    }

    #[test]
    fn all_origins_invalid_returns_config_error_instead_of_permissive() {
        let invalid_only = ["bad\0origin".to_owned()];
        let err = build_cors_layer(&invalid_only, false).unwrap_err();
        let ConfigError::InvalidValue { var, message } = err;
        assert_eq!(var, "FF_CORS_ORIGINS");
        assert!(
            message.contains("all configured origins failed to parse"),
            "message was: {message}"
        );
    }

    #[test]
    fn wildcard_still_returns_permissive() {
        // Only success is asserted here; the layer's contents are not inspected.
        assert!(build_cors_layer(&["*".to_owned()], false).is_ok());
    }

    #[test]
    fn empty_origins_returns_empty_allowlist_ok() {
        assert!(build_cors_layer(&[], false).is_ok());
    }

    #[test]
    fn mixed_valid_and_invalid_keeps_valid_entries() {
        let origins = [
            "https://ok.example.com".to_owned(),
            "bad\0origin".to_owned(),
        ];
        assert!(build_cors_layer(&origins, false).is_ok());
    }

    #[tokio::test]
    async fn preflight_allows_authorization_when_auth_enabled() {
        let allow_headers = preflight_allow_headers(true, "authorization,content-type").await;
        assert!(
            allow_headers.contains("authorization"),
            "expected `authorization` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
        assert!(
            allow_headers.contains("content-type"),
            "expected `content-type` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
    }

    #[tokio::test]
    async fn preflight_omits_authorization_when_auth_disabled() {
        let allow_headers = preflight_allow_headers(false, "content-type").await;
        assert!(
            !allow_headers.contains("authorization"),
            "Authorization should not be advertised when auth is off; got: {allow_headers}"
        );
        assert!(allow_headers.contains("content-type"));
    }
}
1770
/// Tests for the JSON wire shape produced by `ClaimGrantDto` when converted
/// from a `ClaimGrant`.
#[cfg(test)]
mod claim_grant_dto_tests {
    use super::*;
    use ff_core::contracts::ClaimGrant;
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{ExecutionId, FlowId};

    /// Builds a grant for a fresh flow on partition index 7 of `family`, with
    /// a fixed grant key and expiry.
    fn sample_grant(family: PartitionFamily) -> ClaimGrant {
        let partition_config = ff_core::partition::PartitionConfig::default();
        let flow_id = FlowId::new();
        let partition = Partition { family, index: 7 };
        ClaimGrant {
            execution_id: ExecutionId::for_flow(&flow_id, &partition_config),
            partition_key: PartitionKey::from(&partition),
            grant_key: "ff:exec:{fp:7}:deadbeef:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    /// Converts a sample grant for `family` into its DTO and serializes it to
    /// a JSON value.
    fn dto_json(family: PartitionFamily) -> serde_json::Value {
        let dto = ClaimGrantDto::from(sample_grant(family));
        serde_json::to_value(&dto).unwrap()
    }

    #[test]
    fn claim_grant_dto_emits_opaque_partition_key() {
        let json = dto_json(PartitionFamily::Flow);
        assert_eq!(json["partition_key"], serde_json::json!("{fp:7}"));
        // The structured family/index fields must not appear on the wire.
        assert!(json.get("partition_family").is_none());
        assert!(json.get("partition_index").is_none());
        assert_eq!(json["expires_at_ms"], serde_json::json!(1_700_000_000_000u64));
    }

    #[test]
    fn claim_grant_dto_collapses_execution_alias_on_wire() {
        let flow_json = dto_json(PartitionFamily::Flow);
        let exec_json = dto_json(PartitionFamily::Execution);
        assert_eq!(flow_json["partition_key"], exec_json["partition_key"]);
    }
}