ff_sdk/admin.rs
1//! Admin REST client for operator-facing endpoints on `ff-server`.
2//!
//! Wraps `POST /v1/admin/*` (e.g. HMAC secret rotation) and the
//! scheduler-routed `POST /v1/workers/{worker_id}/claim` so
//! downstream consumers (cairn-fabric) don't hand-roll the HTTP
//! calls. Mirrors the server's wire types exactly — request
//! bodies and response shapes are defined against
//! [`ff_server::api`] + [`ff_server::server`] and kept 1:1 with the
//! producer.
9//!
//! Authentication uses a Bearer token. Callers obtain the token
//! from wherever they hold it — the `FF_API_TOKEN` env var is the
//! common pattern — but the SDK never reads env vars on the
//! caller's behalf. [`FlowFabricAdminClient::with_token`] accepts
//! any string-like token value (`&str` or `String`) via
//! `impl AsRef<str>`.
16
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20
21use crate::SdkError;
22
/// Per-request deadline used by the clients built in this module.
///
/// Deliberately LATER than the server's own `ROTATE_HTTP_TIMEOUT`
/// (120s): with 10s of headroom the server gives up first, so
/// operators receive its structured 504 GATEWAY_TIMEOUT body
/// instead of a client-side timeout error.
const DEFAULT_TIMEOUT: Duration = {
    // Server-side deadline this client has to outlast.
    const SERVER_DEADLINE_SECS: u64 = 120;
    // Slack that lets the server's 504 arrive before we give up.
    const HEADROOM_SECS: u64 = 10;
    Duration::from_secs(SERVER_DEADLINE_SECS + HEADROOM_SECS)
};
29
30/// Client for the `ff-server` admin REST surface.
31///
32/// Construct via [`FlowFabricAdminClient::new`] (no auth) or
33/// [`FlowFabricAdminClient::with_token`] (Bearer auth). Both
34/// return a ready-to-use client backed by a single pooled
35/// `reqwest::Client` — reuse the instance across calls instead of
36/// building one per request.
37#[derive(Debug, Clone)]
38pub struct FlowFabricAdminClient {
39 http: reqwest::Client,
40 base_url: String,
41}
42
43impl FlowFabricAdminClient {
44 /// Build a client without auth. Suitable for a dev ff-server
45 /// whose `api_token` is unconfigured. Production deployments
46 /// should use [`with_token`](Self::with_token).
47 pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
48 let http = reqwest::Client::builder()
49 .timeout(DEFAULT_TIMEOUT)
50 .build()
51 .map_err(|e| SdkError::Http {
52 source: e,
53 context: "build reqwest::Client".into(),
54 })?;
55 Ok(Self {
56 http,
57 base_url: normalize_base_url(base_url.into()),
58 })
59 }
60
61 /// Build a client that sends `Authorization: Bearer <token>` on
62 /// every request. The token is passed by value so the caller
63 /// retains ownership policy (e.g. zeroize on drop at the
64 /// caller side); the SDK only reads it.
65 ///
66 /// # Empty-token guard
67 ///
68 /// An empty or all-whitespace `token` returns
69 /// [`SdkError::Config`] instead of silently constructing
70 /// `Authorization: Bearer ` (which the server rejects with
71 /// 401, leaving the operator chasing a "why is auth broken"
72 /// ghost). Common source: `FF_ADMIN_TOKEN=""` in a shell
73 /// where the var was meant to be set; the unset-expansion is
74 /// the empty string. Prefer an obvious error at construction
75 /// over a silent 401 at first request.
76 ///
77 /// If the caller genuinely wants an unauthenticated client
78 /// (dev ff-server without `api_token` configured), use
79 /// [`FlowFabricAdminClient::new`] instead.
80 pub fn with_token(
81 base_url: impl Into<String>,
82 token: impl AsRef<str>,
83 ) -> Result<Self, SdkError> {
84 let token_str = token.as_ref();
85 if token_str.trim().is_empty() {
86 return Err(SdkError::Config(
87 "bearer token is empty or all-whitespace; use \
88 FlowFabricAdminClient::new for unauthenticated access"
89 .into(),
90 ));
91 }
92 let mut headers = reqwest::header::HeaderMap::new();
93 let mut auth_value =
94 reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
95 |_| {
96 SdkError::Config(
97 "bearer token contains characters not valid in an HTTP header".into(),
98 )
99 },
100 )?;
101 // Mark Authorization as sensitive so it doesn't appear in
102 // reqwest's Debug output / logs.
103 auth_value.set_sensitive(true);
104 headers.insert(reqwest::header::AUTHORIZATION, auth_value);
105
106 let http = reqwest::Client::builder()
107 .timeout(DEFAULT_TIMEOUT)
108 .default_headers(headers)
109 .build()
110 .map_err(|e| SdkError::Http {
111 source: e,
112 context: "build reqwest::Client".into(),
113 })?;
114 Ok(Self {
115 http,
116 base_url: normalize_base_url(base_url.into()),
117 })
118 }
119
120 /// POST `/v1/workers/{worker_id}/claim` — scheduler-routed claim.
121 ///
122 /// Batch C item 2 PR-B. Swaps the SDK's direct-Valkey claim for a
123 /// server-side one: the request carries lane + identity +
124 /// capabilities + grant TTL; the server runs budget, quota, and
125 /// capability admission via `ff_scheduler::Scheduler::claim_for_worker`
126 /// and returns a `ClaimGrant` on success.
127 ///
128 /// Returns `Ok(None)` when the server responds 204 No Content
129 /// (no eligible execution on the lane). Callers that want to keep
130 /// polling should back off per their claim cadence.
131 pub async fn claim_for_worker(
132 &self,
133 req: ClaimForWorkerRequest,
134 ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
135 // Percent-encode `worker_id` in the URL path — `WorkerId` is a
136 // free-form string (could contain `/`, spaces, `%`, etc.) and
137 // splicing it verbatim would produce malformed URLs or
138 // misrouted paths. `Url::path_segments_mut().push` handles the
139 // encoding natively.
140 let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| {
141 SdkError::Config(format!("invalid base_url '{}': {e}", self.base_url))
142 })?;
143 {
144 let mut segs = url.path_segments_mut().map_err(|_| {
145 SdkError::Config(format!(
146 "base_url cannot be a base URL: '{}'",
147 self.base_url
148 ))
149 })?;
150 segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
151 }
152 let url = url.to_string();
153 let resp = self
154 .http
155 .post(&url)
156 .json(&req)
157 .send()
158 .await
159 .map_err(|e| SdkError::Http {
160 source: e,
161 context: "POST /v1/workers/{worker_id}/claim".into(),
162 })?;
163
164 let status = resp.status();
165 if status == reqwest::StatusCode::NO_CONTENT {
166 return Ok(None);
167 }
168 if status.is_success() {
169 return resp
170 .json::<ClaimForWorkerResponse>()
171 .await
172 .map(Some)
173 .map_err(|e| SdkError::Http {
174 source: e,
175 context: "decode claim_for_worker response body".into(),
176 });
177 }
178
179 // Error path — mirror rotate_waitpoint_secret's ErrorBody decode.
180 let status_u16 = status.as_u16();
181 let raw = resp.text().await.map_err(|e| SdkError::Http {
182 source: e,
183 context: format!("read claim_for_worker error body (status {status_u16})"),
184 })?;
185 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
186 Err(SdkError::AdminApi {
187 status: status_u16,
188 message: parsed
189 .as_ref()
190 .map(|b| b.error.clone())
191 .unwrap_or_else(|| raw.clone()),
192 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
193 retryable: parsed.as_ref().and_then(|b| b.retryable),
194 raw_body: raw,
195 })
196 }
197
198 /// Rotate the waitpoint HMAC secret on the server.
199 ///
200 /// Promotes the currently-installed kid to `previous_kid`
201 /// (accepted for the server's configured
202 /// `FF_WAITPOINT_HMAC_GRACE_MS` window) and installs
203 /// `new_secret_hex` under `new_kid` as the new current. Fans
204 /// out across every execution partition. Idempotent: re-running
205 /// with the same `(new_kid, new_secret_hex)` converges.
206 ///
207 /// The server returns 200 if at least one partition rotated OR
208 /// at least one partition was already rotating under a
209 /// concurrent request. See `RotateWaitpointSecretResponse`
210 /// fields for the breakdown.
211 ///
212 /// # Errors
213 ///
214 /// * [`SdkError::AdminApi`] — non-2xx response (400 invalid
215 /// input, 401 missing/bad bearer, 429 concurrent rotate,
216 /// 500 all partitions failed, 504 server-side timeout).
217 /// * [`SdkError::Http`] — transport error (connect, body
218 /// decode, client-side timeout).
219 ///
220 /// # Retry semantics
221 ///
222 /// Rotation is idempotent on the same `(new_kid,
223 /// new_secret_hex)` so retries are SAFE even on 504s or
224 /// partial failures.
225 pub async fn rotate_waitpoint_secret(
226 &self,
227 req: RotateWaitpointSecretRequest,
228 ) -> Result<RotateWaitpointSecretResponse, SdkError> {
229 let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
230 let resp = self
231 .http
232 .post(&url)
233 .json(&req)
234 .send()
235 .await
236 .map_err(|e| SdkError::Http {
237 source: e,
238 context: "POST /v1/admin/rotate-waitpoint-secret".into(),
239 })?;
240
241 let status = resp.status();
242 if status.is_success() {
243 return resp
244 .json::<RotateWaitpointSecretResponse>()
245 .await
246 .map_err(|e| SdkError::Http {
247 source: e,
248 context: "decode rotate-waitpoint-secret response body".into(),
249 });
250 }
251
252 // Non-2xx: parse the server's ErrorBody if we can, fall
253 // back to a raw body otherwise. Propagate body-read
254 // transport errors as Http rather than silently flattening
255 // them into `AdminApi { raw_body: "" }` — a connection drop
256 // mid-body-read is a transport fault, not an API-layer
257 // reject, and misclassifying it strips `is_retryable`'s
258 // timeout/connect signal from the caller.
259 let status_u16 = status.as_u16();
260 let raw = resp.text().await.map_err(|e| SdkError::Http {
261 source: e,
262 context: format!(
263 "read rotate-waitpoint-secret error response body (status {status_u16})"
264 ),
265 })?;
266 let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
267 Err(SdkError::AdminApi {
268 status: status_u16,
269 message: parsed
270 .as_ref()
271 .map(|b| b.error.clone())
272 .unwrap_or_else(|| raw.clone()),
273 kind: parsed.as_ref().and_then(|b| b.kind.clone()),
274 retryable: parsed.as_ref().and_then(|b| b.retryable),
275 raw_body: raw,
276 })
277 }
278}
279
280/// Request body for `POST /v1/admin/rotate-waitpoint-secret`.
281///
282/// Mirrors `ff_server::api::RotateWaitpointSecretBody` 1:1.
283#[derive(Debug, Clone, Serialize)]
284pub struct RotateWaitpointSecretRequest {
285 /// New key identifier. Non-empty, must not contain `:` (the
286 /// server uses `:` as the field separator in the secret hash).
287 pub new_kid: String,
288 /// Hex-encoded new secret. Even-length, `[0-9a-fA-F]`.
289 pub new_secret_hex: String,
290}
291
292/// Response body for `POST /v1/admin/rotate-waitpoint-secret`.
293///
294/// Mirrors `ff_server::server::RotateWaitpointSecretResult` 1:1.
295/// The server serializes this struct as-is via `Json(result)`.
296#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
297pub struct RotateWaitpointSecretResponse {
298 /// Count of partitions that accepted the rotation.
299 pub rotated: u16,
300 /// Partition indices where the rotation failed — operator
301 /// should investigate. Rotation is idempotent on the same
302 /// `(new_kid, new_secret_hex)` so a retry after the underlying
303 /// fault clears converges.
304 pub failed: Vec<u16>,
305 /// Partition indices where another rotation was already in
306 /// progress (per-partition `SETNX` lock held). These will be
307 /// rotated by the concurrent request; NOT a fault.
308 #[serde(default)]
309 pub in_progress: Vec<u16>,
310 /// The `new_kid` that was installed as current on every
311 /// rotated partition — echoes the request field back for
312 /// confirmation.
313 pub new_kid: String,
314}
315
316/// Server-side error body shape, as emitted by
317/// `ff_server::api::ErrorBody`. Kept internal because consumers
318/// match on the flattened fields of [`SdkError::AdminApi`].
319#[derive(Debug, Clone, Deserialize)]
320struct AdminErrorBody {
321 error: String,
322 #[serde(default)]
323 kind: Option<String>,
324 #[serde(default)]
325 retryable: Option<bool>,
326}
327
328/// Request body for `POST /v1/workers/{worker_id}/claim`.
329///
330/// Mirrors `ff_server::api::ClaimForWorkerBody` 1:1. `worker_id`
331/// goes in the URL path (not the body) but is kept on the struct
332/// for ergonomics — callers don't juggle a separate arg.
333#[derive(Debug, Clone, Serialize)]
334pub struct ClaimForWorkerRequest {
335 #[serde(skip)]
336 pub worker_id: String,
337 pub lane_id: String,
338 pub worker_instance_id: String,
339 #[serde(default)]
340 pub capabilities: Vec<String>,
341 /// Grant TTL in milliseconds. Server rejects 0 or anything over
342 /// 60s (its `CLAIM_GRANT_TTL_MS_MAX`).
343 pub grant_ttl_ms: u64,
344}
345
346/// Response body for `POST /v1/workers/{worker_id}/claim`.
347///
348/// Wire shape of `ff_core::contracts::ClaimGrant`. The core type is
349/// not serde-derived (carries a `Partition` with a non-scalar family
350/// enum) so the SDK decodes into this DTO and reconstructs the core
351/// type via [`Self::into_grant`].
352#[derive(Debug, Clone, Deserialize)]
353pub struct ClaimForWorkerResponse {
354 pub execution_id: String,
355 pub partition_family: String,
356 pub partition_index: u16,
357 pub grant_key: String,
358 pub expires_at_ms: u64,
359}
360
361impl ClaimForWorkerResponse {
362 /// Convert the wire DTO into a typed
363 /// [`ff_core::contracts::ClaimGrant`] for handoff to
364 /// [`crate::FlowFabricWorker::claim_from_grant`]. Returns
365 /// [`SdkError::AdminApi`] on malformed execution_id / unknown
366 /// partition_family (server and SDK have drifted; fail loud so
367 /// callers don't route to a ghost partition).
368 pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
369 let execution_id = ff_core::types::ExecutionId::parse(&self.execution_id)
370 .map_err(|e| SdkError::AdminApi {
371 status: 200,
372 message: format!(
373 "claim_for_worker: server returned malformed execution_id '{}': {e}",
374 self.execution_id
375 ),
376 kind: Some("malformed_response".to_owned()),
377 retryable: Some(false),
378 raw_body: String::new(),
379 })?;
380 let family = match self.partition_family.as_str() {
381 "flow" => ff_core::partition::PartitionFamily::Flow,
382 "execution" => ff_core::partition::PartitionFamily::Execution,
383 "budget" => ff_core::partition::PartitionFamily::Budget,
384 "quota" => ff_core::partition::PartitionFamily::Quota,
385 other => {
386 return Err(SdkError::AdminApi {
387 status: 200,
388 message: format!(
389 "claim_for_worker: unknown partition_family '{other}'"
390 ),
391 kind: Some("malformed_response".to_owned()),
392 retryable: Some(false),
393 raw_body: String::new(),
394 });
395 }
396 };
397 Ok(ff_core::contracts::ClaimGrant {
398 execution_id,
399 partition: ff_core::partition::Partition {
400 family,
401 index: self.partition_index,
402 },
403 grant_key: self.grant_key,
404 expires_at_ms: self.expires_at_ms,
405 })
406 }
407}
408
/// Strip every trailing `/` from a base URL, so that
/// `format!("{base}/v1/...")` can never yield `https://host//v1/...`.
/// Same pattern media-pipeline uses.
///
/// The input buffer is shortened in place and returned; a string
/// made up solely of slashes becomes empty.
fn normalize_base_url(mut url: String) -> String {
    let kept = url.trim_end_matches('/').len();
    url.truncate(kept);
    url
}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Unpack `(message, kind)` from an `AdminApi` error; any other
    /// variant fails the calling test.
    fn expect_admin_api(err: SdkError) -> (String, Option<String>) {
        match err {
            SdkError::AdminApi { message, kind, .. } => (message, kind),
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    #[test]
    fn base_url_strips_trailing_slash() {
        let cases = [
            ("http://x", "http://x"),
            ("http://x/", "http://x"),
            ("http://x///", "http://x"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_base_url(input.into()), expected);
        }
    }

    #[test]
    fn with_token_rejects_bad_header_chars() {
        // A raw newline inside the token would split the
        // Authorization header, so construction must fail loudly.
        let outcome = FlowFabricAdminClient::with_token("http://x", "tok\nevil");
        let err = outcome.unwrap_err();
        assert!(matches!(err, SdkError::Config(_)), "got: {err:?}");
    }

    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        // The classic shell footgun: FF_ADMIN_TOKEN="" expands to "".
        // Construction has to fail rather than hand back a client
        // that silently 401s on its first request.
        let blank_tokens = ["", " ", "\t\n ", " "];
        for s in blank_tokens {
            let err = FlowFabricAdminClient::with_token("http://x", s).unwrap_err();
            assert!(
                matches!(&err, SdkError::Config(msg) if msg.contains("empty")),
                "token {s:?} should return Config(empty/whitespace); got: {err:?}"
            );
        }
    }

    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        // Usual 400 shape: neither `kind` nor `retryable` present.
        let minimal: AdminErrorBody =
            serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(minimal.error, "bad new_kid");
        assert!(minimal.kind.is_none());
        assert!(minimal.retryable.is_none());

        // 500 ValkeyError shape: both optional fields present.
        let full: AdminErrorBody = serde_json::from_str(
            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
        )
        .unwrap();
        assert_eq!(full.error, "valkey: timed out");
        assert_eq!(full.kind.as_deref(), Some("IoError"));
        assert_eq!(full.retryable, Some(true));
    }

    #[test]
    fn rotate_response_deserialises_server_shape() {
        // Exact shape the server emits (ff-server server.rs:2636).
        let payload = r#"{
            "rotated": 3,
            "failed": [4, 5],
            "in_progress": [6],
            "new_kid": "kid-2026-04-18"
        }"#;
        let decoded: RotateWaitpointSecretResponse = serde_json::from_str(payload).unwrap();
        assert_eq!(decoded.rotated, 3);
        assert_eq!(decoded.failed, vec![4, 5]);
        assert_eq!(decoded.in_progress, vec![6]);
        assert_eq!(decoded.new_kid, "kid-2026-04-18");
    }

    #[test]
    fn rotate_response_handles_missing_in_progress() {
        // Older server versions may omit in_progress. Default to
        // empty so the client stays forward-compatible.
        let payload = r#"{"rotated": 1, "failed": [], "new_kid": "k1"}"#;
        let decoded: RotateWaitpointSecretResponse = serde_json::from_str(payload).unwrap();
        assert_eq!(decoded.in_progress, Vec::<u16>::new());
    }

    // ── ClaimForWorkerResponse::into_grant ──

    /// A well-formed claim response whose only variable part is the
    /// partition family name.
    fn sample_claim_response(family: &str) -> ClaimForWorkerResponse {
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_family: family.to_owned(),
            partition_index: 5,
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant"
                .to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn into_grant_accepts_all_known_families() {
        for family in ["flow", "execution", "budget", "quota"] {
            let grant = match sample_claim_response(family).into_grant() {
                Ok(grant) => grant,
                Err(e) => panic!("family {family} should parse: {e:?}"),
            };
            assert_eq!(grant.partition.index, 5);
            assert_eq!(grant.expires_at_ms, 1_700_000_000_000);
        }
    }

    #[test]
    fn into_grant_rejects_unknown_family() {
        let err = sample_claim_response("nonsense").into_grant().unwrap_err();
        let (message, kind) = expect_admin_api(err);
        assert!(
            message.contains("unknown partition_family"),
            "msg: {message}"
        );
        assert_eq!(kind.as_deref(), Some("malformed_response"));
    }

    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let resp = ClaimForWorkerResponse {
            execution_id: "not-a-valid-eid".to_owned(),
            ..sample_claim_response("flow")
        };
        let err = resp.into_grant().unwrap_err();
        let (message, kind) = expect_admin_api(err);
        assert!(
            message.contains("malformed execution_id"),
            "msg: {message}"
        );
        assert_eq!(kind.as_deref(), Some("malformed_response"));
    }
}