ff_sdk/admin.rs
1//! Admin REST client for operator-facing endpoints on `ff-server`.
2//!
//! Wraps `POST /v1/admin/*` and the scheduler-routed
//! `POST /v1/workers/{worker_id}/claim` so downstream consumers
//! (cairn-fabric) don't hand-roll the HTTP calls for surfaces like
//! HMAC secret rotation and worker claims. Mirrors the server's wire
//! types exactly — request bodies and response shapes are defined
//! against [`ff_server::api`] + [`ff_server::server`] and kept 1:1
//! with the producer.
9//!
10//! Authentication is Bearer token. Callers pick up the token from
11//! wherever they hold it (`FF_API_TOKEN` env var is the common
12//! pattern, but the SDK does not read env vars on the caller's
13//! behalf — [`FlowFabricAdminClient::with_token`] accepts a
14//! string-like token value (`&str` or `String`) via
15//! `impl AsRef<str>`).
16
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20
21use crate::SdkError;
22
/// Default per-request timeout, applied to every request issued by a
/// [`FlowFabricAdminClient`] (both constructors install it on the
/// underlying `reqwest::Client`).
///
/// The server's own `ROTATE_HTTP_TIMEOUT` is 120s; pick 130s
/// client-side so the client deadline is LATER than the server
/// deadline and operators see the structured 504 GATEWAY_TIMEOUT body
/// rather than a client-side timeout error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(130);
29
/// Client for the `ff-server` admin REST surface.
///
/// Construct via [`FlowFabricAdminClient::new`] (no auth) or
/// [`FlowFabricAdminClient::with_token`] (Bearer auth). Both
/// return a ready-to-use client backed by a single pooled
/// `reqwest::Client` — reuse the instance across calls instead of
/// building one per request.
#[derive(Debug, Clone)]
pub struct FlowFabricAdminClient {
    /// HTTP client shared by every call. Carries the per-request
    /// timeout and, when built via `with_token`, the default
    /// `Authorization` header.
    http: reqwest::Client,
    /// Server base URL with trailing slashes stripped at
    /// construction, so endpoint paths can be appended directly.
    base_url: String,
}
42
43impl FlowFabricAdminClient {
44 /// Build a client without auth. Suitable for a dev ff-server
45 /// whose `api_token` is unconfigured. Production deployments
46 /// should use [`with_token`](Self::with_token).
47 pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
48 let http = reqwest::Client::builder()
49 .timeout(DEFAULT_TIMEOUT)
50 .build()
51 .map_err(|e| SdkError::Http {
52 source: e,
53 context: "build reqwest::Client".into(),
54 })?;
55 Ok(Self {
56 http,
57 base_url: normalize_base_url(base_url.into()),
58 })
59 }
60
61 /// Build a client that sends `Authorization: Bearer <token>` on
62 /// every request. The token is passed by value so the caller
63 /// retains ownership policy (e.g. zeroize on drop at the
64 /// caller side); the SDK only reads it.
65 ///
66 /// # Empty-token guard
67 ///
68 /// An empty or all-whitespace `token` returns
69 /// [`SdkError::Config`] instead of silently constructing
70 /// `Authorization: Bearer ` (which the server rejects with
71 /// 401, leaving the operator chasing a "why is auth broken"
72 /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
73 /// where the var was meant to be set; the unset-expansion is
74 /// the empty string. Prefer an obvious error at construction
75 /// over a silent 401 at first request.
76 ///
77 /// If the caller genuinely wants an unauthenticated client
78 /// (dev ff-server without `api_token` configured), use
79 /// [`FlowFabricAdminClient::new`] instead.
80 pub fn with_token(
81 base_url: impl Into<String>,
82 token: impl AsRef<str>,
83 ) -> Result<Self, SdkError> {
84 let token_str = token.as_ref();
85 if token_str.trim().is_empty() {
86 return Err(SdkError::Config(
87 "bearer token is empty or all-whitespace; use \
88 FlowFabricAdminClient::new for unauthenticated access"
89 .into(),
90 ));
91 }
92 let mut headers = reqwest::header::HeaderMap::new();
93 let mut auth_value =
94 reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
95 |_| {
96 SdkError::Config(
97 "bearer token contains characters not valid in an HTTP header".into(),
98 )
99 },
100 )?;
101 // Mark Authorization as sensitive so it doesn't appear in
102 // reqwest's Debug output / logs.
103 auth_value.set_sensitive(true);
104 headers.insert(reqwest::header::AUTHORIZATION, auth_value);
105
106 let http = reqwest::Client::builder()
107 .timeout(DEFAULT_TIMEOUT)
108 .default_headers(headers)
109 .build()
110 .map_err(|e| SdkError::Http {
111 source: e,
112 context: "build reqwest::Client".into(),
113 })?;
114 Ok(Self {
115 http,
116 base_url: normalize_base_url(base_url.into()),
117 })
118 }
119
120 /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
121 ///
122 /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
123 /// server-side one: the request carries lane + identity +
124 /// capabilities + grant TTL; the server runs budget, quota, and
125 /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
126 /// and returns a `ClaimGrant` on success.
127 ///
128 /// Returns `Ok(None)` when the server responds 204 No Content
129 /// (no eligible execution on the lane). Callers that want to keep
130 /// polling should back off per their claim cadence.
131 pub async fn claim_for_worker(
132 &self,
133 req: ClaimForWorkerRequest,
134 ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
135 // Percent-encode `worker_id` in the URL path — `WorkerId` is a
136 // free-form string (could contain `/`, spaces, `%`, etc.) and
137 // splicing it verbatim would produce malformed URLs or
138 // misrouted paths. `Url::path_segments_mut().push` handles the
139 // encoding natively.
140 let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| {
141 SdkError::Config(format!("invalid base_url '{}': {e}", self.base_url))
142 })?;
143 {
144 let mut segs = url.path_segments_mut().map_err(|_| {
145 SdkError::Config(format!(
146 "base_url cannot be a base URL: '{}'",
147 self.base_url
148 ))
149 })?;
150 segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
151 }
152 let url = url.to_string();
153 let resp = self
154 .http
155 .post(&url)
156 .json(&req)
157 .send()
158 .await
159 .map_err(|e| SdkError::Http {
160 source: e,
161 context: "POST /v1/workers/{worker_id}/claim".into(),
162 })?;
163
164 let status = resp.status();
165 if status == reqwest::StatusCode::NO_CONTENT {
166 return Ok(None);
167 }
168 if status.is_success() {
169 return resp
170 .json::<ClaimForWorkerResponse>()
171 .await
172 .map(Some)
173 .map_err(|e| SdkError::Http {
174 source: e,
175 context: "decode claim_for_worker response body".into(),
176 });
177 }
178
179 // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
180 let status_u16 = status.as_u16();
181 let raw = resp.text().await.map_err(|e| SdkError::Http {
182 source: e,
183 context: format!("read claim_for_worker error body (status {status_u16})"),
184 })?;
185 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
186 Err(SdkError::AdminApi {
187 status: status_u16,
188 message: parsed
189 .as_ref()
190 .map(|b| b.error.clone())
191 .unwrap_or_else(|| raw.clone()),
192 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
193 retryable: parsed.as_ref().and_then(|b| b.retryable),
194 raw_body: raw,
195 })
196 }
197
198 /// Rotate the waitpoint HMAC secret on the server.
199 ///
200 /// Promotes the currently-installed kid to `previous_kid`
201 /// (accepted for the server's configured
202 /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
203 /// `new_secret_hex` under `new_kid` as the new current. Fans
204 /// out across every execution partition. Idempotent: re-running
205 /// with the same `(new_kid, new_secret_hex)` converges.
206 ///
207 /// The server returns 200 if at least one partition rotated OR
208 /// at least one partition was already rotating under a
209 /// concurrent request. See `RotateWaitpointSecretResponse`
210 /// fields for the breakdown.
211 ///
212 /// # Errors
213 ///
214 /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
215 /// input, 401 missing/bad bearer, 429 concurrent rotate,
216 /// 500 all partitions failed, 504 server-side timeout).
217 /// * [`SdkError::Http`] — transport error (connect, body
218 /// decode, client-side timeout).
219 ///
220 /// # Retry semantics
221 ///
222 /// Rotation is idempotent on the same `(new_kid,
223 /// new_secret_hex)` so retries are SAFE even on 504s or
224 /// partial failures.
225 pub async fn rotate_waitpoint_secret(
226 &self,
227 req: RotateWaitpointSecretRequest,
228 ) -> Result<RotateWaitpointSecretResponse, SdkError> {
229 let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
230 let resp = self
231 .http
232 .post(&url)
233 .json(&req)
234 .send()
235 .await
236 .map_err(|e| SdkError::Http {
237 source: e,
238 context: "POST /v1/admin/rotate-waitpoint-secret".into(),
239 })?;
240
241 let status = resp.status();
242 if status.is_success() {
243 return resp
244 .json::<RotateWaitpointSecretResponse>()
245 .await
246 .map_err(|e| SdkError::Http {
247 source: e,
248 context: "decode rotate-waitpoint-secret response body".into(),
249 });
250 }
251
252 // Non-2xx: parse the server's ErrorBody if we can, fall
253 // back to a raw body otherwise. Propagate body-read
254 // transport errors as Http rather than silently flattening
255 // them into `AdminApi { raw_body: "" }` — a connection drop
256 // mid-body-read is a transport fault, not an API-layer
257 // reject, and misclassifying it strips `is_retryable`'s
258 // timeout/connect signal from the caller.
259 let status_u16 = status.as_u16();
260 let raw = resp.text().await.map_err(|e| SdkError::Http {
261 source: e,
262 context: format!(
263 "read rotate-waitpoint-secret error response body (status {status_u16})"
264 ),
265 })?;
266 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
267 Err(SdkError::AdminApi {
268 status: status_u16,
269 message: parsed
270 .as_ref()
271 .map(|b| b.error.clone())
272 .unwrap_or_else(|| raw.clone()),
273 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
274 retryable: parsed.as_ref().and_then(|b| b.retryable),
275 raw_body: raw,
276 })
277 }
278}
279
280/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
281///
282/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
283#[derive(Debug, Clone, Serialize)]
284pub struct RotateWaitpointSecretRequest {
285 /// New key identifier. Non-empty, must not contain `:` (the
286 /// server uses `:` as the field separator in the secret hash).
287 pub new_kid: String,
288 /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
289 pub new_secret_hex: String,
290}
291
/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
/// The server serializes this struct as-is via `Json(result)`.
/// Returned by `FlowFabricAdminClient::rotate_waitpoint_secret` when
/// the server answers with a 2xx status.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct RotateWaitpointSecretResponse {
    /// Count of partitions that accepted the rotation.
    pub rotated: u16,
    /// Partition indices where the rotation failed — operator
    /// should investigate. Rotation is idempotent on the same
    /// `(new_kid, new_secret_hex)` so a retry after the underlying
    /// fault clears converges.
    pub failed: Vec<u16>,
    /// The `new_kid` that was installed as current on every
    /// rotated partition — echoes the request field back for
    /// confirmation.
    pub new_kid: String,
}
310
/// Server-side error body shape, as emitted by
/// `ff_server::api::ErrorBody`. Kept internal because consumers
/// match on the flattened fields of [`SdkError::AdminApi`].
#[derive(Debug, Clone, Deserialize)]
struct AdminErrorBody {
    /// Human-readable error text; surfaced as `AdminApi::message`.
    /// Required — a body without it fails to parse and the raw body
    /// text is used as the message instead.
    error: String,
    /// Optional error classification; `None` when the server omits
    /// the field.
    #[serde(default)]
    kind: Option<String>,
    /// Optional retry hint from the server; `None` when the server
    /// omits the field.
    #[serde(default)]
    retryable: Option<bool>,
}
322
/// Request body for `POST /v1/workers/{worker_id}/claim`.
///
/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
/// goes in the URL path (not the body) but is kept on the struct
/// for ergonomics — callers don't juggle a separate arg.
#[derive(Debug, Clone, Serialize)]
pub struct ClaimForWorkerRequest {
    /// Worker identity. Skipped during serialization; the client
    /// places it in the URL path as a percent-encoded segment.
    #[serde(skip)]
    pub worker_id: String,
    /// Lane to claim from.
    pub lane_id: String,
    /// Identifier of the worker instance making the claim.
    pub worker_instance_id: String,
    /// Capabilities sent along for the server's capability admission.
    // NOTE(review): `#[serde(default)]` only affects deserialization and
    // this struct derives `Serialize` only, so the attribute looks like a
    // no-op here — confirm before removing.
    #[serde(default)]
    pub capabilities: Vec<String>,
    /// Grant TTL in milliseconds. Server rejects 0 or anything over
    /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
    pub grant_ttl_ms: u64,
}
340
/// Response body for `POST /v1/workers/{worker_id}/claim`.
///
/// Wire shape of `ff_core::contracts::ClaimGrant`. The core type is
/// not serde-derived (carries a `Partition` with a non-scalar family
/// enum) so the SDK decodes into this DTO and reconstructs the core
/// type via [`Self::into_grant`].
#[derive(Debug, Clone, Deserialize)]
pub struct ClaimForWorkerResponse {
    /// Execution id in string form; `into_grant` parses it with
    /// `ExecutionId::parse`.
    pub execution_id: String,
    /// Partition family name. `into_grant` accepts `"flow"`,
    /// `"execution"`, `"budget"` and `"quota"`; anything else is
    /// rejected.
    pub partition_family: String,
    /// Partition index; becomes `Partition::index` in the grant.
    pub partition_index: u16,
    /// Grant key, passed through to the grant unchanged.
    pub grant_key: String,
    /// Grant expiry in milliseconds, passed through unchanged
    /// (presumably a Unix-epoch timestamp — confirm against the
    /// server).
    pub expires_at_ms: u64,
}
355
356impl ClaimForWorkerResponse {
357 /// Convert the wire DTO into a typed
358 /// [`ff_core::contracts::ClaimGrant`] for handoff to
359 /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
360 /// [`SdkError::AdminApi`] on malformed execution_id / unknown
361 /// partition_family (server and SDK have drifted; fail loud so
362 /// callers don't route to a ghost partition).
363 pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
364 let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
365 .map_err(|e| SdkError::AdminApi {
366 status: 200,
367 message: format!(
368 "claim_for_worker: server returned malformed execution_id '{}': {e}",
369 self.execution_id
370 ),
371 kind: Some("malformed_response".to_owned()),
372 retryable: Some(false),
373 raw_body: String::new(),
374 })?;
375 let family = match self.partition_family.as_str() {
376 "flow" => ff_core::partition::PartitionFamily::Flow,
377 "execution" => ff_core::partition::PartitionFamily::Execution,
378 "budget" => ff_core::partition::PartitionFamily::Budget,
379 "quota" => ff_core::partition::PartitionFamily::Quota,
380 other => {
381 return Err(SdkError::AdminApi {
382 status: 200,
383 message: format!(
384 "claim_for_worker: unknown partition_family '{other}'"
385 ),
386 kind: Some("malformed_response".to_owned()),
387 retryable: Some(false),
388 raw_body: String::new(),
389 });
390 }
391 };
392 Ok(ff_core::contracts::ClaimGrant {
393 execution_id,
394 partition: ff_core::partition::Partition {
395 family,
396 index: self.partition_index,
397 },
398 grant_key: self.grant_key,
399 expires_at_ms: self.expires_at_ms,
400 })
401 }
402}
403
/// Strip every trailing `/` from a base URL so that
/// `format!("{base}/v1/...")` can never yield `https://host//v1/...`.
/// Follows the same pattern as media-pipeline. The string is
/// shortened in place; nothing other than trailing slashes is touched.
fn normalize_base_url(mut url: String) -> String {
    let kept = url.trim_end_matches('/').len();
    url.truncate(kept);
    url
}
413
#[cfg(test)]
mod tests {
    use super::*;

