1use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use axum::{
8 extract::{Path, Query, Request, State},
9 http::StatusCode,
10 middleware,
11 response::{IntoResponse, Response},
12 routing::{get, post, put},
13 Json, Router,
14};
15use serde::{Deserialize, Serialize};
16use axum::http::{HeaderName, Method};
17use tower_http::cors::{AllowOrigin, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20use ff_core::contracts::*;
21use ff_core::state::PublicState;
22use ff_core::types::*;
23
24use crate::server::{Server, ServerError};
25
26struct AppJson<T>(T);
29
30impl<S, T> axum::extract::FromRequest<S> for AppJson<T>
31where
32 T: serde::de::DeserializeOwned + Send,
33 S: Send + Sync,
34{
35 type Rejection = Response;
36
37 async fn from_request(
38 req: axum::extract::Request,
39 state: &S,
40 ) -> Result<Self, Self::Rejection> {
41 match Json::<T>::from_request(req, state).await {
42 Ok(Json(value)) => Ok(AppJson(value)),
43 Err(rejection) => {
44 let status = rejection.status();
45 tracing::debug!(detail = %rejection.body_text(), "JSON rejection");
46 let body = ErrorBody::plain(format!(
47 "invalid JSON: {}",
48 status.canonical_reason().unwrap_or("bad request"),
49 ));
50 Err((status, Json(body)).into_response())
51 }
52 }
53 }
54}
55
56struct ApiError(ServerError);
59
60impl From<ServerError> for ApiError {
61 fn from(e: ServerError) -> Self {
62 Self(e)
63 }
64}
65
66#[derive(Serialize)]
70struct ErrorBody {
71 error: String,
72 #[serde(skip_serializing_if = "Option::is_none")]
73 kind: Option<String>,
74 #[serde(skip_serializing_if = "Option::is_none")]
75 retryable: Option<bool>,
76}
77
78impl ErrorBody {
79 fn plain(error: String) -> Self {
80 Self { error, kind: None, retryable: None }
81 }
82}
83
84impl IntoResponse for ApiError {
85 fn into_response(self) -> Response {
86 use ff_script::retry::kind_to_stable_str;
87
88 let (status, body) = match &self.0 {
89 ServerError::NotFound(msg) => {
90 (StatusCode::NOT_FOUND, ErrorBody::plain(msg.clone()))
91 }
92 ServerError::InvalidInput(msg) => {
93 (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
94 }
95 ServerError::OperationFailed(msg) => {
96 (StatusCode::BAD_REQUEST, ErrorBody::plain(msg.clone()))
97 }
98 ServerError::ConcurrencyLimitExceeded(source, max) => (
99 StatusCode::TOO_MANY_REQUESTS,
100 ErrorBody {
101 error: format!(
102 "too many concurrent {source} calls (server max: {max}); retry with backoff"
103 ),
104 kind: None,
105 retryable: Some(true),
106 },
107 ),
108 ServerError::Valkey(e) => {
109 let kind_str = kind_to_stable_str(e.kind());
110 tracing::error!(
111 kind = kind_str,
112 code = e.code().unwrap_or(""),
113 detail = e.detail().unwrap_or(""),
114 "valkey error"
115 );
116 (
117 StatusCode::INTERNAL_SERVER_ERROR,
118 ErrorBody {
119 error: self.0.to_string(),
120 kind: Some(kind_str.to_owned()),
121 retryable: Some(self.0.is_retryable()),
122 },
123 )
124 }
125 ServerError::ValkeyContext { source, context } => {
126 let kind_str = kind_to_stable_str(source.kind());
127 tracing::error!(
128 kind = kind_str,
129 code = source.code().unwrap_or(""),
130 detail = source.detail().unwrap_or(""),
131 context = %context,
132 "valkey error"
133 );
134 (
135 StatusCode::INTERNAL_SERVER_ERROR,
136 ErrorBody {
137 error: self.0.to_string(),
138 kind: Some(kind_str.to_owned()),
139 retryable: Some(self.0.is_retryable()),
140 },
141 )
142 }
143 ServerError::LibraryLoad(load_err) => {
144 let kind_str = load_err.valkey_kind().map(kind_to_stable_str);
145 tracing::error!(
146 kind = kind_str.unwrap_or(""),
147 error = %load_err,
148 "library load failure"
149 );
150 (
151 StatusCode::INTERNAL_SERVER_ERROR,
152 ErrorBody {
153 error: format!("library load: {load_err}"),
154 kind: kind_str.map(str::to_owned),
155 retryable: Some(self.0.is_retryable()),
156 },
157 )
158 }
159 other => (
163 StatusCode::INTERNAL_SERVER_ERROR,
164 ErrorBody {
165 error: other.to_string(),
166 kind: None,
167 retryable: Some(false),
168 },
169 ),
170 };
171 (status, Json(body)).into_response()
172 }
173}
174
175pub fn router(server: Arc<Server>, cors_origins: &[String], api_token: Option<String>) -> Router {
178 let cors = build_cors_layer(cors_origins);
179
180 let mut app = Router::new()
181 .route("/v1/executions", get(list_executions).post(create_execution))
183 .route("/v1/executions/{id}", get(get_execution))
184 .route("/v1/executions/{id}/state", get(get_execution_state))
185 .route(
186 "/v1/executions/{id}/pending-waitpoints",
187 get(list_pending_waitpoints),
188 )
189 .route("/v1/executions/{id}/result", get(get_execution_result))
190 .route("/v1/executions/{id}/cancel", post(cancel_execution))
191 .route("/v1/executions/{id}/signal", post(deliver_signal))
192 .route("/v1/executions/{id}/priority", put(change_priority))
193 .route("/v1/executions/{id}/replay", post(replay_execution))
194 .route("/v1/executions/{id}/revoke-lease", post(revoke_lease))
195 .route("/v1/workers/{worker_id}/claim", post(claim_for_worker))
200 .route(
202 "/v1/executions/{id}/attempts/{idx}/stream",
203 get(read_attempt_stream),
204 )
205 .route(
206 "/v1/executions/{id}/attempts/{idx}/stream/tail",
207 get(tail_attempt_stream),
208 )
209 .route("/v1/flows", post(create_flow))
211 .route("/v1/flows/{id}/members", post(add_execution_to_flow))
212 .route("/v1/flows/{id}/cancel", post(cancel_flow))
213 .route("/v1/flows/{id}/edges", post(stage_dependency_edge))
214 .route("/v1/flows/{id}/edges/apply", post(apply_dependency_to_child))
215 .route("/v1/budgets", post(create_budget))
217 .route("/v1/budgets/{id}", get(get_budget_status))
218 .route("/v1/budgets/{id}/usage", post(report_usage))
219 .route("/v1/budgets/{id}/reset", post(reset_budget))
220 .route("/v1/quotas", post(create_quota_policy))
222 .route("/v1/admin/rotate-waitpoint-secret", post(rotate_waitpoint_secret))
224 .route("/healthz", get(healthz));
226
227 if let Some(token) = api_token {
228 let token = Arc::new(token);
229 app = app.layer(middleware::from_fn(move |req, next| {
230 let token = token.clone();
231 auth_middleware(token, req, next)
232 }));
233 }
234
235 app.layer(TraceLayer::new_for_http())
236 .layer(cors)
237 .with_state(server)
238}
239
240async fn auth_middleware(
241 token: Arc<String>,
242 req: Request,
243 next: middleware::Next,
244) -> Response {
245 if req.uri().path() == "/healthz" {
246 return next.run(req).await;
247 }
248
249 let auth_header = req
250 .headers()
251 .get("authorization")
252 .and_then(|v| v.to_str().ok());
253
254 let authorized = auth_header
255 .and_then(|v| v.strip_prefix("Bearer "))
256 .is_some_and(|t| constant_time_eq(t.as_bytes(), token.as_bytes()));
257
258 if authorized {
259 next.run(req).await
260 } else {
261 (
262 StatusCode::UNAUTHORIZED,
263 Json(ErrorBody::plain(
264 "missing or invalid Authorization header".to_owned(),
265 )),
266 )
267 .into_response()
268 }
269}
270
/// Compares two byte slices without stopping at the first differing byte.
///
/// Slices of different length are rejected immediately. For equal lengths,
/// the XOR of every byte pair is OR-ed into one accumulator, and the slices
/// are equal exactly when that accumulator stays zero.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a
            .iter()
            .zip(b)
            .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs))
            == 0
}
281
282fn build_cors_layer(origins: &[String]) -> CorsLayer {
283 if origins.iter().any(|o| o == "*") {
284 return CorsLayer::permissive();
285 }
286 let parsed: Vec<_> = origins
287 .iter()
288 .filter_map(|o| o.parse().ok())
289 .collect();
290 if parsed.is_empty() && !origins.is_empty() {
291 tracing::warn!(
292 configured = ?origins,
293 "all configured CORS origins failed to parse, falling back to permissive"
294 );
295 return CorsLayer::permissive();
296 }
297 CorsLayer::new()
298 .allow_origin(AllowOrigin::list(parsed))
299 .allow_methods([Method::GET, Method::POST, Method::PUT])
300 .allow_headers([HeaderName::from_static("content-type")])
301}
302
/// Query-string parameters accepted by `list_executions`.
#[derive(Deserialize)]
struct ListExecutionsParams {
    /// Partition index; has no serde default, so it must be supplied.
    partition: u16,
    /// Lane name; falls back to `default_lane()` when absent.
    #[serde(default = "default_lane")]
    lane: String,
    /// State filter forwarded to the server; falls back to `default_state_filter()`.
    #[serde(default = "default_state_filter")]
    state: String,
    /// Page size; falls back to `default_limit()`. The handler caps it at 1000.
    #[serde(default = "default_limit")]
    limit: u64,
    /// Offset forwarded to the server; defaults to 0.
    #[serde(default)]
    offset: u64,
}
317
/// Serde default for `ListExecutionsParams::lane`.
fn default_lane() -> String {
    String::from("default")
}

