Skip to main content

ff_engine/scanner/
unblock.rs

1//! Unblock scanner for budget/quota/capability-blocked executions.
2//!
3//! Scans `ff:idx:{p:N}:lane:<lane>:blocked:{budget,quota,route}` per
4//! execution partition. For each blocked execution, re-evaluates the
5//! blocking condition. If cleared, calls `FCALL ff_unblock_execution`.
6//!
7//! Cross-partition budget check is cached per scan cycle (MANDATORY —
8//! without it, 50K blocked executions = 50K budget reads).
9//!
10//! Capability sweep reads the union of non-authoritative worker cap
11//! HASHes (`ff:worker:{ns}:{instance_id}:caps` — written by
12//! `ff-sdk::FlowFabricWorker::connect` and by the `ff_register_worker`
13//! FCALL) ONCE per scan cycle PER NAMESPACE, and uses it to decide
14//! whether a `waiting_for_capable_worker` execution has a matching
15//! worker. This is best-effort: caps HASHes may be slightly stale
16//! (TTL'd, refreshed on connect), but the promotion path is
17//! self-correcting — a promoted execution that still can't be claimed
18//! gets re-blocked on the next scheduler tick. RFC-009 §7.5 documents
19//! the v1 sweep approach and defers connect-triggered sweeps to V2.
20//!
21//! RFC-025 Phase 5 cutover: caps reads go through the namespace-scoped
22//! `ff:idx:{ns}:workers` + `ff:worker:{ns}:{id}:caps` helpers; the
23//! cache keys off `Namespace` so multi-tenant deployments don't mix
24//! capability sets across tenants.
25//!
26//! MUST skip `paused_by_flow_cancel` — only cancel_flow clears that.
27//!
28//! Reference: RFC-008 §2.4, RFC-009 §7.5, RFC-010 §6
29
30use std::collections::{BTreeSet, HashMap};
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33
34use futures::stream::{FuturesUnordered, StreamExt};
35use tokio::sync::Mutex as AsyncMutex;
36
37use ff_core::backend::ScannerFilter;
38use ff_core::engine_backend::EngineBackend;
39use ff_core::keys::IndexKeys;
40use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition};
41use ff_core::types::{BudgetId, ExecutionId, LaneId, Namespace, TimestampMs};
42
43use super::{should_skip_candidate, ScanResult, Scanner};
44
/// Upper bound on how many members one pass pulls from a single
/// `blocked:*` ZSET (the `LIMIT 0 <n>` argument of the ZRANGEBYSCORE read
/// in `scan_blocked_set`).
const BATCH_SIZE: u32 = 100;

/// COUNT hint for every SSCAN page over the namespace workers-index SET.
/// Same page size as the other index-SET scanners (budget_reconciler,
/// flow_projector). It only caps how large one cursor round trip's reply
/// can get; the whole SET is still walked until the cursor wraps to 0.
const WORKERS_SSCAN_COUNT: usize = 100;

