use std::collections::HashMap;
use ferriskey::Client;
use ff_core::partition::PartitionConfig;
use crate::config::WorkerConfig;
use crate::SdkError;
pub(crate) struct PreambleOutput {
pub partition_config: PartitionConfig,
pub capabilities_csv: String,
pub capabilities_hash: String,
}
pub(crate) async fn run(
client: &Client,
config: &WorkerConfig,
) -> Result<PreambleOutput, SdkError> {
let pong: String = client
.cmd("PING")
.execute()
.await
.map_err(|e| crate::backend_context(e, "PING failed"))?;
if pong != "PONG" {
return Err(SdkError::Config {
context: "worker_connect".into(),
field: None,
message: format!("unexpected PING response: {pong}"),
});
}
let alive_key =
ff_core::keys::worker_alive_key_ns(&config.namespace, &config.worker_instance_id);
let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
let set_result: Option<String> = client
.cmd("SET")
.arg(&alive_key)
.arg("1")
.arg("NX")
.arg("PX")
.arg(alive_ttl_ms.to_string().as_str())
.execute()
.await
.map_err(|e| crate::backend_context(e, "SET NX worker alive key"))?;
if set_result.is_none() {
return Err(SdkError::Config {
context: "worker_connect".into(),
field: Some("worker_instance_id".into()),
message: format!(
"duplicate worker_instance_id '{}': another process already holds {alive_key}",
config.worker_instance_id
),
});
}
let partition_config = read_partition_config(client).await.unwrap_or_else(|e| {
tracing::warn!(
error = %e,
"ff:config:partitions not found, using defaults"
);
PartitionConfig::default()
});
for cap in &config.capabilities {
if cap.is_empty() {
return Err(SdkError::Config {
context: "worker_config".into(),
field: Some("capabilities".into()),
message: "capability token must not be empty".into(),
});
}
if cap.contains(',') {
return Err(SdkError::Config {
context: "worker_config".into(),
field: Some("capabilities".into()),
message: format!(
"capability token may not contain ',' (CSV delimiter): {cap:?}"
),
});
}
if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
return Err(SdkError::Config {
context: "worker_config".into(),
field: Some("capabilities".into()),
message: format!(
"capability token must not contain whitespace or control \
characters: {cap:?}"
),
});
}
}
let capabilities_csv: String = {
let set: std::collections::BTreeSet<&str> = config
.capabilities
.iter()
.map(|s| s.as_str())
.filter(|s| !s.is_empty())
.collect();
if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
return Err(SdkError::Config {
context: "worker_config".into(),
field: Some("capabilities".into()),
message: format!(
"capability set exceeds CAPS_MAX_TOKENS ({}): {}",
ff_core::policy::CAPS_MAX_TOKENS,
set.len()
),
});
}
let csv = set.into_iter().collect::<Vec<_>>().join(",");
if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
return Err(SdkError::Config {
context: "worker_config".into(),
field: Some("capabilities".into()),
message: format!(
"capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
ff_core::policy::CAPS_MAX_BYTES,
csv.len()
),
});
}
csv
};
let capabilities_hash = ff_core::hash::fnv1a_xor8hex(&capabilities_csv);
if !capabilities_csv.is_empty() {
tracing::info!(
worker_instance_id = %config.worker_instance_id,
worker_caps_hash = %capabilities_hash,
worker_caps = %capabilities_csv,
"worker connected with capabilities (full CSV — mismatch logs use hash only)"
);
}
{
let caps_key = ff_core::keys::worker_caps_key_ns(
&config.namespace,
&config.worker_instance_id,
);
let index_key = ff_core::keys::workers_index_key_ns(&config.namespace);
let instance_id = config.worker_instance_id.to_string();
if capabilities_csv.is_empty() {
let _ = client
.cmd("DEL")
.arg(&caps_key)
.execute::<Option<i64>>()
.await;
if let Err(e) = client
.cmd("SREM")
.arg(&index_key)
.arg(&instance_id)
.execute::<Option<i64>>()
.await
{
tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
"SREM workers-index failed; continuing (non-authoritative)");
}
} else {
let lanes_csv: String = {
let set: std::collections::BTreeSet<&str> =
config.lanes.iter().map(|l| l.as_str()).collect();
set.into_iter().collect::<Vec<_>>().join(",")
};
let now_ms: u64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let ttl_ms_str = alive_ttl_ms.to_string();
let now_ms_str = now_ms.to_string();
let worker_id_str = config.worker_id.to_string();
if let Err(e) = client
.cmd("HSET")
.arg(&caps_key)
.arg("worker_id")
.arg(worker_id_str.as_str())
.arg("lanes_csv")
.arg(lanes_csv.as_str())
.arg("caps_csv")
.arg(capabilities_csv.as_str())
.arg("ttl_ms")
.arg(ttl_ms_str.as_str())
.arg("registered_at_ms")
.arg(now_ms_str.as_str())
.arg("last_heartbeat_ms")
.arg(now_ms_str.as_str())
.execute::<Option<i64>>()
.await
{
tracing::warn!(error = %e, key = %caps_key,
"HSET worker caps advertisement failed; continuing");
}
if let Err(e) = client
.cmd("SADD")
.arg(&index_key)
.arg(&instance_id)
.execute::<Option<i64>>()
.await
{
tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
"SADD workers-index failed; continuing");
}
}
}
Ok(PreambleOutput {
partition_config,
capabilities_csv,
capabilities_hash,
})
}
async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
let key = ff_core::keys::global_config_partitions();
let fields: HashMap<String, String> = client
.hgetall(&key)
.await
.map_err(|e| crate::backend_context(e, format!("HGETALL {key}")))?;
if fields.is_empty() {
return Err(SdkError::Config {
context: "read_partition_config".into(),
field: None,
message: "ff:config:partitions not found in Valkey".into(),
});
}
let parse = |field: &str, default: u16| -> u16 {
fields
.get(field)
.and_then(|v| v.parse().ok())
.filter(|&n: &u16| n > 0)
.unwrap_or(default)
};
Ok(PartitionConfig {
num_flow_partitions: parse("num_flow_partitions", 256),
num_budget_partitions: parse("num_budget_partitions", 32),
num_quota_partitions: parse("num_quota_partitions", 32),
})
}