/// Serde default for `ListExecutionsParams::state`.
fn default_state_filter() -> String {
    String::from("eligible")
}

/// Serde default for `ListExecutionsParams::limit`.
fn default_limit() -> u64 {
    50_u64
}
321
322async fn list_executions(
323 State(server): State<Arc<Server>>,
324 Query(params): Query<ListExecutionsParams>,
325) -> Result<Json<ListExecutionsResult>, ApiError> {
326 let lane = ff_core::types::LaneId::try_new(params.lane.clone())
327 .map_err(|e| ApiError::from(ServerError::InvalidInput(format!("invalid lane: {e}"))))?;
328 let limit = params.limit.min(1000);
329 let result = server
330 .list_executions(params.partition, &lane, ¶ms.state, params.offset, limit)
331 .await?;
332 Ok(Json(result))
333}
334
335async fn create_execution(
336 State(server): State<Arc<Server>>,
337 AppJson(args): AppJson<CreateExecutionArgs>,
338) -> Result<(StatusCode, Json<CreateExecutionResult>), ApiError> {
339 let result = server.create_execution(&args).await?;
340 let status = match &result {
341 CreateExecutionResult::Created { .. } => StatusCode::CREATED,
342 CreateExecutionResult::Duplicate { .. } => StatusCode::OK,
343 };
344 Ok((status, Json(result)))
345}
346
347async fn get_execution(
348 State(server): State<Arc<Server>>,
349 Path(id): Path<String>,
350) -> Result<Json<ExecutionInfo>, ApiError> {
351 let eid = parse_execution_id(&id)?;
352 Ok(Json(server.get_execution(&eid).await?))
353}
354
355async fn get_execution_state(
356 State(server): State<Arc<Server>>,
357 Path(id): Path<String>,
358) -> Result<Json<PublicState>, ApiError> {
359 let eid = parse_execution_id(&id)?;
360 Ok(Json(server.get_execution_state(&eid).await?))
361}
362
363async fn list_pending_waitpoints(
376 State(server): State<Arc<Server>>,
377 Path(id): Path<String>,
378) -> Result<Json<Vec<PendingWaitpointInfo>>, ApiError> {
379 let eid = parse_execution_id(&id)?;
380 Ok(Json(server.list_pending_waitpoints(&eid).await?))
381}
382
383async fn get_execution_result(
418 State(server): State<Arc<Server>>,
419 Path(id): Path<String>,
420) -> Result<Response, ApiError> {
421 let eid = parse_execution_id(&id)?;
422 match server.get_execution_result(&eid).await? {
423 Some(bytes) => Ok((
424 StatusCode::OK,
425 [(axum::http::header::CONTENT_TYPE, "application/octet-stream")],
426 bytes,
427 )
428 .into_response()),
429 None => Err(ApiError(ServerError::NotFound(format!(
430 "execution result not found: {eid}"
431 )))),
432 }
433}
434
435async fn cancel_execution(
436 State(server): State<Arc<Server>>,
437 Path(id): Path<String>,
438 AppJson(mut args): AppJson<CancelExecutionArgs>,
439) -> Result<Json<CancelExecutionResult>, ApiError> {
440 let path_eid = parse_execution_id(&id)?;
441 check_id_match(&path_eid, &args.execution_id, "execution_id")?;
442 args.execution_id = path_eid;
443 Ok(Json(server.cancel_execution(&args).await?))
444}
445
446async fn deliver_signal(
447 State(server): State<Arc<Server>>,
448 Path(id): Path<String>,
449 AppJson(mut args): AppJson<DeliverSignalArgs>,
450) -> Result<Json<DeliverSignalResult>, ApiError> {
451 let path_eid = parse_execution_id(&id)?;
452 check_id_match(&path_eid, &args.execution_id, "execution_id")?;
453 args.execution_id = path_eid;
454 Ok(Json(server.deliver_signal(&args).await?))
455}
456
/// JSON body of `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Both fields are passed unchanged to `Server::rotate_waitpoint_secret`.
#[derive(Deserialize)]
struct RotateWaitpointSecretBody {
    /// Identifier of the new key; also written to the audit log on timeout.
    new_kid: String,
    /// New secret — presumably hex-encoded, judging by the name; this layer
    /// does not validate it (TODO confirm where validation happens).
    new_secret_hex: String,
}
465
/// Maximum time `rotate_waitpoint_secret` waits for the rotation to finish
/// before answering `504 Gateway Timeout` (120 seconds).
const ROTATE_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
480
481async fn rotate_waitpoint_secret(
482 State(server): State<Arc<Server>>,
483 AppJson(body): AppJson<RotateWaitpointSecretBody>,
484) -> Result<Response, ApiError> {
485 let rotate_fut = server.rotate_waitpoint_secret(&body.new_kid, &body.new_secret_hex);
490 let result = match tokio::time::timeout(ROTATE_HTTP_TIMEOUT, rotate_fut).await {
491 Ok(r) => r?,
492 Err(_) => {
493 tracing::error!(
494 target: "audit",
495 new_kid = %body.new_kid,
496 timeout_s = ROTATE_HTTP_TIMEOUT.as_secs(),
497 "waitpoint_hmac_rotation_timeout_http_504"
498 );
499 let body = ErrorBody::plain(format!(
500 "rotation exceeded {}s server-side timeout; retry is safe \
501 (per-partition rotation is idempotent on the same new_kid + secret_hex)",
502 ROTATE_HTTP_TIMEOUT.as_secs()
503 ));
504 return Ok((StatusCode::GATEWAY_TIMEOUT, Json(body)).into_response());
505 }
506 };
507 if result.rotated == 0 && result.failed.is_empty() {
517 return Err(ApiError::from(ServerError::OperationFailed(
518 "rotation had no partitions to operate on \
519 (num_flow_partitions is 0 — server misconfigured)"
520 .to_owned(),
521 )));
522 }
523 if result.rotated == 0 && !result.failed.is_empty() {
524 return Err(ApiError::from(ServerError::OperationFailed(
525 "rotation failed on all partitions (check Valkey connectivity)".to_owned(),
526 )));
527 }
528 Ok(Json(result).into_response())
529}
530
/// JSON body of `PUT /v1/executions/{id}/priority`.
#[derive(Deserialize)]
struct ChangePriorityBody {
    /// Priority value passed unchanged to `Server::change_priority`.
    new_priority: i32,
}
535
536async fn change_priority(
537 State(server): State<Arc<Server>>,
538 Path(id): Path<String>,
539 AppJson(body): AppJson<ChangePriorityBody>,
540) -> Result<Json<ChangePriorityResult>, ApiError> {
541 let eid = parse_execution_id(&id)?;
542 Ok(Json(server.change_priority(&eid, body.new_priority).await?))
543}
544
545async fn replay_execution(
546 State(server): State<Arc<Server>>,
547 Path(id): Path<String>,
548) -> Result<Json<ReplayExecutionResult>, ApiError> {
549 let eid = parse_execution_id(&id)?;
550 Ok(Json(server.replay_execution(&eid).await?))
551}
552
553async fn revoke_lease(
554 State(server): State<Arc<Server>>,
555 Path(id): Path<String>,
556) -> Result<Json<RevokeLeaseResult>, ApiError> {
557 let eid = parse_execution_id(&id)?;
558 Ok(Json(server.revoke_lease(&eid).await?))
559}
560
/// JSON body of `POST /v1/workers/{worker_id}/claim`.
#[derive(Deserialize)]
struct ClaimForWorkerBody {
    /// Lane to claim from; validated by `LaneId::try_new` in the handler.
    lane_id: String,
    /// Worker instance identifier; checked with `validate_identifier`.
    worker_instance_id: String,
    /// Capability strings; defaults to an empty list. The handler collects
    /// them into a `BTreeSet`, so duplicates collapse.
    #[serde(default)]
    capabilities: Vec<String>,
    /// Requested grant TTL in milliseconds; must be in `1..=CLAIM_GRANT_TTL_MS_MAX`.
    grant_ttl_ms: u64,
}
583
/// JSON representation of a claim grant returned by `claim_for_worker`.
///
/// Built from `ff_core::contracts::ClaimGrant` via the `From` impl below.
#[derive(Serialize)]
struct ClaimGrantDto {
    /// Execution id rendered with its `to_string()` form.
    execution_id: String,
    /// One of `"flow"`, `"execution"`, `"budget"`, `"quota"`.
    partition_family: &'static str,
    /// Index of the partition within its family.
    partition_index: u16,
    /// Grant key copied from the grant.
    grant_key: String,
    /// Expiry copied from the grant — milliseconds per the field name; the
    /// epoch/reference is not visible here.
    expires_at_ms: u64,
}
596
597impl From<ff_core::contracts::ClaimGrant> for ClaimGrantDto {
598 fn from(g: ff_core::contracts::ClaimGrant) -> Self {
599 let family = match g.partition.family {
600 ff_core::partition::PartitionFamily::Flow => "flow",
601 ff_core::partition::PartitionFamily::Execution => "execution",
602 ff_core::partition::PartitionFamily::Budget => "budget",
603 ff_core::partition::PartitionFamily::Quota => "quota",
604 };
605 Self {
606 execution_id: g.execution_id.to_string(),
607 partition_family: family,
608 partition_index: g.partition.index,
609 grant_key: g.grant_key,
610 expires_at_ms: g.expires_at_ms,
611 }
612 }
613}
614
/// Largest `grant_ttl_ms` that `claim_for_worker` accepts (60 000 ms).
const CLAIM_GRANT_TTL_MS_MAX: u64 = 60_000;
619
620fn validate_identifier(field: &str, value: &str) -> Result<(), ApiError> {
625 if value.is_empty() {
626 return Err(ApiError(ServerError::InvalidInput(format!(
627 "{field}: must not be empty"
628 ))));
629 }
630 if value.len() > 256 {
631 return Err(ApiError(ServerError::InvalidInput(format!(
632 "{field}: exceeds 256 bytes (got {})",
633 value.len()
634 ))));
635 }
636 if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
637 return Err(ApiError(ServerError::InvalidInput(format!(
638 "{field}: must not contain whitespace or control characters"
639 ))));
640 }
641 Ok(())
642}
643
644async fn claim_for_worker(
645 State(server): State<Arc<Server>>,
646 Path(worker_id): Path<String>,
647 AppJson(body): AppJson<ClaimForWorkerBody>,
648) -> Result<Response, ApiError> {
649 validate_identifier("worker_id", &worker_id)?;
650 validate_identifier("worker_instance_id", &body.worker_instance_id)?;
651 let worker_id = WorkerId::new(worker_id);
652 let worker_instance_id = WorkerInstanceId::new(body.worker_instance_id);
653 let lane = LaneId::try_new(body.lane_id).map_err(|e| {
654 ApiError(ServerError::InvalidInput(format!("lane_id: {e}")))
655 })?;
656 if body.grant_ttl_ms == 0 || body.grant_ttl_ms > CLAIM_GRANT_TTL_MS_MAX {
657 return Err(ApiError(ServerError::InvalidInput(format!(
658 "grant_ttl_ms must be in 1..={CLAIM_GRANT_TTL_MS_MAX}"
659 ))));
660 }
661 let caps: std::collections::BTreeSet<String> =
662 body.capabilities.into_iter().collect();
663
664 match server
665 .claim_for_worker(
666 &lane,
667 &worker_id,
668 &worker_instance_id,
669 &caps,
670 body.grant_ttl_ms,
671 )
672 .await?
673 {
674 Some(grant) => Ok((StatusCode::OK, Json(ClaimGrantDto::from(grant))).into_response()),
675 None => Ok(StatusCode::NO_CONTENT.into_response()),
676 }
677}
678
/// Query-string parameters accepted by `read_attempt_stream`.
#[derive(Deserialize)]
struct ReadStreamParams {
    /// Start of the range; falls back to `default_from_id()` (`"-"`).
    #[serde(default = "default_from_id")]
    from: String,
    /// End of the range; falls back to `default_to_id()` (`"+"`).
    #[serde(default = "default_to_id")]
    to: String,
    /// Maximum number of frames; falls back to `default_read_limit()` (100).
    /// The handler requires `1..=REST_STREAM_LIMIT_CEILING`.
    #[serde(default = "default_read_limit")]
    limit: u64,
}
690
/// Serde default for `ReadStreamParams::from`.
fn default_from_id() -> String {
    String::from("-")
}

