ff_sdk/admin.rs
1//! Admin REST client for operator-facing endpoints on `ff-server`.
2//!
//! Wraps the `POST /v1/admin/*` endpoints and the scheduler-routed
//! `POST /v1/workers/{worker_id}/claim` endpoint so downstream
//! consumers (cairn-fabric) don't hand-roll the HTTP calls for
//! surfaces like HMAC secret rotation. Mirrors the server's wire
//! types exactly — request bodies and response shapes are defined
//! against [`ff_server::api`] + [`ff_server::server`] and kept 1:1
//! with the producer.
9//!
10//! Authentication is Bearer token. Callers pick up the token from
11//! wherever they hold it (`FF_API_TOKEN` env var is the common
12//! pattern, but the SDK does not read env vars on the caller's
13//! behalf — [`FlowFabricAdminClient::with_token`] accepts a
14//! string-like token value (`&str` or `String`) via
15//! `impl AsRef<str>`).
16
17use std::time::Duration;
18
19use ff_core::contracts::{RotateWaitpointHmacSecretArgs, RotateWaitpointHmacSecretOutcome};
20use ff_core::keys::IndexKeys;
21use ff_core::partition::{Partition, PartitionFamily};
22use serde::{Deserialize, Serialize};
23
24use crate::SdkError;
25
/// Per-request timeout applied to every `reqwest::Client` this module
/// builds.
///
/// Deliberately 10s longer than the server's own 120s
/// `ROTATE_HTTP_TIMEOUT`: with the client deadline LATER than the
/// server deadline, operators receive the server's structured
/// 504 GATEWAY_TIMEOUT body instead of an opaque client-side timeout
/// error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(130);
32
33/// Client for the `ff-server` admin REST surface.
34///
35/// Construct via [`FlowFabricAdminClient::new`] (no auth) or
36/// [`FlowFabricAdminClient::with_token`] (Bearer auth). Both
37/// return a ready-to-use client backed by a single pooled
38/// `reqwest::Client` — reuse the instance across calls instead of
39/// building one per request.
40#[derive(Debug, Clone)]
41pub struct FlowFabricAdminClient {
42 http: reqwest::Client,
43 base_url: String,
44}
45
46impl FlowFabricAdminClient {
47 /// Build a client without auth. Suitable for a dev ff-server
48 /// whose `api_token` is unconfigured. Production deployments
49 /// should use [`with_token`](Self::with_token).
50 pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
51 let http = reqwest::Client::builder()
52 .timeout(DEFAULT_TIMEOUT)
53 .build()
54 .map_err(|e| SdkError::Http {
55 source: e,
56 context: "build reqwest::Client".into(),
57 })?;
58 Ok(Self {
59 http,
60 base_url: normalize_base_url(base_url.into()),
61 })
62 }
63
64 /// Build a client that sends `Authorization: Bearer <token>` on
65 /// every request. The token is passed by value so the caller
66 /// retains ownership policy (e.g. zeroize on drop at the
67 /// caller side); the SDK only reads it.
68 ///
69 /// # Empty-token guard
70 ///
71 /// An empty or all-whitespace `token` returns
72 /// [`SdkError::Config`] instead of silently constructing
73 /// `Authorization: Bearer ` (which the server rejects with
74 /// 401, leaving the operator chasing a "why is auth broken"
75 /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
76 /// where the var was meant to be set; the unset-expansion is
77 /// the empty string. Prefer an obvious error at construction
78 /// over a silent 401 at first request.
79 ///
80 /// If the caller genuinely wants an unauthenticated client
81 /// (dev ff-server without `api_token` configured), use
82 /// [`FlowFabricAdminClient::new`] instead.
83 pub fn with_token(
84 base_url: impl Into<String>,
85 token: impl AsRef<str>,
86 ) -> Result<Self, SdkError> {
87 let token_str = token.as_ref();
88 if token_str.trim().is_empty() {
89 return Err(SdkError::Config {
90 context: "admin_client".into(),
91 field: Some("bearer_token".into()),
92 message: "is empty or all-whitespace; use \
93 FlowFabricAdminClient::new for unauthenticated access"
94 .into(),
95 });
96 }
97 let mut headers = reqwest::header::HeaderMap::new();
98 let mut auth_value =
99 reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
100 |_| SdkError::Config {
101 context: "admin_client".into(),
102 field: Some("bearer_token".into()),
103 message: "contains characters not valid in an HTTP header".into(),
104 },
105 )?;
106 // Mark Authorization as sensitive so it doesn't appear in
107 // reqwest's Debug output / logs.
108 auth_value.set_sensitive(true);
109 headers.insert(reqwest::header::AUTHORIZATION, auth_value);
110
111 let http = reqwest::Client::builder()
112 .timeout(DEFAULT_TIMEOUT)
113 .default_headers(headers)
114 .build()
115 .map_err(|e| SdkError::Http {
116 source: e,
117 context: "build reqwest::Client".into(),
118 })?;
119 Ok(Self {
120 http,
121 base_url: normalize_base_url(base_url.into()),
122 })
123 }
124
125 /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
126 ///
127 /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
128 /// server-side one: the request carries lane + identity +
129 /// capabilities + grant TTL; the server runs budget, quota, and
130 /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
131 /// and returns a `ClaimGrant` on success.
132 ///
133 /// Returns `Ok(None)` when the server responds 204 No Content
134 /// (no eligible execution on the lane). Callers that want to keep
135 /// polling should back off per their claim cadence.
136 pub async fn claim_for_worker(
137 &self,
138 req: ClaimForWorkerRequest,
139 ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
140 // Percent-encode `worker_id` in the URL path — `WorkerId` is a
141 // free-form string (could contain `/`, spaces, `%`, etc.) and
142 // splicing it verbatim would produce malformed URLs or
143 // misrouted paths. `Url::path_segments_mut().push` handles the
144 // encoding natively.
145 let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| SdkError::Config {
146 context: "admin_client: claim_for_worker".into(),
147 field: Some("base_url".into()),
148 message: format!("invalid base_url '{}': {e}", self.base_url),
149 })?;
150 {
151 let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
152 context: "admin_client: claim_for_worker".into(),
153 field: Some("base_url".into()),
154 message: format!("base_url cannot be a base URL: '{}'", self.base_url),
155 })?;
156 segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
157 }
158 let url = url.to_string();
159 let resp = self
160 .http
161 .post(&url)
162 .json(&req)
163 .send()
164 .await
165 .map_err(|e| SdkError::Http {
166 source: e,
167 context: "POST /v1/workers/{worker_id}/claim".into(),
168 })?;
169
170 let status = resp.status();
171 if status == reqwest::StatusCode::NO_CONTENT {
172 return Ok(None);
173 }
174 if status.is_success() {
175 return resp
176 .json::<ClaimForWorkerResponse>()
177 .await
178 .map(Some)
179 .map_err(|e| SdkError::Http {
180 source: e,
181 context: "decode claim_for_worker response body".into(),
182 });
183 }
184
185 // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
186 let status_u16 = status.as_u16();
187 let raw = resp.text().await.map_err(|e| SdkError::Http {
188 source: e,
189 context: format!("read claim_for_worker error body (status {status_u16})"),
190 })?;
191 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
192 Err(SdkError::AdminApi {
193 status: status_u16,
194 message: parsed
195 .as_ref()
196 .map(|b| b.error.clone())
197 .unwrap_or_else(|| raw.clone()),
198 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
199 retryable: parsed.as_ref().and_then(|b| b.retryable),
200 raw_body: raw,
201 })
202 }
203
204 /// Rotate the waitpoint HMAC secret on the server.
205 ///
206 /// Promotes the currently-installed kid to `previous_kid`
207 /// (accepted for the server's configured
208 /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
209 /// `new_secret_hex` under `new_kid` as the new current. Fans
210 /// out across every execution partition. Idempotent: re-running
211 /// with the same `(new_kid, new_secret_hex)` converges.
212 ///
213 /// The server returns 200 if at least one partition rotated OR
214 /// at least one partition was already rotating under a
215 /// concurrent request. See `RotateWaitpointSecretResponse`
216 /// fields for the breakdown.
217 ///
218 /// # Errors
219 ///
220 /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
221 /// input, 401 missing/bad bearer, 429 concurrent rotate,
222 /// 500 all partitions failed, 504 server-side timeout).
223 /// * [`SdkError::Http`] — transport error (connect, body
224 /// decode, client-side timeout).
225 ///
226 /// # Retry semantics
227 ///
228 /// Rotation is idempotent on the same `(new_kid,
229 /// new_secret_hex)` so retries are SAFE even on 504s or
230 /// partial failures.
231 pub async fn rotate_waitpoint_secret(
232 &self,
233 req: RotateWaitpointSecretRequest,
234 ) -> Result<RotateWaitpointSecretResponse, SdkError> {
235 let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
236 let resp = self
237 .http
238 .post(&url)
239 .json(&req)
240 .send()
241 .await
242 .map_err(|e| SdkError::Http {
243 source: e,
244 context: "POST /v1/admin/rotate-waitpoint-secret".into(),
245 })?;
246
247 let status = resp.status();
248 if status.is_success() {
249 return resp
250 .json::<RotateWaitpointSecretResponse>()
251 .await
252 .map_err(|e| SdkError::Http {
253 source: e,
254 context: "decode rotate-waitpoint-secret response body".into(),
255 });
256 }
257
258 // Non-2xx: parse the server's ErrorBody if we can, fall
259 // back to a raw body otherwise. Propagate body-read
260 // transport errors as Http rather than silently flattening
261 // them into `AdminApi { raw_body: "" }` — a connection drop
262 // mid-body-read is a transport fault, not an API-layer
263 // reject, and misclassifying it strips `is_retryable`'s
264 // timeout/connect signal from the caller.
265 let status_u16 = status.as_u16();
266 let raw = resp.text().await.map_err(|e| SdkError::Http {
267 source: e,
268 context: format!(
269 "read rotate-waitpoint-secret error response body (status {status_u16})"
270 ),
271 })?;
272 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
273 Err(SdkError::AdminApi {
274 status: status_u16,
275 message: parsed
276 .as_ref()
277 .map(|b| b.error.clone())
278 .unwrap_or_else(|| raw.clone()),
279 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
280 retryable: parsed.as_ref().and_then(|b| b.retryable),
281 raw_body: raw,
282 })
283 }
284}
285
286/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
287///
288/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
289#[derive(Debug, Clone, Serialize)]
290pub struct RotateWaitpointSecretRequest {
291 /// New key identifier. Non-empty, must not contain `:` (the
292 /// server uses `:` as the field separator in the secret hash).
293 pub new_kid: String,
294 /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
295 pub new_secret_hex: String,
296}
297
298/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
299///
300/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
301/// The server serializes this struct as-is via `Json(result)`.
302#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
303pub struct RotateWaitpointSecretResponse {
304 /// Count of partitions that accepted the rotation.
305 pub rotated: u16,
306 /// Partition indices where the rotation failed — operator
307 /// should investigate. Rotation is idempotent on the same
308 /// `(new_kid, new_secret_hex)` so a retry after the underlying
309 /// fault clears converges.
310 pub failed: Vec<u16>,
311 /// The `new_kid` that was installed as current on every
312 /// rotated partition — echoes the request field back for
313 /// confirmation.
314 pub new_kid: String,
315}
316
317/// Server-side error body shape, as emitted by
318/// `ff_server::api::ErrorBody`. Kept internal because consumers
319/// match on the flattened fields of [`SdkError::AdminApi`].
320#[derive(Debug, Clone, Deserialize)]
321struct AdminErrorBody {
322 error: String,
323 #[serde(default)]
324 kind: Option<String>,
325 #[serde(default)]
326 retryable: Option<bool>,
327}
328
329/// Request body for `POST /v1/workers/{worker_id}/claim`.
330///
331/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
332/// goes in the URL path (not the body) but is kept on the struct
333/// for ergonomics — callers don't juggle a separate arg.
334#[derive(Debug, Clone, Serialize)]
335pub struct ClaimForWorkerRequest {
336 #[serde(skip)]
337 pub worker_id: String,
338 pub lane_id: String,
339 pub worker_instance_id: String,
340 #[serde(default)]
341 pub capabilities: Vec<String>,
342 /// Grant TTL in milliseconds. Server rejects 0 or anything over
343 /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
344 pub grant_ttl_ms: u64,
345}
346
347/// Response body for `POST /v1/workers/{worker_id}/claim`.
348///
349/// Wire shape of `ff_core::contracts::ClaimGrant`. Carries the opaque
350/// [`ff_core::partition::PartitionKey`] directly on the wire (issue
351/// #91); the SDK reconstructs the core type via [`Self::into_grant`].
352#[derive(Debug, Clone, Deserialize)]
353pub struct ClaimForWorkerResponse {
354 pub execution_id: String,
355 pub partition_key: ff_core::partition::PartitionKey,
356 pub grant_key: String,
357 pub expires_at_ms: u64,
358}
359
360impl ClaimForWorkerResponse {
361 /// Convert the wire DTO into a typed
362 /// [`ff_core::contracts::ClaimGrant`] for handoff to
363 /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
364 /// [`SdkError::AdminApi`] on malformed execution_id — a drift
365 /// signal that the server and SDK disagree on the wire shape, so
366 /// failing loud prevents routing to a ghost partition.
367 ///
368 /// The `partition_key` itself is not eagerly parsed here: it is
369 /// carried opaquely to the `claim_from_grant` hot path, which
370 /// parses it there and surfaces a typed error on malformed keys.
371 pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
372 let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
373 .map_err(|e| SdkError::AdminApi {
374 status: 200,
375 message: format!(
376 "claim_for_worker: server returned malformed execution_id '{}': {e}",
377 self.execution_id
378 ),
379 kind: Some("malformed_response".to_owned()),
380 retryable: Some(false),
381 raw_body: String::new(),
382 })?;
383 Ok(ff_core::contracts::ClaimGrant {
384 execution_id,
385 partition_key: self.partition_key,
386 grant_key: self.grant_key,
387 expires_at_ms: self.expires_at_ms,
388 })
389 }
390}
391
392/// Per-partition outcome of a cluster-wide waitpoint HMAC secret
393/// rotation. Returned by [`rotate_waitpoint_hmac_secret_all_partitions`]
394/// so operators can audit which partitions rotated vs. no-op'd vs. failed.
395///
396/// The index is the execution-partition index (`0..num_partitions`),
397/// matching `{fp:N}` in the keyspace.
398#[derive(Debug)]
399pub struct PartitionRotationOutcome {
400 /// Execution partition index (`0..num_partitions`).
401 pub partition: u16,
402 /// FCALL outcome on this partition, or the error it raised.
403 pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
404}
405
406/// Rotate the waitpoint HMAC secret across every execution partition
407/// by fanning out the `ff_rotate_waitpoint_hmac_secret` FCALL.
408///
409/// This is the canonical Rust-side rotation path for direct-Valkey
410/// consumers (e.g. cairn-fabric) that cannot route through the
411/// `ff-server` admin REST endpoint. Callers who have an HTTP-reachable
412/// `ff-server` should prefer [`FlowFabricAdminClient::rotate_waitpoint_secret`] —
413/// that path adds a single-writer admission gate, parallel fan-out,
414/// structured audit events, and the server's configured grace window.
415///
416/// # Production rotation recipe
417///
418/// Operators MUST coordinate so secret rotation **precedes** any
419/// waitpoint resolution that will present the new `kid`. The broad
420/// sequence:
421///
422/// 1. Pick a fresh `new_kid` (must NOT contain `:` — the server uses
423/// `:` as the field separator in the secret hash).
424/// 2. Call this helper with the previous `kid`'s grace window
425/// (`grace_ms` — the duration during which tokens signed by the
426/// outgoing secret remain valid).
427/// 3. Only after this call returns with all partitions `Ok(_)` (either
428/// `Rotated` or `Noop`), begin signing new tokens with `new_kid`.
429/// 4. Retain the previous secret in the keystore until the grace
430/// window elapses — the FCALL handles GC of expired kids on every
431/// rotation, so just don't rotate again before the grace window.
432///
433/// See RFC-004 §rotation for the full 4-key HSET + `previous_expires_at`
434/// dance the FCALL implements server-side.
435///
436/// # Idempotency
437///
438/// Each partition FCALL is idempotent on the same `(new_kid,
439/// new_secret_hex)` pair: a replay with identical args returns
440/// `RotateWaitpointHmacSecretOutcome::Noop`. A same-kid-different-secret
441/// replay surfaces as a per-partition `SdkError` (wrapping
442/// `ScriptError::RotationConflict`) — pick a fresh `new_kid` to recover.
443///
444/// # Error semantics
445///
446/// A per-partition FCALL failure (transport fault, rotation conflict,
447/// etc.) is recorded on that partition's [`PartitionRotationOutcome`]
448/// and fan-out **continues** — the contract matches the server's
449/// `rotate_waitpoint_secret` (partial success is allowed, operators
450/// retry on the failed partition subset). Returning `Vec<_>` (not
451/// `Result<Vec<_>, _>`) is deliberate: every whole-call invariant is
452/// enforced by the underlying FCALL on each partition (kid non-empty,
453/// no `:`, even-length hex, etc.), so the aggregate has nothing left
454/// to reject at the Rust boundary. Callers decide how to treat partial
455/// failures (fail loud / retry the subset / record metrics).
456///
457/// # Concurrency + performance
458///
459/// Sequential (one partition at a time) to keep the helper dependency-
460/// free: no `futures::stream` / tokio-specific primitives on the caller
461/// path. For a cluster with N partitions and per-partition RTT R, the
462/// total duration is ~N*R. Consumers needing parallel fan-out should
463/// wrap this with `FuturesUnordered` themselves, or use the server
464/// admin endpoint (which fans out with bounded concurrency = 16).
465///
466/// # Test harness
467///
468/// The `ff-test::fixtures::TestCluster::rotate_waitpoint_hmac_secret`
469/// method is a thin wrapper around this helper — integration tests and
470/// production code exercise the same code path.
471///
472/// # Example
473///
474/// ```rust,ignore
475/// use ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions;
476///
477/// let results = rotate_waitpoint_hmac_secret_all_partitions(
478/// &client,
479/// partition_config.num_flow_partitions,
480/// "kid-2026-04-22",
481/// "deadbeef...64-hex-chars...",
482/// 60_000,
483/// )
484/// .await?;
485///
486/// for entry in &results {
487/// match &entry.result {
488/// Ok(outcome) => tracing::info!(partition = entry.partition, ?outcome, "rotated"),
489/// Err(e) => tracing::error!(partition = entry.partition, %e, "rotation failed"),
490/// }
491/// }
492/// ```
493pub async fn rotate_waitpoint_hmac_secret_all_partitions(
494 client: &ferriskey::Client,
495 num_partitions: u16,
496 new_kid: &str,
497 new_secret_hex: &str,
498 grace_ms: u64,
499) -> Vec<PartitionRotationOutcome> {
500 // Hoisted out of the loop — `ff_rotate_waitpoint_hmac_secret` only
501 // borrows the args, so every partition can reuse the same struct.
502 // Avoids N × 2 string clones on the hot fan-out path.
503 let args = RotateWaitpointHmacSecretArgs {
504 new_kid: new_kid.to_owned(),
505 new_secret_hex: new_secret_hex.to_owned(),
506 grace_ms,
507 };
508 let mut out = Vec::with_capacity(num_partitions as usize);
509 for index in 0..num_partitions {
510 let partition = Partition {
511 family: PartitionFamily::Execution,
512 index,
513 };
514 let idx = IndexKeys::new(&partition);
515 let result = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
516 client, &idx, &args,
517 )
518 .await
519 .map_err(SdkError::from);
520 out.push(PartitionRotationOutcome {
521 partition: index,
522 result,
523 });
524 }
525 out
526}
527
/// Strip every trailing `/` from a base URL, so that building request
/// URLs with `format!("{base}/v1/...")` can never yield
/// `https://host//v1/...`. Same pattern as used in media-pipeline.
///
/// The string is shortened in place and returned; nothing other than
/// trailing slashes is touched.
fn normalize_base_url(mut url: String) -> String {
    let kept_len = url.trim_end_matches('/').len();
    url.truncate(kept_len);
    url
}
537
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn base_url_strips_trailing_slash() {
        // (input, expected) pairs: zero, one and several trailing slashes.
        let cases = [
            ("http://x", "http://x"),
            ("http://x/", "http://x"),
            ("http://x///", "http://x"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_base_url(input.into()), expected);
        }
    }

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // A raw newline inside the token would split the Authorization
        // header, so construction has to fail loudly.
        let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
        if !matches!(err, SdkError::Config { .. }) {
            panic!("got: {err:?}");
        }
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // The shell footgun: FF_ADMIN_TOKEN="" expands to "". The
        // constructor must reject it up front rather than hand back a
        // client that silently 401s on its first request.
        let blank_tokens = ["", " ", "\t\n ", " "];
        for s in blank_tokens {
            let err = FlowFabricAdminClient::with_token("http://x", s).unwrap_err();
            let is_bearer_token_config = matches!(
                &err,
                SdkError::Config { field: Some(f), .. } if f == "bearer_token"
            );
            assert!(
                is_bearer_token_config,
                "token {s:?} should return Config with field=bearer_token; got: {err:?}"
            );
        }
    }

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        let parse = |json: &str| serde_json::from_str::<AdminErrorBody>(json).unwrap();

        // Usual 400 shape: neither `kind` nor `retryable` is sent.
        let minimal = parse(r#"{"error":"bad new_kid"}"#);
        assert_eq!(minimal.error, "bad new_kid");
        assert!(minimal.kind.is_none());
        assert!(minimal.retryable.is_none());

        // 500 ValkeyError shape: both optional fields are sent.
        let full = parse(r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#);
        assert_eq!(full.error, "valkey: timed out");
        assert_eq!(full.kind.as_deref(), Some("IoError"));
        assert_eq!(full.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Exactly what the server emits.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "new_kid": "kid-2026-04-18"
        }"#;
        let parsed: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        let expected = RotateWaitpointSecretResponse {
            rotated: 3,
            failed: vec![4, 5],
            new_kid: "kid-2026-04-18".to_owned(),
        };
        assert_eq!(parsed, expected);
    }

    // ── ClaimForWorkerResponse::into_grant ──

    /// Build a well-formed response whose `partition_key` is
    /// deserialized from the given literal.
    fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
        // Round-trip through JSON text so the key is produced by the
        // type's own `Deserialize` impl.
        let key_json = serde_json::to_string(partition_key).unwrap();
        let partition_key = serde_json::from_str(&key_json).unwrap();
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_key,
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant"
                .to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn into_grant_preserves_all_known_partition_key_shapes() {
        // Post-#91 the families collapse into opaque PartitionKey
        // literals: Flow and Execution both give "{fp:N}", Budget is
        // "{b:N}", Quota is "{q:N}". The DTO keeps the wire string
        // untouched and into_grant passes it on opaquely.
        for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
            let grant = match sample_claim_response(key_str).into_grant() {
                Ok(grant) => grant,
                Err(e) => panic!("key {key_str} should parse: {e:?}"),
            };
            assert_eq!(grant.partition_key.as_str(), key_str);
            assert_eq!(grant.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_preserves_opaque_partition_key() {
        // Pins the opacity contract: the admin boundary does NOT
        // eagerly parse partition_key. A malformed key is only caught
        // on the claim_from_grant hot path, where the typed Partition
        // is actually required.
        let grant = sample_claim_response("{zz:0}")
            .into_grant()
            .expect("SDK must not parse partition_key");
        assert_eq!(grant.partition_key.as_str(), "{zz:0}");
        // Asking for the typed partition is what surfaces the error.
        assert!(grant.partition().is_err());
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let mut resp = sample_claim_response("{fp:5}");
        resp.execution_id = "not-a-valid-eid".to_owned();
        let err = resp.into_grant().unwrap_err();
        match &err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(
                    message.contains("malformed execution_id"),
                    "msg: {message}"
                );
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            _ => panic!("expected AdminApi, got {err:?}"),
        }
    }

    // Coverage for `rotate_waitpoint_hmac_secret_all_partitions` lives
    // in `ff-test`: the integration tests in
    // `crates/ff-test/tests/waitpoint_hmac_rotation_fcall.rs` and
    // `waitpoint_tokens.rs` reach the function through the
    // `TestCluster::rotate_waitpoint_hmac_secret` fixture, which is now
    // a thin delegator. A pure unit test here would need a mock
    // `ferriskey::Client` (ferriskey's `Client` performs a live RESP
    // handshake on `ClientBuilder::build`, so a local TCP listener
    // alone isn't sufficient) — expensive to construct for one-line
    // iteration-count coverage.

    // ── ClaimForWorkerResponse wire shape (issue #91) ──

    #[test]
    fn claim_for_worker_response_deserialises_opaque_partition_key() {
        // Exactly what the server emits post-#91.
        let raw = r#"{
            "execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
            "partition_key": "{fp:7}",
            "grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
            "expires_at_ms": 1700000000000
        }"#;
        let parsed: ClaimForWorkerResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(parsed.partition_key.as_str(), "{fp:7}");
        assert_eq!(parsed.expires_at_ms, 1_700_000_000_000);
    }
}