use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use ferriskey::{Client, ClientBuilder, Value};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
use ff_core::contracts::{
AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
DeliverSignalArgs, DeliverSignalResult, ExecutionInfo, ExecutionSummary,
ListExecutionsResult, PendingWaitpointInfo, ReplayExecutionResult,
ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
RevokeLeaseResult,
RotateWaitpointHmacSecretArgs,
StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
use ff_core::keys::{
self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
IndexKeys, QuotaKeyContext,
};
use ff_core::partition::{
budget_partition, execution_partition, flow_partition, quota_partition, Partition,
PartitionConfig, PartitionFamily,
};
use ff_core::state::{PublicState, StateVector};
use ff_core::types::*;
use ff_engine::Engine;
use ff_script::retry::is_retryable_kind;
use crate::config::ServerConfig;
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;
pub(crate) use ff_script::functions::budget::MAX_BUDGET_DIMENSIONS;
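/// Validates the dimension arrays of a create-budget request: at most
/// `MAX_BUDGET_DIMENSIONS` dimensions, and exactly one hard limit and one
/// soft limit per dimension.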
fn validate_create_budget_dimensions(
dimensions: &[String],
hard_limits: &[u64],
soft_limits: &[u64],
) -> Result<(), ServerError> {
let dim_count = dimensions.len();
if dim_count > MAX_BUDGET_DIMENSIONS {
return Err(ServerError::InvalidInput(format!(
"too_many_dimensions: limit={}, got={}",
MAX_BUDGET_DIMENSIONS, dim_count
)));
}
if hard_limits.len() != dim_count {
return Err(ServerError::InvalidInput(format!(
"dimension_limit_array_mismatch: dimensions={} hard_limits={}",
dim_count,
hard_limits.len()
)));
}
if soft_limits.len() != dim_count {
return Err(ServerError::InvalidInput(format!(
"dimension_limit_array_mismatch: dimensions={} soft_limits={}",
dim_count,
soft_limits.len()
)));
}
Ok(())
}
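/// Validates the dimension arrays of a usage report: at most
/// `MAX_BUDGET_DIMENSIONS` dimensions and exactly one delta per dimension.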
fn validate_report_usage_dimensions(
dimensions: &[String],
deltas: &[u64],
) -> Result<(), ServerError> {
let dim_count = dimensions.len();
if dim_count > MAX_BUDGET_DIMENSIONS {
return Err(ServerError::InvalidInput(format!(
"too_many_dimensions: limit={}, got={}",
MAX_BUDGET_DIMENSIONS, dim_count
)));
}
if deltas.len() != dim_count {
return Err(ServerError::InvalidInput(format!(
"dimension_delta_array_mismatch: dimensions={} deltas={}",
dim_count,
deltas.len()
)));
}
Ok(())
}
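/// FlowFabric server handle.
///
/// Owns the primary Valkey client, a dedicated tail client for blocking
/// stream reads, the background engine and scheduler, and the concurrency
/// guards (stream semaphore, `xread_block_lock`, admin rotate semaphore)
/// used by the API methods below.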
pub struct Server {
client: Client,
tail_client: Client,
stream_semaphore: Arc<tokio::sync::Semaphore>,
xread_block_lock: Arc<tokio::sync::Mutex<()>>,
admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
engine: Engine,
config: ServerConfig,
scheduler: Arc<ff_scheduler::Scheduler>,
background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
metrics: Arc<ff_observability::Metrics>,
}
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
#[error("valkey: {0}")]
Valkey(#[from] ferriskey::Error),
#[error("valkey ({context}): {source}")]
ValkeyContext {
#[source]
source: ferriskey::Error,
context: String,
},
#[error("config: {0}")]
Config(#[from] crate::config::ConfigError),
#[error("library load: {0}")]
LibraryLoad(#[from] ff_script::loader::LoadError),
#[error("partition mismatch: {0}")]
PartitionMismatch(String),
#[error("not found: {0}")]
NotFound(String),
#[error("invalid input: {0}")]
InvalidInput(String),
#[error("operation failed: {0}")]
OperationFailed(String),
#[error("script: {0}")]
Script(String),
#[error("too many concurrent {0} calls (max: {1})")]
ConcurrencyLimitExceeded(&'static str, u32),
#[error(
"valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
)]
ValkeyVersionTooLow {
detected: String,
required: String,
},
}
impl ServerError {
pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
match self {
Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
Self::LibraryLoad(e) => e.valkey_kind(),
_ => None,
}
}
pub fn is_retryable(&self) -> bool {
match self {
Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
is_retryable_kind(e.kind())
}
Self::LibraryLoad(load_err) => load_err
.valkey_kind()
.map(is_retryable_kind)
.unwrap_or(false),
Self::Config(_)
| Self::PartitionMismatch(_)
| Self::NotFound(_)
| Self::InvalidInput(_)
| Self::OperationFailed(_)
| Self::Script(_) => false,
Self::ConcurrencyLimitExceeded(_, _) => true,
Self::ValkeyVersionTooLow { .. } => false,
}
}
}
impl Server {
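/// Connects to Valkey, verifies the server version, validates (or installs,
/// on first boot) the partition configuration, initializes the waitpoint
/// HMAC secret, loads the flowfabric Lua library unless `skip_library_load`
/// is set, starts the engine with a completion subscription, and opens the
/// dedicated tail connection.
///
/// A minimal startup sketch; how `ServerConfig` is built (file, env, or a
/// `Default` impl) is up to the caller and not shown in this module:
///
/// ```ignore
/// let config: ServerConfig = load_config()?; // hypothetical helper
/// let server = Server::start(config).await?;
/// let state = server.get_execution_state(&execution_id).await?;
/// ```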
pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
}
pub async fn start_with_metrics(
config: ServerConfig,
metrics: Arc<ff_observability::Metrics>,
) -> Result<Self, ServerError> {
tracing::info!(
host = %config.host, port = config.port,
tls = config.tls, cluster = config.cluster,
"connecting to Valkey"
);
let mut builder = ClientBuilder::new()
.host(&config.host, config.port)
.connect_timeout(Duration::from_secs(10))
.request_timeout(Duration::from_millis(5000));
if config.tls {
builder = builder.tls();
}
if config.cluster {
builder = builder.cluster();
}
let client = builder
.build()
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "connect".into() })?;
let pong: String = client
.cmd("PING")
.execute()
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "PING".into() })?;
if pong != "PONG" {
return Err(ServerError::OperationFailed(format!(
"unexpected PING response: {pong}"
)));
}
tracing::info!("Valkey connection established");
verify_valkey_version(&client).await?;
validate_or_create_partition_config(&client, &config.partition_config).await?;
initialize_waitpoint_hmac_secret(
&client,
&config.partition_config,
&config.waitpoint_hmac_secret,
)
.await?;
if !config.skip_library_load {
tracing::info!("loading flowfabric Lua library");
ff_script::loader::ensure_library(&client)
.await
.map_err(ServerError::LibraryLoad)?;
} else {
tracing::info!("skipping library load (skip_library_load=true)");
}
let engine_cfg = ff_engine::EngineConfig {
partition_config: config.partition_config,
lanes: config.lanes.clone(),
lease_expiry_interval: config.engine_config.lease_expiry_interval,
delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
index_reconciler_interval: config.engine_config.index_reconciler_interval,
attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
budget_reset_interval: config.engine_config.budget_reset_interval,
budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
unblock_interval: config.engine_config.unblock_interval,
dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
flow_projector_interval: config.engine_config.flow_projector_interval,
execution_deadline_interval: config.engine_config.execution_deadline_interval,
cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
scanner_filter: config.engine_config.scanner_filter.clone(),
};
let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
config.host.clone(),
config.port,
);
valkey_conn.tls = config.tls;
valkey_conn.cluster = config.cluster;
let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
client.clone(),
config.partition_config,
valkey_conn,
);
let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
.await
.map_err(|e| ServerError::OperationFailed(format!(
"subscribe_completions: {e}"
)))?;
let engine = Engine::start_with_completions(
engine_cfg,
client.clone(),
metrics.clone(),
completion_stream,
);
tracing::info!("opening dedicated tail connection");
let mut tail_builder = ClientBuilder::new()
.host(&config.host, config.port)
.connect_timeout(Duration::from_secs(10))
.request_timeout(Duration::from_millis(5000));
if config.tls {
tail_builder = tail_builder.tls();
}
if config.cluster {
tail_builder = tail_builder.cluster();
}
let tail_client = tail_builder
.build()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "connect (tail)".into(),
})?;
let tail_pong: String = tail_client
.cmd("PING")
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "PING (tail)".into(),
})?;
if tail_pong != "PONG" {
return Err(ServerError::OperationFailed(format!(
"tail client unexpected PING response: {tail_pong}"
)));
}
let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
config.max_concurrent_stream_ops as usize,
));
let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
tracing::info!(
max_concurrent_stream_ops = config.max_concurrent_stream_ops,
"stream-op client ready (read + tail share the semaphore; \
tails additionally serialize via xread_block_lock)"
);
if config.api_token.is_none() {
tracing::warn!(
listen_addr = %config.listen_addr,
"FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
FF_API_TOKEN for any deployment reachable from untrusted \
networks."
);
tracing::warn!(
listen_addr = %config.listen_addr,
"FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
and GET /v1/executions/{{id}}/result returns raw completion payload \
bytes (may contain PII). Both are UNAUTHENTICATED in this \
configuration."
);
}
tracing::info!(
flow_partitions = config.partition_config.num_flow_partitions,
budget_partitions = config.partition_config.num_budget_partitions,
quota_partitions = config.partition_config.num_quota_partitions,
lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
listen_addr = %config.listen_addr,
"FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
config.partition_config.num_flow_partitions,
config.partition_config.num_budget_partitions,
config.partition_config.num_quota_partitions,
);
let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
client.clone(),
config.partition_config,
metrics.clone(),
));
Ok(Self {
client,
tail_client,
stream_semaphore,
xread_block_lock,
admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
engine,
config,
scheduler,
background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
metrics,
})
}
pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
&self.metrics
}
pub fn client(&self) -> &Client {
&self.client
}
async fn fcall_with_reload(
&self,
function: &str,
keys: &[&str],
args: &[&str],
) -> Result<Value, ServerError> {
fcall_with_reload_on_client(&self.client, function, keys, args).await
}
pub fn config(&self) -> &ServerConfig {
&self.config
}
pub fn partition_config(&self) -> &PartitionConfig {
&self.config.partition_config
}
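/// Creates an execution via the `ff_create_execution` Lua function.
///
/// The call is routed to the execution's partition; the key set covers the
/// core/payload/policy/tags hashes, the lane's eligible or delayed ZSET
/// (depending on `delay_until`), the idempotency key (a no-op key when none
/// is supplied), and the partition's execution-deadline and all-executions
/// indexes.
///
/// Usage sketch; building a complete `CreateExecutionArgs` per the
/// `ff_core::contracts` definition is left to the caller:
///
/// ```ignore
/// let created = server.create_execution(&args).await?;
/// ```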
pub async fn create_execution(
&self,
args: &CreateExecutionArgs,
) -> Result<CreateExecutionResult, ServerError> {
let partition = execution_partition(&args.execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, &args.execution_id);
let idx = IndexKeys::new(&partition);
let lane = &args.lane_id;
let tag = partition.hash_tag();
let idem_key = match &args.idempotency_key {
Some(k) if !k.is_empty() => {
keys::idempotency_key(&tag, args.namespace.as_str(), k)
}
_ => ctx.noop(),
};
let delay_str = args
.delay_until
.map(|d| d.0.to_string())
.unwrap_or_default();
let is_delayed = !delay_str.is_empty();
let scheduling_zset = if is_delayed {
idx.lane_delayed(lane)
} else {
idx.lane_eligible(lane)
};
let fcall_keys: Vec<String> = vec![
ctx.core(),
ctx.payload(),
ctx.policy(),
ctx.tags(),
scheduling_zset,
idem_key,
idx.execution_deadline(),
idx.all_executions(),
];
let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());
let fcall_args: Vec<String> = vec![
args.execution_id.to_string(),
args.namespace.to_string(),
args.lane_id.to_string(),
args.execution_kind.clone(),
args.priority.to_string(),
args.creator_identity.clone(),
args.policy.as_ref()
.map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
.unwrap_or_else(|| "{}".to_owned()),
String::from_utf8_lossy(&args.input_payload).into_owned(),
delay_str,
args.idempotency_key.as_ref()
.map(|_| "86400000".to_string())
.unwrap_or_default(),
tags_json,
args.execution_deadline_at
.map(|d| d.to_string())
.unwrap_or_default(),
args.partition_id.to_string(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
.await?;
parse_create_result(&raw, &args.execution_id)
}
pub async fn cancel_execution(
&self,
args: &CancelExecutionArgs,
) -> Result<CancelExecutionResult, ServerError> {
let raw = self
.fcall_cancel_execution_with_reload(args)
.await?;
parse_cancel_result(&raw, &args.execution_id)
}
async fn fcall_cancel_execution_with_reload(
&self,
args: &CancelExecutionArgs,
) -> Result<Value, ServerError> {
let (keys, argv) = build_cancel_execution_fcall(
&self.client,
&self.config.partition_config,
args,
)
.await?;
let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
}
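/// Reads the `public_state` field of the execution's core hash and
/// deserializes it. Returns `NotFound` when the field is absent.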
pub async fn get_execution_state(
&self,
execution_id: &ExecutionId,
) -> Result<PublicState, ServerError> {
let partition = execution_partition(execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, execution_id);
let state_str: Option<String> = self
.client
.hget(&ctx.core(), "public_state")
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET public_state".into() })?;
match state_str {
Some(s) => {
let quoted = format!("\"{s}\"");
serde_json::from_str("ed).map_err(|e| {
ServerError::Script(format!(
"invalid public_state '{s}' for {execution_id}: {e}"
))
})
}
None => Err(ServerError::NotFound(format!(
"execution not found: {execution_id}"
))),
}
}
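/// Returns the raw completion payload bytes for the execution, or `None`
/// when no result has been stored yet.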
pub async fn get_execution_result(
&self,
execution_id: &ExecutionId,
) -> Result<Option<Vec<u8>>, ServerError> {
let partition = execution_partition(execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, execution_id);
let payload: Option<Vec<u8>> = self
.client
.cmd("GET")
.arg(ctx.result())
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "GET exec result".into(),
})?;
Ok(payload)
}
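/// Lists the execution's pending and active waitpoints.
///
/// SSCANs the waitpoint set, pipelines HMGETs over the waitpoint hashes,
/// drops entries that are terminal or missing a `waitpoint_token`, and
/// resolves each waitpoint's required signal names from its condition hash
/// in a second pipeline pass.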
pub async fn list_pending_waitpoints(
&self,
execution_id: &ExecutionId,
) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
let partition = execution_partition(execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, execution_id);
let core_exists: bool = self
.client
.cmd("EXISTS")
.arg(ctx.core())
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "EXISTS exec_core (pending waitpoints)".into(),
})?;
if !core_exists {
return Err(ServerError::NotFound(format!(
"execution not found: {execution_id}"
)));
}
const WAITPOINTS_SSCAN_COUNT: usize = 100;
let waitpoints_key = ctx.waitpoints();
let mut wp_ids_raw: Vec<String> = Vec::new();
let mut cursor: String = "0".to_owned();
loop {
let reply: (String, Vec<String>) = self
.client
.cmd("SSCAN")
.arg(&waitpoints_key)
.arg(&cursor)
.arg("COUNT")
.arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "SSCAN waitpoints".into(),
})?;
cursor = reply.0;
wp_ids_raw.extend(reply.1);
if cursor == "0" {
break;
}
}
wp_ids_raw.sort_unstable();
wp_ids_raw.dedup();
if wp_ids_raw.is_empty() {
return Ok(Vec::new());
}
let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
for raw in &wp_ids_raw {
match WaitpointId::parse(raw) {
Ok(id) => wp_ids.push(id),
Err(e) => tracing::warn!(
raw_id = %raw,
error = %e,
execution_id = %execution_id,
"list_pending_waitpoints: skipping unparseable waitpoint_id"
),
}
}
if wp_ids.is_empty() {
return Ok(Vec::new());
}
const WP_FIELDS: [&str; 6] = [
"state",
"waitpoint_key",
"waitpoint_token",
"created_at",
"activated_at",
"expires_at",
];
let mut pass1 = self.client.pipeline();
let mut wp_slots = Vec::with_capacity(wp_ids.len());
let mut cond_slots = Vec::with_capacity(wp_ids.len());
for wp_id in &wp_ids {
let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
cmd = cmd.arg(ctx.waitpoint(wp_id));
for f in WP_FIELDS {
cmd = cmd.arg(f);
}
wp_slots.push(cmd.finish());
cond_slots.push(
pass1
.cmd::<Option<String>>("HGET")
.arg(ctx.waitpoint_condition(wp_id))
.arg("total_matchers")
.finish(),
);
}
pass1
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "pipeline HMGET waitpoints + HGET total_matchers".into(),
})?;
struct Kept {
wp_id: WaitpointId,
wp_fields: Vec<Option<String>>,
total_matchers: usize,
}
let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
for ((wp_id, wp_slot), cond_slot) in wp_ids
.iter()
.zip(wp_slots)
.zip(cond_slots)
{
let wp_fields: Vec<Option<String>> =
wp_slot.value().map_err(|e| ServerError::ValkeyContext {
source: e,
context: format!("pipeline slot HMGET waitpoint {wp_id}"),
})?;
if wp_fields.iter().all(Option::is_none) {
let _ = cond_slot.value();
continue;
}
let state_ref = wp_fields
.first()
.and_then(|v| v.as_deref())
.unwrap_or("");
if state_ref != "pending" && state_ref != "active" {
let _ = cond_slot.value();
continue;
}
let token_ref = wp_fields
.get(2)
.and_then(|v| v.as_deref())
.unwrap_or("");
if token_ref.is_empty() {
let _ = cond_slot.value();
tracing::warn!(
waitpoint_id = %wp_id,
execution_id = %execution_id,
waitpoint_hash_key = %ctx.waitpoint(wp_id),
state = %state_ref,
"list_pending_waitpoints: waitpoint hash present but waitpoint_token \
field is empty — likely storage corruption (half-populated write, \
operator edit, or interrupted script). Skipping this entry in the \
response. HGETALL the waitpoint_hash_key to inspect."
);
continue;
}
let total_matchers = cond_slot
.value()
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: format!("pipeline slot HGET total_matchers {wp_id}"),
})?
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(0);
kept.push(Kept {
wp_id: wp_id.clone(),
wp_fields,
total_matchers,
});
}
if kept.is_empty() {
return Ok(Vec::new());
}
let mut pass2 = self.client.pipeline();
let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
let mut pass2_needed = false;
for k in &kept {
if k.total_matchers == 0 {
matcher_slots.push(None);
continue;
}
pass2_needed = true;
let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
for i in 0..k.total_matchers {
cmd = cmd.arg(format!("matcher:{i}:name"));
}
matcher_slots.push(Some(cmd.finish()));
}
if pass2_needed {
pass2.execute().await.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "pipeline HMGET wp_condition matchers".into(),
})?;
}
let parse_ts = |raw: &str| -> Option<TimestampMs> {
if raw.is_empty() {
None
} else {
raw.parse::<i64>().ok().map(TimestampMs)
}
};
let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
for (k, slot) in kept.into_iter().zip(matcher_slots) {
let get = |i: usize| -> &str {
k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
};
let required_signal_names: Vec<String> = match slot {
None => Vec::new(),
Some(s) => {
let vals: Vec<Option<String>> =
s.value().map_err(|e| ServerError::ValkeyContext {
source: e,
context: format!(
"pipeline slot HMGET wp_condition matchers {}",
k.wp_id
),
})?;
vals.into_iter()
.flatten()
.filter(|name| !name.is_empty())
.collect()
}
};
out.push(PendingWaitpointInfo {
waitpoint_id: k.wp_id,
waitpoint_key: get(1).to_owned(),
state: get(0).to_owned(),
waitpoint_token: WaitpointToken(get(2).to_owned()),
required_signal_names,
created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
activated_at: parse_ts(get(4)),
expires_at: parse_ts(get(5)),
});
}
Ok(out)
}
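/// Creates a budget via the `ff_create_budget` Lua function after validating
/// the dimension/limit arrays. Keys cover the budget definition, limits, and
/// usage hashes plus the partition's resets and policy index structures.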
pub async fn create_budget(
&self,
args: &CreateBudgetArgs,
) -> Result<CreateBudgetResult, ServerError> {
validate_create_budget_dimensions(
&args.dimensions,
&args.hard_limits,
&args.soft_limits,
)?;
let partition = budget_partition(&args.budget_id, &self.config.partition_config);
let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
let resets_key = keys::budget_resets_key(bctx.hash_tag());
let policies_index = keys::budget_policies_index(bctx.hash_tag());
let fcall_keys: Vec<String> = vec![
bctx.definition(),
bctx.limits(),
bctx.usage(),
resets_key,
policies_index,
];
let dim_count = args.dimensions.len();
let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
fcall_args.push(args.budget_id.to_string());
fcall_args.push(args.scope_type.clone());
fcall_args.push(args.scope_id.clone());
fcall_args.push(args.enforcement_mode.clone());
fcall_args.push(args.on_hard_limit.clone());
fcall_args.push(args.on_soft_limit.clone());
fcall_args.push(args.reset_interval_ms.to_string());
fcall_args.push(args.now.to_string());
fcall_args.push(dim_count.to_string());
for dim in &args.dimensions {
fcall_args.push(dim.clone());
}
for hard in &args.hard_limits {
fcall_args.push(hard.to_string());
}
for soft in &args.soft_limits {
fcall_args.push(soft.to_string());
}
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
.await?;
parse_budget_create_result(&raw, &args.budget_id)
}
pub async fn create_quota_policy(
&self,
args: &CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, ServerError> {
let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);
let fcall_keys: Vec<String> = vec![
qctx.definition(),
qctx.window("requests_per_window"),
qctx.concurrency(),
qctx.admitted_set(),
keys::quota_policies_index(qctx.hash_tag()),
];
let fcall_args: Vec<String> = vec![
args.quota_policy_id.to_string(),
args.window_seconds.to_string(),
args.max_requests_per_window.to_string(),
args.max_concurrent.to_string(),
args.now.to_string(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
.await?;
parse_quota_create_result(&raw, &args.quota_policy_id)
}
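/// Reads the budget definition, usage, and limits hashes and assembles a
/// `BudgetStatus`. Limit fields are stored as `hard:<dim>` / `soft:<dim>`
/// entries in the limits hash; the `_init` marker field is filtered out of
/// the reported usage.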
pub async fn get_budget_status(
&self,
budget_id: &BudgetId,
) -> Result<BudgetStatus, ServerError> {
let partition = budget_partition(budget_id, &self.config.partition_config);
let bctx = BudgetKeyContext::new(&partition, budget_id);
let def: HashMap<String, String> = self
.client
.hgetall(&bctx.definition())
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_def".into() })?;
if def.is_empty() {
return Err(ServerError::NotFound(format!(
"budget not found: {budget_id}"
)));
}
let usage_raw: HashMap<String, String> = self
.client
.hgetall(&bctx.usage())
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_usage".into() })?;
let usage: HashMap<String, u64> = usage_raw
.into_iter()
.filter(|(k, _)| k != "_init")
.map(|(k, v)| (k, v.parse().unwrap_or(0)))
.collect();
let limits_raw: HashMap<String, String> = self
.client
.hgetall(&bctx.limits())
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_limits".into() })?;
let mut hard_limits = HashMap::new();
let mut soft_limits = HashMap::new();
for (k, v) in &limits_raw {
if let Some(dim) = k.strip_prefix("hard:") {
hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
} else if let Some(dim) = k.strip_prefix("soft:") {
soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
}
}
let non_empty = |s: Option<&String>| -> Option<String> {
s.filter(|v| !v.is_empty()).cloned()
};
Ok(BudgetStatus {
budget_id: budget_id.to_string(),
scope_type: def.get("scope_type").cloned().unwrap_or_default(),
scope_id: def.get("scope_id").cloned().unwrap_or_default(),
enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
usage,
hard_limits,
soft_limits,
breach_count: def
.get("breach_count")
.and_then(|v| v.parse().ok())
.unwrap_or(0),
soft_breach_count: def
.get("soft_breach_count")
.and_then(|v| v.parse().ok())
.unwrap_or(0),
last_breach_at: non_empty(def.get("last_breach_at")),
last_breach_dim: non_empty(def.get("last_breach_dim")),
next_reset_at: non_empty(def.get("next_reset_at")),
created_at: non_empty(def.get("created_at")),
})
}
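/// Reports usage deltas against a budget via `ff_report_usage_and_check`.
/// A non-empty `dedup_key` is expanded to a partition-local dedup key before
/// being passed to the script.
///
/// Usage sketch; `ReportUsageArgs` carries one delta per dimension
/// (validated before the call):
///
/// ```ignore
/// let outcome = server.report_usage(&budget_id, &args).await?;
/// ```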
pub async fn report_usage(
&self,
budget_id: &BudgetId,
args: &ReportUsageArgs,
) -> Result<ReportUsageResult, ServerError> {
validate_report_usage_dimensions(&args.dimensions, &args.deltas)?;
let partition = budget_partition(budget_id, &self.config.partition_config);
let bctx = BudgetKeyContext::new(&partition, budget_id);
let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
let dim_count = args.dimensions.len();
let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
fcall_args.push(dim_count.to_string());
for dim in &args.dimensions {
fcall_args.push(dim.clone());
}
for delta in &args.deltas {
fcall_args.push(delta.to_string());
}
fcall_args.push(args.now.to_string());
let dedup_key_val = args
.dedup_key
.as_ref()
.filter(|k| !k.is_empty())
.map(|k| usage_dedup_key(bctx.hash_tag(), k))
.unwrap_or_default();
fcall_args.push(dedup_key_val);
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
.await?;
parse_report_usage_result(&raw)
}
pub async fn reset_budget(
&self,
budget_id: &BudgetId,
) -> Result<ResetBudgetResult, ServerError> {
let partition = budget_partition(budget_id, &self.config.partition_config);
let bctx = BudgetKeyContext::new(&partition, budget_id);
let resets_key = keys::budget_resets_key(bctx.hash_tag());
let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
let now = TimestampMs::now();
let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
.await?;
parse_reset_budget_result(&raw)
}
pub async fn create_flow(
&self,
args: &CreateFlowArgs,
) -> Result<CreateFlowResult, ServerError> {
let partition = flow_partition(&args.flow_id, &self.config.partition_config);
let fctx = FlowKeyContext::new(&partition, &args.flow_id);
let fidx = FlowIndexKeys::new(&partition);
let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
let fcall_args: Vec<String> = vec![
args.flow_id.to_string(),
args.flow_kind.clone(),
args.namespace.to_string(),
args.now.to_string(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
.await?;
parse_create_flow_result(&raw, &args.flow_id)
}
pub async fn add_execution_to_flow(
&self,
args: &AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, ServerError> {
let partition = flow_partition(&args.flow_id, &self.config.partition_config);
let fctx = FlowKeyContext::new(&partition, &args.flow_id);
let fidx = FlowIndexKeys::new(&partition);
let exec_partition =
execution_partition(&args.execution_id, &self.config.partition_config);
let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
if exec_partition.index != partition.index {
return Err(ServerError::PartitionMismatch(format!(
"add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
so the exec's hash-tag matches the flow's `{{fp:N}}`.",
exec_p = exec_partition.index,
flow_p = partition.index,
)));
}
let fcall_keys: Vec<String> = vec![
fctx.core(),
fctx.members(),
fidx.flow_index(),
ectx.core(),
];
let fcall_args: Vec<String> = vec![
args.flow_id.to_string(),
args.execution_id.to_string(),
args.now.to_string(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
.await?;
parse_add_execution_to_flow_result(&raw)
}
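/// Cancels a flow via `ff_cancel_flow` and, for the `cancel_all` policy,
/// dispatches member-execution cancels on a background task (returning
/// `CancellationScheduled`). See `cancel_flow_wait` for the variant that
/// cancels members inline before returning.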
pub async fn cancel_flow(
&self,
args: &CancelFlowArgs,
) -> Result<CancelFlowResult, ServerError> {
self.cancel_flow_inner(args, false).await
}
pub async fn cancel_flow_wait(
&self,
args: &CancelFlowArgs,
) -> Result<CancelFlowResult, ServerError> {
self.cancel_flow_inner(args, true).await
}
async fn cancel_flow_inner(
&self,
args: &CancelFlowArgs,
wait: bool,
) -> Result<CancelFlowResult, ServerError> {
let partition = flow_partition(&args.flow_id, &self.config.partition_config);
let fctx = FlowKeyContext::new(&partition, &args.flow_id);
let fidx = FlowIndexKeys::new(&partition);
const CANCEL_RECONCILER_GRACE_MS: u64 = 30_000;
let fcall_keys: Vec<String> = vec![
fctx.core(),
fctx.members(),
fidx.flow_index(),
fctx.pending_cancels(),
fidx.cancel_backlog(),
];
let fcall_args: Vec<String> = vec![
args.flow_id.to_string(),
args.reason.clone(),
args.cancellation_policy.clone(),
args.now.to_string(),
CANCEL_RECONCILER_GRACE_MS.to_string(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
.await?;
let (policy, members) = match parse_cancel_flow_raw(&raw)? {
ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
(policy, member_execution_ids)
}
ParsedCancelFlow::AlreadyTerminal => {
let flow_meta: Vec<Option<String>> = self
.client
.cmd("HMGET")
.arg(fctx.core())
.arg("cancellation_policy")
.arg("cancel_reason")
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "HMGET flow_core cancellation_policy,cancel_reason".into(),
})?;
let stored_policy = flow_meta
.first()
.and_then(|v| v.as_ref())
.filter(|s| !s.is_empty())
.cloned();
let stored_reason = flow_meta
.get(1)
.and_then(|v| v.as_ref())
.filter(|s| !s.is_empty())
.cloned();
let all_members: Vec<String> = self
.client
.cmd("SMEMBERS")
.arg(fctx.members())
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "SMEMBERS flow members (already terminal)".into(),
})?;
let total_members = all_members.len();
let stored_members: Vec<String> = all_members
.into_iter()
.take(ALREADY_TERMINAL_MEMBER_CAP)
.collect();
tracing::debug!(
flow_id = %args.flow_id,
stored_policy = stored_policy.as_deref().unwrap_or(""),
stored_reason = stored_reason.as_deref().unwrap_or(""),
total_members,
returned_members = stored_members.len(),
"cancel_flow: flow already terminal, returning idempotent Cancelled"
);
return Ok(CancelFlowResult::Cancelled {
cancellation_policy: stored_policy
.unwrap_or_else(|| args.cancellation_policy.clone()),
member_execution_ids: stored_members,
});
}
};
let needs_dispatch = policy == "cancel_all" && !members.is_empty();
if !needs_dispatch {
return Ok(CancelFlowResult::Cancelled {
cancellation_policy: policy,
member_execution_ids: members,
});
}
let pending_cancels_key = fctx.pending_cancels();
let cancel_backlog_key = fidx.cancel_backlog();
if wait {
let mut failed: Vec<String> = Vec::new();
for eid_str in &members {
match cancel_member_execution(
&self.client,
&self.config.partition_config,
eid_str,
&args.reason,
args.now,
)
.await
{
Ok(()) => {
ack_cancel_member(
&self.client,
&pending_cancels_key,
&cancel_backlog_key,
eid_str,
&args.flow_id.to_string(),
)
.await;
}
Err(e) => {
if is_terminal_ack_error(&e) {
ack_cancel_member(
&self.client,
&pending_cancels_key,
&cancel_backlog_key,
eid_str,
&args.flow_id.to_string(),
)
.await;
continue;
}
tracing::warn!(
execution_id = %eid_str,
error = %e,
"cancel_flow(wait): individual execution cancel failed \
(transport/contract fault; reconciler will retry if transient)"
);
failed.push(eid_str.clone());
}
}
}
if failed.is_empty() {
return Ok(CancelFlowResult::Cancelled {
cancellation_policy: policy,
member_execution_ids: members,
});
}
return Ok(CancelFlowResult::PartiallyCancelled {
cancellation_policy: policy,
member_execution_ids: members,
failed_member_execution_ids: failed,
});
}
let client = self.client.clone();
let partition_config = self.config.partition_config;
let reason = args.reason.clone();
let now = args.now;
let dispatch_members = members.clone();
let flow_id = args.flow_id.clone();
let mut guard = self.background_tasks.lock().await;
while let Some(joined) = guard.try_join_next() {
if let Err(e) = joined {
tracing::warn!(
error = %e,
"cancel_flow: background dispatch task panicked or was aborted"
);
}
}
let pending_key_owned = pending_cancels_key.clone();
let backlog_key_owned = cancel_backlog_key.clone();
let flow_id_str = args.flow_id.to_string();
guard.spawn(async move {
use futures::stream::StreamExt;
const CONCURRENCY: usize = 16;
let member_count = dispatch_members.len();
let flow_id_for_log = flow_id.clone();
futures::stream::iter(dispatch_members)
.map(|eid_str| {
let client = client.clone();
let reason = reason.clone();
let flow_id = flow_id.clone();
let pending = pending_key_owned.clone();
let backlog = backlog_key_owned.clone();
let flow_id_str = flow_id_str.clone();
async move {
match cancel_member_execution(
&client,
&partition_config,
&eid_str,
&reason,
now,
)
.await
{
Ok(()) => {
ack_cancel_member(
&client,
&pending,
&backlog,
&eid_str,
&flow_id_str,
)
.await;
}
Err(e) => {
if is_terminal_ack_error(&e) {
ack_cancel_member(
&client,
&pending,
&backlog,
&eid_str,
&flow_id_str,
)
.await;
} else {
tracing::warn!(
flow_id = %flow_id,
execution_id = %eid_str,
error = %e,
"cancel_flow(async): individual execution cancel failed \
(transport/contract fault; reconciler will retry if transient)"
);
}
}
}
}
})
.buffer_unordered(CONCURRENCY)
.for_each(|()| async {})
.await;
tracing::debug!(
flow_id = %flow_id_for_log,
member_count,
concurrency = CONCURRENCY,
"cancel_flow: background member dispatch complete"
);
});
drop(guard);
let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
Ok(CancelFlowResult::CancellationScheduled {
cancellation_policy: policy,
member_count,
member_execution_ids: members,
})
}
pub async fn stage_dependency_edge(
&self,
args: &StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, ServerError> {
let partition = flow_partition(&args.flow_id, &self.config.partition_config);
let fctx = FlowKeyContext::new(&partition, &args.flow_id);
let fcall_keys: Vec<String> = vec![
fctx.core(),
fctx.members(),
fctx.edge(&args.edge_id),
fctx.outgoing(&args.upstream_execution_id),
fctx.incoming(&args.downstream_execution_id),
fctx.grant(&args.edge_id.to_string()),
];
let fcall_args: Vec<String> = vec![
args.flow_id.to_string(),
args.edge_id.to_string(),
args.upstream_execution_id.to_string(),
args.downstream_execution_id.to_string(),
args.dependency_kind.clone(),
args.data_passing_ref.clone().unwrap_or_default(),
args.expected_graph_revision.to_string(),
args.now.to_string(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
.await?;
parse_stage_dependency_edge_result(&raw)
}
pub async fn apply_dependency_to_child(
&self,
args: &ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, ServerError> {
let partition = execution_partition(
&args.downstream_execution_id,
&self.config.partition_config,
);
let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
let idx = IndexKeys::new(&partition);
let lane_str: Option<String> = self
.client
.hget(&ctx.core(), "lane_id")
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
let fcall_keys: Vec<String> = vec![
ctx.core(),
ctx.deps_meta(),
ctx.deps_unresolved(),
ctx.dep_edge(&args.edge_id),
idx.lane_eligible(&lane),
idx.lane_blocked_dependencies(&lane),
ctx.deps_all_edges(),
];
let fcall_args: Vec<String> = vec![
args.flow_id.to_string(),
args.edge_id.to_string(),
args.upstream_execution_id.to_string(),
args.graph_revision.to_string(),
args.dependency_kind.clone(),
args.data_passing_ref.clone().unwrap_or_default(),
args.now.to_string(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
.await?;
parse_apply_dependency_result(&raw)
}
pub async fn deliver_signal(
&self,
args: &DeliverSignalArgs,
) -> Result<DeliverSignalResult, ServerError> {
let partition = execution_partition(&args.execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, &args.execution_id);
let idx = IndexKeys::new(&partition);
let lane_str: Option<String> = self
.client
.hget(&ctx.core(), "lane_id")
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
let wp_id = &args.waitpoint_id;
let sig_id = &args.signal_id;
let idem_key = args
.idempotency_key
.as_ref()
.filter(|k| !k.is_empty())
.map(|k| ctx.signal_dedup(wp_id, k))
.unwrap_or_else(|| ctx.noop());
let fcall_keys: Vec<String> = vec![
ctx.core(),
ctx.waitpoint_condition(wp_id),
ctx.waitpoint_signals(wp_id),
ctx.exec_signals(),
ctx.signal(sig_id),
ctx.signal_payload(sig_id),
idem_key,
ctx.waitpoint(wp_id),
ctx.suspension_current(),
idx.lane_eligible(&lane),
idx.lane_suspended(&lane),
idx.lane_delayed(&lane),
idx.suspension_timeout(),
idx.waitpoint_hmac_secrets(),
];
let fcall_args: Vec<String> = vec![
args.signal_id.to_string(),
args.execution_id.to_string(),
args.waitpoint_id.to_string(),
args.signal_name.clone(),
args.signal_category.clone(),
args.source_type.clone(),
args.source_identity.clone(),
args.payload.as_ref()
.map(|p| String::from_utf8_lossy(p).into_owned())
.unwrap_or_default(),
args.payload_encoding.clone().unwrap_or_else(|| "json".to_owned()),
args.idempotency_key.clone().unwrap_or_default(),
args.correlation_id.clone().unwrap_or_default(),
args.target_scope.clone(),
args.created_at
.map(|ts| ts.to_string())
.unwrap_or_else(|| args.now.to_string()),
args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(),
args.resume_delay_ms.unwrap_or(0).to_string(),
args.signal_maxlen.unwrap_or(1000).to_string(),
args.max_signals_per_execution.unwrap_or(10_000).to_string(),
args.waitpoint_token.as_str().to_owned(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
.await?;
parse_deliver_signal_result(&raw, &args.signal_id)
}
pub async fn change_priority(
&self,
execution_id: &ExecutionId,
new_priority: i32,
) -> Result<ChangePriorityResult, ServerError> {
let partition = execution_partition(execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, execution_id);
let idx = IndexKeys::new(&partition);
let lane_str: Option<String> = self
.client
.hget(&ctx.core(), "lane_id")
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
let fcall_args: Vec<String> = vec![
execution_id.to_string(),
new_priority.to_string(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
.await?;
parse_change_priority_result(&raw, execution_id)
}
pub async fn claim_for_worker(
&self,
lane: &LaneId,
worker_id: &WorkerId,
worker_instance_id: &WorkerInstanceId,
worker_capabilities: &std::collections::BTreeSet<String>,
grant_ttl_ms: u64,
) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
self.scheduler
.claim_for_worker(
lane,
worker_id,
worker_instance_id,
worker_capabilities,
grant_ttl_ms,
)
.await
.map_err(|e| match e {
ff_scheduler::SchedulerError::Valkey(inner) => {
ServerError::Valkey(inner)
}
ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
ServerError::ValkeyContext { source, context }
}
ff_scheduler::SchedulerError::Config(msg) => {
ServerError::InvalidInput(msg)
}
})
}
pub async fn revoke_lease(
&self,
execution_id: &ExecutionId,
) -> Result<RevokeLeaseResult, ServerError> {
let partition = execution_partition(execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, execution_id);
let idx = IndexKeys::new(&partition);
let wiid_str: Option<String> = self
.client
.hget(&ctx.core(), "current_worker_instance_id")
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET worker_instance_id".into() })?;
let wiid = match wiid_str {
Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
_ => {
return Err(ServerError::NotFound(format!(
"no active lease for execution {execution_id} (no current_worker_instance_id)"
)));
}
};
let fcall_keys: Vec<String> = vec![
ctx.core(),
ctx.lease_current(),
ctx.lease_history(),
idx.lease_expiry(),
idx.worker_leases(&wiid),
];
let fcall_args: Vec<String> = vec![
execution_id.to_string(),
String::new(), "operator_revoke".to_owned(),
];
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
.await?;
parse_revoke_lease_result(&raw)
}
pub async fn get_execution(
&self,
execution_id: &ExecutionId,
) -> Result<ExecutionInfo, ServerError> {
let partition = execution_partition(execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, execution_id);
let fields: HashMap<String, String> = self
.client
.hgetall(&ctx.core())
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL exec_core".into() })?;
if fields.is_empty() {
return Err(ServerError::NotFound(format!(
"execution not found: {execution_id}"
)));
}
let parse_enum = |field: &str| -> String {
fields.get(field).cloned().unwrap_or_default()
};
fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
let quoted = format!("\"{raw}\"");
serde_json::from_str("ed).map_err(|e| {
ServerError::Script(format!("invalid {field} '{raw}': {e}"))
})
}
let lp_str = parse_enum("lifecycle_phase");
let os_str = parse_enum("ownership_state");
let es_str = parse_enum("eligibility_state");
let br_str = parse_enum("blocking_reason");
let to_str = parse_enum("terminal_outcome");
let as_str = parse_enum("attempt_state");
let ps_str = parse_enum("public_state");
let state_vector = StateVector {
lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
ownership_state: deserialize("ownership_state", &os_str)?,
eligibility_state: deserialize("eligibility_state", &es_str)?,
blocking_reason: deserialize("blocking_reason", &br_str)?,
terminal_outcome: deserialize("terminal_outcome", &to_str)?,
attempt_state: deserialize("attempt_state", &as_str)?,
public_state: deserialize("public_state", &ps_str)?,
};
let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
let started_at_opt = fields
.get("started_at")
.filter(|s| !s.is_empty())
.cloned();
let completed_at_opt = fields
.get("completed_at")
.filter(|s| !s.is_empty())
.cloned();
Ok(ExecutionInfo {
execution_id: execution_id.clone(),
namespace: parse_enum("namespace"),
lane_id: parse_enum("lane_id"),
priority: fields
.get("priority")
.and_then(|v| v.parse().ok())
.unwrap_or(0),
execution_kind: parse_enum("execution_kind"),
state_vector,
public_state: deserialize("public_state", &ps_str)?,
created_at: parse_enum("created_at"),
started_at: started_at_opt,
completed_at: completed_at_opt,
current_attempt_index: fields
.get("current_attempt_index")
.and_then(|v| v.parse().ok())
.unwrap_or(0),
flow_id: flow_id_val,
blocking_detail: parse_enum("blocking_detail"),
})
}
pub async fn list_executions(
&self,
partition_id: u16,
lane: &LaneId,
state_filter: &str,
offset: u64,
limit: u64,
) -> Result<ListExecutionsResult, ServerError> {
let partition = ff_core::partition::Partition {
family: ff_core::partition::PartitionFamily::Execution,
index: partition_id,
};
let idx = IndexKeys::new(&partition);
let zset_key = match state_filter {
"eligible" => idx.lane_eligible(lane),
"delayed" => idx.lane_delayed(lane),
"terminal" => idx.lane_terminal(lane),
"suspended" => idx.lane_suspended(lane),
"active" => idx.lane_active(lane),
other => {
return Err(ServerError::InvalidInput(format!(
"invalid state_filter: {other}. Use: eligible, delayed, terminal, suspended, active"
)));
}
};
let eids: Vec<String> = self
.client
.cmd("ZRANGE")
.arg(&zset_key)
.arg("-inf")
.arg("+inf")
.arg("BYSCORE")
.arg("LIMIT")
.arg(offset)
.arg(limit)
.execute()
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: format!("ZRANGE {zset_key}") })?;
if eids.is_empty() {
return Ok(ListExecutionsResult {
executions: vec![],
total_returned: 0,
});
}
let mut parsed = Vec::with_capacity(eids.len());
for eid_str in &eids {
match ExecutionId::parse(eid_str) {
Ok(id) => parsed.push(id),
Err(e) => {
tracing::warn!(
raw_id = %eid_str,
error = %e,
zset = %zset_key,
"list_executions: ZSET member failed to parse as ExecutionId (data corruption?)"
);
}
}
}
if parsed.is_empty() {
return Ok(ListExecutionsResult {
executions: vec![],
total_returned: 0,
});
}
let mut pipe = self.client.pipeline();
let mut slots = Vec::with_capacity(parsed.len());
for eid in &parsed {
let ep = execution_partition(eid, &self.config.partition_config);
let ctx = ExecKeyContext::new(&ep, eid);
let slot = pipe
.cmd::<Vec<Option<String>>>("HMGET")
.arg(ctx.core())
.arg("namespace")
.arg("lane_id")
.arg("execution_kind")
.arg("public_state")
.arg("priority")
.arg("created_at")
.finish();
slots.push(slot);
}
pipe.execute()
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline HMGET".into() })?;
let mut summaries = Vec::with_capacity(parsed.len());
for (eid, slot) in parsed.into_iter().zip(slots) {
let fields: Vec<Option<String>> = slot.value()
.map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline slot".into() })?;
let field = |i: usize| -> String {
fields
.get(i)
.and_then(|v| v.as_ref())
.cloned()
.unwrap_or_default()
};
summaries.push(ExecutionSummary {
execution_id: eid,
namespace: field(0),
lane_id: field(1),
execution_kind: field(2),
public_state: field(3),
priority: field(4).parse().unwrap_or(0),
created_at: field(5),
});
}
let total = summaries.len();
Ok(ListExecutionsResult {
executions: summaries,
total_returned: total,
})
}
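/// Replays an execution via `ff_replay_execution`. The lane's terminal and
/// eligible ZSETs and the lease history are passed to the script; for a flow
/// member whose terminal outcome is `skipped`, the incoming dependency edge
/// keys are also included so the script can restore dependency state.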
pub async fn replay_execution(
&self,
execution_id: &ExecutionId,
) -> Result<ReplayExecutionResult, ServerError> {
let partition = execution_partition(execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, execution_id);
let idx = IndexKeys::new(&partition);
let dyn_fields: Vec<Option<String>> = self
.client
.cmd("HMGET")
.arg(ctx.core())
.arg("lane_id")
.arg("flow_id")
.arg("terminal_outcome")
.execute()
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET replay pre-read".into() })?;
let lane = LaneId::new(
dyn_fields
.first()
.and_then(|v| v.as_ref())
.cloned()
.unwrap_or_else(|| "default".to_owned()),
);
let flow_id_str = dyn_fields
.get(1)
.and_then(|v| v.as_ref())
.cloned()
.unwrap_or_default();
let terminal_outcome = dyn_fields
.get(2)
.and_then(|v| v.as_ref())
.cloned()
.unwrap_or_default();
let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();
let mut fcall_keys: Vec<String> = vec![
ctx.core(),
idx.lane_terminal(&lane),
idx.lane_eligible(&lane),
ctx.lease_history(),
];
let now = TimestampMs::now();
let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];
if is_skipped_flow_member {
let flow_id = FlowId::parse(&flow_id_str)
.map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
let flow_part =
flow_partition(&flow_id, &self.config.partition_config);
let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
let edge_ids: Vec<String> = self
.client
.cmd("SMEMBERS")
.arg(flow_ctx.incoming(execution_id))
.execute()
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "SMEMBERS replay edges".into() })?;
fcall_keys.push(idx.lane_blocked_dependencies(&lane));
fcall_keys.push(ctx.deps_meta());
fcall_keys.push(ctx.deps_unresolved());
for eid_str in &edge_ids {
let edge_id = EdgeId::parse(eid_str)
.unwrap_or_else(|_| EdgeId::new());
fcall_keys.push(ctx.dep_edge(&edge_id));
fcall_args.push(eid_str.clone());
}
}
let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
let raw: Value = self
.fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
.await?;
parse_replay_result(&raw)
}
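/// Bounded read of an attempt's output stream frames.
///
/// Gated by the stream-op semaphore: when all permits are in use the call
/// fails fast with `ConcurrencyLimitExceeded` rather than queueing.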
pub async fn read_attempt_stream(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
from_id: &str,
to_id: &str,
count_limit: u64,
) -> Result<ff_core::contracts::StreamFrames, ServerError> {
use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
if count_limit == 0 {
return Err(ServerError::InvalidInput(
"count_limit must be >= 1".to_owned(),
));
}
let permit = match self.stream_semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(tokio::sync::TryAcquireError::NoPermits) => {
return Err(ServerError::ConcurrencyLimitExceeded(
"stream_ops",
self.config.max_concurrent_stream_ops,
));
}
Err(tokio::sync::TryAcquireError::Closed) => {
return Err(ServerError::OperationFailed(
"stream semaphore closed (server shutting down)".into(),
));
}
};
let args = ReadFramesArgs {
execution_id: execution_id.clone(),
attempt_index,
from_id: from_id.to_owned(),
to_id: to_id.to_owned(),
count_limit,
};
let partition = execution_partition(execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, execution_id);
let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
let result = ff_script::functions::stream::ff_read_attempt_stream(
&self.tail_client, &keys, &args,
)
.await
.map_err(script_error_to_server);
drop(permit);
match result? {
ReadFramesResult::Frames(f) => Ok(f),
}
}
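/// Blocking tail of an attempt's output stream on the dedicated tail client.
///
/// Shares the stream-op semaphore with `read_attempt_stream` and additionally
/// serializes on `xread_block_lock`, so only one blocking XREAD is in flight
/// at a time.
///
/// Usage sketch, assuming Valkey stream-ID conventions for `last_id`
/// ("0-0" tails from the start of the stream):
///
/// ```ignore
/// let frames = server
///     .tail_attempt_stream(&execution_id, attempt_index, "0-0", 5_000, 100)
///     .await?;
/// ```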
pub async fn tail_attempt_stream(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
last_id: &str,
block_ms: u64,
count_limit: u64,
) -> Result<ff_core::contracts::StreamFrames, ServerError> {
if count_limit == 0 {
return Err(ServerError::InvalidInput(
"count_limit must be >= 1".to_owned(),
));
}
let permit = match self.stream_semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(tokio::sync::TryAcquireError::NoPermits) => {
return Err(ServerError::ConcurrencyLimitExceeded(
"stream_ops",
self.config.max_concurrent_stream_ops,
));
}
Err(tokio::sync::TryAcquireError::Closed) => {
return Err(ServerError::OperationFailed(
"stream semaphore closed (server shutting down)".into(),
));
}
};
let partition = execution_partition(execution_id, &self.config.partition_config);
let ctx = ExecKeyContext::new(&partition, execution_id);
let stream_key = ctx.stream(attempt_index);
let stream_meta_key = ctx.stream_meta(attempt_index);
let _xread_guard = self.xread_block_lock.lock().await;
let result = ff_script::stream_tail::xread_block(
&self.tail_client,
&stream_key,
&stream_meta_key,
last_id,
block_ms,
count_limit,
)
.await
.map_err(script_error_to_server);
drop(_xread_guard);
drop(permit);
result
}
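/// Shuts the server down: closes the stream semaphore so no new read/tail
/// calls are admitted, drains background cancel-dispatch tasks with a 15s
/// timeout (aborting whatever remains), then shuts down the engine.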
pub async fn shutdown(self) {
tracing::info!("shutting down FlowFabric server");
self.stream_semaphore.close();
tracing::info!(
"stream semaphore closed; no new read/tail attempts will be accepted"
);
let drain_timeout = Duration::from_secs(15);
let background = self.background_tasks.clone();
let drain = async move {
let mut guard = background.lock().await;
while guard.join_next().await.is_some() {}
};
match tokio::time::timeout(drain_timeout, drain).await {
Ok(()) => {}
Err(_) => {
tracing::warn!(
timeout_s = drain_timeout.as_secs(),
"shutdown: background tasks did not finish in time, aborting"
);
self.background_tasks.lock().await.abort_all();
}
}
self.engine.shutdown().await;
tracing::info!("FlowFabric server shutdown complete");
}
}
const REQUIRED_VALKEY_MAJOR: u32 = 7;
const REQUIRED_VALKEY_MINOR: u32 = 2;
const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
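/// Verifies the Valkey server version is at least 7.2, retrying both
/// transient errors and too-low readings with exponential backoff (capped at
/// 5s) until the 60s budget is exhausted.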
async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
let mut backoff = Duration::from_millis(200);
loop {
let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
match query_valkey_version(client).await {
Ok((detected_major, detected_minor))
if (detected_major, detected_minor)
>= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR) =>
{
tracing::info!(
detected_major,
detected_minor,
required_major = REQUIRED_VALKEY_MAJOR,
required_minor = REQUIRED_VALKEY_MINOR,
"Valkey version accepted"
);
return Ok(());
}
Ok((detected_major, detected_minor)) => (
true,
ServerError::ValkeyVersionTooLow {
detected: format!("{detected_major}.{detected_minor}"),
required: format!("{REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"),
},
format!(
"detected={detected_major}.{detected_minor} < required={REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"
),
),
Err(e) => {
let retryable = e
.valkey_kind()
.map(ff_script::retry::is_retryable_kind)
.unwrap_or(true);
let detail = e.to_string();
(retryable, e, detail)
}
};
if !should_retry {
return Err(err_for_budget_exhaust);
}
if tokio::time::Instant::now() >= deadline {
return Err(err_for_budget_exhaust);
}
tracing::warn!(
backoff_ms = backoff.as_millis() as u64,
detail = %log_detail,
"valkey version check transient failure; retrying"
);
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(5));
}
}
async fn query_valkey_version(client: &Client) -> Result<(u32, u32), ServerError> {
let raw: Value = client
.cmd("INFO")
.arg("server")
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: "INFO server".into(),
})?;
let bodies = extract_info_bodies(&raw)?;
let mut min_version: Option<(u32, u32)> = None;
for body in &bodies {
let version = parse_valkey_version(body)?;
min_version = Some(match min_version {
None => version,
Some(existing) => existing.min(version),
});
}
min_version.ok_or_else(|| {
ServerError::OperationFailed(
"valkey version check: cluster INFO returned no node bodies".into(),
)
})
}
fn extract_info_bodies(raw: &Value) -> Result<Vec<String>, ServerError> {
match raw {
Value::BulkString(bytes) => Ok(vec![String::from_utf8_lossy(bytes).into_owned()]),
Value::VerbatimString { text, .. } => Ok(vec![text.clone()]),
Value::SimpleString(s) => Ok(vec![s.clone()]),
Value::Map(entries) => {
if entries.is_empty() {
return Err(ServerError::OperationFailed(
"valkey version check: cluster INFO returned empty map".into(),
));
}
let mut out = Vec::with_capacity(entries.len());
for (_, body) in entries {
out.extend(extract_info_bodies(body)?);
}
Ok(out)
}
other => Err(ServerError::OperationFailed(format!(
"valkey version check: unexpected INFO shape: {other:?}"
))),
}
}
fn parse_valkey_version(info: &str) -> Result<(u32, u32), ServerError> {
let extract_major_minor = |line: &str| -> Result<(u32, u32), ServerError> {
let trimmed = line.trim();
let mut parts = trimmed.split('.');
let major_str = parts.next().unwrap_or("").trim();
if major_str.is_empty() {
return Err(ServerError::OperationFailed(format!(
"valkey version check: empty version field in '{trimmed}'"
)));
}
let major = major_str.parse::<u32>().map_err(|_| {
ServerError::OperationFailed(format!(
"valkey version check: non-numeric major in '{trimmed}'"
))
})?;
let minor_str = parts.next().unwrap_or("").trim();
if minor_str.is_empty() {
return Err(ServerError::OperationFailed(format!(
"valkey version check: missing minor component in '{trimmed}'"
)));
}
let minor = minor_str.parse::<u32>().map_err(|_| {
ServerError::OperationFailed(format!(
"valkey version check: non-numeric minor in '{trimmed}'"
))
})?;
Ok((major, minor))
};
if let Some(valkey_line) = info
.lines()
.find_map(|line| line.strip_prefix("valkey_version:"))
{
return extract_major_minor(valkey_line);
}
let server_is_valkey = info
.lines()
.map(str::trim)
.any(|line| line.eq_ignore_ascii_case("server_name:valkey"));
if !server_is_valkey {
return Err(ServerError::OperationFailed(
"valkey version check: INFO missing valkey_version and server_name:valkey marker \
(unsupported backend — FlowFabric requires Valkey >= 7.2; Redis is not supported)"
.into(),
));
}
if let Some(redis_line) = info
.lines()
.find_map(|line| line.strip_prefix("redis_version:"))
{
return extract_major_minor(redis_line);
}
Err(ServerError::OperationFailed(
"valkey version check: INFO has server_name:valkey but no redis_version or valkey_version field"
.into(),
))
}
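/// First boot writes the configured partition counts to the global config hash;
/// later boots compare the configured counts against the stored ones and fail with
/// `PartitionMismatch` on any divergence, since partition counts are fixed at
/// deployment time.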
async fn validate_or_create_partition_config(
client: &Client,
config: &PartitionConfig,
) -> Result<(), ServerError> {
let key = keys::global_config_partitions();
let existing: HashMap<String, String> = client
.hgetall(&key)
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
if existing.is_empty() {
tracing::info!("first boot: creating {key}");
client
.hset(&key, "num_flow_partitions", &config.num_flow_partitions.to_string())
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_flow_partitions".into() })?;
client
.hset(&key, "num_budget_partitions", &config.num_budget_partitions.to_string())
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_budget_partitions".into() })?;
client
.hset(&key, "num_quota_partitions", &config.num_quota_partitions.to_string())
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_quota_partitions".into() })?;
return Ok(());
}
let check = |field: &str, expected: u16| -> Result<(), ServerError> {
let stored: u16 = existing
.get(field)
.and_then(|v| v.parse().ok())
.unwrap_or(0);
if stored != expected {
return Err(ServerError::PartitionMismatch(format!(
"{field}: stored={stored}, config={expected}. \
Partition counts are fixed at deployment time. \
Either fix your config or migrate the data."
)));
}
Ok(())
};
check("num_flow_partitions", config.num_flow_partitions)?;
check("num_budget_partitions", config.num_budget_partitions)?;
check("num_quota_partitions", config.num_quota_partitions)?;
tracing::info!("partition config validated against stored {key}");
Ok(())
}
const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
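/// Outcome of probing one execution partition's waitpoint HMAC hash at boot.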
enum PartitionBootOutcome {
Match,
Mismatch,
Repaired,
Installed,
}
const BOOT_INIT_CONCURRENCY: usize = 16;
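/// Installs or verifies the waitpoint HMAC secret on a single partition: with no
/// `current_kid` stored it installs the initial kid and secret in one HSET; with
/// `current_kid` present but its secret field missing it repairs the torn write;
/// with a stored secret that differs from the provided one it reports a mismatch
/// without writing.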
async fn init_one_partition(
client: &Client,
partition: Partition,
secret_hex: &str,
) -> Result<PartitionBootOutcome, ServerError> {
let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();
let stored_kid: Option<String> = client
.cmd("HGET")
.arg(&key)
.arg("current_kid")
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: format!("HGET {key} current_kid (init probe)"),
})?;
if let Some(stored_kid) = stored_kid {
let field = format!("secret:{stored_kid}");
let stored_secret: Option<String> = client
.hget(&key, &field)
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: format!("HGET {key} secret:<kid> (init check)"),
})?;
if stored_secret.is_none() {
client
.hset(&key, &field, secret_hex)
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: format!("HSET {key} secret:<kid> (repair torn write)"),
})?;
return Ok(PartitionBootOutcome::Repaired);
}
if stored_secret.as_deref() != Some(secret_hex) {
return Ok(PartitionBootOutcome::Mismatch);
}
return Ok(PartitionBootOutcome::Match);
}
let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
let _: i64 = client
.cmd("HSET")
.arg(&key)
.arg("current_kid")
.arg(WAITPOINT_HMAC_INITIAL_KID)
.arg(&secret_field)
.arg(secret_hex)
.execute()
.await
.map_err(|e| ServerError::ValkeyContext {
source: e,
context: format!("HSET {key} (init waitpoint HMAC atomic)"),
})?;
Ok(PartitionBootOutcome::Installed)
}
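/// Fans `init_one_partition` out across all execution partitions with at most
/// BOOT_INIT_CONCURRENCY calls in flight, then logs aggregate repair and mismatch
/// counts. Mismatches are not fatal: stored secrets win over the env value and an
/// admin rotation is required to bring them back in sync.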
async fn initialize_waitpoint_hmac_secret(
client: &Client,
partition_config: &PartitionConfig,
secret_hex: &str,
) -> Result<(), ServerError> {
use futures::stream::{FuturesUnordered, StreamExt};
let n = partition_config.num_flow_partitions;
tracing::info!(
partitions = n,
concurrency = BOOT_INIT_CONCURRENCY,
"installing waitpoint HMAC secret across {n} execution partitions"
);
let mut mismatch_count: u16 = 0;
let mut repaired_count: u16 = 0;
let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
let mut next_index: u16 = 0;
loop {
while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
let partition = Partition {
family: PartitionFamily::Execution,
index: next_index,
};
let client = client.clone();
let secret_hex = secret_hex.to_owned();
pending.push(async move {
init_one_partition(&client, partition, &secret_hex).await
});
next_index += 1;
}
match pending.next().await {
Some(res) => match res? {
PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
PartitionBootOutcome::Mismatch => mismatch_count += 1,
PartitionBootOutcome::Repaired => repaired_count += 1,
},
None => break,
}
}
if repaired_count > 0 {
tracing::warn!(
repaired_partitions = repaired_count,
total_partitions = n,
"repaired {repaired_count} partitions with torn waitpoint HMAC writes \
(current_kid present but secret:<kid> missing, likely crash during prior boot)"
);
}
if mismatch_count > 0 {
tracing::warn!(
mismatched_partitions = mismatch_count,
total_partitions = n,
"stored/env secret mismatch on {mismatch_count} partitions — \
env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
run POST /v1/admin/rotate-waitpoint-secret to sync"
);
}
tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
Ok(())
}
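/// Summary of a cluster-wide waitpoint HMAC secret rotation: how many partitions
/// rotated, the indices that failed, and the kid that was installed.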
#[derive(Debug, Clone, serde::Serialize)]
pub struct RotateWaitpointSecretResult {
pub rotated: u16,
pub failed: Vec<u16>,
pub new_kid: String,
}
impl Server {
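/// Rotates the waitpoint HMAC secret to `new_kid`/`new_secret_hex` on every
/// execution partition, passing the configured `waitpoint_hmac_grace_ms` through to
/// the rotation script. At most one rotation runs at a time (guarded by
/// `admin_rotate_semaphore`); partitions that fail are collected in the result
/// rather than aborting the rotation.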
pub async fn rotate_waitpoint_secret(
&self,
new_kid: &str,
new_secret_hex: &str,
) -> Result<RotateWaitpointSecretResult, ServerError> {
if new_kid.is_empty() || new_kid.contains(':') {
return Err(ServerError::OperationFailed(
"new_kid must be non-empty and must not contain ':'".into(),
));
}
if new_secret_hex.is_empty()
|| !new_secret_hex.len().is_multiple_of(2)
|| !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
{
return Err(ServerError::OperationFailed(
"new_secret_hex must be a non-empty even-length hex string".into(),
));
}
let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(tokio::sync::TryAcquireError::NoPermits) => {
return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
}
Err(tokio::sync::TryAcquireError::Closed) => {
return Err(ServerError::OperationFailed(
"admin rotate semaphore closed (server shutting down)".into(),
));
}
};
let n = self.config.partition_config.num_flow_partitions;
let grace_ms = self.config.waitpoint_hmac_grace_ms;
use futures::stream::{FuturesUnordered, StreamExt};
let mut rotated = 0u16;
let mut failed = Vec::new();
let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
let mut next_index: u16 = 0;
loop {
while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
let partition = Partition {
family: PartitionFamily::Execution,
index: next_index,
};
let idx = next_index;
let new_kid_owned = new_kid.to_owned();
let new_secret_owned = new_secret_hex.to_owned();
let partition_owned = partition;
let fut = async move {
let outcome = self
.rotate_single_partition(
&partition_owned,
&new_kid_owned,
&new_secret_owned,
grace_ms,
)
.await;
(idx, partition_owned, outcome)
};
pending.push(fut);
next_index += 1;
}
match pending.next().await {
Some((idx, partition, outcome)) => match outcome {
Ok(()) => {
rotated += 1;
tracing::debug!(
partition = %partition,
new_kid = %new_kid,
"waitpoint_hmac_rotated"
);
}
Err(e) => {
tracing::error!(
target: "audit",
partition = %partition,
err = %e,
"waitpoint_hmac_rotation_failed"
);
failed.push(idx);
}
},
None => break,
}
}
tracing::info!(
target: "audit",
new_kid = %new_kid,
total_partitions = n,
rotated,
failed_count = failed.len(),
"waitpoint_hmac_rotation_complete"
);
Ok(RotateWaitpointSecretResult {
rotated,
failed,
new_kid: new_kid.to_owned(),
})
}
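/// Invokes `ff_rotate_waitpoint_hmac_secret` for one partition, mapping a rotation
/// conflict (the kid already installed with a different secret) and transport errors
/// into descriptive `ServerError`s.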
async fn rotate_single_partition(
&self,
partition: &Partition,
new_kid: &str,
new_secret_hex: &str,
grace_ms: u64,
) -> Result<(), ServerError> {
let idx = IndexKeys::new(partition);
let args = RotateWaitpointHmacSecretArgs {
new_kid: new_kid.to_owned(),
new_secret_hex: new_secret_hex.to_owned(),
grace_ms,
};
let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
&self.client,
&idx,
&args,
)
.await
.map_err(|e| match e {
ff_script::ScriptError::RotationConflict(kid) => {
ServerError::OperationFailed(format!(
"rotation conflict: kid {kid} already installed with a \
different secret. Either use a fresh kid or restore the \
original secret for this kid before retrying."
))
}
ff_script::ScriptError::Valkey(v) => ServerError::ValkeyContext {
source: v,
context: format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
},
other => ServerError::OperationFailed(format!(
"rotation failed on partition {partition}: {other}"
)),
})?;
let _ = outcome;
Ok(())
}
}
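/// Decodes the `ff_create_execution` reply. Like the other parse_* helpers below, it
/// expects an array whose first element is an integer status (1 = success) and whose
/// second element is a sub-status (e.g. "DUPLICATE") on success or an error code on
/// failure, with any extra payload fields after that.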
fn parse_create_result(
raw: &Value,
execution_id: &ExecutionId,
) -> Result<CreateExecutionResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
};
if status == 1 {
let sub = arr
.get(1)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default();
if sub == "DUPLICATE" {
Ok(CreateExecutionResult::Duplicate {
execution_id: execution_id.clone(),
})
} else {
Ok(CreateExecutionResult::Created {
execution_id: execution_id.clone(),
public_state: PublicState::Waiting,
})
}
} else {
let error_code = arr
.get(1)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_else(|| "unknown".to_owned());
Err(ServerError::OperationFailed(format!(
"ff_create_execution failed: {error_code}"
)))
}
}
fn parse_cancel_result(
raw: &Value,
execution_id: &ExecutionId,
) -> Result<CancelExecutionResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
};
if status == 1 {
Ok(CancelExecutionResult::Cancelled {
execution_id: execution_id.clone(),
public_state: PublicState::Cancelled,
})
} else {
let error_code = arr
.get(1)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_else(|| "unknown".to_owned());
Err(ServerError::OperationFailed(format!(
"ff_cancel_execution failed: {error_code}"
)))
}
}
fn parse_budget_create_result(
raw: &Value,
budget_id: &BudgetId,
) -> Result<CreateBudgetResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
};
if status == 1 {
let sub = arr
.get(1)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default();
if sub == "ALREADY_SATISFIED" {
Ok(CreateBudgetResult::AlreadySatisfied {
budget_id: budget_id.clone(),
})
} else {
Ok(CreateBudgetResult::Created {
budget_id: budget_id.clone(),
})
}
} else {
let error_code = arr
.get(1)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_else(|| "unknown".to_owned());
Err(ServerError::OperationFailed(format!(
"ff_create_budget failed: {error_code}"
)))
}
}
fn parse_quota_create_result(
raw: &Value,
quota_policy_id: &QuotaPolicyId,
) -> Result<CreateQuotaPolicyResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
};
if status == 1 {
let sub = arr
.get(1)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default();
if sub == "ALREADY_SATISFIED" {
Ok(CreateQuotaPolicyResult::AlreadySatisfied {
quota_policy_id: quota_policy_id.clone(),
})
} else {
Ok(CreateQuotaPolicyResult::Created {
quota_policy_id: quota_policy_id.clone(),
})
}
} else {
let error_code = arr
.get(1)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_else(|| "unknown".to_owned());
Err(ServerError::OperationFailed(format!(
"ff_create_quota_policy failed: {error_code}"
)))
}
}
fn parse_create_flow_result(
raw: &Value,
flow_id: &FlowId,
) -> Result<CreateFlowResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
};
if status == 1 {
let sub = fcall_field_str(arr, 1);
if sub == "ALREADY_SATISFIED" {
Ok(CreateFlowResult::AlreadySatisfied {
flow_id: flow_id.clone(),
})
} else {
Ok(CreateFlowResult::Created {
flow_id: flow_id.clone(),
})
}
} else {
let error_code = fcall_field_str(arr, 1);
Err(ServerError::OperationFailed(format!(
"ff_create_flow failed: {error_code}"
)))
}
}
fn parse_add_execution_to_flow_result(
raw: &Value,
) -> Result<AddExecutionToFlowResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => {
return Err(ServerError::Script(
"ff_add_execution_to_flow: expected Array".into(),
))
}
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => {
return Err(ServerError::Script(
"ff_add_execution_to_flow: bad status code".into(),
))
}
};
if status == 1 {
let sub = fcall_field_str(arr, 1);
let eid_str = fcall_field_str(arr, 2);
let nc_str = fcall_field_str(arr, 3);
let eid = ExecutionId::parse(&eid_str)
.map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
let nc: u32 = nc_str.parse().unwrap_or(0);
if sub == "ALREADY_SATISFIED" {
Ok(AddExecutionToFlowResult::AlreadyMember {
execution_id: eid,
node_count: nc,
})
} else {
Ok(AddExecutionToFlowResult::Added {
execution_id: eid,
new_node_count: nc,
})
}
} else {
let error_code = fcall_field_str(arr, 1);
Err(ServerError::OperationFailed(format!(
"ff_add_execution_to_flow failed: {error_code}"
)))
}
}
enum ParsedCancelFlow {
Cancelled {
policy: String,
member_execution_ids: Vec<String>,
},
AlreadyTerminal,
}
fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
};
if status != 1 {
let error_code = fcall_field_str(arr, 1);
if error_code == "flow_already_terminal" {
return Ok(ParsedCancelFlow::AlreadyTerminal);
}
return Err(ServerError::OperationFailed(format!(
"ff_cancel_flow failed: {error_code}"
)));
}
let policy = fcall_field_str(arr, 2);
let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
for i in 3..arr.len() {
members.push(fcall_field_str(arr, i));
}
Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
}
fn parse_stage_dependency_edge_result(
raw: &Value,
) -> Result<StageDependencyEdgeResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
};
if status == 1 {
let edge_id_str = fcall_field_str(arr, 2);
let rev_str = fcall_field_str(arr, 3);
let edge_id = EdgeId::parse(&edge_id_str)
.map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
let rev: u64 = rev_str.parse().unwrap_or(0);
Ok(StageDependencyEdgeResult::Staged {
edge_id,
new_graph_revision: rev,
})
} else {
let error_code = fcall_field_str(arr, 1);
Err(ServerError::OperationFailed(format!(
"ff_stage_dependency_edge failed: {error_code}"
)))
}
}
fn parse_apply_dependency_result(
raw: &Value,
) -> Result<ApplyDependencyToChildResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
};
if status == 1 {
let sub = fcall_field_str(arr, 1);
if sub == "ALREADY_APPLIED" || sub == "already_applied" {
Ok(ApplyDependencyToChildResult::AlreadyApplied)
} else {
let count_str = fcall_field_str(arr, 2);
let count: u32 = count_str.parse().unwrap_or(0);
Ok(ApplyDependencyToChildResult::Applied {
unsatisfied_count: count,
})
}
} else {
let error_code = fcall_field_str(arr, 1);
Err(ServerError::OperationFailed(format!(
"ff_apply_dependency_to_child failed: {error_code}"
)))
}
}
fn parse_deliver_signal_result(
raw: &Value,
signal_id: &SignalId,
) -> Result<DeliverSignalResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
};
if status == 1 {
let sub = fcall_field_str(arr, 1);
if sub == "DUPLICATE" {
let existing_str = fcall_field_str(arr, 2);
let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
Ok(DeliverSignalResult::Duplicate {
existing_signal_id: existing_id,
})
} else {
let effect = fcall_field_str(arr, 3);
Ok(DeliverSignalResult::Accepted {
signal_id: signal_id.clone(),
effect,
})
}
} else {
let error_code = fcall_field_str(arr, 1);
Err(ServerError::OperationFailed(format!(
"ff_deliver_signal failed: {error_code}"
)))
}
}
fn parse_change_priority_result(
raw: &Value,
execution_id: &ExecutionId,
) -> Result<ChangePriorityResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
};
if status == 1 {
Ok(ChangePriorityResult::Changed {
execution_id: execution_id.clone(),
})
} else {
let error_code = fcall_field_str(arr, 1);
Err(ServerError::OperationFailed(format!(
"ff_change_priority failed: {error_code}"
)))
}
}
fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
};
if status == 1 {
let unsatisfied = fcall_field_str(arr, 2);
let ps = if unsatisfied == "0" {
PublicState::Waiting
} else {
PublicState::WaitingChildren
};
Ok(ReplayExecutionResult::Replayed { public_state: ps })
} else {
let error_code = fcall_field_str(arr, 1);
Err(ServerError::OperationFailed(format!(
"ff_replay_execution failed: {error_code}"
)))
}
}
fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
match e {
ff_script::error::ScriptError::Valkey(valkey_err) => ServerError::ValkeyContext {
source: valkey_err,
context: "stream FCALL transport".into(),
},
other => ServerError::Script(other.to_string()),
}
}
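/// Reads field `index` of an FCALL reply array as a String, accepting bulk strings,
/// simple strings, and integers; missing or other-typed fields yield an empty string.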
fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
match arr.get(index) {
Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
Some(Ok(Value::SimpleString(s))) => s.clone(),
Some(Ok(Value::Int(n))) => n.to_string(),
_ => String::new(),
}
}
fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
};
let status_code = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => {
return Err(ServerError::Script(
"ff_report_usage_and_check: expected Int status code".into(),
))
}
};
if status_code != 1 {
let error_code = fcall_field_str(arr, 1);
return Err(ServerError::OperationFailed(format!(
"ff_report_usage_and_check failed: {error_code}"
)));
}
let sub_status = fcall_field_str(arr, 1);
match sub_status.as_str() {
"OK" => Ok(ReportUsageResult::Ok),
"ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
"SOFT_BREACH" => {
let dim = fcall_field_str(arr, 2);
let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
}
"HARD_BREACH" => {
let dim = fcall_field_str(arr, 2);
let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
Ok(ReportUsageResult::HardBreach {
dimension: dim,
current_usage: current,
hard_limit: limit,
})
}
_ => Err(ServerError::OperationFailed(format!(
"ff_report_usage_and_check: unknown sub-status: {sub_status}"
))),
}
}
fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
};
if status == 1 {
let sub = fcall_field_str(arr, 1);
if sub == "ALREADY_SATISFIED" {
let reason = fcall_field_str(arr, 2);
Ok(RevokeLeaseResult::AlreadySatisfied { reason })
} else {
let lid = fcall_field_str(arr, 2);
let epoch = fcall_field_str(arr, 3);
Ok(RevokeLeaseResult::Revoked {
lease_id: lid,
lease_epoch: epoch,
})
}
} else {
let error_code = fcall_field_str(arr, 1);
Err(ServerError::OperationFailed(format!(
"ff_revoke_lease failed: {error_code}"
)))
}
}
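/// Heuristic for "the ff_* Lua library is not loaded on this node": either a
/// NoScriptError kind or an error detail mentioning a missing function.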
fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
return true;
}
e.detail()
.map(|d| {
d.contains("Function not loaded")
|| d.contains("No matching function")
|| d.contains("function not found")
})
.unwrap_or(false)
|| e.to_string().contains("Function not loaded")
}
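/// FCALL wrapper that reloads the Lua library once and retries the call when the
/// server reports the function as not loaded.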
async fn fcall_with_reload_on_client(
client: &Client,
function: &str,
keys: &[&str],
args: &[&str],
) -> Result<Value, ServerError> {
match client.fcall(function, keys, args).await {
Ok(v) => Ok(v),
Err(e) if is_function_not_loaded(&e) => {
tracing::warn!(function, "Lua library not found on server, reloading");
ff_script::loader::ensure_library(client)
.await
.map_err(ServerError::LibraryLoad)?;
client
.fcall(function, keys, args)
.await
.map_err(ServerError::Valkey)
}
Err(e) => Err(ServerError::Valkey(e)),
}
}
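/// Pre-reads the execution's lane, current attempt index, waitpoint, and worker
/// instance, then assembles the full KEYS/ARGV lists for `ff_cancel_execution`.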
async fn build_cancel_execution_fcall(
client: &Client,
partition_config: &PartitionConfig,
args: &CancelExecutionArgs,
) -> Result<(Vec<String>, Vec<String>), ServerError> {
let partition = execution_partition(&args.execution_id, partition_config);
let ctx = ExecKeyContext::new(&partition, &args.execution_id);
let idx = IndexKeys::new(&partition);
let lane_str: Option<String> = client
.hget(&ctx.core(), "lane_id")
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
let dyn_fields: Vec<Option<String>> = client
.cmd("HMGET")
.arg(ctx.core())
.arg("current_attempt_index")
.arg("current_waitpoint_id")
.arg("current_worker_instance_id")
.execute()
.await
.map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET cancel pre-read".into() })?;
let att_idx_val = dyn_fields.first()
.and_then(|v| v.as_ref())
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(0);
let att_idx = AttemptIndex::new(att_idx_val);
let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
let wp_id = if wp_id_str.is_empty() {
WaitpointId::new()
} else {
WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
};
let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
let wiid = WorkerInstanceId::new(&wiid_str);
let keys: Vec<String> = vec![
ctx.core(),
ctx.attempt_hash(att_idx),
ctx.stream_meta(att_idx),
ctx.lease_current(),
ctx.lease_history(),
idx.lease_expiry(),
idx.worker_leases(&wiid),
ctx.suspension_current(),
ctx.waitpoint(&wp_id),
ctx.waitpoint_condition(&wp_id),
idx.suspension_timeout(),
idx.lane_terminal(&lane),
idx.attempt_timeout(),
idx.execution_deadline(),
idx.lane_eligible(&lane),
idx.lane_delayed(&lane),
idx.lane_blocked_dependencies(&lane),
idx.lane_blocked_budget(&lane),
idx.lane_blocked_quota(&lane),
idx.lane_blocked_route(&lane),
idx.lane_blocked_operator(&lane),
];
let argv: Vec<String> = vec![
args.execution_id.to_string(),
args.reason.clone(),
args.source.to_string(),
args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
];
Ok((keys, argv))
}
const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
fn extract_valkey_kind(e: &ServerError) -> Option<ferriskey::ErrorKind> {
match e {
ServerError::Valkey(err) | ServerError::ValkeyContext { source: err, .. } => {
Some(err.kind())
}
ServerError::LibraryLoad(load_err) => load_err.valkey_kind(),
_ => None,
}
}
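/// Best-effort acknowledgement that a flow member's cancellation was processed.
/// Failures are only logged; per the warning message, the reconciler is expected to
/// drain any leftovers on its next pass.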
async fn ack_cancel_member(
client: &Client,
pending_cancels_key: &str,
cancel_backlog_key: &str,
eid_str: &str,
flow_id: &str,
) {
let keys = [pending_cancels_key, cancel_backlog_key];
let args_v = [eid_str, flow_id];
let result: Result<Value, _> =
client.fcall("ff_ack_cancel_member", &keys, &args_v).await;
if let Err(e) = result {
tracing::warn!(
flow_id = %flow_id,
execution_id = %eid_str,
error = %e,
"ff_ack_cancel_member failed; reconciler will drain on next pass"
);
}
}
fn is_terminal_ack_error(err: &ServerError) -> bool {
match err {
ServerError::OperationFailed(msg) => {
msg.contains("execution_not_active") || msg.contains("execution_not_found")
}
_ => false,
}
}
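/// Cancels a single flow member as an operator-sourced cancellation, retrying
/// transient Valkey errors with the fixed delays in CANCEL_MEMBER_RETRY_DELAYS_MS.
/// Non-retryable errors and a failure on the final attempt are returned immediately.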
async fn cancel_member_execution(
client: &Client,
partition_config: &PartitionConfig,
eid_str: &str,
reason: &str,
now: TimestampMs,
) -> Result<(), ServerError> {
let execution_id = ExecutionId::parse(eid_str)
.map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
let args = CancelExecutionArgs {
execution_id: execution_id.clone(),
reason: reason.to_owned(),
source: CancelSource::OperatorOverride,
lease_id: None,
lease_epoch: None,
attempt_id: None,
now,
};
let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
let is_last = attempt_idx + 1 == attempts;
match try_cancel_member_once(client, partition_config, &args).await {
Ok(()) => return Ok(()),
Err(e) => {
let retryable = extract_valkey_kind(&e)
.map(ff_script::retry::is_retryable_kind)
.unwrap_or(false);
if !retryable || is_last {
return Err(e);
}
tracing::debug!(
execution_id = %execution_id,
attempt = attempt_idx + 1,
delay_ms = *delay_ms,
error = %e,
"cancel_member_execution: transient error, retrying"
);
tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
}
}
}
Err(ServerError::OperationFailed(format!(
"cancel_member_execution: retries exhausted for {execution_id}"
)))
}
async fn try_cancel_member_once(
client: &Client,
partition_config: &PartitionConfig,
args: &CancelExecutionArgs,
) -> Result<(), ServerError> {
let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
let raw =
fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
parse_cancel_result(&raw, &args.execution_id).map(|_| ())
}
fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
};
let status = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
};
if status == 1 {
let next_str = fcall_field_str(arr, 2);
let next_ms: i64 = next_str.parse().unwrap_or(0);
Ok(ResetBudgetResult::Reset {
next_reset_at: TimestampMs::from_millis(next_ms),
})
} else {
let error_code = fcall_field_str(arr, 1);
Err(ServerError::OperationFailed(format!(
"ff_reset_budget failed: {error_code}"
)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use ferriskey::ErrorKind;
fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
ferriskey::Error::from((kind, "synthetic"))
}
#[test]
fn create_budget_rejects_over_cap_dimension_count() {
let n = MAX_BUDGET_DIMENSIONS + 1;
let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
let hard = vec![1u64; n];
let soft = vec![0u64; n];
let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
match err {
ServerError::InvalidInput(msg) => {
assert!(msg.contains("too_many_dimensions"), "got: {msg}");
assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
assert!(msg.contains(&format!("got={n}")), "got: {msg}");
}
other => panic!("expected InvalidInput, got {other:?}"),
}
}
#[test]
fn create_budget_accepts_exactly_cap_dimensions() {
let n = MAX_BUDGET_DIMENSIONS;
let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
let hard = vec![1u64; n];
let soft = vec![0u64; n];
assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
}
#[test]
fn create_budget_rejects_hard_limit_length_mismatch() {
let dims = vec!["a".to_string(), "b".to_string()];
let hard = vec![1u64];
let soft = vec![0u64, 0u64];
let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
match err {
ServerError::InvalidInput(msg) => {
assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
assert!(msg.contains("hard_limits=1"), "got: {msg}");
assert!(msg.contains("dimensions=2"), "got: {msg}");
}
other => panic!("expected InvalidInput, got {other:?}"),
}
}
#[test]
fn create_budget_rejects_soft_limit_length_mismatch() {
let dims = vec!["a".to_string(), "b".to_string()];
let hard = vec![1u64, 2u64];
let soft = vec![0u64, 0u64, 0u64];
let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
match err {
ServerError::InvalidInput(msg) => {
assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
assert!(msg.contains("soft_limits=3"), "got: {msg}");
}
other => panic!("expected InvalidInput, got {other:?}"),
}
}
#[test]
fn report_usage_rejects_over_cap_dimension_count() {
let n = MAX_BUDGET_DIMENSIONS + 1;
let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
let deltas = vec![1u64; n];
let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
match err {
ServerError::InvalidInput(msg) => {
assert!(msg.contains("too_many_dimensions"), "got: {msg}");
assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
}
other => panic!("expected InvalidInput, got {other:?}"),
}
}
#[test]
fn report_usage_accepts_exactly_cap_dimensions() {
let n = MAX_BUDGET_DIMENSIONS;
let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
let deltas = vec![1u64; n];
assert!(validate_report_usage_dimensions(&dims, &deltas).is_ok());
}
#[test]
fn report_usage_rejects_delta_length_mismatch() {
let dims = vec!["a".to_string(), "b".to_string(), "c".to_string()];
let deltas = vec![1u64, 2u64];
let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
match err {
ServerError::InvalidInput(msg) => {
assert!(msg.contains("dimension_delta_array_mismatch"), "got: {msg}");
assert!(msg.contains("dimensions=3"), "got: {msg}");
assert!(msg.contains("deltas=2"), "got: {msg}");
}
other => panic!("expected InvalidInput, got {other:?}"),
}
}
#[test]
fn report_usage_accepts_empty_dimensions() {
assert!(validate_report_usage_dimensions(&[], &[]).is_ok());
}
#[test]
fn is_retryable_valkey_variant_uses_kind_table() {
assert!(ServerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
assert!(ServerError::Valkey(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
assert!(ServerError::Valkey(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
assert!(ServerError::Valkey(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
assert!(ServerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Ask)).is_retryable());
assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
}
#[test]
fn is_retryable_valkey_context_uses_kind_table() {
let err = ServerError::ValkeyContext {
source: mk_fk_err(ErrorKind::IoError),
context: "HGET test".into(),
};
assert!(err.is_retryable());
let err = ServerError::ValkeyContext {
source: mk_fk_err(ErrorKind::AuthenticationFailed),
context: "auth".into(),
};
assert!(!err.is_retryable());
}
#[test]
fn is_retryable_library_load_delegates_to_inner_kind() {
let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
mk_fk_err(ErrorKind::IoError),
));
assert!(err.is_retryable());
let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
mk_fk_err(ErrorKind::AuthenticationFailed),
));
assert!(!err.is_retryable());
let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
expected: "1".into(),
got: "2".into(),
});
assert!(!err.is_retryable());
}
#[test]
fn is_retryable_business_logic_variants_are_false() {
assert!(!ServerError::NotFound("x".into()).is_retryable());
assert!(!ServerError::InvalidInput("x".into()).is_retryable());
assert!(!ServerError::OperationFailed("x".into()).is_retryable());
assert!(!ServerError::Script("x".into()).is_retryable());
assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
}
#[test]
fn valkey_kind_delegates_through_library_load() {
let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
mk_fk_err(ErrorKind::ClusterDown),
));
assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));
let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
expected: "1".into(),
got: "2".into(),
});
assert_eq!(err.valkey_kind(), None);
}
#[test]
fn parse_valkey_version_prefers_valkey_version_over_redis_version() {
let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n\
server_mode:cluster\r\n\
os:Linux\r\n";
assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
}
#[test]
fn parse_valkey_version_real_valkey_8_cluster_body() {
let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
server_name:valkey\r\n\
valkey_version:9.0.3\r\n\
valkey_release_stage:ga\r\n\
redis_git_sha1:00000000\r\n\
server_mode:cluster\r\n";
assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
}
#[test]
fn parse_valkey_version_falls_back_to_redis_version_on_valkey_7() {
let info = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nfoo:bar\r\n";
assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
}
#[test]
fn parse_valkey_version_rejects_redis_backend() {
let info = "\
# Server\r\n\
redis_version:7.4.0\r\n\
redis_mode:standalone\r\n\
os:Linux\r\n";
let err = parse_valkey_version(info).unwrap_err();
assert!(matches!(err, ServerError::OperationFailed(_)));
let msg = err.to_string();
assert!(
msg.contains("Redis is not supported") && msg.contains("server_name:valkey"),
"expected Redis-rejection message, got: {msg}"
);
}
#[test]
fn parse_valkey_version_accepts_valkey_7_marker_case_insensitively() {
let info = "redis_version:7.2.0\r\nSERVER_NAME:Valkey\r\n";
assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
}
#[test]
fn parse_valkey_version_errors_when_no_version_field() {
let info = "# Server\r\nfoo:bar\r\n";
let err = parse_valkey_version(info).unwrap_err();
assert!(matches!(err, ServerError::OperationFailed(_)));
assert!(
err.to_string().contains("missing"),
"expected 'missing' in message, got: {err}"
);
}
#[test]
fn parse_valkey_version_errors_on_non_numeric_major() {
let info = "valkey_version:invalid.x.y\n";
let err = parse_valkey_version(info).unwrap_err();
assert!(matches!(err, ServerError::OperationFailed(_)));
assert!(err.to_string().contains("non-numeric major"));
}
#[test]
fn parse_valkey_version_errors_on_non_numeric_minor() {
let info = "valkey_version:7.x.0\n";
let err = parse_valkey_version(info).unwrap_err();
assert!(matches!(err, ServerError::OperationFailed(_)));
assert!(err.to_string().contains("non-numeric minor"));
}
#[test]
fn parse_valkey_version_errors_on_missing_minor() {
let info = "valkey_version:7\n";
let err = parse_valkey_version(info).unwrap_err();
assert!(matches!(err, ServerError::OperationFailed(_)));
assert!(err.to_string().contains("missing minor"));
}
#[test]
fn extract_info_bodies_unwraps_cluster_map_all_entries() {
let body_a = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
let body_b = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.0\r\n";
let map = Value::Map(vec![
(
Value::SimpleString("127.0.0.1:7000".to_string()),
Value::VerbatimString {
format: ferriskey::value::VerbatimFormat::Text,
text: body_a.to_string(),
},
),
(
Value::SimpleString("127.0.0.1:7001".to_string()),
Value::VerbatimString {
format: ferriskey::value::VerbatimFormat::Text,
text: body_b.to_string(),
},
),
]);
let bodies = extract_info_bodies(&map).unwrap();
assert_eq!(bodies.len(), 2);
assert_eq!(bodies[0], body_a);
assert_eq!(bodies[1], body_b);
}
#[test]
fn extract_info_bodies_handles_simple_string() {
let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
let v = Value::SimpleString(body_text.to_string());
let bodies = extract_info_bodies(&v).unwrap();
assert_eq!(bodies, vec![body_text.to_string()]);
}
#[test]
fn extract_info_bodies_rejects_empty_cluster_map() {
let map = Value::Map(vec![]);
let err = extract_info_bodies(&map).unwrap_err();
assert!(matches!(err, ServerError::OperationFailed(_)));
assert!(err.to_string().contains("empty map"));
}
#[test]
fn parse_valkey_version_min_across_cluster_map_picks_lowest() {
let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
let body_node2 = "# Server\r\nredis_version:7.1.0\r\nserver_name:valkey\r\n";
let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
let map = Value::Map(vec![
(
Value::SimpleString("node1:6379".to_string()),
Value::VerbatimString {
format: ferriskey::value::VerbatimFormat::Text,
text: body_node1.to_string(),
},
),
(
Value::SimpleString("node2:6379".to_string()),
Value::VerbatimString {
format: ferriskey::value::VerbatimFormat::Text,
text: body_node2.to_string(),
},
),
(
Value::SimpleString("node3:6379".to_string()),
Value::VerbatimString {
format: ferriskey::value::VerbatimFormat::Text,
text: body_node3.to_string(),
},
),
]);
let bodies = extract_info_bodies(&map).unwrap();
let min = bodies
.iter()
.map(|b| parse_valkey_version(b).unwrap())
.min()
.unwrap();
assert_eq!(min, (7, 1), "min across cluster must be the lowest node");
assert!(
min < (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
"mixed-version cluster with 7.1.0 node must fail the (7,2) gate"
);
}
#[test]
fn parse_valkey_version_all_nodes_at_or_above_floor_accepts() {
let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
let body_node2 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:9.0.3\r\n";
let map = Value::Map(vec![
(
Value::SimpleString("node1:6379".to_string()),
Value::VerbatimString {
format: ferriskey::value::VerbatimFormat::Text,
text: body_node1.to_string(),
},
),
(
Value::SimpleString("node2:6379".to_string()),
Value::VerbatimString {
format: ferriskey::value::VerbatimFormat::Text,
text: body_node2.to_string(),
},
),
(
Value::SimpleString("node3:6379".to_string()),
Value::VerbatimString {
format: ferriskey::value::VerbatimFormat::Text,
text: body_node3.to_string(),
},
),
]);
let bodies = extract_info_bodies(&map).unwrap();
let min = bodies
.iter()
.map(|b| parse_valkey_version(b).unwrap())
.min()
.unwrap();
assert_eq!(min, (7, 2), "min across cluster is the lowest node (7.2)");
assert!(
min >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
"all-above-floor cluster must pass the gate"
);
}
#[test]
fn valkey_version_too_low_is_not_retryable() {
let err = ServerError::ValkeyVersionTooLow {
detected: "7.0".into(),
required: "7.2".into(),
};
assert!(!err.is_retryable());
assert_eq!(err.valkey_kind(), None);
}
#[test]
fn valkey_version_too_low_error_message_includes_both_versions() {
let err = ServerError::ValkeyVersionTooLow {
detected: "7.0".into(),
required: "7.2".into(),
};
let msg = err.to_string();
assert!(msg.contains("7.0"), "detected version in message: {msg}");
assert!(msg.contains("7.2"), "required version in message: {msg}");
assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
}
}