/// Serde default for `ReadStreamParams::to`.
fn default_to_id() -> String {
    String::from("+")
}

/// Serde default for `ReadStreamParams::limit`.
fn default_read_limit() -> u64 {
    100_u64
}
694
695fn validate_stream_id(s: &str, field: &str, allow_open_markers: bool) -> Result<(), ApiError> {
703 if allow_open_markers && (s == "-" || s == "+") {
704 return Ok(());
705 }
706 let (ms_part, seq_part) = match s.split_once('-') {
708 Some((ms, seq)) => (ms, Some(seq)),
709 None => (s, None),
710 };
711 let ms_valid = !ms_part.is_empty() && ms_part.chars().all(|c| c.is_ascii_digit());
712 let seq_valid = seq_part
713 .map(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit()))
714 .unwrap_or(true);
715 if ms_valid && seq_valid {
716 Ok(())
717 } else {
718 Err(ApiError(ServerError::InvalidInput(format!(
719 "{field}: invalid stream ID '{s}' (expected '-', '+', '<ms>', or '<ms>-<seq>')"
720 ))))
721 }
722}
723
/// JSON response of the attempt-stream read and tail endpoints.
#[derive(Serialize)]
struct ReadStreamResponse {
    /// Frames returned by the server.
    frames: Vec<StreamFrame>,
    /// Number of entries in `frames`.
    count: usize,
    /// Inner value of the source's `closed_at`; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_at: Option<i64>,
    /// Source's `closed_reason`; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    closed_reason: Option<String>,
}
739
740impl From<ff_core::contracts::StreamFrames> for ReadStreamResponse {
741 fn from(sf: ff_core::contracts::StreamFrames) -> Self {
742 let count = sf.frames.len();
743 Self {
744 frames: sf.frames,
745 count,
746 closed_at: sf.closed_at.map(|t| t.0),
747 closed_reason: sf.closed_reason,
748 }
749 }
750}
751
/// Largest `limit` accepted by `read_attempt_stream` and `tail_attempt_stream`;
/// larger values are rejected as invalid input.
const REST_STREAM_LIMIT_CEILING: u64 = 1_000;
762
763async fn read_attempt_stream(
764 State(server): State<Arc<Server>>,
765 Path((id, idx)): Path<(String, u32)>,
766 Query(params): Query<ReadStreamParams>,
767) -> Result<Json<ReadStreamResponse>, ApiError> {
768 if params.limit == 0 {
769 return Err(ApiError(ServerError::InvalidInput(
770 "limit must be >= 1".to_owned(),
771 )));
772 }
773 if params.limit > REST_STREAM_LIMIT_CEILING {
774 return Err(ApiError(ServerError::InvalidInput(format!(
775 "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via from/to for larger spans"
776 ))));
777 }
778 validate_stream_id(¶ms.from, "from", true)?;
779 validate_stream_id(¶ms.to, "to", true)?;
780
781 let eid = parse_execution_id(&id)?;
782 let attempt_index = AttemptIndex::new(idx);
783 let result = server
784 .read_attempt_stream(&eid, attempt_index, ¶ms.from, ¶ms.to, params.limit)
785 .await?;
786 Ok(Json(result.into()))
787}
788
/// Query-string parameters accepted by `tail_attempt_stream`.
#[derive(Deserialize)]
struct TailStreamParams {
    /// Stream ID to read after; falls back to `default_tail_after()` (`"0-0"`).
    #[serde(default = "default_tail_after")]
    after: String,
    /// Block duration in milliseconds forwarded to the server; defaults to 0
    /// and may not exceed `MAX_TAIL_BLOCK_MS`.
    #[serde(default)]
    block_ms: u64,
    /// Maximum number of frames; falls back to `default_tail_limit()` (50).
    /// The handler requires `1..=REST_STREAM_LIMIT_CEILING`.
    #[serde(default = "default_tail_limit")]
    limit: u64,
}
798
/// Serde default for `TailStreamParams::after`.
fn default_tail_after() -> String {
    String::from("0-0")
}

