ff_scheduler/claim.rs
1//! Claim-grant cycle: find eligible executions and issue grants.
2//!
3//! The scheduler selects candidates from partition-local eligible sorted sets,
4//! then atomically issues a claim grant via `FCALL ff_issue_claim_grant`.
5//! The worker (ff-sdk) subsequently consumes the grant via `ff_claim_execution`.
6//!
7//! Phase 5: single lane, budget/quota pre-checks before grant issuance.
8//! Candidates that fail budget/quota are blocked via ff_block_execution_for_admission.
9//!
10//! Reference: RFC-009 §12.7, RFC-010 §3.1, RFC-008 §1.7
11//!
12//! # Postgres variant (RFC-v0.7 Wave 5b)
13//!
14//! The admission pipeline is ported to Postgres as
15//! [`ff_backend_postgres::scheduler::PostgresScheduler`]. It lives in
16//! the backend crate — not as a `Scheduler` variant here — to keep
17//! `ff-scheduler` off the sqlx dep graph. `ff-engine` / `ff-server`
18//! branch at construction time on the concrete backend type.
19//! See `crates/ff-backend-postgres/src/scheduler.rs` for the
20//! pipeline + isolation notes.
21
22use ff_core::keys::{ExecKeyContext, IndexKeys};
23use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, budget_partition, quota_partition};
24use ff_core::types::{BudgetId, ExecutionId, LaneId, QuotaPolicyId, WorkerId, WorkerInstanceId};
25use ff_script::error::ScriptError;
26use ff_script::result::FcallResult;
27use ff_script::retry::is_retryable_kind;
28use std::collections::BTreeSet;
29use std::sync::atomic::{AtomicU64, Ordering};
30
31/// Short stable digest of a worker's caps CSV for per-event log lines.
32/// Thin wrapper around the shared helper so call sites read as
33/// "worker_caps_digest" locally while the algorithm lives in one place.
34fn worker_caps_digest(csv: &str) -> String {
35 ff_core::hash::fnv1a_xor8hex(csv)
36}
37
38/// A claim grant issued by the scheduler for a specific execution.
39///
40/// Re-exported from [`ff_core::contracts::ClaimGrant`]. Lives in
41/// `ff-core` so `ff-scheduler` (issuer) and `ff-sdk` (consumer)
42/// share one wire-level type without a cross-dep between them.
43pub use ff_core::contracts::ClaimGrant;
44
45/// A reclaim grant for a resumed (attempt_interrupted) execution.
46///
47/// Re-export of [`ff_core::contracts::ReclaimGrant`] for symmetry
48/// with [`ClaimGrant`]. `ff-scheduler` will be the canonical
49/// producer once the Batch-C reclaim scanner lands; today only
50/// test fixtures construct this type. Consumed by
51/// `FlowFabricWorker::claim_from_reclaim_grant`.
52pub use ff_core::contracts::ReclaimGrant;
53
/// Budget check result from a cross-partition budget read.
///
/// Produced by [`BudgetChecker::check_budget`] and cached per scheduler
/// cycle (both variants are cached — see `BudgetChecker`).
#[derive(Debug)]
pub enum BudgetCheckResult {
    /// Budget is within limits — proceed.
    Ok,
    /// Budget hard limit breached. Contains (dimension, detail_string);
    /// `detail` is a human-readable "budget <key>: <dim> <usage>/<limit>" line.
    HardBreach { dimension: String, detail: String },
}
62
/// Outcome of a quota admission check.
enum QuotaCheckOutcome {
    /// No quota attached to this execution.
    NoQuota,
    /// Quota admitted — carries the (tag, quota_id, eid) context needed
    /// to call `release_admission` if a later step (grant FCALL) fails.
    Admitted { tag: String, quota_id: String, eid: String },
    /// Quota denied — execution should be blocked; payload is the
    /// block-detail string written alongside the blocked candidate.
    Blocked(String),
}
72
/// Cross-partition budget checker with per-cycle caching.
///
/// Reads budget usage/limits once per scan cycle (not per candidate).
/// This is MANDATORY for performance — without it, 50K blocked executions
/// would produce 50K budget reads per cycle.
pub struct BudgetChecker {
    /// Cached budget status: budget_id → BudgetCheckResult.
    /// Reset at the start of each scheduler cycle (see [`Self::reset`]).
    cache: std::collections::HashMap<String, BudgetCheckResult>,
    /// Partition topology — used to derive the `{b:M}` hash tag for a
    /// budget's usage/limits keys in `check_budget`.
    config: PartitionConfig,
}
84
85impl BudgetChecker {
86 pub fn new(config: PartitionConfig) -> Self {
87 Self {
88 cache: std::collections::HashMap::new(),
89 config,
90 }
91 }
92
93 /// Check a budget by ID. Reads from Valkey on first call per budget,
94 /// caches for subsequent candidates in the same cycle.
95 ///
96 /// Fail-closed: transport errors propagate as
97 /// [`SchedulerError::ValkeyContext`] rather than being cached as
98 /// `Ok`. Caching an Err would be wrong — a transient blip would then
99 /// pin every candidate in this cycle to the same denial. Successful
100 /// results (including `HardBreach`) are still cached so 50K blocked
101 /// candidates sharing a budget do one read per cycle, not 50K.
102 pub async fn check_budget(
103 &mut self,
104 client: &ferriskey::Client,
105 budget_id: &str,
106 ) -> Result<&BudgetCheckResult, SchedulerError> {
107 if self.cache.contains_key(budget_id) {
108 return Ok(&self.cache[budget_id]);
109 }
110
111 // Compute real {b:M} partition tag from budget_id
112 let (usage_key, limits_key) = match BudgetId::parse(budget_id) {
113 Ok(bid) => {
114 let partition = budget_partition(&bid, &self.config);
115 let tag = partition.hash_tag();
116 (
117 format!("ff:budget:{}:{}:usage", tag, budget_id),
118 format!("ff:budget:{}:{}:limits", tag, budget_id),
119 )
120 }
121 Err(_) => {
122 // Fallback for non-UUID budget IDs (test compat)
123 (
124 format!("ff:budget:{{b:0}}:{}:usage", budget_id),
125 format!("ff:budget:{{b:0}}:{}:limits", budget_id),
126 )
127 }
128 };
129
130 let result =
131 Self::read_and_check(client, &usage_key, &limits_key)
132 .await
133 .map_err(|source| SchedulerError::ValkeyContext {
134 source,
135 context: format!("budget_checker read {budget_id}"),
136 })?;
137
138 self.cache.insert(budget_id.to_owned(), result);
139 Ok(&self.cache[budget_id])
140 }
141
142 /// Read budget usage and limits, compare each dimension.
143 async fn read_and_check(
144 client: &ferriskey::Client,
145 usage_key: &str,
146 limits_key: &str,
147 ) -> Result<BudgetCheckResult, ferriskey::Error> {
148 // Read all limit dimensions via hgetall (returns HashMap, not flat pairs)
149 let limits: std::collections::HashMap<String, String> = client
150 .hgetall(limits_key)
151 .await?;
152
153 // Parse hard limits
154 for (field, limit_val) in &limits {
155 if !field.starts_with("hard:") {
156 continue;
157 }
158 let dimension = &field[5..]; // strip "hard:" prefix
159 let limit: u64 = match limit_val.parse() {
160 Ok(v) if v > 0 => v,
161 _ => continue,
162 };
163
164 // Read current usage for this dimension. Fail-closed: a
165 // transport error must NOT silently default the usage to 0
166 // (which would make every breach invisible). Absence is
167 // still treated as 0 — the caller is expected to HSET
168 // usage_key on every increment.
169 let usage_str: Option<String> = client
170 .cmd("HGET")
171 .arg(usage_key)
172 .arg(dimension)
173 .execute()
174 .await?;
175 let usage: u64 = usage_str
176 .as_deref()
177 .and_then(|s| s.parse().ok())
178 .unwrap_or(0);
179
180 if usage >= limit {
181 return Ok(BudgetCheckResult::HardBreach {
182 dimension: dimension.to_owned(),
183 detail: format!("budget {}: {} {}/{}", usage_key, dimension, usage, limit),
184 });
185 }
186 }
187
188 Ok(BudgetCheckResult::Ok)
189 }
190
191 /// Clear the cache at the start of a new scheduler cycle.
192 pub fn reset(&mut self) {
193 self.cache.clear();
194 }
195}
196
/// Tunable scheduler behavior, separate from the topology-level
/// [`PartitionConfig`]. Lives in `ff-scheduler` because every field is a
/// scheduler-internal scan policy — none of it leaks into persisted keys,
/// Lua scripts, or cross-crate wire shapes (unlike `PartitionConfig`).
/// If a future RFC needs a unified knob surface we can lift this up; not
/// premature today.
///
/// Construct via [`Default`] (see the `DEFAULT_*` consts) or a struct
/// literal when tests need non-default scan bounds.
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// Maximum number of partitions to probe in a single
    /// [`Scheduler::claim_for_worker`] call before giving up and returning
    /// `Ok(None)`.
    ///
    /// **Trade-off:** smaller = lower worst-case no-hit latency per claim
    /// call (each probe is a ZRANGEBYSCORE round-trip, ~0.1ms LAN), larger
    /// = better fairness per call (more partitions seen before a worker
    /// yields). At the default of 32 with 256 partitions, any given
    /// partition is reached within `ceil(256/32) = 8` scheduling ticks —
    /// combined with the rotation cursor, that bounds worst-case
    /// starvation for a specific partition's head-of-queue execution.
    pub max_partitions_per_scan: u16,
    /// Duration a rotation cursor position stays stable before advancing.
    ///
    /// **Trade-off:** too short and tight-loop workers re-enter the same
    /// window on every tick (cursor never actually rotates relative to
    /// them); too long and slow-poll workers keep seeing the same cursor
    /// across many ticks (reducing fairness benefit). 250ms is a middle
    /// ground: tight-loop workers (sub-ms claim cycles) see a fresh
    /// window every ~250 ticks, 1s-poll workers see a fresh window every
    /// 4 ticks. Tune down if your workers all idle-poll >1s; tune up if
    /// you run a fleet of tight-loop claimers and want less cursor churn.
    pub rotation_window_ms: u64,
}
229
impl SchedulerConfig {
    /// Default scan budget: probe 32 partitions per claim call.
    /// See [`Self::max_partitions_per_scan`] for the latency/fairness
    /// rationale.
    pub const DEFAULT_MAX_PARTITIONS_PER_SCAN: u16 = 32;

