use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::SdkError;
/// Request timeout (130 seconds) configured on every HTTP client that
/// `FlowFabricAdminClient` builds, via `reqwest::ClientBuilder::timeout`.
// NOTE(review): the rationale for 130s is not visible in this file — presumably
// chosen to outlast a server-side wait; confirm before changing.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(130);
/// Client for the HTTP endpoints this module calls under `{base_url}/v1/...`
/// (worker claim and waitpoint-secret rotation).
///
/// Build it with `FlowFabricAdminClient::new` (no `Authorization` header) or
/// `FlowFabricAdminClient::with_token` (bearer token sent on every request).
#[derive(Debug, Clone)]
pub struct FlowFabricAdminClient {
// Underlying reqwest client; carries the request timeout and, when built via
// `with_token`, the default `Authorization` header.
http: reqwest::Client,
// Server base URL with trailing `/` characters removed by `normalize_base_url`.
base_url: String,
}
impl FlowFabricAdminClient {
pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
let http = reqwest::Client::builder()
.timeout(DEFAULT_TIMEOUT)
.build()
.map_err(|e| SdkError::Http {
source: e,
context: "build reqwest::Client".into(),
})?;
Ok(Self {
http,
base_url: normalize_base_url(base_url.into()),
})
}
pub fn with_token(
base_url: impl Into<String>,
token: impl AsRef<str>,
) -> Result<Self, SdkError> {
let token_str = token.as_ref();
if token_str.trim().is_empty() {
return Err(SdkError::Config(
"bearer token is empty or all-whitespace; use \
FlowFabricAdminClient::new for unauthenticated access"
.into(),
));
}
let mut headers = reqwest::header::HeaderMap::new();
let mut auth_value =
reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
|_| {
SdkError::Config(
"bearer token contains characters not valid in an HTTP header".into(),
)
},
)?;
auth_value.set_sensitive(true);
headers.insert(reqwest::header::AUTHORIZATION, auth_value);
let http = reqwest::Client::builder()
.timeout(DEFAULT_TIMEOUT)
.default_headers(headers)
.build()
.map_err(|e| SdkError::Http {
source: e,
context: "build reqwest::Client".into(),
})?;
Ok(Self {
http,
base_url: normalize_base_url(base_url.into()),
})
}
pub async fn claim_for_worker(
&self,
req: ClaimForWorkerRequest,
) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| {
SdkError::Config(format!("invalid base_url '{}': {e}", self.base_url))
})?;
{
let mut segs = url.path_segments_mut().map_err(|_| {
SdkError::Config(format!(
"base_url cannot be a base URL: '{}'",
self.base_url
))
})?;
segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
}
let url = url.to_string();
let resp = self
.http
.post(&url)
.json(&req)
.send()
.await
.map_err(|e| SdkError::Http {
source: e,
context: "POST /v1/workers/{worker_id}/claim".into(),
})?;
let status = resp.status();
if status == reqwest::StatusCode::NO_CONTENT {
return Ok(None);
}
if status.is_success() {
return resp
.json::<ClaimForWorkerResponse>()
.await
.map(Some)
.map_err(|e| SdkError::Http {
source: e,
context: "decode claim_for_worker response body".into(),
});
}
let status_u16 = status.as_u16();
let raw = resp.text().await.map_err(|e| SdkError::Http {
source: e,
context: format!("read claim_for_worker error body (status {status_u16})"),
})?;
let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
Err(SdkError::AdminApi {
status: status_u16,
message: parsed
.as_ref()
.map(|b| b.error.clone())
.unwrap_or_else(|| raw.clone()),
kind: parsed.as_ref().and_then(|b| b.kind.clone()),
retryable: parsed.as_ref().and_then(|b| b.retryable),
raw_body: raw,
})
}
pub async fn rotate_waitpoint_secret(
&self,
req: RotateWaitpointSecretRequest,
) -> Result<RotateWaitpointSecretResponse, SdkError> {
let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
let resp = self
.http
.post(&url)
.json(&req)
.send()
.await
.map_err(|e| SdkError::Http {
source: e,
context: "POST /v1/admin/rotate-waitpoint-secret".into(),
})?;
let status = resp.status();
if status.is_success() {
return resp
.json::<RotateWaitpointSecretResponse>()
.await
.map_err(|e| SdkError::Http {
source: e,
context: "decode rotate-waitpoint-secret response body".into(),
});
}
let status_u16 = status.as_u16();
let raw = resp.text().await.map_err(|e| SdkError::Http {
source: e,
context: format!(
"read rotate-waitpoint-secret error response body (status {status_u16})"
),
})?;
let parsed = serde_json::from_str::<AdminErrorBody>(&raw).ok();
Err(SdkError::AdminApi {
status: status_u16,
message: parsed
.as_ref()
.map(|b| b.error.clone())
.unwrap_or_else(|| raw.clone()),
kind: parsed.as_ref().and_then(|b| b.kind.clone()),
retryable: parsed.as_ref().and_then(|b| b.retryable),
raw_body: raw,
})
}
}
/// JSON body sent by `FlowFabricAdminClient::rotate_waitpoint_secret`.
///
/// `Debug` is implemented by hand so that `new_secret_hex` is never written to
/// logs or panic messages; serialisation is unaffected and still sends the
/// real value.
#[derive(Clone, Serialize)]
pub struct RotateWaitpointSecretRequest {
    /// Identifier of the new key (echoed back as `new_kid` in the response).
    pub new_kid: String,
    /// New secret material. NOTE(review): presumably hex-encoded, per the
    /// field name — confirm the expected length/format against the server.
    pub new_secret_hex: String,
}

