ff_sdk/admin.rs
1//! Admin REST client for operator-facing endpoints on `ff-server`.
2//!
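//! Wraps the server's operator-facing routes — `POST /v1/admin/*`,
//! `POST /v1/workers/{worker_id}/claim`, and
//! `POST /v1/executions/{execution_id}/reclaim` — so downstream
//! consumers (cairn-fabric) don't hand-roll the HTTP calls for
//! surfaces like HMAC secret rotation. Mirrors the server's wire
//! types exactly — request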
6//! bodies and response shapes are defined against
7//! [`ff_server::api`] + [`ff_server::server`] and kept 1:1 with the
8//! producer.
9//!
10//! Authentication is Bearer token. Callers pick up the token from
11//! wherever they hold it (`FF_API_TOKEN` env var is the common
12//! pattern, but the SDK does not read env vars on the caller's
13//! behalf — [`FlowFabricAdminClient::with_token`] accepts a
14//! string-like token value (`&str` or `String`) via
15//! `impl AsRef<str>`).
16
17use std::time::Duration;
18
19use ff_core::contracts::RotateWaitpointHmacSecretOutcome;
20// v0.12 PR-6: these imports only power the Valkey-typed
21// `rotate_waitpoint_hmac_secret_all_partitions` helper at the bottom
22// of the module; gated so the ungated module builds clean under
23// `--no-default-features --features sqlite`.
24#[cfg(feature = "valkey-default")]
25use ff_core::contracts::RotateWaitpointHmacSecretArgs;
26#[cfg(feature = "valkey-default")]
27use ff_core::keys::IndexKeys;
28#[cfg(feature = "valkey-default")]
29use ff_core::partition::{Partition, PartitionFamily};
30use serde::{Deserialize, Serialize};
31
32use crate::SdkError;
33
/// Per-request timeout used by every `reqwest::Client` this module
/// builds.
///
/// The server caps a rotation at 120s (its `ROTATE_HTTP_TIMEOUT`). The
/// client waits 10s longer on purpose: the server deadline then fires
/// first, so operators receive the structured 504 GATEWAY_TIMEOUT body
/// instead of a client-side timeout error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(130);
40
41/// Client for the `ff-server` admin REST surface.
42///
43/// Construct via [`FlowFabricAdminClient::new`] (no auth) or
44/// [`FlowFabricAdminClient::with_token`] (Bearer auth). Both
45/// return a ready-to-use client backed by a single pooled
46/// `reqwest::Client` — reuse the instance across calls instead of
47/// building one per request.
48#[derive(Debug, Clone)]
49pub struct FlowFabricAdminClient {
50 http: reqwest::Client,
51 base_url: String,
52}
53
54impl FlowFabricAdminClient {
55 /// Build a client without auth. Suitable for a dev ff-server
56 /// whose `api_token` is unconfigured. Production deployments
57 /// should use [`with_token`](Self::with_token).
58 pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
59 let http = reqwest::Client::builder()
60 .timeout(DEFAULT_TIMEOUT)
61 .build()
62 .map_err(|e| SdkError::Http {
63 source: e,
64 context: "build reqwest::Client".into(),
65 })?;
66 Ok(Self {
67 http,
68 base_url: normalize_base_url(base_url.into()),
69 })
70 }
71
72 /// Build a client that sends `Authorization: Bearer <token>` on
73 /// every request. The token is passed by value so the caller
74 /// retains ownership policy (e.g. zeroize on drop at the
75 /// caller side); the SDK only reads it.
76 ///
77 /// # Empty-token guard
78 ///
79 /// An empty or all-whitespace `token` returns
80 /// [`SdkError::Config`] instead of silently constructing
81 /// `Authorization: Bearer ` (which the server rejects with
82 /// 401, leaving the operator chasing a "why is auth broken"
83 /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
84 /// where the var was meant to be set; the unset-expansion is
85 /// the empty string. Prefer an obvious error at construction
86 /// over a silent 401 at first request.
87 ///
88 /// If the caller genuinely wants an unauthenticated client
89 /// (dev ff-server without `api_token` configured), use
90 /// [`FlowFabricAdminClient::new`] instead.
91 pub fn with_token(
92 base_url: impl Into<String>,
93 token: impl AsRef<str>,
94 ) -> Result<Self, SdkError> {
95 let token_str = token.as_ref();
96 if token_str.trim().is_empty() {
97 return Err(SdkError::Config {
98 context: "admin_client".into(),
99 field: Some("bearer_token".into()),
100 message: "is empty or all-whitespace; use \
101 FlowFabricAdminClient::new for unauthenticated access"
102 .into(),
103 });
104 }
105 let mut headers = reqwest::header::HeaderMap::new();
106 let mut auth_value =
107 reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
108 |_| SdkError::Config {
109 context: "admin_client".into(),
110 field: Some("bearer_token".into()),
111 message: "contains characters not valid in an HTTP header".into(),
112 },
113 )?;
114 // Mark Authorization as sensitive so it doesn't appear in
115 // reqwest's Debug output / logs.
116 auth_value.set_sensitive(true);
117 headers.insert(reqwest::header::AUTHORIZATION, auth_value);
118
119 let http = reqwest::Client::builder()
120 .timeout(DEFAULT_TIMEOUT)
121 .default_headers(headers)
122 .build()
123 .map_err(|e| SdkError::Http {
124 source: e,
125 context: "build reqwest::Client".into(),
126 })?;
127 Ok(Self {
128 http,
129 base_url: normalize_base_url(base_url.into()),
130 })
131 }
132
133 /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
134 ///
135 /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
136 /// server-side one: the request carries lane + identity +
137 /// capabilities + grant TTL; the server runs budget, quota, and
138 /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
139 /// and returns a `ClaimGrant` on success.
140 ///
141 /// Returns `Ok(None)` when the server responds 204 No Content
142 /// (no eligible execution on the lane). Callers that want to keep
143 /// polling should back off per their claim cadence.
144 pub async fn claim_for_worker(
145 &self,
146 req: ClaimForWorkerRequest,
147 ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
148 // Percent-encode `worker_id` in the URL path — `WorkerId` is a
149 // free-form string (could contain `/`, spaces, `%`, etc.) and
150 // splicing it verbatim would produce malformed URLs or
151 // misrouted paths. `Url::path_segments_mut().push` handles the
152 // encoding natively.
153 let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| SdkError::Config {
154 context: "admin_client: claim_for_worker".into(),
155 field: Some("base_url".into()),
156 message: format!("invalid base_url '{}': {e}", self.base_url),
157 })?;
158 {
159 let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
160 context: "admin_client: claim_for_worker".into(),
161 field: Some("base_url".into()),
162 message: format!("base_url cannot be a base URL: '{}'", self.base_url),
163 })?;
164 segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
165 }
166 let url = url.to_string();
167 let resp = self
168 .http
169 .post(&url)
170 .json(&req)
171 .send()
172 .await
173 .map_err(|e| SdkError::Http {
174 source: e,
175 context: "POST /v1/workers/{worker_id}/claim".into(),
176 })?;
177
178 let status = resp.status();
179 if status == reqwest::StatusCode::NO_CONTENT {
180 return Ok(None);
181 }
182 if status.is_success() {
183 return resp
184 .json::<ClaimForWorkerResponse>()
185 .await
186 .map(Some)
187 .map_err(|e| SdkError::Http {
188 source: e,
189 context: "decode claim_for_worker response body".into(),
190 });
191 }
192
193 // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
194 let status_u16 = status.as_u16();
195 let raw = resp.text().await.map_err(|e| SdkError::Http {
196 source: e,
197 context: format!("read claim_for_worker error body (status {status_u16})"),
198 })?;
199 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
200 Err(SdkError::AdminApi {
201 status: status_u16,
202 message: parsed
203 .as_ref()
204 .map(|b| b.error.clone())
205 .unwrap_or_else(|| raw.clone()),
206 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
207 retryable: parsed.as_ref().and_then(|b| b.retryable),
208 raw_body: raw,
209 })
210 }
211
212 /// Rotate the waitpoint HMAC secret on the server.
213 ///
214 /// Promotes the currently-installed kid to `previous_kid`
215 /// (accepted for the server's configured
216 /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
217 /// `new_secret_hex` under `new_kid` as the new current. Fans
218 /// out across every execution partition. Idempotent: re-running
219 /// with the same `(new_kid, new_secret_hex)` converges.
220 ///
221 /// The server returns 200 if at least one partition rotated OR
222 /// at least one partition was already rotating under a
223 /// concurrent request. See `RotateWaitpointSecretResponse`
224 /// fields for the breakdown.
225 ///
226 /// # Errors
227 ///
228 /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
229 /// input, 401 missing/bad bearer, 429 concurrent rotate,
230 /// 500 all partitions failed, 504 server-side timeout).
231 /// * [`SdkError::Http`] — transport error (connect, body
232 /// decode, client-side timeout).
233 ///
234 /// # Retry semantics
235 ///
236 /// Rotation is idempotent on the same `(new_kid,
237 /// new_secret_hex)` so retries are SAFE even on 504s or
238 /// partial failures.
239 pub async fn rotate_waitpoint_secret(
240 &self,
241 req: RotateWaitpointSecretRequest,
242 ) -> Result<RotateWaitpointSecretResponse, SdkError> {
243 let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
244 let resp = self
245 .http
246 .post(&url)
247 .json(&req)
248 .send()
249 .await
250 .map_err(|e| SdkError::Http {
251 source: e,
252 context: "POST /v1/admin/rotate-waitpoint-secret".into(),
253 })?;
254
255 let status = resp.status();
256 if status.is_success() {
257 return resp
258 .json::<RotateWaitpointSecretResponse>()
259 .await
260 .map_err(|e| SdkError::Http {
261 source: e,
262 context: "decode rotate-waitpoint-secret response body".into(),
263 });
264 }
265
266 // Non-2xx: parse the server's ErrorBody if we can, fall
267 // back to a raw body otherwise. Propagate body-read
268 // transport errors as Http rather than silently flattening
269 // them into `AdminApi { raw_body: "" }` — a connection drop
270 // mid-body-read is a transport fault, not an API-layer
271 // reject, and misclassifying it strips `is_retryable`'s
272 // timeout/connect signal from the caller.
273 let status_u16 = status.as_u16();
274 let raw = resp.text().await.map_err(|e| SdkError::Http {
275 source: e,
276 context: format!(
277 "read rotate-waitpoint-secret error response body (status {status_u16})"
278 ),
279 })?;
280 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
281 Err(SdkError::AdminApi {
282 status: status_u16,
283 message: parsed
284 .as_ref()
285 .map(|b| b.error.clone())
286 .unwrap_or_else(|| raw.clone()),
287 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
288 retryable: parsed.as_ref().and_then(|b| b.retryable),
289 raw_body: raw,
290 })
291 }
292
293 /// Request a lease-reclaim grant for an execution in
294 /// `lease_expired_reclaimable` or `lease_revoked` state (RFC-024
295 /// §3.5).
296 ///
297 /// Routes `POST /v1/executions/{execution_id}/reclaim`. The
298 /// ff-server handler dispatches through the `EngineBackend` trait
299 /// to whichever backend the server was started with (Valkey /
300 /// Postgres / SQLite).
301 ///
302 /// # worker_capabilities (RFC-024 §3.2 B-2)
303 ///
304 /// The request body carries `worker_capabilities`. Consumers typically
305 /// source these from their registered worker's configured
306 /// `WorkerConfig::capabilities`. Admission compares
307 /// `worker_capabilities` against the execution's
308 /// `required_capabilities` (persisted on `exec_core` at
309 /// `create_execution` time from
310 /// `ExecutionPolicy.routing_requirements.required_capabilities`);
311 /// any required capability missing from the worker set surfaces as
312 /// `IssueReclaimGrantResponse::NotReclaimable { detail:
313 /// "capability_mismatch: <missing csv>" }` (Lua
314 /// `ff_issue_reclaim_grant`, `crates/ff-script/src/flowfabric.lua`
315 /// §3969-3982; sqlite/PG backends mirror the check). The SDK does
316 /// not re-read worker state automatically — admin clients are not
317 /// bound to a worker — so the consumer threads the capabilities
318 /// through at call-time.
319 ///
320 /// `capability_hash` is NOT consulted for admission; it is stored
321 /// verbatim on the grant hash for audit / downstream observability
322 /// only.
323 ///
324 /// # Consumer flow (RFC-024 §4.4)
325 ///
326 /// 1. Consumer's `POST /v1/runs/:id/complete` returns
327 /// `lease_expired`.
328 /// 2. Consumer calls this method; handles
329 /// [`IssueReclaimGrantResponse::Granted`] → builds a
330 /// [`ff_core::contracts::ReclaimGrant`] via
331 /// [`IssueReclaimGrantResponse::into_grant`].
332 /// 3. Consumer passes the grant to
333 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`] along
334 /// with a fresh [`ff_core::contracts::ReclaimExecutionArgs`];
335 /// the new attempt is minted with `HandleKind::Reclaimed`.
336 /// 4. Consumer drives terminal writes on the fresh lease.
337 ///
338 /// # Errors
339 ///
340 /// * [`SdkError::AdminApi`] — non-2xx response. 404 when the
341 /// execution does not exist; 400 on malformed `execution_id` or
342 /// body.
343 /// * [`SdkError::Http`] — transport error (connect, body
344 /// decode, client-side timeout).
345 ///
346 /// # Retry semantics
347 ///
348 /// Idempotent on the Lua side: repeated calls against an execution
349 /// already re-leased (a concurrent reclaim beat this one) surface
350 /// as `NotReclaimable`. Safe to retry on transport faults.
351 pub async fn issue_reclaim_grant(
352 &self,
353 execution_id: &str,
354 req: IssueReclaimGrantRequest,
355 ) -> Result<IssueReclaimGrantResponse, SdkError> {
356 // Percent-encode `execution_id` in the URL path — the id is a
357 // free-form string and splicing verbatim would produce
358 // malformed URLs. Mirrors `claim_for_worker`'s handling.
359 let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| SdkError::Config {
360 context: "admin_client: issue_reclaim_grant".into(),
361 field: Some("base_url".into()),
362 message: format!("invalid base_url '{}': {e}", self.base_url),
363 })?;
364 {
365 let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
366 context: "admin_client: issue_reclaim_grant".into(),
367 field: Some("base_url".into()),
368 message: format!("base_url cannot be a base URL: '{}'", self.base_url),
369 })?;
370 segs.extend(&["v1", "executions", execution_id, "reclaim"]);
371 }
372 let url = url.to_string();
373 let resp = self
374 .http
375 .post(&url)
376 .json(&req)
377 .send()
378 .await
379 .map_err(|e| SdkError::Http {
380 source: e,
381 context: "POST /v1/executions/{id}/reclaim".into(),
382 })?;
383
384 let status = resp.status();
385 if status.is_success() {
386 return resp
387 .json::<IssueReclaimGrantResponse>()
388 .await
389 .map_err(|e| SdkError::Http {
390 source: e,
391 context: "decode issue_reclaim_grant response body".into(),
392 });
393 }
394
395 let status_u16 = status.as_u16();
396 let raw = resp.text().await.map_err(|e| SdkError::Http {
397 source: e,
398 context: format!(
399 "read issue_reclaim_grant error body (status {status_u16})"
400 ),
401 })?;
402 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
403 Err(SdkError::AdminApi {
404 status: status_u16,
405 message: parsed
406 .as_ref()
407 .map(|b| b.error.clone())
408 .unwrap_or_else(|| raw.clone()),
409 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
410 retryable: parsed.as_ref().and_then(|b| b.retryable),
411 raw_body: raw,
412 })
413 }
414}
415
416/// Request body for `POST /v1/executions/{execution_id}/reclaim`
417/// (RFC-024 §3.5).
418///
419/// Mirrors `ff_server::api::IssueReclaimGrantBody` 1:1. The
420/// `execution_id` goes in the URL path, not the body.
421#[derive(Debug, Clone, Serialize)]
422pub struct IssueReclaimGrantRequest {
423 /// Worker identity requesting the grant. The Lua
424 /// `ff_reclaim_execution` validates grant consumption via
425 /// `grant.worker_id == args.worker_id` (RFC-024 §4.4) — the
426 /// worker consuming the grant must match this value.
427 pub worker_id: String,
428 /// Worker-instance identity. Informational at grant-issuance
429 /// time; stored on the grant so consumers can correlate events.
430 pub worker_instance_id: String,
431 /// Lane the execution belongs to. Needed by
432 /// `ff_issue_reclaim_grant` for `KEYS[*]` construction.
433 pub lane_id: String,
434 /// Opaque capability-hash token stored verbatim on the issued
435 /// grant for audit / downstream observability. NOT used for
436 /// admission — admission compares `worker_capabilities` against
437 /// the execution's `required_capabilities` (see the
438 /// [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc).
439 /// `None` leaves the field empty on the grant.
440 #[serde(default, skip_serializing_if = "Option::is_none")]
441 pub capability_hash: Option<String>,
442 /// Grant TTL in milliseconds. Bounded server-side.
443 pub grant_ttl_ms: u64,
444 /// Route snapshot JSON carried onto the grant for audit.
445 #[serde(default, skip_serializing_if = "Option::is_none")]
446 pub route_snapshot_json: Option<String>,
447 /// Admission summary string carried onto the grant for audit.
448 #[serde(default, skip_serializing_if = "Option::is_none")]
449 pub admission_summary: Option<String>,
450 /// Worker capability tokens. Consumers typically source these
451 /// from their registered worker's `WorkerConfig::capabilities`
452 /// (see [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc
453 /// for the override contract).
454 #[serde(default)]
455 pub worker_capabilities: Vec<String>,
456}
457
458/// Response body for `POST /v1/executions/{execution_id}/reclaim`
459/// (RFC-024 §3.5).
460///
461/// The server serializes this struct with a `status` discriminator so
462/// consumers can match on structured outcomes without re-parsing a
463/// 200-vs-4xx split for business-logic outcomes (mirrors
464/// `RotateWaitpointSecretResponse`'s precedent).
465#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
466#[serde(tag = "status", rename_all = "snake_case")]
467pub enum IssueReclaimGrantResponse {
468 /// Grant issued. Build a
469 /// [`ff_core::contracts::ReclaimGrant`] via
470 /// [`Self::into_grant`] and feed it to
471 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
472 Granted {
473 execution_id: String,
474 partition_key: ff_core::partition::PartitionKey,
475 grant_key: String,
476 expires_at_ms: u64,
477 lane_id: String,
478 },
479 /// Execution is not in a reclaimable state (not
480 /// `lease_expired_reclaimable` / `lease_revoked`).
481 NotReclaimable {
482 execution_id: String,
483 detail: String,
484 },
485 /// `max_reclaim_count` exceeded; execution transitioned to
486 /// terminal_failed. Consumers stop retrying and surface a
487 /// structural failure.
488 ReclaimCapExceeded {
489 execution_id: String,
490 reclaim_count: u32,
491 },
492}
493
494impl IssueReclaimGrantResponse {
495 /// Convert a [`Self::Granted`] response into a typed
496 /// [`ff_core::contracts::ReclaimGrant`] for handoff to
497 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
498 ///
499 /// Returns [`SdkError::AdminApi`] when the wire variant is not
500 /// `Granted` (consumer asked for a grant but the server replied
501 /// with a terminal outcome) or when `execution_id` / `lane_id`
502 /// are malformed — the latter signals a drift between server and
503 /// SDK, so failing loud prevents silent misrouting.
504 pub fn into_grant(self) -> Result<ff_core::contracts::ReclaimGrant, SdkError> {
505 match self {
506 IssueReclaimGrantResponse::Granted {
507 execution_id,
508 partition_key,
509 grant_key,
510 expires_at_ms,
511 lane_id,
512 } => {
513 let eid = ff_core::types::ExecutionId::parse(&execution_id)
514 .map_err(|e| SdkError::AdminApi {
515 status: 200,
516 message: format!(
517 "issue_reclaim_grant: server returned malformed execution_id '{execution_id}': {e}"
518 ),
519 kind: Some("malformed_response".to_owned()),
520 retryable: Some(false),
521 raw_body: String::new(),
522 })?;
523 let lane = ff_core::types::LaneId::try_new(lane_id.clone())
524 .map_err(|e| SdkError::AdminApi {
525 status: 200,
526 message: format!(
527 "issue_reclaim_grant: server returned malformed lane_id '{lane_id}': {e}"
528 ),
529 kind: Some("malformed_response".to_owned()),
530 retryable: Some(false),
531 raw_body: String::new(),
532 })?;
533 Ok(ff_core::contracts::ReclaimGrant::new(
534 eid,
535 partition_key,
536 grant_key,
537 expires_at_ms,
538 lane,
539 ))
540 }
541 IssueReclaimGrantResponse::NotReclaimable { execution_id, detail } => {
542 Err(SdkError::AdminApi {
543 status: 200,
544 message: format!(
545 "issue_reclaim_grant: execution '{execution_id}' not reclaimable: {detail}"
546 ),
547 kind: Some("not_reclaimable".to_owned()),
548 retryable: Some(false),
549 raw_body: String::new(),
550 })
551 }
552 IssueReclaimGrantResponse::ReclaimCapExceeded {
553 execution_id,
554 reclaim_count,
555 } => Err(SdkError::AdminApi {
556 status: 200,
557 message: format!(
558 "issue_reclaim_grant: execution '{execution_id}' hit reclaim cap ({reclaim_count})"
559 ),
560 kind: Some("reclaim_cap_exceeded".to_owned()),
561 retryable: Some(false),
562 raw_body: String::new(),
563 }),
564 }
565 }
566}
567
568/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
569///
570/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
571#[derive(Debug, Clone, Serialize)]
572pub struct RotateWaitpointSecretRequest {
573 /// New key identifier. Non-empty, must not contain `:` (the
574 /// server uses `:` as the field separator in the secret hash).
575 pub new_kid: String,
576 /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
577 pub new_secret_hex: String,
578}
579
580/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
581///
582/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
583/// The server serializes this struct as-is via `Json(result)`.
584#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
585pub struct RotateWaitpointSecretResponse {
586 /// Count of partitions that accepted the rotation.
587 pub rotated: u16,
588 /// Partition indices where the rotation failed — operator
589 /// should investigate. Rotation is idempotent on the same
590 /// `(new_kid, new_secret_hex)` so a retry after the underlying
591 /// fault clears converges.
592 pub failed: Vec<u16>,
593 /// The `new_kid` that was installed as current on every
594 /// rotated partition — echoes the request field back for
595 /// confirmation.
596 pub new_kid: String,
597}
598
599/// Server-side error body shape, as emitted by
600/// `ff_server::api::ErrorBody`. Kept internal because consumers
601/// match on the flattened fields of [`SdkError::AdminApi`].
602#[derive(Debug, Clone, Deserialize)]
603struct AdminErrorBody {
604 error: String,
605 #[serde(default)]
606 kind: Option<String>,
607 #[serde(default)]
608 retryable: Option<bool>,
609}
610
611/// Request body for `POST /v1/workers/{worker_id}/claim`.
612///
613/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
614/// goes in the URL path (not the body) but is kept on the struct
615/// for ergonomics — callers don't juggle a separate arg.
616#[derive(Debug, Clone, Serialize)]
617pub struct ClaimForWorkerRequest {
618 #[serde(skip)]
619 pub worker_id: String,
620 pub lane_id: String,
621 pub worker_instance_id: String,
622 #[serde(default)]
623 pub capabilities: Vec<String>,
624 /// Grant TTL in milliseconds. Server rejects 0 or anything over
625 /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
626 pub grant_ttl_ms: u64,
627}
628
629/// Response body for `POST /v1/workers/{worker_id}/claim`.
630///
631/// Wire shape of `ff_core::contracts::ClaimGrant`. Carries the opaque
632/// [`ff_core::partition::PartitionKey`] directly on the wire (issue
633/// #91); the SDK reconstructs the core type via [`Self::into_grant`].
634#[derive(Debug, Clone, Deserialize)]
635pub struct ClaimForWorkerResponse {
636 pub execution_id: String,
637 pub partition_key: ff_core::partition::PartitionKey,
638 pub grant_key: String,
639 pub expires_at_ms: u64,
640}
641
642impl ClaimForWorkerResponse {
643 /// Convert the wire DTO into a typed
644 /// [`ff_core::contracts::ClaimGrant`] for handoff to
645 /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
646 /// [`SdkError::AdminApi`] on malformed execution_id — a drift
647 /// signal that the server and SDK disagree on the wire shape, so
648 /// failing loud prevents routing to a ghost partition.
649 ///
650 /// The `partition_key` itself is not eagerly parsed here: it is
651 /// carried opaquely to the `claim_from_grant` hot path, which
652 /// parses it there and surfaces a typed error on malformed keys.
653 pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
654 let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
655 .map_err(|e| SdkError::AdminApi {
656 status: 200,
657 message: format!(
658 "claim_for_worker: server returned malformed execution_id '{}': {e}",
659 self.execution_id
660 ),
661 kind: Some("malformed_response".to_owned()),
662 retryable: Some(false),
663 raw_body: String::new(),
664 })?;
665 Ok(ff_core::contracts::ClaimGrant::new(
666 execution_id,
667 self.partition_key,
668 self.grant_key,
669 self.expires_at_ms,
670 ))
671 }
672}
673
674/// Per-partition outcome of a cluster-wide waitpoint HMAC secret
675/// rotation. Returned by [`rotate_waitpoint_hmac_secret_all_partitions`]
676/// so operators can audit which partitions rotated vs. no-op'd vs. failed.
677///
678/// The index is the execution-partition index (`0..num_partitions`),
679/// matching `{fp:N}` in the keyspace.
680#[derive(Debug)]
681pub struct PartitionRotationOutcome {
682 /// Execution partition index (`0..num_partitions`).
683 pub partition: u16,
684 /// FCALL outcome on this partition, or the error it raised.
685 pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
686}
687
/// Rotate the waitpoint HMAC secret across every execution partition
/// by fanning out the `ff_rotate_waitpoint_hmac_secret` FCALL.
///
/// This is the canonical Rust-side rotation path for direct-Valkey
/// consumers (e.g. cairn-fabric) that cannot route through the
/// `ff-server` admin REST endpoint. Callers who have an HTTP-reachable
/// `ff-server` should prefer [`FlowFabricAdminClient::rotate_waitpoint_secret`] —
/// that path adds a single-writer admission gate, parallel fan-out,
/// structured audit events, and the server's configured grace window.
///
/// # Production rotation recipe
///
/// Operators MUST coordinate so secret rotation **precedes** any
/// waitpoint resolution that will present the new `kid`. The broad
/// sequence:
///
/// 1. Pick a fresh `new_kid` (must NOT contain `:` — the server uses
///    `:` as the field separator in the secret hash).
/// 2. Call this helper with the previous `kid`'s grace window
///    (`grace_ms` — the duration during which tokens signed by the
///    outgoing secret remain valid).
/// 3. Only after this call returns with all partitions `Ok(_)` (either
///    `Rotated` or `Noop`), begin signing new tokens with `new_kid`.
/// 4. Retain the previous secret in the keystore until the grace
///    window elapses — the FCALL handles GC of expired kids on every
///    rotation, so just don't rotate again before the grace window.
///
/// See RFC-004 §rotation for the full 4-key HSET + `previous_expires_at`
/// dance the FCALL implements server-side.
///
/// # Idempotency
///
/// Each partition FCALL is idempotent on the same `(new_kid,
/// new_secret_hex)` pair: a replay with identical args returns
/// `RotateWaitpointHmacSecretOutcome::Noop`. A same-kid-different-secret
/// replay surfaces as a per-partition `SdkError` (wrapping
/// `ScriptError::RotationConflict`) — pick a fresh `new_kid` to recover.
///
/// # Error semantics
///
/// A per-partition FCALL failure (transport fault, rotation conflict,
/// etc.) is recorded on that partition's [`PartitionRotationOutcome`]
/// and fan-out **continues** — the contract matches the server's
/// `rotate_waitpoint_secret` (partial success is allowed, operators
/// retry on the failed partition subset). Returning `Vec<_>` (not
/// `Result<Vec<_>, _>`) is deliberate: every whole-call invariant is
/// enforced by the underlying FCALL on each partition (kid non-empty,
/// no `:`, even-length hex, etc.), so the aggregate has nothing left
/// to reject at the Rust boundary. Callers decide how to treat partial
/// failures (fail loud / retry the subset / record metrics).
///
/// # Concurrency + performance
///
/// Sequential (one partition at a time) to keep the helper dependency-
/// free: no `futures::stream` / tokio-specific primitives on the caller
/// path. For a cluster with N partitions and per-partition RTT R, the
/// total duration is ~N*R. Consumers needing parallel fan-out should
/// wrap this with `FuturesUnordered` themselves, or use the server
/// admin endpoint (which fans out with bounded concurrency = 16).
///
/// # Test harness
///
/// The `ff-test::fixtures::TestCluster::rotate_waitpoint_hmac_secret`
/// method is a thin wrapper around this helper — integration tests and
/// production code exercise the same code path.
///
/// # Example
///
/// The call yields a plain `Vec`, so there is no `?` on the await;
/// failures are inspected per entry.
///
/// ```rust,ignore
/// use ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions;
///
/// let results = rotate_waitpoint_hmac_secret_all_partitions(
///     &client,
///     partition_config.num_flow_partitions,
///     "kid-2026-04-22",
///     "deadbeef...64-hex-chars...",
///     60_000,
/// )
/// .await;
///
/// for entry in &results {
///     match &entry.result {
///         Ok(outcome) => tracing::info!(partition = entry.partition, ?outcome, "rotated"),
///         Err(e) => tracing::error!(partition = entry.partition, %e, "rotation failed"),
///     }
/// }
/// ```
// v0.12 PR-6: the `admin` module is ungated at module level so
// consumers under `--no-default-features --features sqlite` can reach
// the HTTP admin client surface. This helper is the one remaining
// Valkey-typed item in the module (takes a `&ferriskey::Client` and
// fans out `ff_rotate_waitpoint_hmac_secret` FCALLs), so it stays
// `valkey-default`-gated. See `lib.rs` PR-6 comment for the Option 1
// / Option 2 decision.
#[cfg(feature = "valkey-default")]
pub async fn rotate_waitpoint_hmac_secret_all_partitions(
    client: &ferriskey::Client,
    num_partitions: u16,
    new_kid: &str,
    new_secret_hex: &str,
    grace_ms: u64,
) -> Vec<PartitionRotationOutcome> {
    // Built once, outside the loop: the FCALL wrapper only borrows the
    // args, so every partition reuses the same struct instead of
    // cloning both strings per partition.
    let args = RotateWaitpointHmacSecretArgs {
        new_kid: new_kid.to_owned(),
        new_secret_hex: new_secret_hex.to_owned(),
        grace_ms,
    };
    // `usize::from` is a lossless widening of the u16 partition count.
    let mut out = Vec::with_capacity(usize::from(num_partitions));
    for index in 0..num_partitions {
        let partition = Partition {
            family: PartitionFamily::Execution,
            index,
        };
        let idx = IndexKeys::new(&partition);
        // A failure is recorded for this partition and the fan-out
        // moves on to the next one; nothing short-circuits.
        let result = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            client, &idx, &args,
        )
        .await
        .map_err(SdkError::from);
        out.push(PartitionRotationOutcome {
            partition: index,
            result,
        });
    }
    out
}
817
/// Strip every trailing `/` from a base URL.
///
/// With the slashes gone, `format!("{base}/v1/...")` can never yield
/// `https://host//v1/...`. Follows the same pattern as media-pipeline.
fn normalize_base_url(mut url: String) -> String {
    // Length of the prefix left once all trailing slashes are ignored;
    // truncating to it edits the string in place without reallocating.
    let kept = url.trim_end_matches('/').len();
    url.truncate(kept);
    url
}
827
#[cfg(test)]
mod tests {
    use super::*;

    // ── normalize_base_url ──

    #[test]
    fn base_url_strips_trailing_slash() {
        // (input, expected): zero, one, and several trailing slashes.
        let cases = [
            ("http://x", "http://x"),
            ("http://x/", "http://x"),
            ("http://x///", "http://x"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_base_url(input.into()), expected);
        }
    }

    // ── FlowFabricAdminClient::with_token ──

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // A raw newline inside the token would split the
        // Authorization header, so construction has to fail.
        let outcome = FlowFabricAdminClient::with_token("http://x", "tok\nevil");
        let err = outcome.unwrap_err();
        assert!(matches!(err, SdkError::Config { .. }), "got: {err:?}");
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // Shell footgun: `FF_ADMIN_TOKEN=""` expands to an empty
        // token. Construction must reject it up front rather than
        // hand back a client that only fails with a 401 on its first
        // request.
        let blank_tokens = ["", " ", "\t\n ", " "];
        for s in blank_tokens {
            let err = FlowFabricAdminClient::with_token("http://x", s).unwrap_err();
            let is_bearer_token_config_error = matches!(
                &err,
                SdkError::Config { field: Some(f), .. } if f == "bearer_token"
            );
            assert!(
                is_bearer_token_config_error,
                "token {s:?} should return Config with field=bearer_token; got: {err:?}"
            );
        }
    }

    // ── Wire-type deserialisation ──

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // Only `error` present; `kind` and `retryable` are omitted
        // (the usual shape for 400s).
        let minimal = serde_json::from_str::<AdminErrorBody>(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(minimal.error, "bad new_kid");
        assert!(minimal.kind.is_none());
        assert!(minimal.retryable.is_none());

        // All three fields present (500 ValkeyError shape).
        let full_json = r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#;
        let full = serde_json::from_str::<AdminErrorBody>(full_json).unwrap();
        assert_eq!(full.error, "valkey: timed out");
        assert_eq!(full.kind.as_deref(), Some("IoError"));
        assert_eq!(full.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Shape emitted by the server.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "new_kid": "kid-2026-04-18"
        }"#;
        let parsed = serde_json::from_str::<RotateWaitpointSecretResponse>(raw).unwrap();
        assert_eq!(parsed.rotated, 3);
        assert_eq!(parsed.failed, vec![4, 5]);
        assert_eq!(parsed.new_kid, "kid-2026-04-18");
    }

    // ── ClaimForWorkerResponse::into_grant ──

    /// Fixture: a claim response with fixed id, grant key and expiry.
    /// The `partition_key` field is produced by JSON-encoding the
    /// given string and deserialising it into the field's own type.
    fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
        let encoded_key = serde_json::to_string(partition_key).unwrap();
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_key: serde_json::from_str(&encoded_key).unwrap(),
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn into_grant_preserves_all_known_partition_key_shapes() {
        // Post-#91 the families collapse into opaque PartitionKey
        // literals: Flow and Execution both yield "{fp:N}", Budget is
        // "{b:N}", Quota is "{q:N}". The DTO keeps the wire string
        // untouched and into_grant passes it through opaquely.
        for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
            let g = match sample_claim_response(key_str).into_grant() {
                Ok(grant) => grant,
                Err(e) => panic!("key {key_str} should parse: {e:?}"),
            };
            assert_eq!(g.partition_key.as_str(), key_str);
            assert_eq!(g.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_preserves_opaque_partition_key() {
        // Pins the opacity contract: the admin boundary does not
        // eagerly parse partition_key. A malformed key only surfaces
        // later, when the typed Partition is actually requested.
        let grant = sample_claim_response("{zz:0}")
            .into_grant()
            .expect("SDK must not parse partition_key");
        assert_eq!(grant.partition_key.as_str(), "{zz:0}");
        // An explicit parse is where the error shows up.
        assert!(grant.partition().is_err());
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        // Same fixture, with only the execution id replaced by junk.
        let resp = ClaimForWorkerResponse {
            execution_id: "not-a-valid-eid".to_owned(),
            ..sample_claim_response("{fp:5}")
        };
        match resp.into_grant().unwrap_err() {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(
                    message.contains("malformed execution_id"),
                    "msg: {message}"
                );
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    // Coverage for `rotate_waitpoint_hmac_secret_all_partitions` lives
    // in `ff-test`: the integration tests in
    // `crates/ff-test/tests/waitpoint_hmac_rotation_fcall.rs` and
    // `waitpoint_tokens.rs` reach the function through the
    // `TestCluster::rotate_waitpoint_hmac_secret` fixture, which is now
    // a thin delegator. A pure unit test here would need a mock
    // `ferriskey::Client`; since `ClientBuilder::build` performs a live
    // RESP handshake, a bare local TCP listener is not enough, which
    // makes that too expensive for one-line iteration-count coverage.

    // ── ClaimForWorkerResponse wire shape (issue #91) ──

    #[test]
    fn claim_for_worker_response_deserialises_opaque_partition_key() {
        // Shape emitted by the server post-#91.
        let raw = r#"{
            "execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
            "partition_key": "{fp:7}",
            "grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
            "expires_at_ms": 1700000000000
        }"#;
        let parsed = serde_json::from_str::<ClaimForWorkerResponse>(raw).unwrap();
        assert_eq!(parsed.partition_key.as_str(), "{fp:7}");
        assert_eq!(parsed.expires_at_ms, 1_700_000_000_000);
    }
}