//! Claim-grant cycle: find eligible executions and issue grants.
//!
//! The scheduler selects candidates from partition-local eligible sorted sets,
//! then atomically issues a claim grant via `FCALL ff_issue_claim_grant`.
//! The worker (ff-sdk) subsequently consumes the grant via `ff_claim_execution`.
//!
//! Phase 5: single lane, budget/quota pre-checks before grant issuance.
//! Candidates that fail budget/quota are blocked via `ff_block_execution_for_admission`.
//!
//! Reference: RFC-009 §12.7, RFC-010 §3.1, RFC-008 §1.7
//!
//! # Postgres variant (RFC-v0.7 Wave 5b)
//!
//! The admission pipeline is ported to Postgres as
//! [`ff_backend_postgres::scheduler::PostgresScheduler`]. It lives in
//! the backend crate — not as a `Scheduler` variant here — to keep
//! `ff-scheduler` off the sqlx dep graph. `ff-engine` / `ff-server`
//! branch at construction time on the concrete backend type.
//! See `crates/ff-backend-postgres/src/scheduler.rs` for the
//! pipeline + isolation notes.

use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition, quota_partition};
use ff_core::types::{BudgetId, ExecutionId, LaneId, QuotaPolicyId, WorkerId, WorkerInstanceId};
use ff_script::error::ScriptError;
use ff_script::result::FcallResult;
use ff_script::retry::is_retryable_kind;
use std::collections::BTreeSet;
use std::sync::atomic::{AtomicU64, Ordering};

/// Short stable digest of a worker's caps CSV for per-event log lines.
/// Thin wrapper around the shared helper so call sites read as
/// "worker_caps_digest" locally while the algorithm lives in one place.
fn worker_caps_digest(csv: &str) -> String {
    ff_core::hash::fnv1a_xor8hex(csv)
}

/// A claim grant issued by the scheduler for a specific execution.
///
/// Re-exported from [`ff_core::contracts::ClaimGrant`]. Lives in
/// `ff-core` so `ff-scheduler` (issuer) and `ff-sdk` (consumer)
/// share one wire-level type without a cross-dep between them.
pub use ff_core::contracts::ClaimGrant;

/// A reclaim grant for a resumed (attempt_interrupted) execution.
///
/// Re-export of [`ff_core::contracts::ReclaimGrant`] for symmetry
/// with [`ClaimGrant`]. `ff-scheduler` will be the canonical
/// producer once the Batch-C reclaim scanner lands; today only
/// test fixtures construct this type. Consumed by
/// `FlowFabricWorker::claim_from_reclaim_grant`.
pub use ff_core::contracts::ReclaimGrant;

/// Budget check result from a cross-partition budget read.
#[derive(Debug)]
pub enum BudgetCheckResult {
    /// Budget is within limits — proceed.
    Ok,
    /// Budget hard limit breached. Carries the breached dimension and a
    /// human-readable detail string.
    HardBreach { dimension: String, detail: String },
}

/// Outcome of a quota admission check.
enum QuotaCheckOutcome {
    /// No quota attached to this execution.
    NoQuota,
    /// Quota admitted — carries context for release on subsequent failure.
    Admitted { tag: String, quota_id: String, eid: String },
    /// Quota denied — execution should be blocked.
    Blocked(String),
}

/// Cross-partition budget checker with per-cycle caching.
///
/// Reads budget usage/limits once per scan cycle (not per candidate).
/// This is MANDATORY for performance — without it, 50K blocked executions
/// would produce 50K budget reads per cycle.
pub struct BudgetChecker {
    /// Cached budget status: budget_id → BudgetCheckResult.
    /// Reset at the start of each scheduler cycle.
    cache: std::collections::HashMap<String, BudgetCheckResult>,
    config: PartitionConfig,
}

impl BudgetChecker {
    pub fn new(config: PartitionConfig) -> Self {
        Self {
            cache: std::collections::HashMap::new(),
            config,
        }
    }

    /// Check a budget by ID. Reads from Valkey on first call per budget,
    /// caches for subsequent candidates in the same cycle.
    ///
    /// Fail-closed: transport errors propagate as
    /// [`SchedulerError::ValkeyContext`] rather than being cached as
    /// `Ok`. Caching an Err would be wrong — a transient blip would then
    /// pin every candidate in this cycle to the same denial. Successful
    /// results (including `HardBreach`) are still cached so 50K blocked
    /// candidates sharing a budget do one read per cycle, not 50K.
    pub async fn check_budget(
        &mut self,
        client: &ferriskey::Client,
        budget_id: &str,
    ) -> Result<&BudgetCheckResult, SchedulerError> {
        if self.cache.contains_key(budget_id) {
            return Ok(&self.cache[budget_id]);
        }

        // Compute real {b:M} partition tag from budget_id
        let (usage_key, limits_key) = match BudgetId::parse(budget_id) {
            Ok(bid) => {
                let partition = budget_partition(&bid, &self.config);
                let tag = partition.hash_tag();
                (
                    format!("ff:budget:{}:{}:usage", tag, budget_id),
                    format!("ff:budget:{}:{}:limits", tag, budget_id),
                )
            }
            Err(_) => {
                // Fallback for non-UUID budget IDs (test compat)
                (
                    format!("ff:budget:{{b:0}}:{}:usage", budget_id),
                    format!("ff:budget:{{b:0}}:{}:limits", budget_id),
                )
            }
        };

        let result =
            Self::read_and_check(client, &usage_key, &limits_key)
                .await
                .map_err(|source| SchedulerError::ValkeyContext {
                    source,
                    context: format!("budget_checker read {budget_id}"),
                })?;

        self.cache.insert(budget_id.to_owned(), result);
        Ok(&self.cache[budget_id])
    }

    /// Read budget usage and limits, compare each dimension.
    async fn read_and_check(
        client: &ferriskey::Client,
        usage_key: &str,
        limits_key: &str,
    ) -> Result<BudgetCheckResult, ferriskey::Error> {
        // Read all limit dimensions via hgetall (returns HashMap, not flat pairs)
        let limits: std::collections::HashMap<String, String> = client
            .hgetall(limits_key)
            .await?;

        // Parse hard limits
        for (field, limit_val) in &limits {
            if !field.starts_with("hard:") {
                continue;
            }
            let dimension = &field[5..]; // strip "hard:" prefix
            let limit: u64 = match limit_val.parse() {
                Ok(v) if v > 0 => v,
                _ => continue,
            };

            // Read current usage for this dimension. Fail-closed: a
            // transport error must NOT silently default the usage to 0
            // (which would make every breach invisible). Absence is
            // still treated as 0 — the caller is expected to HSET
            // usage_key on every increment.
            let usage_str: Option<String> = client
                .cmd("HGET")
                .arg(usage_key)
                .arg(dimension)
                .execute()
                .await?;
            let usage: u64 = usage_str
                .as_deref()
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);

            if usage >= limit {
                return Ok(BudgetCheckResult::HardBreach {
                    dimension: dimension.to_owned(),
                    detail: format!("budget {}: {} {}/{}", usage_key, dimension, usage, limit),
                });
            }
        }

        Ok(BudgetCheckResult::Ok)
    }

    /// Clear the cache at the start of a new scheduler cycle.
    pub fn reset(&mut self) {
        self.cache.clear();
    }
}

/// Tunable scheduler behavior, separate from the topology-level
/// [`PartitionConfig`]. Lives in `ff-scheduler` because every field is a
/// scheduler-internal scan policy — none of it leaks into persisted keys,
/// Lua scripts, or cross-crate wire shapes (unlike `PartitionConfig`).
/// If a future RFC needs a unified knob surface we can lift this up; not
/// premature today.
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// Maximum number of partitions to probe in a single
    /// [`Scheduler::claim_for_worker`] call before giving up and returning
    /// `Ok(None)`.
    ///
    /// **Trade-off:** smaller = lower worst-case no-hit latency per claim
    /// call (each probe is a ZRANGEBYSCORE round-trip, ~0.1ms LAN), larger
    /// = better fairness per call (more partitions seen before a worker
    /// yields). At the default of 32 with 256 partitions, any given
    /// partition is reached within `ceil(256/32) = 8` scheduling ticks —
    /// combined with the rotation cursor, that bounds worst-case
    /// starvation for a specific partition's head-of-queue execution.
    pub max_partitions_per_scan: u16,
    /// Duration a rotation cursor position stays stable before advancing.
    ///
    /// **Trade-off:** too short and tight-loop workers re-enter the same
    /// window on every tick (cursor never actually rotates relative to
    /// them); too long and slow-poll workers keep seeing the same cursor
    /// across many ticks (reducing fairness benefit). 250ms is a middle
    /// ground: tight-loop workers (sub-ms claim cycles) see a fresh
    /// window only every ~250 ticks, while 1s-poll workers see a fresh
    /// window on every tick. Tune down if your workers all idle-poll >1s;
    /// tune up if you run a fleet of tight-loop claimers and want less
    /// cursor churn.
    pub rotation_window_ms: u64,
}

impl SchedulerConfig {
    /// Default scan budget: probe 32 partitions per claim call.
    /// See [`Self::max_partitions_per_scan`] for the latency/fairness
    /// rationale.
    pub const DEFAULT_MAX_PARTITIONS_PER_SCAN: u16 = 32;

    /// Default rotation window: advance the cursor every 250ms.
    /// See [`Self::rotation_window_ms`].
    pub const DEFAULT_ROTATION_WINDOW_MS: u64 = 250;
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            max_partitions_per_scan: Self::DEFAULT_MAX_PARTITIONS_PER_SCAN,
            rotation_window_ms: Self::DEFAULT_ROTATION_WINDOW_MS,
        }
    }
}

/// Iterate partitions `[start, start+1, ..., start+count-1] mod total`.
///
/// Factored out of [`Scheduler::claim_for_worker`] so the modular-wrap +
/// bounded-length contract has a dedicated unit test surface, independent
/// of Valkey. Called with `count <= total`; values are always distinct.
///
/// Returns an iterator (not a Vec) to keep the hot claim path allocation-
/// free — this runs once per claim tick per worker.
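///
/// For example, `iter_partitions(8, 6, 4)` yields `6, 7, 0, 1`: the walk
/// wraps modulo `total` and stops after `count` partitions.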
fn iter_partitions(total: u16, start: u16, count: u16) -> impl Iterator<Item = u16> {
    debug_assert!(total > 0);
    let count = count.min(total);
    (0..count).map(move |i| start.wrapping_add(i) % total)
}

/// Single-lane scheduler with budget/quota pre-checks.
///
/// Iterates execution partitions sequentially, picks the first eligible
/// execution (lowest priority score). Before issuing a claim grant:
/// 1. Check all attached budgets (cross-partition, cached per cycle)
/// 2. Check quota admission (cross-partition FCALL)
/// 3. If any check fails: block the candidate and try next
/// 4. If all pass: issue the claim grant
///
/// Scan bounding + rotation: each call probes at most
/// [`SchedulerConfig::max_partitions_per_scan`] partitions starting from
/// a rotation cursor that advances once per
/// [`SchedulerConfig::rotation_window_ms`]. The per-worker FNV jitter is
/// applied on top so different workers diverge within any given window.
pub struct Scheduler {
    client: ferriskey::Client,
    config: PartitionConfig,
    scheduler_config: SchedulerConfig,
    /// Packed rotation state: high 48 bits = last window epoch seen
    /// (`now_ms / rotation_window_ms`), low 16 bits = current cursor
    /// partition index. A single atomic keeps "advance cursor iff we're
    /// the first call in a new window" race-free without a Mutex.
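    /// For example, with the default 250ms window, `now_ms = 1_000_000`
    /// packs as epoch `4_000` (`1_000_000 / 250`) shifted into the high
    /// bits, with the cursor index in the low 16 bits.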
    rotation_state: AtomicU64,
    /// PR-94: observability handle for claim_from_grant duration and
    /// budget/quota hit counters. Defaults to a fresh
    /// `ff_observability::Metrics::new()` — under the default build
    /// (`observability` feature off) that's the no-op shim; with the
    /// feature on it's a private real OTEL registry not shared with
    /// any scrape. Callers that want a shared registry plumb it in
    /// via [`Self::with_metrics`] / [`Self::with_config_and_metrics`].
    metrics: std::sync::Arc<ff_observability::Metrics>,
}

impl Scheduler {
    pub fn new(client: ferriskey::Client, config: PartitionConfig) -> Self {
        Self::with_config(client, config, SchedulerConfig::default())
    }

    /// Construct a scheduler with an explicit [`SchedulerConfig`].
    /// Use this when you need non-default scan bounds (e.g., in tests that
    /// want to walk every partition in one call, or deployments with
    /// non-default polling cadence). Most callers should use
    /// [`Self::new`].
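    ///
    /// A minimal sketch (assumes an already-connected `client` and a
    /// `partition_config` built elsewhere; the specific values are
    /// illustrative only):
    ///
    /// ```text
    /// let cfg = SchedulerConfig {
    ///     max_partitions_per_scan: 64,
    ///     rotation_window_ms: 500,
    /// };
    /// let scheduler = Scheduler::with_config(client, partition_config, cfg);
    /// ```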
    pub fn with_config(
        client: ferriskey::Client,
        config: PartitionConfig,
        scheduler_config: SchedulerConfig,
    ) -> Self {
        Self::with_config_and_metrics(
            client,
            config,
            scheduler_config,
            std::sync::Arc::new(ff_observability::Metrics::new()),
        )
    }

    /// PR-94: construct a scheduler with a shared observability
    /// registry and default [`SchedulerConfig`]. Used by `ff-server`
    /// so claim/budget/quota metrics land in the same registry
    /// exposed at `/metrics`. For test harnesses that need both a
    /// custom `SchedulerConfig` AND observability, use
    /// [`Self::with_config_and_metrics`].
    pub fn with_metrics(
        client: ferriskey::Client,
        config: PartitionConfig,
        metrics: std::sync::Arc<ff_observability::Metrics>,
    ) -> Self {
        Self::with_config_and_metrics(client, config, SchedulerConfig::default(), metrics)
    }

    /// PR-94: construct a scheduler with an explicit
    /// [`SchedulerConfig`] AND a shared observability registry.
    /// Convenience constructor so callers don't have to thread
    /// both concerns through separate builders.
    pub fn with_config_and_metrics(
        client: ferriskey::Client,
        config: PartitionConfig,
        scheduler_config: SchedulerConfig,
        metrics: std::sync::Arc<ff_observability::Metrics>,
    ) -> Self {
        Self {
            client,
            config,
            scheduler_config,
            rotation_state: AtomicU64::new(0),
            metrics,
        }
    }

    /// Return the current cursor for this call, advancing it if we're the
    /// first caller to observe a new rotation window. Pure compare-exchange
    /// on the packed atomic; no Mutex, no clock dep beyond `now_ms`.
    fn rotation_cursor(&self, now_ms: u64, num_partitions: u16) -> u16 {
        let window_ms = self.scheduler_config.rotation_window_ms.max(1);
        let step = self.scheduler_config.max_partitions_per_scan.max(1);
        let this_epoch = now_ms / window_ms;

        loop {
            let prev = self.rotation_state.load(Ordering::Relaxed);
            let prev_epoch = prev >> 16;
            let prev_cursor = (prev & 0xFFFF) as u16;

            if prev_epoch == this_epoch {
                return prev_cursor;
            }
            // New window: advance cursor by max_partitions_per_scan so
            // the next scan covers a fresh slice. Modular wrap on
            // num_partitions. Zero-partitions is a config bug; caller
            // guards it, but fall back to 0 to be safe.
            let new_cursor = if num_partitions == 0 {
                0
            } else {
                prev_cursor.wrapping_add(step) % num_partitions
            };
            let new_state = (this_epoch << 16) | u64::from(new_cursor);
            match self.rotation_state.compare_exchange(
                prev,
                new_state,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return new_cursor,
                // Lost race; another caller advanced. Re-read and use
                // whatever cursor now belongs to this epoch.
                Err(_) => continue,
            }
        }
    }

    /// Find an eligible execution and issue a claim grant.
    ///
    /// Probes up to [`SchedulerConfig::max_partitions_per_scan`] execution
    /// partitions (starting from the rotation cursor) and picks the first
    /// eligible execution found. Issues a claim grant via FCALL.
    ///
    /// `worker_capabilities` is sent to `ff_issue_claim_grant` as a sorted
    /// CSV (BTreeSet guarantees deterministic order). Executions whose
    /// `required_capabilities` are not a subset of this set are skipped
    /// (stay queued) via `ScriptError::CapabilityMismatch`.
    ///
    /// Returns `Ok(None)` if no eligible execution was found in the probed
    /// partitions.
    /// Returns `Ok(Some(grant))` on success.
    /// Returns `Err` on Valkey errors.
    ///
    /// # Lane-FIFO: prior-run leftovers drain first
    ///
    /// Claim grants are issued in per-lane FIFO order over the lane's
    /// eligible queue. A smoke / dev script that reuses a lane across
    /// runs will observe the lane's leftover executions from earlier
    /// runs — failed, orphaned, or still-eligible — drain *before*
    /// the fresh submission is claimed. This surfaces as a confusing
    /// "wrong execution picked up" during iterative development.
    /// Smoke tests should either (a) use a fresh lane name per run
    /// (e.g. suffixing with a UUID or timestamp), or (b) call
    /// `cancel_flow` / `FLUSHDB` between runs to drain the lane.
    pub async fn claim_for_worker(
        &self,
        lane: &LaneId,
        worker_id: &WorkerId,
        worker_instance_id: &WorkerInstanceId,
        worker_capabilities: &BTreeSet<String>,
        grant_ttl_ms: u64,
    ) -> Result<Option<ClaimGrant>, SchedulerError> {
        // PR-94: grant issuance latency histogram. Captures the full
        // cycle (candidate selection + budget/quota admission +
        // issuance FCALL) so an operator can spot slow paths without
        // having to correlate scheduler logs across partitions. Drop
        // guard records on *every* exit path (including ?-propagated
        // errors) so the histogram reflects wall-clock cost, not just
        // the happy path.
        struct ClaimTimer<'a> {
            start: std::time::Instant,
            lane: &'a str,
            metrics: &'a ff_observability::Metrics,
        }
        impl Drop for ClaimTimer<'_> {
            fn drop(&mut self) {
                self.metrics
                    .record_claim_from_grant(self.lane, self.start.elapsed());
            }
        }
        let _claim_timer = ClaimTimer {
            start: std::time::Instant::now(),
            lane: lane.as_str(),
            metrics: &self.metrics,
        };
        let num_partitions = self.config.num_flow_partitions;
        // Guard misconfiguration: a zero partition count would hit a
        // `% num_partitions` division-by-zero in the scan_start / jitter
        // computation below. The earlier pre-bounded loop silently
        // no-opped on `for offset in 0..0`; return a Config error instead
        // (not Ok(None)) so an operator who misconfigures gets a loud,
        // actionable error — silent Ok(None) would make every claim call
        // look like an empty queue forever.
        if num_partitions == 0 {
            return Err(SchedulerError::Config(
                "num_flow_partitions must be > 0".to_owned(),
            ));
        }
        let mut budget_checker = BudgetChecker::new(self.config);

        // Jitter the partition scan start to avoid thundering-herd on
        // partition 0 when 100 workers all tick simultaneously. Seeded
        // from worker_instance_id so this worker starts from a stable
        // offset within any given rotation window (the rotation cursor
        // still brings every partition into view across ticks), and
        // different workers naturally diverge. Uses the shared
        // ff_core::hash FNV-1a reducer — same helper powering ff-sdk's
        // PARTITION_SCAN_CHUNK cursor seed. Zero-modulus safe.
        let start_p: u16 = ff_core::hash::fnv1a_u16_mod(
            worker_instance_id.as_str(),
            num_partitions,
        );
        // BTreeSet iterates sorted → stable CSV for Lua subset match.
        // Ingress validation mirrors FlowFabricWorker::connect (ff-sdk):
        //   - `,` is the CSV delimiter — a token containing one would split
        //     mid-parse and could let a {"gpu"} worker appear to satisfy
        //     {"gpu,cuda"} (silent auth bypass).
        //   - Empty strings would inflate the CSV with leading / adjacent
        //     commas ("" joined with "gpu" gives ",gpu", which splits back
        //     as ["", "gpu"]) and inflate the token count past
        //     CAPS_MAX_TOKENS for no semantic reason.
        //   - Non-printable / whitespace: "gpu " vs "gpu" or "gpu\n" vs
        //     "gpu" silently mis-routes. Reject control characters and
        //     whitespace at ingress so typos fail loud.
        // Enforce ALL here so operator misconfig at the scheduler entry
        // point fails loud, symmetric with the SDK inline-claim path.
        // Bounds (#csv, #tokens) are enforced by the Lua side.
        for cap in worker_capabilities {
            if cap.is_empty() {
                return Err(SchedulerError::Config(
                    "capability token must not be empty".to_owned(),
                ));
            }
            if cap.contains(',') {
                return Err(SchedulerError::Config(format!(
                    "capability token may not contain ',' (CSV delimiter): {cap:?}"
                )));
            }
            // Reject ASCII control + whitespace (incl. Unicode whitespace);
            // allow non-ASCII printable UTF-8 so i18n cap names work. CSV
            // delimiter `,` is single-byte and never a UTF-8 continuation,
            // so multibyte UTF-8 is safe on the wire. Symmetric with
            // ff-sdk::FlowFabricWorker::connect.
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SchedulerError::Config(format!(
                    "capability token must not contain whitespace or control characters: {cap:?}"
                )));
            }
        }
        if worker_capabilities.len() > ff_core::policy::CAPS_MAX_TOKENS {
            return Err(SchedulerError::Config(format!(
                "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                ff_core::policy::CAPS_MAX_TOKENS,
                worker_capabilities.len()
            )));
        }
        let worker_caps_csv = worker_capabilities
            .iter()
            .filter(|s| !s.is_empty())
            .cloned()
            .collect::<Vec<_>>()
            .join(",");
        // Stable digest used in per-mismatch logs so the full 4KB CSV
        // doesn't get echoed on every mismatch. See worker_caps_digest.
        let worker_caps_hash = worker_caps_digest(&worker_caps_csv);
        if worker_caps_csv.len() > ff_core::policy::CAPS_MAX_BYTES {
            return Err(SchedulerError::Config(format!(
                "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                ff_core::policy::CAPS_MAX_BYTES,
                worker_caps_csv.len()
            )));
        }

        // ── Bounded scan with rotation cursor ──
        // Prior impl walked all `num_partitions` partitions on every call,
        // which at `num_flow_partitions = 256` meant a quiet cluster cost
        // 256 ZRANGEBYSCORE round-trips per claim tick per worker. The
        // bounded scan caps per-call work at `max_partitions_per_scan`
        // while the rotation cursor ensures every partition is still
        // visited within `ceil(total / max_partitions_per_scan)` ticks.
        //
        // Needs `now_ms` to compute the rotation window, so we snap server
        // time up front. One extra TIME round-trip for quiet-cluster no-hit
        // calls, but on hit paths the existing inner-loop `server_time_ms`
        // call is now redundant — we reuse this value.
        let scan_now_ms = match server_time_ms(&self.client).await {
            Ok(t) => t,
            Err(e) => {
                // Transport error talking to Valkey; surface via the same
                // `ValkeyContext` channel the admission checks use so the
                // caller retries after backoff instead of silently hitting
                // partition 0 with a zero cursor.
                return Err(SchedulerError::ValkeyContext {
                    source: e,
                    context: "scheduler: TIME for rotation cursor".to_owned(),
                });
            }
        };
        let rotation_cursor = self.rotation_cursor(scan_now_ms, num_partitions);
        // Per-worker jitter stacks on the shared cursor: `start_p` diverges
        // different workers within one window; `rotation_cursor` drifts the
        // whole fleet across windows so no single partition is anyone's
        // permanent "partition 0".
        let scan_start = (start_p + rotation_cursor) % num_partitions;
        let scan_budget = self
            .scheduler_config
            .max_partitions_per_scan
            .min(num_partitions)
            .max(1);

        // Observability counters (RFC-plan item 3): one debug line per
        // call, not per partition, so a tight-loop worker doesn't flood
        // logs. `partitions_hit` is 0/1 in practice — the loop returns on
        // the first grant — but keeping it a counter means future
        // multi-grant variants (N per call) don't need the log format to
        // change.
        let mut partitions_visited: u16 = 0;
        let mut partitions_skipped: u16 = 0;
        let mut partitions_hit: u16 = 0;
        let call_start = std::time::Instant::now();

        for p_idx in iter_partitions(num_partitions, scan_start, scan_budget) {
            partitions_visited += 1;
            let partition = Partition {
                family: PartitionFamily::Execution,
                index: p_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(lane);

            // ZRANGEBYSCORE eligible -inf +inf LIMIT 0 1
            // Lowest score = highest priority candidate
            let candidates: Vec<String> = match self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
            {
                Ok(ids) => ids,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        error = %e,
                        "scheduler: ZRANGEBYSCORE eligible failed, skipping partition"
                    );
                    partitions_skipped += 1;
                    continue;
                }
            };

            let eid_str = match candidates.first() {
                Some(s) => s,
                None => {
                    // Empty partition — this is the hot path on a quiet
                    // cluster and the whole point of the bounded scan.
                    // Don't count as a "skip" (skip implies something
                    // went wrong); treat as a normal miss.
                    continue;
                }
            };

            // Parse the execution ID
            let eid = match ExecutionId::parse(eid_str) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_str.as_str(),
                        error = %e,
                        "scheduler: invalid execution_id in eligible set, skipping"
                    );
                    partitions_skipped += 1;
                    continue;
                }
            };

            let exec_ctx = ExecKeyContext::new(&partition, &eid);
            let core_key = exec_ctx.core();
            let eid_s = eid.to_string();
            // Reuse the call-scoped `scan_now_ms` we read for the rotation
            // cursor. Semantics for block-detail timestamps are unchanged —
            // a block decision made during this call is still stamped with
            // a timestamp taken in the same call; the small staleness (at
            // most the duration of this call) is negligible and saves one
            // TIME round-trip per candidate.
            let now_ms = scan_now_ms;

            // ── Capability pre-check (in-slot HGET, cheap) ──
            // Runs BEFORE quota admission so we never ZADD a quota slot
            // for an execution this worker can't actually claim. Without
            // this, whenever the head of the eligible zset is unmatchable,
            // every scheduling tick would ZADD {q:K}:admitted_set then
            // ZREM it via release_admission on the capability_mismatch
            // reject — a cross-slot write storm amplifying the mismatch
            // loop.
            //
            // Lua `ff_issue_claim_grant` still does the authoritative
            // check; this is a fast-path short-circuit, not a substitute.
            // A narrow race exists where `required_capabilities` is
            // updated between our HGET and the FCALL — the FCALL is still
            // atomic and correct.
            let required_caps_csv: Option<String> = match self
                .client
                .cmd("HGET")
                .arg(&core_key)
                .arg("required_capabilities")
                .execute::<Option<String>>()
                .await
            {
                Ok(v) => v,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_s.as_str(),
                        error = %e,
                        "scheduler: HGET required_capabilities failed, skipping candidate"
                    );
                    continue;
                }
            };
            if let Some(req) = required_caps_csv.as_deref()
                && !req.is_empty()
                && !ff_core::caps::matches_csv(req, worker_capabilities)
            {
                // Move this execution out of eligible into blocked_route
                // so we don't hot-loop on it every tick (RFC-009 §564). A
                // periodic sweep (scanner side) promotes blocked_route →
                // eligible when a worker with matching caps registers.
                // Logged with a hash digest, not the raw CSV, to keep
                // per-mismatch log volume bounded.
                tracing::info!(
                    partition = p_idx,
                    execution_id = eid_s.as_str(),
                    worker_id = worker_id.as_str(),
                    worker_caps_hash = worker_caps_hash.as_str(),
                    required = req,
                    "scheduler: capability mismatch, blocking execution off eligible"
                );
                self.block_candidate(
                    &partition, &idx, lane, &eid, &eligible_key,
                    "waiting_for_capable_worker",
                    "no connected worker satisfies required_capabilities",
                    now_ms,
                ).await;
                continue;
            }

            // ── Budget pre-check (cross-partition, cached per cycle) ──
            if let Some(block_detail) = self
                .check_budgets(&mut budget_checker, &exec_ctx, &core_key, &eid_s)
                .await?
            {
                // Budget breached — block candidate and try next
                self.block_candidate(
                    &partition, &idx, lane, &eid, &eligible_key,
                    "waiting_for_budget", &block_detail, now_ms,
                ).await;
                continue;
            }

            // ── Quota pre-check (cross-partition FCALL on {q:K}) ──
            let quota_admission = self
                .check_quota(&exec_ctx, &core_key, &eid_s, now_ms)
                .await?;
            match &quota_admission {
                QuotaCheckOutcome::Blocked(block_detail) => {
                    self.block_candidate(
                        &partition, &idx, lane, &eid, &eligible_key,
                        "waiting_for_quota", block_detail, now_ms,
                    ).await;
                    continue;
                }
                QuotaCheckOutcome::NoQuota | QuotaCheckOutcome::Admitted { .. } => {}
            }

            // ── All checks passed — issue claim grant ──
            let grant_key = exec_ctx.claim_grant();
            let keys: [&str; 3] = [&core_key, &grant_key, &eligible_key];

            let ttl_str = grant_ttl_ms.to_string();
            let wid_s = worker_id.to_string();
            let wiid_s = worker_instance_id.to_string();
            let lane_s = lane.to_string();

            let argv: [&str; 9] = [
                &eid_s,
                &wid_s,
                &wiid_s,
                &lane_s,
                "",   // capability_hash
                &ttl_str,
                "",   // route_snapshot_json
                "",   // admission_summary
                &worker_caps_csv, // sorted CSV; empty → matches only empty-required execs
            ];

            let raw = match self
                .client
                .fcall::<ferriskey::Value>("ff_issue_claim_grant", &keys, &argv)
                .await
            {
                Ok(v) => v,
                Err(e) => {
                    // Transport failure on the FCALL — NOSCRIPT, IoError,
                    // ClusterDown, etc. This is NOT a normal soft-reject;
                    // persistent transport errors mean the scheduler is
                    // effectively idle even though it looks like it's
                    // running. WARN so ops dashboards (WARN+ aggregators)
                    // fire instead of burying it at DEBUG.
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_s.as_str(),
                        error = %e,
                        "scheduler: ff_issue_claim_grant transport error, trying next"
                    );
                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
                        self.release_admission(tag, quota_id, eid).await;
                    }
                    continue;
                }
            };

            match FcallResult::parse(&raw).and_then(|r| r.into_success()) {
                Ok(_) => {
                    partitions_hit += 1;
                    tracing::debug!(
                        partition = p_idx,
                        execution_id = eid_s.as_str(),
                        worker_instance_id = worker_instance_id.as_str(),
                        start_p = scan_start,
                        partitions_visited,
                        partitions_skipped,
                        partitions_hit,
                        elapsed_ms = call_start.elapsed().as_millis() as u64,
                        "scheduler: claim call completed (hit)"
                    );
                    return Ok(Some(ClaimGrant {
                        execution_id: eid,
                        partition_key: ff_core::partition::PartitionKey::from(&partition),
                        grant_key: grant_key.clone(),
                        expires_at_ms: now_ms + grant_ttl_ms,
                    }));
                }
                Err(script_err) => {
                    if matches!(script_err, ScriptError::CapabilityMismatch(_)) {
                        // Should be rare: the Rust pre-check above
                        // normally catches this and blocks the execution
                        // off eligible. Reaching here means the
                        // required_capabilities field mutated between our
                        // HGET and the Lua atomic check (narrow race).
                        // Block here too so the next tick doesn't loop.
                        //
                        // Log uses worker_caps_hash (8-hex digest), not
                        // the full 4KB CSV, to keep per-mismatch log
                        // volume bounded. Full CSV is logged once at
                        // worker connect under "worker caps" WARN.
                        tracing::info!(
                            partition = p_idx,
                            execution_id = eid_s.as_str(),
                            worker_id = wid_s.as_str(),
                            worker_caps_hash = worker_caps_hash.as_str(),
                            error = %script_err,
                            "scheduler: capability mismatch via Lua (race), blocking execution"
                        );
                        self.block_candidate(
                            &partition, &idx, lane, &eid, &eligible_key,
                            "waiting_for_capable_worker",
                            "no connected worker satisfies required_capabilities",
                            now_ms,
                        ).await;
                        if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
                            self.release_admission(tag, quota_id, eid).await;
                        }
                        continue;
                    } else {
                        // Any other logical reject (grant_already_exists,
                        // execution_not_in_eligible_set, execution_not_eligible,
                        // invalid_capabilities, etc.). These are rare and each
                        // indicates either a race or a config problem — in
                        // either case ops need to see it, so WARN, not DEBUG.
                        tracing::warn!(
                            partition = p_idx,
                            execution_id = eid_s.as_str(),
                            error = %script_err,
                            "scheduler: ff_issue_claim_grant rejected, trying next"
                        );
                    }
                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
                        self.release_admission(tag, quota_id, eid).await;
                    }
                    continue;
                }
            }
        }

        tracing::debug!(
            worker_instance_id = worker_instance_id.as_str(),
            start_p = scan_start,
            partitions_visited,
            partitions_skipped,
            partitions_hit,
            elapsed_ms = call_start.elapsed().as_millis() as u64,
            "scheduler: claim call completed (no hit)"
        );
        Ok(None)
    }

    /// Read budget_ids from exec_core and check each. Returns block detail
    /// string if any budget is breached, None if all pass.
    async fn check_budgets(
        &self,
        checker: &mut BudgetChecker,
        _exec_ctx: &ExecKeyContext,
        core_key: &str,
        _eid_s: &str,
    ) -> Result<Option<String>, SchedulerError> {
        // Read budget_ids from exec_core (comma-separated list)
        let budget_ids_str: Option<String> = self
            .client
            .cmd("HGET")
            .arg(core_key)
            .arg("budget_ids")
            .execute()
            .await?;

        let budget_ids_str = match budget_ids_str {
            Some(s) => s,
            None => return Ok(None),
        };
        if budget_ids_str.is_empty() {
            return Ok(None); // no budgets attached
        }

        // Parse comma-separated budget IDs
        for budget_id in budget_ids_str.split(',') {
            let budget_id = budget_id.trim();
            if budget_id.is_empty() {
                continue;
            }
            let result = checker.check_budget(&self.client, budget_id).await?;
            if let BudgetCheckResult::HardBreach { dimension, detail } = &result {
                // PR-94: budget hard-breach counter, labelled by
                // dimension so operators can distinguish "tokens"
                // breaches from "cost" breaches at a glance.
                self.metrics.inc_budget_hit(dimension);
                return Ok(Some(detail.clone()));
            }
        }

        Ok(None)
    }

    /// Check quota admission for the candidate.
    async fn check_quota(
        &self,
        _exec_ctx: &ExecKeyContext,
        core_key: &str,
        eid_s: &str,
        now_ms: u64,
    ) -> Result<QuotaCheckOutcome, SchedulerError> {
        // Read quota_policy_id from exec_core
        let quota_id_str: Option<String> = self
            .client
            .cmd("HGET")
            .arg(core_key)
            .arg("quota_policy_id")
            .execute()
            .await?;

        let quota_id_str = match quota_id_str {
            Some(s) => s,
            None => return Ok(QuotaCheckOutcome::NoQuota),
        };
        if quota_id_str.is_empty() {
            return Ok(QuotaCheckOutcome::NoQuota);
        }

        // Compute real {q:K} partition tag from quota_policy_id
        let tag = match QuotaPolicyId::parse(&quota_id_str) {
            Ok(qid) => {
                let partition = quota_partition(&qid, &self.config);
                partition.hash_tag()
            }
            Err(_) => "{q:0}".to_owned(), // fallback for non-UUID test IDs
        };

        let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id_str);
        let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id_str);
        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id_str);
        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id_str, eid_s);
        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id_str);

        // Read quota limits from policy hash
        let rate_limit: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("max_requests_per_window")
            .execute().await?;
        let window_secs: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("requests_per_window_seconds")
            .execute().await?;
        let concurrency_cap: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("active_concurrency_cap")
            .execute().await?;
        let jitter: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("jitter_ms")
            .execute().await?;

        let rate_limit = rate_limit.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
        let window_secs = window_secs.as_deref().and_then(|s| s.parse().ok()).unwrap_or(60u64);
        let concurrency_cap = concurrency_cap.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
        let jitter_ms = jitter.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);

        // No limits configured — admit without recording
        if rate_limit == 0 && concurrency_cap == 0 {
            return Ok(QuotaCheckOutcome::NoQuota);
        }

        // FCALL ff_check_admission_and_record on {q:K}
        let keys: [&str; 5] = [&window_key, &concurrency_key, &quota_def_key, &admitted_key, &admitted_set_key];
        let now_s = now_ms.to_string();
        let ws = window_secs.to_string();
        let rl = rate_limit.to_string();
        let cc = concurrency_cap.to_string();
        let jt = jitter_ms.to_string();
        let argv: [&str; 6] = [&now_s, &ws, &rl, &cc, eid_s, &jt];

        match self.client
            .fcall::<ferriskey::Value>("ff_check_admission_and_record", &keys, &argv)
            .await
        {
            Ok(result) => {
                // Parse domain-specific result: {"ADMITTED"}, {"RATE_EXCEEDED", retry_after},
                // {"CONCURRENCY_EXCEEDED"}, {"ALREADY_ADMITTED"}
                let status = Self::parse_admission_status(&result);
                match status.as_str() {
                    "ADMITTED" | "ALREADY_ADMITTED" => Ok(QuotaCheckOutcome::Admitted {
                        tag: tag.clone(),
                        quota_id: quota_id_str.clone(),
                        eid: eid_s.to_owned(),
                    }),
                    "RATE_EXCEEDED" => {
                        // PR-94: quota admission block, labelled by
                        // reason so rate-vs-concurrency waves are
                        // distinguishable in a dashboard.
                        self.metrics.inc_quota_hit("rate");
                        Ok(QuotaCheckOutcome::Blocked(format!(
                            "quota {}: rate limit {} per {}s window exceeded",
                            quota_id_str, rate_limit, window_secs
                        )))
                    }
                    "CONCURRENCY_EXCEEDED" => {
                        self.metrics.inc_quota_hit("concurrency");
                        Ok(QuotaCheckOutcome::Blocked(format!(
                            "quota {}: concurrency cap {}",
                            quota_id_str, concurrency_cap
                        )))
                    }
                    other => {
                        // Fail-closed: an unrecognised status is the Lua
                        // telling us a contract we don't understand. Do
                        // NOT default to admit — surface it so the
                        // scheduler retries next cycle and ops sees the
                        // event.
                        tracing::warn!(
                            quota_id = quota_id_str.as_str(),
                            status = other,
                            "scheduler: unexpected admission result, denying (fail-closed)"
                        );
                        Err(SchedulerError::Config(format!(
                            "quota {quota_id_str}: unexpected admission status \"{other}\""
                        )))
                    }
                }
            }
            Err(e) => {
                // Fail-closed: transport fault on the admission FCALL
                // must NOT silently admit the candidate. Propagate so
                // the outer cycle returns the error and the worker
                // retries after the usual backoff; the next cycle will
                // re-run the admission check against fresh state.
                tracing::warn!(
                    quota_id = quota_id_str.as_str(),
                    error = %e,
                    "scheduler: quota FCALL failed, denying (fail-closed)"
                );
                Err(SchedulerError::ValkeyContext {
                    source: e,
                    context: format!("ff_check_admission_and_record {quota_id_str}"),
                })
            }
        }
    }

    /// Parse the first element of a Valkey array result as a status string.
    fn parse_admission_status(result: &ferriskey::Value) -> String {
        match result {
            ferriskey::Value::Array(arr) => {
                match arr.first() {
                    Some(Ok(ferriskey::Value::BulkString(b))) => {
                        String::from_utf8_lossy(b).into_owned()
                    }
                    Some(Ok(ferriskey::Value::SimpleString(s))) => s.clone(),
                    _ => "UNKNOWN".to_owned(),
                }
            }
            _ => "UNKNOWN".to_owned(),
        }
    }

    /// Block a candidate that failed budget/quota check.
    /// FCALL ff_block_execution_for_admission on {p:N}.
    #[allow(clippy::too_many_arguments)]
    async fn block_candidate(
        &self,
        partition: &Partition,
        idx: &IndexKeys,
        lane: &LaneId,
        eid: &ExecutionId,
        eligible_key: &str,
        block_reason: &str,
        blocking_detail: &str,
        now_ms: u64,
    ) {
        let exec_ctx = ExecKeyContext::new(partition, eid);
        let core_key = exec_ctx.core();
        let eid_s = eid.to_string();
        let blocked_key = match block_reason {
            "waiting_for_budget" => idx.lane_blocked_budget(lane),
            "waiting_for_quota" => idx.lane_blocked_quota(lane),
            "waiting_for_capable_worker" => idx.lane_blocked_route(lane),
            _ => idx.lane_blocked_budget(lane),
        };

        let keys: [&str; 3] = [&core_key, eligible_key, &blocked_key];
        let now_s = now_ms.to_string();
        let argv: [&str; 4] = [&eid_s, block_reason, blocking_detail, &now_s];

        // Parse FcallResult so we distinguish Lua-level rejections (e.g.
        // execution_not_active because the execution went terminal between
        // our HGET and the FCALL) from a real block. Previously `Ok(_)`
        // treated an err-tuple as success → INFO log "candidate blocked"
        // while nothing actually changed on exec_core, then the next tick
        // re-picked the same candidate and looped. Mirrors the
        // release_admission parse fix.
        match self.client
            .fcall::<ferriskey::Value>("ff_block_execution_for_admission", &keys, &argv)
            .await
        {
            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
                Ok(_) => {
                    tracing::info!(
                        execution_id = eid_s,
                        reason = block_reason,
                        "scheduler: candidate blocked by admission check"
                    );
                }
                Err(script_err) => {
                    // Logical reject from Lua (e.g. execution_not_active
                    // — the execution went terminal between the scheduler
                    // pick and the block FCALL; the candidate loop will
                    // naturally move on). WARN so ops dashboards surface
                    // actual block failures, but not so loud that a common
                    // race spams alerts.
                    tracing::warn!(
                        execution_id = eid_s,
                        reason = block_reason,
                        error = %script_err,
                        "scheduler: ff_block_execution_for_admission rejected by Lua"
                    );
                }
            },
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_s,
                    error = %e,
                    "scheduler: ff_block_execution_for_admission transport failed"
                );
            }
        }
    }

    /// Release a previously-recorded quota admission slot.
    /// Called when ff_issue_claim_grant fails after admission was recorded.
    async fn release_admission(
        &self,
        tag: &str,
        quota_id: &str,
        eid_s: &str,
    ) {
        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid_s);
        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);

        let keys: [&str; 3] = [&admitted_key, &admitted_set_key, &concurrency_key];
        let argv: [&str; 1] = [eid_s];

        // Parse the Lua response properly: FCALL returns `Ok(Value)` for
        // BOTH success and logical-error paths. Treating Ok(_) blindly as
        // "released" logs a false positive when the Lua returns
        // `{0, "quota_not_found"}` (or any other script-level err) — the
        // slot in fact remains pinned until its TTL expires, which is
        // minutes to hours. Surface the real outcome so on-call sees
        // actual release failures instead of clean "released" events.
        match self.client
            .fcall::<ferriskey::Value>("ff_release_admission", &keys, &argv)
            .await
        {
            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
                Ok(_) => {
                    tracing::info!(
                        execution_id = eid_s,
                        quota_id,
                        "scheduler: released admission after claim failure"
                    );
                }
                Err(script_err) => {
                    tracing::warn!(
                        execution_id = eid_s,
                        quota_id,
                        error = %script_err,
                        "scheduler: ff_release_admission rejected by Lua \
                         (slot will expire via TTL)"
                    );
                }
            },
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_s,
                    quota_id,
                    error = %e,
                    "scheduler: ff_release_admission transport failed \
                     (slot will expire via TTL)"
                );
            }
        }
    }
}

/// Get server time in milliseconds via the TIME command.
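/// `TIME` returns `[seconds, microseconds]`; for example, `["1700000000",
/// "123456"]` maps to `1_700_000_000_123` ms.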
1205async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
1206    let result: Vec<String> = client
1207        .cmd("TIME")
1208        .execute()
1209        .await?;
1210    if result.len() < 2 {
1211        return Err(ferriskey::Error::from((
1212            ferriskey::ErrorKind::ClientError,
1213            "TIME returned fewer than 2 elements",
1214        )));
1215    }
1216    let secs: u64 = result[0].parse().map_err(|_| {
1217        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
1218    })?;
1219    let micros: u64 = result[1].parse().map_err(|_| {
1220        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
1221    })?;
1222    Ok(secs * 1000 + micros / 1000)
1223}
1224
1225/// Errors from the scheduler.
1226#[derive(Debug, thiserror::Error)]
1227pub enum SchedulerError {
1228    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
1229    #[error("valkey: {0}")]
1230    Valkey(#[from] ferriskey::Error),
1231    /// Valkey error with additional context (preserves ErrorKind via #[source]).
1232    #[error("valkey ({context}): {source}")]
1233    ValkeyContext {
1234        #[source]
1235        source: ferriskey::Error,
1236        context: String,
1237    },
1238    /// Caller-supplied value failed ingress validation. NOT retryable — the
1239    /// caller must fix its input before retrying.
1240    #[error("config: {0}")]
1241    Config(String),
1242}
1243
1244impl SchedulerError {
1245    /// Returns the underlying ferriskey ErrorKind, if this is a Valkey error.
1246    /// Matches `ScriptError::valkey_kind` so callers can treat both
1247    /// uniformly. (Note: `ServerError` exposed the same shape until #88
1248    /// renamed it to `ServerError::backend_kind` with a `BackendErrorKind`
1249    /// return; this scheduler-internal surface keeps the raw-ErrorKind
1250    /// name pending its own sealing.)
1251    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
1252        match self {
1253            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
1254            Self::Config(_) => None,
1255        }
1256    }
1257
1258    /// Whether this error is safely retryable by a caller. Mirrors
1259    /// `ServerError::is_retryable` semantics.
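    ///
    /// Minimal call-site sketch (assuming an `err: SchedulerError` is in hand):
    /// ```ignore
    /// if err.is_retryable() {
    ///     // transient kind (e.g. IoError, ClusterDown): back off and retry
    /// } else {
    ///     // permanent (e.g. Config, NoScriptError): surface to the caller
    /// }
    /// ```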
1260    pub fn is_retryable(&self) -> bool {
1261        self.valkey_kind()
1262            .map(is_retryable_kind)
1263            .unwrap_or(false)
1264    }
1265}
1266
1267#[cfg(test)]
1268mod tests {
1269    use super::*;
1270    use ferriskey::ErrorKind;
1271
1272    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
1273        ferriskey::Error::from((kind, "synthetic"))
1274    }
1275
1276    #[test]
1277    fn scheduler_is_retryable_matches_kind_table() {
1278        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
1279        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
1280
1281        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
1282        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
1283        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
1284    }
1285
1286    #[test]
1287    fn scheduler_valkey_context_is_retryable() {
1288        let err = SchedulerError::ValkeyContext {
1289            source: mk_fk_err(ErrorKind::BusyLoadingError),
1290            context: "HGET budget_ids".into(),
1291        };
1292        assert!(err.is_retryable());
1293    }
1294
1295    #[test]
1296    fn scheduler_valkey_kind_exposed() {
1297        let err = SchedulerError::Valkey(mk_fk_err(ErrorKind::TryAgain));
1298        assert_eq!(err.valkey_kind(), Some(ErrorKind::TryAgain));
1299    }
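
    // `Config` carries no ErrorKind: `valkey_kind()` must report None and the
    // error must never be treated as retryable; the caller has to fix its
    // input before retrying.
    #[test]
    fn scheduler_config_error_not_retryable() {
        let err = SchedulerError::Config("bad lane id".into());
        assert_eq!(err.valkey_kind(), None);
        assert!(!err.is_retryable());
    }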
1300
1301    // ── iter_partitions: regression test for the modular wrap + bounded
1302    // length contract. The fairness behaviour of the bounded scheduler
1303    // scan depends entirely on this helper returning exactly `min(count, total)`
1304    // distinct partition indices starting at `start` and wrapping modulo
1305    // `total`. Tested in isolation so a bug here is distinguishable from
1306    // a bug in the Valkey-backed loop. ──
1307
1308    #[test]
1309    fn iter_partitions_no_wrap() {
1310        // start=10, count=5, total=256 → 10..15, no wrap involved.
1311        let ps: Vec<u16> = iter_partitions(256, 10, 5).collect();
1312        assert_eq!(ps, vec![10, 11, 12, 13, 14]);
1313    }
1314
1315    #[test]
1316    fn iter_partitions_wraps_modulo_total() {
1317        // start=254, count=5, total=256 → 254, 255, 0, 1, 2.
1318        let ps: Vec<u16> = iter_partitions(256, 254, 5).collect();
1319        assert_eq!(ps, vec![254, 255, 0, 1, 2]);
1320    }
1321
1322    #[test]
1323    fn iter_partitions_count_capped_to_total() {
1324        // Asking for more than `total` yields exactly `total` distinct
1325        // partitions — never a duplicate, never more than the universe.
1326        let ps: Vec<u16> = iter_partitions(4, 1, 100).collect();
1327        assert_eq!(ps, vec![1, 2, 3, 0]);
1328    }
1329
1330    #[test]
1331    fn iter_partitions_length_matches_count() {
1332        // Invariant: output length == min(count, total). The scan loop
1333        // upper-bounds round-trips on this, so regressing it would
1334        // silently re-introduce the 256-round-trip-per-tick bug.
1335        for start in [0u16, 1, 50, 255] {
1336            for count in [0u16, 1, 16, 32, 256] {
1337                let len = iter_partitions(256, start, count).count();
1338                assert_eq!(len, count.min(256) as usize);
1339            }
1340        }
1341    }
1342
1343    // ── Fairness: the union of the partitions visited across
1344    // `ceil(total / max_partitions_per_scan)` successive scans, with the
1345    // rotation cursor advancing each scan, must cover every partition
1346    // exactly once. This is the contract the operator rustdoc promises
1347    // ("any given partition reached within 8 ticks at defaults"). ──
1348    #[test]
1349    fn fairness_full_coverage_in_ceil_total_over_budget_scans() {
1350        const TOTAL: u16 = 256;
1351        const BUDGET: u16 = SchedulerConfig::DEFAULT_MAX_PARTITIONS_PER_SCAN;
1352        let scans = TOTAL.div_ceil(BUDGET); // 8 at defaults
1353
1354        // Simulate the same advance logic the live cursor performs: each
1355        // "scan" starts at the previous start + BUDGET (mod TOTAL). We
1356        // pin start to 0 for determinism; the per-worker FNV jitter is a
1357        // phase offset on top and doesn't change coverage.
1358        let mut union = std::collections::BTreeSet::new();
1359        let mut cursor: u16 = 0;
1360        for _ in 0..scans {
1361            for p in iter_partitions(TOTAL, cursor, BUDGET) {
1362                union.insert(p);
1363            }
1364            cursor = cursor.wrapping_add(BUDGET) % TOTAL;
1365        }
1366
1367        assert_eq!(union.len(), TOTAL as usize, "every partition visited once");
1368        for p in 0..TOTAL {
1369            assert!(union.contains(&p), "missing partition {p}");
1370        }
1371    }
1372
1373    #[test]
1374    fn fairness_full_coverage_with_phase_offset() {
1375        // Regression: the per-worker FNV phase must not change the
1376        // coverage property. Pick a non-zero start; we still cover the
1377        // whole universe in ceil(total/budget) scans.
1378        const TOTAL: u16 = 256;
1379        const BUDGET: u16 = SchedulerConfig::DEFAULT_MAX_PARTITIONS_PER_SCAN;
1380        let scans = TOTAL.div_ceil(BUDGET);
1381
1382        let mut union = std::collections::BTreeSet::new();
1383        let mut cursor: u16 = 137; // arbitrary per-worker jitter
1384        for _ in 0..scans {
1385            for p in iter_partitions(TOTAL, cursor, BUDGET) {
1386                union.insert(p);
1387            }
1388            cursor = cursor.wrapping_add(BUDGET) % TOTAL;
1389        }
1390        assert_eq!(union.len(), TOTAL as usize);
1391    }
1392
1393    #[test]
1394    fn scheduler_config_defaults_match_rustdoc() {
1395        let c = SchedulerConfig::default();
1396        assert_eq!(c.max_partitions_per_scan, 32);
1397        assert_eq!(c.rotation_window_ms, 250);
1398    }
1399}