Skip to main content

ftui_runtime/
queueing_scheduler.rs

1#![forbid(unsafe_code)]
2
3//! Queueing Theory Scheduler with SRPT/Smith-Rule Style Scheduling (bd-13pq.7).
4//!
5//! This module provides a fair, work-conserving task scheduler based on queueing theory
6//! principles. It implements variants of SRPT (Shortest Remaining Processing Time) with
7//! fairness constraints to prevent starvation.
8//!
9//! # Mathematical Model
10//!
11//! ## Scheduling Disciplines
12//!
13//! 1. **SRPT (Shortest Remaining Processing Time)**
14//!    - Optimal for minimizing mean response time in M/G/1 queues
15//!    - Preempts current job if a shorter job arrives
16//!    - Problem: Can starve long jobs indefinitely
17//!
18//! 2. **Smith's Rule (Weighted SRPT)**
19//!    - Priority = weight / remaining_time
20//!    - Maximizes weighted throughput
21//!    - Still suffers from starvation
22//!
23//! 3. **Fair SRPT (this implementation)**
24//!    - Uses aging: priority increases with wait time
25//!    - Ensures bounded wait time for all jobs
26//!    - Trade-off: slightly worse mean response time for bounded starvation
27//!
28//! ## Queue Discipline
29//!
30//! Jobs are ordered by effective priority:
31//! ```text
32//! priority = (weight / remaining_time) + aging_factor * wait_time
33//! ```
34//!
35//! This combines:
36//! - Smith's rule: `weight / remaining_time`
37//! - Aging: linear increase with wait time
38//!
39//! ## Fairness Guarantee (Aging-Based)
40//!
41//! With aging factor `a` and maximum job size `S_max`:
42//! ```text
43//! max_wait <= S_max * (1 + 1/a) / min_weight
44//! ```
45//!
46//! # Key Invariants
47//!
48//! 1. **Work-conserving**: Server never idles when queue is non-empty
49//! 2. **Priority ordering**: Queue is always sorted by effective priority
50//! 3. **Bounded starvation**: All jobs complete within bounded time
51//! 4. **Monotonic aging**: Wait time only increases while in queue
52//!
53//! # Failure Modes
54//!
55//! | Condition | Behavior | Rationale |
56//! |-----------|----------|-----------|
57//! | Zero weight | Use minimum weight | Prevent infinite priority |
58//! | Zero remaining time | Complete immediately | Job is done |
59//! | Queue overflow | Reject new jobs | Bounded memory |
60//! | Clock drift | Use monotonic time | Avoid priority inversions |
61
62use std::cmp::Ordering;
63use std::collections::BinaryHeap;
64use std::fmt::Write;
65
/// Aging factor used when the configuration does not override it.
///
/// At 0.1, a job gains one unit of priority for every 10 time units it waits.
const DEFAULT_AGING_FACTOR: f64 = 0.1;

/// Upper bound on the number of queued jobs.
const MAX_QUEUE_SIZE: usize = 10_000;

/// Smallest processing-time estimate accepted by default, in milliseconds.
const DEFAULT_P_MIN_MS: f64 = 0.05;

/// Largest processing-time estimate accepted by default, in milliseconds.
const DEFAULT_P_MAX_MS: f64 = 5_000.0;

/// Lower weight bound used by default; guards against division issues.
const DEFAULT_W_MIN: f64 = 1e-6;

/// Upper weight bound used by default.
const DEFAULT_W_MAX: f64 = 100.0;

/// Weight substituted when the weight source is `Default`.
const DEFAULT_WEIGHT_DEFAULT: f64 = 1.0;

/// Weight substituted when the weight source is `Unknown`.
const DEFAULT_WEIGHT_UNKNOWN: f64 = 1.0;

/// Estimate (ms) substituted when the estimate source is `Default`.
const DEFAULT_ESTIMATE_DEFAULT_MS: f64 = 10.0;

/// Estimate (ms) substituted when the estimate source is `Unknown`.
const DEFAULT_ESTIMATE_UNKNOWN_MS: f64 = 1_000.0;

/// Wait time (ms) after which the starvation guard kicks in. 0 disables the guard.
const DEFAULT_WAIT_STARVE_MS: f64 = 500.0;

