use std::time::Duration;
use ff_core::contracts::{RotateWaitpointHmacSecretArgs, RotateWaitpointHmacSecretOutcome};
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use serde::{Deserialize, Serialize};
use crate::SdkError;
/// Request timeout (130 seconds) passed to `reqwest::ClientBuilder::timeout`
/// by both `FlowFabricAdminClient` constructors.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(130);
/// HTTP client for the FlowFabric server's `/v1/...` admin endpoints.
///
/// Holds a `reqwest::Client` together with the server base URL. Build one
/// with `FlowFabricAdminClient::new` (no `Authorization` header) or
/// `FlowFabricAdminClient::with_token` (bearer token on every request).
#[derive(Debug, Clone)]
pub struct FlowFabricAdminClient {
// Underlying HTTP client; configured with `DEFAULT_TIMEOUT` and, when built
// via `with_token`, a default `Authorization` header.
http: reqwest::Client,
// Server base URL with trailing `/` characters removed by `normalize_base_url`.
base_url: String,
}
impl FlowFabricAdminClient {
    /// Creates a client that sends no `Authorization` header.
    ///
    /// `base_url` is stored with any trailing `/` characters removed.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError::Http`] if the underlying `reqwest::Client` cannot
    /// be built.
    pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
        Ok(Self {
            http: Self::build_http_client(None)?,
            base_url: normalize_base_url(base_url.into()),
        })
    }

    /// Creates a client that sends `Authorization: Bearer <token>` on every
    /// request.
    ///
    /// `base_url` is stored with any trailing `/` characters removed.
    ///
    /// # Errors
    ///
    /// * [`SdkError::Config`] (field `bearer_token`) if `token` is empty or
    ///   consists only of whitespace, or if it contains characters that are
    ///   not valid in an HTTP header value.
    /// * [`SdkError::Http`] if the underlying `reqwest::Client` cannot be
    ///   built.
    pub fn with_token(
        base_url: impl Into<String>,
        token: impl AsRef<str>,
    ) -> Result<Self, SdkError> {
        let token_str = token.as_ref();
        if token_str.trim().is_empty() {
            return Err(SdkError::Config {
                context: "admin_client".into(),
                field: Some("bearer_token".into()),
                message: "is empty or all-whitespace; use \
                          FlowFabricAdminClient::new for unauthenticated access"
                    .into(),
            });
        }

        let mut auth_value =
            reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token_str)).map_err(
                |_| SdkError::Config {
                    context: "admin_client".into(),
                    field: Some("bearer_token".into()),
                    message: "contains characters not valid in an HTTP header".into(),
                },
            )?;
        // Flag the credential as sensitive so header-aware code can avoid
        // exposing it.
        auth_value.set_sensitive(true);

        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert(reqwest::header::AUTHORIZATION, auth_value);

        Ok(Self {
            http: Self::build_http_client(Some(headers))?,
            base_url: normalize_base_url(base_url.into()),
        })
    }

    /// Sends `POST {base_url}/v1/workers/{worker_id}/claim` with `req` as the
    /// JSON body.
    ///
    /// `req.worker_id` is appended as a single, percent-encoded path segment.
    ///
    /// # Returns
    ///
    /// * `Ok(None)` when the server answers `204 No Content`.
    /// * `Ok(Some(_))` with the decoded body for any other 2xx status.
    ///
    /// # Errors
    ///
    /// * [`SdkError::Config`] (field `worker_id`) if `req.worker_id` is empty,
    ///   `"."` or `".."`. The URL builder drops dot segments and an empty
    ///   segment collapses into `//`, so the request would silently target a
    ///   different route.
    /// * [`SdkError::Config`] (field `base_url`) if the stored base URL does
    ///   not parse or cannot carry path segments.
    /// * [`SdkError::Http`] on transport failures or when a response body
    ///   cannot be read/decoded.
    /// * [`SdkError::AdminApi`] for any non-2xx status.
    pub async fn claim_for_worker(
        &self,
        req: ClaimForWorkerRequest,
    ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
        if matches!(req.worker_id.as_str(), "" | "." | "..") {
            return Err(SdkError::Config {
                context: "admin_client: claim_for_worker".into(),
                field: Some("worker_id".into()),
                message: format!(
                    "'{}' cannot be used as a URL path segment",
                    req.worker_id
                ),
            });
        }

        let mut url = reqwest::Url::parse(&self.base_url).map_err(|e| SdkError::Config {
            context: "admin_client: claim_for_worker".into(),
            field: Some("base_url".into()),
            message: format!("invalid base_url '{}': {e}", self.base_url),
        })?;
        {
            // Scoped so the mutable borrow of `url` ends before it is used.
            let mut segs = url.path_segments_mut().map_err(|_| SdkError::Config {
                context: "admin_client: claim_for_worker".into(),
                field: Some("base_url".into()),
                message: format!("base_url cannot be a base URL: '{}'", self.base_url),
            })?;
            segs.extend(&["v1", "workers", &req.worker_id, "claim"]);
        }

        let resp = self
            .http
            .post(url)
            .json(&req)
            .send()
            .await
            .map_err(|e| SdkError::Http {
                source: e,
                context: "POST /v1/workers/{worker_id}/claim".into(),
            })?;

        let status = resp.status();
        if status == reqwest::StatusCode::NO_CONTENT {
            return Ok(None);
        }
        if status.is_success() {
            return resp
                .json::<ClaimForWorkerResponse>()
                .await
                .map(Some)
                .map_err(|e| SdkError::Http {
                    source: e,
                    context: "decode claim_for_worker response body".into(),
                });
        }
        Err(Self::api_error_from_response(resp, "claim_for_worker error body").await)
    }

    /// Sends `POST {base_url}/v1/admin/rotate-waitpoint-secret` with `req` as
    /// the JSON body and decodes the success response.
    ///
    /// # Errors
    ///
    /// * [`SdkError::Http`] on transport failures or when a response body
    ///   cannot be read/decoded.
    /// * [`SdkError::AdminApi`] for any non-2xx status.
    pub async fn rotate_waitpoint_secret(
        &self,
        req: RotateWaitpointSecretRequest,
    ) -> Result<RotateWaitpointSecretResponse, SdkError> {
        let url = format!("{}/v1/admin/rotate-waitpoint-secret", self.base_url);
        let resp = self
            .http
            .post(&url)
            .json(&req)
            .send()
            .await
            .map_err(|e| SdkError::Http {
                source: e,
                context: "POST /v1/admin/rotate-waitpoint-secret".into(),
            })?;

        if resp.status().is_success() {
            return resp
                .json::<RotateWaitpointSecretResponse>()
                .await
                .map_err(|e| SdkError::Http {
                    source: e,
                    context: "decode rotate-waitpoint-secret response body".into(),
                });
        }
        Err(
            Self::api_error_from_response(resp, "rotate-waitpoint-secret error response body")
                .await,
        )
    }

    /// Builds the shared `reqwest::Client`: `DEFAULT_TIMEOUT` plus optional
    /// default headers.
    fn build_http_client(
        default_headers: Option<reqwest::header::HeaderMap>,
    ) -> Result<reqwest::Client, SdkError> {
        let mut builder = reqwest::Client::builder().timeout(DEFAULT_TIMEOUT);
        if let Some(headers) = default_headers {
            builder = builder.default_headers(headers);
        }
        builder.build().map_err(|e| SdkError::Http {
            source: e,
            context: "build reqwest::Client".into(),
        })
    }

    /// Converts a non-success response into an [`SdkError`].
    ///
    /// Reads the body as text and tries to decode it as `AdminErrorBody`.
    /// When decoding fails the raw text becomes the error message and
    /// `kind`/`retryable` stay `None`. `what` names the body being read and is
    /// only used in the context of the [`SdkError::Http`] returned when the
    /// body itself cannot be read.
    async fn api_error_from_response(resp: reqwest::Response, what: &str) -> SdkError {
        let status = resp.status().as_u16();
        let raw = match resp.text().await {
            Ok(body) => body,
            Err(e) => {
                return SdkError::Http {
                    source: e,
                    context: format!("read {what} (status {status})"),
                }
            }
        };
        let (message, kind, retryable) = match serde_json::from_str::<AdminErrorBody>(&raw) {
            Ok(body) => (body.error, body.kind, body.retryable),
            Err(_) => (raw.clone(), None, None),
        };
        SdkError::AdminApi {
            status,
            message,
            kind,
            retryable,
            raw_body: raw,
        }
    }
}
/// JSON request body for `POST /v1/admin/rotate-waitpoint-secret`.
///
/// `Debug` is implemented by hand so that `new_secret_hex` never ends up in
/// logs or panic messages; the serialized JSON is unaffected.
#[derive(Clone, Serialize)]
pub struct RotateWaitpointSecretRequest {
    /// Identifier of the new key.
    pub new_kid: String,
    /// New secret material, hex-encoded (per the field name). Treat as a
    /// credential.
    pub new_secret_hex: String,
}

