Skip to main content

ff_sdk/
admin.rs

1//! Admin REST client for operator-facing endpoints on `ff-server`.
2//!
//! Wraps the operator-facing `POST` endpoints — `/v1/admin/*`, plus
//! `/v1/workers/{worker_id}/claim` and
//! `/v1/executions/{execution_id}/reclaim` — so downstream consumers
//! (cairn-fabric) don't hand-roll the HTTP calls for surfaces like
//! HMAC secret rotation. Mirrors the server's wire types exactly —
//! request bodies and response shapes are defined against
//! [`ff_server::api`] + [`ff_server::server`] and kept 1:1 with the
//! producer.
9//!
10//! Authentication is Bearer token. Callers pick up the token from
11//! wherever they hold it (`FF_API_TOKEN` env var is the common
12//! pattern, but the SDK does not read env vars on the caller's
13//! behalf — [`FlowFabricAdminClient::with_token`] accepts a
14//! string-like token value (`&str` or `String`) via
15//! `impl AsRef<str>`).
16
17use std::time::Duration;
18
19use ff_core::contracts::RotateWaitpointHmacSecretOutcome;
20// v0.12 PR-6: these imports only power the Valkey-typed
21// `rotate_waitpoint_hmac_secret_all_partitions` helper at the bottom
22// of the module; gated so the ungated module builds clean under
23// `--no-default-features --features sqlite`.
24#[cfg(feature = "valkey-default")]
25use ff_core::contracts::RotateWaitpointHmacSecretArgs;
26#[cfg(feature = "valkey-default")]
27use ff_core::keys::IndexKeys;
28#[cfg(feature = "valkey-default")]
29use ff_core::partition::{Partition, PartitionFamily};
30use serde::{Deserialize, Serialize};
31
32use crate::SdkError;
33
/// Per-request timeout applied to every HTTP client built in this
/// module.
///
/// The server's own `ROTATE_HTTP_TIMEOUT` is 120s. The client waits
/// 10s longer (130s in total) so the client deadline is LATER than
/// the server deadline and operators see the structured 504
/// GATEWAY_TIMEOUT body rather than a client-side timeout error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120 + 10);
40
/// Client for the `ff-server` admin REST surface.
///
/// Construct via [`FlowFabricAdminClient::new`] (no auth) or
/// [`FlowFabricAdminClient::with_token`] (Bearer auth). Both
/// return a ready-to-use client backed by a single pooled
/// `reqwest::Client` — reuse the instance across calls instead of
/// building one per request.
#[derive(Debug, Clone)]
pub struct FlowFabricAdminClient {
    /// Underlying HTTP client. Carries the per-request timeout and,
    /// for clients built with `with_token`, the default
    /// `Authorization` header.
    http: reqwest::Client,
    /// Server base URL as returned by `normalize_base_url`; each
    /// method appends its endpoint path to this value.
    base_url: String,
}
53
54impl FlowFabricAdminClient {
55    /// Build a client without auth. Suitable for a dev ff-server
56    /// whose `api_token` is unconfigured. Production deployments
57    /// should use [`with_token`](Self::with_token).
58    pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
59        let http = reqwest::Client::builder()
60            .timeout(DEFAULT_TIMEOUT)
61            .build()
62            .map_err(|e| SdkError::Http {
63                source: e,
64                context: "build reqwest::Client".into(),
65            })?;
66        Ok(Self {
67            http,
68            base_url: normalize_base_url(base_url.into()),
69        })
70    }
71
72    /// Build a client that sends `Authorization: Bearer <token>` on
73    /// every request. The token is passed by value so the caller
74    /// retains ownership policy (e.g. zeroize on drop at the
75    /// caller side); the SDK only reads it.
76    ///
77    /// # Empty-token guard
78    ///
79    /// An empty or all-whitespace `token` returns
80    /// [`SdkError::Config`] instead of silently constructing
81    /// `Authorization: Bearer ` (which the server rejects with
82    /// 401, leaving the operator chasing a "why is auth broken"
83    /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
84    /// where the var was meant to be set; the unset-expansion is
85    /// the empty string. Prefer an obvious error at construction
86    /// over a silent 401 at first request.
87    ///
88    /// If the caller genuinely wants an unauthenticated client
89    /// (dev ff-server without `api_token` configured), use
90    /// [`FlowFabricAdminClient::new`] instead.
91    pub fn with_token(
92        base_url: impl Into<String>,
93        token: impl AsRef<str>,
94    ) -> Result<Self, SdkError> {
95        let token_str = token.as_ref();
96        if token_str.trim().is_empty() {
97            return Err(SdkError::Config {
98                context: "admin_client".into(),
99                field: Some("bearer_token".into()),
100                message: "is empty or all-whitespace; use \
101                          FlowFabricAdminClient::new for unauthenticated access"
102                    .into(),
103            });
104        }
105        let mut headers = reqwest::header::HeaderMap::new();
106        let mut auth_value =
107            reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
108                |_| SdkError::Config {
109                    context: "admin_client".into(),
110                    field: Some("bearer_token".into()),
111                    message: "contains characters not valid in an HTTP header".into(),
112                },
113            )?;
114        // Mark Authorization as sensitive so it doesn't appear in
115        // reqwest's Debug output / logs.
116        auth_value.set_sensitive(true);
117        headers.insert(reqwest::header::AUTHORIZATION, auth_value);
118
119        let http = reqwest::Client::builder()
120            .timeout(DEFAULT_TIMEOUT)
121            .default_headers(headers)
122            .build()
123            .map_err(|e| SdkError::Http {
124                source: e,
125                context: "build reqwest::Client".into(),
126            })?;
127        Ok(Self {
128            http,
129            base_url: normalize_base_url(base_url.into()),
130        })
131    }
132
133    /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
134    ///
135    /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
136    /// server-side one: the request carries lane + identity +
137    /// capabilities + grant TTL; the server runs budget, quota, and
138    /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
139    /// and returns a `ClaimGrant` on success.
140    ///
141    /// Returns `Ok(None)` when the server responds 204 No Content
142    /// (no eligible execution on the lane). Callers that want to keep
143    /// polling should back off per their claim cadence.
144    pub async fn claim_for_worker(
145        &self,
146        req: ClaimForWorkerRequest,
147    ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
148        // Percent-encode `worker_id` in the URL path — `WorkerId` is a
149        // free-form string (could contain `/`, spaces, `%`, etc.) and
150        // splicing it verbatim would produce malformed URLs or
151        // misrouted paths. `Url::path_segments_mut().push` handles the
152        // encoding natively.
153        let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| SdkError::Config {
154            context: "admin_client: claim_for_worker".into(),
155            field: Some("base_url".into()),
156            message: format!("invalid base_url '{}': {e}", self.base_url),
157        })?;
158        {
159            let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
160                context: "admin_client: claim_for_worker".into(),
161                field: Some("base_url".into()),
162                message: format!("base_url cannot be a base URL: '{}'", self.base_url),
163            })?;
164            segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
165        }
166        let url = url.to_string();
167        let resp = self
168            .http
169            .post(&url)
170            .json(&req)
171            .send()
172            .await
173            .map_err(|e| SdkError::Http {
174                source: e,
175                context: "POST /v1/workers/{worker_id}/claim".into(),
176            })?;
177
178        let status = resp.status();
179        if status == reqwest::StatusCode::NO_CONTENT {
180            return Ok(None);
181        }
182        if status.is_success() {
183            return resp
184                .json::<ClaimForWorkerResponse>()
185                .await
186                .map(Some)
187                .map_err(|e| SdkError::Http {
188                    source: e,
189                    context: "decode claim_for_worker response body".into(),
190                });
191        }
192
193        // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
194        let status_u16 = status.as_u16();
195        let raw = resp.text().await.map_err(|e| SdkError::Http {
196            source: e,
197            context: format!("read claim_for_worker error body (status {status_u16})"),
198        })?;
199        let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
200        Err(SdkError::AdminApi {
201            status: status_u16,
202            message: parsed
203                .as_ref()
204                .map(|b| b.error.clone())
205                .unwrap_or_else(|| raw.clone()),
206            kind: parsed.as_ref().and_then(|b| b.kind.clone()),
207            retryable: parsed.as_ref().and_then(|b| b.retryable),
208            raw_body: raw,
209        })
210    }
211
212    /// Rotate the waitpoint HMAC secret on the server.
213    ///
214    /// Promotes the currently-installed kid to `previous_kid`
215    /// (accepted for the server's configured
216    /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
217    /// `new_secret_hex` under `new_kid` as the new current. Fans
218    /// out across every execution partition. Idempotent: re-running
219    /// with the same `(new_kid, new_secret_hex)` converges.
220    ///
221    /// The server returns 200 if at least one partition rotated OR
222    /// at least one partition was already rotating under a
223    /// concurrent request. See `RotateWaitpointSecretResponse`
224    /// fields for the breakdown.
225    ///
226    /// # Errors
227    ///
228    /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
229    ///   input, 401 missing/bad bearer, 429 concurrent rotate,
230    ///   500 all partitions failed, 504 server-side timeout).
231    /// * [`SdkError::Http`] — transport error (connect, body
232    ///   decode, client-side timeout).
233    ///
234    /// # Retry semantics
235    ///
236    /// Rotation is idempotent on the same `(new_kid,
237    /// new_secret_hex)` so retries are SAFE even on 504s or
238    /// partial failures.
239    pub async fn rotate_waitpoint_secret(
240        &self,
241        req: RotateWaitpointSecretRequest,
242    ) -> Result<RotateWaitpointSecretResponse, SdkError> {
243        let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
244        let resp = self
245            .http
246            .post(&url)
247            .json(&req)
248            .send()
249            .await
250            .map_err(|e| SdkError::Http {
251                source: e,
252                context: "POST /v1/admin/rotate-waitpoint-secret".into(),
253            })?;
254
255        let status = resp.status();
256        if status.is_success() {
257            return resp
258                .json::<RotateWaitpointSecretResponse>()
259                .await
260                .map_err(|e| SdkError::Http {
261                    source: e,
262                    context: "decode rotate-waitpoint-secret response body".into(),
263                });
264        }
265
266        // Non-2xx: parse the server's ErrorBody if we can, fall
267        // back to a raw body otherwise. Propagate body-read
268        // transport errors as Http rather than silently flattening
269        // them into `AdminApi { raw_body: "" }` — a connection drop
270        // mid-body-read is a transport fault, not an API-layer
271        // reject, and misclassifying it strips `is_retryable`'s
272        // timeout/connect signal from the caller.
273        let status_u16 = status.as_u16();
274        let raw = resp.text().await.map_err(|e| SdkError::Http {
275            source: e,
276            context: format!(
277                "read rotate-waitpoint-secret error response body (status {status_u16})"
278            ),
279        })?;
280        let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
281        Err(SdkError::AdminApi {
282            status: status_u16,
283            message: parsed
284                .as_ref()
285                .map(|b| b.error.clone())
286                .unwrap_or_else(|| raw.clone()),
287            kind: parsed.as_ref().and_then(|b| b.kind.clone()),
288            retryable: parsed.as_ref().and_then(|b| b.retryable),
289            raw_body: raw,
290        })
291    }
292
293    /// Request a lease-reclaim grant for an execution in
294    /// `lease_expired_reclaimable` or `lease_revoked` state (RFC-024
295    /// §3.5).
296    ///
297    /// Routes `POST /v1/executions/{execution_id}/reclaim`. The
298    /// ff-server handler dispatches through the `EngineBackend` trait
299    /// to whichever backend the server was started with (Valkey /
300    /// Postgres / SQLite).
301    ///
302    /// # worker_capabilities (RFC-024 §3.2 B-2)
303    ///
304    /// The request body carries `worker_capabilities`. Consumers typically
305    /// source these from their registered worker's configured
306    /// `WorkerConfig::capabilities`. Admission compares
307    /// `worker_capabilities` against the execution's
308    /// `required_capabilities` (persisted on `exec_core` at
309    /// `create_execution` time from
310    /// `ExecutionPolicy.routing_requirements.required_capabilities`);
311    /// any required capability missing from the worker set surfaces as
312    /// `IssueReclaimGrantResponse::NotReclaimable { detail:
313    /// "capability_mismatch: <missing csv>" }` (Lua
314    /// `ff_issue_reclaim_grant`, `crates/ff-script/src/flowfabric.lua`
315    /// §3969-3982; sqlite/PG backends mirror the check). The SDK does
316    /// not re-read worker state automatically — admin clients are not
317    /// bound to a worker — so the consumer threads the capabilities
318    /// through at call-time.
319    ///
320    /// `capability_hash` is NOT consulted for admission; it is stored
321    /// verbatim on the grant hash for audit / downstream observability
322    /// only.
323    ///
324    /// # Consumer flow (RFC-024 §4.4)
325    ///
326    /// 1. Consumer's `POST /v1/runs/:id/complete` returns
327    ///    `lease_expired`.
328    /// 2. Consumer calls this method; handles
329    ///    [`IssueReclaimGrantResponse::Granted`] → builds a
330    ///    [`ff_core::contracts::ReclaimGrant`] via
331    ///    [`IssueReclaimGrantResponse::into_grant`].
332    /// 3. Consumer passes the grant to
333    ///    [`crate::FlowFabricWorker::claim_from_reclaim_grant`] along
334    ///    with a fresh [`ff_core::contracts::ReclaimExecutionArgs`];
335    ///    the new attempt is minted with `HandleKind::Reclaimed`.
336    /// 4. Consumer drives terminal writes on the fresh lease.
337    ///
338    /// # Errors
339    ///
340    /// * [`SdkError::AdminApi`] — non-2xx response. 404 when the
341    ///   execution does not exist; 400 on malformed `execution_id` or
342    ///   body.
343    /// * [`SdkError::Http`] — transport error (connect, body
344    ///   decode, client-side timeout).
345    ///
346    /// # Retry semantics
347    ///
348    /// Idempotent on the Lua side: repeated calls against an execution
349    /// already re-leased (a concurrent reclaim beat this one) surface
350    /// as `NotReclaimable`. Safe to retry on transport faults.
351    pub async fn issue_reclaim_grant(
352        &self,
353        execution_id: &str,
354        req: IssueReclaimGrantRequest,
355    ) -> Result<IssueReclaimGrantResponse, SdkError> {
356        // Percent-encode `execution_id` in the URL path — the id is a
357        // free-form string and splicing verbatim would produce
358        // malformed URLs. Mirrors `claim_for_worker`'s handling.
359        let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| SdkError::Config {
360            context: "admin_client: issue_reclaim_grant".into(),
361            field: Some("base_url".into()),
362            message: format!("invalid base_url '{}': {e}", self.base_url),
363        })?;
364        {
365            let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
366                context: "admin_client: issue_reclaim_grant".into(),
367                field: Some("base_url".into()),
368                message: format!("base_url cannot be a base URL: '{}'", self.base_url),
369            })?;
370            segs.extend(&["v1", "executions", execution_id, "reclaim"]);
371        }
372        let url = url.to_string();
373        let resp = self
374            .http
375            .post(&url)
376            .json(&req)
377            .send()
378            .await
379            .map_err(|e| SdkError::Http {
380                source: e,
381                context: "POST /v1/executions/{id}/reclaim".into(),
382            })?;
383
384        let status = resp.status();
385        if status.is_success() {
386            return resp
387                .json::<IssueReclaimGrantResponse>()
388                .await
389                .map_err(|e| SdkError::Http {
390                    source: e,
391                    context: "decode issue_reclaim_grant response body".into(),
392                });
393        }
394
395        let status_u16 = status.as_u16();
396        let raw = resp.text().await.map_err(|e| SdkError::Http {
397            source: e,
398            context: format!(
399                "read issue_reclaim_grant error body (status {status_u16})"
400            ),
401        })?;
402        let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
403        Err(SdkError::AdminApi {
404            status: status_u16,
405            message: parsed
406                .as_ref()
407                .map(|b| b.error.clone())
408                .unwrap_or_else(|| raw.clone()),
409            kind: parsed.as_ref().and_then(|b| b.kind.clone()),
410            retryable: parsed.as_ref().and_then(|b| b.retryable),
411            raw_body: raw,
412        })
413    }
414}
415
/// Request body for `POST /v1/executions/{execution_id}/reclaim`
/// (RFC-024 §3.5).
///
/// Mirrors `ff_server::api::IssueReclaimGrantBody` 1:1. The
/// `execution_id` goes in the URL path, not the body. This type is
/// serialize-only; optional fields that are `None` are omitted from
/// the JSON body entirely.
#[derive(Debug, Clone, Serialize)]
pub struct IssueReclaimGrantRequest {
    /// Worker identity requesting the grant. The Lua
    /// `ff_reclaim_execution` validates grant consumption via
    /// `grant.worker_id == args.worker_id` (RFC-024 §4.4) — the
    /// worker consuming the grant must match this value.
    pub worker_id: String,
    /// Worker-instance identity. Informational at grant-issuance
    /// time; stored on the grant so consumers can correlate events.
    pub worker_instance_id: String,
    /// Lane the execution belongs to. Needed by
    /// `ff_issue_reclaim_grant` for `KEYS[*]` construction.
    pub lane_id: String,
    /// Opaque capability-hash token stored verbatim on the issued
    /// grant for audit / downstream observability. NOT used for
    /// admission — admission compares `worker_capabilities` against
    /// the execution's `required_capabilities` (see the
    /// [`FlowFabricAdminClient::issue_reclaim_grant`] rustdoc).
    /// `None` leaves the field empty on the grant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub capability_hash: Option<String>,
    /// Grant TTL in milliseconds. Bounded server-side.
    pub grant_ttl_ms: u64,
    /// Route snapshot JSON carried onto the grant for audit.
    /// Omitted from the body when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub route_snapshot_json: Option<String>,
    /// Admission summary string carried onto the grant for audit.
    /// Omitted from the body when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub admission_summary: Option<String>,
    /// Worker capability tokens. Consumers typically source these
    /// from their registered worker's `WorkerConfig::capabilities`
    /// (see the [`FlowFabricAdminClient::issue_reclaim_grant`]
    /// rustdoc for how admission uses them). Always serialized,
    /// including when empty.
    #[serde(default)]
    pub worker_capabilities: Vec<String>,
}
457
/// Response body for `POST /v1/executions/{execution_id}/reclaim`
/// (RFC-024 §3.5).
///
/// The server serializes this enum with a `status` discriminator so
/// consumers can match on structured outcomes without re-parsing a
/// 200-vs-4xx split for business-logic outcomes (mirrors
/// `RotateWaitpointSecretResponse`'s precedent).
///
/// Variant names are snake_cased on the wire, so the `status` values
/// are `granted`, `not_reclaimable` and `reclaim_cap_exceeded`.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum IssueReclaimGrantResponse {
    /// Grant issued. Build a
    /// [`ff_core::contracts::ReclaimGrant`] via
    /// [`Self::into_grant`] and feed it to
    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
    Granted {
        /// Execution id as a string; parsed by [`Self::into_grant`].
        execution_id: String,
        /// Partition key, forwarded unchanged onto the typed grant.
        partition_key: ff_core::partition::PartitionKey,
        /// Grant key, forwarded unchanged onto the typed grant.
        grant_key: String,
        /// Grant expiry in milliseconds, forwarded unchanged onto
        /// the typed grant.
        expires_at_ms: u64,
        /// Lane id as a string; validated by [`Self::into_grant`].
        lane_id: String,
    },
    /// Execution is not in a reclaimable state (not
    /// `lease_expired_reclaimable` / `lease_revoked`).
    NotReclaimable {
        /// Execution the request was made for.
        execution_id: String,
        /// Server-provided reason text.
        detail: String,
    },
    /// `max_reclaim_count` exceeded; execution transitioned to
    /// terminal_failed. Consumers stop retrying and surface a
    /// structural failure.
    ReclaimCapExceeded {
        /// Execution the request was made for.
        execution_id: String,
        /// Reclaim count reported by the server.
        reclaim_count: u32,
    },
}
493
494impl IssueReclaimGrantResponse {
495    /// Convert a [`Self::Granted`] response into a typed
496    /// [`ff_core::contracts::ReclaimGrant`] for handoff to
497    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`].
498    ///
499    /// Returns [`SdkError::AdminApi`] when the wire variant is not
500    /// `Granted` (consumer asked for a grant but the server replied
501    /// with a terminal outcome) or when `execution_id` / `lane_id`
502    /// are malformed — the latter signals a drift between server and
503    /// SDK, so failing loud prevents silent misrouting.
504    pub fn into_grant(self) -> Result<ff_core::contracts::ReclaimGrant, SdkError> {
505        match self {
506            IssueReclaimGrantResponse::Granted {
507                execution_id,
508                partition_key,
509                grant_key,
510                expires_at_ms,
511                lane_id,
512            } => {
513                let eid = ff_core::types::ExecutionId::parse(&execution_id)
514                    .map_err(|e| SdkError::AdminApi {
515                        status: 200,
516                        message: format!(
517                            "issue_reclaim_grant: server returned malformed execution_id '{execution_id}': {e}"
518                        ),
519                        kind: Some("malformed_response".to_owned()),
520                        retryable: Some(false),
521                        raw_body: String::new(),
522                    })?;
523                let lane = ff_core::types::LaneId::try_new(lane_id.clone())
524                    .map_err(|e| SdkError::AdminApi {
525                        status: 200,
526                        message: format!(
527                            "issue_reclaim_grant: server returned malformed lane_id '{lane_id}': {e}"
528                        ),
529                        kind: Some("malformed_response".to_owned()),
530                        retryable: Some(false),
531                        raw_body: String::new(),
532                    })?;
533                Ok(ff_core::contracts::ReclaimGrant::new(
534                    eid,
535                    partition_key,
536                    grant_key,
537                    expires_at_ms,
538                    lane,
539                ))
540            }
541            IssueReclaimGrantResponse::NotReclaimable { execution_id, detail } => {
542                Err(SdkError::AdminApi {
543                    status: 200,
544                    message: format!(
545                        "issue_reclaim_grant: execution '{execution_id}' not reclaimable: {detail}"
546                    ),
547                    kind: Some("not_reclaimable".to_owned()),
548                    retryable: Some(false),
549                    raw_body: String::new(),
550                })
551            }
552            IssueReclaimGrantResponse::ReclaimCapExceeded {
553                execution_id,
554                reclaim_count,
555            } => Err(SdkError::AdminApi {
556                status: 200,
557                message: format!(
558                    "issue_reclaim_grant: execution '{execution_id}' hit reclaim cap ({reclaim_count})"
559                ),
560                kind: Some("reclaim_cap_exceeded".to_owned()),
561                retryable: Some(false),
562                raw_body: String::new(),
563            }),
564        }
565    }
566}
567
568/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
569///
570/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
571#[derive(Debug, Clone, Serialize)]
572pub struct RotateWaitpointSecretRequest {
573    /// New key identifier. Non-empty, must not contain `:` (the
574    /// server uses `:` as the field separator in the secret hash).
575    pub new_kid: String,
576    /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
577    pub new_secret_hex: String,
578}
579
/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
/// The server serializes this struct as-is via `Json(result)`.
/// All three fields are required when decoding.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct RotateWaitpointSecretResponse {
    /// Count of partitions that accepted the rotation.
    pub rotated: u16,
    /// Partition indices where the rotation failed — operator
    /// should investigate. Rotation is idempotent on the same
    /// `(new_kid, new_secret_hex)` so a retry after the underlying
    /// fault clears converges.
    pub failed: Vec<u16>,
    /// The `new_kid` that was installed as current on every
    /// rotated partition — echoes the request field back for
    /// confirmation.
    pub new_kid: String,
}
598
/// Server-side error body shape, as emitted by
/// `ff_server::api::ErrorBody`. Kept internal because consumers
/// match on the flattened fields of [`SdkError::AdminApi`].
///
/// Only `error` is required; a body without it fails to decode and
/// the callers in this module fall back to the raw response text.
#[derive(Debug, Clone, Deserialize)]
struct AdminErrorBody {
    /// Error text; becomes `SdkError::AdminApi::message`.
    error: String,
    /// Optional error kind; `None` when absent from the body.
    #[serde(default)]
    kind: Option<String>,
    /// Optional retry hint; `None` when absent from the body.
    #[serde(default)]
    retryable: Option<bool>,
}
610
/// Request body for `POST /v1/workers/{worker_id}/claim`.
///
/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
/// goes in the URL path (not the body) but is kept on the struct
/// for ergonomics — callers don't juggle a separate arg.
#[derive(Debug, Clone, Serialize)]
pub struct ClaimForWorkerRequest {
    /// Worker identity. Never serialized into the JSON body
    /// (`#[serde(skip)]`); `claim_for_worker` places it in the URL
    /// path instead.
    #[serde(skip)]
    pub worker_id: String,
    /// Lane to claim from.
    pub lane_id: String,
    /// Worker-instance identity sent alongside the claim.
    pub worker_instance_id: String,
    /// Worker capability tokens. Always serialized, including when
    /// empty.
    #[serde(default)]
    pub capabilities: Vec<String>,
    /// Grant TTL in milliseconds. Server rejects 0 or anything over
    /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
    pub grant_ttl_ms: u64,
}
628
/// Response body for `POST /v1/workers/{worker_id}/claim`.
///
/// Wire shape of `ff_core::contracts::ClaimGrant`. Carries the opaque
/// [`ff_core::partition::PartitionKey`] directly on the wire (issue
/// #91); the SDK reconstructs the core type via [`Self::into_grant`].
#[derive(Debug, Clone, Deserialize)]
pub struct ClaimForWorkerResponse {
    /// Execution id as a string; parsed by [`Self::into_grant`].
    pub execution_id: String,
    /// Partition key, forwarded unchanged onto the typed grant.
    pub partition_key: ff_core::partition::PartitionKey,
    /// Grant key, forwarded unchanged onto the typed grant.
    pub grant_key: String,
    /// Grant expiry in milliseconds, forwarded unchanged onto the
    /// typed grant.
    pub expires_at_ms: u64,
}
641
642impl ClaimForWorkerResponse {
643    /// Convert the wire DTO into a typed
644    /// [`ff_core::contracts::ClaimGrant`] for handoff to
645    /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
646    /// [`SdkError::AdminApi`] on malformed execution_id — a drift
647    /// signal that the server and SDK disagree on the wire shape, so
648    /// failing loud prevents routing to a ghost partition.
649    ///
650    /// The `partition_key` itself is not eagerly parsed here: it is
651    /// carried opaquely to the `claim_from_grant` hot path, which
652    /// parses it there and surfaces a typed error on malformed keys.
653    pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
654        let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
655            .map_err(|e| SdkError::AdminApi {
656                status: 200,
657                message: format!(
658                    "claim_for_worker: server returned malformed execution_id '{}': {e}",
659                    self.execution_id
660                ),
661                kind: Some("malformed_response".to_owned()),
662                retryable: Some(false),
663                raw_body: String::new(),
664            })?;
665        Ok(ff_core::contracts::ClaimGrant::new(
666            execution_id,
667            self.partition_key,
668            self.grant_key,
669            self.expires_at_ms,
670        ))
671    }
672}
673
/// Per-partition outcome of a cluster-wide waitpoint HMAC secret
/// rotation. Returned by [`rotate_waitpoint_hmac_secret_all_partitions`]
/// so operators can audit which partitions rotated vs. no-op'd vs. failed.
///
/// The index is the execution-partition index (`0..num_partitions`),
/// matching `{fp:N}` in the keyspace.
///
/// A failure is carried inside `result` rather than aborting the
/// whole rotation, so each entry must be inspected individually.
#[derive(Debug)]
pub struct PartitionRotationOutcome {
    /// Execution partition index (`0..num_partitions`).
    pub partition: u16,
    /// FCALL outcome on this partition, or the error it raised.
    pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
}
687
/// Rotate the waitpoint HMAC secret across every execution partition
/// by fanning out the `ff_rotate_waitpoint_hmac_secret` FCALL.
///
/// This is the canonical Rust-side rotation path for direct-Valkey
/// consumers (e.g. cairn-fabric) that cannot route through the
/// `ff-server` admin REST endpoint. Callers who have an HTTP-reachable
/// `ff-server` should prefer [`FlowFabricAdminClient::rotate_waitpoint_secret`] —
/// that path adds a single-writer admission gate, parallel fan-out,
/// structured audit events, and the server's configured grace window.
///
/// # Arguments
///
/// * `client` — the Valkey client every per-partition FCALL is issued on.
/// * `num_partitions` — number of execution partitions; indices
///   `0..num_partitions` are visited in ascending order.
/// * `new_kid` — key id of the incoming secret (must NOT contain `:`).
/// * `new_secret_hex` — the incoming secret, hex-encoded.
/// * `grace_ms` — how long tokens signed by the outgoing secret remain
///   valid, in milliseconds.
///
/// # Returns
///
/// One [`PartitionRotationOutcome`] per partition, in ascending
/// partition-index order. `num_partitions == 0` yields an empty `Vec`
/// without issuing any FCALL.
///
/// # Production rotation recipe
///
/// Operators MUST coordinate so secret rotation **precedes** any
/// waitpoint resolution that will present the new `kid`. The broad
/// sequence:
///
/// 1. Pick a fresh `new_kid` (must NOT contain `:` — the server uses
///    `:` as the field separator in the secret hash).
/// 2. Call this helper with the previous `kid`'s grace window
///    (`grace_ms` — the duration during which tokens signed by the
///    outgoing secret remain valid).
/// 3. Only after this call returns with all partitions `Ok(_)` (either
///    `Rotated` or `Noop`), begin signing new tokens with `new_kid`.
/// 4. Retain the previous secret in the keystore until the grace
///    window elapses — the FCALL handles GC of expired kids on every
///    rotation, so just don't rotate again before the grace window.
///
/// See RFC-004 §rotation for the full 4-key HSET + `previous_expires_at`
/// dance the FCALL implements server-side.
///
/// # Idempotency
///
/// Each partition FCALL is idempotent on the same `(new_kid,
/// new_secret_hex)` pair: a replay with identical args returns
/// `RotateWaitpointHmacSecretOutcome::Noop`. A same-kid-different-secret
/// replay surfaces as a per-partition `SdkError` (wrapping
/// `ScriptError::RotationConflict`) — pick a fresh `new_kid` to recover.
///
/// # Error semantics
///
/// A per-partition FCALL failure (transport fault, rotation conflict,
/// etc.) is recorded on that partition's [`PartitionRotationOutcome`]
/// and fan-out **continues** — the contract matches the server's
/// `rotate_waitpoint_secret` (partial success is allowed, operators
/// retry on the failed partition subset). Returning `Vec<_>` (not
/// `Result<Vec<_>, _>`) is deliberate: every whole-call invariant is
/// enforced by the underlying FCALL on each partition (kid non-empty,
/// no `:`, even-length hex, etc.), so the aggregate has nothing left
/// to reject at the Rust boundary. Callers decide how to treat partial
/// failures (fail loud / retry the subset / record metrics).
///
/// # Concurrency + performance
///
/// Sequential (one partition at a time) to keep the helper dependency-
/// free: no `futures::stream` / tokio-specific primitives on the caller
/// path. For a cluster with N partitions and per-partition RTT R, the
/// total duration is ~N*R. Consumers needing parallel fan-out should
/// wrap this with `FuturesUnordered` themselves, or use the server
/// admin endpoint (which fans out with bounded concurrency = 16).
///
/// # Test harness
///
/// The `ff-test::fixtures::TestCluster::rotate_waitpoint_hmac_secret`
/// method is a thin wrapper around this helper — integration tests and
/// production code exercise the same code path.
///
/// # Example
///
/// ```rust,ignore
/// use ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions;
///
/// // The helper returns `Vec<PartitionRotationOutcome>` directly (not a
/// // `Result`), so there is no `?` on the call itself — failures are
/// // inspected per partition below.
/// let results = rotate_waitpoint_hmac_secret_all_partitions(
///     &client,
///     partition_config.num_flow_partitions,
///     "kid-2026-04-22",
///     "deadbeef...64-hex-chars...",
///     60_000,
/// )
/// .await;
///
/// for entry in &results {
///     match &entry.result {
///         Ok(outcome) => tracing::info!(partition = entry.partition, ?outcome, "rotated"),
///         Err(e) => tracing::error!(partition = entry.partition, %e, "rotation failed"),
///     }
/// }
/// ```
// v0.12 PR-6: the `admin` module is ungated at module level so
// consumers under `--no-default-features --features sqlite` can reach
// the HTTP admin client surface. This helper is the one remaining
// Valkey-typed item in the module (takes a `&ferriskey::Client` and
// fans out `ff_rotate_waitpoint_hmac_secret` FCALLs), so it stays
// `valkey-default`-gated. See `lib.rs` PR-6 comment for the Option 1
// / Option 2 decision.
#[cfg(feature = "valkey-default")]
pub async fn rotate_waitpoint_hmac_secret_all_partitions(
    client: &ferriskey::Client,
    num_partitions: u16,
    new_kid: &str,
    new_secret_hex: &str,
    grace_ms: u64,
) -> Vec<PartitionRotationOutcome> {
    // Hoisted out of the loop — `ff_rotate_waitpoint_hmac_secret` only
    // borrows the args, so every partition can reuse the same struct.
    // Avoids N × 2 string clones on the hot fan-out path.
    let args = RotateWaitpointHmacSecretArgs {
        new_kid: new_kid.to_owned(),
        new_secret_hex: new_secret_hex.to_owned(),
        grace_ms,
    };
    // `usize::from` is the lossless u16 -> usize widening; exactly one
    // entry is pushed per partition, so the capacity is exact.
    let mut out = Vec::with_capacity(usize::from(num_partitions));
    for index in 0..num_partitions {
        let partition = Partition {
            family: PartitionFamily::Execution,
            index,
        };
        let idx = IndexKeys::new(&partition);
        // A failure is captured on this partition's entry rather than
        // propagated, so the remaining partitions are still attempted.
        let result = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            client, &idx, &args,
        )
        .await
        .map_err(SdkError::from);
        out.push(PartitionRotationOutcome {
            partition: index,
            result,
        });
    }
    out
}
817
/// Strip every trailing `/` from a base URL so that
/// `format!("{base}/v1/...")` never produces `https://host//v1/...`.
/// Mirror of media-pipeline's pattern.
///
/// Only the end of the string is touched; slashes elsewhere in the URL
/// are left alone. A string made up solely of slashes becomes empty.
fn normalize_base_url(mut url: String) -> String {
    // Measure the slash-free prefix, then shorten the owned string in
    // place so the caller's allocation is reused.
    let kept = url.trim_end_matches('/').len();
    url.truncate(kept);
    url
}
827
// Unit tests for the admin client module: URL normalisation, token
// validation in `with_token`, and (de)serialisation of the admin wire
// types. Nothing here talks to a live server or Valkey instance.
#[cfg(test)]
mod tests {
    use super::*;

