1use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use axum::{
8 extract::{DefaultBodyLimit, MatchedPath, Path, Query, Request, State},
9 http::StatusCode,
10 middleware,
11 response::{IntoResponse, Response},
12 routing::{get, post, put, MethodRouter},
13 Json, Router,
14};
15use serde::{Deserialize, Serialize};
16use axum::http::Method;
17use tower_http::cors::{AllowOrigin, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20use ff_core::contracts::*;
21use ff_core::state::PublicState;
22use ff_core::types::*;
23
24use crate::config::ConfigError;
25use crate::server::{Server, ServerError};
26
/// Body ceiling for the large-payload route (`/v1/executions`): 1 MiB.
const BODY_LIMIT_LARGE_PAYLOAD: usize = 1 << 20;
/// Body ceiling for medium payloads (signal delivery): 256 KiB.
const BODY_LIMIT_MEDIUM_PAYLOAD: usize = 1 << 18;
/// Body ceiling for small control-plane requests: 64 KiB.
const BODY_LIMIT_CONTROL: usize = 1 << 16;

/// Request extension recording the body limit that `enforce_body_limit`
/// applied to the matched route, so `AppJson` can echo it in a 413 body.
#[derive(Clone, Copy)]
struct BodyLimit {
    /// Maximum accepted body size in bytes.
    bytes: usize,
}
79
80fn with_body_limit(route: MethodRouter<Arc<Server>>, bytes: usize) -> MethodRouter<Arc<Server>> {
105 let r: MethodRouter<Arc<Server>> =
106 route.layer(tower_http::limit::RequestBodyLimitLayer::new(bytes));
107 let r: MethodRouter<Arc<Server>> = r.layer(DefaultBodyLimit::max(bytes));
108 r.layer(middleware::from_fn(
109 move |req: Request, next: middleware::Next| enforce_body_limit(bytes, req, next),
110 ))
111}
112
113async fn enforce_body_limit(bytes: usize, mut req: Request, next: middleware::Next) -> Response {
119 if let Some(content_length) = req
120 .headers()
121 .get(axum::http::header::CONTENT_LENGTH)
122 .and_then(|v| v.to_str().ok())
123 .and_then(|s| s.parse::<usize>().ok())
124 && content_length > bytes
125 {
126 let route = req
127 .extensions()
128 .get::<MatchedPath>()
129 .map(|m| m.as_str().to_owned())
130 .unwrap_or_default();
131 let body = PayloadTooLargeBody {
132 error: "payload_too_large",
133 limit_bytes: bytes,
134 route,
135 };
136 return (StatusCode::PAYLOAD_TOO_LARGE, Json(body)).into_response();
137 }
138 req.extensions_mut().insert(BodyLimit { bytes });
139 next.run(req).await
140}
141
142#[derive(Serialize)]
144struct PayloadTooLargeBody {
145 error: &'static str,
146 limit_bytes: usize,
147 route: String,
148}
149
150struct AppJson<T>(T);
153
154impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
155where
156 T: serde::de::DeserializeOwned + Send,
157 S: Send + Sync,
158{
159 type Rejection = Response;
160
161 async fn from_request(
162 req: axum::extract::Request,
163 state: &S,
164 ) -> Result<Self, Self::Rejection> {
165 let limit = req.extensions().get::<BodyLimit>().copied();
170 let matched = req.extensions().get::<MatchedPath>().cloned();
171
172 match Json::<T>::from_request(req, state).await {
173 Ok(Json(value)) => Ok(AppJson(value)),
174 Err(rejection) => {
175 let status = rejection.status();
176 tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
177 if status == StatusCode::PAYLOAD_TOO_LARGE {
178 let limit_bytes = limit.map(|l| l.bytes).unwrap_or(0);
179 let route = matched
180 .as_ref()
181 .map(|m| m.as_str().to_owned())
182 .unwrap_or_default();
183 let body = PayloadTooLargeBody {
184 error: "payload_too_large",
185 limit_bytes,
186 route,
187 };
188 return Err((status, Json(body)).into_response());
189 }
190 let body = ErrorBody::plain(format!(
191 "invalid JSON: {}",
192 status.canonical_reason().unwrap_or("bad request"),
193 ));
194 Err((status, Json(body)).into_response())
195 }
196 }
197 }
198}
199
200struct ApiError(ServerError);
203
204impl From<ServerError> for ApiError {
205 fn from(e: ServerError) -> Self {
206 Self(e)
207 }
208}
209
210#[derive(Serialize)]
214struct ErrorBody {
215 error: String,
216 #[serde(skip_serializing_if = "Option::is_none")]
217 kind: Option<String>,
218 #[serde(skip_serializing_if = "Option::is_none")]
219 retryable: Option<bool>,
220}
221
222impl ErrorBody {
223 fn plain(error: String) -> Self {
224 Self { error, kind: None, retryable: None }
225 }
226}
227
228impl IntoResponse for ApiError {
229 fn into_response(self) -> Response {
230 use ff_script::retry::kind_to_stable_str;
231
232 let (status, body) = match &self.0 {
233 ServerError::NotFound(msg) => {
234 (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
235 }
236 ServerError::InvalidInput(msg) => {
237 (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
238 }
239 ServerError::OperationFailed(msg) => {
240 (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
241 }
242 ServerError::ConcurrencyLimitExceeded(source, max) => (
243 StatusCode::TOO_MANY_REQUESTS,
244 ErrorBody {
245 error: format!(
246 "too many concurrent {source} calls (server max: {max}); retry with backoff"
247 ),
248 kind: None,
249 retryable: Some(true),
250 },
251 ),
252 ServerError::Valkey(e) => {
253 let kind_str = kind_to_stable_str(e.kind());
254 tracing::error!(
255 kind = kind_str,
256 code = e.code().unwrap_or(""),
257 detail = e.detail().unwrap_or(""),
258 "valkey error"
259 );
260 (
261 StatusCode::INTERNAL_SERVER_ERROR,
262 ErrorBody {
263 error: self.0.to_string(),
264 kind: Some(kind_str.to_owned()),
265 retryable: Some(self.0.is_retryable()),
266 },
267 )
268 }
269 ServerError::ValkeyContext { source, context } => {
270 let kind_str = kind_to_stable_str(source.kind());
271 tracing::error!(
272 kind = kind_str,
273 code = source.code().unwrap_or(""),
274 detail = source.detail().unwrap_or(""),
275 context = %context,
276 "valkey error"
277 );
278 (
279 StatusCode::INTERNAL_SERVER_ERROR,
280 ErrorBody {
281 error: self.0.to_string(),
282 kind: Some(kind_str.to_owned()),
283 retryable: Some(self.0.is_retryable()),
284 },
285 )
286 }
287 ServerError::LibraryLoad(load_err) => {
288 let kind_str = load_err.valkey_kind().map(kind_to_stable_str);
289 tracing::error!(
290 kind = kind_str.unwrap_or(""),
291 error = %load_err,
292 "library load failure"
293 );
294 (
295 StatusCode::INTERNAL_SERVER_ERROR,
296 ErrorBody {
297 error: format!("library load: {load_err}"),
298 kind: kind_str.map(str::to_owned),
299 retryable: Some(self.0.is_retryable()),
300 },
301 )
302 }
303 other => (
307 StatusCode::INTERNAL_SERVER_ERROR,
308 ErrorBody {
309 error: other.to_string(),
310 kind: None,
311 retryable: Some(false),
312 },
313 ),
314 };
315 (status, Json(body)).into_response()
316 }
317}
318
319pub fn router(
322 server: Arc<Server>,
323 cors_origins: &[String],
324 api_token: Option<String>,
325) -> Result<Router, ConfigError> {
326 router_with_metrics(server, cors_origins, api_token, None)
327}
328
329pub fn router_with_metrics(
338 server: Arc<Server>,
339 cors_origins: &[String],
340 api_token: Option<String>,
341 #[cfg_attr(not(feature = "observability"), allow(unused_variables))]
342 metrics: Option<Arc<crate::Metrics>>,
343) -> Result<Router, ConfigError> {
344 let auth_enabled = api_token.is_some();
345 let cors = build_cors_layer(cors_origins, auth_enabled)?;
346
347 let mut app = Router::new()
351 .route(
353 "/v1/executions",
354 with_body_limit(
355 get(list_executions).post(create_execution),
356 BODY_LIMIT_LARGE_PAYLOAD,
357 ),
358 )
359 .route("/v1/executions/{id}", get(get_execution))
360 .route("/v1/executions/{id}/state", get(get_execution_state))
361 .route(
362 "/v1/executions/{id}/pending-waitpoints",
363 get(list_pending_waitpoints),
364 )
365 .route("/v1/executions/{id}/result", get(get_execution_result))
366 .route(
367 "/v1/executions/{id}/cancel",
368 with_body_limit(post(cancel_execution), BODY_LIMIT_CONTROL),
369 )
370 .route(
371 "/v1/executions/{id}/signal",
372 with_body_limit(post(deliver_signal), BODY_LIMIT_MEDIUM_PAYLOAD),
373 )
374 .route(
375 "/v1/executions/{id}/priority",
376 with_body_limit(put(change_priority), BODY_LIMIT_CONTROL),
377 )
378 .route(
379 "/v1/executions/{id}/replay",
380 with_body_limit(post(replay_execution), BODY_LIMIT_CONTROL),
381 )
382 .route(
383 "/v1/executions/{id}/revoke-lease",
384 with_body_limit(post(revoke_lease), BODY_LIMIT_CONTROL),
385 )
386 .route(
391 "/v1/workers/{worker_id}/claim",
392 with_body_limit(post(claim_for_worker), BODY_LIMIT_CONTROL),
393 )
394 .route(
396 "/v1/executions/{id}/attempts/{idx}/stream",
397 get(read_attempt_stream),
398 )
399 .route(
400 "/v1/executions/{id}/attempts/{idx}/stream/tail",
401 get(tail_attempt_stream),
402 )
403 .route(
405 "/v1/flows",
406 with_body_limit(post(create_flow), BODY_LIMIT_CONTROL),
407 )
408 .route(
409 "/v1/flows/{id}/members",
410 with_body_limit(post(add_execution_to_flow), BODY_LIMIT_CONTROL),
411 )
412 .route(
413 "/v1/flows/{id}/cancel",
414 with_body_limit(post(cancel_flow), BODY_LIMIT_CONTROL),
415 )
416 .route(
417 "/v1/flows/{id}/edges",
418 with_body_limit(post(stage_dependency_edge), BODY_LIMIT_CONTROL),
419 )
420 .route(
421 "/v1/flows/{id}/edges/apply",
422 with_body_limit(post(apply_dependency_to_child), BODY_LIMIT_CONTROL),
423 )
424 .route(
426 "/v1/budgets",
427 with_body_limit(post(create_budget), BODY_LIMIT_CONTROL),
428 )
429 .route("/v1/budgets/{id}", get(get_budget_status))
430 .route(
431 "/v1/budgets/{id}/usage",
432 with_body_limit(post(report_usage), BODY_LIMIT_CONTROL),
433 )
434 .route(
435 "/v1/budgets/{id}/reset",
436 with_body_limit(post(reset_budget), BODY_LIMIT_CONTROL),
437 )
438 .route(
440 "/v1/quotas",
441 with_body_limit(post(create_quota_policy), BODY_LIMIT_CONTROL),
442 )
443 .route(
445 "/v1/admin/rotate-waitpoint-secret",
446 with_body_limit(post(rotate_waitpoint_secret), BODY_LIMIT_CONTROL),
447 )
448 .route("/healthz", get(healthz));
450
451 if let Some(token) = api_token {
452 let token = Arc::new(token);
453 app = app.layer(middleware::from_fn(move |req, next| {
454 let token = token.clone();
455 auth_middleware(token, req, next)
456 }));
457 }
458
459 #[cfg(feature = "observability")]
465 if let Some(m) = metrics.as_ref() {
466 let m = m.clone();
467 app = app.layer(middleware::from_fn_with_state(
468 m,
469 crate::metrics::http_middleware,
470 ));
471 }
472
473 #[cfg_attr(not(feature = "observability"), allow(unused_mut))]
474 let mut app = app
475 .layer(TraceLayer::new_for_http())
476 .layer(cors)
477 .with_state(server);
478
479 #[cfg(feature = "observability")]
486 if let Some(m) = metrics {
487 let metrics_router: Router = Router::new()
488 .route("/metrics", get(crate::metrics::metrics_handler))
489 .with_state(m);
490 app = app.merge(metrics_router);
491 }
492
493 Ok(app)
494}
495
496async fn auth_middleware(
497 token: Arc<String>,
498 req: Request,
499 next: middleware::Next,
500) -> Response {
501 if req.uri().path() == "/healthz" {
502 return next.run(req).await;
503 }
504
505 let auth_header = req
506 .headers()
507 .get("authorization")
508 .and_then(|v| v.to_str().ok());
509
510 let authorized = auth_header
511 .and_then(|v| v.strip_prefix("Bearer "))
512 .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
513
514 if authorized {
515 next.run(req).await
516 } else {
517 (
518 StatusCode::UNAUTHORIZED,
519 Json(ErrorBody::plain(
520 "missing or invalid Authorization header".to_owned(),
521 )),
522 )
523 .into_response()
524 }
525}
526
/// Compares two byte strings without short-circuiting on the first differing
/// byte. Only a length mismatch returns early.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR-accumulate every XOR so all bytes are always visited.
    let accumulated = a
        .iter()
        .zip(b)
        .fold(0u8, |acc, (left, right)| acc | (left ^ right));
    accumulated == 0
}
537
538fn build_cors_layer(origins: &[String], auth_enabled: bool) -> Result<CorsLayer, ConfigError> {
549 if origins.iter().any(|o| o == "*") {
550 return Ok(CorsLayer::permissive());
551 }
552
553 let mut parsed = Vec::with_capacity(origins.len());
554 let mut accepted = Vec::with_capacity(origins.len());
555 let mut invalid = Vec::new();
556 for o in origins {
557 match o.parse() {
558 Ok(v) => {
559 parsed.push(v);
560 accepted.push(o.as_str());
561 }
562 Err(_) => invalid.push(o.clone()),
563 }
564 }
565
566 if parsed.is_empty() && !origins.is_empty() {
567 return Err(ConfigError::InvalidValue {
568 var: "FF_CORS_ORIGINS".to_owned(),
569 message: format!(
570 "all configured origins failed to parse as valid HTTP header values: {:?}; \
571 refusing to fall back to permissive CORS",
572 origins
573 ),
574 });
575 }
576
577 if !invalid.is_empty() {
578 tracing::warn!(
581 ?invalid,
582 ?accepted,
583 "some FF_CORS_ORIGINS entries failed to parse and were dropped"
584 );
585 }
586
587 let mut headers = vec![axum::http::header::CONTENT_TYPE];
590 if auth_enabled {
591 headers.push(axum::http::header::AUTHORIZATION);
592 }
593
594 Ok(CorsLayer::new()
595 .allow_origin(AllowOrigin::list(parsed))
596 .allow_methods([Method::GET, Method::POST, Method::PUT])
597 .allow_headers(headers))
598}
599
600#[derive(Deserialize)]
603struct ListExecutionsParams {
604 partition: u16,
605 #[serde(default = "default_lane")]
606 lane: String,
607 #[serde(default = "default_state_filter")]
608 state: String,
609 #[serde(default = "default_limit")]
610 limit: u64,
611 #[serde(default)]
612 offset: u64,
613}
614
/// Serde default for `ListExecutionsParams::lane`.
fn default_lane() -> String {
    String::from("default")
}

/// Serde default for `ListExecutionsParams::state`.
fn default_state_filter() -> String {
    String::from("eligible")
}

/// Serde default for `ListExecutionsParams::limit`.
fn default_limit() -> u64 {
    50
}
618
619async fn list_executions(
620 State(server): State<Arc<Server>>,
621 Query(params): Query<ListExecutionsParams>,
622) -> Result<Json<ListExecutionsResult>, ApiError> {
623 let lane = ff_core::types::LaneId::try_new(params.lane.clone())
624 .map_err(|e| ApiError::from(ServerError::InvalidInput(format!("invalid lane: {e}"))))?;
625 let limit = params.limit.min(1000);
626 let result = server
627 .list_executions(params.partition, &lane, ¶ms.state, params.offset, limit)
628 .await?;
629 Ok(Json(result))
630}
631
632async fn create_execution(
633 State(server): State<Arc<Server>>,
634 AppJson(args): AppJson<CreateExecutionArgs>,
635) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
636 let result = server.create_execution(&args).await?;
637 let status = match &result {
638 CreateExecutionResult::Created { .. } => StatusCode::CREATED,
639 CreateExecutionResult::Duplicate { .. } => StatusCode::OK,
640 };
641 Ok((status, Json(result)))
642}
643
644async fn get_execution(
645 State(server): State<Arc<Server>>,
646 Path(id): Path<String>,
647) -> Result<Json<ExecutionInfo>, ApiError> {
648 let eid = parse_execution_id(&id)?;
649 Ok(Json(server.get_execution(&eid).await?))
650}
651
652async fn get_execution_state(
653 State(server): State<Arc<Server>>,
654 Path(id): Path<String>,
655) -> Result<Json<PublicState>, ApiError> {
656 let eid = parse_execution_id(&id)?;
657 Ok(Json(server.get_execution_state(&eid).await?))
658}
659
660async fn list_pending_waitpoints(
673 State(server): State<Arc<Server>>,
674 Path(id): Path<String>,
675) -> Result<Json<Vec<PendingWaitpointInfo>>, ApiError> {
676 let eid = parse_execution_id(&id)?;
677 Ok(Json(server.list_pending_waitpoints(&eid).await?))
678}
679
680async fn get_execution_result(
715 State(server): State<Arc<Server>>,
716 Path(id): Path<String>,
717) -> Result<Response, ApiError> {
718 let eid = parse_execution_id(&id)?;
719 match server.get_execution_result(&eid).await? {
720 Some(bytes) => Ok((
721 StatusCode::OK,
722 [(axum::http::header::CONTENT_TYPE, "application/octet-stream")],
723 bytes,
724 )
725 .into_response()),
726 None => Err(ApiError(ServerError::NotFound(format!(
727 "execution result not found: {eid}"
728 )))),
729 }
730}
731
732async fn cancel_execution(
733 State(server): State<Arc<Server>>,
734 Path(id): Path<String>,
735 AppJson(mut args): AppJson<CancelExecutionArgs>,
736) -> Result<Json<CancelExecutionResult>, ApiError> {
737 let path_eid = parse_execution_id(&id)?;
738 check_id_match(&path_eid, &args.execution_id, "execution_id")?;
739 args.execution_id = path_eid;
740 Ok(Json(server.cancel_execution(&args).await?))
741}
742
743async fn deliver_signal(
744 State(server): State<Arc<Server>>,
745 Path(id): Path<String>,
746 AppJson(mut args): AppJson<DeliverSignalArgs>,
747) -> Result<Json<DeliverSignalResult>, ApiError> {
748 let path_eid = parse_execution_id(&id)?;
749 check_id_match(&path_eid, &args.execution_id, "execution_id")?;
750 args.execution_id = path_eid;
751 Ok(Json(server.deliver_signal(&args).await?))
752}
753
754#[derive(Deserialize)]
757struct RotateWaitpointSecretBody {
758 new_kid: String,
759 new_secret_hex: String,
761}
762
/// Upper bound on how long the rotate-secret HTTP handler waits for the
/// server (two minutes) before answering `504`.
const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2 * 60);
777
778async fn rotate_waitpoint_secret(
779 State(server): State<Arc<Server>>,
780 AppJson(body): AppJson<RotateWaitpointSecretBody>,
781) -> Result<Response, ApiError> {
782 let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
787 let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
788 Ok(r) => r?,
789 Err(_) => {
790 tracing::error!(
791 target: "audit",
792 new_kid = %body.new_kid,
793 timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
794 "waitpoint_hmac_rotation_timeout_http_504"
795 );
796 let body = ErrorBody::plain(format!(
797 "rotation exceeded {}s server-side timeout; retry is safe \
798 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
799 ROTATE_HTTP_TIMEOUT.as_secs()
800 ));
801 return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
802 }
803 };
804 if result.rotated == 0 && result.failed.is_empty() {
814 return Err(ApiError::from(ServerError::OperationFailed(
815 "rotation had no partitions to operate on \
816 (num_flow_partitions is 0 — server misconfigured)"
817 .to_owned(),
818 )));
819 }
820 if result.rotated == 0 && !result.failed.is_empty() {
821 return Err(ApiError::from(ServerError::OperationFailed(
822 "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
823 )));
824 }
825 Ok(Json(result).into_response())
826}
827
828#[derive(Deserialize)]
829struct ChangePriorityBody {
830 new_priority: i32,
831}
832
833async fn change_priority(
834 State(server): State<Arc<Server>>,
835 Path(id): Path<String>,
836 AppJson(body): AppJson<ChangePriorityBody>,
837) -> Result<Json<ChangePriorityResult>, ApiError> {
838 let eid = parse_execution_id(&id)?;
839 Ok(Json(server.change_priority(&eid, body.new_priority).await?))
840}
841
842async fn replay_execution(
843 State(server): State<Arc<Server>>,
844 Path(id): Path<String>,
845) -> Result<Json<ReplayExecutionResult>, ApiError> {
846 let eid = parse_execution_id(&id)?;
847 Ok(Json(server.replay_execution(&eid).await?))
848}
849
850async fn revoke_lease(
851 State(server): State<Arc<Server>>,
852 Path(id): Path<String>,
853) -> Result<Json<RevokeLeaseResult>, ApiError> {
854 let eid = parse_execution_id(&id)?;
855 Ok(Json(server.revoke_lease(&eid).await?))
856}
857
858#[derive(Deserialize)]
868struct ClaimForWorkerBody {
869 lane_id: String,
870 worker_instance_id: String,
871 #[serde(default)]
875 capabilities: Vec<String>,
876 grant_ttl_ms: u64,
879}
880
881#[derive(Serialize)]
885struct ClaimGrantDto {
886 execution_id: String,
887 partition_key: ff_core::partition::PartitionKey,
888 grant_key: String,
889 expires_at_ms: u64,
890}
891
892impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
893 fn from(g: ff_core::contracts::ClaimGrant) -> Self {
894 Self {
895 execution_id: g.execution_id.to_string(),
896 partition_key: g.partition_key,
897 grant_key: g.grant_key,
898 expires_at_ms: g.expires_at_ms,
899 }
900 }
901}
902
/// Largest `grant_ttl_ms` a claim request may ask for (60 seconds).
const CLAIM_GRANT_TTL_MS_MAX: u64 = 60 * 1_000;
907
908fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
913 if value.is_empty() {
914 return Err(ApiError(ServerError::InvalidInput(format!(
915 "{field}: must not be empty"
916 ))));
917 }
918 if value.len() > 256 {
919 return Err(ApiError(ServerError::InvalidInput(format!(
920 "{field}: exceeds 256 bytes (got {})",
921 value.len()
922 ))));
923 }
924 if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
925 return Err(ApiError(ServerError::InvalidInput(format!(
926 "{field}: must not contain whitespace or control characters"
927 ))));
928 }
929 Ok(())
930}
931
932async fn claim_for_worker(
933 State(server): State<Arc<Server>>,
934 Path(worker_id): Path<String>,
935 AppJson(body): AppJson<ClaimForWorkerBody>,
936) -> Result<Response, ApiError> {
937 validate_identifier("worker_id", &worker_id)?;
938 validate_identifier("worker_instance_id", &body.worker_instance_id)?;
939 let worker_id = WorkerId::new(worker_id);
940 let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
941 let lane = LaneId::try_new(body.lane_id).map_err(|e| {
942 ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
943 })?;
944 if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
945 return Err(ApiError(ServerError::InvalidInput(format!(
946 "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
947 ))));
948 }
949 let caps: std::collections::BTreeSet<String> =
950 body.capabilities.into_iter().collect();
951
952 match server
953 .claim_for_worker(
954 &lane,
955 &worker_id,
956 &worker_instance_id,
957 &caps,
958 body.grant_ttl_ms,
959 )
960 .await?
961 {
962 Some(grant) => Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response()),
963 None => Ok(StatusCode::NO_CONTENT.into_response()),
964 }
965}
966
967#[derive(Deserialize)]
970struct ReadStreamParams {
971 #[serde(default = "ff_core::contracts::StreamCursor::start")]
972 from: ff_core::contracts::StreamCursor,
973 #[serde(default = "ff_core::contracts::StreamCursor::end")]
974 to: ff_core::contracts::StreamCursor,
975 #[serde(default = "default_read_limit")]
976 limit: u64,
977}
978
/// Serde default for `ReadStreamParams::limit`.
fn default_read_limit() -> u64 {
    100
}
980
981#[derive(Serialize)]
982struct ReadStreamResponse {
983 frames: Vec<StreamFrame>,
984 count: usize,
985 #[serde(skip_serializing_if = "Option::is_none")]
990 closed_at: Option<i64>,
991 #[serde(skip_serializing_if = "Option::is_none")]
994 closed_reason: Option<String>,
995}
996
997impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
998 fn from(sf: ff_core::contracts::StreamFrames) -> Self {
999 let count = sf.frames.len();
1000 Self {
1001 frames: sf.frames,
1002 count,
1003 closed_at: sf.closed_at.map(|t| t.0),
1004 closed_reason: sf.closed_reason,
1005 }
1006 }
1007}
1008
/// Most frames a single REST stream read/tail request may ask for.
const REST_STREAM_LIMIT_CEILING: u64 = 1000;
1019
1020async fn read_attempt_stream(
1021 State(server): State<Arc<Server>>,
1022 Path((id, idx)): Path<(String, u32)>,
1023 Query(params): Query<ReadStreamParams>,
1024) -> Result<Json<ReadStreamResponse>, ApiError> {
1025 if params.limit == 0 {
1026 return Err(ApiError(ServerError::InvalidInput(
1027 "limit must be >= 1".to_owned(),
1028 )));
1029 }
1030 if params.limit > REST_STREAM_LIMIT_CEILING {
1031 return Err(ApiError(ServerError::InvalidInput(format!(
1032 "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
1033 ))));
1034 }
1035 let eid = parse_execution_id(&id)?;
1036 let attempt_index = AttemptIndex::new(idx);
1037 let result = server
1038 .read_attempt_stream(
1039 &eid,
1040 attempt_index,
1041 params.from.to_wire(),
1042 params.to.to_wire(),
1043 params.limit,
1044 )
1045 .await?;
1046 Ok(Json(result.into()))
1047}
1048
1049#[derive(Deserialize)]
1050struct TailStreamParams {
1051 #[serde(default = "ff_core::contracts::StreamCursor::beginning")]
1052 after: ff_core::contracts::StreamCursor,
1053 #[serde(default)]
1054 block_ms: u64,
1055 #[serde(default = "default_tail_limit")]
1056 limit: u64,
1057}
1058
/// Serde default for `TailStreamParams::limit`.
fn default_tail_limit() -> u64 {
    50
}

