use std::sync::Arc;
use std::time::Duration;
use ff_core::contracts::RotateWaitpointHmacSecretOutcome;
use ff_core::engine_backend::EngineBackend;
#[cfg(feature = "valkey-default")]
use ff_core::contracts::RotateWaitpointHmacSecretArgs;
#[cfg(feature = "valkey-default")]
use ff_core::keys::IndexKeys;
#[cfg(feature = "valkey-default")]
use ff_core::partition::{Partition, PartitionFamily};
use serde::{Deserialize, Serialize};
use crate::SdkError;
/// Request timeout (130 seconds) applied to every `reqwest::Client` built by
/// the HTTP constructors in this file.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(130);
/// Grace period, in milliseconds, that the embedded transport passes to the
/// backend when rotating the waitpoint HMAC secret (86_400_000 ms = 24 hours).
pub const EMBEDDED_WAITPOINT_HMAC_GRACE_MS: u64 = 86_400_000;
/// Largest `grant_ttl_ms`, in milliseconds, accepted by
/// `validate_admin_grant_ttl` (60_000 ms = 60 seconds).
const EMBEDDED_GRANT_TTL_MS_MAX: u64 = 60_000;
/// Client for the FlowFabric admin operations defined in this file.
///
/// Built with `new` / `with_token` it sends requests over HTTP; built with
/// `connect_with` it calls an `EngineBackend` directly.
#[derive(Debug, Clone)]
pub struct FlowFabricAdminClient {
// Transport every admin call is dispatched through.
transport: AdminTransport,
}
/// How a `FlowFabricAdminClient` reaches the engine.
#[derive(Clone)]
enum AdminTransport {
// Send requests over HTTP to the server at `base_url`.
Http {
// Shared reqwest client (carries the timeout and any default headers).
http: reqwest::Client,
// Server base URL; the constructors strip trailing slashes before storing it.
base_url: String,
},
// Call the wrapped `EngineBackend` directly, without HTTP.
Embedded(Arc<dyn EngineBackend>),
}
impl std::fmt::Debug for AdminTransport {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AdminTransport::Http { base_url, .. } => f
.debug_struct("Http")
.field("base_url", base_url)
.finish_non_exhaustive(),
AdminTransport::Embedded(backend) => f
.debug_struct("Embedded")
.field("backend", &backend.backend_label())
.finish(),
}
}
}
impl FlowFabricAdminClient {
    /// Builds the `reqwest::Client` used by the HTTP constructors.
    ///
    /// Always applies `DEFAULT_TIMEOUT`; installs `default_headers` only when
    /// some are supplied. A builder failure is reported as [`SdkError::Http`].
    fn http_client_with_headers(
        default_headers: Option<reqwest::header::HeaderMap>,
    ) -> Result<reqwest::Client, SdkError> {
        let mut builder = reqwest::Client::builder().timeout(DEFAULT_TIMEOUT);
        if let Some(headers) = default_headers {
            builder = builder.default_headers(headers);
        }
        builder.build().map_err(|e| SdkError::Http {
            source: e,
            context: "build reqwest::Client".into(),
        })
    }

    /// Wraps an already-built HTTP client and a raw base URL into a client
    /// value, stripping trailing slashes from the URL.
    fn from_http_parts(http: reqwest::Client, base_url: String) -> Self {
        let base_url = normalize_base_url(base_url);
        Self {
            transport: AdminTransport::Http { http, base_url },
        }
    }

    /// Creates an HTTP client that sends no `Authorization` header.
    ///
    /// # Errors
    /// Returns [`SdkError::Http`] if the underlying `reqwest::Client` cannot
    /// be built.
    pub fn new(base_url: impl Into<String>) -> Result<Self, SdkError> {
        let http = Self::http_client_with_headers(None)?;
        Ok(Self::from_http_parts(http, base_url.into()))
    }

    /// Creates a client that calls `backend` directly instead of going over
    /// HTTP.
    pub fn connect_with(backend: Arc<dyn EngineBackend>) -> Self {
        let transport = AdminTransport::Embedded(backend);
        Self { transport }
    }

    /// Creates an HTTP client that sends `Authorization: Bearer <token>` on
    /// every request.
    ///
    /// The header value is marked sensitive on the `HeaderValue`.
    ///
    /// # Errors
    /// * [`SdkError::Config`] (field `bearer_token`) if `token` is empty or
    ///   whitespace-only, or contains characters that are not valid in an
    ///   HTTP header value.
    /// * [`SdkError::Http`] if the underlying `reqwest::Client` cannot be
    ///   built.
    pub fn with_token(
        base_url: impl Into<String>,
        token: impl AsRef<str>,
    ) -> Result<Self, SdkError> {
        let token_str = token.as_ref();
        let token_is_blank = token_str.trim().is_empty();
        if token_is_blank {
            return Err(SdkError::Config {
                context: "admin_client".into(),
                field: Some("bearer_token".into()),
                message: "is empty or all-whitespace; use \
                          FlowFabricAdminClient::new for unauthenticated access"
                    .into(),
            });
        }

        let bearer = format!("Bearer {}", token_str);
        let mut auth_value = match reqwest::header::HeaderValue::from_str(&bearer) {
            Ok(value) => value,
            Err(_) => {
                return Err(SdkError::Config {
                    context: "admin_client".into(),
                    field: Some("bearer_token".into()),
                    message: "contains characters not valid in an HTTP header".into(),
                });
            }
        };
        auth_value.set_sensitive(true);

        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert(reqwest::header::AUTHORIZATION, auth_value);

        let http = Self::http_client_with_headers(Some(headers))?;
        Ok(Self::from_http_parts(http, base_url.into()))
    }

