Skip to main content

ff_sdk/
admin.rs

1//! Admin REST client for operator-facing endpoints on `ff-server`.
2//!
//! Wraps `POST /v1/admin/*` (plus the scheduler-routed
//! `POST /v1/workers/{worker_id}/claim`) so downstream consumers
//! (cairn-fabric) don't hand-roll the HTTP calls for surfaces like
//! HMAC secret rotation and worker claims. Mirrors the server's wire
//! types exactly — request bodies and response shapes are defined
//! against [`ff_server::api`] + [`ff_server::server`] and kept 1:1
//! with the producer.
9//!
10//! Authentication is Bearer token. Callers pick up the token from
11//! wherever they hold it (`FF_API_TOKEN` env var is the common
12//! pattern, but the SDK does not read env vars on the caller's
13//! behalf — [`FlowFabricAdminClient::with_token`] accepts a
14//! string-like token value (`&str` or `String`) via
15//! `impl AsRef<str>`).
16
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20
21use crate::SdkError;
22
/// Server-side rotation deadline in seconds (ff-server's
/// `ROTATE_HTTP_TIMEOUT`).
const SERVER_ROTATE_TIMEOUT_SECS: u64 = 120;

/// Slack added on top of the server deadline so the client always
/// gives up strictly after the server does.
const CLIENT_TIMEOUT_HEADROOM_SECS: u64 = 10;

/// Default per-request timeout (130s total).
///
/// Deliberately longer than the server's own rotation deadline: when
/// the server times out first, operators receive its structured
/// 504 GATEWAY_TIMEOUT body instead of an opaque client-side timeout
/// error.
const DEFAULT_TIMEOUT: Duration =
    Duration::from_secs(SERVER_ROTATE_TIMEOUT_SECS + CLIENT_TIMEOUT_HEADROOM_SECS);
