Skip to main content

ff_sdk/
admin.rs

//! Admin client for operator-facing FlowFabric primitives.
//!
//! Wraps `ff-server`'s admin-facing HTTP endpoints (such as
//! `POST /v1/admin/rotate-waitpoint-secret`,
//! `POST /v1/workers/{worker_id}/claim` and
//! `POST /v1/executions/{execution_id}/reclaim`) so downstream consumers
//! (cairn-fabric) don't hand-roll the HTTP calls for admin surfaces like
//! HMAC secret rotation. The same operations are also available without a
//! server through the embedded transport
//! ([`FlowFabricAdminClient::connect_with`]), which dispatches directly
//! through an `EngineBackend`.
//!
//! Request bodies and response shapes mirror the server's wire types
//! exactly. They are defined against `ff_server::api` and
//! `ff_server::server` and kept 1:1 with the producer.
//!
//! HTTP authentication uses a Bearer token. Callers pick up the token from
//! wherever they hold it (the `FF_API_TOKEN` env var is the common
//! pattern, but the SDK does not read env vars on the caller's
//! behalf). [`FlowFabricAdminClient::with_token`] accepts a
//! string-like token value (`&str` or `String`) via
//! `impl AsRef<str>`.
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use ff_core::contracts::RotateWaitpointHmacSecretOutcome;
21use ff_core::engine_backend::EngineBackend;
22// v0.12 PR-6: these imports only power the Valkey-typed
23// `rotate_waitpoint_hmac_secret_all_partitions` helper at the bottom
24// of the module; gated so the ungated module builds clean under
25// `--no-default-features --features sqlite`.
26#[cfg(feature = "valkey-default")]
27use ff_core::contracts::RotateWaitpointHmacSecretArgs;
28#[cfg(feature = "valkey-default")]
29use ff_core::keys::IndexKeys;
30#[cfg(feature = "valkey-default")]
31use ff_core::partition::{Partition, PartitionFamily};
32use serde::{Deserialize, Serialize};
33
34use crate::SdkError;
35
/// Client-side deadline applied to every HTTP admin request.
///
/// `ff-server` bounds a rotation with its own `ROTATE_HTTP_TIMEOUT` of
/// 120 s. The client waits 10 s longer than that. A slow rotation then
/// surfaces as the server's structured 504 GATEWAY_TIMEOUT body instead
/// of an opaque client-side timeout error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120 + 10);

/// Grace window, in milliseconds, that the embedded
/// `rotate_waitpoint_secret` path hands to the backend primitive.
///
/// This is 24 hours, the same as `ff-server`'s default
/// `FF_WAITPOINT_HMAC_GRACE_MS`. Tokens signed by the outgoing kid
/// therefore keep verifying for a full day after rotation, and in-flight
/// flows are not hard-killed. HTTP callers inherit whatever the server
/// was configured with. Embedded callers, which have no config surface,
/// get this fixed value.
pub const EMBEDDED_WAITPOINT_HMAC_GRACE_MS: u64 = 24 * 60 * 60 * 1_000;