/// Multiplier applied once the starvation guard has triggered.
const DEFAULT_STARVE_BOOST_RATIO: f64 = 1.5;
101
/// Tunable parameters for the scheduler.
#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Rate at which priority grows with wait time.
    /// Larger values age jobs faster: more fairness, less optimality.
    /// Default: 0.1.
    pub aging_factor: f64,

    /// Lower clamp for processing-time estimates (ms). Default: 0.05.
    pub p_min_ms: f64,

    /// Upper clamp for processing-time estimates (ms). Default: 5000.
    pub p_max_ms: f64,

    /// Estimate (ms) substituted when the estimate source is `Default`.
    /// Default: 10.0.
    pub estimate_default_ms: f64,

    /// Estimate (ms) substituted when the estimate source is `Unknown`.
    /// Default: 1000.0.
    pub estimate_unknown_ms: f64,

    /// Lower clamp for weights. Default: 1e-6.
    pub w_min: f64,

    /// Upper clamp for weights. Default: 100.
    pub w_max: f64,

    /// Weight substituted when the weight source is `Default`.
    /// Default: 1.0.
    pub weight_default: f64,

    /// Weight substituted when the weight source is `Unknown`.
    /// Default: 1.0.
    pub weight_unknown: f64,

    /// Wait time (ms) at which the starvation guard triggers.
    /// 0 disables the guard. Default: 500.
    pub wait_starve_ms: f64,

    /// Multiplier applied to the base ratio once the starvation guard triggers.
    /// Default: 1.5.
    pub starve_boost_ratio: f64,

    /// Turns on Smith-rule weighting; when false the scheduler behaves like SRPT.
    pub smith_enabled: bool,

    /// Forces FIFO ordering (by arrival sequence), overriding every other setting.
    /// Intended for safety overrides and debugging.
    pub force_fifo: bool,

    /// Maximum number of queued jobs. Default: 10_000.
    pub max_queue_size: usize,

    /// Turns on preemption. Default: true.
    pub preemptive: bool,

    /// Time quantum for the round-robin fallback used when priorities are equal.
    /// Default: 10.0.
    pub time_quantum: f64,

    /// Turns on logging. Default: false.
    pub enable_logging: bool,
}
165
166impl Default for SchedulerConfig {
167    fn default() -> Self {
168        Self {
169            aging_factor: DEFAULT_AGING_FACTOR,
170            p_min_ms: DEFAULT_P_MIN_MS,
171            p_max_ms: DEFAULT_P_MAX_MS,
172            estimate_default_ms: DEFAULT_ESTIMATE_DEFAULT_MS,
173            estimate_unknown_ms: DEFAULT_ESTIMATE_UNKNOWN_MS,
174            w_min: DEFAULT_W_MIN,
175            w_max: DEFAULT_W_MAX,
176            weight_default: DEFAULT_WEIGHT_DEFAULT,
177            weight_unknown: DEFAULT_WEIGHT_UNKNOWN,
178            wait_starve_ms: DEFAULT_WAIT_STARVE_MS,
179            starve_boost_ratio: DEFAULT_STARVE_BOOST_RATIO,
180            smith_enabled: true,
181            force_fifo: false,
182            max_queue_size: MAX_QUEUE_SIZE,
183            preemptive: true,
184            time_quantum: 10.0,
185            enable_logging: false,
186        }
187    }
188}
189
190impl SchedulerConfig {
191    /// Effective scheduling mode with FIFO override.
192    pub fn mode(&self) -> SchedulingMode {
193        if self.force_fifo {
194            SchedulingMode::Fifo
195        } else if self.smith_enabled {
196            SchedulingMode::Smith
197        } else {
198            SchedulingMode::Srpt
199        }
200    }
201}
202
203/// A job in the queue.
204#[derive(Debug, Clone)]
205pub struct Job {
206    /// Unique job identifier.
207    pub id: u64,
208
209    /// Job weight (importance). Higher = more priority.
210    pub weight: f64,
211
212    /// Estimated remaining processing time.
213    pub remaining_time: f64,
214
215    /// Original estimated total time.
216    pub total_time: f64,
217
218    /// Time when job was submitted.
219    pub arrival_time: f64,
220
221    /// Monotonic arrival sequence (tie-breaker).
222    pub arrival_seq: u64,
223
224    /// Source of processing-time estimate.
225    pub estimate_source: EstimateSource,
226
227    /// Source of weight/importance.
228    pub weight_source: WeightSource,
229
230    /// Optional job name for debugging.
231    pub name: Option<String>,
232}
233
234impl Job {
235    /// Create a new job with given ID, weight, and estimated time.
236    pub fn new(id: u64, weight: f64, estimated_time: f64) -> Self {
237        let weight = if weight.is_nan() {
238            DEFAULT_W_MIN
239        } else if weight.is_infinite() {
240            if weight.is_sign_positive() {
241                DEFAULT_W_MAX
242            } else {
243                DEFAULT_W_MIN
244            }
245        } else {
246            weight.clamp(DEFAULT_W_MIN, DEFAULT_W_MAX)
247        };
248        let estimated_time = if estimated_time.is_nan() {
249            DEFAULT_P_MAX_MS
250        } else if estimated_time.is_infinite() {
251            if estimated_time.is_sign_positive() {
252                DEFAULT_P_MAX_MS
253            } else {
254                DEFAULT_P_MIN_MS
255            }
256        } else {
257            estimated_time.clamp(DEFAULT_P_MIN_MS, DEFAULT_P_MAX_MS)
258        };
259        Self {
260            id,
261            weight,
262            remaining_time: estimated_time,
263            total_time: estimated_time,
264            arrival_time: 0.0,
265            arrival_seq: 0,
266            estimate_source: EstimateSource::Explicit,
267            weight_source: WeightSource::Explicit,
268            name: None,
269        }
270    }
271
272    /// Create a job with a name.
273    pub fn with_name(id: u64, weight: f64, estimated_time: f64, name: impl Into<String>) -> Self {
274        let mut job = Self::new(id, weight, estimated_time);
275        job.name = Some(name.into());
276        job
277    }
278
279    /// Set estimate and weight sources.
280    pub fn with_sources(
281        mut self,
282        weight_source: WeightSource,
283        estimate_source: EstimateSource,
284    ) -> Self {
285        self.weight_source = weight_source;
286        self.estimate_source = estimate_source;
287        self
288    }
289
290    /// Fraction of job completed.
291    pub fn progress(&self) -> f64 {
292        if self.total_time <= 0.0 {
293            1.0
294        } else {
295            1.0 - (self.remaining_time / self.total_time).clamp(0.0, 1.0)
296        }
297    }
298
299    /// Is the job complete?
300    pub fn is_complete(&self) -> bool {
301        self.remaining_time <= 0.0
302    }
303}
304
305/// Priority wrapper for the binary heap (max-heap, so we negate priority).
306#[derive(Debug, Clone)]
307struct PriorityJob {
308    priority: f64,
309    base_ratio: f64,
310    job: Job,
311    mode: SchedulingMode,
312}
313
314impl PartialEq for PriorityJob {
315    fn eq(&self, other: &Self) -> bool {
316        self.job.id == other.job.id
317    }
318}
319
320impl Eq for PriorityJob {}
321
322impl PartialOrd for PriorityJob {
323    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
324        Some(self.cmp(other))
325    }
326}
327
328impl Ord for PriorityJob {
329    fn cmp(&self, other: &Self) -> Ordering {
330        if self.mode == SchedulingMode::Fifo || other.mode == SchedulingMode::Fifo {
331            return other
332                .job
333                .arrival_seq
334                .cmp(&self.job.arrival_seq)
335                .then_with(|| other.job.id.cmp(&self.job.id));
336        }
337        // Higher priority comes first (max-heap)
338        self.priority
339            .total_cmp(&other.priority)
340            // Tie-break 1: base ratio (w/p)
341            .then_with(|| self.base_ratio.total_cmp(&other.base_ratio))
342            // Tie-break 2: higher weight
343            .then_with(|| self.job.weight.total_cmp(&other.job.weight))
344            // Tie-break 3: shorter remaining time
345            .then_with(|| other.job.remaining_time.total_cmp(&self.job.remaining_time))
346            // Tie-break 4: earlier arrival sequence
347            .then_with(|| other.job.arrival_seq.cmp(&self.job.arrival_seq))
348            // Tie-break 5: lower job id
349            .then_with(|| other.job.id.cmp(&self.job.id))
350    }
351}
352
353/// Evidence for scheduling decisions.
354#[derive(Debug, Clone)]
355pub struct SchedulingEvidence {
356    /// Current time.
357    pub current_time: f64,
358
359    /// Selected job ID (if any).
360    pub selected_job_id: Option<u64>,
361
362    /// Queue length.
363    pub queue_length: usize,
364
365    /// Mean wait time in queue.
366    pub mean_wait_time: f64,
367
368    /// Max wait time in queue.
369    pub max_wait_time: f64,
370
371    /// Reason for selection.
372    pub reason: SelectionReason,
373
374    /// Tie-break reason for the selected job (if applicable).
375    pub tie_break_reason: Option<TieBreakReason>,
376
377    /// Per-job evidence entries (ordered by scheduler priority).
378    pub jobs: Vec<JobEvidence>,
379}
380
381/// Evidence entry for a single job in the queue.
382#[derive(Debug, Clone)]
383pub struct JobEvidence {
384    /// Job id.
385    pub job_id: u64,
386    /// Optional name.
387    pub name: Option<String>,
388    /// Processing-time estimate (ms).
389    pub estimate_ms: f64,
390    /// Weight (importance).
391    pub weight: f64,
392    /// Base ratio (w/p).
393    pub ratio: f64,
394    /// Age in queue (ms).
395    pub age_ms: f64,
396    /// Effective priority (ratio + aging, with starvation guard).
397    pub effective_priority: f64,
398    /// Estimate source.
399    pub estimate_source: EstimateSource,
400    /// Weight source.
401    pub weight_source: WeightSource,
402}
403
/// Why a particular job was selected.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SelectionReason {
    /// The queue holds no jobs.
    QueueEmpty,
    /// SRPT picked the job with the shortest remaining time.
    ShortestRemaining,
    /// Smith's rule (weight/time) picked the job.
    HighestWeightedPriority,
    /// The FIFO override picked the job.
    Fifo,
    /// Aging picked the job because it waited too long.
    AgingBoost,
    /// The job continues after a preemption.
    Continuation,
}

impl SelectionReason {
    /// Stable snake_case label used when serializing evidence.
    fn as_str(self) -> &'static str {
        match self {
            SelectionReason::QueueEmpty => "queue_empty",
            SelectionReason::ShortestRemaining => "shortest_remaining",
            SelectionReason::HighestWeightedPriority => "highest_weighted_priority",
            SelectionReason::Fifo => "fifo",
            SelectionReason::AgingBoost => "aging_boost",
            SelectionReason::Continuation => "continuation",
        }
    }
}
433
/// Where a processing-time estimate came from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EstimateSource {
    /// The caller supplied the estimate.
    Explicit,
    /// The estimate was derived from prior runs.
    Historical,
    /// Fallback default estimate.
    Default,
    /// No data is available for an estimate.
    Unknown,
}

impl EstimateSource {
    /// Stable lowercase label used when serializing evidence.
    fn as_str(self) -> &'static str {
        match self {
            EstimateSource::Explicit => "explicit",
            EstimateSource::Historical => "historical",
            EstimateSource::Default => "default",
            EstimateSource::Unknown => "unknown",
        }
    }
}
457
/// Where a weight/importance value came from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WeightSource {
    /// The caller supplied the weight.
    Explicit,
    /// Fallback default weight.
    Default,
    /// No data is available for a weight.
    Unknown,
}

impl WeightSource {
    /// Stable lowercase label used when serializing evidence.
    fn as_str(self) -> &'static str {
        match self {
            WeightSource::Explicit => "explicit",
            WeightSource::Default => "default",
            WeightSource::Unknown => "unknown",
        }
    }
}
478
/// Which ordering key settled a comparison between jobs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TieBreakReason {
    /// Decided by effective priority (base ratio + aging).
    EffectivePriority,
    /// Decided by base ratio (w/p).
    BaseRatio,
    /// Decided by weight.
    Weight,
    /// Decided by remaining time.
    RemainingTime,
    /// Decided by arrival sequence.
    ArrivalSeq,
    /// Decided by job id.
    JobId,
    /// The current job continued; no comparison took place.
    Continuation,
}

