1use sklears_core::error::{Result as SklResult, SklearsError};
8use std::collections::{BTreeMap, HashMap, VecDeque};
9use std::time::{Duration, SystemTime};
10
11use crate::execution_types::{ExecutionTask, TaskPriority};
12
/// Interface for components that queue [`ExecutionTask`]s and hand them out for execution.
///
/// Implementors must be `Send + Sync`. Because [`TaskScheduler::shutdown_gracefully`]
/// returns `impl Future`, this trait cannot be used as a `dyn TaskScheduler` trait object;
/// use it through generics instead.
pub trait TaskScheduler: Send + Sync {
    /// Submits a single task and returns the handle that tracks it.
    ///
    /// # Errors
    /// Implementation-defined (for example: the scheduler is shutting down, or the target
    /// queue is full).
    fn schedule_task(&mut self, task: ExecutionTask) -> SklResult<TaskHandle>;

    /// Submits several tasks and returns one handle per task on success.
    ///
    /// # Errors
    /// Implementation-defined; see [`TaskScheduler::schedule_task`].
    fn schedule_batch(&mut self, tasks: Vec<ExecutionTask>) -> SklResult<Vec<TaskHandle>>;

    /// Cancels the task identified by `handle`.
    ///
    /// # Errors
    /// Implementation-defined.
    fn cancel_task(&mut self, handle: TaskHandle) -> SklResult<()>;

    /// Returns a snapshot of queue sizes, lifetime counters, health and performance metrics.
    fn get_status(&self) -> SchedulerStatus;

    /// Replaces the scheduler configuration.
    ///
    /// # Errors
    /// Implementation-defined (for example, if the configuration is rejected).
    fn update_config(&mut self, config: SchedulerConfig) -> SklResult<()>;

    /// Stops accepting new work and shuts the scheduler down.
    ///
    /// The returned future must be `Send`.
    fn shutdown_gracefully(&mut self) -> impl std::future::Future<Output = SklResult<()>> + Send;

    /// Removes and returns the next task to run together with its updated handle, or `None`
    /// if nothing is currently runnable.
    fn get_next_task(&mut self) -> Option<(ExecutionTask, TaskHandle)>;

    /// Records that the task behind `handle` finished successfully.
    fn mark_task_completed(&mut self, handle: &TaskHandle) -> SklResult<()>;

    /// Records that the task behind `handle` failed; `error` describes the failure.
    fn mark_task_failed(&mut self, handle: &TaskHandle, error: String) -> SklResult<()>;
}
75
/// Caller-facing record describing one scheduled task.
///
/// Handles returned by a scheduler are clones. `DefaultTaskScheduler` keeps its own copy
/// and updates that copy (state, timestamps) later, so a handle held by a caller is a
/// snapshot and may be stale. Treat `task_id` as the stable identifier.
#[derive(Debug, Clone)]
pub struct TaskHandle {
    /// Unique id of this submission. `DefaultTaskScheduler` builds it as `"{task.id}_{n}"`,
    /// where `n` is a per-scheduler counter.
    pub task_id: String,

    /// When the handle was created, i.e. when the task was submitted.
    pub scheduled_at: SystemTime,

    /// Expected run time, copied from the task metadata when provided.
    pub estimated_duration: Option<Duration>,

    /// Priority copied from the task metadata; selects the queue the task waits in.
    pub priority: TaskPriority,

    /// Ids of the tasks this one depends on, copied from the task metadata.
    pub dependencies: Vec<String>,

    /// Lifecycle state as of the last update to this copy.
    pub state: TaskState,

    /// Zero-based index within its priority queue, or `None` when not known.
    pub queue_position: Option<usize>,

    /// Number of failures recorded for this task.
    pub retry_count: usize,

    /// When this handle was last modified.
    pub last_updated: SystemTime,
}
109
/// Lifecycle state of a scheduled task, as recorded in its [`TaskHandle`].
///
/// This is a field-less enum, so it derives `Copy`, `Eq` and `Hash`: states can be passed
/// by value and used as map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskState {
    /// Accepted into a queue; every new handle starts here.
    Queued,
    /// Still queued: at least one dependency had not completed when dispatch was attempted.
    WaitingForDependencies,
    /// Runnable. NOTE(review): not assigned by `DefaultTaskScheduler` in this module.
    Ready,
    /// Handed out by `get_next_task` and not yet reported back.
    Running,
    /// Reported as finished via `mark_task_completed`.
    Completed,
    /// Reported as failed via `mark_task_failed`.
    Failed,
    /// Cancelled (for example via `cancel_task`).
    Cancelled,
    /// Exceeded its time limit. NOTE(review): not assigned by `DefaultTaskScheduler` in this
    /// module.
    TimedOut,
}
130
/// Complete configuration for a task scheduler, grouped by concern.
///
/// `SchedulerConfig::default()` provides a baseline.
///
/// NOTE(review): `DefaultTaskScheduler` in this module reads only a few of these settings:
/// - the queue size limit and overflow strategy;
/// - Linear aging and the starvation-prevention flag;
/// - the cycle-detection flag;
/// - the queue-size alert threshold.
///
/// The rest are declarative unless something outside this module consumes them.
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Dispatch policy (not read by `DefaultTaskScheduler`, which always dispatches by priority).
    pub algorithm: SchedulingAlgorithm,

    /// Queue sizing, overflow, persistence and maintenance settings.
    pub queue_management: QueueManagement,

    /// Priority levels, aging and inversion handling.
    pub priority_handling: PriorityHandling,

    /// Dependency tracking, cycle detection and related caching.
    pub dependency_resolution: DependencyResolution,

    /// Load-balancing settings.
    pub load_balancing: LoadBalancingConfig,

    /// Thread pool, batching, memory and cache tuning.
    pub performance_tuning: SchedulerPerformanceTuning,

    /// Metrics collection and alert thresholds.
    pub monitoring: SchedulerMonitoringConfig,
}
158
/// Policy used to choose the next task to run.
///
/// NOTE(review): none of these policies is implemented in this module. `DefaultTaskScheduler`
/// ignores `SchedulerConfig::algorithm` and always dispatches by priority. The variant docs
/// describe the intent suggested by the names.
#[derive(Debug, Clone)]
pub enum SchedulingAlgorithm {
    /// First in, first out.
    FIFO,

    /// Last in, first out.
    LIFO,

    /// Highest priority first.
    Priority,

    /// Shortest estimated job first.
    ShortestJobFirst,

    /// Fair sharing of capacity (sharing unit not defined here).
    FairShare,

