Skip to main content

ff_sdk/
admin.rs

//! Admin client for operator-facing endpoints on `ff-server`.
//!
//! Wraps the admin HTTP surface — `POST /v1/admin/*` (e.g. HMAC secret
//! rotation) plus the scheduler-routed `POST /v1/workers/{worker_id}/claim`
//! and `POST /v1/executions/{execution_id}/reclaim` — so downstream
//! consumers (cairn-fabric) don't hand-roll those HTTP calls. An embedded
//! transport ([`FlowFabricAdminClient::connect_with`]) dispatches the same
//! operations directly through an `EngineBackend` with no HTTP hop.
//! Request bodies and response shapes mirror the server's wire types
//! exactly: they are defined against [`ff_server::api`] +
//! [`ff_server::server`] and kept 1:1 with the producer.
//!
//! Authentication is a Bearer token. Callers source the token from
//! wherever they hold it (the `FF_API_TOKEN` env var is the common
//! pattern, but the SDK does not read env vars on the caller's behalf)
//! and pass it to [`FlowFabricAdminClient::with_token`], which accepts
//! any string-like value (`&str` or `String`) via `impl AsRef<str>`.
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use ff_core::contracts::RotateWaitpointHmacSecretOutcome;
21use ff_core::engine_backend::EngineBackend;
22// v0.12 PR-6: these imports only power the Valkey-typed
23// `rotate_waitpoint_hmac_secret_all_partitions` helper at the bottom
24// of the module; gated so the ungated module builds clean under
25// `--no-default-features --features sqlite`.
26#[cfg(feature = "valkey-default")]
27use ff_core::contracts::RotateWaitpointHmacSecretArgs;
28#[cfg(feature = "valkey-default")]
29use ff_core::keys::IndexKeys;
30#[cfg(feature = "valkey-default")]
31use ff_core::partition::{Partition, PartitionFamily};
32use serde::{Deserialize, Serialize};
33
34use crate::SdkError;
35
/// Client-side deadline applied to every request issued by the HTTP
/// transport. `ff-server` bounds rotation with its own 120 s
/// `ROTATE_HTTP_TIMEOUT`; the extra 10 s here keeps the client waiting
/// past the server's deadline, so operators receive the structured
/// 504 GATEWAY_TIMEOUT body instead of a bare client-side timeout error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120 + 10);

/// Per-partition grace window, in milliseconds, that the embedded
/// `rotate_waitpoint_secret` path hands to the backend primitive.
///
/// 24 hours — the same value as `ff-server`'s default
/// `FF_WAITPOINT_HMAC_GRACE_MS` — so tokens signed under the outgoing
/// kid keep verifying for a full day after rotation instead of the
/// embedded path hard-killing in-flight flows. HTTP callers inherit
/// whatever grace the server was configured with; embedded callers
/// always get this fixed value.
pub const EMBEDDED_WAITPOINT_HMAC_GRACE_MS: u64 = 24 * 60 * 60 * 1_000;