impl std::fmt::Debug for RotateWaitpointSecretRequest {
    /// Formats the request with the secret replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointSecretRequest")
            .field("new_kid", &self.new_kid)
            .field("new_secret_hex", &"<redacted>")
            .finish()
    }
}
/// Success body decoded by `FlowFabricAdminClient::rotate_waitpoint_secret`.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct RotateWaitpointSecretResponse {
/// Count reported by the server.
// NOTE(review): presumably the number of partitions rotated — confirm.
pub rotated: u16,
/// Entries the server reported as failed.
// NOTE(review): presumably partition indices — confirm.
pub failed: Vec<u16>,
/// Entries the server reported as in progress; defaults to an empty list
/// when the field is absent from the response.
#[serde(default)]
pub in_progress: Vec<u16>,
/// Key identifier returned by the server.
pub new_kid: String,
}
/// JSON shape this module tries to parse from the body of a non-success
/// response; its fields populate `SdkError::AdminApi`.
#[derive(Debug, Clone, Deserialize)]
struct AdminErrorBody {
// Required: the error message; becomes `SdkError::AdminApi::message`.
error: String,
// Optional; `None` when absent.
#[serde(default)]
kind: Option<String>,
// Optional; `None` when absent.
#[serde(default)]
retryable: Option<bool>,
}
/// Input to `FlowFabricAdminClient::claim_for_worker`.
///
/// Every field except `worker_id` is serialised into the JSON request body.
#[derive(Debug, Clone, Serialize)]
pub struct ClaimForWorkerRequest {
/// Worker identifier. Not part of the JSON body (`#[serde(skip)]`); it is
/// placed in the request path as `/v1/workers/{worker_id}/claim`.
#[serde(skip)]
pub worker_id: String,
pub lane_id: String,
pub worker_instance_id: String,
// NOTE(review): `default` is a deserialisation attribute and this struct only
// derives `Serialize`, so it appears to have no effect here — confirm intent.
#[serde(default)]
pub capabilities: Vec<String>,
// NOTE(review): presumably the requested grant lifetime in milliseconds, per
// the field name — confirm against the server contract.
pub grant_ttl_ms: u64,
}
/// Success body decoded by `FlowFabricAdminClient::claim_for_worker`.
///
/// Fields are kept as plain wire types; use `into_grant` to convert them into
/// a typed `ff_core::contracts::ClaimGrant`.
#[derive(Debug, Clone, Deserialize)]
pub struct ClaimForWorkerResponse {
/// Execution id as a string; `into_grant` parses it with
/// `ff_core::types::ExecutionId::parse`.
pub execution_id: String,
/// Partition family name; `into_grant` accepts `"flow"`, `"execution"`,
/// `"budget"` and `"quota"`.
pub partition_family: String,
/// Becomes `Partition::index` in the converted grant.
pub partition_index: u16,
/// Copied unchanged into the converted grant.
pub grant_key: String,
/// Copied unchanged into the converted grant.
// NOTE(review): presumably an epoch timestamp in milliseconds — confirm.
pub expires_at_ms: u64,
}
impl ClaimForWorkerResponse {
    /// Converts the wire-level response into a typed `ff_core::contracts::ClaimGrant`.
    ///
    /// `execution_id` is parsed first, then `partition_family` is mapped onto a
    /// `PartitionFamily` variant; the remaining fields are moved over as-is.
    ///
    /// # Errors
    ///
    /// Returns `SdkError::AdminApi` with `status: 200`,
    /// `kind: Some("malformed_response")`, `retryable: Some(false)` and an
    /// empty `raw_body` when `execution_id` does not parse or when
    /// `partition_family` is not one of `"flow"`, `"execution"`, `"budget"`,
    /// `"quota"`.
    pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
        // Both rejection paths share one error shape; only the message differs.
        fn malformed(message: String) -> SdkError {
            SdkError::AdminApi {
                status: 200,
                message,
                kind: Some("malformed_response".to_owned()),
                retryable: Some(false),
                raw_body: String::new(),
            }
        }