    /// Work-conserving, CFS-style fair scheduling.
    WorkConservingCFS,

    /// Multi-level feedback queues.
    MultiLevelFeedback {
        /// Number of queue levels.
        levels: usize,
        /// Time slice granted per level.
        time_quantum: Duration,
        /// Aging rate for waiting tasks (units unspecified — confirm).
        aging_factor: f64,
    },

    /// Deadline-aware ordering.
    DeadlineAware,

    /// Ordering that considers resource requirements.
    ResourceAware,

    /// Learned scheduling policy.
    MLOptimized {
        /// Identifier of the model to use (format unspecified).
        model_type: String,
        /// Learning rate for the model.
        learning_rate: f64,
    },

    /// Custom algorithm identified by name.
    Custom {
        /// Name of the algorithm.
        algorithm_name: String,
        /// Free-form string parameters for the algorithm.
        parameters: HashMap<String, String>,
    },
}
201
/// Queue sizing, overflow and maintenance settings.
#[derive(Debug, Clone)]
pub struct QueueManagement {
    /// Maximum length of each priority queue. `DefaultTaskScheduler` compares it against the
    /// queue of the incoming task's priority, not against the total across queues.
    pub max_queue_size: usize,

    /// What to do when that queue is full (read by `DefaultTaskScheduler`).
    pub overflow_strategy: QueueOverflowStrategy,

    /// Where queued tasks are stored (not read by `DefaultTaskScheduler`).
    pub persistence: QueuePersistence,

    /// How tasks are split across queues (not read by `DefaultTaskScheduler`, which always
    /// partitions by priority).
    pub partitioning: QueuePartitioning,

    /// Periodic compaction settings.
    pub compaction: QueueCompaction,

    /// Periodic rebalancing settings.
    pub rebalancing: QueueRebalancing,
}
225
/// Behavior when a task is submitted to a full queue.
#[derive(Debug, Clone)]
pub enum QueueOverflowStrategy {
    /// Intended to block the submitter. `DefaultTaskScheduler` cannot block and returns a
    /// "Queue is full" error instead.
    Block,

    /// Discard the incoming task without queueing it.
    Drop,

    /// Evict the oldest queued task to make room.
    DropOldest,

    /// Evict the lowest-priority task. Not handled by `DefaultTaskScheduler`; the task is
    /// admitted past the limit.
    DropLowestPriority,

    /// Refuse the task with an error.
    Reject,

    /// Spill overflow to storage at `storage_path`. Not handled by `DefaultTaskScheduler`;
    /// the task is admitted past the limit.
    Spill { storage_path: String },

    /// Grow the queue up to `max_scale_factor` times its limit (interpretation assumed —
    /// confirm). Not handled by `DefaultTaskScheduler`; the task is admitted past the limit.
    DynamicScale { max_scale_factor: f64 },
}
250
/// Storage backend for queued tasks (declarative; `DefaultTaskScheduler` keeps everything
/// in memory).
#[derive(Debug, Clone)]
pub enum QueuePersistence {
    /// Keep queues in process memory only.
    Memory,

    /// Persist queues to disk.
    Disk {
        /// Directory or file used for persistence.
        path: String,
        /// How often to sync to disk.
        sync_interval: Duration,
    },

    /// Memory first, spilling to disk.
    Hybrid {
        /// Memory budget (unit — bytes vs. tasks — unspecified; confirm).
        memory_limit: usize,
        /// Where spilled data goes.
        disk_path: String,
        /// Fill level that triggers spilling (presumably a fraction of `memory_limit`).
        spill_threshold: f64,
    },

    /// Persist queues in a database table.
    Database {
        /// Driver-specific connection string.
        connection_string: String,
        /// Table holding queued tasks.
        table_name: String,
    },
}
276
/// How tasks are split across queues (declarative in this module).
#[derive(Debug, Clone)]
pub enum QueuePartitioning {
    /// One shared queue.
    Single,

    /// One queue per priority level.
    ByPriority,

    /// One queue per task type.
    ByTaskType,

    /// Queues keyed by resource requirements.
    ByResourceRequirements,

    /// Named custom partitioning scheme.
    Custom { scheme_name: String },
}
295
/// Periodic queue compaction settings (not read by `DefaultTaskScheduler`).
#[derive(Debug, Clone)]
pub struct QueueCompaction {
    /// Whether compaction runs at all.
    pub enabled: bool,

    /// Level that triggers compaction (presumably a fill ratio in `0.0..=1.0`; default 0.8).
    pub trigger_threshold: f64,

    /// Time between compaction passes.
    pub interval: Duration,

    /// What a compaction pass does.
    pub strategy: CompactionStrategy,
}
311
/// Kind of work performed by a queue compaction pass.
#[derive(Debug, Clone)]
pub enum CompactionStrategy {
    /// Drop finished entries.
    RemoveCompleted,

    /// Merge similar entries.
    MergeSimilar,

    /// Reorder entries.
    OptimizeOrder,

    /// Named custom strategy.
    Custom { strategy_name: String },
}
327
/// Periodic rebalancing of work between queues (not read by `DefaultTaskScheduler`).
#[derive(Debug, Clone)]
pub struct QueueRebalancing {
    /// Whether rebalancing runs at all.
    pub enabled: bool,

    /// Time between rebalancing passes.
    pub interval: Duration,

    /// Imbalance that triggers a pass (presumably a ratio; default 0.3 — confirm).
    pub imbalance_threshold: f64,

    /// How work is redistributed.
    pub strategy: RebalancingStrategy,
}
343
/// How work is redistributed during queue rebalancing.
#[derive(Debug, Clone)]
pub enum RebalancingStrategy {
    /// Rotate assignments evenly.
    RoundRobin,

    /// Move work according to current load.
    LoadBased,

    /// Move work while respecting priorities.
    PriorityAware,

    /// Named custom strategy.
    Custom { strategy_name: String },
}
359
/// Priority levels, aging and inversion handling.
#[derive(Debug, Clone)]
pub struct PriorityHandling {
    /// Priority levels in use (the default lists them from lowest to highest).
    pub levels: Vec<TaskPriority>,

    /// Aging policy. `DefaultTaskScheduler` only acts on `AgingStrategy::Linear`.
    pub aging_strategy: AgingStrategy,

    /// When `true`, `DefaultTaskScheduler` runs its aging pass before every dispatch.
    pub starvation_prevention: bool,

    /// Priority-inversion detection and resolution.
    pub priority_inversion: PriorityInversionHandling,

