ff_scheduler/claim.rs
//! Claim-grant cycle: find eligible executions and issue grants.
//!
//! The scheduler selects candidates from partition-local eligible sorted sets,
//! then atomically issues a claim grant via `FCALL ff_issue_claim_grant`.
//! The worker (ff-sdk) subsequently consumes the grant via `ff_claim_execution`.
//!
//! Phase 5: single lane, budget/quota pre-checks before grant issuance.
//! Candidates that fail budget/quota are blocked via ff_block_execution_for_admission.
//!
//! Reference: RFC-009 §12.7, RFC-010 §3.1, RFC-008 §1.7

use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition, quota_partition};
use ff_core::types::{BudgetId, ExecutionId, LaneId, QuotaPolicyId, WorkerId, WorkerInstanceId};
use ff_script::error::ScriptError;
use ff_script::result::FcallResult;
use ff_script::retry::is_retryable_kind;
use std::collections::BTreeSet;
use std::sync::atomic::{AtomicU64, Ordering};

/// Return true iff every non-empty comma-separated token in `required_csv`
/// is present in `worker_caps`. Mirrors the authoritative Lua subset check
/// in scheduling.lua (parse_capability_csv + missing_capabilities) — this
/// is a fast-path Rust short-circuit so we can skip the quota-admission
/// write for executions we already know the worker can't claim. Empty /
/// all-separator CSV → subset holds trivially.
fn caps_subset(required_csv: &str, worker_caps: &BTreeSet<String>) -> bool {
    required_csv
        .split(',')
        .filter(|t| !t.is_empty())
        .all(|t| worker_caps.contains(t))
}
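
// Minimal sketch of the subset contract documented on `caps_subset` above:
// an empty / all-separator required CSV admits any worker, and a single
// missing token rejects. Pure std, no Valkey needed; the capability names
// used here are illustrative.
#[cfg(test)]
mod caps_subset_sketch {
    use super::caps_subset;
    use std::collections::BTreeSet;

    #[test]
    fn empty_or_separator_only_csv_matches_trivially() {
        let caps: BTreeSet<String> = BTreeSet::from(["gpu".to_owned()]);
        assert!(caps_subset("", &caps));
        assert!(caps_subset(",,", &caps));
    }

    #[test]
    fn missing_token_rejects() {
        let caps: BTreeSet<String> = BTreeSet::from(["gpu".to_owned()]);
        assert!(caps_subset("gpu", &caps));
        assert!(!caps_subset("gpu,cuda", &caps));
    }
}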

/// Short, stable digest of a worker's capability CSV for per-event log
/// lines. Used in per-mismatch log lines so a worker-caps CSV of up to
/// 4KB does not get echoed on every capability_mismatch event (which
/// would swamp aggregators during an incident). The full CSV is logged
/// once at WARN when the worker connects; operators cross-reference this
/// 8-hex prefix to the full set.
///
/// Thin wrapper around the shared helper so call sites read as
/// "worker_caps_digest" locally while the algorithm lives in one place.
fn worker_caps_digest(csv: &str) -> String {
    ff_core::hash::fnv1a_xor8hex(csv)
}
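
// Sketch of the logging contract above: the digest is deterministic for a
// given CSV, so operators can correlate the per-mismatch hash with the full
// capability set logged once at worker connect. (The 8-hex width comes from
// `fnv1a_xor8hex` per the rustdoc; this sketch only asserts determinism,
// which holds for any pure digest.)
#[cfg(test)]
mod worker_caps_digest_sketch {
    use super::worker_caps_digest;

    #[test]
    fn digest_is_stable_for_a_given_csv() {
        let csv = "cuda,gpu";
        assert_eq!(worker_caps_digest(csv), worker_caps_digest(csv));
        assert!(!worker_caps_digest(csv).is_empty());
    }
}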

/// A claim grant issued by the scheduler for a specific execution.
///
/// Re-exported from [`ff_core::contracts::ClaimGrant`]. Lives in
/// `ff-core` so `ff-scheduler` (issuer) and `ff-sdk` (consumer)
/// share one wire-level type without a cross-dep between them.
pub use ff_core::contracts::ClaimGrant;

/// A reclaim grant for a resumed (attempt_interrupted) execution.
///
/// Re-export of [`ff_core::contracts::ReclaimGrant`] for symmetry
/// with [`ClaimGrant`]. `ff-scheduler` will be the canonical
/// producer once the Batch-C reclaim scanner lands; today only
/// test fixtures construct this type. Consumed by
/// `FlowFabricWorker::claim_from_reclaim_grant`.
pub use ff_core::contracts::ReclaimGrant;

/// Budget check result from a cross-partition budget read.
#[derive(Debug)]
pub enum BudgetCheckResult {
    /// Budget is within limits — proceed.
    Ok,
    /// Budget hard limit breached. Contains (dimension, detail_string).
    HardBreach { dimension: String, detail: String },
}

/// Outcome of a quota admission check.
enum QuotaCheckOutcome {
    /// No quota attached to this execution.
    NoQuota,
    /// Quota admitted — carries context for release on subsequent failure.
    Admitted { tag: String, quota_id: String, eid: String },
    /// Quota denied — execution should be blocked.
    Blocked(String),
}

/// Cross-partition budget checker with per-cycle caching.
///
/// Reads budget usage/limits once per scan cycle (not per candidate).
/// This is MANDATORY for performance — without it, 50K blocked executions
/// would produce 50K budget reads per cycle.
pub struct BudgetChecker {
    /// Cached budget status: budget_id → BudgetCheckResult.
    /// Reset at the start of each scheduler cycle.
    cache: std::collections::HashMap<String, BudgetCheckResult>,
    config: PartitionConfig,
}

impl BudgetChecker {
    pub fn new(config: PartitionConfig) -> Self {
        Self {
            cache: std::collections::HashMap::new(),
            config,
        }
    }

    /// Check a budget by ID. Reads from Valkey on first call per budget,
    /// caches for subsequent candidates in the same cycle.
    ///
    /// Fail-closed: transport errors propagate as
    /// [`SchedulerError::ValkeyContext`] rather than being cached as
    /// `Ok`. Caching an Err would be wrong — a transient blip would then
    /// pin every candidate in this cycle to the same denial. Successful
    /// results (including `HardBreach`) are still cached so 50K blocked
    /// candidates sharing a budget do one read per cycle, not 50K.
    pub async fn check_budget(
        &mut self,
        client: &ferriskey::Client,
        budget_id: &str,
    ) -> Result<&BudgetCheckResult, SchedulerError> {
        if self.cache.contains_key(budget_id) {
            return Ok(&self.cache[budget_id]);
        }

        // Compute real {b:M} partition tag from budget_id
        let (usage_key, limits_key) = match BudgetId::parse(budget_id) {
            Ok(bid) => {
                let partition = budget_partition(&bid, &self.config);
                let tag = partition.hash_tag();
                (
                    format!("ff:budget:{}:{}:usage", tag, budget_id),
                    format!("ff:budget:{}:{}:limits", tag, budget_id),
                )
            }
            Err(_) => {
                // Fallback for non-UUID budget IDs (test compat)
                (
                    format!("ff:budget:{{b:0}}:{}:usage", budget_id),
                    format!("ff:budget:{{b:0}}:{}:limits", budget_id),
                )
            }
        };

        let result =
            Self::read_and_check(client, &usage_key, &limits_key)
                .await
                .map_err(|source| SchedulerError::ValkeyContext {
                    source,
                    context: format!("budget_checker read {budget_id}"),
                })?;

        self.cache.insert(budget_id.to_owned(), result);
        Ok(&self.cache[budget_id])
    }

    /// Read budget usage and limits, compare each dimension.
    async fn read_and_check(
        client: &ferriskey::Client,
        usage_key: &str,
        limits_key: &str,
    ) -> Result<BudgetCheckResult, ferriskey::Error> {
        // Read all limit dimensions via hgetall (returns HashMap, not flat pairs)
        let limits: std::collections::HashMap<String, String> = client
            .hgetall(limits_key)
            .await?;

        // Parse hard limits
        for (field, limit_val) in &limits {
            if !field.starts_with("hard:") {
                continue;
            }
            let dimension = &field[5..]; // strip "hard:" prefix
            let limit: u64 = match limit_val.parse() {
                Ok(v) if v > 0 => v,
                _ => continue,
            };

            // Read current usage for this dimension. Fail-closed: a
            // transport error must NOT silently default the usage to 0
            // (which would make every breach invisible). Absence is
            // still treated as 0 — the caller is expected to HSET
            // usage_key on every increment.
            let usage_str: Option<String> = client
                .cmd("HGET")
                .arg(usage_key)
                .arg(dimension)
                .execute()
                .await?;
            let usage: u64 = usage_str
                .as_deref()
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);

            if usage >= limit {
                return Ok(BudgetCheckResult::HardBreach {
                    dimension: dimension.to_owned(),
                    detail: format!("budget {}: {} {}/{}", usage_key, dimension, usage, limit),
                });
            }
        }

        Ok(BudgetCheckResult::Ok)
    }

    /// Clear the cache at the start of a new scheduler cycle.
    pub fn reset(&mut self) {
        self.cache.clear();
    }
}
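
// Sketch of the limits-hash field convention `read_and_check` relies on:
// only fields prefixed "hard:" participate, the prefix is stripped to get
// the dimension name, and a zero or non-numeric limit means "no limit for
// this dimension". Pure string/parse logic mirroring the loop above; the
// field name "hard:tokens" is illustrative.
#[cfg(test)]
mod budget_limit_field_sketch {
    #[test]
    fn hard_prefix_and_zero_limit_convention() {
        let field = "hard:tokens";
        assert!(field.starts_with("hard:"));
        assert_eq!(&field[5..], "tokens");
        // A limit of 0 (or an unparsable value) is skipped by the checker.
        assert_eq!("0".parse::<u64>().ok().filter(|v| *v > 0), None);
        assert_eq!("1000".parse::<u64>().ok().filter(|v| *v > 0), Some(1000));
    }
}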

/// Tunable scheduler behavior, separate from the topology-level
/// [`PartitionConfig`]. Lives in `ff-scheduler` because every field is a
/// scheduler-internal scan policy — none of it leaks into persisted keys,
/// Lua scripts, or cross-crate wire shapes (unlike `PartitionConfig`).
/// If a future RFC needs a unified knob surface we can lift this up; not
/// premature today.
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// Maximum number of partitions to probe in a single
    /// [`Scheduler::claim_for_worker`] call before giving up and returning
    /// `Ok(None)`.
    ///
    /// **Trade-off:** smaller = lower worst-case no-hit latency per claim
    /// call (each probe is a ZRANGEBYSCORE round-trip, ~0.1ms LAN), larger
    /// = better fairness per call (more partitions seen before a worker
    /// yields). At the default of 32 with 256 partitions, any given
    /// partition is reached within `ceil(256/32) = 8` scheduling ticks —
    /// combined with the rotation cursor, that bounds worst-case
    /// starvation for a specific partition's head-of-queue execution.
    pub max_partitions_per_scan: u16,
    /// Duration a rotation cursor position stays stable before advancing.
    ///
    /// **Trade-off:** too long and workers keep seeing the same cursor
    /// across many ticks (the cursor never rotates relative to them,
    /// reducing the fairness benefit); too short and the cursor churns
    /// faster than slow-poll workers can make use of a shared window.
    /// 250ms is a middle ground: tight-loop workers (sub-ms claim cycles)
    /// see a fresh window every ~250 ticks, while 1s-poll workers get a
    /// fresh window on every tick. Tune down if your workers all
    /// idle-poll >1s; tune up if you run a fleet of tight-loop claimers
    /// and want less cursor churn.
    pub rotation_window_ms: u64,
}

impl SchedulerConfig {
    /// Default scan budget: probe 32 partitions per claim call.
    /// See [`Self::max_partitions_per_scan`] for the latency/fairness
    /// rationale.
    pub const DEFAULT_MAX_PARTITIONS_PER_SCAN: u16 = 32;

    /// Default rotation window: advance the cursor every 250ms.
    /// See [`Self::rotation_window_ms`].
    pub const DEFAULT_ROTATION_WINDOW_MS: u64 = 250;
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            max_partitions_per_scan: Self::DEFAULT_MAX_PARTITIONS_PER_SCAN,
            rotation_window_ms: Self::DEFAULT_ROTATION_WINDOW_MS,
        }
    }
}
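
// Usage sketch for overriding a single scan knob while keeping the other
// at its default (the value 8 here is illustrative, not a recommendation).
#[cfg(test)]
mod scheduler_config_usage_sketch {
    use super::SchedulerConfig;

    #[test]
    fn override_scan_budget_keep_default_window() {
        let cfg = SchedulerConfig {
            max_partitions_per_scan: 8,
            ..SchedulerConfig::default()
        };
        assert_eq!(cfg.max_partitions_per_scan, 8);
        assert_eq!(cfg.rotation_window_ms, SchedulerConfig::DEFAULT_ROTATION_WINDOW_MS);
    }
}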

/// Iterate partitions `[start, start+1, ..., start+count-1] mod total`.
///
/// Factored out of [`Scheduler::claim_for_worker`] so the modular-wrap +
/// bounded-length contract has a dedicated unit test surface, independent
/// of Valkey. Called with `count <= total`; values are always distinct.
///
/// Returns an iterator (not a Vec) to keep the hot claim path allocation-
/// free — this runs once per claim tick per worker.
fn iter_partitions(total: u16, start: u16, count: u16) -> impl Iterator<Item = u16> {
    debug_assert!(total > 0);
    let count = count.min(total);
    (0..count).map(move |i| start.wrapping_add(i) % total)
}

/// Single-lane scheduler with budget/quota pre-checks.
///
/// Iterates execution partitions sequentially, picks the first eligible
/// execution (lowest priority score). Before issuing a claim grant:
/// 1. Check all attached budgets (cross-partition, cached per cycle)
/// 2. Check quota admission (cross-partition FCALL)
/// 3. If any check fails: block the candidate and try next
/// 4. If all pass: issue the claim grant
///
/// Scan bounding + rotation: each call probes at most
/// [`SchedulerConfig::max_partitions_per_scan`] partitions starting from
/// a rotation cursor that advances once per
/// [`SchedulerConfig::rotation_window_ms`]. The per-worker FNV jitter is
/// applied on top so different workers diverge within any given window.
pub struct Scheduler {
    client: ferriskey::Client,
    config: PartitionConfig,
    scheduler_config: SchedulerConfig,
    /// Packed rotation state: high 48 bits = last window epoch seen
    /// (`now_ms / rotation_window_ms`), low 16 bits = current cursor
    /// partition index. A single atomic keeps "advance cursor iff we're
    /// the first call in a new window" race-free without a Mutex.
    rotation_state: AtomicU64,
    /// PR-94: observability handle for claim_from_grant duration and
    /// budget/quota hit counters. Defaults to a fresh
    /// `ff_observability::Metrics::new()` — under the default build
    /// (`observability` feature off) that's the no-op shim; with the
    /// feature on it's a private real OTEL registry not shared with
    /// any scrape. Callers that want a shared registry plumb it in
    /// via [`Self::with_metrics`] / [`Self::with_config_and_metrics`].
    metrics: std::sync::Arc<ff_observability::Metrics>,
}
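
// Arithmetic sketch of the `rotation_state` packing described on the field
// above: high 48 bits hold the window epoch (`now_ms / rotation_window_ms`),
// low 16 bits hold the cursor. The sample epoch value is illustrative.
#[cfg(test)]
mod rotation_state_packing_sketch {
    #[test]
    fn epoch_and_cursor_pack_and_unpack() {
        let epoch: u64 = 1_717_000_000_000 / 250; // e.g. now_ms / 250ms window
        let cursor: u16 = 137;
        let packed = (epoch << 16) | u64::from(cursor);
        assert_eq!(packed >> 16, epoch);
        assert_eq!((packed & 0xFFFF) as u16, cursor);
    }
}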

impl Scheduler {
    pub fn new(client: ferriskey::Client, config: PartitionConfig) -> Self {
        Self::with_config(client, config, SchedulerConfig::default())
    }

    /// Construct a scheduler with an explicit [`SchedulerConfig`].
    /// Use this when you need non-default scan bounds (e.g., in tests that
    /// want to walk every partition in one call, or deployments with
    /// non-default polling cadence). Most callers should use
    /// [`Self::new`].
    pub fn with_config(
        client: ferriskey::Client,
        config: PartitionConfig,
        scheduler_config: SchedulerConfig,
    ) -> Self {
        Self::with_config_and_metrics(
            client,
            config,
            scheduler_config,
            std::sync::Arc::new(ff_observability::Metrics::new()),
        )
    }

    /// PR-94: construct a scheduler with a shared observability
    /// registry and default [`SchedulerConfig`]. Used by `ff-server`
    /// so claim/budget/quota metrics land in the same registry
    /// exposed at `/metrics`. For test harnesses that need both a
    /// custom `SchedulerConfig` AND observability, use
    /// [`Self::with_config_and_metrics`].
    pub fn with_metrics(
        client: ferriskey::Client,
        config: PartitionConfig,
        metrics: std::sync::Arc<ff_observability::Metrics>,
    ) -> Self {
        Self::with_config_and_metrics(client, config, SchedulerConfig::default(), metrics)
    }

    /// PR-94: construct a scheduler with an explicit
    /// [`SchedulerConfig`] AND a shared observability registry.
    /// Convenience constructor so callers don't have to thread
    /// both concerns through separate builders.
    pub fn with_config_and_metrics(
        client: ferriskey::Client,
        config: PartitionConfig,
        scheduler_config: SchedulerConfig,
        metrics: std::sync::Arc<ff_observability::Metrics>,
    ) -> Self {
        Self {
            client,
            config,
            scheduler_config,
            rotation_state: AtomicU64::new(0),
            metrics,
        }
    }

    /// Return the current cursor for this call, advancing it if we're the
    /// first caller to observe a new rotation window. Pure compare-exchange
    /// on the packed atomic; no Mutex, no clock dep beyond `now_ms`.
    fn rotation_cursor(&self, now_ms: u64, num_partitions: u16) -> u16 {
        let window_ms = self.scheduler_config.rotation_window_ms.max(1);
        let step = self.scheduler_config.max_partitions_per_scan.max(1);
        let this_epoch = now_ms / window_ms;

        loop {
            let prev = self.rotation_state.load(Ordering::Relaxed);
            let prev_epoch = prev >> 16;
            let prev_cursor = (prev & 0xFFFF) as u16;

            if prev_epoch == this_epoch {
                return prev_cursor;
            }
            // New window: advance cursor by max_partitions_per_scan so
            // the next scan covers a fresh slice. Modular wrap on
            // num_partitions. Zero-partitions is a config bug; caller
            // guards it, but fall back to 0 to be safe.
            let new_cursor = if num_partitions == 0 {
                0
            } else {
                prev_cursor.wrapping_add(step) % num_partitions
            };
            let new_state = (this_epoch << 16) | u64::from(new_cursor);
            match self.rotation_state.compare_exchange(
                prev,
                new_state,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return new_cursor,
                // Lost race; another caller advanced. Re-read and use
                // whatever cursor now belongs to this epoch.
                Err(_) => continue,
            }
        }
    }

    /// Find an eligible execution and issue a claim grant.
    ///
    /// Probes up to [`SchedulerConfig::max_partitions_per_scan`] execution
    /// partitions (starting from the rotation cursor plus per-worker
    /// jitter), looking for the first eligible execution. Issues a claim
    /// grant via FCALL.
    ///
    /// `worker_capabilities` is sent to `ff_issue_claim_grant` as a sorted
    /// CSV (BTreeSet guarantees deterministic order). Executions whose
    /// `required_capabilities` are not a subset of this set are skipped
    /// (stay queued) via `ScriptError::CapabilityMismatch`.
    ///
    /// Returns `Ok(None)` if no eligible executions exist anywhere.
    /// Returns `Ok(Some(grant))` on success.
    /// Returns `Err` on Valkey errors.
    pub async fn claim_for_worker(
        &self,
        lane: &LaneId,
        worker_id: &WorkerId,
        worker_instance_id: &WorkerInstanceId,
        worker_capabilities: &BTreeSet<String>,
        grant_ttl_ms: u64,
    ) -> Result<Option<ClaimGrant>, SchedulerError> {
        // PR-94: grant issuance latency histogram. Captures the full
        // cycle (candidate selection + budget/quota admission +
        // issuance FCALL) so an operator can spot slow paths without
        // having to correlate scheduler logs across partitions. Drop
        // guard records on *every* exit path (including ?-propagated
        // errors) so the histogram reflects wall-clock cost, not just
        // the happy path.
        struct ClaimTimer<'a> {
            start: std::time::Instant,
            lane: &'a str,
            metrics: &'a ff_observability::Metrics,
        }
        impl Drop for ClaimTimer<'_> {
            fn drop(&mut self) {
                self.metrics
                    .record_claim_from_grant(self.lane, self.start.elapsed());
            }
        }
        let _claim_timer = ClaimTimer {
            start: std::time::Instant::now(),
            lane: lane.as_str(),
            metrics: &self.metrics,
        };
        let num_partitions = self.config.num_flow_partitions;
        // Guard misconfiguration: a zero partition count would hit a
        // `% num_partitions` division-by-zero in the scan_start / jitter
        // computation below. The pre-bounded loop silently no-opped on
        // `for offset in 0..0`; returning a Config error instead gives a
        // misconfiguring operator a loud, actionable signal — silent
        // Ok(None) would make every claim call look like an empty queue
        // forever.
        if num_partitions == 0 {
            return Err(SchedulerError::Config(
                "num_flow_partitions must be > 0".to_owned(),
            ));
        }
        let mut budget_checker = BudgetChecker::new(self.config);

        // Jitter the partition scan start to avoid thundering-herd on
        // partition 0 when 100 workers all tick simultaneously. Seeded
        // from worker_instance_id so this worker hits a stable window
        // within a single scheduling cycle (still covers every partition),
        // and different workers naturally diverge. Uses the shared
        // ff_core::hash FNV-1a reducer — same helper powering ff-sdk's
        // PARTITION_SCAN_CHUNK cursor seed. Zero-modulus safe.
        let start_p: u16 = ff_core::hash::fnv1a_u16_mod(
            worker_instance_id.as_str(),
            num_partitions,
        );
        // BTreeSet iterates sorted → stable CSV for Lua subset match.
        // Ingress validation mirrors FlowFabricWorker::connect (ff-sdk);
        // see the token-validation sketch in the tests module below:
        // - `,` is the CSV delimiter — a token containing one would split
        //   mid-parse and could let a {"gpu"} worker appear to satisfy
        //   {"gpu,cuda"} (silent auth bypass).
        // - Empty strings would inflate the CSV with leading / adjacent
        //   commas ("" + "gpu" → ",gpu" → ["", "gpu"]) and inflate the
        //   token count past CAPS_MAX_TOKENS for no semantic reason.
        // - Whitespace / control characters: "gpu " vs "gpu" or "gpu\n"
        //   vs "gpu" silently mis-routes. Reject them at ingress so typos
        //   fail loud (non-ASCII printable UTF-8 is allowed; see below).
        // Enforce ALL here so operator misconfig at the scheduler entry
        // point fails loud, symmetric with the SDK inline-claim path.
        // Size bounds (#csv bytes, #tokens) are checked here and again on
        // the Lua side.
        for cap in worker_capabilities {
            if cap.is_empty() {
                return Err(SchedulerError::Config(
                    "capability token must not be empty".to_owned(),
                ));
            }
            if cap.contains(',') {
                return Err(SchedulerError::Config(format!(
                    "capability token may not contain ',' (CSV delimiter): {cap:?}"
                )));
            }
            // Reject ASCII control + whitespace (incl. Unicode whitespace);
            // allow non-ASCII printable UTF-8 so i18n cap names work. CSV
            // delimiter `,` is single-byte and never a UTF-8 continuation,
            // so multibyte UTF-8 is safe on the wire. Symmetric with
            // ff-sdk::FlowFabricWorker::connect.
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SchedulerError::Config(format!(
                    "capability token must not contain whitespace or control characters: {cap:?}"
                )));
            }
        }
        if worker_capabilities.len() > ff_core::policy::CAPS_MAX_TOKENS {
            return Err(SchedulerError::Config(format!(
                "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                ff_core::policy::CAPS_MAX_TOKENS,
                worker_capabilities.len()
            )));
        }
        let worker_caps_csv = worker_capabilities
            .iter()
            .filter(|s| !s.is_empty())
            .cloned()
            .collect::<Vec<_>>()
            .join(",");
        // Stable digest used in per-mismatch logs so the full 4KB CSV
        // doesn't get echoed on every mismatch. See worker_caps_digest.
        let worker_caps_hash = worker_caps_digest(&worker_caps_csv);
        if worker_caps_csv.len() > ff_core::policy::CAPS_MAX_BYTES {
            return Err(SchedulerError::Config(format!(
                "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                ff_core::policy::CAPS_MAX_BYTES,
                worker_caps_csv.len()
            )));
        }

        // ── Bounded scan with rotation cursor ──
        // Prior impl walked all `num_partitions` partitions on every call,
        // which at `num_flow_partitions = 256` meant a quiet cluster cost
        // 256 ZRANGEBYSCORE round-trips per claim tick per worker. The
        // bounded scan caps per-call work at `max_partitions_per_scan`
        // while the rotation cursor ensures every partition is still
        // visited within `ceil(total / max_partitions_per_scan)` ticks.
        //
        // Needs `now_ms` to compute the rotation window, so we snap server
        // time up front. One extra TIME round-trip for quiet-cluster no-hit
        // calls, but on hit paths the existing inner-loop `server_time_ms`
        // call is now redundant — we reuse this value.
        let scan_now_ms = match server_time_ms(&self.client).await {
            Ok(t) => t,
            Err(e) => {
                // Transport error talking to Valkey; surface via the same
                // `ValkeyContext` channel the admission checks use so the
                // caller retries after backoff instead of silently hitting
                // partition 0 with a zero cursor.
                return Err(SchedulerError::ValkeyContext {
                    source: e,
                    context: "scheduler: TIME for rotation cursor".to_owned(),
                });
            }
        };
        let rotation_cursor = self.rotation_cursor(scan_now_ms, num_partitions);
        // Per-worker jitter stacks on the shared cursor: `start_p` diverges
        // different workers within one window; `rotation_cursor` drifts the
        // whole fleet across windows so no single partition is anyone's
        // permanent "partition 0". Wrapping add keeps the sum overflow-safe
        // for large partition counts; both operands are < num_partitions.
        let scan_start = start_p.wrapping_add(rotation_cursor) % num_partitions;
        let scan_budget = self
            .scheduler_config
            .max_partitions_per_scan
            .min(num_partitions)
            .max(1);

        // Observability counters (RFC-plan item 3): one debug line per
        // call, not per partition, so a tight-loop worker doesn't flood
        // logs. `partitions_hit` is 0/1 in practice — the loop returns on
        // the first grant — but keeping it a counter means future
        // multi-grant variants (N per call) don't need the log format to
        // change.
        let mut partitions_visited: u16 = 0;
        let mut partitions_skipped: u16 = 0;
        let mut partitions_hit: u16 = 0;
        let call_start = std::time::Instant::now();

        for p_idx in iter_partitions(num_partitions, scan_start, scan_budget) {
            partitions_visited += 1;
            let partition = Partition {
                family: PartitionFamily::Execution,
                index: p_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(lane);

            // ZRANGEBYSCORE eligible -inf +inf LIMIT 0 1
            // Lowest score = highest priority candidate
            let candidates: Vec<String> = match self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
            {
                Ok(ids) => ids,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        error = %e,
                        "scheduler: ZRANGEBYSCORE eligible failed, skipping partition"
                    );
                    partitions_skipped += 1;
                    continue;
                }
            };

            let eid_str = match candidates.first() {
                Some(s) => s,
                None => {
                    // Empty partition — this is the hot path on a quiet
                    // cluster and the whole point of the bounded scan.
                    // Don't count as a "skip" (skip implies something
                    // went wrong); treat as a normal miss.
                    continue;
                }
            };

            // Parse the execution ID
            let eid = match ExecutionId::parse(eid_str) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_str.as_str(),
                        error = %e,
                        "scheduler: invalid execution_id in eligible set, skipping"
                    );
                    partitions_skipped += 1;
                    continue;
                }
            };

            let exec_ctx = ExecKeyContext::new(&partition, &eid);
            let core_key = exec_ctx.core();
            let eid_s = eid.to_string();
            // Reuse the call-scoped `scan_now_ms` we read for the rotation
            // cursor. Semantics for block-detail timestamps are unchanged —
            // a block decision at time T is still written with a T in the
            // same call; the tiny staleness (<< 1 RTT) is dwarfed by the
            // usual scheduler→Valkey latency and saves one TIME round-trip
            // per candidate.
            let now_ms = scan_now_ms;

            // ── Capability pre-check (in-slot HGET, cheap) ──
            // Runs BEFORE quota admission so we never ZADD a quota slot
            // for an execution this worker can't actually claim. Without
            // this, on an unmatchable-top-of-zset, every scheduling tick
            // would ZADD {q:K}:admitted_set then ZREM it via
            // release_admission on the capability_mismatch reject — a
            // cross-slot write storm amplifying the mismatch loop.
            //
            // Lua `ff_issue_claim_grant` still does the authoritative
            // check; this is a fast-path short-circuit, not a substitute.
            // A narrow race exists where `required_capabilities` is
            // updated between our HGET and the FCALL — the FCALL is still
            // atomic and correct.
            let required_caps_csv: Option<String> = match self
                .client
                .cmd("HGET")
                .arg(&core_key)
                .arg("required_capabilities")
                .execute::<Option<String>>()
                .await
            {
                Ok(v) => v,
                Err(e) => {
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_s.as_str(),
                        error = %e,
                        "scheduler: HGET required_capabilities failed, skipping candidate"
                    );
                    continue;
                }
            };
            if let Some(req) = required_caps_csv.as_deref()
                && !req.is_empty()
                && !caps_subset(req, worker_capabilities)
            {
                // Move this execution out of eligible into blocked_route
                // so we don't hot-loop on it every tick (RFC-009 §564). A
                // periodic sweep (scanner side) promotes blocked_route →
                // eligible when a worker with matching caps registers.
                // Logged with a hash digest, not the raw CSV, to keep
                // per-mismatch log volume bounded.
                tracing::info!(
                    partition = p_idx,
                    execution_id = eid_s.as_str(),
                    worker_id = worker_id.as_str(),
                    worker_caps_hash = worker_caps_hash.as_str(),
                    required = req,
                    "scheduler: capability mismatch, blocking execution off eligible"
                );
                self.block_candidate(
                    &partition, &idx, lane, &eid, &eligible_key,
                    "waiting_for_capable_worker",
                    "no connected worker satisfies required_capabilities",
                    now_ms,
                ).await;
                continue;
            }

            // ── Budget pre-check (cross-partition, cached per cycle) ──
            if let Some(block_detail) = self
                .check_budgets(&mut budget_checker, &exec_ctx, &core_key, &eid_s)
                .await?
            {
                // Budget breached — block candidate and try next
                self.block_candidate(
                    &partition, &idx, lane, &eid, &eligible_key,
                    "waiting_for_budget", &block_detail, now_ms,
                ).await;
                continue;
            }

            // ── Quota pre-check (cross-partition FCALL on {q:K}) ──
            let quota_admission = self
                .check_quota(&exec_ctx, &core_key, &eid_s, now_ms)
                .await?;
            match &quota_admission {
                QuotaCheckOutcome::Blocked(block_detail) => {
                    self.block_candidate(
                        &partition, &idx, lane, &eid, &eligible_key,
                        "waiting_for_quota", block_detail, now_ms,
                    ).await;
                    continue;
                }
                QuotaCheckOutcome::NoQuota | QuotaCheckOutcome::Admitted { .. } => {}
            }

            // ── All checks passed — issue claim grant ──
            let grant_key = exec_ctx.claim_grant();
            let keys: [&str; 3] = [&core_key, &grant_key, &eligible_key];

            let ttl_str = grant_ttl_ms.to_string();
            let wid_s = worker_id.to_string();
            let wiid_s = worker_instance_id.to_string();
            let lane_s = lane.to_string();

            let argv: [&str; 9] = [
                &eid_s,
                &wid_s,
                &wiid_s,
                &lane_s,
                "", // capability_hash
                &ttl_str,
                "", // route_snapshot_json
                "", // admission_summary
                &worker_caps_csv, // sorted CSV; empty → matches only empty-required execs
            ];

            let raw = match self
                .client
                .fcall::<ferriskey::Value>("ff_issue_claim_grant", &keys, &argv)
                .await
            {
                Ok(v) => v,
                Err(e) => {
                    // Transport failure on the FCALL — NOSCRIPT, IoError,
                    // ClusterDown, etc. This is NOT a normal soft-reject;
                    // persistent transport errors mean the scheduler is
                    // effectively idle even though it looks like it's
                    // running. WARN so ops dashboards (WARN+ aggregators)
                    // fire instead of burying it at DEBUG.
                    tracing::warn!(
                        partition = p_idx,
                        execution_id = eid_s.as_str(),
                        error = %e,
                        "scheduler: ff_issue_claim_grant transport error, trying next"
                    );
                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
                        self.release_admission(tag, quota_id, eid).await;
                    }
                    continue;
                }
            };

            match FcallResult::parse(&raw).and_then(|r| r.into_success()) {
                Ok(_) => {
                    partitions_hit += 1;
                    tracing::debug!(
                        partition = p_idx,
                        execution_id = eid_s.as_str(),
                        worker_instance_id = worker_instance_id.as_str(),
                        start_p = scan_start,
                        partitions_visited,
                        partitions_skipped,
                        partitions_hit,
                        elapsed_ms = call_start.elapsed().as_millis() as u64,
                        "scheduler: claim call completed (hit)"
                    );
                    return Ok(Some(ClaimGrant {
                        execution_id: eid,
                        partition_key: ff_core::partition::PartitionKey::from(&partition),
                        grant_key: grant_key.clone(),
                        expires_at_ms: now_ms + grant_ttl_ms,
                    }));
                }
                Err(script_err) => {
                    if matches!(script_err, ScriptError::CapabilityMismatch(_)) {
                        // Should be rare: the Rust pre-check above
                        // normally catches this and blocks the execution
                        // off eligible. Reaching here means the
                        // required_capabilities field mutated between our
                        // HGET and the Lua atomic check (narrow race).
                        // Block here too so the next tick doesn't loop.
                        //
                        // Log uses worker_caps_hash (8-hex digest), not
                        // the full 4KB CSV, to keep per-mismatch log
                        // volume bounded. Full CSV is logged once at
                        // worker connect under "worker caps" WARN.
                        tracing::info!(
                            partition = p_idx,
                            execution_id = eid_s.as_str(),
                            worker_id = wid_s.as_str(),
                            worker_caps_hash = worker_caps_hash.as_str(),
                            error = %script_err,
                            "scheduler: capability mismatch via Lua (race), blocking execution"
                        );
                        self.block_candidate(
                            &partition, &idx, lane, &eid, &eligible_key,
                            "waiting_for_capable_worker",
                            "no connected worker satisfies required_capabilities",
                            now_ms,
                        ).await;
                        if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
                            self.release_admission(tag, quota_id, eid).await;
                        }
                        continue;
                    } else {
                        // Any other logical reject (grant_already_exists,
                        // execution_not_in_eligible_set, execution_not_eligible,
                        // invalid_capabilities, etc.). These are rare and each
                        // indicates either a race or a config problem — in
                        // either case ops need to see it, so WARN, not DEBUG.
                        tracing::warn!(
                            partition = p_idx,
                            execution_id = eid_s.as_str(),
                            error = %script_err,
                            "scheduler: ff_issue_claim_grant rejected, trying next"
                        );
                    }
                    if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = &quota_admission {
                        self.release_admission(tag, quota_id, eid).await;
                    }
                    continue;
                }
            }
        }

        tracing::debug!(
            worker_instance_id = worker_instance_id.as_str(),
            start_p = scan_start,
            partitions_visited,
            partitions_skipped,
            partitions_hit,
            elapsed_ms = call_start.elapsed().as_millis() as u64,
            "scheduler: claim call completed (no hit)"
        );
        Ok(None)
    }

    /// Read budget_ids from exec_core and check each. Returns block detail
    /// string if any budget is breached, None if all pass.
    async fn check_budgets(
        &self,
        checker: &mut BudgetChecker,
        _exec_ctx: &ExecKeyContext,
        core_key: &str,
        _eid_s: &str,
    ) -> Result<Option<String>, SchedulerError> {
        // Read budget_ids from exec_core (comma-separated or JSON list)
        let budget_ids_str: Option<String> = self
            .client
            .cmd("HGET")
            .arg(core_key)
            .arg("budget_ids")
            .execute()
            .await?;

        let budget_ids_str = match budget_ids_str {
            Some(s) => s,
            None => return Ok(None),
        };
        if budget_ids_str.is_empty() {
            return Ok(None); // no budgets attached
        }

        // Parse comma-separated budget IDs
        for budget_id in budget_ids_str.split(',') {
            let budget_id = budget_id.trim();
            if budget_id.is_empty() {
                continue;
            }
            let result = checker.check_budget(&self.client, budget_id).await?;
            if let BudgetCheckResult::HardBreach { dimension, detail } = &result {
                // PR-94: budget hard-breach counter, labelled by
                // dimension so operators can distinguish "tokens"
                // breaches from "cost" breaches at a glance.
                self.metrics.inc_budget_hit(dimension);
                return Ok(Some(detail.clone()));
            }
        }

        Ok(None)
    }

    /// Check quota admission for the candidate.
    async fn check_quota(
        &self,
        _exec_ctx: &ExecKeyContext,
        core_key: &str,
        eid_s: &str,
        now_ms: u64,
    ) -> Result<QuotaCheckOutcome, SchedulerError> {
        // Read quota_policy_id from exec_core
        let quota_id_str: Option<String> = self
            .client
            .cmd("HGET")
            .arg(core_key)
            .arg("quota_policy_id")
            .execute()
            .await?;

        let quota_id_str = match quota_id_str {
            Some(s) => s,
            None => return Ok(QuotaCheckOutcome::NoQuota),
        };
        if quota_id_str.is_empty() {
            return Ok(QuotaCheckOutcome::NoQuota);
        }

        // Compute real {q:K} partition tag from quota_policy_id
        let tag = match QuotaPolicyId::parse(&quota_id_str) {
            Ok(qid) => {
                let partition = quota_partition(&qid, &self.config);
                partition.hash_tag()
            }
            Err(_) => "{q:0}".to_owned(), // fallback for non-UUID test IDs
        };

        let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id_str);
        let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id_str);
        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id_str);
        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id_str, eid_s);
        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id_str);

        // Read quota limits from policy hash
        let rate_limit: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("max_requests_per_window")
            .execute().await?;
        let window_secs: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("requests_per_window_seconds")
            .execute().await?;
        let concurrency_cap: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("active_concurrency_cap")
            .execute().await?;
        let jitter: Option<String> = self.client
            .cmd("HGET").arg(&quota_def_key).arg("jitter_ms")
            .execute().await?;

        let rate_limit = rate_limit.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
        let window_secs = window_secs.as_deref().and_then(|s| s.parse().ok()).unwrap_or(60u64);
        let concurrency_cap = concurrency_cap.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
        let jitter_ms = jitter.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);

        // No limits configured — admit without recording
        if rate_limit == 0 && concurrency_cap == 0 {
            return Ok(QuotaCheckOutcome::NoQuota);
        }

        // FCALL ff_check_admission_and_record on {q:K}
        let keys: [&str; 5] = [&window_key, &concurrency_key, &quota_def_key, &admitted_key, &admitted_set_key];
        let now_s = now_ms.to_string();
        let ws = window_secs.to_string();
        let rl = rate_limit.to_string();
        let cc = concurrency_cap.to_string();
        let jt = jitter_ms.to_string();
        let argv: [&str; 6] = [&now_s, &ws, &rl, &cc, eid_s, &jt];

        match self.client
            .fcall::<ferriskey::Value>("ff_check_admission_and_record", &keys, &argv)
            .await
        {
            Ok(result) => {
                // Parse domain-specific result: {"ADMITTED"}, {"RATE_EXCEEDED", retry_after},
                // {"CONCURRENCY_EXCEEDED"}, {"ALREADY_ADMITTED"}
                let status = Self::parse_admission_status(&result);
                match status.as_str() {
                    "ADMITTED" | "ALREADY_ADMITTED" => Ok(QuotaCheckOutcome::Admitted {
                        tag: tag.clone(),
                        quota_id: quota_id_str.clone(),
                        eid: eid_s.to_owned(),
                    }),
1002 "RATE_EXCEEDED" => {
1003 // PR-94: quota admission block, labelled by
1004 // reason so rate-vs-concurrency waves are
1005 // distinguishable in a dashboard.
1006 self.metrics.inc_quota_hit("rate");
1007 Ok(QuotaCheckOutcome::Blocked(format!(
1008 "quota {}: rate limit {}/{} per {}s window",
1009 quota_id_str, rate_limit, rate_limit, window_secs
1010 )))
1011 }
1012 "CONCURRENCY_EXCEEDED" => {
1013 self.metrics.inc_quota_hit("concurrency");
1014 Ok(QuotaCheckOutcome::Blocked(format!(
1015 "quota {}: concurrency cap {}",
1016 quota_id_str, concurrency_cap
1017 )))
1018 }
1019 other => {
1020 // Fail-closed: an unrecognised status is the Lua
1021 // telling us a contract we don't understand. Do
1022 // NOT default to admit — surface it so the
1023 // scheduler retries next cycle and ops sees the
1024 // event.
1025 tracing::warn!(
1026 quota_id = quota_id_str.as_str(),
1027 status = other,
1028 "scheduler: unexpected admission result, denying (fail-closed)"
1029 );
1030 Err(SchedulerError::Config(format!(
1031 "quota {quota_id_str}: unexpected admission status \"{other}\""
1032 )))
1033 }
1034 }
1035 }
1036 Err(e) => {
1037 // Fail-closed: transport fault on the admission FCALL
1038 // must NOT silently admit the candidate. Propagate so
1039 // the outer cycle returns the error and the worker
1040 // retries after the usual backoff; the next cycle will
1041 // re-run the admission check against fresh state.
1042 tracing::warn!(
1043 quota_id = quota_id_str.as_str(),
1044 error = %e,
1045 "scheduler: quota FCALL failed, denying (fail-closed)"
1046 );
1047 Err(SchedulerError::ValkeyContext {
1048 source: e,
1049 context: format!("ff_check_admission_and_record {quota_id_str}"),
1050 })
1051 }
1052 }
1053 }

    /// Parse the first element of a Valkey array result as a status string.
    fn parse_admission_status(result: &ferriskey::Value) -> String {
        match result {
            ferriskey::Value::Array(arr) => {
                match arr.first() {
                    Some(Ok(ferriskey::Value::BulkString(b))) => {
                        String::from_utf8_lossy(b).into_owned()
                    }
                    Some(Ok(ferriskey::Value::SimpleString(s))) => s.clone(),
                    _ => "UNKNOWN".to_owned(),
                }
            }
            _ => "UNKNOWN".to_owned(),
        }
    }

    /// Block a candidate that failed budget/quota check.
    /// FCALL ff_block_execution_for_admission on {p:N}.
    #[allow(clippy::too_many_arguments)]
    async fn block_candidate(
        &self,
        partition: &Partition,
        idx: &IndexKeys,
        lane: &LaneId,
        eid: &ExecutionId,
        eligible_key: &str,
        block_reason: &str,
        blocking_detail: &str,
        now_ms: u64,
    ) {
        let exec_ctx = ExecKeyContext::new(partition, eid);
        let core_key = exec_ctx.core();
        let eid_s = eid.to_string();
        let blocked_key = match block_reason {
            "waiting_for_budget" => idx.lane_blocked_budget(lane),
            "waiting_for_quota" => idx.lane_blocked_quota(lane),
            "waiting_for_capable_worker" => idx.lane_blocked_route(lane),
            _ => idx.lane_blocked_budget(lane),
        };

        let keys: [&str; 3] = [&core_key, eligible_key, &blocked_key];
        let now_s = now_ms.to_string();
        let argv: [&str; 4] = [&eid_s, block_reason, blocking_detail, &now_s];

        // Parse FcallResult so we distinguish Lua-level rejections (e.g.
        // execution_not_active because the execution went terminal between
        // our HGET and the FCALL) from a real block. Previously `Ok(_)`
        // treated an err-tuple as success → INFO log "candidate blocked"
        // while nothing actually changed on exec_core, then the next tick
        // re-picked the same candidate and looped. Mirrors the
        // release_admission parse fix.
        match self.client
            .fcall::<ferriskey::Value>("ff_block_execution_for_admission", &keys, &argv)
            .await
        {
            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
                Ok(_) => {
                    tracing::info!(
                        execution_id = eid_s,
                        reason = block_reason,
                        "scheduler: candidate blocked by admission check"
                    );
                }
                Err(script_err) => {
                    // Logical reject from Lua (e.g. execution_not_active
                    // — the execution went terminal between the scheduler
                    // pick and the block FCALL; the candidate loop will
                    // naturally move on). WARN so ops dashboards surface
                    // actual block failures, but not so loud that a common
                    // race spams alerts.
                    tracing::warn!(
                        execution_id = eid_s,
                        reason = block_reason,
                        error = %script_err,
                        "scheduler: ff_block_execution_for_admission rejected by Lua"
                    );
                }
            },
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_s,
                    error = %e,
                    "scheduler: ff_block_execution_for_admission transport failed"
                );
            }
        }
    }

    /// Release a previously-recorded quota admission slot.
    /// Called when ff_issue_claim_grant fails after admission was recorded.
    async fn release_admission(
        &self,
        tag: &str,
        quota_id: &str,
        eid_s: &str,
    ) {
        let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid_s);
        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
        let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);

        let keys: [&str; 3] = [&admitted_key, &admitted_set_key, &concurrency_key];
        let argv: [&str; 1] = [eid_s];

        // Parse the Lua response properly: FCALL returns `Ok(Value)` for
        // BOTH success and logical-error paths. Treating Ok(_) blindly as
        // "released" logs a false positive when the Lua returns
        // `{0, "quota_not_found"}` (or any other script-level err) — the
        // slot in fact remains pinned until its TTL expires, which is
        // minutes to hours. Surface the real outcome so on-call sees
        // actual release failures instead of clean "released" events.
        match self.client
            .fcall::<ferriskey::Value>("ff_release_admission", &keys, &argv)
            .await
        {
            Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
                Ok(_) => {
                    tracing::info!(
                        execution_id = eid_s,
                        quota_id,
                        "scheduler: released admission after claim failure"
                    );
                }
                Err(script_err) => {
                    tracing::warn!(
                        execution_id = eid_s,
                        quota_id,
                        error = %script_err,
                        "scheduler: ff_release_admission rejected by Lua \
                         (slot will expire via TTL)"
                    );
                }
            },
            Err(e) => {
                tracing::warn!(
                    execution_id = eid_s,
                    quota_id,
                    error = %e,
                    "scheduler: ff_release_admission transport failed \
                     (slot will expire via TTL)"
                );
            }
        }
    }
}

/// Get server time in milliseconds via the TIME command.
async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
    let result: Vec<String> = client
        .cmd("TIME")
        .execute()
        .await?;
    if result.len() < 2 {
        return Err(ferriskey::Error::from((
            ferriskey::ErrorKind::ClientError,
            "TIME returned fewer than 2 elements",
        )));
    }
    let secs: u64 = result[0].parse().map_err(|_| {
        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
    })?;
    let micros: u64 = result[1].parse().map_err(|_| {
        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
    })?;
    Ok(secs * 1000 + micros / 1000)
}
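
// Arithmetic sketch of the TIME → milliseconds conversion above: whole
// seconds scale by 1000 and microseconds truncate to whole milliseconds.
// The sample values are illustrative.
#[cfg(test)]
mod server_time_ms_arithmetic_sketch {
    #[test]
    fn seconds_and_micros_fold_into_millis() {
        let (secs, micros) = (1_700_000_000u64, 123_456u64);
        assert_eq!(secs * 1000 + micros / 1000, 1_700_000_000_123);
    }
}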

/// Errors from the scheduler.
#[derive(Debug, thiserror::Error)]
pub enum SchedulerError {
    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Valkey error with additional context (preserves ErrorKind via #[source]).
    #[error("valkey ({context}): {source}")]
    ValkeyContext {
        #[source]
        source: ferriskey::Error,
        context: String,
    },
    /// Caller-supplied value failed ingress validation. NOT retryable — the
    /// caller must fix its input before retrying.
    #[error("config: {0}")]
    Config(String),
}

impl SchedulerError {
    /// Returns the underlying ferriskey ErrorKind, if this is a Valkey error.
    /// Matches `ServerError::valkey_kind` and `ScriptError::valkey_kind` so
    /// callers can treat all three uniformly.
    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
        match self {
            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
            Self::Config(_) => None,
        }
    }

    /// Whether this error is safely retryable by a caller. Mirrors
    /// `ServerError::is_retryable` semantics.
    pub fn is_retryable(&self) -> bool {
        self.valkey_kind()
            .map(is_retryable_kind)
            .unwrap_or(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn scheduler_is_retryable_matches_kind_table() {
        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());

        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
    }

    #[test]
    fn scheduler_valkey_context_is_retryable() {
        let err = SchedulerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::BusyLoadingError),
            context: "HGET budget_ids".into(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn scheduler_valkey_kind_exposed() {
        let err = SchedulerError::Valkey(mk_fk_err(ErrorKind::TryAgain));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::TryAgain));
    }

    // ── iter_partitions: regression test for the modular wrap + bounded
    // length contract. The fairness behaviour of the bounded scheduler
    // scan depends entirely on this helper returning exactly `count`
    // distinct partition indices starting at `start` and wrapping modulo
    // `total`. Tested in isolation so a bug here is distinguishable from
    // a bug in the Valkey-backed loop. ──

    #[test]
    fn iter_partitions_no_wrap() {
        // start=10, count=5, total=256 → 10..15, no wrap involved.
        let ps: Vec<u16> = iter_partitions(256, 10, 5).collect();
        assert_eq!(ps, vec![10, 11, 12, 13, 14]);
    }

    #[test]
    fn iter_partitions_wraps_modulo_total() {
        // start=254, count=5, total=256 → 254, 255, 0, 1, 2.
        let ps: Vec<u16> = iter_partitions(256, 254, 5).collect();
        assert_eq!(ps, vec![254, 255, 0, 1, 2]);
    }

    #[test]
    fn iter_partitions_count_capped_to_total() {
        // Asking for more than `total` yields exactly `total` distinct
        // partitions — never a duplicate, never more than the universe.
        let ps: Vec<u16> = iter_partitions(4, 1, 100).collect();
        assert_eq!(ps, vec![1, 2, 3, 0]);
    }

    #[test]
    fn iter_partitions_length_matches_count() {
        // Invariant: output length == min(count, total). The scan loop
        // upper-bounds round-trips on this, so regressing it would
        // silently re-introduce the 256-round-trip-per-tick bug.
        for start in [0u16, 1, 50, 255] {
            for count in [0u16, 1, 16, 32, 256] {
                let len = iter_partitions(256, start, count).count();
                assert_eq!(len, count.min(256) as usize);
            }
        }
    }

    // ── Fairness: the union of the partitions visited across
    // `ceil(total / max_partitions_per_scan)` successive scans, with the
    // rotation cursor advancing each scan, must cover every partition
    // exactly once. This is the contract the operator rustdoc promises
    // ("any given partition reached within 8 ticks at defaults"). ──
    #[test]
    fn fairness_full_coverage_in_ceil_total_over_budget_scans() {
        const TOTAL: u16 = 256;
        const BUDGET: u16 = SchedulerConfig::DEFAULT_MAX_PARTITIONS_PER_SCAN;
        let scans = TOTAL.div_ceil(BUDGET); // 8 at defaults

        // Simulate the same advance logic the live cursor performs: each
        // "scan" starts at the previous start + BUDGET (mod TOTAL). We
        // pin start to 0 for determinism; the per-worker FNV jitter is a
        // phase offset on top and doesn't change coverage.
        let mut union = std::collections::BTreeSet::new();
        let mut cursor: u16 = 0;
        for _ in 0..scans {
            for p in iter_partitions(TOTAL, cursor, BUDGET) {
                union.insert(p);
            }
            cursor = cursor.wrapping_add(BUDGET) % TOTAL;
        }

        assert_eq!(union.len(), TOTAL as usize, "every partition visited once");
        for p in 0..TOTAL {
            assert!(union.contains(&p), "missing partition {p}");
        }
    }

    #[test]
    fn fairness_full_coverage_with_phase_offset() {
        // Regression: the per-worker FNV phase must not change the
        // coverage property. Pick a non-zero start; we still cover the
        // whole universe in ceil(total/budget) scans.
        const TOTAL: u16 = 256;
        const BUDGET: u16 = SchedulerConfig::DEFAULT_MAX_PARTITIONS_PER_SCAN;
        let scans = TOTAL.div_ceil(BUDGET);

        let mut union = std::collections::BTreeSet::new();
        let mut cursor: u16 = 137; // arbitrary per-worker jitter
        for _ in 0..scans {
            for p in iter_partitions(TOTAL, cursor, BUDGET) {
                union.insert(p);
            }
            cursor = cursor.wrapping_add(BUDGET) % TOTAL;
        }
        assert_eq!(union.len(), TOTAL as usize);
    }

    #[test]
    fn scheduler_config_defaults_match_rustdoc() {
        let c = SchedulerConfig::default();
        assert_eq!(c.max_partitions_per_scan, 32);
        assert_eq!(c.rotation_window_ms, 250);
    }
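
    // Sketch of the capability-token ingress rules claim_for_worker
    // enforces before building the CSV (referenced from the validation
    // comment there): empty tokens, tokens containing the `,` delimiter,
    // and tokens with whitespace or control characters are rejected;
    // non-ASCII printable UTF-8 is allowed. This exercises the same
    // predicates inline, not the private claim path itself.
    #[test]
    fn capability_token_ingress_rules_sketch() {
        let rejects = |cap: &str| {
            cap.is_empty()
                || cap.contains(',')
                || cap.chars().any(|c| c.is_control() || c.is_whitespace())
        };
        assert!(rejects(""));
        assert!(rejects("gpu,cuda"));
        assert!(rejects("gpu "));
        assert!(rejects("gpu\n"));
        assert!(!rejects("gpu"));
        assert!(!rejects("gpü")); // non-ASCII printable is fine
    }

    // Sketch: BTreeSet iteration is sorted, so the capability CSV sent to
    // `ff_issue_claim_grant` is deterministic regardless of insertion order.
    #[test]
    fn capability_csv_is_sorted_and_stable_sketch() {
        let caps: std::collections::BTreeSet<String> =
            ["gpu".to_owned(), "cuda".to_owned()].into();
        let csv = caps.iter().cloned().collect::<Vec<_>>().join(",");
        assert_eq!(csv, "cuda,gpu");
    }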
}