/// Upper bound, in milliseconds, on the grant TTL accepted by the
/// embedded `claim_for_worker` / `issue_reclaim_grant` paths. Kept equal
/// to `ff-server`'s `CLAIM_GRANT_TTL_MS_MAX` / `RECLAIM_GRANT_TTL_MS_MAX`
/// so both transports reject exactly the same range.
const EMBEDDED_GRANT_TTL_MS_MAX: u64 = 60 * 1_000;
58
59/// Client for FlowFabric admin primitives — backend-agnostic facade
60/// (v0.13 SC-10 ergonomics).
61///
62/// Two construction shapes:
63///
64/// * [`FlowFabricAdminClient::new`] / [`FlowFabricAdminClient::with_token`]
65///   — HTTP transport targeting a running `ff-server`.
66/// * [`FlowFabricAdminClient::connect_with`] — embedded transport
67///   that dispatches directly through an `Arc<dyn EngineBackend>`.
68///   No `ff-server` required; works under `FF_DEV_MODE=1` + SQLite
69///   and in any in-process deployment.
70///
71/// The public method surface is identical across both transports;
72/// consumers choose at construction time. Admin methods that have
73/// no backend-trait equivalent return
74/// [`SdkError::AdminApi`] with status 503 on the embedded path —
75/// today every method on this client maps cleanly, so this fallback
76/// is only reached if a future admin primitive lands HTTP-first.
77#[derive(Debug, Clone)]
78pub struct FlowFabricAdminClient {
79    transport: AdminTransport,
80}
81
82/// Internal discriminator between the HTTP and embedded transports.
83/// Private by design — the public API is uniform across both shapes
84/// (see the [`FlowFabricAdminClient`] type-level docs).
85#[derive(Clone)]
86enum AdminTransport {
87    Http {
88        http: reqwest::Client,
89        base_url: String,
90    },
91    Embedded(Arc<dyn EngineBackend>),
92}
93
94impl std::fmt::Debug for AdminTransport {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            AdminTransport::Http { base_url, .. } => f
98                .debug_struct("Http")
99                .field("base_url", base_url)
100                .finish_non_exhaustive(),
101            AdminTransport::Embedded(backend) => f
102                .debug_struct("Embedded")
103                .field("backend", &backend.backend_label())
104                .finish(),
105        }
106    }
107}
108
109impl FlowFabricAdminClient {
110    /// Build a client without auth. Suitable for a dev ff-server
111    /// whose `api_token` is unconfigured. Production deployments
112    /// should use [`with_token`](Self::with_token).
113    pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
114        let http = reqwest::Client::builder()
115            .timeout(DEFAULT_TIMEOUT)
116            .build()
117            .map_err(|e| SdkError::Http {
118                source: e,
119                context: "build reqwest::Client".into(),
120            })?;
121        Ok(Self {
122            transport: AdminTransport::Http {
123                http,
124                base_url: normalize_base_url(base_url.into()),
125            },
126        })
127    }
128
129    /// Build a client that dispatches admin primitives directly
130    /// through an `Arc<dyn EngineBackend>`, bypassing HTTP entirely.
131    ///
132    /// # When to use
133    ///
134    /// * `FF_DEV_MODE=1` SQLite scenarios where no `ff-server` is
135    ///   running.
136    /// * In-process / embedded deployments that hold a backend
137    ///   handle already (e.g. tests, examples, scheduler-hosting
138    ///   binaries).
139    ///
140    /// # Semantic parity
141    ///
142    /// Each method on [`FlowFabricAdminClient`] dispatches to the
143    /// equivalent `EngineBackend` trait method (see the RFC-024 /
144    /// RFC-017 admin surfaces). Validation **rules** + request-body
145    /// translation mirror the server-side handler in `ff-server` so
146    /// callers get the same accept / reject behaviour across
147    /// transports. Note: the exact [`SdkError`] variant differs —
148    /// embedded-path validation rejects surface as [`SdkError::Config`]
149    /// (no HTTP round-trip) while HTTP returns [`SdkError::AdminApi`]
150    /// with status `400`. Callers that need to distinguish a 4xx from
151    /// a transport fault should use [`SdkError::is_retryable`] or
152    /// match on `Config` + `AdminApi` together rather than relying on
153    /// a single variant.
154    ///
155    /// `EngineError::Unavailable` from the backend — emitted by
156    /// backends that have not implemented a given method — is mapped
157    /// to [`SdkError::AdminApi`] with `status = 503` and
158    /// `kind = Some("unavailable")` so callers see a uniform
159    /// admin-error surface across transports.
160    ///
161    /// # Divergence from the HTTP transport
162    ///
163    /// * `rotate_waitpoint_secret` forwards
164    ///   [`EMBEDDED_WAITPOINT_HMAC_GRACE_MS`] (24 h, matching
165    ///   `ff-server`'s default `FF_WAITPOINT_HMAC_GRACE_MS`) as the
166    ///   per-partition grace window. The HTTP transport reads the
167    ///   server's env-configured value; the embedded client has no
168    ///   config surface so it pins the documented default.
169    /// * No single-writer admin semaphore, no audit-log emission.
170    ///   These are `ff-server` responsibilities; embedded consumers
171    ///   wanting them bring their own gate.
172    pub fn connect_with(backend: Arc<dyn EngineBackend>) -> Self {
173        Self {
174            transport: AdminTransport::Embedded(backend),
175        }
176    }
177
178    /// Build a client that sends `Authorization: Bearer <token>` on
179    /// every request. The token is passed by value so the caller
180    /// retains ownership policy (e.g. zeroize on drop at the
181    /// caller side); the SDK only reads it.
182    ///
183    /// # Empty-token guard
184    ///
185    /// An empty or all-whitespace `token` returns
186    /// [`SdkError::Config`] instead of silently constructing
187    /// `Authorization: Bearer ` (which the server rejects with
188    /// 401, leaving the operator chasing a "why is auth broken"
189    /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
190    /// where the var was meant to be set; the unset-expansion is
191    /// the empty string. Prefer an obvious error at construction
192    /// over a silent 401 at first request.
193    ///
194    /// If the caller genuinely wants an unauthenticated client
195    /// (dev ff-server without `api_token` configured), use
196    /// [`FlowFabricAdminClient::new`] instead.
197    pub fn with_token(
198        base_url: impl Into<String>,
199        token: impl AsRef<str>,
200    ) -> Result<Self, SdkError> {
201        let token_str = token.as_ref();
202        if token_str.trim().is_empty() {
203            return Err(SdkError::Config {
204                context: "admin_client".into(),
205                field: Some("bearer_token".into()),
206                message: "is empty or all-whitespace; use \
207                          FlowFabricAdminClient::new for unauthenticated access"
208                    .into(),
209            });
210        }
211        let mut headers = reqwest::header::HeaderMap::new();
212        let mut auth_value =
213            reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
214                |_| SdkError::Config {
215                    context: "admin_client".into(),
216                    field: Some("bearer_token".into()),
217                    message: "contains characters not valid in an HTTP header".into(),
218                },
219            )?;
220        // Mark Authorization as sensitive so it doesn't appear in
221        // reqwest's Debug output / logs.
222        auth_value.set_sensitive(true);
223        headers.insert(reqwest::header::AUTHORIZATION, auth_value);
224
225        let http = reqwest::Client::builder()
226            .timeout(DEFAULT_TIMEOUT)
227            .default_headers(headers)
228            .build()
229            .map_err(|e| SdkError::Http {
230                source: e,
231                context: "build reqwest::Client".into(),
232            })?;
233        Ok(Self {
234            transport: AdminTransport::Http {
235                http,
236                base_url: normalize_base_url(base_url.into()),
237            },
238        })
239    }
240
241    /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
242    ///
243    /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
244    /// server-side one: the request carries lane + identity +
245    /// capabilities + grant TTL; the server runs budget, quota, and
246    /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
247    /// and returns a `ClaimGrant` on success.
248    ///
249    /// Returns `Ok(None)` when the server responds 204 No Content
250    /// (no eligible execution on the lane). Callers that want to keep
251    /// polling should back off per their claim cadence.
252    pub async fn claim_for_worker(
253        &self,
254        req: ClaimForWorkerRequest,
255    ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
256        match &self.transport {
257            AdminTransport::Http { http, base_url } => {
258                claim_for_worker_http(http, base_url, req).await
259            }
260            AdminTransport::Embedded(backend) => {
261                claim_for_worker_embedded(backend.as_ref(), req).await
262            }
263        }
264    }
265
266    /// Rotate the waitpoint HMAC secret on the server.
267    ///
268    /// Promotes the currently-installed kid to `previous_kid`
269    /// (accepted for the server's configured
270    /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
271    /// `new_secret_hex` under `new_kid` as the new current. Fans
272    /// out across every execution partition. Idempotent: re-running
273    /// with the same `(new_kid, new_secret_hex)` converges.
274    ///
275    /// The server returns 200 if at least one partition rotated OR
276    /// at least one partition was already rotating under a
277    /// concurrent request. See `RotateWaitpointSecretResponse`
278    /// fields for the breakdown.
279    ///
280    /// # Errors
281    ///
282    /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
283    ///   input, 401 missing/bad bearer, 429 concurrent rotate,
284    ///   500 all partitions failed, 504 server-side timeout).
285    /// * [`SdkError::Http`] — transport error (connect, body
286    ///   decode, client-side timeout).
287    ///
288    /// # Retry semantics
289    ///
290    /// Rotation is idempotent on the same `(new_kid,
291    /// new_secret_hex)` so retries are SAFE even on 504s or
292    /// partial failures.
293    pub async fn rotate_waitpoint_secret(
294        &self,
295        req: RotateWaitpointSecretRequest,
296    ) -> Result<RotateWaitpointSecretResponse, SdkError> {
297        match &self.transport {
298            AdminTransport::Http { http, base_url } => {
299                rotate_waitpoint_secret_http(http, base_url, req).await
300            }
301            AdminTransport::Embedded(backend) => {
302                rotate_waitpoint_secret_embedded(backend.as_ref(), req).await
303            }
304        }
305    }
306
307    /// Request a lease-reclaim grant for an execution in
308    /// `lease_expired_reclaimable` or `lease_revoked` state (RFC-024
309    /// §3.5).
310    ///
311    /// Routes `POST /v1/executions/{execution_id}/reclaim`. The
312    /// ff-server handler dispatches through the `EngineBackend` trait
313    /// to whichever backend the server was started with (Valkey /
314    /// Postgres / SQLite).
315    ///
316    /// # worker_capabilities (RFC-024 §3.2 B-2)
317    ///
318    /// The request body carries `worker_capabilities`. Consumers typically
319    /// source these from their registered worker's configured
320    /// `WorkerConfig::capabilities`. Admission compares
321    /// `worker_capabilities` against the execution's
322    /// `required_capabilities` (persisted on `exec_core` at
323    /// `create_execution` time from
324    /// `ExecutionPolicy.routing_requirements.required_capabilities`);
325    /// any required capability missing from the worker set surfaces as
326    /// `IssueReclaimGrantResponse::NotReclaimable { detail:
327    /// "capability_mismatch: <missing csv>" }` (Lua
328    /// `ff_issue_reclaim_grant`, `crates/ff-script/src/flowfabric.lua`
329    /// §3969-3982; sqlite/PG backends mirror the check). The SDK does
330    /// not re-read worker state automatically — admin clients are not
331    /// bound to a worker — so the consumer threads the capabilities
332    /// through at call-time.
333    ///
334    /// `capability_hash` is NOT consulted for admission; it is stored
335    /// verbatim on the grant hash for audit / downstream observability
336    /// only.
337    ///
338    /// # Consumer flow (RFC-024 §4.4)
339    ///
340    /// 1. Consumer's `POST /v1/runs/:id/complete` returns
341    ///    `lease_expired`.
342    /// 2. Consumer calls this method; handles
343    ///    [`IssueReclaimGrantResponse::Granted`] → builds a
344    ///    [`ff_core::contracts::ReclaimGrant`] via
345    ///    [`IssueReclaimGrantResponse::into_grant`].
346    /// 3. Consumer passes the grant to
347    ///    [`crate::FlowFabricWorker::claim_from_reclaim_grant`] along
348    ///    with a fresh [`ff_core::contracts::ReclaimExecutionArgs`];
349    ///    the new attempt is minted with `HandleKind::Reclaimed`.
350    /// 4. Consumer drives terminal writes on the fresh lease.
351    ///
352    /// # Errors
353    ///
354    /// * [`SdkError::AdminApi`] — non-2xx response. 404 when the
355    ///   execution does not exist; 400 on malformed `execution_id` or
356    ///   body.
357    /// * [`SdkError::Http`] — transport error (connect, body
358    ///   decode, client-side timeout).
359    ///
360    /// # Retry semantics
361    ///
362    /// Idempotent on the Lua side: repeated calls against an execution
363    /// already re-leased (a concurrent reclaim beat this one) surface
364    /// as `NotReclaimable`. Safe to retry on transport faults.
365    pub async fn issue_reclaim_grant(
366        &self,
367        execution_id: &str,
368        req: IssueReclaimGrantRequest,
369    ) -> Result<IssueReclaimGrantResponse, SdkError> {
370        match &self.transport {
371            AdminTransport::Http { http, base_url } => {
372                issue_reclaim_grant_http(http, base_url, execution_id, req).await
373            }
374            AdminTransport::Embedded(backend) => {
375                issue_reclaim_grant_embedded(backend.as_ref(), execution_id, req).await
376            }
377        }
378    }
379}
380
381// ── HTTP-transport helpers (private) ─────────────────────────────────
382
383async fn claim_for_worker_http(
384    http: &reqwest::Client,
385    base_url: &str,
386    req: ClaimForWorkerRequest,
387) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
388    // Percent-encode `worker_id` in the URL path — `WorkerId` is a
389    // free-form string (could contain `/`, spaces, `%`, etc.) and
390    // splicing it verbatim would produce malformed URLs or
391    // misrouted paths. `Url::path_segments_mut().push` handles the
392    // encoding natively.
393    let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
394        context: "admin_client: claim_for_worker".into(),
395        field: Some("base_url".into()),
396        message: format!("invalid base_url '{}': {e}", base_url),
397    })?;
398    {
399        let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
400            context: "admin_client: claim_for_worker".into(),
401            field: Some("base_url".into()),
402            message: format!("base_url cannot be a base URL: '{}'", base_url),
403        })?;
404        segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
405    }
406    let url = url.to_string();
407    let resp = http
408        .post(&url)
409        .json(&req)
410        .send()
411        .await
412        .map_err(|e| SdkError::Http {
413            source: e,
414            context: "POST /v1/workers/{worker_id}/claim".into(),
415        })?;
416
417    let status = resp.status();
418    if status == reqwest::StatusCode::NO_CONTENT {
419        return Ok(None);
420    }
421    if status.is_success() {
422        return resp
423            .json::<ClaimForWorkerResponse>()
424            .await
425            .map(Some)
426            .map_err(|e| SdkError::Http {
427                source: e,
428                context: "decode claim_for_worker response body".into(),
429            });
430    }
431
432    // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
433    let status_u16 = status.as_u16();
434    let raw = resp.text().await.map_err(|e| SdkError::Http {
435        source: e,
436        context: format!("read claim_for_worker error body (status {status_u16})"),
437    })?;
438    let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
439    Err(SdkError::AdminApi {
440        status: status_u16,
441        message: parsed
442            .as_ref()
443            .map(|b| b.error.clone())
444            .unwrap_or_else(|| raw.clone()),
445        kind: parsed.as_ref().and_then(|b| b.kind.clone()),
446        retryable: parsed.as_ref().and_then(|b| b.retryable),
447        raw_body: raw,
448    })
449}
450
451async fn rotate_waitpoint_secret_http(
452    http: &reqwest::Client,
453    base_url: &str,
454    req: RotateWaitpointSecretRequest,
455) -> Result<RotateWaitpointSecretResponse, SdkError> {
456    let url = format!("{}/v1/admin/rotate-waitpoint-secret", base_url);
457    let resp = http
458        .post(&url)
459        .json(&req)
460        .send()
461        .await
462        .map_err(|e| SdkError::Http {
463            source: e,
464            context: "POST /v1/admin/rotate-waitpoint-secret".into(),
465        })?;
466
467    let status = resp.status();
468    if status.is_success() {
469        return resp
470            .json::<RotateWaitpointSecretResponse>()
471            .await
472            .map_err(|e| SdkError::Http {
473                source: e,
474                context: "decode rotate-waitpoint-secret response body".into(),
475            });
476    }
477
478    // Non-2xx: parse the server's ErrorBody if we can, fall
479    // back to a raw body otherwise. Propagate body-read
480    // transport errors as Http rather than silently flattening
481    // them into `AdminApi { raw_body: "" }` — a connection drop
482    // mid-body-read is a transport fault, not an API-layer
483    // reject, and misclassifying it strips `is_retryable`'s
484    // timeout/connect signal from the caller.
485    let status_u16 = status.as_u16();
486    let raw = resp.text().await.map_err(|e| SdkError::Http {
487        source: e,
488        context: format!(
489            "read rotate-waitpoint-secret error response body (status {status_u16})"
490        ),
491    })?;
492    let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
493    Err(SdkError::AdminApi {
494        status: status_u16,
495        message: parsed
496            .as_ref()
497            .map(|b| b.error.clone())
498            .unwrap_or_else(|| raw.clone()),
499        kind: parsed.as_ref().and_then(|b| b.kind.clone()),
500        retryable: parsed.as_ref().and_then(|b| b.retryable),
501        raw_body: raw,
502    })
503}
504
505async fn issue_reclaim_grant_http(
506    http: &reqwest::Client,
507    base_url: &str,
508    execution_id: &str,
509    req: IssueReclaimGrantRequest,
510) -> Result<IssueReclaimGrantResponse, SdkError> {
511    // Percent-encode `execution_id` in the URL path — the id is a
512    // free-form string and splicing verbatim would produce
513    // malformed URLs. Mirrors `claim_for_worker`'s handling.
514    let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
515        context: "admin_client: issue_reclaim_grant".into(),
516        field: Some("base_url".into()),
517        message: format!("invalid base_url '{}': {e}", base_url),
518    })?;
519    {
520        let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
521            context: "admin_client: issue_reclaim_grant".into(),
522            field: Some("base_url".into()),
523            message: format!("base_url cannot be a base URL: '{}'", base_url),
524        })?;
525        segs.extend(&["v1", "executions", execution_id, "reclaim"]);
526    }
527    let url = url.to_string();
528    let resp = http
529        .post(&url)
530        .json(&req)
531        .send()
532        .await
533        .map_err(|e| SdkError::Http {
534            source: e,
535            context: "POST /v1/executions/{id}/reclaim".into(),
536        })?;
537
538    let status = resp.status();
539    if status.is_success() {
540        return resp
541            .json::<IssueReclaimGrantResponse>()
542            .await
543            .map_err(|e| SdkError::Http {
544                source: e,
545                context: "decode issue_reclaim_grant response body".into(),
546            });
547    }
548
549    let status_u16 = status.as_u16();
550    let raw = resp.text().await.map_err(|e| SdkError::Http {
551        source: e,
552        context: format!(
553            "read issue_reclaim_grant error body (status {status_u16})"
554        ),
555    })?;
556    let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
557    Err(SdkError::AdminApi {
558        status: status_u16,
559        message: parsed
560            .as_ref()
561            .map(|b| b.error.clone())
562            .unwrap_or_else(|| raw.clone()),
563        kind: parsed.as_ref().and_then(|b| b.kind.clone()),
564        retryable: parsed.as_ref().and_then(|b| b.retryable),
565        raw_body: raw,
566    })
567}
568
569// ── Embedded-transport helpers (private) ─────────────────────────────
570//
571// Dispatch directly through the `EngineBackend` trait. The request
572// body validation mirrors the server-side handler in
573// `ff-server::api`. Translation between the wire DTOs and the core
574// `contracts::*` types lives here so consumers get identical
575// surfaces across transports.
576
577/// Validate a free-form identifier the same way `ff-server`'s
578/// `validate_identifier` does: non-empty, ≤256 bytes, no whitespace
579/// or control chars. Embedded-transport callers hit this before the
580/// request reaches the backend so invalid identifiers cannot leak
581/// past the SDK's parity guarantee.
582fn validate_admin_identifier(
583    op: &'static str,
584    field: &'static str,
585    value: &str,
586) -> Result<(), SdkError> {
587    if value.is_empty() {
588        return Err(SdkError::Config {
589            context: format!("admin_client: {op}"),
590            field: Some(field.into()),
591            message: "must not be empty".into(),
592        });
593    }
594    if value.len() > 256 {
595        return Err(SdkError::Config {
596            context: format!("admin_client: {op}"),
597            field: Some(field.into()),
598            message: format!("exceeds 256 bytes (got {})", value.len()),
599        });
600    }
601    if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
602        return Err(SdkError::Config {
603            context: format!("admin_client: {op}"),
604            field: Some(field.into()),
605            message: "must not contain whitespace or control characters".into(),
606        });
607    }
608    Ok(())
609}
610
611/// Bounded grant-TTL check mirroring `ff-server`'s
612/// `1..=CLAIM_GRANT_TTL_MS_MAX`. Shared between `claim_for_worker`
613/// and `issue_reclaim_grant` embedded paths.
614fn validate_admin_grant_ttl(op: &'static str, grant_ttl_ms: u64) -> Result<(), SdkError> {
615    if grant_ttl_ms == 0 || grant_ttl_ms > EMBEDDED_GRANT_TTL_MS_MAX {
616        return Err(SdkError::Config {
617            context: format!("admin_client: {op}"),
618            field: Some("grant_ttl_ms".into()),
619            message: format!("must be in 1..={EMBEDDED_GRANT_TTL_MS_MAX}"),
620        });
621    }
622    Ok(())
623}
624
625/// Map an `EngineError` from a backend call into the `SdkError::AdminApi`
626/// surface so embedded and HTTP transports emit the same shape. `Unavailable`
627/// becomes 503 with kind `"unavailable"`; every other engine error bubbles
628/// up via `SdkError::Engine`.
629fn engine_err_to_admin(err: ff_core::engine_error::EngineError, op: &str) -> SdkError {
630    if let ff_core::engine_error::EngineError::Unavailable { op: backend_op } = &err {
631        return SdkError::AdminApi {
632            status: 503,
633            message: format!(
634                "{op}: backend does not implement '{backend_op}' on this transport"
635            ),
636            kind: Some("unavailable".to_owned()),
637            retryable: Some(false),
638            raw_body: String::new(),
639        };
640    }
641    SdkError::Engine(Box::new(err))
642}
643
644async fn claim_for_worker_embedded(
645    backend: &dyn EngineBackend,
646    req: ClaimForWorkerRequest,
647) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
648    // Mirror ff-server's validation + translation. Errors surface as
649    // SdkError::Config so consumers see validation faults loud rather
650    // than as backend transport errors.
651    let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
652        context: "admin_client: claim_for_worker".into(),
653        field: Some("lane_id".into()),
654        message: e.to_string(),
655    })?;
656    validate_admin_identifier("claim_for_worker", "worker_id", &req.worker_id)?;
657    validate_admin_identifier(
658        "claim_for_worker",
659        "worker_instance_id",
660        &req.worker_instance_id,
661    )?;
662    validate_admin_grant_ttl("claim_for_worker", req.grant_ttl_ms)?;
663    let caps: std::collections::BTreeSet<String> = req.capabilities.into_iter().collect();
664    let args = ff_core::contracts::ClaimForWorkerArgs::new(
665        lane_id,
666        ff_core::types::WorkerId::new(req.worker_id),
667        ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
668        caps,
669        req.grant_ttl_ms,
670    );
671    match backend
672        .claim_for_worker(args)
673        .await
674        .map_err(|e| engine_err_to_admin(e, "claim_for_worker"))?
675    {
676        ff_core::contracts::ClaimForWorkerOutcome::NoWork => Ok(None),
677        ff_core::contracts::ClaimForWorkerOutcome::Granted(grant) => {
678            Ok(Some(ClaimForWorkerResponse {
679                execution_id: grant.execution_id.to_string(),
680                partition_key: grant.partition_key,
681                grant_key: grant.grant_key,
682                expires_at_ms: grant.expires_at_ms,
683            }))
684        }
685        // `ClaimForWorkerOutcome` is `#[non_exhaustive]`; mirror
686        // ff-server's 503 policy on unknown variants.
687        _ => Err(SdkError::AdminApi {
688            status: 503,
689            message: "claim_for_worker: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
690            kind: Some("unknown_outcome".to_owned()),
691            retryable: Some(false),
692            raw_body: String::new(),
693        }),
694    }
695}
696
697async fn issue_reclaim_grant_embedded(
698    backend: &dyn EngineBackend,
699    execution_id: &str,
700    req: IssueReclaimGrantRequest,
701) -> Result<IssueReclaimGrantResponse, SdkError> {
702    let exec_id = ff_core::types::ExecutionId::parse(execution_id).map_err(|e| SdkError::Config {
703        context: "admin_client: issue_reclaim_grant".into(),
704        field: Some("execution_id".into()),
705        message: e.to_string(),
706    })?;
707    let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
708        context: "admin_client: issue_reclaim_grant".into(),
709        field: Some("lane_id".into()),
710        message: e.to_string(),
711    })?;
712    validate_admin_identifier("issue_reclaim_grant", "worker_id", &req.worker_id)?;
713    validate_admin_identifier(
714        "issue_reclaim_grant",
715        "worker_instance_id",
716        &req.worker_instance_id,
717    )?;
718    validate_admin_grant_ttl("issue_reclaim_grant", req.grant_ttl_ms)?;
719    let caps: std::collections::BTreeSet<String> = req.worker_capabilities.into_iter().collect();
720    let args = ff_core::contracts::IssueReclaimGrantArgs::new(
721        exec_id,
722        ff_core::types::WorkerId::new(req.worker_id),
723        ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
724        lane_id,
725        req.capability_hash,
726        req.grant_ttl_ms,
727        req.route_snapshot_json,
728        req.admission_summary,
729        caps,
730        ff_core::types::TimestampMs::now(),
731    );
732    match backend
733        .issue_reclaim_grant(args)
734        .await
735        .map_err(|e| engine_err_to_admin(e, "issue_reclaim_grant"))?
736    {
737        ff_core::contracts::IssueReclaimGrantOutcome::Granted(grant) => {
738            Ok(IssueReclaimGrantResponse::Granted {
739                execution_id: grant.execution_id.to_string(),
740                partition_key: grant.partition_key,
741                grant_key: grant.grant_key,
742                expires_at_ms: grant.expires_at_ms,
743                lane_id: grant.lane_id.as_str().to_owned(),
744            })
745        }
746        ff_core::contracts::IssueReclaimGrantOutcome::NotReclaimable { execution_id, detail } => {
747            Ok(IssueReclaimGrantResponse::NotReclaimable {
748                execution_id: execution_id.to_string(),
749                detail,
750            })
751        }
752        ff_core::contracts::IssueReclaimGrantOutcome::ReclaimCapExceeded {
753            execution_id,
754            reclaim_count,
755        } => Ok(IssueReclaimGrantResponse::ReclaimCapExceeded {
756            execution_id: execution_id.to_string(),
757            reclaim_count,
758        }),
759        _ => Err(SdkError::AdminApi {
760            status: 503,
761            message: "issue_reclaim_grant: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
762            kind: Some("unknown_outcome".to_owned()),
763            retryable: Some(false),
764            raw_body: String::new(),
765        }),
766    }
767}
768
769async fn rotate_waitpoint_secret_embedded(
770    backend: &dyn EngineBackend,
771    req: RotateWaitpointSecretRequest,
772) -> Result<RotateWaitpointSecretResponse, SdkError> {
773    // Validation mirrors `ff-server::Server::rotate_waitpoint_secret`
774    // so the embedded and HTTP transports reject the same invalid
775    // inputs.
776    if req.new_kid.is_empty() || req.new_kid.contains(':') {
777        return Err(SdkError::Config {
778            context: "admin_client: rotate_waitpoint_secret".into(),
779            field: Some("new_kid".into()),
780            message: "must be non-empty and must not contain ':'".into(),
781        });
782    }
783    if req.new_secret_hex.is_empty()
784        || !req.new_secret_hex.len().is_multiple_of(2)
785        || !req.new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
786    {
787        return Err(SdkError::Config {
788            context: "admin_client: rotate_waitpoint_secret".into(),
789            field: Some("new_secret_hex".into()),
790            message: "must be a non-empty even-length hex string".into(),
791        });
792    }
793    // Embedded consumers have no config surface from which to read
794    // the per-deployment grace window, so pin the documented default
795    // matching `ff-server`'s `FF_WAITPOINT_HMAC_GRACE_MS` (24 h). See
796    // `EMBEDDED_WAITPOINT_HMAC_GRACE_MS` + the `connect_with` rustdoc
797    // for the rationale and divergence-from-HTTP notes. Unlike the
798    // HTTP handler, this path does not enforce the 120 s end-to-end
799    // timeout (no HTTP deadline to honour) and does not emit the
800    // `audit`-target `waitpoint_hmac_rotation_*` events (those are
801    // server-owned operator signals). Rotation is idempotent on the
802    // same (new_kid, new_secret_hex) pair, so retries remain safe.
803    let args = ff_core::contracts::RotateWaitpointHmacSecretAllArgs::new(
804        req.new_kid.clone(),
805        req.new_secret_hex,
806        EMBEDDED_WAITPOINT_HMAC_GRACE_MS,
807    );
808    let result = backend
809        .rotate_waitpoint_hmac_secret_all(args)
810        .await
811        .map_err(|e| engine_err_to_admin(e, "rotate_waitpoint_secret"))?;
812
813    // Collapse the per-partition entries into the HTTP response shape
814    // the server emits — rotated count + failed indices + echoed
815    // new_kid — so consumers see identical return values across
816    // transports.
817    let mut rotated: u16 = 0;
818    let mut failed: Vec<u16> = Vec::new();
819    for entry in &result.entries {
820        match &entry.result {
821            Ok(_) => {
822                rotated = rotated.saturating_add(1);
823            }
824            Err(_) => failed.push(entry.partition),
825        }
826    }
827    Ok(RotateWaitpointSecretResponse {
828        rotated,
829        failed,
830        new_kid: req.new_kid,
831    })
832}
833
/// Request body for `POST /v1/executions/{execution_id}/reclaim`
/// (RFC-024 §3.5).
///
/// Mirrors `ff_server::api::IssueReclaimGrantBody` 1:1. The
/// `execution_id` goes in the URL path, not the body.
///
/// Only `Serialize` is derived, so the `#[serde(default)]` attributes
/// below have no effect on this type; they only influence
/// deserialization. `Option` fields set to `None` are omitted from the
/// JSON body entirely (`skip_serializing_if`).
#[derive(Debug, Clone, Serialize)]
pub struct IssueReclaimGrantRequest {
    /// Worker identity requesting the grant. The Lua
    /// `ff_reclaim_execution` validates grant consumption via
    /// `grant.worker_id == args.worker_id` (RFC-024 §4.4) — the
    /// worker consuming the grant must match this value.
    pub worker_id: String,
    /// Worker-instance identity. Informational at grant-issuance
    /// time; stored on the grant so consumers can correlate events.
    pub worker_instance_id: String,
    /// Lane the execution belongs to. Needed by
    /// `ff_issue_reclaim_grant` for `KEYS[*]` construction.
    pub lane_id: String,
    /// Opaque capability-hash token stored verbatim on the issued
    /// grant for audit / downstream observability. NOT used for
    /// admission — admission compares `worker_capabilities` against
    /// the execution's `required_capabilities` (see the
    /// [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc).
    /// `None` leaves the field empty on the grant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub capability_hash: Option<String>,
    /// Grant TTL in milliseconds. Bounded server-side; the embedded
    /// transport also validates it locally before calling the backend.
    pub grant_ttl_ms: u64,
    /// Route snapshot JSON carried onto the grant for audit.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub route_snapshot_json: Option<String>,
    /// Admission summary string carried onto the grant for audit.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub admission_summary: Option<String>,
    /// Worker capability tokens. Consumers typically source these
    /// from their registered worker's `WorkerConfig::capabilities`
    /// (see [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc
    /// for the override contract). The embedded transport collects
    /// them into a `BTreeSet`, so duplicates collapse there.
    #[serde(default)]
    pub worker_capabilities: Vec<String>,
}
875
/// Response body for `POST /v1/executions/{execution_id}/reclaim`
/// (RFC-024 §3.5).
///
/// The server serializes this struct with a `status` discriminator so
/// consumers can match on structured outcomes without re-parsing a
/// 200-vs-4xx split for business-logic outcomes (mirrors
/// `RotateWaitpointSecretResponse`'s precedent).
///
/// Wire tags are the snake_case variant names: `"granted"`,
/// `"not_reclaimable"`, `"reclaim_cap_exceeded"`. Variant field names
/// are used as-is (`rename_all` on an enum renames variants only).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum IssueReclaimGrantResponse {
    /// Grant issued. Build a
    /// [`ff_core::contracts::ReclaimGrant`] via
    /// [`Self::into_grant`] and feed it to
    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
    Granted {
        // String form of the execution id; `into_grant` re-parses it.
        execution_id: String,
        // Opaque partition key, passed through untouched.
        partition_key: ff_core::partition::PartitionKey,
        // Identifier of the issued grant (opaque to the SDK).
        grant_key: String,
        // Grant expiry in milliseconds (presumably Unix-epoch ms —
        // confirm against the backend).
        expires_at_ms: u64,
        // String form of the lane id; `into_grant` re-validates it.
        lane_id: String,
    },
    /// Execution is not in a reclaimable state (not
    /// `lease_expired_reclaimable` / `lease_revoked`).
    NotReclaimable {
        execution_id: String,
        // Backend-provided explanation, passed through verbatim.
        detail: String,
    },
    /// `max_reclaim_count` exceeded; execution transitioned to
    /// terminal_failed. Consumers stop retrying and surface a
    /// structural failure.
    ReclaimCapExceeded {
        execution_id: String,
        // Reclaim count reported by the backend.
        reclaim_count: u32,
    },
}
911
912impl IssueReclaimGrantResponse {
913    /// Convert a [`Self::Granted`] response into a typed
914    /// [`ff_core::contracts::ReclaimGrant`] for handoff to
915    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
916    ///
917    /// Returns [`SdkError::AdminApi`] when the wire variant is not
918    /// `Granted` (consumer asked for a grant but the server replied
919    /// with a terminal outcome) or when `execution_id` / `lane_id`
920    /// are malformed — the latter signals a drift between server and
921    /// SDK, so failing loud prevents silent misrouting.
922    pub fn into_grant(self) -> Result<ff_core::contracts::ReclaimGrant, SdkError> {
923        match self {
924            IssueReclaimGrantResponse::Granted {
925                execution_id,
926                partition_key,
927                grant_key,
928                expires_at_ms,
929                lane_id,
930            } => {
931                let eid = ff_core::types::ExecutionId::parse(&execution_id)
932                    .map_err(|e| SdkError::AdminApi {
933                        status: 200,
934                        message: format!(
935                            "issue_reclaim_grant: server returned malformed execution_id '{execution_id}': {e}"
936                        ),
937                        kind: Some("malformed_response".to_owned()),
938                        retryable: Some(false),
939                        raw_body: String::new(),
940                    })?;
941                let lane = ff_core::types::LaneId::try_new(lane_id.clone())
942                    .map_err(|e| SdkError::AdminApi {
943                        status: 200,
944                        message: format!(
945                            "issue_reclaim_grant: server returned malformed lane_id '{lane_id}': {e}"
946                        ),
947                        kind: Some("malformed_response".to_owned()),
948                        retryable: Some(false),
949                        raw_body: String::new(),
950                    })?;
951                Ok(ff_core::contracts::ReclaimGrant::new(
952                    eid,
953                    partition_key,
954                    grant_key,
955                    expires_at_ms,
956                    lane,
957                ))
958            }
959            IssueReclaimGrantResponse::NotReclaimable { execution_id, detail } => {
960                Err(SdkError::AdminApi {
961                    status: 200,
962                    message: format!(
963                        "issue_reclaim_grant: execution '{execution_id}' not reclaimable: {detail}"
964                    ),
965                    kind: Some("not_reclaimable".to_owned()),
966                    retryable: Some(false),
967                    raw_body: String::new(),
968                })
969            }
970            IssueReclaimGrantResponse::ReclaimCapExceeded {
971                execution_id,
972                reclaim_count,
973            } => Err(SdkError::AdminApi {
974                status: 200,
975                message: format!(
976                    "issue_reclaim_grant: execution '{execution_id}' hit reclaim cap ({reclaim_count})"
977                ),
978                kind: Some("reclaim_cap_exceeded".to_owned()),
979                retryable: Some(false),
980                raw_body: String::new(),
981            }),
982        }
983    }
984}
985
986/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
987///
988/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
989#[derive(Debug, Clone, Serialize)]
990pub struct RotateWaitpointSecretRequest {
991    /// New key identifier. Non-empty, must not contain `:` (the
992    /// server uses `:` as the field separator in the secret hash).
993    pub new_kid: String,
994    /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
995    pub new_secret_hex: String,
996}
997
/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
/// The server serializes this struct as-is via `Json(result)`. The
/// embedded transport builds the same shape locally from the backend's
/// per-partition entries, so callers see identical values either way.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct RotateWaitpointSecretResponse {
    /// Count of partitions that accepted the rotation.
    pub rotated: u16,
    /// Partition indices where the rotation failed — operator
    /// should investigate. Rotation is idempotent on the same
    /// `(new_kid, new_secret_hex)` so a retry after the underlying
    /// fault clears converges.
    pub failed: Vec<u16>,
    /// The `new_kid` that was installed as current on every
    /// rotated partition — echoes the request field back for
    /// confirmation.
    pub new_kid: String,
}
1016
/// Server-side error body shape, as emitted by
/// `ff_server::api::ErrorBody`. Kept internal because consumers
/// match on the flattened fields of [`SdkError::AdminApi`].
#[derive(Debug, Clone, Deserialize)]
struct AdminErrorBody {
    /// Human-readable error message; always present.
    error: String,
    /// Optional error classifier (e.g. `"IoError"` on backend faults);
    /// absent on plain validation errors.
    #[serde(default)]
    kind: Option<String>,
    /// Optional server hint on whether retrying may succeed; absent
    /// when the server does not classify the error.
    #[serde(default)]
    retryable: Option<bool>,
}
1028
/// Request body for `POST /v1/workers/{worker_id}/claim`.
///
/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
/// goes in the URL path (not the body) but is kept on the struct
/// for ergonomics — callers don't juggle a separate arg.
#[derive(Debug, Clone, Serialize)]
pub struct ClaimForWorkerRequest {
    /// Worker identity. `#[serde(skip)]` keeps it out of the JSON
    /// body; it travels in the URL path instead.
    #[serde(skip)]
    pub worker_id: String,
    /// Lane identifier, sent as the `lane_id` body field.
    pub lane_id: String,
    /// Worker-instance identity, sent as the `worker_instance_id`
    /// body field.
    pub worker_instance_id: String,
    /// Worker capability tokens. Always serialized (as `[]` when
    /// empty); `#[serde(default)]` only affects deserialization,
    /// which this type does not derive.
    #[serde(default)]
    pub capabilities: Vec<String>,
    /// Grant TTL in milliseconds. Server rejects 0 or anything over
    /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
    pub grant_ttl_ms: u64,
}
1046
/// Response body for `POST /v1/workers/{worker_id}/claim`.
///
/// Wire shape of `ff_core::contracts::ClaimGrant`. Carries the opaque
/// [`ff_core::partition::PartitionKey`] directly on the wire (issue
/// #91); the SDK reconstructs the core type via [`Self::into_grant`].
#[derive(Debug, Clone, Deserialize)]
pub struct ClaimForWorkerResponse {
    /// String form of the execution id; parsed by `into_grant`.
    pub execution_id: String,
    /// Opaque partition key; not parsed on this boundary.
    pub partition_key: ff_core::partition::PartitionKey,
    /// Identifier of the issued claim grant (opaque to the SDK).
    pub grant_key: String,
    /// Grant expiry in milliseconds (presumably Unix-epoch ms —
    /// confirm against the server).
    pub expires_at_ms: u64,
}
1059
1060impl ClaimForWorkerResponse {
1061    /// Convert the wire DTO into a typed
1062    /// [`ff_core::contracts::ClaimGrant`] for handoff to
1063    /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
1064    /// [`SdkError::AdminApi`] on malformed execution_id — a drift
1065    /// signal that the server and SDK disagree on the wire shape, so
1066    /// failing loud prevents routing to a ghost partition.
1067    ///
1068    /// The `partition_key` itself is not eagerly parsed here: it is
1069    /// carried opaquely to the `claim_from_grant` hot path, which
1070    /// parses it there and surfaces a typed error on malformed keys.
1071    pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
1072        let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
1073            .map_err(|e| SdkError::AdminApi {
1074                status: 200,
1075                message: format!(
1076                    "claim_for_worker: server returned malformed execution_id '{}': {e}",
1077                    self.execution_id
1078                ),
1079                kind: Some("malformed_response".to_owned()),
1080                retryable: Some(false),
1081                raw_body: String::new(),
1082            })?;
1083        Ok(ff_core::contracts::ClaimGrant::new(
1084            execution_id,
1085            self.partition_key,
1086            self.grant_key,
1087            self.expires_at_ms,
1088        ))
1089    }
1090}
1091
/// Per-partition outcome of a cluster-wide waitpoint HMAC secret
/// rotation. Returned by [`rotate_waitpoint_hmac_secret_all_partitions`]
/// so operators can audit which partitions rotated vs. no-op'd vs. failed.
///
/// The index is the execution-partition index (`0..num_partitions`),
/// matching `{fp:N}` in the keyspace.
#[derive(Debug)]
pub struct PartitionRotationOutcome {
    /// Execution partition index (`0..num_partitions`).
    pub partition: u16,
    /// FCALL outcome on this partition, or the error it raised.
    /// A failure here does not stop the fan-out over other partitions.
    pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
}
1105
/// Fan the `ff_rotate_waitpoint_hmac_secret` FCALL out over every
/// execution partition, rotating the waitpoint HMAC secret cluster-wide.
///
/// This is the canonical Rust-side rotation path for direct-Valkey
/// consumers (e.g. cairn-fabric) that cannot reach the `ff-server`
/// admin REST endpoint. When an `ff-server` is reachable over HTTP,
/// prefer [`FlowFabricAdminClient::rotate_waitpoint_secret`]. That path
/// adds a single-writer admission gate, parallel fan-out, structured
/// audit events, and the server's configured grace window.
///
/// # Production rotation recipe
///
/// Rotation must land **before** any waitpoint resolution presents the
/// new `kid`. In order:
///
/// 1. Choose a fresh `new_kid`. It must NOT contain `:`, which the
///    server uses as the field separator in the secret hash.
/// 2. Invoke this helper, passing as `grace_ms` how long tokens signed
///    by the outgoing secret should stay valid.
/// 3. Start signing new tokens with `new_kid` only after two checks:
///    every returned entry is `Ok(_)` (`Rotated` or `Noop`), **and**
///    the returned `Vec` holds one entry per partition. With
///    `num_partitions == 0` the result is empty, so "all entries are
///    `Ok`" holds trivially even though nothing was rotated.
/// 4. Keep the previous secret in the keystore until the grace window
///    has elapsed. The FCALL garbage-collects expired kids on each
///    rotation, so the only requirement is not to rotate again inside
///    the grace window.
///
/// RFC-004 §rotation documents the 4-key HSET + `previous_expires_at`
/// sequence the FCALL performs server-side.
///
/// # Idempotency
///
/// Each partition FCALL is idempotent for a given `(new_kid,
/// new_secret_hex)` pair. Replaying identical args yields
/// `RotateWaitpointHmacSecretOutcome::Noop`. Reusing a kid with a
/// different secret produces a per-partition `SdkError` wrapping
/// `ScriptError::RotationConflict`; recover by choosing a fresh
/// `new_kid`.
///
/// # Error semantics
///
/// A failing partition FCALL (transport fault, rotation conflict, …)
/// is recorded on that partition's [`PartitionRotationOutcome`], and
/// the fan-out moves on to the next partition. This matches the
/// partial-success contract of the server's `rotate_waitpoint_secret`:
/// operators retry the failed subset.
///
/// The return type is `Vec<_>` rather than `Result<Vec<_>, _>` on
/// purpose. Argument invariants (non-empty kid, no `:`, even-length
/// hex, …) are enforced by the FCALL on each partition, not at the
/// Rust boundary. As a consequence, nothing is validated when
/// `num_partitions == 0`: the call returns an empty `Vec` without
/// contacting Valkey. Callers decide how to treat partial failure
/// (fail loud, retry the subset, record metrics).
///
/// # Concurrency + performance
///
/// Partitions are processed one at a time, which keeps the helper free
/// of `futures::stream` / tokio-specific primitives. Total latency is
/// roughly `num_partitions × per-partition RTT`. For parallel fan-out,
/// wrap this with `FuturesUnordered` yourself, or use the server admin
/// endpoint (bounded concurrency = 16).
///
/// # Test harness
///
/// `ff-test::fixtures::TestCluster::rotate_waitpoint_hmac_secret` is a
/// thin wrapper around this helper, so integration tests and
/// production code exercise the same code path.
///
/// # Example
///
/// ```rust,ignore
/// use ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions;
///
/// let results = rotate_waitpoint_hmac_secret_all_partitions(
///     &client,
///     partition_config.num_flow_partitions,
///     "kid-2026-04-22",
///     "deadbeef...64-hex-chars...",
///     60_000,
/// )
/// .await?;
///
/// for entry in &results {
///     match &entry.result {
///         Ok(outcome) => tracing::info!(partition = entry.partition, ?outcome, "rotated"),
///         Err(e) => tracing::error!(partition = entry.partition, %e, "rotation failed"),
///     }
/// }
/// ```
// v0.12 PR-6: the `admin` module is ungated at module level so
// consumers under `--no-default-features --features sqlite` can reach
// the HTTP admin client surface. This helper is the one remaining
// Valkey-typed item in the module (takes a `&ferriskey::Client` and
// fans out `ff_rotate_waitpoint_hmac_secret` FCALLs), so it stays
// `valkey-default`-gated. See `lib.rs` PR-6 comment for the Option 1
// / Option 2 decision.
#[cfg(feature = "valkey-default")]
pub async fn rotate_waitpoint_hmac_secret_all_partitions(
    client: &ferriskey::Client,
    num_partitions: u16,
    new_kid: &str,
    new_secret_hex: &str,
    grace_ms: u64,
) -> Vec<PartitionRotationOutcome> {
    // Built once and borrowed by every FCALL below (the script wrapper
    // takes `&args`), so the kid/secret strings are copied a single
    // time rather than once per partition.
    let shared_args = RotateWaitpointHmacSecretArgs {
        new_kid: new_kid.to_owned(),
        new_secret_hex: new_secret_hex.to_owned(),
        grace_ms,
    };

    let mut outcomes = Vec::with_capacity(usize::from(num_partitions));
    for partition_index in 0..num_partitions {
        let target = Partition {
            family: PartitionFamily::Execution,
            index: partition_index,
        };
        let index_keys = IndexKeys::new(&target);
        let fcall_result = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            client,
            &index_keys,
            &shared_args,
        )
        .await;
        outcomes.push(PartitionRotationOutcome {
            partition: partition_index,
            result: fcall_result.map_err(SdkError::from),
        });
    }
    outcomes
}
1235
/// Strip every trailing `/` from a base URL, so that later
/// `format!("{base}/v1/...")` calls never yield `https://host//v1/...`.
/// Mirrors media-pipeline's pattern.
///
/// The original allocation is reused: the string is truncated in place
/// to the length of its slash-free prefix.
fn normalize_base_url(mut url: String) -> String {
    let kept_len = url.trim_end_matches('/').len();
    url.truncate(kept_len);
    url
}
1245
#[cfg(test)]
mod tests {
    //! Unit coverage for the pure (network-free) pieces of the admin
    //! client: URL normalisation, token validation, wire-shape
    //! (de)serialisation, and DTO → core-type conversions.

