use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::Mutex as AsyncMutex;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition};
use ff_core::types::{BudgetId, LaneId};
use super::{ScanResult, Scanner};
/// Maximum number of blocked execution ids fetched from one blocked-set per
/// scan pass; larger backlogs drain incrementally across passes.
const BATCH_SIZE: u32 = 100;
/// COUNT hint passed to SSCAN when enumerating the workers index.
const WORKERS_SSCAN_COUNT: usize = 100;
/// Upper bound on concurrent per-worker caps GETs during a caps-union refresh.
const CAPS_GET_CONCURRENCY: usize = 16;
/// Periodic scanner that re-examines blocked executions (waiting on budget,
/// quota, or a capable worker) and moves them back to the eligible set once
/// their blocking condition has cleared.
pub struct UnblockScanner {
    // How often the scanner runner drives `scan_partition` (see `interval()`).
    interval: Duration,
    // Lanes whose blocked-sets this scanner walks each pass.
    lanes: Vec<LaneId>,
    // Used to locate budget/quota partitions when checking clear conditions.
    partition_config: PartitionConfig,
    // Shared, TTL-bounded cache of the union of all workers' capabilities;
    // async mutex because the guard is held across a refresh await.
    caps_cache: Arc<AsyncMutex<CapsUnionCache>>,
}
/// Cached union of capability tokens advertised by all registered workers.
struct CapsUnionCache {
    // Last computed union; `None` until the first successful refresh.
    snapshot: Option<BTreeSet<String>>,
    // When `snapshot` was computed; `None` forces a refresh on first use.
    fetched_at: Option<Instant>,
    // Snapshot lifetime; set to the scan interval in `UnblockScanner::new`.
    ttl: Duration,
}
impl UnblockScanner {
    /// Builds a scanner that re-checks blocked executions every `interval`
    /// for each of the given lanes.
    ///
    /// The worker-caps union cache uses the scan interval as its TTL, so the
    /// union is refreshed at most roughly once per pass.
    pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
        // Start with an empty cache; the first route check forces a refresh.
        let cache = CapsUnionCache {
            snapshot: None,
            fetched_at: None,
            ttl: interval,
        };
        Self {
            interval,
            lanes,
            partition_config,
            caps_cache: Arc::new(AsyncMutex::new(cache)),
        }
    }
}
impl Scanner for UnblockScanner {
fn name(&self) -> &'static str {
"unblock"
}
fn interval(&self) -> Duration {
self.interval
}
async fn scan_partition(
&self,
client: &ferriskey::Client,
partition: u16,
) -> ScanResult {
let p = Partition {
family: PartitionFamily::Execution,
index: partition,
};
let idx = IndexKeys::new(&p);
let mut total_processed: u32 = 0;
let mut total_errors: u32 = 0;
let mut budget_cache: HashMap<String, bool> = HashMap::new();
let caps_cache = self.caps_cache.clone();
for lane in &self.lanes {
let budget_key = idx.lane_blocked_budget(lane);
let r = scan_blocked_set(
client, &p, &idx, lane, &budget_key,
"waiting_for_budget", &mut budget_cache,
&caps_cache,
&self.partition_config,
).await;
total_processed += r.processed;
total_errors += r.errors;
let quota_key = idx.lane_blocked_quota(lane);
let r = scan_blocked_set(
client, &p, &idx, lane, "a_key,
"waiting_for_quota", &mut budget_cache,
&caps_cache,
&self.partition_config,
).await;
total_processed += r.processed;
total_errors += r.errors;
let route_key = idx.lane_blocked_route(lane);
let r = scan_blocked_set(
client, &p, &idx, lane, &route_key,
"waiting_for_capable_worker", &mut budget_cache,
&caps_cache,
&self.partition_config,
).await;
total_processed += r.processed;
total_errors += r.errors;
}
ScanResult {
processed: total_processed,
errors: total_errors,
}
}
}
/// Scans one blocked-set (a sorted set of execution ids) for a lane and
/// unblocks every member whose live `blocking_reason` matches
/// `expected_reason` and whose blocking condition has since cleared.
///
/// Reads at most `BATCH_SIZE` ids per call; the scanner re-runs on its
/// interval, so large backlogs drain incrementally. Returns the number of
/// executions successfully unblocked (`processed`) and the number of
/// failures encountered (`errors`).
#[allow(clippy::too_many_arguments)]
async fn scan_blocked_set(
    client: &ferriskey::Client,
    partition: &Partition,
    idx: &IndexKeys,
    lane: &LaneId,
    blocked_key: &str,
    expected_reason: &str,
    // Per-pass memo of budget-id -> breached, shared across lanes and sets.
    budget_cache: &mut HashMap<String, bool>,
    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
    partition_config: &PartitionConfig,
) -> ScanResult {
    // Lowest-score-first page of blocked execution ids, capped at BATCH_SIZE
    // so one oversized set cannot monopolize the pass.
    let blocked: Vec<String> = match client
        .cmd("ZRANGEBYSCORE")
        .arg(blocked_key)
        .arg("-inf")
        .arg("+inf")
        .arg("LIMIT")
        .arg("0")
        .arg(BATCH_SIZE.to_string().as_str())
        .execute()
        .await
    {
        Ok(ids) => ids,
        Err(e) => {
            tracing::warn!(
                error = %e,
                blocked_key,
                "unblock_scanner: ZRANGEBYSCORE failed"
            );
            // Could not even list the set: report one error, no progress.
            return ScanResult { processed: 0, errors: 1 };
        }
    };
    if blocked.is_empty() {
        return ScanResult { processed: 0, errors: 0 };
    }
    let mut processed: u32 = 0;
    let mut errors: u32 = 0;
    let tag = partition.hash_tag();
    for eid_str in &blocked {
        let core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
        // Re-read the live reason: the execution may have been unblocked or
        // re-blocked for a different reason since it entered this set.
        let reason: Option<String> = match client
            .cmd("HGET")
            .arg(&core_key)
            .arg("blocking_reason")
            .execute()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: HGET blocking_reason failed, skipping"
                );
                errors += 1;
                continue;
            }
        };
        // Missing field counts as "" and will not match any expected reason.
        let reason = reason.unwrap_or_default();
        if reason != expected_reason {
            continue;
        }
        // Dispatch to the reason-specific "has the condition cleared" check.
        let should_unblock = match expected_reason {
            "waiting_for_budget" => {
                check_budget_cleared(client, &core_key, budget_cache, partition_config).await
            }
            "waiting_for_quota" => {
                check_quota_cleared(client, &core_key, eid_str, partition_config).await
            }
            "waiting_for_capable_worker" => {
                check_route_cleared(client, &core_key, caps_cache).await
            }
            // Unknown reasons are never unblocked here.
            _ => false,
        };
        if !should_unblock {
            continue;
        }
        let eligible_key = idx.lane_eligible(lane);
        // Keys for the server-side function: the execution core hash, the
        // blocked set it leaves, and the eligible set it joins.
        let keys: [&str; 3] = [&core_key, blocked_key, &eligible_key];
        // Server TIME is used for the timestamp argument — presumably so the
        // score is consistent across scanner instances regardless of local
        // clock skew (TODO confirm intent against ff_unblock_execution).
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t.to_string(),
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: server TIME failed, skipping unblock"
                );
                errors += 1;
                continue;
            }
        };
        let argv: [&str; 3] = [eid_str, &now_ms, expected_reason];
        // The actual state transition happens atomically in the server-side
        // function library via FCALL.
        match client
            .fcall::<ferriskey::Value>("ff_unblock_execution", &keys, &argv)
            .await
        {
            Ok(_) => {
                tracing::info!(
                    execution_id = eid_str.as_str(),
                    reason = expected_reason,
                    "unblock_scanner: execution unblocked"
                );
                processed += 1;
            }
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: ff_unblock_execution failed"
                );
                errors += 1;
            }
        }
    }
    ScanResult { processed, errors }
}
/// Returns true when none of the budgets attached to the execution are still
/// breached. Per-budget results are memoized in `cache` for the duration of
/// the scan pass so each budget hits the store at most once.
async fn check_budget_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    cache: &mut HashMap<String, bool>,
    config: &PartitionConfig,
) -> bool {
    // Comma-separated budget ids stored on the execution core hash.
    // NOTE(review): a read error falls through to "cleared" here (fail-open),
    // unlike the fail-closed reads inside is_budget_breached.
    let csv: Option<String> = client
        .cmd("HGET")
        .arg(core_key)
        .arg("budget_ids")
        .execute()
        .await
        .unwrap_or(None);
    // No budgets attached: nothing can be blocking on budget.
    let csv = match csv {
        Some(s) if !s.is_empty() => s,
        _ => return true,
    };
    for raw in csv.split(',') {
        let id = raw.trim();
        if id.is_empty() {
            continue;
        }
        // Consult the per-pass memo first; only miss goes to the store.
        let breached = match cache.get(id) {
            Some(&hit) => hit,
            None => {
                let fresh = is_budget_breached(client, id, config).await;
                cache.insert(id.to_owned(), fresh);
                fresh
            }
        };
        // A single breached budget keeps the execution blocked.
        if breached {
            return false;
        }
    }
    true
}
/// Returns true when any of the budget's hard limits is still at or above
/// its recorded usage. Store read failures are treated as breached so the
/// execution stays blocked (fail-closed).
async fn is_budget_breached(
    client: &ferriskey::Client,
    budget_id: &str,
    config: &PartitionConfig,
) -> bool {
    // An unparseable budget id cannot be enforced; treat it as not breached.
    let bid = match BudgetId::parse(budget_id) {
        Ok(id) => id,
        Err(_) => return false,
    };
    let partition = budget_partition(&bid, config);
    let tag = partition.hash_tag();
    let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
    let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);
    // HGETALL replies arrive as a flat field/value list.
    let limits: Vec<String> = match client
        .cmd("HGETALL")
        .arg(&limits_key)
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                budget_id,
                error = %e,
                "unblock_scanner: budget limits read failed, keeping blocked (fail-closed)"
            );
            return true;
        }
    };
    // Walk the flat reply two entries at a time as (field, value) pairs;
    // a dangling trailing element (malformed reply) is ignored.
    for pair in limits.chunks_exact(2) {
        let (field, raw_limit) = (&pair[0], &pair[1]);
        // Only "hard:" limits gate unblocking.
        let Some(dimension) = field.strip_prefix("hard:") else {
            continue;
        };
        // Zero or unparseable limits are treated as "no limit".
        let ceiling: u64 = match raw_limit.parse() {
            Ok(v) if v > 0 => v,
            _ => continue,
        };
        let usage_reply: Option<String> = match client
            .cmd("HGET")
            .arg(&usage_key)
            .arg(dimension)
            .execute()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                tracing::error!(
                    budget_id,
                    dimension,
                    error = %e,
                    "unblock_scanner: budget usage read failed, keeping blocked (fail-closed)"
                );
                return true;
            }
        };
        // Missing or unparseable usage counts as zero.
        let consumed: u64 = usage_reply
            .as_deref()
            .and_then(|s| s.parse().ok())
            .unwrap_or(0);
        if consumed >= ceiling {
            return true;
        }
    }
    false
}
async fn check_quota_cleared(
client: &ferriskey::Client,
core_key: &str,
_eid_str: &str,
config: &PartitionConfig,
) -> bool {
let quota_id: Option<String> = match client
.cmd("HGET")
.arg(core_key)
.arg("quota_policy_id")
.execute()
.await
{
Ok(v) => v,
Err(e) => {
tracing::error!(
core_key,
error = %e,
"unblock_scanner: quota_policy_id read failed, keeping blocked (fail-closed)"
);
return false;
}
};
let quota_id = match quota_id {
Some(s) if !s.is_empty() => s,
_ => return true, };
let qid = match ff_core::types::QuotaPolicyId::parse("a_id) {
Ok(id) => id,
Err(_) => return true, };
let partition = ff_core::partition::quota_partition(&qid, config);
let tag = partition.hash_tag();
let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id);
let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
let def_fields: Vec<Option<String>> = match client
.cmd("HMGET")
.arg("a_def_key)
.arg("max_requests_per_window")
.arg("requests_per_window_seconds")
.arg("active_concurrency_cap")
.execute()
.await
{
Ok(v) => v,
Err(e) => {
tracing::error!(
quota_id = %quota_id,
error = %e,
"unblock_scanner: quota definition read failed, keeping blocked (fail-closed)"
);
return false;
}
};
let rate_limit: u64 = def_fields.first()
.and_then(|v| v.as_ref())
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let window_secs: u64 = def_fields.get(1)
.and_then(|v| v.as_ref())
.and_then(|s| s.parse().ok())
.unwrap_or(60);
let concurrency_cap: u64 = def_fields.get(2)
.and_then(|v| v.as_ref())
.and_then(|s| s.parse().ok())
.unwrap_or(0);
if rate_limit > 0 {
let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
Ok(t) => t,
Err(_) => return false,
};
let window_ms = window_secs * 1000;
let cutoff = (now_ms.saturating_sub(window_ms)).to_string();
let _: Result<i64, _> = client
.cmd("ZREMRANGEBYSCORE")
.arg(&window_key)
.arg("-inf")
.arg(&cutoff)
.execute()
.await;
let count: i64 = client
.cmd("ZCARD")
.arg(&window_key)
.execute()
.await
.unwrap_or(0);
if count as u64 >= rate_limit {
return false; }
}
if concurrency_cap > 0 {
let active: i64 = client
.cmd("GET")
.arg(&concurrency_key)
.execute()
.await
.unwrap_or(0);
if active as u64 >= concurrency_cap {
return false; }
}
true }
/// Returns true when every capability the execution requires appears in the
/// union of all workers' advertised capabilities.
///
/// The union is expensive (SSCAN + per-worker GET), so it is cached with a
/// TTL behind an async mutex; the guard is held across the refresh await so
/// concurrent callers wait for one refresh instead of each issuing their own.
async fn check_route_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
) -> bool {
    // Comma-separated required capability tokens. NOTE(review): a read error
    // falls through to "no requirements" here (fail-open) via unwrap_or(None).
    let required_csv: Option<String> = client
        .cmd("HGET")
        .arg(core_key)
        .arg("required_capabilities")
        .execute()
        .await
        .unwrap_or(None);
    // No required capabilities means any worker can take it.
    let required_csv = match required_csv {
        Some(s) if !s.is_empty() => s,
        _ => return true,
    };
    let snapshot: BTreeSet<String> = {
        let mut guard = caps_cache.lock().await;
        // Stale when never fetched or older than the configured TTL.
        let stale = guard
            .fetched_at
            .map(|t| t.elapsed() >= guard.ttl)
            .unwrap_or(true);
        if stale {
            match load_worker_caps_union(client).await {
                Ok(union) => {
                    guard.snapshot = Some(union);
                    guard.fetched_at = Some(Instant::now());
                }
                Err(e) => {
                    // Deliberately fail-open: a broken caps read must not
                    // wedge routing for everything in the blocked set.
                    tracing::warn!(
                        error = %e,
                        "unblock_scanner: failed to read worker caps union — \
                        assuming match possible (fail-open to preserve liveness)"
                    );
                    return true;
                }
            }
        }
        guard.snapshot.clone().unwrap_or_default()
    };
    // Unblock only when every required token is advertised by some worker.
    required_csv
        .split(',')
        .filter(|t| !t.is_empty())
        .all(|t| snapshot.contains(t))
}
/// Computes the union of capability tokens advertised by all registered
/// workers by SSCANing the workers index and fetching each worker's caps
/// CSV with bounded concurrency (`CAPS_GET_CONCURRENCY`).
///
/// Returns the first store error encountered so the caller can fail the
/// whole refresh rather than use a partial union.
async fn load_worker_caps_union(
    client: &ferriskey::Client,
) -> Result<BTreeSet<String>, ferriskey::Error> {
    let mut union = BTreeSet::new();
    let index_key = ff_core::keys::workers_index_key();
    // Folds one worker's caps CSV (if present) into the union, propagating
    // any GET error.
    fn absorb(
        union: &mut BTreeSet<String>,
        res: Result<Option<String>, ferriskey::Error>,
    ) -> Result<(), ferriskey::Error> {
        let csv = res?;
        if let Some(csv) = csv {
            for token in csv.split(',') {
                if !token.is_empty() {
                    union.insert(token.to_owned());
                }
            }
        }
        Ok(())
    }
    // Standard SSCAN cursor protocol: start at "0"; the scan is complete
    // when the server hands back cursor "0" again.
    let mut cursor: String = "0".to_owned();
    loop {
        let reply: (String, Vec<String>) = client
            .cmd("SSCAN")
            .arg(&index_key)
            .arg(&cursor)
            .arg("COUNT")
            .arg(WORKERS_SSCAN_COUNT.to_string().as_str())
            .execute()
            .await?;
        cursor = reply.0;
        let worker_ids = reply.1;
        // Issue per-worker GETs concurrently, but keep at most
        // CAPS_GET_CONCURRENCY in flight: once the window is full, drain one
        // completed future before pushing the next.
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        for id in worker_ids {
            let client = client.clone();
            pending.push(async move {
                let caps_key = format!("ff:worker:{}:caps", id);
                let csv: Option<String> = client
                    .cmd("GET")
                    .arg(&caps_key)
                    .execute()
                    .await?;
                Ok::<Option<String>, ferriskey::Error>(csv)
            });
            if pending.len() >= CAPS_GET_CONCURRENCY
                && let Some(res) = pending.next().await
            {
                absorb(&mut union, res)?;
            }
        }
        // Drain whatever is still in flight for this SSCAN page.
        while let Some(res) = pending.next().await {
            absorb(&mut union, res)?;
        }
        if cursor == "0" {
            break;
        }
    }
    Ok(union)
}