1#![forbid(unsafe_code)]
2
3use std::cmp::Ordering;
63use std::collections::BinaryHeap;
64use std::fmt::Write;
65
66const DEFAULT_AGING_FACTOR: f64 = 0.1;
68
69const MAX_QUEUE_SIZE: usize = 10_000;
71
72const DEFAULT_P_MIN_MS: f64 = 0.05;
74
75const DEFAULT_P_MAX_MS: f64 = 5_000.0;
77
78const DEFAULT_W_MIN: f64 = 1e-6;
80
81const DEFAULT_W_MAX: f64 = 100.0;
83
84const DEFAULT_WEIGHT_DEFAULT: f64 = 1.0;
86
87const DEFAULT_WEIGHT_UNKNOWN: f64 = 1.0;
89
90const DEFAULT_ESTIMATE_DEFAULT_MS: f64 = 10.0;
92
93const DEFAULT_ESTIMATE_UNKNOWN_MS: f64 = 1_000.0;
95
96const DEFAULT_WAIT_STARVE_MS: f64 = 500.0;
98
99const DEFAULT_STARVE_BOOST_RATIO: f64 = 1.5;
101
102#[derive(Debug, Clone)]
104pub struct SchedulerConfig {
105 pub aging_factor: f64,
109
110 pub p_min_ms: f64,
112
113 pub p_max_ms: f64,
115
116 pub estimate_default_ms: f64,
119
120 pub estimate_unknown_ms: f64,
123
124 pub w_min: f64,
126
127 pub w_max: f64,
129
130 pub weight_default: f64,
133
134 pub weight_unknown: f64,
137
138 pub wait_starve_ms: f64,
140
141 pub starve_boost_ratio: f64,
144
145 pub smith_enabled: bool,
147
148 pub force_fifo: bool,
151
152 pub max_queue_size: usize,
154
155 pub preemptive: bool,
157
158 pub time_quantum: f64,
161
162 pub enable_logging: bool,
164}
165
166impl Default for SchedulerConfig {
167 fn default() -> Self {
168 Self {
169 aging_factor: DEFAULT_AGING_FACTOR,
170 p_min_ms: DEFAULT_P_MIN_MS,
171 p_max_ms: DEFAULT_P_MAX_MS,
172 estimate_default_ms: DEFAULT_ESTIMATE_DEFAULT_MS,
173 estimate_unknown_ms: DEFAULT_ESTIMATE_UNKNOWN_MS,
174 w_min: DEFAULT_W_MIN,
175 w_max: DEFAULT_W_MAX,
176 weight_default: DEFAULT_WEIGHT_DEFAULT,
177 weight_unknown: DEFAULT_WEIGHT_UNKNOWN,
178 wait_starve_ms: DEFAULT_WAIT_STARVE_MS,
179 starve_boost_ratio: DEFAULT_STARVE_BOOST_RATIO,
180 smith_enabled: true,
181 force_fifo: false,
182 max_queue_size: MAX_QUEUE_SIZE,
183 preemptive: true,
184 time_quantum: 10.0,
185 enable_logging: false,
186 }
187 }
188}
189
190impl SchedulerConfig {
191 pub fn mode(&self) -> SchedulingMode {
193 if self.force_fifo {
194 SchedulingMode::Fifo
195 } else if self.smith_enabled {
196 SchedulingMode::Smith
197 } else {
198 SchedulingMode::Srpt
199 }
200 }
201}
202
203#[derive(Debug, Clone)]
205pub struct Job {
206 pub id: u64,
208
209 pub weight: f64,
211
212 pub remaining_time: f64,
214
215 pub total_time: f64,
217
218 pub arrival_time: f64,
220
221 pub arrival_seq: u64,
223
224 pub estimate_source: EstimateSource,
226
227 pub weight_source: WeightSource,
229
230 pub name: Option<String>,
232}
233
234impl Job {
235 pub fn new(id: u64, weight: f64, estimated_time: f64) -> Self {
237 let weight = if weight.is_nan() {
238 DEFAULT_W_MIN
239 } else if weight.is_infinite() {
240 if weight.is_sign_positive() {
241 DEFAULT_W_MAX
242 } else {
243 DEFAULT_W_MIN
244 }
245 } else {
246 weight.clamp(DEFAULT_W_MIN, DEFAULT_W_MAX)
247 };
248 let estimated_time = if estimated_time.is_nan() {
249 DEFAULT_P_MAX_MS
250 } else if estimated_time.is_infinite() {
251 if estimated_time.is_sign_positive() {
252 DEFAULT_P_MAX_MS
253 } else {
254 DEFAULT_P_MIN_MS
255 }
256 } else {
257 estimated_time.clamp(DEFAULT_P_MIN_MS, DEFAULT_P_MAX_MS)
258 };
259 Self {
260 id,
261 weight,
262 remaining_time: estimated_time,
263 total_time: estimated_time,
264 arrival_time: 0.0,
265 arrival_seq: 0,
266 estimate_source: EstimateSource::Explicit,
267 weight_source: WeightSource::Explicit,
268 name: None,
269 }
270 }
271
272 pub fn with_name(id: u64, weight: f64, estimated_time: f64, name: impl Into<String>) -> Self {
274 let mut job = Self::new(id, weight, estimated_time);
275 job.name = Some(name.into());
276 job
277 }
278
279 pub fn with_sources(
281 mut self,
282 weight_source: WeightSource,
283 estimate_source: EstimateSource,
284 ) -> Self {
285 self.weight_source = weight_source;
286 self.estimate_source = estimate_source;
287 self
288 }
289
290 pub fn progress(&self) -> f64 {
292 if self.total_time <= 0.0 {
293 1.0
294 } else {
295 1.0 - (self.remaining_time / self.total_time).clamp(0.0, 1.0)
296 }
297 }
298
299 pub fn is_complete(&self) -> bool {
301 self.remaining_time <= 0.0
302 }
303}
304
305#[derive(Debug, Clone)]
307struct PriorityJob {
308 priority: f64,
309 base_ratio: f64,
310 job: Job,
311 mode: SchedulingMode,
312}
313
314impl PartialEq for PriorityJob {
315 fn eq(&self, other: &Self) -> bool {
316 self.job.id == other.job.id
317 }
318}
319
320impl Eq for PriorityJob {}
321
322impl PartialOrd for PriorityJob {
323 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
324 Some(self.cmp(other))
325 }
326}
327
328impl Ord for PriorityJob {
329 fn cmp(&self, other: &Self) -> Ordering {
330 if self.mode == SchedulingMode::Fifo || other.mode == SchedulingMode::Fifo {
331 return other
332 .job
333 .arrival_seq
334 .cmp(&self.job.arrival_seq)
335 .then_with(|| other.job.id.cmp(&self.job.id));
336 }
337 self.priority
339 .total_cmp(&other.priority)
340 .then_with(|| self.base_ratio.total_cmp(&other.base_ratio))
342 .then_with(|| self.job.weight.total_cmp(&other.job.weight))
344 .then_with(|| other.job.remaining_time.total_cmp(&self.job.remaining_time))
346 .then_with(|| other.job.arrival_seq.cmp(&self.job.arrival_seq))
348 .then_with(|| other.job.id.cmp(&self.job.id))
350 }
351}
352
353#[derive(Debug, Clone)]
355pub struct SchedulingEvidence {
356 pub current_time: f64,
358
359 pub selected_job_id: Option<u64>,
361
362 pub queue_length: usize,
364
365 pub mean_wait_time: f64,
367
368 pub max_wait_time: f64,
370
371 pub reason: SelectionReason,
373
374 pub tie_break_reason: Option<TieBreakReason>,
376
377 pub jobs: Vec<JobEvidence>,
379}
380
381#[derive(Debug, Clone)]
383pub struct JobEvidence {
384 pub job_id: u64,
386 pub name: Option<String>,
388 pub estimate_ms: f64,
390 pub weight: f64,
392 pub ratio: f64,
394 pub age_ms: f64,
396 pub effective_priority: f64,
398 pub estimate_source: EstimateSource,
400 pub weight_source: WeightSource,
402}
403
404#[derive(Debug, Clone, Copy, PartialEq, Eq)]
406pub enum SelectionReason {
407 QueueEmpty,
409 ShortestRemaining,
411 HighestWeightedPriority,
413 Fifo,
415 AgingBoost,
417 Continuation,
419}
420
421impl SelectionReason {
422 fn as_str(self) -> &'static str {
423 match self {
424 Self::QueueEmpty => "queue_empty",
425 Self::ShortestRemaining => "shortest_remaining",
426 Self::HighestWeightedPriority => "highest_weighted_priority",
427 Self::Fifo => "fifo",
428 Self::AgingBoost => "aging_boost",
429 Self::Continuation => "continuation",
430 }
431 }
432}
433
434#[derive(Debug, Clone, Copy, PartialEq, Eq)]
436pub enum EstimateSource {
437 Explicit,
439 Historical,
441 Default,
443 Unknown,
445}
446
447impl EstimateSource {
448 fn as_str(self) -> &'static str {
449 match self {
450 Self::Explicit => "explicit",
451 Self::Historical => "historical",
452 Self::Default => "default",
453 Self::Unknown => "unknown",
454 }
455 }
456}
457
458#[derive(Debug, Clone, Copy, PartialEq, Eq)]
460pub enum WeightSource {
461 Explicit,
463 Default,
465 Unknown,
467}
468
469impl WeightSource {
470 fn as_str(self) -> &'static str {
471 match self {
472 Self::Explicit => "explicit",
473 Self::Default => "default",
474 Self::Unknown => "unknown",
475 }
476 }
477}
478
479#[derive(Debug, Clone, Copy, PartialEq, Eq)]
481pub enum TieBreakReason {
482 EffectivePriority,
484 BaseRatio,
486 Weight,
488 RemainingTime,
490 ArrivalSeq,
492 JobId,
494 Continuation,
496}
497
498impl TieBreakReason {
499 fn as_str(self) -> &'static str {
500 match self {
501 Self::EffectivePriority => "effective_priority",
502 Self::BaseRatio => "base_ratio",
503 Self::Weight => "weight",
504 Self::RemainingTime => "remaining_time",
505 Self::ArrivalSeq => "arrival_seq",
506 Self::JobId => "job_id",
507 Self::Continuation => "continuation",
508 }
509 }
510}
511
512#[derive(Debug, Clone, Copy, PartialEq, Eq)]
514pub enum SchedulingMode {
515 Smith,
517 Srpt,
519 Fifo,
521}
522
523impl SchedulingEvidence {
524 #[must_use]
526 pub fn to_jsonl(&self, event: &str) -> String {
527 let mut out = String::with_capacity(256 + (self.jobs.len() * 64));
528 out.push_str("{\"event\":\"");
529 out.push_str(&escape_json(event));
530 out.push_str("\",\"current_time\":");
531 let _ = write!(out, "{:.6}", self.current_time);
532 out.push_str(",\"selected_job_id\":");
533 match self.selected_job_id {
534 Some(id) => {
535 let _ = write!(out, "{id}");
536 }
537 None => out.push_str("null"),
538 }
539 out.push_str(",\"queue_length\":");
540 let _ = write!(out, "{}", self.queue_length);
541 out.push_str(",\"mean_wait_time\":");
542 let _ = write!(out, "{:.6}", self.mean_wait_time);
543 out.push_str(",\"max_wait_time\":");
544 let _ = write!(out, "{:.6}", self.max_wait_time);
545 out.push_str(",\"reason\":\"");
546 out.push_str(self.reason.as_str());
547 out.push('"');
548 out.push_str(",\"tie_break_reason\":");
549 match self.tie_break_reason {
550 Some(reason) => {
551 out.push('"');
552 out.push_str(reason.as_str());
553 out.push('"');
554 }
555 None => out.push_str("null"),
556 }
557 out.push_str(",\"jobs\":[");
558 for (idx, job) in self.jobs.iter().enumerate() {
559 if idx > 0 {
560 out.push(',');
561 }
562 out.push_str(&job.to_json());
563 }
564 out.push_str("]}");
565 out
566 }
567}
568
569impl JobEvidence {
570 fn to_json(&self) -> String {
571 let mut out = String::with_capacity(128);
572 out.push_str("{\"job_id\":");
573 let _ = write!(out, "{}", self.job_id);
574 out.push_str(",\"name\":");
575 match &self.name {
576 Some(name) => {
577 out.push('"');
578 out.push_str(&escape_json(name));
579 out.push('"');
580 }
581 None => out.push_str("null"),
582 }
583 out.push_str(",\"estimate_ms\":");
584 let _ = write!(out, "{:.6}", self.estimate_ms);
585 out.push_str(",\"weight\":");
586 let _ = write!(out, "{:.6}", self.weight);
587 out.push_str(",\"ratio\":");
588 let _ = write!(out, "{:.6}", self.ratio);
589 out.push_str(",\"age_ms\":");
590 let _ = write!(out, "{:.6}", self.age_ms);
591 out.push_str(",\"effective_priority\":");
592 let _ = write!(out, "{:.6}", self.effective_priority);
593 out.push_str(",\"estimate_source\":\"");
594 out.push_str(self.estimate_source.as_str());
595 out.push('"');
596 out.push_str(",\"weight_source\":\"");
597 out.push_str(self.weight_source.as_str());
598 out.push('"');
599 out.push('}');
600 out
601 }
602}
603
604fn escape_json(input: &str) -> String {
605 let mut out = String::with_capacity(input.len() + 8);
606 for ch in input.chars() {
607 match ch {
608 '"' => out.push_str("\\\""),
609 '\\' => out.push_str("\\\\"),
610 '\n' => out.push_str("\\n"),
611 '\r' => out.push_str("\\r"),
612 '\t' => out.push_str("\\t"),
613 '\u{08}' => out.push_str("\\b"),
614 '\u{0C}' => out.push_str("\\f"),
615 c if c < ' ' => {
616 let _ = write!(out, "\\u{:04x}", c as u32);
617 }
618 _ => out.push(ch),
619 }
620 }
621 out
622}
623
624#[derive(Debug, Clone, Default)]
626pub struct SchedulerStats {
627 pub total_submitted: u64,
629
630 pub total_completed: u64,
632
633 pub total_rejected: u64,
635
636 pub total_preemptions: u64,
638
639 pub total_processing_time: f64,
641
642 pub total_response_time: f64,
644
645 pub max_response_time: f64,
647
648 pub queue_length: usize,
650}
651
652impl SchedulerStats {
653 pub fn mean_response_time(&self) -> f64 {
655 if self.total_completed > 0 {
656 self.total_response_time / self.total_completed as f64
657 } else {
658 0.0
659 }
660 }
661
662 pub fn throughput(&self) -> f64 {
664 if self.total_processing_time > 0.0 {
665 self.total_completed as f64 / self.total_processing_time
666 } else {
667 0.0
668 }
669 }
670}
671
672#[derive(Debug)]
674pub struct QueueingScheduler {
675 config: SchedulerConfig,
676
677 queue: BinaryHeap<PriorityJob>,
679
680 current_job: Option<Job>,
682
683 current_time: f64,
685
686 next_job_id: u64,
688
689 next_arrival_seq: u64,
691
692 stats: SchedulerStats,
694}
695
696impl QueueingScheduler {
697 pub fn new(config: SchedulerConfig) -> Self {
699 Self {
700 config,
701 queue: BinaryHeap::new(),
702 current_job: None,
703 current_time: 0.0,
704 next_job_id: 1,
705 next_arrival_seq: 1,
706 stats: SchedulerStats::default(),
707 }
708 }
709
710 pub fn submit(&mut self, weight: f64, estimated_time: f64) -> Option<u64> {
714 self.submit_named(weight, estimated_time, None::<&str>)
715 }
716
717 pub fn submit_named(
719 &mut self,
720 weight: f64,
721 estimated_time: f64,
722 name: Option<impl Into<String>>,
723 ) -> Option<u64> {
724 self.submit_with_sources(
725 weight,
726 estimated_time,
727 WeightSource::Explicit,
728 EstimateSource::Explicit,
729 name,
730 )
731 }
732
733 pub fn submit_with_sources(
735 &mut self,
736 weight: f64,
737 estimated_time: f64,
738 weight_source: WeightSource,
739 estimate_source: EstimateSource,
740 name: Option<impl Into<String>>,
741 ) -> Option<u64> {
742 if self.queue.len() >= self.config.max_queue_size {
743 self.stats.total_rejected += 1;
744 return None;
745 }
746
747 let id = self.next_job_id;
748 self.next_job_id += 1;
749
750 let mut job =
751 Job::new(id, weight, estimated_time).with_sources(weight_source, estimate_source);
752 job.weight = self.normalize_weight_with_source(job.weight, job.weight_source);
753 job.remaining_time =
754 self.normalize_time_with_source(job.remaining_time, job.estimate_source);
755 job.total_time = job.remaining_time;
756 job.arrival_time = self.current_time;
757 job.arrival_seq = self.next_arrival_seq;
758 self.next_arrival_seq += 1;
759 if let Some(n) = name {
760 job.name = Some(n.into());
761 }
762
763 let priority_job = self.make_priority_job(job);
764 self.queue.push(priority_job);
765
766 self.stats.total_submitted += 1;
767 self.stats.queue_length = self.queue.len();
768
769 if self.config.preemptive {
771 self.maybe_preempt();
772 }
773
774 Some(id)
775 }
776
777 pub fn tick(&mut self, delta_time: f64) -> Vec<u64> {
781 let mut completed = Vec::new();
782 if delta_time <= 0.0 {
783 return completed;
784 }
785
786 let mut remaining_time = delta_time;
787 let mut now = self.current_time;
788 let mut processed_time = 0.0;
789
790 while remaining_time > 0.0 {
791 let Some(mut job) = (if let Some(j) = self.current_job.take() {
793 Some(j)
794 } else {
795 self.queue.pop().map(|pj| pj.job)
796 }) else {
797 now += remaining_time;
798 break; };
800
801 let process_time = remaining_time.min(job.remaining_time);
803 job.remaining_time -= process_time;
804 remaining_time -= process_time;
805 now += process_time;
806 processed_time += process_time;
807
808 if job.is_complete() {
809 let response_time = now - job.arrival_time;
811 self.stats.total_response_time += response_time;
812 self.stats.max_response_time = self.stats.max_response_time.max(response_time);
813 self.stats.total_completed += 1;
814 completed.push(job.id);
815 } else {
816 self.current_job = Some(job);
818 }
819 }
820
821 self.stats.total_processing_time += processed_time;
822 self.current_time = now;
823 self.refresh_priorities();
825
826 self.stats.queue_length = self.queue.len();
827 completed
828 }
829
830 pub fn peek_next(&self) -> Option<&Job> {
832 self.current_job
833 .as_ref()
834 .or_else(|| self.queue.peek().map(|pj| &pj.job))
835 }
836
837 pub fn evidence(&self) -> SchedulingEvidence {
839 let (mean_wait, max_wait) = self.compute_wait_stats();
840
841 let mut candidates: Vec<PriorityJob> = self
842 .queue
843 .iter()
844 .map(|pj| self.make_priority_job(pj.job.clone()))
845 .collect();
846
847 if let Some(ref current) = self.current_job {
848 candidates.push(self.make_priority_job(current.clone()));
849 }
850
851 candidates.sort_by(|a, b| b.cmp(a));
852
853 let selected_job_id = if let Some(ref current) = self.current_job {
854 Some(current.id)
855 } else {
856 candidates.first().map(|pj| pj.job.id)
857 };
858
859 let tie_break_reason = if self.current_job.is_some() {
860 Some(TieBreakReason::Continuation)
861 } else if candidates.len() > 1 {
862 Some(self.tie_break_reason(&candidates[0], &candidates[1]))
863 } else {
864 None
865 };
866
867 let reason = if self.queue.is_empty() && self.current_job.is_none() {
868 SelectionReason::QueueEmpty
869 } else if self.current_job.is_some() {
870 SelectionReason::Continuation
871 } else if self.config.mode() == SchedulingMode::Fifo {
872 SelectionReason::Fifo
873 } else if let Some(pj) = candidates.first() {
874 let wait_time = (self.current_time - pj.job.arrival_time).max(0.0);
875 let aging_contribution = self.config.aging_factor * wait_time;
876 let aging_boost = (self.config.wait_starve_ms > 0.0
877 && wait_time >= self.config.wait_starve_ms)
878 || aging_contribution > pj.base_ratio * 0.5;
879 if aging_boost {
880 SelectionReason::AgingBoost
881 } else if self.config.smith_enabled && pj.job.weight > 1.0 {
882 SelectionReason::HighestWeightedPriority
883 } else {
884 SelectionReason::ShortestRemaining
885 }
886 } else {
887 SelectionReason::QueueEmpty
888 };
889
890 let jobs = candidates
891 .iter()
892 .map(|pj| {
893 let age_ms = (self.current_time - pj.job.arrival_time).max(0.0);
894 JobEvidence {
895 job_id: pj.job.id,
896 name: pj.job.name.clone(),
897 estimate_ms: pj.job.remaining_time,
898 weight: pj.job.weight,
899 ratio: pj.base_ratio,
900 age_ms,
901 effective_priority: pj.priority,
902 estimate_source: pj.job.estimate_source,
903 weight_source: pj.job.weight_source,
904 }
905 })
906 .collect();
907
908 SchedulingEvidence {
909 current_time: self.current_time,
910 selected_job_id,
911 queue_length: self.queue.len() + if self.current_job.is_some() { 1 } else { 0 },
912 mean_wait_time: mean_wait,
913 max_wait_time: max_wait,
914 reason,
915 tie_break_reason,
916 jobs,
917 }
918 }
919
920 pub fn stats(&self) -> SchedulerStats {
922 let mut stats = self.stats.clone();
923 stats.queue_length = self.queue.len() + if self.current_job.is_some() { 1 } else { 0 };
924 stats
925 }
926
927 pub fn cancel(&mut self, job_id: u64) -> bool {
929 if let Some(ref j) = self.current_job
931 && j.id == job_id
932 {
933 self.current_job = None;
934 self.stats.queue_length = self.queue.len();
935 return true;
936 }
937
938 let old_len = self.queue.len();
940 let jobs: Vec<_> = self
941 .queue
942 .drain()
943 .filter(|pj| pj.job.id != job_id)
944 .collect();
945 self.queue = jobs.into_iter().collect();
946
947 self.stats.queue_length = self.queue.len();
948 old_len != self.queue.len()
949 }
950
951 pub fn clear(&mut self) {
953 self.queue.clear();
954 self.current_job = None;
955 self.stats.queue_length = 0;
956 }
957
958 pub fn reset(&mut self) {
960 self.queue.clear();
961 self.current_job = None;
962 self.current_time = 0.0;
963 self.next_job_id = 1;
964 self.next_arrival_seq = 1;
965 self.stats = SchedulerStats::default();
966 }
967
968 fn normalize_weight(&self, weight: f64) -> f64 {
972 if weight.is_nan() {
973 return self.config.w_min;
974 }
975 if weight.is_infinite() {
976 return if weight.is_sign_positive() {
977 self.config.w_max
978 } else {
979 self.config.w_min
980 };
981 }
982 weight.clamp(self.config.w_min, self.config.w_max)
983 }
984
985 fn normalize_time(&self, estimate_ms: f64) -> f64 {
987 if estimate_ms.is_nan() {
988 return self.config.p_max_ms;
989 }
990 if estimate_ms.is_infinite() {
991 return if estimate_ms.is_sign_positive() {
992 self.config.p_max_ms
993 } else {
994 self.config.p_min_ms
995 };
996 }
997 estimate_ms.clamp(self.config.p_min_ms, self.config.p_max_ms)
998 }
999
1000 fn normalize_weight_with_source(&self, weight: f64, source: WeightSource) -> f64 {
1002 let resolved = match source {
1003 WeightSource::Explicit => weight,
1004 WeightSource::Default => self.config.weight_default,
1005 WeightSource::Unknown => self.config.weight_unknown,
1006 };
1007 self.normalize_weight(resolved)
1008 }
1009
1010 fn normalize_time_with_source(&self, estimate_ms: f64, source: EstimateSource) -> f64 {
1012 let resolved = match source {
1013 EstimateSource::Explicit | EstimateSource::Historical => estimate_ms,
1014 EstimateSource::Default => self.config.estimate_default_ms,
1015 EstimateSource::Unknown => self.config.estimate_unknown_ms,
1016 };
1017 self.normalize_time(resolved)
1018 }
1019
1020 fn compute_base_ratio(&self, job: &Job) -> f64 {
1022 if self.config.mode() == SchedulingMode::Fifo {
1023 return 0.0;
1024 }
1025 let remaining = job.remaining_time.max(self.config.p_min_ms);
1026 let weight = match self.config.mode() {
1027 SchedulingMode::Smith => job.weight,
1028 SchedulingMode::Srpt => 1.0,
1029 SchedulingMode::Fifo => 0.0,
1030 };
1031 weight / remaining
1032 }
1033
1034 fn compute_priority(&self, job: &Job) -> f64 {
1036 if self.config.mode() == SchedulingMode::Fifo {
1037 return 0.0;
1038 }
1039 let base_ratio = self.compute_base_ratio(job);
1040 let wait_time = (self.current_time - job.arrival_time).max(0.0);
1041 let aging_boost = self.config.aging_factor * wait_time;
1042 let mut effective = base_ratio + aging_boost;
1043
1044 if self.config.wait_starve_ms > 0.0 && wait_time >= self.config.wait_starve_ms {
1045 effective = effective.max(base_ratio * self.config.starve_boost_ratio);
1046 }
1047
1048 effective
1049 }
1050
1051 fn make_priority_job(&self, job: Job) -> PriorityJob {
1053 let base_ratio = self.compute_base_ratio(&job);
1054 let priority = self.compute_priority(&job);
1055 PriorityJob {
1056 priority,
1057 base_ratio,
1058 job,
1059 mode: self.config.mode(),
1060 }
1061 }
1062
1063 fn tie_break_reason(&self, a: &PriorityJob, b: &PriorityJob) -> TieBreakReason {
1065 if self.config.mode() == SchedulingMode::Fifo {
1066 if a.job.arrival_seq != b.job.arrival_seq {
1067 return TieBreakReason::ArrivalSeq;
1068 }
1069 return TieBreakReason::JobId;
1070 }
1071 if a.priority.total_cmp(&b.priority) != Ordering::Equal {
1072 TieBreakReason::EffectivePriority
1073 } else if a.base_ratio.total_cmp(&b.base_ratio) != Ordering::Equal {
1074 TieBreakReason::BaseRatio
1075 } else if a.job.weight.total_cmp(&b.job.weight) != Ordering::Equal {
1076 TieBreakReason::Weight
1077 } else if a.job.remaining_time.total_cmp(&b.job.remaining_time) != Ordering::Equal {
1078 TieBreakReason::RemainingTime
1079 } else if a.job.arrival_seq != b.job.arrival_seq {
1080 TieBreakReason::ArrivalSeq
1081 } else {
1082 TieBreakReason::JobId
1083 }
1084 }
1085
1086 fn maybe_preempt(&mut self) {
1088 if self.config.mode() == SchedulingMode::Fifo {
1089 return;
1090 }
1091 if let Some(ref current) = self.current_job
1092 && let Some(pj) = self.queue.peek()
1093 {
1094 let current_pj = self.make_priority_job(current.clone());
1095 if pj.cmp(¤t_pj) == Ordering::Greater {
1096 let old = self.current_job.take().unwrap();
1098 let priority_job = self.make_priority_job(old);
1099 self.queue.push(priority_job);
1100 self.stats.total_preemptions += 1;
1101 }
1102 }
1103 }
1104
1105 fn refresh_priorities(&mut self) {
1107 let jobs: Vec<_> = self.queue.drain().map(|pj| pj.job).collect();
1108 for job in jobs {
1109 let priority_job = self.make_priority_job(job);
1110 self.queue.push(priority_job);
1111 }
1112 }
1113
1114 fn compute_wait_stats(&self) -> (f64, f64) {
1116 let mut total_wait = 0.0;
1117 let mut max_wait = 0.0f64;
1118 let mut count = 0;
1119
1120 for pj in self.queue.iter() {
1121 let wait = (self.current_time - pj.job.arrival_time).max(0.0);
1122 total_wait += wait;
1123 max_wait = max_wait.max(wait);
1124 count += 1;
1125 }
1126
1127 if let Some(ref j) = self.current_job {
1128 let wait = (self.current_time - j.arrival_time).max(0.0);
1129 total_wait += wait;
1130 max_wait = max_wait.max(wait);
1131 count += 1;
1132 }
1133
1134 let mean = if count > 0 {
1135 total_wait / count as f64
1136 } else {
1137 0.0
1138 };
1139 (mean, max_wait)
1140 }
1141}
1142
1143#[cfg(test)]
1148mod tests {
1149 use super::*;
1150 use std::collections::HashMap;
1151
1152 fn test_config() -> SchedulerConfig {
1153 SchedulerConfig {
1154 aging_factor: 0.001,
1155 p_min_ms: DEFAULT_P_MIN_MS,
1156 p_max_ms: DEFAULT_P_MAX_MS,
1157 estimate_default_ms: DEFAULT_ESTIMATE_DEFAULT_MS,
1158 estimate_unknown_ms: DEFAULT_ESTIMATE_UNKNOWN_MS,
1159 w_min: DEFAULT_W_MIN,
1160 w_max: DEFAULT_W_MAX,
1161 weight_default: DEFAULT_WEIGHT_DEFAULT,
1162 weight_unknown: DEFAULT_WEIGHT_UNKNOWN,
1163 wait_starve_ms: DEFAULT_WAIT_STARVE_MS,
1164 starve_boost_ratio: DEFAULT_STARVE_BOOST_RATIO,
1165 smith_enabled: true,
1166 force_fifo: false,
1167 max_queue_size: 100,
1168 preemptive: true,
1169 time_quantum: 10.0,
1170 enable_logging: false,
1171 }
1172 }
1173
1174 #[derive(Clone, Copy, Debug)]
1175 struct WorkloadJob {
1176 arrival: u64,
1177 weight: f64,
1178 duration: f64,
1179 }
1180
1181 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
1182 enum SimPolicy {
1183 Smith,
1184 Fifo,
1185 }
1186
1187 #[derive(Debug)]
1188 struct SimulationMetrics {
1189 mean: f64,
1190 p95: f64,
1191 p99: f64,
1192 max: f64,
1193 job_count: usize,
1194 completion_order: Vec<u64>,
1195 }
1196
1197 fn mixed_workload() -> Vec<WorkloadJob> {
1198 let mut jobs = Vec::new();
1199 jobs.push(WorkloadJob {
1200 arrival: 0,
1201 weight: 1.0,
1202 duration: 100.0,
1203 });
1204 for t in 1..=200u64 {
1205 jobs.push(WorkloadJob {
1206 arrival: t,
1207 weight: 1.0,
1208 duration: 1.0,
1209 });
1210 }
1211 jobs
1212 }
1213
1214 fn percentile(sorted: &[f64], p: f64) -> f64 {
1215 if sorted.is_empty() {
1216 return 0.0;
1217 }
1218 let idx = ((sorted.len() as f64 - 1.0) * p).ceil() as usize;
1219 sorted[idx.min(sorted.len() - 1)]
1220 }
1221
1222 fn summary_json(policy: SimPolicy, metrics: &SimulationMetrics) -> String {
1223 let policy = match policy {
1224 SimPolicy::Smith => "Smith",
1225 SimPolicy::Fifo => "Fifo",
1226 };
1227 let head: Vec<String> = metrics
1228 .completion_order
1229 .iter()
1230 .take(8)
1231 .map(|id| id.to_string())
1232 .collect();
1233 let tail: Vec<String> = metrics
1234 .completion_order
1235 .iter()
1236 .rev()
1237 .take(3)
1238 .collect::<Vec<_>>()
1239 .into_iter()
1240 .rev()
1241 .map(|id| id.to_string())
1242 .collect();
1243 format!(
1244 "{{\"policy\":\"{policy}\",\"jobs\":{jobs},\"mean\":{mean:.3},\"p95\":{p95:.3},\"p99\":{p99:.3},\"max\":{max:.3},\"order_head\":[{head}],\"order_tail\":[{tail}]}}",
1245 policy = policy,
1246 jobs = metrics.job_count,
1247 mean = metrics.mean,
1248 p95 = metrics.p95,
1249 p99 = metrics.p99,
1250 max = metrics.max,
1251 head = head.join(","),
1252 tail = tail.join(",")
1253 )
1254 }
1255
1256 fn workload_summary_json(workload: &[WorkloadJob]) -> String {
1257 if workload.is_empty() {
1258 return "{\"workload\":\"empty\"}".to_string();
1259 }
1260 let mut min_arrival = u64::MAX;
1261 let mut max_arrival = 0u64;
1262 let mut min_duration = f64::INFINITY;
1263 let mut max_duration: f64 = 0.0;
1264 let mut total_work: f64 = 0.0;
1265 let mut long_jobs = 0usize;
1266 let long_threshold = 10.0;
1267
1268 for job in workload {
1269 min_arrival = min_arrival.min(job.arrival);
1270 max_arrival = max_arrival.max(job.arrival);
1271 min_duration = min_duration.min(job.duration);
1272 max_duration = max_duration.max(job.duration);
1273 total_work += job.duration;
1274 if job.duration >= long_threshold {
1275 long_jobs += 1;
1276 }
1277 }
1278
1279 format!(
1280 "{{\"workload\":\"mixed\",\"jobs\":{jobs},\"arrival_min\":{arrival_min},\"arrival_max\":{arrival_max},\"duration_min\":{duration_min:.3},\"duration_max\":{duration_max:.3},\"total_work\":{total_work:.3},\"long_jobs\":{long_jobs},\"long_threshold\":{long_threshold:.1}}}",
1281 jobs = workload.len(),
1282 arrival_min = min_arrival,
1283 arrival_max = max_arrival,
1284 duration_min = min_duration,
1285 duration_max = max_duration,
1286 total_work = total_work,
1287 long_jobs = long_jobs,
1288 long_threshold = long_threshold
1289 )
1290 }
1291
1292 fn simulate_policy(policy: SimPolicy, workload: &[WorkloadJob]) -> SimulationMetrics {
1293 let mut config = test_config();
1294 config.aging_factor = 0.0;
1295 config.wait_starve_ms = 0.0;
1296 config.starve_boost_ratio = 1.0;
1297 config.smith_enabled = policy == SimPolicy::Smith;
1298 config.force_fifo = policy == SimPolicy::Fifo;
1299 config.preemptive = true;
1300
1301 let mut scheduler = QueueingScheduler::new(config);
1302 let mut arrivals = workload.to_vec();
1303 arrivals.sort_by_key(|job| job.arrival);
1304
1305 let mut arrival_times: HashMap<u64, f64> = HashMap::new();
1306 let mut response_times = Vec::with_capacity(arrivals.len());
1307 let mut completion_order = Vec::with_capacity(arrivals.len());
1308
1309 let mut idx = 0usize;
1310 let mut safety = 0usize;
1311
1312 while (idx < arrivals.len() || scheduler.peek_next().is_some()) && safety < 10_000 {
1313 let now = scheduler.current_time;
1314
1315 while idx < arrivals.len() && (arrivals[idx].arrival as f64) <= now + f64::EPSILON {
1316 let job = arrivals[idx];
1317 let id = scheduler
1318 .submit(job.weight, job.duration)
1319 .expect("queue capacity should not be exceeded");
1320 arrival_times.insert(id, scheduler.current_time);
1321 idx += 1;
1322 }
1323
1324 if scheduler.peek_next().is_none() {
1325 if idx < arrivals.len() {
1326 let next_time = arrivals[idx].arrival as f64;
1327 let delta = (next_time - scheduler.current_time).max(0.0);
1328 let completed = scheduler.tick(delta);
1329 for id in completed {
1330 let arrival = arrival_times.get(&id).copied().unwrap_or(0.0);
1331 response_times.push(scheduler.current_time - arrival);
1332 completion_order.push(id);
1333 }
1334 }
1335 safety += 1;
1336 continue;
1337 }
1338
1339 let completed = scheduler.tick(1.0);
1340 for id in completed {
1341 let arrival = arrival_times.get(&id).copied().unwrap_or(0.0);
1342 response_times.push(scheduler.current_time - arrival);
1343 completion_order.push(id);
1344 }
1345 safety += 1;
1346 }
1347
1348 assert_eq!(
1349 response_times.len(),
1350 arrivals.len(),
1351 "simulation did not complete all jobs"
1352 );
1353
1354 let mut sorted = response_times.clone();
1355 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
1356
1357 let mean = response_times.iter().sum::<f64>() / response_times.len() as f64;
1358 let p95 = percentile(&sorted, 0.95);
1359 let p99 = percentile(&sorted, 0.99);
1360 let max = *sorted.last().unwrap_or(&0.0);
1361
1362 SimulationMetrics {
1363 mean,
1364 p95,
1365 p99,
1366 max,
1367 job_count: response_times.len(),
1368 completion_order,
1369 }
1370 }
1371
1372 #[test]
1377 fn new_creates_empty_scheduler() {
1378 let scheduler = QueueingScheduler::new(test_config());
1379 assert_eq!(scheduler.stats().queue_length, 0);
1380 assert!(scheduler.peek_next().is_none());
1381 }
1382
1383 #[test]
1384 fn default_config_valid() {
1385 let config = SchedulerConfig::default();
1386 let scheduler = QueueingScheduler::new(config);
1387 assert_eq!(scheduler.stats().queue_length, 0);
1388 }
1389
1390 #[test]
1395 fn submit_returns_job_id() {
1396 let mut scheduler = QueueingScheduler::new(test_config());
1397 let id = scheduler.submit(1.0, 10.0);
1398 assert_eq!(id, Some(1));
1399 }
1400
1401 #[test]
1402 fn submit_increments_job_id() {
1403 let mut scheduler = QueueingScheduler::new(test_config());
1404 let id1 = scheduler.submit(1.0, 10.0);
1405 let id2 = scheduler.submit(1.0, 10.0);
1406 assert_eq!(id1, Some(1));
1407 assert_eq!(id2, Some(2));
1408 }
1409
1410 #[test]
1411 fn submit_rejects_when_queue_full() {
1412 let mut config = test_config();
1413 config.max_queue_size = 2;
1414 let mut scheduler = QueueingScheduler::new(config);
1415
1416 assert!(scheduler.submit(1.0, 10.0).is_some());
1417 assert!(scheduler.submit(1.0, 10.0).is_some());
1418 assert!(scheduler.submit(1.0, 10.0).is_none()); assert_eq!(scheduler.stats().total_rejected, 1);
1420 }
1421
1422 #[test]
1423 fn submit_named_job() {
1424 let mut scheduler = QueueingScheduler::new(test_config());
1425 let id = scheduler.submit_named(1.0, 10.0, Some("test-job"));
1426 assert!(id.is_some());
1427 }
1428
1429 #[test]
1434 fn srpt_prefers_shorter_jobs() {
1435 let mut scheduler = QueueingScheduler::new(test_config());
1436
1437 scheduler.submit(1.0, 100.0); scheduler.submit(1.0, 10.0); let next = scheduler.peek_next().unwrap();
1441 assert_eq!(next.remaining_time, 10.0); }
1443
1444 #[test]
1445 fn smith_rule_prefers_high_weight() {
1446 let mut scheduler = QueueingScheduler::new(test_config());
1447
1448 scheduler.submit(1.0, 10.0); scheduler.submit(10.0, 10.0); let next = scheduler.peek_next().unwrap();
1452 assert_eq!(next.weight, 10.0); }
1454
1455 #[test]
1456 fn smith_rule_balances_weight_and_time() {
1457 let mut scheduler = QueueingScheduler::new(test_config());
1458
1459 scheduler.submit(2.0, 20.0); scheduler.submit(1.0, 5.0); let next = scheduler.peek_next().unwrap();
1463 assert_eq!(next.remaining_time, 5.0); }
1465
1466 #[test]
1471 fn aging_increases_priority_over_time() {
1472 let mut scheduler = QueueingScheduler::new(test_config());
1473
1474 scheduler.submit(1.0, 100.0); scheduler.tick(0.0); let before_aging = scheduler.compute_priority(scheduler.peek_next().unwrap());
1478
1479 scheduler.current_time = 100.0; scheduler.refresh_priorities();
1481
1482 let after_aging = scheduler.compute_priority(scheduler.peek_next().unwrap());
1483 assert!(
1484 after_aging > before_aging,
1485 "Priority should increase with wait time"
1486 );
1487 }
1488
1489 #[test]
1490 fn aging_prevents_starvation() {
1491 let mut config = test_config();
1492 config.aging_factor = 1.0; let mut scheduler = QueueingScheduler::new(config);
1494
1495 scheduler.submit(1.0, 1000.0); scheduler.submit(1.0, 1.0); assert_eq!(scheduler.peek_next().unwrap().remaining_time, 1.0);
1500
1501 let completed = scheduler.tick(1.0);
1503 assert_eq!(completed.len(), 1);
1504
1505 assert!(scheduler.peek_next().is_some());
1506 }
1507
1508 #[test]
1513 fn preemption_when_higher_priority_arrives() {
1514 let mut scheduler = QueueingScheduler::new(test_config());
1515
1516 scheduler.submit(1.0, 100.0); scheduler.tick(10.0); let before = scheduler.peek_next().unwrap().remaining_time;
1520 assert_eq!(before, 90.0);
1521
1522 scheduler.submit(1.0, 5.0); let next = scheduler.peek_next().unwrap();
1526 assert_eq!(next.remaining_time, 5.0);
1527
1528 assert_eq!(scheduler.stats().total_preemptions, 1);
1530 }
1531
1532 #[test]
1533 fn no_preemption_when_disabled() {
1534 let mut config = test_config();
1535 config.preemptive = false;
1536 let mut scheduler = QueueingScheduler::new(config);
1537
1538 scheduler.submit(1.0, 100.0);
1539 scheduler.tick(10.0);
1540
1541 scheduler.submit(1.0, 5.0); let next = scheduler.peek_next().unwrap();
1545 assert_eq!(next.remaining_time, 90.0);
1546 }
1547
1548 #[test]
1553 fn tick_processes_jobs() {
1554 let mut scheduler = QueueingScheduler::new(test_config());
1555
1556 scheduler.submit(1.0, 10.0);
1557 let completed = scheduler.tick(5.0);
1558
1559 assert!(completed.is_empty()); assert_eq!(scheduler.peek_next().unwrap().remaining_time, 5.0);
1561 }
1562
1563 #[test]
1564 fn tick_completes_jobs() {
1565 let mut scheduler = QueueingScheduler::new(test_config());
1566
1567 scheduler.submit(1.0, 10.0);
1568 let completed = scheduler.tick(10.0);
1569
1570 assert_eq!(completed.len(), 1);
1571 assert_eq!(completed[0], 1);
1572 assert!(scheduler.peek_next().is_none());
1573 }
1574
1575 #[test]
1576 fn tick_completes_multiple_jobs() {
1577 let mut scheduler = QueueingScheduler::new(test_config());
1578
1579 scheduler.submit(1.0, 5.0);
1580 scheduler.submit(1.0, 5.0);
1581 let completed = scheduler.tick(10.0);
1582
1583 assert_eq!(completed.len(), 2);
1584 }
1585
1586 #[test]
1587 fn tick_handles_zero_delta() {
1588 let mut scheduler = QueueingScheduler::new(test_config());
1589 scheduler.submit(1.0, 10.0);
1590 let completed = scheduler.tick(0.0);
1591 assert!(completed.is_empty());
1592 }
1593
1594 #[test]
1599 fn stats_track_submissions() {
1600 let mut scheduler = QueueingScheduler::new(test_config());
1601
1602 scheduler.submit(1.0, 10.0);
1603 scheduler.submit(1.0, 10.0);
1604
1605 let stats = scheduler.stats();
1606 assert_eq!(stats.total_submitted, 2);
1607 assert_eq!(stats.queue_length, 2);
1608 }
1609
1610 #[test]
1611 fn stats_track_completions() {
1612 let mut scheduler = QueueingScheduler::new(test_config());
1613
1614 scheduler.submit(1.0, 10.0);
1615 scheduler.tick(10.0);
1616
1617 let stats = scheduler.stats();
1618 assert_eq!(stats.total_completed, 1);
1619 }
1620
1621 #[test]
1622 fn stats_compute_mean_response_time() {
1623 let mut scheduler = QueueingScheduler::new(test_config());
1624
1625 scheduler.submit(1.0, 10.0);
1626 scheduler.submit(1.0, 10.0);
1627 scheduler.tick(20.0);
1628
1629 let stats = scheduler.stats();
1630 assert_eq!(stats.total_completed, 2);
1633 assert!(stats.mean_response_time() > 0.0);
1634 }
1635
1636 #[test]
1637 fn stats_compute_throughput() {
1638 let mut scheduler = QueueingScheduler::new(test_config());
1639
1640 scheduler.submit(1.0, 10.0);
1641 scheduler.tick(10.0);
1642
1643 let stats = scheduler.stats();
1644 assert!((stats.throughput() - 0.1).abs() < 0.01);
1646 }
1647
1648 #[test]
1653 fn evidence_reports_queue_empty() {
1654 let scheduler = QueueingScheduler::new(test_config());
1655 let evidence = scheduler.evidence();
1656 assert_eq!(evidence.reason, SelectionReason::QueueEmpty);
1657 assert!(evidence.selected_job_id.is_none());
1658 assert!(evidence.tie_break_reason.is_none());
1659 assert!(evidence.jobs.is_empty());
1660 }
1661
1662 #[test]
1663 fn evidence_reports_selected_job() {
1664 let mut scheduler = QueueingScheduler::new(test_config());
1665 scheduler.submit(1.0, 10.0);
1666 let evidence = scheduler.evidence();
1667 assert_eq!(evidence.selected_job_id, Some(1));
1668 assert_eq!(evidence.jobs.len(), 1);
1669 }
1670
1671 #[test]
1672 fn evidence_reports_wait_stats() {
1673 let mut scheduler = QueueingScheduler::new(test_config());
1674 scheduler.submit(1.0, 100.0);
1675 scheduler.submit(1.0, 100.0);
1676 scheduler.current_time = 50.0;
1677 scheduler.refresh_priorities();
1678
1679 let evidence = scheduler.evidence();
1680 assert!(evidence.mean_wait_time > 0.0);
1681 assert!(evidence.max_wait_time > 0.0);
1682 }
1683
1684 #[test]
1689 fn force_fifo_overrides_priority() {
1690 let mut config = test_config();
1691 config.force_fifo = true;
1692 let mut scheduler = QueueingScheduler::new(config);
1693
1694 let first = scheduler.submit(1.0, 100.0).unwrap();
1695 let second = scheduler.submit(10.0, 1.0).unwrap();
1696
1697 let next = scheduler.peek_next().unwrap();
1698 assert_eq!(next.id, first);
1699 assert_ne!(next.id, second);
1700 assert_eq!(scheduler.evidence().reason, SelectionReason::Fifo);
1701 }
1702
1703 #[test]
1704 fn default_sources_use_config_values() {
1705 let mut config = test_config();
1706 config.weight_default = 7.0;
1707 config.estimate_default_ms = 12.0;
1708 let mut scheduler = QueueingScheduler::new(config);
1709
1710 scheduler.submit_with_sources(
1711 999.0,
1712 999.0,
1713 WeightSource::Default,
1714 EstimateSource::Default,
1715 None::<&str>,
1716 );
1717
1718 let next = scheduler.peek_next().unwrap();
1719 assert!((next.weight - 7.0).abs() < f64::EPSILON);
1720 assert!((next.remaining_time - 12.0).abs() < f64::EPSILON);
1721 }
1722
1723 #[test]
1724 fn unknown_sources_use_config_values() {
1725 let mut config = test_config();
1726 config.weight_unknown = 2.5;
1727 config.estimate_unknown_ms = 250.0;
1728 let mut scheduler = QueueingScheduler::new(config);
1729
1730 scheduler.submit_with_sources(
1731 0.0,
1732 0.0,
1733 WeightSource::Unknown,
1734 EstimateSource::Unknown,
1735 None::<&str>,
1736 );
1737
1738 let next = scheduler.peek_next().unwrap();
1739 assert!((next.weight - 2.5).abs() < f64::EPSILON);
1740 assert!((next.remaining_time - 250.0).abs() < f64::EPSILON);
1741 }
1742
1743 #[test]
1748 fn tie_break_prefers_base_ratio_when_effective_equal() {
1749 let mut config = test_config();
1750 config.aging_factor = 0.1;
1751 let mut scheduler = QueueingScheduler::new(config);
1752
1753 let id_a = scheduler.submit(1.0, 2.0).unwrap(); scheduler.current_time = 5.0;
1756 scheduler.refresh_priorities();
1757
1758 let id_b = scheduler.submit(1.0, 1.0).unwrap(); scheduler.refresh_priorities();
1761
1762 let next = scheduler.peek_next().unwrap();
1763 assert_eq!(next.id, id_b);
1764
1765 let evidence = scheduler.evidence();
1766 assert_eq!(evidence.selected_job_id, Some(id_b));
1767 assert_eq!(evidence.tie_break_reason, Some(TieBreakReason::BaseRatio));
1768 assert_ne!(id_a, id_b);
1769 }
1770
1771 #[test]
1772 fn tie_break_prefers_weight_over_arrival() {
1773 let mut scheduler = QueueingScheduler::new(test_config());
1774
1775 let high_weight = scheduler.submit(2.0, 2.0).unwrap(); let _low_weight = scheduler.submit(1.0, 1.0).unwrap(); let evidence = scheduler.evidence();
1779 assert_eq!(evidence.selected_job_id, Some(high_weight));
1780 assert_eq!(evidence.tie_break_reason, Some(TieBreakReason::Weight));
1781 }
1782
1783 #[test]
1784 fn tie_break_prefers_arrival_seq_when_all_equal() {
1785 let mut config = test_config();
1786 config.aging_factor = 0.0;
1787 let mut scheduler = QueueingScheduler::new(config);
1788
1789 let first = scheduler.submit(1.0, 10.0).unwrap();
1790 let second = scheduler.submit(1.0, 10.0).unwrap();
1791
1792 let evidence = scheduler.evidence();
1793 assert_eq!(evidence.selected_job_id, Some(first));
1794 assert_eq!(evidence.tie_break_reason, Some(TieBreakReason::ArrivalSeq));
1795 assert_ne!(first, second);
1796 }
1797
1798 #[test]
1803 fn srpt_mode_ignores_weights() {
1804 let mut config = test_config();
1805 config.smith_enabled = false;
1806 let mut scheduler = QueueingScheduler::new(config);
1807
1808 scheduler.submit(10.0, 100.0); scheduler.submit(1.0, 10.0); let next = scheduler.peek_next().unwrap();
1812 assert_eq!(next.remaining_time, 10.0);
1813 assert_eq!(
1814 scheduler.evidence().reason,
1815 SelectionReason::ShortestRemaining
1816 );
1817 }
1818
1819 #[test]
1820 fn fifo_mode_disables_preemption() {
1821 let mut config = test_config();
1822 config.force_fifo = true;
1823 config.preemptive = true;
1824 let mut scheduler = QueueingScheduler::new(config);
1825
1826 let first = scheduler.submit(1.0, 100.0).unwrap();
1827 scheduler.tick(10.0);
1828
1829 let _later = scheduler.submit(10.0, 1.0).unwrap();
1830 let next = scheduler.peek_next().unwrap();
1831 assert_eq!(next.id, first);
1832 }
1833
1834 #[test]
1835 fn explicit_zero_weight_clamps_to_min() {
1836 let mut config = test_config();
1837 config.w_min = 0.5;
1838 let mut scheduler = QueueingScheduler::new(config);
1839
1840 scheduler.submit_with_sources(
1841 0.0,
1842 1.0,
1843 WeightSource::Explicit,
1844 EstimateSource::Explicit,
1845 None::<&str>,
1846 );
1847
1848 let next = scheduler.peek_next().unwrap();
1849 assert!((next.weight - 0.5).abs() < f64::EPSILON);
1850 }
1851
1852 #[test]
1853 fn explicit_zero_estimate_clamps_to_min() {
1854 let mut config = test_config();
1855 config.p_min_ms = 2.0;
1856 let mut scheduler = QueueingScheduler::new(config);
1857
1858 scheduler.submit_with_sources(
1859 1.0,
1860 0.0,
1861 WeightSource::Explicit,
1862 EstimateSource::Explicit,
1863 None::<&str>,
1864 );
1865
1866 let next = scheduler.peek_next().unwrap();
1867 assert!((next.remaining_time - 2.0).abs() < f64::EPSILON);
1868 }
1869
1870 #[test]
1875 fn cancel_removes_job() {
1876 let mut scheduler = QueueingScheduler::new(test_config());
1877 let id = scheduler.submit(1.0, 10.0).unwrap();
1878
1879 assert!(scheduler.cancel(id));
1880 assert!(scheduler.peek_next().is_none());
1881 }
1882
1883 #[test]
1884 fn cancel_returns_false_for_nonexistent() {
1885 let mut scheduler = QueueingScheduler::new(test_config());
1886 assert!(!scheduler.cancel(999));
1887 }
1888
1889 #[test]
1894 fn reset_clears_all_state() {
1895 let mut scheduler = QueueingScheduler::new(test_config());
1896
1897 scheduler.submit(1.0, 10.0);
1898 scheduler.tick(5.0);
1899
1900 scheduler.reset();
1901
1902 assert!(scheduler.peek_next().is_none());
1903 assert_eq!(scheduler.stats().total_submitted, 0);
1904 assert_eq!(scheduler.stats().total_completed, 0);
1905 }
1906
1907 #[test]
1908 fn clear_removes_jobs_but_keeps_stats() {
1909 let mut scheduler = QueueingScheduler::new(test_config());
1910
1911 scheduler.submit(1.0, 10.0);
1912 scheduler.clear();
1913
1914 assert!(scheduler.peek_next().is_none());
1915 assert_eq!(scheduler.stats().total_submitted, 1); }
1917
1918 #[test]
1923 fn job_progress_increases() {
1924 let mut job = Job::new(1, 1.0, 100.0);
1925 assert_eq!(job.progress(), 0.0);
1926
1927 job.remaining_time = 50.0;
1928 assert!((job.progress() - 0.5).abs() < 0.01);
1929
1930 job.remaining_time = 0.0;
1931 assert_eq!(job.progress(), 1.0);
1932 }
1933
1934 #[test]
1935 fn job_is_complete() {
1936 let mut job = Job::new(1, 1.0, 10.0);
1937 assert!(!job.is_complete());
1938
1939 job.remaining_time = 0.0;
1940 assert!(job.is_complete());
1941 }
1942
1943 #[test]
1948 fn property_work_conserving() {
1949 let mut scheduler = QueueingScheduler::new(test_config());
1950
1951 for i in 0..10 {
1953 scheduler.submit(1.0, (i as f64) + 1.0);
1954 }
1955
1956 let mut total_processed = 0;
1958 while scheduler.peek_next().is_some() {
1959 let completed = scheduler.tick(1.0);
1960 total_processed += completed.len();
1961 }
1962
1963 assert_eq!(total_processed, 10);
1964 }
1965
1966 #[test]
1967 fn property_bounded_memory() {
1968 let mut config = test_config();
1969 config.max_queue_size = 100;
1970 let mut scheduler = QueueingScheduler::new(config);
1971
1972 for _ in 0..1000 {
1974 scheduler.submit(1.0, 10.0);
1975 }
1976
1977 assert!(scheduler.stats().queue_length <= 100);
1978 }
1979
1980 #[test]
1981 fn property_deterministic() {
1982 let run = || {
1983 let mut scheduler = QueueingScheduler::new(test_config());
1984 let mut completions = Vec::new();
1985
1986 for i in 0..20 {
1987 scheduler.submit(((i % 3) + 1) as f64, ((i % 5) + 1) as f64);
1988 }
1989
1990 for _ in 0..50 {
1991 completions.extend(scheduler.tick(1.0));
1992 }
1993
1994 completions
1995 };
1996
1997 let run1 = run();
1998 let run2 = run();
1999
2000 assert_eq!(run1, run2, "Scheduling should be deterministic");
2001 }
2002
2003 #[test]
2004 fn smith_beats_fifo_on_mixed_workload() {
2005 let workload = mixed_workload();
2006 let smith = simulate_policy(SimPolicy::Smith, &workload);
2007 let fifo = simulate_policy(SimPolicy::Fifo, &workload);
2008
2009 eprintln!("{}", workload_summary_json(&workload));
2010 eprintln!("{}", summary_json(SimPolicy::Smith, &smith));
2011 eprintln!("{}", summary_json(SimPolicy::Fifo, &fifo));
2012
2013 assert!(
2014 smith.mean < fifo.mean,
2015 "mean should improve: smith={} fifo={}",
2016 summary_json(SimPolicy::Smith, &smith),
2017 summary_json(SimPolicy::Fifo, &fifo)
2018 );
2019 assert!(
2020 smith.p95 < fifo.p95,
2021 "p95 should improve: smith={} fifo={}",
2022 summary_json(SimPolicy::Smith, &smith),
2023 summary_json(SimPolicy::Fifo, &fifo)
2024 );
2025 assert!(
2026 smith.p99 < fifo.p99,
2027 "p99 should improve: smith={} fifo={}",
2028 summary_json(SimPolicy::Smith, &smith),
2029 summary_json(SimPolicy::Fifo, &fifo)
2030 );
2031 }
2032
2033 #[test]
2034 fn simulation_is_deterministic_per_policy() {
2035 let workload = mixed_workload();
2036 let smith1 = simulate_policy(SimPolicy::Smith, &workload);
2037 let smith2 = simulate_policy(SimPolicy::Smith, &workload);
2038 let fifo1 = simulate_policy(SimPolicy::Fifo, &workload);
2039 let fifo2 = simulate_policy(SimPolicy::Fifo, &workload);
2040
2041 assert_eq!(smith1.completion_order, smith2.completion_order);
2042 assert_eq!(fifo1.completion_order, fifo2.completion_order);
2043 assert!((smith1.mean - smith2.mean).abs() < 1e-9);
2044 assert!((fifo1.mean - fifo2.mean).abs() < 1e-9);
2045 }
2046
2047 #[test]
2048 fn effect_queue_trace_is_deterministic() {
2049 let mut config = test_config();
2050 config.preemptive = false;
2051 config.aging_factor = 0.0;
2052 config.wait_starve_ms = 0.0;
2053 config.force_fifo = false;
2054 config.smith_enabled = true;
2055
2056 let mut scheduler = QueueingScheduler::new(config);
2057 let id_alpha = scheduler
2058 .submit_with_sources(
2059 1.0,
2060 8.0,
2061 WeightSource::Explicit,
2062 EstimateSource::Explicit,
2063 Some("alpha"),
2064 )
2065 .expect("alpha accepted");
2066 let id_beta = scheduler
2067 .submit_with_sources(
2068 4.0,
2069 2.0,
2070 WeightSource::Explicit,
2071 EstimateSource::Explicit,
2072 Some("beta"),
2073 )
2074 .expect("beta accepted");
2075 let id_gamma = scheduler
2076 .submit_with_sources(
2077 2.0,
2078 10.0,
2079 WeightSource::Explicit,
2080 EstimateSource::Explicit,
2081 Some("gamma"),
2082 )
2083 .expect("gamma accepted");
2084 let id_delta = scheduler
2085 .submit_with_sources(
2086 3.0,
2087 3.0,
2088 WeightSource::Explicit,
2089 EstimateSource::Explicit,
2090 Some("delta"),
2091 )
2092 .expect("delta accepted");
2093
2094 scheduler.refresh_priorities();
2095
2096 let mut selected = Vec::new();
2097 while let Some(job) = scheduler.peek_next().cloned() {
2098 let evidence = scheduler.evidence();
2099 if let Some(id) = evidence.selected_job_id {
2100 selected.push(id);
2101 }
2102 println!("{}", evidence.to_jsonl("effect_queue_select"));
2103
2104 let completed = scheduler.tick(job.remaining_time);
2105 assert!(
2106 !completed.is_empty(),
2107 "expected completion per tick in non-preemptive mode"
2108 );
2109 }
2110
2111 assert_eq!(selected, vec![id_beta, id_delta, id_gamma, id_alpha]);
2112 }
2113
2114 #[test]
2119 fn zero_weight_handled() {
2120 let mut scheduler = QueueingScheduler::new(test_config());
2121 scheduler.submit(0.0, 10.0);
2122 assert!(scheduler.peek_next().is_some());
2123 }
2124
2125 #[test]
2126 fn zero_time_completes_immediately() {
2127 let mut scheduler = QueueingScheduler::new(test_config());
2128 scheduler.submit(1.0, 0.0);
2129 let completed = scheduler.tick(1.0);
2130 assert_eq!(completed.len(), 1);
2131 }
2132
2133 #[test]
2134 fn negative_time_handled() {
2135 let mut scheduler = QueueingScheduler::new(test_config());
2136 scheduler.submit(1.0, -10.0);
2137 let completed = scheduler.tick(1.0);
2138 assert_eq!(completed.len(), 1);
2139 }
2140}