impl std::fmt::Debug for RotateWaitpointSecretRequest {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointSecretRequest")
            .field("new_kid", &self.new_kid)
            // Never print the secret itself.
            .field("new_secret_hex", &"<redacted>")
            .finish()
    }
}
/// Success body decoded by `FlowFabricAdminClient::rotate_waitpoint_secret`.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct RotateWaitpointSecretResponse {
// Presumably the number of partitions that were rotated — confirm against
// the server contract.
pub rotated: u16,
// Presumably the indices of partitions whose rotation failed — confirm
// against the server contract.
pub failed: Vec<u16>,
// Key identifier reported back by the server.
pub new_kid: String,
}
/// JSON error body the client tries to decode from non-2xx responses.
///
/// Only `error` is required; `kind` and `retryable` fall back to `None` when
/// the server omits them.
#[derive(Debug, Clone, Deserialize)]
struct AdminErrorBody {
// Error message; becomes `SdkError::AdminApi::message`.
error: String,
// Optional error classification; copied into `SdkError::AdminApi::kind`.
#[serde(default)]
kind: Option<String>,
// Optional retry hint; copied into `SdkError::AdminApi::retryable`.
#[serde(default)]
retryable: Option<bool>,
}
/// Input for `FlowFabricAdminClient::claim_for_worker`.
///
/// Every field except `worker_id` is serialized into the JSON request body.
#[derive(Debug, Clone, Serialize)]
pub struct ClaimForWorkerRequest {
// Not part of the JSON body (`skip`); `claim_for_worker` places it in the
// URL path as `/v1/workers/{worker_id}/claim`.
#[serde(skip)]
pub worker_id: String,
pub lane_id: String,
pub worker_instance_id: String,
// NOTE(review): `default` is a deserialization attribute; this type only
// derives `Serialize`, so it should not affect the request body.
#[serde(default)]
pub capabilities: Vec<String>,
// Presumably the requested grant lifetime in milliseconds — confirm against
// the server contract.
pub grant_ttl_ms: u64,
}
/// Success body decoded by `FlowFabricAdminClient::claim_for_worker`.
///
/// Use `into_grant` to convert it into a `ff_core::contracts::ClaimGrant`.
#[derive(Debug, Clone, Deserialize)]
pub struct ClaimForWorkerResponse {
// Execution id as sent by the server; validated only in `into_grant`.
pub execution_id: String,
// Carried through to the grant unchanged; `into_grant` does not inspect it.
pub partition_key: ff_core::partition::PartitionKey,
pub grant_key: String,
// Presumably the grant expiry as epoch milliseconds — confirm against the
// server contract.
pub expires_at_ms: u64,
}
impl ClaimForWorkerResponse {
    /// Consumes the wire response and produces a
    /// `ff_core::contracts::ClaimGrant`.
    ///
    /// Only `execution_id` is parsed; `partition_key`, `grant_key` and
    /// `expires_at_ms` are moved into the grant untouched.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError::AdminApi`] with status `200`, kind
    /// `"malformed_response"` and `retryable = Some(false)` when the server
    /// supplied an `execution_id` that does not parse.
    pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
        let Self {
            execution_id: raw_execution_id,
            partition_key,
            grant_key,
            expires_at_ms,
        } = self;