29
30/// Client for the `ff-server` admin REST surface.
31///
32/// Construct via [`FlowFabricAdminClient::new`] (no auth) or
33/// [`FlowFabricAdminClient::with_token`] (Bearer auth). Both
34/// return a ready-to-use client backed by a single pooled
35/// `reqwest::Client` — reuse the instance across calls instead of
36/// building one per request.
37#[derive(Debug, Clone)]
38pub struct FlowFabricAdminClient {
39    http: reqwest::Client,
40    base_url: String,
41}
42
43impl FlowFabricAdminClient {
44    /// Build a client without auth. Suitable for a dev ff-server
45    /// whose `api_token` is unconfigured. Production deployments
46    /// should use [`with_token`](Self::with_token).
47    pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
48        let http = reqwest::Client::builder()
49            .timeout(DEFAULT_TIMEOUT)
50            .build()
51            .map_err(|e| SdkError::Http {
52                source: e,
53                context: "build reqwest::Client".into(),
54            })?;
55        Ok(Self {
56            http,
57            base_url: normalize_base_url(base_url.into()),
58        })
59    }
60
61    /// Build a client that sends `Authorization: Bearer <token>` on
62    /// every request. The token is passed by value so the caller
63    /// retains ownership policy (e.g. zeroize on drop at the
64    /// caller side); the SDK only reads it.
65    ///
66    /// # Empty-token guard
67    ///
68    /// An empty or all-whitespace `token` returns
69    /// [`SdkError::Config`] instead of silently constructing
70    /// `Authorization: Bearer ` (which the server rejects with
71    /// 401, leaving the operator chasing a "why is auth broken"
72    /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
73    /// where the var was meant to be set; the unset-expansion is
74    /// the empty string. Prefer an obvious error at construction
75    /// over a silent 401 at first request.
76    ///
77    /// If the caller genuinely wants an unauthenticated client
78    /// (dev ff-server without `api_token` configured), use
79    /// [`FlowFabricAdminClient::new`] instead.
80    pub fn with_token(
81        base_url: impl Into<String>,
82        token: impl AsRef<str>,
83    ) -> Result<Self, SdkError> {
84        let token_str = token.as_ref();
85        if token_str.trim().is_empty() {
86            return Err(SdkError::Config(
87                "bearer token is empty or all-whitespace; use \
88                 FlowFabricAdminClient::new for unauthenticated access"
89                    .into(),
90            ));
91        }
92        let mut headers = reqwest::header::HeaderMap::new();
93        let mut auth_value =
94            reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
95                |_| {
96                    SdkError::Config(
97                        "bearer token contains characters not valid in an HTTP header".into(),
98                    )
99                },
100            )?;
101        // Mark Authorization as sensitive so it doesn't appear in
102        // reqwest's Debug output / logs.
103        auth_value.set_sensitive(true);
104        headers.insert(reqwest::header::AUTHORIZATION, auth_value);
105
106        let http = reqwest::Client::builder()
107            .timeout(DEFAULT_TIMEOUT)
108            .default_headers(headers)
109            .build()
110            .map_err(|e| SdkError::Http {
111                source: e,
112                context: "build reqwest::Client".into(),
113            })?;
114        Ok(Self {
115            http,
116            base_url: normalize_base_url(base_url.into()),
117        })
118    }
119
120    /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
121    ///
122    /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
123    /// server-side one: the request carries lane + identity +
124    /// capabilities + grant TTL; the server runs budget, quota, and
125    /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
126    /// and returns a `ClaimGrant` on success.
127    ///
128    /// Returns `Ok(None)` when the server responds 204 No Content
129    /// (no eligible execution on the lane). Callers that want to keep
130    /// polling should back off per their claim cadence.
131    pub async fn claim_for_worker(
132        &self,
133        req: ClaimForWorkerRequest,
134    ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
135        // Percent-encode `worker_id` in the URL path — `WorkerId` is a
136        // free-form string (could contain `/`, spaces, `%`, etc.) and
137        // splicing it verbatim would produce malformed URLs or
138        // misrouted paths. `Url::path_segments_mut().push` handles the
139        // encoding natively.
140        let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| {
141            SdkError::Config(format!("invalid base_url '{}': {e}", self.base_url))
142        })?;
143        {
144            let mut segs = url.path_segments_mut().map_err(|_| {
145                SdkError::Config(format!(
146                    "base_url cannot be a base URL: '{}'",
147                    self.base_url
148                ))
149            })?;
150            segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
151        }
152        let url = url.to_string();
153        let resp = self
154            .http
155            .post(&url)
156            .json(&req)
157            .send()
158            .await
159            .map_err(|e| SdkError::Http {
160                source: e,
161                context: "POST /v1/workers/{worker_id}/claim".into(),
162            })?;
163
164        let status = resp.status();
165        if status == reqwest::StatusCode::NO_CONTENT {
166            return Ok(None);
167        }
168        if status.is_success() {
169            return resp
170                .json::<ClaimForWorkerResponse>()
171                .await
172                .map(Some)
173                .map_err(|e| SdkError::Http {
174                    source: e,
175                    context: "decode claim_for_worker response body".into(),
176                });
177        }
178
179        // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
180        let status_u16 = status.as_u16();
181        let raw = resp.text().await.map_err(|e| SdkError::Http {
182            source: e,
183            context: format!("read claim_for_worker error body (status {status_u16})"),
184        })?;
185        let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
186        Err(SdkError::AdminApi {
187            status: status_u16,
188            message: parsed
189                .as_ref()
190                .map(|b| b.error.clone())
191                .unwrap_or_else(|| raw.clone()),
192            kind: parsed.as_ref().and_then(|b| b.kind.clone()),
193            retryable: parsed.as_ref().and_then(|b| b.retryable),
194            raw_body: raw,
195        })
196    }
197
198    /// Rotate the waitpoint HMAC secret on the server.
199    ///
200    /// Promotes the currently-installed kid to `previous_kid`
201    /// (accepted for the server's configured
202    /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
203    /// `new_secret_hex` under `new_kid` as the new current. Fans
204    /// out across every execution partition. Idempotent: re-running
205    /// with the same `(new_kid, new_secret_hex)` converges.
206    ///
207    /// The server returns 200 if at least one partition rotated OR
208    /// at least one partition was already rotating under a
209    /// concurrent request. See `RotateWaitpointSecretResponse`
210    /// fields for the breakdown.
211    ///
212    /// # Errors
213    ///
214    /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
215    ///   input, 401 missing/bad bearer, 429 concurrent rotate,
216    ///   500 all partitions failed, 504 server-side timeout).
217    /// * [`SdkError::Http`] — transport error (connect, body
218    ///   decode, client-side timeout).
219    ///
220    /// # Retry semantics
221    ///
222    /// Rotation is idempotent on the same `(new_kid,
223    /// new_secret_hex)` so retries are SAFE even on 504s or
224    /// partial failures.
225    pub async fn rotate_waitpoint_secret(
226        &self,
227        req: RotateWaitpointSecretRequest,
228    ) -> Result<RotateWaitpointSecretResponse, SdkError> {
229        let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
230        let resp = self
231            .http
232            .post(&url)
233            .json(&req)
234            .send()
235            .await
236            .map_err(|e| SdkError::Http {
237                source: e,
238                context: "POST /v1/admin/rotate-waitpoint-secret".into(),
239            })?;
240
241        let status = resp.status();
242        if status.is_success() {
243            return resp
244                .json::<RotateWaitpointSecretResponse>()
245                .await
246                .map_err(|e| SdkError::Http {
247                    source: e,
248                    context: "decode rotate-waitpoint-secret response body".into(),
249                });
250        }
251
252        // Non-2xx: parse the server's ErrorBody if we can, fall
253        // back to a raw body otherwise. Propagate body-read
254        // transport errors as Http rather than silently flattening
255        // them into `AdminApi { raw_body: "" }` — a connection drop
256        // mid-body-read is a transport fault, not an API-layer
257        // reject, and misclassifying it strips `is_retryable`'s
258        // timeout/connect signal from the caller.
259        let status_u16 = status.as_u16();
260        let raw = resp.text().await.map_err(|e| SdkError::Http {
261            source: e,
262            context: format!(
263                "read rotate-waitpoint-secret error response body (status {status_u16})"
264            ),
265        })?;
266        let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
267        Err(SdkError::AdminApi {
268            status: status_u16,
269            message: parsed
270                .as_ref()
271                .map(|b| b.error.clone())
272                .unwrap_or_else(|| raw.clone()),
273            kind: parsed.as_ref().and_then(|b| b.kind.clone()),
274            retryable: parsed.as_ref().and_then(|b| b.retryable),
275            raw_body: raw,
276        })
277    }
278}
279
280/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
281///
282/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
283#[derive(Debug, Clone, Serialize)]
284pub struct RotateWaitpointSecretRequest {
285    /// New key identifier. Non-empty, must not contain `:` (the
286    /// server uses `:` as the field separator in the secret hash).
287    pub new_kid: String,
288    /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
289    pub new_secret_hex: String,
290}
291
292/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
293///
294/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
295/// The server serializes this struct as-is via `Json(result)`.
296#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
297pub struct RotateWaitpointSecretResponse {
298    /// Count of partitions that accepted the rotation.
299    pub rotated: u16,
300    /// Partition indices where the rotation failed — operator
301    /// should investigate. Rotation is idempotent on the same
302    /// `(new_kid, new_secret_hex)` so a retry after the underlying
303    /// fault clears converges.
304    pub failed: Vec<u16>,
305    /// Partition indices where another rotation was already in
306    /// progress (per-partition `SETNX` lock held). These will be
307    /// rotated by the concurrent request; NOT a fault.
308    #[serde(default)]
309    pub in_progress: Vec<u16>,
310    /// The `new_kid` that was installed as current on every
311    /// rotated partition — echoes the request field back for
312    /// confirmation.
313    pub new_kid: String,
314}
315
316/// Server-side error body shape, as emitted by
317/// `ff_server::api::ErrorBody`. Kept internal because consumers
318/// match on the flattened fields of [`SdkError::AdminApi`].
319#[derive(Debug, Clone, Deserialize)]
320struct AdminErrorBody {
321    error: String,
322    #[serde(default)]
323    kind: Option<String>,
324    #[serde(default)]
325    retryable: Option<bool>,
326}
327
328/// Request body for `POST /v1/workers/{worker_id}/claim`.
329///
330/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
331/// goes in the URL path (not the body) but is kept on the struct
332/// for ergonomics — callers don't juggle a separate arg.
333#[derive(Debug, Clone, Serialize)]
334pub struct ClaimForWorkerRequest {
335    #[serde(skip)]
336    pub worker_id: String,
337    pub lane_id: String,
338    pub worker_instance_id: String,
339    #[serde(default)]
340    pub capabilities: Vec<String>,
341    /// Grant TTL in milliseconds. Server rejects 0 or anything over
342    /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
343    pub grant_ttl_ms: u64,
344}
345
346/// Response body for `POST /v1/workers/{worker_id}/claim`.
347///
348/// Wire shape of `ff_core::contracts::ClaimGrant`. The core type is
349/// not serde-derived (carries a `Partition` with a non-scalar family
350/// enum) so the SDK decodes into this DTO and reconstructs the core
351/// type via [`Self::into_grant`].
352#[derive(Debug, Clone, Deserialize)]
353pub struct ClaimForWorkerResponse {
354    pub execution_id: String,
355    pub partition_family: String,
356    pub partition_index: u16,
357    pub grant_key: String,
358    pub expires_at_ms: u64,
359}
360
361impl ClaimForWorkerResponse {
362    /// Convert the wire DTO into a typed
363    /// [`ff_core::contracts::ClaimGrant`] for handoff to
364    /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
365    /// [`SdkError::AdminApi`] on malformed execution_id / unknown
366    /// partition_family (server and SDK have drifted; fail loud so
367    /// callers don't route to a ghost partition).
368    pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
369        let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
370            .map_err(|e| SdkError::AdminApi {
371                status: 200,
372                message: format!(
373                    "claim_for_worker: server returned malformed execution_id '{}': {e}",
374                    self.execution_id
375                ),
376                kind: Some("malformed_response".to_owned()),
377                retryable: Some(false),
378                raw_body: String::new(),
379            })?;
380        let family = match self.partition_family.as_str() {
381            "flow" => ff_core::partition::PartitionFamily::Flow,
382            "execution" => ff_core::partition::PartitionFamily::Execution,
383            "budget" => ff_core::partition::PartitionFamily::Budget,
384            "quota" => ff_core::partition::PartitionFamily::Quota,
385            other => {
386                return Err(SdkError::AdminApi {
387                    status: 200,
388                    message: format!(
389                        "claim_for_worker: unknown partition_family '{other}'"
390                    ),
391                    kind: Some("malformed_response".to_owned()),
392                    retryable: Some(false),
393                    raw_body: String::new(),
394                });
395            }
396        };
397        Ok(ff_core::contracts::ClaimGrant {
398            execution_id,
399            partition: ff_core::partition::Partition {
400                family,
401                index: self.partition_index,
402            },
403            grant_key: self.grant_key,
404            expires_at_ms: self.expires_at_ms,
405        })
406    }
407}
408
/// Strip every trailing `/` from `url` so that
/// `format!("{base}/v1/...")` cannot produce `https://host//v1/...`.
/// Mirror of media-pipeline's pattern.
///
/// Works in place on the owned `String`, reusing its allocation.
fn normalize_base_url(mut url: String) -> String {
    let kept_len = url.trim_end_matches('/').len();
    url.truncate(kept_len);
    url
}
418
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn base_url_strips_trailing_slash() {
        assert_eq!(normalize_base_url("http://x".into()), "http://x");
        assert_eq!(normalize_base_url("http://x/".into()), "http://x");
        assert_eq!(normalize_base_url("http://x///".into()), "http://x");
    }

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // Raw newline in the token would split the Authorization
        // header — must fail loudly at construction.
        let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
        assert!(matches!(err, SdkError::Config(_)), "got: {err:?}");
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // Exact shell footgun: FF_ADMIN_TOKEN="" expands to "".
        // Fail loudly at construction instead of shipping a client
        // that silently 401s on first request.
        for s in ["", " ", "\t\n ", "   "] {
            let err = FlowFabricAdminClient::with_token("http://x", s).unwrap_err();
            assert!(
                matches!(&err, SdkError::Config(msg) if msg.contains("empty")),
                "token {s:?} should return Config(empty/whitespace); got: {err:?}"
            );
        }
    }

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // `kind` + `retryable` absent (the usual shape for 400s).
        let b: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(b.error, "bad new_kid");
        assert!(b.kind.is_none());
        assert!(b.retryable.is_none());

        // `kind` + `retryable` present (500 ValkeyError shape).
        let b: AdminErrorBody = serde_json::from_str(
            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
        )
        .unwrap();
        assert_eq!(b.error, "valkey: timed out");
        assert_eq!(b.kind.as_deref(), Some("IoError"));
        assert_eq!(b.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Shape ff-server emits for `RotateWaitpointSecretResult`.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "in_progress": [6],
            "new_kid": "kid-2026-04-18"
        }"#;
        let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.rotated, 3);
        assert_eq!(r.failed, vec![4, 5]);
        assert_eq!(r.in_progress, vec![6]);
        assert_eq!(r.new_kid, "kid-2026-04-18");
    }

    #[test]
    fn rotate_response_handles_missing_in_progress() {
        // Older server versions may omit in_progress. Default to
        // empty so the client stays forward-compatible.
        let raw = r#"{"rotated": 1, "failed": [], "new_kid": "k1"}"#;
        let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.in_progress, Vec::<u16>::new());
    }

    // ── ClaimForWorkerResponse::into_grant ──

    const SAMPLE_GRANT_KEY: &str =
        "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant";

    fn sample_claim_response(family: &str) -> ClaimForWorkerResponse {
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_family: family.to_owned(),
            partition_index: 5,
            grant_key: SAMPLE_GRANT_KEY.to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn into_grant_accepts_all_known_families() {
        use ff_core::partition::PartitionFamily;

        for family in ["flow", "execution", "budget", "quota"] {
            let g = sample_claim_response(family)
                .into_grant()
                .unwrap_or_else(|e| panic!("family {family} should parse: {e:?}"));
            // Pin the string → enum mapping itself, not just "it
            // parsed": a swapped match arm would otherwise route
            // claims to the wrong partition family unnoticed.
            assert!(
                matches!(
                    (family, &g.partition.family),
                    ("flow", PartitionFamily::Flow)
                        | ("execution", PartitionFamily::Execution)
                        | ("budget", PartitionFamily::Budget)
                        | ("quota", PartitionFamily::Quota)
                ),
                "family {family} mapped to the wrong PartitionFamily"
            );
            assert_eq!(g.partition.index, 5);
            assert_eq!(g.grant_key, SAMPLE_GRANT_KEY);
            assert_eq!(g.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_rejects_unknown_family() {
        let resp = sample_claim_response("nonsense");
        let err = resp.into_grant().unwrap_err();
        match err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(message.contains("unknown partition_family"), "msg: {message}");
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let mut resp = sample_claim_response("flow");
        resp.execution_id = "not-a-valid-eid".to_owned();
        let err = resp.into_grant().unwrap_err();
        match err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(message.contains("malformed execution_id"), "msg: {message}");
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }
}