1use std::collections::{BTreeSet, HashMap};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27
28use futures::stream::{FuturesUnordered, StreamExt};
29use tokio::sync::Mutex as AsyncMutex;
30
31use ff_core::backend::ScannerFilter;
32use ff_core::keys::IndexKeys;
33use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition};
34use ff_core::types::{BudgetId, LaneId};
35
36use super::{should_skip_candidate, ScanResult, Scanner};
37
/// Maximum number of members read from each blocked sorted set per scan pass
/// (the `LIMIT 0 <BATCH_SIZE>` of the `ZRANGEBYSCORE` in `scan_blocked_set`).
const BATCH_SIZE: u32 = 100;

/// `COUNT` hint passed to each `SSCAN` of the workers index while rebuilding the
/// worker-capability union in `load_worker_caps_union`.
const WORKERS_SSCAN_COUNT: usize = 100;

/// Maximum number of per-worker `caps` GETs kept in flight at once while
/// rebuilding the worker-capability union.
const CAPS_GET_CONCURRENCY: usize = 16;
52
/// Scanner that re-checks executions parked in each lane's blocked sets
/// (waiting for budget, quota, or a capable worker) and calls
/// `ff_unblock_execution` for those whose blocking condition has cleared.
pub struct UnblockScanner {
    /// Scan interval reported through `Scanner::interval`; also used as the TTL
    /// of `caps_cache`.
    interval: Duration,
    /// Lanes whose blocked sets are visited on every partition pass.
    lanes: Vec<LaneId>,
    /// Partition layout used to locate budget and quota keys.
    partition_config: PartitionConfig,
    /// Candidate filter consulted (via `should_skip_candidate`) before each
    /// execution is examined.
    filter: ScannerFilter,
    /// Cached union of all workers' advertised capabilities, reused across
    /// partition scans. A tokio mutex because the guard is held across the
    /// reload `.await` in `check_route_cleared`.
    caps_cache: Arc<AsyncMutex<CapsUnionCache>>,
}