        match ff_core::types::ExecutionId::parse(&raw_execution_id) {
            Ok(execution_id) => Ok(ff_core::contracts::ClaimGrant {
                execution_id,
                partition_key,
                grant_key,
                expires_at_ms,
            }),
            Err(e) => Err(SdkError::AdminApi {
                // The HTTP exchange itself succeeded; only the payload is bad.
                status: 200,
                message: format!(
                    "claim_for_worker: server returned malformed execution_id '{}': {e}",
                    raw_execution_id
                ),
                kind: Some("malformed_response".to_owned()),
                retryable: Some(false),
                raw_body: String::new(),
            }),
        }
    }
}
/// Result of rotating the waitpoint HMAC secret on a single partition, as
/// returned by `rotate_waitpoint_hmac_secret_all_partitions`.
#[derive(Debug)]
pub struct PartitionRotationOutcome {
// Index of the partition (within the `Execution` partition family) this
// outcome belongs to.
pub partition: u16,
// Outcome reported for that partition, or the error converted to `SdkError`.
pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
}
/// Rotates the waitpoint HMAC secret on every `Execution` partition with an
/// index in `0..num_partitions`.
///
/// Partitions are processed one after another, in index order. A failure on
/// one partition is recorded in its outcome and does not stop the remaining
/// partitions from being attempted.
///
/// # Returns
///
/// One [`PartitionRotationOutcome`] per partition, in ascending index order.
pub async fn rotate_waitpoint_hmac_secret_all_partitions(
    client: &ferriskey::Client,
    num_partitions: u16,
    new_kid: &str,
    new_secret_hex: &str,
    grace_ms: u64,
) -> Vec<PartitionRotationOutcome> {
    // The same arguments are reused for every partition.
    let rotation_args = RotateWaitpointHmacSecretArgs {
        new_kid: new_kid.to_owned(),
        new_secret_hex: new_secret_hex.to_owned(),
        grace_ms,
    };

    let mut outcomes = Vec::with_capacity(usize::from(num_partitions));
    for partition_index in 0..num_partitions {
        let keys = IndexKeys::new(&Partition {
            family: PartitionFamily::Execution,
            index: partition_index,
        });
        let call = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            client,
            &keys,
            &rotation_args,
        );
        let result = match call.await {
            Ok(outcome) => Ok(outcome),
            Err(script_err) => Err(SdkError::from(script_err)),
        };
        outcomes.push(PartitionRotationOutcome {
            partition: partition_index,
            result,
        });
    }
    outcomes
}
/// Returns `url` with every trailing `/` removed, reusing the input's
/// allocation.
fn normalize_base_url(mut url: String) -> String {
    let kept_len = url.trim_end_matches('/').len();
    url.truncate(kept_len);
    url
}
// Unit tests for the pure, network-free parts of this file: URL
// normalisation, bearer-token validation, and (de)serialisation of the wire
// types.
#[cfg(test)]
mod tests {
use super::*;
// Zero, one, or several trailing slashes all normalise to the same URL.
#[test]
fn base_url_strips_trailing_slash() {
assert_eq!(normalize_base_url("http://x".into()), "http://x");
assert_eq!(normalize_base_url("http://x/".into()), "http://x");
assert_eq!(normalize_base_url("http://x///".into()), "http://x");
}
// A token containing a newline is not a valid header value and must be
// reported as a configuration error.
#[test]
fn with_token_rejects_bad_header_chars() {
let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
assert!(
matches!(err, SdkError::Config { .. }),
"got: {err:?}"
);
}
// Empty and whitespace-only tokens are rejected with `field = bearer_token`.
#[test]
fn with_token_rejects_empty_or_whitespace() {
for s in ["", " ", "\t\n ", " "] {
let err = FlowFabricAdminClient::with_token("http://x", s)
.unwrap_err();
assert!(
matches!(&err, SdkError::Config { field: Some(f), .. } if f == "bearer_token"),
"token {s:?} should return Config with field=bearer_token; got: {err:?}"
);
}
}
// `kind` and `retryable` are optional in the error body: absent -> `None`,
// present -> decoded values.
#[test]
fn admin_error_body_deserialises_optional_fields() {
let b: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
assert_eq!(b.error, "bad new_kid");
assert!(b.kind.is_none());
assert!(b.retryable.is_none());
let b: AdminErrorBody = serde_json::from_str(
r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
)
.unwrap();
assert_eq!(b.error, "valkey: timed out");
assert_eq!(b.kind.as_deref(), Some("IoError"));
assert_eq!(b.retryable, Some(true));
}
// The rotate response decodes from a JSON object with `rotated`, `failed`
// and `new_kid`.
#[test]
fn rotate_response_deserialises_server_shape() {
let raw = r#"{
"rotated": 3,
"failed": [4, 5],
"new_kid": "kid-2026-04-18"
}"#;
let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
assert_eq!(r.rotated, 3);
assert_eq!(r.failed, vec![4, 5]);
assert_eq!(r.new_kid, "kid-2026-04-18");
}
// Builds a response with a well-formed execution id and the given partition
// key; the key is round-tripped through JSON to construct a `PartitionKey`.
fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
ClaimForWorkerResponse {
execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
partition_key: serde_json::from_str(
&serde_json::to_string(partition_key).unwrap(),
)
.unwrap(),
grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
expires_at_ms: 1_700_000_000_000,
}
}
// `into_grant` passes each of these partition key strings through unchanged.
#[test]
fn into_grant_preserves_all_known_partition_key_shapes() {
for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
let g = sample_claim_response(key_str).into_grant().unwrap_or_else(|e| {
panic!("key {key_str} should parse: {e:?}")
});
assert_eq!(g.partition_key.as_str(), key_str);
assert_eq!(g.expires_at_ms, 1_700_000_000_000);
}
}
// A partition key that `partition()` rejects still survives `into_grant`,
// because the conversion never parses it.
#[test]
fn into_grant_preserves_opaque_partition_key() {
let resp = sample_claim_response("{zz:0}");
let g = resp.into_grant().expect("SDK must not parse partition_key");
assert_eq!(g.partition_key.as_str(), "{zz:0}");
assert!(g.partition().is_err());
}
// A malformed execution id surfaces as `AdminApi` with kind
// `malformed_response`.
#[test]
fn into_grant_rejects_malformed_execution_id() {
let mut resp = sample_claim_response("{fp:5}");
resp.execution_id = "not-a-valid-eid".to_owned();
let err = resp.into_grant().unwrap_err();
match err {
SdkError::AdminApi { message, kind, .. } => {
assert!(message.contains("malformed execution_id"),
"msg: {message}");
assert_eq!(kind.as_deref(), Some("malformed_response"));
}
other => panic!("expected AdminApi, got {other:?}"),
}
}
// The claim response decodes from the JSON object shape below, keeping the
// partition key string as sent.
#[test]
fn claim_for_worker_response_deserialises_opaque_partition_key() {
let raw = r#"{
"execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
"partition_key": "{fp:7}",
"grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
"expires_at_ms": 1700000000000
}"#;
let r: ClaimForWorkerResponse = serde_json::from_str(raw).unwrap();
assert_eq!(r.partition_key.as_str(), "{fp:7}");
assert_eq!(r.expires_at_ms, 1_700_000_000_000);
}
}