ff_scheduler/claim.rs
//! Claim-grant cycle: find eligible executions and issue grants.
//!
//! The scheduler selects candidates from partition-local eligible sorted sets,
//! then atomically issues a claim grant via `FCALL ff_issue_claim_grant`.
//! The worker (ff-sdk) subsequently consumes the grant via `ff_claim_execution`.
//!
//! Phase 5: single lane, budget/quota pre-checks before grant issuance.
//! Candidates that fail budget/quota are blocked via `ff_block_execution_for_admission`.
//!
//! Reference: RFC-009 §12.7, RFC-010 §3.1, RFC-008 §1.7

use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition, quota_partition};
use ff_core::types::{BudgetId, ExecutionId, LaneId, QuotaPolicyId, WorkerId, WorkerInstanceId};
use ff_script::error::ScriptError;
use ff_script::result::FcallResult;
use ff_script::retry::is_retryable_kind;
use std::collections::BTreeSet;

/// Return true iff every non-empty comma-separated token in `required_csv`
/// is present in `worker_caps`. Mirrors the authoritative Lua subset check
/// in scheduling.lua (parse_capability_csv + missing_capabilities) — this
/// is a fast-path Rust short-circuit so we can skip the quota-admission
/// write for executions we already know the worker can't claim. Empty /
/// all-separator CSV → subset holds trivially.
fn caps_subset(required_csv: &str, worker_caps: &BTreeSet<String>) -> bool {
    required_csv
        .split(',')
        .filter(|t| !t.is_empty())
        .all(|t| worker_caps.contains(t))
}

/// Short, stable digest of a worker's capability CSV, used in per-mismatch
/// log lines so a worker-caps CSV of up to 4KB does not get echoed on every
/// capability_mismatch event (which would swamp aggregators during an
/// incident). The full CSV is logged once at WARN when the worker connects;
/// operators cross-reference this 8-hex prefix to the full set.
/// Thin wrapper around the shared helper so call sites read as
/// "worker_caps_digest" locally while the algorithm lives in one place.
fn worker_caps_digest(csv: &str) -> String {
    ff_core::hash::fnv1a_xor8hex(csv)
}

/// A claim grant issued by the scheduler for a specific execution.
///
/// Re-exported from [`ff_core::contracts::ClaimGrant`]. Lives in
/// `ff-core` so `ff-scheduler` (issuer) and `ff-sdk` (consumer)
/// share one wire-level type without a cross-dep between them.
pub use ff_core::contracts::ClaimGrant;

/// A reclaim grant for a resumed (attempt_interrupted) execution.
///
/// Re-export of [`ff_core::contracts::ReclaimGrant`] for symmetry
/// with [`ClaimGrant`]. `ff-scheduler` will be the canonical
/// producer once the Batch-C reclaim scanner lands; today only
/// test fixtures construct this type. Consumed by
/// `FlowFabricWorker::claim_from_reclaim_grant`.
pub use ff_core::contracts::ReclaimGrant;

/// Budget check result from a cross-partition budget read.
#[derive(Debug)]
pub enum BudgetCheckResult {
    /// Budget is within limits — proceed.
    Ok,
    /// Budget hard limit breached. Contains (dimension, detail_string).
    HardBreach { dimension: String, detail: String },
}

/// Outcome of a quota admission check.
enum QuotaCheckOutcome {
    /// No quota attached to this execution.
    NoQuota,
    /// Quota admitted — carries context for release on subsequent failure.
    Admitted { tag: String, quota_id: String, eid: String },
    /// Quota denied — execution should be blocked.
    Blocked(String),
}

/// Cross-partition budget checker with per-cycle caching.
///
/// Reads budget usage/limits once per scan cycle (not per candidate).
/// This is MANDATORY for performance — without it, 50K blocked executions
/// would produce 50K budget reads per cycle.
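///
/// A minimal usage sketch (assumed/simplified: `client`, `config`, and the
/// `candidate_budget_ids()` iteration are placeholders, not real call sites):
///
/// ```ignore
/// let mut checker = BudgetChecker::new(config);
/// loop {
///     checker.reset(); // new scheduler cycle: drop cached budget reads
///     for budget_id in candidate_budget_ids() {
///         if let BudgetCheckResult::HardBreach { .. } =
///             checker.check_budget(&client, &budget_id).await
///         {
///             // block the candidate; hitting the same budget again this
///             // cycle is served from the cache without another Valkey read
///         }
///     }
/// }
/// ```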
pub struct BudgetChecker {
    /// Cached budget status: budget_id → BudgetCheckResult.
    /// Reset at the start of each scheduler cycle.
    cache: std::collections::HashMap<String, BudgetCheckResult>,
    config: PartitionConfig,
}

impl BudgetChecker {
    pub fn new(config: PartitionConfig) -> Self {
        Self {
            cache: std::collections::HashMap::new(),
            config,
        }
    }

    /// Check a budget by ID. Reads from Valkey on first call per budget,
    /// caches for subsequent candidates in the same cycle.
    pub async fn check_budget(
        &mut self,
        client: &ferriskey::Client,
        budget_id: &str,
    ) -> &BudgetCheckResult {
        if self.cache.contains_key(budget_id) {
            return &self.cache[budget_id];
        }

        // Compute real {b:M} partition tag from budget_id
        let (usage_key, limits_key) = match BudgetId::parse(budget_id) {
            Ok(bid) => {
                let partition = budget_partition(&bid, &self.config);
                let tag = partition.hash_tag();
                (
                    format!("ff:budget:{}:{}:usage", tag, budget_id),
                    format!("ff:budget:{}:{}:limits", tag, budget_id),
                )
            }
            Err(_) => {
                // Fallback for non-UUID budget IDs (test compat)
                (
                    format!("ff:budget:{{b:0}}:{}:usage", budget_id),
                    format!("ff:budget:{{b:0}}:{}:limits", budget_id),
                )
            }
        };

        let result = match Self::read_and_check(client, &usage_key, &limits_key).await {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(
                    budget_id,
                    error = %e,
                    "budget_checker: failed to read budget, allowing (advisory)"
                );
                BudgetCheckResult::Ok
            }
        };

        self.cache.insert(budget_id.to_owned(), result);
        &self.cache[budget_id]
    }

    /// Read budget usage and limits, compare each dimension.
    async fn read_and_check(
        client: &ferriskey::Client,
        usage_key: &str,
        limits_key: &str,
    ) -> Result<BudgetCheckResult, ferriskey::Error> {
        // Read all limit dimensions via hgetall (returns HashMap, not flat pairs)
        let limits: std::collections::HashMap<String, String> = client
            .hgetall(limits_key)
            .await?;

        // Parse hard limits
        for (field, limit_val) in &limits {
            if !field.starts_with("hard:") {
                continue;
            }
            let dimension = &field[5..]; // strip "hard:" prefix
            let limit: u64 = match limit_val.parse() {
                Ok(v) if v > 0 => v,
                _ => continue,
            };

            // Read current usage for this dimension
            let usage_str: Option<String> = client
                .cmd("HGET")
                .arg(usage_key)
                .arg(dimension)
                .execute()
                .await
                .unwrap_or(None);
            let usage: u64 = usage_str
                .as_deref()
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);

            if usage >= limit {
                return Ok(BudgetCheckResult::HardBreach {
                    dimension: dimension.to_owned(),
                    detail: format!("budget {}: {} {}/{}", usage_key, dimension, usage, limit),
                });
            }
        }

        Ok(BudgetCheckResult::Ok)
    }

    /// Clear the cache at the start of a new scheduler cycle.
    pub fn reset(&mut self) {
        self.cache.clear();
    }
}

/// Single-lane scheduler with budget/quota pre-checks.
///
/// Iterates execution partitions sequentially, picks the first eligible
/// execution (lowest priority score). Before issuing a claim grant:
/// 1. Check all attached budgets (cross-partition, cached per cycle)
/// 2. Check quota admission (cross-partition FCALL)
/// 3. If any check fails: block the candidate and try next
/// 4. If all pass: issue the claim grant
pub struct Scheduler {
    client: ferriskey::Client,
    config: PartitionConfig,
}

impl Scheduler {
    pub fn new(client: ferriskey::Client, config: PartitionConfig) -> Self {
        Self { client, config }
    }

    /// Find an eligible execution and issue a claim grant.
    ///
    /// Iterates all execution partitions looking for the first partition
    /// with an eligible execution. Issues a claim grant via FCALL.
    ///
    /// `worker_capabilities` is sent to `ff_issue_claim_grant` as a sorted
    /// CSV (BTreeSet guarantees deterministic order). Executions whose
    /// `required_capabilities` are not a subset of this set are skipped
    /// (stay queued) via `ScriptError::CapabilityMismatch`.
    ///
    /// Returns `Ok(None)` if no eligible executions exist anywhere.
    /// Returns `Ok(Some(grant))` on success.
    /// Returns `Err` on Valkey errors.
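    ///
    /// A minimal call-site sketch (hedged: `client`, `config`, `lane`,
    /// `worker_id`, and `worker_instance_id` are assumed to already exist;
    /// the 30_000 ms grant TTL is illustrative only):
    ///
    /// ```ignore
    /// let scheduler = Scheduler::new(client, config);
    /// let caps: BTreeSet<String> =
    ///     ["gpu".to_owned(), "cuda".to_owned()].into_iter().collect();
    /// match scheduler
    ///     .claim_for_worker(&lane, &worker_id, &worker_instance_id, &caps, 30_000)
    ///     .await?
    /// {
    ///     Some(grant) => { /* hand the grant to the worker (ff-sdk) */ }
    ///     None => { /* nothing eligible this tick */ }
    /// }
    /// ```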
    pub async fn claim_for_worker(
        &self,
        lane: &LaneId,
        worker_id: &WorkerId,
        worker_instance_id: &WorkerInstanceId,
        worker_capabilities: &BTreeSet<String>,
        grant_ttl_ms: u64,
    ) -> Result<Option<ClaimGrant>, SchedulerError> {
        let num_partitions = self.config.num_flow_partitions;
        let mut budget_checker = BudgetChecker::new(self.config);

        // Jitter the partition scan start to avoid thundering-herd on
        // partition 0 when 100 workers all tick simultaneously. Seeded
        // from worker_instance_id so this worker hits a stable window
        // within a single scheduling cycle (still covers every partition),
        // and different workers naturally diverge. Uses the shared
        // ff_core::hash FNV-1a reducer — same helper powering ff-sdk's
        // PARTITION_SCAN_CHUNK cursor seed. Zero-modulus safe.
        let start_p: u16 = ff_core::hash::fnv1a_u16_mod(
            worker_instance_id.as_str(),
            num_partitions,
        );
        // BTreeSet iterates sorted → stable CSV for Lua subset match.
        // Ingress validation mirrors FlowFabricWorker::connect (ff-sdk):
        // - `,` is the CSV delimiter — a token containing one would split
        //   mid-parse and could let a {"gpu"} worker appear to satisfy
        //   {"gpu,cuda"} (silent auth bypass).
        // - Empty strings would inflate the CSV with leading / adjacent
        //   commas ("" → ",gpu" → ["", "gpu"]) and inflate the token
        //   count past CAPS_MAX_TOKENS for no semantic reason.
        // - Non-printable / whitespace: "gpu " vs "gpu" or "gpu\n" vs
        //   "gpu" silently mis-routes. Require printable ASCII excluding
        //   space (0x21-0x7E) at ingress so typos fail loud.
        // Enforce ALL here so operator misconfig at the scheduler entry
        // point fails loud, symmetric with the SDK inline-claim path.
        // Bounds (#csv, #tokens) are enforced by the Lua side.
        for cap in worker_capabilities {
            if cap.is_empty() {
                return Err(SchedulerError::Config(
                    "capability token must not be empty".to_owned(),
                ));
            }
            if cap.contains(',') {
                return Err(SchedulerError::Config(format!(
                    "capability token may not contain ',' (CSV delimiter): {cap:?}"
                )));
            }
            // Reject ASCII control + whitespace (incl. Unicode whitespace);
            // allow non-ASCII printable UTF-8 so i18n cap names work. CSV
            // delimiter `,` is single-byte and never a UTF-8 continuation,
            // so multibyte UTF-8 is safe on the wire. Symmetric with
            // ff-sdk::FlowFabricWorker::connect.
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SchedulerError::Config(format!(
                    "capability token must not contain whitespace or control characters: {cap:?}"
                )));
            }
        }
        if worker_capabilities.len() > ff_core::policy::CAPS_MAX_TOKENS {
            return Err(SchedulerError::Config(format!(
                "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                ff_core::policy::CAPS_MAX_TOKENS,
                worker_capabilities.len()
            )));
        }
        let worker_caps_csv = worker_capabilities
            .iter()
            .filter(|s| !s.is_empty())
            .cloned()
            .collect::<Vec<_>>()
            .join(",");
        // Stable digest used in per-mismatch logs so the full 4KB CSV
        // doesn't get echoed on every mismatch. See worker_caps_digest.
        let worker_caps_hash = worker_caps_digest(&worker_caps_csv);
        if worker_caps_csv.len() > ff_core::policy::CAPS_MAX_BYTES {
            return Err(SchedulerError::Config(format!(
                "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                ff_core::policy::CAPS_MAX_BYTES,
                worker_caps_csv.len()
            )));
        }

        for offset in 0..num_partitions {
            // Jittered iteration: start at start_p, wrap modulo num_partitions.
            // Still covers every partition once per cycle; prevents all
            // workers from hammering partition 0 first simultaneously.
            let p_idx = (start_p + offset) % num_partitions;
            let partition = Partition {
                family: PartitionFamily::Execution,
                index: p_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(lane);

            // ZRANGEBYSCORE eligible -inf +inf LIMIT 0 1
            // Lowest score = highest priority candidate
            let candidates: Vec<String> = match self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
            {
                Ok(ids) => ids,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        error = %e,
                        "scheduler: ZRANGEBYSCORE eligible failed, skipping partition"
                    );
                    continue;
                }
            };

            let eid_str = match candidates.first() {
                Some(s) => s,
                None => continue, // no eligible in this partition
            };

            // Parse the execution ID
            let eid = match ExecutionId::parse(eid_str) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_str.as_str(),
                        error = %e,
                        "scheduler: invalid execution_id in eligible set, skipping"
                    );
                    continue;
                }
            };

            let exec_ctx = ExecKeyContext::new(&partition, &eid);
            let core_key = exec_ctx.core();
            let eid_s = eid.to_string();
            let now_ms = match server_time_ms(&self.client).await {
                Ok(t) => t,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        error = %e,
                        "scheduler: failed to get server time, skipping partition"
                    );
                    continue;
                }
            };

381
382 // ── Capability pre-check (in-slot HGET, cheap) ──
383 // Runs BEFORE quota admission so we never ZADD a quota slot
384 // for an execution this worker can't actually claim. Without
385 // this, on an unmatchable-top-of-zset, every scheduling tick
386 // would ZADD {q:K}:admitted_set then ZREM it via
387 // release_admission on the capability_mismatch reject — a
388 // cross-slot write storm amplifying the mismatch loop.
389 //
390 // Lua `ff_issue_claim_grant` still does the authoritative
391 // check; this is a fast-path short-circuit, not a substitute.
392 // A narrow race exists where `required_capabilities` is
393 // updated between our HGET and the FCALL — the FCALL is still
394 // atomic and correct.
395 let required_caps_csv: Option<String> = match self
396 .client
397 .cmd("HGET")
398 .arg(&core_key)
399 .arg("required_capabilities")
400 .execute::<Option<String>>()
401 .await
402 {
403 Ok(v) => v,
404 Err(e) => {
405 tracing::warn!(
406 partition = p_idx,
407 execution_id = eid_s.as_str(),
408 error = %e,
409 "scheduler: HGET required_capabilities failed, skipping candidate"
410 );
411 continue;
412 }
413 };
414 if let Some(req) = required_caps_csv.as_deref()
415 && !req.is_empty()
416 && !caps_subset(req, worker_capabilities)
417 {
418 // Move this execution out of eligible into blocked_route
419 // so we don't hot-loop on it every tick (RFC-009 §564). A
420 // periodic sweep (scanner side) promotes blocked_route →
421 // eligible when a worker with matching caps registers.
422 // Logged with a hash digest, not the raw CSV, to keep
423 // per-mismatch log volume bounded.
424 tracing::info!(
425 partition = p_idx,
426 execution_id = eid_s.as_str(),
427 worker_id = worker_id.as_str(),
428 worker_caps_hash = worker_caps_hash.as_str(),
429 required = req,
430 "scheduler: capability mismatch, blocking execution off eligible"
431 );
432 self.block_candidate(
433 &partition, &idx, lane, &eid, &eligible_key,
434 "waiting_for_capable_worker",
435 "no connected worker satisfies required_capabilities",
436 now_ms,
437 ).await;
438 continue;
439 }
440
441 // ── Budget pre-check (cross-partition, cached per cycle) ──
442 if let Some(block_detail) = self
443 .check_budgets(&mut budget_checker, &exec_ctx, &core_key, &eid_s)
444 .await?
445 {
446 // Budget breached — block candidate and try next
447 self.block_candidate(
448 &partition, &idx, lane, &eid, &eligible_key,
449 "waiting_for_budget", &block_detail, now_ms,
450 ).await;
451 continue;
452 }
453
454 // ── Quota pre-check (cross-partition FCALL on {q:K}) ──
455 let quota_admission = self
456 .check_quota(&exec_ctx, &core_key, &eid_s, now_ms)
457 .await?;
458 match "a_admission {
459 QuotaCheckOutcome::Blocked(block_detail) => {
460 self.block_candidate(
461 &partition, &idx, lane, &eid, &eligible_key,
462 "waiting_for_quota", block_detail, now_ms,
463 ).await;
464 continue;
465 }
466 QuotaCheckOutcome::NoQuota | QuotaCheckOutcome::Admitted { .. } => {}
467 }
468
469 // ── All checks passed — issue claim grant ──
470 let grant_key = exec_ctx.claim_grant();
471 let keys: [&str; 3] = [&core_key, &grant_key, &eligible_key];
472
473 let ttl_str = grant_ttl_ms.to_string();
474 let wid_s = worker_id.to_string();
475 let wiid_s = worker_instance_id.to_string();
476 let lane_s = lane.to_string();
477
478 let argv: [&str; 9] = [
479 &eid_s,
480 &wid_s,
481 &wiid_s,
482 &lane_s,
483 "", // capability_hash
484 &ttl_str,
485 "", // route_snapshot_json
486 "", // admission_summary
487 &worker_caps_csv, // sorted CSV; empty → matches only empty-required execs
488 ];
489
490 let raw = match self
491 .client
492 .fcall::<ferriskey::Value>("ff_issue_claim_grant", &keys, &argv)
493 .await
494 {
495 Ok(v) => v,
496 Err(e) => {
497 // Transport failure on the FCALL — NOSCRIPT, IoError,
498 // ClusterDown, etc. This is NOT a normal soft-reject;
499 // persistent transport errors mean the scheduler is
500 // effectively idle even though it looks like it's
501 // running. WARN so ops dashboards (WARN+ aggregators)
502 // fire instead of burying it at DEBUG.
503 tracing::warn!(
504 partition = p_idx,
505 execution_id = eid_s.as_str(),
506 error = %e,
507 "scheduler: ff_issue_claim_grant transport error, trying next"
508 );
509 if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = "a_admission {
510 self.release_admission(tag, quota_id, eid).await;
511 }
512 continue;
513 }
514 };
515
516 match FcallResult::parse(&raw).and_then(|r| r.into_success()) {
517 Ok(_) => {
518 tracing::debug!(
519 partition = p_idx,
520 execution_id = eid_s.as_str(),
521 "scheduler: claim grant issued"
522 );
523 return Ok(Some(ClaimGrant {
524 execution_id: eid,
525 partition,
526 grant_key: grant_key.clone(),
527 expires_at_ms: now_ms + grant_ttl_ms,
528 }));
529 }
530 Err(script_err) => {
531 if matches!(script_err, ScriptError::CapabilityMismatch(_)) {
532 // Should be rare: the Rust pre-check above
533 // normally catches this and blocks the execution
534 // off eligible. Reaching here means the
535 // required_capabilities field mutated between our
536 // HGET and the Lua atomic check (narrow race).
537 // Block here too so the next tick doesn't loop.
538 //
539 // Log uses worker_caps_hash (8-hex digest), not
540 // the full 4KB CSV, to keep per-mismatch log
541 // volume bounded. Full CSV is logged once at
542 // worker connect under "worker caps" WARN.
543 tracing::info!(
544 partition = p_idx,
545 execution_id = eid_s.as_str(),
546 worker_id = wid_s.as_str(),
547 worker_caps_hash = worker_caps_hash.as_str(),
548 error = %script_err,
549 "scheduler: capability mismatch via Lua (race), blocking execution"
550 );
551 self.block_candidate(
552 &partition, &idx, lane, &eid, &eligible_key,
553 "waiting_for_capable_worker",
554 "no connected worker satisfies required_capabilities",
555 now_ms,
556 ).await;
557 if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = "a_admission {
558 self.release_admission(tag, quota_id, eid).await;
559 }
560 continue;
561 } else {
562 // Any other logical reject (grant_already_exists,
563 // execution_not_in_eligible_set, execution_not_eligible,
564 // invalid_capabilities, etc.). These are rare and each
565 // indicates either a race or a config problem — in
566 // either case ops need to see it, so WARN, not DEBUG.
567 tracing::warn!(
568 partition = p_idx,
569 execution_id = eid_s.as_str(),
570 error = %script_err,
571 "scheduler: ff_issue_claim_grant rejected, trying next"
572 );
573 }
574 if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = "a_admission {
575 self.release_admission(tag, quota_id, eid).await;
576 }
577 continue;
578 }
579 }
580 }
581
582 Ok(None)
583 }
    /// Read budget_ids from exec_core and check each. Returns block detail
    /// string if any budget is breached, None if all pass.
    async fn check_budgets(
        &self,
        checker: &mut BudgetChecker,
        _exec_ctx: &ExecKeyContext,
        core_key: &str,
        _eid_s: &str,
    ) -> Result<Option<String>, SchedulerError> {
        // Read budget_ids from exec_core (comma-separated or JSON list)
        let budget_ids_str: Option<String> = self
            .client
            .cmd("HGET")
            .arg(core_key)
            .arg("budget_ids")
            .execute()
            .await?;

        let budget_ids_str = match budget_ids_str {
            Some(s) => s,
            None => return Ok(None),
        };
        if budget_ids_str.is_empty() {
            return Ok(None); // no budgets attached
        }

        // Parse comma-separated budget IDs
        for budget_id in budget_ids_str.split(',') {
            let budget_id = budget_id.trim();
            if budget_id.is_empty() {
                continue;
            }
            let result = checker.check_budget(&self.client, budget_id).await;
            if let BudgetCheckResult::HardBreach { detail, .. } = result {
                return Ok(Some(detail.clone()));
            }
        }

        Ok(None)
    }

    /// Check quota admission for the candidate.
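    ///
    /// For reference, the {q:K}-tagged keys touched here (derived from the
    /// `format!` calls below; `<policy>` / `<eid>` are placeholders):
    ///
    /// ```text
    /// ff:quota:{q:K}:<policy>                              policy hash (limits)
    /// ff:quota:{q:K}:<policy>:window:requests_per_window   rate-limit window
    /// ff:quota:{q:K}:<policy>:concurrency                  active concurrency counter
    /// ff:quota:{q:K}:<policy>:admitted:<eid>               per-execution admission marker
    /// ff:quota:{q:K}:<policy>:admitted_set                 admitted set
    /// ```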
    async fn check_quota(
        &self,
        _exec_ctx: &ExecKeyContext,
        core_key: &str,
        eid_s: &str,
        now_ms: u64,
    ) -> Result<QuotaCheckOutcome, SchedulerError> {
        // Read quota_policy_id from exec_core
        let quota_id_str: Option<String> = self
            .client
            .cmd("HGET")
            .arg(core_key)
            .arg("quota_policy_id")
            .execute()
            .await?;

        let quota_id_str = match quota_id_str {
            Some(s) => s,
            None => return Ok(QuotaCheckOutcome::NoQuota),
        };
        if quota_id_str.is_empty() {
            return Ok(QuotaCheckOutcome::NoQuota);
        }

        // Compute real {q:K} partition tag from quota_policy_id
        let tag = match QuotaPolicyId::parse(&quota_id_str) {
            Ok(qid) => {
                let partition = quota_partition(&qid, &self.config);
                partition.hash_tag()
            }
            Err(_) => "{q:0}".to_owned(), // fallback for non-UUID test IDs
        };

        let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id_str);
        let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id_str);
        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id_str);
        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id_str, eid_s);
        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id_str);

        // Read quota limits from policy hash
        let rate_limit: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("max_requests_per_window")
            .execute().await?;
        let window_secs: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("requests_per_window_seconds")
            .execute().await?;
        let concurrency_cap: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("active_concurrency_cap")
            .execute().await?;
        let jitter: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("jitter_ms")
            .execute().await?;

        let rate_limit = rate_limit.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
        let window_secs = window_secs.as_deref().and_then(|s| s.parse().ok()).unwrap_or(60u64);
        let concurrency_cap = concurrency_cap.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
        let jitter_ms = jitter.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);

        // No limits configured — admit without recording
        if rate_limit == 0 && concurrency_cap == 0 {
            return Ok(QuotaCheckOutcome::NoQuota);
        }

        // FCALL ff_check_admission_and_record on {q:K}
        let keys: [&str; 5] = [&window_key, &concurrency_key, &quota_def_key, &admitted_key, &admitted_set_key];
        let now_s = now_ms.to_string();
        let ws = window_secs.to_string();
        let rl = rate_limit.to_string();
        let cc = concurrency_cap.to_string();
        let jt = jitter_ms.to_string();
        let argv: [&str; 6] = [&now_s, &ws, &rl, &cc, eid_s, &jt];

        match self.client
            .fcall::<ferriskey::Value>("ff_check_admission_and_record", &keys, &argv)
            .await
        {
            Ok(result) => {
                // Parse domain-specific result: {"ADMITTED"}, {"RATE_EXCEEDED", retry_after},
                // {"CONCURRENCY_EXCEEDED"}, {"ALREADY_ADMITTED"}
                let status = Self::parse_admission_status(&result);
                match status.as_str() {
                    "ADMITTED" | "ALREADY_ADMITTED" => Ok(QuotaCheckOutcome::Admitted {
                        tag: tag.clone(),
                        quota_id: quota_id_str.clone(),
                        eid: eid_s.to_owned(),
                    }),
                    "RATE_EXCEEDED" => Ok(QuotaCheckOutcome::Blocked(format!(
                        "quota {}: rate limit {} per {}s window exceeded",
                        quota_id_str, rate_limit, window_secs
                    ))),
                    "CONCURRENCY_EXCEEDED" => Ok(QuotaCheckOutcome::Blocked(format!(
                        "quota {}: concurrency cap {}",
                        quota_id_str, concurrency_cap
                    ))),
                    _ => {
                        tracing::warn!(
                            quota_id = quota_id_str.as_str(),
                            status = status.as_str(),
                            "scheduler: unexpected admission result"
                        );
                        Ok(QuotaCheckOutcome::NoQuota)
                    }
                }
            }
            Err(e) => {
                tracing::warn!(
                    quota_id = quota_id_str.as_str(),
                    error = %e,
                    "scheduler: quota FCALL failed, allowing (advisory)"
                );
                Ok(QuotaCheckOutcome::NoQuota) // allow on FCALL error (advisory)
            }
        }
    }

    /// Parse the first element of a Valkey array result as a status string.
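    ///
    /// Illustrative mapping (shapes taken from the comment at the
    /// `ff_check_admission_and_record` call site; not an exhaustive list):
    ///
    /// ```text
    /// ["ADMITTED"]                    → "ADMITTED"
    /// ["RATE_EXCEEDED", retry_after]  → "RATE_EXCEEDED"
    /// non-array / empty reply         → "UNKNOWN"
    /// ```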
    fn parse_admission_status(result: &ferriskey::Value) -> String {
        match result {
            ferriskey::Value::Array(arr) => {
                match arr.first() {
                    Some(Ok(ferriskey::Value::BulkString(b))) => {
                        String::from_utf8_lossy(b).into_owned()
                    }
                    Some(Ok(ferriskey::Value::SimpleString(s))) => s.clone(),
                    _ => "UNKNOWN".to_owned(),
                }
            }
            _ => "UNKNOWN".to_owned(),
        }
    }

    /// Block a candidate that failed budget/quota check.
    /// FCALL ff_block_execution_for_admission on {p:N}.
    #[allow(clippy::too_many_arguments)]
    async fn block_candidate(
        &self,
        partition: &Partition,
        idx: &IndexKeys,
        lane: &LaneId,
        eid: &ExecutionId,
        eligible_key: &str,
        block_reason: &str,
        blocking_detail: &str,
        now_ms: u64,
    ) {
        let exec_ctx = ExecKeyContext::new(partition, eid);
        let core_key = exec_ctx.core();
        let eid_s = eid.to_string();
        let blocked_key = match block_reason {
            "waiting_for_budget" => idx.lane_blocked_budget(lane),
            "waiting_for_quota" => idx.lane_blocked_quota(lane),
            "waiting_for_capable_worker" => idx.lane_blocked_route(lane),
            _ => idx.lane_blocked_budget(lane),
        };

        let keys: [&str; 3] = [&core_key, eligible_key, &blocked_key];
        let now_s = now_ms.to_string();
        let argv: [&str; 4] = [&eid_s, block_reason, blocking_detail, &now_s];

        // Parse FcallResult so we distinguish Lua-level rejections (e.g.
        // execution_not_active because the execution went terminal between
        // our HGET and the FCALL) from a real block. Previously `Ok(_)`
        // treated an err-tuple as success → INFO log "candidate blocked"
        // while nothing actually changed on exec_core, then the next tick
        // re-picked the same candidate and looped. Mirrors the
        // release_admission parse fix.
        match self.client
            .fcall::<ferriskey::Value>("ff_block_execution_for_admission", &keys, &argv)
            .await
        {
            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
                Ok(_) => {
                    tracing::info!(
                        execution_id = eid_s.as_str(),
                        reason = block_reason,
                        "scheduler: candidate blocked by admission check"
                    );
                }
                Err(script_err) => {
                    // Logical reject from Lua (e.g. execution_not_active
                    // — the execution went terminal between the scheduler
                    // pick and the block FCALL; the candidate loop will
                    // naturally move on). WARN so ops dashboards surface
                    // actual block failures, but not so loud that a common
                    // race spams alerts.
                    tracing::warn!(
                        execution_id = eid_s.as_str(),
                        reason = block_reason,
                        error = %script_err,
                        "scheduler: ff_block_execution_for_admission rejected by Lua"
                    );
                }
            },
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_s.as_str(),
                    error = %e,
                    "scheduler: ff_block_execution_for_admission transport failed"
                );
            }
        }
    }

    /// Release a previously-recorded quota admission slot.
    /// Called when ff_issue_claim_grant fails after admission was recorded.
    async fn release_admission(
        &self,
        tag: &str,
        quota_id: &str,
        eid_s: &str,
    ) {
        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid_s);
        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);

        let keys: [&str; 3] = [&admitted_key, &admitted_set_key, &concurrency_key];
        let argv: [&str; 1] = [eid_s];

        // Parse the Lua response properly: FCALL returns `Ok(Value)` for
        // BOTH success and logical-error paths. Treating Ok(_) blindly as
        // "released" logs a false positive when the Lua returns
        // `{0, "quota_not_found"}` (or any other script-level err) — the
        // slot in fact remains pinned until its TTL expires, which is
        // minutes to hours. Surface the real outcome so on-call sees
        // actual release failures instead of clean "released" events.
        match self.client
            .fcall::<ferriskey::Value>("ff_release_admission", &keys, &argv)
            .await
        {
            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
                Ok(_) => {
                    tracing::info!(
                        execution_id = eid_s,
                        quota_id,
                        "scheduler: released admission after claim failure"
                    );
                }
                Err(script_err) => {
                    tracing::warn!(
                        execution_id = eid_s,
                        quota_id,
                        error = %script_err,
                        "scheduler: ff_release_admission rejected by Lua \
                         (slot will expire via TTL)"
                    );
                }
            },
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_s,
                    quota_id,
                    error = %e,
                    "scheduler: ff_release_admission transport failed \
                     (slot will expire via TTL)"
                );
            }
        }
    }
}

/// Get server time in milliseconds via the TIME command.
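///
/// Worked example of the conversion (TIME replies with seconds and
/// microseconds as two strings):
///
/// ```text
/// TIME → ["1700000000", "123456"]
///        1_700_000_000 * 1000 + 123_456 / 1000 = 1_700_000_000_123 ms
/// ```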
async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
    let result: Vec<String> = client
        .cmd("TIME")
        .execute()
        .await?;
    if result.len() < 2 {
        return Err(ferriskey::Error::from((
            ferriskey::ErrorKind::ClientError,
            "TIME returned fewer than 2 elements",
        )));
    }
    let secs: u64 = result[0].parse().map_err(|_| {
        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
    })?;
    let micros: u64 = result[1].parse().map_err(|_| {
        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
    })?;
    Ok(secs * 1000 + micros / 1000)
}

/// Errors from the scheduler.
#[derive(Debug, thiserror::Error)]
pub enum SchedulerError {
    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Valkey error with additional context (preserves ErrorKind via #[source]).
    #[error("valkey ({context}): {source}")]
    ValkeyContext {
        #[source]
        source: ferriskey::Error,
        context: String,
    },
    /// Caller-supplied value failed ingress validation. NOT retryable — the
    /// caller must fix its input before retrying.
    #[error("config: {0}")]
    Config(String),
}

impl SchedulerError {
    /// Returns the underlying ferriskey ErrorKind, if this is a Valkey error.
    /// Matches `ServerError::valkey_kind` and `ScriptError::valkey_kind` so
    /// callers can treat all three uniformly.
    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
        match self {
            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
            Self::Config(_) => None,
        }
    }

    /// Whether this error is safely retryable by a caller. Mirrors
    /// `ServerError::is_retryable` semantics.
    pub fn is_retryable(&self) -> bool {
        self.valkey_kind()
            .map(is_retryable_kind)
            .unwrap_or(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn scheduler_is_retryable_matches_kind_table() {
        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());

        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
    }

    #[test]
    fn scheduler_valkey_context_is_retryable() {
        let err = SchedulerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::BusyLoadingError),
            context: "HGET budget_ids".into(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn scheduler_valkey_kind_exposed() {
        let err = SchedulerError::Valkey(mk_fk_err(ErrorKind::TryAgain));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::TryAgain));
    }
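
    // Additional sanity tests for the local capability helper. These are a
    // sketch added alongside the existing error-kind tests: they only
    // exercise the pure in-crate `caps_subset` fast path (the authoritative
    // subset check stays in Lua) and assume nothing beyond std.
    use std::collections::BTreeSet;

    #[test]
    fn caps_subset_empty_required_is_trivially_satisfied() {
        let caps: BTreeSet<String> = BTreeSet::new();
        assert!(caps_subset("", &caps));
        assert!(caps_subset(",,", &caps)); // all-separator CSV → no tokens
    }

    #[test]
    fn caps_subset_requires_every_token() {
        let caps: BTreeSet<String> =
            ["gpu".to_owned(), "cuda".to_owned()].into_iter().collect();
        assert!(caps_subset("gpu", &caps));
        assert!(caps_subset("cuda,gpu", &caps));
        assert!(!caps_subset("gpu,tpu", &caps));
        // Tokens are matched verbatim; "gpu " (trailing space) is a different token.
        assert!(!caps_subset("gpu ", &caps));
    }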
}