ff_sdk/admin.rs
//! Admin client for operator-facing primitives on `ff-server`.
//!
//! Wraps the admin and scheduler-routed endpoints (`POST /v1/admin/*`,
//! `POST /v1/workers/{worker_id}/claim`,
//! `POST /v1/executions/{execution_id}/reclaim` and
//! `GET /v1/executions/{execution_id}/waitpoints/{waitpoint_id}/token`) so
//! downstream consumers (cairn-fabric) don't hand-roll the HTTP calls for
//! admin surfaces like HMAC secret rotation. The same methods can instead
//! dispatch in-process through an `EngineBackend`
//! (see [`FlowFabricAdminClient::connect_with`]). Mirrors the server's wire
//! types exactly — request bodies and response shapes are defined against
//! `ff_server::api` + `ff_server::server` and kept 1:1 with the producer.
//!
//! Authentication is Bearer token. Callers pick up the token from
//! wherever they hold it (`FF_API_TOKEN` env var is the common
//! pattern, but the SDK does not read env vars on the caller's
//! behalf — [`FlowFabricAdminClient::with_token`] accepts a
//! string-like token value (`&str` or `String`) via
//! `impl AsRef<str>`).
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use ff_core::contracts::RotateWaitpointHmacSecretOutcome;
21use ff_core::engine_backend::EngineBackend;
22// v0.12 PR-6: these imports only power the Valkey-typed
23// `rotate_waitpoint_hmac_secret_all_partitions` helper at the bottom
24// of the module; gated so the ungated module builds clean under
25// `--no-default-features --features sqlite`.
26#[cfg(feature = "valkey-default")]
27use ff_core::contracts::RotateWaitpointHmacSecretArgs;
28#[cfg(feature = "valkey-default")]
29use ff_core::keys::IndexKeys;
30#[cfg(feature = "valkey-default")]
31use ff_core::partition::{Partition, PartitionFamily};
32use serde::{Deserialize, Serialize};
33
34use crate::SdkError;
35
/// Client-side deadline applied to every HTTP-transport request.
///
/// `ff-server` caps a rotation with its own `ROTATE_HTTP_TIMEOUT` of
/// 120 s. Keeping the client deadline 10 s beyond that (130 s) means a
/// slow server-side operation reaches the caller as the server's
/// structured 504 GATEWAY_TIMEOUT body. A bare client-side timeout error
/// would carry less information.
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(130_000);
42
/// Grace window (milliseconds) that the embedded `rotate_waitpoint_secret`
/// path passes to the backend primitive: one day.
///
/// The value equals `ff-server`'s default `FF_WAITPOINT_HMAC_GRACE_MS`, so
/// tokens signed with the outgoing kid stay valid for 24 h after rotation.
/// This keeps the embedded path from hard-killing in-flight flows. HTTP
/// callers get whatever the server is configured with. Embedded callers
/// have no configuration surface and always get this value.
pub const EMBEDDED_WAITPOINT_HMAC_GRACE_MS: u64 = 24 * 60 * 60 * 1_000;
51
/// Upper bound (milliseconds, inclusive) on `grant_ttl_ms` for the
/// embedded `claim_for_worker` / `issue_reclaim_grant` paths: one minute.
///
/// Kept equal to `ff-server`'s `CLAIM_GRANT_TTL_MS_MAX` /
/// `RECLAIM_GRANT_TTL_MS_MAX` so both transports reject the same range.
const EMBEDDED_GRANT_TTL_MS_MAX: u64 = 60 * 1_000;
58
59/// Client for FlowFabric admin primitives — backend-agnostic facade
60/// (v0.13 SC-10 ergonomics).
61///
62/// Two construction shapes:
63///
64/// * [`FlowFabricAdminClient::new`] / [`FlowFabricAdminClient::with_token`]
65/// — HTTP transport targeting a running `ff-server`.
66/// * [`FlowFabricAdminClient::connect_with`] — embedded transport
67/// that dispatches directly through an `Arc<dyn EngineBackend>`.
68/// No `ff-server` required; works under `FF_DEV_MODE=1` + SQLite
69/// and in any in-process deployment.
70///
71/// The public method surface is identical across both transports;
72/// consumers choose at construction time. Admin methods that have
73/// no backend-trait equivalent return
74/// [`SdkError::AdminApi`] with status 503 on the embedded path —
75/// today every method on this client maps cleanly, so this fallback
76/// is only reached if a future admin primitive lands HTTP-first.
77#[derive(Debug, Clone)]
78pub struct FlowFabricAdminClient {
79 transport: AdminTransport,
80}
81
82/// Internal discriminator between the HTTP and embedded transports.
83/// Private by design — the public API is uniform across both shapes
84/// (see the [`FlowFabricAdminClient`] type-level docs).
85#[derive(Clone)]
86enum AdminTransport {
87 Http {
88 http: reqwest::Client,
89 base_url: String,
90 },
91 Embedded(Arc<dyn EngineBackend>),
92}
93
94impl std::fmt::Debug for AdminTransport {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 AdminTransport::Http { base_url, .. } => f
98 .debug_struct("Http")
99 .field("base_url", base_url)
100 .finish_non_exhaustive(),
101 AdminTransport::Embedded(backend) => f
102 .debug_struct("Embedded")
103 .field("backend", &backend.backend_label())
104 .finish(),
105 }
106 }
107}
108
109impl FlowFabricAdminClient {
110 /// Build a client without auth. Suitable for a dev ff-server
111 /// whose `api_token` is unconfigured. Production deployments
112 /// should use [`with_token`](Self::with_token).
113 pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
114 let http = reqwest::Client::builder()
115 .timeout(DEFAULT_TIMEOUT)
116 .build()
117 .map_err(|e| SdkError::Http {
118 source: e,
119 context: "build reqwest::Client".into(),
120 })?;
121 Ok(Self {
122 transport: AdminTransport::Http {
123 http,
124 base_url: normalize_base_url(base_url.into()),
125 },
126 })
127 }
128
129 /// Build a client that dispatches admin primitives directly
130 /// through an `Arc<dyn EngineBackend>`, bypassing HTTP entirely.
131 ///
132 /// # When to use
133 ///
134 /// * `FF_DEV_MODE=1` SQLite scenarios where no `ff-server` is
135 /// running.
136 /// * In-process / embedded deployments that hold a backend
137 /// handle already (e.g. tests, examples, scheduler-hosting
138 /// binaries).
139 ///
140 /// # Semantic parity
141 ///
142 /// Each method on [`FlowFabricAdminClient`] dispatches to the
143 /// equivalent `EngineBackend` trait method (see the RFC-024 /
144 /// RFC-017 admin surfaces). Validation **rules** + request-body
145 /// translation mirror the server-side handler in `ff-server` so
146 /// callers get the same accept / reject behaviour across
147 /// transports. Note: the exact [`SdkError`] variant differs —
148 /// embedded-path validation rejects surface as [`SdkError::Config`]
149 /// (no HTTP round-trip) while HTTP returns [`SdkError::AdminApi`]
150 /// with status `400`. Callers that need to distinguish a 4xx from
151 /// a transport fault should use [`SdkError::is_retryable`] or
152 /// match on `Config` + `AdminApi` together rather than relying on
153 /// a single variant.
154 ///
155 /// `EngineError::Unavailable` from the backend — emitted by
156 /// backends that have not implemented a given method — is mapped
157 /// to [`SdkError::AdminApi`] with `status = 503` and
158 /// `kind = Some("unavailable")` so callers see a uniform
159 /// admin-error surface across transports.
160 ///
161 /// # Divergence from the HTTP transport
162 ///
163 /// * `rotate_waitpoint_secret` forwards
164 /// [`EMBEDDED_WAITPOINT_HMAC_GRACE_MS`] (24 h, matching
165 /// `ff-server`'s default `FF_WAITPOINT_HMAC_GRACE_MS`) as the
166 /// per-partition grace window. The HTTP transport reads the
167 /// server's env-configured value; the embedded client has no
168 /// config surface so it pins the documented default.
169 /// * No single-writer admin semaphore, no audit-log emission.
170 /// These are `ff-server` responsibilities; embedded consumers
171 /// wanting them bring their own gate.
172 pub fn connect_with(backend: Arc<dyn EngineBackend>) -> Self {
173 Self {
174 transport: AdminTransport::Embedded(backend),
175 }
176 }
177
178 /// Build a client that sends `Authorization: Bearer <token>` on
179 /// every request. The token is passed by value so the caller
180 /// retains ownership policy (e.g. zeroize on drop at the
181 /// caller side); the SDK only reads it.
182 ///
183 /// # Empty-token guard
184 ///
185 /// An empty or all-whitespace `token` returns
186 /// [`SdkError::Config`] instead of silently constructing
187 /// `Authorization: Bearer ` (which the server rejects with
188 /// 401, leaving the operator chasing a "why is auth broken"
189 /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
190 /// where the var was meant to be set; the unset-expansion is
191 /// the empty string. Prefer an obvious error at construction
192 /// over a silent 401 at first request.
193 ///
194 /// If the caller genuinely wants an unauthenticated client
195 /// (dev ff-server without `api_token` configured), use
196 /// [`FlowFabricAdminClient::new`] instead.
197 pub fn with_token(
198 base_url: impl Into<String>,
199 token: impl AsRef<str>,
200 ) -> Result<Self, SdkError> {
201 let token_str = token.as_ref();
202 if token_str.trim().is_empty() {
203 return Err(SdkError::Config {
204 context: "admin_client".into(),
205 field: Some("bearer_token".into()),
206 message: "is empty or all-whitespace; use \
207 FlowFabricAdminClient::new for unauthenticated access"
208 .into(),
209 });
210 }
211 let mut headers = reqwest::header::HeaderMap::new();
212 let mut auth_value =
213 reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
214 |_| SdkError::Config {
215 context: "admin_client".into(),
216 field: Some("bearer_token".into()),
217 message: "contains characters not valid in an HTTP header".into(),
218 },
219 )?;
220 // Mark Authorization as sensitive so it doesn't appear in
221 // reqwest's Debug output / logs.
222 auth_value.set_sensitive(true);
223 headers.insert(reqwest::header::AUTHORIZATION, auth_value);
224
225 let http = reqwest::Client::builder()
226 .timeout(DEFAULT_TIMEOUT)
227 .default_headers(headers)
228 .build()
229 .map_err(|e| SdkError::Http {
230 source: e,
231 context: "build reqwest::Client".into(),
232 })?;
233 Ok(Self {
234 transport: AdminTransport::Http {
235 http,
236 base_url: normalize_base_url(base_url.into()),
237 },
238 })
239 }
240
241 /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
242 ///
243 /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
244 /// server-side one: the request carries lane + identity +
245 /// capabilities + grant TTL; the server runs budget, quota, and
246 /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
247 /// and returns a `ClaimGrant` on success.
248 ///
249 /// Returns `Ok(None)` when the server responds 204 No Content
250 /// (no eligible execution on the lane). Callers that want to keep
251 /// polling should back off per their claim cadence.
252 pub async fn claim_for_worker(
253 &self,
254 req: ClaimForWorkerRequest,
255 ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
256 match &self.transport {
257 AdminTransport::Http { http, base_url } => {
258 claim_for_worker_http(http, base_url, req).await
259 }
260 AdminTransport::Embedded(backend) => {
261 claim_for_worker_embedded(backend.as_ref(), req).await
262 }
263 }
264 }
265
266 /// Rotate the waitpoint HMAC secret on the server.
267 ///
268 /// Promotes the currently-installed kid to `previous_kid`
269 /// (accepted for the server's configured
270 /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
271 /// `new_secret_hex` under `new_kid` as the new current. Fans
272 /// out across every execution partition. Idempotent: re-running
273 /// with the same `(new_kid, new_secret_hex)` converges.
274 ///
275 /// The server returns 200 if at least one partition rotated OR
276 /// at least one partition was already rotating under a
277 /// concurrent request. See `RotateWaitpointSecretResponse`
278 /// fields for the breakdown.
279 ///
280 /// # Errors
281 ///
282 /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
283 /// input, 401 missing/bad bearer, 429 concurrent rotate,
284 /// 500 all partitions failed, 504 server-side timeout).
285 /// * [`SdkError::Http`] — transport error (connect, body
286 /// decode, client-side timeout).
287 ///
288 /// # Retry semantics
289 ///
290 /// Rotation is idempotent on the same `(new_kid,
291 /// new_secret_hex)` so retries are SAFE even on 504s or
292 /// partial failures.
293 pub async fn rotate_waitpoint_secret(
294 &self,
295 req: RotateWaitpointSecretRequest,
296 ) -> Result<RotateWaitpointSecretResponse, SdkError> {
297 match &self.transport {
298 AdminTransport::Http { http, base_url } => {
299 rotate_waitpoint_secret_http(http, base_url, req).await
300 }
301 AdminTransport::Embedded(backend) => {
302 rotate_waitpoint_secret_embedded(backend.as_ref(), req).await
303 }
304 }
305 }
306
307 /// Read the raw HMAC `waitpoint_token` for a specific
308 /// `(execution_id, waitpoint_id)` pair.
309 ///
310 /// # Operator-only
311 ///
312 /// The sibling `list_pending_waitpoints` API intentionally sanitises
313 /// this field (RFC-017 Stage E4 / v0.8.0 §8) — consumers correlate
314 /// via `(token_kid, token_fingerprint)` and normally obtain the raw
315 /// token from their own worker's `SuspendOutcome` at suspend time.
316 /// This admin method re-exposes the token behind the server's
317 /// bearer-auth layer so operator tooling (approval CLIs,
318 /// external-callback bridges) can fetch a delivery credential
319 /// programmatically instead of copy-pasting from worker logs.
320 ///
321 /// Deployments MUST run `ff-server` with `api_token` configured
322 /// when exposing this endpoint to untrusted networks. The embedded
323 /// transport has no auth boundary — access is gated by whoever
324 /// holds the `Arc<dyn EngineBackend>`.
325 ///
326 /// # Returns
327 ///
328 /// * `Ok(Some(token))` — HMAC token string suitable for
329 /// `DeliverSignalArgs::waitpoint_token` / `/signal` request body.
330 /// * `Ok(None)` — waitpoint is unknown, consumed, expired, or the
331 /// stored token column is empty.
332 /// * `Err(SdkError::AdminApi { status: 503, kind: Some("unavailable") })`
333 /// — the backend (e.g. an out-of-tree implementation) has not
334 /// overridden `EngineBackend::read_waitpoint_token`.
335 pub async fn read_waitpoint_token(
336 &self,
337 execution_id: &ff_core::types::ExecutionId,
338 waitpoint_id: &ff_core::types::WaitpointId,
339 ) -> Result<Option<String>, SdkError> {
340 match &self.transport {
341 AdminTransport::Http { http, base_url } => {
342 read_waitpoint_token_http(http, base_url, execution_id, waitpoint_id).await
343 }
344 AdminTransport::Embedded(backend) => {
345 read_waitpoint_token_embedded(backend.as_ref(), execution_id, waitpoint_id).await
346 }
347 }
348 }
349
350 /// Request a lease-reclaim grant for an execution in
351 /// `lease_expired_reclaimable` or `lease_revoked` state (RFC-024
352 /// §3.5).
353 ///
354 /// Routes `POST /v1/executions/{execution_id}/reclaim`. The
355 /// ff-server handler dispatches through the `EngineBackend` trait
356 /// to whichever backend the server was started with (Valkey /
357 /// Postgres / SQLite).
358 ///
359 /// # worker_capabilities (RFC-024 §3.2 B-2)
360 ///
361 /// The request body carries `worker_capabilities`. Consumers typically
362 /// source these from their registered worker's configured
363 /// `WorkerConfig::capabilities`. Admission compares
364 /// `worker_capabilities` against the execution's
365 /// `required_capabilities` (persisted on `exec_core` at
366 /// `create_execution` time from
367 /// `ExecutionPolicy.routing_requirements.required_capabilities`);
368 /// any required capability missing from the worker set surfaces as
369 /// `IssueReclaimGrantResponse::NotReclaimable { detail:
370 /// "capability_mismatch: <missing csv>" }` (Lua
371 /// `ff_issue_reclaim_grant`, `crates/ff-script/src/flowfabric.lua`
372 /// §3969-3982; sqlite/PG backends mirror the check). The SDK does
373 /// not re-read worker state automatically — admin clients are not
374 /// bound to a worker — so the consumer threads the capabilities
375 /// through at call-time.
376 ///
377 /// `capability_hash` is NOT consulted for admission; it is stored
378 /// verbatim on the grant hash for audit / downstream observability
379 /// only.
380 ///
381 /// # Consumer flow (RFC-024 §4.4)
382 ///
383 /// 1. Consumer's `POST /v1/runs/:id/complete` returns
384 /// `lease_expired`.
385 /// 2. Consumer calls this method; handles
386 /// [`IssueReclaimGrantResponse::Granted`] → builds a
387 /// [`ff_core::contracts::ReclaimGrant`] via
388 /// [`IssueReclaimGrantResponse::into_grant`].
389 /// 3. Consumer passes the grant to
390 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`] along
391 /// with a fresh [`ff_core::contracts::ReclaimExecutionArgs`];
392 /// the new attempt is minted with `HandleKind::Reclaimed`.
393 /// 4. Consumer drives terminal writes on the fresh lease.
394 ///
395 /// # Errors
396 ///
397 /// * [`SdkError::AdminApi`] — non-2xx response. 404 when the
398 /// execution does not exist; 400 on malformed `execution_id` or
399 /// body.
400 /// * [`SdkError::Http`] — transport error (connect, body
401 /// decode, client-side timeout).
402 ///
403 /// # Retry semantics
404 ///
405 /// Idempotent on the Lua side: repeated calls against an execution
406 /// already re-leased (a concurrent reclaim beat this one) surface
407 /// as `NotReclaimable`. Safe to retry on transport faults.
408 pub async fn issue_reclaim_grant(
409 &self,
410 execution_id: &str,
411 req: IssueReclaimGrantRequest,
412 ) -> Result<IssueReclaimGrantResponse, SdkError> {
413 match &self.transport {
414 AdminTransport::Http { http, base_url } => {
415 issue_reclaim_grant_http(http, base_url, execution_id, req).await
416 }
417 AdminTransport::Embedded(backend) => {
418 issue_reclaim_grant_embedded(backend.as_ref(), execution_id, req).await
419 }
420 }
421 }
422}
423
424// ── HTTP-transport helpers (private) ─────────────────────────────────
425
426async fn claim_for_worker_http(
427 http: &reqwest::Client,
428 base_url: &str,
429 req: ClaimForWorkerRequest,
430) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
431 // Percent-encode `worker_id` in the URL path — `WorkerId` is a
432 // free-form string (could contain `/`, spaces, `%`, etc.) and
433 // splicing it verbatim would produce malformed URLs or
434 // misrouted paths. `Url::path_segments_mut().push` handles the
435 // encoding natively.
436 let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
437 context: "admin_client: claim_for_worker".into(),
438 field: Some("base_url".into()),
439 message: format!("invalid base_url '{}': {e}", base_url),
440 })?;
441 {
442 let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
443 context: "admin_client: claim_for_worker".into(),
444 field: Some("base_url".into()),
445 message: format!("base_url cannot be a base URL: '{}'", base_url),
446 })?;
447 segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
448 }
449 let url = url.to_string();
450 let resp = http
451 .post(&url)
452 .json(&req)
453 .send()
454 .await
455 .map_err(|e| SdkError::Http {
456 source: e,
457 context: "POST /v1/workers/{worker_id}/claim".into(),
458 })?;
459
460 let status = resp.status();
461 if status == reqwest::StatusCode::NO_CONTENT {
462 return Ok(None);
463 }
464 if status.is_success() {
465 return resp
466 .json::<ClaimForWorkerResponse>()
467 .await
468 .map(Some)
469 .map_err(|e| SdkError::Http {
470 source: e,
471 context: "decode claim_for_worker response body".into(),
472 });
473 }
474
475 // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
476 let status_u16 = status.as_u16();
477 let raw = resp.text().await.map_err(|e| SdkError::Http {
478 source: e,
479 context: format!("read claim_for_worker error body (status {status_u16})"),
480 })?;
481 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
482 Err(SdkError::AdminApi {
483 status: status_u16,
484 message: parsed
485 .as_ref()
486 .map(|b| b.error.clone())
487 .unwrap_or_else(|| raw.clone()),
488 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
489 retryable: parsed.as_ref().and_then(|b| b.retryable),
490 raw_body: raw,
491 })
492}
493
494async fn rotate_waitpoint_secret_http(
495 http: &reqwest::Client,
496 base_url: &str,
497 req: RotateWaitpointSecretRequest,
498) -> Result<RotateWaitpointSecretResponse, SdkError> {
499 let url = format!("{}/v1/admin/rotate-waitpoint-secret", base_url);
500 let resp = http
501 .post(&url)
502 .json(&req)
503 .send()
504 .await
505 .map_err(|e| SdkError::Http {
506 source: e,
507 context: "POST /v1/admin/rotate-waitpoint-secret".into(),
508 })?;
509
510 let status = resp.status();
511 if status.is_success() {
512 return resp
513 .json::<RotateWaitpointSecretResponse>()
514 .await
515 .map_err(|e| SdkError::Http {
516 source: e,
517 context: "decode rotate-waitpoint-secret response body".into(),
518 });
519 }
520
521 // Non-2xx: parse the server's ErrorBody if we can, fall
522 // back to a raw body otherwise. Propagate body-read
523 // transport errors as Http rather than silently flattening
524 // them into `AdminApi { raw_body: "" }` — a connection drop
525 // mid-body-read is a transport fault, not an API-layer
526 // reject, and misclassifying it strips `is_retryable`'s
527 // timeout/connect signal from the caller.
528 let status_u16 = status.as_u16();
529 let raw = resp.text().await.map_err(|e| SdkError::Http {
530 source: e,
531 context: format!(
532 "read rotate-waitpoint-secret error response body (status {status_u16})"
533 ),
534 })?;
535 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
536 Err(SdkError::AdminApi {
537 status: status_u16,
538 message: parsed
539 .as_ref()
540 .map(|b| b.error.clone())
541 .unwrap_or_else(|| raw.clone()),
542 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
543 retryable: parsed.as_ref().and_then(|b| b.retryable),
544 raw_body: raw,
545 })
546}
547
548async fn issue_reclaim_grant_http(
549 http: &reqwest::Client,
550 base_url: &str,
551 execution_id: &str,
552 req: IssueReclaimGrantRequest,
553) -> Result<IssueReclaimGrantResponse, SdkError> {
554 // Percent-encode `execution_id` in the URL path — the id is a
555 // free-form string and splicing verbatim would produce
556 // malformed URLs. Mirrors `claim_for_worker`'s handling.
557 let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
558 context: "admin_client: issue_reclaim_grant".into(),
559 field: Some("base_url".into()),
560 message: format!("invalid base_url '{}': {e}", base_url),
561 })?;
562 {
563 let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
564 context: "admin_client: issue_reclaim_grant".into(),
565 field: Some("base_url".into()),
566 message: format!("base_url cannot be a base URL: '{}'", base_url),
567 })?;
568 segs.extend(&["v1", "executions", execution_id, "reclaim"]);
569 }
570 let url = url.to_string();
571 let resp = http
572 .post(&url)
573 .json(&req)
574 .send()
575 .await
576 .map_err(|e| SdkError::Http {
577 source: e,
578 context: "POST /v1/executions/{id}/reclaim".into(),
579 })?;
580
581 let status = resp.status();
582 if status.is_success() {
583 return resp
584 .json::<IssueReclaimGrantResponse>()
585 .await
586 .map_err(|e| SdkError::Http {
587 source: e,
588 context: "decode issue_reclaim_grant response body".into(),
589 });
590 }
591
592 let status_u16 = status.as_u16();
593 let raw = resp.text().await.map_err(|e| SdkError::Http {
594 source: e,
595 context: format!(
596 "read issue_reclaim_grant error body (status {status_u16})"
597 ),
598 })?;
599 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
600 Err(SdkError::AdminApi {
601 status: status_u16,
602 message: parsed
603 .as_ref()
604 .map(|b| b.error.clone())
605 .unwrap_or_else(|| raw.clone()),
606 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
607 retryable: parsed.as_ref().and_then(|b| b.retryable),
608 raw_body: raw,
609 })
610}
611
612// ── Embedded-transport helpers (private) ─────────────────────────────
613//
614// Dispatch directly through the `EngineBackend` trait. The request
615// body validation mirrors the server-side handler in
616// `ff-server::api`. Translation between the wire DTOs and the core
617// `contracts::*` types lives here so consumers get identical
618// surfaces across transports.
619
620/// Validate a free-form identifier the same way `ff-server`'s
621/// `validate_identifier` does: non-empty, ≤256 bytes, no whitespace
622/// or control chars. Embedded-transport callers hit this before the
623/// request reaches the backend so invalid identifiers cannot leak
624/// past the SDK's parity guarantee.
625fn validate_admin_identifier(
626 op: &'static str,
627 field: &'static str,
628 value: &str,
629) -> Result<(), SdkError> {
630 if value.is_empty() {
631 return Err(SdkError::Config {
632 context: format!("admin_client: {op}"),
633 field: Some(field.into()),
634 message: "must not be empty".into(),
635 });
636 }
637 if value.len() > 256 {
638 return Err(SdkError::Config {
639 context: format!("admin_client: {op}"),
640 field: Some(field.into()),
641 message: format!("exceeds 256 bytes (got {})", value.len()),
642 });
643 }
644 if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
645 return Err(SdkError::Config {
646 context: format!("admin_client: {op}"),
647 field: Some(field.into()),
648 message: "must not contain whitespace or control characters".into(),
649 });
650 }
651 Ok(())
652}
653
654/// Bounded grant-TTL check mirroring `ff-server`'s
655/// `1..=CLAIM_GRANT_TTL_MS_MAX`. Shared between `claim_for_worker`
656/// and `issue_reclaim_grant` embedded paths.
657fn validate_admin_grant_ttl(op: &'static str, grant_ttl_ms: u64) -> Result<(), SdkError> {
658 if grant_ttl_ms == 0 || grant_ttl_ms > EMBEDDED_GRANT_TTL_MS_MAX {
659 return Err(SdkError::Config {
660 context: format!("admin_client: {op}"),
661 field: Some("grant_ttl_ms".into()),
662 message: format!("must be in 1..={EMBEDDED_GRANT_TTL_MS_MAX}"),
663 });
664 }
665 Ok(())
666}
667
668/// Map an `EngineError` from a backend call into the `SdkError::AdminApi`
669/// surface so embedded and HTTP transports emit the same shape. `Unavailable`
670/// becomes 503 with kind `"unavailable"`; every other engine error bubbles
671/// up via `SdkError::Engine`.
672fn engine_err_to_admin(err: ff_core::engine_error::EngineError, op: &str) -> SdkError {
673 if let ff_core::engine_error::EngineError::Unavailable { op: backend_op } = &err {
674 return SdkError::AdminApi {
675 status: 503,
676 message: format!(
677 "{op}: backend does not implement '{backend_op}' on this transport"
678 ),
679 kind: Some("unavailable".to_owned()),
680 retryable: Some(false),
681 raw_body: String::new(),
682 };
683 }
684 SdkError::Engine(Box::new(err))
685}
686
687async fn claim_for_worker_embedded(
688 backend: &dyn EngineBackend,
689 req: ClaimForWorkerRequest,
690) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
691 // Mirror ff-server's validation + translation. Errors surface as
692 // SdkError::Config so consumers see validation faults loud rather
693 // than as backend transport errors.
694 let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
695 context: "admin_client: claim_for_worker".into(),
696 field: Some("lane_id".into()),
697 message: e.to_string(),
698 })?;
699 validate_admin_identifier("claim_for_worker", "worker_id", &req.worker_id)?;
700 validate_admin_identifier(
701 "claim_for_worker",
702 "worker_instance_id",
703 &req.worker_instance_id,
704 )?;
705 validate_admin_grant_ttl("claim_for_worker", req.grant_ttl_ms)?;
706 let caps: std::collections::BTreeSet<String> = req.capabilities.into_iter().collect();
707 let args = ff_core::contracts::ClaimForWorkerArgs::new(
708 lane_id,
709 ff_core::types::WorkerId::new(req.worker_id),
710 ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
711 caps,
712 req.grant_ttl_ms,
713 );
714 match backend
715 .claim_for_worker(args)
716 .await
717 .map_err(|e| engine_err_to_admin(e, "claim_for_worker"))?
718 {
719 ff_core::contracts::ClaimForWorkerOutcome::NoWork => Ok(None),
720 ff_core::contracts::ClaimForWorkerOutcome::Granted(grant) => {
721 Ok(Some(ClaimForWorkerResponse {
722 execution_id: grant.execution_id.to_string(),
723 partition_key: grant.partition_key,
724 grant_key: grant.grant_key,
725 expires_at_ms: grant.expires_at_ms,
726 }))
727 }
728 // `ClaimForWorkerOutcome` is `#[non_exhaustive]`; mirror
729 // ff-server's 503 policy on unknown variants.
730 _ => Err(SdkError::AdminApi {
731 status: 503,
732 message: "claim_for_worker: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
733 kind: Some("unknown_outcome".to_owned()),
734 retryable: Some(false),
735 raw_body: String::new(),
736 }),
737 }
738}
739
740async fn issue_reclaim_grant_embedded(
741 backend: &dyn EngineBackend,
742 execution_id: &str,
743 req: IssueReclaimGrantRequest,
744) -> Result<IssueReclaimGrantResponse, SdkError> {
745 let exec_id = ff_core::types::ExecutionId::parse(execution_id).map_err(|e| SdkError::Config {
746 context: "admin_client: issue_reclaim_grant".into(),
747 field: Some("execution_id".into()),
748 message: e.to_string(),
749 })?;
750 let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
751 context: "admin_client: issue_reclaim_grant".into(),
752 field: Some("lane_id".into()),
753 message: e.to_string(),
754 })?;
755 validate_admin_identifier("issue_reclaim_grant", "worker_id", &req.worker_id)?;
756 validate_admin_identifier(
757 "issue_reclaim_grant",
758 "worker_instance_id",
759 &req.worker_instance_id,
760 )?;
761 validate_admin_grant_ttl("issue_reclaim_grant", req.grant_ttl_ms)?;
762 let caps: std::collections::BTreeSet<String> = req.worker_capabilities.into_iter().collect();
763 let args = ff_core::contracts::IssueReclaimGrantArgs::new(
764 exec_id,
765 ff_core::types::WorkerId::new(req.worker_id),
766 ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
767 lane_id,
768 req.capability_hash,
769 req.grant_ttl_ms,
770 req.route_snapshot_json,
771 req.admission_summary,
772 caps,
773 ff_core::types::TimestampMs::now(),
774 );
775 match backend
776 .issue_reclaim_grant(args)
777 .await
778 .map_err(|e| engine_err_to_admin(e, "issue_reclaim_grant"))?
779 {
780 ff_core::contracts::IssueReclaimGrantOutcome::Granted(grant) => {
781 Ok(IssueReclaimGrantResponse::Granted {
782 execution_id: grant.execution_id.to_string(),
783 partition_key: grant.partition_key,
784 grant_key: grant.grant_key,
785 expires_at_ms: grant.expires_at_ms,
786 lane_id: grant.lane_id.as_str().to_owned(),
787 })
788 }
789 ff_core::contracts::IssueReclaimGrantOutcome::NotReclaimable { execution_id, detail } => {
790 Ok(IssueReclaimGrantResponse::NotReclaimable {
791 execution_id: execution_id.to_string(),
792 detail,
793 })
794 }
795 ff_core::contracts::IssueReclaimGrantOutcome::ReclaimCapExceeded {
796 execution_id,
797 reclaim_count,
798 } => Ok(IssueReclaimGrantResponse::ReclaimCapExceeded {
799 execution_id: execution_id.to_string(),
800 reclaim_count,
801 }),
802 _ => Err(SdkError::AdminApi {
803 status: 503,
804 message: "issue_reclaim_grant: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
805 kind: Some("unknown_outcome".to_owned()),
806 retryable: Some(false),
807 raw_body: String::new(),
808 }),
809 }
810}
811
812async fn read_waitpoint_token_http(
813 http: &reqwest::Client,
814 base_url: &str,
815 execution_id: &ff_core::types::ExecutionId,
816 waitpoint_id: &ff_core::types::WaitpointId,
817) -> Result<Option<String>, SdkError> {
818 // Percent-encode both path segments: execution_id carries `{fp:N}:`
819 // hash-tag punctuation; waitpoint_id is a UUID but be defensive.
820 let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
821 context: "admin_client: read_waitpoint_token".into(),
822 field: Some("base_url".into()),
823 message: format!("invalid base_url '{}': {e}", base_url),
824 })?;
825 url.path_segments_mut()
826 .map_err(|_| SdkError::Config {
827 context: "admin_client: read_waitpoint_token".into(),
828 field: Some("base_url".into()),
829 message: format!("base_url '{}' cannot be a base", base_url),
830 })?
831 .extend(&[
832 "v1",
833 "executions",
834 execution_id.as_str(),
835 "waitpoints",
836 &waitpoint_id.to_string(),
837 "token",
838 ]);
839
840 let resp = http
841 .get(url.clone())
842 .send()
843 .await
844 .map_err(|e| SdkError::Http {
845 source: e,
846 context: format!("GET {url}"),
847 })?;
848 let status = resp.status();
849 if status == reqwest::StatusCode::NOT_FOUND {
850 return Ok(None);
851 }
852 if status.is_success() {
853 #[derive(Deserialize)]
854 struct Body {
855 token: String,
856 }
857 let body: Body = resp.json().await.map_err(|e| SdkError::Http {
858 source: e,
859 context: "decode read_waitpoint_token response body".into(),
860 })?;
861 return Ok(Some(body.token));
862 }
863
864 let status_u16 = status.as_u16();
865 let raw = resp.text().await.map_err(|e| SdkError::Http {
866 source: e,
867 context: format!("read read_waitpoint_token error body (status {status_u16})"),
868 })?;
869 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
870 Err(SdkError::AdminApi {
871 status: status_u16,
872 message: parsed
873 .as_ref()
874 .map(|b| b.error.clone())
875 .unwrap_or_else(|| raw.clone()),
876 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
877 retryable: parsed.as_ref().and_then(|b| b.retryable),
878 raw_body: raw,
879 })
880}
881
882async fn read_waitpoint_token_embedded(
883 backend: &dyn EngineBackend,
884 execution_id: &ff_core::types::ExecutionId,
885 waitpoint_id: &ff_core::types::WaitpointId,
886) -> Result<Option<String>, SdkError> {
887 let partition =
888 ff_core::partition::PartitionKey::from(&ff_core::partition::Partition {
889 family: ff_core::partition::PartitionFamily::Flow,
890 index: execution_id.partition(),
891 });
892 backend
893 .read_waitpoint_token(partition, waitpoint_id)
894 .await
895 .map_err(|e| engine_err_to_admin(e, "read_waitpoint_token"))
896}
897
898async fn rotate_waitpoint_secret_embedded(
899 backend: &dyn EngineBackend,
900 req: RotateWaitpointSecretRequest,
901) -> Result<RotateWaitpointSecretResponse, SdkError> {
902 // Validation mirrors `ff-server::Server::rotate_waitpoint_secret`
903 // so the embedded and HTTP transports reject the same invalid
904 // inputs.
905 if req.new_kid.is_empty() || req.new_kid.contains(':') {
906 return Err(SdkError::Config {
907 context: "admin_client: rotate_waitpoint_secret".into(),
908 field: Some("new_kid".into()),
909 message: "must be non-empty and must not contain ':'".into(),
910 });
911 }
912 if req.new_secret_hex.is_empty()
913 || !req.new_secret_hex.len().is_multiple_of(2)
914 || !req.new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
915 {
916 return Err(SdkError::Config {
917 context: "admin_client: rotate_waitpoint_secret".into(),
918 field: Some("new_secret_hex".into()),
919 message: "must be a non-empty even-length hex string".into(),
920 });
921 }
922 // Embedded consumers have no config surface from which to read
923 // the per-deployment grace window, so pin the documented default
924 // matching `ff-server`'s `FF_WAITPOINT_HMAC_GRACE_MS` (24 h). See
925 // `EMBEDDED_WAITPOINT_HMAC_GRACE_MS` + the `connect_with` rustdoc
926 // for the rationale and divergence-from-HTTP notes. Unlike the
927 // HTTP handler, this path does not enforce the 120 s end-to-end
928 // timeout (no HTTP deadline to honour) and does not emit the
929 // `audit`-target `waitpoint_hmac_rotation_*` events (those are
930 // server-owned operator signals). Rotation is idempotent on the
931 // same (new_kid, new_secret_hex) pair, so retries remain safe.
932 let args = ff_core::contracts::RotateWaitpointHmacSecretAllArgs::new(
933 req.new_kid.clone(),
934 req.new_secret_hex,
935 EMBEDDED_WAITPOINT_HMAC_GRACE_MS,
936 );
937 let result = backend
938 .rotate_waitpoint_hmac_secret_all(args)
939 .await
940 .map_err(|e| engine_err_to_admin(e, "rotate_waitpoint_secret"))?;
941
942 // Collapse the per-partition entries into the HTTP response shape
943 // the server emits — rotated count + failed indices + echoed
944 // new_kid — so consumers see identical return values across
945 // transports.
946 let mut rotated: u16 = 0;
947 let mut failed: Vec<u16> = Vec::new();
948 for entry in &result.entries {
949 match &entry.result {
950 Ok(_) => {
951 rotated = rotated.saturating_add(1);
952 }
953 Err(_) => failed.push(entry.partition),
954 }
955 }
956 Ok(RotateWaitpointSecretResponse {
957 rotated,
958 failed,
959 new_kid: req.new_kid,
960 })
961}
962
/// Request body for `POST /v1/executions/{execution_id}/reclaim`
/// (RFC-024 §3.5).
///
/// Mirrors `ff_server::api::IssueReclaimGrantBody` 1:1. The
/// `execution_id` goes in the URL path, not the body. Optional fields
/// set to `None` are omitted from the serialized JSON.
#[derive(Debug, Clone, Serialize)]
pub struct IssueReclaimGrantRequest {
    /// Worker identity requesting the grant. The Lua
    /// `ff_reclaim_execution` validates grant consumption via
    /// `grant.worker_id == args.worker_id` (RFC-024 §4.4), so the
    /// worker consuming the grant must match this value.
    pub worker_id: String,
    /// Worker-instance identity. Informational at grant-issuance
    /// time; stored on the grant so consumers can correlate events.
    pub worker_instance_id: String,
    /// Lane the execution belongs to. Needed by
    /// `ff_issue_reclaim_grant` for `KEYS[*]` construction.
    pub lane_id: String,
    /// Opaque capability-hash token stored verbatim on the issued
    /// grant for audit and downstream observability. NOT used for
    /// admission: admission compares `worker_capabilities` against
    /// the execution's `required_capabilities` (see the
    /// [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc).
    /// `None` leaves the field empty on the grant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub capability_hash: Option<String>,
    /// Grant TTL in milliseconds. Bounded server-side.
    pub grant_ttl_ms: u64,
    /// Route snapshot JSON carried onto the grant for audit.
    /// Omitted from the body when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub route_snapshot_json: Option<String>,
    /// Admission summary string carried onto the grant for audit.
    /// Omitted from the body when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub admission_summary: Option<String>,
    /// Worker capability tokens. Consumers typically source these
    /// from their registered worker's `WorkerConfig::capabilities`
    /// (see [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc
    /// for the override contract). Always serialized, even when empty.
    #[serde(default)]
    pub worker_capabilities: Vec<String>,
}
1004
/// Response body for `POST /v1/executions/{execution_id}/reclaim`
/// (RFC-024 §3.5).
///
/// The server serializes this enum with a `status` discriminator
/// (snake_case variant names: `granted`, `not_reclaimable`,
/// `reclaim_cap_exceeded`). Consumers can therefore match on
/// structured outcomes instead of re-parsing a 200-vs-4xx split for
/// business-logic outcomes. This follows the precedent of
/// `RotateWaitpointSecretResponse`, which also reports non-fatal
/// outcomes in a 200 body.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum IssueReclaimGrantResponse {
    /// Grant issued. Build a
    /// [`ff_core::contracts::ReclaimGrant`] via
    /// [`Self::into_grant`] and feed it to
    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
    Granted {
        /// Execution id as a string; parsed by [`Self::into_grant`].
        execution_id: String,
        /// Opaque partition key, passed through unparsed.
        partition_key: ff_core::partition::PartitionKey,
        /// Grant key, passed through verbatim to the core grant type.
        grant_key: String,
        /// Grant expiry, in milliseconds.
        expires_at_ms: u64,
        /// Lane id as a string; validated by [`Self::into_grant`].
        lane_id: String,
    },
    /// Execution is not in a reclaimable state (not
    /// `lease_expired_reclaimable` / `lease_revoked`).
    NotReclaimable {
        execution_id: String,
        /// Server-provided explanation of why the execution is not
        /// reclaimable.
        detail: String,
    },
    /// `max_reclaim_count` exceeded; the execution transitioned to
    /// terminal_failed. Consumers should stop retrying and surface a
    /// structural failure.
    ReclaimCapExceeded {
        execution_id: String,
        /// Reclaim count reported by the server.
        reclaim_count: u32,
    },
}
1040
1041impl IssueReclaimGrantResponse {
1042 /// Convert a [`Self::Granted`] response into a typed
1043 /// [`ff_core::contracts::ReclaimGrant`] for handoff to
1044 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
1045 ///
1046 /// Returns [`SdkError::AdminApi`] when the wire variant is not
1047 /// `Granted` (consumer asked for a grant but the server replied
1048 /// with a terminal outcome) or when `execution_id` / `lane_id`
1049 /// are malformed — the latter signals a drift between server and
1050 /// SDK, so failing loud prevents silent misrouting.
1051 pub fn into_grant(self) -> Result<ff_core::contracts::ReclaimGrant, SdkError> {
1052 match self {
1053 IssueReclaimGrantResponse::Granted {
1054 execution_id,
1055 partition_key,
1056 grant_key,
1057 expires_at_ms,
1058 lane_id,
1059 } => {
1060 let eid = ff_core::types::ExecutionId::parse(&execution_id)
1061 .map_err(|e| SdkError::AdminApi {
1062 status: 200,
1063 message: format!(
1064 "issue_reclaim_grant: server returned malformed execution_id '{execution_id}': {e}"
1065 ),
1066 kind: Some("malformed_response".to_owned()),
1067 retryable: Some(false),
1068 raw_body: String::new(),
1069 })?;
1070 let lane = ff_core::types::LaneId::try_new(lane_id.clone())
1071 .map_err(|e| SdkError::AdminApi {
1072 status: 200,
1073 message: format!(
1074 "issue_reclaim_grant: server returned malformed lane_id '{lane_id}': {e}"
1075 ),
1076 kind: Some("malformed_response".to_owned()),
1077 retryable: Some(false),
1078 raw_body: String::new(),
1079 })?;
1080 Ok(ff_core::contracts::ReclaimGrant::new(
1081 eid,
1082 partition_key,
1083 grant_key,
1084 expires_at_ms,
1085 lane,
1086 ))
1087 }
1088 IssueReclaimGrantResponse::NotReclaimable { execution_id, detail } => {
1089 Err(SdkError::AdminApi {
1090 status: 200,
1091 message: format!(
1092 "issue_reclaim_grant: execution '{execution_id}' not reclaimable: {detail}"
1093 ),
1094 kind: Some("not_reclaimable".to_owned()),
1095 retryable: Some(false),
1096 raw_body: String::new(),
1097 })
1098 }
1099 IssueReclaimGrantResponse::ReclaimCapExceeded {
1100 execution_id,
1101 reclaim_count,
1102 } => Err(SdkError::AdminApi {
1103 status: 200,
1104 message: format!(
1105 "issue_reclaim_grant: execution '{execution_id}' hit reclaim cap ({reclaim_count})"
1106 ),
1107 kind: Some("reclaim_cap_exceeded".to_owned()),
1108 retryable: Some(false),
1109 raw_body: String::new(),
1110 }),
1111 }
1112 }
1113}
1114
/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
#[derive(Debug, Clone, Serialize)]
pub struct RotateWaitpointSecretRequest {
    /// New key identifier. Must be non-empty and must not contain `:`
    /// (the server uses `:` as the field separator in the secret hash).
    pub new_kid: String,
    /// Hex-encoded new secret. Must be non-empty, even-length, and
    /// contain only `[0-9a-fA-F]`.
    pub new_secret_hex: String,
}
1126
/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
/// The server serializes this struct as-is via `Json(result)`. The
/// embedded transport builds the same shape, so callers see identical
/// values on either transport.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct RotateWaitpointSecretResponse {
    /// Count of partitions that accepted the rotation.
    pub rotated: u16,
    /// Partition indices where the rotation failed; the operator
    /// should investigate. Rotation is idempotent on the same
    /// `(new_kid, new_secret_hex)` pair, so a retry after the
    /// underlying fault clears converges.
    pub failed: Vec<u16>,
    /// The `new_kid` that was installed as current on every rotated
    /// partition. Echoes the request field back for confirmation.
    pub new_kid: String,
}
1145
/// Server-side error body shape, as emitted by
/// `ff_server::api::ErrorBody`. Kept internal because consumers
/// match on the flattened fields of [`SdkError::AdminApi`].
#[derive(Debug, Clone, Deserialize)]
struct AdminErrorBody {
    /// Human-readable error message; becomes `SdkError::AdminApi::message`.
    error: String,
    /// Optional machine-readable error kind; `None` when absent from
    /// the body.
    #[serde(default)]
    kind: Option<String>,
    /// Optional retry hint from the server; `None` when absent from
    /// the body.
    #[serde(default)]
    retryable: Option<bool>,
}
1157
/// Request body for `POST /v1/workers/{worker_id}/claim`.
///
/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
/// goes in the URL path (not the body) but is kept on the struct
/// for ergonomics, so callers don't juggle a separate argument.
#[derive(Debug, Clone, Serialize)]
pub struct ClaimForWorkerRequest {
    /// Worker identity; spliced into the URL path and excluded from
    /// the JSON body.
    #[serde(skip)]
    pub worker_id: String,
    /// Lane to claim from.
    pub lane_id: String,
    /// Worker-instance identity.
    pub worker_instance_id: String,
    /// Worker capability tokens. Always serialized, even when empty.
    #[serde(default)]
    pub capabilities: Vec<String>,
    /// Grant TTL in milliseconds. The server rejects 0 or anything
    /// over 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
    pub grant_ttl_ms: u64,
}
1175
/// Response body for `POST /v1/workers/{worker_id}/claim`.
///
/// Wire shape of `ff_core::contracts::ClaimGrant`. It carries the
/// opaque [`ff_core::partition::PartitionKey`] directly on the wire
/// (issue #91); the SDK reconstructs the core type via
/// [`Self::into_grant`].
#[derive(Debug, Clone, Deserialize)]
pub struct ClaimForWorkerResponse {
    /// Execution id as a string; parsed by [`Self::into_grant`].
    pub execution_id: String,
    /// Opaque partition key; NOT parsed by [`Self::into_grant`].
    pub partition_key: ff_core::partition::PartitionKey,
    /// Grant key, passed through verbatim to the core grant type.
    pub grant_key: String,
    /// Grant expiry, in milliseconds.
    pub expires_at_ms: u64,
}
1188
1189impl ClaimForWorkerResponse {
1190 /// Convert the wire DTO into a typed
1191 /// [`ff_core::contracts::ClaimGrant`] for handoff to
1192 /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
1193 /// [`SdkError::AdminApi`] on malformed execution_id — a drift
1194 /// signal that the server and SDK disagree on the wire shape, so
1195 /// failing loud prevents routing to a ghost partition.
1196 ///
1197 /// The `partition_key` itself is not eagerly parsed here: it is
1198 /// carried opaquely to the `claim_from_grant` hot path, which
1199 /// parses it there and surfaces a typed error on malformed keys.
1200 pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
1201 let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
1202 .map_err(|e| SdkError::AdminApi {
1203 status: 200,
1204 message: format!(
1205 "claim_for_worker: server returned malformed execution_id '{}': {e}",
1206 self.execution_id
1207 ),
1208 kind: Some("malformed_response".to_owned()),
1209 retryable: Some(false),
1210 raw_body: String::new(),
1211 })?;
1212 Ok(ff_core::contracts::ClaimGrant::new(
1213 execution_id,
1214 self.partition_key,
1215 self.grant_key,
1216 self.expires_at_ms,
1217 ))
1218 }
1219}
1220
/// Per-partition outcome of a cluster-wide waitpoint HMAC secret
/// rotation. Returned by [`rotate_waitpoint_hmac_secret_all_partitions`]
/// (available with the `valkey-default` feature) so operators can
/// audit which partitions rotated, no-op'd, or failed.
///
/// The index is the execution-partition index (`0..num_partitions`),
/// matching `{fp:N}` in the keyspace.
#[derive(Debug)]
pub struct PartitionRotationOutcome {
    /// Execution partition index (`0..num_partitions`).
    pub partition: u16,
    /// FCALL outcome on this partition, or the error it raised
    /// (converted into an [`SdkError`]).
    pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
}
1234
/// Rotate the waitpoint HMAC secret across every execution partition
/// by fanning out the `ff_rotate_waitpoint_hmac_secret` FCALL.
///
/// This is the canonical Rust-side rotation path for direct-Valkey
/// consumers (e.g. cairn-fabric) that cannot route through the
/// `ff-server` admin REST endpoint. Callers with an HTTP-reachable
/// `ff-server` should prefer [`FlowFabricAdminClient::rotate_waitpoint_secret`].
/// That path adds a single-writer admission gate, parallel fan-out,
/// structured audit events, and the server's configured grace window.
///
/// # Production rotation recipe
///
/// Operators MUST coordinate so secret rotation **precedes** any
/// waitpoint resolution that will present the new `kid`. The broad
/// sequence:
///
/// 1. Pick a fresh `new_kid`. It must NOT contain `:`, because the
///    server uses `:` as the field separator in the secret hash.
/// 2. Call this helper with the previous `kid`'s grace window
///    (`grace_ms`: the duration during which tokens signed by the
///    outgoing secret remain valid).
/// 3. Only after this call returns with all partitions `Ok(_)` (either
///    `Rotated` or `Noop`), begin signing new tokens with `new_kid`.
/// 4. Retain the previous secret in the keystore until the grace
///    window elapses. The FCALL GCs expired kids on every rotation, so
///    simply avoid rotating again before the grace window ends.
///
/// See RFC-004 §rotation for the full 4-key HSET + `previous_expires_at`
/// dance the FCALL implements server-side.
///
/// # Idempotency
///
/// Each partition FCALL is idempotent on the same `(new_kid,
/// new_secret_hex)` pair: a replay with identical args returns
/// `RotateWaitpointHmacSecretOutcome::Noop`. A same-kid-different-secret
/// replay surfaces as a per-partition `SdkError` (wrapping
/// `ScriptError::RotationConflict`); pick a fresh `new_kid` to recover.
///
/// # Returns
///
/// One [`PartitionRotationOutcome`] per partition index, in ascending
/// order `0..num_partitions`. The vector is empty when
/// `num_partitions == 0`.
///
/// # Error semantics
///
/// A per-partition FCALL failure (transport fault, rotation conflict,
/// etc.) is recorded on that partition's [`PartitionRotationOutcome`],
/// and fan-out **continues**. This matches the server's
/// `rotate_waitpoint_secret` contract: partial success is allowed, and
/// operators retry the failed partition subset.
///
/// Returning `Vec<_>` (not `Result<Vec<_>, _>`) is deliberate. Every
/// whole-call invariant is enforced by the underlying FCALL on each
/// partition (kid non-empty, no `:`, even-length hex, etc.), so the
/// aggregate has nothing left to reject at the Rust boundary. Callers
/// decide how to treat partial failures (fail loudly, retry the
/// subset, record metrics).
///
/// # Concurrency + performance
///
/// The fan-out is sequential (one partition at a time) to keep the
/// helper dependency-free: no `futures::stream` or tokio-specific
/// primitives on the caller path. For a cluster with N partitions and
/// per-partition RTT R, the total duration is about N*R. Consumers
/// needing parallel fan-out should wrap this with `FuturesUnordered`
/// themselves, or use the server admin endpoint (which fans out with
/// bounded concurrency = 16).
///
/// # Test harness
///
/// The `ff-test::fixtures::TestCluster::rotate_waitpoint_hmac_secret`
/// method is a thin wrapper around this helper, so integration tests
/// and production code exercise the same code path.
///
/// # Example
///
/// ```rust,ignore
/// use ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions;
///
/// // No `?` on the call: the aggregate is infallible, and failures
/// // live on each entry's `result`.
/// let results = rotate_waitpoint_hmac_secret_all_partitions(
///     &client,
///     partition_config.num_flow_partitions,
///     "kid-2026-04-22",
///     "deadbeef...64-hex-chars...",
///     60_000,
/// )
/// .await;
///
/// for entry in &results {
///     match &entry.result {
///         Ok(outcome) => tracing::info!(partition = entry.partition, ?outcome, "rotated"),
///         Err(e) => tracing::error!(partition = entry.partition, %e, "rotation failed"),
///     }
/// }
/// ```
// v0.12 PR-6: the `admin` module is ungated at module level so
// consumers under `--no-default-features --features sqlite` can reach
// the HTTP admin client surface. This helper is the one remaining
// Valkey-typed item in the module (it takes a `&ferriskey::Client` and
// fans out `ff_rotate_waitpoint_hmac_secret` FCALLs), so it stays
// `valkey-default`-gated. See the `lib.rs` PR-6 comment for the
// Option 1 / Option 2 decision.
#[cfg(feature = "valkey-default")]
pub async fn rotate_waitpoint_hmac_secret_all_partitions(
    client: &ferriskey::Client,
    num_partitions: u16,
    new_kid: &str,
    new_secret_hex: &str,
    grace_ms: u64,
) -> Vec<PartitionRotationOutcome> {
    // Hoisted out of the loop: `ff_rotate_waitpoint_hmac_secret` only
    // borrows the args, so every partition can reuse the same struct.
    // This avoids N × 2 string clones on the fan-out path.
    let args = RotateWaitpointHmacSecretArgs {
        new_kid: new_kid.to_owned(),
        new_secret_hex: new_secret_hex.to_owned(),
        grace_ms,
    };
    let mut out = Vec::with_capacity(num_partitions as usize);
    for index in 0..num_partitions {
        let partition = Partition {
            family: PartitionFamily::Execution,
            index,
        };
        let idx = IndexKeys::new(&partition);
        // Errors are captured per partition; the loop never short-circuits.
        let result = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            client, &idx, &args,
        )
        .await
        .map_err(SdkError::from);
        out.push(PartitionRotationOutcome {
            partition: index,
            result,
        });
    }
    out
}
1364
/// Remove every trailing `/` from `url` so that splicing
/// `format!("{base}/v1/...")` never yields `https://host//v1/...`.
/// Mirror of media-pipeline's pattern.
///
/// Reuses the input allocation; an all-slash input becomes `""`.
fn normalize_base_url(mut url: String) -> String {
    let kept_len = url.trim_end_matches('/').len();
    url.truncate(kept_len);
    url
}
1374
#[cfg(test)]
mod tests {
    // Unit tests for the admin module's pure helpers and wire DTOs.
    // Anything that needs a live backend is covered in `ff-test`.
    use super::*;