/// Serde default for `TailStreamParams::limit`.
fn default_tail_limit() -> u64 {
    50_u64
}
801
/// Largest `block_ms` accepted by `tail_attempt_stream` (30 000 ms); larger
/// values are rejected as invalid input.
const MAX_TAIL_BLOCK_MS: u64 = 30_000;
811
812async fn tail_attempt_stream(
813 State(server): State<Arc<Server>>,
814 Path((id, idx)): Path<(String, u32)>,
815 Query(params): Query<TailStreamParams>,
816) -> Result<Json<ReadStreamResponse>, ApiError> {
817 if params.block_ms > MAX_TAIL_BLOCK_MS {
818 return Err(ApiError(ServerError::InvalidInput(format!(
819 "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
820 ))));
821 }
822 if params.limit == 0 {
823 return Err(ApiError(ServerError::InvalidInput(
824 "limit must be >= 1".to_owned(),
825 )));
826 }
827 if params.limit > REST_STREAM_LIMIT_CEILING {
828 return Err(ApiError(ServerError::InvalidInput(format!(
829 "limit exceeds REST ceiling {REST_STREAM_LIMIT_CEILING}; paginate via after for larger spans"
830 ))));
831 }
832 validate_stream_id(¶ms.after, "after", false)?;
834
835 let eid = parse_execution_id(&id)?;
836 let attempt_index = AttemptIndex::new(idx);
837 let result = server
838 .tail_attempt_stream(&eid, attempt_index, ¶ms.after, params.block_ms, params.limit)
839 .await?;
840 Ok(Json(result.into()))
841}
842
843async fn create_flow(
846 State(server): State<Arc<Server>>,
847 AppJson(args): AppJson<CreateFlowArgs>,
848) -> Result<(StatusCode, Json<CreateFlowResult>), ApiError> {
849 let result = server.create_flow(&args).await?;
850 let status = match &result {
851 CreateFlowResult::Created { .. } => StatusCode::CREATED,
852 CreateFlowResult::AlreadySatisfied { .. } => StatusCode::OK,
853 };
854 Ok((status, Json(result)))
855}
856
857async fn add_execution_to_flow(
858 State(server): State<Arc<Server>>,
859 Path(id): Path<String>,
860 AppJson(mut args): AppJson<AddExecutionToFlowArgs>,
861) -> Result<(StatusCode, Json<AddExecutionToFlowResult>), ApiError> {
862 let path_fid = parse_flow_id(&id)?;
863 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
864 args.flow_id = path_fid;
865 let result = server.add_execution_to_flow(&args).await?;
866 let status = match &result {
867 AddExecutionToFlowResult::Added { .. } => StatusCode::CREATED,
868 AddExecutionToFlowResult::AlreadyMember { .. } => StatusCode::OK,
869 };
870 Ok((status, Json(result)))
871}
872
873async fn cancel_flow(
886 State(server): State<Arc<Server>>,
887 Path(id): Path<String>,
888 Query(params): Query<HashMap<String, String>>,
889 AppJson(mut args): AppJson<CancelFlowArgs>,
890) -> Result<Json<CancelFlowResult>, ApiError> {
891 let path_fid = parse_flow_id(&id)?;
892 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
893 args.flow_id = path_fid;
894 let wait = params.get("wait").is_some_and(|v| v == "true" || v == "1");
895 let result = if wait {
896 server.cancel_flow_wait(&args).await?
897 } else {
898 server.cancel_flow(&args).await?
899 };
900 Ok(Json(result))
901}
902
903async fn stage_dependency_edge(
904 State(server): State<Arc<Server>>,
905 Path(id): Path<String>,
906 AppJson(mut args): AppJson<StageDependencyEdgeArgs>,
907) -> Result<(StatusCode, Json<StageDependencyEdgeResult>), ApiError> {
908 let path_fid = parse_flow_id(&id)?;
909 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
910 args.flow_id = path_fid;
911 let result = server.stage_dependency_edge(&args).await?;
912 Ok((StatusCode::CREATED, Json(result)))
913}
914
915async fn apply_dependency_to_child(
916 State(server): State<Arc<Server>>,
917 Path(id): Path<String>,
918 AppJson(mut args): AppJson<ApplyDependencyToChildArgs>,
919) -> Result<Json<ApplyDependencyToChildResult>, ApiError> {
920 let path_fid = parse_flow_id(&id)?;
921 check_id_match(&path_fid, &args.flow_id, "flow_id")?;
922 args.flow_id = path_fid;
923 Ok(Json(server.apply_dependency_to_child(&args).await?))
924}
925
926async fn create_budget(
929 State(server): State<Arc<Server>>,
930 AppJson(args): AppJson<CreateBudgetArgs>,
931) -> Result<(StatusCode, Json<CreateBudgetResult>), ApiError> {
932 let result = server.create_budget(&args).await?;
933 let status = match &result {
934 CreateBudgetResult::Created { .. } => StatusCode::CREATED,
935 CreateBudgetResult::AlreadySatisfied { .. } => StatusCode::OK,
936 };
937 Ok((status, Json(result)))
938}
939
940async fn get_budget_status(
941 State(server): State<Arc<Server>>,
942 Path(id): Path<String>,
943) -> Result<Json<BudgetStatus>, ApiError> {
944 let bid = parse_budget_id(&id)?;
945 Ok(Json(server.get_budget_status(&bid).await?))
946}
947
/// JSON body of `POST /v1/budgets/{id}/usage`.
#[derive(Deserialize)]
struct ReportUsageBody {
    /// Usage delta per dimension name; the handler splits this map into the
    /// parallel `dimensions` / `deltas` lists of `ReportUsageArgs`.
    dimensions: HashMap<String, u64>,
    /// Timestamp supplied by the caller, forwarded unchanged.
    now: ff_core::types::TimestampMs,
    /// Optional key forwarded unchanged; defaults to `None` when absent.
    /// Presumably used for de-duplicating reports — confirm in `Server::report_usage`.
    #[serde(default)]
    dedup_key: Option<String>,
}
955
956async fn report_usage(
957 State(server): State<Arc<Server>>,
958 Path(id): Path<String>,
959 AppJson(body): AppJson<ReportUsageBody>,
960) -> Result<Json<ReportUsageResult>, ApiError> {
961 let bid = parse_budget_id(&id)?;
962 let dims: Vec<String> = body.dimensions.keys().cloned().collect();
963 let deltas: Vec<u64> = dims.iter().map(|d| body.dimensions[d]).collect();
964 let args = ReportUsageArgs {
965 dimensions: dims,
966 deltas,
967 now: body.now,
968 dedup_key: body.dedup_key,
969 };
970 Ok(Json(server.report_usage(&bid, &args).await?))
971}
972
973async fn reset_budget(
974 State(server): State<Arc<Server>>,
975 Path(id): Path<String>,
976) -> Result<Json<ResetBudgetResult>, ApiError> {
977 let bid = parse_budget_id(&id)?;
978 Ok(Json(server.reset_budget(&bid).await?))
979}
980
981async fn create_quota_policy(
982 State(server): State<Arc<Server>>,
983 AppJson(args): AppJson<CreateQuotaPolicyArgs>,
984) -> Result<(StatusCode, Json<CreateQuotaPolicyResult>), ApiError> {
985 let result = server.create_quota_policy(&args).await?;
986 let status = match &result {
987 CreateQuotaPolicyResult::Created { .. } => StatusCode::CREATED,
988 CreateQuotaPolicyResult::AlreadySatisfied { .. } => StatusCode::OK,
989 };
990 Ok((status, Json(result)))
991}
992
/// JSON body returned by `healthz`.
#[derive(Serialize)]
struct HealthResponse {
    /// Health indicator; `healthz` sets it to `"ok"`.
    status: &'static str,
}
999
1000async fn healthz(
1001 State(server): State<Arc<Server>>,
1002) -> Result<Json<HealthResponse>, ApiError> {
1003 let _: String = server
1004 .client()
1005 .cmd("PING")
1006 .execute()
1007 .await
1008 .map_err(|e| ApiError(ServerError::ValkeyContext { source: e, context: "healthz PING".into() }))?;
1009 Ok(Json(HealthResponse { status: "ok" }))
1010}
1011
1012fn check_id_match<T: PartialEq + fmt::Display>(path_id: &T, body_id: &T, id_name: &str) -> Result<(), ApiError> {
1016 if body_id != path_id {
1017 return Err(ApiError(ServerError::InvalidInput(format!(
1018 "path {id_name} does not match body {id_name}"
1019 ))));
1020 }
1021 Ok(())
1022}
1023
1024fn parse_execution_id(s: &str) -> Result<ExecutionId, ApiError> {
1025 ExecutionId::parse(s)
1026 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid execution_id: {e}"))))
1027}
1028
1029fn parse_flow_id(s: &str) -> Result<FlowId, ApiError> {
1030 FlowId::parse(s)
1031 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid flow_id: {e}"))))
1032}
1033
1034fn parse_budget_id(s: &str) -> Result<BudgetId, ApiError> {
1035 BudgetId::parse(s)
1036 .map_err(|e| ApiError(ServerError::InvalidInput(format!("invalid budget_id: {e}"))))
1037}