impl TieBreakReason {
    /// Stable snake_case label used when serializing evidence.
    fn as_str(self) -> &'static str {
        match self {
            TieBreakReason::EffectivePriority => "effective_priority",
            TieBreakReason::BaseRatio => "base_ratio",
            TieBreakReason::Weight => "weight",
            TieBreakReason::RemainingTime => "remaining_time",
            TieBreakReason::ArrivalSeq => "arrival_seq",
            TieBreakReason::JobId => "job_id",
            TieBreakReason::Continuation => "continuation",
        }
    }
}
511
/// Scheduling discipline in effect.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SchedulingMode {
    /// Smith's rule: weight divided by remaining time.
    Smith,
    /// SRPT: shortest remaining processing time.
    Srpt,
    /// FIFO: arrival order.
    Fifo,
}
522
523impl SchedulingEvidence {
524    /// Serialize scheduling evidence to JSONL with the supplied event tag.
525    #[must_use]
526    pub fn to_jsonl(&self, event: &str) -> String {
527        let mut out = String::with_capacity(256 + (self.jobs.len() * 64));
528        out.push_str("{\"event\":\"");
529        out.push_str(&escape_json(event));
530        out.push_str("\",\"current_time\":");
531        let _ = write!(out, "{:.6}", self.current_time);
532        out.push_str(",\"selected_job_id\":");
533        match self.selected_job_id {
534            Some(id) => {
535                let _ = write!(out, "{id}");
536            }
537            None => out.push_str("null"),
538        }
539        out.push_str(",\"queue_length\":");
540        let _ = write!(out, "{}", self.queue_length);
541        out.push_str(",\"mean_wait_time\":");
542        let _ = write!(out, "{:.6}", self.mean_wait_time);
543        out.push_str(",\"max_wait_time\":");
544        let _ = write!(out, "{:.6}", self.max_wait_time);
545        out.push_str(",\"reason\":\"");
546        out.push_str(self.reason.as_str());
547        out.push('"');
548        out.push_str(",\"tie_break_reason\":");
549        match self.tie_break_reason {
550            Some(reason) => {
551                out.push('"');
552                out.push_str(reason.as_str());
553                out.push('"');
554            }
555            None => out.push_str("null"),
556        }
557        out.push_str(",\"jobs\":[");
558        for (idx, job) in self.jobs.iter().enumerate() {
559            if idx > 0 {
560                out.push(',');
561            }
562            out.push_str(&job.to_json());
563        }
564        out.push_str("]}");
565        out
566    }
567}
568
569impl JobEvidence {
570    fn to_json(&self) -> String {
571        let mut out = String::with_capacity(128);
572        out.push_str("{\"job_id\":");
573        let _ = write!(out, "{}", self.job_id);
574        out.push_str(",\"name\":");
575        match &self.name {
576            Some(name) => {
577                out.push('"');
578                out.push_str(&escape_json(name));
579                out.push('"');
580            }
581            None => out.push_str("null"),
582        }
583        out.push_str(",\"estimate_ms\":");
584        let _ = write!(out, "{:.6}", self.estimate_ms);
585        out.push_str(",\"weight\":");
586        let _ = write!(out, "{:.6}", self.weight);
587        out.push_str(",\"ratio\":");
588        let _ = write!(out, "{:.6}", self.ratio);
589        out.push_str(",\"age_ms\":");
590        let _ = write!(out, "{:.6}", self.age_ms);
591        out.push_str(",\"effective_priority\":");
592        let _ = write!(out, "{:.6}", self.effective_priority);
593        out.push_str(",\"estimate_source\":\"");
594        out.push_str(self.estimate_source.as_str());
595        out.push('"');
596        out.push_str(",\"weight_source\":\"");
597        out.push_str(self.weight_source.as_str());
598        out.push('"');
599        out.push('}');
600        out
601    }
602}
603
/// Escapes `input` for embedding inside a JSON string literal.
///
/// Quotes, backslashes and the common whitespace controls get their short
/// escapes; any other character below U+0020 becomes a `\uXXXX` sequence.
/// Everything else is copied through unchanged.
fn escape_json(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len() + 8);
    for ch in input.chars() {
        let short_form = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            '\u{08}' => Some("\\b"),
            '\u{0C}' => Some("\\f"),
            _ => None,
        };
        match short_form {
            Some(sequence) => escaped.push_str(sequence),
            // Remaining control characters have no short escape.
            None if ch < ' ' => {
                let _ = write!(escaped, "\\u{:04x}", ch as u32);
            }
            None => escaped.push(ch),
        }
    }
    escaped
}
623
/// Running counters kept by the scheduler.
#[derive(Clone, Debug, Default)]
pub struct SchedulerStats {
    /// Number of jobs submitted so far.
    pub total_submitted: u64,

    /// Number of jobs completed so far.
    pub total_completed: u64,

    /// Number of jobs rejected because the queue was full.
    pub total_rejected: u64,

    /// Number of preemptions.
    pub total_preemptions: u64,

    /// Accumulated time spent processing.
    pub total_processing_time: f64,

    /// Sum of all response times; the numerator of the mean.
    pub total_response_time: f64,

    /// Largest response time observed.
    pub max_response_time: f64,

    /// Current queue length.
    pub queue_length: usize,
}

impl SchedulerStats {
    /// Average response time over completed jobs, or 0.0 when none completed.
    pub fn mean_response_time(&self) -> f64 {
        match self.total_completed {
            0 => 0.0,
            completed => self.total_response_time / completed as f64,
        }
    }