    /// Runtime priority adjustment.
    pub dynamic_priority: DynamicPriorityConfig,
}
381
/// How waiting tasks gain priority over time.
///
/// NOTE(review): `DefaultTaskScheduler` does not change any task's priority. For `Linear`
/// it only refreshes `last_updated` on tasks that have waited at least `increment_interval`.
#[derive(Debug, Clone)]
pub enum AgingStrategy {
    /// No aging.
    None,

    /// Fixed boost per interval of waiting.
    Linear {
        /// Waiting time per boost step.
        increment_interval: Duration,
        /// Boost per step (presumably in priority levels — confirm).
        increment_amount: i32,
    },

    /// Boost that grows geometrically with waiting time.
    Exponential {
        /// Growth base.
        base: f64,
        /// Waiting time per step.
        interval: Duration,
        /// Cap on the total boost.
        max_boost: i32,
    },

    /// Boost applied once waiting exceeds a threshold.
    Adaptive {
        /// Waiting time before boosting starts.
        threshold: Duration,
        /// Multiplier applied to the boost.
        boost_factor: f64,
    },

    /// Named custom algorithm.
    Custom { algorithm_name: String },
}
410
/// Priority-inversion settings (not read by `DefaultTaskScheduler`).
#[derive(Debug, Clone)]
pub struct PriorityInversionHandling {
    /// Enable the priority-inheritance protocol.
    pub priority_inheritance: bool,

    /// Enable the priority-ceiling protocol.
    pub priority_ceiling: bool,

    /// How long an inversion must persist before it is acted on.
    pub detection_threshold: Duration,

    /// Action taken once an inversion is detected.
    pub resolution_strategy: PriorityInversionResolution,
}
426
/// Action taken to resolve a detected priority inversion.
#[derive(Debug, Clone)]
pub enum PriorityInversionResolution {
    /// Temporarily raise the blocking task's priority.
    PriorityBoost,

    /// Preempt the blocking task.
    Preemption,

    /// Reassign resources.
    ResourceReallocation,

    /// Named custom strategy.
    Custom { strategy_name: String },
}
442
/// Runtime priority adjustment settings (not read by `DefaultTaskScheduler`).
#[derive(Debug, Clone)]
pub struct DynamicPriorityConfig {
    /// Whether adjustment is active.
    pub enabled: bool,

    /// Weights of the signals feeding the adjustment.
    pub factors: PriorityAdjustmentFactors,

    /// Time between adjustments.
    pub adjustment_interval: Duration,

    /// Largest allowed adjustment (presumably in priority levels — confirm).
    pub max_adjustment: i32,
}
458
/// Relative weights of the signals used for dynamic priority adjustment.
///
/// The default weights sum to 1.0; whether that is required is not enforced here.
#[derive(Debug, Clone)]
pub struct PriorityAdjustmentFactors {
    /// Weight of time spent waiting.
    pub wait_time_factor: f64,

    /// Weight of resource availability.
    pub resource_availability_factor: f64,

    /// Weight of overall system load.
    pub system_load_factor: f64,

    /// Weight of how close the task's deadline is.
    pub deadline_proximity_factor: f64,

    /// Weight of observed performance.
    pub performance_factor: f64,
}
477
/// Dependency tracking settings.
#[derive(Debug, Clone)]
pub struct DependencyResolution {
    /// Track dependencies between tasks (not read by `DefaultTaskScheduler`).
    pub enable_tracking: bool,

    /// When `true`, `DefaultTaskScheduler::schedule_task` runs its cycle check before
    /// queueing.
    pub cycle_detection: bool,

    /// Prevent dependency deadlocks (not read by `DefaultTaskScheduler`).
    pub deadlock_prevention: bool,

    /// Maximum time allowed for resolving dependencies.
    pub resolution_timeout: Duration,

    /// Dependency-graph optimization settings.
    pub graph_optimization: DependencyGraphOptimization,

    /// Caching of dependency lookups.
    pub caching: DependencyCaching,
}
502
/// Periodic dependency-graph optimization settings.
#[derive(Debug, Clone)]
pub struct DependencyGraphOptimization {
    /// Whether optimization runs.
    pub enabled: bool,

    /// Algorithms to apply (order of application unspecified).
    pub algorithms: Vec<GraphOptimizationAlgorithm>,

    /// Time between optimization passes.
    pub optimization_interval: Duration,
}
515
/// A dependency-graph optimization pass.
#[derive(Debug, Clone)]
pub enum GraphOptimizationAlgorithm {
    /// Order tasks topologically.
    TopologicalSort,

    /// Prioritize the critical path.
    CriticalPath,

    /// Maximize parallel execution.
    ParallelExecution,

    /// Account for resource requirements.
    ResourceAware,

    /// Named custom algorithm.
    Custom { algorithm_name: String },
}
534
/// Cache settings for dependency lookups (not read by `DefaultTaskScheduler`).
#[derive(Debug, Clone)]
pub struct DependencyCaching {
    /// Whether caching is active.
    pub enabled: bool,

    /// Maximum number of cached entries (presumably entries, not bytes — confirm).
    pub cache_size: usize,

    /// Lifetime of a cache entry.
    pub ttl: Duration,

    /// Which entry to evict when full.
    pub eviction_strategy: CacheEvictionStrategy,
}
550
/// Cache eviction policy.
#[derive(Debug, Clone)]
pub enum CacheEvictionStrategy {
    /// Least recently used.
    LRU,
    /// Least frequently used.
    LFU,
    /// First in, first out.
    FIFO,
    /// Evict by expiry time.
    TTL,
    /// Named custom policy.
    Custom { strategy_name: String },
}
565
/// Load-balancing settings (not read by `DefaultTaskScheduler`).
#[derive(Debug, Clone)]
pub struct LoadBalancingConfig {
    /// Balancing algorithm.
    pub algorithm: LoadBalancingAlgorithm,

    /// Time between rebalancing decisions.
    pub rebalancing_frequency: Duration,

    /// Load level that counts as high (presumably a ratio in `0.0..=1.0`; default 0.8).
    pub load_threshold: f64,

    /// Health-check settings for balancing targets.
    pub health_checks: LoadBalancerHealthChecks,

    /// Failover settings.
    pub failover: LoadBalancerFailover,
}
584
/// Algorithm used to distribute work across targets.
#[derive(Debug, Clone)]
pub enum LoadBalancingAlgorithm {
    /// Rotate through targets.
    RoundRobin,

    /// Rotate with per-target weights (presumably one weight per target, in target order).
    WeightedRoundRobin { weights: Vec<f64> },