/// Longest `block_ms` a tail request may ask for (30 seconds).
const MAX_TAIL_BLOCK_MS: u64 = 30 * 1_000;
1070
1071async fn tail_attempt_stream(
1072 State(server): State<Arc<Server>>,
1073 Path((id, idx)): Path<(String, u32)>,
1074 Query(params): Query<TailStreamParams>,
1075) -> Result<Json<ReadStreamResponse>, ApiError> {
1076 if params.block_ms > MAX_TAIL_BLOCK_MS {
1077 return Err(ApiError(ServerError::InvalidInput(format!(
1078 "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
1079 ))));
1080 }
1081 if params.limit == 0 {
1082 return Err(ApiError(ServerError::InvalidInput(
1083 "limit must be >= 1".to_owned(),
1084 )));
1085 }
1086 if params.limit > REST_STREAM_LIMIT_CEILING {
1087 return Err(ApiError(ServerError::InvalidInput(format!(
1088 "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
1089 ))));
1090 }
1091 if !params.after.is_concrete() {
1098 return Err(ApiError(ServerError::InvalidInput(
1099 "after: XREAD cursor must be a concrete entry id; pass '0-0' to start from the beginning"
1100 .to_owned(),
1101 )));
1102 }
1103
1104 let eid = parse_execution_id(&id)?;
1105 let attempt_index = AttemptIndex::new(idx);
1106 let result = server
1107 .tail_attempt_stream(
1108 &eid,
1109 attempt_index,
1110 params.after.to_wire(),
1111 params.block_ms,
1112 params.limit,
1113 )
1114 .await?;
1115 Ok(Json(result.into()))
1116}
1117
1118async fn create_flow(
1121 State(server): State<Arc<Server>>,
1122 AppJson(args): AppJson<CreateFlowArgs>,
1123) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
1124 let result = server.create_flow(&args).await?;
1125 let status = match &result {
1126 CreateFlowResult::Created { .. } => StatusCode::CREATED,
1127 CreateFlowResult::AlreadySatisfied { .. } => StatusCode::OK,
1128 };
1129 Ok((status, Json(result)))
1130}
1131
1132async fn add_execution_to_flow(
1133 State(server): State<Arc<Server>>,
1134 Path(id): Path<String>,
1135 AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
1136) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
1137 let path_fid = parse_flow_id(&id)?;
1138 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1139 args.flow_id = path_fid;
1140 let result = server.add_execution_to_flow(&args).await?;
1141 let status = match &result {
1142 AddExecutionToFlowResult::Added { .. } => StatusCode::CREATED,
1143 AddExecutionToFlowResult::AlreadyMember { .. } => StatusCode::OK,
1144 };
1145 Ok((status, Json(result)))
1146}
1147
1148async fn cancel_flow(
1161 State(server): State<Arc<Server>>,
1162 Path(id): Path<String>,
1163 Query(params): Query<HashMap<String, String>>,
1164 AppJson(mut args): AppJson<CancelFlowArgs>,
1165) -> Result<Json<CancelFlowResult>, ApiError> {
1166 let path_fid = parse_flow_id(&id)?;
1167 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1168 args.flow_id = path_fid;
1169 let wait = params.get("wait").is_some_and(|v| v == "true" || v == "1");
1170 let result = if wait {
1171 server.cancel_flow_wait(&args).await?
1172 } else {
1173 server.cancel_flow(&args).await?
1174 };
1175 Ok(Json(result))
1176}
1177
1178async fn stage_dependency_edge(
1179 State(server): State<Arc<Server>>,
1180 Path(id): Path<String>,
1181 AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
1182) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
1183 let path_fid = parse_flow_id(&id)?;
1184 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1185 args.flow_id = path_fid;
1186 let result = server.stage_dependency_edge(&args).await?;
1187 Ok((StatusCode::CREATED, Json(result)))
1188}
1189
1190async fn apply_dependency_to_child(
1191 State(server): State<Arc<Server>>,
1192 Path(id): Path<String>,
1193 AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
1194) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
1195 let path_fid = parse_flow_id(&id)?;
1196 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
1197 args.flow_id = path_fid;
1198 Ok(Json(server.apply_dependency_to_child(&args).await?))
1199}
1200
1201async fn create_budget(
1204 State(server): State<Arc<Server>>,
1205 AppJson(args): AppJson<CreateBudgetArgs>,
1206) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
1207 let result = server.create_budget(&args).await?;
1208 let status = match &result {
1209 CreateBudgetResult::Created { .. } => StatusCode::CREATED,
1210 CreateBudgetResult::AlreadySatisfied { .. } => StatusCode::OK,
1211 };
1212 Ok((status, Json(result)))
1213}
1214
1215async fn get_budget_status(
1216 State(server): State<Arc<Server>>,
1217 Path(id): Path<String>,
1218) -> Result<Json<BudgetStatus>, ApiError> {
1219 let bid = parse_budget_id(&id)?;
1220 Ok(Json(server.get_budget_status(&bid).await?))
1221}
1222
1223#[derive(Deserialize)]
1224struct ReportUsageBody {
1225 dimensions: HashMap<String, u64>,
1226 now: ff_core::types::TimestampMs,
1227 #[serde(default)]
1228 dedup_key: Option<String>,
1229}
1230
1231async fn report_usage(
1232 State(server): State<Arc<Server>>,
1233 Path(id): Path<String>,
1234 AppJson(body): AppJson<ReportUsageBody>,
1235) -> Result<Json<ReportUsageResult>, ApiError> {
1236 let bid = parse_budget_id(&id)?;
1237 let dims: Vec<String> = body.dimensions.keys().cloned().collect();
1238 let deltas: Vec<u64> = dims.iter().map(|d| body.dimensions[d]).collect();
1239 let args = ReportUsageArgs {
1240 dimensions: dims,
1241 deltas,
1242 now: body.now,
1243 dedup_key: body.dedup_key,
1244 };
1245 Ok(Json(server.report_usage(&bid, &args).await?))
1246}
1247
1248async fn reset_budget(
1249 State(server): State<Arc<Server>>,
1250 Path(id): Path<String>,
1251) -> Result<Json<ResetBudgetResult>, ApiError> {
1252 let bid = parse_budget_id(&id)?;
1253 Ok(Json(server.reset_budget(&bid).await?))
1254}
1255
1256async fn create_quota_policy(
1257 State(server): State<Arc<Server>>,
1258 AppJson(args): AppJson<CreateQuotaPolicyArgs>,
1259) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
1260 let result = server.create_quota_policy(&args).await?;
1261 let status = match &result {
1262 CreateQuotaPolicyResult::Created { .. } => StatusCode::CREATED,
1263 CreateQuotaPolicyResult::AlreadySatisfied { .. } => StatusCode::OK,
1264 };
1265 Ok((status, Json(result)))
1266}
1267
1268#[derive(Serialize)]
1271struct HealthResponse {
1272 status: &'static str,
1273}
1274
1275async fn healthz(
1276 State(server): State<Arc<Server>>,
1277) -> Result<Json<HealthResponse>, ApiError> {
1278 let _: String = server
1279 .client()
1280 .cmd("PING")
1281 .execute()
1282 .await
1283 .map_err(|e| ApiError(ServerError::ValkeyContext { source: e, context: "healthz PING".into() }))?;
1284 Ok(Json(HealthResponse { status: "ok" }))
1285}
1286
1287fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
1291 if body_id != path_id {
1292 return Err(ApiError(ServerError::InvalidInput(format!(
1293 "path {id_name} does not match body {id_name}"
1294 ))));
1295 }
1296 Ok(())
1297}
1298
1299fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
1300 ExecutionId::parse(s)
1301 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
1302}
1303
1304fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
1305 FlowId::parse(s)
1306 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid flow_id: {e}"))))
1307}
1308
1309fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
1310 BudgetId::parse(s)
1311 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid budget_id: {e}"))))
1312}
1313
#[cfg(test)]
mod cors_tests {
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::get;
    use tower::ServiceExt;

