Skip to main content

ff_sdk/
admin.rs

1//! Admin REST client for operator-facing endpoints on `ff-server`.
2//!
//! Wraps `POST /v1/admin/*` (plus the scheduler-routed
//! `POST /v1/workers/{worker_id}/claim`) so downstream consumers
//! (cairn-fabric) don't hand-roll the HTTP call for surfaces like HMAC
//! secret rotation and worker claims. Mirrors the server's wire types exactly — request
6//! bodies and response shapes are defined against
7//! [`ff_server::api`] + [`ff_server::server`] and kept 1:1 with the
8//! producer.
9//!
10//! Authentication is Bearer token. Callers pick up the token from
11//! wherever they hold it (`FF_API_TOKEN` env var is the common
12//! pattern, but the SDK does not read env vars on the caller's
13//! behalf — [`FlowFabricAdminClient::with_token`] accepts a
14//! string-like token value (`&str` or `String`) via
15//! `impl AsRef<str>`).
16
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20
21use crate::SdkError;
22
/// Mirror of the server's `ROTATE_HTTP_TIMEOUT` (120s), kept here only so
/// the client deadline below can be stated relative to it.
const SERVER_ROTATE_TIMEOUT_SECS: u64 = 120;

/// Per-request timeout applied by every [`FlowFabricAdminClient`].
///
/// Deliberately 10s longer than the server-side rotate deadline: when the
/// server gives up first, operators receive its structured 504
/// GATEWAY_TIMEOUT body instead of an opaque client-side timeout error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(SERVER_ROTATE_TIMEOUT_SECS + 10);
29
30/// Client for the `ff-server` admin REST surface.
31///
32/// Construct via [`FlowFabricAdminClient::new`] (no auth) or
33/// [`FlowFabricAdminClient::with_token`] (Bearer auth). Both
34/// return a ready-to-use client backed by a single pooled
35/// `reqwest::Client` — reuse the instance across calls instead of
36/// building one per request.
37#[derive(Debug, Clone)]
38pub struct FlowFabricAdminClient {
39    http: reqwest::Client,
40    base_url: String,
41}
42
43impl FlowFabricAdminClient {
44    /// Build a client without auth. Suitable for a dev ff-server
45    /// whose `api_token` is unconfigured. Production deployments
46    /// should use [`with_token`](Self::with_token).
47    pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
48        let http = reqwest::Client::builder()
49            .timeout(DEFAULT_TIMEOUT)
50            .build()
51            .map_err(|e| SdkError::Http {
52                source: e,
53                context: "build reqwest::Client".into(),
54            })?;
55        Ok(Self {
56            http,
57            base_url: normalize_base_url(base_url.into()),
58        })
59    }
60
61    /// Build a client that sends `Authorization: Bearer <token>` on
62    /// every request. The token is passed by value so the caller
63    /// retains ownership policy (e.g. zeroize on drop at the
64    /// caller side); the SDK only reads it.
65    ///
66    /// # Empty-token guard
67    ///
68    /// An empty or all-whitespace `token` returns
69    /// [`SdkError::Config`] instead of silently constructing
70    /// `Authorization: Bearer ` (which the server rejects with
71    /// 401, leaving the operator chasing a "why is auth broken"
72    /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
73    /// where the var was meant to be set; the unset-expansion is
74    /// the empty string. Prefer an obvious error at construction
75    /// over a silent 401 at first request.
76    ///
77    /// If the caller genuinely wants an unauthenticated client
78    /// (dev ff-server without `api_token` configured), use
79    /// [`FlowFabricAdminClient::new`] instead.
80    pub fn with_token(
81        base_url: impl Into<String>,
82        token: impl AsRef<str>,
83    ) -> Result<Self, SdkError> {
84        let token_str = token.as_ref();
85        if token_str.trim().is_empty() {
86            return Err(SdkError::Config(
87                "bearer token is empty or all-whitespace; use \
88                 FlowFabricAdminClient::new for unauthenticated access"
89                    .into(),
90            ));
91        }
92        let mut headers = reqwest::header::HeaderMap::new();
93        let mut auth_value =
94            reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
95                |_| {
96                    SdkError::Config(
97                        "bearer token contains characters not valid in an HTTP header".into(),
98                    )
99                },
100            )?;
101        // Mark Authorization as sensitive so it doesn't appear in
102        // reqwest's Debug output / logs.
103        auth_value.set_sensitive(true);
104        headers.insert(reqwest::header::AUTHORIZATION, auth_value);
105
106        let http = reqwest::Client::builder()
107            .timeout(DEFAULT_TIMEOUT)
108            .default_headers(headers)
109            .build()
110            .map_err(|e| SdkError::Http {
111                source: e,
112                context: "build reqwest::Client".into(),
113            })?;
114        Ok(Self {
115            http,
116            base_url: normalize_base_url(base_url.into()),
117        })
118    }
119
120    /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
121    ///
122    /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
123    /// server-side one: the request carries lane + identity +
124    /// capabilities + grant TTL; the server runs budget, quota, and
125    /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
126    /// and returns a `ClaimGrant` on success.
127    ///
128    /// Returns `Ok(None)` when the server responds 204 No Content
129    /// (no eligible execution on the lane). Callers that want to keep
130    /// polling should back off per their claim cadence.
131    pub async fn claim_for_worker(
132        &self,
133        req: ClaimForWorkerRequest,
134    ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
135        // Percent-encode `worker_id` in the URL path — `WorkerId` is a
136        // free-form string (could contain `/`, spaces, `%`, etc.) and
137        // splicing it verbatim would produce malformed URLs or
138        // misrouted paths. `Url::path_segments_mut().push` handles the
139        // encoding natively.
140        let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| {
141            SdkError::Config(format!("invalid base_url '{}': {e}", self.base_url))
142        })?;
143        {
144            let mut segs = url.path_segments_mut().map_err(|_| {
145                SdkError::Config(format!(
146                    "base_url cannot be a base URL: '{}'",
147                    self.base_url
148                ))
149            })?;
150            segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
151        }
152        let url = url.to_string();
153        let resp = self
154            .http
155            .post(&url)
156            .json(&req)
157            .send()
158            .await
159            .map_err(|e| SdkError::Http {
160                source: e,
161                context: "POST /v1/workers/{worker_id}/claim".into(),
162            })?;
163
164        let status = resp.status();
165        if status == reqwest::StatusCode::NO_CONTENT {
166            return Ok(None);
167        }
168        if status.is_success() {
169            return resp
170                .json::<ClaimForWorkerResponse>()
171                .await
172                .map(Some)
173                .map_err(|e| SdkError::Http {
174                    source: e,
175                    context: "decode claim_for_worker response body".into(),
176                });
177        }
178
179        // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
180        let status_u16 = status.as_u16();
181        let raw = resp.text().await.map_err(|e| SdkError::Http {
182            source: e,
183            context: format!("read claim_for_worker error body (status {status_u16})"),
184        })?;
185        let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
186        Err(SdkError::AdminApi {
187            status: status_u16,
188            message: parsed
189                .as_ref()
190                .map(|b| b.error.clone())
191                .unwrap_or_else(|| raw.clone()),
192            kind: parsed.as_ref().and_then(|b| b.kind.clone()),
193            retryable: parsed.as_ref().and_then(|b| b.retryable),
194            raw_body: raw,
195        })
196    }
197
198    /// Rotate the waitpoint HMAC secret on the server.
199    ///
200    /// Promotes the currently-installed kid to `previous_kid`
201    /// (accepted for the server's configured
202    /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
203    /// `new_secret_hex` under `new_kid` as the new current. Fans
204    /// out across every execution partition. Idempotent: re-running
205    /// with the same `(new_kid, new_secret_hex)` converges.
206    ///
207    /// The server returns 200 if at least one partition rotated OR
208    /// at least one partition was already rotating under a
209    /// concurrent request. See `RotateWaitpointSecretResponse`
210    /// fields for the breakdown.
211    ///
212    /// # Errors
213    ///
214    /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
215    ///   input, 401 missing/bad bearer, 429 concurrent rotate,
216    ///   500 all partitions failed, 504 server-side timeout).
217    /// * [`SdkError::Http`] — transport error (connect, body
218    ///   decode, client-side timeout).
219    ///
220    /// # Retry semantics
221    ///
222    /// Rotation is idempotent on the same `(new_kid,
223    /// new_secret_hex)` so retries are SAFE even on 504s or
224    /// partial failures.
225    pub async fn rotate_waitpoint_secret(
226        &self,
227        req: RotateWaitpointSecretRequest,
228    ) -> Result<RotateWaitpointSecretResponse, SdkError> {
229        let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
230        let resp = self
231            .http
232            .post(&url)
233            .json(&req)
234            .send()
235            .await
236            .map_err(|e| SdkError::Http {
237                source: e,
238                context: "POST /v1/admin/rotate-waitpoint-secret".into(),
239            })?;
240
241        let status = resp.status();
242        if status.is_success() {
243            return resp
244                .json::<RotateWaitpointSecretResponse>()
245                .await
246                .map_err(|e| SdkError::Http {
247                    source: e,
248                    context: "decode rotate-waitpoint-secret response body".into(),
249                });
250        }
251
252        // Non-2xx: parse the server's ErrorBody if we can, fall
253        // back to a raw body otherwise. Propagate body-read
254        // transport errors as Http rather than silently flattening
255        // them into `AdminApi { raw_body: "" }` — a connection drop
256        // mid-body-read is a transport fault, not an API-layer
257        // reject, and misclassifying it strips `is_retryable`'s
258        // timeout/connect signal from the caller.
259        let status_u16 = status.as_u16();
260        let raw = resp.text().await.map_err(|e| SdkError::Http {
261            source: e,
262            context: format!(
263                "read rotate-waitpoint-secret error response body (status {status_u16})"
264            ),
265        })?;
266        let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
267        Err(SdkError::AdminApi {
268            status: status_u16,
269            message: parsed
270                .as_ref()
271                .map(|b| b.error.clone())
272                .unwrap_or_else(|| raw.clone()),
273            kind: parsed.as_ref().and_then(|b| b.kind.clone()),
274            retryable: parsed.as_ref().and_then(|b| b.retryable),
275            raw_body: raw,
276        })
277    }
278}
279
280/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
281///
282/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
283#[derive(Debug, Clone, Serialize)]
284pub struct RotateWaitpointSecretRequest {
285    /// New key identifier. Non-empty, must not contain `:` (the
286    /// server uses `:` as the field separator in the secret hash).
287    pub new_kid: String,
288    /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
289    pub new_secret_hex: String,
290}
291
292/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
293///
294/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
295/// The server serializes this struct as-is via `Json(result)`.
296#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
297pub struct RotateWaitpointSecretResponse {
298    /// Count of partitions that accepted the rotation.
299    pub rotated: u16,
300    /// Partition indices where the rotation failed — operator
301    /// should investigate. Rotation is idempotent on the same
302    /// `(new_kid, new_secret_hex)` so a retry after the underlying
303    /// fault clears converges.
304    pub failed: Vec<u16>,
305    /// The `new_kid` that was installed as current on every
306    /// rotated partition — echoes the request field back for
307    /// confirmation.
308    pub new_kid: String,
309}
310
311/// Server-side error body shape, as emitted by
312/// `ff_server::api::ErrorBody`. Kept internal because consumers
313/// match on the flattened fields of [`SdkError::AdminApi`].
314#[derive(Debug, Clone, Deserialize)]
315struct AdminErrorBody {
316    error: String,
317    #[serde(default)]
318    kind: Option<String>,
319    #[serde(default)]
320    retryable: Option<bool>,
321}
322
323/// Request body for `POST /v1/workers/{worker_id}/claim`.
324///
325/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
326/// goes in the URL path (not the body) but is kept on the struct
327/// for ergonomics — callers don't juggle a separate arg.
328#[derive(Debug, Clone, Serialize)]
329pub struct ClaimForWorkerRequest {
330    #[serde(skip)]
331    pub worker_id: String,
332    pub lane_id: String,
333    pub worker_instance_id: String,
334    #[serde(default)]
335    pub capabilities: Vec<String>,
336    /// Grant TTL in milliseconds. Server rejects 0 or anything over
337    /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
338    pub grant_ttl_ms: u64,
339}
340
341/// Response body for `POST /v1/workers/{worker_id}/claim`.
342///
343/// Wire shape of `ff_core::contracts::ClaimGrant`. The core type is
344/// not serde-derived (carries a `Partition` with a non-scalar family
345/// enum) so the SDK decodes into this DTO and reconstructs the core
346/// type via [`Self::into_grant`].
347#[derive(Debug, Clone, Deserialize)]
348pub struct ClaimForWorkerResponse {
349    pub execution_id: String,
350    pub partition_family: String,
351    pub partition_index: u16,
352    pub grant_key: String,
353    pub expires_at_ms: u64,
354}
355
356impl ClaimForWorkerResponse {
357    /// Convert the wire DTO into a typed
358    /// [`ff_core::contracts::ClaimGrant`] for handoff to
359    /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
360    /// [`SdkError::AdminApi`] on malformed execution_id / unknown
361    /// partition_family (server and SDK have drifted; fail loud so
362    /// callers don't route to a ghost partition).
363    pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
364        let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
365            .map_err(|e| SdkError::AdminApi {
366                status: 200,
367                message: format!(
368                    "claim_for_worker: server returned malformed execution_id '{}': {e}",
369                    self.execution_id
370                ),
371                kind: Some("malformed_response".to_owned()),
372                retryable: Some(false),
373                raw_body: String::new(),
374            })?;
375        let family = match self.partition_family.as_str() {
376            "flow" => ff_core::partition::PartitionFamily::Flow,
377            "execution" => ff_core::partition::PartitionFamily::Execution,
378            "budget" => ff_core::partition::PartitionFamily::Budget,
379            "quota" => ff_core::partition::PartitionFamily::Quota,
380            other => {
381                return Err(SdkError::AdminApi {
382                    status: 200,
383                    message: format!(
384                        "claim_for_worker: unknown partition_family '{other}'"
385                    ),
386                    kind: Some("malformed_response".to_owned()),
387                    retryable: Some(false),
388                    raw_body: String::new(),
389                });
390            }
391        };
392        Ok(ff_core::contracts::ClaimGrant {
393            execution_id,
394            partition: ff_core::partition::Partition {
395                family,
396                index: self.partition_index,
397            },
398            grant_key: self.grant_key,
399            expires_at_ms: self.expires_at_ms,
400        })
401    }
402}
403
/// Remove every trailing `/` from `url`, so that joining it as
/// `format!("{base}/v1/...")` can never yield `https://host//v1/...`.
/// Same approach as media-pipeline.
fn normalize_base_url(mut url: String) -> String {
    // `/` is a single-byte char, so the trimmed length is always a
    // valid char boundary for `truncate`.
    let kept = url.trim_end_matches('/').len();
    url.truncate(kept);
    url
}
413
#[cfg(test)]
mod tests {
    use super::*;