    // Zero, one, and several trailing slashes all normalise to the
    // same slash-free base URL.
    #[test]
    fn base_url_strips_trailing_slash() {
        assert_eq!(normalize_base_url("http://x".into()), "http://x");
        assert_eq!(normalize_base_url("http://x/".into()), "http://x");
        assert_eq!(normalize_base_url("http://x///".into()), "http://x");
    }

    // A token containing a newline must be rejected with
    // `SdkError::Config` when the client is constructed.
    #[test]
    fn with_token_rejects_bad_header_chars() {
        // Raw newline in the token would split the Authorization
        // header — must fail loudly at construction.
        let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
        assert!(
            matches!(err, SdkError::Config { .. }),
            "got: {err:?}"
        );
    }

    // Empty and whitespace-only tokens must be rejected with
    // `SdkError::Config` whose `field` is `"bearer_token"`.
    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // Exact shell footgun: FF_ADMIN_TOKEN="" expands to "".
        // Fail loudly at construction instead of shipping a client
        // that silently 401s on first request.
        // NOTE(review): the module-level docs name `FF_API_TOKEN` as the
        // common env var — confirm which variable name is intended here.
        for s in ["", " ", "\t\n ", "   "] {
            let err = FlowFabricAdminClient::with_token("http://x", s)
                .unwrap_err();
            assert!(
                matches!(&err, SdkError::Config { field: Some(f), .. } if f == "bearer_token"),
                "token {s:?} should return Config with field=bearer_token; got: {err:?}"
            );
        }
    }

    // `AdminErrorBody` must deserialise both with and without the
    // optional `kind` / `retryable` fields.
    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // `kind` + `retryable` absent (the usual shape for 400s).
        let b: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(b.error, "bad new_kid");
        assert!(b.kind.is_none());
        assert!(b.retryable.is_none());

        // `kind` + `retryable` present (500 ValkeyError shape).
        let b: AdminErrorBody = serde_json::from_str(
            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
        )
        .unwrap();
        assert_eq!(b.error, "valkey: timed out");
        assert_eq!(b.kind.as_deref(), Some("IoError"));
        assert_eq!(b.retryable, Some(true));
    }

    // `RotateWaitpointSecretResponse` must deserialise the
    // `rotated` / `failed` / `new_kid` JSON object.
    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Exact shape the server emits.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "new_kid": "kid-2026-04-18"
        }"#;
        let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.rotated, 3);
        assert_eq!(r.failed, vec![4, 5]);
        assert_eq!(r.new_kid, "kid-2026-04-18");
    }

    // ── ClaimForWorkerResponse::into_grant ──

    // Fixture: a `ClaimForWorkerResponse` with a fixed execution id,
    // grant key and expiry, and the caller-supplied partition key.
    // The key string is round-tripped through JSON to obtain the field's
    // own type — presumably because that type is built via
    // `Deserialize` here; confirm against its definition.
    fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_key: serde_json::from_str(
                &serde_json::to_string(partition_key).unwrap(),
            )
            .unwrap(),
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    // Each known partition-key literal must survive `into_grant`
    // unchanged, along with the expiry timestamp.
    #[test]
    fn into_grant_preserves_all_known_partition_key_shapes() {
        // Post-#91: families collapse into opaque PartitionKey literals.
        // Flow and Execution both produce "{fp:N}"; Budget is "{b:N}";
        // Quota is "{q:N}". The DTO preserves the wire string as-is;
        // into_grant hands it opaquely to the core type.
        for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
            let g = sample_claim_response(key_str).into_grant().unwrap_or_else(|e| {
                panic!("key {key_str} should parse: {e:?}")
            });
            assert_eq!(g.partition_key.as_str(), key_str);
            assert_eq!(g.expires_at_ms, 1_700_000_000_000);
        }
    }

    // An unrecognised partition-key literal must still pass through
    // `into_grant`; only the explicit `partition()` parse may fail.
    #[test]
    fn into_grant_preserves_opaque_partition_key() {
        // The SDK does NOT eagerly parse the partition_key on the
        // admin boundary — malformed keys are caught at the
        // claim_from_grant hot path where the typed Partition is
        // actually needed. This test pins the opacity contract.
        let resp = sample_claim_response("{zz:0}");
        let g = resp.into_grant().expect("SDK must not parse partition_key");
        assert_eq!(g.partition_key.as_str(), "{zz:0}");
        // Parsing surfaces the error explicitly.
        assert!(g.partition().is_err());
    }

    // A malformed `execution_id` must make `into_grant` fail with
    // `SdkError::AdminApi` tagged `kind = "malformed_response"`.
    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let mut resp = sample_claim_response("{fp:5}");
        resp.execution_id = "not-a-valid-eid".to_owned();
        let err = resp.into_grant().unwrap_err();
        match err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(message.contains("malformed execution_id"),
                    "msg: {message}");
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    // ── rotate_waitpoint_hmac_secret_all_partitions (coverage note) ──

    // `rotate_waitpoint_hmac_secret_all_partitions` exercise coverage
    // lives in `ff-test` — the integration test harness in
    // `crates/ff-test/tests/waitpoint_hmac_rotation_fcall.rs` and
    // `waitpoint_tokens.rs` calls through the function via the
    // `TestCluster::rotate_waitpoint_hmac_secret` fixture, which is
    // now a thin delegator. A pure unit test here would require a
    // mock `ferriskey::Client` (ferriskey's `Client` performs a live
    // RESP handshake on `ClientBuilder::build`, so a local TCP
    // listener alone isn't sufficient) — expensive to construct for
    // one-line iteration-count coverage.

    // ── ClaimForWorkerResponse wire shape (issue #91) ──

    // The JSON object must deserialise into `ClaimForWorkerResponse`
    // with the partition key kept as the literal wire string.
    #[test]
    fn claim_for_worker_response_deserialises_opaque_partition_key() {
        // Exact shape the server emits post-#91.
        let raw = r#"{
            "execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
            "partition_key": "{fp:7}",
            "grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
            "expires_at_ms": 1700000000000
        }"#;
        let r: ClaimForWorkerResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.partition_key.as_str(), "{fp:7}");
        assert_eq!(r.expires_at_ms, 1_700_000_000_000);
    }
}