/// TTL-bounded snapshot of the union of worker capabilities, consulted by
/// `check_route_cleared`.
struct CapsUnionCache {
    /// Last successfully loaded union; `None` until the first load succeeds.
    snapshot: Option<BTreeSet<String>>,
    /// When `snapshot` was loaded; `None` forces a reload on next use.
    fetched_at: Option<Instant>,
    /// Age at which the snapshot is considered stale and reloaded.
    ttl: Duration,
}
82
83impl UnblockScanner {
84 pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
85 Self::with_filter(interval, lanes, partition_config, ScannerFilter::default())
86 }
87
88 pub fn with_filter(
91 interval: Duration,
92 lanes: Vec<LaneId>,
93 partition_config: PartitionConfig,
94 filter: ScannerFilter,
95 ) -> Self {
96 Self {
97 interval,
98 lanes,
99 partition_config,
100 filter,
101 caps_cache: Arc::new(AsyncMutex::new(CapsUnionCache {
102 snapshot: None,
103 fetched_at: None,
104 ttl: interval,
105 })),
106 }
107 }
108}
109
110impl Scanner for UnblockScanner {
111 fn name(&self) -> &'static str {
112 "unblock"
113 }
114
115 fn interval(&self) -> Duration {
116 self.interval
117 }
118
119 fn filter(&self) -> &ScannerFilter {
120 &self.filter
121 }
122
123 async fn scan_partition(
124 &self,
125 client: &ferriskey::Client,
126 partition: u16,
127 ) -> ScanResult {
128 let p = Partition {
129 family: PartitionFamily::Execution,
130 index: partition,
131 };
132 let idx = IndexKeys::new(&p);
133
134 let mut total_processed: u32 = 0;
135 let mut total_errors: u32 = 0;
136
137 let mut budget_cache: HashMap<String, bool> = HashMap::new();
140
141 let caps_cache = self.caps_cache.clone();
149
150 for lane in &self.lanes {
151 let budget_key = idx.lane_blocked_budget(lane);
153 let r = scan_blocked_set(
154 client, &p, &idx, lane, &budget_key,
155 "waiting_for_budget", &mut budget_cache,
156 &caps_cache,
157 &self.partition_config,
158 &self.filter,
159 ).await;
160 total_processed += r.processed;
161 total_errors += r.errors;
162
163 let quota_key = idx.lane_blocked_quota(lane);
165 let r = scan_blocked_set(
166 client, &p, &idx, lane, "a_key,
167 "waiting_for_quota", &mut budget_cache,
168 &caps_cache,
169 &self.partition_config,
170 &self.filter,
171 ).await;
172 total_processed += r.processed;
173 total_errors += r.errors;
174
175 let route_key = idx.lane_blocked_route(lane);
179 let r = scan_blocked_set(
180 client, &p, &idx, lane, &route_key,
181 "waiting_for_capable_worker", &mut budget_cache,
182 &caps_cache,
183 &self.partition_config,
184 &self.filter,
185 ).await;
186 total_processed += r.processed;
187 total_errors += r.errors;
188 }
189
190 ScanResult {
191 processed: total_processed,
192 errors: total_errors,
193 }
194 }
195}
196
197#[allow(clippy::too_many_arguments)]
199async fn scan_blocked_set(
200 client: &ferriskey::Client,
201 partition: &Partition,
202 idx: &IndexKeys,
203 lane: &LaneId,
204 blocked_key: &str,
205 expected_reason: &str,
206 budget_cache: &mut HashMap<String, bool>,
207 caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
208 partition_config: &PartitionConfig,
209 filter: &ScannerFilter,
210) -> ScanResult {
211 let blocked: Vec<String> = match client
213 .cmd("ZRANGEBYSCORE")
214 .arg(blocked_key)
215 .arg("-inf")
216 .arg("+inf")
217 .arg("LIMIT")
218 .arg("0")
219 .arg(BATCH_SIZE.to_string().as_str())
220 .execute()
221 .await
222 {
223 Ok(ids) => ids,
224 Err(e) => {
225 tracing::warn!(
226 error = %e,
227 blocked_key,
228 "unblock_scanner: ZRANGEBYSCORE failed"
229 );
230 return ScanResult { processed: 0, errors: 1 };
231 }
232 };
233
234 if blocked.is_empty() {
235 return ScanResult { processed: 0, errors: 0 };
236 }
237
238 let mut processed: u32 = 0;
239 let mut errors: u32 = 0;
240 let tag = partition.hash_tag();
241
242 for eid_str in &blocked {
243 if should_skip_candidate(client, filter, partition.index, eid_str).await {
244 continue;
245 }
246 let core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
248 let reason: Option<String> = match client
249 .cmd("HGET")
250 .arg(&core_key)
251 .arg("blocking_reason")
252 .execute()
253 .await
254 {
255 Ok(r) => r,
256 Err(e) => {
257 tracing::warn!(
258 execution_id = eid_str.as_str(),
259 error = %e,
260 "unblock_scanner: HGET blocking_reason failed, skipping"
261 );
262 errors += 1;
263 continue;
264 }
265 };
266
267 let reason = reason.unwrap_or_default();
268
269 if reason != expected_reason {
271 continue;
272 }
273
274 let should_unblock = match expected_reason {
276 "waiting_for_budget" => {
277 check_budget_cleared(client, &core_key, budget_cache, partition_config).await
278 }
279 "waiting_for_quota" => {
280 check_quota_cleared(client, &core_key, eid_str, partition_config).await
281 }
282 "waiting_for_capable_worker" => {
283 check_route_cleared(client, &core_key, caps_cache).await
284 }
285 _ => false,
286 };
287
288 if !should_unblock {
289 continue;
290 }
291
292 let eligible_key = idx.lane_eligible(lane);
294 let keys: [&str; 3] = [&core_key, blocked_key, &eligible_key];
295
296 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
297 Ok(t) => t.to_string(),
298 Err(e) => {
299 tracing::warn!(
300 execution_id = eid_str.as_str(),
301 error = %e,
302 "unblock_scanner: server TIME failed, skipping unblock"
303 );
304 errors += 1;
305 continue;
306 }
307 };
308 let argv: [&str; 3] = [eid_str, &now_ms, expected_reason];
309
310 match client
311 .fcall::<ferriskey::Value>("ff_unblock_execution", &keys, &argv)
312 .await
313 {
314 Ok(_) => {
315 tracing::info!(
316 execution_id = eid_str.as_str(),
317 reason = expected_reason,
318 "unblock_scanner: execution unblocked"
319 );
320 processed += 1;
321 }
322 Err(e) => {
323 tracing::warn!(
324 execution_id = eid_str.as_str(),
325 error = %e,
326 "unblock_scanner: ff_unblock_execution failed"
327 );
328 errors += 1;
329 }
330 }
331 }
332
333 ScanResult { processed, errors }
334}
335
336async fn check_budget_cleared(
339 client: &ferriskey::Client,
340 core_key: &str,
341 cache: &mut HashMap<String, bool>,
342 config: &PartitionConfig,
343) -> bool {
344 let budget_ids_str: Option<String> = client
346 .cmd("HGET")
347 .arg(core_key)
348 .arg("budget_ids")
349 .execute()
350 .await
351 .unwrap_or(None);
352
353 let budget_ids_str = match budget_ids_str {
354 Some(s) if !s.is_empty() => s,
355 _ => return true, };
357
358 for budget_id in budget_ids_str.split(',') {
359 let budget_id = budget_id.trim();
360 if budget_id.is_empty() {
361 continue;
362 }
363
364 if let Some(&breached) = cache.get(budget_id) {
366 if breached {
367 return false; }
369 continue;
370 }
371
372 let breached = is_budget_breached(client, budget_id, config).await;
374 cache.insert(budget_id.to_owned(), breached);
375 if breached {
376 return false;
377 }
378 }
379
380 true }
382
383async fn is_budget_breached(
386 client: &ferriskey::Client,
387 budget_id: &str,
388 config: &PartitionConfig,
389) -> bool {
390 let bid = match BudgetId::parse(budget_id) {
392 Ok(id) => id,
393 Err(_) => return false, };
395 let partition = budget_partition(&bid, config);
396 let tag = partition.hash_tag();
397 let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
398 let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);
399
400 let limits: Vec<String> = match client
403 .cmd("HGETALL")
404 .arg(&limits_key)
405 .execute()
406 .await
407 {
408 Ok(v) => v,
409 Err(e) => {
410 tracing::error!(
411 budget_id,
412 error = %e,
413 "unblock_scanner: budget limits read failed, keeping blocked (fail-closed)"
414 );
415 return true; }
417 };
418
419 let mut i = 0;
420 while i + 1 < limits.len() {
421 let field = &limits[i];
422 let limit_str = &limits[i + 1];
423 i += 2;
424
425 if !field.starts_with("hard:") {
426 continue;
427 }
428 let dimension = &field[5..];
429 let limit: u64 = match limit_str.parse() {
430 Ok(v) if v > 0 => v,
431 _ => continue,
432 };
433
434 let usage_str: Option<String> = match client
435 .cmd("HGET")
436 .arg(&usage_key)
437 .arg(dimension)
438 .execute()
439 .await
440 {
441 Ok(v) => v,
442 Err(e) => {
443 tracing::error!(
444 budget_id,
445 dimension,
446 error = %e,
447 "unblock_scanner: budget usage read failed, keeping blocked (fail-closed)"
448 );
449 return true; }
451 };
452 let usage: u64 = usage_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
453
454 if usage >= limit {
455 return true; }
457 }
458
459 false
460}
461
462async fn check_quota_cleared(
466 client: &ferriskey::Client,
467 core_key: &str,
468 _eid_str: &str,
469 config: &PartitionConfig,
470) -> bool {
471 let quota_id: Option<String> = match client
473 .cmd("HGET")
474 .arg(core_key)
475 .arg("quota_policy_id")
476 .execute()
477 .await
478 {
479 Ok(v) => v,
480 Err(e) => {
481 tracing::error!(
482 core_key,
483 error = %e,
484 "unblock_scanner: quota_policy_id read failed, keeping blocked (fail-closed)"
485 );
486 return false;
487 }
488 };
489
490 let quota_id = match quota_id {
491 Some(s) if !s.is_empty() => s,
492 _ => return true, };
494
495 let qid = match ff_core::types::QuotaPolicyId::parse("a_id) {
497 Ok(id) => id,
498 Err(_) => return true, };
500 let partition = ff_core::partition::quota_partition(&qid, config);
501 let tag = partition.hash_tag();
502
503 let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id);
504 let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
505 let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
506
507 let def_fields: Vec<Option<String>> = match client
509 .cmd("HMGET")
510 .arg("a_def_key)
511 .arg("max_requests_per_window")
512 .arg("requests_per_window_seconds")
513 .arg("active_concurrency_cap")
514 .execute()
515 .await
516 {
517 Ok(v) => v,
518 Err(e) => {
519 tracing::error!(
520 quota_id = %quota_id,
521 error = %e,
522 "unblock_scanner: quota definition read failed, keeping blocked (fail-closed)"
523 );
524 return false;
525 }
526 };
527 let rate_limit: u64 = def_fields.first()
528 .and_then(|v| v.as_ref())
529 .and_then(|s| s.parse().ok())
530 .unwrap_or(0);
531 let window_secs: u64 = def_fields.get(1)
532 .and_then(|v| v.as_ref())
533 .and_then(|s| s.parse().ok())
534 .unwrap_or(60);
535 let concurrency_cap: u64 = def_fields.get(2)
536 .and_then(|v| v.as_ref())
537 .and_then(|s| s.parse().ok())
538 .unwrap_or(0);
539
540 if rate_limit > 0 {
542 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
543 Ok(t) => t,
544 Err(_) => return false,
545 };
546 let window_ms = window_secs * 1000;
547 let cutoff = (now_ms.saturating_sub(window_ms)).to_string();
548
549 let _: Result<i64, _> = client
550 .cmd("ZREMRANGEBYSCORE")
551 .arg(&window_key)
552 .arg("-inf")
553 .arg(&cutoff)
554 .execute()
555 .await;
556
557 let count: i64 = client
558 .cmd("ZCARD")
559 .arg(&window_key)
560 .execute()
561 .await
562 .unwrap_or(0);
563
564 if count as u64 >= rate_limit {
565 return false; }
567 }
568
569 if concurrency_cap > 0 {
571 let active: i64 = client
572 .cmd("GET")
573 .arg(&concurrency_key)
574 .execute()
575 .await
576 .unwrap_or(0);
577
578 if active as u64 >= concurrency_cap {
579 return false; }
581 }
582
583 true }
585
586async fn check_route_cleared(
601 client: &ferriskey::Client,
602 core_key: &str,
603 caps_cache: &Arc<AsyncMutex<CapsUnionCache>>,
604) -> bool {
605 let required_csv: Option<String> = client
606 .cmd("HGET")
607 .arg(core_key)
608 .arg("required_capabilities")
609 .execute()
610 .await
611 .unwrap_or(None);
612 let required_csv = match required_csv {
613 Some(s) if !s.is_empty() => s,
614 _ => return true, };
616
617 let snapshot: BTreeSet<String> = {
623 let mut guard = caps_cache.lock().await;
624 let stale = guard
625 .fetched_at
626 .map(|t| t.elapsed() >= guard.ttl)
627 .unwrap_or(true);
628 if stale {
629 match load_worker_caps_union(client).await {
630 Ok(union) => {
631 guard.snapshot = Some(union);
632 guard.fetched_at = Some(Instant::now());
633 }
634 Err(e) => {
635 tracing::warn!(
636 error = %e,
637 "unblock_scanner: failed to read worker caps union — \
638 assuming match possible (fail-open to preserve liveness)"
639 );
640 return true;
641 }
642 }
643 }
644 guard.snapshot.clone().unwrap_or_default()
645 };
646
647 required_csv
649 .split(',')
650 .filter(|t| !t.is_empty())
651 .all(|t| snapshot.contains(t))
652}
653
654async fn load_worker_caps_union(
681 client: &ferriskey::Client,
682) -> Result<BTreeSet<String>, ferriskey::Error> {
683 let mut union = BTreeSet::new();
684 let index_key = ff_core::keys::workers_index_key();
685
686 fn absorb(
691 union: &mut BTreeSet<String>,
692 res: Result<Option<String>, ferriskey::Error>,
693 ) -> Result<(), ferriskey::Error> {
694 let csv = res?;
695 if let Some(csv) = csv {
696 for token in csv.split(',') {
697 if !token.is_empty() {
698 union.insert(token.to_owned());
699 }
700 }
701 }
702 Ok(())
703 }
704
705 let mut cursor: String = "0".to_owned();
708 loop {
709 let reply: (String, Vec<String>) = client
710 .cmd("SSCAN")
711 .arg(&index_key)
712 .arg(&cursor)
713 .arg("COUNT")
714 .arg(WORKERS_SSCAN_COUNT.to_string().as_str())
715 .execute()
716 .await?;
717 cursor = reply.0;
718 let worker_ids = reply.1;
719
720 let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
728 for id in worker_ids {
729 let client = client.clone();
730 pending.push(async move {
731 let caps_key = format!("ff:worker:{}:caps", id);
732 let csv: Option<String> = client
733 .cmd("GET")
734 .arg(&caps_key)
735 .execute()
736 .await?;
737 Ok::<Option<String>, ferriskey::Error>(csv)
738 });
739 if pending.len() >= CAPS_GET_CONCURRENCY
740 && let Some(res) = pending.next().await
741 {
742 absorb(&mut union, res)?;
743 }
744 }
745 while let Some(res) = pending.next().await {
749 absorb(&mut union, res)?;
750 }
751
752 if cursor == "0" {
753 break;
754 }
755 }
756 Ok(union)
757}