Skip to main content

ff_sdk/
admin.rs

1//! Admin REST client for operator-facing endpoints on `ff-server`.
2//!
3//! Wraps `POST /v1/admin/*` so downstream consumers (cairn-fabric)
4//! don't hand-roll the HTTP call for admin surfaces like HMAC secret
5//! rotation. Mirrors the server's wire types exactly — request
6//! bodies and response shapes are defined against
7//! [`ff_server::api`] + [`ff_server::server`] and kept 1:1 with the
8//! producer.
9//!
//! Authentication is Bearer token. Callers pick up the token from
//! wherever they hold it. The `FF_API_TOKEN` env var is the common
//! pattern, but the SDK does not read env vars on the caller's
//! behalf: [`FlowFabricAdminClient::with_token`] accepts a
//! string-like token value (`&str` or `String`) via
//! `impl AsRef<str>`.
16
17use std::time::Duration;
18
19use ff_core::contracts::{RotateWaitpointHmacSecretArgs, RotateWaitpointHmacSecretOutcome};
20use ff_core::keys::IndexKeys;
21use ff_core::partition::{Partition, PartitionFamily};
22use serde::{Deserialize, Serialize};
23
24use crate::SdkError;
25
/// Per-request timeout applied to every HTTP call the admin client makes.
///
/// Spelled as the server's own `ROTATE_HTTP_TIMEOUT` (120s) plus a 10s
/// margin. Keeping the client deadline LATER than the server deadline
/// means a slow call surfaces as the server's structured
/// 504 GATEWAY_TIMEOUT body rather than a client-side timeout error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120 + 10);
32
33/// Client for the `ff-server` admin REST surface.
34///
35/// Construct via [`FlowFabricAdminClient::new`] (no auth) or
36/// [`FlowFabricAdminClient::with_token`] (Bearer auth). Both
37/// return a ready-to-use client backed by a single pooled
38/// `reqwest::Client` — reuse the instance across calls instead of
39/// building one per request.
40#[derive(Debug, Clone)]
41pub struct FlowFabricAdminClient {
42    http: reqwest::Client,
43    base_url: String,
44}
45
46impl FlowFabricAdminClient {
47    /// Build a client without auth. Suitable for a dev ff-server
48    /// whose `api_token` is unconfigured. Production deployments
49    /// should use [`with_token`](Self::with_token).
50    pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
51        let http = reqwest::Client::builder()
52            .timeout(DEFAULT_TIMEOUT)
53            .build()
54            .map_err(|e| SdkError::Http {
55                source: e,
56                context: "build reqwest::Client".into(),
57            })?;
58        Ok(Self {
59            http,
60            base_url: normalize_base_url(base_url.into()),
61        })
62    }
63
64    /// Build a client that sends `Authorization: Bearer <token>` on
65    /// every request. The token is passed by value so the caller
66    /// retains ownership policy (e.g. zeroize on drop at the
67    /// caller side); the SDK only reads it.
68    ///
69    /// # Empty-token guard
70    ///
71    /// An empty or all-whitespace `token` returns
72    /// [`SdkError::Config`] instead of silently constructing
73    /// `Authorization: Bearer ` (which the server rejects with
74    /// 401, leaving the operator chasing a "why is auth broken"
75    /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
76    /// where the var was meant to be set; the unset-expansion is
77    /// the empty string. Prefer an obvious error at construction
78    /// over a silent 401 at first request.
79    ///
80    /// If the caller genuinely wants an unauthenticated client
81    /// (dev ff-server without `api_token` configured), use
82    /// [`FlowFabricAdminClient::new`] instead.
83    pub fn with_token(
84        base_url: impl Into<String>,
85        token: impl AsRef<str>,
86    ) -> Result<Self, SdkError> {
87        let token_str = token.as_ref();
88        if token_str.trim().is_empty() {
89            return Err(SdkError::Config {
90                context: "admin_client".into(),
91                field: Some("bearer_token".into()),
92                message: "is empty or all-whitespace; use \
93                          FlowFabricAdminClient::new for unauthenticated access"
94                    .into(),
95            });
96        }
97        let mut headers = reqwest::header::HeaderMap::new();
98        let mut auth_value =
99            reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
100                |_| SdkError::Config {
101                    context: "admin_client".into(),
102                    field: Some("bearer_token".into()),
103                    message: "contains characters not valid in an HTTP header".into(),
104                },
105            )?;
106        // Mark Authorization as sensitive so it doesn't appear in
107        // reqwest's Debug output / logs.
108        auth_value.set_sensitive(true);
109        headers.insert(reqwest::header::AUTHORIZATION, auth_value);
110
111        let http = reqwest::Client::builder()
112            .timeout(DEFAULT_TIMEOUT)
113            .default_headers(headers)
114            .build()
115            .map_err(|e| SdkError::Http {
116                source: e,
117                context: "build reqwest::Client".into(),
118            })?;
119        Ok(Self {
120            http,
121            base_url: normalize_base_url(base_url.into()),
122        })
123    }
124
125    /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
126    ///
127    /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
128    /// server-side one: the request carries lane + identity +
129    /// capabilities + grant TTL; the server runs budget, quota, and
130    /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
131    /// and returns a `ClaimGrant` on success.
132    ///
133    /// Returns `Ok(None)` when the server responds 204 No Content
134    /// (no eligible execution on the lane). Callers that want to keep
135    /// polling should back off per their claim cadence.
136    pub async fn claim_for_worker(
137        &self,
138        req: ClaimForWorkerRequest,
139    ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
140        // Percent-encode `worker_id` in the URL path — `WorkerId` is a
141        // free-form string (could contain `/`, spaces, `%`, etc.) and
142        // splicing it verbatim would produce malformed URLs or
143        // misrouted paths. `Url::path_segments_mut().push` handles the
144        // encoding natively.
145        let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| SdkError::Config {
146            context: "admin_client: claim_for_worker".into(),
147            field: Some("base_url".into()),
148            message: format!("invalid base_url '{}': {e}", self.base_url),
149        })?;
150        {
151            let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
152                context: "admin_client: claim_for_worker".into(),
153                field: Some("base_url".into()),
154                message: format!("base_url cannot be a base URL: '{}'", self.base_url),
155            })?;
156            segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
157        }
158        let url = url.to_string();
159        let resp = self
160            .http
161            .post(&url)
162            .json(&req)
163            .send()
164            .await
165            .map_err(|e| SdkError::Http {
166                source: e,
167                context: "POST /v1/workers/{worker_id}/claim".into(),
168            })?;
169
170        let status = resp.status();
171        if status == reqwest::StatusCode::NO_CONTENT {
172            return Ok(None);
173        }
174        if status.is_success() {
175            return resp
176                .json::<ClaimForWorkerResponse>()
177                .await
178                .map(Some)
179                .map_err(|e| SdkError::Http {
180                    source: e,
181                    context: "decode claim_for_worker response body".into(),
182                });
183        }
184
185        // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
186        let status_u16 = status.as_u16();
187        let raw = resp.text().await.map_err(|e| SdkError::Http {
188            source: e,
189            context: format!("read claim_for_worker error body (status {status_u16})"),
190        })?;
191        let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
192        Err(SdkError::AdminApi {
193            status: status_u16,
194            message: parsed
195                .as_ref()
196                .map(|b| b.error.clone())
197                .unwrap_or_else(|| raw.clone()),
198            kind: parsed.as_ref().and_then(|b| b.kind.clone()),
199            retryable: parsed.as_ref().and_then(|b| b.retryable),
200            raw_body: raw,
201        })
202    }
203
204    /// Rotate the waitpoint HMAC secret on the server.
205    ///
206    /// Promotes the currently-installed kid to `previous_kid`
207    /// (accepted for the server's configured
208    /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
209    /// `new_secret_hex` under `new_kid` as the new current. Fans
210    /// out across every execution partition. Idempotent: re-running
211    /// with the same `(new_kid, new_secret_hex)` converges.
212    ///
213    /// The server returns 200 if at least one partition rotated OR
214    /// at least one partition was already rotating under a
215    /// concurrent request. See `RotateWaitpointSecretResponse`
216    /// fields for the breakdown.
217    ///
218    /// # Errors
219    ///
220    /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
221    ///   input, 401 missing/bad bearer, 429 concurrent rotate,
222    ///   500 all partitions failed, 504 server-side timeout).
223    /// * [`SdkError::Http`] — transport error (connect, body
224    ///   decode, client-side timeout).
225    ///
226    /// # Retry semantics
227    ///
228    /// Rotation is idempotent on the same `(new_kid,
229    /// new_secret_hex)` so retries are SAFE even on 504s or
230    /// partial failures.
231    pub async fn rotate_waitpoint_secret(
232        &self,
233        req: RotateWaitpointSecretRequest,
234    ) -> Result<RotateWaitpointSecretResponse, SdkError> {
235        let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
236        let resp = self
237            .http
238            .post(&url)
239            .json(&req)
240            .send()
241            .await
242            .map_err(|e| SdkError::Http {
243                source: e,
244                context: "POST /v1/admin/rotate-waitpoint-secret".into(),
245            })?;
246
247        let status = resp.status();
248        if status.is_success() {
249            return resp
250                .json::<RotateWaitpointSecretResponse>()
251                .await
252                .map_err(|e| SdkError::Http {
253                    source: e,
254                    context: "decode rotate-waitpoint-secret response body".into(),
255                });
256        }
257
258        // Non-2xx: parse the server's ErrorBody if we can, fall
259        // back to a raw body otherwise. Propagate body-read
260        // transport errors as Http rather than silently flattening
261        // them into `AdminApi { raw_body: "" }` — a connection drop
262        // mid-body-read is a transport fault, not an API-layer
263        // reject, and misclassifying it strips `is_retryable`'s
264        // timeout/connect signal from the caller.
265        let status_u16 = status.as_u16();
266        let raw = resp.text().await.map_err(|e| SdkError::Http {
267            source: e,
268            context: format!(
269                "read rotate-waitpoint-secret error response body (status {status_u16})"
270            ),
271        })?;
272        let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
273        Err(SdkError::AdminApi {
274            status: status_u16,
275            message: parsed
276                .as_ref()
277                .map(|b| b.error.clone())
278                .unwrap_or_else(|| raw.clone()),
279            kind: parsed.as_ref().and_then(|b| b.kind.clone()),
280            retryable: parsed.as_ref().and_then(|b| b.retryable),
281            raw_body: raw,
282        })
283    }
284}
285
286/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
287///
288/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
289#[derive(Debug, Clone, Serialize)]
290pub struct RotateWaitpointSecretRequest {
291    /// New key identifier. Non-empty, must not contain `:` (the
292    /// server uses `:` as the field separator in the secret hash).
293    pub new_kid: String,
294    /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
295    pub new_secret_hex: String,
296}
297
298/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
299///
300/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
301/// The server serializes this struct as-is via `Json(result)`.
302#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
303pub struct RotateWaitpointSecretResponse {
304    /// Count of partitions that accepted the rotation.
305    pub rotated: u16,
306    /// Partition indices where the rotation failed — operator
307    /// should investigate. Rotation is idempotent on the same
308    /// `(new_kid, new_secret_hex)` so a retry after the underlying
309    /// fault clears converges.
310    pub failed: Vec<u16>,
311    /// The `new_kid` that was installed as current on every
312    /// rotated partition — echoes the request field back for
313    /// confirmation.
314    pub new_kid: String,
315}
316
317/// Server-side error body shape, as emitted by
318/// `ff_server::api::ErrorBody`. Kept internal because consumers
319/// match on the flattened fields of [`SdkError::AdminApi`].
320#[derive(Debug, Clone, Deserialize)]
321struct AdminErrorBody {
322    error: String,
323    #[serde(default)]
324    kind: Option<String>,
325    #[serde(default)]
326    retryable: Option<bool>,
327}
328
329/// Request body for `POST /v1/workers/{worker_id}/claim`.
330///
331/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
332/// goes in the URL path (not the body) but is kept on the struct
333/// for ergonomics — callers don't juggle a separate arg.
334#[derive(Debug, Clone, Serialize)]
335pub struct ClaimForWorkerRequest {
336    #[serde(skip)]
337    pub worker_id: String,
338    pub lane_id: String,
339    pub worker_instance_id: String,
340    #[serde(default)]
341    pub capabilities: Vec<String>,
342    /// Grant TTL in milliseconds. Server rejects 0 or anything over
343    /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
344    pub grant_ttl_ms: u64,
345}
346
347/// Response body for `POST /v1/workers/{worker_id}/claim`.
348///
349/// Wire shape of `ff_core::contracts::ClaimGrant`. Carries the opaque
350/// [`ff_core::partition::PartitionKey`] directly on the wire (issue
351/// #91); the SDK reconstructs the core type via [`Self::into_grant`].
352#[derive(Debug, Clone, Deserialize)]
353pub struct ClaimForWorkerResponse {
354    pub execution_id: String,
355    pub partition_key: ff_core::partition::PartitionKey,
356    pub grant_key: String,
357    pub expires_at_ms: u64,
358}
359
360impl ClaimForWorkerResponse {
361    /// Convert the wire DTO into a typed
362    /// [`ff_core::contracts::ClaimGrant`] for handoff to
363    /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
364    /// [`SdkError::AdminApi`] on malformed execution_id — a drift
365    /// signal that the server and SDK disagree on the wire shape, so
366    /// failing loud prevents routing to a ghost partition.
367    ///
368    /// The `partition_key` itself is not eagerly parsed here: it is
369    /// carried opaquely to the `claim_from_grant` hot path, which
370    /// parses it there and surfaces a typed error on malformed keys.
371    pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
372        let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
373            .map_err(|e| SdkError::AdminApi {
374                status: 200,
375                message: format!(
376                    "claim_for_worker: server returned malformed execution_id '{}': {e}",
377                    self.execution_id
378                ),
379                kind: Some("malformed_response".to_owned()),
380                retryable: Some(false),
381                raw_body: String::new(),
382            })?;
383        Ok(ff_core::contracts::ClaimGrant {
384            execution_id,
385            partition_key: self.partition_key,
386            grant_key: self.grant_key,
387            expires_at_ms: self.expires_at_ms,
388        })
389    }
390}
391
392/// Per-partition outcome of a cluster-wide waitpoint HMAC secret
393/// rotation. Returned by [`rotate_waitpoint_hmac_secret_all_partitions`]
394/// so operators can audit which partitions rotated vs. no-op'd vs. failed.
395///
396/// The index is the execution-partition index (`0..num_partitions`),
397/// matching `{fp:N}` in the keyspace.
398#[derive(Debug)]
399pub struct PartitionRotationOutcome {
400    /// Execution partition index (`0..num_partitions`).
401    pub partition: u16,
402    /// FCALL outcome on this partition, or the error it raised.
403    pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
404}
405
406/// Rotate the waitpoint HMAC secret across every execution partition
407/// by fanning out the `ff_rotate_waitpoint_hmac_secret` FCALL.
408///
409/// This is the canonical Rust-side rotation path for direct-Valkey
410/// consumers (e.g. cairn-fabric) that cannot route through the
411/// `ff-server` admin REST endpoint. Callers who have an HTTP-reachable
412/// `ff-server` should prefer [`FlowFabricAdminClient::rotate_waitpoint_secret`] —
413/// that path adds a single-writer admission gate, parallel fan-out,
414/// structured audit events, and the server's configured grace window.
415///
416/// # Production rotation recipe
417///
418/// Operators MUST coordinate so secret rotation **precedes** any
419/// waitpoint resolution that will present the new `kid`. The broad
420/// sequence:
421///
422/// 1. Pick a fresh `new_kid` (must NOT contain `:` — the server uses
423///    `:` as the field separator in the secret hash).
424/// 2. Call this helper with the previous `kid`'s grace window
425///    (`grace_ms` — the duration during which tokens signed by the
426///    outgoing secret remain valid).
427/// 3. Only after this call returns with all partitions `Ok(_)` (either
428///    `Rotated` or `Noop`), begin signing new tokens with `new_kid`.
429/// 4. Retain the previous secret in the keystore until the grace
430///    window elapses — the FCALL handles GC of expired kids on every
431///    rotation, so just don't rotate again before the grace window.
432///
433/// See RFC-004 §rotation for the full 4-key HSET + `previous_expires_at`
434/// dance the FCALL implements server-side.
435///
436/// # Idempotency
437///
438/// Each partition FCALL is idempotent on the same `(new_kid,
439/// new_secret_hex)` pair: a replay with identical args returns
440/// `RotateWaitpointHmacSecretOutcome::Noop`. A same-kid-different-secret
441/// replay surfaces as a per-partition `SdkError` (wrapping
442/// `ScriptError::RotationConflict`) — pick a fresh `new_kid` to recover.
443///
444/// # Error semantics
445///
446/// A per-partition FCALL failure (transport fault, rotation conflict,
447/// etc.) is recorded on that partition's [`PartitionRotationOutcome`]
448/// and fan-out **continues** — the contract matches the server's
449/// `rotate_waitpoint_secret` (partial success is allowed, operators
450/// retry on the failed partition subset). Returning `Vec<_>` (not
451/// `Result<Vec<_>, _>`) is deliberate: every whole-call invariant is
452/// enforced by the underlying FCALL on each partition (kid non-empty,
453/// no `:`, even-length hex, etc.), so the aggregate has nothing left
454/// to reject at the Rust boundary. Callers decide how to treat partial
455/// failures (fail loud / retry the subset / record metrics).
456///
457/// # Concurrency + performance
458///
459/// Sequential (one partition at a time) to keep the helper dependency-
460/// free: no `futures::stream` / tokio-specific primitives on the caller
461/// path. For a cluster with N partitions and per-partition RTT R, the
462/// total duration is ~N*R. Consumers needing parallel fan-out should
463/// wrap this with `FuturesUnordered` themselves, or use the server
464/// admin endpoint (which fans out with bounded concurrency = 16).
465///
466/// # Test harness
467///
468/// The `ff-test::fixtures::TestCluster::rotate_waitpoint_hmac_secret`
469/// method is a thin wrapper around this helper — integration tests and
470/// production code exercise the same code path.
471///
472/// # Example
473///
474/// ```rust,ignore
475/// use ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions;
476///
477/// let results = rotate_waitpoint_hmac_secret_all_partitions(
478///     &client,
479///     partition_config.num_flow_partitions,
480///     "kid-2026-04-22",
481///     "deadbeef...64-hex-chars...",
482///     60_000,
483/// )
484/// .await?;
485///
486/// for entry in &results {
487///     match &entry.result {
488///         Ok(outcome) => tracing::info!(partition = entry.partition, ?outcome, "rotated"),
489///         Err(e) => tracing::error!(partition = entry.partition, %e, "rotation failed"),
490///     }
491/// }
492/// ```
493pub async fn rotate_waitpoint_hmac_secret_all_partitions(
494    client: &ferriskey::Client,
495    num_partitions: u16,
496    new_kid: &str,
497    new_secret_hex: &str,
498    grace_ms: u64,
499) -> Vec<PartitionRotationOutcome> {
500    // Hoisted out of the loop — `ff_rotate_waitpoint_hmac_secret` only
501    // borrows the args, so every partition can reuse the same struct.
502    // Avoids N × 2 string clones on the hot fan-out path.
503    let args = RotateWaitpointHmacSecretArgs {
504        new_kid: new_kid.to_owned(),
505        new_secret_hex: new_secret_hex.to_owned(),
506        grace_ms,
507    };
508    let mut out = Vec::with_capacity(num_partitions as usize);
509    for index in 0..num_partitions {
510        let partition = Partition {
511            family: PartitionFamily::Execution,
512            index,
513        };
514        let idx = IndexKeys::new(&partition);
515        let result = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
516            client, &idx, &args,
517        )
518        .await
519        .map_err(SdkError::from);
520        out.push(PartitionRotationOutcome {
521            partition: index,
522            result,
523        });
524    }
525    out
526}
527
/// Remove every trailing `/` from `url` and return the result.
///
/// With the slashes gone, `format!("{base}/v1/...")` can never
/// produce `https://host//v1/...`. The string is shortened in place,
/// so no new allocation is made.
fn normalize_base_url(mut url: String) -> String {
    let kept_len = url.trim_end_matches('/').len();
    url.truncate(kept_len);
    url
}
537
#[cfg(test)]
mod tests {
    use super::*;