    /// Asks the engine for a claim grant on behalf of a worker.
    ///
    /// Dispatches to the HTTP or embedded implementation depending on how the
    /// client was constructed. `Ok(None)` means there was no work to grant.
    pub async fn claim_for_worker(
        &self,
        req: ClaimForWorkerRequest,
    ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
        match self.transport {
            AdminTransport::Http {
                ref http,
                ref base_url,
            } => claim_for_worker_http(http, base_url, req).await,
            AdminTransport::Embedded(ref backend) => {
                claim_for_worker_embedded(backend.as_ref(), req).await
            }
        }
    }

    /// Rotates the waitpoint HMAC secret, via HTTP or the embedded backend
    /// depending on how the client was constructed.
    pub async fn rotate_waitpoint_secret(
        &self,
        req: RotateWaitpointSecretRequest,
    ) -> Result<RotateWaitpointSecretResponse, SdkError> {
        match self.transport {
            AdminTransport::Http {
                ref http,
                ref base_url,
            } => rotate_waitpoint_secret_http(http, base_url, req).await,
            AdminTransport::Embedded(ref backend) => {
                rotate_waitpoint_secret_embedded(backend.as_ref(), req).await
            }
        }
    }

    /// Reads the token of `waitpoint_id` belonging to `execution_id`.
    ///
    /// Over HTTP, `Ok(None)` is returned when the server answers 404.
    pub async fn read_waitpoint_token(
        &self,
        execution_id: &ff_core::types::ExecutionId,
        waitpoint_id: &ff_core::types::WaitpointId,
    ) -> Result<Option<String>, SdkError> {
        match self.transport {
            AdminTransport::Http {
                ref http,
                ref base_url,
            } => read_waitpoint_token_http(http, base_url, execution_id, waitpoint_id).await,
            AdminTransport::Embedded(ref backend) => {
                read_waitpoint_token_embedded(backend.as_ref(), execution_id, waitpoint_id).await
            }
        }
    }