    /// Origin used by the preflight tests.
    const CLIENT_ORIGIN: &str = "https://client.example.com";

    /// Minimal router carrying only the CORS layer under test.
    fn app_with_cors(origins: &[String], auth_enabled: bool) -> Router {
        let cors = build_cors_layer(origins, auth_enabled)
            .expect("build_cors_layer succeeds for valid inputs");
        Router::new().route("/noop", get(|| async { "ok" })).layer(cors)
    }

    /// Sends a GET preflight from [`CLIENT_ORIGIN`] requesting
    /// `requested_headers`, asserts `200`, and returns the lowercased
    /// `Access-Control-Allow-Headers` value.
    async fn preflight_allow_headers(auth_enabled: bool, requested_headers: &str) -> String {
        let app = app_with_cors(&[CLIENT_ORIGIN.to_owned()], auth_enabled);
        let preflight = Request::builder()
            .method("OPTIONS")
            .uri("/noop")
            .header("origin", CLIENT_ORIGIN)
            .header("access-control-request-method", "GET")
            .header("access-control-request-headers", requested_headers)
            .body(Body::empty())
            .unwrap();

        let resp = app.oneshot(preflight).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        resp.headers()
            .get("access-control-allow-headers")
            .expect("access-control-allow-headers present")
            .to_str()
            .unwrap()
            .to_ascii_lowercase()
    }

