ff_engine/scanner/unblock.rs
1//! Unblock scanner for budget/quota/capability-blocked executions.
2//!
3//! Scans `ff:idx:{p:N}:lane:<lane>:blocked:{budget,quota,route}` per
4//! execution partition. For each blocked execution, re-evaluates the
5//! blocking condition. If cleared, calls `FCALL ff_unblock_execution`.
6//!
7//! Cross-partition budget check is cached per scan cycle (MANDATORY —
8//! without it, 50K blocked executions = 50K budget reads).
9//!
10//! Capability sweep reads the union of non-authoritative worker cap
11//! HASHes (`ff:worker:{ns}:{instance_id}:caps` — written by
12//! `ff-sdk::FlowFabricWorker::connect` and by the `ff_register_worker`
13//! FCALL) ONCE per scan cycle PER NAMESPACE, and uses it to decide
14//! whether a `waiting_for_capable_worker` execution has a matching
15//! worker. This is best-effort: caps HASHes may be slightly stale
16//! (TTL'd, refreshed on connect), but the promotion path is
17//! self-correcting — a promoted execution that still can't be claimed
18//! gets re-blocked on the next scheduler tick. RFC-009 §7.5 documents
19//! the v1 sweep approach and defers connect-triggered sweeps to V2.
20//!
21//! RFC-025 Phase 5 cutover: caps reads go through the namespace-scoped
22//! `ff:idx:{ns}:workers` + `ff:worker:{ns}:{id}:caps` helpers; the
23//! cache keys off `Namespace` so multi-tenant deployments don't mix
24//! capability sets across tenants.
25//!
26//! MUST skip `paused_by_flow_cancel` — only cancel_flow clears that.
27//!
28//! Reference: RFC-008 §2.4, RFC-009 §7.5, RFC-010 §6
29
30use std::collections::{BTreeSet, HashMap};
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33
34use futures::stream::{FuturesUnordered, StreamExt};
35use tokio::sync::Mutex as AsyncMutex;
36
37use ff_core::backend::ScannerFilter;
38use ff_core::engine_backend::EngineBackend;
39use ff_core::keys::IndexKeys;
40use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition};
41use ff_core::types::{BudgetId, ExecutionId, LaneId, Namespace, TimestampMs};
42
43use super::{should_skip_candidate, ScanResult, Scanner};
44
/// Maximum number of members fetched from one blocked ZSET per
/// `scan_blocked_set` call (the `LIMIT 0 <n>` count on ZRANGEBYSCORE).
const BATCH_SIZE: u32 = 100;

/// `COUNT` hint for each SSCAN page over the workers-index SET. The
/// same value the other index-SET scanners use (budget_reconciler,
/// flow_projector). It bounds the size of a single page reply only;
/// iteration still continues until the cursor wraps back to "0".
const WORKERS_SSCAN_COUNT: usize = 100;