    /// Completed jobs per unit of processing time, or 0.0 when no time was spent.
    pub fn throughput(&self) -> f64 {
        let busy_time = self.total_processing_time;
        if busy_time > 0.0 {
            self.total_completed as f64 / busy_time
        } else {
            0.0
        }
    }
}
671
672/// Queueing theory scheduler with fair SRPT.
673#[derive(Debug)]
674pub struct QueueingScheduler {
675    config: SchedulerConfig,
676
677    /// Priority queue of jobs.
678    queue: BinaryHeap<PriorityJob>,
679
680    /// Currently running job (if preemptive and processing).
681    current_job: Option<Job>,
682
683    /// Current simulation time.
684    current_time: f64,
685
686    /// Next job ID.
687    next_job_id: u64,
688
689    /// Next arrival sequence number.
690    next_arrival_seq: u64,
691
692    /// Statistics.
693    stats: SchedulerStats,
694}
695
696impl QueueingScheduler {
697    /// Create a new scheduler with given configuration.
698    pub fn new(config: SchedulerConfig) -> Self {
699        Self {
700            config,
701            queue: BinaryHeap::new(),
702            current_job: None,
703            current_time: 0.0,
704            next_job_id: 1,
705            next_arrival_seq: 1,
706            stats: SchedulerStats::default(),
707        }
708    }
709
710    /// Submit a new job to the scheduler.
711    ///
712    /// Returns the job ID if accepted, None if rejected (queue full).
713    pub fn submit(&mut self, weight: f64, estimated_time: f64) -> Option<u64> {
714        self.submit_named(weight, estimated_time, None::<&str>)
715    }
716
717    /// Submit a named job.
718    pub fn submit_named(
719        &mut self,
720        weight: f64,
721        estimated_time: f64,
722        name: Option<impl Into<String>>,
723    ) -> Option<u64> {
724        self.submit_with_sources(
725            weight,
726            estimated_time,
727            WeightSource::Explicit,
728            EstimateSource::Explicit,
729            name,
730        )
731    }
732
733    /// Submit a job with explicit estimate/weight sources for evidence logging.
734    pub fn submit_with_sources(
735        &mut self,
736        weight: f64,
737        estimated_time: f64,
738        weight_source: WeightSource,
739        estimate_source: EstimateSource,
740        name: Option<impl Into<String>>,
741    ) -> Option<u64> {
742        if self.queue.len() >= self.config.max_queue_size {
743            self.stats.total_rejected += 1;
744            return None;
745        }
746
747        let id = self.next_job_id;
748        self.next_job_id += 1;
749
750        let mut job =
751            Job::new(id, weight, estimated_time).with_sources(weight_source, estimate_source);
752        job.weight = self.normalize_weight_with_source(job.weight, job.weight_source);
753        job.remaining_time =
754            self.normalize_time_with_source(job.remaining_time, job.estimate_source);
755        job.total_time = job.remaining_time;
756        job.arrival_time = self.current_time;
757        job.arrival_seq = self.next_arrival_seq;
758        self.next_arrival_seq += 1;
759        if let Some(n) = name {
760            job.name = Some(n.into());
761        }
762
763        let priority_job = self.make_priority_job(job);
764        self.queue.push(priority_job);
765
766        self.stats.total_submitted += 1;
767        self.stats.queue_length = self.queue.len();
768
769        // Check for preemption
770        if self.config.preemptive {
771            self.maybe_preempt();
772        }
773
774        Some(id)
775    }
776
777    /// Advance time by the given amount and process jobs.
778    ///
779    /// Returns a list of completed job IDs.
780    pub fn tick(&mut self, delta_time: f64) -> Vec<u64> {
781        let mut completed = Vec::new();
782        if delta_time <= 0.0 {
783            return completed;
784        }
785
786        let mut remaining_time = delta_time;
787        let mut now = self.current_time;
788        let mut processed_time = 0.0;
789
790        while remaining_time > 0.0 {
791            // Get or select next job
792            let Some(mut job) = (if let Some(j) = self.current_job.take() {
793                Some(j)
794            } else {
795                self.queue.pop().map(|pj| pj.job)
796            }) else {
797                now += remaining_time;
798                break; // Queue empty
799            };
800
801            // Process job
802            let process_time = remaining_time.min(job.remaining_time);
803            job.remaining_time -= process_time;
804            remaining_time -= process_time;
805            now += process_time;
806            processed_time += process_time;
807
808            if job.is_complete() {
809                // Job completed
810                let response_time = now - job.arrival_time;
811                self.stats.total_response_time += response_time;
812                self.stats.max_response_time = self.stats.max_response_time.max(response_time);
813                self.stats.total_completed += 1;
814                completed.push(job.id);
815            } else {
816                // Job not complete, save for next tick
817                self.current_job = Some(job);
818            }
819        }
820
821        self.stats.total_processing_time += processed_time;
822        self.current_time = now;
823        // Recompute priorities for aged jobs
824        self.refresh_priorities();
825
826        self.stats.queue_length = self.queue.len();
827        completed
828    }
829
830    /// Select the next job to run without advancing time.
831    pub fn peek_next(&self) -> Option<&Job> {
832        self.current_job
833            .as_ref()
834            .or_else(|| self.queue.peek().map(|pj| &pj.job))
835    }
836
837    /// Get scheduling evidence for the current state.
838    pub fn evidence(&self) -> SchedulingEvidence {
839        let (mean_wait, max_wait) = self.compute_wait_stats();
840
841        let mut candidates: Vec<PriorityJob> = self
842            .queue
843            .iter()
844            .map(|pj| self.make_priority_job(pj.job.clone()))
845            .collect();
846
847        if let Some(ref current) = self.current_job {
848            candidates.push(self.make_priority_job(current.clone()));
849        }
850
851        candidates.sort_by(|a, b| b.cmp(a));
852
853        let selected_job_id = if let Some(ref current) = self.current_job {
854            Some(current.id)
855        } else {
856            candidates.first().map(|pj| pj.job.id)
857        };
858
859        let tie_break_reason = if self.current_job.is_some() {
860            Some(TieBreakReason::Continuation)
861        } else if candidates.len() > 1 {
862            Some(self.tie_break_reason(&candidates[0], &candidates[1]))
863        } else {
864            None
865        };
866
867        let reason = if self.queue.is_empty() && self.current_job.is_none() {
868            SelectionReason::QueueEmpty
869        } else if self.current_job.is_some() {
870            SelectionReason::Continuation
871        } else if self.config.mode() == SchedulingMode::Fifo {
872            SelectionReason::Fifo
873        } else if let Some(pj) = candidates.first() {
874            let wait_time = (self.current_time - pj.job.arrival_time).max(0.0);
875            let aging_contribution = self.config.aging_factor * wait_time;
876            let aging_boost = (self.config.wait_starve_ms > 0.0
877                && wait_time >= self.config.wait_starve_ms)
878                || aging_contribution > pj.base_ratio * 0.5;
879            if aging_boost {
880                SelectionReason::AgingBoost
881            } else if self.config.smith_enabled && pj.job.weight > 1.0 {
882                SelectionReason::HighestWeightedPriority
883            } else {
884                SelectionReason::ShortestRemaining
885            }
886        } else {
887            SelectionReason::QueueEmpty
888        };
889
890        let jobs = candidates
891            .iter()
892            .map(|pj| {
893                let age_ms = (self.current_time - pj.job.arrival_time).max(0.0);
894                JobEvidence {
895                    job_id: pj.job.id,
896                    name: pj.job.name.clone(),
897                    estimate_ms: pj.job.remaining_time,
898                    weight: pj.job.weight,
899                    ratio: pj.base_ratio,
900                    age_ms,
901                    effective_priority: pj.priority,
902                    estimate_source: pj.job.estimate_source,
903                    weight_source: pj.job.weight_source,
904                }
905            })
906            .collect();
907
908        SchedulingEvidence {
909            current_time: self.current_time,
910            selected_job_id,
911            queue_length: self.queue.len() + if self.current_job.is_some() { 1 } else { 0 },
912            mean_wait_time: mean_wait,
913            max_wait_time: max_wait,
914            reason,
915            tie_break_reason,
916            jobs,
917        }
918    }
919
920    /// Get current statistics.
921    pub fn stats(&self) -> SchedulerStats {
922        let mut stats = self.stats.clone();
923        stats.queue_length = self.queue.len() + if self.current_job.is_some() { 1 } else { 0 };
924        stats
925    }
926
927    /// Cancel a job by ID.
928    pub fn cancel(&mut self, job_id: u64) -> bool {
929        // Check current job
930        if let Some(ref j) = self.current_job
931            && j.id == job_id
932        {
933            self.current_job = None;
934            self.stats.queue_length = self.queue.len();
935            return true;
936        }
937
938        // Remove from queue (rebuild without the job)
939        let old_len = self.queue.len();
940        let jobs: Vec<_> = self
941            .queue
942            .drain()
943            .filter(|pj| pj.job.id != job_id)
944            .collect();
945        self.queue = jobs.into_iter().collect();
946
947        self.stats.queue_length = self.queue.len();
948        old_len != self.queue.len()
949    }
950
951    /// Clear all jobs.
952    pub fn clear(&mut self) {
953        self.queue.clear();
954        self.current_job = None;
955        self.stats.queue_length = 0;
956    }
957
958    /// Reset scheduler state.
959    pub fn reset(&mut self) {
960        self.queue.clear();
961        self.current_job = None;
962        self.current_time = 0.0;
963        self.next_job_id = 1;
964        self.next_arrival_seq = 1;
965        self.stats = SchedulerStats::default();
966    }
967
968    // --- Internal Methods ---
969
970    /// Normalize a weight into the configured clamp range.
971    fn normalize_weight(&self, weight: f64) -> f64 {
972        if weight.is_nan() {
973            return self.config.w_min;
974        }
975        if weight.is_infinite() {
976            return if weight.is_sign_positive() {
977                self.config.w_max
978            } else {
979                self.config.w_min
980            };
981        }
982        weight.clamp(self.config.w_min, self.config.w_max)
983    }
984
985    /// Normalize a processing-time estimate into the configured clamp range.
986    fn normalize_time(&self, estimate_ms: f64) -> f64 {
987        if estimate_ms.is_nan() {
988            return self.config.p_max_ms;
989        }
990        if estimate_ms.is_infinite() {
991            return if estimate_ms.is_sign_positive() {
992                self.config.p_max_ms
993            } else {
994                self.config.p_min_ms
995            };
996        }
997        estimate_ms.clamp(self.config.p_min_ms, self.config.p_max_ms)
998    }
999
1000    /// Resolve a weight based on its declared source, then clamp to config limits.
1001    fn normalize_weight_with_source(&self, weight: f64, source: WeightSource) -> f64 {
1002        let resolved = match source {
1003            WeightSource::Explicit => weight,
1004            WeightSource::Default => self.config.weight_default,
1005            WeightSource::Unknown => self.config.weight_unknown,
1006        };
1007        self.normalize_weight(resolved)
1008    }
1009
1010    /// Resolve an estimate based on its declared source, then clamp to config limits.
1011    fn normalize_time_with_source(&self, estimate_ms: f64, source: EstimateSource) -> f64 {
1012        let resolved = match source {
1013            EstimateSource::Explicit | EstimateSource::Historical => estimate_ms,
1014            EstimateSource::Default => self.config.estimate_default_ms,
1015            EstimateSource::Unknown => self.config.estimate_unknown_ms,
1016        };
1017        self.normalize_time(resolved)
1018    }
1019
1020    /// Compute base ratio (w/p) for Smith's rule.
1021    fn compute_base_ratio(&self, job: &Job) -> f64 {
1022        if self.config.mode() == SchedulingMode::Fifo {
1023            return 0.0;
1024        }
1025        let remaining = job.remaining_time.max(self.config.p_min_ms);
1026        let weight = match self.config.mode() {
1027            SchedulingMode::Smith => job.weight,
1028            SchedulingMode::Srpt => 1.0,
1029            SchedulingMode::Fifo => 0.0,
1030        };
1031        weight / remaining
1032    }
1033
1034    /// Compute effective priority (base ratio + aging, with starvation guard).
1035    fn compute_priority(&self, job: &Job) -> f64 {
1036        if self.config.mode() == SchedulingMode::Fifo {
1037            return 0.0;
1038        }
1039        let base_ratio = self.compute_base_ratio(job);
1040        let wait_time = (self.current_time - job.arrival_time).max(0.0);
1041        let aging_boost = self.config.aging_factor * wait_time;
1042        let mut effective = base_ratio + aging_boost;
1043
1044        if self.config.wait_starve_ms > 0.0 && wait_time >= self.config.wait_starve_ms {
1045            effective = effective.max(base_ratio * self.config.starve_boost_ratio);
1046        }
1047
1048        effective
1049    }
1050
1051    /// Build a priority-queue entry for a job.
1052    fn make_priority_job(&self, job: Job) -> PriorityJob {
1053        let base_ratio = self.compute_base_ratio(&job);
1054        let priority = self.compute_priority(&job);
1055        PriorityJob {
1056            priority,
1057            base_ratio,
1058            job,
1059            mode: self.config.mode(),
1060        }
1061    }
1062
1063    /// Determine the tie-break reason between two candidates.
1064    fn tie_break_reason(&self, a: &PriorityJob, b: &PriorityJob) -> TieBreakReason {
1065        if self.config.mode() == SchedulingMode::Fifo {
1066            if a.job.arrival_seq != b.job.arrival_seq {
1067                return TieBreakReason::ArrivalSeq;
1068            }
1069            return TieBreakReason::JobId;
1070        }
1071        if a.priority.total_cmp(&b.priority) != Ordering::Equal {
1072            TieBreakReason::EffectivePriority
1073        } else if a.base_ratio.total_cmp(&b.base_ratio) != Ordering::Equal {
1074            TieBreakReason::BaseRatio
1075        } else if a.job.weight.total_cmp(&b.job.weight) != Ordering::Equal {
1076            TieBreakReason::Weight
1077        } else if a.job.remaining_time.total_cmp(&b.job.remaining_time) != Ordering::Equal {
1078            TieBreakReason::RemainingTime
1079        } else if a.job.arrival_seq != b.job.arrival_seq {
1080            TieBreakReason::ArrivalSeq
1081        } else {
1082            TieBreakReason::JobId
1083        }
1084    }
1085
1086    /// Check if current job should be preempted.
1087    fn maybe_preempt(&mut self) {
1088        if self.config.mode() == SchedulingMode::Fifo {
1089            return;
1090        }
1091        if let Some(ref current) = self.current_job
1092            && let Some(pj) = self.queue.peek()
1093        {
1094            let current_pj = self.make_priority_job(current.clone());
1095            if pj.cmp(&current_pj) == Ordering::Greater {
1096                // Preempt
1097                let old = self.current_job.take().unwrap();
1098                let priority_job = self.make_priority_job(old);
1099                self.queue.push(priority_job);
1100                self.stats.total_preemptions += 1;
1101            }
1102        }
1103    }
1104
1105    /// Refresh priorities for all queued jobs (aging effect).
1106    fn refresh_priorities(&mut self) {
1107        let jobs: Vec<_> = self.queue.drain().map(|pj| pj.job).collect();
1108        for job in jobs {
1109            let priority_job = self.make_priority_job(job);
1110            self.queue.push(priority_job);
1111        }
1112    }
1113
1114    /// Compute wait time statistics.
1115    fn compute_wait_stats(&self) -> (f64, f64) {
1116        let mut total_wait = 0.0;
1117        let mut max_wait = 0.0f64;
1118        let mut count = 0;
1119
1120        for pj in self.queue.iter() {
1121            let wait = (self.current_time - pj.job.arrival_time).max(0.0);
1122            total_wait += wait;
1123            max_wait = max_wait.max(wait);
1124            count += 1;
1125        }
1126
1127        if let Some(ref j) = self.current_job {
1128            let wait = (self.current_time - j.arrival_time).max(0.0);
1129            total_wait += wait;
1130            max_wait = max_wait.max(wait);
1131            count += 1;
1132        }
1133
1134        let mean = if count > 0 {
1135            total_wait / count as f64
1136        } else {
1137            0.0
1138        };
1139        (mean, max_wait)
1140    }
1141}
1142
1143// =============================================================================
1144// Unit Tests (bd-13pq.7)
1145// =============================================================================
1146
1147#[cfg(test)]
1148mod tests {
1149    use super::*;
1150    use std::collections::HashMap;
1151
1152    fn test_config() -> SchedulerConfig {
1153        SchedulerConfig {
1154            aging_factor: 0.001,
1155            p_min_ms: DEFAULT_P_MIN_MS,
1156            p_max_ms: DEFAULT_P_MAX_MS,
1157            estimate_default_ms: DEFAULT_ESTIMATE_DEFAULT_MS,
1158            estimate_unknown_ms: DEFAULT_ESTIMATE_UNKNOWN_MS,
1159            w_min: DEFAULT_W_MIN,
1160            w_max: DEFAULT_W_MAX,
1161            weight_default: DEFAULT_WEIGHT_DEFAULT,
1162            weight_unknown: DEFAULT_WEIGHT_UNKNOWN,
1163            wait_starve_ms: DEFAULT_WAIT_STARVE_MS,
1164            starve_boost_ratio: DEFAULT_STARVE_BOOST_RATIO,
1165            smith_enabled: true,
1166            force_fifo: false,
1167            max_queue_size: 100,
1168            preemptive: true,
1169            time_quantum: 10.0,
1170            enable_logging: false,
1171        }
1172    }
1173
1174    #[derive(Clone, Copy, Debug)]
1175    struct WorkloadJob {
1176        arrival: u64,
1177        weight: f64,
1178        duration: f64,
1179    }
1180
1181    #[derive(Clone, Copy, Debug, Eq, PartialEq)]
1182    enum SimPolicy {
1183        Smith,
1184        Fifo,
1185    }
1186
1187    #[derive(Debug)]
1188    struct SimulationMetrics {
1189        mean: f64,
1190        p95: f64,
1191        p99: f64,
1192        max: f64,
1193        job_count: usize,
1194        completion_order: Vec<u64>,
1195    }
1196
1197    fn mixed_workload() -> Vec<WorkloadJob> {
1198        let mut jobs = Vec::new();
1199        jobs.push(WorkloadJob {
1200            arrival: 0,
1201            weight: 1.0,
1202            duration: 100.0,
1203        });
1204        for t in 1..=200u64 {
1205            jobs.push(WorkloadJob {
1206                arrival: t,
1207                weight: 1.0,
1208                duration: 1.0,
1209            });
1210        }
1211        jobs
1212    }
1213
1214    fn percentile(sorted: &[f64], p: f64) -> f64 {
1215        if sorted.is_empty() {
1216            return 0.0;
1217        }
1218        let idx = ((sorted.len() as f64 - 1.0) * p).ceil() as usize;
1219        sorted[idx.min(sorted.len() - 1)]
1220    }
1221
1222    fn summary_json(policy: SimPolicy, metrics: &SimulationMetrics) -> String {
1223        let policy = match policy {
1224            SimPolicy::Smith => "Smith",
1225            SimPolicy::Fifo => "Fifo",
1226        };
1227        let head: Vec<String> = metrics
1228            .completion_order
1229            .iter()
1230            .take(8)
1231            .map(|id| id.to_string())
1232            .collect();
1233        let tail: Vec<String> = metrics
1234            .completion_order
1235            .iter()
1236            .rev()
1237            .take(3)
1238            .collect::<Vec<_>>()
1239            .into_iter()
1240            .rev()
1241            .map(|id| id.to_string())
1242            .collect();
1243        format!(
1244            "{{\"policy\":\"{policy}\",\"jobs\":{jobs},\"mean\":{mean:.3},\"p95\":{p95:.3},\"p99\":{p99:.3},\"max\":{max:.3},\"order_head\":[{head}],\"order_tail\":[{tail}]}}",
1245            policy = policy,
1246            jobs = metrics.job_count,
1247            mean = metrics.mean,
1248            p95 = metrics.p95,
1249            p99 = metrics.p99,
1250            max = metrics.max,
1251            head = head.join(","),
1252            tail = tail.join(",")
1253        )
1254    }
1255
1256    fn workload_summary_json(workload: &[WorkloadJob]) -> String {
1257        if workload.is_empty() {
1258            return "{\"workload\":\"empty\"}".to_string();
1259        }
1260        let mut min_arrival = u64::MAX;
1261        let mut max_arrival = 0u64;
1262        let mut min_duration = f64::INFINITY;
1263        let mut max_duration: f64 = 0.0;
1264        let mut total_work: f64 = 0.0;
1265        let mut long_jobs = 0usize;
1266        let long_threshold = 10.0;
1267
1268        for job in workload {
1269            min_arrival = min_arrival.min(job.arrival);
1270            max_arrival = max_arrival.max(job.arrival);
1271            min_duration = min_duration.min(job.duration);
1272            max_duration = max_duration.max(job.duration);
1273            total_work += job.duration;
1274            if job.duration >= long_threshold {
1275                long_jobs += 1;
1276            }
1277        }
1278
1279        format!(
1280            "{{\"workload\":\"mixed\",\"jobs\":{jobs},\"arrival_min\":{arrival_min},\"arrival_max\":{arrival_max},\"duration_min\":{duration_min:.3},\"duration_max\":{duration_max:.3},\"total_work\":{total_work:.3},\"long_jobs\":{long_jobs},\"long_threshold\":{long_threshold:.1}}}",
1281            jobs = workload.len(),
1282            arrival_min = min_arrival,
1283            arrival_max = max_arrival,
1284            duration_min = min_duration,
1285            duration_max = max_duration,
1286            total_work = total_work,
1287            long_jobs = long_jobs,
1288            long_threshold = long_threshold
1289        )
1290    }
1291
1292    fn simulate_policy(policy: SimPolicy, workload: &[WorkloadJob]) -> SimulationMetrics {
1293        let mut config = test_config();
1294        config.aging_factor = 0.0;
1295        config.wait_starve_ms = 0.0;
1296        config.starve_boost_ratio = 1.0;
1297        config.smith_enabled = policy == SimPolicy::Smith;
1298        config.force_fifo = policy == SimPolicy::Fifo;
1299        config.preemptive = true;
1300
1301        let mut scheduler = QueueingScheduler::new(config);
1302        let mut arrivals = workload.to_vec();
1303        arrivals.sort_by_key(|job| job.arrival);
1304
1305        let mut arrival_times: HashMap<u64, f64> = HashMap::new();
1306        let mut response_times = Vec::with_capacity(arrivals.len());
1307        let mut completion_order = Vec::with_capacity(arrivals.len());
1308
1309        let mut idx = 0usize;
1310        let mut safety = 0usize;
1311
1312        while (idx < arrivals.len() || scheduler.peek_next().is_some()) && safety < 10_000 {
1313            let now = scheduler.current_time;
1314
1315            while idx < arrivals.len() && (arrivals[idx].arrival as f64) <= now + f64::EPSILON {
1316                let job = arrivals[idx];
1317                let id = scheduler
1318                    .submit(job.weight, job.duration)
1319                    .expect("queue capacity should not be exceeded");
1320                arrival_times.insert(id, scheduler.current_time);
1321                idx += 1;
1322            }
1323
1324            if scheduler.peek_next().is_none() {
1325                if idx < arrivals.len() {
1326                    let next_time = arrivals[idx].arrival as f64;
1327                    let delta = (next_time - scheduler.current_time).max(0.0);
1328                    let completed = scheduler.tick(delta);
1329                    for id in completed {
1330                        let arrival = arrival_times.get(&id).copied().unwrap_or(0.0);
1331                        response_times.push(scheduler.current_time - arrival);
1332                        completion_order.push(id);
1333                    }
1334                }
1335                safety += 1;
1336                continue;
1337            }
1338
1339            let completed = scheduler.tick(1.0);
1340            for id in completed {
1341                let arrival = arrival_times.get(&id).copied().unwrap_or(0.0);
1342                response_times.push(scheduler.current_time - arrival);
1343                completion_order.push(id);
1344            }
1345            safety += 1;
1346        }
1347
1348        assert_eq!(
1349            response_times.len(),
1350            arrivals.len(),
1351            "simulation did not complete all jobs"
1352        );
1353
1354        let mut sorted = response_times.clone();
1355        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
1356
1357        let mean = response_times.iter().sum::<f64>() / response_times.len() as f64;
1358        let p95 = percentile(&sorted, 0.95);
1359        let p99 = percentile(&sorted, 0.99);
1360        let max = *sorted.last().unwrap_or(&0.0);
1361
1362        SimulationMetrics {
1363            mean,
1364            p95,
1365            p99,
1366            max,
1367            job_count: response_times.len(),
1368            completion_order,
1369        }
1370    }
1371
1372    // =========================================================================
1373    // Initialization tests
1374    // =========================================================================
1375
1376    #[test]
1377    fn new_creates_empty_scheduler() {
1378        let scheduler = QueueingScheduler::new(test_config());
1379        assert_eq!(scheduler.stats().queue_length, 0);
1380        assert!(scheduler.peek_next().is_none());
1381    }
1382
1383    #[test]
1384    fn default_config_valid() {
1385        let config = SchedulerConfig::default();
1386        let scheduler = QueueingScheduler::new(config);
1387        assert_eq!(scheduler.stats().queue_length, 0);
1388    }
1389
1390    // =========================================================================
1391    // Job submission tests
1392    // =========================================================================
1393
1394    #[test]
1395    fn submit_returns_job_id() {
1396        let mut scheduler = QueueingScheduler::new(test_config());
1397        let id = scheduler.submit(1.0, 10.0);
1398        assert_eq!(id, Some(1));
1399    }
1400
1401    #[test]
1402    fn submit_increments_job_id() {
1403        let mut scheduler = QueueingScheduler::new(test_config());
1404        let id1 = scheduler.submit(1.0, 10.0);
1405        let id2 = scheduler.submit(1.0, 10.0);
1406        assert_eq!(id1, Some(1));
1407        assert_eq!(id2, Some(2));
1408    }
1409
1410    #[test]
1411    fn submit_rejects_when_queue_full() {
1412        let mut config = test_config();
1413        config.max_queue_size = 2;
1414        let mut scheduler = QueueingScheduler::new(config);
1415
1416        assert!(scheduler.submit(1.0, 10.0).is_some());
1417        assert!(scheduler.submit(1.0, 10.0).is_some());
1418        assert!(scheduler.submit(1.0, 10.0).is_none()); // Rejected
1419        assert_eq!(scheduler.stats().total_rejected, 1);
1420    }
1421
1422    #[test]
1423    fn submit_named_job() {
1424        let mut scheduler = QueueingScheduler::new(test_config());
1425        let id = scheduler.submit_named(1.0, 10.0, Some("test-job"));
1426        assert!(id.is_some());
1427    }
1428
1429    // =========================================================================
1430    // SRPT ordering tests
1431    // =========================================================================
1432
1433    #[test]
1434    fn srpt_prefers_shorter_jobs() {
1435        let mut scheduler = QueueingScheduler::new(test_config());
1436
1437        scheduler.submit(1.0, 100.0); // Long job
1438        scheduler.submit(1.0, 10.0); // Short job
1439
1440        let next = scheduler.peek_next().unwrap();
1441        assert_eq!(next.remaining_time, 10.0); // Short job selected
1442    }
1443
1444    #[test]
1445    fn smith_rule_prefers_high_weight() {
1446        let mut scheduler = QueueingScheduler::new(test_config());
1447
1448        scheduler.submit(1.0, 10.0); // Low weight
1449        scheduler.submit(10.0, 10.0); // High weight
1450
1451        let next = scheduler.peek_next().unwrap();
1452        assert_eq!(next.weight, 10.0); // High weight selected
1453    }
1454
1455    #[test]
1456    fn smith_rule_balances_weight_and_time() {
1457        let mut scheduler = QueueingScheduler::new(test_config());
1458
1459        scheduler.submit(2.0, 20.0); // priority = 2/20 = 0.1
1460        scheduler.submit(1.0, 5.0); // priority = 1/5 = 0.2
1461
1462        let next = scheduler.peek_next().unwrap();
1463        assert_eq!(next.remaining_time, 5.0); // Higher priority
1464    }
1465
1466    // =========================================================================
1467    // Aging tests
1468    // =========================================================================
1469
1470    #[test]
1471    fn aging_increases_priority_over_time() {
1472        let mut scheduler = QueueingScheduler::new(test_config());
1473
1474        scheduler.submit(1.0, 100.0); // Long job
1475        scheduler.tick(0.0); // Process nothing, just advance
1476
1477        let before_aging = scheduler.compute_priority(scheduler.peek_next().unwrap());
1478
1479        scheduler.current_time = 100.0; // Advance time significantly
1480        scheduler.refresh_priorities();
1481
1482        let after_aging = scheduler.compute_priority(scheduler.peek_next().unwrap());
1483        assert!(
1484            after_aging > before_aging,
1485            "Priority should increase with wait time"
1486        );
1487    }
1488
1489    #[test]
1490    fn aging_prevents_starvation() {
1491        let mut config = test_config();
1492        config.aging_factor = 1.0; // High aging
1493        let mut scheduler = QueueingScheduler::new(config);
1494
1495        scheduler.submit(1.0, 1000.0); // Very long job
1496        scheduler.submit(1.0, 1.0); // Short job
1497
1498        // Initially, short job should be preferred
1499        assert_eq!(scheduler.peek_next().unwrap().remaining_time, 1.0);
1500
1501        // After the short job completes, long job should eventually run
1502        let completed = scheduler.tick(1.0);
1503        assert_eq!(completed.len(), 1);
1504
1505        assert!(scheduler.peek_next().is_some());
1506    }
1507
1508    // =========================================================================
1509    // Preemption tests
1510    // =========================================================================
1511
1512    #[test]
1513    fn preemption_when_higher_priority_arrives() {
1514        let mut scheduler = QueueingScheduler::new(test_config());
1515
1516        scheduler.submit(1.0, 100.0); // Start processing long job
1517        scheduler.tick(10.0); // Process 10 units
1518
1519        let before = scheduler.peek_next().unwrap().remaining_time;
1520        assert_eq!(before, 90.0);
1521
1522        scheduler.submit(1.0, 5.0); // Higher priority arrives
1523
1524        // Should now be processing the short job
1525        let next = scheduler.peek_next().unwrap();
1526        assert_eq!(next.remaining_time, 5.0);
1527
1528        // Stats should show preemption
1529        assert_eq!(scheduler.stats().total_preemptions, 1);
1530    }
1531
1532    #[test]
1533    fn no_preemption_when_disabled() {
1534        let mut config = test_config();
1535        config.preemptive = false;
1536        let mut scheduler = QueueingScheduler::new(config);
1537
1538        scheduler.submit(1.0, 100.0);
1539        scheduler.tick(10.0);
1540
1541        scheduler.submit(1.0, 5.0); // Would preempt if enabled
1542
1543        // Should still be processing the first job
1544        let next = scheduler.peek_next().unwrap();
1545        assert_eq!(next.remaining_time, 90.0);
1546    }
1547
1548    // =========================================================================
1549    // Processing tests
1550    // =========================================================================
1551
1552    #[test]
1553    fn tick_processes_jobs() {
1554        let mut scheduler = QueueingScheduler::new(test_config());
1555
1556        scheduler.submit(1.0, 10.0);
1557        let completed = scheduler.tick(5.0);
1558
1559        assert!(completed.is_empty()); // Not complete yet
1560        assert_eq!(scheduler.peek_next().unwrap().remaining_time, 5.0);
1561    }
1562
1563    #[test]
1564    fn tick_completes_jobs() {
1565        let mut scheduler = QueueingScheduler::new(test_config());
1566
1567        scheduler.submit(1.0, 10.0);
1568        let completed = scheduler.tick(10.0);
1569
1570        assert_eq!(completed.len(), 1);
1571        assert_eq!(completed[0], 1);
1572        assert!(scheduler.peek_next().is_none());
1573    }
1574
1575    #[test]
1576    fn tick_completes_multiple_jobs() {
1577        let mut scheduler = QueueingScheduler::new(test_config());
1578
1579        scheduler.submit(1.0, 5.0);
1580        scheduler.submit(1.0, 5.0);
1581        let completed = scheduler.tick(10.0);
1582
1583        assert_eq!(completed.len(), 2);
1584    }
1585
1586    #[test]
1587    fn tick_handles_zero_delta() {
1588        let mut scheduler = QueueingScheduler::new(test_config());
1589        scheduler.submit(1.0, 10.0);
1590        let completed = scheduler.tick(0.0);
1591        assert!(completed.is_empty());
1592    }
1593
1594    // =========================================================================
1595    // Statistics tests
1596    // =========================================================================
1597
1598    #[test]
1599    fn stats_track_submissions() {
1600        let mut scheduler = QueueingScheduler::new(test_config());
1601
1602        scheduler.submit(1.0, 10.0);
1603        scheduler.submit(1.0, 10.0);
1604
1605        let stats = scheduler.stats();
1606        assert_eq!(stats.total_submitted, 2);
1607        assert_eq!(stats.queue_length, 2);
1608    }
1609
1610    #[test]
1611    fn stats_track_completions() {
1612        let mut scheduler = QueueingScheduler::new(test_config());
1613
1614        scheduler.submit(1.0, 10.0);
1615        scheduler.tick(10.0);
1616
1617        let stats = scheduler.stats();
1618        assert_eq!(stats.total_completed, 1);
1619    }
1620
1621    #[test]
1622    fn stats_compute_mean_response_time() {
1623        let mut scheduler = QueueingScheduler::new(test_config());
1624
1625        scheduler.submit(1.0, 10.0);
1626        scheduler.submit(1.0, 10.0);
1627        scheduler.tick(20.0);
1628
1629        let stats = scheduler.stats();
1630        // First job: 10 time units, Second job: 20 time units
1631        // Mean: (10 + 20) / 2 = 15
1632        assert_eq!(stats.total_completed, 2);
1633        assert!(stats.mean_response_time() > 0.0);
1634    }
1635
1636    #[test]
1637    fn stats_compute_throughput() {
1638        let mut scheduler = QueueingScheduler::new(test_config());
1639
1640        scheduler.submit(1.0, 10.0);
1641        scheduler.tick(10.0);
1642
1643        let stats = scheduler.stats();
1644        // 1 job in 10 time units
1645        assert!((stats.throughput() - 0.1).abs() < 0.01);
1646    }
1647
1648    // =========================================================================
1649    // Evidence tests
1650    // =========================================================================
1651
1652    #[test]
1653    fn evidence_reports_queue_empty() {
1654        let scheduler = QueueingScheduler::new(test_config());
1655        let evidence = scheduler.evidence();
1656        assert_eq!(evidence.reason, SelectionReason::QueueEmpty);
1657        assert!(evidence.selected_job_id.is_none());
1658        assert!(evidence.tie_break_reason.is_none());
1659        assert!(evidence.jobs.is_empty());
1660    }
1661
1662    #[test]
1663    fn evidence_reports_selected_job() {
1664        let mut scheduler = QueueingScheduler::new(test_config());
1665        scheduler.submit(1.0, 10.0);
1666        let evidence = scheduler.evidence();
1667        assert_eq!(evidence.selected_job_id, Some(1));
1668        assert_eq!(evidence.jobs.len(), 1);
1669    }
1670
1671    #[test]
1672    fn evidence_reports_wait_stats() {
1673        let mut scheduler = QueueingScheduler::new(test_config());
1674        scheduler.submit(1.0, 100.0);
1675        scheduler.submit(1.0, 100.0);
1676        scheduler.current_time = 50.0;
1677        scheduler.refresh_priorities();
1678
1679        let evidence = scheduler.evidence();
1680        assert!(evidence.mean_wait_time > 0.0);
1681        assert!(evidence.max_wait_time > 0.0);
1682    }
1683
1684    // =========================================================================
1685    // Config override tests
1686    // =========================================================================
1687
1688    #[test]
1689    fn force_fifo_overrides_priority() {
1690        let mut config = test_config();
1691        config.force_fifo = true;
1692        let mut scheduler = QueueingScheduler::new(config);
1693
1694        let first = scheduler.submit(1.0, 100.0).unwrap();
1695        let second = scheduler.submit(10.0, 1.0).unwrap();
1696
1697        let next = scheduler.peek_next().unwrap();
1698        assert_eq!(next.id, first);
1699        assert_ne!(next.id, second);
1700        assert_eq!(scheduler.evidence().reason, SelectionReason::Fifo);
1701    }
1702
1703    #[test]
1704    fn default_sources_use_config_values() {
1705        let mut config = test_config();
1706        config.weight_default = 7.0;
1707        config.estimate_default_ms = 12.0;
1708        let mut scheduler = QueueingScheduler::new(config);
1709
1710        scheduler.submit_with_sources(
1711            999.0,
1712            999.0,
1713            WeightSource::Default,
1714            EstimateSource::Default,
1715            None::<&str>,
1716        );
1717
1718        let next = scheduler.peek_next().unwrap();
1719        assert!((next.weight - 7.0).abs() < f64::EPSILON);
1720        assert!((next.remaining_time - 12.0).abs() < f64::EPSILON);
1721    }
1722
1723    #[test]
1724    fn unknown_sources_use_config_values() {
1725        let mut config = test_config();
1726        config.weight_unknown = 2.5;
1727        config.estimate_unknown_ms = 250.0;
1728        let mut scheduler = QueueingScheduler::new(config);
1729
1730        scheduler.submit_with_sources(
1731            0.0,
1732            0.0,
1733            WeightSource::Unknown,
1734            EstimateSource::Unknown,
1735            None::<&str>,
1736        );
1737
1738        let next = scheduler.peek_next().unwrap();
1739        assert!((next.weight - 2.5).abs() < f64::EPSILON);
1740        assert!((next.remaining_time - 250.0).abs() < f64::EPSILON);
1741    }
1742
1743    // =========================================================================
1744    // Tie-break tests
1745    // =========================================================================
1746
1747    #[test]
1748    fn tie_break_prefers_base_ratio_when_effective_equal() {
1749        let mut config = test_config();
1750        config.aging_factor = 0.1;
1751        let mut scheduler = QueueingScheduler::new(config);
1752
1753        // Job A: lower base ratio but older (aging brings it up).
1754        let id_a = scheduler.submit(1.0, 2.0).unwrap(); // ratio 0.5
1755        scheduler.current_time = 5.0;
1756        scheduler.refresh_priorities();
1757
1758        // Job B: higher base ratio, newer.
1759        let id_b = scheduler.submit(1.0, 1.0).unwrap(); // ratio 1.0
1760        scheduler.refresh_priorities();
1761
1762        let next = scheduler.peek_next().unwrap();
1763        assert_eq!(next.id, id_b);
1764
1765        let evidence = scheduler.evidence();
1766        assert_eq!(evidence.selected_job_id, Some(id_b));
1767        assert_eq!(evidence.tie_break_reason, Some(TieBreakReason::BaseRatio));
1768        assert_ne!(id_a, id_b);
1769    }
1770
1771    #[test]
1772    fn tie_break_prefers_weight_over_arrival() {
1773        let mut scheduler = QueueingScheduler::new(test_config());
1774
1775        let high_weight = scheduler.submit(2.0, 2.0).unwrap(); // ratio 1.0
1776        let _low_weight = scheduler.submit(1.0, 1.0).unwrap(); // ratio 1.0
1777
1778        let evidence = scheduler.evidence();
1779        assert_eq!(evidence.selected_job_id, Some(high_weight));
1780        assert_eq!(evidence.tie_break_reason, Some(TieBreakReason::Weight));
1781    }
1782
1783    #[test]
1784    fn tie_break_prefers_arrival_seq_when_all_equal() {
1785        let mut config = test_config();
1786        config.aging_factor = 0.0;
1787        let mut scheduler = QueueingScheduler::new(config);
1788
1789        let first = scheduler.submit(1.0, 10.0).unwrap();
1790        let second = scheduler.submit(1.0, 10.0).unwrap();
1791
1792        let evidence = scheduler.evidence();
1793        assert_eq!(evidence.selected_job_id, Some(first));
1794        assert_eq!(evidence.tie_break_reason, Some(TieBreakReason::ArrivalSeq));
1795        assert_ne!(first, second);
1796    }
1797
1798    // =========================================================================
1799    // Ordering + safety edge cases (bd-3e1t.10.4)
1800    // =========================================================================
1801
1802    #[test]
1803    fn srpt_mode_ignores_weights() {
1804        let mut config = test_config();
1805        config.smith_enabled = false;
1806        let mut scheduler = QueueingScheduler::new(config);
1807
1808        scheduler.submit(10.0, 100.0); // High weight, long
1809        scheduler.submit(1.0, 10.0); // Low weight, short
1810
1811        let next = scheduler.peek_next().unwrap();
1812        assert_eq!(next.remaining_time, 10.0);
1813        assert_eq!(
1814            scheduler.evidence().reason,
1815            SelectionReason::ShortestRemaining
1816        );
1817    }
1818
1819    #[test]
1820    fn fifo_mode_disables_preemption() {
1821        let mut config = test_config();
1822        config.force_fifo = true;
1823        config.preemptive = true;
1824        let mut scheduler = QueueingScheduler::new(config);
1825
1826        let first = scheduler.submit(1.0, 100.0).unwrap();
1827        scheduler.tick(10.0);
1828
1829        let _later = scheduler.submit(10.0, 1.0).unwrap();
1830        let next = scheduler.peek_next().unwrap();
1831        assert_eq!(next.id, first);
1832    }
1833
1834    #[test]
1835    fn explicit_zero_weight_clamps_to_min() {
1836        let mut config = test_config();
1837        config.w_min = 0.5;
1838        let mut scheduler = QueueingScheduler::new(config);
1839
1840        scheduler.submit_with_sources(
1841            0.0,
1842            1.0,
1843            WeightSource::Explicit,
1844            EstimateSource::Explicit,
1845            None::<&str>,
1846        );
1847
1848        let next = scheduler.peek_next().unwrap();
1849        assert!((next.weight - 0.5).abs() < f64::EPSILON);
1850    }
1851
1852    #[test]
1853    fn explicit_zero_estimate_clamps_to_min() {
1854        let mut config = test_config();
1855        config.p_min_ms = 2.0;
1856        let mut scheduler = QueueingScheduler::new(config);
1857
1858        scheduler.submit_with_sources(
1859            1.0,
1860            0.0,
1861            WeightSource::Explicit,
1862            EstimateSource::Explicit,
1863            None::<&str>,
1864        );
1865
1866        let next = scheduler.peek_next().unwrap();
1867        assert!((next.remaining_time - 2.0).abs() < f64::EPSILON);
1868    }
1869
1870    // =========================================================================
1871    // Cancel tests
1872    // =========================================================================
1873
1874    #[test]
1875    fn cancel_removes_job() {
1876        let mut scheduler = QueueingScheduler::new(test_config());
1877        let id = scheduler.submit(1.0, 10.0).unwrap();
1878
1879        assert!(scheduler.cancel(id));
1880        assert!(scheduler.peek_next().is_none());
1881    }
1882
1883    #[test]
1884    fn cancel_returns_false_for_nonexistent() {
1885        let mut scheduler = QueueingScheduler::new(test_config());
1886        assert!(!scheduler.cancel(999));
1887    }
1888
1889    // =========================================================================
1890    // Reset tests
1891    // =========================================================================
1892
1893    #[test]
1894    fn reset_clears_all_state() {
1895        let mut scheduler = QueueingScheduler::new(test_config());
1896
1897        scheduler.submit(1.0, 10.0);
1898        scheduler.tick(5.0);
1899
1900        scheduler.reset();
1901
1902        assert!(scheduler.peek_next().is_none());
1903        assert_eq!(scheduler.stats().total_submitted, 0);
1904        assert_eq!(scheduler.stats().total_completed, 0);
1905    }
1906
1907    #[test]
1908    fn clear_removes_jobs_but_keeps_stats() {
1909        let mut scheduler = QueueingScheduler::new(test_config());
1910
1911        scheduler.submit(1.0, 10.0);
1912        scheduler.clear();
1913
1914        assert!(scheduler.peek_next().is_none());
1915        assert_eq!(scheduler.stats().total_submitted, 1); // Stats preserved
1916    }
1917
1918    // =========================================================================
1919    // Job tests
1920    // =========================================================================
1921
1922    #[test]
1923    fn job_progress_increases() {
1924        let mut job = Job::new(1, 1.0, 100.0);
1925        assert_eq!(job.progress(), 0.0);
1926
1927        job.remaining_time = 50.0;
1928        assert!((job.progress() - 0.5).abs() < 0.01);
1929
1930        job.remaining_time = 0.0;
1931        assert_eq!(job.progress(), 1.0);
1932    }
1933
1934    #[test]
1935    fn job_is_complete() {
1936        let mut job = Job::new(1, 1.0, 10.0);
1937        assert!(!job.is_complete());
1938
1939        job.remaining_time = 0.0;
1940        assert!(job.is_complete());
1941    }
1942
1943    // =========================================================================
1944    // Property tests
1945    // =========================================================================
1946
1947    #[test]
1948    fn property_work_conserving() {
1949        let mut scheduler = QueueingScheduler::new(test_config());
1950
1951        // Submit jobs
1952        for i in 0..10 {
1953            scheduler.submit(1.0, (i as f64) + 1.0);
1954        }
1955
1956        // Process - should never be idle while jobs remain
1957        let mut total_processed = 0;
1958        while scheduler.peek_next().is_some() {
1959            let completed = scheduler.tick(1.0);
1960            total_processed += completed.len();
1961        }
1962
1963        assert_eq!(total_processed, 10);
1964    }
1965
1966    #[test]
1967    fn property_bounded_memory() {
1968        let mut config = test_config();
1969        config.max_queue_size = 100;
1970        let mut scheduler = QueueingScheduler::new(config);
1971
1972        // Submit many jobs
1973        for _ in 0..1000 {
1974            scheduler.submit(1.0, 10.0);
1975        }
1976
1977        assert!(scheduler.stats().queue_length <= 100);
1978    }
1979
1980    #[test]
1981    fn property_deterministic() {
1982        let run = || {
1983            let mut scheduler = QueueingScheduler::new(test_config());
1984            let mut completions = Vec::new();
1985
1986            for i in 0..20 {
1987                scheduler.submit(((i % 3) + 1) as f64, ((i % 5) + 1) as f64);
1988            }
1989
1990            for _ in 0..50 {
1991                completions.extend(scheduler.tick(1.0));
1992            }
1993
1994            completions
1995        };
1996
1997        let run1 = run();
1998        let run2 = run();
1999
2000        assert_eq!(run1, run2, "Scheduling should be deterministic");
2001    }
2002
2003    #[test]
2004    fn smith_beats_fifo_on_mixed_workload() {
2005        let workload = mixed_workload();
2006        let smith = simulate_policy(SimPolicy::Smith, &workload);
2007        let fifo = simulate_policy(SimPolicy::Fifo, &workload);
2008
2009        eprintln!("{}", workload_summary_json(&workload));
2010        eprintln!("{}", summary_json(SimPolicy::Smith, &smith));
2011        eprintln!("{}", summary_json(SimPolicy::Fifo, &fifo));
2012
2013        assert!(
2014            smith.mean < fifo.mean,
2015            "mean should improve: smith={} fifo={}",
2016            summary_json(SimPolicy::Smith, &smith),
2017            summary_json(SimPolicy::Fifo, &fifo)
2018        );
2019        assert!(
2020            smith.p95 < fifo.p95,
2021            "p95 should improve: smith={} fifo={}",
2022            summary_json(SimPolicy::Smith, &smith),
2023            summary_json(SimPolicy::Fifo, &fifo)
2024        );
2025        assert!(
2026            smith.p99 < fifo.p99,
2027            "p99 should improve: smith={} fifo={}",
2028            summary_json(SimPolicy::Smith, &smith),
2029            summary_json(SimPolicy::Fifo, &fifo)
2030        );
2031    }
2032
2033    #[test]
2034    fn simulation_is_deterministic_per_policy() {
2035        let workload = mixed_workload();
2036        let smith1 = simulate_policy(SimPolicy::Smith, &workload);
2037        let smith2 = simulate_policy(SimPolicy::Smith, &workload);
2038        let fifo1 = simulate_policy(SimPolicy::Fifo, &workload);
2039        let fifo2 = simulate_policy(SimPolicy::Fifo, &workload);
2040
2041        assert_eq!(smith1.completion_order, smith2.completion_order);
2042        assert_eq!(fifo1.completion_order, fifo2.completion_order);
2043        assert!((smith1.mean - smith2.mean).abs() < 1e-9);
2044        assert!((fifo1.mean - fifo2.mean).abs() < 1e-9);
2045    }
2046
2047    #[test]
2048    fn effect_queue_trace_is_deterministic() {
2049        let mut config = test_config();
2050        config.preemptive = false;
2051        config.aging_factor = 0.0;
2052        config.wait_starve_ms = 0.0;
2053        config.force_fifo = false;
2054        config.smith_enabled = true;
2055
2056        let mut scheduler = QueueingScheduler::new(config);
2057        let id_alpha = scheduler
2058            .submit_with_sources(
2059                1.0,
2060                8.0,
2061                WeightSource::Explicit,
2062                EstimateSource::Explicit,
2063                Some("alpha"),
2064            )
2065            .expect("alpha accepted");
2066        let id_beta = scheduler
2067            .submit_with_sources(
2068                4.0,
2069                2.0,
2070                WeightSource::Explicit,
2071                EstimateSource::Explicit,
2072                Some("beta"),
2073            )
2074            .expect("beta accepted");
2075        let id_gamma = scheduler
2076            .submit_with_sources(
2077                2.0,
2078                10.0,
2079                WeightSource::Explicit,
2080                EstimateSource::Explicit,
2081                Some("gamma"),
2082            )
2083            .expect("gamma accepted");
2084        let id_delta = scheduler
2085            .submit_with_sources(
2086                3.0,
2087                3.0,
2088                WeightSource::Explicit,
2089                EstimateSource::Explicit,
2090                Some("delta"),
2091            )
2092            .expect("delta accepted");
2093
2094        scheduler.refresh_priorities();
2095
2096        let mut selected = Vec::new();
2097        while let Some(job) = scheduler.peek_next().cloned() {
2098            let evidence = scheduler.evidence();
2099            if let Some(id) = evidence.selected_job_id {
2100                selected.push(id);
2101            }
2102            println!("{}", evidence.to_jsonl("effect_queue_select"));
2103
2104            let completed = scheduler.tick(job.remaining_time);
2105            assert!(
2106                !completed.is_empty(),
2107                "expected completion per tick in non-preemptive mode"
2108            );
2109        }
2110
2111        assert_eq!(selected, vec![id_beta, id_delta, id_gamma, id_alpha]);
2112    }
2113
2114    // =========================================================================
2115    // Edge case tests
2116    // =========================================================================
2117
2118    #[test]
2119    fn zero_weight_handled() {
2120        let mut scheduler = QueueingScheduler::new(test_config());
2121        scheduler.submit(0.0, 10.0);
2122        assert!(scheduler.peek_next().is_some());
2123    }
2124
2125    #[test]
2126    fn zero_time_completes_immediately() {
2127        let mut scheduler = QueueingScheduler::new(test_config());
2128        scheduler.submit(1.0, 0.0);
2129        let completed = scheduler.tick(1.0);
2130        assert_eq!(completed.len(), 1);
2131    }
2132
2133    #[test]
2134    fn negative_time_handled() {
2135        let mut scheduler = QueueingScheduler::new(test_config());
2136        scheduler.submit(1.0, -10.0);
2137        let completed = scheduler.tick(1.0);
2138        assert_eq!(completed.len(), 1);
2139    }
2140}