ff_sdk/admin.rs
1//! Admin REST client for operator-facing endpoints on `ff-server`.
2//!
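//! Wraps `POST /v1/admin/*`, `POST /v1/workers/{worker_id}/claim` and
//! `POST /v1/executions/{execution_id}/reclaim` so downstream consumers
//! (cairn-fabric) don't hand-roll the HTTP calls for admin surfaces such
//! as HMAC secret rotation, scheduler-routed claims and reclaim grants.
//! An embedded transport (`FlowFabricAdminClient::connect_with`) serves
//! the same calls directly through an `EngineBackend`, without HTTP.
//! Request bodies and response shapes mirror the server's wire types
//! (`ff_server::api` / `ff_server::server`) and are kept 1:1 with the
//! producer.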
9//!
10//! Authentication is Bearer token. Callers pick up the token from
11//! wherever they hold it (`FF_API_TOKEN` env var is the common
12//! pattern, but the SDK does not read env vars on the caller's
13//! behalf — [`FlowFabricAdminClient::with_token`] accepts a
14//! string-like token value (`&str` or `String`) via
15//! `impl AsRef<str>`).
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use ff_core::contracts::RotateWaitpointHmacSecretOutcome;
21use ff_core::engine_backend::EngineBackend;
22// v0.12 PR-6: these imports only power the Valkey-typed
23// `rotate_waitpoint_hmac_secret_all_partitions` helper at the bottom
24// of the module; gated so the ungated module builds clean under
25// `--no-default-features --features sqlite`.
26#[cfg(feature = "valkey-default")]
27use ff_core::contracts::RotateWaitpointHmacSecretArgs;
28#[cfg(feature = "valkey-default")]
29use ff_core::keys::IndexKeys;
30#[cfg(feature = "valkey-default")]
31use ff_core::partition::{Partition, PartitionFamily};
32use serde::{Deserialize, Serialize};
33
34use crate::SdkError;
35
36/// Default per-request timeout. The server's own
37/// `ROTATE_HTTP_TIMEOUT` is 120s; pick 130s client-side so the
38/// client deadline is LATER than the server deadline and
39/// operators see the structured 504 GATEWAY_TIMEOUT body rather
40/// than a client-side timeout error.
41const DEFAULT_TIMEOUT: Duration = Duration::from_secs(130);
42
43/// Grace window in ms that the embedded `rotate_waitpoint_secret`
44/// path forwards to the backend primitive. Matches `ff-server`'s
45/// default `FF_WAITPOINT_HMAC_GRACE_MS` (24 h) so tokens signed by
46/// the outgoing kid remain valid for a full day after rotation
47/// without the embedded-path hard-killing in-flight flows. HTTP
48/// callers get whatever the server was configured with; embedded
49/// callers get this pinned default.
50pub const EMBEDDED_WAITPOINT_HMAC_GRACE_MS: u64 = 86_400_000;
51
52/// Maximum grant TTL (ms) the embedded admin path accepts on
53/// `claim_for_worker` / `issue_reclaim_grant`. Matches
54/// `ff-server`'s `CLAIM_GRANT_TTL_MS_MAX` / `RECLAIM_GRANT_TTL_MS_MAX`
55/// so the embedded transport rejects the same range the HTTP
56/// transport does.
57const EMBEDDED_GRANT_TTL_MS_MAX: u64 = 60_000;
58
59/// Client for FlowFabric admin primitives — backend-agnostic facade
60/// (v0.13 SC-10 ergonomics).
61///
62/// Two construction shapes:
63///
64/// * [`FlowFabricAdminClient::new`] / [`FlowFabricAdminClient::with_token`]
65/// — HTTP transport targeting a running `ff-server`.
66/// * [`FlowFabricAdminClient::connect_with`] — embedded transport
67/// that dispatches directly through an `Arc<dyn EngineBackend>`.
68/// No `ff-server` required; works under `FF_DEV_MODE=1` + SQLite
69/// and in any in-process deployment.
70///
71/// The public method surface is identical across both transports;
72/// consumers choose at construction time. Admin methods that have
73/// no backend-trait equivalent return
74/// [`SdkError::AdminApi`] with status 503 on the embedded path —
75/// today every method on this client maps cleanly, so this fallback
76/// is only reached if a future admin primitive lands HTTP-first.
77#[derive(Debug, Clone)]
78pub struct FlowFabricAdminClient {
79 transport: AdminTransport,
80}
81
82/// Internal discriminator between the HTTP and embedded transports.
83/// Private by design — the public API is uniform across both shapes
84/// (see the [`FlowFabricAdminClient`] type-level docs).
85#[derive(Clone)]
86enum AdminTransport {
87 Http {
88 http: reqwest::Client,
89 base_url: String,
90 },
91 Embedded(Arc<dyn EngineBackend>),
92}
93
94impl std::fmt::Debug for AdminTransport {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 AdminTransport::Http { base_url, .. } => f
98 .debug_struct("Http")
99 .field("base_url", base_url)
100 .finish_non_exhaustive(),
101 AdminTransport::Embedded(backend) => f
102 .debug_struct("Embedded")
103 .field("backend", &backend.backend_label())
104 .finish(),
105 }
106 }
107}
108
109impl FlowFabricAdminClient {
110 /// Build a client without auth. Suitable for a dev ff-server
111 /// whose `api_token` is unconfigured. Production deployments
112 /// should use [`with_token`](Self::with_token).
113 pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
114 let http = reqwest::Client::builder()
115 .timeout(DEFAULT_TIMEOUT)
116 .build()
117 .map_err(|e| SdkError::Http {
118 source: e,
119 context: "build reqwest::Client".into(),
120 })?;
121 Ok(Self {
122 transport: AdminTransport::Http {
123 http,
124 base_url: normalize_base_url(base_url.into()),
125 },
126 })
127 }
128
129 /// Build a client that dispatches admin primitives directly
130 /// through an `Arc<dyn EngineBackend>`, bypassing HTTP entirely.
131 ///
132 /// # When to use
133 ///
134 /// * `FF_DEV_MODE=1` SQLite scenarios where no `ff-server` is
135 /// running.
136 /// * In-process / embedded deployments that hold a backend
137 /// handle already (e.g. tests, examples, scheduler-hosting
138 /// binaries).
139 ///
140 /// # Semantic parity
141 ///
142 /// Each method on [`FlowFabricAdminClient`] dispatches to the
143 /// equivalent `EngineBackend` trait method (see the RFC-024 /
144 /// RFC-017 admin surfaces). Validation **rules** + request-body
145 /// translation mirror the server-side handler in `ff-server` so
146 /// callers get the same accept / reject behaviour across
147 /// transports. Note: the exact [`SdkError`] variant differs —
148 /// embedded-path validation rejects surface as [`SdkError::Config`]
149 /// (no HTTP round-trip) while HTTP returns [`SdkError::AdminApi`]
150 /// with status `400`. Callers that need to distinguish a 4xx from
151 /// a transport fault should use [`SdkError::is_retryable`] or
152 /// match on `Config` + `AdminApi` together rather than relying on
153 /// a single variant.
154 ///
155 /// `EngineError::Unavailable` from the backend — emitted by
156 /// backends that have not implemented a given method — is mapped
157 /// to [`SdkError::AdminApi`] with `status = 503` and
158 /// `kind = Some("unavailable")` so callers see a uniform
159 /// admin-error surface across transports.
160 ///
161 /// # Divergence from the HTTP transport
162 ///
163 /// * `rotate_waitpoint_secret` forwards
164 /// [`EMBEDDED_WAITPOINT_HMAC_GRACE_MS`] (24 h, matching
165 /// `ff-server`'s default `FF_WAITPOINT_HMAC_GRACE_MS`) as the
166 /// per-partition grace window. The HTTP transport reads the
167 /// server's env-configured value; the embedded client has no
168 /// config surface so it pins the documented default.
169 /// * No single-writer admin semaphore, no audit-log emission.
170 /// These are `ff-server` responsibilities; embedded consumers
171 /// wanting them bring their own gate.
172 pub fn connect_with(backend: Arc<dyn EngineBackend>) -> Self {
173 Self {
174 transport: AdminTransport::Embedded(backend),
175 }
176 }
177
178 /// Build a client that sends `Authorization: Bearer <token>` on
179 /// every request. The token is passed by value so the caller
180 /// retains ownership policy (e.g. zeroize on drop at the
181 /// caller side); the SDK only reads it.
182 ///
183 /// # Empty-token guard
184 ///
185 /// An empty or all-whitespace `token` returns
186 /// [`SdkError::Config`] instead of silently constructing
187 /// `Authorization: Bearer ` (which the server rejects with
188 /// 401, leaving the operator chasing a "why is auth broken"
189 /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
190 /// where the var was meant to be set; the unset-expansion is
191 /// the empty string. Prefer an obvious error at construction
192 /// over a silent 401 at first request.
193 ///
194 /// If the caller genuinely wants an unauthenticated client
195 /// (dev ff-server without `api_token` configured), use
196 /// [`FlowFabricAdminClient::new`] instead.
197 pub fn with_token(
198 base_url: impl Into<String>,
199 token: impl AsRef<str>,
200 ) -> Result<Self, SdkError> {
201 let token_str = token.as_ref();
202 if token_str.trim().is_empty() {
203 return Err(SdkError::Config {
204 context: "admin_client".into(),
205 field: Some("bearer_token".into()),
206 message: "is empty or all-whitespace; use \
207 FlowFabricAdminClient::new for unauthenticated access"
208 .into(),
209 });
210 }
211 let mut headers = reqwest::header::HeaderMap::new();
212 let mut auth_value =
213 reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
214 |_| SdkError::Config {
215 context: "admin_client".into(),
216 field: Some("bearer_token".into()),
217 message: "contains characters not valid in an HTTP header".into(),
218 },
219 )?;
220 // Mark Authorization as sensitive so it doesn't appear in
221 // reqwest's Debug output / logs.
222 auth_value.set_sensitive(true);
223 headers.insert(reqwest::header::AUTHORIZATION, auth_value);
224
225 let http = reqwest::Client::builder()
226 .timeout(DEFAULT_TIMEOUT)
227 .default_headers(headers)
228 .build()
229 .map_err(|e| SdkError::Http {
230 source: e,
231 context: "build reqwest::Client".into(),
232 })?;
233 Ok(Self {
234 transport: AdminTransport::Http {
235 http,
236 base_url: normalize_base_url(base_url.into()),
237 },
238 })
239 }
240
241 /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
242 ///
243 /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
244 /// server-side one: the request carries lane + identity +
245 /// capabilities + grant TTL; the server runs budget, quota, and
246 /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
247 /// and returns a `ClaimGrant` on success.
248 ///
249 /// Returns `Ok(None)` when the server responds 204 No Content
250 /// (no eligible execution on the lane). Callers that want to keep
251 /// polling should back off per their claim cadence.
252 pub async fn claim_for_worker(
253 &self,
254 req: ClaimForWorkerRequest,
255 ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
256 match &self.transport {
257 AdminTransport::Http { http, base_url } => {
258 claim_for_worker_http(http, base_url, req).await
259 }
260 AdminTransport::Embedded(backend) => {
261 claim_for_worker_embedded(backend.as_ref(), req).await
262 }
263 }
264 }
265
266 /// Rotate the waitpoint HMAC secret on the server.
267 ///
268 /// Promotes the currently-installed kid to `previous_kid`
269 /// (accepted for the server's configured
270 /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
271 /// `new_secret_hex` under `new_kid` as the new current. Fans
272 /// out across every execution partition. Idempotent: re-running
273 /// with the same `(new_kid, new_secret_hex)` converges.
274 ///
275 /// The server returns 200 if at least one partition rotated OR
276 /// at least one partition was already rotating under a
277 /// concurrent request. See `RotateWaitpointSecretResponse`
278 /// fields for the breakdown.
279 ///
280 /// # Errors
281 ///
282 /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
283 /// input, 401 missing/bad bearer, 429 concurrent rotate,
284 /// 500 all partitions failed, 504 server-side timeout).
285 /// * [`SdkError::Http`] — transport error (connect, body
286 /// decode, client-side timeout).
287 ///
288 /// # Retry semantics
289 ///
290 /// Rotation is idempotent on the same `(new_kid,
291 /// new_secret_hex)` so retries are SAFE even on 504s or
292 /// partial failures.
293 pub async fn rotate_waitpoint_secret(
294 &self,
295 req: RotateWaitpointSecretRequest,
296 ) -> Result<RotateWaitpointSecretResponse, SdkError> {
297 match &self.transport {
298 AdminTransport::Http { http, base_url } => {
299 rotate_waitpoint_secret_http(http, base_url, req).await
300 }
301 AdminTransport::Embedded(backend) => {
302 rotate_waitpoint_secret_embedded(backend.as_ref(), req).await
303 }
304 }
305 }
306
307 /// Request a lease-reclaim grant for an execution in
308 /// `lease_expired_reclaimable` or `lease_revoked` state (RFC-024
309 /// §3.5).
310 ///
311 /// Routes `POST /v1/executions/{execution_id}/reclaim`. The
312 /// ff-server handler dispatches through the `EngineBackend` trait
313 /// to whichever backend the server was started with (Valkey /
314 /// Postgres / SQLite).
315 ///
316 /// # worker_capabilities (RFC-024 §3.2 B-2)
317 ///
318 /// The request body carries `worker_capabilities`. Consumers typically
319 /// source these from their registered worker's configured
320 /// `WorkerConfig::capabilities`. Admission compares
321 /// `worker_capabilities` against the execution's
322 /// `required_capabilities` (persisted on `exec_core` at
323 /// `create_execution` time from
324 /// `ExecutionPolicy.routing_requirements.required_capabilities`);
325 /// any required capability missing from the worker set surfaces as
326 /// `IssueReclaimGrantResponse::NotReclaimable { detail:
327 /// "capability_mismatch: <missing csv>" }` (Lua
328 /// `ff_issue_reclaim_grant`, `crates/ff-script/src/flowfabric.lua`
329 /// §3969-3982; sqlite/PG backends mirror the check). The SDK does
330 /// not re-read worker state automatically — admin clients are not
331 /// bound to a worker — so the consumer threads the capabilities
332 /// through at call-time.
333 ///
334 /// `capability_hash` is NOT consulted for admission; it is stored
335 /// verbatim on the grant hash for audit / downstream observability
336 /// only.
337 ///
338 /// # Consumer flow (RFC-024 §4.4)
339 ///
340 /// 1. Consumer's `POST /v1/runs/:id/complete` returns
341 /// `lease_expired`.
342 /// 2. Consumer calls this method; handles
343 /// [`IssueReclaimGrantResponse::Granted`] → builds a
344 /// [`ff_core::contracts::ReclaimGrant`] via
345 /// [`IssueReclaimGrantResponse::into_grant`].
346 /// 3. Consumer passes the grant to
347 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`] along
348 /// with a fresh [`ff_core::contracts::ReclaimExecutionArgs`];
349 /// the new attempt is minted with `HandleKind::Reclaimed`.
350 /// 4. Consumer drives terminal writes on the fresh lease.
351 ///
352 /// # Errors
353 ///
354 /// * [`SdkError::AdminApi`] — non-2xx response. 404 when the
355 /// execution does not exist; 400 on malformed `execution_id` or
356 /// body.
357 /// * [`SdkError::Http`] — transport error (connect, body
358 /// decode, client-side timeout).
359 ///
360 /// # Retry semantics
361 ///
362 /// Idempotent on the Lua side: repeated calls against an execution
363 /// already re-leased (a concurrent reclaim beat this one) surface
364 /// as `NotReclaimable`. Safe to retry on transport faults.
365 pub async fn issue_reclaim_grant(
366 &self,
367 execution_id: &str,
368 req: IssueReclaimGrantRequest,
369 ) -> Result<IssueReclaimGrantResponse, SdkError> {
370 match &self.transport {
371 AdminTransport::Http { http, base_url } => {
372 issue_reclaim_grant_http(http, base_url, execution_id, req).await
373 }
374 AdminTransport::Embedded(backend) => {
375 issue_reclaim_grant_embedded(backend.as_ref(), execution_id, req).await
376 }
377 }
378 }
379}
380
381// ── HTTP-transport helpers (private) ─────────────────────────────────
382
383async fn claim_for_worker_http(
384 http: &reqwest::Client,
385 base_url: &str,
386 req: ClaimForWorkerRequest,
387) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
388 // Percent-encode `worker_id` in the URL path — `WorkerId` is a
389 // free-form string (could contain `/`, spaces, `%`, etc.) and
390 // splicing it verbatim would produce malformed URLs or
391 // misrouted paths. `Url::path_segments_mut().push` handles the
392 // encoding natively.
393 let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
394 context: "admin_client: claim_for_worker".into(),
395 field: Some("base_url".into()),
396 message: format!("invalid base_url '{}': {e}", base_url),
397 })?;
398 {
399 let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
400 context: "admin_client: claim_for_worker".into(),
401 field: Some("base_url".into()),
402 message: format!("base_url cannot be a base URL: '{}'", base_url),
403 })?;
404 segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
405 }
406 let url = url.to_string();
407 let resp = http
408 .post(&url)
409 .json(&req)
410 .send()
411 .await
412 .map_err(|e| SdkError::Http {
413 source: e,
414 context: "POST /v1/workers/{worker_id}/claim".into(),
415 })?;
416
417 let status = resp.status();
418 if status == reqwest::StatusCode::NO_CONTENT {
419 return Ok(None);
420 }
421 if status.is_success() {
422 return resp
423 .json::<ClaimForWorkerResponse>()
424 .await
425 .map(Some)
426 .map_err(|e| SdkError::Http {
427 source: e,
428 context: "decode claim_for_worker response body".into(),
429 });
430 }
431
432 // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
433 let status_u16 = status.as_u16();
434 let raw = resp.text().await.map_err(|e| SdkError::Http {
435 source: e,
436 context: format!("read claim_for_worker error body (status {status_u16})"),
437 })?;
438 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
439 Err(SdkError::AdminApi {
440 status: status_u16,
441 message: parsed
442 .as_ref()
443 .map(|b| b.error.clone())
444 .unwrap_or_else(|| raw.clone()),
445 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
446 retryable: parsed.as_ref().and_then(|b| b.retryable),
447 raw_body: raw,
448 })
449}
450
451async fn rotate_waitpoint_secret_http(
452 http: &reqwest::Client,
453 base_url: &str,
454 req: RotateWaitpointSecretRequest,
455) -> Result<RotateWaitpointSecretResponse, SdkError> {
456 let url = format!("{}/v1/admin/rotate-waitpoint-secret", base_url);
457 let resp = http
458 .post(&url)
459 .json(&req)
460 .send()
461 .await
462 .map_err(|e| SdkError::Http {
463 source: e,
464 context: "POST /v1/admin/rotate-waitpoint-secret".into(),
465 })?;
466
467 let status = resp.status();
468 if status.is_success() {
469 return resp
470 .json::<RotateWaitpointSecretResponse>()
471 .await
472 .map_err(|e| SdkError::Http {
473 source: e,
474 context: "decode rotate-waitpoint-secret response body".into(),
475 });
476 }
477
478 // Non-2xx: parse the server's ErrorBody if we can, fall
479 // back to a raw body otherwise. Propagate body-read
480 // transport errors as Http rather than silently flattening
481 // them into `AdminApi { raw_body: "" }` — a connection drop
482 // mid-body-read is a transport fault, not an API-layer
483 // reject, and misclassifying it strips `is_retryable`'s
484 // timeout/connect signal from the caller.
485 let status_u16 = status.as_u16();
486 let raw = resp.text().await.map_err(|e| SdkError::Http {
487 source: e,
488 context: format!(
489 "read rotate-waitpoint-secret error response body (status {status_u16})"
490 ),
491 })?;
492 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
493 Err(SdkError::AdminApi {
494 status: status_u16,
495 message: parsed
496 .as_ref()
497 .map(|b| b.error.clone())
498 .unwrap_or_else(|| raw.clone()),
499 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
500 retryable: parsed.as_ref().and_then(|b| b.retryable),
501 raw_body: raw,
502 })
503}
504
505async fn issue_reclaim_grant_http(
506 http: &reqwest::Client,
507 base_url: &str,
508 execution_id: &str,
509 req: IssueReclaimGrantRequest,
510) -> Result<IssueReclaimGrantResponse, SdkError> {
511 // Percent-encode `execution_id` in the URL path — the id is a
512 // free-form string and splicing verbatim would produce
513 // malformed URLs. Mirrors `claim_for_worker`'s handling.
514 let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
515 context: "admin_client: issue_reclaim_grant".into(),
516 field: Some("base_url".into()),
517 message: format!("invalid base_url '{}': {e}", base_url),
518 })?;
519 {
520 let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
521 context: "admin_client: issue_reclaim_grant".into(),
522 field: Some("base_url".into()),
523 message: format!("base_url cannot be a base URL: '{}'", base_url),
524 })?;
525 segs.extend(&["v1", "executions", execution_id, "reclaim"]);
526 }
527 let url = url.to_string();
528 let resp = http
529 .post(&url)
530 .json(&req)
531 .send()
532 .await
533 .map_err(|e| SdkError::Http {
534 source: e,
535 context: "POST /v1/executions/{id}/reclaim".into(),
536 })?;
537
538 let status = resp.status();
539 if status.is_success() {
540 return resp
541 .json::<IssueReclaimGrantResponse>()
542 .await
543 .map_err(|e| SdkError::Http {
544 source: e,
545 context: "decode issue_reclaim_grant response body".into(),
546 });
547 }
548
549 let status_u16 = status.as_u16();
550 let raw = resp.text().await.map_err(|e| SdkError::Http {
551 source: e,
552 context: format!(
553 "read issue_reclaim_grant error body (status {status_u16})"
554 ),
555 })?;
556 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
557 Err(SdkError::AdminApi {
558 status: status_u16,
559 message: parsed
560 .as_ref()
561 .map(|b| b.error.clone())
562 .unwrap_or_else(|| raw.clone()),
563 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
564 retryable: parsed.as_ref().and_then(|b| b.retryable),
565 raw_body: raw,
566 })
567}
568
569// ── Embedded-transport helpers (private) ─────────────────────────────
570//
571// Dispatch directly through the `EngineBackend` trait. The request
572// body validation mirrors the server-side handler in
573// `ff-server::api`. Translation between the wire DTOs and the core
574// `contracts::*` types lives here so consumers get identical
575// surfaces across transports.
576
577/// Validate a free-form identifier the same way `ff-server`'s
578/// `validate_identifier` does: non-empty, ≤256 bytes, no whitespace
579/// or control chars. Embedded-transport callers hit this before the
580/// request reaches the backend so invalid identifiers cannot leak
581/// past the SDK's parity guarantee.
582fn validate_admin_identifier(
583 op: &'static str,
584 field: &'static str,
585 value: &str,
586) -> Result<(), SdkError> {
587 if value.is_empty() {
588 return Err(SdkError::Config {
589 context: format!("admin_client: {op}"),
590 field: Some(field.into()),
591 message: "must not be empty".into(),
592 });
593 }
594 if value.len() > 256 {
595 return Err(SdkError::Config {
596 context: format!("admin_client: {op}"),
597 field: Some(field.into()),
598 message: format!("exceeds 256 bytes (got {})", value.len()),
599 });
600 }
601 if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
602 return Err(SdkError::Config {
603 context: format!("admin_client: {op}"),
604 field: Some(field.into()),
605 message: "must not contain whitespace or control characters".into(),
606 });
607 }
608 Ok(())
609}
610
611/// Bounded grant-TTL check mirroring `ff-server`'s
612/// `1..=CLAIM_GRANT_TTL_MS_MAX`. Shared between `claim_for_worker`
613/// and `issue_reclaim_grant` embedded paths.
614fn validate_admin_grant_ttl(op: &'static str, grant_ttl_ms: u64) -> Result<(), SdkError> {
615 if grant_ttl_ms == 0 || grant_ttl_ms > EMBEDDED_GRANT_TTL_MS_MAX {
616 return Err(SdkError::Config {
617 context: format!("admin_client: {op}"),
618 field: Some("grant_ttl_ms".into()),
619 message: format!("must be in 1..={EMBEDDED_GRANT_TTL_MS_MAX}"),
620 });
621 }
622 Ok(())
623}
624
625/// Map an `EngineError` from a backend call into the `SdkError::AdminApi`
626/// surface so embedded and HTTP transports emit the same shape. `Unavailable`
627/// becomes 503 with kind `"unavailable"`; every other engine error bubbles
628/// up via `SdkError::Engine`.
629fn engine_err_to_admin(err: ff_core::engine_error::EngineError, op: &str) -> SdkError {
630 if let ff_core::engine_error::EngineError::Unavailable { op: backend_op } = &err {
631 return SdkError::AdminApi {
632 status: 503,
633 message: format!(
634 "{op}: backend does not implement '{backend_op}' on this transport"
635 ),
636 kind: Some("unavailable".to_owned()),
637 retryable: Some(false),
638 raw_body: String::new(),
639 };
640 }
641 SdkError::Engine(Box::new(err))
642}
643
644async fn claim_for_worker_embedded(
645 backend: &dyn EngineBackend,
646 req: ClaimForWorkerRequest,
647) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
648 // Mirror ff-server's validation + translation. Errors surface as
649 // SdkError::Config so consumers see validation faults loud rather
650 // than as backend transport errors.
651 let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
652 context: "admin_client: claim_for_worker".into(),
653 field: Some("lane_id".into()),
654 message: e.to_string(),
655 })?;
656 validate_admin_identifier("claim_for_worker", "worker_id", &req.worker_id)?;
657 validate_admin_identifier(
658 "claim_for_worker",
659 "worker_instance_id",
660 &req.worker_instance_id,
661 )?;
662 validate_admin_grant_ttl("claim_for_worker", req.grant_ttl_ms)?;
663 let caps: std::collections::BTreeSet<String> = req.capabilities.into_iter().collect();
664 let args = ff_core::contracts::ClaimForWorkerArgs::new(
665 lane_id,
666 ff_core::types::WorkerId::new(req.worker_id),
667 ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
668 caps,
669 req.grant_ttl_ms,
670 );
671 match backend
672 .claim_for_worker(args)
673 .await
674 .map_err(|e| engine_err_to_admin(e, "claim_for_worker"))?
675 {
676 ff_core::contracts::ClaimForWorkerOutcome::NoWork => Ok(None),
677 ff_core::contracts::ClaimForWorkerOutcome::Granted(grant) => {
678 Ok(Some(ClaimForWorkerResponse {
679 execution_id: grant.execution_id.to_string(),
680 partition_key: grant.partition_key,
681 grant_key: grant.grant_key,
682 expires_at_ms: grant.expires_at_ms,
683 }))
684 }
685 // `ClaimForWorkerOutcome` is `#[non_exhaustive]`; mirror
686 // ff-server's 503 policy on unknown variants.
687 _ => Err(SdkError::AdminApi {
688 status: 503,
689 message: "claim_for_worker: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
690 kind: Some("unknown_outcome".to_owned()),
691 retryable: Some(false),
692 raw_body: String::new(),
693 }),
694 }
695}
696
697async fn issue_reclaim_grant_embedded(
698 backend: &dyn EngineBackend,
699 execution_id: &str,
700 req: IssueReclaimGrantRequest,
701) -> Result<IssueReclaimGrantResponse, SdkError> {
702 let exec_id = ff_core::types::ExecutionId::parse(execution_id).map_err(|e| SdkError::Config {
703 context: "admin_client: issue_reclaim_grant".into(),
704 field: Some("execution_id".into()),
705 message: e.to_string(),
706 })?;
707 let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
708 context: "admin_client: issue_reclaim_grant".into(),
709 field: Some("lane_id".into()),
710 message: e.to_string(),
711 })?;
712 validate_admin_identifier("issue_reclaim_grant", "worker_id", &req.worker_id)?;
713 validate_admin_identifier(
714 "issue_reclaim_grant",
715 "worker_instance_id",
716 &req.worker_instance_id,
717 )?;
718 validate_admin_grant_ttl("issue_reclaim_grant", req.grant_ttl_ms)?;
719 let caps: std::collections::BTreeSet<String> = req.worker_capabilities.into_iter().collect();
720 let args = ff_core::contracts::IssueReclaimGrantArgs::new(
721 exec_id,
722 ff_core::types::WorkerId::new(req.worker_id),
723 ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
724 lane_id,
725 req.capability_hash,
726 req.grant_ttl_ms,
727 req.route_snapshot_json,
728 req.admission_summary,
729 caps,
730 ff_core::types::TimestampMs::now(),
731 );
732 match backend
733 .issue_reclaim_grant(args)
734 .await
735 .map_err(|e| engine_err_to_admin(e, "issue_reclaim_grant"))?
736 {
737 ff_core::contracts::IssueReclaimGrantOutcome::Granted(grant) => {
738 Ok(IssueReclaimGrantResponse::Granted {
739 execution_id: grant.execution_id.to_string(),
740 partition_key: grant.partition_key,
741 grant_key: grant.grant_key,
742 expires_at_ms: grant.expires_at_ms,
743 lane_id: grant.lane_id.as_str().to_owned(),
744 })
745 }
746 ff_core::contracts::IssueReclaimGrantOutcome::NotReclaimable { execution_id, detail } => {
747 Ok(IssueReclaimGrantResponse::NotReclaimable {
748 execution_id: execution_id.to_string(),
749 detail,
750 })
751 }
752 ff_core::contracts::IssueReclaimGrantOutcome::ReclaimCapExceeded {
753 execution_id,
754 reclaim_count,
755 } => Ok(IssueReclaimGrantResponse::ReclaimCapExceeded {
756 execution_id: execution_id.to_string(),
757 reclaim_count,
758 }),
759 _ => Err(SdkError::AdminApi {
760 status: 503,
761 message: "issue_reclaim_grant: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
762 kind: Some("unknown_outcome".to_owned()),
763 retryable: Some(false),
764 raw_body: String::new(),
765 }),
766 }
767}
768
769async fn rotate_waitpoint_secret_embedded(
770 backend: &dyn EngineBackend,
771 req: RotateWaitpointSecretRequest,
772) -> Result<RotateWaitpointSecretResponse, SdkError> {
773 // Validation mirrors `ff-server::Server::rotate_waitpoint_secret`
774 // so the embedded and HTTP transports reject the same invalid
775 // inputs.
776 if req.new_kid.is_empty() || req.new_kid.contains(':') {
777 return Err(SdkError::Config {
778 context: "admin_client: rotate_waitpoint_secret".into(),
779 field: Some("new_kid".into()),
780 message: "must be non-empty and must not contain ':'".into(),
781 });
782 }
783 if req.new_secret_hex.is_empty()
784 || !req.new_secret_hex.len().is_multiple_of(2)
785 || !req.new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
786 {
787 return Err(SdkError::Config {
788 context: "admin_client: rotate_waitpoint_secret".into(),
789 field: Some("new_secret_hex".into()),
790 message: "must be a non-empty even-length hex string".into(),
791 });
792 }
793 // Embedded consumers have no config surface from which to read
794 // the per-deployment grace window, so pin the documented default
795 // matching `ff-server`'s `FF_WAITPOINT_HMAC_GRACE_MS` (24 h). See
796 // `EMBEDDED_WAITPOINT_HMAC_GRACE_MS` + the `connect_with` rustdoc
797 // for the rationale and divergence-from-HTTP notes. Unlike the
798 // HTTP handler, this path does not enforce the 120 s end-to-end
799 // timeout (no HTTP deadline to honour) and does not emit the
800 // `audit`-target `waitpoint_hmac_rotation_*` events (those are
801 // server-owned operator signals). Rotation is idempotent on the
802 // same (new_kid, new_secret_hex) pair, so retries remain safe.
803 let args = ff_core::contracts::RotateWaitpointHmacSecretAllArgs::new(
804 req.new_kid.clone(),
805 req.new_secret_hex,
806 EMBEDDED_WAITPOINT_HMAC_GRACE_MS,
807 );
808 let result = backend
809 .rotate_waitpoint_hmac_secret_all(args)
810 .await
811 .map_err(|e| engine_err_to_admin(e, "rotate_waitpoint_secret"))?;
812
813 // Collapse the per-partition entries into the HTTP response shape
814 // the server emits — rotated count + failed indices + echoed
815 // new_kid — so consumers see identical return values across
816 // transports.
817 let mut rotated: u16 = 0;
818 let mut failed: Vec<u16> = Vec::new();
819 for entry in &result.entries {
820 match &entry.result {
821 Ok(_) => {
822 rotated = rotated.saturating_add(1);
823 }
824 Err(_) => failed.push(entry.partition),
825 }
826 }
827 Ok(RotateWaitpointSecretResponse {
828 rotated,
829 failed,
830 new_kid: req.new_kid,
831 })
832}
833
/// Request body for `POST /v1/executions/{execution_id}/reclaim`
/// (RFC-024 §3.5).
///
/// Mirrors `ff_server::api::IssueReclaimGrantBody` 1:1. The
/// `execution_id` goes in the URL path, not the body.
///
/// Only `Serialize` is derived: the SDK produces this body and the
/// server consumes it. The `#[serde(default)]` attributes below
/// therefore have no effect on the emitted JSON. `default` is a
/// deserialization-only attribute. NOTE(review): presumably it is kept
/// for parity with the server-side body; confirm.
#[derive(Debug, Clone, Serialize)]
pub struct IssueReclaimGrantRequest {
    /// Worker identity requesting the grant. The Lua
    /// `ff_reclaim_execution` validates grant consumption via
    /// `grant.worker_id == args.worker_id` (RFC-024 §4.4). The
    /// worker that consumes the grant must match this value.
    pub worker_id: String,
    /// Worker-instance identity. Informational at grant-issuance
    /// time. It is stored on the grant so consumers can correlate events.
    pub worker_instance_id: String,
    /// Lane the execution belongs to. `ff_issue_reclaim_grant` needs
    /// it to construct `KEYS[*]`.
    pub lane_id: String,
    /// Opaque capability-hash token stored verbatim on the issued
    /// grant for audit and downstream observability. NOT used for
    /// admission. Admission compares `worker_capabilities` against
    /// the execution's `required_capabilities` (see the
    /// [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc).
    /// `None` omits the field from the JSON body, which leaves it
    /// empty on the grant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub capability_hash: Option<String>,
    /// Grant TTL in milliseconds. Bounded server-side.
    pub grant_ttl_ms: u64,
    /// Route snapshot JSON carried onto the grant for audit.
    /// `None` omits the field from the JSON body.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub route_snapshot_json: Option<String>,
    /// Admission summary string carried onto the grant for audit.
    /// `None` omits the field from the JSON body.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub admission_summary: Option<String>,
    /// Worker capability tokens. Consumers typically source these
    /// from their registered worker's `WorkerConfig::capabilities`
    /// (see the [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc
    /// for the override contract). Always serialized, even when empty.
    #[serde(default)]
    pub worker_capabilities: Vec<String>,
}
875
/// Response body for `POST /v1/executions/{execution_id}/reclaim`
/// (RFC-024 §3.5).
///
/// The server serializes this payload as an internally tagged enum,
/// with a `status` discriminator in snake_case (`"granted"`,
/// `"not_reclaimable"`, `"reclaim_cap_exceeded"`). That lets consumers
/// match on structured outcomes without re-parsing a 200-vs-4xx split
/// for business-logic outcomes. This follows the precedent set by
/// `RotateWaitpointSecretResponse`, which also reports partial
/// failure inside a 200 body.
///
/// Only `Deserialize` is derived: the server produces this payload
/// and the SDK consumes it.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum IssueReclaimGrantResponse {
    /// Grant issued. Build a
    /// [`ff_core::contracts::ReclaimGrant`] via
    /// [`Self::into_grant`] and feed it to
    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
    Granted {
        /// Raw execution id. [`Self::into_grant`] parses it with
        /// `ExecutionId::parse`.
        execution_id: String,
        /// Opaque partition key, passed through to the core grant
        /// without eager parsing.
        partition_key: ff_core::partition::PartitionKey,
        /// Server-assigned grant key, passed through verbatim.
        grant_key: String,
        /// Grant expiry in milliseconds. NOTE(review): presumably
        /// epoch millis; confirm against the server.
        expires_at_ms: u64,
        /// Raw lane id. [`Self::into_grant`] validates it with
        /// `LaneId::try_new`.
        lane_id: String,
    },
    /// Execution is not in a reclaimable state (not
    /// `lease_expired_reclaimable` / `lease_revoked`).
    NotReclaimable {
        /// Execution the request targeted.
        execution_id: String,
        /// Server-provided explanation.
        detail: String,
    },
    /// `max_reclaim_count` was exceeded and the execution transitioned to
    /// terminal_failed. Consumers stop retrying and surface a
    /// structural failure.
    ReclaimCapExceeded {
        /// Execution the request targeted.
        execution_id: String,
        /// Reclaim count reported by the server.
        reclaim_count: u32,
    },
}
911
912impl IssueReclaimGrantResponse {
913 /// Convert a [`Self::Granted`] response into a typed
914 /// [`ff_core::contracts::ReclaimGrant`] for handoff to
915 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
916 ///
917 /// Returns [`SdkError::AdminApi`] when the wire variant is not
918 /// `Granted` (consumer asked for a grant but the server replied
919 /// with a terminal outcome) or when `execution_id` / `lane_id`
920 /// are malformed — the latter signals a drift between server and
921 /// SDK, so failing loud prevents silent misrouting.
922 pub fn into_grant(self) -> Result<ff_core::contracts::ReclaimGrant, SdkError> {
923 match self {
924 IssueReclaimGrantResponse::Granted {
925 execution_id,
926 partition_key,
927 grant_key,
928 expires_at_ms,
929 lane_id,
930 } => {
931 let eid = ff_core::types::ExecutionId::parse(&execution_id)
932 .map_err(|e| SdkError::AdminApi {
933 status: 200,
934 message: format!(
935 "issue_reclaim_grant: server returned malformed execution_id '{execution_id}': {e}"
936 ),
937 kind: Some("malformed_response".to_owned()),
938 retryable: Some(false),
939 raw_body: String::new(),
940 })?;
941 let lane = ff_core::types::LaneId::try_new(lane_id.clone())
942 .map_err(|e| SdkError::AdminApi {
943 status: 200,
944 message: format!(
945 "issue_reclaim_grant: server returned malformed lane_id '{lane_id}': {e}"
946 ),
947 kind: Some("malformed_response".to_owned()),
948 retryable: Some(false),
949 raw_body: String::new(),
950 })?;
951 Ok(ff_core::contracts::ReclaimGrant::new(
952 eid,
953 partition_key,
954 grant_key,
955 expires_at_ms,
956 lane,
957 ))
958 }
959 IssueReclaimGrantResponse::NotReclaimable { execution_id, detail } => {
960 Err(SdkError::AdminApi {
961 status: 200,
962 message: format!(
963 "issue_reclaim_grant: execution '{execution_id}' not reclaimable: {detail}"
964 ),
965 kind: Some("not_reclaimable".to_owned()),
966 retryable: Some(false),
967 raw_body: String::new(),
968 })
969 }
970 IssueReclaimGrantResponse::ReclaimCapExceeded {
971 execution_id,
972 reclaim_count,
973 } => Err(SdkError::AdminApi {
974 status: 200,
975 message: format!(
976 "issue_reclaim_grant: execution '{execution_id}' hit reclaim cap ({reclaim_count})"
977 ),
978 kind: Some("reclaim_cap_exceeded".to_owned()),
979 retryable: Some(false),
980 raw_body: String::new(),
981 }),
982 }
983 }
984}
985
/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1. Validation
/// of both fields happens server-side. The constraints below describe
/// what the server accepts.
#[derive(Debug, Clone, Serialize)]
pub struct RotateWaitpointSecretRequest {
    /// New key identifier. Must be non-empty and must not contain `:`,
    /// because the server uses `:` as the field separator in the secret hash.
    pub new_kid: String,
    /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
    pub new_secret_hex: String,
}
997
/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
/// The server serializes that struct as-is via `Json(result)`. The
/// embedded (non-HTTP) rotation path collapses its per-partition
/// results into this same shape, so consumers see identical return
/// values across transports.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct RotateWaitpointSecretResponse {
    /// Count of partitions that accepted the rotation.
    pub rotated: u16,
    /// Partition indices where the rotation failed. The operator
    /// should investigate. Rotation is idempotent on the same
    /// `(new_kid, new_secret_hex)` pair, so retrying after the
    /// underlying fault clears converges.
    pub failed: Vec<u16>,
    /// The `new_kid` that was installed as current on every
    /// rotated partition. It echoes the request field back for
    /// confirmation.
    pub new_kid: String,
}
1016
/// Server-side error body shape, as emitted by
/// `ff_server::api::ErrorBody`. Kept internal because consumers
/// match on the flattened fields of [`SdkError::AdminApi`].
#[derive(Debug, Clone, Deserialize)]
struct AdminErrorBody {
    /// Human-readable error message. Always present.
    error: String,
    /// Optional machine-readable classifier (e.g. `"IoError"` on a
    /// Valkey transport failure). Absent on typical 400s.
    #[serde(default)]
    kind: Option<String>,
    /// Optional server hint on whether retrying may succeed. Absent on
    /// typical 400s.
    #[serde(default)]
    retryable: Option<bool>,
}
1028
/// Request body for `POST /v1/workers/{worker_id}/claim`.
///
/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
/// goes in the URL path (not the body) but is kept on the struct
/// for ergonomics, so callers don't juggle a separate argument.
#[derive(Debug, Clone, Serialize)]
pub struct ClaimForWorkerRequest {
    /// Worker identity. Used for the `{worker_id}` URL path segment.
    /// `#[serde(skip)]` keeps it out of the JSON body.
    #[serde(skip)]
    pub worker_id: String,
    /// Lane identifier sent in the body.
    pub lane_id: String,
    /// Worker-instance identity sent in the body.
    pub worker_instance_id: String,
    /// Capability tokens the worker advertises. Always serialized,
    /// even when empty. `#[serde(default)]` only affects
    /// deserialization, which this Serialize-only type never does.
    #[serde(default)]
    pub capabilities: Vec<String>,
    /// Grant TTL in milliseconds. The server rejects 0 or anything over
    /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
    pub grant_ttl_ms: u64,
}
1046
/// Response body for `POST /v1/workers/{worker_id}/claim`.
///
/// Wire shape of `ff_core::contracts::ClaimGrant`. Carries the opaque
/// [`ff_core::partition::PartitionKey`] directly on the wire (issue
/// #91). The SDK reconstructs the core type via [`Self::into_grant`].
#[derive(Debug, Clone, Deserialize)]
pub struct ClaimForWorkerResponse {
    /// Raw execution id. [`Self::into_grant`] parses it.
    pub execution_id: String,
    /// Opaque partition key, passed through without eager parsing.
    pub partition_key: ff_core::partition::PartitionKey,
    /// Server-assigned grant key, passed through verbatim.
    pub grant_key: String,
    /// Grant expiry in milliseconds. NOTE(review): presumably epoch
    /// millis; confirm against the server.
    pub expires_at_ms: u64,
}
1059
1060impl ClaimForWorkerResponse {
1061 /// Convert the wire DTO into a typed
1062 /// [`ff_core::contracts::ClaimGrant`] for handoff to
1063 /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
1064 /// [`SdkError::AdminApi`] on malformed execution_id — a drift
1065 /// signal that the server and SDK disagree on the wire shape, so
1066 /// failing loud prevents routing to a ghost partition.
1067 ///
1068 /// The `partition_key` itself is not eagerly parsed here: it is
1069 /// carried opaquely to the `claim_from_grant` hot path, which
1070 /// parses it there and surfaces a typed error on malformed keys.
1071 pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
1072 let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
1073 .map_err(|e| SdkError::AdminApi {
1074 status: 200,
1075 message: format!(
1076 "claim_for_worker: server returned malformed execution_id '{}': {e}",
1077 self.execution_id
1078 ),
1079 kind: Some("malformed_response".to_owned()),
1080 retryable: Some(false),
1081 raw_body: String::new(),
1082 })?;
1083 Ok(ff_core::contracts::ClaimGrant::new(
1084 execution_id,
1085 self.partition_key,
1086 self.grant_key,
1087 self.expires_at_ms,
1088 ))
1089 }
1090}
1091
/// Per-partition outcome of a cluster-wide waitpoint HMAC secret
/// rotation. Returned by [`rotate_waitpoint_hmac_secret_all_partitions`]
/// so operators can audit which partitions rotated, no-op'd, or failed.
///
/// The index is the execution-partition index (`0..num_partitions`),
/// matching `{fp:N}` in the keyspace.
///
/// This type itself is not feature-gated, but its only producer in this
/// module is `valkey-default`-gated. Without that feature the intra-doc
/// link above has no target.
#[derive(Debug)]
pub struct PartitionRotationOutcome {
    /// Execution partition index (`0..num_partitions`).
    pub partition: u16,
    /// FCALL outcome on this partition, or the error it raised
    /// (converted via `SdkError::from`).
    pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
}
1105
/// Rotate the waitpoint HMAC secret across every execution partition
/// by fanning out the `ff_rotate_waitpoint_hmac_secret` FCALL.
///
/// This is the canonical Rust-side rotation path for direct-Valkey
/// consumers (e.g. cairn-fabric) that cannot route through the
/// `ff-server` admin REST endpoint. Callers who have an HTTP-reachable
/// `ff-server` should prefer [`FlowFabricAdminClient::rotate_waitpoint_secret`].
/// That path adds a single-writer admission gate, parallel fan-out,
/// structured audit events, and the server's configured grace window.
///
/// # Production rotation recipe
///
/// Operators MUST coordinate so secret rotation **precedes** any
/// waitpoint resolution that will present the new `kid`. The broad
/// sequence:
///
/// 1. Pick a fresh `new_kid`. It must NOT contain `:`, because the
///    server uses `:` as the field separator in the secret hash.
/// 2. Call this helper with the previous `kid`'s grace window
///    (`grace_ms`: the duration during which tokens signed by the
///    outgoing secret remain valid).
/// 3. Only after this call returns with every partition `Ok(_)` (either
///    `Rotated` or `Noop`), begin signing new tokens with `new_kid`.
/// 4. Retain the previous secret in the keystore until the grace
///    window elapses. The FCALL garbage-collects expired kids on every
///    rotation, so just don't rotate again before the grace window ends.
///
/// See RFC-004 §rotation for the full 4-key HSET + `previous_expires_at`
/// dance the FCALL implements server-side.
///
/// # Returns
///
/// One [`PartitionRotationOutcome`] per partition index, in ascending
/// order `0..num_partitions`. `num_partitions == 0` yields an empty `Vec`.
///
/// # Idempotency
///
/// Each partition FCALL is idempotent on the same `(new_kid,
/// new_secret_hex)` pair: a replay with identical args returns
/// `RotateWaitpointHmacSecretOutcome::Noop`. A same-kid, different-secret
/// replay surfaces as a per-partition `SdkError` (wrapping
/// `ScriptError::RotationConflict`). Pick a fresh `new_kid` to recover.
///
/// # Error semantics
///
/// A per-partition FCALL failure (transport fault, rotation conflict,
/// etc.) is recorded on that partition's [`PartitionRotationOutcome`]
/// and fan-out **continues**. This matches the server's
/// `rotate_waitpoint_secret` contract: partial success is allowed, and
/// operators retry the failed partition subset. Returning `Vec<_>` (not
/// `Result<Vec<_>, _>`) is deliberate. Every whole-call invariant is
/// enforced by the underlying FCALL on each partition (kid non-empty,
/// no `:`, even-length hex, etc.), so the aggregate has nothing left
/// to reject at the Rust boundary. Callers decide how to treat partial
/// failures (fail loudly, retry the subset, record metrics).
///
/// # Concurrency + performance
///
/// Runs sequentially, one partition at a time, to keep the helper
/// dependency-free: no `futures::stream` or tokio-specific primitives on
/// the caller path. For a cluster with N partitions and per-partition
/// RTT R, the total duration is ~N*R. Consumers that need parallel
/// fan-out should wrap this with `FuturesUnordered` themselves, or use
/// the server admin endpoint (which fans out with bounded
/// concurrency = 16).
///
/// # Test harness
///
/// The `ff-test::fixtures::TestCluster::rotate_waitpoint_hmac_secret`
/// method is a thin wrapper around this helper, so integration tests and
/// production code exercise the same code path.
///
/// # Example
///
/// ```rust,ignore
/// use ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions;
///
/// // The helper returns `Vec<PartitionRotationOutcome>`, not a `Result`,
/// // so there is nothing to `?`. Inspect each entry instead.
/// let results = rotate_waitpoint_hmac_secret_all_partitions(
///     &client,
///     partition_config.num_flow_partitions,
///     "kid-2026-04-22",
///     "deadbeef...64-hex-chars...",
///     60_000,
/// )
/// .await;
///
/// for entry in &results {
///     match &entry.result {
///         Ok(outcome) => tracing::info!(partition = entry.partition, ?outcome, "rotated"),
///         Err(e) => tracing::error!(partition = entry.partition, %e, "rotation failed"),
///     }
/// }
/// ```
// v0.12 PR-6: the `admin` module is ungated at module level so
// consumers under `--no-default-features --features sqlite` can reach
// the HTTP admin client surface. This helper is the one remaining
// Valkey-typed item in the module (it takes a `&ferriskey::Client` and
// fans out `ff_rotate_waitpoint_hmac_secret` FCALLs), so it stays
// `valkey-default`-gated. See the `lib.rs` PR-6 comment for the Option 1
// / Option 2 decision.
#[cfg(feature = "valkey-default")]
pub async fn rotate_waitpoint_hmac_secret_all_partitions(
    client: &ferriskey::Client,
    num_partitions: u16,
    new_kid: &str,
    new_secret_hex: &str,
    grace_ms: u64,
) -> Vec<PartitionRotationOutcome> {
    // Hoisted out of the loop: `ff_rotate_waitpoint_hmac_secret` only
    // borrows the args, so every partition can reuse the same struct.
    // This avoids N × 2 string clones on the fan-out path.
    let args = RotateWaitpointHmacSecretArgs {
        new_kid: new_kid.to_owned(),
        new_secret_hex: new_secret_hex.to_owned(),
        grace_ms,
    };
    // `usize::from` makes the lossless u16 -> usize widening explicit.
    let mut out = Vec::with_capacity(usize::from(num_partitions));
    for index in 0..num_partitions {
        let partition = Partition {
            family: PartitionFamily::Execution,
            index,
        };
        let idx = IndexKeys::new(&partition);
        // Per-partition failures are recorded, not propagated, so the
        // remaining partitions are still attempted.
        let result = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            client, &idx, &args,
        )
        .await
        .map_err(SdkError::from);
        out.push(PartitionRotationOutcome {
            partition: index,
            result,
        });
    }
    out
}
1235
/// Strip every trailing `/` from `url` so that
/// `format!("{base}/v1/...")` can never yield a doubled slash such as
/// `https://host//v1/...`. Mirrors media-pipeline's pattern.
///
/// An input made entirely of slashes (or an empty input) becomes `""`.
fn normalize_base_url(mut url: String) -> String {
    // `/` is a single-byte ASCII char, so the length of the trimmed
    // prefix always lands on a char boundary and `truncate` cannot panic.
    let kept = url.trim_end_matches('/').len();
    url.truncate(kept);
    url
}
1245
// Unit tests for the pure (network-free) parts of the admin client:
// URL normalization, token validation at construction, and
// deserialization / conversion of the server's wire shapes.
#[cfg(test)]
mod tests {
    use super::*;

    // ── normalize_base_url ──

    #[test]
    fn base_url_strips_trailing_slash() {
        assert_eq!(normalize_base_url("http://x".into()), "http://x");
        assert_eq!(normalize_base_url("http://x/".into()), "http://x");
        assert_eq!(normalize_base_url("http://x///".into()), "http://x");
    }

    // ── FlowFabricAdminClient::with_token validation ──

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // A raw newline in the token would split the Authorization
        // header, so construction must fail loudly.
        let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
        assert!(
            matches!(err, SdkError::Config { .. }),
            "got: {err:?}"
        );
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // Exact shell footgun: FF_ADMIN_TOKEN="" expands to "".
        // Fail loudly at construction instead of shipping a client
        // that silently 401s on the first request.
        for s in ["", " ", "\t\n ", " "] {
            let err = FlowFabricAdminClient::with_token("http://x", s)
                .unwrap_err();
            assert!(
                matches!(&err, SdkError::Config { field: Some(f), .. } if f == "bearer_token"),
                "token {s:?} should return Config with field=bearer_token; got: {err:?}"
            );
        }
    }

    // ── Admin wire shapes ──

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // `kind` + `retryable` absent (the usual shape for 400s).
        let b: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(b.error, "bad new_kid");
        assert!(b.kind.is_none());
        assert!(b.retryable.is_none());

        // `kind` + `retryable` present (500 ValkeyError shape).
        let b: AdminErrorBody = serde_json::from_str(
            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
        )
        .unwrap();
        assert_eq!(b.error, "valkey: timed out");
        assert_eq!(b.kind.as_deref(), Some("IoError"));
        assert_eq!(b.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Exact shape the server emits.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "new_kid": "kid-2026-04-18"
        }"#;
        let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.rotated, 3);
        assert_eq!(r.failed, vec![4, 5]);
        assert_eq!(r.new_kid, "kid-2026-04-18");
    }

    // ── ClaimForWorkerResponse::into_grant ──

    // Builds a well-formed response whose `partition_key` is the given
    // literal, round-tripped through JSON so `PartitionKey` is built via
    // its own `Deserialize` impl.
    fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_key: serde_json::from_str(
                &serde_json::to_string(partition_key).unwrap(),
            )
            .unwrap(),
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn into_grant_preserves_all_known_partition_key_shapes() {
        // Post-#91: families collapse into opaque PartitionKey literals.
        // Flow and Execution both produce "{fp:N}", Budget is "{b:N}",
        // and Quota is "{q:N}". The DTO preserves the wire string as-is,
        // and into_grant hands it opaquely to the core type.
        for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
            let g = sample_claim_response(key_str).into_grant().unwrap_or_else(|e| {
                panic!("key {key_str} should parse: {e:?}")
            });
            assert_eq!(g.partition_key.as_str(), key_str);
            assert_eq!(g.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_preserves_opaque_partition_key() {
        // The SDK does NOT eagerly parse the partition_key at the
        // admin boundary. Malformed keys are caught on the
        // claim_from_grant hot path, where the typed Partition is
        // actually needed. This test pins the opacity contract.
        let resp = sample_claim_response("{zz:0}");
        let g = resp.into_grant().expect("SDK must not parse partition_key");
        assert_eq!(g.partition_key.as_str(), "{zz:0}");
        // Parsing surfaces the error explicitly.
        assert!(g.partition().is_err());
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let mut resp = sample_claim_response("{fp:5}");
        resp.execution_id = "not-a-valid-eid".to_owned();
        let err = resp.into_grant().unwrap_err();
        match err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(message.contains("malformed execution_id"),
                    "msg: {message}");
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    // ── rotate_waitpoint_hmac_secret_all_partitions (no unit tests) ──
    //
    // Exercise coverage lives in `ff-test`. The integration test harness in
    // `crates/ff-test/tests/waitpoint_hmac_rotation_fcall.rs` and
    // `waitpoint_tokens.rs` calls through the function via the
    // `TestCluster::rotate_waitpoint_hmac_secret` fixture, which is
    // now a thin delegator. A pure unit test here would require a
    // mock `ferriskey::Client`. ferriskey's `Client` performs a live
    // RESP handshake on `ClientBuilder::build`, so a local TCP
    // listener alone isn't sufficient, and the mock would be expensive
    // to build for one-line iteration-count coverage.

    // ── ClaimForWorkerResponse wire shape (issue #91) ──

    #[test]
    fn claim_for_worker_response_deserialises_opaque_partition_key() {
        // Exact shape the server emits post-#91.
        let raw = r#"{
            "execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
            "partition_key": "{fp:7}",
            "grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
            "expires_at_ms": 1700000000000
        }"#;
        let r: ClaimForWorkerResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.partition_key.as_str(), "{fp:7}");
        assert_eq!(r.expires_at_ms, 1_700_000_000_000);
    }
}