1#![forbid(unsafe_code)]
2
3use std::cmp::Ordering;
68use std::collections::BinaryHeap;
69use std::fmt::Write;
70
/// Default priority gain per unit of wait time (see `SchedulerConfig::aging_factor`).
const DEFAULT_AGING_FACTOR: f64 = 0.1;

/// Default cap on the number of queued jobs before submissions are rejected.
const MAX_QUEUE_SIZE: usize = 10_000;

/// Default lower clamp for processing-time estimates (milliseconds, per the name).
const DEFAULT_P_MIN_MS: f64 = 0.05;

/// Default upper clamp for processing-time estimates; also used for NaN estimates.
const DEFAULT_P_MAX_MS: f64 = 5_000.0;

/// Default lower clamp for job weights; also used for NaN weights.
const DEFAULT_W_MIN: f64 = 1e-6;

/// Default upper clamp for job weights.
const DEFAULT_W_MAX: f64 = 100.0;

/// Default weight substituted for jobs whose weight source is `WeightSource::Default`.
const DEFAULT_WEIGHT_DEFAULT: f64 = 1.0;

/// Default weight substituted for jobs whose weight source is `WeightSource::Unknown`.
const DEFAULT_WEIGHT_UNKNOWN: f64 = 1.0;

/// Default estimate substituted for jobs whose estimate source is `EstimateSource::Default`.
const DEFAULT_ESTIMATE_DEFAULT_MS: f64 = 10.0;

/// Default estimate substituted for jobs whose estimate source is `EstimateSource::Unknown`.
const DEFAULT_ESTIMATE_UNKNOWN_MS: f64 = 1_000.0;

/// Default wait time after which the starvation floor starts to apply.
const DEFAULT_WAIT_STARVE_MS: f64 = 500.0;

/// Default multiplier applied to the base ratio to form the starvation floor.
const DEFAULT_STARVE_BOOST_RATIO: f64 = 1.5;
106
/// Tunable parameters for [`QueueingScheduler`].
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Priority added per unit of wait time (`aging_reward = aging_factor * wait`).
    pub aging_factor: f64,

    /// Lower clamp for time estimates; also the floor applied to the remaining
    /// time when it is used as the base-ratio denominator.
    pub p_min_ms: f64,

    /// Upper clamp for time estimates; also substituted for NaN estimates.
    pub p_max_ms: f64,

    /// Estimate substituted when a job's source is `EstimateSource::Default`.
    pub estimate_default_ms: f64,

    /// Estimate substituted when a job's source is `EstimateSource::Unknown`.
    pub estimate_unknown_ms: f64,

    /// Lower clamp for weights; also substituted for NaN weights and used as
    /// the floor of the denominator of the evidence loss proxy.
    pub w_min: f64,

    /// Upper clamp for weights.
    pub w_max: f64,

    /// Weight substituted when a job's source is `WeightSource::Default`.
    pub weight_default: f64,

    /// Weight substituted when a job's source is `WeightSource::Unknown`.
    pub weight_unknown: f64,

    /// Wait time at or beyond which the starvation floor applies; values
    /// `<= 0.0` disable the floor.
    pub wait_starve_ms: f64,

    /// Multiplier applied to the base ratio to obtain the starvation floor.
    pub starve_boost_ratio: f64,

    /// Selects Smith mode (weight / remaining) over SRPT (1 / remaining)
    /// when `force_fifo` is off.
    pub smith_enabled: bool,

    /// Forces FIFO ordering, taking precedence over `smith_enabled`.
    pub force_fifo: bool,

    /// Submissions are rejected once the queue holds this many jobs.
    pub max_queue_size: usize,

    /// When set, each accepted submission may preempt the running job.
    pub preemptive: bool,

    /// NOTE(review): not read by any code visible in this part of the file;
    /// presumably a time slice for preemption -- confirm.
    pub time_quantum: f64,

    /// NOTE(review): not read by any code visible in this part of the file;
    /// presumably toggles diagnostic logging -- confirm.
    pub enable_logging: bool,
}
170
171impl Default for SchedulerConfig {
172 fn default() -> Self {
173 Self {
174 aging_factor: DEFAULT_AGING_FACTOR,
175 p_min_ms: DEFAULT_P_MIN_MS,
176 p_max_ms: DEFAULT_P_MAX_MS,
177 estimate_default_ms: DEFAULT_ESTIMATE_DEFAULT_MS,
178 estimate_unknown_ms: DEFAULT_ESTIMATE_UNKNOWN_MS,
179 w_min: DEFAULT_W_MIN,
180 w_max: DEFAULT_W_MAX,
181 weight_default: DEFAULT_WEIGHT_DEFAULT,
182 weight_unknown: DEFAULT_WEIGHT_UNKNOWN,
183 wait_starve_ms: DEFAULT_WAIT_STARVE_MS,
184 starve_boost_ratio: DEFAULT_STARVE_BOOST_RATIO,
185 smith_enabled: true,
186 force_fifo: false,
187 max_queue_size: MAX_QUEUE_SIZE,
188 preemptive: true,
189 time_quantum: 10.0,
190 enable_logging: false,
191 }
192 }
193}
194
195impl SchedulerConfig {
196 pub fn mode(&self) -> SchedulingMode {
198 if self.force_fifo {
199 SchedulingMode::Fifo
200 } else if self.smith_enabled {
201 SchedulingMode::Smith
202 } else {
203 SchedulingMode::Srpt
204 }
205 }
206}
207
/// A unit of work tracked by the scheduler.
#[derive(Debug, Clone)]
pub struct Job {
    /// Identifier; the scheduler assigns it from its `next_job_id` counter.
    pub id: u64,

    /// Importance weight used as the numerator of the Smith ratio.
    pub weight: f64,

    /// Work still to be done; `tick` subtracts processed time from it.
    pub remaining_time: f64,

    /// Initial estimate, used by [`Job::progress`] as the denominator.
    pub total_time: f64,

    /// Scheduler clock value at submission.
    pub arrival_time: f64,

    /// Monotonic submission counter used for tie-breaking and FIFO order.
    pub arrival_seq: u64,

    /// Where the time estimate came from.
    pub estimate_source: EstimateSource,

    /// Where the weight came from.
    pub weight_source: WeightSource,

    /// Optional human-readable label.
    pub name: Option<String>,
}
238
239impl Job {
240 pub fn new(id: u64, weight: f64, estimated_time: f64) -> Self {
242 let weight = if weight.is_nan() {
243 DEFAULT_W_MIN
244 } else if weight.is_infinite() {
245 if weight.is_sign_positive() {
246 DEFAULT_W_MAX
247 } else {
248 DEFAULT_W_MIN
249 }
250 } else {
251 weight.clamp(DEFAULT_W_MIN, DEFAULT_W_MAX)
252 };
253 let estimated_time = if estimated_time.is_nan() {
254 DEFAULT_P_MAX_MS
255 } else if estimated_time.is_infinite() {
256 if estimated_time.is_sign_positive() {
257 DEFAULT_P_MAX_MS
258 } else {
259 DEFAULT_P_MIN_MS
260 }
261 } else {
262 estimated_time.clamp(DEFAULT_P_MIN_MS, DEFAULT_P_MAX_MS)
263 };
264 Self {
265 id,
266 weight,
267 remaining_time: estimated_time,
268 total_time: estimated_time,
269 arrival_time: 0.0,
270 arrival_seq: 0,
271 estimate_source: EstimateSource::Explicit,
272 weight_source: WeightSource::Explicit,
273 name: None,
274 }
275 }
276
277 pub fn with_name(id: u64, weight: f64, estimated_time: f64, name: impl Into<String>) -> Self {
279 let mut job = Self::new(id, weight, estimated_time);
280 job.name = Some(name.into());
281 job
282 }
283
284 pub fn with_sources(
286 mut self,
287 weight_source: WeightSource,
288 estimate_source: EstimateSource,
289 ) -> Self {
290 self.weight_source = weight_source;
291 self.estimate_source = estimate_source;
292 self
293 }
294
295 pub fn progress(&self) -> f64 {
297 if self.total_time <= 0.0 {
298 1.0
299 } else {
300 1.0 - (self.remaining_time / self.total_time).clamp(0.0, 1.0)
301 }
302 }
303
304 pub fn is_complete(&self) -> bool {
306 self.remaining_time <= 0.0
307 }
308}
309
/// Heap entry pairing a [`Job`] with the ordering keys computed for it.
#[derive(Debug, Clone)]
struct PriorityJob {
    /// Effective priority at the time the entry was built.
    priority: f64,
    /// Base ratio (before aging or the starvation floor), used as a tie-breaker.
    base_ratio: f64,
    /// The job itself.
    job: Job,
    /// Scheduling mode in force when the entry was built; selects the ordering rule.
    mode: SchedulingMode,
}
318
319impl PartialEq for PriorityJob {
320 fn eq(&self, other: &Self) -> bool {
321 self.job.id == other.job.id
322 }
323}
324
325impl Eq for PriorityJob {}
326
/// Delegates to [`Ord::cmp`] so the partial and total orderings always agree.
impl PartialOrd for PriorityJob {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
332
333impl Ord for PriorityJob {
334 fn cmp(&self, other: &Self) -> Ordering {
335 if self.mode == SchedulingMode::Fifo || other.mode == SchedulingMode::Fifo {
336 return other
337 .job
338 .arrival_seq
339 .cmp(&self.job.arrival_seq)
340 .then_with(|| other.job.id.cmp(&self.job.id));
341 }
342 self.priority
344 .total_cmp(&other.priority)
345 .then_with(|| self.base_ratio.total_cmp(&other.base_ratio))
347 .then_with(|| self.job.weight.total_cmp(&other.job.weight))
349 .then_with(|| other.job.remaining_time.total_cmp(&self.job.remaining_time))
351 .then_with(|| other.job.arrival_seq.cmp(&self.job.arrival_seq))
353 .then_with(|| other.job.id.cmp(&self.job.id))
355 }
356}
357
/// Snapshot explaining the scheduler's current selection, as produced by
/// `QueueingScheduler::evidence`.
#[derive(Debug, Clone)]
pub struct SchedulingEvidence {
    /// Scheduler clock when the snapshot was taken.
    pub current_time: f64,

    /// Id of the running job if any, otherwise of the best-ranked candidate.
    pub selected_job_id: Option<u64>,

    /// Number of queued jobs plus the running job, if any.
    pub queue_length: usize,

    /// Mean wait (clock minus arrival time) over all tracked jobs.
    pub mean_wait_time: f64,

    /// Largest wait over all tracked jobs.
    pub max_wait_time: f64,

    /// Why the selected job was chosen.
    pub reason: SelectionReason,

    /// Which ordering key separated the top two candidates, if applicable.
    pub tie_break_reason: Option<TieBreakReason>,

    /// Per-job details, best-ranked first.
    pub jobs: Vec<JobEvidence>,
}
385
/// Per-job entry of a [`SchedulingEvidence`] snapshot.
#[derive(Debug, Clone)]
pub struct JobEvidence {
    /// Job identifier.
    pub job_id: u64,
    /// Optional job label.
    pub name: Option<String>,
    /// Remaining time of the job at snapshot time.
    pub estimate_ms: f64,
    /// Normalized job weight.
    pub weight: f64,
    /// Base ratio before aging or the starvation floor.
    pub ratio: f64,
    /// Aging term: aging factor times wait time.
    pub aging_reward: f64,
    /// Starvation floor applied to the priority (0.0 when inactive).
    pub starvation_floor: f64,
    /// Wait since arrival, never negative.
    pub age_ms: f64,
    /// Priority actually used for ordering.
    pub effective_priority: f64,
    /// `1 / max(effective_priority, w_min)`.
    pub objective_loss_proxy: f64,
    /// Provenance of the time estimate.
    pub estimate_source: EstimateSource,
    /// Provenance of the weight.
    pub weight_source: WeightSource,
}
414
/// Why the scheduler picked (or keeps running) a particular job.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SelectionReason {
    /// Nothing is queued and nothing is running.
    QueueEmpty,
    /// Chosen on its ratio alone (no aging boost, not a weighted Smith pick).
    ShortestRemaining,
    /// Smith mode pick whose weight exceeds 1.0.
    HighestWeightedPriority,
    /// FIFO mode: arrival order decides.
    Fifo,
    /// Wait time pushed the job ahead (starvation threshold or aging term).
    AgingBoost,
    /// A job is already running and continues.
    Continuation,
}