/// Upper bound (ms) for `grant_ttl_ms` on the embedded
/// `claim_for_worker` / `issue_reclaim_grant` paths.
///
/// This is the same value as `ff-server`'s `CLAIM_GRANT_TTL_MS_MAX` /
/// `RECLAIM_GRANT_TTL_MS_MAX`, so both transports reject the same range.
const EMBEDDED_GRANT_TTL_MS_MAX: u64 = 60 * 1_000;
58
59/// Client for FlowFabric admin primitives — backend-agnostic facade
60/// (v0.13 SC-10 ergonomics).
61///
62/// Two construction shapes:
63///
64/// * [`FlowFabricAdminClient::new`] / [`FlowFabricAdminClient::with_token`]
65///   — HTTP transport targeting a running `ff-server`.
66/// * [`FlowFabricAdminClient::connect_with`] — embedded transport
67///   that dispatches directly through an `Arc<dyn EngineBackend>`.
68///   No `ff-server` required; works under `FF_DEV_MODE=1` + SQLite
69///   and in any in-process deployment.
70///
71/// The public method surface is identical across both transports;
72/// consumers choose at construction time. Admin methods that have
73/// no backend-trait equivalent return
74/// [`SdkError::AdminApi`] with status 503 on the embedded path —
75/// today every method on this client maps cleanly, so this fallback
76/// is only reached if a future admin primitive lands HTTP-first.
77#[derive(Debug, Clone)]
78pub struct FlowFabricAdminClient {
79    transport: AdminTransport,
80}
81
82/// Internal discriminator between the HTTP and embedded transports.
83/// Private by design — the public API is uniform across both shapes
84/// (see the [`FlowFabricAdminClient`] type-level docs).
85#[derive(Clone)]
86enum AdminTransport {
87    Http {
88        http: reqwest::Client,
89        base_url: String,
90    },
91    Embedded(Arc<dyn EngineBackend>),
92}
93
94impl std::fmt::Debug for AdminTransport {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            AdminTransport::Http { base_url, .. } => f
98                .debug_struct("Http")
99                .field("base_url", base_url)
100                .finish_non_exhaustive(),
101            AdminTransport::Embedded(backend) => f
102                .debug_struct("Embedded")
103                .field("backend", &backend.backend_label())
104                .finish(),
105        }
106    }
107}
108
109impl FlowFabricAdminClient {
110    /// Build a client without auth. Suitable for a dev ff-server
111    /// whose `api_token` is unconfigured. Production deployments
112    /// should use [`with_token`](Self::with_token).
113    pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
114        let http = reqwest::Client::builder()
115            .timeout(DEFAULT_TIMEOUT)
116            .build()
117            .map_err(|e| SdkError::Http {
118                source: e,
119                context: "build reqwest::Client".into(),
120            })?;
121        Ok(Self {
122            transport: AdminTransport::Http {
123                http,
124                base_url: normalize_base_url(base_url.into()),
125            },
126        })
127    }
128
129    /// Build a client that dispatches admin primitives directly
130    /// through an `Arc<dyn EngineBackend>`, bypassing HTTP entirely.
131    ///
132    /// # When to use
133    ///
134    /// * `FF_DEV_MODE=1` SQLite scenarios where no `ff-server` is
135    ///   running.
136    /// * In-process / embedded deployments that hold a backend
137    ///   handle already (e.g. tests, examples, scheduler-hosting
138    ///   binaries).
139    ///
140    /// # Semantic parity
141    ///
142    /// Each method on [`FlowFabricAdminClient`] dispatches to the
143    /// equivalent `EngineBackend` trait method (see the RFC-024 /
144    /// RFC-017 admin surfaces). Validation **rules** + request-body
145    /// translation mirror the server-side handler in `ff-server` so
146    /// callers get the same accept / reject behaviour across
147    /// transports. Note: the exact [`SdkError`] variant differs —
148    /// embedded-path validation rejects surface as [`SdkError::Config`]
149    /// (no HTTP round-trip) while HTTP returns [`SdkError::AdminApi`]
150    /// with status `400`. Callers that need to distinguish a 4xx from
151    /// a transport fault should use [`SdkError::is_retryable`] or
152    /// match on `Config` + `AdminApi` together rather than relying on
153    /// a single variant.
154    ///
155    /// `EngineError::Unavailable` from the backend — emitted by
156    /// backends that have not implemented a given method — is mapped
157    /// to [`SdkError::AdminApi`] with `status = 503` and
158    /// `kind = Some("unavailable")` so callers see a uniform
159    /// admin-error surface across transports.
160    ///
161    /// # Divergence from the HTTP transport
162    ///
163    /// * `rotate_waitpoint_secret` forwards
164    ///   [`EMBEDDED_WAITPOINT_HMAC_GRACE_MS`] (24 h, matching
165    ///   `ff-server`'s default `FF_WAITPOINT_HMAC_GRACE_MS`) as the
166    ///   per-partition grace window. The HTTP transport reads the
167    ///   server's env-configured value; the embedded client has no
168    ///   config surface so it pins the documented default.
169    /// * No single-writer admin semaphore, no audit-log emission.
170    ///   These are `ff-server` responsibilities; embedded consumers
171    ///   wanting them bring their own gate.
172    pub fn connect_with(backend: Arc<dyn EngineBackend>) -> Self {
173        Self {
174            transport: AdminTransport::Embedded(backend),
175        }
176    }
177
178    /// Build a client that sends `Authorization: Bearer <token>` on
179    /// every request. The token is passed by value so the caller
180    /// retains ownership policy (e.g. zeroize on drop at the
181    /// caller side); the SDK only reads it.
182    ///
183    /// # Empty-token guard
184    ///
185    /// An empty or all-whitespace `token` returns
186    /// [`SdkError::Config`] instead of silently constructing
187    /// `Authorization: Bearer ` (which the server rejects with
188    /// 401, leaving the operator chasing a "why is auth broken"
189    /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
190    /// where the var was meant to be set; the unset-expansion is
191    /// the empty string. Prefer an obvious error at construction
192    /// over a silent 401 at first request.
193    ///
194    /// If the caller genuinely wants an unauthenticated client
195    /// (dev ff-server without `api_token` configured), use
196    /// [`FlowFabricAdminClient::new`] instead.
197    pub fn with_token(
198        base_url: impl Into<String>,
199        token: impl AsRef<str>,
200    ) -> Result<Self, SdkError> {
201        let token_str = token.as_ref();
202        if token_str.trim().is_empty() {
203            return Err(SdkError::Config {
204                context: "admin_client".into(),
205                field: Some("bearer_token".into()),
206                message: "is empty or all-whitespace; use \
207                          FlowFabricAdminClient::new for unauthenticated access"
208                    .into(),
209            });
210        }
211        let mut headers = reqwest::header::HeaderMap::new();
212        let mut auth_value =
213            reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
214                |_| SdkError::Config {
215                    context: "admin_client".into(),
216                    field: Some("bearer_token".into()),
217                    message: "contains characters not valid in an HTTP header".into(),
218                },
219            )?;
220        // Mark Authorization as sensitive so it doesn't appear in
221        // reqwest's Debug output / logs.
222        auth_value.set_sensitive(true);
223        headers.insert(reqwest::header::AUTHORIZATION, auth_value);
224
225        let http = reqwest::Client::builder()
226            .timeout(DEFAULT_TIMEOUT)
227            .default_headers(headers)
228            .build()
229            .map_err(|e| SdkError::Http {
230                source: e,
231                context: "build reqwest::Client".into(),
232            })?;
233        Ok(Self {
234            transport: AdminTransport::Http {
235                http,
236                base_url: normalize_base_url(base_url.into()),
237            },
238        })
239    }
240
241    /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
242    ///
243    /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
244    /// server-side one: the request carries lane + identity +
245    /// capabilities + grant TTL; the server runs budget, quota, and
246    /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
247    /// and returns a `ClaimGrant` on success.
248    ///
249    /// Returns `Ok(None)` when the server responds 204 No Content
250    /// (no eligible execution on the lane). Callers that want to keep
251    /// polling should back off per their claim cadence.
252    pub async fn claim_for_worker(
253        &self,
254        req: ClaimForWorkerRequest,
255    ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
256        match &self.transport {
257            AdminTransport::Http { http, base_url } => {
258                claim_for_worker_http(http, base_url, req).await
259            }
260            AdminTransport::Embedded(backend) => {
261                claim_for_worker_embedded(backend.as_ref(), req).await
262            }
263        }
264    }
265
266    /// Rotate the waitpoint HMAC secret on the server.
267    ///
268    /// Promotes the currently-installed kid to `previous_kid`
269    /// (accepted for the server's configured
270    /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
271    /// `new_secret_hex` under `new_kid` as the new current. Fans
272    /// out across every execution partition. Idempotent: re-running
273    /// with the same `(new_kid, new_secret_hex)` converges.
274    ///
275    /// The server returns 200 if at least one partition rotated OR
276    /// at least one partition was already rotating under a
277    /// concurrent request. See `RotateWaitpointSecretResponse`
278    /// fields for the breakdown.
279    ///
280    /// # Errors
281    ///
282    /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
283    ///   input, 401 missing/bad bearer, 429 concurrent rotate,
284    ///   500 all partitions failed, 504 server-side timeout).
285    /// * [`SdkError::Http`] — transport error (connect, body
286    ///   decode, client-side timeout).
287    ///
288    /// # Retry semantics
289    ///
290    /// Rotation is idempotent on the same `(new_kid,
291    /// new_secret_hex)` so retries are SAFE even on 504s or
292    /// partial failures.
293    pub async fn rotate_waitpoint_secret(
294        &self,
295        req: RotateWaitpointSecretRequest,
296    ) -> Result<RotateWaitpointSecretResponse, SdkError> {
297        match &self.transport {
298            AdminTransport::Http { http, base_url } => {
299                rotate_waitpoint_secret_http(http, base_url, req).await
300            }
301            AdminTransport::Embedded(backend) => {
302                rotate_waitpoint_secret_embedded(backend.as_ref(), req).await
303            }
304        }
305    }
306
307    /// Read the raw HMAC `waitpoint_token` for a specific
308    /// `(execution_id, waitpoint_id)` pair.
309    ///
310    /// # Operator-only
311    ///
312    /// The sibling `list_pending_waitpoints` API intentionally sanitises
313    /// this field (RFC-017 Stage E4 / v0.8.0 §8) — consumers correlate
314    /// via `(token_kid, token_fingerprint)` and normally obtain the raw
315    /// token from their own worker's `SuspendOutcome` at suspend time.
316    /// This admin method re-exposes the token behind the server's
317    /// bearer-auth layer so operator tooling (approval CLIs,
318    /// external-callback bridges) can fetch a delivery credential
319    /// programmatically instead of copy-pasting from worker logs.
320    ///
321    /// Deployments MUST run `ff-server` with `api_token` configured
322    /// when exposing this endpoint to untrusted networks. The embedded
323    /// transport has no auth boundary — access is gated by whoever
324    /// holds the `Arc<dyn EngineBackend>`.
325    ///
326    /// # Returns
327    ///
328    /// * `Ok(Some(token))` — HMAC token string suitable for
329    ///   `DeliverSignalArgs::waitpoint_token` / `/signal` request body.
330    /// * `Ok(None)` — waitpoint is unknown, consumed, expired, or the
331    ///   stored token column is empty.
332    /// * `Err(SdkError::AdminApi { status: 503, kind: Some("unavailable") })`
333    ///   — the backend (e.g. an out-of-tree implementation) has not
334    ///   overridden `EngineBackend::read_waitpoint_token`.
335    pub async fn read_waitpoint_token(
336        &self,
337        execution_id: &ff_core::types::ExecutionId,
338        waitpoint_id: &ff_core::types::WaitpointId,
339    ) -> Result<Option<String>, SdkError> {
340        match &self.transport {
341            AdminTransport::Http { http, base_url } => {
342                read_waitpoint_token_http(http, base_url, execution_id, waitpoint_id).await
343            }
344            AdminTransport::Embedded(backend) => {
345                read_waitpoint_token_embedded(backend.as_ref(), execution_id, waitpoint_id).await
346            }
347        }
348    }
349
350    /// Request a lease-reclaim grant for an execution in
351    /// `lease_expired_reclaimable` or `lease_revoked` state (RFC-024
352    /// §3.5).
353    ///
354    /// Routes `POST /v1/executions/{execution_id}/reclaim`. The
355    /// ff-server handler dispatches through the `EngineBackend` trait
356    /// to whichever backend the server was started with (Valkey /
357    /// Postgres / SQLite).
358    ///
359    /// # worker_capabilities (RFC-024 §3.2 B-2)
360    ///
361    /// The request body carries `worker_capabilities`. Consumers typically
362    /// source these from their registered worker's configured
363    /// `WorkerConfig::capabilities`. Admission compares
364    /// `worker_capabilities` against the execution's
365    /// `required_capabilities` (persisted on `exec_core` at
366    /// `create_execution` time from
367    /// `ExecutionPolicy.routing_requirements.required_capabilities`);
368    /// any required capability missing from the worker set surfaces as
369    /// `IssueReclaimGrantResponse::NotReclaimable { detail:
370    /// "capability_mismatch: <missing csv>" }` (Lua
371    /// `ff_issue_reclaim_grant`, `crates/ff-script/src/flowfabric.lua`
372    /// §3969-3982; sqlite/PG backends mirror the check). The SDK does
373    /// not re-read worker state automatically — admin clients are not
374    /// bound to a worker — so the consumer threads the capabilities
375    /// through at call-time.
376    ///
377    /// `capability_hash` is NOT consulted for admission; it is stored
378    /// verbatim on the grant hash for audit / downstream observability
379    /// only.
380    ///
381    /// # Consumer flow (RFC-024 §4.4)
382    ///
383    /// 1. Consumer's `POST /v1/runs/:id/complete` returns
384    ///    `lease_expired`.
385    /// 2. Consumer calls this method; handles
386    ///    [`IssueReclaimGrantResponse::Granted`] → builds a
387    ///    [`ff_core::contracts::ReclaimGrant`] via
388    ///    [`IssueReclaimGrantResponse::into_grant`].
389    /// 3. Consumer passes the grant to
390    ///    [`crate::FlowFabricWorker::claim_from_reclaim_grant`] along
391    ///    with a fresh [`ff_core::contracts::ReclaimExecutionArgs`];
392    ///    the new attempt is minted with `HandleKind::Reclaimed`.
393    /// 4. Consumer drives terminal writes on the fresh lease.
394    ///
395    /// # Errors
396    ///
397    /// * [`SdkError::AdminApi`] — non-2xx response. 404 when the
398    ///   execution does not exist; 400 on malformed `execution_id` or
399    ///   body.
400    /// * [`SdkError::Http`] — transport error (connect, body
401    ///   decode, client-side timeout).
402    ///
403    /// # Retry semantics
404    ///
405    /// Idempotent on the Lua side: repeated calls against an execution
406    /// already re-leased (a concurrent reclaim beat this one) surface
407    /// as `NotReclaimable`. Safe to retry on transport faults.
408    pub async fn issue_reclaim_grant(
409        &self,
410        execution_id: &str,
411        req: IssueReclaimGrantRequest,
412    ) -> Result<IssueReclaimGrantResponse, SdkError> {
413        match &self.transport {
414            AdminTransport::Http { http, base_url } => {
415                issue_reclaim_grant_http(http, base_url, execution_id, req).await
416            }
417            AdminTransport::Embedded(backend) => {
418                issue_reclaim_grant_embedded(backend.as_ref(), execution_id, req).await
419            }
420        }
421    }
422}
423
424// ── HTTP-transport helpers (private) ─────────────────────────────────
425
426async fn claim_for_worker_http(
427    http: &reqwest::Client,
428    base_url: &str,
429    req: ClaimForWorkerRequest,
430) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
431    // Percent-encode `worker_id` in the URL path — `WorkerId` is a
432    // free-form string (could contain `/`, spaces, `%`, etc.) and
433    // splicing it verbatim would produce malformed URLs or
434    // misrouted paths. `Url::path_segments_mut().push` handles the
435    // encoding natively.
436    let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
437        context: "admin_client: claim_for_worker".into(),
438        field: Some("base_url".into()),
439        message: format!("invalid base_url '{}': {e}", base_url),
440    })?;
441    {
442        let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
443            context: "admin_client: claim_for_worker".into(),
444            field: Some("base_url".into()),
445            message: format!("base_url cannot be a base URL: '{}'", base_url),
446        })?;
447        segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
448    }
449    let url = url.to_string();
450    let resp = http
451        .post(&url)
452        .json(&req)
453        .send()
454        .await
455        .map_err(|e| SdkError::Http {
456            source: e,
457            context: "POST /v1/workers/{worker_id}/claim".into(),
458        })?;
459
460    let status = resp.status();
461    if status == reqwest::StatusCode::NO_CONTENT {
462        return Ok(None);
463    }
464    if status.is_success() {
465        return resp
466            .json::<ClaimForWorkerResponse>()
467            .await
468            .map(Some)
469            .map_err(|e| SdkError::Http {
470                source: e,
471                context: "decode claim_for_worker response body".into(),
472            });
473    }
474
475    // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
476    let status_u16 = status.as_u16();
477    let raw = resp.text().await.map_err(|e| SdkError::Http {
478        source: e,
479        context: format!("read claim_for_worker error body (status {status_u16})"),
480    })?;
481    let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
482    Err(SdkError::AdminApi {
483        status: status_u16,
484        message: parsed
485            .as_ref()
486            .map(|b| b.error.clone())
487            .unwrap_or_else(|| raw.clone()),
488        kind: parsed.as_ref().and_then(|b| b.kind.clone()),
489        retryable: parsed.as_ref().and_then(|b| b.retryable),
490        raw_body: raw,
491    })
492}
493
494async fn rotate_waitpoint_secret_http(
495    http: &reqwest::Client,
496    base_url: &str,
497    req: RotateWaitpointSecretRequest,
498) -> Result<RotateWaitpointSecretResponse, SdkError> {
499    let url = format!("{}/v1/admin/rotate-waitpoint-secret", base_url);
500    let resp = http
501        .post(&url)
502        .json(&req)
503        .send()
504        .await
505        .map_err(|e| SdkError::Http {
506            source: e,
507            context: "POST /v1/admin/rotate-waitpoint-secret".into(),
508        })?;
509
510    let status = resp.status();
511    if status.is_success() {
512        return resp
513            .json::<RotateWaitpointSecretResponse>()
514            .await
515            .map_err(|e| SdkError::Http {
516                source: e,
517                context: "decode rotate-waitpoint-secret response body".into(),
518            });
519    }
520
521    // Non-2xx: parse the server's ErrorBody if we can, fall
522    // back to a raw body otherwise. Propagate body-read
523    // transport errors as Http rather than silently flattening
524    // them into `AdminApi { raw_body: "" }` — a connection drop
525    // mid-body-read is a transport fault, not an API-layer
526    // reject, and misclassifying it strips `is_retryable`'s
527    // timeout/connect signal from the caller.
528    let status_u16 = status.as_u16();
529    let raw = resp.text().await.map_err(|e| SdkError::Http {
530        source: e,
531        context: format!(
532            "read rotate-waitpoint-secret error response body (status {status_u16})"
533        ),
534    })?;
535    let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
536    Err(SdkError::AdminApi {
537        status: status_u16,
538        message: parsed
539            .as_ref()
540            .map(|b| b.error.clone())
541            .unwrap_or_else(|| raw.clone()),
542        kind: parsed.as_ref().and_then(|b| b.kind.clone()),
543        retryable: parsed.as_ref().and_then(|b| b.retryable),
544        raw_body: raw,
545    })
546}
547
548async fn issue_reclaim_grant_http(
549    http: &reqwest::Client,
550    base_url: &str,
551    execution_id: &str,
552    req: IssueReclaimGrantRequest,
553) -> Result<IssueReclaimGrantResponse, SdkError> {
554    // Percent-encode `execution_id` in the URL path — the id is a
555    // free-form string and splicing verbatim would produce
556    // malformed URLs. Mirrors `claim_for_worker`'s handling.
557    let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
558        context: "admin_client: issue_reclaim_grant".into(),
559        field: Some("base_url".into()),
560        message: format!("invalid base_url '{}': {e}", base_url),
561    })?;
562    {
563        let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
564            context: "admin_client: issue_reclaim_grant".into(),
565            field: Some("base_url".into()),
566            message: format!("base_url cannot be a base URL: '{}'", base_url),
567        })?;
568        segs.extend(&["v1", "executions", execution_id, "reclaim"]);
569    }
570    let url = url.to_string();
571    let resp = http
572        .post(&url)
573        .json(&req)
574        .send()
575        .await
576        .map_err(|e| SdkError::Http {
577            source: e,
578            context: "POST /v1/executions/{id}/reclaim".into(),
579        })?;
580
581    let status = resp.status();
582    if status.is_success() {
583        return resp
584            .json::<IssueReclaimGrantResponse>()
585            .await
586            .map_err(|e| SdkError::Http {
587                source: e,
588                context: "decode issue_reclaim_grant response body".into(),
589            });
590    }
591
592    let status_u16 = status.as_u16();
593    let raw = resp.text().await.map_err(|e| SdkError::Http {
594        source: e,
595        context: format!(
596            "read issue_reclaim_grant error body (status {status_u16})"
597        ),
598    })?;
599    let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
600    Err(SdkError::AdminApi {
601        status: status_u16,
602        message: parsed
603            .as_ref()
604            .map(|b| b.error.clone())
605            .unwrap_or_else(|| raw.clone()),
606        kind: parsed.as_ref().and_then(|b| b.kind.clone()),
607        retryable: parsed.as_ref().and_then(|b| b.retryable),
608        raw_body: raw,
609    })
610}
611
612// ── Embedded-transport helpers (private) ─────────────────────────────
613//
614// Dispatch directly through the `EngineBackend` trait. The request
615// body validation mirrors the server-side handler in
616// `ff-server::api`. Translation between the wire DTOs and the core
617// `contracts::*` types lives here so consumers get identical
618// surfaces across transports.
619
620/// Validate a free-form identifier the same way `ff-server`'s
621/// `validate_identifier` does: non-empty, ≤256 bytes, no whitespace
622/// or control chars. Embedded-transport callers hit this before the
623/// request reaches the backend so invalid identifiers cannot leak
624/// past the SDK's parity guarantee.
625fn validate_admin_identifier(
626    op: &'static str,
627    field: &'static str,
628    value: &str,
629) -> Result<(), SdkError> {
630    if value.is_empty() {
631        return Err(SdkError::Config {
632            context: format!("admin_client: {op}"),
633            field: Some(field.into()),
634            message: "must not be empty".into(),
635        });
636    }
637    if value.len() > 256 {
638        return Err(SdkError::Config {
639            context: format!("admin_client: {op}"),
640            field: Some(field.into()),
641            message: format!("exceeds 256 bytes (got {})", value.len()),
642        });
643    }
644    if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
645        return Err(SdkError::Config {
646            context: format!("admin_client: {op}"),
647            field: Some(field.into()),
648            message: "must not contain whitespace or control characters".into(),
649        });
650    }
651    Ok(())
652}
653
654/// Bounded grant-TTL check mirroring `ff-server`'s
655/// `1..=CLAIM_GRANT_TTL_MS_MAX`. Shared between `claim_for_worker`
656/// and `issue_reclaim_grant` embedded paths.
657fn validate_admin_grant_ttl(op: &'static str, grant_ttl_ms: u64) -> Result<(), SdkError> {
658    if grant_ttl_ms == 0 || grant_ttl_ms > EMBEDDED_GRANT_TTL_MS_MAX {
659        return Err(SdkError::Config {
660            context: format!("admin_client: {op}"),
661            field: Some("grant_ttl_ms".into()),
662            message: format!("must be in 1..={EMBEDDED_GRANT_TTL_MS_MAX}"),
663        });
664    }
665    Ok(())
666}
667
668/// Map an `EngineError` from a backend call into the `SdkError::AdminApi`
669/// surface so embedded and HTTP transports emit the same shape. `Unavailable`
670/// becomes 503 with kind `"unavailable"`; every other engine error bubbles
671/// up via `SdkError::Engine`.
672fn engine_err_to_admin(err: ff_core::engine_error::EngineError, op: &str) -> SdkError {
673    if let ff_core::engine_error::EngineError::Unavailable { op: backend_op } = &err {
674        return SdkError::AdminApi {
675            status: 503,
676            message: format!(
677                "{op}: backend does not implement '{backend_op}' on this transport"
678            ),
679            kind: Some("unavailable".to_owned()),
680            retryable: Some(false),
681            raw_body: String::new(),
682        };
683    }
684    SdkError::Engine(Box::new(err))
685}
686
687async fn claim_for_worker_embedded(
688    backend: &dyn EngineBackend,
689    req: ClaimForWorkerRequest,
690) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
691    // Mirror ff-server's validation + translation. Errors surface as
692    // SdkError::Config so consumers see validation faults loud rather
693    // than as backend transport errors.
694    let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
695        context: "admin_client: claim_for_worker".into(),
696        field: Some("lane_id".into()),
697        message: e.to_string(),
698    })?;
699    validate_admin_identifier("claim_for_worker", "worker_id", &req.worker_id)?;
700    validate_admin_identifier(
701        "claim_for_worker",
702        "worker_instance_id",
703        &req.worker_instance_id,
704    )?;
705    validate_admin_grant_ttl("claim_for_worker", req.grant_ttl_ms)?;
706    let caps: std::collections::BTreeSet<String> = req.capabilities.into_iter().collect();
707    let args = ff_core::contracts::ClaimForWorkerArgs::new(
708        lane_id,
709        ff_core::types::WorkerId::new(req.worker_id),
710        ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
711        caps,
712        req.grant_ttl_ms,
713    );
714    match backend
715        .claim_for_worker(args)
716        .await
717        .map_err(|e| engine_err_to_admin(e, "claim_for_worker"))?
718    {
719        ff_core::contracts::ClaimForWorkerOutcome::NoWork => Ok(None),
720        ff_core::contracts::ClaimForWorkerOutcome::Granted(grant) => {
721            Ok(Some(ClaimForWorkerResponse {
722                execution_id: grant.execution_id.to_string(),
723                partition_key: grant.partition_key,
724                grant_key: grant.grant_key,
725                expires_at_ms: grant.expires_at_ms,
726            }))
727        }
728        // `ClaimForWorkerOutcome` is `#[non_exhaustive]`; mirror
729        // ff-server's 503 policy on unknown variants.
730        _ => Err(SdkError::AdminApi {
731            status: 503,
732            message: "claim_for_worker: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
733            kind: Some("unknown_outcome".to_owned()),
734            retryable: Some(false),
735            raw_body: String::new(),
736        }),
737    }
738}
739
740async fn issue_reclaim_grant_embedded(
741    backend: &dyn EngineBackend,
742    execution_id: &str,
743    req: IssueReclaimGrantRequest,
744) -> Result<IssueReclaimGrantResponse, SdkError> {
745    let exec_id = ff_core::types::ExecutionId::parse(execution_id).map_err(|e| SdkError::Config {
746        context: "admin_client: issue_reclaim_grant".into(),
747        field: Some("execution_id".into()),
748        message: e.to_string(),
749    })?;
750    let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
751        context: "admin_client: issue_reclaim_grant".into(),
752        field: Some("lane_id".into()),
753        message: e.to_string(),
754    })?;
755    validate_admin_identifier("issue_reclaim_grant", "worker_id", &req.worker_id)?;
756    validate_admin_identifier(
757        "issue_reclaim_grant",
758        "worker_instance_id",
759        &req.worker_instance_id,
760    )?;
761    validate_admin_grant_ttl("issue_reclaim_grant", req.grant_ttl_ms)?;
762    let caps: std::collections::BTreeSet<String> = req.worker_capabilities.into_iter().collect();
763    let args = ff_core::contracts::IssueReclaimGrantArgs::new(
764        exec_id,
765        ff_core::types::WorkerId::new(req.worker_id),
766        ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
767        lane_id,
768        req.capability_hash,
769        req.grant_ttl_ms,
770        req.route_snapshot_json,
771        req.admission_summary,
772        caps,
773        ff_core::types::TimestampMs::now(),
774    );
775    match backend
776        .issue_reclaim_grant(args)
777        .await
778        .map_err(|e| engine_err_to_admin(e, "issue_reclaim_grant"))?
779    {
780        ff_core::contracts::IssueReclaimGrantOutcome::Granted(grant) => {
781            Ok(IssueReclaimGrantResponse::Granted {
782                execution_id: grant.execution_id.to_string(),
783                partition_key: grant.partition_key,
784                grant_key: grant.grant_key,
785                expires_at_ms: grant.expires_at_ms,
786                lane_id: grant.lane_id.as_str().to_owned(),
787            })
788        }
789        ff_core::contracts::IssueReclaimGrantOutcome::NotReclaimable { execution_id, detail } => {
790            Ok(IssueReclaimGrantResponse::NotReclaimable {
791                execution_id: execution_id.to_string(),
792                detail,
793            })
794        }
795        ff_core::contracts::IssueReclaimGrantOutcome::ReclaimCapExceeded {
796            execution_id,
797            reclaim_count,
798        } => Ok(IssueReclaimGrantResponse::ReclaimCapExceeded {
799            execution_id: execution_id.to_string(),
800            reclaim_count,
801        }),
802        _ => Err(SdkError::AdminApi {
803            status: 503,
804            message: "issue_reclaim_grant: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
805            kind: Some("unknown_outcome".to_owned()),
806            retryable: Some(false),
807            raw_body: String::new(),
808        }),
809    }
810}
811
812async fn read_waitpoint_token_http(
813    http: &reqwest::Client,
814    base_url: &str,
815    execution_id: &ff_core::types::ExecutionId,
816    waitpoint_id: &ff_core::types::WaitpointId,
817) -> Result<Option<String>, SdkError> {
818    // Percent-encode both path segments: execution_id carries `{fp:N}:`
819    // hash-tag punctuation; waitpoint_id is a UUID but be defensive.
820    let mut url = reqwest::Url::parse(base_url).map_err(|e| SdkError::Config {
821        context: "admin_client: read_waitpoint_token".into(),
822        field: Some("base_url".into()),
823        message: format!("invalid base_url '{}': {e}", base_url),
824    })?;
825    url.path_segments_mut()
826        .map_err(|_| SdkError::Config {
827            context: "admin_client: read_waitpoint_token".into(),
828            field: Some("base_url".into()),
829            message: format!("base_url '{}' cannot be a base", base_url),
830        })?
831        .extend(&[
832            "v1",
833            "executions",
834            execution_id.as_str(),
835            "waitpoints",
836            &waitpoint_id.to_string(),
837            "token",
838        ]);
839
840    let resp = http
841        .get(url.clone())
842        .send()
843        .await
844        .map_err(|e| SdkError::Http {
845            source: e,
846            context: format!("GET {url}"),
847        })?;
848    let status = resp.status();
849    if status == reqwest::StatusCode::NOT_FOUND {
850        return Ok(None);
851    }
852    if status.is_success() {
853        #[derive(Deserialize)]
854        struct Body {
855            token: String,
856        }
857        let body: Body = resp.json().await.map_err(|e| SdkError::Http {
858            source: e,
859            context: "decode read_waitpoint_token response body".into(),
860        })?;
861        return Ok(Some(body.token));
862    }
863
864    let status_u16 = status.as_u16();
865    let raw = resp.text().await.map_err(|e| SdkError::Http {
866        source: e,
867        context: format!("read read_waitpoint_token error body (status {status_u16})"),
868    })?;
869    let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
870    Err(SdkError::AdminApi {
871        status: status_u16,
872        message: parsed
873            .as_ref()
874            .map(|b| b.error.clone())
875            .unwrap_or_else(|| raw.clone()),
876        kind: parsed.as_ref().and_then(|b| b.kind.clone()),
877        retryable: parsed.as_ref().and_then(|b| b.retryable),
878        raw_body: raw,
879    })
880}
881
882async fn read_waitpoint_token_embedded(
883    backend: &dyn EngineBackend,
884    execution_id: &ff_core::types::ExecutionId,
885    waitpoint_id: &ff_core::types::WaitpointId,
886) -> Result<Option<String>, SdkError> {
887    let partition =
888        ff_core::partition::PartitionKey::from(&ff_core::partition::Partition {
889            family: ff_core::partition::PartitionFamily::Flow,
890            index: execution_id.partition(),
891        });
892    backend
893        .read_waitpoint_token(partition, waitpoint_id)
894        .await
895        .map_err(|e| engine_err_to_admin(e, "read_waitpoint_token"))
896}
897
898async fn rotate_waitpoint_secret_embedded(
899    backend: &dyn EngineBackend,
900    req: RotateWaitpointSecretRequest,
901) -> Result<RotateWaitpointSecretResponse, SdkError> {
902    // Validation mirrors `ff-server::Server::rotate_waitpoint_secret`
903    // so the embedded and HTTP transports reject the same invalid
904    // inputs.
905    if req.new_kid.is_empty() || req.new_kid.contains(':') {
906        return Err(SdkError::Config {
907            context: "admin_client: rotate_waitpoint_secret".into(),
908            field: Some("new_kid".into()),
909            message: "must be non-empty and must not contain ':'".into(),
910        });
911    }
912    if req.new_secret_hex.is_empty()
913        || !req.new_secret_hex.len().is_multiple_of(2)
914        || !req.new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
915    {
916        return Err(SdkError::Config {
917            context: "admin_client: rotate_waitpoint_secret".into(),
918            field: Some("new_secret_hex".into()),
919            message: "must be a non-empty even-length hex string".into(),
920        });
921    }
922    // Embedded consumers have no config surface from which to read
923    // the per-deployment grace window, so pin the documented default
924    // matching `ff-server`'s `FF_WAITPOINT_HMAC_GRACE_MS` (24 h). See
925    // `EMBEDDED_WAITPOINT_HMAC_GRACE_MS` + the `connect_with` rustdoc
926    // for the rationale and divergence-from-HTTP notes. Unlike the
927    // HTTP handler, this path does not enforce the 120 s end-to-end
928    // timeout (no HTTP deadline to honour) and does not emit the
929    // `audit`-target `waitpoint_hmac_rotation_*` events (those are
930    // server-owned operator signals). Rotation is idempotent on the
931    // same (new_kid, new_secret_hex) pair, so retries remain safe.
932    let args = ff_core::contracts::RotateWaitpointHmacSecretAllArgs::new(
933        req.new_kid.clone(),
934        req.new_secret_hex,
935        EMBEDDED_WAITPOINT_HMAC_GRACE_MS,
936    );
937    let result = backend
938        .rotate_waitpoint_hmac_secret_all(args)
939        .await
940        .map_err(|e| engine_err_to_admin(e, "rotate_waitpoint_secret"))?;
941
942    // Collapse the per-partition entries into the HTTP response shape
943    // the server emits — rotated count + failed indices + echoed
944    // new_kid — so consumers see identical return values across
945    // transports.
946    let mut rotated: u16 = 0;
947    let mut failed: Vec<u16> = Vec::new();
948    for entry in &result.entries {
949        match &entry.result {
950            Ok(_) => {
951                rotated = rotated.saturating_add(1);
952            }
953            Err(_) => failed.push(entry.partition),
954        }
955    }
956    Ok(RotateWaitpointSecretResponse {
957        rotated,
958        failed,
959        new_kid: req.new_kid,
960    })
961}
962
/// Request body for `POST /v1/executions/{execution_id}/reclaim`
/// (RFC-024 §3.5).
///
/// Mirrors `ff_server::api::IssueReclaimGrantBody` 1:1. The
/// `execution_id` goes in the URL path, not the body; every field below
/// (including `worker_id`) is serialized into the JSON body.
#[derive(Debug, Clone, Serialize)]
pub struct IssueReclaimGrantRequest {
    /// Worker identity requesting the grant. The Lua
    /// `ff_reclaim_execution` validates grant consumption via
    /// `grant.worker_id == args.worker_id` (RFC-024 §4.4) — the
    /// worker consuming the grant must match this value. The embedded
    /// transport checks it with `validate_admin_identifier` before
    /// calling the backend.
    pub worker_id: String,
    /// Worker-instance identity. Informational at grant-issuance
    /// time; stored on the grant so consumers can correlate events.
    /// Also checked with `validate_admin_identifier` on the embedded
    /// transport.
    pub worker_instance_id: String,
    /// Lane the execution belongs to. Needed by
    /// `ff_issue_reclaim_grant` for `KEYS[*]` construction. The embedded
    /// transport parses it with `LaneId::try_new` and reports a failure
    /// as `SdkError::Config` (field `lane_id`).
    pub lane_id: String,
    /// Opaque capability-hash token stored verbatim on the issued
    /// grant for audit / downstream observability. NOT used for
    /// admission — admission compares `worker_capabilities` against
    /// the execution's `required_capabilities` (see the
    /// [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc).
    /// `None` leaves the field empty on the grant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub capability_hash: Option<String>,
    /// Grant TTL in milliseconds. Bounded server-side; the embedded
    /// transport checks it with `validate_admin_grant_ttl`.
    pub grant_ttl_ms: u64,
    /// Route snapshot JSON carried onto the grant for audit.
    /// Omitted from the JSON body when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub route_snapshot_json: Option<String>,
    /// Admission summary string carried onto the grant for audit.
    /// Omitted from the JSON body when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub admission_summary: Option<String>,
    /// Worker capability tokens. Consumers typically source these
    /// from their registered worker's `WorkerConfig::capabilities`
    /// (see [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc
    /// for the override contract). The embedded transport collects
    /// them into a `BTreeSet`, so duplicates collapse there.
    // NOTE(review): `#[serde(default)]` only affects deserialization;
    // this type derives `Serialize` only, so the attribute is inert.
    #[serde(default)]
    pub worker_capabilities: Vec<String>,
}
1004
1005/// Response body for `POST /v1/executions/{execution_id}/reclaim`
1006/// (RFC-024 §3.5).
1007///
1008/// The server serializes this struct with a `status` discriminator so
1009/// consumers can match on structured outcomes without re-parsing a
1010/// 200-vs-4xx split for business-logic outcomes (mirrors
1011/// `RotateWaitpointSecretResponse`'s precedent).
1012#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1013#[serde(tag = "status", rename_all = "snake_case")]
1014pub enum IssueReclaimGrantResponse {
1015    /// Grant issued. Build a
1016    /// [`ff_core::contracts::ReclaimGrant`] via
1017    /// [`Self::into_grant`] and feed it to
1018    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
1019    Granted {
1020        execution_id: String,
1021        partition_key: ff_core::partition::PartitionKey,
1022        grant_key: String,
1023        expires_at_ms: u64,
1024        lane_id: String,
1025    },
1026    /// Execution is not in a reclaimable state (not
1027    /// `lease_expired_reclaimable` / `lease_revoked`).
1028    NotReclaimable {
1029        execution_id: String,
1030        detail: String,
1031    },
1032    /// `max_reclaim_count` exceeded; execution transitioned to
1033    /// terminal_failed. Consumers stop retrying and surface a
1034    /// structural failure.
1035    ReclaimCapExceeded {
1036        execution_id: String,
1037        reclaim_count: u32,
1038    },
1039}
1040
1041impl IssueReclaimGrantResponse {
1042    /// Convert a [`Self::Granted`] response into a typed
1043    /// [`ff_core::contracts::ReclaimGrant`] for handoff to
1044    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
1045    ///
1046    /// Returns [`SdkError::AdminApi`] when the wire variant is not
1047    /// `Granted` (consumer asked for a grant but the server replied
1048    /// with a terminal outcome) or when `execution_id` / `lane_id`
1049    /// are malformed — the latter signals a drift between server and
1050    /// SDK, so failing loud prevents silent misrouting.
1051    pub fn into_grant(self) -> Result<ff_core::contracts::ReclaimGrant, SdkError> {
1052        match self {
1053            IssueReclaimGrantResponse::Granted {
1054                execution_id,
1055                partition_key,
1056                grant_key,
1057                expires_at_ms,
1058                lane_id,
1059            } => {
1060                let eid = ff_core::types::ExecutionId::parse(&execution_id)
1061                    .map_err(|e| SdkError::AdminApi {
1062                        status: 200,
1063                        message: format!(
1064                            "issue_reclaim_grant: server returned malformed execution_id '{execution_id}': {e}"
1065                        ),
1066                        kind: Some("malformed_response".to_owned()),
1067                        retryable: Some(false),
1068                        raw_body: String::new(),
1069                    })?;
1070                let lane = ff_core::types::LaneId::try_new(lane_id.clone())
1071                    .map_err(|e| SdkError::AdminApi {
1072                        status: 200,
1073                        message: format!(
1074                            "issue_reclaim_grant: server returned malformed lane_id '{lane_id}': {e}"
1075                        ),
1076                        kind: Some("malformed_response".to_owned()),
1077                        retryable: Some(false),
1078                        raw_body: String::new(),
1079                    })?;
1080                Ok(ff_core::contracts::ReclaimGrant::new(
1081                    eid,
1082                    partition_key,
1083                    grant_key,
1084                    expires_at_ms,
1085                    lane,
1086                ))
1087            }
1088            IssueReclaimGrantResponse::NotReclaimable { execution_id, detail } => {
1089                Err(SdkError::AdminApi {
1090                    status: 200,
1091                    message: format!(
1092                        "issue_reclaim_grant: execution '{execution_id}' not reclaimable: {detail}"
1093                    ),
1094                    kind: Some("not_reclaimable".to_owned()),
1095                    retryable: Some(false),
1096                    raw_body: String::new(),
1097                })
1098            }
1099            IssueReclaimGrantResponse::ReclaimCapExceeded {
1100                execution_id,
1101                reclaim_count,
1102            } => Err(SdkError::AdminApi {
1103                status: 200,
1104                message: format!(
1105                    "issue_reclaim_grant: execution '{execution_id}' hit reclaim cap ({reclaim_count})"
1106                ),
1107                kind: Some("reclaim_cap_exceeded".to_owned()),
1108                retryable: Some(false),
1109                raw_body: String::new(),
1110            }),
1111        }
1112    }
1113}
1114
/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1. The embedded
/// transport (`rotate_waitpoint_secret_embedded`) applies the same field
/// checks locally and reports violations as [`SdkError::Config`].
#[derive(Debug, Clone, Serialize)]
pub struct RotateWaitpointSecretRequest {
    /// New key identifier. Non-empty, must not contain `:` (the
    /// server uses `:` as the field separator in the secret hash).
    pub new_kid: String,
    /// Hex-encoded new secret. Non-empty, even-length, `[0-9a-fA-F]`.
    pub new_secret_hex: String,
}
1126
/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
/// The server serializes this struct as-is via `Json(result)`. The
/// embedded transport builds the same shape from the backend's
/// per-partition entries, so both transports return the same fields.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct RotateWaitpointSecretResponse {
    /// Count of partitions that accepted the rotation. The embedded
    /// transport computes it with a saturating `u16` counter.
    pub rotated: u16,
    /// Partition indices where the rotation failed — operator
    /// should investigate. Rotation is idempotent on the same
    /// `(new_kid, new_secret_hex)` so a retry after the underlying
    /// fault clears converges.
    pub failed: Vec<u16>,
    /// The `new_kid` that was installed as current on every
    /// rotated partition — echoes the request field back for
    /// confirmation.
    pub new_kid: String,
}
1145
/// Server-side error body shape, as emitted by
/// `ff_server::api::ErrorBody`. Kept internal because consumers
/// match on the flattened fields of [`SdkError::AdminApi`].
///
/// Parsing is best-effort: `read_waitpoint_token_http`, for example,
/// parses it with `.ok()` and falls back to the raw body text when a
/// non-2xx response does not match this shape.
#[derive(Debug, Clone, Deserialize)]
struct AdminErrorBody {
    /// Human-readable error message; copied into
    /// `SdkError::AdminApi::message`.
    error: String,
    /// Optional machine-readable error class (e.g. `"IoError"`).
    /// Absent on the usual 400-style bodies.
    #[serde(default)]
    kind: Option<String>,
    /// Optional server hint on whether retrying may succeed.
    #[serde(default)]
    retryable: Option<bool>,
}
1157
/// Request body for `POST /v1/workers/{worker_id}/claim`.
///
/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
/// goes in the URL path (not the body) but is kept on the struct
/// for ergonomics — callers don't juggle a separate arg.
#[derive(Debug, Clone, Serialize)]
pub struct ClaimForWorkerRequest {
    /// Worker identity. `#[serde(skip)]` keeps it out of the JSON body;
    /// it travels in the URL path instead.
    #[serde(skip)]
    pub worker_id: String,
    /// Lane to claim from. The embedded transport parses it with
    /// `LaneId::try_new` and reports failures as `SdkError::Config`.
    pub lane_id: String,
    /// Worker-instance identity; checked with `validate_admin_identifier`
    /// on the embedded transport.
    pub worker_instance_id: String,
    /// Capability tokens offered by the worker. The embedded transport
    /// collects them into a `BTreeSet`, so duplicates collapse and order
    /// does not matter there.
    // NOTE(review): `#[serde(default)]` only affects deserialization;
    // this type derives `Serialize` only, so the attribute is inert.
    #[serde(default)]
    pub capabilities: Vec<String>,
    /// Grant TTL in milliseconds. Server rejects 0 or anything over
    /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
    pub grant_ttl_ms: u64,
}
1175
1176/// Response body for `POST /v1/workers/{worker_id}/claim`.
1177///
1178/// Wire shape of `ff_core::contracts::ClaimGrant`. Carries the opaque
1179/// [`ff_core::partition::PartitionKey`] directly on the wire (issue
1180/// #91); the SDK reconstructs the core type via [`Self::into_grant`].
1181#[derive(Debug, Clone, Deserialize)]
1182pub struct ClaimForWorkerResponse {
1183    pub execution_id: String,
1184    pub partition_key: ff_core::partition::PartitionKey,
1185    pub grant_key: String,
1186    pub expires_at_ms: u64,
1187}
1188
1189impl ClaimForWorkerResponse {
1190    /// Convert the wire DTO into a typed
1191    /// [`ff_core::contracts::ClaimGrant`] for handoff to
1192    /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
1193    /// [`SdkError::AdminApi`] on malformed execution_id — a drift
1194    /// signal that the server and SDK disagree on the wire shape, so
1195    /// failing loud prevents routing to a ghost partition.
1196    ///
1197    /// The `partition_key` itself is not eagerly parsed here: it is
1198    /// carried opaquely to the `claim_from_grant` hot path, which
1199    /// parses it there and surfaces a typed error on malformed keys.
1200    pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
1201        let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
1202            .map_err(|e| SdkError::AdminApi {
1203                status: 200,
1204                message: format!(
1205                    "claim_for_worker: server returned malformed execution_id '{}': {e}",
1206                    self.execution_id
1207                ),
1208                kind: Some("malformed_response".to_owned()),
1209                retryable: Some(false),
1210                raw_body: String::new(),
1211            })?;
1212        Ok(ff_core::contracts::ClaimGrant::new(
1213            execution_id,
1214            self.partition_key,
1215            self.grant_key,
1216            self.expires_at_ms,
1217        ))
1218    }
1219}
1220
/// Per-partition outcome of a cluster-wide waitpoint HMAC secret
/// rotation. Returned by `rotate_waitpoint_hmac_secret_all_partitions`
/// (available only with the `valkey-default` feature, which is why this
/// is not an intra-doc link: the link would break in sqlite-only builds)
/// so operators can audit which partitions rotated vs. no-op'd vs. failed.
///
/// The index is the execution-partition index (`0..num_partitions`),
/// matching `{fp:N}` in the keyspace.
#[derive(Debug)]
pub struct PartitionRotationOutcome {
    /// Execution partition index (`0..num_partitions`).
    pub partition: u16,
    /// FCALL outcome on this partition, or the error it raised
    /// (converted via `SdkError::from`). A failure here does not stop
    /// the fan-out over the remaining partitions.
    pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
}
1234
/// Rotate the waitpoint HMAC secret on every execution partition by
/// issuing the `ff_rotate_waitpoint_hmac_secret` FCALL once per
/// partition.
///
/// This is the canonical Rust-side rotation path for consumers that talk
/// to Valkey directly (e.g. cairn-fabric) and cannot reach the
/// `ff-server` admin REST endpoint. If an `ff-server` is reachable over
/// HTTP, prefer [`FlowFabricAdminClient::rotate_waitpoint_secret`]: that
/// path adds a single-writer admission gate, parallel fan-out, structured
/// audit events, and the server's configured grace window.
///
/// # Production rotation recipe
///
/// Secret rotation MUST complete **before** any waitpoint resolution
/// presents the new `kid`. In outline:
///
/// 1. Choose a fresh `new_kid`. It must NOT contain `:` — the server
///    uses `:` as the field separator in the secret hash.
/// 2. Call this helper, passing the outgoing `kid`'s grace window as
///    `grace_ms` (how long tokens signed by the old secret stay valid).
/// 3. Start signing new tokens with `new_kid` only once every partition
///    in the returned `Vec` is `Ok(_)` (either `Rotated` or `Noop`).
/// 4. Keep the previous secret in the keystore until the grace window
///    has elapsed. The FCALL garbage-collects expired kids on every
///    rotation, so the only rule is: don't rotate again inside the
///    grace window.
///
/// RFC-004 §rotation documents the 4-key HSET + `previous_expires_at`
/// sequence the FCALL performs server-side.
///
/// # Idempotency
///
/// Per partition, replaying identical `(new_kid, new_secret_hex)` args
/// yields `RotateWaitpointHmacSecretOutcome::Noop`. Reusing a kid with a
/// different secret surfaces as a per-partition `SdkError` (wrapping
/// `ScriptError::RotationConflict`); recover by choosing a fresh
/// `new_kid`.
///
/// # Error semantics
///
/// A failure on one partition (transport fault, rotation conflict, …) is
/// recorded in that partition's [`PartitionRotationOutcome`] and the
/// fan-out **keeps going**. This matches the server's
/// `rotate_waitpoint_secret`, which allows partial success so operators
/// can retry just the failed subset. The return type is a plain `Vec<_>`
/// rather than `Result<Vec<_>, _>` on purpose: the FCALL itself enforces
/// every whole-call invariant on each partition (non-empty kid, no `:`,
/// even-length hex, …), leaving nothing for the Rust boundary to reject.
/// Callers choose the policy for partial failures (fail loud, retry the
/// subset, record metrics).
///
/// With `num_partitions == 0` no FCALL is issued and the returned `Vec`
/// is empty. Callers that check "every entry is `Ok`" should also check
/// that the `Vec` is non-empty.
///
/// # Concurrency + performance
///
/// Partitions are processed one after another, so the helper needs no
/// `futures::stream` or tokio-specific primitives on the caller path.
/// With N partitions at per-partition RTT R, the call takes roughly N*R.
/// For parallel fan-out, wrap this in `FuturesUnordered` yourself, or use
/// the server admin endpoint (which fans out with bounded concurrency = 16).
///
/// # Test harness
///
/// `ff-test::fixtures::TestCluster::rotate_waitpoint_hmac_secret` is a
/// thin wrapper around this helper, so integration tests and production
/// code exercise the same path.
///
/// # Example
///
/// ```rust,ignore
/// use ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions;
///
/// let results = rotate_waitpoint_hmac_secret_all_partitions(
///     &client,
///     partition_config.num_flow_partitions,
///     "kid-2026-04-22",
///     "deadbeef...64-hex-chars...",
///     60_000,
/// )
/// .await?;
///
/// for entry in &results {
///     match &entry.result {
///         Ok(outcome) => tracing::info!(partition = entry.partition, ?outcome, "rotated"),
///         Err(e) => tracing::error!(partition = entry.partition, %e, "rotation failed"),
///     }
/// }
/// ```
// v0.12 PR-6: the `admin` module is ungated at module level so consumers
// building with `--no-default-features --features sqlite` can reach the
// HTTP admin client surface. This helper is the one Valkey-typed item
// left in the module (it takes a `&ferriskey::Client` and fans out
// `ff_rotate_waitpoint_hmac_secret` FCALLs), so it stays gated behind
// `valkey-default`. See the PR-6 comment in `lib.rs` for the Option 1 /
// Option 2 decision.
#[cfg(feature = "valkey-default")]
pub async fn rotate_waitpoint_hmac_secret_all_partitions(
    client: &ferriskey::Client,
    num_partitions: u16,
    new_kid: &str,
    new_secret_hex: &str,
    grace_ms: u64,
) -> Vec<PartitionRotationOutcome> {
    // The FCALL only borrows its args, so one value is built up front and
    // shared by every partition instead of cloning both strings N times.
    let shared_args = RotateWaitpointHmacSecretArgs {
        new_kid: new_kid.to_owned(),
        new_secret_hex: new_secret_hex.to_owned(),
        grace_ms,
    };

    let mut outcomes = Vec::with_capacity(usize::from(num_partitions));
    for partition_index in 0..num_partitions {
        let execution_partition = Partition {
            family: PartitionFamily::Execution,
            index: partition_index,
        };
        let index_keys = IndexKeys::new(&execution_partition);
        let fcall_result = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            client,
            &index_keys,
            &shared_args,
        )
        .await
        .map_err(SdkError::from);
        outcomes.push(PartitionRotationOutcome {
            partition: partition_index,
            result: fcall_result,
        });
    }
    outcomes
}
1364
/// Strip every trailing `/` from a base URL, so that
/// `format!("{base}/v1/...")` cannot yield `https://host//v1/...`.
/// Follows the same pattern as media-pipeline.
///
/// The existing allocation is reused: the string is truncated in place
/// to the length of its slash-free prefix.
fn normalize_base_url(mut url: String) -> String {
    // `/` is ASCII, so the trimmed length is always a char boundary.
    let kept_len = url.trim_end_matches('/').len();
    url.truncate(kept_len);
    url
}
1374
1375#[cfg(test)]
1376mod tests {
1377    use super::*;
1378
1379    #[test]
1380    fn base_url_strips_trailing_slash() {
1381        assert_eq!(normalize_base_url("http://x".into()), "http://x");
1382        assert_eq!(normalize_base_url("http://x/".into()), "http://x");
1383        assert_eq!(normalize_base_url("http://x///".into()), "http://x");
1384    }
1385
1386    #[test]
1387    fn with_token_rejects_bad_header_chars() {
1388        // Raw newline in the token would split the Authorization
1389        // header — must fail loudly at construction.
1390        let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
1391        assert!(
1392            matches!(err, SdkError::Config { .. }),
1393            "got: {err:?}"
1394        );
1395    }
1396
1397    #[test]
1398    fn with_token_rejects_empty_or_whitespace() {
1399        // Exact shell footgun: FF_ADMIN_TOKEN="" expands to "".
1400        // Fail loudly at construction instead of shipping a client
1401        // that silently 401s on first request.
1402        for s in ["", " ", "\t\n ", "   "] {
1403            let err = FlowFabricAdminClient::with_token("http://x", s)
1404                .unwrap_err();
1405            assert!(
1406                matches!(&err, SdkError::Config { field: Some(f), .. } if f == "bearer_token"),
1407                "token {s:?} should return Config with field=bearer_token; got: {err:?}"
1408            );
1409        }
1410    }
1411
1412    #[test]
1413    fn admin_error_body_deserialises_optional_fields() {
1414        // `kind` + `retryable` absent (the usual shape for 400s).
1415        let b: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
1416        assert_eq!(b.error, "bad new_kid");
1417        assert!(b.kind.is_none());
1418        assert!(b.retryable.is_none());
1419
1420        // `kind` + `retryable` present (500 ValkeyError shape).
1421        let b: AdminErrorBody = serde_json::from_str(
1422            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
1423        )
1424        .unwrap();
1425        assert_eq!(b.error, "valkey: timed out");
1426        assert_eq!(b.kind.as_deref(), Some("IoError"));
1427        assert_eq!(b.retryable, Some(true));
1428    }
1429
1430    #[test]
1431    fn rotate_response_deserialises_server_shape() {
1432        // Exact shape the server emits.
1433        let raw = r#"{
1434            "rotated": 3,
1435            "failed": [4, 5],
1436            "new_kid": "kid-2026-04-18"
1437        }"#;
1438        let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
1439        assert_eq!(r.rotated, 3);
1440        assert_eq!(r.failed, vec![4, 5]);
1441        assert_eq!(r.new_kid, "kid-2026-04-18");
1442    }
1443
1444    // ── ClaimForWorkerResponse::into_grant ──
1445
1446    fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
1447        ClaimForWorkerResponse {
1448            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
1449            partition_key: serde_json::from_str(
1450                &serde_json::to_string(partition_key).unwrap(),
1451            )
1452            .unwrap(),
1453            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
1454            expires_at_ms: 1_700_000_000_000,
1455        }
1456    }
1457
1458    #[test]
1459    fn into_grant_preserves_all_known_partition_key_shapes() {
1460        // Post-#91: families collapse into opaque PartitionKey literals.
1461        // Flow and Execution both produce "{fp:N}"; Budget is "{b:N}";
1462        // Quota is "{q:N}". The DTO preserves the wire string as-is;
1463        // into_grant hands it opaquely to the core type.
1464        for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
1465            let g = sample_claim_response(key_str).into_grant().unwrap_or_else(|e| {
1466                panic!("key {key_str} should parse: {e:?}")
1467            });
1468            assert_eq!(g.partition_key.as_str(), key_str);
1469            assert_eq!(g.expires_at_ms, 1_700_000_000_000);
1470        }
1471    }
1472
1473    #[test]
1474    fn into_grant_preserves_opaque_partition_key() {
1475        // The SDK does NOT eagerly parse the partition_key on the
1476        // admin boundary — malformed keys are caught at the
1477        // claim_from_grant hot path where the typed Partition is
1478        // actually needed. This test pins the opacity contract.
1479        let resp = sample_claim_response("{zz:0}");
1480        let g = resp.into_grant().expect("SDK must not parse partition_key");
1481        assert_eq!(g.partition_key.as_str(), "{zz:0}");
1482        // Parsing surfaces the error explicitly.
1483        assert!(g.partition().is_err());
1484    }
1485
1486    #[test]
1487    fn into_grant_rejects_malformed_execution_id() {
1488        let mut resp = sample_claim_response("{fp:5}");
1489        resp.execution_id = "not-a-valid-eid".to_owned();
1490        let err = resp.into_grant().unwrap_err();
1491        match err {
1492            SdkError::AdminApi { message, kind, .. } => {
1493                assert!(message.contains("malformed execution_id"),
1494                    "msg: {message}");
1495                assert_eq!(kind.as_deref(), Some("malformed_response"));
1496            }
1497            other => panic!("expected AdminApi, got {other:?}"),
1498        }
1499    }
1500
1501    // ── ClaimForWorkerResponse wire shape (issue #91) ──
1502
1503    // `rotate_waitpoint_hmac_secret_all_partitions` exercise coverage
1504    // lives in `ff-test` — the integration test harness in
1505    // `crates/ff-test/tests/waitpoint_hmac_rotation_fcall.rs` and
1506    // `waitpoint_tokens.rs` calls through the function via the
1507    // `TestCluster::rotate_waitpoint_hmac_secret` fixture, which is
1508    // now a thin delegator. A pure unit test here would require a
1509    // mock `ferriskey::Client` (ferriskey's `Client` performs a live
1510    // RESP handshake on `ClientBuilder::build`, so a local TCP
1511    // listener alone isn't sufficient) — expensive to construct for
1512    // one-line iteration-count coverage.
1513
1514    #[test]
1515    fn read_waitpoint_token_url_percent_encodes_path_segments() {
1516        // The execution id carries `{fp:N}:` literal punctuation;
1517        // a naïve `format!` splice would ship those chars unencoded
1518        // and the server would match the wrong route. Pin that the
1519        // reqwest URL builder percent-encodes each segment.
1520        use ff_core::types::{ExecutionId, WaitpointId};
1521
1522        let mut url = reqwest::Url::parse("http://x").unwrap();
1523        let execution_id = ExecutionId::parse(
1524            "{fp:7}:11111111-1111-1111-1111-111111111111",
1525        )
1526        .unwrap();
1527        let waitpoint_id = WaitpointId::parse("22222222-2222-2222-2222-222222222222")
1528            .unwrap();
1529        url.path_segments_mut()
1530            .unwrap()
1531            .extend(&[
1532                "v1",
1533                "executions",
1534                execution_id.as_str(),
1535                "waitpoints",
1536                &waitpoint_id.to_string(),
1537                "token",
1538            ]);
1539        assert_eq!(
1540            url.as_str(),
1541            "http://x/v1/executions/%7Bfp:7%7D:11111111-1111-1111-1111-111111111111\
1542             /waitpoints/22222222-2222-2222-2222-222222222222/token"
1543        );
1544    }
1545
1546    #[test]
1547    fn claim_for_worker_response_deserialises_opaque_partition_key() {
1548        // Exact shape the server emits post-#91.
1549        let raw = r#"{
1550            "execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
1551            "partition_key": "{fp:7}",
1552            "grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
1553            "expires_at_ms": 1700000000000
1554        }"#;
1555        let r: ClaimForWorkerResponse = serde_json::from_str(raw).unwrap();
1556        assert_eq!(r.partition_key.as_str(), "{fp:7}");
1557        assert_eq!(r.expires_at_ms, 1_700_000_000_000);
1558    }
1559}