    /// Pick the target with the fewest active connections/tasks.
    LeastConnections,

    /// Pick the target with the lowest response time.
    LeastResponseTime,

    /// Pick by available resources.
    ResourceBased,

    /// Scale ahead of predicted load over `prediction_window`.
    PredictiveScaling { prediction_window: Duration },

    /// Named custom algorithm.
    Custom { algorithm_name: String },
}
609
/// Health-check settings for load-balancing targets.
#[derive(Debug, Clone)]
pub struct LoadBalancerHealthChecks {
    /// Time between checks.
    pub interval: Duration,

    /// Time allowed for a single check.
    pub timeout: Duration,

    /// Failed checks before a target is considered unhealthy (presumably consecutive).
    pub unhealthy_threshold: usize,

    /// Successful checks before a target is considered healthy again (presumably consecutive).
    pub healthy_threshold: usize,
}
625
/// Failover settings for load balancing.
#[derive(Debug, Clone)]
pub struct LoadBalancerFailover {
    /// Whether failover is active.
    pub enabled: bool,

    /// Failover target identifiers (format unspecified).
    pub targets: Vec<String>,

    /// When to return to the primary target.
    pub failback_policy: FailbackPolicy,
}
638
/// When to fail back to the primary target after a failover.
#[derive(Debug, Clone)]
pub enum FailbackPolicy {
    /// As soon as the primary is healthy.
    Immediate,

    /// After waiting `delay`.
    Delayed { delay: Duration },

    /// Only on operator action.
    Manual,

    /// Once load falls below `threshold` (presumably a ratio — confirm).
    LoadBased { threshold: f64 },
}
654
/// Performance tuning knobs (not read by `DefaultTaskScheduler`).
#[derive(Debug, Clone)]
pub struct SchedulerPerformanceTuning {
    /// Worker thread count (the default uses `num_cpus::get()`).
    pub thread_pool_size: usize,

    /// Number of tasks handled per scheduling batch.
    pub batch_size: usize,

    /// Time between scheduling rounds.
    pub scheduling_frequency: Duration,

    /// Memory optimization settings.
    pub memory_optimization: MemoryOptimization,

    /// Internal cache sizes.
    pub cache_config: SchedulerCacheConfig,
}
673
/// Memory optimization switches.
#[derive(Debug, Clone)]
pub struct MemoryOptimization {
    /// Reuse memory from pools.
    pub memory_pooling: bool,

    /// Recycle objects instead of reallocating them.
    pub object_recycling: bool,

    /// Periodic cleanup settings.
    pub gc_tuning: GarbageCollectionTuning,
}
686
/// Periodic cleanup settings.
#[derive(Debug, Clone)]
pub struct GarbageCollectionTuning {
    /// Time between cleanup passes.
    pub frequency: Duration,

    /// Memory pressure that triggers cleanup (presumably a ratio in `0.0..=1.0`; default 0.8).
    pub pressure_threshold: f64,

    /// Cleanup actions to run.
    pub cleanup_strategies: Vec<CleanupStrategy>,
}
699
/// A cleanup action run during periodic cleanup.
#[derive(Debug, Clone)]
pub enum CleanupStrategy {
    /// Drop records of finished tasks.
    CompletedTasks,

    /// Evict expired cache entries.
    ExpiredCache,

    /// Compact internal data structures.
    CompactStructures,

    /// Named custom action.
    Custom { strategy_name: String },
}
715
/// Sizes and lifetime of internal scheduler caches (sizes presumably in entries — confirm).
#[derive(Debug, Clone)]
pub struct SchedulerCacheConfig {
    /// Task cache capacity.
    pub task_cache_size: usize,

    /// Dependency cache capacity.
    pub dependency_cache_size: usize,

    /// Statistics cache capacity.
    pub stats_cache_size: usize,

    /// Lifetime of cache entries.
    pub cache_ttl: Duration,
}
731
/// Monitoring switches and alert thresholds.
#[derive(Debug, Clone)]
pub struct SchedulerMonitoringConfig {
    /// Collect metrics (not read by `DefaultTaskScheduler`).
    pub enable_metrics: bool,

    /// Track individual tasks (not read by `DefaultTaskScheduler`).
    pub enable_task_tracking: bool,

    /// Collect per-queue statistics (not read by `DefaultTaskScheduler`).
    pub enable_queue_stats: bool,

    /// Time between metric collections.
    pub metrics_interval: Duration,

    /// Thresholds that drive health and alerting.
    pub alert_thresholds: SchedulerAlertThresholds,
}
750
/// Alert thresholds.
#[derive(Debug, Clone)]
pub struct SchedulerAlertThresholds {
    /// `DefaultTaskScheduler::get_status` reports `Overloaded` when the total number of
    /// queued tasks is strictly greater than this.
    pub queue_size_threshold: usize,

    /// Failure rate that should alert (presumably a ratio in `0.0..=1.0`; default 0.1).
    pub failure_rate_threshold: f64,

    /// Waiting time that should alert.
    pub wait_time_threshold: Duration,

    /// Utilization that should alert (presumably a ratio in `0.0..=1.0`; default 0.9).
    pub utilization_threshold: f64,
}
766
/// Point-in-time snapshot returned by [`TaskScheduler::get_status`].
#[derive(Debug, Clone)]
pub struct SchedulerStatus {
    /// Tasks currently waiting in any queue.
    pub queued_tasks: usize,

    /// Tasks handed out and not yet reported back.
    pub running_tasks: usize,

    /// Lifetime count of completed tasks.
    pub completed_tasks: u64,

    /// Lifetime count of failed tasks.
    pub failed_tasks: u64,

    /// Lifetime count of cancelled tasks.
    pub cancelled_tasks: u64,

    /// Overall health assessment.
    pub health: SchedulerHealth,

    /// Aggregate performance figures.
    pub performance: SchedulerPerformanceMetrics,

    /// Per-priority queue statistics (only priorities that have a queue appear).
    pub queue_stats: HashMap<TaskPriority, QueueStatistics>,

    /// Resource utilization (presumably a ratio in `0.0..=1.0`; `DefaultTaskScheduler`
    /// reports a fixed placeholder).
    pub resource_utilization: f64,
}
797
/// Health assessment reported in [`SchedulerStatus`].
///
/// `DefaultTaskScheduler` only ever reports `Healthy` or `Overloaded`.
#[derive(Debug, Clone)]
pub enum SchedulerHealth {
    /// Operating normally.
    Healthy,

