
ff_engine/scanner/unblock.rs

//! Unblock scanner for budget/quota/capability-blocked executions.
//!
//! Scans `ff:idx:{p:N}:lane:<lane>:blocked:{budget,quota,route}` per
//! execution partition. For each blocked execution, re-evaluates the
//! blocking condition. If cleared, calls `FCALL ff_unblock_execution`.
//!
//! Cross-partition budget check is cached per scan cycle (MANDATORY —
//! without it, 50K blocked executions = 50K budget reads).
//!
//! Capability sweep reads the union of non-authoritative worker cap sets
//! (`ff:worker:*:caps` — written by `ff-sdk::FlowFabricWorker::connect`)
//! ONCE per scan cycle and uses it to decide whether a `waiting_for_capable_worker`
//! execution has a matching worker. This is best-effort: caps sets may
//! be slightly stale (TTL-less STRING, overwrite on restart), but the
//! promotion path is self-correcting — a promoted execution that still
//! can't be claimed gets re-blocked on the next scheduler tick. RFC-009
//! §7.5 documents the v1 sweep approach and defers connect-triggered
//! sweeps to V2.
//!
//! MUST skip `paused_by_flow_cancel` — only cancel_flow clears that.
//!
//! Reference: RFC-008 §2.4, RFC-009 §7.5, RFC-010 §6

use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::Mutex as AsyncMutex;

use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition};
use ff_core::types::{BudgetId, LaneId};

use super::{ScanResult, Scanner};

const BATCH_SIZE: u32 = 100;

/// SSCAN page size for the workers-index SET. Same COUNT as the other
/// index-SET scanners (budget_reconciler, flow_projector). Bounds the
/// per-cursor round-trip response size; total iteration still covers the
/// full SET before the cursor wraps back to 0.
const WORKERS_SSCAN_COUNT: usize = 100;

/// Per-worker caps GET fan-out concurrency cap. Mirrors the bounded
/// parallelism W1 used in initialize_waitpoint_hmac_secret. Too low and
/// large fleets (1000+ workers) pay serial round-trip latency; too high
/// and we head-of-line block the scanner client with a pipeline burst.
/// 16 is a pragmatic middle ground at current fleet scales.
const CAPS_GET_CONCURRENCY: usize = 16;

pub struct UnblockScanner {
    interval: Duration,
    lanes: Vec<LaneId>,
    partition_config: PartitionConfig,
    /// Shared worker-caps union cache across ALL partitions in one scan
    /// pass. Previously this cache was declared inside `scan_partition`
    /// which runs once PER PARTITION — at 256 partitions that meant up
    /// to 256 redundant SSCAN + fan-out GET sequences per scan interval.
    /// Now: one load per TTL window (= `interval`), shared across every
    /// partition visited in that window. `TTL == interval` is natural:
    /// a worker connecting "now" propagates into the caps union on the
    /// next scan cycle, not faster or slower than the cycle itself.
    ///
    /// `Arc<AsyncMutex<_>>` because the Scanner trait's `scan_partition`
    /// takes `&self`. Contention is bounded by the partition iteration
    /// cadence (one partition at a time per scanner task), so the mutex
    /// is effectively uncontended in steady state.
    caps_cache: Arc<AsyncMutex<CapsUnionCache>>,
}

/// Worker-caps union snapshot with a monotonic freshness timestamp.
/// `None` on first scan; filled by the get-or-load path in
/// `check_route_cleared`. Invalidated when
/// `Instant::now() - fetched_at >= ttl`.
struct CapsUnionCache {
    snapshot: Option<BTreeSet<String>>,
    fetched_at: Option<Instant>,
    ttl: Duration,
}
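// Illustrative sketch, not part of the original module: exercises the
// staleness rule check_route_cleared applies to this cache. A cache that
// has never been filled (fetched_at == None) must reload; one refreshed
// within `ttl` must be reused. Durations here are made up for the test.
#[cfg(test)]
mod caps_cache_staleness_sketch {
    use super::CapsUnionCache;
    use std::time::{Duration, Instant};

    // Mirrors the `stale` computation in check_route_cleared.
    fn is_stale(c: &CapsUnionCache) -> bool {
        c.fetched_at.map(|t| t.elapsed() >= c.ttl).unwrap_or(true)
    }

    #[test]
    fn never_loaded_cache_is_stale() {
        let c = CapsUnionCache { snapshot: None, fetched_at: None, ttl: Duration::from_secs(30) };
        assert!(is_stale(&c));
    }

    #[test]
    fn recently_loaded_cache_is_reused() {
        let c = CapsUnionCache {
            snapshot: Some(Default::default()),
            fetched_at: Some(Instant::now()),
            ttl: Duration::from_secs(30),
        };
        assert!(!is_stale(&c));
    }
}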

impl UnblockScanner {
    pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
        Self {
            interval,
            lanes,
            partition_config,
            caps_cache: Arc::new(AsyncMutex::new(CapsUnionCache {
                snapshot: None,
                fetched_at: None,
                ttl: interval,
            })),
        }
    }
}

impl Scanner for UnblockScanner {
    fn name(&self) -> &'static str {
        "unblock"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);

        let mut total_processed: u32 = 0;
        let mut total_errors: u32 = 0;

        // Cross-partition budget cache: budget_id → is_breached.
        // Reset per partition scan (each partition scan is one "cycle").
        let mut budget_cache: HashMap<String, bool> = HashMap::new();

        // Worker-caps union cache is shared across ALL partitions via
        // the scanner struct (Arc<AsyncMutex<CapsUnionCache>>). The
        // get-or-load in `check_route_cleared` reuses the cached snapshot
        // if its fetched_at is within `interval` (TTL == scan interval),
        // otherwise loads fresh via SSCAN + concurrent GET fan-out.
        // Without this, at 256 partitions the old per-partition-local
        // cache re-ran load_worker_caps_union up to 256× per cycle.
        let caps_cache = self.caps_cache.clone();

        for lane in &self.lanes {
            // Scan blocked:budget
            let budget_key = idx.lane_blocked_budget(lane);
            let r = scan_blocked_set(
                client, &p, &idx, lane, &budget_key,
                "waiting_for_budget", &mut budget_cache,
                &caps_cache,
                &self.partition_config,
            ).await;
            total_processed += r.processed;
            total_errors += r.errors;

            // Scan blocked:quota
            let quota_key = idx.lane_blocked_quota(lane);
            let r = scan_blocked_set(
                client, &p, &idx, lane, &quota_key,
                "waiting_for_quota", &mut budget_cache,
                &caps_cache,
                &self.partition_config,
            ).await;
            total_processed += r.processed;
            total_errors += r.errors;

            // Scan blocked:route (capability-mismatch blocks). Promotion
            // decision reads the union of connected workers' caps and
            // checks subset coverage. See check_route_cleared below.
            let route_key = idx.lane_blocked_route(lane);
            let r = scan_blocked_set(
                client, &p, &idx, lane, &route_key,
                "waiting_for_capable_worker", &mut budget_cache,
                &caps_cache,
                &self.partition_config,
            ).await;
            total_processed += r.processed;
            total_errors += r.errors;
        }

        ScanResult {
            processed: total_processed,
            errors: total_errors,
        }
    }
}

/// Scan one blocked set and unblock executions whose condition has cleared.
#[allow(clippy::too_many_arguments)]
async fn scan_blocked_set(
    client: &ferriskey::Client,
    partition: &Partition,
    idx: &IndexKeys,
    lane: &LaneId,
    blocked_key: &str,
    expected_reason: &str,
    budget_cache: &mut HashMap<String, bool>,
    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
    partition_config: &PartitionConfig,
) -> ScanResult {
    // Read up to BATCH_SIZE members from the blocked set (members are scored by block time)
    let blocked: Vec<String> = match client
        .cmd("ZRANGEBYSCORE")
        .arg(blocked_key)
        .arg("-inf")
        .arg("+inf")
        .arg("LIMIT")
        .arg("0")
        .arg(BATCH_SIZE.to_string().as_str())
        .execute()
        .await
    {
        Ok(ids) => ids,
        Err(e) => {
            tracing::warn!(
                error = %e,
                blocked_key,
                "unblock_scanner: ZRANGEBYSCORE failed"
            );
            return ScanResult { processed: 0, errors: 1 };
        }
    };

    if blocked.is_empty() {
        return ScanResult { processed: 0, errors: 0 };
    }

    let mut processed: u32 = 0;
    let mut errors: u32 = 0;
    let tag = partition.hash_tag();

    for eid_str in &blocked {
        // Read blocking_reason from exec_core to confirm still blocked
        let core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
        let reason: Option<String> = match client
            .cmd("HGET")
            .arg(&core_key)
            .arg("blocking_reason")
            .execute()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: HGET blocking_reason failed, skipping"
                );
                errors += 1;
                continue;
            }
        };

        let reason = reason.unwrap_or_default();

        // Skip if not blocked by the expected reason (e.g. paused_by_flow_cancel)
        if reason != expected_reason {
            continue;
        }

        // Re-evaluate the blocking condition
        let should_unblock = match expected_reason {
            "waiting_for_budget" => {
                check_budget_cleared(client, &core_key, budget_cache, partition_config).await
            }
            "waiting_for_quota" => {
                check_quota_cleared(client, &core_key, eid_str, partition_config).await
            }
            "waiting_for_capable_worker" => {
                check_route_cleared(client, &core_key, caps_cache).await
            }
            _ => false,
        };

        if !should_unblock {
            continue;
        }

        // Unblock: FCALL ff_unblock_execution on {p:N}
        let eligible_key = idx.lane_eligible(lane);
        let keys: [&str; 3] = [&core_key, blocked_key, &eligible_key];

        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t.to_string(),
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: server TIME failed, skipping unblock"
                );
                errors += 1;
                continue;
            }
        };
        let argv: [&str; 3] = [eid_str, &now_ms, expected_reason];

        match client
            .fcall::<ferriskey::Value>("ff_unblock_execution", &keys, &argv)
            .await
        {
            Ok(_) => {
                tracing::info!(
                    execution_id = eid_str.as_str(),
                    reason = expected_reason,
                    "unblock_scanner: execution unblocked"
                );
                processed += 1;
            }
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: ff_unblock_execution failed"
                );
                errors += 1;
            }
        }
    }

    ScanResult { processed, errors }
}

/// Check if budget blocking condition has cleared.
/// Uses cross-partition cache to avoid redundant reads.
async fn check_budget_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    cache: &mut HashMap<String, bool>,
    config: &PartitionConfig,
) -> bool {
    // Read budget_ids from exec_core
    let budget_ids_str: Option<String> = client
        .cmd("HGET")
        .arg(core_key)
        .arg("budget_ids")
        .execute()
        .await
        .unwrap_or(None);

    let budget_ids_str = match budget_ids_str {
        Some(s) if !s.is_empty() => s,
        _ => return true, // no budgets → unblock
    };

    for budget_id in budget_ids_str.split(',') {
        let budget_id = budget_id.trim();
        if budget_id.is_empty() {
            continue;
        }

        // Check cache first
        if let Some(&breached) = cache.get(budget_id) {
            if breached {
                return false; // still breached
            }
            continue;
        }

        // Read from Valkey and cache
        let breached = is_budget_breached(client, budget_id, config).await;
        cache.insert(budget_id.to_owned(), breached);
        if breached {
            return false;
        }
    }

    true // all budgets within limits
}
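// Illustrative sketch, not part of the original module: documents which
// tokens of a `budget_ids` CSV the loop above evaluates. Whitespace is
// trimmed and empty segments are skipped, so a trailing comma never
// produces a phantom budget id. The sample value is made up.
#[cfg(test)]
mod budget_ids_csv_sketch {
    #[test]
    fn trims_whitespace_and_skips_empty_tokens() {
        let raw = "bud-1, bud-2,,bud-3,";
        let ids: Vec<&str> = raw
            .split(',')
            .map(str::trim)
            .filter(|t| !t.is_empty())
            .collect();
        assert_eq!(ids, vec!["bud-1", "bud-2", "bud-3"]);
    }
}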

/// Read budget usage + limits and check if any hard limit is breached.
/// Computes real {b:M} partition tag from budget_id.
async fn is_budget_breached(
    client: &ferriskey::Client,
    budget_id: &str,
    config: &PartitionConfig,
) -> bool {
    // Compute the real budget partition tag
    let bid = match BudgetId::parse(budget_id) {
        Ok(id) => id,
        Err(_) => return false, // invalid budget_id → treat as not breached
    };
    let partition = budget_partition(&bid, config);
    let tag = partition.hash_tag();
    let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
    let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);

    // Read hard limits — fail-closed: if Valkey read fails, treat as breached
    // (keep execution blocked) rather than silently unblocking
    let limits: Vec<String> = match client
        .cmd("HGETALL")
        .arg(&limits_key)
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                budget_id,
                error = %e,
                "unblock_scanner: budget limits read failed, keeping blocked (fail-closed)"
            );
            return true; // treat as breached
        }
    };

    let mut i = 0;
    while i + 1 < limits.len() {
        let field = &limits[i];
        let limit_str = &limits[i + 1];
        i += 2;

        if !field.starts_with("hard:") {
            continue;
        }
        let dimension = &field[5..];
        let limit: u64 = match limit_str.parse() {
            Ok(v) if v > 0 => v,
            _ => continue,
        };

        let usage_str: Option<String> = match client
            .cmd("HGET")
            .arg(&usage_key)
            .arg(dimension)
            .execute()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                tracing::error!(
                    budget_id,
                    dimension,
                    error = %e,
                    "unblock_scanner: budget usage read failed, keeping blocked (fail-closed)"
                );
                return true; // treat as breached
            }
        };
        let usage: u64 = usage_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);

        if usage >= limit {
            return true; // breached
        }
    }

    false
}
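// Illustrative sketch, not part of the original module: restates
// is_budget_breached's decision rule on a flattened HGETALL reply.
// Only `hard:*` fields count, zero or unparseable limits are ignored,
// and usage >= limit means breached. The `usage` lookup is a plain map
// here instead of the per-dimension HGET the production path issues.
#[cfg(test)]
mod budget_breach_rule_sketch {
    use std::collections::HashMap;

    fn breached(limits_flat: &[(&str, &str)], usage: &HashMap<&str, u64>) -> bool {
        for (field, limit_str) in limits_flat {
            let Some(dimension) = field.strip_prefix("hard:") else { continue };
            let limit: u64 = match limit_str.parse() {
                Ok(v) if v > 0 => v,
                _ => continue,
            };
            if usage.get(dimension).copied().unwrap_or(0) >= limit {
                return true;
            }
        }
        false
    }

    #[test]
    fn soft_limits_ignored_hard_limits_compared_against_usage() {
        let limits = [("soft:tokens", "10"), ("hard:tokens", "100"), ("hard:requests", "0")];
        let usage = HashMap::from([("tokens", 100u64), ("requests", 999u64)]);
        // tokens usage 100 >= hard limit 100 → breached; hard:requests has
        // limit 0 and is skipped, matching the `Ok(v) if v > 0` guard above.
        assert!(breached(&limits, &usage));
    }
}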

/// Check if quota blocking condition has cleared.
/// Re-checks the sliding window after cleanup.
/// Computes real {q:K} partition tag from quota_policy_id.
async fn check_quota_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    _eid_str: &str,
    config: &PartitionConfig,
) -> bool {
    // Read quota_policy_id from exec_core — fail-closed on Valkey error
    let quota_id: Option<String> = match client
        .cmd("HGET")
        .arg(core_key)
        .arg("quota_policy_id")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                core_key,
                error = %e,
                "unblock_scanner: quota_policy_id read failed, keeping blocked (fail-closed)"
            );
            return false;
        }
    };

    let quota_id = match quota_id {
        Some(s) if !s.is_empty() => s,
        _ => return true, // no quota → unblock
    };

    // Compute real quota partition tag
    let qid = match ff_core::types::QuotaPolicyId::parse(&quota_id) {
        Ok(id) => id,
        Err(_) => return true, // invalid → unblock (advisory)
    };
    let partition = ff_core::partition::quota_partition(&qid, config);
    let tag = partition.hash_tag();

    let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id);
    let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
    let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);

    // Read quota definition fields — fail-closed on Valkey error
    let def_fields: Vec<Option<String>> = match client
        .cmd("HMGET")
        .arg(&quota_def_key)
        .arg("max_requests_per_window")
        .arg("requests_per_window_seconds")
        .arg("active_concurrency_cap")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                quota_id = %quota_id,
                error = %e,
                "unblock_scanner: quota definition read failed, keeping blocked (fail-closed)"
            );
            return false;
        }
    };
    let rate_limit: u64 = def_fields.first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    let window_secs: u64 = def_fields.get(1)
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(60);
    let concurrency_cap: u64 = def_fields.get(2)
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);

    // Check rate: clean window, count
    if rate_limit > 0 {
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(_) => return false,
        };
        let window_ms = window_secs * 1000;
        let cutoff = (now_ms.saturating_sub(window_ms)).to_string();

        let _: Result<i64, _> = client
            .cmd("ZREMRANGEBYSCORE")
            .arg(&window_key)
            .arg("-inf")
            .arg(&cutoff)
            .execute()
            .await;

        let count: i64 = client
            .cmd("ZCARD")
            .arg(&window_key)
            .execute()
            .await
            .unwrap_or(0);

        if count as u64 >= rate_limit {
            return false; // still at limit
        }
    }

    // Check concurrency
    if concurrency_cap > 0 {
        let active: i64 = client
            .cmd("GET")
            .arg(&concurrency_key)
            .execute()
            .await
            .unwrap_or(0);

        if active as u64 >= concurrency_cap {
            return false; // still at cap
        }
    }

    true // quota cleared
}
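// Illustrative sketch, not part of the original module: the sliding-window
// arithmetic check_quota_cleared performs before ZREMRANGEBYSCORE. The
// cutoff is now_ms minus the window in milliseconds (saturating at zero),
// and the execution stays blocked while the post-prune count is still at
// or above the rate limit. All numbers are made up for the example.
#[cfg(test)]
mod quota_window_math_sketch {
    #[test]
    fn cutoff_and_rate_comparison() {
        let now_ms: u64 = 1_700_000_120_000;
        let window_secs: u64 = 60;
        let cutoff = now_ms.saturating_sub(window_secs * 1000);
        // Entries scored at or below the cutoff get pruned from the window.
        assert_eq!(cutoff, 1_700_000_060_000);

        let (count, rate_limit) = (5u64, 5u64);
        // Still at the limit, so check_quota_cleared would return false.
        assert!(count >= rate_limit);
    }
}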

/// Check if the capability-block has cleared: some connected worker's
/// caps now cover the execution's `required_capabilities`.
///
/// The UNION of every connected worker's caps is cached on the scanner
/// struct with a TTL equal to `interval`; so within one scan cycle every
/// partition reuses the same snapshot, and between cycles a stale
/// snapshot is automatically refreshed.
///
/// Fail-OPEN on union-load failure: if the SSCAN or fan-out GET errors
/// out, assume a worker might match and let the scheduler re-decide on
/// the next tick. The caps set is non-authoritative (Lua never reads it);
/// treating it as "unknown → maybe" preserves liveness. Fail-closed
/// would leave executions stuck whenever the caps read hits a transient
/// Valkey error.
async fn check_route_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
) -> bool {
    let required_csv: Option<String> = client
        .cmd("HGET")
        .arg(core_key)
        .arg("required_capabilities")
        .execute()
        .await
        .unwrap_or(None);
    let required_csv = match required_csv {
        Some(s) if !s.is_empty() => s,
        _ => return true, // no required caps → anyone can claim
    };

    // Acquire the cache, return a cheap clone of the snapshot so the
    // subset check runs OUTSIDE the mutex (BTreeSet clone is O(n) but
    // n is bounded by total caps across the fleet — typically tens).
    // Holding the mutex across the subset loop would serialize every
    // partition's capability-block decision behind this one mutex.
    let snapshot: BTreeSet<String> = {
        let mut guard = caps_cache.lock().await;
        let stale = guard
            .fetched_at
            .map(|t| t.elapsed() >= guard.ttl)
            .unwrap_or(true);
        if stale {
            match load_worker_caps_union(client).await {
                Ok(union) => {
                    guard.snapshot = Some(union);
                    guard.fetched_at = Some(Instant::now());
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "unblock_scanner: failed to read worker caps union — \
                         assuming match possible (fail-open to preserve liveness)"
                    );
                    return true;
                }
            }
        }
        guard.snapshot.clone().unwrap_or_default()
    };

    // Subset check: every non-empty token in required_csv present in union.
    required_csv
        .split(',')
        .filter(|t| !t.is_empty())
        .all(|t| snapshot.contains(t))
}
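// Illustrative sketch, not part of the original module: the subset check
// check_route_cleared ends with. Every non-empty token of the execution's
// `required_capabilities` CSV must appear in the fleet-wide caps union;
// an empty requirement clears trivially. Capability names are made up.
#[cfg(test)]
mod required_caps_subset_sketch {
    use std::collections::BTreeSet;

    fn covered(required_csv: &str, union: &BTreeSet<String>) -> bool {
        required_csv
            .split(',')
            .filter(|t| !t.is_empty())
            .all(|t| union.contains(t))
    }

    #[test]
    fn union_must_cover_every_required_token() {
        let union: BTreeSet<String> =
            ["gpu", "linux", "py311"].iter().map(|s| s.to_string()).collect();
        assert!(covered("gpu,linux", &union));
        assert!(covered("", &union)); // no required caps → anyone can claim
        assert!(!covered("gpu,windows", &union));
    }
}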

/// Union of every connected worker's advertised capabilities.
///
/// Cluster-safe enumeration pattern (matches Batch A's index SETs for
/// budget/flow/deps): SSCAN the `workers_index_key()` SET (single-slot,
/// no hash tag needed — the key name literally hashes to one slot), then
/// fan-out concurrent `GET ff:worker:<id>:caps` with a bounded concurrency
/// cap. A keyspace `SCAN MATCH ff:worker:*:caps` in cluster mode visits
/// only one shard per call and silently drops workers on other shards —
/// exactly the class of bug Batch A Issue #11 fixed.
///
/// SSCAN is used instead of SMEMBERS so a fleet of thousands of workers
/// doesn't round-trip the entire member list in one reply. `COUNT = 100`
/// matches the convention in budget_reconciler / flow_projector.
///
/// Empty caps STRING or missing key = "no caps for that worker"; scanner
/// keeps accumulating. Per-worker GET error, in contrast, PROPAGATES — a
/// previous version used `.unwrap_or(None)` which silently merged error
/// and empty into the same branch, making a transient error look like
/// "this worker has no caps". In a single-capable-worker fleet that
/// produced false-negative unions and left executions blocked even
/// though a matching worker existed, contradicting the scanner's
/// documented fail-open behavior. Now an error bubbles up; the only
/// caller (`check_route_cleared`) treats `Err` by returning `true`
/// (unblock — "we don't know, let the scheduler re-decide next tick"),
/// which preserves liveness uniformly whether the fault is SSCAN, GET,
/// or deeper transport.
async fn load_worker_caps_union(
    client: &ferriskey::Client,
) -> Result<BTreeSet<String>, ferriskey::Error> {
    let mut union = BTreeSet::new();
    let index_key = ff_core::keys::workers_index_key();

    // Helper: drain one completed future and fold its caps into the
    // union, or propagate its error. Centralizing keeps the in-loop +
    // drain paths symmetric (both must behave the same — a missed error
    // at either point re-introduces the false-negative-union bug).
    fn absorb(
        union: &mut BTreeSet<String>,
        res: Result<Option<String>, ferriskey::Error>,
    ) -> Result<(), ferriskey::Error> {
        let csv = res?;
        if let Some(csv) = csv {
            for token in csv.split(',') {
                if !token.is_empty() {
                    union.insert(token.to_owned());
                }
            }
        }
        Ok(())
    }

    // SSCAN loop — bounded per-page response size. Cursor starts at "0"
    // and wraps back to "0" when iteration completes.
    let mut cursor: String = "0".to_owned();
    loop {
        let reply: (String, Vec<String>) = client
            .cmd("SSCAN")
            .arg(&index_key)
            .arg(&cursor)
            .arg("COUNT")
            .arg(WORKERS_SSCAN_COUNT.to_string().as_str())
            .execute()
            .await?;
        cursor = reply.0;
        let worker_ids = reply.1;

        // Bounded concurrent GETs per SSCAN page. FuturesUnordered, drained
        // whenever it reaches CAPS_GET_CONCURRENCY, caps in-flight work —
        // enough parallelism to amortize round-trip latency, bounded so
        // one scanner tick can't flood the shared Valkey client. Each
        // spawned future returns `Result<Option<String>, Error>` so
        // transient Valkey errors propagate up (no .unwrap_or(None)
        // swallowing) — see the fn-level doc for why.
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        for id in worker_ids {
            let client = client.clone();
            pending.push(async move {
                let caps_key = format!("ff:worker:{}:caps", id);
                let csv: Option<String> = client
                    .cmd("GET")
                    .arg(&caps_key)
                    .execute()
                    .await?;
                Ok::<Option<String>, ferriskey::Error>(csv)
            });
            if pending.len() >= CAPS_GET_CONCURRENCY
                && let Some(res) = pending.next().await
            {
                absorb(&mut union, res)?;
            }
        }
        // Drain remaining pending GETs from this page before advancing
        // the SSCAN cursor. Keeps the pipeline window bounded and the
        // union observation consistent with "all workers visible so far".
        while let Some(res) = pending.next().await {
            absorb(&mut union, res)?;
        }

        if cursor == "0" {
            break;
        }
    }
    Ok(union)
}
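// Illustrative sketch, not part of the original module: the per-worker
// token fold `absorb` applies. Comma-separated caps accumulate into one
// de-duplicated union; a missing key (None) or an empty STRING contributes
// nothing. The worker cap values are made up for the example.
#[cfg(test)]
mod caps_union_fold_sketch {
    use std::collections::BTreeSet;

    #[test]
    fn union_deduplicates_and_ignores_missing_or_empty_workers() {
        let per_worker: Vec<Option<&str>> =
            vec![Some("gpu,linux"), None, Some(""), Some("linux,py311")];

        let mut union = BTreeSet::new();
        for caps in per_worker.into_iter().flatten() {
            for token in caps.split(',') {
                if !token.is_empty() {
                    union.insert(token.to_owned());
                }
            }
        }

        let expected: BTreeSet<String> =
            ["gpu", "linux", "py311"].iter().map(|s| s.to_string()).collect();
        assert_eq!(union, expected);
    }
}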