impl SelectionReason {
    /// Stable snake_case label used in the JSONL output.
    fn as_str(self) -> &'static str {
        use SelectionReason::{
            AgingBoost, Continuation, Fifo, HighestWeightedPriority, QueueEmpty,
            ShortestRemaining,
        };

        match self {
            QueueEmpty => "queue_empty",
            ShortestRemaining => "shortest_remaining",
            HighestWeightedPriority => "highest_weighted_priority",
            Fifo => "fifo",
            AgingBoost => "aging_boost",
            Continuation => "continuation",
        }
    }
}
444
/// Provenance of a job's processing-time estimate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EstimateSource {
    /// Supplied by the caller; used as given (after normalization).
    Explicit,
    /// Derived from history; used as given (after normalization).
    Historical,
    /// No estimate supplied; the configured default estimate is substituted.
    Default,
    /// Estimate unknown; the configured "unknown" estimate is substituted.
    Unknown,
}

impl EstimateSource {
    /// Stable lowercase label used in the JSON output.
    fn as_str(self) -> &'static str {
        let label = match self {
            Self::Explicit => "explicit",
            Self::Historical => "historical",
            Self::Default => "default",
            Self::Unknown => "unknown",
        };
        label
    }
}
468
/// Provenance of a job's weight.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WeightSource {
    /// Supplied by the caller; used as given (after normalization).
    Explicit,
    /// No weight supplied; the configured default weight is substituted.
    Default,
    /// Weight unknown; the configured "unknown" weight is substituted.
    Unknown,
}

impl WeightSource {
    /// Stable lowercase label used in the JSON output.
    fn as_str(self) -> &'static str {
        let label = match self {
            Self::Explicit => "explicit",
            Self::Default => "default",
            Self::Unknown => "unknown",
        };
        label
    }
}
489
/// First ordering key on which the top two candidates differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TieBreakReason {
    /// Effective priorities differ.
    EffectivePriority,
    /// Priorities tie; base ratios differ.
    BaseRatio,
    /// Ratios tie; weights differ.
    Weight,
    /// Weights tie; remaining times differ.
    RemainingTime,
    /// Everything above ties (or FIFO mode); arrival order differs.
    ArrivalSeq,
    /// Last resort: job ids.
    JobId,
    /// A job is already running, so no comparison was needed.
    Continuation,
}

impl TieBreakReason {
    /// Stable snake_case label used in the JSONL output.
    fn as_str(self) -> &'static str {
        use TieBreakReason::{
            ArrivalSeq, BaseRatio, Continuation, EffectivePriority, JobId, RemainingTime, Weight,
        };

        match self {
            EffectivePriority => "effective_priority",
            BaseRatio => "base_ratio",
            Weight => "weight",
            RemainingTime => "remaining_time",
            ArrivalSeq => "arrival_seq",
            JobId => "job_id",
            Continuation => "continuation",
        }
    }
}
522
/// Ordering policy applied to queued jobs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingMode {
    /// Base ratio is `weight / remaining_time`.
    Smith,
    /// Base ratio is `1 / remaining_time` (weights ignored).
    Srpt,
    /// Arrival order only; priorities are fixed at 0.
    Fifo,
}
533
534impl SchedulingEvidence {
535 #[must_use]
537 pub fn to_jsonl(&self, event: &str) -> String {
538 let mut out = String::with_capacity(256 + (self.jobs.len() * 64));
539 out.push_str("{\"event\":\"");
540 out.push_str(&escape_json(event));
541 out.push_str("\",\"current_time\":");
542 let _ = write!(out, "{:.6}", self.current_time);
543 out.push_str(",\"selected_job_id\":");
544 match self.selected_job_id {
545 Some(id) => {
546 let _ = write!(out, "{id}");
547 }
548 None => out.push_str("null"),
549 }
550 out.push_str(",\"queue_length\":");
551 let _ = write!(out, "{}", self.queue_length);
552 out.push_str(",\"mean_wait_time\":");
553 let _ = write!(out, "{:.6}", self.mean_wait_time);
554 out.push_str(",\"max_wait_time\":");
555 let _ = write!(out, "{:.6}", self.max_wait_time);
556 out.push_str(",\"reason\":\"");
557 out.push_str(self.reason.as_str());
558 out.push('"');
559 out.push_str(",\"tie_break_reason\":");
560 match self.tie_break_reason {
561 Some(reason) => {
562 out.push('"');
563 out.push_str(reason.as_str());
564 out.push('"');
565 }
566 None => out.push_str("null"),
567 }
568 out.push_str(",\"jobs\":[");
569 for (idx, job) in self.jobs.iter().enumerate() {
570 if idx > 0 {
571 out.push(',');
572 }
573 out.push_str(&job.to_json());
574 }
575 out.push_str("]}");
576 out
577 }
578}
579
580impl JobEvidence {
581 fn to_json(&self) -> String {
582 let mut out = String::with_capacity(128);
583 out.push_str("{\"job_id\":");
584 let _ = write!(out, "{}", self.job_id);
585 out.push_str(",\"name\":");
586 match &self.name {
587 Some(name) => {
588 out.push('"');
589 out.push_str(&escape_json(name));
590 out.push('"');
591 }
592 None => out.push_str("null"),
593 }
594 out.push_str(",\"estimate_ms\":");
595 let _ = write!(out, "{:.6}", self.estimate_ms);
596 out.push_str(",\"weight\":");
597 let _ = write!(out, "{:.6}", self.weight);
598 out.push_str(",\"ratio\":");
599 let _ = write!(out, "{:.6}", self.ratio);
600 out.push_str(",\"aging_reward\":");
601 let _ = write!(out, "{:.6}", self.aging_reward);
602 out.push_str(",\"starvation_floor\":");
603 let _ = write!(out, "{:.6}", self.starvation_floor);
604 out.push_str(",\"age_ms\":");
605 let _ = write!(out, "{:.6}", self.age_ms);
606 out.push_str(",\"effective_priority\":");
607 let _ = write!(out, "{:.6}", self.effective_priority);
608 out.push_str(",\"objective_loss_proxy\":");
609 let _ = write!(out, "{:.6}", self.objective_loss_proxy);
610 out.push_str(",\"estimate_source\":\"");
611 out.push_str(self.estimate_source.as_str());
612 out.push('"');
613 out.push_str(",\"weight_source\":\"");
614 out.push_str(self.weight_source.as_str());
615 out.push('"');
616 out.push('}');
617 out
618 }
619}
620
/// Escapes `input` for embedding inside a JSON string literal.
///
/// Quotes, backslashes and the common control characters get their short
/// escapes; any other character below U+0020 is written as `\u00XX`.
/// Everything else is copied through unchanged.
fn escape_json(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len() + 8);
    for ch in input.chars() {
        let short_form = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            '\u{08}' => Some("\\b"),
            '\u{0C}' => Some("\\f"),
            _ => None,
        };
        match short_form {
            Some(sequence) => escaped.push_str(sequence),
            None if ch < ' ' => {
                // Writing into a `String` cannot fail.
                let _ = write!(escaped, "\\u{:04x}", ch as u32);
            }
            None => escaped.push(ch),
        }
    }
    escaped
}
643
/// Running counters maintained by the scheduler.
#[derive(Debug, Clone, Default)]
pub struct SchedulerStats {
    /// Jobs accepted into the queue.
    pub total_submitted: u64,

    /// Jobs that ran to completion.
    pub total_completed: u64,

    /// Submissions refused because the queue was full.
    pub total_rejected: u64,

    /// Times a running job was pushed back in favour of a queued one.
    pub total_preemptions: u64,

    /// Total time spent processing jobs.
    pub total_processing_time: f64,

    /// Sum of response times (completion time minus arrival time).
    pub total_response_time: f64,

    /// Largest single response time seen.
    pub max_response_time: f64,

    /// Number of jobs currently tracked.
    pub queue_length: usize,
}

impl SchedulerStats {
    /// Average response time per completed job, or `0.0` before any completion.
    pub fn mean_response_time(&self) -> f64 {
        match self.total_completed {
            0 => 0.0,
            completed => self.total_response_time / completed as f64,
        }
    }

    /// Completed jobs per unit of processing time, or `0.0` when no
    /// processing time has been recorded.
    pub fn throughput(&self) -> f64 {
        let busy = self.total_processing_time;
        if busy > 0.0 {
            return self.total_completed as f64 / busy;
        }
        0.0
    }
}
691
/// Priority-queue job scheduler with a simulated clock.
#[derive(Debug)]
pub struct QueueingScheduler {
    /// Policy and tuning parameters.
    config: SchedulerConfig,

    /// Waiting jobs, ordered by `PriorityJob`'s `Ord` (greatest pops first).
    queue: BinaryHeap<PriorityJob>,

    /// Job that was partially processed by the last `tick`, if any.
    current_job: Option<Job>,

    /// Simulated clock, advanced by `tick`.
    current_time: f64,

    /// Id handed to the next accepted job.
    next_job_id: u64,

    /// Arrival sequence number handed to the next accepted job.
    next_arrival_seq: u64,