    // ── normalize_base_url ──

    #[test]
    fn base_url_strips_trailing_slash() {
        assert_eq!(normalize_base_url("http://x".into()), "http://x");
        assert_eq!(normalize_base_url("http://x/".into()), "http://x");
        assert_eq!(normalize_base_url("http://x///".into()), "http://x");
    }

    // ── FlowFabricAdminClient::with_token validation ──

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // A raw newline in the token would split the Authorization
        // header, so construction must fail loudly.
        let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
        assert!(
            matches!(err, SdkError::Config { .. }),
            "got: {err:?}"
        );
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // Shell footgun: an empty token env var (e.g. FF_API_TOKEN="")
        // expands to "". Fail loudly at construction instead of
        // shipping a client that silently 401s on its first request.
        for s in ["", " ", "\t\n ", " "] {
            let err = FlowFabricAdminClient::with_token("http://x", s)
                .unwrap_err();
            assert!(
                matches!(&err, SdkError::Config { field: Some(f), .. } if f == "bearer_token"),
                "token {s:?} should return Config with field=bearer_token; got: {err:?}"
            );
        }
    }

    // ── Wire DTO deserialisation ──

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // `kind` + `retryable` absent (the usual shape for 400s).
        let b: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(b.error, "bad new_kid");
        assert!(b.kind.is_none());
        assert!(b.retryable.is_none());

        // `kind` + `retryable` present (500 ValkeyError shape).
        let b: AdminErrorBody = serde_json::from_str(
            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
        )
        .unwrap();
        assert_eq!(b.error, "valkey: timed out");
        assert_eq!(b.kind.as_deref(), Some("IoError"));
        assert_eq!(b.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Exact shape the server emits.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "new_kid": "kid-2026-04-18"
        }"#;
        let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.rotated, 3);
        assert_eq!(r.failed, vec![4, 5]);
        assert_eq!(r.new_kid, "kid-2026-04-18");
    }

    // ── ClaimForWorkerResponse::into_grant ──

    /// Builds a well-formed response whose `partition_key` is the
    /// given literal (round-tripped through JSON to construct the
    /// opaque `PartitionKey`).
    fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_key: serde_json::from_str(
                &serde_json::to_string(partition_key).unwrap(),
            )
            .unwrap(),
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn into_grant_preserves_all_known_partition_key_shapes() {
        // Post-#91: families collapse into opaque PartitionKey literals.
        // Flow and Execution both produce "{fp:N}"; Budget is "{b:N}";
        // Quota is "{q:N}". The DTO preserves the wire string as-is;
        // into_grant hands it opaquely to the core type.
        for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
            let g = sample_claim_response(key_str).into_grant().unwrap_or_else(|e| {
                panic!("key {key_str} should parse: {e:?}")
            });
            assert_eq!(g.partition_key.as_str(), key_str);
            assert_eq!(g.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_preserves_opaque_partition_key() {
        // The SDK does NOT eagerly parse the partition_key at the
        // admin boundary. Malformed keys are caught at the
        // claim_from_grant hot path, where the typed Partition is
        // actually needed. This test pins the opacity contract.
        let resp = sample_claim_response("{zz:0}");
        let g = resp.into_grant().expect("SDK must not parse partition_key");
        assert_eq!(g.partition_key.as_str(), "{zz:0}");
        // Parsing surfaces the error explicitly.
        assert!(g.partition().is_err());
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let mut resp = sample_claim_response("{fp:5}");
        resp.execution_id = "not-a-valid-eid".to_owned();
        let err = resp.into_grant().unwrap_err();
        match err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(message.contains("malformed execution_id"),
                    "msg: {message}");
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    // ── rotate_waitpoint_hmac_secret_all_partitions coverage ──

    // Exercise coverage for `rotate_waitpoint_hmac_secret_all_partitions`
    // lives in `ff-test`. The integration harness in
    // `crates/ff-test/tests/waitpoint_hmac_rotation_fcall.rs` and
    // `waitpoint_tokens.rs` calls the function through the
    // `TestCluster::rotate_waitpoint_hmac_secret` fixture, which is now
    // a thin delegator. A pure unit test here would need a mock
    // `ferriskey::Client`. ferriskey's `Client` performs a live RESP
    // handshake on `ClientBuilder::build`, so a local TCP listener alone
    // isn't sufficient, and that is expensive to construct for
    // one-line iteration-count coverage.

    // ── read_waitpoint_token URL construction ──

    #[test]
    fn read_waitpoint_token_url_percent_encodes_path_segments() {
        // The execution id carries `{fp:N}:` literal punctuation.
        // A naïve `format!` splice would ship those chars unencoded,
        // and the server would match the wrong route. Pin that the
        // reqwest URL builder percent-encodes each segment.
        use ff_core::types::{ExecutionId, WaitpointId};

        let mut url = reqwest::Url::parse("http://x").unwrap();
        let execution_id = ExecutionId::parse(
            "{fp:7}:11111111-1111-1111-1111-111111111111",
        )
        .unwrap();
        let waitpoint_id = WaitpointId::parse("22222222-2222-2222-2222-222222222222")
            .unwrap();
        url.path_segments_mut()
            .unwrap()
            .extend(&[
                "v1",
                "executions",
                execution_id.as_str(),
                "waitpoints",
                &waitpoint_id.to_string(),
                "token",
            ]);
        assert_eq!(
            url.as_str(),
            "http://x/v1/executions/%7Bfp:7%7D:11111111-1111-1111-1111-111111111111\
             /waitpoints/22222222-2222-2222-2222-222222222222/token"
        );
    }

    // ── ClaimForWorkerResponse wire shape (issue #91) ──

    #[test]
    fn claim_for_worker_response_deserialises_opaque_partition_key() {
        // Exact shape the server emits post-#91.
        let raw = r#"{
            "execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
            "partition_key": "{fp:7}",
            "grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
            "expires_at_ms": 1700000000000
        }"#;
        let r: ClaimForWorkerResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.partition_key.as_str(), "{fp:7}");
        assert_eq!(r.expires_at_ms, 1_700_000_000_000);
    }
}