    // ── URL normalisation ──

    #[test]
    fn base_url_strips_trailing_slash() {
        let cases = [
            ("http://x", "http://x"),
            ("http://x/", "http://x"),
            ("http://x///", "http://x"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_base_url(input.into()), expected);
        }
    }

    // ── with_token validation ──

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // A raw newline would split the Authorization header, so
        // construction has to fail loudly.
        let outcome = FlowFabricAdminClient::with_token("http://x", "tok\nevil");
        let err = outcome.unwrap_err();
        let is_config = matches!(err, SdkError::Config(_));
        assert!(is_config, "got: {err:?}");
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // The shell footgun: FF_ADMIN_TOKEN="" expands to "". The
        // constructor must refuse rather than hand back a client that
        // silently 401s on its first request.
        let blank_tokens = ["", " ", "\t\n ", " "];
        for token in blank_tokens {
            let err = FlowFabricAdminClient::with_token("http://x", token).unwrap_err();
            let flagged_empty = matches!(&err, SdkError::Config(msg) if msg.contains("empty"));
            assert!(
                flagged_empty,
                "token {token:?} should return Config(empty/whitespace); got: {err:?}"
            );
        }
    }

    // ── wire-shape decoding ──

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // Usual 400 shape: neither `kind` nor `retryable` present.
        let minimal: AdminErrorBody =
            serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(minimal.error, "bad new_kid");
        assert!(minimal.kind.is_none());
        assert!(minimal.retryable.is_none());

        // 500 ValkeyError shape: both optional fields present.
        let full_json = r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#;
        let full: AdminErrorBody = serde_json::from_str(full_json).unwrap();
        assert_eq!(full.error, "valkey: timed out");
        assert_eq!(full.kind.as_deref(), Some("IoError"));
        assert_eq!(full.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // The exact shape emitted by the server.
        let raw = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "new_kid": "kid-2026-04-18"
        }"#;
        let decoded: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(decoded.rotated, 3);
        assert_eq!(decoded.failed, vec![4, 5]);
        assert_eq!(decoded.new_kid, "kid-2026-04-18");
    }

    // ── ClaimForWorkerResponse::into_grant ──

    /// A well-formed claim response whose only variable part is the
    /// partition family name.
    fn sample_claim_response(family: &str) -> ClaimForWorkerResponse {
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_family: family.to_owned(),
            partition_index: 5,
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant"
                .to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    /// Assert that `err` is the `malformed_response` flavour of
    /// `AdminApi` and that its message mentions `needle`.
    fn assert_malformed_response(err: SdkError, needle: &str) {
        match err {
            SdkError::AdminApi { message, kind, .. } => {
                assert!(message.contains(needle), "msg: {message}");
                assert_eq!(kind.as_deref(), Some("malformed_response"));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    #[test]
    fn into_grant_accepts_all_known_families() {
        for family in ["flow", "execution", "budget", "quota"] {
            let grant = match sample_claim_response(family).into_grant() {
                Ok(grant) => grant,
                Err(e) => panic!("family {family} should parse: {e:?}"),
            };
            assert_eq!(grant.partition.index, 5);
            assert_eq!(grant.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_rejects_unknown_family() {
        let err = sample_claim_response("nonsense").into_grant().unwrap_err();
        assert_malformed_response(err, "unknown partition_family");
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let resp = ClaimForWorkerResponse {
            execution_id: "not-a-valid-eid".to_owned(),
            ..sample_claim_response("flow")
        };
        let err = resp.into_grant().unwrap_err();
        assert_malformed_response(err, "malformed execution_id");
    }
}