    /// Requests a reclaim grant for `execution_id`, via HTTP or the embedded
    /// backend depending on how the client was constructed.
    pub async fn issue_reclaim_grant(
        &self,
        execution_id: &str,
        req: IssueReclaimGrantRequest,
    ) -> Result<IssueReclaimGrantResponse, SdkError> {
        match self.transport {
            AdminTransport::Http {
                ref http,
                ref base_url,
            } => issue_reclaim_grant_http(http, base_url, execution_id, req).await,
            AdminTransport::Embedded(ref backend) => {
                issue_reclaim_grant_embedded(backend.as_ref(), execution_id, req).await
            }
        }
    }
}
/// Sends `POST /v1/workers/{worker_id}/claim` and decodes the reply.
///
/// `req.worker_id` is appended as a URL path segment; `req` itself is sent as
/// the JSON body.
///
/// # Returns
/// * `Ok(None)` when the server answers `204 No Content`.
/// * `Ok(Some(_))` with the decoded body for any other 2xx status.
///
/// # Errors
/// * [`SdkError::Config`] if `base_url` does not parse or cannot carry path
///   segments.
/// * [`SdkError::Http`] if sending the request, or reading/decoding a
///   response body, fails.
/// * [`SdkError::AdminApi`] for every non-2xx status. `message` is the
///   `error` field when the body parses as the admin error JSON shape, and
///   the raw body text otherwise.
async fn claim_for_worker_http(
    http: &reqwest::Client,
    base_url: &str,
    req: ClaimForWorkerRequest,
) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
    let mut endpoint = match reqwest::Url::parse(base_url) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(SdkError::Config {
                context: "admin_client: claim_for_worker".into(),
                field: Some("base_url".into()),
                message: format!("invalid base_url '{}': {e}", base_url),
            });
        }
    };
    // Append the route as individual path segments rather than by string
    // concatenation.
    match endpoint.path_segments_mut() {
        Ok(mut segments) => {
            segments.extend(&["v1", "workers", &req.worker_id, "claim"]);
        }
        Err(()) => {
            return Err(SdkError::Config {
                context: "admin_client: claim_for_worker".into(),
                field: Some("base_url".into()),
                message: format!("base_url cannot be a base URL: '{}'", base_url),
            });
        }
    }
    let endpoint = endpoint.to_string();

    let response = http
        .post(&endpoint)
        .json(&req)
        .send()
        .await
        .map_err(|e| SdkError::Http {
            source: e,
            context: "POST /v1/workers/{worker_id}/claim".into(),
        })?;

    let status = response.status();
    if status == reqwest::StatusCode::NO_CONTENT {
        return Ok(None);
    }

    if !status.is_success() {
        let status_u16 = status.as_u16();
        let raw = match response.text().await {
            Ok(text) => text,
            Err(e) => {
                return Err(SdkError::Http {
                    source: e,
                    context: format!("read claim_for_worker error body (status {status_u16})"),
                });
            }
        };
        // Prefer the structured error body; fall back to the raw text.
        let (message, kind, retryable) = match serde_json::from_str::<AdminErrorBody>(&raw) {
            Ok(body) => (body.error, body.kind, body.retryable),
            Err(_) => (raw.clone(), None, None),
        };
        return Err(SdkError::AdminApi {
            status: status_u16,
            message,
            kind,
            retryable,
            raw_body: raw,
        });
    }

    match response.json::<ClaimForWorkerResponse>().await {
        Ok(grant) => Ok(Some(grant)),
        Err(e) => Err(SdkError::Http {
            source: e,
            context: "decode claim_for_worker response body".into(),
        }),
    }
}
/// Sends `POST {base_url}/v1/admin/rotate-waitpoint-secret` with `req` as the
/// JSON body and decodes the reply.
///
/// The URL is formed by plain string concatenation onto `base_url`.
///
/// # Errors
/// * [`SdkError::Http`] if sending the request, or reading/decoding a
///   response body, fails.
/// * [`SdkError::AdminApi`] for every non-2xx status. `message` is the
///   `error` field when the body parses as the admin error JSON shape, and
///   the raw body text otherwise.
async fn rotate_waitpoint_secret_http(
    http: &reqwest::Client,
    base_url: &str,
    req: RotateWaitpointSecretRequest,
) -> Result<RotateWaitpointSecretResponse, SdkError> {
    let endpoint = format!("{}/v1/admin/rotate-waitpoint-secret", base_url);

    let response = match http.post(&endpoint).json(&req).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(SdkError::Http {
                source: e,
                context: "POST /v1/admin/rotate-waitpoint-secret".into(),
            });
        }
    };

    let status = response.status();
    if !status.is_success() {
        let status_u16 = status.as_u16();
        let raw = match response.text().await {
            Ok(text) => text,
            Err(e) => {
                return Err(SdkError::Http {
                    source: e,
                    context: format!(
                        "read rotate-waitpoint-secret error response body (status {status_u16})"
                    ),
                });
            }
        };
        // Prefer the structured error body; fall back to the raw text.
        let (message, kind, retryable) = match serde_json::from_str::<AdminErrorBody>(&raw) {
            Ok(body) => (body.error, body.kind, body.retryable),
            Err(_) => (raw.clone(), None, None),
        };
        return Err(SdkError::AdminApi {
            status: status_u16,
            message,
            kind,
            retryable,
            raw_body: raw,
        });
    }

    match response.json::<RotateWaitpointSecretResponse>().await {
        Ok(decoded) => Ok(decoded),
        Err(e) => Err(SdkError::Http {
            source: e,
            context: "decode rotate-waitpoint-secret response body".into(),
        }),
    }
}
/// Sends `POST /v1/executions/{execution_id}/reclaim` with `req` as the JSON
/// body and decodes the reply.
///
/// `execution_id` is appended as a URL path segment.
///
/// # Errors
/// * [`SdkError::Config`] if `base_url` does not parse or cannot carry path
///   segments.
/// * [`SdkError::Http`] if sending the request, or reading/decoding a
///   response body, fails.
/// * [`SdkError::AdminApi`] for every non-2xx status. `message` is the
///   `error` field when the body parses as the admin error JSON shape, and
///   the raw body text otherwise.
async fn issue_reclaim_grant_http(
    http: &reqwest::Client,
    base_url: &str,
    execution_id: &str,
    req: IssueReclaimGrantRequest,
) -> Result<IssueReclaimGrantResponse, SdkError> {
    let mut endpoint = match reqwest::Url::parse(base_url) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(SdkError::Config {
                context: "admin_client: issue_reclaim_grant".into(),
                field: Some("base_url".into()),
                message: format!("invalid base_url '{}': {e}", base_url),
            });
        }
    };
    match endpoint.path_segments_mut() {
        Ok(mut segments) => {
            segments.extend(&["v1", "executions", execution_id, "reclaim"]);
        }
        Err(()) => {
            return Err(SdkError::Config {
                context: "admin_client: issue_reclaim_grant".into(),
                field: Some("base_url".into()),
                message: format!("base_url cannot be a base URL: '{}'", base_url),
            });
        }
    }
    let endpoint = endpoint.to_string();

    let response = http
        .post(&endpoint)
        .json(&req)
        .send()
        .await
        .map_err(|e| SdkError::Http {
            source: e,
            context: "POST /v1/executions/{id}/reclaim".into(),
        })?;

    let status = response.status();
    if !status.is_success() {
        let status_u16 = status.as_u16();
        let raw = match response.text().await {
            Ok(text) => text,
            Err(e) => {
                return Err(SdkError::Http {
                    source: e,
                    context: format!(
                        "read issue_reclaim_grant error body (status {status_u16})"
                    ),
                });
            }
        };
        // Prefer the structured error body; fall back to the raw text.
        let (message, kind, retryable) = match serde_json::from_str::<AdminErrorBody>(&raw) {
            Ok(body) => (body.error, body.kind, body.retryable),
            Err(_) => (raw.clone(), None, None),
        };
        return Err(SdkError::AdminApi {
            status: status_u16,
            message,
            kind,
            retryable,
            raw_body: raw,
        });
    }

    match response.json::<IssueReclaimGrantResponse>().await {
        Ok(decoded) => Ok(decoded),
        Err(e) => Err(SdkError::Http {
            source: e,
            context: "decode issue_reclaim_grant response body".into(),
        }),
    }
}
/// Checks that an identifier-like request field is usable.
///
/// `op` names the admin operation and `field` the request field; both appear
/// in the returned error. Checks run in order and the first failure wins:
/// 1. `value` must not be empty;
/// 2. `value` must not exceed 256 bytes;
/// 3. `value` must not contain whitespace or control characters.
///
/// # Errors
/// Returns [`SdkError::Config`] describing the first rule that failed.
fn validate_admin_identifier(
    op: &'static str,
    field: &'static str,
    value: &str,
) -> Result<(), SdkError> {
    let has_forbidden_char = |text: &str| {
        text.chars()
            .any(|c| c.is_control() || c.is_whitespace())
    };

    let problem: Option<String> = if value.is_empty() {
        Some("must not be empty".into())
    } else if value.len() > 256 {
        Some(format!("exceeds 256 bytes (got {})", value.len()))
    } else if has_forbidden_char(value) {
        Some("must not contain whitespace or control characters".into())
    } else {
        None
    };

    match problem {
        None => Ok(()),
        Some(message) => Err(SdkError::Config {
            context: format!("admin_client: {op}"),
            field: Some(field.into()),
            message,
        }),
    }
}
/// Checks that `grant_ttl_ms` lies in `1..=EMBEDDED_GRANT_TTL_MS_MAX`.
///
/// `op` names the admin operation and appears in the error context.
///
/// # Errors
/// Returns [`SdkError::Config`] (field `grant_ttl_ms`) when the value is zero
/// or above the maximum.
fn validate_admin_grant_ttl(op: &'static str, grant_ttl_ms: u64) -> Result<(), SdkError> {
    let within_bounds = (1..=EMBEDDED_GRANT_TTL_MS_MAX).contains(&grant_ttl_ms);
    if within_bounds {
        return Ok(());
    }
    Err(SdkError::Config {
        context: format!("admin_client: {op}"),
        field: Some("grant_ttl_ms".into()),
        message: format!("must be in 1..={EMBEDDED_GRANT_TTL_MS_MAX}"),
    })
}
/// Maps a backend [`EngineError`](ff_core::engine_error::EngineError) from the
/// embedded transport into an [`SdkError`].
///
/// `EngineError::Unavailable` becomes a non-retryable
/// [`SdkError::AdminApi`] with status 503 and kind `"unavailable"`, naming
/// both the admin operation `op` and the backend operation. Every other
/// error is boxed into [`SdkError::Engine`] unchanged.
fn engine_err_to_admin(err: ff_core::engine_error::EngineError, op: &str) -> SdkError {
    match err {
        ff_core::engine_error::EngineError::Unavailable { op: backend_op } => {
            SdkError::AdminApi {
                status: 503,
                message: format!(
                    "{op}: backend does not implement '{backend_op}' on this transport"
                ),
                kind: Some("unavailable".to_owned()),
                retryable: Some(false),
                raw_body: String::new(),
            }
        }
        other => SdkError::Engine(Box::new(other)),
    }
}
async fn claim_for_worker_embedded(
backend: &dyn EngineBackend,
req: ClaimForWorkerRequest,
) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
context: "admin_client: claim_for_worker".into(),
field: Some("lane_id".into()),
message: e.to_string(),
})?;
validate_admin_identifier("claim_for_worker", "worker_id", &req.worker_id)?;
validate_admin_identifier(
"claim_for_worker",
"worker_instance_id",
&req.worker_instance_id,
)?;
validate_admin_grant_ttl("claim_for_worker", req.grant_ttl_ms)?;
let caps: std::collections::BTreeSet<String> = req.capabilities.into_iter().collect();
let args = ff_core::contracts::ClaimForWorkerArgs::new(
lane_id,
ff_core::types::WorkerId::new(req.worker_id),
ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
caps,
req.grant_ttl_ms,
);
match backend
.claim_for_worker(args)
.await
.map_err(|e| engine_err_to_admin(e, "claim_for_worker"))?
{
ff_core::contracts::ClaimForWorkerOutcome::NoWork => Ok(None),
ff_core::contracts::ClaimForWorkerOutcome::Granted(grant) => {
Ok(Some(ClaimForWorkerResponse {
execution_id: grant.execution_id.to_string(),
partition_key: grant.partition_key,
grant_key: grant.grant_key,
expires_at_ms: grant.expires_at_ms,
}))
}
_ => Err(SdkError::AdminApi {
status: 503,
message: "claim_for_worker: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
kind: Some("unknown_outcome".to_owned()),
retryable: Some(false),
raw_body: String::new(),
}),
}
}
async fn issue_reclaim_grant_embedded(
backend: &dyn EngineBackend,
execution_id: &str,
req: IssueReclaimGrantRequest,
) -> Result<IssueReclaimGrantResponse, SdkError> {
let exec_id = ff_core::types::ExecutionId::parse(execution_id).map_err(|e| SdkError::Config {
context: "admin_client: issue_reclaim_grant".into(),
field: Some("execution_id".into()),
message: e.to_string(),
})?;
let lane_id = ff_core::types::LaneId::try_new(req.lane_id).map_err(|e| SdkError::Config {
context: "admin_client: issue_reclaim_grant".into(),
field: Some("lane_id".into()),
message: e.to_string(),
})?;
validate_admin_identifier("issue_reclaim_grant", "worker_id", &req.worker_id)?;
validate_admin_identifier(
"issue_reclaim_grant",
"worker_instance_id",
&req.worker_instance_id,
)?;
validate_admin_grant_ttl("issue_reclaim_grant", req.grant_ttl_ms)?;
let caps: std::collections::BTreeSet<String> = req.worker_capabilities.into_iter().collect();
let args = ff_core::contracts::IssueReclaimGrantArgs::new(
exec_id,
ff_core::types::WorkerId::new(req.worker_id),
ff_core::types::WorkerInstanceId::new(req.worker_instance_id),
lane_id,
req.capability_hash,
req.grant_ttl_ms,
req.route_snapshot_json,
req.admission_summary,
caps,
ff_core::types::TimestampMs::now(),
);
match backend
.issue_reclaim_grant(args)
.await
.map_err(|e| engine_err_to_admin(e, "issue_reclaim_grant"))?
{
ff_core::contracts::IssueReclaimGrantOutcome::Granted(grant) => {
Ok(IssueReclaimGrantResponse::Granted {
execution_id: grant.execution_id.to_string(),
partition_key: grant.partition_key,
grant_key: grant.grant_key,
expires_at_ms: grant.expires_at_ms,
lane_id: grant.lane_id.as_str().to_owned(),
})
}
ff_core::contracts::IssueReclaimGrantOutcome::NotReclaimable { execution_id, detail } => {
Ok(IssueReclaimGrantResponse::NotReclaimable {
execution_id: execution_id.to_string(),
detail,
})
}
ff_core::contracts::IssueReclaimGrantOutcome::ReclaimCapExceeded {
execution_id,
reclaim_count,
} => Ok(IssueReclaimGrantResponse::ReclaimCapExceeded {
execution_id: execution_id.to_string(),
reclaim_count,
}),
_ => Err(SdkError::AdminApi {
status: 503,
message: "issue_reclaim_grant: backend returned a non-exhaustive outcome this SDK build does not understand".to_owned(),
kind: Some("unknown_outcome".to_owned()),
retryable: Some(false),
raw_body: String::new(),
}),
}
}
/// Sends `GET /v1/executions/{execution_id}/waitpoints/{waitpoint_id}/token`
/// and returns the `token` field of the JSON reply.
///
/// Both ids are appended as URL path segments.
///
/// # Returns
/// * `Ok(None)` when the server answers `404 Not Found`.
/// * `Ok(Some(token))` for a 2xx status.
///
/// # Errors
/// * [`SdkError::Config`] if `base_url` does not parse or cannot carry path
///   segments.
/// * [`SdkError::Http`] if sending the request, or reading/decoding a
///   response body, fails.
/// * [`SdkError::AdminApi`] for every other non-2xx status. `message` is the
///   `error` field when the body parses as the admin error JSON shape, and
///   the raw body text otherwise.
async fn read_waitpoint_token_http(
    http: &reqwest::Client,
    base_url: &str,
    execution_id: &ff_core::types::ExecutionId,
    waitpoint_id: &ff_core::types::WaitpointId,
) -> Result<Option<String>, SdkError> {
    /// Shape of the success body; only `token` is read.
    #[derive(Deserialize)]
    struct TokenBody {
        token: String,
    }

    let mut url = match reqwest::Url::parse(base_url) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(SdkError::Config {
                context: "admin_client: read_waitpoint_token".into(),
                field: Some("base_url".into()),
                message: format!("invalid base_url '{}': {e}", base_url),
            });
        }
    };
    match url.path_segments_mut() {
        Ok(mut segments) => {
            let waitpoint_segment = waitpoint_id.to_string();
            segments.extend(&[
                "v1",
                "executions",
                execution_id.as_str(),
                "waitpoints",
                &waitpoint_segment,
                "token",
            ]);
        }
        Err(()) => {
            return Err(SdkError::Config {
                context: "admin_client: read_waitpoint_token".into(),
                field: Some("base_url".into()),
                message: format!("base_url '{}' cannot be a base", base_url),
            });
        }
    }

    let response = match http.get(url.clone()).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(SdkError::Http {
                source: e,
                context: format!("GET {url}"),
            });
        }
    };

    let status = response.status();
    if status == reqwest::StatusCode::NOT_FOUND {
        return Ok(None);
    }

    if !status.is_success() {
        let status_u16 = status.as_u16();
        let raw = match response.text().await {
            Ok(text) => text,
            Err(e) => {
                return Err(SdkError::Http {
                    source: e,
                    context: format!("read read_waitpoint_token error body (status {status_u16})"),
                });
            }
        };
        // Prefer the structured error body; fall back to the raw text.
        let (message, kind, retryable) = match serde_json::from_str::<AdminErrorBody>(&raw) {
            Ok(body) => (body.error, body.kind, body.retryable),
            Err(_) => (raw.clone(), None, None),
        };
        return Err(SdkError::AdminApi {
            status: status_u16,
            message,
            kind,
            retryable,
            raw_body: raw,
        });
    }

    match response.json::<TokenBody>().await {
        Ok(body) => Ok(Some(body.token)),
        Err(e) => Err(SdkError::Http {
            source: e,
            context: "decode read_waitpoint_token response body".into(),
        }),
    }
}
/// Embedded-transport implementation of `read_waitpoint_token`.
///
/// Builds a `Flow`-family partition key from `execution_id.partition()` and
/// asks the backend for the token of `waitpoint_id` in that partition.
///
/// # Errors
/// Backend errors are converted with `engine_err_to_admin`.
async fn read_waitpoint_token_embedded(
    backend: &dyn EngineBackend,
    execution_id: &ff_core::types::ExecutionId,
    waitpoint_id: &ff_core::types::WaitpointId,
) -> Result<Option<String>, SdkError> {
    let flow_partition = ff_core::partition::Partition {
        family: ff_core::partition::PartitionFamily::Flow,
        index: execution_id.partition(),
    };
    let partition_key = ff_core::partition::PartitionKey::from(&flow_partition);

    let lookup = backend
        .read_waitpoint_token(partition_key, waitpoint_id)
        .await;
    lookup.map_err(|e| engine_err_to_admin(e, "read_waitpoint_token"))
}
async fn rotate_waitpoint_secret_embedded(
backend: &dyn EngineBackend,
req: RotateWaitpointSecretRequest,
) -> Result<RotateWaitpointSecretResponse, SdkError> {
if req.new_kid.is_empty() || req.new_kid.contains(':') {
return Err(SdkError::Config {
context: "admin_client: rotate_waitpoint_secret".into(),
field: Some("new_kid".into()),
message: "must be non-empty and must not contain ':'".into(),
});
}
if req.new_secret_hex.is_empty()
|| !req.new_secret_hex.len().is_multiple_of(2)
|| !req.new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
{
return Err(SdkError::Config {
context: "admin_client: rotate_waitpoint_secret".into(),
field: Some("new_secret_hex".into()),
message: "must be a non-empty even-length hex string".into(),
});
}
let args = ff_core::contracts::RotateWaitpointHmacSecretAllArgs::new(
req.new_kid.clone(),
req.new_secret_hex,
EMBEDDED_WAITPOINT_HMAC_GRACE_MS,
);
let result = backend
.rotate_waitpoint_hmac_secret_all(args)
.await
.map_err(|e| engine_err_to_admin(e, "rotate_waitpoint_secret"))?;
let mut rotated: u16 = 0;
let mut failed: Vec<u16> = Vec::new();
for entry in &result.entries {
match &entry.result {
Ok(_) => {
rotated = rotated.saturating_add(1);
}
Err(_) => failed.push(entry.partition),
}
}
Ok(RotateWaitpointSecretResponse {
rotated,
failed,
new_kid: req.new_kid,
})
}
/// Request payload for `FlowFabricAdminClient::issue_reclaim_grant`.
///
/// Serialized as the JSON body on the HTTP transport; the three optional
/// string fields are left out of the JSON when they are `None`.
#[derive(Debug, Clone, Serialize)]
pub struct IssueReclaimGrantRequest {
/// Identifier of the worker requesting the grant.
pub worker_id: String,
/// Identifier of the worker instance requesting the grant.
pub worker_instance_id: String,
/// Lane identifier; the embedded transport validates it with `LaneId::try_new`.
pub lane_id: String,
/// Optional capability hash, forwarded to the backend as-is.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub capability_hash: Option<String>,
/// Requested grant time-to-live in milliseconds.
pub grant_ttl_ms: u64,
/// Optional route snapshot (JSON text), forwarded to the backend as-is.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub route_snapshot_json: Option<String>,
/// Optional admission summary, forwarded to the backend as-is.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub admission_summary: Option<String>,
/// Worker capabilities; the embedded transport collects them into a set.
#[serde(default)]
pub worker_capabilities: Vec<String>,
}
/// Result of `FlowFabricAdminClient::issue_reclaim_grant`.
///
/// Deserialized from JSON that carries a `status` tag whose value is the
/// snake_case variant name (`granted`, `not_reclaimable`,
/// `reclaim_cap_exceeded`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum IssueReclaimGrantResponse {
/// A reclaim grant was issued.
Granted {
/// Execution id in string form.
execution_id: String,
/// Partition key, kept as an opaque value.
partition_key: ff_core::partition::PartitionKey,
/// Key of the issued grant.
grant_key: String,
/// Grant expiry as a millisecond timestamp.
expires_at_ms: u64,
/// Lane the grant applies to, in string form.
lane_id: String,
},
/// The execution cannot be reclaimed; `detail` carries the reason text.
NotReclaimable {
execution_id: String,
detail: String,
},
/// The execution hit its reclaim cap; `reclaim_count` is the reported count.
ReclaimCapExceeded {
execution_id: String,
reclaim_count: u32,
},
}
impl IssueReclaimGrantResponse {
    /// Converts the response into a typed
    /// [`ReclaimGrant`](ff_core::contracts::ReclaimGrant).
    ///
    /// # Errors
    /// Every failure is an [`SdkError::AdminApi`] with status 200,
    /// `retryable = Some(false)` and an empty `raw_body`:
    /// * kind `malformed_response` if a `Granted` response carries an
    ///   `execution_id` or `lane_id` that fails to parse;
    /// * kind `not_reclaimable` for the `NotReclaimable` variant;
    /// * kind `reclaim_cap_exceeded` for the `ReclaimCapExceeded` variant.
    pub fn into_grant(self) -> Result<ff_core::contracts::ReclaimGrant, SdkError> {
        /// Builds the error value shared by every failure path of this method.
        fn rejected(kind: &str, message: String) -> SdkError {
            SdkError::AdminApi {
                status: 200,
                message,
                kind: Some(kind.to_owned()),
                retryable: Some(false),
                raw_body: String::new(),
            }
        }

        match self {
            Self::Granted {
                execution_id,
                partition_key,
                grant_key,
                expires_at_ms,
                lane_id,
            } => {
                let eid = match ff_core::types::ExecutionId::parse(&execution_id) {
                    Ok(id) => id,
                    Err(e) => {
                        return Err(rejected(
                            "malformed_response",
                            format!(
                                "issue_reclaim_grant: server returned malformed execution_id '{execution_id}': {e}"
                            ),
                        ));
                    }
                };
                // `lane_id` is cloned so the original text is still available
                // for the error message if parsing fails.
                let lane = match ff_core::types::LaneId::try_new(lane_id.clone()) {
                    Ok(lane) => lane,
                    Err(e) => {
                        return Err(rejected(
                            "malformed_response",
                            format!(
                                "issue_reclaim_grant: server returned malformed lane_id '{lane_id}': {e}"
                            ),
                        ));
                    }
                };
                Ok(ff_core::contracts::ReclaimGrant::new(
                    eid,
                    partition_key,
                    grant_key,
                    expires_at_ms,
                    lane,
                ))
            }
            Self::NotReclaimable {
                execution_id,
                detail,
            } => Err(rejected(
                "not_reclaimable",
                format!(
                    "issue_reclaim_grant: execution '{execution_id}' not reclaimable: {detail}"
                ),
            )),
            Self::ReclaimCapExceeded {
                execution_id,
                reclaim_count,
            } => Err(rejected(
                "reclaim_cap_exceeded",
                format!(
                    "issue_reclaim_grant: execution '{execution_id}' hit reclaim cap ({reclaim_count})"
                ),
            )),
        }
    }
}
/// Request payload for `FlowFabricAdminClient::rotate_waitpoint_secret`.
#[derive(Debug, Clone, Serialize)]
pub struct RotateWaitpointSecretRequest {
/// Key id of the new secret; the embedded transport rejects empty values and values containing ':'.
pub new_kid: String,
/// New secret as hex text; the embedded transport requires a non-empty, even-length hex string.
pub new_secret_hex: String,
}
/// Summary returned by `FlowFabricAdminClient::rotate_waitpoint_secret`.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct RotateWaitpointSecretResponse {
/// Number of partitions whose rotation succeeded.
pub rotated: u16,
/// Partitions whose rotation failed.
pub failed: Vec<u16>,
/// Key id that was rotated in (the embedded transport echoes the request's `new_kid`).
pub new_kid: String,
}
/// JSON error body the HTTP helpers try to parse from non-2xx responses.
///
/// Only `error` is required; `kind` and `retryable` default to `None` when
/// the body omits them.
#[derive(Debug, Clone, Deserialize)]
struct AdminErrorBody {
// Error message; becomes `SdkError::AdminApi::message`.
error: String,
// Optional error kind string.
#[serde(default)]
kind: Option<String>,
// Optional retryable flag.
#[serde(default)]
retryable: Option<bool>,
}
/// Request payload for `FlowFabricAdminClient::claim_for_worker`.
#[derive(Debug, Clone, Serialize)]
pub struct ClaimForWorkerRequest {
/// Worker id. Not serialized into the JSON body; the HTTP transport places
/// it in the URL path instead.
#[serde(skip)]
pub worker_id: String,
/// Lane identifier; the embedded transport validates it with `LaneId::try_new`.
pub lane_id: String,
/// Identifier of the worker instance making the claim.
pub worker_instance_id: String,
/// Worker capabilities; the embedded transport collects them into a set.
#[serde(default)]
pub capabilities: Vec<String>,
/// Requested grant time-to-live in milliseconds.
pub grant_ttl_ms: u64,
}
/// Grant returned by `FlowFabricAdminClient::claim_for_worker`.
#[derive(Debug, Clone, Deserialize)]
pub struct ClaimForWorkerResponse {
/// Execution id in string form; parsed by `into_grant`.
pub execution_id: String,
/// Partition key, kept as an opaque value (`into_grant` does not parse it).
pub partition_key: ff_core::partition::PartitionKey,
/// Key of the issued grant.
pub grant_key: String,
/// Grant expiry as a millisecond timestamp.
pub expires_at_ms: u64,
}
impl ClaimForWorkerResponse {
    /// Converts the response into a typed
    /// [`ClaimGrant`](ff_core::contracts::ClaimGrant).
    ///
    /// Only `execution_id` is parsed; `partition_key`, `grant_key` and
    /// `expires_at_ms` are passed through unchanged.
    ///
    /// # Errors
    /// Returns a non-retryable [`SdkError::AdminApi`] (status 200, kind
    /// `malformed_response`) if `execution_id` does not parse.
    pub fn into_grant(self) -> Result<ff_core::contracts::ClaimGrant, SdkError> {
        let Self {
            execution_id,
            partition_key,
            grant_key,
            expires_at_ms,
        } = self;

        let parsed_id = match ff_core::types::ExecutionId::parse(&execution_id) {
            Ok(id) => id,
            Err(e) => {
                return Err(SdkError::AdminApi {
                    status: 200,
                    message: format!(
                        "claim_for_worker: server returned malformed execution_id '{}': {e}",
                        execution_id
                    ),
                    kind: Some("malformed_response".to_owned()),
                    retryable: Some(false),
                    raw_body: String::new(),
                });
            }
        };

        Ok(ff_core::contracts::ClaimGrant::new(
            parsed_id,
            partition_key,
            grant_key,
            expires_at_ms,
        ))
    }
}
/// Result of rotating the waitpoint HMAC secret on a single partition.
#[derive(Debug)]
pub struct PartitionRotationOutcome {
/// Index of the partition this outcome belongs to.
pub partition: u16,
/// Outcome of the rotation on that partition, or the error it failed with.
pub result: Result<RotateWaitpointHmacSecretOutcome, SdkError>,
}
/// Rotates the waitpoint HMAC secret on partitions `0..num_partitions`, one
/// after another.
///
/// Each partition is addressed through the `Execution` partition family. A
/// failure on one partition does not stop the loop: every partition gets its
/// own entry in the returned vector, in index order, holding either the
/// rotation outcome or the error converted into [`SdkError`].
///
/// Only compiled with the `valkey-default` feature.
#[cfg(feature = "valkey-default")]
pub async fn rotate_waitpoint_hmac_secret_all_partitions(
    client: &ferriskey::Client,
    num_partitions: u16,
    new_kid: &str,
    new_secret_hex: &str,
    grace_ms: u64,
) -> Vec<PartitionRotationOutcome> {
    // The same arguments are reused for every partition.
    let rotation_args = RotateWaitpointHmacSecretArgs {
        new_kid: new_kid.to_owned(),
        new_secret_hex: new_secret_hex.to_owned(),
        grace_ms,
    };

    let mut outcomes = Vec::with_capacity(usize::from(num_partitions));
    for partition_index in 0..num_partitions {
        let target = Partition {
            family: PartitionFamily::Execution,
            index: partition_index,
        };
        let index_keys = IndexKeys::new(&target);
        let rotation = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            client,
            &index_keys,
            &rotation_args,
        )
        .await;
        outcomes.push(PartitionRotationOutcome {
            partition: partition_index,
            result: rotation.map_err(SdkError::from),
        });
    }
    outcomes
}
/// Removes every trailing `'/'` from `url` and returns the shortened string.
///
/// The string is truncated in place, so no new allocation is made. A string
/// consisting only of slashes becomes empty.
fn normalize_base_url(mut url: String) -> String {
    let kept_len = url.trim_end_matches('/').len();
    url.truncate(kept_len);
    url
}
// Unit tests for URL normalisation, token validation, and the serde shapes /
// grant conversions defined in this file. None of them perform network I/O.
#[cfg(test)]
mod tests {
use super::*;
// Trailing slashes (zero, one, or many) are all stripped.
#[test]
fn base_url_strips_trailing_slash() {
assert_eq!(normalize_base_url("http://x".into()), "http://x");
assert_eq!(normalize_base_url("http://x/".into()), "http://x");
assert_eq!(normalize_base_url("http://x///".into()), "http://x");
}
// A token containing a newline is not a valid header value and must be
// rejected as a Config error.
#[test]
fn with_token_rejects_bad_header_chars() {
let err = FlowFabricAdminClient::with_token("http://x", "tok\nevil").unwrap_err();
assert!(
matches!(err, SdkError::Config { .. }),
"got: {err:?}"
);
}
// Empty and whitespace-only tokens are rejected with field = "bearer_token".
#[test]
fn with_token_rejects_empty_or_whitespace() {
for s in ["", " ", "\t\n ", " "] {
let err = FlowFabricAdminClient::with_token("http://x", s)
.unwrap_err();
assert!(
matches!(&err, SdkError::Config { field: Some(f), .. } if f == "bearer_token"),
"token {s:?} should return Config with field=bearer_token; got: {err:?}"
);
}
}
// `kind` and `retryable` are optional in the error body: absent -> None,
// present -> parsed.
#[test]
fn admin_error_body_deserialises_optional_fields() {
let b: AdminErrorBody = serde_json::from_str(r#"{"error":"bad new_kid"}"#).unwrap();
assert_eq!(b.error, "bad new_kid");
assert!(b.kind.is_none());
assert!(b.retryable.is_none());
let b: AdminErrorBody = serde_json::from_str(
r#"{"error":"valkey: timed out","kind":"IoError","retryable":true}"#,
)
.unwrap();
assert_eq!(b.error, "valkey: timed out");
assert_eq!(b.kind.as_deref(), Some("IoError"));
assert_eq!(b.retryable, Some(true));
}
// The rotate response decodes from a JSON object with `rotated`, `failed`
// and `new_kid`.
#[test]
fn rotate_response_deserialises_server_shape() {
let raw = r#"{
"rotated": 3,
"failed": [4, 5],
"new_kid": "kid-2026-04-18"
}"#;
let r: RotateWaitpointSecretResponse = serde_json::from_str(raw).unwrap();
assert_eq!(r.rotated, 3);
assert_eq!(r.failed, vec![4, 5]);
assert_eq!(r.new_kid, "kid-2026-04-18");
}
// Fixture: a claim response with a valid execution id and the given partition
// key. The key is round-tripped through serde_json to build the
// `PartitionKey` value from its string form.
fn sample_claim_response(partition_key: &str) -> ClaimForWorkerResponse {
ClaimForWorkerResponse {
execution_id: "{fp:5}:11111111-1111-1111-1111-111111111111".to_owned(),
partition_key: serde_json::from_str(
&serde_json::to_string(partition_key).unwrap(),
)
.unwrap(),
grant_key: "ff:exec:{fp:5}:11111111-1111-1111-1111-111111111111:claim_grant".to_owned(),
expires_at_ms: 1_700_000_000_000,
}
}
// `into_grant` carries the partition key and expiry through unchanged for
// each of the `{fp:N}`, `{b:N}` and `{q:N}` key shapes.
#[test]
fn into_grant_preserves_all_known_partition_key_shapes() {
for key_str in ["{fp:5}", "{b:5}", "{q:5}"] {
let g = sample_claim_response(key_str).into_grant().unwrap_or_else(|e| {
panic!("key {key_str} should parse: {e:?}")
});
assert_eq!(g.partition_key.as_str(), key_str);
assert_eq!(g.expires_at_ms, 1_700_000_000_000);
}
}
// An unrecognised key (`{zz:0}`) still converts; only the later
// `partition()` call on the grant reports an error.
#[test]
fn into_grant_preserves_opaque_partition_key() {
let resp = sample_claim_response("{zz:0}");
let g = resp.into_grant().expect("SDK must not parse partition_key");
assert_eq!(g.partition_key.as_str(), "{zz:0}");
assert!(g.partition().is_err());
}
// A malformed execution id surfaces as AdminApi with kind
// "malformed_response".
#[test]
fn into_grant_rejects_malformed_execution_id() {
let mut resp = sample_claim_response("{fp:5}");
resp.execution_id = "not-a-valid-eid".to_owned();
let err = resp.into_grant().unwrap_err();
match err {
SdkError::AdminApi { message, kind, .. } => {
assert!(message.contains("malformed execution_id"),
"msg: {message}");
assert_eq!(kind.as_deref(), Some("malformed_response"));
}
other => panic!("expected AdminApi, got {other:?}"),
}
}
// Mirrors the URL construction used by `read_waitpoint_token_http`: the braces
// in the execution id are percent-encoded when pushed as a path segment.
#[test]
fn read_waitpoint_token_url_percent_encodes_path_segments() {
use ff_core::types::{ExecutionId, WaitpointId};
let mut url = reqwest::Url::parse("http://x").unwrap();
let execution_id = ExecutionId::parse(
"{fp:7}:11111111-1111-1111-1111-111111111111",
)
.unwrap();
let waitpoint_id = WaitpointId::parse("22222222-2222-2222-2222-222222222222")
.unwrap();
url.path_segments_mut()
.unwrap()
.extend(&[
"v1",
"executions",
execution_id.as_str(),
"waitpoints",
&waitpoint_id.to_string(),
"token",
]);
assert_eq!(
url.as_str(),
"http://x/v1/executions/%7Bfp:7%7D:11111111-1111-1111-1111-111111111111\
/waitpoints/22222222-2222-2222-2222-222222222222/token"
);
}
// The claim response decodes from the JSON object shape below, keeping the
// partition key as its string form.
#[test]
fn claim_for_worker_response_deserialises_opaque_partition_key() {
let raw = r#"{
"execution_id": "{fp:7}:11111111-1111-1111-1111-111111111111",
"partition_key": "{fp:7}",
"grant_key": "ff:exec:{fp:7}:11111111-1111-1111-1111-111111111111:claim_grant",
"expires_at_ms": 1700000000000
}"#;
let r: ClaimForWorkerResponse = serde_json::from_str(raw).unwrap();
assert_eq!(r.partition_key.as_str(), "{fp:7}");
assert_eq!(r.expires_at_ms, 1_700_000_000_000);
}
}