    /// Execution id shared by the claim-response fixtures.
    const SAMPLE_EID: &str = "{fp:5}:11111111-1111-1111-1111-111111111111";

    /// Well-formed claim response on partition 5 with the given
    /// `partition_family` string.
    fn sample_claim_response(family: &str) -> ClaimForWorkerResponse {
        ClaimForWorkerResponse {
            execution_id: SAMPLE_EID.to_owned(),
            partition_family: family.to_owned(),
            partition_index: 5,
            grant_key: format!("ff:exec:{SAMPLE_EID}:claim_grant"),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    /// Assert `err` is `AdminApi` with kind `malformed_response` and a
    /// message mentioning `needle`.
    fn assert_malformed_response(err: SdkError, needle: &str) {
        match err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(message.contains(needle), "msg: {message}");
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    #[test]
    fn base_url_strips_trailing_slash() {
        for (input, want) in [
            ("http://x", "http://x"),
            ("http://x/", "http://x"),
            ("http://x///", "http://x"),
        ] {
            assert_eq!(normalize_base_url(input.into()), want);
        }
    }

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // A raw newline would split the Authorization header, so
        // construction has to refuse it outright.
        let result = FlowFabricAdminClient::with_token("http://x", "tok\nevil");
        assert!(matches!(result, Err(SdkError::Config(_))), "got: {result:?}");
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // The shell footgun FF_ADMIN_TOKEN="" expands to "". Refuse it at
        // construction instead of handing back a client that 401s later.
        for token in ["", " ", "\t\n ", "   "] {
            match FlowFabricAdminClient::with_token("http://x", token) {
                Err(SdkError::Config(msg)) if msg.contains("empty") => {}
                other => panic!(
                    "token {token:?} should return Config(empty/whitespace); got: {other:?}"
                ),
            }
        }
    }

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // Usual 400 shape: only `error`, no `kind` / `retryable`.
        let bare: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(bare.error, "bad new_kid");
        assert_eq!(bare.kind, None);
        assert_eq!(bare.retryable, None);

        // 500 ValkeyError shape: every field populated.
        let full: AdminErrorBody = serde_json::from_str(
            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
        )
        .unwrap();
        assert_eq!(full.error, "valkey: timed out");
        assert_eq!(full.kind.as_deref(), Some("IoError"));
        assert_eq!(full.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Exact shape the server emits.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "new_kid": "kid-2026-04-18"
        }"#;
        let parsed: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        let expected = RotateWaitpointSecretResponse {
            rotated: 3,
            failed: vec![4, 5],
            new_kid: "kid-2026-04-18".to_owned(),
        };
        assert_eq!(parsed, expected);
    }

    // ── ClaimForWorkerResponse::into_grant ──

    #[test]
    fn into_grant_accepts_all_known_families() {
        for family in ["flow", "execution", "budget", "quota"] {
            let grant = match sample_claim_response(family).into_grant() {
                Ok(grant) => grant,
                Err(e) => panic!("family {family} should parse: {e:?}"),
            };
            assert_eq!(grant.partition.index, 5);
            assert_eq!(grant.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_rejects_unknown_family() {
        let err = sample_claim_response("nonsense").into_grant().unwrap_err();
        assert_malformed_response(err, "unknown partition_family");
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let resp = ClaimForWorkerResponse {
            execution_id: "not-a-valid-eid".to_owned(),
            ..sample_claim_response("flow")
        };
        assert_malformed_response(resp.into_grant().unwrap_err(), "malformed execution_id");
    }
}