    #[test]
    fn all_origins_invalid_returns_config_error_instead_of_permissive() {
        let err = build_cors_layer(&["bad\0origin".to_owned()], false).unwrap_err();
        let ConfigError::InvalidValue { var, message } = err;
        assert_eq!(var, "FF_CORS_ORIGINS");
        assert!(
            message.contains("all configured origins failed to parse"),
            "message was: {message}"
        );
    }

    #[test]
    fn wildcard_still_returns_permissive() {
        assert!(build_cors_layer(&["*".to_owned()], false).is_ok());
    }

    #[test]
    fn empty_origins_returns_empty_allowlist_ok() {
        assert!(build_cors_layer(&[], false).is_ok());
    }

    #[test]
    fn mixed_valid_and_invalid_keeps_valid_entries() {
        let origins = ["https://ok.example.com".to_owned(), "bad\0origin".to_owned()];
        assert!(build_cors_layer(&origins, false).is_ok());
    }

    #[tokio::test]
    async fn preflight_allows_authorization_when_auth_enabled() {
        let allow_headers = preflight_allow_headers(true, "authorization,content-type").await;
        assert!(
            allow_headers.contains("authorization"),
            "expected `authorization` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
        assert!(
            allow_headers.contains("content-type"),
            "expected `content-type` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
    }

    #[tokio::test]
    async fn preflight_omits_authorization_when_auth_disabled() {
        let allow_headers = preflight_allow_headers(false, "content-type").await;
        assert!(
            !allow_headers.contains("authorization"),
            "Authorization should not be advertised when auth is off; got: {allow_headers}"
        );
        assert!(allow_headers.contains("content-type"));
    }
}
1446
#[cfg(test)]
mod claim_grant_dto_tests {
    use super::*;
    use ff_core::contracts::ClaimGrant;
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{ExecutionId, FlowId};

    /// Grant on partition index 7 of `family` with fixed key and expiry.
    fn sample_grant(family: PartitionFamily) -> ClaimGrant {
        let config = ff_core::partition::PartitionConfig::default();
        let flow_id = FlowId::new();
        let partition = Partition { family, index: 7 };
        ClaimGrant {
            execution_id: ExecutionId::for_flow(&flow_id, &config),
            partition_key: PartitionKey::from(&partition),
            grant_key: "ff:exec:{fp:7}:deadbeef:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    /// Serializes the DTO built from [`sample_grant`] to a JSON value.
    fn dto_json(family: PartitionFamily) -> serde_json::Value {
        let dto = ClaimGrantDto::from(sample_grant(family));
        serde_json::to_value(&dto).unwrap()
    }

    #[test]
    fn claim_grant_dto_emits_opaque_partition_key() {
        let json = dto_json(PartitionFamily::Flow);
        assert_eq!(json["partition_key"], serde_json::json!("{fp:7}"));
        assert!(json.get("partition_family").is_none());
        assert!(json.get("partition_index").is_none());
        assert_eq!(json["expires_at_ms"], serde_json::json!(1_700_000_000_000u64));
    }

    #[test]
    fn claim_grant_dto_collapses_execution_alias_on_wire() {
        let via_flow = dto_json(PartitionFamily::Flow);
        let via_execution = dto_json(PartitionFamily::Execution);
        assert_eq!(via_flow["partition_key"], via_execution["partition_key"]);
    }
}