/// Upper bound on in-flight per-worker caps `HGET`s while building the
/// caps union. Mirrors the bounded parallelism W1 used in
/// initialize_waitpoint_hmac_secret. Too low and large fleets (1000+
/// workers) pay serial round-trip latency; too high and the scanner
/// client is head-of-lined by a pipeline burst. 16 is a pragmatic
/// middle for the current fleet scales.
const CAPS_GET_CONCURRENCY: usize = 16;
59
60pub struct UnblockScanner {
61 interval: Duration,
62 lanes: Vec<LaneId>,
63 partition_config: PartitionConfig,
64 filter: ScannerFilter,
65 /// Shared worker-caps union cache across ALL partitions in one scan
66 /// pass. Previously this cache was declared inside `scan_partition`
67 /// which runs once PER PARTITION — at 256 partitions that meant up
68 /// to 256 redundant SSCAN + fan-out GET sequences per scan interval.
69 /// Now: one load per TTL window (= `interval`), shared across every
70 /// partition visited in that window. `TTL == interval` is natural:
71 /// a worker connecting "now" propagates into the caps union on the
72 /// next scan cycle, not faster or slower than the cycle itself.
73 ///
74 /// `Arc<AsyncMutex<_>>` because the Scanner trait's `scan_partition`
75 /// takes `&self`. Contention is bounded by the partition iteration
76 /// cadence (one partition at a time per scanner task), so the mutex
77 /// is effectively uncontended in steady state.
78 caps_cache: Arc<AsyncMutex<CapsUnionCache>>,
79 backend: Option<Arc<dyn EngineBackend>>,
80}
81
82/// Per-namespace worker-caps union cache. Every blocked execution
83/// carries a `namespace` field on `exec_core`; the scanner reads that
84/// alongside `blocking_reason` and routes the subset check to the
85/// union of caps advertised by workers in THAT namespace only. Absent
86/// this partitioning, a tenant-A worker advertising
87/// `required_caps=[gpu]` would unblock a tenant-B execution waiting on
88/// the same token.
89struct CapsUnionCache {
90 /// TTL applied uniformly to every namespace entry — same cadence
91 /// as `UnblockScanner::interval`, so a freshly-connected worker
92 /// propagates into its namespace's union on the next scan cycle.
93 ttl: Duration,
94 /// Entry per observed namespace. Grows as new namespaces surface
95 /// on blocked executions; entries do NOT expire (the bounded fleet
96 /// of namespaces is small and refreshing an unused entry on next
97 /// visit costs one SSCAN). Cleared on scanner restart.
98 entries: HashMap<Namespace, CapsUnionEntry>,
99}
100
/// Cached caps union for a single namespace.
struct CapsUnionEntry {
    /// Last successfully loaded union of capability tokens; `None`
    /// until the first successful load for this namespace.
    snapshot: Option<BTreeSet<String>>,
    /// When `snapshot` was stored; `None` marks the entry as stale so
    /// the next lookup triggers a load.
    fetched_at: Option<Instant>,
}
105
106impl UnblockScanner {
107 pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
108 Self::with_filter(interval, lanes, partition_config, ScannerFilter::default())
109 }
110
111 /// Construct with a [`ScannerFilter`] applied per candidate
112 /// (issue #122).
113 pub fn with_filter(
114 interval: Duration,
115 lanes: Vec<LaneId>,
116 partition_config: PartitionConfig,
117 filter: ScannerFilter,
118 ) -> Self {
119 Self {
120 interval,
121 lanes,
122 partition_config,
123 filter,
124 caps_cache: Arc::new(AsyncMutex::new(CapsUnionCache {
125 ttl: interval,
126 entries: HashMap::new(),
127 })),
128 backend: None,
129 }
130 }
131
132 /// PR-7b Cluster 1: wire an `EngineBackend` for filter-resolution
133 /// reads. FCALL routing is cluster 2 scope.
134 pub fn with_filter_and_backend(
135 interval: Duration,
136 lanes: Vec<LaneId>,
137 partition_config: PartitionConfig,
138 filter: ScannerFilter,
139 backend: Arc<dyn EngineBackend>,
140 ) -> Self {
141 Self {
142 interval,
143 lanes,
144 partition_config,
145 filter,
146 caps_cache: Arc::new(AsyncMutex::new(CapsUnionCache {
147 ttl: interval,
148 entries: HashMap::new(),
149 })),
150 backend: Some(backend),
151 }
152 }
153}
154
155impl Scanner for UnblockScanner {
156 fn name(&self) -> &'static str {
157 "unblock"
158 }
159
160 fn interval(&self) -> Duration {
161 self.interval
162 }
163
164 fn filter(&self) -> &ScannerFilter {
165 &self.filter
166 }
167
168 async fn scan_partition(
169 &self,
170 client: &ferriskey::Client,
171 partition: u16,
172 ) -> ScanResult {
173 let p = Partition {
174 family: PartitionFamily::Execution,
175 index: partition,
176 };
177 let idx = IndexKeys::new(&p);
178
179 let mut total_processed: u32 = 0;
180 let mut total_errors: u32 = 0;
181
182 // Cross-partition budget cache: budget_id → is_breached.
183 // Reset per partition scan (each partition scan is one "cycle").
184 let mut budget_cache: HashMap<String, bool> = HashMap::new();
185
186 // Worker-caps union cache is shared across ALL partitions via
187 // the scanner struct (Arc<AsyncMutex<CapsUnionCache>>). `get_or_load`
188 // returns the cached snapshot if its fetched_at is within
189 // `interval` (TTL == scan interval), otherwise loads fresh via
190 // SSCAN + concurrent GET fan-out. Without this, at 256 partitions
191 // the old per-partition-local cache re-ran load_worker_caps_union
192 // up to 256× per cycle.
193 let caps_cache = self.caps_cache.clone();
194
195 for lane in &self.lanes {
196 // Scan blocked:budget
197 let budget_key = idx.lane_blocked_budget(lane);
198 let r = scan_blocked_set(
199 client, self.backend.as_ref(), &p, &idx, lane, &budget_key,
200 "waiting_for_budget", &mut budget_cache,
201 &caps_cache,
202 &self.partition_config,
203 &self.filter,
204 ).await;
205 total_processed += r.processed;
206 total_errors += r.errors;
207
208 // Scan blocked:quota
209 let quota_key = idx.lane_blocked_quota(lane);
210 let r = scan_blocked_set(
211 client, self.backend.as_ref(), &p, &idx, lane, "a_key,
212 "waiting_for_quota", &mut budget_cache,
213 &caps_cache,
214 &self.partition_config,
215 &self.filter,
216 ).await;
217 total_processed += r.processed;
218 total_errors += r.errors;
219
220 // Scan blocked:route (capability-mismatch blocks). Promotion
221 // decision reads the union of connected workers' caps and
222 // checks subset coverage. See check_route_cleared below.
223 let route_key = idx.lane_blocked_route(lane);
224 let r = scan_blocked_set(
225 client, self.backend.as_ref(), &p, &idx, lane, &route_key,
226 "waiting_for_capable_worker", &mut budget_cache,
227 &caps_cache,
228 &self.partition_config,
229 &self.filter,
230 ).await;
231 total_processed += r.processed;
232 total_errors += r.errors;
233 }
234
235 ScanResult {
236 processed: total_processed,
237 errors: total_errors,
238 }
239 }
240}
241
242/// Scan one blocked set and unblock executions whose condition has cleared.
243#[allow(clippy::too_many_arguments)]
244async fn scan_blocked_set(
245 client: &ferriskey::Client,
246 backend: Option<&Arc<dyn EngineBackend>>,
247 partition: &Partition,
248 idx: &IndexKeys,
249 lane: &LaneId,
250 blocked_key: &str,
251 expected_reason: &str,
252 budget_cache: &mut HashMap<String, bool>,
253 caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
254 partition_config: &PartitionConfig,
255 filter: &ScannerFilter,
256) -> ScanResult {
257 // Read all members from the blocked set (they're scored by block time)
258 let blocked: Vec<String> = match client
259 .cmd("ZRANGEBYSCORE")
260 .arg(blocked_key)
261 .arg("-inf")
262 .arg("+inf")
263 .arg("LIMIT")
264 .arg("0")
265 .arg(BATCH_SIZE.to_string().as_str())
266 .execute()
267 .await
268 {
269 Ok(ids) => ids,
270 Err(e) => {
271 tracing::warn!(
272 error = %e,
273 blocked_key,
274 "unblock_scanner: ZRANGEBYSCORE failed"
275 );
276 return ScanResult { processed: 0, errors: 1 };
277 }
278 };
279
280 if blocked.is_empty() {
281 return ScanResult { processed: 0, errors: 0 };
282 }
283
284 let mut processed: u32 = 0;
285 let mut errors: u32 = 0;
286 let tag = partition.hash_tag();
287
288 for eid_str in &blocked {
289 if should_skip_candidate(backend, filter, partition.index, eid_str).await {
290 continue;
291 }
292 // Read blocking_reason + namespace from exec_core. Namespace
293 // gates the caps-union subset check so tenant-A worker caps
294 // never unblock a tenant-B `blocked_by_route` execution —
295 // RFC-025 Phase 5 multi-tenant safety property.
296 let core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
297 let fields: Vec<Option<String>> = match client
298 .cmd("HMGET")
299 .arg(&core_key)
300 .arg("blocking_reason")
301 .arg("namespace")
302 .execute()
303 .await
304 {
305 Ok(r) => r,
306 Err(e) => {
307 tracing::warn!(
308 execution_id = eid_str.as_str(),
309 error = %e,
310 "unblock_scanner: HMGET blocking_reason/namespace failed, skipping"
311 );
312 errors += 1;
313 continue;
314 }
315 };
316 if fields.len() < 2 {
317 tracing::warn!(
318 execution_id = eid_str.as_str(),
319 returned_fields = fields.len(),
320 "unblock_scanner: HMGET returned < 2 fields, skipping"
321 );
322 errors += 1;
323 continue;
324 }
325 let reason = fields[0].as_deref().unwrap_or("");
326 let namespace = match fields[1].as_deref().filter(|s| !s.is_empty()) {
327 Some(ns_str) => Namespace::new(ns_str),
328 None => {
329 // Every execution gets a namespace at create-time
330 // (`ff_create_execution` HSETs it). Missing here = data
331 // integrity defect; skip rather than risk a
332 // cross-tenant promotion.
333 tracing::warn!(
334 execution_id = eid_str.as_str(),
335 core_key = %core_key,
336 "unblock_scanner: exec_core missing namespace field, skipping"
337 );
338 errors += 1;
339 continue;
340 }
341 };
342
343 // Skip if not blocked by the expected reason (e.g. paused_by_flow_cancel)
344 if reason != expected_reason {
345 continue;
346 }
347
348 // Re-evaluate the blocking condition
349 let should_unblock = match expected_reason {
350 "waiting_for_budget" => {
351 check_budget_cleared(client, &core_key, budget_cache, partition_config).await
352 }
353 "waiting_for_quota" => {
354 check_quota_cleared(client, backend, &core_key, eid_str, partition_config).await
355 }
356 "waiting_for_capable_worker" => {
357 check_route_cleared(client, &core_key, caps_cache, &namespace).await
358 }
359 _ => false,
360 };
361
362 if !should_unblock {
363 continue;
364 }
365
366 // Unblock: trait-route → `EngineBackend::unblock_execution`
367 // (Valkey: wraps `ff_unblock_execution` FCALL on {p:N}).
368 let now_ms_res: Result<u64, String> = if let Some(b) = backend {
369 b.server_time_ms().await.map_err(|e| e.to_string())
370 } else {
371 crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
372 };
373 let now_ms = match now_ms_res {
374 Ok(t) => t,
375 Err(e) => {
376 tracing::warn!(
377 execution_id = eid_str.as_str(),
378 error = %e,
379 "unblock_scanner: server TIME failed, skipping unblock"
380 );
381 errors += 1;
382 continue;
383 }
384 };
385
386 let res = if let Some(backend_arc) = backend {
387 let Ok(eid) = ExecutionId::parse(eid_str) else {
388 tracing::warn!(execution_id = eid_str.as_str(), "malformed eid; skipping");
389 continue;
390 };
391 backend_arc
392 .unblock_execution(
393 *partition,
394 lane,
395 &eid,
396 expected_reason,
397 TimestampMs::from_millis(now_ms as i64),
398 )
399 .await
400 .map_err(|e| e.to_string())
401 } else {
402 // Test-only fallback: direct FCALL on the scanner client.
403 // Mirrors cluster-1 lease_expiry. KEYS(3)/ARGV(3) identical
404 // to the Valkey impl.
405 let eligible_key = idx.lane_eligible(lane);
406 let keys: [&str; 3] = [&core_key, blocked_key, &eligible_key];
407 let now_s = now_ms.to_string();
408 let argv: [&str; 3] = [eid_str, &now_s, expected_reason];
409 client
410 .fcall::<ferriskey::Value>("ff_unblock_execution", &keys, &argv)
411 .await
412 .map(|_: ferriskey::Value| ())
413 .map_err(|e| e.to_string())
414 };
415
416 match res {
417 Ok(()) => {
418 tracing::info!(
419 execution_id = eid_str.as_str(),
420 reason = expected_reason,
421 "unblock_scanner: execution unblocked"
422 );
423 processed += 1;
424 }
425 Err(e) => {
426 tracing::warn!(
427 execution_id = eid_str.as_str(),
428 error = %e,
429 "unblock_scanner: unblock_execution failed"
430 );
431 errors += 1;
432 }
433 }
434 }
435
436 ScanResult { processed, errors }
437}
438
439/// Check if budget blocking condition has cleared.
440/// Uses cross-partition cache to avoid redundant reads.
441async fn check_budget_cleared(
442 client: &ferriskey::Client,
443 core_key: &str,
444 cache: &mut HashMap<String, bool>,
445 config: &PartitionConfig,
446) -> bool {
447 // Read budget_ids from exec_core
448 let budget_ids_str: Option<String> = client
449 .cmd("HGET")
450 .arg(core_key)
451 .arg("budget_ids")
452 .execute()
453 .await
454 .unwrap_or(None);
455
456 let budget_ids_str = match budget_ids_str {
457 Some(s) if !s.is_empty() => s,
458 _ => return true, // no budgets → unblock
459 };
460
461 for budget_id in budget_ids_str.split(',') {
462 let budget_id = budget_id.trim();
463 if budget_id.is_empty() {
464 continue;
465 }
466
467 // Check cache first
468 if let Some(&breached) = cache.get(budget_id) {
469 if breached {
470 return false; // still breached
471 }
472 continue;
473 }
474
475 // Read from Valkey and cache
476 let breached = is_budget_breached(client, budget_id, config).await;
477 cache.insert(budget_id.to_owned(), breached);
478 if breached {
479 return false;
480 }
481 }
482
483 true // all budgets within limits
484}
485
486/// Read budget usage + limits and check if any hard limit is breached.
487/// Computes real {b:M} partition tag from budget_id.
488async fn is_budget_breached(
489 client: &ferriskey::Client,
490 budget_id: &str,
491 config: &PartitionConfig,
492) -> bool {
493 // Compute the real budget partition tag
494 let bid = match BudgetId::parse(budget_id) {
495 Ok(id) => id,
496 Err(_) => return false, // invalid budget_id → treat as not breached
497 };
498 let partition = budget_partition(&bid, config);
499 let tag = partition.hash_tag();
500 let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
501 let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);
502
503 // Read hard limits — fail-closed: if Valkey read fails, treat as breached
504 // (keep execution blocked) rather than silently unblocking
505 let limits: Vec<String> = match client
506 .cmd("HGETALL")
507 .arg(&limits_key)
508 .execute()
509 .await
510 {
511 Ok(v) => v,
512 Err(e) => {
513 tracing::error!(
514 budget_id,
515 error = %e,
516 "unblock_scanner: budget limits read failed, keeping blocked (fail-closed)"
517 );
518 return true; // treat as breached
519 }
520 };
521
522 let mut i = 0;
523 while i + 1 < limits.len() {
524 let field = &limits[i];
525 let limit_str = &limits[i + 1];
526 i += 2;
527
528 if !field.starts_with("hard:") {
529 continue;
530 }
531 let dimension = &field[5..];
532 let limit: u64 = match limit_str.parse() {
533 Ok(v) if v > 0 => v,
534 _ => continue,
535 };
536
537 let usage_str: Option<String> = match client
538 .cmd("HGET")
539 .arg(&usage_key)
540 .arg(dimension)
541 .execute()
542 .await
543 {
544 Ok(v) => v,
545 Err(e) => {
546 tracing::error!(
547 budget_id,
548 dimension,
549 error = %e,
550 "unblock_scanner: budget usage read failed, keeping blocked (fail-closed)"
551 );
552 return true; // treat as breached
553 }
554 };
555 let usage: u64 = usage_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
556
557 if usage >= limit {
558 return true; // breached
559 }
560 }
561
562 false
563}
564
565/// Check if quota blocking condition has cleared.
566/// Re-checks the sliding window after cleanup.
567/// Computes real {q:K} partition tag from quota_policy_id.
568async fn check_quota_cleared(
569 client: &ferriskey::Client,
570 backend: Option<&Arc<dyn EngineBackend>>,
571 core_key: &str,
572 _eid_str: &str,
573 config: &PartitionConfig,
574) -> bool {
575 // Read quota_policy_id from exec_core — fail-closed on Valkey error
576 let quota_id: Option<String> = match client
577 .cmd("HGET")
578 .arg(core_key)
579 .arg("quota_policy_id")
580 .execute()
581 .await
582 {
583 Ok(v) => v,
584 Err(e) => {
585 tracing::error!(
586 core_key,
587 error = %e,
588 "unblock_scanner: quota_policy_id read failed, keeping blocked (fail-closed)"
589 );
590 return false;
591 }
592 };
593
594 let quota_id = match quota_id {
595 Some(s) if !s.is_empty() => s,
596 _ => return true, // no quota → unblock
597 };
598
599 // Compute real quota partition tag
600 let qid = match ff_core::types::QuotaPolicyId::parse("a_id) {
601 Ok(id) => id,
602 Err(_) => return true, // invalid → unblock (advisory)
603 };
604 let partition = ff_core::partition::quota_partition(&qid, config);
605 let tag = partition.hash_tag();
606
607 let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id);
608 let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
609 let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
610
611 // Read quota definition fields — fail-closed on Valkey error
612 let def_fields: Vec<Option<String>> = match client
613 .cmd("HMGET")
614 .arg("a_def_key)
615 .arg("max_requests_per_window")
616 .arg("requests_per_window_seconds")
617 .arg("active_concurrency_cap")
618 .execute()
619 .await
620 {
621 Ok(v) => v,
622 Err(e) => {
623 tracing::error!(
624 quota_id = %quota_id,
625 error = %e,
626 "unblock_scanner: quota definition read failed, keeping blocked (fail-closed)"
627 );
628 return false;
629 }
630 };
631 let rate_limit: u64 = def_fields.first()
632 .and_then(|v| v.as_ref())
633 .and_then(|s| s.parse().ok())
634 .unwrap_or(0);
635 let window_secs: u64 = def_fields.get(1)
636 .and_then(|v| v.as_ref())
637 .and_then(|s| s.parse().ok())
638 .unwrap_or(60);
639 let concurrency_cap: u64 = def_fields.get(2)
640 .and_then(|v| v.as_ref())
641 .and_then(|s| s.parse().ok())
642 .unwrap_or(0);
643
644 // Check rate: clean window, count
645 if rate_limit > 0 {
646 let now_ms_res: Result<u64, String> = if let Some(b) = backend {
647 b.server_time_ms().await.map_err(|e| e.to_string())
648 } else {
649 crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
650 };
651 let now_ms = match now_ms_res {
652 Ok(t) => t,
653 Err(_) => return false,
654 };
655 let window_ms = window_secs * 1000;
656 let cutoff = (now_ms.saturating_sub(window_ms)).to_string();
657
658 let _: Result<i64, _> = client
659 .cmd("ZREMRANGEBYSCORE")
660 .arg(&window_key)
661 .arg("-inf")
662 .arg(&cutoff)
663 .execute()
664 .await;
665
666 let count: i64 = client
667 .cmd("ZCARD")
668 .arg(&window_key)
669 .execute()
670 .await
671 .unwrap_or(0);
672
673 if count as u64 >= rate_limit {
674 return false; // still at limit
675 }
676 }
677
678 // Check concurrency
679 if concurrency_cap > 0 {
680 let active: i64 = client
681 .cmd("GET")
682 .arg(&concurrency_key)
683 .execute()
684 .await
685 .unwrap_or(0);
686
687 if active as u64 >= concurrency_cap {
688 return false; // still at cap
689 }
690 }
691
692 true // quota cleared
693}
694
695/// Check if the capability-block has cleared: some connected worker's
696/// caps now cover the execution's `required_capabilities`.
697///
698/// The UNION of every connected worker's caps is cached on the scanner
699/// struct with a TTL equal to `interval`; so within one scan cycle every
700/// partition reuses the same snapshot, and between cycles a stale
701/// snapshot is automatically refreshed.
702///
703/// Fail-OPEN on union-load failure: if the SSCAN or fan-out GET errors
704/// out, assume a worker might match and let the scheduler re-decide on
705/// the next tick. The caps set is non-authoritative (Lua never reads it);
706/// treating it as "unknown → maybe" preserves liveness. Fail-closed
707/// would leave executions stuck whenever the caps read hits a transient
708/// Valkey error.
709async fn check_route_cleared(
710 client: &ferriskey::Client,
711 core_key: &str,
712 caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
713 namespace: &Namespace,
714) -> bool {
715 let required_csv: Option<String> = client
716 .cmd("HGET")
717 .arg(core_key)
718 .arg("required_capabilities")
719 .execute()
720 .await
721 .unwrap_or(None);
722 let required_csv = match required_csv {
723 Some(s) if !s.is_empty() => s,
724 _ => return true, // no required caps → anyone can claim
725 };
726
727 // Two-phase cache access: NEVER hold the mutex across the
728 // network refresh. Holding `AsyncMutex` across
729 // `load_worker_caps_union`'s SSCAN + HGET fan-out would serialize
730 // every tenant's cap-block decision behind one slow namespace's
731 // Valkey round trip — see PR #500 review.
732 //
733 // Phase 1: under the lock, decide if a refresh is needed and
734 // grab a current-or-empty snapshot for the fallback path.
735 //
736 // Phase 2: if stale, refresh off-lock, then re-acquire briefly
737 // to write the result. Racing refreshes for the same namespace
738 // resolve to "last writer wins" — acceptable because all fetches
739 // observe the same authoritative index at roughly the same time.
740 let (stale, cached_snapshot): (bool, BTreeSet<String>) = {
741 let mut guard = caps_cache.lock().await;
742 let ttl = guard.ttl;
743 let entry = guard
744 .entries
745 .entry(namespace.clone())
746 .or_insert(CapsUnionEntry {
747 snapshot: None,
748 fetched_at: None,
749 });
750 let stale = entry
751 .fetched_at
752 .map(|t| t.elapsed() >= ttl)
753 .unwrap_or(true);
754 (stale, entry.snapshot.clone().unwrap_or_default())
755 };
756
757 let snapshot: BTreeSet<String> = if stale {
758 match load_worker_caps_union(client, namespace).await {
759 Ok(union) => {
760 let refreshed = union.clone();
761 let mut guard = caps_cache.lock().await;
762 if let Some(entry) = guard.entries.get_mut(namespace) {
763 entry.snapshot = Some(refreshed);
764 entry.fetched_at = Some(Instant::now());
765 }
766 union
767 }
768 Err(e) => {
769 tracing::warn!(
770 error = %e,
771 namespace = %namespace,
772 "unblock_scanner: failed to read worker caps union — \
773 assuming match possible (fail-open to preserve liveness)"
774 );
775 return true;
776 }
777 }
778 } else {
779 cached_snapshot
780 };
781
782 // Subset check: every non-empty token in required_csv present in union.
783 required_csv
784 .split(',')
785 .filter(|t| !t.is_empty())
786 .all(|t| snapshot.contains(t))
787}
788
789/// Union of every connected worker's advertised capabilities within
790/// the given namespace.
791///
792/// #502 note — NOT folded into a single Lua FCALL by design: the
793/// `workers_index_key_ns` (`ff:idx:<ns>:workers`) and the per-worker
794/// `worker_caps_key_ns` (`ff:worker:<ns>:<inst>:caps`) do NOT share a
795/// Valkey hash-tag `{…}` in the current key shape. In cluster mode an
796/// FCALL only sees keys on a single slot; packaging SSCAN + per-worker
797/// HGET into one Lua body would silently drop workers whose caps key
798/// hashes to a different slot than the index SET. A restructure to
799/// share `{ns}` as a hash tag (e.g. `ff:idx:{<ns>}:workers` +
800/// `ff:worker:{<ns>}:<inst>:caps`) would require migrating the SDK
801/// preamble + `ff_register_worker` FCALL + every reader in one hop —
802/// cross-cutting change, deferred past the #502 closing-PR scope. The
803/// SSCAN + bounded-concurrent fan-out below is already the shape we'd
804/// emit from Lua, minus the single-round-trip framing.
805///
806/// Cluster-safe enumeration pattern (matches Batch A's index SETs for
807/// budget/flow/deps): SSCAN the namespace-scoped
808/// `workers_index_key_ns(namespace)` SET (single-slot, no hash tag
809/// needed — the key name literally hashes to one slot), then fan-out
810/// concurrent `HGET ff:worker:{ns}:{id}:caps caps_csv` with a bounded
811/// concurrency cap. A keyspace `SCAN MATCH ff:worker:*:caps` in
812/// cluster mode visits only one shard per call and silently drops
813/// workers on other shards — exactly the class of bug Batch A Issue
814/// #11 fixed.
815///
816/// Caps storage is a HASH (RFC-025 Phase 5 post-cutover — shape
817/// matches `ff_register_worker` FCALL + the SDK preamble), and only
818/// the `caps_csv` field drives the union. Per-worker HGET is a
819/// single-field read, not HGETALL, so the scanner pays only the bytes
820/// it uses.
821///
822/// SSCAN is used instead of SMEMBERS so a fleet of thousands of workers
823/// doesn't round-trip the entire member list in one reply. `COUNT = 100`
824/// matches the convention in budget_reconciler / flow_projector.
825///
826/// Empty `caps_csv` or missing key = "no caps for that worker";
827/// scanner keeps accumulating. Per-worker HGET error, in contrast,
828/// PROPAGATES — a previous version used `.unwrap_or(None)` which
829/// silently merged error and empty into the same branch, making a
830/// transient error look like "this worker has no caps". In a
831/// single-capable-worker fleet that produced false-negative unions and
832/// left executions blocked even though a matching worker existed,
833/// contradicting the scanner's documented fail-open behavior. Now an
834/// error bubbles up; the only caller (`check_route_cleared`) treats
835/// `Err` by returning `true` (unblock — "we don't know, let the
836/// scheduler re-decide next tick"), which preserves liveness uniformly
837/// whether the fault is SSCAN, HGET, or deeper transport.
838async fn load_worker_caps_union(
839 client: &ferriskey::Client,
840 namespace: &Namespace,
841) -> Result<BTreeSet<String>, ferriskey::Error> {
842 let mut union = BTreeSet::new();
843 let index_key = ff_core::keys::workers_index_key_ns(namespace);
844
845 // Helper: drain one completed future and fold its caps into the
846 // union, or propagate its error. Centralizing keeps the in-loop +
847 // drain paths symmetric (both must behave the same — a missed error
848 // at either point re-introduces the false-negative-union bug).
849 fn absorb(
850 union: &mut BTreeSet<String>,
851 res: Result<Option<String>, ferriskey::Error>,
852 ) -> Result<(), ferriskey::Error> {
853 let csv = res?;
854 if let Some(csv) = csv {
855 for token in csv.split(',') {
856 if !token.is_empty() {
857 union.insert(token.to_owned());
858 }
859 }
860 }
861 Ok(())
862 }
863
864 // SSCAN loop — bounded per-page response size. Cursor starts at "0"
865 // and wraps back to "0" when iteration completes.
866 let mut cursor: String = "0".to_owned();
867 loop {
868 let reply: (String, Vec<String>) = client
869 .cmd("SSCAN")
870 .arg(&index_key)
871 .arg(&cursor)
872 .arg("COUNT")
873 .arg(WORKERS_SSCAN_COUNT.to_string().as_str())
874 .execute()
875 .await?;
876 cursor = reply.0;
877 let worker_ids = reply.1;
878
879 // Bounded concurrent HGETs per SSCAN page. FuturesUnordered with
880 // a buffered stream caps in-flight work at CAPS_GET_CONCURRENCY —
881 // enough parallelism to amortize round-trip latency, bounded so
882 // one scanner tick can't flood the shared Valkey client. Each
883 // spawned future returns `Result<Option<String>, Error>` so
884 // transient Valkey errors propagate up (no .unwrap_or(None)
885 // swallowing) — see the fn-level doc for why.
886 let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
887 for id in worker_ids {
888 let client = client.clone();
889 let namespace = namespace.clone();
890 pending.push(async move {
891 let instance = ff_core::types::WorkerInstanceId::new(id);
892 let caps_key =
893 ff_core::keys::worker_caps_key_ns(&namespace, &instance);
894 let csv: Option<String> = client
895 .cmd("HGET")
896 .arg(&caps_key)
897 .arg("caps_csv")
898 .execute()
899 .await?;
900 Ok::<Option<String>, ferriskey::Error>(csv)
901 });
902 if pending.len() >= CAPS_GET_CONCURRENCY
903 && let Some(res) = pending.next().await
904 {
905 absorb(&mut union, res)?;
906 }
907 }
908 // Drain remaining pending GETs from this page before advancing
909 // the SSCAN cursor. Keeps the pipeline window bounded and the
910 // union observation consistent with "all workers visible so far".
911 while let Some(res) = pending.next().await {
912 absorb(&mut union, res)?;
913 }
914
915 if cursor == "0" {
916 break;
917 }
918 }
919 Ok(union)
920}