    // Coverage for `rotate_waitpoint_hmac_secret_all_partitions` lives
    // in `ff-test`: the integration tests in
    // `crates/ff-test/tests/waitpoint_hmac_rotation_fcall.rs` and
    // `waitpoint_tokens.rs` reach the function through the
    // `TestCluster::rotate_waitpoint_hmac_secret` fixture, which is a
    // thin delegator. A pure unit test here would need a mock
    // `ferriskey::Client` (its `ClientBuilder::build` performs a live
    // RESP handshake, so a bare local TCP listener is not enough),
    // which is too expensive for one-line iteration-count coverage.

    // ── normalize_base_url ──

    #[test]
    fn base_url_strips_trailing_slash() {
        for input in ["http://x", "http://x/", "http://x///"] {
            assert_eq!(
                normalize_base_url(input.to_owned()),
                "http://x",
                "input: {input:?}"
            );
        }
    }

    // ── FlowFabricAdminClient::with_token ──

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // A raw newline in the token would split the Authorization
        // header, so construction must fail loudly.
        let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
        let is_config_error = matches!(err, SdkError::Config { .. });
        assert!(is_config_error, "got: {err:?}");
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // The shell footgun: FF_ADMIN_TOKEN="" expands to "". Reject
        // it at construction instead of shipping a client that
        // silently 401s on its first request.
        for blank in ["", " ", "\t\n ", "   "] {
            let err = FlowFabricAdminClient::with_token("http://x", blank).unwrap_err();
            let is_token_config_error = matches!(
                &err,
                SdkError::Config { field: Some(f), .. } if f == "bearer_token"
            );
            assert!(
                is_token_config_error,
                "token {blank:?} should return Config with field=bearer_token; got: {err:?}"
            );
        }
    }

    // ── Wire-shape deserialization ──

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // `kind` + `retryable` absent (the usual shape for 400s).
        let minimal = serde_json::from_str::<AdminErrorBody>(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(minimal.error, "bad new_kid");
        assert_eq!(minimal.kind, None);
        assert_eq!(minimal.retryable, None);

        // `kind` + `retryable` present (500 ValkeyError shape).
        let full = serde_json::from_str::<AdminErrorBody>(
            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
        )
        .unwrap();
        assert_eq!(full.error, "valkey: timed out");
        assert_eq!(full.kind.as_deref(), Some("IoError"));
        assert_eq!(full.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Exact shape the server emits.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "new_kid": "kid-2026-04-18"
        }"#;
        let parsed: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(
            parsed,
            RotateWaitpointSecretResponse {
                rotated: 3,
                failed: vec![4, 5],
                new_kid: "kid-2026-04-18".to_owned(),
            }
        );
    }

    // ── ClaimForWorkerResponse::into_grant ──

    /// Build a response with a valid execution id and the given
    /// partition key. The key is produced by a JSON round-trip so it
    /// is constructed the same way the wire path constructs it.
    fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
        let partition_key_json = serde_json::to_string(partition_key).unwrap();
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_key: serde_json::from_str(&partition_key_json).unwrap(),
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn into_grant_preserves_all_known_partition_key_shapes() {
        // Post-#91: families collapse into opaque PartitionKey literals.
        // Flow and Execution both produce "{fp:N}"; Budget is "{b:N}";
        // Quota is "{q:N}". The DTO keeps the wire string untouched and
        // into_grant hands it opaquely to the core type.
        for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
            let grant = match sample_claim_response(key_str).into_grant() {
                Ok(grant) => grant,
                Err(e) => panic!("key {key_str} should parse: {e:?}"),
            };
            assert_eq!(grant.partition_key.as_str(), key_str);
            assert_eq!(grant.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_preserves_opaque_partition_key() {
        // The SDK does NOT eagerly parse the partition_key at the
        // admin boundary; malformed keys are caught on the
        // claim_from_grant hot path, where the typed Partition is
        // actually needed. This test pins that opacity contract.
        let grant = sample_claim_response("{zz:0}")
            .into_grant()
            .expect("SDK must not parse partition_key");
        assert_eq!(grant.partition_key.as_str(), "{zz:0}");
        // Parsing surfaces the error explicitly.
        assert!(grant.partition().is_err());
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let resp = ClaimForWorkerResponse {
            execution_id: "not-a-valid-eid".to_owned(),
            ..sample_claim_response("{fp:5}")
        };
        let err = resp.into_grant().unwrap_err();
        match &err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(
                    message.contains("malformed execution_id"),
                    "msg: {message}"
                );
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    // ── ClaimForWorkerResponse wire shape (issue #91) ──

    #[test]
    fn claim_for_worker_response_deserialises_opaque_partition_key() {
        // Exact shape the server emits post-#91.
        let raw = r#"{
            "execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
            "partition_key": "{fp:7}",
            "grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
            "expires_at_ms": 1700000000000
        }"#;
        let parsed: ClaimForWorkerResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(parsed.partition_key.as_str(), "{fp:7}");
        assert_eq!(parsed.expires_at_ms, 1_700_000_000_000);
    }
}