ff_engine/scanner/unblock.rs
//! Unblock scanner for budget/quota/capability-blocked executions.
//!
//! Scans `ff:idx:{p:N}:lane:<lane>:blocked:{budget,quota,route}` per
//! execution partition. For each blocked execution, re-evaluates the
//! blocking condition. If cleared, calls `FCALL ff_unblock_execution`.
//!
//! The cross-partition budget check is cached per scan cycle (MANDATORY —
//! without it, 50K blocked executions = 50K budget reads).
//!
//! The capability sweep reads the union of non-authoritative worker cap
//! sets (`ff:worker:*:caps` — written by `ff-sdk::FlowFabricWorker::connect`)
//! ONCE per scan cycle and uses it to decide whether a
//! `waiting_for_capable_worker` execution has a matching worker. This is
//! best-effort: caps sets may be slightly stale (TTL-less STRING,
//! overwritten on restart), but the promotion path is self-correcting — a
//! promoted execution that still can't be claimed gets re-blocked on the
//! next scheduler tick. RFC-009 §7.5 documents the v1 sweep approach and
//! defers connect-triggered sweeps to V2.
//!
//! MUST skip `paused_by_flow_cancel` — only cancel_flow clears that.
//!
//! Reference: RFC-008 §2.4, RFC-009 §7.5, RFC-010 §6
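//!
//! Illustrative key shapes (a sketch; the authoritative strings come from
//! `ff_core::keys::IndexKeys`, so treat the names below as examples only):
//!
//! ```text
//! ff:idx:{p:7}:lane:default:blocked:budget   ZSET   eid → block_time_ms
//! ff:idx:{p:7}:lane:default:blocked:quota    ZSET   eid → block_time_ms
//! ff:idx:{p:7}:lane:default:blocked:route    ZSET   eid → block_time_ms
//! ff:exec:{p:7}:<eid>:core                   HASH   blocking_reason, budget_ids, ...
//! ```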

use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::Mutex as AsyncMutex;

use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition};
use ff_core::types::{BudgetId, LaneId};

use super::{ScanResult, Scanner};

const BATCH_SIZE: u32 = 100;

/// SSCAN page size for the workers-index SET. Same COUNT the other
/// index-SET scanners use (budget_reconciler, flow_projector). Bounds
/// per-cursor round-trip response size; total iteration is still the
/// full SET size across cursor=0 wrap.
const WORKERS_SSCAN_COUNT: usize = 100;

/// Per-worker caps GET fan-out concurrency cap. Mirrors the bounded
/// parallelism W1 used in initialize_waitpoint_hmac_secret. Too low and
/// large fleets (1000+ workers) pay serial round-trip latency; too high
/// and we head-of-line-block the scanner client with a pipeline burst.
/// 16 is a pragmatic middle ground for current fleet scales.
const CAPS_GET_CONCURRENCY: usize = 16;

pub struct UnblockScanner {
    interval: Duration,
    lanes: Vec<LaneId>,
    partition_config: PartitionConfig,
    /// Shared worker-caps union cache across ALL partitions in one scan
    /// pass. Previously this cache was declared inside `scan_partition`,
    /// which runs once PER PARTITION — at 256 partitions that meant up
    /// to 256 redundant SSCAN + fan-out GET sequences per scan interval.
    /// Now: one load per TTL window (= `interval`), shared across every
    /// partition visited in that window. `TTL == interval` is natural:
    /// a worker connecting "now" propagates into the caps union on the
    /// next scan cycle, not faster or slower than the cycle itself.
    ///
    /// `Arc<AsyncMutex<_>>` because the Scanner trait's `scan_partition`
    /// takes `&self`. Contention is bounded by the partition iteration
    /// cadence (one partition at a time per scanner task), so the mutex
    /// is effectively uncontended in steady state.
    caps_cache: Arc<AsyncMutex<CapsUnionCache>>,
}

/// Worker-caps union snapshot with a monotonic freshness timestamp.
/// `None` on first scan; filled lazily by `check_route_cleared` on first
/// use. Invalidated when `Instant::now() - fetched_at >= ttl`.
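///
/// Refresh-timeline sketch (illustrative; assumes `interval` = 30s):
///
/// ```text
/// t=0s    first check: snapshot is None → SSCAN + GET fan-out, fetched_at = 0s
/// t=12s   another partition checks: 12s < 30s TTL → reuse cached snapshot
/// t=31s   next cycle checks: 31s >= 30s TTL → reload the union
/// ```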
struct CapsUnionCache {
    snapshot: Option<BTreeSet<String>>,
    fetched_at: Option<Instant>,
    ttl: Duration,
}

impl UnblockScanner {
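    /// Construct the scanner. A minimal wiring sketch (hypothetical call
    /// site; the lane list and config values are placeholders, not taken
    /// from any RFC):
    ///
    /// ```ignore
    /// let scanner = UnblockScanner::new(
    ///     Duration::from_secs(30), // scan interval; doubles as the caps-cache TTL
    ///     lanes,                   // Vec<LaneId> to sweep in every partition
    ///     partition_config,        // PartitionConfig for {b:M}/{q:K} routing
    /// );
    /// ```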
    pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
        Self {
            interval,
            lanes,
            partition_config,
            caps_cache: Arc::new(AsyncMutex::new(CapsUnionCache {
                snapshot: None,
                fetched_at: None,
                ttl: interval,
            })),
        }
    }
}

impl Scanner for UnblockScanner {
    fn name(&self) -> &'static str {
        "unblock"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);

        let mut total_processed: u32 = 0;
        let mut total_errors: u32 = 0;

        // Cross-partition budget cache: budget_id → is_breached.
        // Reset per partition scan (each partition scan is one "cycle").
        let mut budget_cache: HashMap<String, bool> = HashMap::new();

        // The worker-caps union cache is shared across ALL partitions via
        // the scanner struct (Arc<AsyncMutex<CapsUnionCache>>). On use,
        // `check_route_cleared` returns the cached snapshot if its
        // fetched_at is within `interval` (TTL == scan interval), and
        // otherwise loads fresh via SSCAN + concurrent GET fan-out.
        // Without this, at 256 partitions the old per-partition-local
        // cache re-ran load_worker_caps_union up to 256× per cycle.
        let caps_cache = self.caps_cache.clone();

        for lane in &self.lanes {
            // Scan blocked:budget
            let budget_key = idx.lane_blocked_budget(lane);
            let r = scan_blocked_set(
                client, &p, &idx, lane, &budget_key,
                "waiting_for_budget", &mut budget_cache,
                &caps_cache,
                &self.partition_config,
            ).await;
            total_processed += r.processed;
            total_errors += r.errors;

            // Scan blocked:quota
            let quota_key = idx.lane_blocked_quota(lane);
            let r = scan_blocked_set(
                client, &p, &idx, lane, &quota_key,
                "waiting_for_quota", &mut budget_cache,
                &caps_cache,
                &self.partition_config,
            ).await;
            total_processed += r.processed;
            total_errors += r.errors;

            // Scan blocked:route (capability-mismatch blocks). The promotion
            // decision reads the union of connected workers' caps and
            // checks subset coverage. See check_route_cleared below.
            let route_key = idx.lane_blocked_route(lane);
            let r = scan_blocked_set(
                client, &p, &idx, lane, &route_key,
                "waiting_for_capable_worker", &mut budget_cache,
                &caps_cache,
                &self.partition_config,
            ).await;
            total_processed += r.processed;
            total_errors += r.errors;
        }

        ScanResult {
            processed: total_processed,
            errors: total_errors,
        }
    }
}

/// Scan one blocked set and unblock executions whose condition has cleared.
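///
/// The terminal FCALL shape, reconstructed from the `keys`/`argv` arrays
/// built below (illustrative key names and values):
///
/// ```text
/// FCALL ff_unblock_execution 3
///   ff:exec:{p:7}:<eid>:core                    KEYS[1]  exec core hash
///   ff:idx:{p:7}:lane:default:blocked:budget    KEYS[2]  source blocked ZSET
///   ff:idx:{p:7}:lane:default:eligible          KEYS[3]  destination eligible set
///   <eid> <now_ms> waiting_for_budget           ARGV     id, server time, reason
/// ```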
#[allow(clippy::too_many_arguments)]
async fn scan_blocked_set(
    client: &ferriskey::Client,
    partition: &Partition,
    idx: &IndexKeys,
    lane: &LaneId,
    blocked_key: &str,
    expected_reason: &str,
    budget_cache: &mut HashMap<String, bool>,
    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
    partition_config: &PartitionConfig,
) -> ScanResult {
    // Read a batch of up to BATCH_SIZE members from the blocked set
    // (they're scored by block time).
    let blocked: Vec<String> = match client
        .cmd("ZRANGEBYSCORE")
        .arg(blocked_key)
        .arg("-inf")
        .arg("+inf")
        .arg("LIMIT")
        .arg("0")
        .arg(BATCH_SIZE.to_string().as_str())
        .execute()
        .await
    {
        Ok(ids) => ids,
        Err(e) => {
            tracing::warn!(
                error = %e,
                blocked_key,
                "unblock_scanner: ZRANGEBYSCORE failed"
            );
            return ScanResult { processed: 0, errors: 1 };
        }
    };

    if blocked.is_empty() {
        return ScanResult { processed: 0, errors: 0 };
    }

    let mut processed: u32 = 0;
    let mut errors: u32 = 0;
    let tag = partition.hash_tag();

    for eid_str in &blocked {
        // Read blocking_reason from exec_core to confirm it is still blocked.
        let core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
        let reason: Option<String> = match client
            .cmd("HGET")
            .arg(&core_key)
            .arg("blocking_reason")
            .execute()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: HGET blocking_reason failed, skipping"
                );
                errors += 1;
                continue;
            }
        };

        let reason = reason.unwrap_or_default();

        // Skip if not blocked for the expected reason (e.g. paused_by_flow_cancel).
        if reason != expected_reason {
            continue;
        }

        // Re-evaluate the blocking condition.
        let should_unblock = match expected_reason {
            "waiting_for_budget" => {
                check_budget_cleared(client, &core_key, budget_cache, partition_config).await
            }
            "waiting_for_quota" => {
                check_quota_cleared(client, &core_key, eid_str, partition_config).await
            }
            "waiting_for_capable_worker" => {
                check_route_cleared(client, &core_key, caps_cache).await
            }
            _ => false,
        };

        if !should_unblock {
            continue;
        }

        // Unblock: FCALL ff_unblock_execution on {p:N}.
        let eligible_key = idx.lane_eligible(lane);
        let keys: [&str; 3] = [&core_key, blocked_key, &eligible_key];

        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t.to_string(),
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: server TIME failed, skipping unblock"
                );
                errors += 1;
                continue;
            }
        };
        let argv: [&str; 3] = [eid_str, &now_ms, expected_reason];

        match client
            .fcall::<ferriskey::Value>("ff_unblock_execution", &keys, &argv)
            .await
        {
            Ok(_) => {
                tracing::info!(
                    execution_id = eid_str.as_str(),
                    reason = expected_reason,
                    "unblock_scanner: execution unblocked"
                );
                processed += 1;
            }
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_str.as_str(),
                    error = %e,
                    "unblock_scanner: ff_unblock_execution failed"
                );
                errors += 1;
            }
        }
    }

    ScanResult { processed, errors }
}

/// Check if the budget blocking condition has cleared.
/// Uses the cross-partition cache to avoid redundant reads.
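///
/// Cache effect within one partition scan (illustrative IDs): if 500
/// blocked executions all reference `budget_ids = "b-1,b-2"`, the scanner
/// performs at most two `is_budget_breached` reads, not 1000:
///
/// ```text
/// eid-001: b-1 miss → read → cache[b-1]=false; b-2 miss → read → cache[b-2]=false
/// eid-002: b-1 hit(false); b-2 hit(false)      (no Valkey reads)
/// ...
/// ```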
async fn check_budget_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    cache: &mut HashMap<String, bool>,
    config: &PartitionConfig,
) -> bool {
    // Read budget_ids from exec_core.
    let budget_ids_str: Option<String> = client
        .cmd("HGET")
        .arg(core_key)
        .arg("budget_ids")
        .execute()
        .await
        .unwrap_or(None);

    let budget_ids_str = match budget_ids_str {
        Some(s) if !s.is_empty() => s,
        _ => return true, // no budgets → unblock
    };

    for budget_id in budget_ids_str.split(',') {
        let budget_id = budget_id.trim();
        if budget_id.is_empty() {
            continue;
        }

        // Check the cache first.
        if let Some(&breached) = cache.get(budget_id) {
            if breached {
                return false; // still breached
            }
            continue;
        }

        // Read from Valkey and cache the result.
        let breached = is_budget_breached(client, budget_id, config).await;
        cache.insert(budget_id.to_owned(), breached);
        if breached {
            return false;
        }
    }

    true // all budgets within limits
}

/// Read budget usage + limits and check if any hard limit is breached.
/// Computes the real {b:M} partition tag from the budget_id.
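///
/// Hash shapes this check expects (illustrative dimension names; the
/// actual dimension set is defined elsewhere):
///
/// ```text
/// ff:budget:{b:3}:<budget_id>:limits   HASH   hard:cpu_ms → "100000", soft:cpu_ms → "80000"
/// ff:budget:{b:3}:<budget_id>:usage    HASH   cpu_ms → "97500"
/// ```
///
/// Only `hard:*` fields participate (soft fields are skipped below);
/// breached iff usage >= limit for any hard dimension.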
async fn is_budget_breached(
    client: &ferriskey::Client,
    budget_id: &str,
    config: &PartitionConfig,
) -> bool {
    // Compute the real budget partition tag.
    let bid = match BudgetId::parse(budget_id) {
        Ok(id) => id,
        Err(_) => return false, // invalid budget_id → treat as not breached
    };
    let partition = budget_partition(&bid, config);
    let tag = partition.hash_tag();
    let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
    let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);

    // Read hard limits — fail-closed: if the Valkey read fails, treat as
    // breached (keep the execution blocked) rather than silently unblocking.
    let limits: Vec<String> = match client
        .cmd("HGETALL")
        .arg(&limits_key)
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                budget_id,
                error = %e,
                "unblock_scanner: budget limits read failed, keeping blocked (fail-closed)"
            );
            return true; // treat as breached
        }
    };

    // HGETALL replies as a flat [field, value, field, value, ...] list.
    let mut i = 0;
    while i + 1 < limits.len() {
        let field = &limits[i];
        let limit_str = &limits[i + 1];
        i += 2;

        if !field.starts_with("hard:") {
            continue;
        }
        let dimension = &field[5..];
        let limit: u64 = match limit_str.parse() {
            Ok(v) if v > 0 => v,
            _ => continue,
        };

        let usage_str: Option<String> = match client
            .cmd("HGET")
            .arg(&usage_key)
            .arg(dimension)
            .execute()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                tracing::error!(
                    budget_id,
                    dimension,
                    error = %e,
                    "unblock_scanner: budget usage read failed, keeping blocked (fail-closed)"
                );
                return true; // treat as breached
            }
        };
        let usage: u64 = usage_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);

        if usage >= limit {
            return true; // breached
        }
    }

    false
}

/// Check if the quota blocking condition has cleared.
/// Re-checks the sliding window after cleanup.
/// Computes the real {q:K} partition tag from the quota_policy_id.
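///
/// Sliding-window re-check, as implemented below (illustrative numbers:
/// max_requests_per_window = 100, requests_per_window_seconds = 60):
///
/// ```text
/// ZREMRANGEBYSCORE ff:quota:{q:2}:<id>:window:requests_per_window -inf (now_ms - 60000)
/// ZCARD            ff:quota:{q:2}:<id>:window:requests_per_window  → 97  (< 100 → rate OK)
/// GET              ff:quota:{q:2}:<id>:concurrency                 → 8   (< cap → cleared)
/// ```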
async fn check_quota_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    _eid_str: &str,
    config: &PartitionConfig,
) -> bool {
    // Read quota_policy_id from exec_core — fail-closed on Valkey error.
    let quota_id: Option<String> = match client
        .cmd("HGET")
        .arg(core_key)
        .arg("quota_policy_id")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                core_key,
                error = %e,
                "unblock_scanner: quota_policy_id read failed, keeping blocked (fail-closed)"
            );
            return false;
        }
    };

    let quota_id = match quota_id {
        Some(s) if !s.is_empty() => s,
        _ => return true, // no quota → unblock
    };

    // Compute the real quota partition tag.
    let qid = match ff_core::types::QuotaPolicyId::parse(&quota_id) {
        Ok(id) => id,
        Err(_) => return true, // invalid → unblock (advisory)
    };
    let partition = ff_core::partition::quota_partition(&qid, config);
    let tag = partition.hash_tag();

    let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id);
    let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
    let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);

    // Read quota definition fields — fail-closed on Valkey error.
    let def_fields: Vec<Option<String>> = match client
        .cmd("HMGET")
        .arg(&quota_def_key)
        .arg("max_requests_per_window")
        .arg("requests_per_window_seconds")
        .arg("active_concurrency_cap")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!(
                quota_id = %quota_id,
                error = %e,
                "unblock_scanner: quota definition read failed, keeping blocked (fail-closed)"
            );
            return false;
        }
    };
    let rate_limit: u64 = def_fields.first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    let window_secs: u64 = def_fields.get(1)
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(60);
    let concurrency_cap: u64 = def_fields.get(2)
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);

    // Check the rate: clean the window, then count.
    if rate_limit > 0 {
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(_) => return false,
        };
        let window_ms = window_secs * 1000;
        let cutoff = (now_ms.saturating_sub(window_ms)).to_string();

        let _: Result<i64, _> = client
            .cmd("ZREMRANGEBYSCORE")
            .arg(&window_key)
            .arg("-inf")
            .arg(&cutoff)
            .execute()
            .await;

        let count: i64 = client
            .cmd("ZCARD")
            .arg(&window_key)
            .execute()
            .await
            .unwrap_or(0);

        if count as u64 >= rate_limit {
            return false; // still at limit
        }
    }

    // Check concurrency.
    if concurrency_cap > 0 {
        let active: i64 = client
            .cmd("GET")
            .arg(&concurrency_key)
            .execute()
            .await
            .unwrap_or(0);

        if active as u64 >= concurrency_cap {
            return false; // still at cap
        }
    }

    true // quota cleared
}

/// Check if the capability block has cleared: some connected worker's
/// caps now cover the execution's `required_capabilities`.
///
/// The UNION of every connected worker's caps is cached on the scanner
/// struct with a TTL equal to `interval`, so within one scan cycle every
/// partition reuses the same snapshot, and between cycles a stale
/// snapshot is automatically refreshed.
///
/// Fail-OPEN on union-load failure: if the SSCAN or fan-out GET errors
/// out, assume a worker might match and let the scheduler re-decide on
/// the next tick. The caps set is non-authoritative (Lua never reads it);
/// treating it as "unknown → maybe" preserves liveness. Fail-closed
/// would leave executions stuck whenever the caps read hits a transient
/// Valkey error.
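///
/// Subset-coverage example (illustrative capability names):
///
/// ```text
/// required_capabilities = "gpu,large-mem"
/// caps union = {"gpu", "large-mem", "x86_64"}   → covered → unblock
/// caps union = {"gpu", "x86_64"}                → not covered → stay blocked
/// ```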
async fn check_route_cleared(
    client: &ferriskey::Client,
    core_key: &str,
    caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
) -> bool {
    let required_csv: Option<String> = client
        .cmd("HGET")
        .arg(core_key)
        .arg("required_capabilities")
        .execute()
        .await
        .unwrap_or(None);
    let required_csv = match required_csv {
        Some(s) if !s.is_empty() => s,
        _ => return true, // no required caps → anyone can claim
    };

    // Acquire the cache, then return a cheap clone of the snapshot so the
    // subset check runs OUTSIDE the mutex (BTreeSet clone is O(n), but
    // n is bounded by total caps across the fleet — typically tens).
    // Holding the mutex across the subset loop would serialize every
    // partition's capability-block decision behind this one mutex.
    let snapshot: BTreeSet<String> = {
        let mut guard = caps_cache.lock().await;
        let stale = guard
            .fetched_at
            .map(|t| t.elapsed() >= guard.ttl)
            .unwrap_or(true);
        if stale {
            match load_worker_caps_union(client).await {
                Ok(union) => {
                    guard.snapshot = Some(union);
                    guard.fetched_at = Some(Instant::now());
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "unblock_scanner: failed to read worker caps union — \
                         assuming match possible (fail-open to preserve liveness)"
                    );
                    return true;
                }
            }
        }
        guard.snapshot.clone().unwrap_or_default()
    };

    // Subset check: every non-empty token in required_csv is present in the union.
    required_csv
        .split(',')
        .filter(|t| !t.is_empty())
        .all(|t| snapshot.contains(t))
}

/// Union of every connected worker's advertised capabilities.
///
/// Cluster-safe enumeration pattern (matches Batch A's index SETs for
/// budget/flow/deps): SSCAN the `workers_index_key()` SET (single-slot,
/// no hash tag needed — the key name itself hashes to one slot), then
/// fan out concurrent `GET ff:worker:<id>:caps` calls under a bounded
/// concurrency cap. A keyspace `SCAN MATCH ff:worker:*:caps` in cluster
/// mode visits only one shard per call and silently drops workers on
/// other shards — exactly the class of bug Batch A Issue #11 fixed.
///
/// SSCAN is used instead of SMEMBERS so a fleet of thousands of workers
/// doesn't round-trip the entire member list in one reply. `COUNT = 100`
/// matches the convention in budget_reconciler / flow_projector.
///
/// An empty caps STRING or a missing key means "no caps for that worker";
/// the scanner keeps accumulating. A per-worker GET error, in contrast,
/// PROPAGATES — a previous version used `.unwrap_or(None)`, which silently
/// merged error and empty into the same branch, making a transient error
/// look like "this worker has no caps". In a single-capable-worker fleet
/// that produced false-negative unions and left executions blocked even
/// though a matching worker existed, contradicting the scanner's
/// documented fail-open behavior. Now an error bubbles up; the only
/// caller (`check_route_cleared`) treats `Err` by returning `true`
/// (unblock — "we don't know, let the scheduler re-decide next tick"),
/// which preserves liveness uniformly whether the fault is SSCAN, GET,
/// or deeper in the transport.
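///
/// Command-level sketch of one load (illustrative worker IDs and index-key
/// name; the real name comes from `workers_index_key()`):
///
/// ```text
/// SSCAN ff:workers "0" COUNT 100   → ("0", [w-1, w-2, w-3])
/// GET ff:worker:w-1:caps           → "gpu,x86_64"        (≤16 GETs in
/// GET ff:worker:w-2:caps           → nil (no caps)        flight per
/// GET ff:worker:w-3:caps           → "large-mem"          SSCAN page)
/// union = {"gpu", "large-mem", "x86_64"}
/// ```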
async fn load_worker_caps_union(
    client: &ferriskey::Client,
) -> Result<BTreeSet<String>, ferriskey::Error> {
    let mut union = BTreeSet::new();
    let index_key = ff_core::keys::workers_index_key();

    // Helper: drain one completed future and fold its caps into the
    // union, or propagate its error. Centralizing this keeps the in-loop
    // and drain paths symmetric (both must behave the same — a missed
    // error at either point re-introduces the false-negative-union bug).
    fn absorb(
        union: &mut BTreeSet<String>,
        res: Result<Option<String>, ferriskey::Error>,
    ) -> Result<(), ferriskey::Error> {
        let csv = res?;
        if let Some(csv) = csv {
            for token in csv.split(',') {
                if !token.is_empty() {
                    union.insert(token.to_owned());
                }
            }
        }
        Ok(())
    }

    // SSCAN loop — bounded per-page response size. The cursor starts at
    // "0" and wraps back to "0" when iteration completes.
    let mut cursor: String = "0".to_owned();
    loop {
        let reply: (String, Vec<String>) = client
            .cmd("SSCAN")
            .arg(&index_key)
            .arg(&cursor)
            .arg("COUNT")
            .arg(WORKERS_SSCAN_COUNT.to_string().as_str())
            .execute()
            .await?;
        cursor = reply.0;
        let worker_ids = reply.1;

        // Bounded concurrent GETs per SSCAN page. FuturesUnordered plus
        // the pending.len() check below caps in-flight work at
        // CAPS_GET_CONCURRENCY — enough parallelism to amortize
        // round-trip latency, bounded so one scanner tick can't flood
        // the shared Valkey client. Each spawned future returns
        // `Result<Option<String>, Error>` so transient Valkey errors
        // propagate up (no .unwrap_or(None) swallowing) — see the
        // fn-level doc for why.
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        for id in worker_ids {
            let client = client.clone();
            pending.push(async move {
                let caps_key = format!("ff:worker:{}:caps", id);
                let csv: Option<String> = client
                    .cmd("GET")
                    .arg(&caps_key)
                    .execute()
                    .await?;
                Ok::<Option<String>, ferriskey::Error>(csv)
            });
            if pending.len() >= CAPS_GET_CONCURRENCY
                && let Some(res) = pending.next().await
            {
                absorb(&mut union, res)?;
            }
        }
        // Drain the remaining pending GETs from this page before advancing
        // the SSCAN cursor. This keeps the pipeline window bounded and the
        // union observation consistent with "all workers visible so far".
        while let Some(res) = pending.next().await {
            absorb(&mut union, res)?;
        }

        if cursor == "0" {
            break;
        }
    }
    Ok(union)
}