    /// Running counters.
    stats: SchedulerStats,
}
715
/// Components of a job's effective priority.
#[derive(Debug, Clone, Copy)]
struct PriorityTerms {
    /// Aging factor times wait time.
    aging_reward: f64,
    /// Base ratio times the starve boost once the wait threshold is reached, else 0.
    starvation_floor: f64,
    /// `max(base_ratio + aging_reward, starvation_floor)`.
    effective_priority: f64,
}
722
723impl QueueingScheduler {
    /// Creates an empty scheduler: clock at 0, no running job, and job ids
    /// and arrival sequence numbers both starting at 1.
    pub fn new(config: SchedulerConfig) -> Self {
        Self {
            config,
            queue: BinaryHeap::new(),
            current_job: None,
            current_time: 0.0,
            next_job_id: 1,
            next_arrival_seq: 1,
            stats: SchedulerStats::default(),
        }
    }
736
737 pub fn submit(&mut self, weight: f64, estimated_time: f64) -> Option<u64> {
741 self.submit_named(weight, estimated_time, None::<&str>)
742 }
743
744 pub fn submit_named(
746 &mut self,
747 weight: f64,
748 estimated_time: f64,
749 name: Option<impl Into<String>>,
750 ) -> Option<u64> {
751 self.submit_with_sources(
752 weight,
753 estimated_time,
754 WeightSource::Explicit,
755 EstimateSource::Explicit,
756 name,
757 )
758 }
759
760 pub fn submit_with_sources(
762 &mut self,
763 weight: f64,
764 estimated_time: f64,
765 weight_source: WeightSource,
766 estimate_source: EstimateSource,
767 name: Option<impl Into<String>>,
768 ) -> Option<u64> {
769 if self.queue.len() >= self.config.max_queue_size {
770 self.stats.total_rejected += 1;
771 return None;
772 }
773
774 let id = self.next_job_id;
775 self.next_job_id += 1;
776
777 let mut job = Job {
779 id,
780 weight,
781 remaining_time: estimated_time,
782 total_time: estimated_time,
783 arrival_time: 0.0,
784 arrival_seq: 0,
785 estimate_source,
786 weight_source,
787 name: None,
788 };
789 job.weight = self.normalize_weight_with_source(job.weight, job.weight_source);
790 job.remaining_time =
791 self.normalize_time_with_source(job.remaining_time, job.estimate_source);
792 job.total_time = job.remaining_time;
793 job.arrival_time = self.current_time;
794 job.arrival_seq = self.next_arrival_seq;
795 self.next_arrival_seq += 1;
796 if let Some(n) = name {
797 job.name = Some(n.into());
798 }
799
800 let priority_job = self.make_priority_job(job);
801 self.queue.push(priority_job);
802
803 self.stats.total_submitted += 1;
804 self.stats.queue_length = self.queue.len();
805
806 if self.config.preemptive {
808 self.maybe_preempt();
809 }
810
811 Some(id)
812 }
813
814 pub fn tick(&mut self, delta_time: f64) -> Vec<u64> {
818 let mut completed = Vec::new();
819 if !delta_time.is_finite() || delta_time <= 0.0 {
820 return completed;
821 }
822
823 let mut remaining_time = delta_time;
824 let mut now = self.current_time;
825 let mut processed_time = 0.0;
826
827 while remaining_time > 0.0 {
828 let Some(mut job) = (if let Some(j) = self.current_job.take() {
830 Some(j)
831 } else {
832 self.queue.pop().map(|pj| pj.job)
833 }) else {
834 now += remaining_time;
835 break; };
837
838 let process_time = remaining_time.min(job.remaining_time);
840 job.remaining_time -= process_time;
841 remaining_time -= process_time;
842 now += process_time;
843 processed_time += process_time;
844
845 if job.is_complete() {
846 let response_time = now - job.arrival_time;
848 self.stats.total_response_time += response_time;
849 self.stats.max_response_time = self.stats.max_response_time.max(response_time);
850 self.stats.total_completed += 1;
851 completed.push(job.id);
852 } else {
853 self.current_job = Some(job);
855 }
856 }
857
858 self.stats.total_processing_time += processed_time;
859 self.current_time = now;
860 self.refresh_priorities();
862
863 self.stats.queue_length = self.queue.len();
864 completed
865 }
866
867 pub fn peek_next(&self) -> Option<&Job> {
869 self.current_job
870 .as_ref()
871 .or_else(|| self.queue.peek().map(|pj| &pj.job))
872 }
873
874 pub fn evidence(&self) -> SchedulingEvidence {
876 let (mean_wait, max_wait) = self.compute_wait_stats();
877
878 let mut candidates: Vec<PriorityJob> = self
879 .queue
880 .iter()
881 .map(|pj| self.make_priority_job(pj.job.clone()))
882 .collect();
883
884 if let Some(ref current) = self.current_job {
885 candidates.push(self.make_priority_job(current.clone()));
886 }
887
888 candidates.sort_by(|a, b| b.cmp(a));
889
890 let selected_job_id = if let Some(ref current) = self.current_job {
891 Some(current.id)
892 } else {
893 candidates.first().map(|pj| pj.job.id)
894 };
895
896 let tie_break_reason = if self.current_job.is_some() {
897 Some(TieBreakReason::Continuation)
898 } else if candidates.len() > 1 {
899 Some(self.tie_break_reason(&candidates[0], &candidates[1]))
900 } else {
901 None
902 };
903
904 let reason = if self.queue.is_empty() && self.current_job.is_none() {
905 SelectionReason::QueueEmpty
906 } else if self.current_job.is_some() {
907 SelectionReason::Continuation
908 } else if self.config.mode() == SchedulingMode::Fifo {
909 SelectionReason::Fifo
910 } else if let Some(pj) = candidates.first() {
911 let wait_time = (self.current_time - pj.job.arrival_time).max(0.0);
912 let aging_contribution = self.config.aging_factor * wait_time;
913 let aging_boost = (self.config.wait_starve_ms > 0.0
914 && wait_time >= self.config.wait_starve_ms)
915 || aging_contribution > pj.base_ratio * 0.5;
916 if aging_boost {
917 SelectionReason::AgingBoost
918 } else if self.config.smith_enabled && pj.job.weight > 1.0 {
919 SelectionReason::HighestWeightedPriority
920 } else {
921 SelectionReason::ShortestRemaining
922 }
923 } else {
924 SelectionReason::QueueEmpty
925 };
926
927 let jobs = candidates
928 .iter()
929 .map(|pj| {
930 let age_ms = (self.current_time - pj.job.arrival_time).max(0.0);
931 let terms = self.compute_priority_terms(&pj.job);
932 JobEvidence {
933 job_id: pj.job.id,
934 name: pj.job.name.clone(),
935 estimate_ms: pj.job.remaining_time,
936 weight: pj.job.weight,
937 ratio: pj.base_ratio,
938 aging_reward: terms.aging_reward,
939 starvation_floor: terms.starvation_floor,
940 age_ms,
941 effective_priority: pj.priority,
942 objective_loss_proxy: 1.0 / pj.priority.max(self.config.w_min),
943 estimate_source: pj.job.estimate_source,
944 weight_source: pj.job.weight_source,
945 }
946 })
947 .collect();
948
949 SchedulingEvidence {
950 current_time: self.current_time,
951 selected_job_id,
952 queue_length: self.queue.len() + if self.current_job.is_some() { 1 } else { 0 },
953 mean_wait_time: mean_wait,
954 max_wait_time: max_wait,
955 reason,
956 tie_break_reason,
957 jobs,
958 }
959 }
960
961 pub fn stats(&self) -> SchedulerStats {
963 let mut stats = self.stats.clone();
964 stats.queue_length = self.queue.len() + if self.current_job.is_some() { 1 } else { 0 };
965 stats
966 }
967
    /// Returns the configured queue capacity.
    #[must_use]
    pub const fn max_queue_size(&self) -> usize {
        self.config.max_queue_size
    }
973
974 pub fn cancel(&mut self, job_id: u64) -> bool {
976 if let Some(ref j) = self.current_job
978 && j.id == job_id
979 {
980 self.current_job = None;
981 self.stats.queue_length = self.queue.len();
982 return true;
983 }
984
985 let old_len = self.queue.len();
987 let jobs: Vec<_> = self
988 .queue
989 .drain()
990 .filter(|pj| pj.job.id != job_id)
991 .collect();
992 self.queue = jobs.into_iter().collect();
993
994 self.stats.queue_length = self.queue.len();
995 old_len != self.queue.len()
996 }
997
998 pub fn clear(&mut self) {
1000 self.queue.clear();
1001 self.current_job = None;
1002 self.stats.queue_length = 0;
1003 }
1004
1005 pub fn reset(&mut self) {
1007 self.queue.clear();
1008 self.current_job = None;
1009 self.current_time = 0.0;
1010 self.next_job_id = 1;
1011 self.next_arrival_seq = 1;
1012 self.stats = SchedulerStats::default();
1013 }
1014
1015 fn normalize_weight(&self, weight: f64) -> f64 {
1019 if weight.is_nan() {
1020 return self.config.w_min;
1021 }
1022 if weight.is_infinite() {
1023 return if weight.is_sign_positive() {
1024 self.config.w_max
1025 } else {
1026 self.config.w_min
1027 };
1028 }
1029 weight.clamp(self.config.w_min, self.config.w_max)
1030 }
1031
1032 fn normalize_time(&self, estimate_ms: f64) -> f64 {
1034 if estimate_ms.is_nan() {
1035 return self.config.p_max_ms;
1036 }
1037 if estimate_ms.is_infinite() {
1038 return if estimate_ms.is_sign_positive() {
1039 self.config.p_max_ms
1040 } else {
1041 self.config.p_min_ms
1042 };
1043 }
1044 estimate_ms.clamp(self.config.p_min_ms, self.config.p_max_ms)
1045 }
1046
1047 fn normalize_weight_with_source(&self, weight: f64, source: WeightSource) -> f64 {
1049 let resolved = match source {
1050 WeightSource::Explicit => weight,
1051 WeightSource::Default => self.config.weight_default,
1052 WeightSource::Unknown => self.config.weight_unknown,
1053 };
1054 self.normalize_weight(resolved)
1055 }
1056
1057 fn normalize_time_with_source(&self, estimate_ms: f64, source: EstimateSource) -> f64 {
1059 let resolved = match source {
1060 EstimateSource::Explicit | EstimateSource::Historical => estimate_ms,
1061 EstimateSource::Default => self.config.estimate_default_ms,
1062 EstimateSource::Unknown => self.config.estimate_unknown_ms,
1063 };
1064 self.normalize_time(resolved)
1065 }
1066
1067 fn compute_base_ratio(&self, job: &Job) -> f64 {
1069 if self.config.mode() == SchedulingMode::Fifo {
1070 return 0.0;
1071 }
1072 let remaining = job.remaining_time.max(self.config.p_min_ms);
1073 let weight = match self.config.mode() {
1074 SchedulingMode::Smith => job.weight,
1075 SchedulingMode::Srpt => 1.0,
1076 SchedulingMode::Fifo => 0.0,
1077 };
1078 weight / remaining
1079 }
1080
1081 fn compute_priority_terms(&self, job: &Job) -> PriorityTerms {
1089 if self.config.mode() == SchedulingMode::Fifo {
1090 return PriorityTerms {
1091 aging_reward: 0.0,
1092 starvation_floor: 0.0,
1093 effective_priority: 0.0,
1094 };
1095 }
1096
1097 let base_ratio = self.compute_base_ratio(job);
1098 let wait_time = (self.current_time - job.arrival_time).max(0.0);
1099 let aging_reward = self.config.aging_factor * wait_time;
1100 let starvation_floor =
1101 if self.config.wait_starve_ms > 0.0 && wait_time >= self.config.wait_starve_ms {
1102 base_ratio * self.config.starve_boost_ratio
1103 } else {
1104 0.0
1105 };
1106
1107 let effective_priority = (base_ratio + aging_reward).max(starvation_floor);
1108
1109 PriorityTerms {
1110 aging_reward,
1111 starvation_floor,
1112 effective_priority,
1113 }
1114 }
1115
1116 fn compute_priority(&self, job: &Job) -> f64 {
1118 self.compute_priority_terms(job).effective_priority
1119 }
1120
1121 fn make_priority_job(&self, job: Job) -> PriorityJob {
1123 let base_ratio = self.compute_base_ratio(&job);
1124 let priority = self.compute_priority(&job);
1125 PriorityJob {
1126 priority,
1127 base_ratio,
1128 job,
1129 mode: self.config.mode(),
1130 }
1131 }
1132
1133 fn tie_break_reason(&self, a: &PriorityJob, b: &PriorityJob) -> TieBreakReason {
1135 if self.config.mode() == SchedulingMode::Fifo {
1136 if a.job.arrival_seq != b.job.arrival_seq {
1137 return TieBreakReason::ArrivalSeq;
1138 }
1139 return TieBreakReason::JobId;
1140 }
1141 if a.priority.total_cmp(&b.priority) != Ordering::Equal {
1142 TieBreakReason::EffectivePriority
1143 } else if a.base_ratio.total_cmp(&b.base_ratio) != Ordering::Equal {
1144 TieBreakReason::BaseRatio
1145 } else if a.job.weight.total_cmp(&b.job.weight) != Ordering::Equal {
1146 TieBreakReason::Weight
1147 } else if a.job.remaining_time.total_cmp(&b.job.remaining_time) != Ordering::Equal {
1148 TieBreakReason::RemainingTime
1149 } else if a.job.arrival_seq != b.job.arrival_seq {
1150 TieBreakReason::ArrivalSeq
1151 } else {
1152 TieBreakReason::JobId
1153 }
1154 }
1155
1156 fn maybe_preempt(&mut self) {
1158 if self.config.mode() == SchedulingMode::Fifo {
1159 return;
1160 }
1161 if let Some(ref current) = self.current_job
1162 && let Some(pj) = self.queue.peek()
1163 {
1164 let current_pj = self.make_priority_job(current.clone());
1165 if pj.cmp(¤t_pj) == Ordering::Greater {
1166 let old = self
1168 .current_job
1169 .take()
1170 .expect("current_job guaranteed by if-let guard");
1171 let priority_job = self.make_priority_job(old);
1172 self.queue.push(priority_job);
1173 self.stats.total_preemptions += 1;
1174 }
1175 }
1176 }
1177
1178 fn refresh_priorities(&mut self) {
1180 let jobs: Vec<_> = self.queue.drain().map(|pj| pj.job).collect();
1181 for job in jobs {
1182 let priority_job = self.make_priority_job(job);
1183 self.queue.push(priority_job);
1184 }
1185 }
1186
1187 fn compute_wait_stats(&self) -> (f64, f64) {
1189 let mut total_wait = 0.0;
1190 let mut max_wait = 0.0f64;
1191 let mut count = 0;
1192
1193 for pj in self.queue.iter() {
1194 let wait = (self.current_time - pj.job.arrival_time).max(0.0);
1195 total_wait += wait;
1196 max_wait = max_wait.max(wait);
1197 count += 1;
1198 }
1199
1200 if let Some(ref j) = self.current_job {
1201 let wait = (self.current_time - j.arrival_time).max(0.0);
1202 total_wait += wait;
1203 max_wait = max_wait.max(wait);
1204 count += 1;
1205 }
1206
1207 let mean = if count > 0 {
1208 total_wait / count as f64
1209 } else {
1210 0.0
1211 };
1212 (mean, max_wait)
1213 }
1214}
1215
1216#[cfg(test)]
1221mod tests {
1222 use super::*;
1223 use std::collections::HashMap;
1224
1225 fn test_config() -> SchedulerConfig {
1226 SchedulerConfig {
1227 aging_factor: 0.001,
1228 p_min_ms: DEFAULT_P_MIN_MS,
1229 p_max_ms: DEFAULT_P_MAX_MS,
1230 estimate_default_ms: DEFAULT_ESTIMATE_DEFAULT_MS,
1231 estimate_unknown_ms: DEFAULT_ESTIMATE_UNKNOWN_MS,
1232 w_min: DEFAULT_W_MIN,
1233 w_max: DEFAULT_W_MAX,
1234 weight_default: DEFAULT_WEIGHT_DEFAULT,
1235 weight_unknown: DEFAULT_WEIGHT_UNKNOWN,
1236 wait_starve_ms: DEFAULT_WAIT_STARVE_MS,
1237 starve_boost_ratio: DEFAULT_STARVE_BOOST_RATIO,
1238 smith_enabled: true,
1239 force_fifo: false,
1240 max_queue_size: 100,
1241 preemptive: true,
1242 time_quantum: 10.0,
1243 enable_logging: false,
1244 }
1245 }
1246
    /// One job of a simulated workload.
    #[derive(Clone, Copy, Debug)]
    struct WorkloadJob {
        /// Simulated time at which the job is submitted.
        arrival: u64,
        /// Weight passed to `submit`.
        weight: f64,
        /// Estimated duration passed to `submit`.
        duration: f64,
    }
1253
    /// Scheduling policy exercised by `simulate_policy`.
    #[derive(Clone, Copy, Debug, Eq, PartialEq)]
    enum SimPolicy {
        /// Runs with `smith_enabled` set.
        Smith,
        /// Runs with `force_fifo` set.
        Fifo,
    }
1259
    /// Response-time summary produced by `simulate_policy`.
    #[derive(Debug)]
    struct SimulationMetrics {
        /// Mean response time.
        mean: f64,
        /// 95th-percentile response time.
        p95: f64,
        /// 99th-percentile response time.
        p99: f64,
        /// Largest response time.
        max: f64,
        /// Number of completed jobs.
        job_count: usize,
        /// Job ids in the order they completed.
        completion_order: Vec<u64>,
    }
1269
1270 fn mixed_workload() -> Vec<WorkloadJob> {
1271 let mut jobs = Vec::new();
1272 jobs.push(WorkloadJob {
1273 arrival: 0,
1274 weight: 1.0,
1275 duration: 100.0,
1276 });
1277 for t in 1..=200u64 {
1278 jobs.push(WorkloadJob {
1279 arrival: t,
1280 weight: 1.0,
1281 duration: 1.0,
1282 });
1283 }
1284 jobs
1285 }
1286
1287 fn percentile(sorted: &[f64], p: f64) -> f64 {
1288 if sorted.is_empty() {
1289 return 0.0;
1290 }
1291 let idx = ((sorted.len() as f64 - 1.0) * p).ceil() as usize;
1292 sorted[idx.min(sorted.len() - 1)]
1293 }
1294
1295 fn summary_json(policy: SimPolicy, metrics: &SimulationMetrics) -> String {
1296 let policy = match policy {
1297 SimPolicy::Smith => "Smith",
1298 SimPolicy::Fifo => "Fifo",
1299 };
1300 let head: Vec<String> = metrics
1301 .completion_order
1302 .iter()
1303 .take(8)
1304 .map(|id| id.to_string())
1305 .collect();
1306 let tail: Vec<String> = metrics
1307 .completion_order
1308 .iter()
1309 .rev()
1310 .take(3)
1311 .collect::<Vec<_>>()
1312 .into_iter()
1313 .rev()
1314 .map(|id| id.to_string())
1315 .collect();
1316 format!(
1317 "{{\"policy\":\"{policy}\",\"jobs\":{jobs},\"mean\":{mean:.3},\"p95\":{p95:.3},\"p99\":{p99:.3},\"max\":{max:.3},\"order_head\":[{head}],\"order_tail\":[{tail}]}}",
1318 policy = policy,
1319 jobs = metrics.job_count,
1320 mean = metrics.mean,
1321 p95 = metrics.p95,
1322 p99 = metrics.p99,
1323 max = metrics.max,
1324 head = head.join(","),
1325 tail = tail.join(",")
1326 )
1327 }
1328
1329 fn workload_summary_json(workload: &[WorkloadJob]) -> String {
1330 if workload.is_empty() {
1331 return "{\"workload\":\"empty\"}".to_string();
1332 }
1333 let mut min_arrival = u64::MAX;
1334 let mut max_arrival = 0u64;
1335 let mut min_duration = f64::INFINITY;
1336 let mut max_duration: f64 = 0.0;
1337 let mut total_work: f64 = 0.0;
1338 let mut long_jobs = 0usize;
1339 let long_threshold = 10.0;
1340
1341 for job in workload {
1342 min_arrival = min_arrival.min(job.arrival);
1343 max_arrival = max_arrival.max(job.arrival);
1344 min_duration = min_duration.min(job.duration);
1345 max_duration = max_duration.max(job.duration);
1346 total_work += job.duration;
1347 if job.duration >= long_threshold {
1348 long_jobs += 1;
1349 }
1350 }
1351
1352 format!(
1353 "{{\"workload\":\"mixed\",\"jobs\":{jobs},\"arrival_min\":{arrival_min},\"arrival_max\":{arrival_max},\"duration_min\":{duration_min:.3},\"duration_max\":{duration_max:.3},\"total_work\":{total_work:.3},\"long_jobs\":{long_jobs},\"long_threshold\":{long_threshold:.1}}}",
1354 jobs = workload.len(),
1355 arrival_min = min_arrival,
1356 arrival_max = max_arrival,
1357 duration_min = min_duration,
1358 duration_max = max_duration,
1359 total_work = total_work,
1360 long_jobs = long_jobs,
1361 long_threshold = long_threshold
1362 )
1363 }
1364
1365 fn simulate_policy(policy: SimPolicy, workload: &[WorkloadJob]) -> SimulationMetrics {
1366 let mut config = test_config();
1367 config.aging_factor = 0.0;
1368 config.wait_starve_ms = 0.0;
1369 config.starve_boost_ratio = 1.0;
1370 config.smith_enabled = policy == SimPolicy::Smith;
1371 config.force_fifo = policy == SimPolicy::Fifo;
1372 config.preemptive = true;
1373
1374 let mut scheduler = QueueingScheduler::new(config);
1375 let mut arrivals = workload.to_vec();
1376 arrivals.sort_by_key(|job| job.arrival);
1377
1378 let mut arrival_times: HashMap<u64, f64> = HashMap::new();
1379 let mut response_times = Vec::with_capacity(arrivals.len());
1380 let mut completion_order = Vec::with_capacity(arrivals.len());
1381
1382 let mut idx = 0usize;
1383 let mut safety = 0usize;
1384
1385 while (idx < arrivals.len() || scheduler.peek_next().is_some()) && safety < 10_000 {
1386 let now = scheduler.current_time;
1387
1388 while idx < arrivals.len() && (arrivals[idx].arrival as f64) <= now + f64::EPSILON {
1389 let job = arrivals[idx];
1390 let id = scheduler
1391 .submit(job.weight, job.duration)
1392 .expect("queue capacity should not be exceeded");
1393 arrival_times.insert(id, scheduler.current_time);
1394 idx += 1;
1395 }
1396
1397 if scheduler.peek_next().is_none() {
1398 if idx < arrivals.len() {
1399 let next_time = arrivals[idx].arrival as f64;
1400 let delta = (next_time - scheduler.current_time).max(0.0);
1401 let completed = scheduler.tick(delta);
1402 for id in completed {
1403 let arrival = arrival_times.get(&id).copied().unwrap_or(0.0);
1404 response_times.push(scheduler.current_time - arrival);
1405 completion_order.push(id);
1406 }
1407 }
1408 safety += 1;
1409 continue;
1410 }
1411
1412 let completed = scheduler.tick(1.0);
1413 for id in completed {
1414 let arrival = arrival_times.get(&id).copied().unwrap_or(0.0);
1415 response_times.push(scheduler.current_time - arrival);
1416 completion_order.push(id);
1417 }
1418 safety += 1;
1419 }
1420
1421 assert_eq!(
1422 response_times.len(),
1423 arrivals.len(),
1424 "simulation did not complete all jobs"
1425 );
1426
1427 let mut sorted = response_times.clone();
1428 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
1429
1430 let mean = response_times.iter().sum::<f64>() / response_times.len() as f64;
1431 let p95 = percentile(&sorted, 0.95);
1432 let p99 = percentile(&sorted, 0.99);
1433 let max = *sorted.last().unwrap_or(&0.0);
1434
1435 SimulationMetrics {
1436 mean,
1437 p95,
1438 p99,
1439 max,
1440 job_count: response_times.len(),
1441 completion_order,
1442 }
1443 }
1444
1445 #[test]
1450 fn new_creates_empty_scheduler() {
1451 let scheduler = QueueingScheduler::new(test_config());
1452 assert_eq!(scheduler.stats().queue_length, 0);
1453 assert!(scheduler.peek_next().is_none());
1454 }
1455
1456 #[test]
1457 fn default_config_valid() {
1458 let config = SchedulerConfig::default();
1459 let scheduler = QueueingScheduler::new(config);
1460 assert_eq!(scheduler.stats().queue_length, 0);
1461 }
1462
1463 #[test]
1468 fn submit_returns_job_id() {
1469 let mut scheduler = QueueingScheduler::new(test_config());
1470 let id = scheduler.submit(1.0, 10.0);
1471 assert_eq!(id, Some(1));
1472 }
1473
1474 #[test]
1475 fn submit_increments_job_id() {
1476 let mut scheduler = QueueingScheduler::new(test_config());
1477 let id1 = scheduler.submit(1.0, 10.0);
1478 let id2 = scheduler.submit(1.0, 10.0);
1479 assert_eq!(id1, Some(1));
1480 assert_eq!(id2, Some(2));
1481 }
1482
1483 #[test]
1484 fn submit_rejects_when_queue_full() {
1485 let mut config = test_config();
1486 config.max_queue_size = 2;
1487 let mut scheduler = QueueingScheduler::new(config);
1488
1489 assert!(scheduler.submit(1.0, 10.0).is_some());
1490 assert!(scheduler.submit(1.0, 10.0).is_some());
1491 assert!(scheduler.submit(1.0, 10.0).is_none()); assert_eq!(scheduler.stats().total_rejected, 1);
1493 }
1494
1495 #[test]
1496 fn submit_named_job() {
1497 let mut scheduler = QueueingScheduler::new(test_config());
1498 let id = scheduler.submit_named(1.0, 10.0, Some("test-job"));
1499 assert!(id.is_some());
1500 }
1501
1502 #[test]
1507 fn srpt_prefers_shorter_jobs() {
1508 let mut scheduler = QueueingScheduler::new(test_config());
1509
1510 scheduler.submit(1.0, 100.0); scheduler.submit(1.0, 10.0); let next = scheduler.peek_next().unwrap();
1514 assert_eq!(next.remaining_time, 10.0); }
1516
1517 #[test]
1518 fn smith_rule_prefers_high_weight() {
1519 let mut scheduler = QueueingScheduler::new(test_config());
1520
1521 scheduler.submit(1.0, 10.0); scheduler.submit(10.0, 10.0); let next = scheduler.peek_next().unwrap();
1525 assert_eq!(next.weight, 10.0); }
1527
1528 #[test]
1529 fn smith_rule_balances_weight_and_time() {
1530 let mut scheduler = QueueingScheduler::new(test_config());
1531
1532 scheduler.submit(2.0, 20.0); scheduler.submit(1.0, 5.0); let next = scheduler.peek_next().unwrap();
1536 assert_eq!(next.remaining_time, 5.0); }
1538
1539 #[test]
1544 fn aging_increases_priority_over_time() {
1545 let mut scheduler = QueueingScheduler::new(test_config());
1546
1547 scheduler.submit(1.0, 100.0); scheduler.tick(0.0); let before_aging = scheduler.compute_priority(scheduler.peek_next().unwrap());
1551
1552 scheduler.current_time = 100.0; scheduler.refresh_priorities();
1554
1555 let after_aging = scheduler.compute_priority(scheduler.peek_next().unwrap());
1556 assert!(
1557 after_aging > before_aging,
1558 "Priority should increase with wait time"
1559 );
1560 }
1561
1562 #[test]
1563 fn aging_prevents_starvation() {
1564 let mut config = test_config();
1565 config.aging_factor = 1.0; let mut scheduler = QueueingScheduler::new(config);
1567
1568 scheduler.submit(1.0, 1000.0); scheduler.submit(1.0, 1.0); assert_eq!(scheduler.peek_next().unwrap().remaining_time, 1.0);
1573
1574 let completed = scheduler.tick(1.0);
1576 assert_eq!(completed.len(), 1);
1577
1578 assert!(scheduler.peek_next().is_some());
1579 }
1580
1581 #[test]
1586 fn preemption_when_higher_priority_arrives() {
1587 let mut scheduler = QueueingScheduler::new(test_config());
1588
1589 scheduler.submit(1.0, 100.0); scheduler.tick(10.0); let before = scheduler.peek_next().unwrap().remaining_time;
1593 assert_eq!(before, 90.0);
1594
1595 scheduler.submit(1.0, 5.0); let next = scheduler.peek_next().unwrap();
1599 assert_eq!(next.remaining_time, 5.0);
1600
1601 assert_eq!(scheduler.stats().total_preemptions, 1);
1603 }
1604
1605 #[test]
1606 fn no_preemption_when_disabled() {
1607 let mut config = test_config();
1608 config.preemptive = false;
1609 let mut scheduler = QueueingScheduler::new(config);
1610
1611 scheduler.submit(1.0, 100.0);
1612 scheduler.tick(10.0);
1613
1614 scheduler.submit(1.0, 5.0); let next = scheduler.peek_next().unwrap();
1618 assert_eq!(next.remaining_time, 90.0);
1619 }
1620
1621 #[test]
1626 fn tick_processes_jobs() {
1627 let mut scheduler = QueueingScheduler::new(test_config());
1628
1629 scheduler.submit(1.0, 10.0);
1630 let completed = scheduler.tick(5.0);
1631
1632 assert!(completed.is_empty()); assert_eq!(scheduler.peek_next().unwrap().remaining_time, 5.0);
1634 }
1635
1636 #[test]
1637 fn tick_completes_jobs() {
1638 let mut scheduler = QueueingScheduler::new(test_config());
1639
1640 scheduler.submit(1.0, 10.0);
1641 let completed = scheduler.tick(10.0);
1642
1643 assert_eq!(completed.len(), 1);
1644 assert_eq!(completed[0], 1);
1645 assert!(scheduler.peek_next().is_none());
1646 }
1647
1648 #[test]
1649 fn tick_completes_multiple_jobs() {
1650 let mut scheduler = QueueingScheduler::new(test_config());
1651
1652 scheduler.submit(1.0, 5.0);
1653 scheduler.submit(1.0, 5.0);
1654 let completed = scheduler.tick(10.0);
1655
1656 assert_eq!(completed.len(), 2);
1657 }
1658
1659 #[test]
1660 fn tick_handles_zero_delta() {
1661 let mut scheduler = QueueingScheduler::new(test_config());
1662 scheduler.submit(1.0, 10.0);
1663 let completed = scheduler.tick(0.0);
1664 assert!(completed.is_empty());
1665 }
1666
1667 #[test]
1672 fn stats_track_submissions() {
1673 let mut scheduler = QueueingScheduler::new(test_config());
1674
1675 scheduler.submit(1.0, 10.0);
1676 scheduler.submit(1.0, 10.0);
1677
1678 let stats = scheduler.stats();
1679 assert_eq!(stats.total_submitted, 2);
1680 assert_eq!(stats.queue_length, 2);
1681 }
1682
1683 #[test]
1684 fn stats_track_completions() {
1685 let mut scheduler = QueueingScheduler::new(test_config());
1686
1687 scheduler.submit(1.0, 10.0);
1688 scheduler.tick(10.0);
1689
1690 let stats = scheduler.stats();
1691 assert_eq!(stats.total_completed, 1);
1692 }
1693
1694 #[test]
1695 fn stats_compute_mean_response_time() {
1696 let mut scheduler = QueueingScheduler::new(test_config());
1697
1698 scheduler.submit(1.0, 10.0);
1699 scheduler.submit(1.0, 10.0);
1700 scheduler.tick(20.0);
1701
1702 let stats = scheduler.stats();
1703 assert_eq!(stats.total_completed, 2);
1706 assert!(stats.mean_response_time() > 0.0);
1707 }
1708
1709 #[test]
1710 fn stats_compute_throughput() {
1711 let mut scheduler = QueueingScheduler::new(test_config());
1712
1713 scheduler.submit(1.0, 10.0);
1714 scheduler.tick(10.0);
1715
1716 let stats = scheduler.stats();
1717 assert!((stats.throughput() - 0.1).abs() < 0.01);
1719 }
1720
1721 #[test]
1726 fn evidence_reports_queue_empty() {
1727 let scheduler = QueueingScheduler::new(test_config());
1728 let evidence = scheduler.evidence();
1729 assert_eq!(evidence.reason, SelectionReason::QueueEmpty);
1730 assert!(evidence.selected_job_id.is_none());
1731 assert!(evidence.tie_break_reason.is_none());
1732 assert!(evidence.jobs.is_empty());
1733 }
1734
1735 #[test]
1736 fn evidence_reports_selected_job() {
1737 let mut scheduler = QueueingScheduler::new(test_config());
1738 scheduler.submit(1.0, 10.0);
1739 let evidence = scheduler.evidence();
1740 assert_eq!(evidence.selected_job_id, Some(1));
1741 assert_eq!(evidence.jobs.len(), 1);
1742 }
1743
1744 #[test]
1745 fn evidence_reports_wait_stats() {
1746 let mut scheduler = QueueingScheduler::new(test_config());
1747 scheduler.submit(1.0, 100.0);
1748 scheduler.submit(1.0, 100.0);
1749 scheduler.current_time = 50.0;
1750 scheduler.refresh_priorities();
1751
1752 let evidence = scheduler.evidence();
1753 assert!(evidence.mean_wait_time > 0.0);
1754 assert!(evidence.max_wait_time > 0.0);
1755 }
1756
1757 #[test]
1758 fn evidence_reports_priority_objective_terms() {
1759 let mut config = test_config();
1760 config.aging_factor = 0.5;
1761 config.wait_starve_ms = 10.0;
1762 config.starve_boost_ratio = 2.0;
1763 let mut scheduler = QueueingScheduler::new(config);
1764
1765 scheduler.submit(1.0, 20.0);
1766 scheduler.current_time = 20.0;
1767 scheduler.refresh_priorities();
1768
1769 let evidence = scheduler.evidence();
1770 let job = evidence.jobs.first().expect("job evidence");
1771 assert!(job.aging_reward > 0.0);
1772 assert!(job.starvation_floor > 0.0);
1773 assert!(job.effective_priority >= job.ratio + job.aging_reward);
1774 assert!(
1775 (job.objective_loss_proxy - (1.0 / job.effective_priority.max(DEFAULT_W_MIN))).abs()
1776 < 1e-9
1777 );
1778 }
1779
1780 #[test]
1785 fn force_fifo_overrides_priority() {
1786 let mut config = test_config();
1787 config.force_fifo = true;
1788 let mut scheduler = QueueingScheduler::new(config);
1789
1790 let first = scheduler.submit(1.0, 100.0).unwrap();
1791 let second = scheduler.submit(10.0, 1.0).unwrap();
1792
1793 let next = scheduler.peek_next().unwrap();
1794 assert_eq!(next.id, first);
1795 assert_ne!(next.id, second);
1796 assert_eq!(scheduler.evidence().reason, SelectionReason::Fifo);
1797 }
1798
1799 #[test]
1800 fn default_sources_use_config_values() {
1801 let mut config = test_config();
1802 config.weight_default = 7.0;
1803 config.estimate_default_ms = 12.0;
1804 let mut scheduler = QueueingScheduler::new(config);
1805
1806 scheduler.submit_with_sources(
1807 999.0,
1808 999.0,
1809 WeightSource::Default,
1810 EstimateSource::Default,
1811 None::<&str>,
1812 );
1813
1814 let next = scheduler.peek_next().unwrap();
1815 assert!((next.weight - 7.0).abs() < f64::EPSILON);
1816 assert!((next.remaining_time - 12.0).abs() < f64::EPSILON);
1817 }
1818
1819 #[test]
1820 fn unknown_sources_use_config_values() {
1821 let mut config = test_config();
1822 config.weight_unknown = 2.5;
1823 config.estimate_unknown_ms = 250.0;
1824 let mut scheduler = QueueingScheduler::new(config);
1825
1826 scheduler.submit_with_sources(
1827 0.0,
1828 0.0,
1829 WeightSource::Unknown,
1830 EstimateSource::Unknown,
1831 None::<&str>,
1832 );
1833
1834 let next = scheduler.peek_next().unwrap();
1835 assert!((next.weight - 2.5).abs() < f64::EPSILON);
1836 assert!((next.remaining_time - 250.0).abs() < f64::EPSILON);
1837 }
1838
1839 #[test]
1844 fn tie_break_prefers_base_ratio_when_effective_equal() {
1845 let mut config = test_config();
1846 config.aging_factor = 0.1;
1847 let mut scheduler = QueueingScheduler::new(config);
1848
1849 let id_a = scheduler.submit(1.0, 2.0).unwrap(); scheduler.current_time = 5.0;
1852 scheduler.refresh_priorities();
1853
1854 let id_b = scheduler.submit(1.0, 1.0).unwrap(); scheduler.refresh_priorities();
1857
1858 let next = scheduler.peek_next().unwrap();
1859 assert_eq!(next.id, id_b);
1860
1861 let evidence = scheduler.evidence();
1862 assert_eq!(evidence.selected_job_id, Some(id_b));
1863 assert_eq!(evidence.tie_break_reason, Some(TieBreakReason::BaseRatio));
1864 assert_ne!(id_a, id_b);
1865 }
1866
1867 #[test]
1868 fn tie_break_prefers_weight_over_arrival() {
1869 let mut scheduler = QueueingScheduler::new(test_config());
1870
1871 let high_weight = scheduler.submit(2.0, 2.0).unwrap(); let _low_weight = scheduler.submit(1.0, 1.0).unwrap(); let evidence = scheduler.evidence();
1875 assert_eq!(evidence.selected_job_id, Some(high_weight));
1876 assert_eq!(evidence.tie_break_reason, Some(TieBreakReason::Weight));
1877 }
1878
1879 #[test]
1880 fn tie_break_prefers_arrival_seq_when_all_equal() {
1881 let mut config = test_config();
1882 config.aging_factor = 0.0;
1883 let mut scheduler = QueueingScheduler::new(config);
1884
1885 let first = scheduler.submit(1.0, 10.0).unwrap();
1886 let second = scheduler.submit(1.0, 10.0).unwrap();
1887
1888 let evidence = scheduler.evidence();
1889 assert_eq!(evidence.selected_job_id, Some(first));
1890 assert_eq!(evidence.tie_break_reason, Some(TieBreakReason::ArrivalSeq));
1891 assert_ne!(first, second);
1892 }
1893
1894 #[test]
1899 fn srpt_mode_ignores_weights() {
1900 let mut config = test_config();
1901 config.smith_enabled = false;
1902 let mut scheduler = QueueingScheduler::new(config);
1903
1904 scheduler.submit(10.0, 100.0); scheduler.submit(1.0, 10.0); let next = scheduler.peek_next().unwrap();
1908 assert_eq!(next.remaining_time, 10.0);
1909 assert_eq!(
1910 scheduler.evidence().reason,
1911 SelectionReason::ShortestRemaining
1912 );
1913 }
1914
1915 #[test]
1916 fn fifo_mode_disables_preemption() {
1917 let mut config = test_config();
1918 config.force_fifo = true;
1919 config.preemptive = true;
1920 let mut scheduler = QueueingScheduler::new(config);
1921
1922 let first = scheduler.submit(1.0, 100.0).unwrap();
1923 scheduler.tick(10.0);
1924
1925 let _later = scheduler.submit(10.0, 1.0).unwrap();
1926 let next = scheduler.peek_next().unwrap();
1927 assert_eq!(next.id, first);
1928 }
1929
1930 #[test]
1931 fn explicit_zero_weight_clamps_to_min() {
1932 let mut config = test_config();
1933 config.w_min = 0.5;
1934 let mut scheduler = QueueingScheduler::new(config);
1935
1936 scheduler.submit_with_sources(
1937 0.0,
1938 1.0,
1939 WeightSource::Explicit,
1940 EstimateSource::Explicit,
1941 None::<&str>,
1942 );
1943
1944 let next = scheduler.peek_next().unwrap();
1945 assert!((next.weight - 0.5).abs() < f64::EPSILON);
1946 }
1947
1948 #[test]
1949 fn explicit_zero_estimate_clamps_to_min() {
1950 let mut config = test_config();
1951 config.p_min_ms = 2.0;
1952 let mut scheduler = QueueingScheduler::new(config);
1953
1954 scheduler.submit_with_sources(
1955 1.0,
1956 0.0,
1957 WeightSource::Explicit,
1958 EstimateSource::Explicit,
1959 None::<&str>,
1960 );
1961
1962 let next = scheduler.peek_next().unwrap();
1963 assert!((next.remaining_time - 2.0).abs() < f64::EPSILON);
1964 }
1965
1966 #[test]
1967 fn explicit_weight_honors_config_w_max_above_defaults() {
1968 let mut config = test_config();
1969 config.w_max = 50.0;
1970 let mut scheduler = QueueingScheduler::new(config);
1971
1972 scheduler.submit_with_sources(
1973 20.0,
1974 1.0,
1975 WeightSource::Explicit,
1976 EstimateSource::Explicit,
1977 None::<&str>,
1978 );
1979
1980 let next = scheduler.peek_next().unwrap();
1981 assert!((next.weight - 20.0).abs() < f64::EPSILON);
1982 }
1983
1984 #[test]
1985 fn explicit_estimate_honors_config_p_max_above_defaults() {
1986 let mut config = test_config();
1987 config.p_max_ms = 100_000.0;
1988 let mut scheduler = QueueingScheduler::new(config);
1989
1990 scheduler.submit_with_sources(
1991 1.0,
1992 50_000.0,
1993 WeightSource::Explicit,
1994 EstimateSource::Explicit,
1995 None::<&str>,
1996 );
1997
1998 let next = scheduler.peek_next().unwrap();
1999 assert!((next.remaining_time - 50_000.0).abs() < f64::EPSILON);
2000 }
2001
2002 #[test]
2007 fn cancel_removes_job() {
2008 let mut scheduler = QueueingScheduler::new(test_config());
2009 let id = scheduler.submit(1.0, 10.0).unwrap();
2010
2011 assert!(scheduler.cancel(id));
2012 assert!(scheduler.peek_next().is_none());
2013 }
2014
2015 #[test]
2016 fn cancel_returns_false_for_nonexistent() {
2017 let mut scheduler = QueueingScheduler::new(test_config());
2018 assert!(!scheduler.cancel(999));
2019 }
2020
2021 #[test]
2026 fn reset_clears_all_state() {
2027 let mut scheduler = QueueingScheduler::new(test_config());
2028
2029 scheduler.submit(1.0, 10.0);
2030 scheduler.tick(5.0);
2031
2032 scheduler.reset();
2033
2034 assert!(scheduler.peek_next().is_none());
2035 assert_eq!(scheduler.stats().total_submitted, 0);
2036 assert_eq!(scheduler.stats().total_completed, 0);
2037 }
2038
2039 #[test]
2040 fn clear_removes_jobs_but_keeps_stats() {
2041 let mut scheduler = QueueingScheduler::new(test_config());
2042
2043 scheduler.submit(1.0, 10.0);
2044 scheduler.clear();
2045
2046 assert!(scheduler.peek_next().is_none());
2047 assert_eq!(scheduler.stats().total_submitted, 1); }
2049
2050 #[test]
2055 fn job_progress_increases() {
2056 let mut job = Job::new(1, 1.0, 100.0);
2057 assert_eq!(job.progress(), 0.0);
2058
2059 job.remaining_time = 50.0;
2060 assert!((job.progress() - 0.5).abs() < 0.01);
2061
2062 job.remaining_time = 0.0;
2063 assert_eq!(job.progress(), 1.0);
2064 }
2065
2066 #[test]
2067 fn job_is_complete() {
2068 let mut job = Job::new(1, 1.0, 10.0);
2069 assert!(!job.is_complete());
2070
2071 job.remaining_time = 0.0;
2072 assert!(job.is_complete());
2073 }
2074
2075 #[test]
2080 fn property_work_conserving() {
2081 let mut scheduler = QueueingScheduler::new(test_config());
2082
2083 for i in 0..10 {
2085 scheduler.submit(1.0, (i as f64) + 1.0);
2086 }
2087
2088 let mut total_processed = 0;
2090 while scheduler.peek_next().is_some() {
2091 let completed = scheduler.tick(1.0);
2092 total_processed += completed.len();
2093 }
2094
2095 assert_eq!(total_processed, 10);
2096 }
2097
2098 #[test]
2099 fn property_bounded_memory() {
2100 let mut config = test_config();
2101 config.max_queue_size = 100;
2102 let mut scheduler = QueueingScheduler::new(config);
2103
2104 for _ in 0..1000 {
2106 scheduler.submit(1.0, 10.0);
2107 }
2108
2109 assert!(scheduler.stats().queue_length <= 100);
2110 }
2111
2112 #[test]
2113 fn property_deterministic() {
2114 let run = || {
2115 let mut scheduler = QueueingScheduler::new(test_config());
2116 let mut completions = Vec::new();
2117
2118 for i in 0..20 {
2119 scheduler.submit(((i % 3) + 1) as f64, ((i % 5) + 1) as f64);
2120 }
2121
2122 for _ in 0..50 {
2123 completions.extend(scheduler.tick(1.0));
2124 }
2125
2126 completions
2127 };
2128
2129 let run1 = run();
2130 let run2 = run();
2131
2132 assert_eq!(run1, run2, "Scheduling should be deterministic");
2133 }
2134
2135 #[test]
2136 fn smith_beats_fifo_on_mixed_workload() {
2137 let workload = mixed_workload();
2138 let smith = simulate_policy(SimPolicy::Smith, &workload);
2139 let fifo = simulate_policy(SimPolicy::Fifo, &workload);
2140
2141 eprintln!("{}", workload_summary_json(&workload));
2142 eprintln!("{}", summary_json(SimPolicy::Smith, &smith));
2143 eprintln!("{}", summary_json(SimPolicy::Fifo, &fifo));
2144
2145 assert!(
2146 smith.mean < fifo.mean,
2147 "mean should improve: smith={} fifo={}",
2148 summary_json(SimPolicy::Smith, &smith),
2149 summary_json(SimPolicy::Fifo, &fifo)
2150 );
2151 assert!(
2152 smith.p95 < fifo.p95,
2153 "p95 should improve: smith={} fifo={}",
2154 summary_json(SimPolicy::Smith, &smith),
2155 summary_json(SimPolicy::Fifo, &fifo)
2156 );
2157 assert!(
2158 smith.p99 < fifo.p99,
2159 "p99 should improve: smith={} fifo={}",
2160 summary_json(SimPolicy::Smith, &smith),
2161 summary_json(SimPolicy::Fifo, &fifo)
2162 );
2163 }
2164
2165 #[test]
2166 fn simulation_is_deterministic_per_policy() {
2167 let workload = mixed_workload();
2168 let smith1 = simulate_policy(SimPolicy::Smith, &workload);
2169 let smith2 = simulate_policy(SimPolicy::Smith, &workload);
2170 let fifo1 = simulate_policy(SimPolicy::Fifo, &workload);
2171 let fifo2 = simulate_policy(SimPolicy::Fifo, &workload);
2172
2173 assert_eq!(smith1.completion_order, smith2.completion_order);
2174 assert_eq!(fifo1.completion_order, fifo2.completion_order);
2175 assert!((smith1.mean - smith2.mean).abs() < 1e-9);
2176 assert!((fifo1.mean - fifo2.mean).abs() < 1e-9);
2177 }
2178
2179 #[test]
2180 fn effect_queue_trace_is_deterministic() {
2181 let mut config = test_config();
2182 config.preemptive = false;
2183 config.aging_factor = 0.0;
2184 config.wait_starve_ms = 0.0;
2185 config.force_fifo = false;
2186 config.smith_enabled = true;
2187
2188 let mut scheduler = QueueingScheduler::new(config);
2189 let id_alpha = scheduler
2190 .submit_with_sources(
2191 1.0,
2192 8.0,
2193 WeightSource::Explicit,
2194 EstimateSource::Explicit,
2195 Some("alpha"),
2196 )
2197 .expect("alpha accepted");
2198 let id_beta = scheduler
2199 .submit_with_sources(
2200 4.0,
2201 2.0,
2202 WeightSource::Explicit,
2203 EstimateSource::Explicit,
2204 Some("beta"),
2205 )
2206 .expect("beta accepted");
2207 let id_gamma = scheduler
2208 .submit_with_sources(
2209 2.0,
2210 10.0,
2211 WeightSource::Explicit,
2212 EstimateSource::Explicit,
2213 Some("gamma"),
2214 )
2215 .expect("gamma accepted");
2216 let id_delta = scheduler
2217 .submit_with_sources(
2218 3.0,
2219 3.0,
2220 WeightSource::Explicit,
2221 EstimateSource::Explicit,
2222 Some("delta"),
2223 )
2224 .expect("delta accepted");
2225
2226 scheduler.refresh_priorities();
2227
2228 let mut selected = Vec::new();
2229 while let Some(job) = scheduler.peek_next().cloned() {
2230 let evidence = scheduler.evidence();
2231 if let Some(id) = evidence.selected_job_id {
2232 selected.push(id);
2233 }
2234 println!("{}", evidence.to_jsonl("effect_queue_select"));
2235
2236 let completed = scheduler.tick(job.remaining_time);
2237 assert!(
2238 !completed.is_empty(),
2239 "expected completion per tick in non-preemptive mode"
2240 );
2241 }
2242
2243 assert_eq!(selected, vec![id_beta, id_delta, id_gamma, id_alpha]);
2244 }
2245
2246 #[test]
2251 fn zero_weight_handled() {
2252 let mut scheduler = QueueingScheduler::new(test_config());
2253 scheduler.submit(0.0, 10.0);
2254 assert!(scheduler.peek_next().is_some());
2255 }
2256
2257 #[test]
2258 fn zero_time_completes_immediately() {
2259 let mut scheduler = QueueingScheduler::new(test_config());
2260 scheduler.submit(1.0, 0.0);
2261 let completed = scheduler.tick(1.0);
2262 assert_eq!(completed.len(), 1);
2263 }
2264
2265 #[test]
2266 fn negative_time_handled() {
2267 let mut scheduler = QueueingScheduler::new(test_config());
2268 scheduler.submit(1.0, -10.0);
2269 let completed = scheduler.tick(1.0);
2270 assert_eq!(completed.len(), 1);
2271 }
2272
2273 #[test]
2274 fn tick_non_finite_delta_noops() {
2275 let mut scheduler = QueueingScheduler::new(test_config());
2276 scheduler.submit(1.0, 5.0);
2277
2278 let before = scheduler.stats();
2279 assert!(scheduler.tick(f64::NAN).is_empty());
2280 assert!(scheduler.tick(f64::INFINITY).is_empty());
2281 assert!(scheduler.tick(f64::NEG_INFINITY).is_empty());
2282 let after = scheduler.stats();
2283
2284 assert_eq!(before.total_processing_time, after.total_processing_time);
2285 assert_eq!(before.total_completed, after.total_completed);
2286 assert!(scheduler.peek_next().is_some());
2287 }
2288
2289 #[test]
2294 fn job_new_nan_weight_clamps_to_min() {
2295 let job = Job::new(1, f64::NAN, 10.0);
2296 assert_eq!(job.weight, DEFAULT_W_MIN);
2297 }
2298
2299 #[test]
2300 fn job_new_pos_inf_weight_clamps_to_max() {
2301 let job = Job::new(1, f64::INFINITY, 10.0);
2302 assert_eq!(job.weight, DEFAULT_W_MAX);
2303 }
2304
2305 #[test]
2306 fn job_new_neg_inf_weight_clamps_to_min() {
2307 let job = Job::new(1, f64::NEG_INFINITY, 10.0);
2308 assert_eq!(job.weight, DEFAULT_W_MIN);
2309 }
2310
2311 #[test]
2312 fn job_new_nan_estimate_clamps_to_max() {
2313 let job = Job::new(1, 1.0, f64::NAN);
2314 assert_eq!(job.remaining_time, DEFAULT_P_MAX_MS);
2315 assert_eq!(job.total_time, DEFAULT_P_MAX_MS);
2316 }
2317
2318 #[test]
2319 fn job_new_pos_inf_estimate_clamps_to_max() {
2320 let job = Job::new(1, 1.0, f64::INFINITY);
2321 assert_eq!(job.remaining_time, DEFAULT_P_MAX_MS);
2322 }
2323
2324 #[test]
2325 fn job_new_neg_inf_estimate_clamps_to_min() {
2326 let job = Job::new(1, 1.0, f64::NEG_INFINITY);
2327 assert_eq!(job.remaining_time, DEFAULT_P_MIN_MS);
2328 }
2329
2330 #[test]
2331 fn job_with_name_sets_name() {
2332 let job = Job::with_name(1, 1.0, 10.0, "alpha");
2333 assert_eq!(job.name.as_deref(), Some("alpha"));
2334 assert_eq!(job.id, 1);
2335 }
2336
2337 #[test]
2338 fn job_with_sources_sets_both() {
2339 let job =
2340 Job::new(1, 1.0, 10.0).with_sources(WeightSource::Unknown, EstimateSource::Historical);
2341 assert_eq!(job.weight_source, WeightSource::Unknown);
2342 assert_eq!(job.estimate_source, EstimateSource::Historical);
2343 }
2344
2345 #[test]
2346 fn job_progress_zero_total_time() {
2347 let mut job = Job::new(1, 1.0, 10.0);
2348 job.total_time = 0.0;
2349 assert_eq!(job.progress(), 1.0);
2350 }
2351
2352 #[test]
2353 fn job_is_complete_negative_remaining() {
2354 let mut job = Job::new(1, 1.0, 10.0);
2355 job.remaining_time = -5.0;
2356 assert!(job.is_complete());
2357 }
2358
2359 #[test]
2364 fn submit_nan_weight_normalized() {
2365 let mut scheduler = QueueingScheduler::new(test_config());
2366 scheduler.submit(f64::NAN, 10.0);
2367 let next = scheduler.peek_next().unwrap();
2368 assert!(next.weight >= DEFAULT_W_MIN);
2369 assert!(next.weight.is_finite());
2370 }
2371
2372 #[test]
2373 fn submit_inf_weight_normalized() {
2374 let mut scheduler = QueueingScheduler::new(test_config());
2375 scheduler.submit(f64::INFINITY, 10.0);
2376 let next = scheduler.peek_next().unwrap();
2377 assert!(next.weight <= DEFAULT_W_MAX);
2378 assert!(next.weight.is_finite());
2379 }
2380
2381 #[test]
2382 fn submit_nan_estimate_normalized() {
2383 let mut scheduler = QueueingScheduler::new(test_config());
2384 scheduler.submit(1.0, f64::NAN);
2385 let next = scheduler.peek_next().unwrap();
2386 assert!(next.remaining_time <= DEFAULT_P_MAX_MS);
2387 assert!(next.remaining_time.is_finite());
2388 }
2389
2390 #[test]
2391 fn submit_inf_estimate_normalized() {
2392 let mut scheduler = QueueingScheduler::new(test_config());
2393 scheduler.submit(1.0, f64::INFINITY);
2394 let next = scheduler.peek_next().unwrap();
2395 assert!(next.remaining_time <= DEFAULT_P_MAX_MS);
2396 assert!(next.remaining_time.is_finite());
2397 }
2398
2399 #[test]
2404 fn config_mode_smith() {
2405 let config = SchedulerConfig {
2406 smith_enabled: true,
2407 force_fifo: false,
2408 ..Default::default()
2409 };
2410 assert_eq!(config.mode(), SchedulingMode::Smith);
2411 }
2412
2413 #[test]
2414 fn config_mode_srpt() {
2415 let config = SchedulerConfig {
2416 smith_enabled: false,
2417 force_fifo: false,
2418 ..Default::default()
2419 };
2420 assert_eq!(config.mode(), SchedulingMode::Srpt);
2421 }
2422
2423 #[test]
2424 fn config_mode_fifo_overrides_smith() {
2425 let config = SchedulerConfig {
2426 smith_enabled: true,
2427 force_fifo: true,
2428 ..Default::default()
2429 };
2430 assert_eq!(config.mode(), SchedulingMode::Fifo);
2431 }
2432
2433 #[test]
2438 fn starvation_guard_triggers_after_threshold() {
2439 let mut config = test_config();
2440 config.aging_factor = 0.0;
2441 config.wait_starve_ms = 50.0;
2442 config.starve_boost_ratio = 5.0;
2443 let mut scheduler = QueueingScheduler::new(config);
2444
2445 scheduler.submit(1.0, 100.0); scheduler.current_time = 60.0; scheduler.refresh_priorities();
2448
2449 let evidence = scheduler.evidence();
2450 let job_ev = &evidence.jobs[0];
2451 assert!(
2453 job_ev.starvation_floor > 0.0,
2454 "starvation floor should be active: {}",
2455 job_ev.starvation_floor
2456 );
2457 assert!(
2458 job_ev.effective_priority >= job_ev.starvation_floor,
2459 "effective priority {} should be >= starvation floor {}",
2460 job_ev.effective_priority,
2461 job_ev.starvation_floor
2462 );
2463 }
2464
2465 #[test]
2466 fn starvation_guard_disabled_when_zero() {
2467 let mut config = test_config();
2468 config.aging_factor = 0.0;
2469 config.wait_starve_ms = 0.0;
2470 let mut scheduler = QueueingScheduler::new(config);
2471
2472 scheduler.submit(1.0, 100.0);
2473 scheduler.current_time = 1000.0;
2474 scheduler.refresh_priorities();
2475
2476 let evidence = scheduler.evidence();
2477 let job_ev = &evidence.jobs[0];
2478 assert!(
2479 (job_ev.starvation_floor - 0.0).abs() < f64::EPSILON,
2480 "starvation floor should be 0 when disabled"
2481 );
2482 }
2483
2484 #[test]
2489 fn cancel_current_job() {
2490 let mut scheduler = QueueingScheduler::new(test_config());
2491 let id = scheduler.submit(1.0, 100.0).unwrap();
2492 scheduler.tick(10.0); assert!(scheduler.cancel(id));
2495 assert!(scheduler.peek_next().is_none());
2496 }
2497
2498 #[test]
2499 fn cancel_from_middle_of_queue() {
2500 let mut scheduler = QueueingScheduler::new(test_config());
2501 scheduler.submit(1.0, 100.0); let id2 = scheduler.submit(1.0, 50.0).unwrap(); scheduler.submit(1.0, 200.0); assert!(scheduler.cancel(id2));
2506 assert_eq!(scheduler.stats().queue_length, 2);
2507 }
2508
2509 #[test]
2514 fn tick_negative_delta_returns_empty() {
2515 let mut scheduler = QueueingScheduler::new(test_config());
2516 scheduler.submit(1.0, 10.0);
2517 let completed = scheduler.tick(-5.0);
2518 assert!(completed.is_empty());
2519 }
2520
2521 #[test]
2522 fn tick_empty_queue_advances_time() {
2523 let mut scheduler = QueueingScheduler::new(test_config());
2524 let completed = scheduler.tick(100.0);
2525 assert!(completed.is_empty());
2526 }
2527
2528 #[test]
2529 fn tick_processes_across_multiple_jobs_in_single_delta() {
2530 let mut config = test_config();
2531 config.aging_factor = 0.0;
2532 let mut scheduler = QueueingScheduler::new(config);
2533
2534 scheduler.submit(1.0, 3.0);
2535 scheduler.submit(1.0, 3.0);
2536 scheduler.submit(1.0, 3.0);
2537
2538 let completed = scheduler.tick(9.0);
2540 assert_eq!(completed.len(), 3);
2541 }
2542
2543 #[test]
2548 fn stats_default_values() {
2549 let stats = SchedulerStats::default();
2550 assert_eq!(stats.total_submitted, 0);
2551 assert_eq!(stats.total_completed, 0);
2552 assert_eq!(stats.total_rejected, 0);
2553 assert_eq!(stats.total_preemptions, 0);
2554 assert_eq!(stats.queue_length, 0);
2555 }
2556
2557 #[test]
2558 fn stats_mean_response_time_zero_completions() {
2559 let stats = SchedulerStats::default();
2560 assert_eq!(stats.mean_response_time(), 0.0);
2561 }
2562
2563 #[test]
2564 fn stats_throughput_zero_processing_time() {
2565 let stats = SchedulerStats::default();
2566 assert_eq!(stats.throughput(), 0.0);
2567 }
2568
2569 #[test]
2570 fn stats_max_response_time_tracked() {
2571 let mut scheduler = QueueingScheduler::new(test_config());
2572 scheduler.submit(1.0, 5.0);
2573 scheduler.submit(1.0, 10.0);
2574 scheduler.tick(15.0);
2575
2576 let stats = scheduler.stats();
2577 assert!(
2578 stats.max_response_time >= 10.0,
2579 "max response time {} should be >= 10",
2580 stats.max_response_time
2581 );
2582 }
2583
2584 #[test]
2589 fn evidence_continuation_reason() {
2590 let mut scheduler = QueueingScheduler::new(test_config());
2591 scheduler.submit(1.0, 100.0);
2592 scheduler.tick(10.0); let evidence = scheduler.evidence();
2595 assert_eq!(evidence.reason, SelectionReason::Continuation);
2596 }
2597
2598 #[test]
2599 fn evidence_single_job_no_tie_break() {
2600 let mut scheduler = QueueingScheduler::new(test_config());
2601 scheduler.submit(1.0, 10.0);
2602
2603 let evidence = scheduler.evidence();
2604 assert!(
2605 evidence.tie_break_reason.is_none(),
2606 "single job should have no tie break"
2607 );
2608 }
2609
2610 #[test]
2611 fn evidence_to_jsonl_contains_required_fields() {
2612 let mut scheduler = QueueingScheduler::new(test_config());
2613 scheduler.submit(1.0, 10.0);
2614 scheduler.submit(2.0, 5.0);
2615
2616 let evidence = scheduler.evidence();
2617 let json = evidence.to_jsonl("test_event");
2618
2619 assert!(json.contains("\"event\":\"test_event\""));
2620 assert!(json.contains("\"current_time\":"));
2621 assert!(json.contains("\"selected_job_id\":"));
2622 assert!(json.contains("\"queue_length\":"));
2623 assert!(json.contains("\"mean_wait_time\":"));
2624 assert!(json.contains("\"max_wait_time\":"));
2625 assert!(json.contains("\"reason\":"));
2626 assert!(json.contains("\"tie_break_reason\":"));
2627 assert!(json.contains("\"jobs\":["));
2628 }
2629
2630 #[test]
2631 fn evidence_to_jsonl_empty_queue() {
2632 let scheduler = QueueingScheduler::new(test_config());
2633 let evidence = scheduler.evidence();
2634 let json = evidence.to_jsonl("empty");
2635
2636 assert!(json.contains("\"selected_job_id\":null"));
2637 assert!(json.contains("\"tie_break_reason\":null"));
2638 assert!(json.contains("\"jobs\":[]"));
2639 }
2640
/// The serialized evidence for a single named job must mention every
/// per-job key listed below.
#[test]
fn job_evidence_to_json_contains_all_fields() {
    let mut scheduler = QueueingScheduler::new({
        let mut cfg = test_config();
        cfg.aging_factor = 0.5;
        cfg.wait_starve_ms = 5.0;
        cfg
    });

    scheduler.submit_with_sources(
        2.0,
        10.0,
        WeightSource::Explicit,
        EstimateSource::Default,
        Some("test-job"),
    );

    // Set the clock directly and refresh priorities before capturing evidence.
    scheduler.current_time = 10.0;
    scheduler.refresh_priorities();

    let line = scheduler.evidence().to_jsonl("detail");

    let expected_keys = [
        "\"job_id\":",
        "\"name\":\"test-job\"",
        "\"estimate_ms\":",
        "\"weight\":",
        "\"ratio\":",
        "\"aging_reward\":",
        "\"starvation_floor\":",
        "\"age_ms\":",
        "\"effective_priority\":",
        "\"objective_loss_proxy\":",
        "\"estimate_source\":",
        "\"weight_source\":",
    ];
    // Collect whichever keys are absent so a failure names them.
    let absent: Vec<&str> = expected_keys
        .iter()
        .copied()
        .filter(|key| !line.contains(*key))
        .collect();
    assert_eq!(absent, Vec::<&str>::new());
}
2674
/// A job name containing a double quote, a backslash, and a newline must show
/// up in the JSONL output with backslash escape sequences.
#[test]
fn evidence_jsonl_escapes_special_chars_in_name() {
    let mut scheduler = QueueingScheduler::new(test_config());
    scheduler.submit_named(1.0, 10.0, Some("job\"with\\special\nchars"));

    let line = scheduler.evidence().to_jsonl("escape_test");

    // Escaped forms of `"`, `\`, and newline, in that order.
    let escapes = ["\\\"", "\\\\", "\\n"];
    let absent: Vec<&str> = escapes
        .iter()
        .copied()
        .filter(|sequence| !line.contains(*sequence))
        .collect();
    assert_eq!(absent, Vec::<&str>::new());
}
2687
/// Pins the string label returned by `as_str` for each `SelectionReason`
/// variant listed here.
#[test]
fn selection_reason_as_str_coverage() {
    for (reason, label) in [
        (SelectionReason::QueueEmpty, "queue_empty"),
        (SelectionReason::ShortestRemaining, "shortest_remaining"),
        (
            SelectionReason::HighestWeightedPriority,
            "highest_weighted_priority",
        ),
        (SelectionReason::Fifo, "fifo"),
        (SelectionReason::AgingBoost, "aging_boost"),
        (SelectionReason::Continuation, "continuation"),
    ] {
        assert_eq!(reason.as_str(), label);
    }
}
2707
/// Pins the string label returned by `as_str` for each `EstimateSource`
/// variant listed here.
#[test]
fn estimate_source_as_str_coverage() {
    for (source, label) in [
        (EstimateSource::Explicit, "explicit"),
        (EstimateSource::Historical, "historical"),
        (EstimateSource::Default, "default"),
        (EstimateSource::Unknown, "unknown"),
    ] {
        assert_eq!(source.as_str(), label);
    }
}
2715
/// Pins the string label returned by `as_str` for each `WeightSource`
/// variant listed here.
#[test]
fn weight_source_as_str_coverage() {
    for (source, label) in [
        (WeightSource::Explicit, "explicit"),
        (WeightSource::Default, "default"),
        (WeightSource::Unknown, "unknown"),
    ] {
        assert_eq!(source.as_str(), label);
    }
}
2722
/// Pins the string label returned by `as_str` for each `TieBreakReason`
/// variant listed here.
#[test]
fn tie_break_reason_as_str_coverage() {
    for (reason, label) in [
        (TieBreakReason::EffectivePriority, "effective_priority"),
        (TieBreakReason::BaseRatio, "base_ratio"),
        (TieBreakReason::Weight, "weight"),
        (TieBreakReason::RemainingTime, "remaining_time"),
        (TieBreakReason::ArrivalSeq, "arrival_seq"),
        (TieBreakReason::JobId, "job_id"),
        (TieBreakReason::Continuation, "continuation"),
    ] {
        assert_eq!(reason.as_str(), label);
    }
}
2736
/// The `Debug` rendering of a named `Job` must include both the type name
/// and the job's name.
#[test]
fn debug_job() {
    let rendered = format!("{:?}", Job::with_name(1, 2.0, 50.0, "render"));

    assert!(rendered.contains("Job"));
    assert!(rendered.contains("render"));
}
2748
/// The `Debug` rendering of the default `SchedulerConfig` must include the
/// type name and the `aging_factor` field name.
#[test]
fn debug_scheduler_config() {
    let rendered = format!("{:?}", SchedulerConfig::default());

    assert!(rendered.contains("SchedulerConfig"));
    assert!(rendered.contains("aging_factor"));
}
2756
/// The `Debug` rendering of default `SchedulerStats` must include the type name.
#[test]
fn debug_scheduler_stats() {
    let rendered = format!("{:?}", SchedulerStats::default());

    assert!(rendered.contains("SchedulerStats"));
}
2763
/// The `Debug` rendering of evidence taken from a fresh scheduler must include
/// the `SchedulingEvidence` type name.
#[test]
fn debug_scheduling_evidence() {
    let scheduler = QueueingScheduler::new(test_config());
    let rendered = format!("{:?}", scheduler.evidence());

    assert!(rendered.contains("SchedulingEvidence"));
}
2771
/// Each `SchedulingMode` variant's `Debug` rendering must contain its own name.
#[test]
fn debug_scheduling_mode() {
    for (mode, variant_name) in [
        (SchedulingMode::Smith, "Smith"),
        (SchedulingMode::Srpt, "Srpt"),
        (SchedulingMode::Fifo, "Fifo"),
    ] {
        let rendered = format!("{:?}", mode);
        assert!(rendered.contains(variant_name));
    }
}
2778
/// A job submitted with `EstimateSource::Historical` and an estimate of 42.0
/// must be the next job, with a `remaining_time` of 42.0.
#[test]
fn historical_estimate_passes_through() {
    let mut scheduler = QueueingScheduler::new(test_config());

    scheduler.submit_with_sources(
        1.0,
        42.0,
        WeightSource::Explicit,
        EstimateSource::Historical,
        None::<&str>,
    );

    let head = scheduler.peek_next().unwrap();
    let drift = (head.remaining_time - 42.0).abs();
    assert!(drift < f64::EPSILON);
}
2797
/// Submits three jobs with shrinking second arguments (100, 50, 10), ticking
/// by 1.0 between submissions, and expects the stats to record at least two
/// preemptions. Aging and the starvation wait threshold are both set to zero.
#[test]
fn multiple_preemptions_counted() {
    let mut scheduler = QueueingScheduler::new({
        let mut cfg = test_config();
        cfg.aging_factor = 0.0;
        cfg.wait_starve_ms = 0.0;
        cfg
    });

    // Call sequence: submit(100), tick, submit(50), tick, submit(10).
    scheduler.submit(1.0, 100.0);
    for estimate in [50.0, 10.0] {
        scheduler.tick(1.0);
        scheduler.submit(1.0, estimate);
    }

    let preemptions = scheduler.stats().total_preemptions;
    assert!(
        preemptions >= 2,
        "expected >= 2 preemptions, got {}",
        preemptions
    );
}
2823
/// With `max_queue_size` set to 1, three identical submissions must leave
/// `total_rejected` at exactly 2.
#[test]
fn multiple_rejections_counted() {
    let mut scheduler = QueueingScheduler::new({
        let mut cfg = test_config();
        cfg.max_queue_size = 1;
        cfg
    });

    for _ in 0..3 {
        scheduler.submit(1.0, 10.0);
    }

    assert_eq!(scheduler.stats().total_rejected, 2);
}
2840
/// After two submissions followed by `reset()`, the next submitted job must
/// receive id 1.
#[test]
fn reset_resets_job_id_sequence() {
    let mut scheduler = QueueingScheduler::new(test_config());
    for _ in 0..2 {
        scheduler.submit(1.0, 10.0);
    }
    scheduler.reset();

    let first_after_reset = scheduler.submit(1.0, 10.0).unwrap();
    assert_eq!(
        first_after_reset, 1,
        "job id should restart from 1 after reset"
    );
}
2856
/// After two submissions followed by `clear()`, the next submitted job must
/// receive id 3.
#[test]
fn clear_preserves_job_id_sequence() {
    let mut scheduler = QueueingScheduler::new(test_config());
    for _ in 0..2 {
        scheduler.submit(1.0, 10.0);
    }
    scheduler.clear();

    let first_after_clear = scheduler.submit(1.0, 10.0).unwrap();
    assert_eq!(first_after_clear, 3, "job id should continue after clear");
}
2868
/// With `aging_factor` 1.0 and `wait_starve_ms` 10.0, a single job observed at
/// `current_time` 100.0 must be reported with `SelectionReason::AgingBoost`.
#[test]
fn evidence_aging_boost_reason() {
    let mut scheduler = QueueingScheduler::new({
        let mut cfg = test_config();
        cfg.aging_factor = 1.0;
        cfg.wait_starve_ms = 10.0;
        cfg
    });

    scheduler.submit(1.0, 100.0);

    // Set the clock directly and refresh priorities before capturing evidence.
    scheduler.current_time = 100.0;
    scheduler.refresh_priorities();

    assert_eq!(
        scheduler.evidence().reason,
        SelectionReason::AgingBoost,
        "long-waiting job should show aging boost reason"
    );
}
2891}