use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition, quota_partition};
use ff_core::types::{BudgetId, ExecutionId, LaneId, QuotaPolicyId, WorkerId, WorkerInstanceId};
use ff_script::error::ScriptError;
use ff_script::result::FcallResult;
use ff_script::retry::is_retryable_kind;
use std::collections::{BTreeSet, HashMap};
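/// Returns true when every comma-separated token in `required_csv` is present
/// in `worker_caps`. Empty tokens (from doubled or trailing commas) are
/// skipped, so an empty CSV is always satisfied.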
fn caps_subset(required_csv: &str, worker_caps: &BTreeSet<String>) -> bool {
required_csv
.split(',')
.filter(|t| !t.is_empty())
.all(|t| worker_caps.contains(t))
}
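/// Short FNV-1a digest of a capability CSV, used in log fields so capability
/// sets can be correlated without logging the full CSV.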
fn worker_caps_digest(csv: &str) -> String {
ff_core::hash::fnv1a_xor8hex(csv)
}
pub use ff_core::contracts::{ClaimGrant, ReclaimGrant};
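/// Verdict from a hard-budget read: either within limits, or breaching the
/// named dimension.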
#[derive(Debug)]
pub enum BudgetCheckResult {
Ok,
HardBreach { dimension: String, detail: String },
}
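/// Outcome of the quota admission probe for one candidate execution.
/// `Admitted` keeps the tag/quota/execution identifiers so the slot can be
/// released again if the claim attempt that follows fails.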
enum QuotaCheckOutcome {
NoQuota,
Admitted { tag: String, quota_id: String, eid: String },
Blocked(String),
}
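/// Evaluates hard budget limits, caching one verdict per budget id so a
/// single claim pass never re-reads the same budget. Reads are advisory:
/// storage errors are logged and the budget is treated as `Ok`.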
pub struct BudgetChecker {
    cache: HashMap<String, BudgetCheckResult>,
config: PartitionConfig,
}
impl BudgetChecker {
pub fn new(config: PartitionConfig) -> Self {
Self {
            cache: HashMap::new(),
config,
}
}
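    /// Returns the cached verdict for `budget_id`, reading the hash-tagged
    /// usage/limits keys on a cache miss. An unparsable id falls back to the
    /// default `{b:0}` hash tag so both keys still share a cluster slot.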
pub async fn check_budget(
&mut self,
client: &ferriskey::Client,
budget_id: &str,
) -> &BudgetCheckResult {
if self.cache.contains_key(budget_id) {
return &self.cache[budget_id];
}
let (usage_key, limits_key) = match BudgetId::parse(budget_id) {
Ok(bid) => {
let partition = budget_partition(&bid, &self.config);
let tag = partition.hash_tag();
(
format!("ff:budget:{}:{}:usage", tag, budget_id),
format!("ff:budget:{}:{}:limits", tag, budget_id),
)
}
Err(_) => {
(
format!("ff:budget:{{b:0}}:{}:usage", budget_id),
format!("ff:budget:{{b:0}}:{}:limits", budget_id),
)
}
};
let result = match Self::read_and_check(client, &usage_key, &limits_key).await {
Ok(r) => r,
Err(e) => {
tracing::warn!(
budget_id,
error = %e,
"budget_checker: failed to read budget, allowing (advisory)"
);
BudgetCheckResult::Ok
}
};
self.cache.insert(budget_id.to_owned(), result);
&self.cache[budget_id]
}
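    /// Compares every `hard:<dimension>` limit against the matching usage
    /// field. A limit of 0 or an unparsable limit is skipped; a missing or
    /// unparsable usage counts as 0.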
async fn read_and_check(
client: &ferriskey::Client,
usage_key: &str,
limits_key: &str,
) -> Result<BudgetCheckResult, ferriskey::Error> {
        let limits: HashMap<String, String> = client
.hgetall(limits_key)
.await?;
for (field, limit_val) in &limits {
if !field.starts_with("hard:") {
continue;
}
            let dimension = &field[5..];
            let limit: u64 = match limit_val.parse() {
Ok(v) if v > 0 => v,
_ => continue,
};
let usage_str: Option<String> = client
.cmd("HGET")
.arg(usage_key)
.arg(dimension)
.execute()
.await
.unwrap_or(None);
let usage: u64 = usage_str
.as_deref()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
if usage >= limit {
return Ok(BudgetCheckResult::HardBreach {
dimension: dimension.to_owned(),
detail: format!("budget {}: {} {}/{}", usage_key, dimension, usage, limit),
});
}
}
Ok(BudgetCheckResult::Ok)
}
pub fn reset(&mut self) {
self.cache.clear();
}
}
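/// Claims eligible executions for workers, applying capability, budget, and
/// quota admission checks before issuing a claim grant through Lua.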
pub struct Scheduler {
client: ferriskey::Client,
config: PartitionConfig,
}
impl Scheduler {
pub fn new(client: ferriskey::Client, config: PartitionConfig) -> Self {
Self { client, config }
}
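    /// Tries to claim one eligible execution from `lane` for this worker.
    /// Partitions are probed in a ring starting at a hash of
    /// `worker_instance_id`, so concurrent workers tend to start on different
    /// partitions. Each candidate is screened client-side for capabilities,
    /// hard budgets, and quota admission; the grant itself is issued
    /// atomically by `ff_issue_claim_grant`, which re-checks capabilities to
    /// close the remaining race. Returns `Ok(None)` when no partition yields
    /// a claimable execution.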
pub async fn claim_for_worker(
&self,
lane: &LaneId,
worker_id: &WorkerId,
worker_instance_id: &WorkerInstanceId,
worker_capabilities: &BTreeSet<String>,
grant_ttl_ms: u64,
) -> Result<Option<ClaimGrant>, SchedulerError> {
let num_partitions = self.config.num_flow_partitions;
        let mut budget_checker = BudgetChecker::new(self.config.clone());
let start_p: u16 = ff_core::hash::fnv1a_u16_mod(
worker_instance_id.as_str(),
num_partitions,
);
for cap in worker_capabilities {
if cap.is_empty() {
return Err(SchedulerError::Config(
"capability token must not be empty".to_owned(),
));
}
if cap.contains(',') {
return Err(SchedulerError::Config(format!(
"capability token may not contain ',' (CSV delimiter): {cap:?}"
)));
}
if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
return Err(SchedulerError::Config(format!(
"capability token must not contain whitespace or control characters: {cap:?}"
)));
}
}
if worker_capabilities.len() > ff_core::policy::CAPS_MAX_TOKENS {
return Err(SchedulerError::Config(format!(
"capability set exceeds CAPS_MAX_TOKENS ({}): {}",
ff_core::policy::CAPS_MAX_TOKENS,
worker_capabilities.len()
)));
}
let worker_caps_csv = worker_capabilities
.iter()
.filter(|s| !s.is_empty())
.cloned()
.collect::<Vec<_>>()
.join(",");
let worker_caps_hash = worker_caps_digest(&worker_caps_csv);
if worker_caps_csv.len() > ff_core::policy::CAPS_MAX_BYTES {
return Err(SchedulerError::Config(format!(
"capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
ff_core::policy::CAPS_MAX_BYTES,
worker_caps_csv.len()
)));
}
for offset in 0..num_partitions {
let p_idx = (start_p + offset) % num_partitions;
let partition = Partition {
family: PartitionFamily::Execution,
index: p_idx,
};
let idx = IndexKeys::new(&partition);
let eligible_key = idx.lane_eligible(lane);
let candidates: Vec<String> = match self
.client
.cmd("ZRANGEBYSCORE")
.arg(&eligible_key)
.arg("-inf")
.arg("+inf")
.arg("LIMIT")
.arg("0")
.arg("1")
.execute()
.await
{
Ok(ids) => ids,
Err(e) => {
tracing::warn!(
partition = p_idx,
error = %e,
"scheduler: ZRANGEBYSCORE eligible failed, skipping partition"
);
continue;
}
};
            let eid_str = match candidates.first() {
                Some(s) => s,
                None => continue,
            };
let eid = match ExecutionId::parse(eid_str) {
Ok(id) => id,
Err(e) => {
tracing::warn!(
partition = p_idx,
execution_id = eid_str.as_str(),
error = %e,
"scheduler: invalid execution_id in eligible set, skipping"
);
continue;
}
};
let exec_ctx = ExecKeyContext::new(&partition, &eid);
let core_key = exec_ctx.core();
let eid_s = eid.to_string();
let now_ms = match server_time_ms(&self.client).await {
Ok(t) => t,
Err(e) => {
tracing::warn!(
partition = p_idx,
error = %e,
"scheduler: failed to get server time, skipping partition"
);
continue;
}
};
let required_caps_csv: Option<String> = match self
.client
.cmd("HGET")
.arg(&core_key)
.arg("required_capabilities")
.execute::<Option<String>>()
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(
partition = p_idx,
execution_id = eid_s.as_str(),
error = %e,
"scheduler: HGET required_capabilities failed, skipping candidate"
);
continue;
}
};
if let Some(req) = required_caps_csv.as_deref()
&& !req.is_empty()
&& !caps_subset(req, worker_capabilities)
{
tracing::info!(
partition = p_idx,
execution_id = eid_s.as_str(),
worker_id = worker_id.as_str(),
worker_caps_hash = worker_caps_hash.as_str(),
required = req,
"scheduler: capability mismatch, blocking execution off eligible"
);
self.block_candidate(
&partition, &idx, lane, &eid, &eligible_key,
"waiting_for_capable_worker",
"no connected worker satisfies required_capabilities",
now_ms,
).await;
continue;
}
if let Some(block_detail) = self
.check_budgets(&mut budget_checker, &exec_ctx, &core_key, &eid_s)
.await?
{
self.block_candidate(
&partition, &idx, lane, &eid, &eligible_key,
"waiting_for_budget", &block_detail, now_ms,
).await;
continue;
}
let quota_admission = self
.check_quota(&exec_ctx, &core_key, &eid_s, now_ms)
.await?;
            match &quota_admission {
QuotaCheckOutcome::Blocked(block_detail) => {
self.block_candidate(
&partition, &idx, lane, &eid, &eligible_key,
"waiting_for_quota", block_detail, now_ms,
).await;
continue;
}
QuotaCheckOutcome::NoQuota | QuotaCheckOutcome::Admitted { .. } => {}
}
let grant_key = exec_ctx.claim_grant();
let keys: [&str; 3] = [&core_key, &grant_key, &eligible_key];
let ttl_str = grant_ttl_ms.to_string();
let wid_s = worker_id.to_string();
let wiid_s = worker_instance_id.to_string();
let lane_s = lane.to_string();
            // Positional argv for ff_issue_claim_grant; the empty strings are
            // slots this caller leaves unset.
            let argv: [&str; 9] = [
                &eid_s,
                &wid_s,
                &wiid_s,
                &lane_s,
                "",
                &ttl_str,
                "",
                "",
                &worker_caps_csv,
            ];
let raw = match self
.client
.fcall::<ferriskey::Value>("ff_issue_claim_grant", &keys, &argv)
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(
partition = p_idx,
execution_id = eid_s.as_str(),
error = %e,
"scheduler: ff_issue_claim_grant transport error, trying next"
);
                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
self.release_admission(tag, quota_id, eid).await;
}
continue;
}
};
match FcallResult::parse(&raw).and_then(|r| r.into_success()) {
Ok(_) => {
tracing::debug!(
partition = p_idx,
execution_id = eid_s.as_str(),
"scheduler: claim grant issued"
);
return Ok(Some(ClaimGrant {
execution_id: eid,
partition,
grant_key: grant_key.clone(),
expires_at_ms: now_ms + grant_ttl_ms,
}));
}
Err(script_err) => {
if matches!(script_err, ScriptError::CapabilityMismatch(_)) {
tracing::info!(
partition = p_idx,
execution_id = eid_s.as_str(),
worker_id = wid_s.as_str(),
worker_caps_hash = worker_caps_hash.as_str(),
error = %script_err,
"scheduler: capability mismatch via Lua (race), blocking execution"
);
self.block_candidate(
&partition, &idx, lane, &eid, &eligible_key,
"waiting_for_capable_worker",
"no connected worker satisfies required_capabilities",
now_ms,
).await;
                        if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
self.release_admission(tag, quota_id, eid).await;
}
continue;
} else {
tracing::warn!(
partition = p_idx,
execution_id = eid_s.as_str(),
error = %script_err,
"scheduler: ff_issue_claim_grant rejected, trying next"
);
}
                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
self.release_admission(tag, quota_id, eid).await;
}
continue;
}
}
}
Ok(None)
}
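    /// Returns `Some(detail)` when any budget in the execution's `budget_ids`
    /// CSV field reports a hard breach; `None` admits the candidate.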
async fn check_budgets(
&self,
checker: &mut BudgetChecker,
_exec_ctx: &ExecKeyContext,
core_key: &str,
_eid_s: &str,
) -> Result<Option<String>, SchedulerError> {
let budget_ids_str: Option<String> = self
.client
.cmd("HGET")
.arg(core_key)
.arg("budget_ids")
.execute()
.await?;
let budget_ids_str = match budget_ids_str {
Some(s) => s,
None => return Ok(None),
};
        if budget_ids_str.is_empty() {
            return Ok(None);
        }
for budget_id in budget_ids_str.split(',') {
let budget_id = budget_id.trim();
if budget_id.is_empty() {
continue;
}
let result = checker.check_budget(&self.client, budget_id).await;
if let BudgetCheckResult::HardBreach { detail, .. } = result {
return Ok(Some(detail.clone()));
}
}
Ok(None)
}
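    /// Looks up the execution's quota policy, if any, and asks
    /// `ff_check_admission_and_record` for a rate/concurrency slot. Quota
    /// evaluation errors are advisory: the candidate is admitted as if no
    /// quota were configured.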
async fn check_quota(
&self,
_exec_ctx: &ExecKeyContext,
core_key: &str,
eid_s: &str,
now_ms: u64,
) -> Result<QuotaCheckOutcome, SchedulerError> {
let quota_id_str: Option<String> = self
.client
.cmd("HGET")
.arg(core_key)
.arg("quota_policy_id")
.execute()
.await?;
let quota_id_str = match quota_id_str {
Some(s) => s,
None => return Ok(QuotaCheckOutcome::NoQuota),
};
if quota_id_str.is_empty() {
return Ok(QuotaCheckOutcome::NoQuota);
}
        let tag = match QuotaPolicyId::parse(&quota_id_str) {
            Ok(qid) => {
                let partition = quota_partition(&qid, &self.config);
                partition.hash_tag()
            }
            Err(_) => "{q:0}".to_owned(),
        };
let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id_str);
let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id_str);
let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id_str);
let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id_str, eid_s);
let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id_str);
        let rate_limit: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("max_requests_per_window")
            .execute().await?;
        let window_secs: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("requests_per_window_seconds")
            .execute().await?;
        let concurrency_cap: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("active_concurrency_cap")
            .execute().await?;
        let jitter: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("jitter_ms")
            .execute().await?;
let rate_limit = rate_limit.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
let window_secs = window_secs.as_deref().and_then(|s| s.parse().ok()).unwrap_or(60u64);
let concurrency_cap = concurrency_cap.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
let jitter_ms = jitter.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
if rate_limit == 0 && concurrency_cap == 0 {
return Ok(QuotaCheckOutcome::NoQuota);
}
        let keys: [&str; 5] = [
            &window_key,
            &concurrency_key,
            &quota_def_key,
            &admitted_key,
            &admitted_set_key,
        ];
let now_s = now_ms.to_string();
let ws = window_secs.to_string();
let rl = rate_limit.to_string();
let cc = concurrency_cap.to_string();
let jt = jitter_ms.to_string();
let argv: [&str; 6] = [&now_s, &ws, &rl, &cc, eid_s, &jt];
match self.client
.fcall::<ferriskey::Value>("ff_check_admission_and_record", &keys, &argv)
.await
{
Ok(result) => {
let status = Self::parse_admission_status(&result);
                match status.as_str() {
                    "ADMITTED" | "ALREADY_ADMITTED" => Ok(QuotaCheckOutcome::Admitted {
                        tag: tag.clone(),
                        quota_id: quota_id_str.clone(),
                        eid: eid_s.to_owned(),
                    }),
                    "RATE_EXCEEDED" => Ok(QuotaCheckOutcome::Blocked(format!(
                        "quota {}: rate limit {} per {}s window exceeded",
                        quota_id_str, rate_limit, window_secs
                    ))),
                    "CONCURRENCY_EXCEEDED" => Ok(QuotaCheckOutcome::Blocked(format!(
                        "quota {}: concurrency cap {} reached",
                        quota_id_str, concurrency_cap
                    ))),
_ => {
tracing::warn!(
quota_id = quota_id_str.as_str(),
status = status.as_str(),
"scheduler: unexpected admission result"
);
Ok(QuotaCheckOutcome::NoQuota)
}
}
}
Err(e) => {
tracing::warn!(
quota_id = quota_id_str.as_str(),
error = %e,
"scheduler: quota FCALL failed, allowing (advisory)"
);
                Ok(QuotaCheckOutcome::NoQuota)
            }
}
}
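    /// Extracts the status string from the admission script's reply, which is
    /// expected to be an array whose first element is the status.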
fn parse_admission_status(result: &ferriskey::Value) -> String {
match result {
ferriskey::Value::Array(arr) => {
match arr.first() {
                    Some(ferriskey::Value::BulkString(b)) => {
                        String::from_utf8_lossy(b).into_owned()
                    }
                    Some(ferriskey::Value::SimpleString(s)) => s.clone(),
_ => "UNKNOWN".to_owned(),
}
}
_ => "UNKNOWN".to_owned(),
}
}
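    /// Moves a candidate out of the eligible set into the blocked index that
    /// matches `block_reason`, via `ff_block_execution_for_admission`.
    /// Best-effort: failures are logged and the candidate is simply skipped
    /// for this pass.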
#[allow(clippy::too_many_arguments)]
async fn block_candidate(
&self,
partition: &Partition,
idx: &IndexKeys,
lane: &LaneId,
eid: &ExecutionId,
eligible_key: &str,
block_reason: &str,
blocking_detail: &str,
now_ms: u64,
) {
let exec_ctx = ExecKeyContext::new(partition, eid);
let core_key = exec_ctx.core();
let eid_s = eid.to_string();
let blocked_key = match block_reason {
"waiting_for_budget" => idx.lane_blocked_budget(lane),
"waiting_for_quota" => idx.lane_blocked_quota(lane),
"waiting_for_capable_worker" => idx.lane_blocked_route(lane),
_ => idx.lane_blocked_budget(lane),
};
let keys: [&str; 3] = [&core_key, eligible_key, &blocked_key];
let now_s = now_ms.to_string();
let argv: [&str; 4] = [&eid_s, block_reason, blocking_detail, &now_s];
match self.client
.fcall::<ferriskey::Value>("ff_block_execution_for_admission", &keys, &argv)
.await
{
Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
Ok(_) => {
tracing::info!(
execution_id = eid_s,
reason = block_reason,
"scheduler: candidate blocked by admission check"
);
}
Err(script_err) => {
tracing::warn!(
execution_id = eid_s,
reason = block_reason,
error = %script_err,
"scheduler: ff_block_execution_for_admission rejected by Lua"
);
}
},
Err(e) => {
tracing::warn!(
execution_id = eid_s,
error = %e,
"scheduler: ff_block_execution_for_admission transport failed"
);
}
}
}
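    /// Releases a quota admission slot taken in `check_quota` after a claim
    /// attempt fails. Best-effort: on error the slot is left to lapse via its
    /// TTL, as the log messages note.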
async fn release_admission(
&self,
tag: &str,
quota_id: &str,
eid_s: &str,
) {
let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid_s);
let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
let keys: [&str; 3] = [&admitted_key, &admitted_set_key, &concurrency_key];
let argv: [&str; 1] = [eid_s];
match self.client
.fcall::<ferriskey::Value>("ff_release_admission", &keys, &argv)
.await
{
Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
Ok(_) => {
tracing::info!(
execution_id = eid_s,
quota_id,
"scheduler: released admission after claim failure"
);
}
Err(script_err) => {
tracing::warn!(
execution_id = eid_s,
quota_id,
error = %script_err,
"scheduler: ff_release_admission rejected by Lua \
(slot will expire via TTL)"
);
}
},
Err(e) => {
tracing::warn!(
execution_id = eid_s,
quota_id,
error = %e,
"scheduler: ff_release_admission transport failed \
(slot will expire via TTL)"
);
}
}
}
}
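/// Millisecond wall-clock time from the server's TIME command, so every
/// scheduler instance works from the same clock.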
async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
let result: Vec<String> = client
.cmd("TIME")
.execute()
.await?;
if result.len() < 2 {
return Err(ferriskey::Error::from((
ferriskey::ErrorKind::ClientError,
"TIME returned fewer than 2 elements",
)));
}
let secs: u64 = result[0].parse().map_err(|_| {
ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
})?;
let micros: u64 = result[1].parse().map_err(|_| {
ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
})?;
Ok(secs * 1000 + micros / 1000)
}
#[derive(Debug, thiserror::Error)]
pub enum SchedulerError {
#[error("valkey: {0}")]
Valkey(#[from] ferriskey::Error),
#[error("valkey ({context}): {source}")]
ValkeyContext {
#[source]
source: ferriskey::Error,
context: String,
},
#[error("config: {0}")]
Config(String),
}
impl SchedulerError {
pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
match self {
Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
Self::Config(_) => None,
}
}
pub fn is_retryable(&self) -> bool {
self.valkey_kind()
.map(is_retryable_kind)
.unwrap_or(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
use ferriskey::ErrorKind;
fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
ferriskey::Error::from((kind, "synthetic"))
}
#[test]
fn scheduler_is_retryable_matches_kind_table() {
assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
}
#[test]
fn scheduler_valkey_context_is_retryable() {
let err = SchedulerError::ValkeyContext {
source: mk_fk_err(ErrorKind::BusyLoadingError),
context: "HGET budget_ids".into(),
};
assert!(err.is_retryable());
}
#[test]
fn scheduler_valkey_kind_exposed() {
let err = SchedulerError::Valkey(mk_fk_err(ErrorKind::TryAgain));
assert_eq!(err.valkey_kind(), Some(ErrorKind::TryAgain));
}
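    // Minimal sketches of the pure helpers above; they assume nothing beyond
    // the behavior visible in this file (no Valkey instance required).
    #[test]
    fn caps_subset_ignores_empty_tokens() {
        let caps: BTreeSet<String> = ["gpu", "linux"].iter().map(|s| s.to_string()).collect();
        assert!(caps_subset("", &caps));
        assert!(caps_subset("gpu,,linux,", &caps));
        assert!(!caps_subset("gpu,windows", &caps));
    }
    #[test]
    fn worker_caps_digest_is_deterministic() {
        // worker_caps_digest delegates to ff_core::hash::fnv1a_xor8hex, which
        // we assume is a pure function of its input.
        assert_eq!(worker_caps_digest("a,b"), worker_caps_digest("a,b"));
    }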
}