/// Maximum number of per-worker caps reads in flight during the fan-out.
/// Mirrors the bounded parallelism W1 used in
/// initialize_waitpoint_hmac_secret. Lower values make large fleets
/// (1000+ workers) pay mostly serial round-trip latency; higher values
/// push a pipeline burst that head-of-lines the scanner client. 16 is a
/// pragmatic middle ground for current fleet sizes.
const CAPS_GET_CONCURRENCY: usize = 16;
59
60pub struct UnblockScanner {
61    interval: Duration,
62    lanes: Vec<LaneId>,
63    partition_config: PartitionConfig,
64    filter: ScannerFilter,
65    /// Shared worker-caps union cache across ALL partitions in one scan
66    /// pass. Previously this cache was declared inside `scan_partition`
67    /// which runs once PER PARTITION — at 256 partitions that meant up
68    /// to 256 redundant SSCAN + fan-out GET sequences per scan interval.
69    /// Now: one load per TTL window (= `interval`), shared across every
70    /// partition visited in that window. `TTL == interval` is natural:
71    /// a worker connecting "now" propagates into the caps union on the
72    /// next scan cycle, not faster or slower than the cycle itself.
73    ///
74    /// `Arc<AsyncMutex<_>>` because the Scanner trait's `scan_partition`
75    /// takes `&self`. Contention is bounded by the partition iteration
76    /// cadence (one partition at a time per scanner task), so the mutex
77    /// is effectively uncontended in steady state.
78    caps_cache: Arc<AsyncMutex<CapsUnionCache>>,
79    backend: Option<Arc<dyn EngineBackend>>,
80}
81
82/// Per-namespace worker-caps union cache. Every blocked execution
83/// carries a `namespace` field on `exec_core`; the scanner reads that
84/// alongside `blocking_reason` and routes the subset check to the
85/// union of caps advertised by workers in THAT namespace only. Absent
86/// this partitioning, a tenant-A worker advertising
87/// `required_caps=[gpu]` would unblock a tenant-B execution waiting on
88/// the same token.
89struct CapsUnionCache {
90    /// TTL applied uniformly to every namespace entry — same cadence
91    /// as `UnblockScanner::interval`, so a freshly-connected worker
92    /// propagates into its namespace's union on the next scan cycle.
93    ttl: Duration,
94    /// Entry per observed namespace. Grows as new namespaces surface
95    /// on blocked executions; entries do NOT expire (the bounded fleet
96    /// of namespaces is small and refreshing an unused entry on next
97    /// visit costs one SSCAN). Cleared on scanner restart.
98    entries: HashMap<Namespace, CapsUnionEntry>,
99}
100
/// Cached worker-caps union for a single namespace.
///
/// Both fields start out `None` (never loaded) and are written together
/// after a successful load.
struct CapsUnionEntry {
    /// Union of capability tokens advertised by the namespace's workers.
    snapshot: Option<BTreeSet<String>>,
    /// When `snapshot` was last loaded; `None` forces a reload.
    fetched_at: Option<Instant>,
}
105
106impl UnblockScanner {
107    pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
108        Self::with_filter(interval, lanes, partition_config, ScannerFilter::default())
109    }
110
111    /// Construct with a [`ScannerFilter`] applied per candidate
112    /// (issue #122).
113    pub fn with_filter(
114        interval: Duration,
115        lanes: Vec<LaneId>,
116        partition_config: PartitionConfig,
117        filter: ScannerFilter,
118    ) -> Self {
119        Self {
120            interval,
121            lanes,
122            partition_config,
123            filter,
124            caps_cache: Arc::new(AsyncMutex::new(CapsUnionCache {
125                ttl: interval,
126                entries: HashMap::new(),
127            })),
128            backend: None,
129        }
130    }
131
132    /// PR-7b Cluster 1: wire an `EngineBackend` for filter-resolution
133    /// reads. FCALL routing is cluster 2 scope.
134    pub fn with_filter_and_backend(
135        interval: Duration,
136        lanes: Vec<LaneId>,
137        partition_config: PartitionConfig,
138        filter: ScannerFilter,
139        backend: Arc<dyn EngineBackend>,
140    ) -> Self {
141        Self {
142            interval,
143            lanes,
144            partition_config,
145            filter,
146            caps_cache: Arc::new(AsyncMutex::new(CapsUnionCache {
147                ttl: interval,
148                entries: HashMap::new(),
149            })),
150            backend: Some(backend),
151        }
152    }
153}
154
155impl Scanner for UnblockScanner {
156    fn name(&self) -> &'static str {
157        "unblock"
158    }
159
160    fn interval(&self) -> Duration {
161        self.interval
162    }
163
164    fn filter(&self) -> &ScannerFilter {
165        &self.filter
166    }
167
168    async fn scan_partition(
169        &self,
170        client: &ferriskey::Client,
171        partition: u16,
172    ) -> ScanResult {
173        let p = Partition {
174            family: PartitionFamily::Execution,
175            index: partition,
176        };
177        let idx = IndexKeys::new(&p);
178
179        let mut total_processed: u32 = 0;
180        let mut total_errors: u32 = 0;
181
182        // Cross-partition budget cache: budget_id → is_breached.
183        // Reset per partition scan (each partition scan is one "cycle").
184        let mut budget_cache: HashMap<String, bool> = HashMap::new();
185
186        // Worker-caps union cache is shared across ALL partitions via
187        // the scanner struct (Arc<AsyncMutex<CapsUnionCache>>). `get_or_load`
188        // returns the cached snapshot if its fetched_at is within
189        // `interval` (TTL == scan interval), otherwise loads fresh via
190        // SSCAN + concurrent GET fan-out. Without this, at 256 partitions
191        // the old per-partition-local cache re-ran load_worker_caps_union
192        // up to 256× per cycle.
193        let caps_cache = self.caps_cache.clone();
194
195        for lane in &self.lanes {
196            // Scan blocked:budget
197            let budget_key = idx.lane_blocked_budget(lane);
198            let r = scan_blocked_set(
199                client, self.backend.as_ref(), &p, &idx, lane, &budget_key,
200                "waiting_for_budget", &mut budget_cache,
201                &caps_cache,
202                &self.partition_config,
203                &self.filter,
204            ).await;
205            total_processed += r.processed;
206            total_errors += r.errors;
207
208            // Scan blocked:quota
209            let quota_key = idx.lane_blocked_quota(lane);
210            let r = scan_blocked_set(
211                client, self.backend.as_ref(), &p, &idx, lane, &quota_key,
212                "waiting_for_quota", &mut budget_cache,
213                &caps_cache,
214                &self.partition_config,
215                &self.filter,
216            ).await;
217            total_processed += r.processed;
218            total_errors += r.errors;
219
220            // Scan blocked:route (capability-mismatch blocks). Promotion
221            // decision reads the union of connected workers' caps and
222            // checks subset coverage. See check_route_cleared below.
223            let route_key = idx.lane_blocked_route(lane);
224            let r = scan_blocked_set(
225                client, self.backend.as_ref(), &p, &idx, lane, &route_key,
226                "waiting_for_capable_worker", &mut budget_cache,
227                &caps_cache,
228                &self.partition_config,
229                &self.filter,
230            ).await;
231            total_processed += r.processed;
232            total_errors += r.errors;
233        }
234
235        ScanResult {
236            processed: total_processed,
237            errors: total_errors,
238        }
239    }
240}
241
242/// Scan one blocked set and unblock executions whose condition has cleared.
243#[allow(clippy::too_many_arguments)]
244async fn scan_blocked_set(
245    client: &ferriskey::Client,
246    backend: Option<&Arc<dyn EngineBackend>>,
247    partition: &Partition,
248    idx: &IndexKeys,
249    lane: &LaneId,
250    blocked_key: &str,
251    expected_reason: &str,
252    budget_cache: &mut HashMap<String, bool>,
253    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
254    partition_config: &PartitionConfig,
255    filter: &ScannerFilter,
256) -> ScanResult {
257    // Read all members from the blocked set (they're scored by block time)
258    let blocked: Vec<String> = match client
259        .cmd("ZRANGEBYSCORE")
260        .arg(blocked_key)
261        .arg("-inf")
262        .arg("+inf")
263        .arg("LIMIT")
264        .arg("0")
265        .arg(BATCH_SIZE.to_string().as_str())
266        .execute()
267        .await
268    {
269        Ok(ids) => ids,
270        Err(e) => {
271            tracing::warn!(
272                error = %e,
273                blocked_key,
274                "unblock_scanner: ZRANGEBYSCORE failed"
275            );
276            return ScanResult { processed: 0, errors: 1 };
277        }
278    };
279
280    if blocked.is_empty() {
281        return ScanResult { processed: 0, errors: 0 };
282    }
283
284    let mut processed: u32 = 0;
285    let mut errors: u32 = 0;
286    let tag = partition.hash_tag();
287
288    for eid_str in &blocked {
289        if should_skip_candidate(backend, filter, partition.index, eid_str).await {
290            continue;
291        }
292        // Read blocking_reason + namespace from exec_core. Namespace
293        // gates the caps-union subset check so tenant-A worker caps
294        // never unblock a tenant-B `blocked_by_route` execution —
295        // RFC-025 Phase 5 multi-tenant safety property.
296        let core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
297        let fields: Vec<Option<String>> = match client
298            .cmd("HMGET")
299            .arg(&core_key)
300            .arg("blocking_reason")
301            .arg("namespace")
302            .execute()
303            .await
304        {
305            Ok(r) => r,
306            Err(e) => {
307                tracing::warn!(
308                    execution_id = eid_str.as_str(),
309                    error = %e,
310                    "unblock_scanner: HMGET blocking_reason/namespace failed, skipping"
311                );
312                errors += 1;
313                continue;
314            }
315        };
316        if fields.len() < 2 {
317            tracing::warn!(
318                execution_id = eid_str.as_str(),
319                returned_fields = fields.len(),
320                "unblock_scanner: HMGET returned < 2 fields, skipping"
321            );
322            errors += 1;
323            continue;
324        }
325        let reason = fields[0].as_deref().unwrap_or("");
326        let namespace = match fields[1].as_deref().filter(|s| !s.is_empty()) {
327            Some(ns_str) => Namespace::new(ns_str),
328            None => {
329                // Every execution gets a namespace at create-time
330                // (`ff_create_execution` HSETs it). Missing here = data
331                // integrity defect; skip rather than risk a
332                // cross-tenant promotion.
333                tracing::warn!(
334                    execution_id = eid_str.as_str(),
335                    core_key = %core_key,
336                    "unblock_scanner: exec_core missing namespace field, skipping"
337                );
338                errors += 1;
339                continue;
340            }
341        };
342
343        // Skip if not blocked by the expected reason (e.g. paused_by_flow_cancel)
344        if reason != expected_reason {
345            continue;
346        }
347
348        // Re-evaluate the blocking condition
349        let should_unblock = match expected_reason {
350            "waiting_for_budget" => {
351                check_budget_cleared(client, &core_key, budget_cache, partition_config).await
352            }
353            "waiting_for_quota" => {
354                check_quota_cleared(client, backend, &core_key, eid_str, partition_config).await
355            }
356            "waiting_for_capable_worker" => {
357                check_route_cleared(client, &core_key, caps_cache, &namespace).await
358            }
359            _ => false,
360        };
361
362        if !should_unblock {
363            continue;
364        }
365
366        // Unblock: trait-route → `EngineBackend::unblock_execution`
367        // (Valkey: wraps `ff_unblock_execution` FCALL on {p:N}).
368        let now_ms_res: Result<u64, String> = if let Some(b) = backend {
369            b.server_time_ms().await.map_err(|e| e.to_string())
370        } else {
371            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
372        };
373        let now_ms = match now_ms_res {
374            Ok(t) => t,
375            Err(e) => {
376                tracing::warn!(
377                    execution_id = eid_str.as_str(),
378                    error = %e,
379                    "unblock_scanner: server TIME failed, skipping unblock"
380                );
381                errors += 1;
382                continue;
383            }
384        };
385
386        let res = if let Some(backend_arc) = backend {
387            let Ok(eid) = ExecutionId::parse(eid_str) else {
388                tracing::warn!(execution_id = eid_str.as_str(), "malformed eid; skipping");
389                continue;
390            };
391            backend_arc
392                .unblock_execution(
393                    *partition,
394                    lane,
395                    &eid,
396                    expected_reason,
397                    TimestampMs::from_millis(now_ms as i64),
398                )
399                .await
400                .map_err(|e| e.to_string())
401        } else {
402            // Test-only fallback: direct FCALL on the scanner client.
403            // Mirrors cluster-1 lease_expiry. KEYS(3)/ARGV(3) identical
404            // to the Valkey impl.
405            let eligible_key = idx.lane_eligible(lane);
406            let keys: [&str; 3] = [&core_key, blocked_key, &eligible_key];
407            let now_s = now_ms.to_string();
408            let argv: [&str; 3] = [eid_str, &now_s, expected_reason];
409            client
410                .fcall::<ferriskey::Value>("ff_unblock_execution", &keys, &argv)
411                .await
412                .map(|_: ferriskey::Value| ())
413                .map_err(|e| e.to_string())
414        };
415
416        match res {
417            Ok(()) => {
418                tracing::info!(
419                    execution_id = eid_str.as_str(),
420                    reason = expected_reason,
421                    "unblock_scanner: execution unblocked"
422                );
423                processed += 1;
424            }
425            Err(e) => {
426                tracing::warn!(
427                    execution_id = eid_str.as_str(),
428                    error = %e,
429                    "unblock_scanner: unblock_execution failed"
430                );
431                errors += 1;
432            }
433        }
434    }
435
436    ScanResult { processed, errors }
437}
438
439/// Check if budget blocking condition has cleared.
440/// Uses cross-partition cache to avoid redundant reads.
441async fn check_budget_cleared(
442    client: &ferriskey::Client,
443    core_key: &str,
444    cache: &mut HashMap<String, bool>,
445    config: &PartitionConfig,
446) -> bool {
447    // Read budget_ids from exec_core
448    let budget_ids_str: Option<String> = client
449        .cmd("HGET")
450        .arg(core_key)
451        .arg("budget_ids")
452        .execute()
453        .await
454        .unwrap_or(None);
455
456    let budget_ids_str = match budget_ids_str {
457        Some(s) if !s.is_empty() => s,
458        _ => return true, // no budgets → unblock
459    };
460
461    for budget_id in budget_ids_str.split(',') {
462        let budget_id = budget_id.trim();
463        if budget_id.is_empty() {
464            continue;
465        }
466
467        // Check cache first
468        if let Some(&breached) = cache.get(budget_id) {
469            if breached {
470                return false; // still breached
471            }
472            continue;
473        }
474
475        // Read from Valkey and cache
476        let breached = is_budget_breached(client, budget_id, config).await;
477        cache.insert(budget_id.to_owned(), breached);
478        if breached {
479            return false;
480        }
481    }
482
483    true // all budgets within limits
484}
485
486/// Read budget usage + limits and check if any hard limit is breached.
487/// Computes real {b:M} partition tag from budget_id.
488async fn is_budget_breached(
489    client: &ferriskey::Client,
490    budget_id: &str,
491    config: &PartitionConfig,
492) -> bool {
493    // Compute the real budget partition tag
494    let bid = match BudgetId::parse(budget_id) {
495        Ok(id) => id,
496        Err(_) => return false, // invalid budget_id → treat as not breached
497    };
498    let partition = budget_partition(&bid, config);
499    let tag = partition.hash_tag();
500    let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
501    let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);
502
503    // Read hard limits — fail-closed: if Valkey read fails, treat as breached
504    // (keep execution blocked) rather than silently unblocking
505    let limits: Vec<String> = match client
506        .cmd("HGETALL")
507        .arg(&limits_key)
508        .execute()
509        .await
510    {
511        Ok(v) => v,
512        Err(e) => {
513            tracing::error!(
514                budget_id,
515                error = %e,
516                "unblock_scanner: budget limits read failed, keeping blocked (fail-closed)"
517            );
518            return true; // treat as breached
519        }
520    };
521
522    let mut i = 0;
523    while i + 1 < limits.len() {
524        let field = &limits[i];
525        let limit_str = &limits[i + 1];
526        i += 2;
527
528        if !field.starts_with("hard:") {
529            continue;
530        }
531        let dimension = &field[5..];
532        let limit: u64 = match limit_str.parse() {
533            Ok(v) if v > 0 => v,
534            _ => continue,
535        };
536
537        let usage_str: Option<String> = match client
538            .cmd("HGET")
539            .arg(&usage_key)
540            .arg(dimension)
541            .execute()
542            .await
543        {
544            Ok(v) => v,
545            Err(e) => {
546                tracing::error!(
547                    budget_id,
548                    dimension,
549                    error = %e,
550                    "unblock_scanner: budget usage read failed, keeping blocked (fail-closed)"
551                );
552                return true; // treat as breached
553            }
554        };
555        let usage: u64 = usage_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
556
557        if usage >= limit {
558            return true; // breached
559        }
560    }
561
562    false
563}
564
565/// Check if quota blocking condition has cleared.
566/// Re-checks the sliding window after cleanup.
567/// Computes real {q:K} partition tag from quota_policy_id.
568async fn check_quota_cleared(
569    client: &ferriskey::Client,
570    backend: Option<&Arc<dyn EngineBackend>>,
571    core_key: &str,
572    _eid_str: &str,
573    config: &PartitionConfig,
574) -> bool {
575    // Read quota_policy_id from exec_core — fail-closed on Valkey error
576    let quota_id: Option<String> = match client
577        .cmd("HGET")
578        .arg(core_key)
579        .arg("quota_policy_id")
580        .execute()
581        .await
582    {
583        Ok(v) => v,
584        Err(e) => {
585            tracing::error!(
586                core_key,
587                error = %e,
588                "unblock_scanner: quota_policy_id read failed, keeping blocked (fail-closed)"
589            );
590            return false;
591        }
592    };
593
594    let quota_id = match quota_id {
595        Some(s) if !s.is_empty() => s,
596        _ => return true, // no quota → unblock
597    };
598
599    // Compute real quota partition tag
600    let qid = match ff_core::types::QuotaPolicyId::parse(&quota_id) {
601        Ok(id) => id,
602        Err(_) => return true, // invalid → unblock (advisory)
603    };
604    let partition = ff_core::partition::quota_partition(&qid, config);
605    let tag = partition.hash_tag();
606
607    let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id);
608    let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
609    let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
610
611    // Read quota definition fields — fail-closed on Valkey error
612    let def_fields: Vec<Option<String>> = match client
613        .cmd("HMGET")
614        .arg(&quota_def_key)
615        .arg("max_requests_per_window")
616        .arg("requests_per_window_seconds")
617        .arg("active_concurrency_cap")
618        .execute()
619        .await
620    {
621        Ok(v) => v,
622        Err(e) => {
623            tracing::error!(
624                quota_id = %quota_id,
625                error = %e,
626                "unblock_scanner: quota definition read failed, keeping blocked (fail-closed)"
627            );
628            return false;
629        }
630    };
631    let rate_limit: u64 = def_fields.first()
632        .and_then(|v| v.as_ref())
633        .and_then(|s| s.parse().ok())
634        .unwrap_or(0);
635    let window_secs: u64 = def_fields.get(1)
636        .and_then(|v| v.as_ref())
637        .and_then(|s| s.parse().ok())
638        .unwrap_or(60);
639    let concurrency_cap: u64 = def_fields.get(2)
640        .and_then(|v| v.as_ref())
641        .and_then(|s| s.parse().ok())
642        .unwrap_or(0);
643
644    // Check rate: clean window, count
645    if rate_limit > 0 {
646        let now_ms_res: Result<u64, String> = if let Some(b) = backend {
647            b.server_time_ms().await.map_err(|e| e.to_string())
648        } else {
649            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
650        };
651        let now_ms = match now_ms_res {
652            Ok(t) => t,
653            Err(_) => return false,
654        };
655        let window_ms = window_secs * 1000;
656        let cutoff = (now_ms.saturating_sub(window_ms)).to_string();
657
658        let _: Result<i64, _> = client
659            .cmd("ZREMRANGEBYSCORE")
660            .arg(&window_key)
661            .arg("-inf")
662            .arg(&cutoff)
663            .execute()
664            .await;
665
666        let count: i64 = client
667            .cmd("ZCARD")
668            .arg(&window_key)
669            .execute()
670            .await
671            .unwrap_or(0);
672
673        if count as u64 >= rate_limit {
674            return false; // still at limit
675        }
676    }
677
678    // Check concurrency
679    if concurrency_cap > 0 {
680        let active: i64 = client
681            .cmd("GET")
682            .arg(&concurrency_key)
683            .execute()
684            .await
685            .unwrap_or(0);
686
687        if active as u64 >= concurrency_cap {
688            return false; // still at cap
689        }
690    }
691
692    true // quota cleared
693}
694
695/// Check if the capability-block has cleared: some connected worker's
696/// caps now cover the execution's `required_capabilities`.
697///
698/// The UNION of every connected worker's caps is cached on the scanner
699/// struct with a TTL equal to `interval`; so within one scan cycle every
700/// partition reuses the same snapshot, and between cycles a stale
701/// snapshot is automatically refreshed.
702///
703/// Fail-OPEN on union-load failure: if the SSCAN or fan-out GET errors
704/// out, assume a worker might match and let the scheduler re-decide on
705/// the next tick. The caps set is non-authoritative (Lua never reads it);
706/// treating it as "unknown → maybe" preserves liveness. Fail-closed
707/// would leave executions stuck whenever the caps read hits a transient
708/// Valkey error.
709async fn check_route_cleared(
710    client: &ferriskey::Client,
711    core_key: &str,
712    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
713    namespace: &Namespace,
714) -> bool {
715    let required_csv: Option<String> = client
716        .cmd("HGET")
717        .arg(core_key)
718        .arg("required_capabilities")
719        .execute()
720        .await
721        .unwrap_or(None);
722    let required_csv = match required_csv {
723        Some(s) if !s.is_empty() => s,
724        _ => return true, // no required caps → anyone can claim
725    };
726
727    // Two-phase cache access: NEVER hold the mutex across the
728    // network refresh. Holding `AsyncMutex` across
729    // `load_worker_caps_union`'s SSCAN + HGET fan-out would serialize
730    // every tenant's cap-block decision behind one slow namespace's
731    // Valkey round trip — see PR #500 review.
732    //
733    // Phase 1: under the lock, decide if a refresh is needed and
734    // grab a current-or-empty snapshot for the fallback path.
735    //
736    // Phase 2: if stale, refresh off-lock, then re-acquire briefly
737    // to write the result. Racing refreshes for the same namespace
738    // resolve to "last writer wins" — acceptable because all fetches
739    // observe the same authoritative index at roughly the same time.
740    let (stale, cached_snapshot): (bool, BTreeSet<String>) = {
741        let mut guard = caps_cache.lock().await;
742        let ttl = guard.ttl;
743        let entry = guard
744            .entries
745            .entry(namespace.clone())
746            .or_insert(CapsUnionEntry {
747                snapshot: None,
748                fetched_at: None,
749            });
750        let stale = entry
751            .fetched_at
752            .map(|t| t.elapsed() >= ttl)
753            .unwrap_or(true);
754        (stale, entry.snapshot.clone().unwrap_or_default())
755    };
756
757    let snapshot: BTreeSet<String> = if stale {
758        match load_worker_caps_union(client, namespace).await {
759            Ok(union) => {
760                let refreshed = union.clone();
761                let mut guard = caps_cache.lock().await;
762                if let Some(entry) = guard.entries.get_mut(namespace) {
763                    entry.snapshot = Some(refreshed);
764                    entry.fetched_at = Some(Instant::now());
765                }
766                union
767            }
768            Err(e) => {
769                tracing::warn!(
770                    error = %e,
771                    namespace = %namespace,
772                    "unblock_scanner: failed to read worker caps union — \
773                     assuming match possible (fail-open to preserve liveness)"
774                );
775                return true;
776            }
777        }
778    } else {
779        cached_snapshot
780    };
781
782    // Subset check: every non-empty token in required_csv present in union.
783    required_csv
784        .split(',')
785        .filter(|t| !t.is_empty())
786        .all(|t| snapshot.contains(t))
787}
788
789/// Union of every connected worker's advertised capabilities within
790/// the given namespace.
791///
792/// #502 note — NOT folded into a single Lua FCALL by design: the
793/// `workers_index_key_ns` (`ff:idx:<ns>:workers`) and the per-worker
794/// `worker_caps_key_ns` (`ff:worker:<ns>:<inst>:caps`) do NOT share a
795/// Valkey hash-tag `{…}` in the current key shape. In cluster mode an
796/// FCALL only sees keys on a single slot; packaging SSCAN + per-worker
797/// HGET into one Lua body would silently drop workers whose caps key
798/// hashes to a different slot than the index SET. A restructure to
799/// share `{ns}` as a hash tag (e.g. `ff:idx:{<ns>}:workers` +
800/// `ff:worker:{<ns>}:<inst>:caps`) would require migrating the SDK
801/// preamble + `ff_register_worker` FCALL + every reader in one hop —
802/// cross-cutting change, deferred past the #502 closing-PR scope. The
803/// SSCAN + bounded-concurrent fan-out below is already the shape we'd
804/// emit from Lua, minus the single-round-trip framing.
805///
806/// Cluster-safe enumeration pattern (matches Batch A's index SETs for
807/// budget/flow/deps): SSCAN the namespace-scoped
808/// `workers_index_key_ns(namespace)` SET (single-slot, no hash tag
809/// needed — the key name literally hashes to one slot), then fan-out
810/// concurrent `HGET ff:worker:{ns}:{id}:caps caps_csv` with a bounded
811/// concurrency cap. A keyspace `SCAN MATCH ff:worker:*:caps` in
812/// cluster mode visits only one shard per call and silently drops
813/// workers on other shards — exactly the class of bug Batch A Issue
814/// #11 fixed.
815///
816/// Caps storage is a HASH (RFC-025 Phase 5 post-cutover — shape
817/// matches `ff_register_worker` FCALL + the SDK preamble), and only
818/// the `caps_csv` field drives the union. Per-worker HGET is a
819/// single-field read, not HGETALL, so the scanner pays only the bytes
820/// it uses.
821///
822/// SSCAN is used instead of SMEMBERS so a fleet of thousands of workers
823/// doesn't round-trip the entire member list in one reply. `COUNT = 100`
824/// matches the convention in budget_reconciler / flow_projector.
825///
826/// Empty `caps_csv` or missing key = "no caps for that worker";
827/// scanner keeps accumulating. Per-worker HGET error, in contrast,
828/// PROPAGATES — a previous version used `.unwrap_or(None)` which
829/// silently merged error and empty into the same branch, making a
830/// transient error look like "this worker has no caps". In a
831/// single-capable-worker fleet that produced false-negative unions and
832/// left executions blocked even though a matching worker existed,
833/// contradicting the scanner's documented fail-open behavior. Now an
834/// error bubbles up; the only caller (`check_route_cleared`) treats
835/// `Err` by returning `true` (unblock — "we don't know, let the
836/// scheduler re-decide next tick"), which preserves liveness uniformly
837/// whether the fault is SSCAN, HGET, or deeper transport.
838async fn load_worker_caps_union(
839    client: &ferriskey::Client,
840    namespace: &Namespace,
841) -> Result<BTreeSet<String>, ferriskey::Error> {
842    let mut union = BTreeSet::new();
843    let index_key = ff_core::keys::workers_index_key_ns(namespace);
844
845    // Helper: drain one completed future and fold its caps into the
846    // union, or propagate its error. Centralizing keeps the in-loop +
847    // drain paths symmetric (both must behave the same — a missed error
848    // at either point re-introduces the false-negative-union bug).
849    fn absorb(
850        union: &mut BTreeSet<String>,
851        res: Result<Option<String>, ferriskey::Error>,
852    ) -> Result<(), ferriskey::Error> {
853        let csv = res?;
854        if let Some(csv) = csv {
855            for token in csv.split(',') {
856                if !token.is_empty() {
857                    union.insert(token.to_owned());
858                }
859            }
860        }
861        Ok(())
862    }
863
864    // SSCAN loop — bounded per-page response size. Cursor starts at "0"
865    // and wraps back to "0" when iteration completes.
866    let mut cursor: String = "0".to_owned();
867    loop {
868        let reply: (String, Vec<String>) = client
869            .cmd("SSCAN")
870            .arg(&index_key)
871            .arg(&cursor)
872            .arg("COUNT")
873            .arg(WORKERS_SSCAN_COUNT.to_string().as_str())
874            .execute()
875            .await?;
876        cursor = reply.0;
877        let worker_ids = reply.1;
878
879        // Bounded concurrent HGETs per SSCAN page. FuturesUnordered with
880        // a buffered stream caps in-flight work at CAPS_GET_CONCURRENCY —
881        // enough parallelism to amortize round-trip latency, bounded so
882        // one scanner tick can't flood the shared Valkey client. Each
883        // spawned future returns `Result<Option<String>, Error>` so
884        // transient Valkey errors propagate up (no .unwrap_or(None)
885        // swallowing) — see the fn-level doc for why.
886        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
887        for id in worker_ids {
888            let client = client.clone();
889            let namespace = namespace.clone();
890            pending.push(async move {
891                let instance = ff_core::types::WorkerInstanceId::new(id);
892                let caps_key =
893                    ff_core::keys::worker_caps_key_ns(&namespace, &instance);
894                let csv: Option<String> = client
895                    .cmd("HGET")
896                    .arg(&caps_key)
897                    .arg("caps_csv")
898                    .execute()
899                    .await?;
900                Ok::<Option<String>, ferriskey::Error>(csv)
901            });
902            if pending.len() >= CAPS_GET_CONCURRENCY
903                && let Some(res) = pending.next().await
904            {
905                absorb(&mut union, res)?;
906            }
907        }
908        // Drain remaining pending GETs from this page before advancing
909        // the SSCAN cursor. Keeps the pipeline window bounded and the
910        // union observation consistent with "all workers visible so far".
911        while let Some(res) = pending.next().await {
912            absorb(&mut union, res)?;
913        }
914
915        if cursor == "0" {
916            break;
917        }
918    }
919    Ok(union)
920}