
ff_engine/scanner/unblock.rs

//! Unblock scanner for budget/quota/capability-blocked executions.
//!
//! Scans `ff:idx:{p:N}:lane:<lane>:blocked:{budget,quota,route}` per
//! execution partition. For each blocked execution, re-evaluates the
//! blocking condition. If cleared, calls `FCALL ff_unblock_execution`.
//!
//! Cross-partition budget check is cached per scan cycle (MANDATORY —
//! without it, 50K blocked executions = 50K budget reads).
//!
//! Capability sweep reads the union of non-authoritative worker cap sets
//! (`ff:worker:*:caps` — written by `ff-sdk::FlowFabricWorker::connect`)
//! ONCE per scan cycle and uses it to decide whether a `waiting_for_capable_worker`
//! execution has a matching worker. This is best-effort: caps sets may
//! be slightly stale (TTL-less STRING, overwrite on restart), but the
//! promotion path is self-correcting — a promoted execution that still
//! can't be claimed gets re-blocked on the next scheduler tick. RFC-009
//! §7.5 documents the v1 sweep approach and defers connect-triggered
//! sweeps to V2.
//!
//! MUST skip `paused_by_flow_cancel` — only cancel_flow clears that.
//!
//! Reference: RFC-008 §2.4, RFC-009 §7.5, RFC-010 §6
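//!
//! A sketch of the per-lane key shapes this scanner walks (illustrative
//! partition tag and lane id; the authoritative layout lives in
//! `ff_core::keys::IndexKeys`):
//!
//! ```text
//! ff:idx:{p:7}:lane:default:blocked:budget   ZSET  member = execution_id, score = block time
//! ff:idx:{p:7}:lane:default:blocked:quota    ZSET  same shape
//! ff:idx:{p:7}:lane:default:blocked:route    ZSET  same shape
//! ff:exec:{p:7}:<execution_id>:core          HASH  `blocking_reason` drives the re-check
//! ```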

use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::Mutex as AsyncMutex;

use ff_core::backend::ScannerFilter;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition};
use ff_core::types::{BudgetId, LaneId};

use super::{should_skip_candidate, ScanResult, Scanner};

const BATCH_SIZE: u32 = 100;

/// SSCAN page size for the workers-index SET. Same COUNT as the other
/// index-SET scanners use (budget_reconciler, flow_projector). Bounds the
/// per-cursor round-trip response size; the full SET is still covered by
/// the time the cursor wraps back to 0.
const WORKERS_SSCAN_COUNT: usize = 100;

/// Per-worker caps GET fan-out concurrency cap. Mirrors the bounded
/// parallelism W1 used in initialize_waitpoint_hmac_secret. Too low and
/// large fleets (1000+ workers) pay serial round-trip latency; too high
/// and we head-of-line block the scanner client with a pipeline burst.
/// 16 is a pragmatic middle ground for current fleet sizes.
const CAPS_GET_CONCURRENCY: usize = 16;

pub struct UnblockScanner {
    interval: Duration,
    lanes: Vec<LaneId>,
    partition_config: PartitionConfig,
    filter: ScannerFilter,
    /// Shared worker-caps union cache across ALL partitions in one scan
    /// pass. Previously this cache was declared inside `scan_partition`,
    /// which runs once PER PARTITION — at 256 partitions that meant up
    /// to 256 redundant SSCAN + fan-out GET sequences per scan interval.
    /// Now: one load per TTL window (= `interval`), shared across every
    /// partition visited in that window. `TTL == interval` is natural:
    /// a worker connecting "now" propagates into the caps union on the
    /// next scan cycle, no faster and no slower than the cycle itself.
    ///
    /// `Arc<AsyncMutex<_>>` because the Scanner trait's `scan_partition`
    /// takes `&self`. Contention is bounded by the partition iteration
    /// cadence (one partition at a time per scanner task), so the mutex
    /// is effectively uncontended in steady state.
    caps_cache: Arc<AsyncMutex<CapsUnionCache>>,
}

/// Worker-caps union snapshot with a monotonic freshness timestamp.
/// `None` on first scan; filled by the load path in `check_route_cleared`.
/// Invalidated when `Instant::now() - fetched_at >= ttl`.
struct CapsUnionCache {
    snapshot: Option<BTreeSet<String>>,
    fetched_at: Option<Instant>,
    ttl: Duration,
}

impl UnblockScanner {
    pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
        Self::with_filter(interval, lanes, partition_config, ScannerFilter::default())
    }

    /// Construct with a [`ScannerFilter`] applied per candidate
    /// (issue #122).
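    ///
    /// A minimal construction sketch (the interval, lanes, and config here
    /// are placeholders, not real deployment values):
    ///
    /// ```ignore
    /// let scanner = UnblockScanner::with_filter(
    ///     Duration::from_secs(30),      // scan interval; also the caps-cache TTL
    ///     lanes,                        // Vec<LaneId> for this deployment
    ///     partition_config,             // PartitionConfig for budget/quota tag computation
    ///     ScannerFilter::default(),     // or a narrowed filter per issue #122
    /// );
    /// ```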
    pub fn with_filter(
        interval: Duration,
        lanes: Vec<LaneId>,
        partition_config: PartitionConfig,
        filter: ScannerFilter,
    ) -> Self {
        Self {
            interval,
            lanes,
            partition_config,
            filter,
            caps_cache: Arc::new(AsyncMutex::new(CapsUnionCache {
                snapshot: None,
                fetched_at: None,
                ttl: interval,
            })),
        }
    }
}

impl Scanner for UnblockScanner {
    fn name(&self) -> &'static str {
        "unblock"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);

        let mut total_processed: u32 = 0;
        let mut total_errors: u32 = 0;

        // Cross-partition budget cache: budget_id → is_breached.
        // Reset per partition scan (each partition scan is one "cycle").
        let mut budget_cache: HashMap<String, bool> = HashMap::new();

        // Worker-caps union cache is shared across ALL partitions via
        // the scanner struct (`Arc<AsyncMutex<CapsUnionCache>>`). The load
        // path in `check_route_cleared` reuses the cached snapshot if its
        // `fetched_at` is within `interval` (TTL == scan interval),
        // otherwise it loads fresh via SSCAN + concurrent GET fan-out.
        // Without this, at 256 partitions the old per-partition-local
        // cache re-ran load_worker_caps_union up to 256× per cycle.
        let caps_cache = self.caps_cache.clone();

        for lane in &self.lanes {
            // Scan blocked:budget
            let budget_key = idx.lane_blocked_budget(lane);
            let r = scan_blocked_set(
                client, &p, &idx, lane, &budget_key,
                "waiting_for_budget", &mut budget_cache,
                &caps_cache,
                &self.partition_config,
                &self.filter,
            ).await;
            total_processed += r.processed;
            total_errors += r.errors;

            // Scan blocked:quota
            let quota_key = idx.lane_blocked_quota(lane);
            let r = scan_blocked_set(
                client, &p, &idx, lane, &quota_key,
                "waiting_for_quota", &mut budget_cache,
                &caps_cache,
                &self.partition_config,
                &self.filter,
            ).await;
            total_processed += r.processed;
            total_errors += r.errors;

            // Scan blocked:route (capability-mismatch blocks). Promotion
            // decision reads the union of connected workers' caps and
            // checks subset coverage. See check_route_cleared below.
            let route_key = idx.lane_blocked_route(lane);
            let r = scan_blocked_set(
                client, &p, &idx, lane, &route_key,
                "waiting_for_capable_worker", &mut budget_cache,
                &caps_cache,
                &self.partition_config,
                &self.filter,
            ).await;
            total_processed += r.processed;
            total_errors += r.errors;
        }

        ScanResult {
            processed: total_processed,
            errors: total_errors,
        }
    }
}

/// Scan one blocked set and unblock executions whose condition has cleared.
#[allow(clippy::too_many_arguments)]
async fn scan_blocked_set(
    client: &ferriskey::Client,
    partition: &Partition,
    idx: &IndexKeys,
    lane: &LaneId,
    blocked_key: &str,
    expected_reason: &str,
    budget_cache: &mut HashMap<String, bool>,
    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
    partition_config: &PartitionConfig,
    filter: &ScannerFilter,
) -> ScanResult {
    // Read all members from the blocked set (they're scored by block time)
    let blocked: Vec<String> = match client
        .cmd("ZRANGEBYSCORE")
        .arg(blocked_key)
        .arg("-inf")
        .arg("+inf")
        .arg("LIMIT")
        .arg("0")
        .arg(BATCH_SIZE.to_string().as_str())
        .execute()
        .await
    {
        Ok(ids) => ids,
        Err(e) => {
            tracing::warn!(
                error = %e,
                blocked_key,
                "unblock_scanner: ZRANGEBYSCORE failed"
            );
            return ScanResult { processed: 0, errors: 1 };
        }
    };

    if blocked.is_empty() {
        return ScanResult { processed: 0, errors: 0 };
    }

    let mut processed: u32 = 0;
    let mut errors: u32 = 0;
    let tag = partition.hash_tag();

    for eid_str in &blocked {
        if should_skip_candidate(client, filter, partition.index, eid_str).await {
            continue;
        }
        // Read blocking_reason from exec_core to confirm still blocked
        let core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
        let reason: Option<String> = match client
            .cmd("HGET")
            .arg(&core_key)
            .arg("blocking_reason")
            .execute()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: HGET blocking_reason failed, skipping"
                );
                errors += 1;
                continue;
            }
        };

        let reason = reason.unwrap_or_default();

        // Skip if not blocked by the expected reason (e.g. paused_by_flow_cancel)
        if reason != expected_reason {
            continue;
        }

        // Re-evaluate the blocking condition
        let should_unblock = match expected_reason {
            "waiting_for_budget" => {
                check_budget_cleared(client, &core_key, budget_cache, partition_config).await
            }
            "waiting_for_quota" => {
                check_quota_cleared(client, &core_key, eid_str, partition_config).await
            }
            "waiting_for_capable_worker" => {
                check_route_cleared(client, &core_key, caps_cache).await
            }
            _ => false,
        };

        if !should_unblock {
            continue;
        }

        // Unblock: FCALL ff_unblock_execution on {p:N}
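        // KEYS: the exec core hash, the blocked ZSET we scanned from, and
        // the lane's eligible index; ARGV: execution id, server time (ms),
        // and the reason being cleared. All three keys carry the same
        // {p:N} hash tag, so the FCALL stays single-slot in cluster mode.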
        let eligible_key = idx.lane_eligible(lane);
        let keys: [&str; 3] = [&core_key, blocked_key, &eligible_key];

        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t.to_string(),
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: server TIME failed, skipping unblock"
                );
                errors += 1;
                continue;
            }
        };
        let argv: [&str; 3] = [eid_str, &now_ms, expected_reason];

        match client
            .fcall::<ferriskey::Value>("ff_unblock_execution", &keys, &argv)
            .await
        {
            Ok(_) => {
                tracing::info!(
                    execution_id = eid_str.as_str(),
                    reason = expected_reason,
                    "unblock_scanner: execution unblocked"
                );
                processed += 1;
            }
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: ff_unblock_execution failed"
                );
                errors += 1;
            }
        }
    }

    ScanResult { processed, errors }
}

/// Check if budget blocking condition has cleared.
/// Uses cross-partition cache to avoid redundant reads.
async fn check_budget_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    cache: &mut HashMap<String, bool>,
    config: &PartitionConfig,
) -> bool {
    // Read budget_ids from exec_core
    let budget_ids_str: Option<String> = client
        .cmd("HGET")
        .arg(core_key)
        .arg("budget_ids")
        .execute()
        .await
        .unwrap_or(None);

    let budget_ids_str = match budget_ids_str {
        Some(s) if !s.is_empty() => s,
        _ => return true, // no budgets → unblock
    };

    for budget_id in budget_ids_str.split(',') {
        let budget_id = budget_id.trim();
        if budget_id.is_empty() {
            continue;
        }

        // Check cache first
        if let Some(&breached) = cache.get(budget_id) {
            if breached {
                return false; // still breached
            }
            continue;
        }

        // Read from Valkey and cache
        let breached = is_budget_breached(client, budget_id, config).await;
        cache.insert(budget_id.to_owned(), breached);
        if breached {
            return false;
        }
    }

    true // all budgets within limits
}

/// Read budget usage + limits and check if any hard limit is breached.
/// Computes real {b:M} partition tag from budget_id.
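///
/// Illustrative shape (the `tokens` dimension and the `{b:3}` tag are
/// placeholders; only the `hard:` field prefix matters to this check):
///
/// ```text
/// HGETALL ff:budget:{b:3}:<budget_id>:limits
///   hard:tokens  1000000        (fields without the "hard:" prefix are skipped)
/// HGET ff:budget:{b:3}:<budget_id>:usage tokens
///   1000123                     (usage >= limit, so the budget is still breached)
/// ```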
async fn is_budget_breached(
    client: &ferriskey::Client,
    budget_id: &str,
    config: &PartitionConfig,
) -> bool {
    // Compute the real budget partition tag
    let bid = match BudgetId::parse(budget_id) {
        Ok(id) => id,
        Err(_) => return false, // invalid budget_id → treat as not breached
    };
    let partition = budget_partition(&bid, config);
    let tag = partition.hash_tag();
    let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
    let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);

    // Read hard limits — fail-closed: if Valkey read fails, treat as breached
    // (keep execution blocked) rather than silently unblocking
    let limits: Vec<String> = match client
        .cmd("HGETALL")
        .arg(&limits_key)
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                budget_id,
                error = %e,
                "unblock_scanner: budget limits read failed, keeping blocked (fail-closed)"
            );
            return true; // treat as breached
        }
    };

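    // HGETALL comes back flattened as [field, value, field, value, ...]
    // through this client, so walk the reply in pairs.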
    let mut i = 0;
    while i + 1 < limits.len() {
        let field = &limits[i];
        let limit_str = &limits[i + 1];
        i += 2;

        if !field.starts_with("hard:") {
            continue;
        }
        let dimension = &field[5..];
        let limit: u64 = match limit_str.parse() {
            Ok(v) if v > 0 => v,
            _ => continue,
        };

        let usage_str: Option<String> = match client
            .cmd("HGET")
            .arg(&usage_key)
            .arg(dimension)
            .execute()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                tracing::error!(
                    budget_id,
                    dimension,
                    error = %e,
                    "unblock_scanner: budget usage read failed, keeping blocked (fail-closed)"
                );
                return true; // treat as breached
            }
        };
        let usage: u64 = usage_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);

        if usage >= limit {
            return true; // breached
        }
    }

    false
}

/// Check if quota blocking condition has cleared.
/// Re-checks the sliding window after cleanup.
/// Computes real {q:K} partition tag from quota_policy_id.
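///
/// Keys touched (`{q:K}` is the quota partition tag computed below):
///
/// ```text
/// ff:quota:{q:K}:<quota_policy_id>                              HASH    definition fields
/// ff:quota:{q:K}:<quota_policy_id>:window:requests_per_window   ZSET    sliding window
/// ff:quota:{q:K}:<quota_policy_id>:concurrency                  STRING  active concurrency counter
/// ```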
async fn check_quota_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    _eid_str: &str,
    config: &PartitionConfig,
) -> bool {
    // Read quota_policy_id from exec_core — fail-closed on Valkey error
    let quota_id: Option<String> = match client
        .cmd("HGET")
        .arg(core_key)
        .arg("quota_policy_id")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                core_key,
                error = %e,
                "unblock_scanner: quota_policy_id read failed, keeping blocked (fail-closed)"
            );
            return false;
        }
    };

    let quota_id = match quota_id {
        Some(s) if !s.is_empty() => s,
        _ => return true, // no quota → unblock
    };

    // Compute real quota partition tag
    let qid = match ff_core::types::QuotaPolicyId::parse(&quota_id) {
        Ok(id) => id,
        Err(_) => return true, // invalid → unblock (advisory)
    };
    let partition = ff_core::partition::quota_partition(&qid, config);
    let tag = partition.hash_tag();

    let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id);
    let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
    let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);

    // Read quota definition fields — fail-closed on Valkey error
    let def_fields: Vec<Option<String>> = match client
        .cmd("HMGET")
        .arg(&quota_def_key)
        .arg("max_requests_per_window")
        .arg("requests_per_window_seconds")
        .arg("active_concurrency_cap")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                quota_id = %quota_id,
                error = %e,
                "unblock_scanner: quota definition read failed, keeping blocked (fail-closed)"
            );
            return false;
        }
    };
    let rate_limit: u64 = def_fields.first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    let window_secs: u64 = def_fields.get(1)
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(60);
    let concurrency_cap: u64 = def_fields.get(2)
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);

    // Check rate: clean window, count
    if rate_limit > 0 {
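        // Worked example: window_secs = 60 → window_ms = 60_000, so the
        // cutoff is now_ms - 60_000. ZREMRANGEBYSCORE drops every entry
        // older than one window before ZCARD counts what remains against
        // rate_limit.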
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(_) => return false,
        };
        let window_ms = window_secs * 1000;
        let cutoff = (now_ms.saturating_sub(window_ms)).to_string();

        let _: Result<i64, _> = client
            .cmd("ZREMRANGEBYSCORE")
            .arg(&window_key)
            .arg("-inf")
            .arg(&cutoff)
            .execute()
            .await;

        let count: i64 = client
            .cmd("ZCARD")
            .arg(&window_key)
            .execute()
            .await
            .unwrap_or(0);

        if count as u64 >= rate_limit {
            return false; // still at limit
        }
    }

    // Check concurrency
    if concurrency_cap > 0 {
        let active: i64 = client
            .cmd("GET")
            .arg(&concurrency_key)
            .execute()
            .await
            .unwrap_or(0);

        if active as u64 >= concurrency_cap {
            return false; // still at cap
        }
    }

    true // quota cleared
}

/// Check if the capability-block has cleared: the union of connected
/// workers' caps now covers the execution's `required_capabilities`
/// (a best-effort proxy for "some worker can claim this"; see the
/// module doc on the v1 sweep).
///
/// The UNION of every connected worker's caps is cached on the scanner
/// struct with a TTL equal to `interval`, so within one scan cycle every
/// partition reuses the same snapshot, and between cycles a stale
/// snapshot is automatically refreshed.
///
/// Fail-OPEN on union-load failure: if the SSCAN or fan-out GET errors
/// out, assume a worker might match and let the scheduler re-decide on
/// the next tick. The caps set is non-authoritative (Lua never reads it);
/// treating it as "unknown → maybe" preserves liveness. Fail-closed
/// would leave executions stuck whenever the caps read hits a transient
/// Valkey error.
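///
/// Example: `required_capabilities = "gpu,large-mem"` clears only if both
/// `gpu` and `large-mem` appear in the union, even if they come from two
/// different workers (capability names here are illustrative).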
async fn check_route_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
) -> bool {
    let required_csv: Option<String> = client
        .cmd("HGET")
        .arg(core_key)
        .arg("required_capabilities")
        .execute()
        .await
        .unwrap_or(None);
    let required_csv = match required_csv {
        Some(s) if !s.is_empty() => s,
        _ => return true, // no required caps → anyone can claim
    };

    // Acquire the cache, return a cheap clone of the snapshot so the
    // subset check runs OUTSIDE the mutex (BTreeSet clone is O(n) but
    // n is bounded by total caps across the fleet — typically tens).
    // Holding the mutex across the subset loop would serialize every
    // partition's capability-block decision behind this one mutex.
    let snapshot: BTreeSet<String> = {
        let mut guard = caps_cache.lock().await;
        let stale = guard
            .fetched_at
            .map(|t| t.elapsed() >= guard.ttl)
            .unwrap_or(true);
        if stale {
            match load_worker_caps_union(client).await {
                Ok(union) => {
                    guard.snapshot = Some(union);
                    guard.fetched_at = Some(Instant::now());
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "unblock_scanner: failed to read worker caps union — \
                         assuming match possible (fail-open to preserve liveness)"
                    );
                    return true;
                }
            }
        }
        guard.snapshot.clone().unwrap_or_default()
    };

    // Subset check: every non-empty token in required_csv present in union.
    required_csv
        .split(',')
        .filter(|t| !t.is_empty())
        .all(|t| snapshot.contains(t))
}

/// Union of every connected worker's advertised capabilities.
///
/// Cluster-safe enumeration pattern (matches Batch A's index SETs for
/// budget/flow/deps): SSCAN the `workers_index_key()` SET (single-slot,
/// no hash tag needed — the key name literally hashes to one slot), then
/// fan out concurrent `GET ff:worker:<id>:caps` calls with a bounded
/// concurrency cap. A keyspace `SCAN MATCH ff:worker:*:caps` in cluster
/// mode visits only one shard per call and silently drops workers on
/// other shards — exactly the class of bug Batch A Issue #11 fixed.
///
/// SSCAN is used instead of SMEMBERS so a fleet of thousands of workers
/// doesn't round-trip the entire member list in one reply. `COUNT = 100`
/// matches the convention in budget_reconciler / flow_projector.
///
/// An empty caps STRING or missing key means "no caps for that worker";
/// the scanner keeps accumulating. A per-worker GET error, in contrast,
/// PROPAGATES — a previous version used `.unwrap_or(None)`, which
/// silently merged errors and empty values into the same branch, making
/// a transient error look like "this worker has no caps". In a
/// single-capable-worker fleet that produced false-negative unions and
/// left executions blocked even though a matching worker existed,
/// contradicting the scanner's documented fail-open behavior. Now an
/// error bubbles up; the only caller (`check_route_cleared`) handles
/// `Err` by returning `true` (unblock — "we don't know, let the
/// scheduler re-decide next tick"), which preserves liveness uniformly
/// whether the fault is SSCAN, GET, or deeper in the transport.
async fn load_worker_caps_union(
    client: &ferriskey::Client,
) -> Result<BTreeSet<String>, ferriskey::Error> {
    let mut union = BTreeSet::new();
    let index_key = ff_core::keys::workers_index_key();

    // Helper: drain one completed future and fold its caps into the
    // union, or propagate its error. Centralizing it keeps the in-loop
    // and drain paths symmetric (both must behave the same — a missed
    // error at either point re-introduces the false-negative-union bug).
    fn absorb(
        union: &mut BTreeSet<String>,
        res: Result<Option<String>, ferriskey::Error>,
    ) -> Result<(), ferriskey::Error> {
        let csv = res?;
        if let Some(csv) = csv {
            for token in csv.split(',') {
                if !token.is_empty() {
                    union.insert(token.to_owned());
                }
            }
        }
        Ok(())
    }

    // SSCAN loop — bounded per-page response size. Cursor starts at "0"
    // and wraps back to "0" when iteration completes.
    let mut cursor: String = "0".to_owned();
    loop {
        let reply: (String, Vec<String>) = client
            .cmd("SSCAN")
            .arg(&index_key)
            .arg(&cursor)
            .arg("COUNT")
            .arg(WORKERS_SSCAN_COUNT.to_string().as_str())
            .execute()
            .await?;
        cursor = reply.0;
        let worker_ids = reply.1;

        // Bounded concurrent GETs per SSCAN page. A FuturesUnordered set,
        // manually throttled, caps in-flight work at CAPS_GET_CONCURRENCY —
        // enough parallelism to amortize round-trip latency, bounded so
        // one scanner tick can't flood the shared Valkey client. Each
        // spawned future returns `Result<Option<String>, Error>` so
        // transient Valkey errors propagate up (no .unwrap_or(None)
        // swallowing) — see the fn-level doc for why.
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        for id in worker_ids {
            let client = client.clone();
            pending.push(async move {
                let caps_key = format!("ff:worker:{}:caps", id);
                let csv: Option<String> = client
                    .cmd("GET")
                    .arg(&caps_key)
                    .execute()
                    .await?;
                Ok::<Option<String>, ferriskey::Error>(csv)
            });
            if pending.len() >= CAPS_GET_CONCURRENCY
                && let Some(res) = pending.next().await
            {
                absorb(&mut union, res)?;
            }
        }
        // Drain remaining pending GETs from this page before advancing
        // the SSCAN cursor. Keeps the pipeline window bounded and the
        // union observation consistent with "all workers visible so far".
        while let Some(res) = pending.next().await {
            absorb(&mut union, res)?;
        }

        if cursor == "0" {
            break;
        }
    }
    Ok(union)
}