    /// Queued work exceeds the configured alert threshold; carries the queued count.
    Overloaded { queue_size: usize },

    /// Unable to make progress, with a human-readable reason.
    Blocked { reason: String },

    /// Running with reduced performance (impact scale unspecified).
    Degraded { performance_impact: f64 },

    /// Not operational, with a human-readable reason.
    Failed { reason: String },
}
816
/// Aggregate performance figures reported in [`SchedulerStatus`].
///
/// NOTE(review): some of these may be fixed placeholders rather than measurements,
/// depending on the scheduler implementation.
#[derive(Debug, Clone)]
pub struct SchedulerPerformanceMetrics {
    /// Average time spent making a scheduling decision.
    pub avg_scheduling_time: Duration,

    /// Average time tasks have spent waiting in queues.
    pub avg_wait_time: Duration,

    /// Task throughput (presumably completed tasks per second — confirm).
    pub throughput: f64,

    /// Scheduling efficiency score (scale unspecified).
    pub efficiency: f64,

    /// Queue utilization (presumably a ratio in `0.0..=1.0`).
    pub queue_utilization: f64,

    /// Dependency-resolution efficiency score (scale unspecified).
    pub dependency_resolution_efficiency: f64,
}
838
/// Statistics for one priority queue.
#[derive(Debug, Clone)]
pub struct QueueStatistics {
    /// Number of tasks currently in the queue.
    pub task_count: usize,

    /// Average time the queued tasks have been waiting.
    pub avg_wait_time: Duration,

    /// Waiting time of the longest-waiting task.
    pub oldest_task_age: Duration,

    /// Queue growth rate (units unspecified; may be a placeholder).
    pub growth_rate: f64,
}
854
855impl Default for SchedulerConfig {
858 fn default() -> Self {
859 Self {
860 algorithm: SchedulingAlgorithm::Priority,
861 queue_management: QueueManagement {
862 max_queue_size: 10000,
863 overflow_strategy: QueueOverflowStrategy::Block,
864 persistence: QueuePersistence::Memory,
865 partitioning: QueuePartitioning::ByPriority,
866 compaction: QueueCompaction {
867 enabled: true,
868 trigger_threshold: 0.8,
869 interval: Duration::from_secs(300),
870 strategy: CompactionStrategy::RemoveCompleted,
871 },
872 rebalancing: QueueRebalancing {
873 enabled: false,
874 interval: Duration::from_secs(60),
875 imbalance_threshold: 0.3,
876 strategy: RebalancingStrategy::LoadBased,
877 },
878 },
879 priority_handling: PriorityHandling {
880 levels: vec![
881 TaskPriority::Low,
882 TaskPriority::Normal,
883 TaskPriority::High,
884 TaskPriority::Critical,
885 ],
886 aging_strategy: AgingStrategy::Linear {
887 increment_interval: Duration::from_secs(60),
888 increment_amount: 1,
889 },
890 starvation_prevention: true,
891 priority_inversion: PriorityInversionHandling {
892 priority_inheritance: true,
893 priority_ceiling: false,
894 detection_threshold: Duration::from_secs(10),
895 resolution_strategy: PriorityInversionResolution::PriorityBoost,
896 },
897 dynamic_priority: DynamicPriorityConfig {
898 enabled: false,
899 factors: PriorityAdjustmentFactors {
900 wait_time_factor: 0.3,
901 resource_availability_factor: 0.2,
902 system_load_factor: 0.2,
903 deadline_proximity_factor: 0.2,
904 performance_factor: 0.1,
905 },
906 adjustment_interval: Duration::from_secs(30),
907 max_adjustment: 5,
908 },
909 },
910 dependency_resolution: DependencyResolution {
911 enable_tracking: true,
912 cycle_detection: true,
913 deadlock_prevention: true,
914 resolution_timeout: Duration::from_secs(30),
915 graph_optimization: DependencyGraphOptimization {
916 enabled: false,
917 algorithms: vec![GraphOptimizationAlgorithm::TopologicalSort],
918 optimization_interval: Duration::from_secs(300),
919 },
920 caching: DependencyCaching {
921 enabled: true,
922 cache_size: 1000,
923 ttl: Duration::from_secs(300),
924 eviction_strategy: CacheEvictionStrategy::LRU,
925 },
926 },
927 load_balancing: LoadBalancingConfig {
928 algorithm: LoadBalancingAlgorithm::RoundRobin,
929 rebalancing_frequency: Duration::from_secs(30),
930 load_threshold: 0.8,
931 health_checks: LoadBalancerHealthChecks {
932 interval: Duration::from_secs(10),
933 timeout: Duration::from_secs(5),
934 unhealthy_threshold: 3,
935 healthy_threshold: 2,
936 },
937 failover: LoadBalancerFailover {
938 enabled: false,
939 targets: Vec::new(),
940 failback_policy: FailbackPolicy::Delayed {
941 delay: Duration::from_secs(60),
942 },
943 },
944 },
945 performance_tuning: SchedulerPerformanceTuning {
946 thread_pool_size: num_cpus::get(),
947 batch_size: 100,
948 scheduling_frequency: Duration::from_millis(100),
949 memory_optimization: MemoryOptimization {
950 memory_pooling: true,
951 object_recycling: true,
952 gc_tuning: GarbageCollectionTuning {
953 frequency: Duration::from_secs(60),
954 pressure_threshold: 0.8,
955 cleanup_strategies: vec![
956 CleanupStrategy::CompletedTasks,
957 CleanupStrategy::ExpiredCache,
958 ],
959 },
960 },
961 cache_config: SchedulerCacheConfig {
962 task_cache_size: 10000,
963 dependency_cache_size: 5000,
964 stats_cache_size: 1000,
965 cache_ttl: Duration::from_secs(300),
966 },
967 },
968 monitoring: SchedulerMonitoringConfig {
969 enable_metrics: true,
970 enable_task_tracking: true,
971 enable_queue_stats: true,
972 metrics_interval: Duration::from_secs(10),
973 alert_thresholds: SchedulerAlertThresholds {
974 queue_size_threshold: 1000,
975 failure_rate_threshold: 0.1,
976 wait_time_threshold: Duration::from_secs(300),
977 utilization_threshold: 0.9,
978 },
979 },
980 }
981 }
982}
983
/// In-memory [`TaskScheduler`] that keeps one FIFO queue per [`TaskPriority`] and
/// dispatches from the highest priority downwards.
pub struct DefaultTaskScheduler {
    /// Active configuration.
    config: SchedulerConfig,

