1use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use axum::{
8 extract::{DefaultBodyLimit, MatchedPath, Path, Query, Request, State},
9 http::StatusCode,
10 middleware,
11 response::{IntoResponse, Response},
12 routing::{get, post, put, MethodRouter},
13 Json, Router,
14};
15use serde::{Deserialize, Serialize};
16use axum::http::Method;
17use tower_http::cors::{AllowOrigin, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20use ff_core::contracts::*;
21use ff_core::state::PublicState;
22use ff_core::types::*;
23
24use crate::config::ConfigError;
25use crate::server::{Server, ServerError};
26
27const BODY_LIMIT_LARGE_PAYLOAD: usize = 1024 * 1024; const BODY_LIMIT_MEDIUM_PAYLOAD: usize = 256 * 1024; const BODY_LIMIT_CONTROL: usize = 64 * 1024; #[derive(Clone, Copy)]
76struct BodyLimit {
77 bytes: usize,
78}
79
80fn with_body_limit(route: MethodRouter<Arc<Server>>, bytes: usize) -> MethodRouter<Arc<Server>> {
105 let r: MethodRouter<Arc<Server>> =
106 route.layer(tower_http::limit::RequestBodyLimitLayer::new(bytes));
107 let r: MethodRouter<Arc<Server>> = r.layer(DefaultBodyLimit::max(bytes));
108 r.layer(middleware::from_fn(
109 move |req: Request, next: middleware::Next| enforce_body_limit(bytes, req, next),
110 ))
111}
112
113async fn enforce_body_limit(bytes: usize, mut req: Request, next: middleware::Next) -> Response {
119 if let Some(content_length) = req
120 .headers()
121 .get(axum::http::header::CONTENT_LENGTH)
122 .and_then(|v| v.to_str().ok())
123 .and_then(|s| s.parse::<usize>().ok())
124 && content_length > bytes
125 {
126 let route = req
127 .extensions()
128 .get::<MatchedPath>()
129 .map(|m| m.as_str().to_owned())
130 .unwrap_or_default();
131 let body = PayloadTooLargeBody {
132 error: "payload_too_large",
133 limit_bytes: bytes,
134 route,
135 };
136 return (StatusCode::PAYLOAD_TOO_LARGE, Json(body)).into_response();
137 }
138 req.extensions_mut().insert(BodyLimit { bytes });
139 next.run(req).await
140}
141
142#[derive(Serialize)]
144struct PayloadTooLargeBody {
145 error: &'static str,
146 limit_bytes: usize,
147 route: String,
148}
149
150struct AppJson<T>(T);
153
154impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
155where
156 T: serde::de::DeserializeOwned + Send,
157 S: Send + Sync,
158{
159 type Rejection = Response;
160
161 async fn from_request(
162 req: axum::extract::Request,
163 state: &S,
164 ) -> Result<Self, Self::Rejection> {
165 let limit = req.extensions().get::<BodyLimit>().copied();
170 let matched = req.extensions().get::<MatchedPath>().cloned();
171
172 match Json::<T>::from_request(req, state).await {
173 Ok(Json(value)) => Ok(AppJson(value)),
174 Err(rejection) => {
175 let status = rejection.status();
176 tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
177 if status == StatusCode::PAYLOAD_TOO_LARGE {
178 let limit_bytes = limit.map(|l| l.bytes).unwrap_or(0);
179 let route = matched
180 .as_ref()
181 .map(|m| m.as_str().to_owned())
182 .unwrap_or_default();
183 let body = PayloadTooLargeBody {
184 error: "payload_too_large",
185 limit_bytes,
186 route,
187 };
188 return Err((status, Json(body)).into_response());
189 }
190 let body = ErrorBody::plain(format!(
191 "invalid JSON: {}",
192 status.canonical_reason().unwrap_or("bad request"),
193 ));
194 Err((status, Json(body)).into_response())
195 }
196 }
197 }
198}
199
200struct ApiError(ServerError);
203
204impl From<ServerError> for ApiError {
205 fn from(e: ServerError) -> Self {
206 Self(e)
207 }
208}
209
210#[derive(Serialize)]
215struct ErrorBody {
216 error: String,
217 #[serde(skip_serializing_if = "Option::is_none")]
218 kind: Option<String>,
219 #[serde(skip_serializing_if = "Option::is_none")]
220 retryable: Option<bool>,
221}
222
223impl ErrorBody {
224 fn plain(error: String) -> Self {
225 Self { error, kind: None, retryable: None }
226 }
227}
228
229impl IntoResponse for ApiError {
230 fn into_response(self) -> Response {
231 let (status, body) = match &self.0 {
232 ServerError::NotFound(msg) => {
233 (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
234 }
235 ServerError::InvalidInput(msg) => {
236 (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
237 }
238 ServerError::OperationFailed(msg) => {
239 (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
240 }
241 ServerError::ConcurrencyLimitExceeded(source, max) => (
242 StatusCode::TOO_MANY_REQUESTS,
243 ErrorBody {
244 error: format!(
245 "too many concurrent {source} calls (server max: {max}); retry with backoff"
246 ),
247 kind: None,
248 retryable: Some(true),
249 },
250 ),
251 ServerError::Backend(be) => {
252 let kind_str = be.kind().as_stable_str();
253 tracing::error!(
254 kind = kind_str,
255 message = be.message(),
256 "backend error"
257 );
258 (
259 StatusCode::INTERNAL_SERVER_ERROR,
260 ErrorBody {
261 error: self.0.to_string(),
262 kind: Some(kind_str.to_owned()),
263 retryable: Some(self.0.is_retryable()),
264 },
265 )
266 }
267 ServerError::BackendContext { source, context } => {
268 let kind_str = source.kind().as_stable_str();
269 tracing::error!(
270 kind = kind_str,
271 message = source.message(),
272 context = %context,
273 "backend error"
274 );
275 (
276 StatusCode::INTERNAL_SERVER_ERROR,
277 ErrorBody {
278 error: self.0.to_string(),
279 kind: Some(kind_str.to_owned()),
280 retryable: Some(self.0.is_retryable()),
281 },
282 )
283 }
284 ServerError::LibraryLoad(load_err) => {
285 let kind_str = load_err
286 .valkey_kind()
287 .map(ff_backend_valkey::classify_ferriskey_kind)
288 .map(|k| k.as_stable_str());
289 tracing::error!(
290 kind = kind_str.unwrap_or(""),
291 error = %load_err,
292 "library load failure"
293 );
294 (
295 StatusCode::INTERNAL_SERVER_ERROR,
296 ErrorBody {
297 error: format!("library load: {load_err}"),
298 kind: kind_str.map(str::to_owned),
299 retryable: Some(self.0.is_retryable()),
300 },
301 )
302 }
303 other => (
307 StatusCode::INTERNAL_SERVER_ERROR,
308 ErrorBody {
309 error: other.to_string(),
310 kind: None,
311 retryable: Some(false),
312 },
313 ),
314 };
315 (status, Json(body)).into_response()
316 }
317}
318
319pub fn router(
322 server: Arc<Server>,
323 cors_origins: &[String],
324 api_token: Option<String>,
325) -> Result<Router, ConfigError> {
326 router_with_metrics(server, cors_origins, api_token, None)
327}
328
329pub fn router_with_metrics(
338 server: Arc<Server>,
339 cors_origins: &[String],
340 api_token: Option<String>,
341 #[cfg_attr(not(feature = "observability"), allow(unused_variables))]
342 metrics: Option<Arc<crate::Metrics>>,
343) -> Result<Router, ConfigError> {
344 let auth_enabled = api_token.is_some();
345 let cors = build_cors_layer(cors_origins, auth_enabled)?;
346
347 let mut app = Router::new()
351 .route(
353 "/v1/executions",
354 with_body_limit(
355 get(list_executions).post(create_execution),
356 BODY_LIMIT_LARGE_PAYLOAD,
357 ),
358 )
359 .route("/v1/executions/{id}", get(get_execution))
360 .route("/v1/executions/{id}/state", get(get_execution_state))
361 .route(
362 "/v1/executions/{id}/pending-waitpoints",
363 get(list_pending_waitpoints),
364 )
365 .route("/v1/executions/{id}/result", get(get_execution_result))
366 .route(
367 "/v1/executions/{id}/cancel",
368 with_body_limit(post(cancel_execution), BODY_LIMIT_CONTROL),
369 )
370 .route(
371 "/v1/executions/{id}/signal",
372 with_body_limit(post(deliver_signal), BODY_LIMIT_MEDIUM_PAYLOAD),
373 )
374 .route(
375 "/v1/executions/{id}/priority",
376 with_body_limit(put(change_priority), BODY_LIMIT_CONTROL),
377 )
378 .route(
379 "/v1/executions/{id}/replay",
380 with_body_limit(post(replay_execution), BODY_LIMIT_CONTROL),
381 )
382 .route(
383 "/v1/executions/{id}/revoke-lease",
384 with_body_limit(post(revoke_lease), BODY_LIMIT_CONTROL),
385 )
386 .route(
391 "/v1/workers/{worker_id}/claim",
392 with_body_limit(post(claim_for_worker), BODY_LIMIT_CONTROL),
393 )
394 .route(
396 "/v1/executions/{id}/attempts/{idx}/stream",
397 get(read_attempt_stream),
398 )
399 .route(
400 "/v1/executions/{id}/attempts/{idx}/stream/tail",
401 get(tail_attempt_stream),
402 )
403 .route(
405 "/v1/flows",
406 with_body_limit(post(create_flow), BODY_LIMIT_CONTROL),
407 )
408 .route(
409 "/v1/flows/{id}/members",
410 with_body_limit(post(add_execution_to_flow), BODY_LIMIT_CONTROL),
411 )
412 .route(
413 "/v1/flows/{id}/cancel",
414 with_body_limit(post(cancel_flow), BODY_LIMIT_CONTROL),
415 )
416 .route(
417 "/v1/flows/{id}/edges",
418 with_body_limit(post(stage_dependency_edge), BODY_LIMIT_CONTROL),
419 )
420 .route(
421 "/v1/flows/{id}/edges/apply",
422 with_body_limit(post(apply_dependency_to_child), BODY_LIMIT_CONTROL),
423 )
424 .route(
426 "/v1/budgets",
427 with_body_limit(post(create_budget), BODY_LIMIT_CONTROL),
428 )
429 .route("/v1/budgets/{id}", get(get_budget_status))
430 .route(
431 "/v1/budgets/{id}/usage",
432 with_body_limit(post(report_usage), BODY_LIMIT_CONTROL),
433 )
434 .route(
435 "/v1/budgets/{id}/reset",
436 with_body_limit(post(reset_budget), BODY_LIMIT_CONTROL),
437 )
438 .route(
440 "/v1/quotas",
441 with_body_limit(post(create_quota_policy), BODY_LIMIT_CONTROL),
442 )
443 .route(
445 "/v1/admin/rotate-waitpoint-secret",
446 with_body_limit(post(rotate_waitpoint_secret), BODY_LIMIT_CONTROL),
447 )
448 .route("/healthz", get(healthz));
450
451 if let Some(token) = api_token {
452 let token = Arc::new(token);
453 app = app.layer(middleware::from_fn(move |req, next| {
454 let token = token.clone();
455 auth_middleware(token, req, next)
456 }));
457 }
458
459 #[cfg(feature = "observability")]
465 if let Some(m) = metrics.as_ref() {
466 let m = m.clone();
467 app = app.layer(middleware::from_fn_with_state(
468 m,
469 crate::metrics::http_middleware,
470 ));
471 }
472
473 #[cfg_attr(not(feature = "observability"), allow(unused_mut))]
474 let mut app = app
475 .layer(TraceLayer::new_for_http())
476 .layer(cors)
477 .with_state(server);
478
479 #[cfg(feature = "observability")]
486 if let Some(m) = metrics {
487 let metrics_router: Router = Router::new()
488 .route("/metrics", get(crate::metrics::metrics_handler))
489 .with_state(m);
490 app = app.merge(metrics_router);
491 }
492
493 Ok(app)
494}
495
496async fn auth_middleware(
497 token: Arc<String>,
498 req: Request,
499 next: middleware::Next,
500) -> Response {
501 if req.uri().path() == "/healthz" {
502 return next.run(req).await;
503 }
504
505 let auth_header = req
506 .headers()
507 .get("authorization")
508 .and_then(|v| v.to_str().ok());
509
510 let authorized = auth_header
511 .and_then(|v| v.strip_prefix("Bearer "))
512 .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
513
514 if authorized {
515 next.run(req).await
516 } else {
517 (
518 StatusCode::UNAUTHORIZED,
519 Json(ErrorBody::plain(
520 "missing or invalid Authorization header".to_owned(),
521 )),
522 )
523 .into_response()
524 }
525}
526
527fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
528 if a.len() != b.len() {
529 return false;
530 }
531 let mut diff = 0u8;
532 for (x, y) in a.iter().zip(b.iter()) {
533 diff |= x ^ y;
534 }
535 diff == 0
536}
537
538fn build_cors_layer(origins: &[String], auth_enabled: bool) -> Result<CorsLayer, ConfigError> {
549 if origins.iter().any(|o| o == "*") {
550 return Ok(CorsLayer::permissive());
551 }
552
553 let mut parsed = Vec::with_capacity(origins.len());
554 let mut accepted = Vec::with_capacity(origins.len());
555 let mut invalid = Vec::new();
556 for o in origins {
557 match o.parse() {
558 Ok(v) => {
559 parsed.push(v);
560 accepted.push(o.as_str());
561 }
562 Err(_) => invalid.push(o.clone()),
563 }
564 }
565
566 if parsed.is_empty() && !origins.is_empty() {
567 return Err(ConfigError::InvalidValue {
568 var: "FF_CORS_ORIGINS".to_owned(),
569 message: format!(
570 "all configured origins failed to parse as valid HTTP header values: {:?}; \
571 refusing to fall back to permissive CORS",
572 origins
573 ),
574 });
575 }
576
577 if !invalid.is_empty() {
578 tracing::warn!(
581 ?invalid,
582 ?accepted,
583 "some FF_CORS_ORIGINS entries failed to parse and were dropped"
584 );
585 }
586
587 let mut headers = vec![axum::http::header::CONTENT_TYPE];
590 if auth_enabled {
591 headers.push(axum::http::header::AUTHORIZATION);
592 }
593
594 Ok(CorsLayer::new()
595 .allow_origin(AllowOrigin::list(parsed))
596 .allow_methods([Method::GET, Method::POST, Method::PUT])
597 .allow_headers(headers))
598}
599
600#[derive(Deserialize)]
603struct ListExecutionsParams {
604 partition: u16,
607 #[serde(default)]
610 cursor: Option<String>,
611 #[serde(default = "default_limit")]
612 limit: u64,
613}
614
615fn default_limit() -> u64 { 50 }
616
617async fn list_executions(
630 State(server): State<Arc<Server>>,
631 Query(params): Query<ListExecutionsParams>,
632) -> Result<Json<ListExecutionsPage>, ApiError> {
633 let limit = params.limit.min(1000) as usize;
634 let cursor = match params.cursor {
635 Some(raw) if !raw.is_empty() => Some(
636 ff_core::types::ExecutionId::parse(&raw).map_err(|e| {
637 ApiError::from(ServerError::InvalidInput(format!(
638 "invalid cursor: {e}"
639 )))
640 })?,
641 ),
642 _ => None,
643 };
644 let result = server
645 .list_executions_page(params.partition, cursor, limit)
646 .await?;
647 Ok(Json(result))
648}
649
650async fn create_execution(
651 State(server): State<Arc<Server>>,
652 AppJson(args): AppJson<CreateExecutionArgs>,
653) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
654 let result = server.create_execution(&args).await?;
655 let status = match &result {
656 CreateExecutionResult::Created { .. } => StatusCode::CREATED,
657 CreateExecutionResult::Duplicate { .. } => StatusCode::OK,
658 };
659 Ok((status, Json(result)))
660}
661
662async fn get_execution(
663 State(server): State<Arc<Server>>,
664 Path(id): Path<String>,
665) -> Result<Json<ExecutionInfo>, ApiError> {
666 let eid = parse_execution_id(&id)?;
667 Ok(Json(server.get_execution(&eid).await?))
668}
669
670async fn get_execution_state(
671 State(server): State<Arc<Server>>,
672 Path(id): Path<String>,
673) -> Result<Json<PublicState>, ApiError> {
674 let eid = parse_execution_id(&id)?;
675 Ok(Json(server.get_execution_state(&eid).await?))
676}
677
678async fn list_pending_waitpoints(
691 State(server): State<Arc<Server>>,
692 Path(id): Path<String>,
693) -> Result<Json<Vec<PendingWaitpointInfo>>, ApiError> {
694 let eid = parse_execution_id(&id)?;
695 Ok(Json(server.list_pending_waitpoints(&eid).await?))
696}
697
698async fn get_execution_result(
733 State(server): State<Arc<Server>>,
734 Path(id): Path<String>,
735) -> Result<Response, ApiError> {
736 let eid = parse_execution_id(&id)?;
737 match server.get_execution_result(&eid).await? {
738 Some(bytes) => Ok((
739 StatusCode::OK,
740 [(axum::http::header::CONTENT_TYPE, "application/octet-stream")],
741 bytes,
742 )
743 .into_response()),
744 None => Err(ApiError(ServerError::NotFound(format!(
745 "execution result not found: {eid}"
746 )))),
747 }
748}
749
750async fn cancel_execution(
751 State(server): State<Arc<Server>>,
752 Path(id): Path<String>,
753 AppJson(mut args): AppJson<CancelExecutionArgs>,
754) -> Result<Json<CancelExecutionResult>, ApiError> {
755 let path_eid = parse_execution_id(&id)?;
756 check_id_match(&path_eid, &args.execution_id, "execution_id")?;
757 args.execution_id = path_eid;
758 Ok(Json(server.cancel_execution(&args).await?))
759}
760
761async fn deliver_signal(
762 State(server): State<Arc<Server>>,
763 Path(id): Path<String>,
764 AppJson(mut args): AppJson<DeliverSignalArgs>,
765) -> Result<Json<DeliverSignalResult>, ApiError> {
766 let path_eid = parse_execution_id(&id)?;
767 check_id_match(&path_eid, &args.execution_id, "execution_id")?;
768 args.execution_id = path_eid;
769 Ok(Json(server.deliver_signal(&args).await?))
770}
771
772#[derive(Deserialize)]
775struct RotateWaitpointSecretBody {
776 new_kid: String,
777 new_secret_hex: String,
779}
780
781const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
795
796async fn rotate_waitpoint_secret(
797 State(server): State<Arc<Server>>,
798 AppJson(body): AppJson<RotateWaitpointSecretBody>,
799) -> Result<Response, ApiError> {
800 let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
805 let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
806 Ok(r) => r?,
807 Err(_) => {
808 tracing::error!(
809 target: "audit",
810 new_kid = %body.new_kid,
811 timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
812 "waitpoint_hmac_rotation_timeout_http_504"
813 );
814 let body = ErrorBody::plain(format!(
815 "rotation exceeded {}s server-side timeout; retry is safe \
816 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
817 ROTATE_HTTP_TIMEOUT.as_secs()
818 ));
819 return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
820 }
821 };
822 if result.rotated == 0 && result.failed.is_empty() {
832 return Err(ApiError::from(ServerError::OperationFailed(
833 "rotation had no partitions to operate on \
834 (num_flow_partitions is 0 — server misconfigured)"
835 .to_owned(),
836 )));
837 }
838 if result.rotated == 0 && !result.failed.is_empty() {
839 return Err(ApiError::from(ServerError::OperationFailed(
840 "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
841 )));
842 }
843 Ok(Json(result).into_response())
844}
845
846#[derive(Deserialize)]
847struct ChangePriorityBody {
848 new_priority: i32,
849}
850
851async fn change_priority(
852 State(server): State<Arc<Server>>,
853 Path(id): Path<String>,
854 AppJson(body): AppJson<ChangePriorityBody>,
855) -> Result<Json<ChangePriorityResult>, ApiError> {
856 let eid = parse_execution_id(&id)?;
857 Ok(Json(server.change_priority(&eid, body.new_priority).await?))
858}
859
860async fn replay_execution(
861 State(server): State<Arc<Server>>,
862 Path(id): Path<String>,
863) -> Result<Json<ReplayExecutionResult>, ApiError> {
864 let eid = parse_execution_id(&id)?;
865 Ok(Json(server.replay_execution(&eid).await?))
866}
867
868async fn revoke_lease(
869 State(server): State<Arc<Server>>,
870 Path(id): Path<String>,
871) -> Result<Json<RevokeLeaseResult>, ApiError> {
872 let eid = parse_execution_id(&id)?;
873 Ok(Json(server.revoke_lease(&eid).await?))
874}
875
876#[derive(Deserialize)]
886struct ClaimForWorkerBody {
887 lane_id: String,
888 worker_instance_id: String,
889 #[serde(default)]
893 capabilities: Vec<String>,
894 grant_ttl_ms: u64,
897}
898
899#[derive(Serialize)]
903struct ClaimGrantDto {
904 execution_id: String,
905 partition_key: ff_core::partition::PartitionKey,
906 grant_key: String,
907 expires_at_ms: u64,
908}
909
910impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
911 fn from(g: ff_core::contracts::ClaimGrant) -> Self {
912 Self {
913 execution_id: g.execution_id.to_string(),
914 partition_key: g.partition_key,
915 grant_key: g.grant_key,
916 expires_at_ms: g.expires_at_ms,
917 }
918 }
919}
920
921const CLAIM_GRANT_TTL_MS_MAX: u64 = 60_000;
925
926fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
931 if value.is_empty() {
932 return Err(ApiError(ServerError::InvalidInput(format!(
933 "{field}: must not be empty"
934 ))));
935 }
936 if value.len() > 256 {
937 return Err(ApiError(ServerError::InvalidInput(format!(
938 "{field}: exceeds 256 bytes (got {})",
939 value.len()
940 ))));
941 }
942 if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
943 return Err(ApiError(ServerError::InvalidInput(format!(
944 "{field}: must not contain whitespace or control characters"
945 ))));
946 }
947 Ok(())
948}
949
950async fn claim_for_worker(
951 State(server): State<Arc<Server>>,
952 Path(worker_id): Path<String>,
953 AppJson(body): AppJson<ClaimForWorkerBody>,
954) -> Result<Response, ApiError> {
955 validate_identifier("worker_id", &worker_id)?;
956 validate_identifier("worker_instance_id", &body.worker_instance_id)?;
957 let worker_id = WorkerId::new(worker_id);
958 let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
959 let lane = LaneId::try_new(body.lane_id).map_err(|e| {
960 ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
961 })?;
962 if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
963 return Err(ApiError(ServerError::InvalidInput(format!(
964 "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
965 ))));
966 }
967 let caps: std::collections::BTreeSet<String> =
968 body.capabilities.into_iter().collect();
969
970 match server
971 .claim_for_worker(
972 &lane,
973 &worker_id,
974 &worker_instance_id,
975 &caps,
976 body.grant_ttl_ms,
977 )
978 .await?
979 {
980 Some(grant) => Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response()),
981 None => Ok(StatusCode::NO_CONTENT.into_response()),
982 }
983}
984
985#[derive(Deserialize)]
988struct ReadStreamParams {
989 #[serde(default = "ff_core::contracts::StreamCursor::start")]
990 from: ff_core::contracts::StreamCursor,
991 #[serde(default = "ff_core::contracts::StreamCursor::end")]
992 to: ff_core::contracts::StreamCursor,
993 #[serde(default = "default_read_limit")]
994 limit: u64,
995}
996
997fn default_read_limit() -> u64 { 100 }
998
999#[derive(Serialize)]
1000struct ReadStreamResponse {
1001 frames: Vec<StreamFrame>,
1002 count: usize,
1003 #[serde(skip_serializing_if = "Option::is_none")]
1008 closed_at: Option<i64>,
1009 #[serde(skip_serializing_if = "Option::is_none")]
1012 closed_reason: Option<String>,
1013}
1014
1015impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
1016 fn from(sf: ff_core::contracts::StreamFrames) -> Self {
1017 let count = sf.frames.len();
1018 Self {
1019 frames: sf.frames,
1020 count,
1021 closed_at: sf.closed_at.map(|t| t.0),
1022 closed_reason: sf.closed_reason,
1023 }
1024 }
1025}
1026
1027const REST_STREAM_LIMIT_CEILING: u64 = 1_000;
1037
1038async fn read_attempt_stream(
1039 State(server): State<Arc<Server>>,
1040 Path((id, idx)): Path<(String, u32)>,
1041 Query(params): Query<ReadStreamParams>,
1042) -> Result<Json<ReadStreamResponse>, ApiError> {
1043 if params.limit == 0 {
1044 return Err(ApiError(ServerError::InvalidInput(
1045 "limit must be >= 1".to_owned(),
1046 )));
1047 }
1048 if params.limit > REST_STREAM_LIMIT_CEILING {
1049 return Err(ApiError(ServerError::InvalidInput(format!(
1050 "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
1051 ))));
1052 }
1053 let eid = parse_execution_id(&id)?;
1054 let attempt_index = AttemptIndex::new(idx);
1055 let result = server
1056 .read_attempt_stream(
1057 &eid,
1058 attempt_index,
1059 params.from.to_wire(),
1060 params.to.to_wire(),
1061 params.limit,
1062 )
1063 .await?;
1064 Ok(Json(result.into()))
1065}
1066
1067#[derive(Deserialize)]
1068struct TailStreamParams {
1069 #[serde(default = "ff_core::contracts::StreamCursor::beginning")]
1070 after: ff_core::contracts::StreamCursor,
1071 #[serde(default)]
1072 block_ms: u64,
1073 #[serde(default = "default_tail_limit")]
1074 limit: u64,
1075}
1076
1077fn default_tail_limit() -> u64 { 50 }
1078
1079const MAX_TAIL_BLOCK_MS: u64 = 30_000;
1088
1089async fn tail_attempt_stream(
1090 State(server): State<Arc<Server>>,
1091 Path((id, idx)): Path<(String, u32)>,
1092 Query(params): Query<TailStreamParams>,
1093) -> Result<Json<ReadStreamResponse>, ApiError> {
1094 if params.block_ms > MAX_TAIL_BLOCK_MS {
1095 return Err(ApiError(ServerError::InvalidInput(format!(
1096 "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
1097 ))));
1098 }
1099 if params.limit == 0 {
1100 return Err(ApiError(ServerError::InvalidInput(
1101 "limit must be >= 1".to_owned(),
1102 )));
1103 }
1104 if params.limit > REST_STREAM_LIMIT_CEILING {
1105 return Err(ApiError(ServerError::InvalidInput(format!(
1106 "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
1107 ))));
1108 }
1109 if !params.after.is_concrete() {
1116 return Err(ApiError(ServerError::InvalidInput(
1117 "after: XREAD cursor must be a concrete entry id; pass '0-0' to start from the beginning"
1118 .to_owned(),
1119 )));
1120 }
1121
1122 let eid = parse_execution_id(&id)?;
1123 let attempt_index = AttemptIndex::new(idx);
1124 let result = server
1125 .tail_attempt_stream(
1126 &eid,
1127 attempt_index,
1128 params.after.to_wire(),
1129 params.block_ms,
1130 params.limit,
1131 )
1132 .await?;
1133 Ok(Json(result.into()))
1134}
1135
1136async fn create_flow(
1139 State(server): State<Arc<Server>>,
1140 AppJson(args): AppJson<CreateFlowArgs>,
1141) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
1142 let result = server.create_flow(&args).await?;
1143 let status = match &result {
1144 CreateFlowResult::Created { .. } => StatusCode::CREATED,
1145 CreateFlowResult::AlreadySatisfied { .. } => StatusCode::OK,
1146 };
1147 Ok((status, Json(result)))
1148}
1149
1150async fn add_execution_to_flow(
1151 State(server): State<Arc<Server>>,
1152 Path(id): Path<String>,
1153 AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
1154) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
1155 let path_fid = parse_flow_id(&id)?;
1156 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1157 args.flow_id = path_fid;
1158 let result = server.add_execution_to_flow(&args).await?;
1159 let status = match &result {
1160 AddExecutionToFlowResult::Added { .. } => StatusCode::CREATED,
1161 AddExecutionToFlowResult::AlreadyMember { .. } => StatusCode::OK,
1162 };
1163 Ok((status, Json(result)))
1164}
1165
1166async fn cancel_flow(
1179 State(server): State<Arc<Server>>,
1180 Path(id): Path<String>,
1181 Query(params): Query<HashMap<String, String>>,
1182 AppJson(mut args): AppJson<CancelFlowArgs>,
1183) -> Result<Json<CancelFlowResult>, ApiError> {
1184 let path_fid = parse_flow_id(&id)?;
1185 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1186 args.flow_id = path_fid;
1187 let wait = params.get("wait").is_some_and(|v| v == "true" || v == "1");
1188 let result = if wait {
1189 server.cancel_flow_wait(&args).await?
1190 } else {
1191 server.cancel_flow(&args).await?
1192 };
1193 Ok(Json(result))
1194}
1195
1196async fn stage_dependency_edge(
1197 State(server): State<Arc<Server>>,
1198 Path(id): Path<String>,
1199 AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
1200) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
1201 let path_fid = parse_flow_id(&id)?;
1202 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1203 args.flow_id = path_fid;
1204 let result = server.stage_dependency_edge(&args).await?;
1205 Ok((StatusCode::CREATED, Json(result)))
1206}
1207
1208async fn apply_dependency_to_child(
1209 State(server): State<Arc<Server>>,
1210 Path(id): Path<String>,
1211 AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
1212) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
1213 let path_fid = parse_flow_id(&id)?;
1214 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1215 args.flow_id = path_fid;
1216 Ok(Json(server.apply_dependency_to_child(&args).await?))
1217}
1218
1219async fn create_budget(
1222 State(server): State<Arc<Server>>,
1223 AppJson(args): AppJson<CreateBudgetArgs>,
1224) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
1225 let result = server.create_budget(&args).await?;
1226 let status = match &result {
1227 CreateBudgetResult::Created { .. } => StatusCode::CREATED,
1228 CreateBudgetResult::AlreadySatisfied { .. } => StatusCode::OK,
1229 };
1230 Ok((status, Json(result)))
1231}
1232
1233async fn get_budget_status(
1234 State(server): State<Arc<Server>>,
1235 Path(id): Path<String>,
1236) -> Result<Json<BudgetStatus>, ApiError> {
1237 let bid = parse_budget_id(&id)?;
1238 Ok(Json(server.get_budget_status(&bid).await?))
1239}
1240
1241#[derive(Deserialize)]
1242struct ReportUsageBody {
1243 dimensions: HashMap<String, u64>,
1244 now: ff_core::types::TimestampMs,
1245 #[serde(default)]
1246 dedup_key: Option<String>,
1247}
1248
1249async fn report_usage(
1250 State(server): State<Arc<Server>>,
1251 Path(id): Path<String>,
1252 AppJson(body): AppJson<ReportUsageBody>,
1253) -> Result<Json<ReportUsageResult>, ApiError> {
1254 let bid = parse_budget_id(&id)?;
1255 let dims: Vec<String> = body.dimensions.keys().cloned().collect();
1256 let deltas: Vec<u64> = dims.iter().map(|d| body.dimensions[d]).collect();
1257 let args = ReportUsageArgs {
1258 dimensions: dims,
1259 deltas,
1260 now: body.now,
1261 dedup_key: body.dedup_key,
1262 };
1263 Ok(Json(server.report_usage(&bid, &args).await?))
1264}
1265
1266async fn reset_budget(
1267 State(server): State<Arc<Server>>,
1268 Path(id): Path<String>,
1269) -> Result<Json<ResetBudgetResult>, ApiError> {
1270 let bid = parse_budget_id(&id)?;
1271 Ok(Json(server.reset_budget(&bid).await?))
1272}
1273
1274async fn create_quota_policy(
1275 State(server): State<Arc<Server>>,
1276 AppJson(args): AppJson<CreateQuotaPolicyArgs>,
1277) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
1278 let result = server.create_quota_policy(&args).await?;
1279 let status = match &result {
1280 CreateQuotaPolicyResult::Created { .. } => StatusCode::CREATED,
1281 CreateQuotaPolicyResult::AlreadySatisfied { .. } => StatusCode::OK,
1282 };
1283 Ok((status, Json(result)))
1284}
1285
1286#[derive(Serialize)]
1289struct HealthResponse {
1290 status: &'static str,
1291}
1292
1293async fn healthz(
1294 State(server): State<Arc<Server>>,
1295) -> Result<Json<HealthResponse>, ApiError> {
1296 let _: String = server
1297 .client()
1298 .cmd("PING")
1299 .execute()
1300 .await
1301 .map_err(|e| ApiError(crate::server::backend_context(e, "healthz PING")))?;
1302 Ok(Json(HealthResponse { status: "ok" }))
1303}
1304
1305fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
1309 if body_id != path_id {
1310 return Err(ApiError(ServerError::InvalidInput(format!(
1311 "path {id_name} does not match body {id_name}"
1312 ))));
1313 }
1314 Ok(())
1315}
1316
1317fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
1318 ExecutionId::parse(s)
1319 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
1320}
1321
1322fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
1323 FlowId::parse(s)
1324 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid flow_id: {e}"))))
1325}
1326
1327fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
1328 BudgetId::parse(s)
1329 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid budget_id: {e}"))))
1330}
1331
1332#[cfg(test)]
1333mod cors_tests {
1334 use super::*;
1341 use axum::body::Body;
1342 use axum::http::{Request, StatusCode};
1343 use axum::routing::get;
1344 use tower::ServiceExt;
1345
1346 fn app_with_cors(origins: &[String], auth_enabled: bool) -> Router {
1347 let cors = build_cors_layer(origins, auth_enabled)
1348 .expect("build_cors_layer succeeds for valid inputs");
1349 Router::new().route("/noop", get(|| async { "ok" })).layer(cors)
1350 }
1351
1352 #[test]
1353 fn all_origins_invalid_returns_config_error_instead_of_permissive() {
1354 let err = build_cors_layer(&["bad\0origin".to_owned()], false).unwrap_err();
1362 let ConfigError::InvalidValue { var, message } = err;
1363 assert_eq!(var, "FF_CORS_ORIGINS");
1364 assert!(
1365 message.contains("all configured origins failed to parse"),
1366 "message was: {message}"
1367 );
1368 }
1369
1370 #[test]
1371 fn wildcard_still_returns_permissive() {
1372 let layer = build_cors_layer(&["*".to_owned()], false);
1374 assert!(layer.is_ok());
1375 }
1376
1377 #[test]
1378 fn empty_origins_returns_empty_allowlist_ok() {
1379 let layer = build_cors_layer(&[], false);
1383 assert!(layer.is_ok());
1384 }
1385
1386 #[test]
1387 fn mixed_valid_and_invalid_keeps_valid_entries() {
1388 let layer = build_cors_layer(
1391 &["https://ok.example.com".to_owned(), "bad\0origin".to_owned()],
1392 false,
1393 );
1394 assert!(layer.is_ok());
1395 }
1396
1397 #[tokio::test]
1398 async fn preflight_allows_authorization_when_auth_enabled() {
1399 let app = app_with_cors(&["https://client.example.com".to_owned()], true);
1402
1403 let req = Request::builder()
1404 .method("OPTIONS")
1405 .uri("/noop")
1406 .header("origin", "https://client.example.com")
1407 .header("access-control-request-method", "GET")
1408 .header("access-control-request-headers", "authorization,content-type")
1409 .body(Body::empty())
1410 .unwrap();
1411
1412 let resp = app.oneshot(req).await.unwrap();
1413 assert_eq!(resp.status(), StatusCode::OK);
1414
1415 let allow_headers = resp
1416 .headers()
1417 .get("access-control-allow-headers")
1418 .expect("access-control-allow-headers present")
1419 .to_str()
1420 .unwrap()
1421 .to_ascii_lowercase();
1422 assert!(
1423 allow_headers.contains("authorization"),
1424 "expected `authorization` in Access-Control-Allow-Headers, got: {allow_headers}"
1425 );
1426 assert!(
1427 allow_headers.contains("content-type"),
1428 "expected `content-type` in Access-Control-Allow-Headers, got: {allow_headers}"
1429 );
1430 }
1431
1432 #[tokio::test]
1433 async fn preflight_omits_authorization_when_auth_disabled() {
1434 let app = app_with_cors(&["https://client.example.com".to_owned()], false);
1437
1438 let req = Request::builder()
1439 .method("OPTIONS")
1440 .uri("/noop")
1441 .header("origin", "https://client.example.com")
1442 .header("access-control-request-method", "GET")
1443 .header("access-control-request-headers", "content-type")
1444 .body(Body::empty())
1445 .unwrap();
1446
1447 let resp = app.oneshot(req).await.unwrap();
1448 assert_eq!(resp.status(), StatusCode::OK);
1449
1450 let allow_headers = resp
1451 .headers()
1452 .get("access-control-allow-headers")
1453 .expect("access-control-allow-headers present")
1454 .to_str()
1455 .unwrap()
1456 .to_ascii_lowercase();
1457 assert!(
1458 !allow_headers.contains("authorization"),
1459 "Authorization should not be advertised when auth is off; got: {allow_headers}"
1460 );
1461 assert!(allow_headers.contains("content-type"));
1462 }
1463}
1464
1465#[cfg(test)]
1466mod claim_grant_dto_tests {
1467 use super::*;
1473 use ff_core::contracts::ClaimGrant;
1474 use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
1475 use ff_core::types::{ExecutionId, FlowId};
1476
1477 fn sample_grant(family: PartitionFamily) -> ClaimGrant {
1478 let config = ff_core::partition::PartitionConfig::default();
1479 let fid = FlowId::new();
1480 let eid = ExecutionId::for_flow(&fid, &config);
1481 let p = Partition { family, index: 7 };
1482 ClaimGrant {
1483 execution_id: eid,
1484 partition_key: PartitionKey::from(&p),
1485 grant_key: "ff:exec:{fp:7}:deadbeef:claim_grant".to_owned(),
1486 expires_at_ms: 1_700_000_000_000,
1487 }
1488 }
1489
1490 #[test]
1491 fn claim_grant_dto_emits_opaque_partition_key() {
1492 let dto = ClaimGrantDto::from(sample_grant(PartitionFamily::Flow));
1493 let json = serde_json::to_value(&dto).unwrap();
1494 assert_eq!(json["partition_key"], serde_json::json!("{fp:7}"));
1497 assert!(json.get("partition_family").is_none());
1498 assert!(json.get("partition_index").is_none());
1499 assert_eq!(json["expires_at_ms"], serde_json::json!(1_700_000_000_000u64));
1500 }
1501
1502 #[test]
1503 fn claim_grant_dto_collapses_execution_alias_on_wire() {
1504 let dto_flow = ClaimGrantDto::from(sample_grant(PartitionFamily::Flow));
1508 let dto_exec = ClaimGrantDto::from(sample_grant(PartitionFamily::Execution));
1509 let jf = serde_json::to_value(&dto_flow).unwrap();
1510 let je = serde_json::to_value(&dto_exec).unwrap();
1511 assert_eq!(jf["partition_key"], je["partition_key"]);
1512 }
1513}