
ff_scheduler/claim.rs

1//! Claim-grant cycle: find eligible executions and issue grants.
2//!
3//! The scheduler selects candidates from partition-local eligible sorted sets,
4//! then atomically issues a claim grant via `FCALL ff_issue_claim_grant`.
5//! The worker (ff-sdk) subsequently consumes the grant via `ff_claim_execution`.
6//!
7//! Phase 5: single lane, budget/quota pre-checks before grant issuance.
8//! Candidates that fail budget/quota are blocked via ff_block_execution_for_admission.
9//!
10//! Reference: RFC-009 §12.7, RFC-010 §3.1, RFC-008 §1.7
11
12use ff_core::keys::{ExecKeyContext, IndexKeys};
13use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition, quota_partition};
14use ff_core::types::{BudgetId, ExecutionId, LaneId, QuotaPolicyId, WorkerId, WorkerInstanceId};
15use ff_script::error::ScriptError;
16use ff_script::result::FcallResult;
17use ff_script::retry::is_retryable_kind;
18use std::collections::BTreeSet;
19use std::sync::atomic::{AtomicU64, Ordering};
20
26/// Return true iff every non-empty comma-separated token in `required_csv`
27/// is present in `worker_caps`. Mirrors the authoritative Lua subset check
28/// in scheduling.lua (parse_capability_csv + missing_capabilities) — this
29/// is a fast-path Rust short-circuit so we can skip the quota-admission
30/// write for executions we already know the worker can't claim. Empty /
31/// all-separator CSV → subset holds trivially.
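///
/// A small illustration of the subset semantics (shown as an `ignore` block
/// since this helper is module-private):
///
/// ```ignore
/// let caps: BTreeSet<String> = ["cuda", "gpu"].iter().map(|s| s.to_string()).collect();
/// assert!(caps_subset("gpu,cuda", &caps));
/// assert!(caps_subset(",,", &caps));       // no non-empty tokens -> trivially true
/// assert!(!caps_subset("gpu,tpu", &caps)); // "tpu" missing
/// ```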
32fn caps_subset(required_csv: &str, worker_caps: &BTreeSet<String>) -> bool {
33    required_csv
34        .split(',')
35        .filter(|t| !t.is_empty())
36        .all(|t| worker_caps.contains(t))
37}
38
/// Short, stable digest of a worker's caps CSV, used in per-mismatch log
/// lines so a capability CSV of up to 4KB is not echoed on every
/// capability_mismatch event (which would swamp aggregators during an
/// incident). The full CSV is logged once at WARN when the worker connects;
/// operators cross-reference this 8-hex digest against that full set.
/// Thin wrapper around the shared helper so call sites read as
/// "worker_caps_digest" locally while the algorithm lives in one place.
42fn worker_caps_digest(csv: &str) -> String {
43    ff_core::hash::fnv1a_xor8hex(csv)
44}
45
46/// A claim grant issued by the scheduler for a specific execution.
47///
48/// Re-exported from [`ff_core::contracts::ClaimGrant`]. Lives in
49/// `ff-core` so `ff-scheduler` (issuer) and `ff-sdk` (consumer)
50/// share one wire-level type without a cross-dep between them.
51pub use ff_core::contracts::ClaimGrant;
52
53/// A reclaim grant for a resumed (attempt_interrupted) execution.
54///
55/// Re-export of [`ff_core::contracts::ReclaimGrant`] for symmetry
56/// with [`ClaimGrant`]. `ff-scheduler` will be the canonical
57/// producer once the Batch-C reclaim scanner lands; today only
58/// test fixtures construct this type. Consumed by
59/// `FlowFabricWorker::claim_from_reclaim_grant`.
60pub use ff_core::contracts::ReclaimGrant;
61
62/// Budget check result from a cross-partition budget read.
63#[derive(Debug)]
64pub enum BudgetCheckResult {
65    /// Budget is within limits — proceed.
66    Ok,
67    /// Budget hard limit breached. Contains (dimension, detail_string).
68    HardBreach { dimension: String, detail: String },
69}
70
71/// Outcome of a quota admission check.
72enum QuotaCheckOutcome {
73    /// No quota attached to this execution.
74    NoQuota,
75    /// Quota admitted — carries context for release on subsequent failure.
76    Admitted { tag: String, quota_id: String, eid: String },
77    /// Quota denied — execution should be blocked.
78    Blocked(String),
79}
80
81/// Cross-partition budget checker with per-cycle caching.
82///
83/// Reads budget usage/limits once per scan cycle (not per candidate).
84/// This is MANDATORY for performance — without it, 50K blocked executions
85/// would produce 50K budget reads per cycle.
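///
/// A minimal usage sketch (the `client`, `partition_config`, and `budget_id`
/// bindings are assumed to exist, hence the `ignore` block):
///
/// ```ignore
/// let mut checker = BudgetChecker::new(partition_config);
/// // First call per budget_id reads usage/limits from Valkey ...
/// match checker.check_budget(&client, budget_id).await? {
///     BudgetCheckResult::Ok => { /* proceed to quota admission */ }
///     BudgetCheckResult::HardBreach { .. } => { /* block the candidate */ }
/// }
/// // ... later candidates sharing the same budget_id hit the in-memory cache.
/// checker.reset(); // start of the next scheduler cycle
/// ```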
86pub struct BudgetChecker {
87    /// Cached budget status: budget_id → BudgetCheckResult.
88    /// Reset at the start of each scheduler cycle.
89    cache: std::collections::HashMap<String, BudgetCheckResult>,
90    config: PartitionConfig,
91}
92
93impl BudgetChecker {
94    pub fn new(config: PartitionConfig) -> Self {
95        Self {
96            cache: std::collections::HashMap::new(),
97            config,
98        }
99    }
100
101    /// Check a budget by ID. Reads from Valkey on first call per budget,
102    /// caches for subsequent candidates in the same cycle.
103    ///
104    /// Fail-closed: transport errors propagate as
105    /// [`SchedulerError::ValkeyContext`] rather than being cached as
106    /// `Ok`. Caching an Err would be wrong — a transient blip would then
107    /// pin every candidate in this cycle to the same denial.  Successful
108    /// results (including `HardBreach`) are still cached so 50K blocked
109    /// candidates sharing a budget do one read per cycle, not 50K.
110    pub async fn check_budget(
111        &mut self,
112        client: &ferriskey::Client,
113        budget_id: &str,
114    ) -> Result<&BudgetCheckResult, SchedulerError> {
115        if self.cache.contains_key(budget_id) {
116            return Ok(&self.cache[budget_id]);
117        }
118
119        // Compute real {b:M} partition tag from budget_id
120        let (usage_key, limits_key) = match BudgetId::parse(budget_id) {
121            Ok(bid) => {
122                let partition = budget_partition(&bid, &self.config);
123                let tag = partition.hash_tag();
124                (
125                    format!("ff:budget:{}:{}:usage", tag, budget_id),
126                    format!("ff:budget:{}:{}:limits", tag, budget_id),
127                )
128            }
129            Err(_) => {
130                // Fallback for non-UUID budget IDs (test compat)
131                (
132                    format!("ff:budget:{{b:0}}:{}:usage", budget_id),
133                    format!("ff:budget:{{b:0}}:{}:limits", budget_id),
134                )
135            }
136        };
137
138        let result =
139            Self::read_and_check(client, &usage_key, &limits_key)
140                .await
141                .map_err(|source| SchedulerError::ValkeyContext {
142                    source,
143                    context: format!("budget_checker read {budget_id}"),
144                })?;
145
146        self.cache.insert(budget_id.to_owned(), result);
147        Ok(&self.cache[budget_id])
148    }
149
150    /// Read budget usage and limits, compare each dimension.
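    ///
    /// For example, a limits hash containing `hard:tokens = 100000` checked
    /// against a usage hash containing `tokens = 100000` yields
    /// `HardBreach { dimension: "tokens", .. }` (breach is `usage >= limit`);
    /// usage `99999` passes, and a missing usage field counts as 0.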
151    async fn read_and_check(
152        client: &ferriskey::Client,
153        usage_key: &str,
154        limits_key: &str,
155    ) -> Result<BudgetCheckResult, ferriskey::Error> {
156        // Read all limit dimensions via hgetall (returns HashMap, not flat pairs)
157        let limits: std::collections::HashMap<String, String> = client
158            .hgetall(limits_key)
159            .await?;
160
161        // Parse hard limits
162        for (field, limit_val) in &limits {
163            if !field.starts_with("hard:") {
164                continue;
165            }
166            let dimension = &field[5..]; // strip "hard:" prefix
167            let limit: u64 = match limit_val.parse() {
168                Ok(v) if v > 0 => v,
169                _ => continue,
170            };
171
172            // Read current usage for this dimension. Fail-closed: a
173            // transport error must NOT silently default the usage to 0
174            // (which would make every breach invisible).  Absence is
175            // still treated as 0 — the caller is expected to HSET
176            // usage_key on every increment.
177            let usage_str: Option<String> = client
178                .cmd("HGET")
179                .arg(usage_key)
180                .arg(dimension)
181                .execute()
182                .await?;
183            let usage: u64 = usage_str
184                .as_deref()
185                .and_then(|s| s.parse().ok())
186                .unwrap_or(0);
187
188            if usage >= limit {
189                return Ok(BudgetCheckResult::HardBreach {
190                    dimension: dimension.to_owned(),
191                    detail: format!("budget {}: {} {}/{}", usage_key, dimension, usage, limit),
192                });
193            }
194        }
195
196        Ok(BudgetCheckResult::Ok)
197    }
198
199    /// Clear the cache at the start of a new scheduler cycle.
200    pub fn reset(&mut self) {
201        self.cache.clear();
202    }
203}
204
205/// Tunable scheduler behavior, separate from the topology-level
206/// [`PartitionConfig`]. Lives in `ff-scheduler` because every field is a
207/// scheduler-internal scan policy — none of it leaks into persisted keys,
208/// Lua scripts, or cross-crate wire shapes (unlike `PartitionConfig`).
209/// If a future RFC needs a unified knob surface we can lift this up; not
210/// premature today.
211#[derive(Debug, Clone, Copy)]
212pub struct SchedulerConfig {
213    /// Maximum number of partitions to probe in a single
214    /// [`Scheduler::claim_for_worker`] call before giving up and returning
215    /// `Ok(None)`.
216    ///
217    /// **Trade-off:** smaller = lower worst-case no-hit latency per claim
218    /// call (each probe is a ZRANGEBYSCORE round-trip, ~0.1ms LAN), larger
219    /// = better fairness per call (more partitions seen before a worker
220    /// yields). At the default of 32 with 256 partitions, any given
221    /// partition is reached within `ceil(256/32) = 8` scheduling ticks —
222    /// combined with the rotation cursor, that bounds worst-case
223    /// starvation for a specific partition's head-of-queue execution.
224    pub max_partitions_per_scan: u16,
225    /// Duration a rotation cursor position stays stable before advancing.
226    ///
227    /// **Trade-off:** too short and tight-loop workers re-enter the same
228    /// window on every tick (cursor never actually rotates relative to
229    /// them); too long and slow-poll workers keep seeing the same cursor
230    /// across many ticks (reducing fairness benefit). 250ms is a middle
231    /// ground: tight-loop workers (sub-ms claim cycles) see a fresh
232    /// window every ~250 ticks, 1s-poll workers see a fresh window every
233    /// 4 ticks. Tune down if your workers all idle-poll >1s; tune up if
234    /// you run a fleet of tight-loop claimers and want less cursor churn.
235    pub rotation_window_ms: u64,
236}
237
238impl SchedulerConfig {
239    /// Default scan budget: probe 32 partitions per claim call.
240    /// See [`Self::max_partitions_per_scan`] for the latency/fairness
241    /// rationale.
242    pub const DEFAULT_MAX_PARTITIONS_PER_SCAN: u16 = 32;
243
244    /// Default rotation window: advance the cursor every 250ms.
245    /// See [`Self::rotation_window_ms`].
246    pub const DEFAULT_ROTATION_WINDOW_MS: u64 = 250;
247}
248
249impl Default for SchedulerConfig {
250    fn default() -> Self {
251        Self {
252            max_partitions_per_scan: Self::DEFAULT_MAX_PARTITIONS_PER_SCAN,
253            rotation_window_ms: Self::DEFAULT_ROTATION_WINDOW_MS,
254        }
255    }
256}
257
258/// Iterate partitions `[start, start+1, ..., start+count-1] mod total`.
259///
260/// Factored out of [`Scheduler::claim_for_worker`] so the modular-wrap +
261/// bounded-length contract has a dedicated unit test surface, independent
262/// of Valkey. Called with `count <= total`; values are always distinct.
263///
264/// Returns an iterator (not a Vec) to keep the hot claim path allocation-
265/// free — this runs once per claim tick per worker.
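///
/// For example, `iter_partitions(8, 6, 4)` yields `6, 7, 0, 1` (wrapping at
/// `total`), and `count` is clamped to `total`.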
266fn iter_partitions(total: u16, start: u16, count: u16) -> impl Iterator<Item = u16> {
267    debug_assert!(total > 0);
268    let count = count.min(total);
269    (0..count).map(move |i| start.wrapping_add(i) % total)
270}
271
272/// Single-lane scheduler with budget/quota pre-checks.
273///
274/// Iterates execution partitions sequentially, picks the first eligible
275/// execution (lowest priority score). Before issuing a claim grant:
276/// 1. Check all attached budgets (cross-partition, cached per cycle)
277/// 2. Check quota admission (cross-partition FCALL)
278/// 3. If any check fails: block the candidate and try next
279/// 4. If all pass: issue the claim grant
280///
281/// Scan bounding + rotation: each call probes at most
282/// [`SchedulerConfig::max_partitions_per_scan`] partitions starting from
283/// a rotation cursor that advances once per
284/// [`SchedulerConfig::rotation_window_ms`]. The per-worker FNV jitter is
285/// applied on top so different workers diverge within any given window.
286pub struct Scheduler {
287    client: ferriskey::Client,
288    config: PartitionConfig,
289    scheduler_config: SchedulerConfig,
290    /// Packed rotation state: high 48 bits = last window epoch seen
291    /// (`now_ms / rotation_window_ms`), low 16 bits = current cursor
292    /// partition index. A single atomic keeps "advance cursor iff we're
293    /// the first call in a new window" race-free without a Mutex.
294    rotation_state: AtomicU64,
295    /// PR-94: observability handle for claim_from_grant duration and
296    /// budget/quota hit counters. Defaults to a fresh
297    /// `ff_observability::Metrics::new()` — under the default build
298    /// (`observability` feature off) that's the no-op shim; with the
299    /// feature on it's a private real OTEL registry not shared with
300    /// any scrape. Callers that want a shared registry plumb it in
301    /// via [`Self::with_metrics`] / [`Self::with_config_and_metrics`].
302    metrics: std::sync::Arc<ff_observability::Metrics>,
303}
304
305impl Scheduler {
306    pub fn new(client: ferriskey::Client, config: PartitionConfig) -> Self {
307        Self::with_config(client, config, SchedulerConfig::default())
308    }
309
310    /// Construct a scheduler with an explicit [`SchedulerConfig`].
311    /// Use this when you need non-default scan bounds (e.g., in tests that
312    /// want to walk every partition in one call, or deployments with
313    /// non-default polling cadence). Most callers should use
314    /// [`Self::new`].
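    ///
    /// A construction sketch (the `client` and `partition_config` bindings
    /// are assumed, not part of this API):
    ///
    /// ```ignore
    /// let sched_cfg = SchedulerConfig {
    ///     // e.g. a test that wants one call to cover every partition;
    ///     // the scan budget is clamped to num_flow_partitions internally.
    ///     max_partitions_per_scan: u16::MAX,
    ///     ..SchedulerConfig::default()
    /// };
    /// let scheduler = Scheduler::with_config(client, partition_config, sched_cfg);
    /// ```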
315    pub fn with_config(
316        client: ferriskey::Client,
317        config: PartitionConfig,
318        scheduler_config: SchedulerConfig,
319    ) -> Self {
320        Self::with_config_and_metrics(
321            client,
322            config,
323            scheduler_config,
324            std::sync::Arc::new(ff_observability::Metrics::new()),
325        )
326    }
327
328    /// PR-94: construct a scheduler with a shared observability
329    /// registry and default [`SchedulerConfig`]. Used by `ff-server`
330    /// so claim/budget/quota metrics land in the same registry
331    /// exposed at `/metrics`. For test harnesses that need both a
332    /// custom `SchedulerConfig` AND observability, use
333    /// [`Self::with_config_and_metrics`].
334    pub fn with_metrics(
335        client: ferriskey::Client,
336        config: PartitionConfig,
337        metrics: std::sync::Arc<ff_observability::Metrics>,
338    ) -> Self {
339        Self::with_config_and_metrics(client, config, SchedulerConfig::default(), metrics)
340    }
341
342    /// PR-94: construct a scheduler with an explicit
343    /// [`SchedulerConfig`] AND a shared observability registry.
344    /// Convenience constructor so callers don't have to thread
345    /// both concerns through separate builders.
346    pub fn with_config_and_metrics(
347        client: ferriskey::Client,
348        config: PartitionConfig,
349        scheduler_config: SchedulerConfig,
350        metrics: std::sync::Arc<ff_observability::Metrics>,
351    ) -> Self {
352        Self {
353            client,
354            config,
355            scheduler_config,
356            rotation_state: AtomicU64::new(0),
357            metrics,
358        }
359    }
360
361    /// Return the current cursor for this call, advancing it if we're the
362    /// first caller to observe a new rotation window. Pure compare-exchange
363    /// on the packed atomic; no Mutex, no clock dep beyond `now_ms`.
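    ///
    /// Packing sketch for `rotation_state` (plain arithmetic, shown for
    /// clarity):
    ///
    /// ```
    /// let epoch: u64 = 1_700_000_000_000 / 250; // now_ms / rotation_window_ms
    /// let cursor: u16 = 96;
    /// let packed = (epoch << 16) | u64::from(cursor);
    /// assert_eq!(packed >> 16, epoch);
    /// assert_eq!((packed & 0xFFFF) as u16, cursor);
    /// ```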
364    fn rotation_cursor(&self, now_ms: u64, num_partitions: u16) -> u16 {
365        let window_ms = self.scheduler_config.rotation_window_ms.max(1);
366        let step = self.scheduler_config.max_partitions_per_scan.max(1);
367        let this_epoch = now_ms / window_ms;
368
369        loop {
370            let prev = self.rotation_state.load(Ordering::Relaxed);
371            let prev_epoch = prev >> 16;
372            let prev_cursor = (prev & 0xFFFF) as u16;
373
374            if prev_epoch == this_epoch {
375                return prev_cursor;
376            }
377            // New window: advance cursor by max_partitions_per_scan so
378            // the next scan covers a fresh slice. Modular wrap on
379            // num_partitions. Zero-partitions is a config bug; caller
380            // guards it, but fall back to 0 to be safe.
381            let new_cursor = if num_partitions == 0 {
382                0
383            } else {
384                prev_cursor.wrapping_add(step) % num_partitions
385            };
386            let new_state = (this_epoch << 16) | u64::from(new_cursor);
387            match self.rotation_state.compare_exchange(
388                prev,
389                new_state,
390                Ordering::Relaxed,
391                Ordering::Relaxed,
392            ) {
393                Ok(_) => return new_cursor,
394                // Lost race; another caller advanced. Re-read and use
395                // whatever cursor now belongs to this epoch.
396                Err(_) => continue,
397            }
398        }
399    }
400
401    /// Find an eligible execution and issue a claim grant.
402    ///
403    /// Iterates all execution partitions looking for the first partition
404    /// with an eligible execution. Issues a claim grant via FCALL.
405    ///
406    /// `worker_capabilities` is sent to `ff_issue_claim_grant` as a sorted
407    /// CSV (BTreeSet guarantees deterministic order). Executions whose
408    /// `required_capabilities` are not a subset of this set are skipped
409    /// (stay queued) via `ScriptError::CapabilityMismatch`.
410    ///
411    /// Returns `Ok(None)` if no eligible executions exist anywhere.
412    /// Returns `Ok(Some(grant))` on success.
413    /// Returns `Err` on Valkey errors.
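    ///
    /// A minimal call sketch (the `scheduler`, `lane`, and worker ID bindings
    /// are assumed to exist already; shown as `ignore` for that reason):
    ///
    /// ```ignore
    /// let caps: BTreeSet<String> =
    ///     ["gpu", "cuda"].iter().map(|s| s.to_string()).collect();
    /// if let Some(grant) = scheduler
    ///     .claim_for_worker(&lane, &worker_id, &worker_instance_id, &caps, 30_000)
    ///     .await?
    /// {
    ///     // Hand `grant` to the worker; ff-sdk consumes it via ff_claim_execution.
    /// }
    /// ```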
414    pub async fn claim_for_worker(
415        &self,
416        lane: &LaneId,
417        worker_id: &WorkerId,
418        worker_instance_id: &WorkerInstanceId,
419        worker_capabilities: &BTreeSet<String>,
420        grant_ttl_ms: u64,
421    ) -> Result<Option<ClaimGrant>, SchedulerError> {
422        // PR-94: grant issuance latency histogram. Captures the full
423        // cycle (candidate selection + budget/quota admission +
424        // issuance FCALL) so an operator can spot slow paths without
425        // having to correlate scheduler logs across partitions. Drop
426        // guard records on *every* exit path (including ?-propagated
427        // errors) so the histogram reflects wall-clock cost, not just
428        // the happy path.
429        struct ClaimTimer<'a> {
430            start: std::time::Instant,
431            lane: &'a str,
432            metrics: &'a ff_observability::Metrics,
433        }
434        impl Drop for ClaimTimer<'_> {
435            fn drop(&mut self) {
436                self.metrics
437                    .record_claim_from_grant(self.lane, self.start.elapsed());
438            }
439        }
440        let _claim_timer = ClaimTimer {
441            start: std::time::Instant::now(),
442            lane: lane.as_str(),
443            metrics: &self.metrics,
444        };
445        let num_partitions = self.config.num_flow_partitions;
        // Guard misconfiguration: a zero partition count would hit a
        // `% num_partitions` division-by-zero in the scan_start / jitter
        // computation below. The old pre-bounded loop silently no-opped on
        // `for offset in 0..0`; instead of panicking, or quietly returning
        // Ok(None) like that loop did, return a Config error so an operator
        // who misconfigures gets a loud, actionable signal. A silent Ok(None)
        // would make every claim call look like an empty queue forever.
453        if num_partitions == 0 {
454            return Err(SchedulerError::Config(
455                "num_flow_partitions must be > 0".to_owned(),
456            ));
457        }
458        let mut budget_checker = BudgetChecker::new(self.config);
459
        // Jitter the partition scan start to avoid thundering-herd on
        // partition 0 when 100 workers all tick simultaneously. Seeded
        // from worker_instance_id so this worker probes a stable slice
        // within a rotation window (the rotation cursor still reaches every
        // partition across ticks), and different workers naturally diverge.
        // Uses the shared ff_core::hash FNV-1a reducer — same helper powering
        // ff-sdk's PARTITION_SCAN_CHUNK cursor seed. Zero-modulus safe.
467        let start_p: u16 = ff_core::hash::fnv1a_u16_mod(
468            worker_instance_id.as_str(),
469            num_partitions,
470        );
471        // BTreeSet iterates sorted → stable CSV for Lua subset match.
472        // Ingress validation mirrors FlowFabricWorker::connect (ff-sdk):
473        //   - `,` is the CSV delimiter — a token containing one would split
474        //     mid-parse and could let a {"gpu"} worker appear to satisfy
475        //     {"gpu,cuda"} (silent auth bypass).
        //   - Empty strings would inflate the CSV with leading / adjacent
        //     commas ("" → ",gpu" splits back as ["", "gpu"]) and pad the
        //     token count toward CAPS_MAX_TOKENS for no semantic reason.
        //   - Non-printable / whitespace: "gpu " vs "gpu" or "gpu\n" vs
        //     "gpu" silently mis-routes. Reject control characters and
        //     whitespace at ingress so typos fail loud (non-ASCII printable
        //     UTF-8 is allowed; see the per-token check below).
        // Enforce ALL here so operator misconfig at the scheduler entry
        // point fails loud, symmetric with the SDK inline-claim path.
        // Size bounds (#bytes in the CSV, #tokens) are checked below as well
        // as on the Lua side.
485        for cap in worker_capabilities {
486            if cap.is_empty() {
487                return Err(SchedulerError::Config(
488                    "capability token must not be empty".to_owned(),
489                ));
490            }
491            if cap.contains(',') {
492                return Err(SchedulerError::Config(format!(
493                    "capability token may not contain ',' (CSV delimiter): {cap:?}"
494                )));
495            }
496            // Reject ASCII control + whitespace (incl. Unicode whitespace);
497            // allow non-ASCII printable UTF-8 so i18n cap names work. CSV
498            // delimiter `,` is single-byte and never a UTF-8 continuation,
499            // so multibyte UTF-8 is safe on the wire. Symmetric with
500            // ff-sdk::FlowFabricWorker::connect.
501            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
502                return Err(SchedulerError::Config(format!(
503                    "capability token must not contain whitespace or control characters: {cap:?}"
504                )));
505            }
506        }
507        if worker_capabilities.len() > ff_core::policy::CAPS_MAX_TOKENS {
508            return Err(SchedulerError::Config(format!(
509                "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
510                ff_core::policy::CAPS_MAX_TOKENS,
511                worker_capabilities.len()
512            )));
513        }
514        let worker_caps_csv = worker_capabilities
515            .iter()
516            .filter(|s| !s.is_empty())
517            .cloned()
518            .collect::<Vec<_>>()
519            .join(",");
520        // Stable digest used in per-mismatch logs so the full 4KB CSV
521        // doesn't get echoed on every mismatch. See worker_caps_digest.
522        let worker_caps_hash = worker_caps_digest(&worker_caps_csv);
523        if worker_caps_csv.len() > ff_core::policy::CAPS_MAX_BYTES {
524            return Err(SchedulerError::Config(format!(
525                "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
526                ff_core::policy::CAPS_MAX_BYTES,
527                worker_caps_csv.len()
528            )));
529        }
530
531        // ── Bounded scan with rotation cursor ──
532        // Prior impl walked all `num_partitions` partitions on every call,
533        // which at `num_flow_partitions = 256` meant a quiet cluster cost
534        // 256 ZRANGEBYSCORE round-trips per claim tick per worker. The
535        // bounded scan caps per-call work at `max_partitions_per_scan`
536        // while the rotation cursor ensures every partition is still
537        // visited within `ceil(total / max_partitions_per_scan)` ticks.
538        //
539        // Needs `now_ms` to compute the rotation window, so we snap server
540        // time up front. One extra TIME round-trip for quiet-cluster no-hit
541        // calls, but on hit paths the existing inner-loop `server_time_ms`
542        // call is now redundant — we reuse this value.
543        let scan_now_ms = match server_time_ms(&self.client).await {
544            Ok(t) => t,
545            Err(e) => {
546                // Transport error talking to Valkey; surface via the same
547                // `ValkeyContext` channel the admission checks use so the
548                // caller retries after backoff instead of silently hitting
549                // partition 0 with a zero cursor.
550                return Err(SchedulerError::ValkeyContext {
551                    source: e,
552                    context: "scheduler: TIME for rotation cursor".to_owned(),
553                });
554            }
555        };
556        let rotation_cursor = self.rotation_cursor(scan_now_ms, num_partitions);
        // Per-worker jitter stacks on the shared cursor: `start_p` spreads
        // different workers apart within one window; `rotation_cursor` drifts
        // the whole fleet across windows so no single partition is anyone's
        // permanent "partition 0".
561        let scan_start = (start_p + rotation_cursor) % num_partitions;
562        let scan_budget = self
563            .scheduler_config
564            .max_partitions_per_scan
565            .min(num_partitions)
566            .max(1);
567
568        // Observability counters (RFC-plan item 3): one debug line per
569        // call, not per partition, so a tight-loop worker doesn't flood
570        // logs. `partitions_hit` is 0/1 in practice — the loop returns on
571        // the first grant — but keeping it a counter means future
572        // multi-grant variants (N per call) don't need the log format to
573        // change.
574        let mut partitions_visited: u16 = 0;
575        let mut partitions_skipped: u16 = 0;
576        let mut partitions_hit: u16 = 0;
577        let call_start = std::time::Instant::now();
578
579        for p_idx in iter_partitions(num_partitions, scan_start, scan_budget) {
580            partitions_visited += 1;
581            let partition = Partition {
582                family: PartitionFamily::Execution,
583                index: p_idx,
584            };
585            let idx = IndexKeys::new(&partition);
586            let eligible_key = idx.lane_eligible(lane);
587
588            // ZRANGEBYSCORE eligible -inf +inf LIMIT 0 1
589            // Lowest score = highest priority candidate
590            let candidates: Vec<String> = match self
591                .client
592                .cmd("ZRANGEBYSCORE")
593                .arg(&eligible_key)
594                .arg("-inf")
595                .arg("+inf")
596                .arg("LIMIT")
597                .arg("0")
598                .arg("1")
599                .execute()
600                .await
601            {
602                Ok(ids) => ids,
603                Err(e) => {
604                    tracing::warn!(
605                        partition = p_idx,
606                        error = %e,
607                        "scheduler: ZRANGEBYSCORE eligible failed, skipping partition"
608                    );
609                    partitions_skipped += 1;
610                    continue;
611                }
612            };
613
614            let eid_str = match candidates.first() {
615                Some(s) => s,
616                None => {
617                    // Empty partition — this is the hot path on a quiet
618                    // cluster and the whole point of the bounded scan.
619                    // Don't count as a "skip" (skip implies something
620                    // went wrong); treat as a normal miss.
621                    continue;
622                }
623            };
624
625            // Parse the execution ID
626            let eid = match ExecutionId::parse(eid_str) {
627                Ok(id) => id,
628                Err(e) => {
629                    tracing::warn!(
630                        partition = p_idx,
631                        execution_id = eid_str.as_str(),
632                        error = %e,
633                        "scheduler: invalid execution_id in eligible set, skipping"
634                    );
635                    partitions_skipped += 1;
636                    continue;
637                }
638            };
639
640            let exec_ctx = ExecKeyContext::new(&partition, &eid);
641            let core_key = exec_ctx.core();
642            let eid_s = eid.to_string();
643            // Reuse the call-scoped `scan_now_ms` we read for the rotation
644            // cursor. Semantics for block-detail timestamps are unchanged —
645            // a block decision at time T is still written with a T in the
646            // same call; the tiny staleness (<< 1 RTT) is dwarfed by the
647            // usual scheduler→Valkey latency and saves one TIME round-trip
648            // per candidate.
649            let now_ms = scan_now_ms;
650
651            // ── Capability pre-check (in-slot HGET, cheap) ──
652            // Runs BEFORE quota admission so we never ZADD a quota slot
653            // for an execution this worker can't actually claim. Without
654            // this, on an unmatchable-top-of-zset, every scheduling tick
655            // would ZADD {q:K}:admitted_set then ZREM it via
656            // release_admission on the capability_mismatch reject — a
657            // cross-slot write storm amplifying the mismatch loop.
658            //
659            // Lua `ff_issue_claim_grant` still does the authoritative
660            // check; this is a fast-path short-circuit, not a substitute.
661            // A narrow race exists where `required_capabilities` is
662            // updated between our HGET and the FCALL — the FCALL is still
663            // atomic and correct.
664            let required_caps_csv: Option<String> = match self
665                .client
666                .cmd("HGET")
667                .arg(&core_key)
668                .arg("required_capabilities")
669                .execute::<Option<String>>()
670                .await
671            {
672                Ok(v) => v,
673                Err(e) => {
674                    tracing::warn!(
675                        partition = p_idx,
676                        execution_id = eid_s.as_str(),
677                        error = %e,
678                        "scheduler: HGET required_capabilities failed, skipping candidate"
679                    );
680                    continue;
681                }
682            };
683            if let Some(req) = required_caps_csv.as_deref()
684                && !req.is_empty()
685                && !caps_subset(req, worker_capabilities)
686            {
687                // Move this execution out of eligible into blocked_route
688                // so we don't hot-loop on it every tick (RFC-009 §564). A
689                // periodic sweep (scanner side) promotes blocked_route →
690                // eligible when a worker with matching caps registers.
691                // Logged with a hash digest, not the raw CSV, to keep
692                // per-mismatch log volume bounded.
693                tracing::info!(
694                    partition = p_idx,
695                    execution_id = eid_s.as_str(),
696                    worker_id = worker_id.as_str(),
697                    worker_caps_hash = worker_caps_hash.as_str(),
698                    required = req,
699                    "scheduler: capability mismatch, blocking execution off eligible"
700                );
701                self.block_candidate(
702                    &partition, &idx, lane, &eid, &eligible_key,
703                    "waiting_for_capable_worker",
704                    "no connected worker satisfies required_capabilities",
705                    now_ms,
706                ).await;
707                continue;
708            }
709
710            // ── Budget pre-check (cross-partition, cached per cycle) ──
711            if let Some(block_detail) = self
712                .check_budgets(&mut budget_checker, &exec_ctx, &core_key, &eid_s)
713                .await?
714            {
715                // Budget breached — block candidate and try next
716                self.block_candidate(
717                    &partition, &idx, lane, &eid, &eligible_key,
718                    "waiting_for_budget", &block_detail, now_ms,
719                ).await;
720                continue;
721            }
722
723            // ── Quota pre-check (cross-partition FCALL on {q:K}) ──
724            let quota_admission = self
725                .check_quota(&exec_ctx, &core_key, &eid_s, now_ms)
726                .await?;
727            match &quota_admission {
728                QuotaCheckOutcome::Blocked(block_detail) => {
729                    self.block_candidate(
730                        &partition, &idx, lane, &eid, &eligible_key,
731                        "waiting_for_quota", block_detail, now_ms,
732                    ).await;
733                    continue;
734                }
735                QuotaCheckOutcome::NoQuota | QuotaCheckOutcome::Admitted { .. } => {}
736            }
737
738            // ── All checks passed — issue claim grant ──
739            let grant_key = exec_ctx.claim_grant();
740            let keys: [&str; 3] = [&core_key, &grant_key, &eligible_key];
741
742            let ttl_str = grant_ttl_ms.to_string();
743            let wid_s = worker_id.to_string();
744            let wiid_s = worker_instance_id.to_string();
745            let lane_s = lane.to_string();
746
747            let argv: [&str; 9] = [
748                &eid_s,
749                &wid_s,
750                &wiid_s,
751                &lane_s,
752                "",   // capability_hash
753                &ttl_str,
754                "",   // route_snapshot_json
755                "",   // admission_summary
756                &worker_caps_csv, // sorted CSV; empty → matches only empty-required execs
757            ];
758
759            let raw = match self
760                .client
761                .fcall::<ferriskey::Value>("ff_issue_claim_grant", &keys, &argv)
762                .await
763            {
764                Ok(v) => v,
765                Err(e) => {
766                    // Transport failure on the FCALL — NOSCRIPT, IoError,
767                    // ClusterDown, etc. This is NOT a normal soft-reject;
768                    // persistent transport errors mean the scheduler is
769                    // effectively idle even though it looks like it's
770                    // running. WARN so ops dashboards (WARN+ aggregators)
771                    // fire instead of burying it at DEBUG.
772                    tracing::warn!(
773                        partition = p_idx,
774                        execution_id = eid_s.as_str(),
775                        error = %e,
776                        "scheduler: ff_issue_claim_grant transport error, trying next"
777                    );
778                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
779                        self.release_admission(tag, quota_id, eid).await;
780                    }
781                    continue;
782                }
783            };
784
785            match FcallResult::parse(&raw).and_then(|r| r.into_success()) {
786                Ok(_) => {
787                    partitions_hit += 1;
788                    tracing::debug!(
789                        partition = p_idx,
790                        execution_id = eid_s.as_str(),
791                        worker_instance_id = worker_instance_id.as_str(),
792                        start_p = scan_start,
793                        partitions_visited,
794                        partitions_skipped,
795                        partitions_hit,
796                        elapsed_ms = call_start.elapsed().as_millis() as u64,
797                        "scheduler: claim call completed (hit)"
798                    );
799                    return Ok(Some(ClaimGrant {
800                        execution_id: eid,
801                        partition_key: ff_core::partition::PartitionKey::from(&partition),
802                        grant_key: grant_key.clone(),
803                        expires_at_ms: now_ms + grant_ttl_ms,
804                    }));
805                }
806                Err(script_err) => {
807                    if matches!(script_err, ScriptError::CapabilityMismatch(_)) {
808                        // Should be rare: the Rust pre-check above
809                        // normally catches this and blocks the execution
810                        // off eligible. Reaching here means the
811                        // required_capabilities field mutated between our
812                        // HGET and the Lua atomic check (narrow race).
813                        // Block here too so the next tick doesn't loop.
814                        //
815                        // Log uses worker_caps_hash (8-hex digest), not
816                        // the full 4KB CSV, to keep per-mismatch log
817                        // volume bounded. Full CSV is logged once at
818                        // worker connect under "worker caps" WARN.
819                        tracing::info!(
820                            partition = p_idx,
821                            execution_id = eid_s.as_str(),
822                            worker_id = wid_s.as_str(),
823                            worker_caps_hash = worker_caps_hash.as_str(),
824                            error = %script_err,
825                            "scheduler: capability mismatch via Lua (race), blocking execution"
826                        );
827                        self.block_candidate(
828                            &partition, &idx, lane, &eid, &eligible_key,
829                            "waiting_for_capable_worker",
830                            "no connected worker satisfies required_capabilities",
831                            now_ms,
832                        ).await;
833                        if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
834                            self.release_admission(tag, quota_id, eid).await;
835                        }
836                        continue;
837                    } else {
838                        // Any other logical reject (grant_already_exists,
839                        // execution_not_in_eligible_set, execution_not_eligible,
840                        // invalid_capabilities, etc.). These are rare and each
841                        // indicates either a race or a config problem — in
842                        // either case ops need to see it, so WARN, not DEBUG.
843                        tracing::warn!(
844                            partition = p_idx,
845                            execution_id = eid_s.as_str(),
846                            error = %script_err,
847                            "scheduler: ff_issue_claim_grant rejected, trying next"
848                        );
849                    }
850                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
851                        self.release_admission(tag, quota_id, eid).await;
852                    }
853                    continue;
854                }
855            }
856        }
857
858        tracing::debug!(
859            worker_instance_id = worker_instance_id.as_str(),
860            start_p = scan_start,
861            partitions_visited,
862            partitions_skipped,
863            partitions_hit,
864            elapsed_ms = call_start.elapsed().as_millis() as u64,
865            "scheduler: claim call completed (no hit)"
866        );
867        Ok(None)
868    }
869
870    /// Read budget_ids from exec_core and check each. Returns block detail
871    /// string if any budget is breached, None if all pass.
872    async fn check_budgets(
873        &self,
874        checker: &mut BudgetChecker,
875        _exec_ctx: &ExecKeyContext,
876        core_key: &str,
877        _eid_s: &str,
878    ) -> Result<Option<String>, SchedulerError> {
879        // Read budget_ids from exec_core (comma-separated or JSON list)
880        let budget_ids_str: Option<String> = self
881            .client
882            .cmd("HGET")
883            .arg(core_key)
884            .arg("budget_ids")
885            .execute()
886            .await?;
887
888        let budget_ids_str = match budget_ids_str {
889            Some(s) => s,
890            None => return Ok(None),
891        };
892        if budget_ids_str.is_empty() {
893            return Ok(None); // no budgets attached
894        }
895
896        // Parse comma-separated budget IDs
897        for budget_id in budget_ids_str.split(',') {
898            let budget_id = budget_id.trim();
899            if budget_id.is_empty() {
900                continue;
901            }
902            let result = checker.check_budget(&self.client, budget_id).await?;
903            if let BudgetCheckResult::HardBreach { dimension, detail } = &result {
904                // PR-94: budget hard-breach counter, labelled by
905                // dimension so operators can distinguish "tokens"
906                // breaches from "cost" breaches at a glance.
907                self.metrics.inc_budget_hit(dimension);
908                return Ok(Some(detail.clone()));
909            }
910        }
911
912        Ok(None)
913    }
914
915    /// Check quota admission for the candidate.
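    ///
    /// Reads `quota_policy_id` from exec_core, derives the `{q:K}` partition
    /// tag, and calls `ff_check_admission_and_record`. `ADMITTED` /
    /// `ALREADY_ADMITTED` map to [`QuotaCheckOutcome::Admitted`];
    /// `RATE_EXCEEDED` / `CONCURRENCY_EXCEEDED` map to
    /// [`QuotaCheckOutcome::Blocked`]; any other status fails closed with an
    /// error. A policy with neither a rate limit nor a concurrency cap is
    /// treated as [`QuotaCheckOutcome::NoQuota`].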
916    async fn check_quota(
917        &self,
918        _exec_ctx: &ExecKeyContext,
919        core_key: &str,
920        eid_s: &str,
921        now_ms: u64,
922    ) -> Result<QuotaCheckOutcome, SchedulerError> {
923        // Read quota_policy_id from exec_core
924        let quota_id_str: Option<String> = self
925            .client
926            .cmd("HGET")
927            .arg(core_key)
928            .arg("quota_policy_id")
929            .execute()
930            .await?;
931
932        let quota_id_str = match quota_id_str {
933            Some(s) => s,
934            None => return Ok(QuotaCheckOutcome::NoQuota),
935        };
936        if quota_id_str.is_empty() {
937            return Ok(QuotaCheckOutcome::NoQuota);
938        }
939
940        // Compute real {q:K} partition tag from quota_policy_id
941        let tag = match QuotaPolicyId::parse(&quota_id_str) {
942            Ok(qid) => {
943                let partition = quota_partition(&qid, &self.config);
944                partition.hash_tag()
945            }
946            Err(_) => "{q:0}".to_owned(), // fallback for non-UUID test IDs
947        };
948
949        let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id_str);
950        let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id_str);
951        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id_str);
952        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id_str, eid_s);
953        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id_str);
954
955        // Read quota limits from policy hash
956        let rate_limit: Option<String> = self.client
957            .cmd("HGET").arg(&quota_def_key).arg("max_requests_per_window")
958            .execute().await?;
959        let window_secs: Option<String> = self.client
960            .cmd("HGET").arg(&quota_def_key).arg("requests_per_window_seconds")
961            .execute().await?;
962        let concurrency_cap: Option<String> = self.client
963            .cmd("HGET").arg(&quota_def_key).arg("active_concurrency_cap")
964            .execute().await?;
965        let jitter: Option<String> = self.client
966            .cmd("HGET").arg(&quota_def_key).arg("jitter_ms")
967            .execute().await?;
968
969        let rate_limit = rate_limit.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
970        let window_secs = window_secs.as_deref().and_then(|s| s.parse().ok()).unwrap_or(60u64);
971        let concurrency_cap = concurrency_cap.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
972        let jitter_ms = jitter.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
973
974        // No limits configured — admit without recording
975        if rate_limit == 0 && concurrency_cap == 0 {
976            return Ok(QuotaCheckOutcome::NoQuota);
977        }
978
979        // FCALL ff_check_admission_and_record on {q:K}
980        let keys: [&str; 5] = [&window_key, &concurrency_key, &quota_def_key, &admitted_key, &admitted_set_key];
981        let now_s = now_ms.to_string();
982        let ws = window_secs.to_string();
983        let rl = rate_limit.to_string();
984        let cc = concurrency_cap.to_string();
985        let jt = jitter_ms.to_string();
986        let argv: [&str; 6] = [&now_s, &ws, &rl, &cc, eid_s, &jt];
987
988        match self.client
989            .fcall::<ferriskey::Value>("ff_check_admission_and_record", &keys, &argv)
990            .await
991        {
992            Ok(result) => {
993                // Parse domain-specific result: {"ADMITTED"}, {"RATE_EXCEEDED", retry_after},
994                // {"CONCURRENCY_EXCEEDED"}, {"ALREADY_ADMITTED"}
995                let status = Self::parse_admission_status(&result);
996                match status.as_str() {
997                    "ADMITTED" | "ALREADY_ADMITTED" => Ok(QuotaCheckOutcome::Admitted {
998                        tag: tag.clone(),
999                        quota_id: quota_id_str.clone(),
1000                        eid: eid_s.to_owned(),
1001                    }),
1002                    "RATE_EXCEEDED" => {
1003                        // PR-94: quota admission block, labelled by
1004                        // reason so rate-vs-concurrency waves are
1005                        // distinguishable in a dashboard.
1006                        self.metrics.inc_quota_hit("rate");
1007                        Ok(QuotaCheckOutcome::Blocked(format!(
1008                            "quota {}: rate limit {}/{} per {}s window",
1009                            quota_id_str, rate_limit, rate_limit, window_secs
1010                        )))
1011                    }
1012                    "CONCURRENCY_EXCEEDED" => {
1013                        self.metrics.inc_quota_hit("concurrency");
1014                        Ok(QuotaCheckOutcome::Blocked(format!(
1015                            "quota {}: concurrency cap {}",
1016                            quota_id_str, concurrency_cap
1017                        )))
1018                    }
1019                    other => {
1020                        // Fail-closed: an unrecognised status is the Lua
1021                        // telling us a contract we don't understand. Do
1022                        // NOT default to admit — surface it so the
1023                        // scheduler retries next cycle and ops sees the
1024                        // event.
1025                        tracing::warn!(
1026                            quota_id = quota_id_str.as_str(),
1027                            status = other,
1028                            "scheduler: unexpected admission result, denying (fail-closed)"
1029                        );
1030                        Err(SchedulerError::Config(format!(
1031                            "quota {quota_id_str}: unexpected admission status \"{other}\""
1032                        )))
1033                    }
1034                }
1035            }
1036            Err(e) => {
1037                // Fail-closed: transport fault on the admission FCALL
1038                // must NOT silently admit the candidate. Propagate so
1039                // the outer cycle returns the error and the worker
1040                // retries after the usual backoff; the next cycle will
1041                // re-run the admission check against fresh state.
1042                tracing::warn!(
1043                    quota_id = quota_id_str.as_str(),
1044                    error = %e,
1045                    "scheduler: quota FCALL failed, denying (fail-closed)"
1046                );
1047                Err(SchedulerError::ValkeyContext {
1048                    source: e,
1049                    context: format!("ff_check_admission_and_record {quota_id_str}"),
1050                })
1051            }
1052        }
1053    }
1054
1055    /// Parse the first element of a Valkey array result as a status string.
1056    fn parse_admission_status(result: &ferriskey::Value) -> String {
1057        match result {
1058            ferriskey::Value::Array(arr) => {
1059                match arr.first() {
1060                    Some(Ok(ferriskey::Value::BulkString(b))) => {
1061                        String::from_utf8_lossy(b).into_owned()
1062                    }
1063                    Some(Ok(ferriskey::Value::SimpleString(s))) => s.clone(),
1064                    _ => "UNKNOWN".to_owned(),
1065                }
1066            }
1067            _ => "UNKNOWN".to_owned(),
1068        }
1069    }
1070
1071    /// Block a candidate that failed budget/quota check.
1072    /// FCALL ff_block_execution_for_admission on {p:N}.
1073    #[allow(clippy::too_many_arguments)]
1074    async fn block_candidate(
1075        &self,
1076        partition: &Partition,
1077        idx: &IndexKeys,
1078        lane: &LaneId,
1079        eid: &ExecutionId,
1080        eligible_key: &str,
1081        block_reason: &str,
1082        blocking_detail: &str,
1083        now_ms: u64,
1084    ) {
1085        let exec_ctx = ExecKeyContext::new(partition, eid);
1086        let core_key = exec_ctx.core();
1087        let eid_s = eid.to_string();
1088        let blocked_key = match block_reason {
1089            "waiting_for_budget" => idx.lane_blocked_budget(lane),
1090            "waiting_for_quota" => idx.lane_blocked_quota(lane),
1091            "waiting_for_capable_worker" => idx.lane_blocked_route(lane),
1092            _ => idx.lane_blocked_budget(lane),
1093        };
1094
1095        let keys: [&str; 3] = [&core_key, eligible_key, &blocked_key];
1096        let now_s = now_ms.to_string();
1097        let argv: [&str; 4] = [&eid_s, block_reason, blocking_detail, &now_s];
1098
1099        // Parse FcallResult so we distinguish Lua-level rejections (e.g.
1100        // execution_not_active because the execution went terminal between
1101        // our HGET and the FCALL) from a real block. Previously `Ok(_)`
1102        // treated an err-tuple as success → INFO log "candidate blocked"
1103        // while nothing actually changed on exec_core, then the next tick
1104        // re-picked the same candidate and looped. Mirrors the
1105        // release_admission parse fix.
1106        match self.client
1107            .fcall::<ferriskey::Value>("ff_block_execution_for_admission", &keys, &argv)
1108            .await
1109        {
1110            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
1111                Ok(_) => {
1112                    tracing::info!(
1113                        execution_id = eid_s,
1114                        reason = block_reason,
1115                        "scheduler: candidate blocked by admission check"
1116                    );
1117                }
1118                Err(script_err) => {
1119                    // Logical reject from Lua (e.g. execution_not_active
1120                    // — the execution went terminal between the scheduler
1121                    // pick and the block FCALL; the candidate loop will
1122                    // naturally move on). WARN so ops dashboards surface
1123                    // actual block failures, but not so loud that a common
1124                    // race spams alerts.
1125                    tracing::warn!(
1126                        execution_id = eid_s,
1127                        reason = block_reason,
1128                        error = %script_err,
1129                        "scheduler: ff_block_execution_for_admission rejected by Lua"
1130                    );
1131                }
1132            },
1133            Err(e) => {
1134                tracing::warn!(
1135                    execution_id = eid_s,
1136                    error = %e,
1137                    "scheduler: ff_block_execution_for_admission transport failed"
1138                );
1139            }
1140        }
1141    }
1142
1143    /// Release a previously-recorded quota admission slot.
1144    /// Called when ff_issue_claim_grant fails after admission was recorded.
1145    async fn release_admission(
1146        &self,
1147        tag: &str,
1148        quota_id: &str,
1149        eid_s: &str,
1150    ) {
1151        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid_s);
1152        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
1153        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
1154
1155        let keys: [&str; 3] = [&admitted_key, &admitted_set_key, &concurrency_key];
1156        let argv: [&str; 1] = [eid_s];
1157
1158        // Parse the Lua response properly: FCALL returns `Ok(Value)` for
1159        // BOTH success and logical-error paths. Treating Ok(_) blindly as
1160        // "released" logs a false positive when the Lua returns
1161        // `{0, "quota_not_found"}` (or any other script-level err) — the
1162        // slot in fact remains pinned until its TTL expires, which is
1163        // minutes to hours. Surface the real outcome so on-call sees
1164        // actual release failures instead of clean "released" events.
1165        match self.client
1166            .fcall::<ferriskey::Value>("ff_release_admission", &keys, &argv)
1167            .await
1168        {
1169            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
1170                Ok(_) => {
1171                    tracing::info!(
1172                        execution_id = eid_s,
1173                        quota_id,
1174                        "scheduler: released admission after claim failure"
1175                    );
1176                }
1177                Err(script_err) => {
1178                    tracing::warn!(
1179                        execution_id = eid_s,
1180                        quota_id,
1181                        error = %script_err,
1182                        "scheduler: ff_release_admission rejected by Lua \
1183                         (slot will expire via TTL)"
1184                    );
1185                }
1186            },
1187            Err(e) => {
1188                tracing::warn!(
1189                    execution_id = eid_s,
1190                    quota_id,
1191                    error = %e,
1192                    "scheduler: ff_release_admission transport failed \
1193                     (slot will expire via TTL)"
1194                );
1195            }
1196        }
1197    }
1198}
1199
1200/// Get server time in milliseconds via the TIME command.
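/// A reply of `["1700000000", "123456"]` (seconds, microseconds) maps to
/// `1_700_000_000 * 1000 + 123_456 / 1000 = 1_700_000_000_123` ms.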
1201async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
1202    let result: Vec<String> = client
1203        .cmd("TIME")
1204        .execute()
1205        .await?;
1206    if result.len() < 2 {
1207        return Err(ferriskey::Error::from((
1208            ferriskey::ErrorKind::ClientError,
1209            "TIME returned fewer than 2 elements",
1210        )));
1211    }
1212    let secs: u64 = result[0].parse().map_err(|_| {
1213        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
1214    })?;
1215    let micros: u64 = result[1].parse().map_err(|_| {
1216        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
1217    })?;
1218    Ok(secs * 1000 + micros / 1000)
1219}
1220
1221/// Errors from the scheduler.
1222#[derive(Debug, thiserror::Error)]
1223pub enum SchedulerError {
1224    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
1225    #[error("valkey: {0}")]
1226    Valkey(#[from] ferriskey::Error),
1227    /// Valkey error with additional context (preserves ErrorKind via #[source]).
1228    #[error("valkey ({context}): {source}")]
1229    ValkeyContext {
1230        #[source]
1231        source: ferriskey::Error,
1232        context: String,
1233    },
1234    /// Caller-supplied value failed ingress validation. NOT retryable — the
1235    /// caller must fix its input before retrying.
1236    #[error("config: {0}")]
1237    Config(String),
1238}
1239
1240impl SchedulerError {
1241    /// Returns the underlying ferriskey ErrorKind, if this is a Valkey error.
1242    /// Matches `ServerError::valkey_kind` and `ScriptError::valkey_kind` so
1243    /// callers can treat all three uniformly.
1244    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
1245        match self {
1246            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
1247            Self::Config(_) => None,
1248        }
1249    }
1250
1251    /// Whether this error is safely retryable by a caller. Mirrors
1252    /// `ServerError::is_retryable` semantics.
1253    pub fn is_retryable(&self) -> bool {
1254        self.valkey_kind()
1255            .map(is_retryable_kind)
1256            .unwrap_or(false)
1257    }
1258}
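
// Illustrative sketch, not part of the scheduler's public surface: how a
// caller might gate a single retry on `SchedulerError::is_retryable`. The
// `run_scan` closure is hypothetical; any fallible scheduler call follows
// the same pattern. Kept under cfg(test) so it never ships in release builds.
#[cfg(test)]
#[allow(dead_code)]
async fn retry_once_if_transient<F, Fut, T>(mut run_scan: F) -> Result<T, SchedulerError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, SchedulerError>>,
{
    match run_scan().await {
        // Transient Valkey errors (IoError, ClusterDown, ...) get one more try.
        Err(e) if e.is_retryable() => run_scan().await,
        // Success and non-retryable errors (e.g. Config) are returned as-is.
        other => other,
    }
}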
1259
1260#[cfg(test)]
1261mod tests {
1262    use super::*;
1263    use ferriskey::ErrorKind;
1264
1265    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
1266        ferriskey::Error::from((kind, "synthetic"))
1267    }
1268
1269    #[test]
1270    fn scheduler_is_retryable_matches_kind_table() {
1271        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
1272        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
1273
1274        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
1275        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
1276        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
1277    }
1278
1279    #[test]
1280    fn scheduler_valkey_context_is_retryable() {
1281        let err = SchedulerError::ValkeyContext {
1282            source: mk_fk_err(ErrorKind::BusyLoadingError),
1283            context: "HGET budget_ids".into(),
1284        };
1285        assert!(err.is_retryable());
1286    }
1287
1288    #[test]
1289    fn scheduler_valkey_kind_exposed() {
1290        let err = SchedulerError::Valkey(mk_fk_err(ErrorKind::TryAgain));
1291        assert_eq!(err.valkey_kind(), Some(ErrorKind::TryAgain));
1292    }
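
    // Added check (not in the original suite): Config errors come from caller
    // input, carry no Valkey ErrorKind, and must never be auto-retried;
    // retrying without fixing the input would just fail again.
    #[test]
    fn scheduler_config_error_not_retryable() {
        let err = SchedulerError::Config("lane id must be non-empty".into());
        assert_eq!(err.valkey_kind(), None);
        assert!(!err.is_retryable());
    }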
1293
1294    // ── iter_partitions: regression test for the modular wrap + bounded
1295    // length contract. The fairness behaviour of the bounded scheduler
1296    // scan depends entirely on this helper returning exactly `count`
1297    // distinct partition indices starting at `start` and wrapping modulo
1298    // `total`. Tested in isolation so a bug here is distinguishable from
1299    // a bug in the Valkey-backed loop. ──
1300
1301    #[test]
1302    fn iter_partitions_no_wrap() {
1303        // start=10, count=5, total=256 → 10..15, no wrap involved.
1304        let ps: Vec<u16> = iter_partitions(256, 10, 5).collect();
1305        assert_eq!(ps, vec![10, 11, 12, 13, 14]);
1306    }
1307
1308    #[test]
1309    fn iter_partitions_wraps_modulo_total() {
1310        // start=254, count=5, total=256 → 254, 255, 0, 1, 2.
1311        let ps: Vec<u16> = iter_partitions(256, 254, 5).collect();
1312        assert_eq!(ps, vec![254, 255, 0, 1, 2]);
1313    }
1314
1315    #[test]
1316    fn iter_partitions_count_capped_to_total() {
1317        // Asking for more than `total` yields exactly `total` distinct
1318        // partitions — never a duplicate, never more than the universe.
1319        let ps: Vec<u16> = iter_partitions(4, 1, 100).collect();
1320        assert_eq!(ps, vec![1, 2, 3, 0]);
1321    }
1322
1323    #[test]
1324    fn iter_partitions_length_matches_count() {
1325        // Invariant: output length == min(count, total). The scan loop
1326        // upper-bounds round-trips on this, so regressing it would
1327        // silently re-introduce the 256-round-trip-per-tick bug.
1328        for start in [0u16, 1, 50, 255] {
1329            for count in [0u16, 1, 16, 32, 256] {
1330                let len = iter_partitions(256, start, count).count();
1331                assert_eq!(len, count.min(256) as usize);
1332            }
1333        }
1334    }
1335
1336    // ── Fairness: the union of the partitions visited across
1337    // `ceil(total / max_partitions_per_scan)` successive scans, with the
1338    // rotation cursor advancing each scan, must cover every partition
1339    // exactly once. This is the contract the operator rustdoc promises
1340    // ("any given partition reached within 8 ticks at defaults"). ──
1341    #[test]
1342    fn fairness_full_coverage_in_ceil_total_over_budget_scans() {
1343        const TOTAL: u16 = 256;
1344        const BUDGET: u16 = SchedulerConfig::DEFAULT_MAX_PARTITIONS_PER_SCAN;
1345        let scans = TOTAL.div_ceil(BUDGET); // 8 at defaults
1346
1347        // Simulate the same advance logic the live cursor performs: each
1348        // "scan" starts at the previous start + BUDGET (mod TOTAL). We
1349        // pin start to 0 for determinism; the per-worker FNV jitter is a
1350        // phase offset on top and doesn't change coverage.
1351        let mut union = std::collections::BTreeSet::new();
1352        let mut cursor: u16 = 0;
1353        for _ in 0..scans {
1354            for p in iter_partitions(TOTAL, cursor, BUDGET) {
1355                union.insert(p);
1356            }
1357            cursor = cursor.wrapping_add(BUDGET) % TOTAL;
1358        }
1359
1360        assert_eq!(union.len(), TOTAL as usize, "every partition visited once");
1361        for p in 0..TOTAL {
1362            assert!(union.contains(&p), "missing partition {p}");
1363        }
1364    }
1365
1366    #[test]
1367    fn fairness_full_coverage_with_phase_offset() {
1368        // Regression: the per-worker FNV phase must not change the
1369        // coverage property. Pick a non-zero start; we still cover the
1370        // whole universe in ceil(total/budget) scans.
1371        const TOTAL: u16 = 256;
1372        const BUDGET: u16 = SchedulerConfig::DEFAULT_MAX_PARTITIONS_PER_SCAN;
1373        let scans = TOTAL.div_ceil(BUDGET);
1374
1375        let mut union = std::collections::BTreeSet::new();
1376        let mut cursor: u16 = 137; // arbitrary per-worker jitter
1377        for _ in 0..scans {
1378            for p in iter_partitions(TOTAL, cursor, BUDGET) {
1379                union.insert(p);
1380            }
1381            cursor = cursor.wrapping_add(BUDGET) % TOTAL;
1382        }
1383        assert_eq!(union.len(), TOTAL as usize);
1384    }
1385
1386    #[test]
1387    fn scheduler_config_defaults_match_rustdoc() {
1388        let c = SchedulerConfig::default();
1389        assert_eq!(c.max_partitions_per_scan, 32);
1390        assert_eq!(c.rotation_window_ms, 250);
1391    }
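
    // Worked example of the TIME → milliseconds conversion performed by
    // `server_time_ms`. The helper needs a live client, so this repeats the
    // arithmetic inline on a sample reply (values are illustrative only):
    // TIME returns [epoch seconds, microseconds within the current second].
    #[test]
    fn time_reply_to_millis_worked_example() {
        let secs: u64 = 1_700_000_000; // sample epoch seconds
        let micros: u64 = 654_321;     // sample microsecond component
        assert_eq!(secs * 1000 + micros / 1000, 1_700_000_000_654);
    }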
1392}
1393