        let execution_id = match ff_core::types::ExecutionId::parse(&self.execution_id) {
            Ok(parsed) => parsed,
            Err(e) => {
                return Err(malformed(format!(
                    "claim_for_worker: server returned malformed execution_id '{}': {e}",
                    self.execution_id
                )));
            }
        };

        let family = match self.partition_family.as_str() {
            "flow" => ff_core::partition::PartitionFamily::Flow,
            "execution" => ff_core::partition::PartitionFamily::Execution,
            "budget" => ff_core::partition::PartitionFamily::Budget,
            "quota" => ff_core::partition::PartitionFamily::Quota,
            other => {
                return Err(malformed(format!(
                    "claim_for_worker: unknown partition_family '{other}'"
                )));
            }
        };

        let partition = ff_core::partition::Partition {
            family,
            index: self.partition_index,
        };
        Ok(ff_core::contracts::ClaimGrant {
            execution_id,
            partition,
            grant_key: self.grant_key,
            expires_at_ms: self.expires_at_ms,
        })
    }
}
/// Removes every trailing `/` from `url` and returns the shortened string.
///
/// The input's allocation is reused; a string made up only of slashes becomes
/// empty.
fn normalize_base_url(mut url: String) -> String {
    let kept = url.trim_end_matches('/').len();
    url.truncate(kept);
    url
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Any number of trailing slashes is removed; a URL without one is untouched.
    #[test]
    fn base_url_strips_trailing_slash() {
        assert_eq!(normalize_base_url("http://x".into()), "http://x");
        assert_eq!(normalize_base_url("http://x/".into()), "http://x");
        assert_eq!(normalize_base_url("http://x///".into()), "http://x");
    }

    /// A token containing a newline cannot form a header value and is
    /// reported as a configuration error.
    #[test]
    fn with_token_rejects_bad_header_chars() {
        let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
        assert!(matches!(err, SdkError::Config(_)), "got: {err:?}");
    }

    /// Empty and whitespace-only tokens (single space, mixed whitespace,
    /// several spaces) are all rejected with the "empty" configuration error.
    #[test]
    fn with_token_rejects_empty_or_whitespace() {
        for s in ["", " ", "\t\n ", "   "] {
            let err = FlowFabricAdminClient::with_token("http://x", s).unwrap_err();
            assert!(
                matches!(&err, SdkError::Config(msg) if msg.contains("empty")),
                "token {s:?} should return Config(empty/whitespace); got: {err:?}"
            );
        }
    }

    /// `kind` and `retryable` are optional in the error body and default to `None`.
    #[test]
    fn admin_error_body_deserialises_optional_fields() {
        let b: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
        assert_eq!(b.error, "bad new_kid");
        assert!(b.kind.is_none());
        assert!(b.retryable.is_none());

        let b: AdminErrorBody = serde_json::from_str(
            r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
        )
        .unwrap();
        assert_eq!(b.error, "valkey: timed out");
        assert_eq!(b.kind.as_deref(), Some("IoError"));
        assert_eq!(b.retryable, Some(true));
    }

    /// A response carrying every field decodes field-for-field.
    #[test]
    fn rotate_response_deserialises_server_shape() {
        let raw = r#"{
"rotated": 3,
"failed": [4, 5],
"in_progress": [6],
"new_kid": "kid-2026-04-18"
}"#;
        let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.rotated, 3);
        assert_eq!(r.failed, vec![4, 5]);
        assert_eq!(r.in_progress, vec![6]);
        assert_eq!(r.new_kid, "kid-2026-04-18");
    }

    /// A response without `in_progress` decodes with an empty list.
    #[test]
    fn rotate_response_handles_missing_in_progress() {
        let raw = r#"{"rotated": 1, "failed": [], "new_kid": "k1"}"#;
        let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
        assert_eq!(r.in_progress, Vec::<u16>::new());
    }

    /// Builds a response that converts cleanly apart from the supplied family name.
    fn sample_claim_response(family: &str) -> ClaimForWorkerResponse {
        ClaimForWorkerResponse {
            execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
            partition_family: family.to_owned(),
            partition_index: 5,
            grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
            expires_at_ms: 1_700_000_000_000,
        }
    }

    /// Every known family name converts, and maps to its own variant.
    #[test]
    fn into_grant_accepts_all_known_families() {
        for family in ["flow", "execution", "budget", "quota"] {
            let g = sample_claim_response(family)
                .into_grant()
                .unwrap_or_else(|e| panic!("family {family} should parse: {e:?}"));
            assert_eq!(g.partition.index, 5);
            assert_eq!(g.expires_at_ms, 1_700_000_000_000);

            // Pattern matching avoids requiring `PartialEq`/`Debug` on the
            // family enum while still catching swapped mappings.
            let mapped_correctly = matches!(
                (family, &g.partition.family),
                ("flow", ff_core::partition::PartitionFamily::Flow)
                    | ("execution", ff_core::partition::PartitionFamily::Execution)
                    | ("budget", ff_core::partition::PartitionFamily::Budget)
                    | ("quota", ff_core::partition::PartitionFamily::Quota)
            );
            assert!(mapped_correctly, "family {family} mapped to the wrong variant");
        }
    }

    /// An unrecognised family name is reported as a malformed response.
    #[test]
    fn into_grant_rejects_unknown_family() {
        let resp = sample_claim_response("nonsense");
        let err = resp.into_grant().unwrap_err();
        match err {
            SdkError::AdminApi {
                status,
                message,
                kind,
                retryable,
                ..
            } => {
                assert!(
                    message.contains("unknown partition_family"),
                    "msg: {message}"
                );
                assert_eq!(kind.as_deref(), Some("malformed_response"));
                assert_eq!(status, 200);
                assert_eq!(retryable, Some(false));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }

    /// An execution id that does not parse is reported as a malformed response.
    #[test]
    fn into_grant_rejects_malformed_execution_id() {
        let mut resp = sample_claim_response("flow");
        resp.execution_id = "not-a-valid-eid".to_owned();
        let err = resp.into_grant().unwrap_err();
        match err {
            SdkError::AdminApi {
                status,
                message,
                kind,
                retryable,
                ..
            } => {
                assert!(
                    message.contains("malformed execution_id"),
                    "msg: {message}"
                );
                assert_eq!(kind.as_deref(), Some("malformed_response"));
                assert_eq!(status, 200);
                assert_eq!(retryable, Some(false));
            }
            other => panic!("expected AdminApi, got {other:?}"),
        }
    }
}