    /// Default rotation window: advance the cursor every 250ms.
    /// See [`Self::rotation_window_ms`] for the tuning trade-off.
    pub const DEFAULT_ROTATION_WINDOW_MS: u64 = 250;
}
240
241impl Default for SchedulerConfig {
242 fn default() -> Self {
243 Self {
244 max_partitions_per_scan: Self::DEFAULT_MAX_PARTITIONS_PER_SCAN,
245 rotation_window_ms: Self::DEFAULT_ROTATION_WINDOW_MS,
246 }
247 }
248}
249
/// Iterate partitions `[start, start+1, ..., start+count-1] mod total`.
///
/// Factored out of [`Scheduler::claim_for_worker`] so the modular-wrap +
/// bounded-length contract has a dedicated unit test surface, independent
/// of Valkey. Called with `count <= total`; values are always distinct.
///
/// Returns an iterator (not a Vec) to keep the hot claim path allocation-
/// free — this runs once per claim tick per worker.
///
/// The sum is widened to `u32` before the modulo: when `total > 32768`,
/// `start + i` can exceed `u16::MAX` (e.g. `total = 60000`,
/// `start = 59999`, `i = 5537`), and a `u16` `wrapping_add` would reduce
/// mod 65536 *before* the `% total` — yielding wrong and possibly
/// duplicate indices, breaking the "always distinct" contract.
fn iter_partitions(total: u16, start: u16, count: u16) -> impl Iterator<Item = u16> {
    debug_assert!(total > 0);
    let count = count.min(total);
    (0..count).map(move |i| {
        // Widened arithmetic: (start + i) fits u32 for all u16 inputs,
        // so the modulo is exact. Result is < total, so the narrowing
        // cast back to u16 is lossless.
        ((u32::from(start) + u32::from(i)) % u32::from(total)) as u16
    })
}
263
/// Single-lane scheduler with budget/quota pre-checks.
///
/// Iterates execution partitions sequentially, picks the first eligible
/// execution (lowest priority score). Before issuing a claim grant:
/// 1. Check all attached budgets (cross-partition, cached per cycle)
/// 2. Check quota admission (cross-partition FCALL)
/// 3. If any check fails: block the candidate and try next
/// 4. If all pass: issue the claim grant
///
/// Scan bounding + rotation: each call probes at most
/// [`SchedulerConfig::max_partitions_per_scan`] partitions starting from
/// a rotation cursor that advances once per
/// [`SchedulerConfig::rotation_window_ms`]. The per-worker FNV jitter is
/// applied on top so different workers diverge within any given window.
pub struct Scheduler {
    /// Valkey connection used for all reads, FCALLs, and block writes.
    client: ferriskey::Client,
    /// Topology-level partition config (persisted-key-affecting).
    config: PartitionConfig,
    /// Scheduler-internal scan policy (see [`SchedulerConfig`]).
    scheduler_config: SchedulerConfig,
    /// Packed rotation state: high 48 bits = last window epoch seen
    /// (`now_ms / rotation_window_ms`), low 16 bits = current cursor
    /// partition index. A single atomic keeps "advance cursor iff we're
    /// the first call in a new window" race-free without a Mutex.
    rotation_state: AtomicU64,
    /// PR-94: observability handle for claim_from_grant duration and
    /// budget/quota hit counters. Defaults to a fresh
    /// `ff_observability::Metrics::new()` — under the default build
    /// (`observability` feature off) that's the no-op shim; with the
    /// feature on it's a private real OTEL registry not shared with
    /// any scrape. Callers that want a shared registry plumb it in
    /// via [`Self::with_metrics`] / [`Self::with_config_and_metrics`].
    metrics: std::sync::Arc<ff_observability::Metrics>,
}
296
297impl Scheduler {
    /// Construct a scheduler with the default [`SchedulerConfig`] and a
    /// fresh (private) metrics registry — see [`Self::with_config`].
    pub fn new(client: ferriskey::Client, config: PartitionConfig) -> Self {
        Self::with_config(client, config, SchedulerConfig::default())
    }
301
    /// Construct a scheduler with an explicit [`SchedulerConfig`].
    /// Use this when you need non-default scan bounds (e.g., in tests that
    /// want to walk every partition in one call, or deployments with
    /// non-default polling cadence). Most callers should use
    /// [`Self::new`]. Metrics default to a fresh registry — see the
    /// `metrics` field doc on [`Scheduler`].
    pub fn with_config(
        client: ferriskey::Client,
        config: PartitionConfig,
        scheduler_config: SchedulerConfig,
    ) -> Self {
        Self::with_config_and_metrics(
            client,
            config,
            scheduler_config,
            std::sync::Arc::new(ff_observability::Metrics::new()),
        )
    }
319
    /// PR-94: construct a scheduler with a shared observability
    /// registry and default [`SchedulerConfig`]. Used by `ff-server`
    /// so claim/budget/quota metrics land in the same registry
    /// exposed at `/metrics`. For test harnesses that need both a
    /// custom `SchedulerConfig` AND observability, use
    /// [`Self::with_config_and_metrics`].
    pub fn with_metrics(
        client: ferriskey::Client,
        config: PartitionConfig,
        metrics: std::sync::Arc<ff_observability::Metrics>,
    ) -> Self {
        Self::with_config_and_metrics(client, config, SchedulerConfig::default(), metrics)
    }
333
334 /// PR-94: construct a scheduler with an explicit
335 /// [`SchedulerConfig`] AND a shared observability registry.
336 /// Convenience constructor so callers don't have to thread
337 /// both concerns through separate builders.
338 pub fn with_config_and_metrics(
339 client: ferriskey::Client,
340 config: PartitionConfig,
341 scheduler_config: SchedulerConfig,
342 metrics: std::sync::Arc<ff_observability::Metrics>,
343 ) -> Self {
344 Self {
345 client,
346 config,
347 scheduler_config,
348 rotation_state: AtomicU64::new(0),
349 metrics,
350 }
351 }
352
    /// Return the current cursor for this call, advancing it if we're the
    /// first caller to observe a new rotation window. Pure compare-exchange
    /// on the packed atomic; no Mutex, no clock dep beyond `now_ms`.
    ///
    /// Packing: high 48 bits = window epoch (`now_ms / window_ms`),
    /// low 16 bits = cursor partition index (matches the
    /// `rotation_state` field doc). NOTE(review): `Ordering::Relaxed`
    /// looks sufficient here since the cursor only steers scan
    /// fairness, not correctness — confirm no other state piggybacks
    /// on this atomic.
    fn rotation_cursor(&self, now_ms: u64, num_partitions: u16) -> u16 {
        // .max(1) guards a divide-by-zero on a zero-ms window config.
        let window_ms = self.scheduler_config.rotation_window_ms.max(1);
        // .max(1) guarantees the cursor actually moves each window.
        let step = self.scheduler_config.max_partitions_per_scan.max(1);
        let this_epoch = now_ms / window_ms;

        loop {
            let prev = self.rotation_state.load(Ordering::Relaxed);
            let prev_epoch = prev >> 16;
            let prev_cursor = (prev & 0xFFFF) as u16;

            // Same window: everyone shares the already-published cursor.
            if prev_epoch == this_epoch {
                return prev_cursor;
            }
            // New window: advance cursor by max_partitions_per_scan so
            // the next scan covers a fresh slice. Modular wrap on
            // num_partitions. Zero-partitions is a config bug; caller
            // guards it, but fall back to 0 to be safe.
            let new_cursor = if num_partitions == 0 {
                0
            } else {
                prev_cursor.wrapping_add(step) % num_partitions
            };
            let new_state = (this_epoch << 16) | u64::from(new_cursor);
            match self.rotation_state.compare_exchange(
                prev,
                new_state,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return new_cursor,
                // Lost race; another caller advanced. Re-read and use
                // whatever cursor now belongs to this epoch.
                Err(_) => continue,
            }
        }
    }
392
393 /// Find an eligible execution and issue a claim grant.
394 ///
395 /// Iterates all execution partitions looking for the first partition
396 /// with an eligible execution. Issues a claim grant via FCALL.
397 ///
398 /// `worker_capabilities` is sent to `ff_issue_claim_grant` as a sorted
399 /// CSV (BTreeSet guarantees deterministic order). Executions whose
400 /// `required_capabilities` are not a subset of this set are skipped
401 /// (stay queued) via `ScriptError::CapabilityMismatch`.
402 ///
403 /// Returns `Ok(None)` if no eligible executions exist anywhere.
404 /// Returns `Ok(Some(grant))` on success.
405 /// Returns `Err` on Valkey errors.
406 ///
407 /// # Lane-FIFO: prior-run leftovers drain first
408 ///
409 /// Claim grants are issued in per-lane FIFO order over the lane's
410 /// eligible queue. A smoke / dev script that reuses a lane across
411 /// runs will observe the lane's leftover executions from earlier
412 /// runs — failed, orphaned, or still-eligible — drain *before*
413 /// the fresh submission is claimed. This surfaces as a confusing
414 /// "wrong execution picked up" during iterative development.
415 /// Smoke tests should either (a) use a fresh lane name per run
416 /// (e.g. suffixing with a UUID or timestamp), or (b) call
417 /// `cancel_flow` / `FLUSHDB` between runs to drain the lane.
418 pub async fn claim_for_worker(
419 &self,
420 lane: &LaneId,
421 worker_id: &WorkerId,
422 worker_instance_id: &WorkerInstanceId,
423 worker_capabilities: &BTreeSet<String>,
424 grant_ttl_ms: u64,
425 ) -> Result<Option<ClaimGrant>, SchedulerError> {
426 // PR-94: grant issuance latency histogram. Captures the full
427 // cycle (candidate selection + budget/quota admission +
428 // issuance FCALL) so an operator can spot slow paths without
429 // having to correlate scheduler logs across partitions. Drop
430 // guard records on *every* exit path (including ?-propagated
431 // errors) so the histogram reflects wall-clock cost, not just
432 // the happy path.
433 struct ClaimTimer<'a> {
434 start: std::time::Instant,
435 lane: &'a str,
436 metrics: &'a ff_observability::Metrics,
437 }
438 impl Drop for ClaimTimer<'_> {
439 fn drop(&mut self) {
440 self.metrics
441 .record_claim_from_grant(self.lane, self.start.elapsed());
442 }
443 }
444 let _claim_timer = ClaimTimer {
445 start: std::time::Instant::now(),
446 lane: lane.as_str(),
447 metrics: &self.metrics,
448 };
449 let num_partitions = self.config.num_flow_partitions;
450 // Guard misconfiguration: a zero partition count would hit a
451 // `% num_partitions` division-by-zero in the scan_start / jitter
452 // computation below. The pre-bounded loop simply skipped on
453 // `for offset in 0..0`; preserve that graceful no-op rather than
454 // panicking. Config-returning (not Ok(None)) so an operator who
455 // misconfigures gets a loud, actionable error — silent Ok(None)
456 // would make every claim call look like an empty queue forever.
457 if num_partitions == 0 {
458 return Err(SchedulerError::Config(
459 "num_flow_partitions must be > 0".to_owned(),
460 ));
461 }
462 let mut budget_checker = BudgetChecker::new(self.config);
463
464 // Jitter the partition scan start to avoid thundering-herd on
465 // partition 0 when 100 workers all tick simultaneously. Seeded
466 // from worker_instance_id so this worker hits a stable window
467 // within a single scheduling cycle (still covers every partition),
468 // and different workers naturally diverge. Uses the shared
469 // ff_core::hash FNV-1a reducer — same helper powering ff-sdk's
470 // PARTITION_SCAN_CHUNK cursor seed. Zero-modulus safe.
471 let start_p: u16 = ff_core::hash::fnv1a_u16_mod(
472 worker_instance_id.as_str(),
473 num_partitions,
474 );
475 // BTreeSet iterates sorted → stable CSV for Lua subset match.
476 // Ingress validation mirrors FlowFabricWorker::connect (ff-sdk):
477 // - `,` is the CSV delimiter — a token containing one would split
478 // mid-parse and could let a {"gpu"} worker appear to satisfy
479 // {"gpu,cuda"} (silent auth bypass).
480 // - Empty strings would inflate the CSV with leading / adjacent
481 // commas ("" → ",gpu" → [" "," gpu"]) and inflate the token
482 // count past CAPS_MAX_TOKENS for no semantic reason.
483 // - Non-printable / whitespace: "gpu " vs "gpu" or "gpu\n" vs
484 // "gpu" silently mis-routes. Require printable ASCII excluding
485 // space (0x21-0x7E) at ingress so typos fail loud.
486 // Enforce ALL here so operator misconfig at the scheduler entry
487 // point fails loud, symmetric with the SDK inline-claim path.
488 // Bounds (#csv, #tokens) are enforced by the Lua side.
489 for cap in worker_capabilities {
490 if cap.is_empty() {
491 return Err(SchedulerError::Config(
492 "capability token must not be empty".to_owned(),
493 ));
494 }
495 if cap.contains(',') {
496 return Err(SchedulerError::Config(format!(
497 "capability token may not contain ',' (CSV delimiter): {cap:?}"
498 )));
499 }
500 // Reject ASCII control + whitespace (incl. Unicode whitespace);
501 // allow non-ASCII printable UTF-8 so i18n cap names work. CSV
502 // delimiter `,` is single-byte and never a UTF-8 continuation,
503 // so multibyte UTF-8 is safe on the wire. Symmetric with
504 // ff-sdk::FlowFabricWorker::connect.
505 if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
506 return Err(SchedulerError::Config(format!(
507 "capability token must not contain whitespace or control characters: {cap:?}"
508 )));
509 }
510 }
511 if worker_capabilities.len() > ff_core::policy::CAPS_MAX_TOKENS {
512 return Err(SchedulerError::Config(format!(
513 "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
514 ff_core::policy::CAPS_MAX_TOKENS,
515 worker_capabilities.len()
516 )));
517 }
518 let worker_caps_csv = worker_capabilities
519 .iter()
520 .filter(|s| !s.is_empty())
521 .cloned()
522 .collect::<Vec<_>>()
523 .join(",");
524 // Stable digest used in per-mismatch logs so the full 4KB CSV
525 // doesn't get echoed on every mismatch. See worker_caps_digest.
526 let worker_caps_hash = worker_caps_digest(&worker_caps_csv);
527 if worker_caps_csv.len() > ff_core::policy::CAPS_MAX_BYTES {
528 return Err(SchedulerError::Config(format!(
529 "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
530 ff_core::policy::CAPS_MAX_BYTES,
531 worker_caps_csv.len()
532 )));
533 }
534
535 // ── Bounded scan with rotation cursor ──
536 // Prior impl walked all `num_partitions` partitions on every call,
537 // which at `num_flow_partitions = 256` meant a quiet cluster cost
538 // 256 ZRANGEBYSCORE round-trips per claim tick per worker. The
539 // bounded scan caps per-call work at `max_partitions_per_scan`
540 // while the rotation cursor ensures every partition is still
541 // visited within `ceil(total / max_partitions_per_scan)` ticks.
542 //
543 // Needs `now_ms` to compute the rotation window, so we snap server
544 // time up front. One extra TIME round-trip for quiet-cluster no-hit
545 // calls, but on hit paths the existing inner-loop `server_time_ms`
546 // call is now redundant — we reuse this value.
547 let scan_now_ms = match server_time_ms(&self.client).await {
548 Ok(t) => t,
549 Err(e) => {
550 // Transport error talking to Valkey; surface via the same
551 // `ValkeyContext` channel the admission checks use so the
552 // caller retries after backoff instead of silently hitting
553 // partition 0 with a zero cursor.
554 return Err(SchedulerError::ValkeyContext {
555 source: e,
556 context: "scheduler: TIME for rotation cursor".to_owned(),
557 });
558 }
559 };
560 let rotation_cursor = self.rotation_cursor(scan_now_ms, num_partitions);
561 // Per-worker jitter stacks on the shared cursor: `start_p` diverges
562 // different workers within one window; `rotation_cursor` drifts the
563 // whole fleet across windows so no single partition is anyone's
564 // permanent "partition 0".
565 let scan_start = (start_p + rotation_cursor) % num_partitions;
566 let scan_budget = self
567 .scheduler_config
568 .max_partitions_per_scan
569 .min(num_partitions)
570 .max(1);
571
572 // Observability counters (RFC-plan item 3): one debug line per
573 // call, not per partition, so a tight-loop worker doesn't flood
574 // logs. `partitions_hit` is 0/1 in practice — the loop returns on
575 // the first grant — but keeping it a counter means future
576 // multi-grant variants (N per call) don't need the log format to
577 // change.
578 let mut partitions_visited: u16 = 0;
579 let mut partitions_skipped: u16 = 0;
580 let mut partitions_hit: u16 = 0;
581 let call_start = std::time::Instant::now();
582
583 for p_idx in iter_partitions(num_partitions, scan_start, scan_budget) {
584 partitions_visited += 1;
585 let partition = Partition {
586 family: PartitionFamily::Execution,
587 index: p_idx,
588 };
589 let idx = IndexKeys::new(&partition);
590 let eligible_key = idx.lane_eligible(lane);
591
592 // ZRANGEBYSCORE eligible -inf +inf LIMIT 0 1
593 // Lowest score = highest priority candidate
594 let candidates: Vec<String> = match self
595 .client
596 .cmd("ZRANGEBYSCORE")
597 .arg(&eligible_key)
598 .arg("-inf")
599 .arg("+inf")
600 .arg("LIMIT")
601 .arg("0")
602 .arg("1")
603 .execute()
604 .await
605 {
606 Ok(ids) => ids,
607 Err(e) => {
608 tracing::warn!(
609 partition = p_idx,
610 error = %e,
611 "scheduler: ZRANGEBYSCORE eligible failed, skipping partition"
612 );
613 partitions_skipped += 1;
614 continue;
615 }
616 };
617
618 let eid_str = match candidates.first() {
619 Some(s) => s,
620 None => {
621 // Empty partition — this is the hot path on a quiet
622 // cluster and the whole point of the bounded scan.
623 // Don't count as a "skip" (skip implies something
624 // went wrong); treat as a normal miss.
625 continue;
626 }
627 };
628
629 // Parse the execution ID
630 let eid = match ExecutionId::parse(eid_str) {
631 Ok(id) => id,
632 Err(e) => {
633 tracing::warn!(
634 partition = p_idx,
635 execution_id = eid_str.as_str(),
636 error = %e,
637 "scheduler: invalid execution_id in eligible set, skipping"
638 );
639 partitions_skipped += 1;
640 continue;
641 }
642 };
643
644 let exec_ctx = ExecKeyContext::new(&partition, &eid);
645 let core_key = exec_ctx.core();
646 let eid_s = eid.to_string();
647 // Reuse the call-scoped `scan_now_ms` we read for the rotation
648 // cursor. Semantics for block-detail timestamps are unchanged —
649 // a block decision at time T is still written with a T in the
650 // same call; the tiny staleness (<< 1 RTT) is dwarfed by the
651 // usual scheduler→Valkey latency and saves one TIME round-trip
652 // per candidate.
653 let now_ms = scan_now_ms;
654
655 // ── Capability pre-check (in-slot HGET, cheap) ──
656 // Runs BEFORE quota admission so we never ZADD a quota slot
657 // for an execution this worker can't actually claim. Without
658 // this, on an unmatchable-top-of-zset, every scheduling tick
659 // would ZADD {q:K}:admitted_set then ZREM it via
660 // release_admission on the capability_mismatch reject — a
661 // cross-slot write storm amplifying the mismatch loop.
662 //
663 // Lua `ff_issue_claim_grant` still does the authoritative
664 // check; this is a fast-path short-circuit, not a substitute.
665 // A narrow race exists where `required_capabilities` is
666 // updated between our HGET and the FCALL — the FCALL is still
667 // atomic and correct.
668 let required_caps_csv: Option<String> = match self
669 .client
670 .cmd("HGET")
671 .arg(&core_key)
672 .arg("required_capabilities")
673 .execute::<Option<String>>()
674 .await
675 {
676 Ok(v) => v,
677 Err(e) => {
678 tracing::warn!(
679 partition = p_idx,
680 execution_id = eid_s.as_str(),
681 error = %e,
682 "scheduler: HGET required_capabilities failed, skipping candidate"
683 );
684 continue;
685 }
686 };
687 if let Some(req) = required_caps_csv.as_deref()
688 && !req.is_empty()
689 && !ff_core::caps::matches_csv(req, worker_capabilities)
690 {
691 // Move this execution out of eligible into blocked_route
692 // so we don't hot-loop on it every tick (RFC-009 §564). A
693 // periodic sweep (scanner side) promotes blocked_route →
694 // eligible when a worker with matching caps registers.
695 // Logged with a hash digest, not the raw CSV, to keep
696 // per-mismatch log volume bounded.
697 tracing::info!(
698 partition = p_idx,
699 execution_id = eid_s.as_str(),
700 worker_id = worker_id.as_str(),
701 worker_caps_hash = worker_caps_hash.as_str(),
702 required = req,
703 "scheduler: capability mismatch, blocking execution off eligible"
704 );
705 self.block_candidate(
706 &partition, &idx, lane, &eid, &eligible_key,
707 "waiting_for_capable_worker",
708 "no connected worker satisfies required_capabilities",
709 now_ms,
710 ).await;
711 continue;
712 }
713
714 // ── Budget pre-check (cross-partition, cached per cycle) ──
715 if let Some(block_detail) = self
716 .check_budgets(&mut budget_checker, &exec_ctx, &core_key, &eid_s)
717 .await?
718 {
719 // Budget breached — block candidate and try next
720 self.block_candidate(
721 &partition, &idx, lane, &eid, &eligible_key,
722 "waiting_for_budget", &block_detail, now_ms,
723 ).await;
724 continue;
725 }
726
727 // ── Quota pre-check (cross-partition FCALL on {q:K}) ──
728 let quota_admission = self
729 .check_quota(&exec_ctx, &core_key, &eid_s, now_ms)
730 .await?;
731 match "a_admission {
732 QuotaCheckOutcome::Blocked(block_detail) => {
733 self.block_candidate(
734 &partition, &idx, lane, &eid, &eligible_key,
735 "waiting_for_quota", block_detail, now_ms,
736 ).await;
737 continue;
738 }
739 QuotaCheckOutcome::NoQuota | QuotaCheckOutcome::Admitted { .. } => {}
740 }
741
742 // ── All checks passed — issue claim grant ──
743 let grant_key = exec_ctx.claim_grant();
744 let keys: [&str; 3] = [&core_key, &grant_key, &eligible_key];
745
746 let ttl_str = grant_ttl_ms.to_string();
747 let wid_s = worker_id.to_string();
748 let wiid_s = worker_instance_id.to_string();
749 let lane_s = lane.to_string();
750
751 let argv: [&str; 9] = [
752 &eid_s,
753 &wid_s,
754 &wiid_s,
755 &lane_s,
756 "", // capability_hash
757 &ttl_str,
758 "", // route_snapshot_json
759 "", // admission_summary
760 &worker_caps_csv, // sorted CSV; empty → matches only empty-required execs
761 ];
762
763 let raw = match self
764 .client
765 .fcall::<ferriskey::Value>("ff_issue_claim_grant", &keys, &argv)
766 .await
767 {
768 Ok(v) => v,
769 Err(e) => {
770 // Transport failure on the FCALL — NOSCRIPT, IoError,
771 // ClusterDown, etc. This is NOT a normal soft-reject;
772 // persistent transport errors mean the scheduler is
773 // effectively idle even though it looks like it's
774 // running. WARN so ops dashboards (WARN+ aggregators)
775 // fire instead of burying it at DEBUG.
776 tracing::warn!(
777 partition = p_idx,
778 execution_id = eid_s.as_str(),
779 error = %e,
780 "scheduler: ff_issue_claim_grant transport error, trying next"
781 );
782 if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = "a_admission {
783 self.release_admission(tag, quota_id, eid).await;
784 }
785 continue;
786 }
787 };
788
789 match FcallResult::parse(&raw).and_then(|r| r.into_success()) {
790 Ok(_) => {
791 partitions_hit += 1;
792 tracing::debug!(
793 partition = p_idx,
794 execution_id = eid_s.as_str(),
795 worker_instance_id = worker_instance_id.as_str(),
796 start_p = scan_start,
797 partitions_visited,
798 partitions_skipped,
799 partitions_hit,
800 elapsed_ms = call_start.elapsed().as_millis() as u64,
801 "scheduler: claim call completed (hit)"
802 );
803 return Ok(Some(ClaimGrant {
804 execution_id: eid,
805 partition_key: ff_core::partition::PartitionKey::from(&partition),
806 grant_key: grant_key.clone(),
807 expires_at_ms: now_ms + grant_ttl_ms,
808 }));
809 }
810 Err(script_err) => {
811 if matches!(script_err, ScriptError::CapabilityMismatch(_)) {
812 // Should be rare: the Rust pre-check above
813 // normally catches this and blocks the execution
814 // off eligible. Reaching here means the
815 // required_capabilities field mutated between our
816 // HGET and the Lua atomic check (narrow race).
817 // Block here too so the next tick doesn't loop.
818 //
819 // Log uses worker_caps_hash (8-hex digest), not
820 // the full 4KB CSV, to keep per-mismatch log
821 // volume bounded. Full CSV is logged once at
822 // worker connect under "worker caps" WARN.
823 tracing::info!(
824 partition = p_idx,
825 execution_id = eid_s.as_str(),
826 worker_id = wid_s.as_str(),
827 worker_caps_hash = worker_caps_hash.as_str(),
828 error = %script_err,
829 "scheduler: capability mismatch via Lua (race), blocking execution"
830 );
831 self.block_candidate(
832 &partition, &idx, lane, &eid, &eligible_key,
833 "waiting_for_capable_worker",
834 "no connected worker satisfies required_capabilities",
835 now_ms,
836 ).await;
837 if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = "a_admission {
838 self.release_admission(tag, quota_id, eid).await;
839 }
840 continue;
841 } else {
842 // Any other logical reject (grant_already_exists,
843 // execution_not_in_eligible_set, execution_not_eligible,
844 // invalid_capabilities, etc.). These are rare and each
845 // indicates either a race or a config problem — in
846 // either case ops need to see it, so WARN, not DEBUG.
847 tracing::warn!(
848 partition = p_idx,
849 execution_id = eid_s.as_str(),
850 error = %script_err,
851 "scheduler: ff_issue_claim_grant rejected, trying next"
852 );
853 }
854 if let QuotaCheckOutcome::Admitted { tag, quota_id, eid } = "a_admission {
855 self.release_admission(tag, quota_id, eid).await;
856 }
857 continue;
858 }
859 }
860 }
861
862 tracing::debug!(
863 worker_instance_id = worker_instance_id.as_str(),
864 start_p = scan_start,
865 partitions_visited,
866 partitions_skipped,
867 partitions_hit,
868 elapsed_ms = call_start.elapsed().as_millis() as u64,
869 "scheduler: claim call completed (no hit)"
870 );
871 Ok(None)
872 }
873
874 /// Read budget_ids from exec_core and check each. Returns block detail
875 /// string if any budget is breached, None if all pass.
876 async fn check_budgets(
877 &self,
878 checker: &mut BudgetChecker,
879 _exec_ctx: &ExecKeyContext,
880 core_key: &str,
881 _eid_s: &str,
882 ) -> Result<Option<String>, SchedulerError> {
883 // Read budget_ids from exec_core (comma-separated or JSON list)
884 let budget_ids_str: Option<String> = self
885 .client
886 .cmd("HGET")
887 .arg(core_key)
888 .arg("budget_ids")
889 .execute()
890 .await?;
891
892 let budget_ids_str = match budget_ids_str {
893 Some(s) => s,
894 None => return Ok(None),
895 };
896 if budget_ids_str.is_empty() {
897 return Ok(None); // no budgets attached
898 }
899
900 // Parse comma-separated budget IDs
901 for budget_id in budget_ids_str.split(',') {
902 let budget_id = budget_id.trim();
903 if budget_id.is_empty() {
904 continue;
905 }
906 let result = checker.check_budget(&self.client, budget_id).await?;
907 if let BudgetCheckResult::HardBreach { dimension, detail } = &result {
908 // PR-94: budget hard-breach counter, labelled by
909 // dimension so operators can distinguish "tokens"
910 // breaches from "cost" breaches at a glance.
911 self.metrics.inc_budget_hit(dimension);
912 return Ok(Some(detail.clone()));
913 }
914 }
915
916 Ok(None)
917 }
918
919 /// Check quota admission for the candidate.
920 async fn check_quota(
921 &self,
922 _exec_ctx: &ExecKeyContext,
923 core_key: &str,
924 eid_s: &str,
925 now_ms: u64,
926 ) -> Result<QuotaCheckOutcome, SchedulerError> {
927 // Read quota_policy_id from exec_core
928 let quota_id_str: Option<String> = self
929 .client
930 .cmd("HGET")
931 .arg(core_key)
932 .arg("quota_policy_id")
933 .execute()
934 .await?;
935
936 let quota_id_str = match quota_id_str {
937 Some(s) => s,
938 None => return Ok(QuotaCheckOutcome::NoQuota),
939 };
940 if quota_id_str.is_empty() {
941 return Ok(QuotaCheckOutcome::NoQuota);
942 }
943
944 // Compute real {q:K} partition tag from quota_policy_id
945 let tag = match QuotaPolicyId::parse("a_id_str) {
946 Ok(qid) => {
947 let partition = quota_partition(&qid, &self.config);
948 partition.hash_tag()
949 }
950 Err(_) => "{q:0}".to_owned(), // fallback for non-UUID test IDs
951 };
952
953 let quota_def_key = format!("ff:quota:{}:{}", tag, quota_id_str);
954 let window_key = format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id_str);
955 let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id_str);
956 let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id_str, eid_s);
957 let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id_str);
958
959 // Read quota limits from policy hash
960 let rate_limit: Option<String> = self.client
961 .cmd("HGET").arg("a_def_key).arg("max_requests_per_window")
962 .execute().await?;
963 let window_secs: Option<String> = self.client
964 .cmd("HGET").arg("a_def_key).arg("requests_per_window_seconds")
965 .execute().await?;
966 let concurrency_cap: Option<String> = self.client
967 .cmd("HGET").arg("a_def_key).arg("active_concurrency_cap")
968 .execute().await?;
969 let jitter: Option<String> = self.client
970 .cmd("HGET").arg("a_def_key).arg("jitter_ms")
971 .execute().await?;
972
973 let rate_limit = rate_limit.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
974 let window_secs = window_secs.as_deref().and_then(|s| s.parse().ok()).unwrap_or(60u64);
975 let concurrency_cap = concurrency_cap.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
976 let jitter_ms = jitter.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0u64);
977
978 // No limits configured — admit without recording
979 if rate_limit == 0 && concurrency_cap == 0 {
980 return Ok(QuotaCheckOutcome::NoQuota);
981 }
982
983 // FCALL ff_check_admission_and_record on {q:K}
984 let keys: [&str; 5] = [&window_key, &concurrency_key, "a_def_key, &admitted_key, &admitted_set_key];
985 let now_s = now_ms.to_string();
986 let ws = window_secs.to_string();
987 let rl = rate_limit.to_string();
988 let cc = concurrency_cap.to_string();
989 let jt = jitter_ms.to_string();
990 let argv: [&str; 6] = [&now_s, &ws, &rl, &cc, eid_s, &jt];
991
992 match self.client
993 .fcall::<ferriskey::Value>("ff_check_admission_and_record", &keys, &argv)
994 .await
995 {
996 Ok(result) => {
997 // Parse domain-specific result: {"ADMITTED"}, {"RATE_EXCEEDED", retry_after},
998 // {"CONCURRENCY_EXCEEDED"}, {"ALREADY_ADMITTED"}
999 let status = Self::parse_admission_status(&result);
1000 match status.as_str() {
1001 "ADMITTED" | "ALREADY_ADMITTED" => Ok(QuotaCheckOutcome::Admitted {
1002 tag: tag.clone(),
1003 quota_id: quota_id_str.clone(),
1004 eid: eid_s.to_owned(),
1005 }),
1006 "RATE_EXCEEDED" => {
1007 // PR-94: quota admission block, labelled by
1008 // reason so rate-vs-concurrency waves are
1009 // distinguishable in a dashboard.
1010 self.metrics.inc_quota_hit("rate");
1011 Ok(QuotaCheckOutcome::Blocked(format!(
1012 "quota {}: rate limit {}/{} per {}s window",
1013 quota_id_str, rate_limit, rate_limit, window_secs
1014 )))
1015 }
1016 "CONCURRENCY_EXCEEDED" => {
1017 self.metrics.inc_quota_hit("concurrency");
1018 Ok(QuotaCheckOutcome::Blocked(format!(
1019 "quota {}: concurrency cap {}",
1020 quota_id_str, concurrency_cap
1021 )))
1022 }
1023 other => {
1024 // Fail-closed: an unrecognised status is the Lua
1025 // telling us a contract we don't understand. Do
1026 // NOT default to admit — surface it so the
1027 // scheduler retries next cycle and ops sees the
1028 // event.
1029 tracing::warn!(
1030 quota_id = quota_id_str.as_str(),
1031 status = other,
1032 "scheduler: unexpected admission result, denying (fail-closed)"
1033 );
1034 Err(SchedulerError::Config(format!(
1035 "quota {quota_id_str}: unexpected admission status \"{other}\""
1036 )))
1037 }
1038 }
1039 }
1040 Err(e) => {
1041 // Fail-closed: transport fault on the admission FCALL
1042 // must NOT silently admit the candidate. Propagate so
1043 // the outer cycle returns the error and the worker
1044 // retries after the usual backoff; the next cycle will
1045 // re-run the admission check against fresh state.
1046 tracing::warn!(
1047 quota_id = quota_id_str.as_str(),
1048 error = %e,
1049 "scheduler: quota FCALL failed, denying (fail-closed)"
1050 );
1051 Err(SchedulerError::ValkeyContext {
1052 source: e,
1053 context: format!("ff_check_admission_and_record {quota_id_str}"),
1054 })
1055 }
1056 }
1057 }
1058
1059 /// Parse the first element of a Valkey array result as a status string.
1060 fn parse_admission_status(result: &ferriskey::Value) -> String {
1061 match result {
1062 ferriskey::Value::Array(arr) => {
1063 match arr.first() {
1064 Some(Ok(ferriskey::Value::BulkString(b))) => {
1065 String::from_utf8_lossy(b).into_owned()
1066 }
1067 Some(Ok(ferriskey::Value::SimpleString(s))) => s.clone(),
1068 _ => "UNKNOWN".to_owned(),
1069 }
1070 }
1071 _ => "UNKNOWN".to_owned(),
1072 }
1073 }
1074
1075 /// Block a candidate that failed budget/quota check.
1076 /// FCALL ff_block_execution_for_admission on {p:N}.
1077 #[allow(clippy::too_many_arguments)]
1078 async fn block_candidate(
1079 &self,
1080 partition: &Partition,
1081 idx: &IndexKeys,
1082 lane: &LaneId,
1083 eid: &ExecutionId,
1084 eligible_key: &str,
1085 block_reason: &str,
1086 blocking_detail: &str,
1087 now_ms: u64,
1088 ) {
1089 let exec_ctx = ExecKeyContext::new(partition, eid);
1090 let core_key = exec_ctx.core();
1091 let eid_s = eid.to_string();
1092 let blocked_key = match block_reason {
1093 "waiting_for_budget" => idx.lane_blocked_budget(lane),
1094 "waiting_for_quota" => idx.lane_blocked_quota(lane),
1095 "waiting_for_capable_worker" => idx.lane_blocked_route(lane),
1096 _ => idx.lane_blocked_budget(lane),
1097 };
1098
1099 let keys: [&str; 3] = [&core_key, eligible_key, &blocked_key];
1100 let now_s = now_ms.to_string();
1101 let argv: [&str; 4] = [&eid_s, block_reason, blocking_detail, &now_s];
1102
1103 // Parse FcallResult so we distinguish Lua-level rejections (e.g.
1104 // execution_not_active because the execution went terminal between
1105 // our HGET and the FCALL) from a real block. Previously `Ok(_)`
1106 // treated an err-tuple as success → INFO log "candidate blocked"
1107 // while nothing actually changed on exec_core, then the next tick
1108 // re-picked the same candidate and looped. Mirrors the
1109 // release_admission parse fix.
1110 match self.client
1111 .fcall::<ferriskey::Value>("ff_block_execution_for_admission", &keys, &argv)
1112 .await
1113 {
1114 Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
1115 Ok(_) => {
1116 tracing::info!(
1117 execution_id = eid_s,
1118 reason = block_reason,
1119 "scheduler: candidate blocked by admission check"
1120 );
1121 }
1122 Err(script_err) => {
1123 // Logical reject from Lua (e.g. execution_not_active
1124 // — the execution went terminal between the scheduler
1125 // pick and the block FCALL; the candidate loop will
1126 // naturally move on). WARN so ops dashboards surface
1127 // actual block failures, but not so loud that a common
1128 // race spams alerts.
1129 tracing::warn!(
1130 execution_id = eid_s,
1131 reason = block_reason,
1132 error = %script_err,
1133 "scheduler: ff_block_execution_for_admission rejected by Lua"
1134 );
1135 }
1136 },
1137 Err(e) => {
1138 tracing::warn!(
1139 execution_id = eid_s,
1140 error = %e,
1141 "scheduler: ff_block_execution_for_admission transport failed"
1142 );
1143 }
1144 }
1145 }
1146
1147 /// Release a previously-recorded quota admission slot.
1148 /// Called when ff_issue_claim_grant fails after admission was recorded.
1149 async fn release_admission(
1150 &self,
1151 tag: &str,
1152 quota_id: &str,
1153 eid_s: &str,
1154 ) {
1155 let admitted_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid_s);
1156 let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
1157 let concurrency_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
1158
1159 let keys: [&str; 3] = [&admitted_key, &admitted_set_key, &concurrency_key];
1160 let argv: [&str; 1] = [eid_s];
1161
1162 // Parse the Lua response properly: FCALL returns `Ok(Value)` for
1163 // BOTH success and logical-error paths. Treating Ok(_) blindly as
1164 // "released" logs a false positive when the Lua returns
1165 // `{0, "quota_not_found"}` (or any other script-level err) — the
1166 // slot in fact remains pinned until its TTL expires, which is
1167 // minutes to hours. Surface the real outcome so on-call sees
1168 // actual release failures instead of clean "released" events.
1169 match self.client
1170 .fcall::<ferriskey::Value>("ff_release_admission", &keys, &argv)
1171 .await
1172 {
1173 Ok(v) => match FcallResult::parse(&v).and_then(|r| r.into_success()) {
1174 Ok(_) => {
1175 tracing::info!(
1176 execution_id = eid_s,
1177 quota_id,
1178 "scheduler: released admission after claim failure"
1179 );
1180 }
1181 Err(script_err) => {
1182 tracing::warn!(
1183 execution_id = eid_s,
1184 quota_id,
1185 error = %script_err,
1186 "scheduler: ff_release_admission rejected by Lua \
1187 (slot will expire via TTL)"
1188 );
1189 }
1190 },
1191 Err(e) => {
1192 tracing::warn!(
1193 execution_id = eid_s,
1194 quota_id,
1195 error = %e,
1196 "scheduler: ff_release_admission transport failed \
1197 (slot will expire via TTL)"
1198 );
1199 }
1200 }
1201 }
1202}
1203
1204/// Get server time in milliseconds via the TIME command.
1205async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
1206 let result: Vec<String> = client
1207 .cmd("TIME")
1208 .execute()
1209 .await?;
1210 if result.len() < 2 {
1211 return Err(ferriskey::Error::from((
1212 ferriskey::ErrorKind::ClientError,
1213 "TIME returned fewer than 2 elements",
1214 )));
1215 }
1216 let secs: u64 = result[0].parse().map_err(|_| {
1217 ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
1218 })?;
1219 let micros: u64 = result[1].parse().map_err(|_| {
1220 ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
1221 })?;
1222 Ok(secs * 1000 + micros / 1000)
1223}
1224
/// Errors from the scheduler.
#[derive(Debug, thiserror::Error)]
pub enum SchedulerError {
    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Valkey error with additional context (preserves ErrorKind via #[source]).
    #[error("valkey ({context}): {source}")]
    ValkeyContext {
        /// Underlying Valkey error, kept as `#[source]` so
        /// `valkey_kind`/`is_retryable` still see the raw ErrorKind.
        #[source]
        source: ferriskey::Error,
        /// Human-readable operation context, e.g. the FCALL or command
        /// plus key that was being executed when the error occurred.
        context: String,
    },
    /// Caller-supplied value failed ingress validation. NOT retryable — the
    /// caller must fix its input before retrying.
    #[error("config: {0}")]
    Config(String),
}
1243
1244impl SchedulerError {
1245 /// Returns the underlying ferriskey ErrorKind, if this is a Valkey error.
1246 /// Matches `ScriptError::valkey_kind` so callers can treat both
1247 /// uniformly. (Note: `ServerError` exposed the same shape until #88
1248 /// renamed it to `ServerError::backend_kind` with a `BackendErrorKind`
1249 /// return; this scheduler-internal surface keeps the raw-ErrorKind
1250 /// name pending its own sealing.)
1251 pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
1252 match self {
1253 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
1254 Self::Config(_) => None,
1255 }
1256 }
1257
1258 /// Whether this error is safely retryable by a caller. Mirrors
1259 /// `ServerError::is_retryable` semantics.
1260 pub fn is_retryable(&self) -> bool {
1261 self.valkey_kind()
1262 .map(is_retryable_kind)
1263 .unwrap_or(false)
1264 }
1265}
1266
#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    /// Build a synthetic ferriskey error of the given kind for the
    /// retryability table tests below.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn scheduler_is_retryable_matches_kind_table() {
        // Transient transport/cluster conditions are retryable…
        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(SchedulerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());

        // …while fatal, missing-script, and redirection kinds are not.
        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!SchedulerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
    }

    #[test]
    fn scheduler_valkey_context_is_retryable() {
        // The context wrapper must not hide the source kind from the
        // retryability check.
        let err = SchedulerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::BusyLoadingError),
            context: "HGET budget_ids".into(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn scheduler_valkey_kind_exposed() {
        let err = SchedulerError::Valkey(mk_fk_err(ErrorKind::TryAgain));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::TryAgain));
    }

    // ── iter_partitions: regression test for the modular wrap + bounded
    // length contract. The fairness behaviour of the bounded scheduler
    // scan depends entirely on this helper returning exactly `count`
    // distinct partition indices starting at `start` and wrapping modulo
    // `total`. Tested in isolation so a bug here is distinguishable from
    // a bug in the Valkey-backed loop. ──

    #[test]
    fn iter_partitions_no_wrap() {
        // start=10, count=5, total=256 → 10..15, no wrap involved.
        let ps: Vec<u16> = iter_partitions(256, 10, 5).collect();
        assert_eq!(ps, vec![10, 11, 12, 13, 14]);
    }

    #[test]
    fn iter_partitions_wraps_modulo_total() {
        // start=254, count=5, total=256 → 254, 255, 0, 1, 2.
        let ps: Vec<u16> = iter_partitions(256, 254, 5).collect();
        assert_eq!(ps, vec![254, 255, 0, 1, 2]);
    }

    #[test]
    fn iter_partitions_count_capped_to_total() {
        // Asking for more than `total` yields exactly `total` distinct
        // partitions — never a duplicate, never more than the universe.
        let ps: Vec<u16> = iter_partitions(4, 1, 100).collect();
        assert_eq!(ps, vec![1, 2, 3, 0]);
    }

    #[test]
    fn iter_partitions_length_matches_count() {
        // Invariant: output length == min(count, total). The scan loop
        // upper-bounds round-trips on this, so regressing it would
        // silently re-introduce the 256-round-trip-per-tick bug.
        for start in [0u16, 1, 50, 255] {
            for count in [0u16, 1, 16, 32, 256] {
                let len = iter_partitions(256, start, count).count();
                assert_eq!(len, count.min(256) as usize);
            }
        }
    }

    // ── Fairness: the union of the partitions visited across
    // `ceil(total / max_partitions_per_scan)` successive scans, with the
    // rotation cursor advancing each scan, must cover every partition
    // exactly once. This is the contract the operator rustdoc promises
    // ("any given partition reached within 8 ticks at defaults"). ──
    #[test]
    fn fairness_full_coverage_in_ceil_total_over_budget_scans() {
        const TOTAL: u16 = 256;
        const BUDGET: u16 = SchedulerConfig::DEFAULT_MAX_PARTITIONS_PER_SCAN;
        let scans = TOTAL.div_ceil(BUDGET); // 8 at defaults

        // Simulate the same advance logic the live cursor performs: each
        // "scan" starts at the previous start + BUDGET (mod TOTAL). We
        // pin start to 0 for determinism; the per-worker FNV jitter is a
        // phase offset on top and doesn't change coverage.
        let mut union = std::collections::BTreeSet::new();
        let mut cursor: u16 = 0;
        for _ in 0..scans {
            for p in iter_partitions(TOTAL, cursor, BUDGET) {
                union.insert(p);
            }
            cursor = cursor.wrapping_add(BUDGET) % TOTAL;
        }

        assert_eq!(union.len(), TOTAL as usize, "every partition visited once");
        for p in 0..TOTAL {
            assert!(union.contains(&p), "missing partition {p}");
        }
    }

    #[test]
    fn fairness_full_coverage_with_phase_offset() {
        // Regression: the per-worker FNV phase must not change the
        // coverage property. Pick a non-zero start; we still cover the
        // whole universe in ceil(total/budget) scans.
        const TOTAL: u16 = 256;
        const BUDGET: u16 = SchedulerConfig::DEFAULT_MAX_PARTITIONS_PER_SCAN;
        let scans = TOTAL.div_ceil(BUDGET);

        let mut union = std::collections::BTreeSet::new();
        let mut cursor: u16 = 137; // arbitrary per-worker jitter
        for _ in 0..scans {
            for p in iter_partitions(TOTAL, cursor, BUDGET) {
                union.insert(p);
            }
            cursor = cursor.wrapping_add(BUDGET) % TOTAL;
        }
        assert_eq!(union.len(), TOTAL as usize);
    }

    #[test]
    fn scheduler_config_defaults_match_rustdoc() {
        // Pins the documented defaults so a config change forces a
        // deliberate doc update.
        let c = SchedulerConfig::default();
        assert_eq!(c.max_partitions_per_scan, 32);
        assert_eq!(c.rotation_window_ms, 250);
    }
}
1400
1401
1402