//! Claim-grant cycle: find eligible executions and issue grants.
//!
//! The scheduler selects candidates from partition-local eligible sorted sets,
//! then atomically issues a claim grant via `FCALL ff_issue_claim_grant`.
//! The worker (ff-sdk) subsequently consumes the grant via `ff_claim_execution`.
//!
//! Phase 5: single lane, budget/quota pre-checks before grant issuance.
//! Candidates that fail budget/quota are blocked via `ff_block_execution_for_admission`.
//!
//! Reference: RFC-009 §12.7, RFC-010 §3.1, RFC-008 §1.7

use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::{
    Partition, PartitionConfig, PartitionFamily, budget_partition, quota_partition,
};
use ff_core::types::{BudgetId, ExecutionId, LaneId, QuotaPolicyId, WorkerId, WorkerInstanceId};
use ff_script::error::ScriptError;
use ff_script::result::FcallResult;
use ff_script::retry::is_retryable_kind;
use std::collections::BTreeSet;

/// Return true iff every non-empty comma-separated token in `required_csv`
/// is present in `worker_caps`. Mirrors the authoritative Lua subset check
/// in scheduling.lua (parse_capability_csv + missing_capabilities) — this
/// is a fast-path Rust short-circuit so we can skip the quota-admission
/// write for executions we already know the worker can't claim. An empty
/// or all-separator CSV is trivially a subset.
fn caps_subset(required_csv: &str, worker_caps: &BTreeSet<String>) -> bool {
    required_csv
        .split(',')
        .filter(|t| !t.is_empty())
        .all(|t| worker_caps.contains(t))
}

/// Short, stable digest of a worker's capability CSV, used in per-mismatch
/// log lines so a worker-caps CSV of up to 4KB is not echoed on every
/// capability_mismatch event (which would swamp log aggregators during an
/// incident). The full CSV is logged once at WARN when the worker connects;
/// operators cross-reference this 8-hex prefix against that full set.
/// Thin wrapper around the shared helper so call sites read as
/// `worker_caps_digest` locally while the algorithm lives in one place.
fn worker_caps_digest(csv: &str) -> String {
    ff_core::hash::fnv1a_xor8hex(csv)
}
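// e.g. worker_caps_digest("cuda,gpu") → an 8-char hex string such as
// "3fa2b91c" (illustrative value only; the real FNV-1a output differs).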

/// A claim grant issued by the scheduler for a specific execution.
///
/// Re-exported from [`ff_core::contracts::ClaimGrant`]. Lives in
/// `ff-core` so `ff-scheduler` (issuer) and `ff-sdk` (consumer)
/// share one wire-level type without a cross-dep between them.
pub use ff_core::contracts::ClaimGrant;

/// A reclaim grant for a resumed (attempt_interrupted) execution.
///
/// Re-export of [`ff_core::contracts::ReclaimGrant`] for symmetry
/// with [`ClaimGrant`]. `ff-scheduler` will be the canonical
/// producer once the Batch-C reclaim scanner lands; today only
/// test fixtures construct this type. Consumed by
/// `FlowFabricWorker::claim_from_reclaim_grant`.
pub use ff_core::contracts::ReclaimGrant;

/// Budget check result from a cross-partition budget read.
#[derive(Debug)]
pub enum BudgetCheckResult {
    /// Budget is within limits — proceed.
    Ok,
    /// Budget hard limit breached. Carries the breached dimension and a
    /// human-readable detail string.
    HardBreach { dimension: String, detail: String },
}

/// Outcome of a quota admission check.
enum QuotaCheckOutcome {
    /// No quota attached to this execution.
    NoQuota,
    /// Quota admitted — carries context for release on subsequent failure.
    Admitted { tag: String, quota_id: String, eid: String },
    /// Quota denied — execution should be blocked.
    Blocked(String),
}

/// Cross-partition budget checker with per-cycle caching.
///
/// Reads budget usage/limits once per scan cycle (not per candidate).
/// This is MANDATORY for performance — without it, 50K blocked executions
/// would produce 50K budget reads per cycle.
pub struct BudgetChecker {
    /// Cached budget status: budget_id → BudgetCheckResult.
    /// Reset at the start of each scheduler cycle.
    cache: std::collections::HashMap<String, BudgetCheckResult>,
    config: PartitionConfig,
}

impl BudgetChecker {
    pub fn new(config: PartitionConfig) -> Self {
        Self {
            cache: std::collections::HashMap::new(),
            config,
        }
    }

    /// Check a budget by ID. Reads from Valkey on first call per budget,
    /// caches for subsequent candidates in the same cycle.
    pub async fn check_budget(
        &mut self,
        client: &ferriskey::Client,
        budget_id: &str,
    ) -> &BudgetCheckResult {
        if self.cache.contains_key(budget_id) {
            return &self.cache[budget_id];
        }

        // Compute real {b:M} partition tag from budget_id
        let (usage_key, limits_key) = match BudgetId::parse(budget_id) {
            Ok(bid) => {
                let partition = budget_partition(&bid, &self.config);
                let tag = partition.hash_tag();
                (
                    format!("ff:budget:{}:{}:usage", tag, budget_id),
                    format!("ff:budget:{}:{}:limits", tag, budget_id),
                )
            }
            Err(_) => {
                // Fallback for non-UUID budget IDs (test compat)
                (
                    format!("ff:budget:{{b:0}}:{}:usage", budget_id),
                    format!("ff:budget:{{b:0}}:{}:limits", budget_id),
                )
            }
        };
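
        // e.g. a budget_id hashing to partition index 7 yields
        //   ff:budget:{b:7}:<budget_id>:usage   (HASH: dimension → consumed)
        //   ff:budget:{b:7}:<budget_id>:limits  (HASH: "hard:<dimension>" → limit)
        // (index illustrative; budget_partition computes the real one).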

        let result = match Self::read_and_check(client, &usage_key, &limits_key).await {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(
                    budget_id,
                    error = %e,
                    "budget_checker: failed to read budget, allowing (advisory)"
                );
                BudgetCheckResult::Ok
            }
        };

        self.cache.insert(budget_id.to_owned(), result);
        &self.cache[budget_id]
    }

    /// Read budget usage and limits, compare each dimension.
    async fn read_and_check(
        client: &ferriskey::Client,
        usage_key: &str,
        limits_key: &str,
    ) -> Result<BudgetCheckResult, ferriskey::Error> {
        // Read all limit dimensions via hgetall (returns HashMap, not flat pairs)
        let limits: std::collections::HashMap<String, String> = client
            .hgetall(limits_key)
            .await?;

        // Parse hard limits
        for (field, limit_val) in &limits {
            if !field.starts_with("hard:") {
                continue;
            }
            let dimension = &field[5..]; // strip "hard:" prefix
            let limit: u64 = match limit_val.parse() {
                Ok(v) if v > 0 => v,
                _ => continue,
            };

            // Read current usage for this dimension. A read error is
            // treated as zero usage: budget checks are advisory, so we
            // fail open rather than stall the scheduler.
            let usage_str: Option<String> = client
                .cmd("HGET")
                .arg(usage_key)
                .arg(dimension)
                .execute()
                .await
                .unwrap_or(None);
            let usage: u64 = usage_str
                .as_deref()
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);

            if usage >= limit {
                return Ok(BudgetCheckResult::HardBreach {
                    dimension: dimension.to_owned(),
                    detail: format!("budget {}: {} {}/{}", usage_key, dimension, usage, limit),
                });
            }
        }

        Ok(BudgetCheckResult::Ok)
    }

    /// Clear the cache at the start of a new scheduler cycle.
    pub fn reset(&mut self) {
        self.cache.clear();
    }
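
    // Per-cycle usage sketch (assumes a `client: &ferriskey::Client` and a
    // `budget_id: &str` in scope). Hold one checker per scan cycle, or call
    // `reset()` between cycles, so repeated budget_ids hit Valkey only once:
    //
    //     let mut checker = BudgetChecker::new(config);
    //     let _ = checker.check_budget(client, budget_id).await; // Valkey read
    //     let _ = checker.check_budget(client, budget_id).await; // cache hit
    //     checker.reset(); // next cycle starts cold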
}

/// Single-lane scheduler with budget/quota pre-checks.
///
/// Iterates execution partitions sequentially, picks the first eligible
/// execution (lowest priority score). Before issuing a claim grant:
/// 1. Check all attached budgets (cross-partition, cached per cycle)
/// 2. Check quota admission (cross-partition FCALL)
/// 3. If any check fails: block the candidate and try next
/// 4. If all pass: issue the claim grant
pub struct Scheduler {
    client: ferriskey::Client,
    config: PartitionConfig,
}

impl Scheduler {
    pub fn new(client: ferriskey::Client, config: PartitionConfig) -> Self {
        Self { client, config }
    }

    /// Find an eligible execution and issue a claim grant.
    ///
    /// Iterates all execution partitions looking for the first partition
    /// with an eligible execution. Issues a claim grant via FCALL.
    ///
    /// `worker_capabilities` is sent to `ff_issue_claim_grant` as a sorted
    /// CSV (BTreeSet guarantees deterministic order). Executions whose
    /// `required_capabilities` are not a subset of this set are skipped
    /// (stay queued) via `ScriptError::CapabilityMismatch`.
    ///
    /// Returns `Ok(None)` if no eligible executions exist anywhere.
    /// Returns `Ok(Some(grant))` on success.
    /// Returns `Err` on Valkey errors.
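    ///
    /// # Example
    ///
    /// A minimal sketch of one scheduler tick (`ignore`d: it needs a live
    /// Valkey, and it assumes `client`, `config`, `lane`, `worker_id`, and
    /// `worker_instance_id` values already in scope; the 30s grant TTL is
    /// illustrative):
    ///
    /// ```ignore
    /// let scheduler = Scheduler::new(client, config);
    /// let caps: BTreeSet<String> = ["gpu".to_owned()].into_iter().collect();
    /// match scheduler
    ///     .claim_for_worker(&lane, &worker_id, &worker_instance_id, &caps, 30_000)
    ///     .await?
    /// {
    ///     Some(grant) => {
    ///         // Hand off to ff-sdk, which consumes the grant via ff_claim_execution.
    ///         println!("granted {} until {}", grant.execution_id, grant.expires_at_ms);
    ///     }
    ///     None => { /* nothing eligible anywhere this tick */ }
    /// }
    /// ```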
    pub async fn claim_for_worker(
        &self,
        lane: &LaneId,
        worker_id: &WorkerId,
        worker_instance_id: &WorkerInstanceId,
        worker_capabilities: &BTreeSet<String>,
        grant_ttl_ms: u64,
    ) -> Result<Option<ClaimGrant>, SchedulerError> {
        let num_partitions = self.config.num_flow_partitions;
        let mut budget_checker = BudgetChecker::new(self.config);

        // Jitter the partition scan start to avoid thundering-herd on
        // partition 0 when 100 workers all tick simultaneously. Seeded
        // from worker_instance_id so this worker hits a stable window
        // within a single scheduling cycle (still covers every partition),
        // and different workers naturally diverge. Uses the shared
        // ff_core::hash FNV-1a reducer — same helper powering ff-sdk's
        // PARTITION_SCAN_CHUNK cursor seed. Zero-modulus safe.
        let start_p: u16 = ff_core::hash::fnv1a_u16_mod(
            worker_instance_id.as_str(),
            num_partitions,
        );
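        // e.g. num_partitions = 16 and a worker hashing to start_p = 5 scans
        // partitions 5, 6, ..., 15, 0, ..., 4: one full pass, offset per
        // worker (start_p value illustrative).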
        // BTreeSet iterates sorted → stable CSV for Lua subset match.
        // Ingress validation mirrors FlowFabricWorker::connect (ff-sdk):
        //   - `,` is the CSV delimiter — a token containing one would split
        //     mid-parse and could let a {"gpu"} worker appear to satisfy
        //     {"gpu,cuda"} (silent auth bypass).
        //   - Empty strings would inflate the CSV with leading / adjacent
        //     commas ("" + "gpu" → ",gpu" → ["", "gpu"]) and push the token
        //     count past CAPS_MAX_TOKENS for no semantic reason.
        //   - Non-printable / whitespace: "gpu " vs "gpu" or "gpu\n" vs
        //     "gpu" silently mis-routes. Reject control characters and
        //     whitespace at ingress so typos fail loud (non-ASCII printable
        //     UTF-8 stays allowed; see the check in the loop below).
        // Enforce ALL here so operator misconfig at the scheduler entry
        // point fails loud, symmetric with the SDK inline-claim path.
        // Bounds (#csv, #tokens) are enforced by the Lua side.
        for cap in worker_capabilities {
            if cap.is_empty() {
                return Err(SchedulerError::Config(
                    "capability token must not be empty".to_owned(),
                ));
            }
            if cap.contains(',') {
                return Err(SchedulerError::Config(format!(
                    "capability token may not contain ',' (CSV delimiter): {cap:?}"
                )));
            }
            // Reject ASCII control + whitespace (incl. Unicode whitespace);
            // allow non-ASCII printable UTF-8 so i18n cap names work. The
            // CSV delimiter `,` is single-byte and never a UTF-8
            // continuation byte, so multibyte UTF-8 is safe on the wire.
            // Symmetric with ff-sdk::FlowFabricWorker::connect.
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SchedulerError::Config(format!(
                    "capability token must not contain whitespace or control characters: {cap:?}"
                )));
            }
        }
        if worker_capabilities.len() > ff_core::policy::CAPS_MAX_TOKENS {
            return Err(SchedulerError::Config(format!(
                "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                ff_core::policy::CAPS_MAX_TOKENS,
                worker_capabilities.len()
            )));
        }
        // Empty tokens were rejected above; the filter is defensive only.
        let worker_caps_csv = worker_capabilities
            .iter()
            .filter(|s| !s.is_empty())
            .cloned()
            .collect::<Vec<_>>()
            .join(",");
        // Stable digest used in per-mismatch logs so the full 4KB CSV
        // doesn't get echoed on every mismatch. See worker_caps_digest.
        let worker_caps_hash = worker_caps_digest(&worker_caps_csv);
        if worker_caps_csv.len() > ff_core::policy::CAPS_MAX_BYTES {
            return Err(SchedulerError::Config(format!(
                "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                ff_core::policy::CAPS_MAX_BYTES,
                worker_caps_csv.len()
            )));
        }

        for offset in 0..num_partitions {
            // Jittered iteration: start at start_p, wrap modulo num_partitions.
            // Still covers every partition once per cycle; prevents all
            // workers from hammering partition 0 first simultaneously.
            let p_idx = (start_p + offset) % num_partitions;
            let partition = Partition {
                family: PartitionFamily::Execution,
                index: p_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(lane);

            // ZRANGEBYSCORE eligible -inf +inf LIMIT 0 1
            // Lowest score = highest-priority candidate
            let candidates: Vec<String> = match self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
            {
                Ok(ids) => ids,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        error = %e,
                        "scheduler: ZRANGEBYSCORE eligible failed, skipping partition"
                    );
                    continue;
                }
            };

            let eid_str = match candidates.first() {
                Some(s) => s,
                None => continue, // no eligible execution in this partition
            };

            // Parse the execution ID
            let eid = match ExecutionId::parse(eid_str) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_str.as_str(),
                        error = %e,
                        "scheduler: invalid execution_id in eligible set, skipping"
                    );
                    continue;
                }
            };

            let exec_ctx = ExecKeyContext::new(&partition, &eid);
            let core_key = exec_ctx.core();
            let eid_s = eid.to_string();
            let now_ms = match server_time_ms(&self.client).await {
                Ok(t) => t,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        error = %e,
                        "scheduler: failed to get server time, skipping partition"
                    );
                    continue;
                }
            };

            // ── Capability pre-check (in-slot HGET, cheap) ──
            // Runs BEFORE quota admission so we never ZADD a quota slot
            // for an execution this worker can't actually claim. Without
            // this, an unmatchable top-of-zset would make every scheduling
            // tick ZADD {q:K}:admitted_set and then ZREM it via
            // release_admission on the capability_mismatch reject — a
            // cross-slot write storm amplifying the mismatch loop.
            //
            // Lua `ff_issue_claim_grant` still does the authoritative
            // check; this is a fast-path short-circuit, not a substitute.
            // A narrow race exists where `required_capabilities` is
            // updated between our HGET and the FCALL — the FCALL is still
            // atomic and correct.
            let required_caps_csv: Option<String> = match self
                .client
                .cmd("HGET")
                .arg(&core_key)
                .arg("required_capabilities")
                .execute::<Option<String>>()
                .await
            {
                Ok(v) => v,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_s.as_str(),
                        error = %e,
                        "scheduler: HGET required_capabilities failed, skipping candidate"
                    );
                    continue;
                }
            };
            if let Some(req) = required_caps_csv.as_deref()
                && !req.is_empty()
                && !caps_subset(req, worker_capabilities)
            {
                // Move this execution out of eligible into blocked_route
                // so we don't hot-loop on it every tick (RFC-009 §564). A
                // periodic sweep (scanner side) promotes blocked_route →
                // eligible when a worker with matching caps registers.
                // Logged with a hash digest, not the raw CSV, to keep
                // per-mismatch log volume bounded.
                tracing::info!(
                    partition = p_idx,
                    execution_id = eid_s.as_str(),
                    worker_id = worker_id.as_str(),
                    worker_caps_hash = worker_caps_hash.as_str(),
                    required = req,
                    "scheduler: capability mismatch, blocking execution off eligible"
                );
                self.block_candidate(
                    &partition, &idx, lane, &eid, &eligible_key,
                    "waiting_for_capable_worker",
                    "no connected worker satisfies required_capabilities",
                    now_ms,
                ).await;
                continue;
            }

            // ── Budget pre-check (cross-partition, cached per cycle) ──
            if let Some(block_detail) = self
                .check_budgets(&mut budget_checker, &exec_ctx, &core_key, &eid_s)
                .await?
            {
                // Budget breached — block candidate and try next
                self.block_candidate(
                    &partition, &idx, lane, &eid, &eligible_key,
                    "waiting_for_budget", &block_detail, now_ms,
                ).await;
                continue;
            }

            // ── Quota pre-check (cross-partition FCALL on {q:K}) ──
            let quota_admission = self
                .check_quota(&exec_ctx, &core_key, &eid_s, now_ms)
                .await?;
            match &quota_admission {
                QuotaCheckOutcome::Blocked(block_detail) => {
                    self.block_candidate(
                        &partition, &idx, lane, &eid, &eligible_key,
                        "waiting_for_quota", block_detail, now_ms,
                    ).await;
                    continue;
                }
                QuotaCheckOutcome::NoQuota | QuotaCheckOutcome::Admitted { .. } => {}
            }

            // ── All checks passed — issue claim grant ──
            let grant_key = exec_ctx.claim_grant();
            let keys: [&str; 3] = [&core_key, &grant_key, &eligible_key];

            let ttl_str = grant_ttl_ms.to_string();
            let wid_s = worker_id.to_string();
            let wiid_s = worker_instance_id.to_string();
            let lane_s = lane.to_string();

            let argv: [&str; 9] = [
                &eid_s,
                &wid_s,
                &wiid_s,
                &lane_s,
                "",   // capability_hash
                &ttl_str,
                "",   // route_snapshot_json
                "",   // admission_summary
                &worker_caps_csv, // sorted CSV; empty → matches only empty-required execs
            ];

            let raw = match self
                .client
                .fcall::<ferriskey::Value>("ff_issue_claim_grant", &keys, &argv)
                .await
            {
                Ok(v) => v,
                Err(e) => {
                    // Transport failure on the FCALL — NOSCRIPT, IoError,
                    // ClusterDown, etc. This is NOT a normal soft-reject;
                    // persistent transport errors mean the scheduler is
                    // effectively idle even though it looks like it's
                    // running. WARN so ops dashboards (WARN+ aggregators)
                    // fire instead of burying it at DEBUG.
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_s.as_str(),
                        error = %e,
                        "scheduler: ff_issue_claim_grant transport error, trying next"
                    );
                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
                        self.release_admission(tag, quota_id, eid).await;
                    }
                    continue;
                }
            };

            match FcallResult::parse(&raw).and_then(|r| r.into_success()) {
                Ok(_) => {
                    tracing::debug!(
                        partition = p_idx,
                        execution_id = eid_s.as_str(),
                        "scheduler: claim grant issued"
                    );
                    return Ok(Some(ClaimGrant {
                        execution_id: eid,
                        partition,
                        grant_key: grant_key.clone(),
                        expires_at_ms: now_ms + grant_ttl_ms,
                    }));
                }
                Err(script_err) => {
                    if matches!(script_err, ScriptError::CapabilityMismatch(_)) {
                        // Should be rare: the Rust pre-check above
                        // normally catches this and blocks the execution
                        // off eligible. Reaching here means the
                        // required_capabilities field mutated between our
                        // HGET and the Lua atomic check (narrow race).
                        // Block here too so the next tick doesn't loop.
                        //
                        // Log uses worker_caps_hash (8-hex digest), not
                        // the full 4KB CSV, to keep per-mismatch log
                        // volume bounded. The full CSV is logged once at
                        // worker connect under the "worker caps" WARN.
                        tracing::info!(
                            partition = p_idx,
                            execution_id = eid_s.as_str(),
                            worker_id = wid_s.as_str(),
                            worker_caps_hash = worker_caps_hash.as_str(),
                            error = %script_err,
                            "scheduler: capability mismatch via Lua (race), blocking execution"
                        );
                        self.block_candidate(
                            &partition, &idx, lane, &eid, &eligible_key,
                            "waiting_for_capable_worker",
                            "no connected worker satisfies required_capabilities",
                            now_ms,
                        ).await;
                        if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
                            self.release_admission(tag, quota_id, eid).await;
                        }
                        continue;
                    } else {
                        // Any other logical reject (grant_already_exists,
                        // execution_not_in_eligible_set, execution_not_eligible,
                        // invalid_capabilities, etc.). These are rare and each
                        // indicates either a race or a config problem — in
                        // either case ops need to see it, so WARN, not DEBUG.
                        tracing::warn!(
                            partition = p_idx,
                            execution_id = eid_s.as_str(),
                            error = %script_err,
                            "scheduler: ff_issue_claim_grant rejected, trying next"
                        );
                    }
                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
                        self.release_admission(tag, quota_id, eid).await;
                    }
                    continue;
                }
            }
        }

        Ok(None)
    }

    /// Read budget_ids from exec_core and check each. Returns the block
    /// detail string if any budget is breached, None if all pass.
    async fn check_budgets(
        &self,
        checker: &mut BudgetChecker,
        _exec_ctx: &ExecKeyContext,
        core_key: &str,
        _eid_s: &str,
    ) -> Result<Option<String>, SchedulerError> {
        // Read budget_ids from exec_core (comma-separated list)
        let budget_ids_str: Option<String> = self
            .client
            .cmd("HGET")
            .arg(core_key)
            .arg("budget_ids")
            .execute()
            .await?;

        let budget_ids_str = match budget_ids_str {
            Some(s) => s,
            None => return Ok(None),
        };
        if budget_ids_str.is_empty() {
            return Ok(None); // no budgets attached
        }

        // Parse comma-separated budget IDs
        for budget_id in budget_ids_str.split(',') {
            let budget_id = budget_id.trim();
            if budget_id.is_empty() {
                continue;
            }
            let result = checker.check_budget(&self.client, budget_id).await;
            if let BudgetCheckResult::HardBreach { detail, .. } = result {
                return Ok(Some(detail.clone()));
            }
        }

        Ok(None)
    }

    /// Check quota admission for the candidate.
    async fn check_quota(
        &self,
        _exec_ctx: &ExecKeyContext,
        core_key: &str,
        eid_s: &str,
        now_ms: u64,
    ) -> Result<QuotaCheckOutcome, SchedulerError> {
        // Read quota_policy_id from exec_core
        let quota_id_str: Option<String> = self
            .client
            .cmd("HGET")
            .arg(core_key)
            .arg("quota_policy_id")
            .execute()
            .await?;

        let quota_id_str = match quota_id_str {
            Some(s) => s,
            None => return Ok(QuotaCheckOutcome::NoQuota),
        };
        if quota_id_str.is_empty() {
            return Ok(QuotaCheckOutcome::NoQuota);
        }

        // Compute real {q:K} partition tag from quota_policy_id
        let tag = match QuotaPolicyId::parse(&quota_id_str) {
            Ok(qid) => {
                let partition = quota_partition(&qid, &self.config);
                partition.hash_tag()
            }
            Err(_) => "{q:0}".to_owned(), // fallback for non-UUID test IDs
        };

        let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id_str);
        let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id_str);
        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id_str);
        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id_str, eid_s);
        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id_str);

        // Read quota limits from the policy hash
        let rate_limit: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("max_requests_per_window")
            .execute().await?;
        let window_secs: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("requests_per_window_seconds")
            .execute().await?;
        let concurrency_cap: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("active_concurrency_cap")
            .execute().await?;
        let jitter: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("jitter_ms")
            .execute().await?;

        let rate_limit = rate_limit.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
        let window_secs = window_secs.as_deref().and_then(|s| s.parse().ok()).unwrap_or(60u64);
        let concurrency_cap = concurrency_cap.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
        let jitter_ms = jitter.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);

        // No limits configured — admit without recording
        if rate_limit == 0 && concurrency_cap == 0 {
            return Ok(QuotaCheckOutcome::NoQuota);
        }

        // FCALL ff_check_admission_and_record on {q:K}
        let keys: [&str; 5] = [&window_key, &concurrency_key, &quota_def_key, &admitted_key, &admitted_set_key];
        let now_s = now_ms.to_string();
        let ws = window_secs.to_string();
        let rl = rate_limit.to_string();
        let cc = concurrency_cap.to_string();
        let jt = jitter_ms.to_string();
        let argv: [&str; 6] = [&now_s, &ws, &rl, &cc, eid_s, &jt];

        match self.client
            .fcall::<ferriskey::Value>("ff_check_admission_and_record", &keys, &argv)
            .await
        {
            Ok(result) => {
                // Parse the domain-specific result: {"ADMITTED"}, {"RATE_EXCEEDED", retry_after},
                // {"CONCURRENCY_EXCEEDED"}, {"ALREADY_ADMITTED"}
                let status = Self::parse_admission_status(&result);
                match status.as_str() {
                    "ADMITTED" | "ALREADY_ADMITTED" => Ok(QuotaCheckOutcome::Admitted {
                        tag: tag.clone(),
                        quota_id: quota_id_str.clone(),
                        eid: eid_s.to_owned(),
                    }),
                    "RATE_EXCEEDED" => Ok(QuotaCheckOutcome::Blocked(format!(
                        "quota {}: rate limit {} per {}s window",
                        quota_id_str, rate_limit, window_secs
                    ))),
                    "CONCURRENCY_EXCEEDED" => Ok(QuotaCheckOutcome::Blocked(format!(
                        "quota {}: concurrency cap {}",
                        quota_id_str, concurrency_cap
                    ))),
                    _ => {
                        tracing::warn!(
                            quota_id = quota_id_str.as_str(),
                            status = status.as_str(),
                            "scheduler: unexpected admission result"
                        );
                        Ok(QuotaCheckOutcome::NoQuota)
                    }
                }
            }
            Err(e) => {
                tracing::warn!(
                    quota_id = quota_id_str.as_str(),
                    error = %e,
                    "scheduler: quota FCALL failed, allowing (advisory)"
                );
                Ok(QuotaCheckOutcome::NoQuota) // allow on FCALL error (advisory)
            }
        }
    }

    /// Parse the first element of a Valkey array result as a status string.
    fn parse_admission_status(result: &ferriskey::Value) -> String {
        match result {
            ferriskey::Value::Array(arr) => {
                match arr.first() {
                    Some(Ok(ferriskey::Value::BulkString(b))) => {
                        String::from_utf8_lossy(b).into_owned()
                    }
                    Some(Ok(ferriskey::Value::SimpleString(s))) => s.clone(),
                    _ => "UNKNOWN".to_owned(),
                }
            }
            _ => "UNKNOWN".to_owned(),
        }
    }

    /// Block a candidate that failed a budget/quota check.
    /// FCALL ff_block_execution_for_admission on {p:N}.
    #[allow(clippy::too_many_arguments)]
    async fn block_candidate(
        &self,
        partition: &Partition,
        idx: &IndexKeys,
        lane: &LaneId,
        eid: &ExecutionId,
        eligible_key: &str,
        block_reason: &str,
        blocking_detail: &str,
        now_ms: u64,
    ) {
        let exec_ctx = ExecKeyContext::new(partition, eid);
        let core_key = exec_ctx.core();
        let eid_s = eid.to_string();
        let blocked_key = match block_reason {
            "waiting_for_budget" => idx.lane_blocked_budget(lane),
            "waiting_for_quota" => idx.lane_blocked_quota(lane),
            "waiting_for_capable_worker" => idx.lane_blocked_route(lane),
            _ => idx.lane_blocked_budget(lane),
        };

        let keys: [&str; 3] = [&core_key, eligible_key, &blocked_key];
        let now_s = now_ms.to_string();
        let argv: [&str; 4] = [&eid_s, block_reason, blocking_detail, &now_s];

        // Parse FcallResult so we distinguish Lua-level rejections (e.g.
        // execution_not_active because the execution went terminal between
        // our HGET and the FCALL) from a real block. Previously `Ok(_)`
        // treated an err-tuple as success → INFO log "candidate blocked"
        // while nothing actually changed on exec_core, then the next tick
        // re-picked the same candidate and looped. Mirrors the
        // release_admission parse fix.
        match self.client
            .fcall::<ferriskey::Value>("ff_block_execution_for_admission", &keys, &argv)
            .await
        {
            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
                Ok(_) => {
                    tracing::info!(
                        execution_id = eid_s.as_str(),
                        reason = block_reason,
                        "scheduler: candidate blocked by admission check"
                    );
                }
                Err(script_err) => {
                    // Logical reject from Lua (e.g. execution_not_active
                    // — the execution went terminal between the scheduler
                    // pick and the block FCALL; the candidate loop will
                    // naturally move on). WARN so ops dashboards surface
                    // actual block failures, but not so loud that a common
                    // race spams alerts.
                    tracing::warn!(
                        execution_id = eid_s.as_str(),
                        reason = block_reason,
                        error = %script_err,
                        "scheduler: ff_block_execution_for_admission rejected by Lua"
                    );
                }
            },
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_s.as_str(),
                    error = %e,
                    "scheduler: ff_block_execution_for_admission transport failed"
                );
            }
        }
    }

    /// Release a previously-recorded quota admission slot.
    /// Called when ff_issue_claim_grant fails after admission was recorded.
    async fn release_admission(
        &self,
        tag: &str,
        quota_id: &str,
        eid_s: &str,
    ) {
        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid_s);
        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);

        let keys: [&str; 3] = [&admitted_key, &admitted_set_key, &concurrency_key];
        let argv: [&str; 1] = [eid_s];

        // Parse the Lua response properly: FCALL returns `Ok(Value)` for
        // BOTH success and logical-error paths. Treating Ok(_) blindly as
        // "released" logs a false positive when the Lua returns
        // `{0, "quota_not_found"}` (or any other script-level err) — the
        // slot in fact remains pinned until its TTL expires, which is
        // minutes to hours. Surface the real outcome so on-call sees
        // actual release failures instead of clean "released" events.
        match self.client
            .fcall::<ferriskey::Value>("ff_release_admission", &keys, &argv)
            .await
        {
            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
                Ok(_) => {
                    tracing::info!(
                        execution_id = eid_s,
                        quota_id,
                        "scheduler: released admission after claim failure"
                    );
                }
                Err(script_err) => {
                    tracing::warn!(
                        execution_id = eid_s,
                        quota_id,
                        error = %script_err,
                        "scheduler: ff_release_admission rejected by Lua \
                         (slot will expire via TTL)"
                    );
                }
            },
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_s,
                    quota_id,
                    error = %e,
                    "scheduler: ff_release_admission transport failed \
                     (slot will expire via TTL)"
                );
            }
        }
    }
}

/// Get server time in milliseconds via the TIME command.
async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
    let result: Vec<String> = client
        .cmd("TIME")
        .execute()
        .await?;
    if result.len() < 2 {
        return Err(ferriskey::Error::from((
            ferriskey::ErrorKind::ClientError,
            "TIME returned fewer than 2 elements",
        )));
    }
    let secs: u64 = result[0].parse().map_err(|_| {
        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
    })?;
    let micros: u64 = result[1].parse().map_err(|_| {
        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
    })?;
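    // e.g. TIME → ["1700000000", "654321"]
    //   ⇒ 1_700_000_000 * 1000 + 654_321 / 1000 = 1_700_000_000_654 ms
    //   (integer division truncates sub-millisecond precision).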
    Ok(secs * 1000 + micros / 1000)
}

/// Errors from the scheduler.
#[derive(Debug, thiserror::Error)]
pub enum SchedulerError {
    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Valkey error with additional context (preserves ErrorKind via #[source]).
    #[error("valkey ({context}): {source}")]
    ValkeyContext {
        #[source]
        source: ferriskey::Error,
        context: String,
    },
    /// Caller-supplied value failed ingress validation. NOT retryable — the
    /// caller must fix its input before retrying.
    #[error("config: {0}")]
    Config(String),
}

impl SchedulerError {
    /// Returns the underlying ferriskey ErrorKind, if this is a Valkey error.
    /// Matches `ServerError::valkey_kind` and `ScriptError::valkey_kind` so
    /// callers can treat all three uniformly.
    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
        match self {
            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
            Self::Config(_) => None,
        }
    }

    /// Whether this error is safely retryable by a caller. Mirrors
    /// `ServerError::is_retryable` semantics.
    pub fn is_retryable(&self) -> bool {
        self.valkey_kind()
            .map(is_retryable_kind)
            .unwrap_or(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn scheduler_is_retryable_matches_kind_table() {
        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());

        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
    }

    #[test]
    fn scheduler_valkey_context_is_retryable() {
        let err = SchedulerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::BusyLoadingError),
            context: "HGET budget_ids".into(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn scheduler_valkey_kind_exposed() {
        let err = SchedulerError::Valkey(mk_fk_err(ErrorKind::TryAgain));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::TryAgain));
    }
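
    // Sketch tests for the pure helpers (no Valkey needed). The digest
    // assertions lean only on documented properties (deterministic output,
    // 8 hex chars per its "8-hex prefix" doc), not on a specific FNV-1a value.
    #[test]
    fn caps_subset_empty_or_separator_only_csv_holds() {
        let caps: BTreeSet<String> = BTreeSet::new();
        assert!(caps_subset("", &caps));
        assert!(caps_subset(",,,", &caps));
    }

    #[test]
    fn caps_subset_requires_every_required_token() {
        let caps: BTreeSet<String> = ["gpu", "cuda"].iter().map(|s| s.to_string()).collect();
        assert!(caps_subset("gpu", &caps));
        assert!(caps_subset("cuda,gpu", &caps));
        assert!(!caps_subset("gpu,tpu", &caps));
    }

    #[test]
    fn worker_caps_digest_is_stable_8_hex() {
        let d = worker_caps_digest("cuda,gpu");
        assert_eq!(d.len(), 8);
        assert!(d.chars().all(|c| c.is_ascii_hexdigit()));
        assert_eq!(d, worker_caps_digest("cuda,gpu"));
    }

    #[test]
    fn scheduler_config_error_is_not_retryable() {
        let err = SchedulerError::Config("bad capability".to_owned());
        assert_eq!(err.valkey_kind(), None);
        assert!(!err.is_retryable());
    }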
}