    use super::*;

    #[test]
    fn base_url_strips_trailing_slash() {
        assert_eq!(normalize_base_url("http://x".into()), "http://x");
        assert_eq!(normalize_base_url("http://x/".into()), "http://x");
        assert_eq!(normalize_base_url("http://x///".into()), "http://x");
    }

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // Raw newline in the token would split the Authorization
        // header — must fail loudly at construction.
        let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
        assert!(
            matches!(err, SdkError::Config { .. }),
            "got: {err:?}"
        );
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // Exact shell footgun: FF_ADMIN_TOKEN="" expands to "".
        // Fail loudly at construction instead of shipping a client
        // that silently 401s on first request.
        for s in ["", " ", "\t\n ", "   "] {
            let err = FlowFabricAdminClient::with_token("http://x", s)
                .unwrap_err();
            assert!(
                matches!(&err, SdkError::Config { field: Some(f), .. } if f == "bearer_token"),
                "token {s:?} should return Config with field=bearer_token; got: {err:?}"
            );
        }
    }

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // `kind` + `retryable` absent (the usual shape for 400s).
        let b: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(b.error, "bad new_kid");
        assert!(b.kind.is_none());
        assert!(b.retryable.is_none());

        // `kind` + `retryable` present (500 ValkeyError shape).
        let b: AdminErrorBody = serde_json::from_str(
            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
        )
        .unwrap();
        assert_eq!(b.error, "valkey: timed out");
        assert_eq!(b.kind.as_deref(), Some("IoError"));
        assert_eq!(b.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Exact shape the server emits.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "new_kid": "kid-2026-04-18"
        }"#;
        let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.rotated, 3);
        assert_eq!(r.failed, vec![4, 5]);
        assert_eq!(r.new_kid, "kid-2026-04-18");
    }

    // ── Request bodies ──

    #[test]
    fn claim_for_worker_request_omits_worker_id_from_body() {
        // `worker_id` travels in the URL path; `#[serde(skip)]` must
        // keep it out of the JSON body.
        let req = ClaimForWorkerRequest {
            worker_id: "w1".to_owned(),
            lane_id: "default".to_owned(),
            worker_instance_id: "w1-i0".to_owned(),
            capabilities: vec!["gpu".to_owned()],
            grant_ttl_ms: 5_000,
        };
        let v = serde_json::to_value(&req).unwrap();
        assert!(v.get("worker_id").is_none(), "body: {v}");
        assert_eq!(v["lane_id"], "default");
        assert_eq!(v["worker_instance_id"], "w1-i0");
        assert_eq!(v["capabilities"], serde_json::json!(["gpu"]));
        assert_eq!(v["grant_ttl_ms"], 5_000);
    }

    #[test]
    fn issue_reclaim_grant_request_omits_unset_optionals() {
        // `None` optionals are skipped entirely; the capability list is
        // always present (possibly empty).
        let req = IssueReclaimGrantRequest {
            worker_id: "w1".to_owned(),
            worker_instance_id: "w1-i0".to_owned(),
            lane_id: "default".to_owned(),
            capability_hash: None,
            grant_ttl_ms: 30_000,
            route_snapshot_json: None,
            admission_summary: None,
            worker_capabilities: Vec::new(),
        };
        let v = serde_json::to_value(&req).unwrap();
        assert!(v.get("capability_hash").is_none(), "body: {v}");
        assert!(v.get("route_snapshot_json").is_none(), "body: {v}");
        assert!(v.get("admission_summary").is_none(), "body: {v}");
        assert_eq!(v["worker_capabilities"], serde_json::json!([]));
        assert_eq!(v["grant_ttl_ms"], 30_000);
    }

    // ── IssueReclaimGrantResponse wire shape + into_grant ──

    #[test]
    fn reclaim_response_deserialises_granted_shape() {
        let raw = r#"{
            "status": "granted",
            "execution_id": "{fp:5}:11111111-1111-1111-1111-111111111111",
            "partition_key": "{fp:5}",
            "grant_key": "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:reclaim_grant",
            "expires_at_ms": 1700000000000,
            "lane_id": "default"
        }"#;
        let r: IssueReclaimGrantResponse = serde_json::from_str(raw).unwrap();
        match r {
            IssueReclaimGrantResponse::Granted {
                execution_id,
                partition_key,
                expires_at_ms,
                lane_id,
                ..
            } => {
                assert_eq!(execution_id, "{fp:5}:11111111-1111-1111-1111-111111111111");
                assert_eq!(partition_key.as_str(), "{fp:5}");
                assert_eq!(expires_at_ms, 1_700_000_000_000);
                assert_eq!(lane_id, "default");
            }
            other => panic!("expected Granted, got {other:?}"),
        }
    }

    #[test]
    fn reclaim_response_not_reclaimable_into_grant_errors() {
        let r: IssueReclaimGrantResponse = serde_json::from_str(
            r#"{"status":"not_reclaimable","execution_id":"e1","detail":"lease active"}"#,
        )
        .unwrap();
        assert_eq!(
            r,
            IssueReclaimGrantResponse::NotReclaimable {
                execution_id: "e1".to_owned(),
                detail: "lease active".to_owned(),
            }
        );
        match r.into_grant() {
            Err(SdkError::AdminApi { kind, retryable, .. }) => {
                assert_eq!(kind.as_deref(), Some("not_reclaimable"));
                assert_eq!(retryable, Some(false));
            }
            Err(other) => panic!("expected AdminApi, got {other:?}"),
            Ok(_) => panic!("NotReclaimable must not convert into a grant"),
        }
    }

    #[test]
    fn reclaim_response_cap_exceeded_into_grant_errors() {
        let r: IssueReclaimGrantResponse = serde_json::from_str(
            r#"{"status":"reclaim_cap_exceeded","execution_id":"e1","reclaim_count":3}"#,
        )
        .unwrap();
        assert_eq!(
            r,
            IssueReclaimGrantResponse::ReclaimCapExceeded {
                execution_id: "e1".to_owned(),
                reclaim_count: 3,
            }
        );
        match r.into_grant() {
            Err(SdkError::AdminApi { kind, message, .. }) => {
                assert_eq!(kind.as_deref(), Some("reclaim_cap_exceeded"));
                assert!(message.contains("(3)"), "msg: {message}");
            }
            Err(other) => panic!("expected AdminApi, got {other:?}"),
            Ok(_) => panic!("ReclaimCapExceeded must not convert into a grant"),
        }
    }

    #[test]
    fn reclaim_into_grant_rejects_malformed_execution_id() {
        let raw = r#"{
            "status": "granted",
            "execution_id": "not-a-valid-eid",
            "partition_key": "{fp:5}",
            "grant_key": "gk",
            "expires_at_ms": 1,
            "lane_id": "default"
        }"#;
        let r: IssueReclaimGrantResponse = serde_json::from_str(raw).unwrap();
        match r.into_grant() {
            Err(SdkError::AdminApi { kind, message, .. }) => {
                assert_eq!(kind.as_deref(), Some("malformed_response"));
                assert!(message.contains("malformed execution_id"), "msg: {message}");
            }
            Err(other) => panic!("expected AdminApi, got {other:?}"),
            Ok(_) => panic!("malformed execution_id must be rejected"),
        }
    }

    // ── ClaimForWorkerResponse::into_grant ──

    fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_key: serde_json::from_str(
                &serde_json::to_string(partition_key).unwrap(),
            )
            .unwrap(),
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn into_grant_preserves_all_known_partition_key_shapes() {
        // Post-#91: families collapse into opaque PartitionKey literals.
        // Flow and Execution both produce "{fp:N}"; Budget is "{b:N}";
        // Quota is "{q:N}". The DTO preserves the wire string as-is;
        // into_grant hands it opaquely to the core type.
        for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
            let g = sample_claim_response(key_str).into_grant().unwrap_or_else(|e| {
                panic!("key {key_str} should parse: {e:?}")
            });
            assert_eq!(g.partition_key.as_str(), key_str);
            assert_eq!(g.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_preserves_opaque_partition_key() {
        // The SDK does NOT eagerly parse the partition_key on the
        // admin boundary — malformed keys are caught at the
        // claim_from_grant hot path where the typed Partition is
        // actually needed. This test pins the opacity contract.
        let resp = sample_claim_response("{zz:0}");
        let g = resp.into_grant().expect("SDK must not parse partition_key");
        assert_eq!(g.partition_key.as_str(), "{zz:0}");
        // Parsing surfaces the error explicitly.
        assert!(g.partition().is_err());
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let mut resp = sample_claim_response("{fp:5}");
        resp.execution_id = "not-a-valid-eid".to_owned();
        let err = resp.into_grant().unwrap_err();
        match err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(message.contains("malformed execution_id"),
                    "msg: {message}");
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    // ── rotate_waitpoint_hmac_secret_all_partitions ──

    // `rotate_waitpoint_hmac_secret_all_partitions` exercise coverage
    // lives in `ff-test` — the integration test harness in
    // `crates/ff-test/tests/waitpoint_hmac_rotation_fcall.rs` and
    // `waitpoint_tokens.rs` calls through the function via the
    // `TestCluster::rotate_waitpoint_hmac_secret` fixture, which is
    // now a thin delegator. A pure unit test here would require a
    // mock `ferriskey::Client` (ferriskey's `Client` performs a live
    // RESP handshake on `ClientBuilder::build`, so a local TCP
    // listener alone isn't sufficient) — expensive to construct for
    // one-line iteration-count coverage.

    // ── ClaimForWorkerResponse wire shape (issue #91) ──

    #[test]
    fn claim_for_worker_response_deserialises_opaque_partition_key() {
        // Exact shape the server emits post-#91.
        let raw = r#"{
            "execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
            "partition_key": "{fp:7}",
            "grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
            "expires_at_ms": 1700000000000
        }"#;
        let r: ClaimForWorkerResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.partition_key.as_str(), "{fp:7}");
        assert_eq!(r.expires_at_ms, 1_700_000_000_000);
    }
}