    /// Pending tasks, one queue per priority. Iterating this `BTreeMap` in reverse visits
    /// priorities from highest to lowest.
    queues: BTreeMap<TaskPriority, VecDeque<(ExecutionTask, TaskHandle)>>,

    /// Tasks handed out by `get_next_task` and not yet reported back or cancelled, keyed by
    /// handle id.
    running: HashMap<String, (ExecutionTask, TaskHandle)>,

    /// Task id -> dependency ids. NOTE(review): never populated in this module.
    dependency_graph: HashMap<String, Vec<String>>,

    /// Latest known handle for each task accepted by `schedule_task`, keyed by handle id.
    /// Entries are never removed in this module.
    handles: HashMap<String, TaskHandle>,

    /// Lifetime counters.
    stats: SchedulerStatistics,

    /// Counter appended to task ids to make handle ids unique.
    task_id_counter: u64,

    /// Lifecycle state; new submissions are rejected once stopping.
    state: SchedulerState,
}
1013
/// Lifetime counters kept by [`DefaultTaskScheduler`].
#[derive(Debug, Clone)]
struct SchedulerStatistics {
    /// Tasks accepted into a queue.
    total_scheduled: u64,
    /// Tasks reported as completed.
    total_completed: u64,
    /// Tasks reported as failed.
    total_failed: u64,
    /// Tasks counted as cancelled.
    total_cancelled: u64,
    /// When the scheduler was created.
    scheduling_start_time: SystemTime,
    /// Set at creation. NOTE(review): never updated afterwards in this module.
    last_stats_update: SystemTime,
}
1024
/// Lifecycle of a [`DefaultTaskScheduler`].
#[derive(Debug, Clone)]
enum SchedulerState {
    /// Accepting new tasks.
    Running,
    /// Shutdown has begun; `schedule_task` rejects new tasks.
    Stopping,
    /// Shutdown finished; `schedule_task` rejects new tasks.
    Stopped,
}
1035
1036impl DefaultTaskScheduler {
1037 #[must_use]
1039 pub fn new(config: SchedulerConfig) -> Self {
1040 Self {
1041 config,
1042 queues: BTreeMap::new(),
1043 running: HashMap::new(),
1044 dependency_graph: HashMap::new(),
1045 handles: HashMap::new(),
1046 stats: SchedulerStatistics {
1047 total_scheduled: 0,
1048 total_completed: 0,
1049 total_failed: 0,
1050 total_cancelled: 0,
1051 scheduling_start_time: SystemTime::now(),
1052 last_stats_update: SystemTime::now(),
1053 },
1054 task_id_counter: 0,
1055 state: SchedulerState::Running,
1056 }
1057 }
1058
1059 fn generate_handle(&mut self, task: &ExecutionTask) -> TaskHandle {
1061 self.task_id_counter += 1;
1062
1063 TaskHandle {
1065 task_id: format!("{}_{}", task.id, self.task_id_counter),
1066 scheduled_at: SystemTime::now(),
1067 estimated_duration: task.metadata.estimated_duration,
1068 priority: task.metadata.priority.clone(),
1069 dependencies: task.metadata.dependencies.clone(),
1070 state: TaskState::Queued,
1071 queue_position: None,
1072 retry_count: 0,
1073 last_updated: SystemTime::now(),
1074 }
1075 }
1076
1077 fn dependencies_satisfied(&self, handle: &TaskHandle) -> bool {
1079 for dependency_id in &handle.dependencies {
1080 if let Some(dep_handle) = self.handles.get(dependency_id) {
1081 if dep_handle.state != TaskState::Completed {
1082 return false;
1083 }
1084 } else {
1085 }
1087 }
1088 true
1089 }
1090
1091 fn update_queue_positions(&mut self) {
1093 for (priority, queue) in &mut self.queues {
1094 for (pos, (_, handle)) in queue.iter_mut().enumerate() {
1095 handle.queue_position = Some(pos);
1096 handle.last_updated = SystemTime::now();
1097 }
1098 }
1099 }
1100
1101 fn apply_priority_aging(&mut self) {
1103 if let AgingStrategy::Linear {
1104 increment_interval,
1105 increment_amount,
1106 } = &self.config.priority_handling.aging_strategy
1107 {
1108 let now = SystemTime::now();
1109 for queue in self.queues.values_mut() {
1110 for (_, handle) in queue {
1111 if let Ok(elapsed) = now.duration_since(handle.scheduled_at) {
1112 if elapsed >= *increment_interval {
1113 handle.last_updated = now;
1115 }
1116 }
1117 }
1118 }
1119 } else {
1120 }
1122 }
1123
1124 fn detect_dependency_cycles(&self) -> SklResult<Vec<Vec<String>>> {
1126 Ok(Vec::new())
1129 }
1130
1131 fn optimize_execution_order(&mut self) -> SklResult<()> {
1133 Ok(())
1135 }
1136}
1137
1138impl TaskScheduler for DefaultTaskScheduler {
1139 fn schedule_task(&mut self, task: ExecutionTask) -> SklResult<TaskHandle> {
1140 if matches!(
1141 self.state,
1142 SchedulerState::Stopping | SchedulerState::Stopped
1143 ) {
1144 return Err(SklearsError::InvalidInput(
1145 "Scheduler is shutting down".to_string(),
1146 ));
1147 }
1148
1149 let handle = self.generate_handle(&task);
1150 let priority = handle.priority.clone();
1151
1152 if self.config.dependency_resolution.cycle_detection {
1154 self.detect_dependency_cycles()?;
1155 }
1156
1157 let queue = self.queues.entry(priority).or_default();
1159
1160 if queue.len() >= self.config.queue_management.max_queue_size {
1162 match self.config.queue_management.overflow_strategy {
1163 QueueOverflowStrategy::Block => {
1164 return Err(SklearsError::InvalidInput("Queue is full".to_string()));
1165 }
1166 QueueOverflowStrategy::Drop => {
1167 return Ok(handle); }
1169 QueueOverflowStrategy::DropOldest => {
1170 queue.pop_front();
1171 }
1172 QueueOverflowStrategy::Reject => {
1173 return Err(SklearsError::InvalidInput(
1174 "Queue overflow: task rejected".to_string(),
1175 ));
1176 }
1177 _ => {
1178 }
1180 }
1181 }
1182
1183 queue.push_back((task, handle.clone()));
1184 self.handles.insert(handle.task_id.clone(), handle.clone());
1185 self.stats.total_scheduled += 1;
1186
1187 self.update_queue_positions();
1188
1189 Ok(handle)
1190 }
1191
1192 fn schedule_batch(&mut self, tasks: Vec<ExecutionTask>) -> SklResult<Vec<TaskHandle>> {
1193 let mut handles = Vec::new();
1194 for task in tasks {
1195 let handle = self.schedule_task(task)?;
1196 handles.push(handle);
1197 }
1198 Ok(handles)
1199 }
1200
1201 fn cancel_task(&mut self, handle: TaskHandle) -> SklResult<()> {
1202 for queue in self.queues.values_mut() {
1204 queue.retain(|(_, h)| h.task_id != handle.task_id);
1205 }
1206
1207 self.running.remove(&handle.task_id);
1209
1210 if let Some(h) = self.handles.get_mut(&handle.task_id) {
1212 h.state = TaskState::Cancelled;
1213 h.last_updated = SystemTime::now();
1214 }
1215
1216 self.stats.total_cancelled += 1;
1217
1218 Ok(())
1219 }
1220
1221 fn get_status(&self) -> SchedulerStatus {
1222 let queued_tasks: usize = self
1223 .queues
1224 .values()
1225 .map(std::collections::VecDeque::len)
1226 .sum();
1227 let running_tasks = self.running.len();
1228
1229 let health = if queued_tasks > self.config.monitoring.alert_thresholds.queue_size_threshold
1230 {
1231 SchedulerHealth::Overloaded {
1232 queue_size: queued_tasks,
1233 }
1234 } else {
1235 SchedulerHealth::Healthy
1236 };
1237
1238 let queue_stats = self
1239 .queues
1240 .iter()
1241 .map(|(priority, queue)| {
1242 let avg_wait_time = Duration::from_secs(60); let oldest_task_age = Duration::from_secs(300); (
1246 priority.clone(),
1247 QueueStatistics {
1249 task_count: queue.len(),
1250 avg_wait_time,
1251 oldest_task_age,
1252 growth_rate: 0.1, },
1254 )
1255 })
1256 .collect();
1257
1258 SchedulerStatus {
1260 queued_tasks,
1261 running_tasks,
1262 completed_tasks: self.stats.total_completed,
1263 failed_tasks: self.stats.total_failed,
1264 cancelled_tasks: self.stats.total_cancelled,
1265 health,
1266 performance: SchedulerPerformanceMetrics {
1267 avg_scheduling_time: Duration::from_millis(5),
1268 avg_wait_time: Duration::from_secs(30),
1269 throughput: 10.0, efficiency: 0.85,
1271 queue_utilization: 0.6,
1272 dependency_resolution_efficiency: 0.9,
1273 },
1274 queue_stats,
1275 resource_utilization: 0.7,
1276 }
1277 }
1278
1279 fn update_config(&mut self, config: SchedulerConfig) -> SklResult<()> {
1280 self.config = config;
1281 Ok(())
1282 }
1283
1284 async fn shutdown_gracefully(&mut self) -> SklResult<()> {
1285 self.state = SchedulerState::Stopping;
1286
1287 while !self.running.is_empty() {
1289 tokio::time::sleep(Duration::from_millis(100)).await;
1290 }
1291
1292 self.state = SchedulerState::Stopped;
1293 Ok(())
1294 }
1295
1296 fn get_next_task(&mut self) -> Option<(ExecutionTask, TaskHandle)> {
1297 if self.config.priority_handling.starvation_prevention {
1299 self.apply_priority_aging();
1300 }
1301
1302 for (priority, queue) in self.queues.iter_mut().rev() {
1304 if let Some((task, mut handle)) = queue.pop_front() {
1305 let dependencies_satisfied = handle.dependencies.iter().all(|dep_id| {
1307 if let Some(dep_handle) = self.handles.get(dep_id) {
1308 dep_handle.state == TaskState::Completed
1309 } else {
1310 true }
1312 });
1313
1314 if dependencies_satisfied {
1315 handle.state = TaskState::Running;
1316 handle.last_updated = SystemTime::now();
1317
1318 self.running
1319 .insert(handle.task_id.clone(), (task.clone(), handle.clone()));
1320 self.handles.insert(handle.task_id.clone(), handle.clone());
1321
1322 return Some((task, handle));
1323 }
1324 handle.state = TaskState::WaitingForDependencies;
1326 queue.push_back((task, handle));
1327 }
1329 }
1330
1331 None
1332 }
1333
1334 fn mark_task_completed(&mut self, handle: &TaskHandle) -> SklResult<()> {
1335 self.running.remove(&handle.task_id);
1336
1337 if let Some(h) = self.handles.get_mut(&handle.task_id) {
1338 h.state = TaskState::Completed;
1339 h.last_updated = SystemTime::now();
1340 }
1341
1342 self.stats.total_completed += 1;
1343 Ok(())
1344 }
1345
1346 fn mark_task_failed(&mut self, handle: &TaskHandle, _error: String) -> SklResult<()> {
1347 self.running.remove(&handle.task_id);
1348
1349 if let Some(h) = self.handles.get_mut(&handle.task_id) {
1350 h.state = TaskState::Failed;
1351 h.retry_count += 1;
1352 h.last_updated = SystemTime::now();
1353 }
1354
1355 self.stats.total_failed += 1;
1356 Ok(())
1357 }
1358}
1359
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution_types::{TaskConstraints, TaskMetadata, TaskRequirements, TaskType};

    /// Builds a minimal no-op transform task with the given id and priority.
    fn make_task(id: &str, priority: TaskPriority) -> ExecutionTask {
        ExecutionTask {
            id: id.to_string(),
            task_type: TaskType::Transform,
            task_fn: Box::new(|| Ok(())),
            metadata: TaskMetadata {
                name: "Test Task".to_string(),
                description: Some("A test task".to_string()),
                tags: vec!["test".to_string()],
                created_at: SystemTime::now(),
                estimated_duration: Some(Duration::from_secs(60)),
                priority,
                dependencies: vec![],
                group_id: None,
                submitted_by: None,
                custom_metadata: HashMap::new(),
                retry_config: None,
                timeout_config: None,
            },
            requirements: TaskRequirements {
                cpu_cores: Some(1),
                memory_bytes: Some(1024 * 1024),
                io_bandwidth: None,
                gpu_memory: None,
                network_bandwidth: None,
                storage_space: None,
                gpu_requirements: None,
                cpu_requirements: None,
                memory_requirements: None,
                io_requirements: None,
                network_requirements: None,
                custom_requirements: HashMap::new(),
            },
            constraints: TaskConstraints {
                max_execution_time: Some(Duration::from_secs(300)),
                deadline: None,
                location: None,
                affinity: None,
                isolation: None,
                security: None,
                compliance: None,
                custom_constraints: HashMap::new(),
            },
        }
    }

    /// Default scheduler except for the per-priority queue limit and overflow strategy.
    fn scheduler_with_overflow(
        max_queue_size: usize,
        strategy: QueueOverflowStrategy,
    ) -> DefaultTaskScheduler {
        let mut config = SchedulerConfig::default();
        config.queue_management.max_queue_size = max_queue_size;
        config.queue_management.overflow_strategy = strategy;
        DefaultTaskScheduler::new(config)
    }

    #[test]
    fn test_scheduler_creation() {
        let scheduler = DefaultTaskScheduler::new(SchedulerConfig::default());

        let status = scheduler.get_status();
        assert_eq!(status.queued_tasks, 0);
        assert_eq!(status.running_tasks, 0);
        assert!(matches!(status.health, SchedulerHealth::Healthy));
    }

    #[test]
    fn test_task_scheduling() {
        let mut scheduler = DefaultTaskScheduler::new(SchedulerConfig::default());

        let handle = scheduler
            .schedule_task(make_task("test_task", TaskPriority::Normal))
            .expect("scheduling into an empty queue should succeed");
        assert_eq!(handle.priority, TaskPriority::Normal);
        assert_eq!(handle.state, TaskState::Queued);

        assert_eq!(scheduler.get_status().queued_tasks, 1);
    }

    #[test]
    fn test_priority_ordering() {
        assert!(TaskPriority::Critical > TaskPriority::High);
        assert!(TaskPriority::High > TaskPriority::Normal);
        assert!(TaskPriority::Normal > TaskPriority::Low);
    }

    #[test]
    fn test_task_states_are_distinct() {
        let states = [
            TaskState::Queued,
            TaskState::WaitingForDependencies,
            TaskState::Ready,
            TaskState::Running,
            TaskState::Completed,
            TaskState::Failed,
            TaskState::Cancelled,
            TaskState::TimedOut,
        ];

        for (i, a) in states.iter().enumerate() {
            for (j, b) in states.iter().enumerate() {
                assert_eq!(i == j, a == b, "{a:?} vs {b:?}");
            }
        }
    }

    #[test]
    fn test_scheduling_algorithms_accept_tasks() {
        let algorithms = [
            SchedulingAlgorithm::FIFO,
            SchedulingAlgorithm::LIFO,
            SchedulingAlgorithm::Priority,
            SchedulingAlgorithm::ShortestJobFirst,
            SchedulingAlgorithm::FairShare,
        ];

        for algorithm in algorithms {
            let config = SchedulerConfig {
                algorithm,
                ..SchedulerConfig::default()
            };
            let mut scheduler = DefaultTaskScheduler::new(config);
            assert!(scheduler
                .schedule_task(make_task("t", TaskPriority::Normal))
                .is_ok());
            assert_eq!(scheduler.get_status().queued_tasks, 1);
        }
    }

    #[test]
    fn test_overflow_block_and_reject_return_errors() {
        for strategy in [QueueOverflowStrategy::Block, QueueOverflowStrategy::Reject] {
            let mut scheduler = scheduler_with_overflow(1, strategy);
            assert!(scheduler
                .schedule_task(make_task("a", TaskPriority::Normal))
                .is_ok());
            assert!(scheduler
                .schedule_task(make_task("b", TaskPriority::Normal))
                .is_err());
            assert_eq!(scheduler.get_status().queued_tasks, 1);
        }
    }

    #[test]
    fn test_overflow_drop_discards_new_task() {
        let mut scheduler = scheduler_with_overflow(1, QueueOverflowStrategy::Drop);
        scheduler
            .schedule_task(make_task("a", TaskPriority::Normal))
            .unwrap();
        assert!(scheduler
            .schedule_task(make_task("b", TaskPriority::Normal))
            .is_ok());
        assert_eq!(scheduler.get_status().queued_tasks, 1);

        let (task, _) = scheduler
            .get_next_task()
            .expect("the first task should still be queued");
        assert_eq!(task.id, "a");
    }

    #[test]
    fn test_overflow_drop_oldest_evicts_front() {
        let mut scheduler = scheduler_with_overflow(1, QueueOverflowStrategy::DropOldest);
        scheduler
            .schedule_task(make_task("a", TaskPriority::Normal))
            .unwrap();
        scheduler
            .schedule_task(make_task("b", TaskPriority::Normal))
            .unwrap();
        assert_eq!(scheduler.get_status().queued_tasks, 1);

        let (task, _) = scheduler
            .get_next_task()
            .expect("the newest task should remain queued");
        assert_eq!(task.id, "b");
    }

    #[test]
    fn test_get_next_task_prefers_higher_priority() {
        let mut scheduler = DefaultTaskScheduler::new(SchedulerConfig::default());
        scheduler
            .schedule_task(make_task("low", TaskPriority::Low))
            .unwrap();
        scheduler
            .schedule_task(make_task("critical", TaskPriority::Critical))
            .unwrap();

        let (task, handle) = scheduler.get_next_task().expect("a task should be ready");
        assert_eq!(task.id, "critical");
        assert_eq!(handle.state, TaskState::Running);

        let status = scheduler.get_status();
        assert_eq!(status.running_tasks, 1);
        assert_eq!(status.queued_tasks, 1);

        scheduler.mark_task_completed(&handle).unwrap();
        let status = scheduler.get_status();
        assert_eq!(status.running_tasks, 0);
        assert_eq!(status.completed_tasks, 1);
    }

    #[test]
    fn test_cancel_queued_task() {
        let mut scheduler = DefaultTaskScheduler::new(SchedulerConfig::default());
        let handle = scheduler
            .schedule_task(make_task("t", TaskPriority::Normal))
            .unwrap();

        scheduler.cancel_task(handle).unwrap();

        let status = scheduler.get_status();
        assert_eq!(status.queued_tasks, 0);
        assert_eq!(status.cancelled_tasks, 1);
        assert!(scheduler.get_next_task().is_none());
    }

    #[test]
    fn test_scheduler_config_defaults() {
        let config = SchedulerConfig::default();
        assert!(matches!(config.algorithm, SchedulingAlgorithm::Priority));
        assert_eq!(config.queue_management.max_queue_size, 10000);
        assert!(config.priority_handling.starvation_prevention);
        assert!(config.dependency_resolution.enable_tracking);
    }
}
1499}