use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use axum::{
extract::{DefaultBodyLimit, MatchedPath, Path, Query, Request, State},
http::StatusCode,
middleware,
response::{IntoResponse, Response},
routing::{get, post, put, MethodRouter},
Json, Router,
};
use serde::{Deserialize, Serialize};
use axum::http::Method;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;
use ff_core::contracts::*;
use ff_core::state::PublicState;
use ff_core::types::*;
use crate::config::ConfigError;
use crate::server::{Server, ServerError};
/// Request-body cap for routes that accept large payloads (1 MiB).
const BODY_LIMIT_LARGE_PAYLOAD: usize = 1024 * 1024;
/// Request-body cap for medium-size payloads such as signals (256 KiB).
const BODY_LIMIT_MEDIUM_PAYLOAD: usize = 256 * 1024;
/// Request-body cap for small control-plane requests (64 KiB).
const BODY_LIMIT_CONTROL: usize = 64 * 1024;
/// Per-route body-size cap, stashed in request extensions by
/// `enforce_body_limit` so `AppJson`'s rejection handler can report the
/// limit that applied to the rejected request.
#[derive(Clone, Copy)]
struct BodyLimit {
    // Maximum accepted request-body size in bytes.
    bytes: usize,
}
/// Wrap `route` with a three-part body-size guard of `bytes`:
/// tower-http's streaming limit, axum's `DefaultBodyLimit`, and our own
/// middleware that rejects early on `Content-Length` and records the limit
/// in request extensions for error reporting.
fn with_body_limit(route: MethodRouter<Arc<Server>>, bytes: usize) -> MethodRouter<Arc<Server>> {
    route
        .layer(tower_http::limit::RequestBodyLimitLayer::new(bytes))
        .layer(DefaultBodyLimit::max(bytes))
        .layer(middleware::from_fn(
            move |req: Request, next: middleware::Next| enforce_body_limit(bytes, req, next),
        ))
}
/// Middleware half of the body-size guard.
///
/// If the request declares a `Content-Length` above `bytes`, answer 413 with
/// a structured JSON body immediately (before any bytes are read). Otherwise
/// record the limit in extensions (for `AppJson`'s rejection path) and pass
/// the request on.
async fn enforce_body_limit(bytes: usize, mut req: Request, next: middleware::Next) -> Response {
    let declared = req
        .headers()
        .get(axum::http::header::CONTENT_LENGTH)
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse::<usize>().ok());
    if matches!(declared, Some(len) if len > bytes) {
        // Include the matched route template (if any) so the client can tell
        // which endpoint's limit was exceeded.
        let route = req
            .extensions()
            .get::<MatchedPath>()
            .map(|m| m.as_str().to_owned())
            .unwrap_or_default();
        let body = PayloadTooLargeBody {
            error: "payload_too_large",
            limit_bytes: bytes,
            route,
        };
        return (StatusCode::PAYLOAD_TOO_LARGE, Json(body)).into_response();
    }
    req.extensions_mut().insert(BodyLimit { bytes });
    next.run(req).await
}
/// JSON body returned with every 413 response, whether rejected up front by
/// `enforce_body_limit` or during extraction by `AppJson`.
#[derive(Serialize)]
struct PayloadTooLargeBody {
    // Stable machine-readable code; always "payload_too_large".
    error: &'static str,
    // The limit that applied to the rejected request (0 if unknown).
    limit_bytes: usize,
    // Matched route template, e.g. "/v1/executions/{id}/signal"; empty if
    // no MatchedPath extension was present.
    route: String,
}
/// JSON extractor that replaces axum's default plain-text rejections with
/// this API's structured JSON error bodies.
struct AppJson<T>(T);
impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
where
    T: serde::de::DeserializeOwned + Send,
    S: Send + Sync,
{
    type Rejection = Response;
    async fn from_request(
        req: axum::extract::Request,
        state: &S,
    ) -> Result<Self, Self::Rejection> {
        // Read these before `Json::from_request` consumes the request; they
        // are only needed on the rejection path.
        let limit = req.extensions().get::<BodyLimit>().copied();
        let matched = req.extensions().get::<MatchedPath>().cloned();
        match Json::<T>::from_request(req, state).await {
            Ok(Json(value)) => Ok(AppJson(value)),
            Err(rejection) => {
                let status = rejection.status();
                // Full rejection detail goes to debug logs only; clients get
                // a generic message (see below) to avoid leaking internals.
                tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
                if status == StatusCode::PAYLOAD_TOO_LARGE {
                    // Mirror the shape produced by `enforce_body_limit` so
                    // 413s look the same regardless of which layer fired.
                    let limit_bytes = limit.map(|l| l.bytes).unwrap_or(0);
                    let route = matched
                        .as_ref()
                        .map(|m| m.as_str().to_owned())
                        .unwrap_or_default();
                    let body = PayloadTooLargeBody {
                        error: "payload_too_large",
                        limit_bytes,
                        route,
                    };
                    return Err((status, Json(body)).into_response());
                }
                // All other JSON failures: reuse the status axum chose, with
                // only the canonical reason phrase as detail.
                let body = ErrorBody::plain(format!(
                    "invalid JSON: {}",
                    status.canonical_reason().unwrap_or("bad request"),
                ));
                Err((status, Json(body)).into_response())
            }
        }
    }
}
/// Newtype over `ServerError` so this module can implement `IntoResponse`
/// (the HTTP status/body mapping lives in one place, below).
struct ApiError(ServerError);
impl From<ServerError> for ApiError {
    fn from(e: ServerError) -> Self {
        Self(e)
    }
}
impl From<ff_core::engine_error::EngineError> for ApiError {
    // Engine errors are boxed into `ServerError::Engine`; the detailed
    // status mapping happens in `IntoResponse for ApiError`.
    fn from(e: ff_core::engine_error::EngineError) -> Self {
        Self(ServerError::Engine(Box::new(e)))
    }
}
/// Generic JSON error body for non-413 failures.
#[derive(Serialize)]
struct ErrorBody {
    // Human-readable description of the failure.
    error: String,
    // Optional stable machine-readable error code (omitted when None).
    #[serde(skip_serializing_if = "Option::is_none")]
    kind: Option<String>,
    // Optional hint whether the client may retry (omitted when None).
    #[serde(skip_serializing_if = "Option::is_none")]
    retryable: Option<bool>,
}
impl ErrorBody {
    /// Message-only body: no `kind`, no `retryable` hint.
    fn plain(error: String) -> Self {
        Self { error, kind: None, retryable: None }
    }
}
/// Central mapping from `ServerError` (and nested `EngineError`) to HTTP
/// status + JSON `ErrorBody`. Backend/library failures are logged here;
/// validation-style failures are not.
impl IntoResponse for ApiError {
    fn into_response(self) -> Response {
        let (status, body) = match &self.0 {
            ServerError::NotFound(msg) => {
                (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
            }
            ServerError::InvalidInput(msg) => {
                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
            }
            ServerError::OperationFailed(msg) => {
                (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
            }
            // Server-side concurrency cap: explicitly retryable 429.
            ServerError::ConcurrencyLimitExceeded(source, max) => (
                StatusCode::TOO_MANY_REQUESTS,
                ErrorBody {
                    error: format!(
                        "too many concurrent {source} calls (server max: {max}); retry with backoff"
                    ),
                    kind: None,
                    retryable: Some(true),
                },
            ),
            // Backend failures become opaque 500s; details go to the error
            // log, and only a stable kind string + retryability reach the
            // client.
            ServerError::Backend(be) => {
                let kind_str = be.kind().as_stable_str();
                tracing::error!(
                    kind = kind_str,
                    message = be.message(),
                    "backend error"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: self.0.to_string(),
                        kind: Some(kind_str.to_owned()),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            // Same as Backend, but the wrapping context is also logged.
            ServerError::BackendContext { source, context } => {
                let kind_str = source.kind().as_stable_str();
                tracing::error!(
                    kind = kind_str,
                    message = source.message(),
                    context = %context,
                    "backend error"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: self.0.to_string(),
                        kind: Some(kind_str.to_owned()),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            // Script/library load failures: kind is best-effort (only when a
            // Valkey-level cause can be classified).
            ServerError::LibraryLoad(load_err) => {
                let kind_str = load_err
                    .valkey_kind()
                    .map(ff_backend_valkey::classify_ferriskey_kind)
                    .map(|k| k.as_stable_str());
                tracing::error!(
                    kind = kind_str.unwrap_or(""),
                    error = %load_err,
                    "library load failure"
                );
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    ErrorBody {
                        error: format!("library load: {load_err}"),
                        kind: kind_str.map(str::to_owned),
                        retryable: Some(self.0.is_retryable()),
                    },
                )
            }
            // Engine errors carry the richest classification; unwrap any
            // `Contextual` layers first and map the root cause.
            ServerError::Engine(boxed) => {
                use ff_core::engine_error::EngineError as EE;
                // Strip nested Contextual wrappers to reach the root error.
                fn root(e: &EE) -> &EE {
                    match e {
                        EE::Contextual { source, .. } => root(source),
                        other => other,
                    }
                }
                match root(boxed) {
                    EE::ResourceExhausted { pool, max, .. } => (
                        StatusCode::TOO_MANY_REQUESTS,
                        ErrorBody {
                            error: format!(
                                "too many concurrent {pool} calls (server max: {max}); retry with backoff"
                            ),
                            kind: Some("resource_exhausted".into()),
                            retryable: Some(true),
                        },
                    ),
                    EE::Unavailable { op } => (
                        StatusCode::SERVICE_UNAVAILABLE,
                        ErrorBody {
                            error: format!("backend op unavailable: {op}"),
                            kind: Some("unavailable".into()),
                            retryable: Some(false),
                        },
                    ),
                    EE::NotFound { entity } => (
                        StatusCode::NOT_FOUND,
                        ErrorBody::plain(format!("not found: {entity}")),
                    ),
                    // Validation failures: stable snake_case code, 400.
                    EE::Validation { kind, detail } => {
                        use ff_core::engine_error::ValidationKind as VK;
                        let code = match kind {
                            VK::InvalidInput => "invalid_input",
                            VK::CapabilityMismatch => "capability_mismatch",
                            VK::InvalidCapabilities => "invalid_capabilities",
                            VK::InvalidPolicyJson => "invalid_policy_json",
                            VK::PayloadTooLarge => "payload_too_large",
                            VK::SignalLimitExceeded => "signal_limit_exceeded",
                            VK::InvalidWaitpointKey => "invalid_waitpoint_key",
                            VK::InvalidToken => "invalid_token",
                            VK::WaitpointNotTokenBound => "waitpoint_not_token_bound",
                            VK::RetentionLimitExceeded => "retention_limit_exceeded",
                            VK::InvalidLeaseForSuspend => "invalid_lease_for_suspend",
                            VK::InvalidDependency => "invalid_dependency",
                            VK::InvalidWaitpointForExecution => "invalid_waitpoint_for_execution",
                            VK::InvalidBlockingReason => "invalid_blocking_reason",
                            VK::InvalidOffset => "invalid_offset",
                            VK::Unauthorized => "unauthorized",
                            VK::InvalidBudgetScope => "invalid_budget_scope",
                            VK::BudgetOverrideNotAllowed => "budget_override_not_allowed",
                            VK::InvalidQuotaSpec => "invalid_quota_spec",
                            VK::InvalidKid => "invalid_kid",
                            VK::InvalidSecretHex => "invalid_secret_hex",
                            VK::InvalidGraceMs => "invalid_grace_ms",
                            VK::InvalidTagKey => "invalid_tag_key",
                            VK::InvalidFrameType => "invalid_frame_type",
                            // Future ValidationKind variants fall back to a
                            // generic code rather than failing to compile.
                            _ => "validation_error",
                        };
                        let msg = if detail.is_empty() {
                            code.to_string()
                        } else {
                            format!("{code}: {detail}")
                        };
                        (StatusCode::BAD_REQUEST, ErrorBody::plain(msg))
                    }
                    // Conflicts: 409 with a stable code, never retryable.
                    EE::Conflict(kind) => {
                        use ff_core::engine_error::ConflictKind as CK;
                        let code = match kind {
                            CK::DependencyAlreadyExists { .. } => "dependency_already_exists",
                            CK::CycleDetected => "cycle_detected",
                            CK::SelfReferencingEdge => "self_referencing_edge",
                            CK::ExecutionAlreadyInFlow => "execution_already_in_flow",
                            CK::WaitpointAlreadyExists => "waitpoint_already_exists",
                            CK::BudgetAttachConflict => "budget_attach_conflict",
                            CK::QuotaAttachConflict => "quota_attach_conflict",
                            CK::RotationConflict(_) => "rotation_conflict",
                            CK::ActiveAttemptExists => "active_attempt_exists",
                            _ => "conflict",
                        };
                        (
                            StatusCode::CONFLICT,
                            ErrorBody {
                                error: format!("{code}: {kind:?}"),
                                kind: Some(code.into()),
                                retryable: Some(false),
                            },
                        )
                    }
                    // Contention: status and retryability vary by kind.
                    EE::Contention(ck) => {
                        use ff_core::engine_error::ContentionKind as CK;
                        let (status, code, retryable) = match ck {
                            CK::RetryExhausted => (
                                StatusCode::INTERNAL_SERVER_ERROR,
                                "retry_exhausted",
                                false,
                            ),
                            CK::RateLimitExceeded => (
                                StatusCode::TOO_MANY_REQUESTS,
                                "rate_limit_exceeded",
                                true,
                            ),
                            CK::ConcurrencyLimitExceeded => (
                                StatusCode::TOO_MANY_REQUESTS,
                                "concurrency_limit_exceeded",
                                true,
                            ),
                            _ => (StatusCode::CONFLICT, "contention", true),
                        };
                        (
                            status,
                            ErrorBody {
                                error: format!("{code}: {ck:?}"),
                                kind: Some(code.into()),
                                retryable: Some(retryable),
                            },
                        )
                    }
                    // State-machine violations: all 409, never retryable.
                    EE::State(sk) => {
                        use ff_core::engine_error::StateKind as SK;
                        let (status, code) = match sk {
                            SK::ExecutionNotTerminal => {
                                (StatusCode::CONFLICT, "execution_not_terminal")
                            }
                            SK::MaxReplaysExhausted => {
                                (StatusCode::CONFLICT, "max_replays_exhausted")
                            }
                            SK::ReplayNotAllowed => {
                                (StatusCode::CONFLICT, "replay_not_allowed")
                            }
                            SK::NotRunnable => (StatusCode::CONFLICT, "not_runnable"),
                            SK::Terminal => (StatusCode::CONFLICT, "terminal"),
                            SK::FlowAlreadyTerminal => {
                                (StatusCode::CONFLICT, "flow_already_terminal")
                            }
                            SK::BudgetExceeded => (StatusCode::CONFLICT, "budget_exceeded"),
                            SK::BudgetSoftExceeded => {
                                (StatusCode::CONFLICT, "budget_soft_exceeded")
                            }
                            // These idempotence-style outcomes are collapsed
                            // under one code.
                            SK::AlreadySatisfied
                            | SK::DuplicateSignal
                            | SK::OkAlreadyApplied
                            | SK::AttemptAlreadyTerminal
                            | SK::StreamAlreadyClosed
                            | SK::LeaseExpired
                            | SK::LeaseRevoked => (StatusCode::CONFLICT, "already_satisfied"),
                            _ => (StatusCode::CONFLICT, "state_conflict"),
                        };
                        (
                            status,
                            ErrorBody {
                                error: format!("{code}: {sk:?}"),
                                kind: Some(code.into()),
                                retryable: Some(false),
                            },
                        )
                    }
                    // Any other engine error: opaque 500.
                    _ => (
                        StatusCode::INTERNAL_SERVER_ERROR,
                        ErrorBody {
                            error: self.0.to_string(),
                            kind: None,
                            retryable: Some(self.0.is_retryable()),
                        },
                    ),
                }
            }
            // Any other ServerError variant: opaque, non-retryable 500.
            other => (
                StatusCode::INTERNAL_SERVER_ERROR,
                ErrorBody {
                    error: other.to_string(),
                    kind: None,
                    retryable: Some(false),
                },
            ),
        };
        (status, Json(body)).into_response()
    }
}
/// Build the API router without a metrics pipeline.
///
/// Thin wrapper over [`router_with_metrics`] with `metrics = None`; see that
/// function for the full route table and layering.
pub fn router(
    server: Arc<Server>,
    cors_origins: &[String],
    api_token: Option<String>,
) -> Result<Router, ConfigError> {
    router_with_metrics(server, cors_origins, api_token, None)
}
/// Build the full API router: v1 routes with per-route body limits, optional
/// bearer-token auth, optional HTTP metrics, tracing, and CORS.
///
/// Layer order matters: auth and metrics middleware are added before the
/// trace/CORS layers, and the `/metrics` route (observability builds only)
/// is merged *after* `with_state`, so it bypasses auth and CORS.
///
/// # Errors
/// Returns `ConfigError` if `cors_origins` contains only unparseable values
/// (see `build_cors_layer`).
pub fn router_with_metrics(
    server: Arc<Server>,
    cors_origins: &[String],
    api_token: Option<String>,
    #[cfg_attr(not(feature = "observability"), allow(unused_variables))]
    metrics: Option<Arc<crate::Metrics>>,
) -> Result<Router, ConfigError> {
    // AUTHORIZATION is only added to CORS allow-headers when auth is on.
    let auth_enabled = api_token.is_some();
    let cors = build_cors_layer(cors_origins, auth_enabled)?;
    // Routes without `with_body_limit` are GET-only and take no body.
    let mut app = Router::new()
        .route(
            "/v1/executions",
            with_body_limit(
                get(list_executions).post(create_execution),
                BODY_LIMIT_LARGE_PAYLOAD,
            ),
        )
        .route("/v1/executions/{id}", get(get_execution))
        .route("/v1/executions/{id}/state", get(get_execution_state))
        .route(
            "/v1/executions/{id}/pending-waitpoints",
            get(list_pending_waitpoints),
        )
        .route("/v1/executions/{id}/result", get(get_execution_result))
        .route(
            "/v1/executions/{id}/cancel",
            with_body_limit(post(cancel_execution), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/signal",
            with_body_limit(post(deliver_signal), BODY_LIMIT_MEDIUM_PAYLOAD),
        )
        .route(
            "/v1/executions/{id}/priority",
            with_body_limit(put(change_priority), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/replay",
            with_body_limit(post(replay_execution), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/revoke-lease",
            with_body_limit(post(revoke_lease), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/workers/{worker_id}/claim",
            with_body_limit(post(claim_for_worker), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/executions/{id}/attempts/{idx}/stream",
            get(read_attempt_stream),
        )
        .route(
            "/v1/executions/{id}/attempts/{idx}/stream/tail",
            get(tail_attempt_stream),
        )
        .route(
            "/v1/flows",
            with_body_limit(post(create_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/members",
            with_body_limit(post(add_execution_to_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/cancel",
            with_body_limit(post(cancel_flow), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/edges",
            with_body_limit(post(stage_dependency_edge), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/flows/{id}/edges/apply",
            with_body_limit(post(apply_dependency_to_child), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/budgets",
            with_body_limit(post(create_budget), BODY_LIMIT_CONTROL),
        )
        .route("/v1/budgets/{id}", get(get_budget_status))
        .route(
            "/v1/budgets/{id}/usage",
            with_body_limit(post(report_usage), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/budgets/{id}/reset",
            with_body_limit(post(reset_budget), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/quotas",
            with_body_limit(post(create_quota_policy), BODY_LIMIT_CONTROL),
        )
        .route(
            "/v1/admin/rotate-waitpoint-secret",
            with_body_limit(post(rotate_waitpoint_secret), BODY_LIMIT_CONTROL),
        )
        .route("/healthz", get(healthz));
    // Bearer-token auth over every route above (auth_middleware itself
    // exempts /healthz).
    if let Some(token) = api_token {
        let token = Arc::new(token);
        app = app.layer(middleware::from_fn(move |req, next| {
            let token = token.clone();
            auth_middleware(token, req, next)
        }));
    }
    #[cfg(feature = "observability")]
    if let Some(m) = metrics.as_ref() {
        let m = m.clone();
        app = app.layer(middleware::from_fn_with_state(
            m,
            crate::metrics::http_middleware,
        ));
    }
    #[cfg_attr(not(feature = "observability"), allow(unused_mut))]
    let mut app = app
        .layer(TraceLayer::new_for_http())
        .layer(cors)
        .with_state(server);
    // /metrics is merged after the auth/CORS stack: it is unauthenticated.
    #[cfg(feature = "observability")]
    if let Some(m) = metrics {
        let metrics_router: Router = Router::new()
            .route("/metrics", get(crate::metrics::metrics_handler))
            .with_state(m);
        app = app.merge(metrics_router);
    }
    Ok(app)
}
/// Bearer-token auth middleware.
///
/// `/healthz` is exempt (load balancers probe it unauthenticated). All other
/// paths require `Authorization: Bearer <token>`; the comparison is
/// constant-time to avoid timing side channels on the token value.
///
/// NOTE(review): the "Bearer " prefix match is case-sensitive; RFC 9110
/// auth-scheme matching is case-insensitive — confirm clients always send
/// the canonical capitalization.
async fn auth_middleware(
    token: Arc<String>,
    req: Request,
    next: middleware::Next,
) -> Response {
    if req.uri().path() == "/healthz" {
        return next.run(req).await;
    }
    let auth_header = req
        .headers()
        .get("authorization")
        .and_then(|v| v.to_str().ok());
    let authorized = auth_header
        .and_then(|v| v.strip_prefix("Bearer "))
        .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
    if authorized {
        next.run(req).await
    } else {
        (
            StatusCode::UNAUTHORIZED,
            Json(ErrorBody::plain(
                "missing or invalid Authorization header".to_owned(),
            )),
        )
            .into_response()
    }
}
/// Compare two byte slices without short-circuiting on the first mismatch,
/// so comparison time does not reveal how much of a secret matched.
///
/// The length check returns early, which is fine: lengths are not secret.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; the accumulator is zero iff
    // all pairs are equal, and every pair is always visited.
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
/// Build the CORS layer from configured origins.
///
/// Semantics:
/// - any literal "*" entry => fully permissive CORS;
/// - otherwise each origin must parse as a header value; unparseable entries
///   are dropped with a warning, but if *all* entries fail we refuse to start
///   rather than silently fall back to permissive;
/// - allowed methods are GET/POST/PUT, and AUTHORIZATION is only allowed as
///   a request header when auth is enabled.
///
/// # Errors
/// `ConfigError::InvalidValue` when `origins` is non-empty and none parse.
fn build_cors_layer(origins: &[String], auth_enabled: bool) -> Result<CorsLayer, ConfigError> {
    if origins.iter().any(|o| o == "*") {
        return Ok(CorsLayer::permissive());
    }
    let mut parsed = Vec::with_capacity(origins.len());
    let mut accepted = Vec::with_capacity(origins.len());
    let mut invalid = Vec::new();
    for o in origins {
        match o.parse() {
            Ok(v) => {
                parsed.push(v);
                accepted.push(o.as_str());
            }
            Err(_) => invalid.push(o.clone()),
        }
    }
    // All-invalid is a hard config error; a typo must not widen CORS.
    if parsed.is_empty() && !origins.is_empty() {
        return Err(ConfigError::InvalidValue {
            var: "FF_CORS_ORIGINS".to_owned(),
            message: format!(
                "all configured origins failed to parse as valid HTTP header values: {:?}; \
                 refusing to fall back to permissive CORS",
                origins
            ),
        });
    }
    if !invalid.is_empty() {
        tracing::warn!(
            ?invalid,
            ?accepted,
            "some FF_CORS_ORIGINS entries failed to parse and were dropped"
        );
    }
    let mut headers = vec![axum::http::header::CONTENT_TYPE];
    if auth_enabled {
        headers.push(axum::http::header::AUTHORIZATION);
    }
    Ok(CorsLayer::new()
        .allow_origin(AllowOrigin::list(parsed))
        .allow_methods([Method::GET, Method::POST, Method::PUT])
        .allow_headers(headers))
}
/// Query parameters for `GET /v1/executions`.
#[derive(Deserialize)]
struct ListExecutionsParams {
    // Required partition to list from.
    partition: u16,
    // Opaque-looking cursor; actually an ExecutionId to resume after.
    #[serde(default)]
    cursor: Option<String>,
    // Page size; defaults to 50 and is clamped to 1000 in the handler.
    #[serde(default = "default_limit")]
    limit: u64,
}
// serde `default = "…"` requires a function, not a constant.
fn default_limit() -> u64 { 50 }
/// GET /v1/executions — paginated listing of executions in one partition.
///
/// `limit` is clamped to 1000; an empty `cursor` is treated as absent; a
/// non-empty cursor must parse as an `ExecutionId` or the request is a 400.
async fn list_executions(
    State(server): State<Arc<Server>>,
    Query(params): Query<ListExecutionsParams>,
) -> Result<Json<ListExecutionsPage>, ApiError> {
    let page_size = params.limit.min(1000) as usize;
    let cursor = match params.cursor.as_deref() {
        Some(raw) if !raw.is_empty() => {
            let parsed = ff_core::types::ExecutionId::parse(raw).map_err(|e| {
                ApiError::from(ServerError::InvalidInput(format!(
                    "invalid cursor: {e}"
                )))
            })?;
            Some(parsed)
        }
        _ => None,
    };
    let page = server
        .list_executions_page(params.partition, cursor, page_size)
        .await?;
    Ok(Json(page))
}
/// POST /v1/executions — create an execution.
///
/// 201 for a fresh creation; 200 when the request deduplicated against an
/// existing execution. The backend result is echoed back either way.
async fn create_execution(
    State(server): State<Arc<Server>>,
    AppJson(args): AppJson<CreateExecutionArgs>,
) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
    let outcome = server.backend().create_execution(args).await?;
    let code = if matches!(outcome, CreateExecutionResult::Created { .. }) {
        StatusCode::CREATED
    } else {
        StatusCode::OK
    };
    Ok((code, Json(outcome)))
}
/// GET /v1/executions/{id} — fetch execution metadata.
async fn get_execution(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
) -> Result<Json<ExecutionInfo>, ApiError> {
    let execution_id = parse_execution_id(&id)?;
    let info = server.get_execution(&execution_id).await?;
    Ok(Json(info))
}
/// GET /v1/executions/{id}/state — fetch the public state machine view.
async fn get_execution_state(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
) -> Result<Json<PublicState>, ApiError> {
    let execution_id = parse_execution_id(&id)?;
    let state = server.get_execution_state(&execution_id).await?;
    Ok(Json(state))
}
/// GET /v1/executions/{id}/pending-waitpoints — list unresolved waitpoints.
///
/// Only the page entries are returned to the client, not the whole page
/// envelope.
async fn list_pending_waitpoints(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
) -> Result<Response, ApiError> {
    let execution_id = parse_execution_id(&id)?;
    let query = ff_core::contracts::ListPendingWaitpointsArgs::new(execution_id);
    let page = server.backend().list_pending_waitpoints(query).await?;
    Ok(Json(page.entries).into_response())
}
/// GET /v1/executions/{id}/result — fetch the raw result payload.
///
/// Returns the stored bytes as `application/octet-stream`, or 404 when no
/// result has been recorded for the execution.
async fn get_execution_result(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
) -> Result<Response, ApiError> {
    let execution_id = parse_execution_id(&id)?;
    let Some(bytes) = server.backend().get_execution_result(&execution_id).await? else {
        return Err(ApiError(ServerError::NotFound(format!(
            "execution result not found: {execution_id}"
        ))));
    };
    let headers = [(axum::http::header::CONTENT_TYPE, "application/octet-stream")];
    Ok((StatusCode::OK, headers, bytes).into_response())
}
/// POST /v1/executions/{id}/cancel — request cancellation.
///
/// The id in the URL must match `args.execution_id`; the path id is then
/// written into the args as the authoritative value.
async fn cancel_execution(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
    AppJson(mut args): AppJson<CancelExecutionArgs>,
) -> Result<Json<CancelExecutionResult>, ApiError> {
    let execution_id = parse_execution_id(&id)?;
    check_id_match(&execution_id, &args.execution_id, "execution_id")?;
    args.execution_id = execution_id;
    let outcome = server.backend().cancel_execution(args).await?;
    Ok(Json(outcome))
}
/// POST /v1/executions/{id}/signal — deliver an external signal.
///
/// Path id and body `execution_id` must agree; the path value wins.
async fn deliver_signal(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
    AppJson(mut args): AppJson<DeliverSignalArgs>,
) -> Result<Json<DeliverSignalResult>, ApiError> {
    let execution_id = parse_execution_id(&id)?;
    check_id_match(&execution_id, &args.execution_id, "execution_id")?;
    args.execution_id = execution_id;
    let outcome = server.deliver_signal(&args).await?;
    Ok(Json(outcome))
}
/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
#[derive(Deserialize)]
struct RotateWaitpointSecretBody {
    // Key id to rotate to.
    new_kid: String,
    // New HMAC secret, hex-encoded.
    new_secret_hex: String,
}
/// Server-side cap on how long a rotation request may run before the HTTP
/// handler gives up with 504 (the rotation itself may continue).
const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
/// POST /v1/admin/rotate-waitpoint-secret — rotate the waitpoint HMAC secret
/// across all partitions.
///
/// Outcomes:
/// - timeout after `ROTATE_HTTP_TIMEOUT` => 504 with retry guidance (logged
///   to the audit target); per the message, retrying with the same
///   `new_kid` + secret is safe;
/// - zero partitions touched and zero failures => server misconfiguration
///   (no partitions exist) => 400-family error;
/// - zero rotated but some failed => all partitions failed => error;
/// - otherwise the (possibly partial) rotation result is returned as 200.
async fn rotate_waitpoint_secret(
    State(server): State<Arc<Server>>,
    AppJson(body): AppJson<RotateWaitpointSecretBody>,
) -> Result<Response, ApiError> {
    let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
    let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
        Ok(r) => r?,
        Err(_) => {
            // Audit-log the timeout: rotation attempts are security events.
            tracing::error!(
                target: "audit",
                new_kid = %body.new_kid,
                timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
                "waitpoint_hmac_rotation_timeout_http_504"
            );
            let body = ErrorBody::plain(format!(
                "rotation exceeded {}s server-side timeout; retry is safe \
                 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
                ROTATE_HTTP_TIMEOUT.as_secs()
            ));
            return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
        }
    };
    if result.rotated == 0 && result.failed.is_empty() {
        return Err(ApiError::from(ServerError::OperationFailed(
            "rotation had no partitions to operate on \
             (num_flow_partitions is 0 โ server misconfigured)"
                .to_owned(),
        )));
    }
    if result.rotated == 0 && !result.failed.is_empty() {
        return Err(ApiError::from(ServerError::OperationFailed(
            "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
        )));
    }
    Ok(Json(result).into_response())
}
/// Request body for `PUT /v1/executions/{id}/priority`.
#[derive(Deserialize)]
struct ChangePriorityBody {
    // Target priority; signed, so both raises and lowers are expressible.
    new_priority: i32,
}
/// PUT /v1/executions/{id}/priority — change an execution's priority.
///
/// NOTE(review): `lane_id` is passed as an empty `LaneId` — presumably a
/// sentinel meaning "resolve the lane server-side"; confirm against
/// `change_priority`'s contract.
async fn change_priority(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
    AppJson(body): AppJson<ChangePriorityBody>,
) -> Result<Json<ChangePriorityResult>, ApiError> {
    let eid = parse_execution_id(&id)?;
    let args = ff_core::contracts::ChangePriorityArgs {
        execution_id: eid,
        new_priority: body.new_priority,
        lane_id: LaneId::new(""),
        // Timestamp taken server-side; clients cannot backdate the change.
        now: ff_core::types::TimestampMs::now(),
    };
    Ok(Json(server.backend().change_priority(args).await?))
}
/// POST /v1/executions/{id}/replay — request a replay of a terminal
/// execution. Takes no body; the timestamp is taken server-side.
async fn replay_execution(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
) -> Result<Json<ReplayExecutionResult>, ApiError> {
    let execution_id = parse_execution_id(&id)?;
    let request = ff_core::contracts::ReplayExecutionArgs {
        execution_id,
        now: ff_core::types::TimestampMs::now(),
    };
    let outcome = server.backend().replay_execution(request).await?;
    Ok(Json(outcome))
}
/// POST /v1/executions/{id}/revoke-lease — operator-initiated revocation of
/// the execution's current lease.
///
/// No lease id or worker instance is required: `expected_lease_id: None`
/// revokes whatever lease is active, and the reason is fixed to
/// "operator_revoke".
async fn revoke_lease(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
) -> Result<Json<RevokeLeaseResult>, ApiError> {
    let execution_id = parse_execution_id(&id)?;
    let request = ff_core::contracts::RevokeLeaseArgs {
        execution_id,
        expected_lease_id: None,
        worker_instance_id: WorkerInstanceId::new(""),
        reason: "operator_revoke".to_owned(),
    };
    let outcome = server.backend().revoke_lease(request).await?;
    Ok(Json(outcome))
}
/// Request body for `POST /v1/workers/{worker_id}/claim`.
#[derive(Deserialize)]
struct ClaimForWorkerBody {
    // Lane the worker wants work from.
    lane_id: String,
    // Identifies this particular worker process/instance.
    worker_instance_id: String,
    // Capabilities offered by the worker; deduplicated into a set later.
    #[serde(default)]
    capabilities: Vec<String>,
    // Requested grant lifetime; validated against CLAIM_GRANT_TTL_MS_MAX.
    grant_ttl_ms: u64,
}
/// Wire representation of a claim grant; flattens the core type's
/// `ExecutionId` into a string for JSON.
#[derive(Serialize)]
struct ClaimGrantDto {
    execution_id: String,
    partition_key: ff_core::partition::PartitionKey,
    grant_key: String,
    expires_at_ms: u64,
}
impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
    fn from(g: ff_core::contracts::ClaimGrant) -> Self {
        Self {
            execution_id: g.execution_id.to_string(),
            partition_key: g.partition_key,
            grant_key: g.grant_key,
            expires_at_ms: g.expires_at_ms,
        }
    }
}
/// Upper bound on `grant_ttl_ms` accepted by `claim_for_worker` (60s).
const CLAIM_GRANT_TTL_MS_MAX: u64 = 60_000;
/// Validate a client-supplied identifier: non-empty, at most 256 bytes, and
/// free of whitespace/control characters. `field` names the offending field
/// in the error message.
fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
    // All three failures map to the same 400 shape; only the message varies.
    let reject = |msg: String| Err(ApiError(ServerError::InvalidInput(msg)));
    if value.is_empty() {
        return reject(format!("{field}: must not be empty"));
    }
    if value.len() > 256 {
        return reject(format!(
            "{field}: exceeds 256 bytes (got {})",
            value.len()
        ));
    }
    let has_bad_char = value.chars().any(|c| c.is_control() || c.is_whitespace());
    if has_bad_char {
        return reject(format!(
            "{field}: must not contain whitespace or control characters"
        ));
    }
    Ok(())
}
/// POST /v1/workers/{worker_id}/claim — ask for a unit of work.
///
/// Responses:
/// - 200 + `ClaimGrantDto` when work was granted;
/// - 204 when no work is available;
/// - 503 when the backend returned an outcome variant this build does not
///   know (forward-compatibility guard);
/// - 400 for invalid identifiers, lane id, or out-of-range `grant_ttl_ms`.
async fn claim_for_worker(
    State(server): State<Arc<Server>>,
    Path(worker_id): Path<String>,
    AppJson(body): AppJson<ClaimForWorkerBody>,
) -> Result<Response, ApiError> {
    validate_identifier("worker_id", &worker_id)?;
    validate_identifier("worker_instance_id", &body.worker_instance_id)?;
    let worker_id = WorkerId::new(worker_id);
    let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
    let lane = LaneId::try_new(body.lane_id).map_err(|e| {
        ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
    })?;
    // TTL must be positive and bounded so grants cannot outlive a crashed
    // worker indefinitely.
    if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
        return Err(ApiError(ServerError::InvalidInput(format!(
            "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
        ))));
    }
    // BTreeSet dedups capabilities and gives a deterministic order.
    let caps: std::collections::BTreeSet<String> =
        body.capabilities.into_iter().collect();
    let args = ff_core::contracts::ClaimForWorkerArgs::new(
        lane,
        worker_id,
        worker_instance_id,
        caps,
        body.grant_ttl_ms,
    );
    match server.backend().claim_for_worker(args).await? {
        ff_core::contracts::ClaimForWorkerOutcome::Granted(grant) => {
            Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response())
        }
        ff_core::contracts::ClaimForWorkerOutcome::NoWork => {
            Ok(StatusCode::NO_CONTENT.into_response())
        }
        _ => Ok((
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ErrorBody::plain(
                "claim_for_worker: backend returned a non-exhaustive outcome this server build does not understand".to_owned(),
            )),
        )
            .into_response()),
    }
}
/// Query parameters for `GET /v1/executions/{id}/attempts/{idx}/stream`.
#[derive(Deserialize)]
struct ReadStreamParams {
    // Inclusive start cursor; defaults to the start of the stream.
    #[serde(default = "ff_core::contracts::StreamCursor::start")]
    from: ff_core::contracts::StreamCursor,
    // Inclusive end cursor; defaults to the end of the stream.
    #[serde(default = "ff_core::contracts::StreamCursor::end")]
    to: ff_core::contracts::StreamCursor,
    // Max frames per request; defaults to 100, capped by the handler.
    #[serde(default = "default_read_limit")]
    limit: u64,
}
// serde `default = "…"` requires a function, not a constant.
fn default_read_limit() -> u64 { 100 }
/// JSON response for stream read/tail endpoints: the frames plus a count and
/// close metadata when the stream has been closed.
#[derive(Serialize)]
struct ReadStreamResponse {
    frames: Vec<StreamFrame>,
    // Convenience duplicate of frames.len() for clients.
    count: usize,
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_at: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_reason: Option<String>,
}
impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
    fn from(sf: ff_core::contracts::StreamFrames) -> Self {
        // Capture the length before moving the frames into the response.
        let count = sf.frames.len();
        Self {
            frames: sf.frames,
            count,
            closed_at: sf.closed_at.map(|t| t.0),
            closed_reason: sf.closed_reason,
        }
    }
}
/// Hard cap on `limit` for the REST stream endpoints; larger spans must be
/// fetched through pagination.
const REST_STREAM_LIMIT_CEILING: u64 = 1_000;
/// GET /v1/executions/{id}/attempts/{idx}/stream — read a bounded range of
/// stream frames for one attempt.
///
/// `limit` must be in 1..=REST_STREAM_LIMIT_CEILING; `from`/`to` cursors are
/// forwarded to the backend in wire form.
async fn read_attempt_stream(
    State(server): State<Arc<Server>>,
    Path((id, idx)): Path<(String, u32)>,
    Query(params): Query<ReadStreamParams>,
) -> Result<Json<ReadStreamResponse>, ApiError> {
    if params.limit == 0 {
        return Err(ApiError(ServerError::InvalidInput(
            "limit must be >= 1".to_owned(),
        )));
    }
    if params.limit > REST_STREAM_LIMIT_CEILING {
        return Err(ApiError(ServerError::InvalidInput(format!(
            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
        ))));
    }
    let eid = parse_execution_id(&id)?;
    let attempt_index = AttemptIndex::new(idx);
    let result = server
        .read_attempt_stream(
            &eid,
            attempt_index,
            params.from.to_wire(),
            params.to.to_wire(),
            params.limit,
        )
        .await?;
    Ok(Json(result.into()))
}
/// Query parameters for the blocking tail endpoint.
#[derive(Deserialize)]
struct TailStreamParams {
    // Resume cursor: return frames strictly after this entry id.
    #[serde(default = "ff_core::contracts::StreamCursor::beginning")]
    after: ff_core::contracts::StreamCursor,
    // How long the server may block waiting for new frames (0 = no block).
    #[serde(default)]
    block_ms: u64,
    // Max frames per response; defaults to 50, capped by the handler.
    #[serde(default = "default_tail_limit")]
    limit: u64,
}
// serde `default = "…"` requires a function, not a constant.
fn default_tail_limit() -> u64 { 50 }
/// Ceiling on `block_ms` so a tail request cannot pin a connection for more
/// than 30s.
const MAX_TAIL_BLOCK_MS: u64 = 30_000;
/// GET /v1/executions/{id}/attempts/{idx}/stream/tail — long-poll for new
/// stream frames after a cursor.
///
/// Validation: `block_ms <= MAX_TAIL_BLOCK_MS`, `limit` in
/// 1..=REST_STREAM_LIMIT_CEILING, and `after` must be a concrete entry id
/// (the backend XREAD cannot start from a symbolic cursor; '0-0' means "from
/// the beginning").
async fn tail_attempt_stream(
    State(server): State<Arc<Server>>,
    Path((id, idx)): Path<(String, u32)>,
    Query(params): Query<TailStreamParams>,
) -> Result<Json<ReadStreamResponse>, ApiError> {
    if params.block_ms > MAX_TAIL_BLOCK_MS {
        return Err(ApiError(ServerError::InvalidInput(format!(
            "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
        ))));
    }
    if params.limit == 0 {
        return Err(ApiError(ServerError::InvalidInput(
            "limit must be >= 1".to_owned(),
        )));
    }
    if params.limit > REST_STREAM_LIMIT_CEILING {
        return Err(ApiError(ServerError::InvalidInput(format!(
            "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
        ))));
    }
    if !params.after.is_concrete() {
        return Err(ApiError(ServerError::InvalidInput(
            "after: XREAD cursor must be a concrete entry id; pass '0-0' to start from the beginning"
                .to_owned(),
        )));
    }
    let eid = parse_execution_id(&id)?;
    let attempt_index = AttemptIndex::new(idx);
    let result = server
        .tail_attempt_stream(
            &eid,
            attempt_index,
            params.after.to_wire(),
            params.block_ms,
            params.limit,
        )
        .await?;
    Ok(Json(result.into()))
}
/// POST /v1/flows — create a flow.
///
/// 201 for a fresh creation; 200 when the request was already satisfied.
async fn create_flow(
    State(server): State<Arc<Server>>,
    AppJson(args): AppJson<CreateFlowArgs>,
) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
    let outcome = server.backend().create_flow(args).await?;
    let code = if matches!(outcome, CreateFlowResult::Created { .. }) {
        StatusCode::CREATED
    } else {
        StatusCode::OK
    };
    Ok((code, Json(outcome)))
}
/// POST /v1/flows/{id}/members — add an execution to a flow.
///
/// Path id and body `flow_id` must agree; the path value wins.
/// 201 when newly added; 200 when it was already a member.
async fn add_execution_to_flow(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
    AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
    let flow_id = parse_flow_id(&id)?;
    check_id_match(&flow_id, &args.flow_id, "flow_id")?;
    args.flow_id = flow_id;
    let outcome = server.backend().add_execution_to_flow(args).await?;
    let code = if matches!(outcome, AddExecutionToFlowResult::Added { .. }) {
        StatusCode::CREATED
    } else {
        StatusCode::OK
    };
    Ok((code, Json(outcome)))
}
/// POST /v1/flows/{id}/cancel — cancel a flow.
///
/// `?wait=true` (or `?wait=1`) selects the blocking variant that waits for
/// cancellation to take effect; otherwise the request returns immediately.
/// Path id and body `flow_id` must agree; the path value wins.
async fn cancel_flow(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
    Query(params): Query<HashMap<String, String>>,
    AppJson(mut args): AppJson<CancelFlowArgs>,
) -> Result<Json<CancelFlowResult>, ApiError> {
    let flow_id = parse_flow_id(&id)?;
    check_id_match(&flow_id, &args.flow_id, "flow_id")?;
    args.flow_id = flow_id;
    let blocking = params
        .get("wait")
        .is_some_and(|v| v == "true" || v == "1");
    let outcome = if blocking {
        server.cancel_flow_wait(&args).await?
    } else {
        server.cancel_flow(&args).await?
    };
    Ok(Json(outcome))
}
/// POST /v1/flows/{id}/edges — stage a dependency edge inside a flow.
/// Path id and body `flow_id` must agree; always 201 on success.
async fn stage_dependency_edge(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
    AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
    let flow_id = parse_flow_id(&id)?;
    check_id_match(&flow_id, &args.flow_id, "flow_id")?;
    args.flow_id = flow_id;
    let outcome = server.backend().stage_dependency_edge(args).await?;
    Ok((StatusCode::CREATED, Json(outcome)))
}
/// POST /v1/flows/{id}/edges/apply — apply a staged dependency to a child
/// execution. Path id and body `flow_id` must agree; the path value wins.
async fn apply_dependency_to_child(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
    AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
    let flow_id = parse_flow_id(&id)?;
    check_id_match(&flow_id, &args.flow_id, "flow_id")?;
    args.flow_id = flow_id;
    let outcome = server.backend().apply_dependency_to_child(args).await?;
    Ok(Json(outcome))
}
/// POST /v1/budgets — create a budget.
///
/// 201 for a fresh creation; 200 when the request was already satisfied.
async fn create_budget(
    State(server): State<Arc<Server>>,
    AppJson(args): AppJson<CreateBudgetArgs>,
) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
    let outcome = server.backend().create_budget(args).await?;
    let code = if matches!(outcome, CreateBudgetResult::Created { .. }) {
        StatusCode::CREATED
    } else {
        StatusCode::OK
    };
    Ok((code, Json(outcome)))
}
/// GET /v1/budgets/{id} — fetch current budget status.
async fn get_budget_status(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
) -> Result<Json<BudgetStatus>, ApiError> {
    let budget_id = parse_budget_id(&id)?;
    let status = server.backend().get_budget_status(&budget_id).await?;
    Ok(Json(status))
}
/// Request body for `POST /v1/budgets/{id}/usage`.
#[derive(Deserialize)]
struct ReportUsageBody {
    // Dimension name -> usage delta to apply.
    dimensions: HashMap<String, u64>,
    // Client-supplied timestamp for the usage report.
    now: ff_core::types::TimestampMs,
    // Optional idempotency key for deduplicating repeated reports.
    #[serde(default)]
    dedup_key: Option<String>,
}
/// POST /v1/budgets/{id}/usage — report usage deltas against a budget.
///
/// Splits the `dimensions` map into parallel name/delta vectors for the
/// admin contract, attaching the optional dedup key.
async fn report_usage(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
    AppJson(body): AppJson<ReportUsageBody>,
) -> Result<Json<ReportUsageResult>, ApiError> {
    let bid = parse_budget_id(&id)?;
    // Single pass over the map. The previous version collected the keys and
    // then re-indexed the map per key — an extra hash lookup and a String
    // clone per entry. `unzip` keeps each (dimension, delta) pair aligned by
    // construction.
    let (dims, deltas): (Vec<String>, Vec<u64>) = body.dimensions.into_iter().unzip();
    let mut args = ff_core::contracts::ReportUsageAdminArgs::new(dims, deltas, body.now);
    if let Some(k) = body.dedup_key {
        args = args.with_dedup_key(k);
    }
    Ok(Json(server.backend().report_usage_admin(&bid, args).await?))
}
/// POST /v1/budgets/{id}/reset — reset a budget's accumulated usage.
/// Takes no body; the timestamp is taken server-side.
async fn reset_budget(
    State(server): State<Arc<Server>>,
    Path(id): Path<String>,
) -> Result<Json<ResetBudgetResult>, ApiError> {
    let budget_id = parse_budget_id(&id)?;
    let request = ff_core::contracts::ResetBudgetArgs {
        budget_id,
        now: ff_core::types::TimestampMs::now(),
    };
    let outcome = server.backend().reset_budget(request).await?;
    Ok(Json(outcome))
}
/// POST /v1/quotas — create a quota policy.
///
/// 201 for a fresh creation; 200 when the request was already satisfied.
async fn create_quota_policy(
    State(server): State<Arc<Server>>,
    AppJson(args): AppJson<CreateQuotaPolicyArgs>,
) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
    let outcome = server.backend().create_quota_policy(args).await?;
    let code = if matches!(outcome, CreateQuotaPolicyResult::Created { .. }) {
        StatusCode::CREATED
    } else {
        StatusCode::OK
    };
    Ok((code, Json(outcome)))
}
/// Body of a successful `GET /healthz` response: `{"status": "ok"}`.
#[derive(Serialize)]
struct HealthResponse {
    status: &'static str,
}
/// GET /healthz — liveness/readiness probe.
///
/// Pings the backend; a failure surfaces as the usual engine-error response.
/// This route is exempt from bearer auth (see `auth_middleware`).
async fn healthz(
    State(server): State<Arc<Server>>,
) -> Result<Json<HealthResponse>, ApiError> {
    // `?` converts via the `From<EngineError> for ApiError` impl, which
    // wraps the error as `ServerError::Engine(Box::new(e))` — identical to
    // the manual `map_err` this replaces.
    server.backend().ping().await?;
    Ok(Json(HealthResponse { status: "ok" }))
}
/// Ensure the id in the URL path equals the id in the request body.
///
/// Guards handlers that accept the id in both places from acting on a body
/// id that disagrees with the path. The mismatch message includes both
/// values — previously the `fmt::Display` bound was declared but never used
/// and the 400 gave no hint which ids conflicted.
fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
    if body_id != path_id {
        return Err(ApiError(ServerError::InvalidInput(format!(
            "path {id_name} ({path_id}) does not match body {id_name} ({body_id})"
        ))));
    }
    Ok(())
}
fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
ExecutionId::parse(s)
.map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
}
/// Parses a path segment into a [`FlowId`], mapping any parse failure to an
/// invalid-input API error.
fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
    match FlowId::parse(s) {
        Ok(id) => Ok(id),
        Err(e) => Err(ApiError(ServerError::InvalidInput(format!(
            "invalid flow_id: {e}"
        )))),
    }
}
/// Parses a path segment into a [`BudgetId`], mapping any parse failure to an
/// invalid-input API error.
fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
    match BudgetId::parse(s) {
        Ok(id) => Ok(id),
        Err(e) => Err(ApiError(ServerError::InvalidInput(format!(
            "invalid budget_id: {e}"
        )))),
    }
}
/// Tests for CORS layer construction and preflight header negotiation.
#[cfg(test)]
mod cors_tests {
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::get;
    use tower::ServiceExt;

    /// Builds a one-route app wrapped in the CORS layer under test.
    fn app_with_cors(origins: &[String], auth_enabled: bool) -> Router {
        let cors = build_cors_layer(origins, auth_enabled)
            .expect("build_cors_layer succeeds for valid inputs");
        Router::new().route("/noop", get(|| async { "ok" })).layer(cors)
    }

    /// Sends an OPTIONS preflight to `/noop` requesting `request_headers`,
    /// asserts a 200, and returns the lowercased
    /// `Access-Control-Allow-Headers` response value.
    async fn preflight_allow_headers(app: Router, request_headers: &str) -> String {
        let req = Request::builder()
            .method("OPTIONS")
            .uri("/noop")
            .header("origin", "https://client.example.com")
            .header("access-control-request-method", "GET")
            .header("access-control-request-headers", request_headers)
            .body(Body::empty())
            .unwrap();
        let resp = app.oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);
        resp.headers()
            .get("access-control-allow-headers")
            .expect("access-control-allow-headers present")
            .to_str()
            .unwrap()
            .to_ascii_lowercase()
    }

    #[test]
    fn all_origins_invalid_returns_config_error_instead_of_permissive() {
        let invalid = vec!["bad\0origin".to_owned()];
        let err = build_cors_layer(&invalid, false).unwrap_err();
        let ConfigError::InvalidValue { var, message } = err;
        assert_eq!(var, "FF_CORS_ORIGINS");
        assert!(
            message.contains("all configured origins failed to parse"),
            "message was: {message}"
        );
    }

    #[test]
    fn wildcard_still_returns_permissive() {
        assert!(build_cors_layer(&["*".to_owned()], false).is_ok());
    }

    #[test]
    fn empty_origins_returns_empty_allowlist_ok() {
        assert!(build_cors_layer(&[], false).is_ok());
    }

    #[test]
    fn mixed_valid_and_invalid_keeps_valid_entries() {
        let origins = vec![
            "https://ok.example.com".to_owned(),
            "bad\0origin".to_owned(),
        ];
        assert!(build_cors_layer(&origins, false).is_ok());
    }

    #[tokio::test]
    async fn preflight_allows_authorization_when_auth_enabled() {
        let app = app_with_cors(&["https://client.example.com".to_owned()], true);
        let allow_headers =
            preflight_allow_headers(app, "authorization,content-type").await;
        assert!(
            allow_headers.contains("authorization"),
            "expected `authorization` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
        assert!(
            allow_headers.contains("content-type"),
            "expected `content-type` in Access-Control-Allow-Headers, got: {allow_headers}"
        );
    }

    #[tokio::test]
    async fn preflight_omits_authorization_when_auth_disabled() {
        let app = app_with_cors(&["https://client.example.com".to_owned()], false);
        let allow_headers = preflight_allow_headers(app, "content-type").await;
        assert!(
            !allow_headers.contains("authorization"),
            "Authorization should not be advertised when auth is off; got: {allow_headers}"
        );
        assert!(allow_headers.contains("content-type"));
    }
}
/// Tests pinning the JSON wire format of `ClaimGrantDto`.
#[cfg(test)]
mod claim_grant_dto_tests {
    use super::*;
    use ff_core::contracts::ClaimGrant;
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{ExecutionId, FlowId};

    /// Builds a representative grant in the given partition family.
    fn sample_grant(family: PartitionFamily) -> ClaimGrant {
        let config = ff_core::partition::PartitionConfig::default();
        let flow_id = FlowId::new();
        let partition = Partition { family, index: 7 };
        ClaimGrant {
            execution_id: ExecutionId::for_flow(&flow_id, &config),
            partition_key: PartitionKey::from(&partition),
            grant_key: "ff:exec:{fp:7}:deadbeef:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn claim_grant_dto_emits_opaque_partition_key() {
        let dto = ClaimGrantDto::from(sample_grant(PartitionFamily::Flow));
        let json = serde_json::to_value(&dto).unwrap();
        // Only the opaque key is exposed; family/index must not leak on the wire.
        assert_eq!(json["partition_key"], serde_json::json!("{fp:7}"));
        assert!(json.get("partition_family").is_none());
        assert!(json.get("partition_index").is_none());
        assert_eq!(
            json["expires_at_ms"],
            serde_json::json!(1_700_000_000_000u64)
        );
    }

    #[test]
    fn claim_grant_dto_collapses_execution_alias_on_wire() {
        let flow_json = serde_json::to_value(
            &ClaimGrantDto::from(sample_grant(PartitionFamily::Flow)),
        )
        .unwrap();
        let exec_json = serde_json::to_value(
            &ClaimGrantDto::from(sample_grant(PartitionFamily::Execution)),
        )
        .unwrap();
        // Flow- and execution-family partitions serialize to the same key.
        assert_eq!(flow_json["partition_key"], exec_json["partition_key"]);
    }
}