1use sklears_core::error::Result as SklResult;
8use std::any::Any;
9use std::collections::HashMap;
10use std::fmt;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15
16pub trait ExecutionStrategy: Send + Sync + fmt::Debug {
18 fn name(&self) -> &str;
20
21 fn execute_task(
23 &self,
24 task: ExecutionTask,
25 ) -> Pin<Box<dyn Future<Output = SklResult<TaskResult>> + Send + '_>>;
26
27 fn execute_batch(
29 &self,
30 tasks: Vec<ExecutionTask>,
31 ) -> Pin<Box<dyn Future<Output = SklResult<Vec<TaskResult>>> + Send + '_>>;
32
33 fn can_handle(&self, requirements: &TaskRequirements) -> bool;
35
36 fn config(&self) -> &StrategyConfig;
38
39 fn update_config(&mut self, config: StrategyConfig) -> SklResult<()>;
41
42 fn health_check(&self) -> Health;
44
45 fn get_metrics(&self) -> StrategyMetrics;
47}
48
49pub trait TaskScheduler: Send + Sync + fmt::Debug {
51 fn schedule_task(&mut self, task: ExecutionTask) -> SklResult<String>;
53
54 fn schedule_batch(&mut self, tasks: Vec<ExecutionTask>) -> SklResult<Vec<String>>;
56
57 fn next_task(&mut self) -> Option<ExecutionTask>;
59
60 fn cancel_task(&mut self, task_id: &str) -> SklResult<()>;
62
63 fn queue_status(&self) -> QueueStatus;
65
66 fn start(&mut self) -> SklResult<()>;
68
69 fn pause(&mut self) -> SklResult<()>;
71
72 fn resume(&mut self) -> SklResult<()>;
74
75 fn shutdown(&mut self) -> SklResult<()>;
77
78 fn health_check(&self) -> Health;
80}
81
82pub trait ResourceManager: Send + Sync + fmt::Debug {
84 fn check_availability(&self, requirements: &TaskRequirements) -> SklResult<bool>;
86
87 fn allocate_resources(&self, requirements: &TaskRequirements) -> SklResult<ResourceAllocation>;
89
90 fn release_resources(&self, allocation: ResourceAllocation) -> SklResult<()>;
92
93 fn get_usage_summary(&self) -> ResourceUsageSummary;
95
96 fn initialize(&self) -> SklResult<()>;
98
99 fn shutdown(&self) -> SklResult<()>;
101
102 fn health_check(&self) -> Health;
104}
105
106pub struct ExecutionTask {
108 pub id: String,
110 pub metadata: TaskMetadata,
112 pub requirements: TaskRequirements,
114 pub constraints: TaskConstraints,
116 pub execution_fn: Arc<dyn Fn() -> SklResult<Box<dyn Any + Send + Sync>> + Send + Sync>,
118 pub created_at: SystemTime,
120 pub priority: TaskPriority,
122 pub status: TaskStatus,
124}
125
126impl std::fmt::Debug for ExecutionTask {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("ExecutionTask")
129 .field("id", &self.id)
130 .field("metadata", &self.metadata)
131 .field("requirements", &self.requirements)
132 .field("constraints", &self.constraints)
133 .field("execution_fn", &"<function>")
134 .field("created_at", &self.created_at)
135 .field("priority", &self.priority)
136 .field("status", &self.status)
137 .finish()
138 }
139}
140
141impl Clone for ExecutionTask {
142 fn clone(&self) -> Self {
143 Self {
144 id: self.id.clone(),
145 metadata: self.metadata.clone(),
146 requirements: self.requirements.clone(),
147 constraints: self.constraints.clone(),
148 execution_fn: self.execution_fn.clone(), created_at: self.created_at,
150 priority: self.priority.clone(),
151 status: self.status.clone(),
152 }
153 }
154}
155
156#[derive(Debug, Clone)]
158pub struct TaskMetadata {
159 pub name: String,
161 pub task_type: TaskType,
163 pub description: Option<String>,
165 pub tags: Vec<String>,
167 pub custom_fields: HashMap<String, String>,
169}
170
171#[derive(Debug, Clone)]
173pub struct TaskRequirements {
174 pub cpu_cores: Option<usize>,
176 pub memory: Option<u64>,
178 pub gpu_memory: Option<u64>,
180 pub gpu_devices: Vec<String>,
182 pub network_bandwidth: Option<u64>,
184 pub disk_space: Option<u64>,
186 pub execution_location: ExecutionLocation,
188 pub affinity: TaskAffinity,
190}
191
/// Limits and ordering constraints placed on a task.
///
/// NOTE(review): how `max_execution_time` and `timeout` differ is not visible
/// in this file — confirm against the code that enforces them.
#[derive(Debug, Clone)]
pub struct TaskConstraints {
    /// Upper bound on execution time, if any.
    pub max_execution_time: Option<Duration>,
    /// Upper bound on memory usage, if any (unit not stated here).
    pub max_memory_usage: Option<u64>,
    /// Maximum number of retries, if any.
    pub max_retries: Option<usize>,
    /// Timeout, if any.
    pub timeout: Option<Duration>,
    /// Dependencies as strings — presumably ids of other tasks (TODO confirm).
    pub dependencies: Vec<String>,
    /// Names of resources the task wants exclusively.
    pub exclusive_resources: Vec<String>,
}
208
209#[derive(Debug)]
211pub struct TaskResult {
212 pub task_id: String,
214 pub status: TaskStatus,
216 pub result: Option<Box<dyn Any + Send + Sync>>,
218 pub error: Option<TaskError>,
220 pub metrics: TaskExecutionMetrics,
222 pub completed_at: SystemTime,
224}
225
/// Lifecycle state of a task.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq`, so statuses can be used
/// as `HashMap` keys or collected into a `HashSet` (for example to count tasks
/// per state).
///
/// Variant meanings below follow the variant names; the code that assigns
/// them lives outside this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TaskStatus {
    /// Not started yet.
    Pending,
    /// Currently executing.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Cancelled.
    Cancelled,
    /// Timed out.
    TimedOut,
    /// Being retried.
    Retrying,
}
244
/// Kind of work a task performs.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq` (every payload is a
/// `String`), so task types can key a `HashMap` or populate a `HashSet`.
///
/// Variant meanings follow the variant names; nothing in this file consumes
/// them beyond formatting.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TaskType {
    /// Preprocessing step.
    Preprocess,
    /// Model fitting.
    Fit,
    /// Prediction.
    Predict,
    /// Transformation.
    Transform,
    /// Evaluation.
    Evaluate,
    /// Feature extraction.
    FeatureExtraction,
    /// Validation.
    Validate,
    /// Optimisation.
    Optimize,
    /// Caller-defined kind, carrying its name.
    Custom(String),
}
267
/// Scheduling priority of a task.
///
/// Variants are declared with ascending discriminants, so the derived `Ord`
/// ranks `Low < Normal < High < Critical`. The default is
/// [`TaskPriority::Normal`]. `Hash` is derived alongside `Eq` so priorities
/// can key a `HashMap` (for example one queue per priority level).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum TaskPriority {
    /// Lowest priority (discriminant 0).
    Low = 0,
    /// Default priority (discriminant 1).
    #[default]
    Normal = 1,
    /// Above-normal priority (discriminant 2).
    High = 2,
    /// Highest priority (discriminant 3).
    Critical = 3,
}
281
/// Where a task may run.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq`, so locations can be
/// used as map keys or set members.
///
/// Variant meanings follow the variant names; placement logic is outside this
/// file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExecutionLocation {
    /// No placement restriction.
    Any,
    /// Local execution.
    Local,
    /// Remote execution.
    Remote,
    /// A specific location, named by the contained string.
    Specific(String),
    /// GPU execution.
    Gpu,
    /// Cloud execution.
    Cloud,
}
298
/// Placement preferences for a task.
///
/// The derived `Default` leaves every list empty and `numa_node` unset.
#[derive(Debug, Clone, Default)]
pub struct TaskAffinity {
    /// Nodes the task would like to run on.
    pub preferred_nodes: Vec<String>,
    /// Nodes the task would like to stay away from.
    pub avoid_nodes: Vec<String>,
    /// CPU indices for affinity — presumably core ids (TODO confirm).
    pub cpu_affinity: Vec<usize>,
    /// NUMA node, if any.
    pub numa_node: Option<usize>,
}
311
312#[derive(Debug, Clone)]
314pub struct TaskError {
315 pub code: String,
317 pub message: String,
319 pub category: ErrorCategory,
321 pub retry_info: Option<RetryInfo>,
323 pub stack_trace: Option<String>,
325}
326
/// Broad classification of a [`TaskError`].
///
/// Derives `Eq` and `Hash` in addition to `PartialEq`, so categories can key a
/// `HashMap` (for example to tally errors per category).
///
/// Variant meanings follow the variant names; classification logic is outside
/// this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
    /// Resource allocation problem.
    ResourceAllocation,
    /// Timeout.
    Timeout,
    /// Invalid input.
    InvalidInput,
    /// System-level problem.
    System,
    /// Network problem.
    Network,
    /// Problem in user-supplied code.
    UserCode,
    /// Unclassified.
    Unknown,
}
345
346#[derive(Debug, Clone)]
348pub struct RetryInfo {
349 pub attempt: usize,
351 pub max_retries: usize,
353 pub next_retry_delay: Duration,
355 pub strategy: RetryStrategy,
357}
358
/// How the delay between retries is chosen.
///
/// Only `PartialEq` is derived: the `f64` multiplier rules out `Eq`.
///
/// NOTE(review): the exact delay formula for each variant is applied by code
/// outside this file — the field names are the only guide here.
#[derive(Debug, Clone, PartialEq)]
pub enum RetryStrategy {
    /// The same delay every time.
    Fixed(Duration),
    /// A base delay combined with a multiplier.
    Exponential { base: Duration, multiplier: f64 },
    /// A base delay combined with a fixed increment.
    Linear { base: Duration, increment: Duration },
    /// Caller-defined strategy, identified by a string.
    Custom(String),
}
371
372#[derive(Debug, Clone)]
374pub struct TaskExecutionMetrics {
375 pub start_time: SystemTime,
377 pub end_time: Option<SystemTime>,
379 pub execution_duration: Option<Duration>,
381 pub cpu_time: Option<Duration>,
383 pub memory_usage: TaskMemoryUsage,
385 pub resource_usage: TaskResourceUsage,
387 pub performance_metrics: TaskPerformanceMetrics,
389}
390
/// Memory figures for one task run (units are not stated in this file).
#[derive(Debug, Clone)]
pub struct TaskMemoryUsage {
    /// Peak memory.
    pub peak_memory: u64,
    /// Average memory.
    pub average_memory: u64,
    /// Allocation count.
    pub allocations: u64,
    /// Deallocation count.
    pub deallocations: u64,
}
403
/// Resource utilisation figures for one task run.
///
/// NOTE(review): whether the utilisation values are fractions or percentages,
/// and the unit of the I/O counters, is not stated in this file.
#[derive(Debug, Clone)]
pub struct TaskResourceUsage {
    /// CPU utilisation.
    pub cpu_utilization: f64,
    /// Memory utilisation.
    pub memory_utilization: f64,
    /// GPU utilisation, if any.
    pub gpu_utilization: Option<f64>,
    /// Network I/O counter.
    pub network_io: u64,
    /// Disk I/O counter.
    pub disk_io: u64,
}
418
/// Performance figures for one task run.
#[derive(Debug, Clone)]
pub struct TaskPerformanceMetrics {
    /// Operations per second, if measured.
    pub ops_per_second: Option<f64>,
    /// Throughput, if measured (unit not stated here).
    pub throughput: Option<f64>,
    /// Latency, if measured.
    pub latency: Option<Duration>,
    /// Named quality scores.
    pub quality_metrics: HashMap<String, f64>,
}
431
432#[derive(Debug, Clone)]
434pub struct StrategyConfig {
435 pub name: String,
437 pub parameters: HashMap<String, String>,
439 pub resource_limits: ResourceLimits,
441 pub performance_targets: PerformanceTargets,
443 pub debug_enabled: bool,
445}
446
/// Resource ceilings for a strategy; `None` leaves a limit unset.
#[derive(Debug, Clone)]
pub struct ResourceLimits {
    /// Maximum number of CPU cores.
    pub max_cpu_cores: Option<usize>,
    /// Maximum memory (unit not stated here).
    pub max_memory: Option<u64>,
    /// Maximum GPU memory (unit not stated here).
    pub max_gpu_memory: Option<u64>,
    /// Maximum number of tasks running concurrently.
    pub max_concurrent_tasks: Option<usize>,
}
459
/// Performance goals for a strategy; `None` leaves a goal unset.
///
/// NOTE(review): units are not stated in this file (latency is a bare `f64`
/// here, unlike the `Duration` used for measured latency elsewhere).
#[derive(Debug, Clone)]
pub struct PerformanceTargets {
    /// Throughput goal.
    pub target_throughput: Option<f64>,
    /// Latency goal.
    pub target_latency: Option<f64>,
    /// Utilisation goal.
    pub target_utilization: Option<f64>,
}
470
/// Aggregate metrics reported by an execution strategy.
#[derive(Debug, Clone)]
pub struct StrategyMetrics {
    /// Total number of tasks.
    pub total_tasks: u64,
    /// Number of successful tasks.
    pub successful_tasks: u64,
    /// Number of failed tasks.
    pub failed_tasks: u64,
    /// Average execution time.
    pub average_execution_time: Duration,
    /// Current throughput (unit not stated here).
    pub current_throughput: f64,
    /// Resource utilisation (scale not stated here).
    pub resource_utilization: f64,
}
487
/// Health state reported by strategies, schedulers and resource managers.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq`, so health states can
/// key a `HashMap` (for example to group components by state).
///
/// Variant meanings follow the variant names; the criteria for each state are
/// decided by the implementors of the `health_check` methods.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Health {
    /// Operating normally.
    Healthy,
    /// Not operating correctly.
    Unhealthy,
    /// Operating with reduced capability.
    Degraded,
    /// State could not be determined.
    Unknown,
}
500
/// Point-in-time description of a scheduler queue.
#[derive(Debug, Clone)]
pub struct QueueStatus {
    /// Number of pending tasks.
    pub pending_tasks: usize,
    /// Number of running tasks.
    pub running_tasks: usize,
    /// Queue capacity, if one is set.
    pub queue_capacity: Option<usize>,
    /// Queue utilisation (scale not stated here).
    pub utilization: f64,
}
513
/// Handle describing resources granted by a resource manager.
#[derive(Debug, Clone)]
pub struct ResourceAllocation {
    /// Allocation identifier.
    pub id: String,
    /// Allocated CPU cores, as indices.
    pub cpu_cores: Vec<usize>,
    /// Allocated memory (unit not stated here).
    pub memory: u64,
    /// Allocated GPU devices, as strings.
    pub gpu_devices: Vec<String>,
    /// When the allocation was made.
    pub allocated_at: SystemTime,
    /// Allocation timeout, if any.
    pub timeout: Option<Duration>,
}
530
/// Totals and current usage reported by a resource manager.
#[derive(Debug, Clone)]
pub struct ResourceUsageSummary {
    /// Total number of CPU cores.
    pub total_cpu_cores: usize,
    /// Number of CPU cores in use.
    pub used_cpu_cores: usize,
    /// Total memory (unit not stated here).
    pub total_memory: u64,
    /// Memory in use.
    pub used_memory: u64,
    /// Total number of GPU devices.
    pub total_gpu_devices: usize,
    /// Number of GPU devices in use.
    pub used_gpu_devices: usize,
    /// Overall utilisation, named as a percentage.
    pub utilization_percentage: f64,
}
549
550impl Default for TaskRequirements {
552 fn default() -> Self {
553 Self {
554 cpu_cores: None,
555 memory: None,
556 gpu_memory: None,
557 gpu_devices: Vec::new(),
558 network_bandwidth: None,
559 disk_space: None,
560 execution_location: ExecutionLocation::Any,
561 affinity: TaskAffinity::default(),
562 }
563 }
564}
565
566impl Default for TaskConstraints {
567 fn default() -> Self {
568 Self {
569 max_execution_time: None,
570 max_memory_usage: None,
571 max_retries: Some(3),
572 timeout: Some(Duration::from_secs(300)), dependencies: Vec::new(),
574 exclusive_resources: Vec::new(),
575 }
576 }
577}
578
579impl Default for StrategyConfig {
580 fn default() -> Self {
581 Self {
582 name: "default".to_string(),
583 parameters: HashMap::new(),
584 resource_limits: ResourceLimits::default(),
585 performance_targets: PerformanceTargets::default(),
586 debug_enabled: false,
587 }
588 }
589}
590
591impl Default for ResourceLimits {
592 fn default() -> Self {
593 Self {
594 max_cpu_cores: None,
595 max_memory: None,
596 max_gpu_memory: None,
597 max_concurrent_tasks: Some(10),
598 }
599 }
600}
601
602impl Default for PerformanceTargets {
603 fn default() -> Self {
604 Self {
605 target_throughput: None,
606 target_latency: None,
607 target_utilization: Some(80.0), }
609 }
610}
611
612impl fmt::Display for TaskStatus {
613 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614 match self {
615 TaskStatus::Pending => write!(f, "Pending"),
616 TaskStatus::Running => write!(f, "Running"),
617 TaskStatus::Completed => write!(f, "Completed"),
618 TaskStatus::Failed => write!(f, "Failed"),
619 TaskStatus::Cancelled => write!(f, "Cancelled"),
620 TaskStatus::TimedOut => write!(f, "TimedOut"),
621 TaskStatus::Retrying => write!(f, "Retrying"),
622 }
623 }
624}
625
626impl fmt::Display for TaskType {
627 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
628 match self {
629 TaskType::Preprocess => write!(f, "Preprocess"),
630 TaskType::Fit => write!(f, "Fit"),
631 TaskType::Predict => write!(f, "Predict"),
632 TaskType::Transform => write!(f, "Transform"),
633 TaskType::Evaluate => write!(f, "Evaluate"),
634 TaskType::FeatureExtraction => write!(f, "FeatureExtraction"),
635 TaskType::Validate => write!(f, "Validate"),
636 TaskType::Optimize => write!(f, "Optimize"),
637 TaskType::Custom(name) => write!(f, "Custom({name})"),
638 }
639 }
640}
641
642impl fmt::Display for Health {
643 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
644 match self {
645 Health::Healthy => write!(f, "Healthy"),
646 Health::Unhealthy => write!(f, "Unhealthy"),
647 Health::Degraded => write!(f, "Degraded"),
648 Health::Unknown => write!(f, "Unknown"),
649 }
650 }
651}
652
653pub struct TaskBuilder {
655 id: Option<String>,
656 metadata: TaskMetadata,
657 requirements: TaskRequirements,
658 constraints: TaskConstraints,
659 priority: TaskPriority,
660}
661
662impl TaskBuilder {
663 #[must_use]
665 pub fn new(name: &str, task_type: TaskType) -> Self {
666 Self {
667 id: None,
668 metadata: TaskMetadata {
669 name: name.to_string(),
670 task_type,
671 description: None,
672 tags: Vec::new(),
673 custom_fields: HashMap::new(),
674 },
675 requirements: TaskRequirements::default(),
676 constraints: TaskConstraints::default(),
677 priority: TaskPriority::default(),
678 }
679 }
680
681 #[must_use]
683 pub fn id(mut self, id: &str) -> Self {
684 self.id = Some(id.to_string());
685 self
686 }
687
688 #[must_use]
690 pub fn description(mut self, description: &str) -> Self {
691 self.metadata.description = Some(description.to_string());
692 self
693 }
694
695 #[must_use]
697 pub fn tag(mut self, tag: &str) -> Self {
698 self.metadata.tags.push(tag.to_string());
699 self
700 }
701
702 #[must_use]
704 pub fn cpu_cores(mut self, cores: usize) -> Self {
705 self.requirements.cpu_cores = Some(cores);
706 self
707 }
708
709 #[must_use]
711 pub fn memory(mut self, memory: u64) -> Self {
712 self.requirements.memory = Some(memory);
713 self
714 }
715
716 #[must_use]
718 pub fn priority(mut self, priority: TaskPriority) -> Self {
719 self.priority = priority;
720 self
721 }
722
723 #[must_use]
725 pub fn max_execution_time(mut self, duration: Duration) -> Self {
726 self.constraints.max_execution_time = Some(duration);
727 self
728 }
729
730 #[must_use]
732 pub fn build_metadata(
733 self,
734 ) -> (
735 TaskMetadata,
736 TaskRequirements,
737 TaskConstraints,
738 TaskPriority,
739 ) {
740 (
741 self.metadata,
742 self.requirements,
743 self.constraints,
744 self.priority,
745 )
746 }
747}
748
// Unit tests for the builder, the `Display` impls and the `Default` impls.
// Every identifier here is snake_case, so no lint allowance is needed.
#[cfg(test)]
mod tests {
    use super::*;

    /// Every setter is reflected in the tuple returned by `build_metadata`.
    #[test]
    fn test_task_builder() {
        let (metadata, requirements, constraints, priority) =
            TaskBuilder::new("test_task", TaskType::Preprocess)
                .id("task_001")
                .description("Test task description")
                .tag("test")
                .cpu_cores(4)
                .memory(1024 * 1024 * 1024)
                .priority(TaskPriority::High)
                .max_execution_time(Duration::from_secs(60))
                .build_metadata();

        assert_eq!(metadata.name, "test_task");
        assert_eq!(metadata.task_type, TaskType::Preprocess);
        assert_eq!(
            metadata.description,
            Some("Test task description".to_string())
        );
        assert_eq!(metadata.tags, vec!["test"]);
        assert_eq!(requirements.cpu_cores, Some(4));
        assert_eq!(requirements.memory, Some(1024 * 1024 * 1024));
        assert_eq!(priority, TaskPriority::High);
        assert_eq!(
            constraints.max_execution_time,
            Some(Duration::from_secs(60))
        );
    }

    /// A builder with no setters called yields the documented defaults.
    #[test]
    fn test_task_builder_defaults() {
        let (metadata, requirements, constraints, priority) =
            TaskBuilder::new("bare", TaskType::Fit).build_metadata();

        assert_eq!(metadata.name, "bare");
        assert_eq!(metadata.task_type, TaskType::Fit);
        assert_eq!(metadata.description, None);
        assert!(metadata.tags.is_empty());
        assert!(metadata.custom_fields.is_empty());
        assert_eq!(requirements.cpu_cores, None);
        assert_eq!(requirements.memory, None);
        assert_eq!(constraints.max_execution_time, None);
        assert_eq!(constraints.max_retries, Some(3));
        assert_eq!(priority, TaskPriority::Normal);
    }

    /// Repeated `tag` calls accumulate in call order.
    #[test]
    fn test_task_builder_accumulates_tags() {
        let (metadata, _, _, _) = TaskBuilder::new("tagged", TaskType::Transform)
            .tag("a")
            .tag("b")
            .build_metadata();
        assert_eq!(metadata.tags, vec!["a", "b"]);
    }

    #[test]
    fn test_task_status_display() {
        assert_eq!(format!("{}", TaskStatus::Pending), "Pending");
        assert_eq!(format!("{}", TaskStatus::Running), "Running");
        assert_eq!(format!("{}", TaskStatus::Completed), "Completed");
        assert_eq!(format!("{}", TaskStatus::Failed), "Failed");
        assert_eq!(format!("{}", TaskStatus::Cancelled), "Cancelled");
        assert_eq!(format!("{}", TaskStatus::TimedOut), "TimedOut");
        assert_eq!(format!("{}", TaskStatus::Retrying), "Retrying");
    }

    #[test]
    fn test_task_type_display() {
        assert_eq!(TaskType::Preprocess.to_string(), "Preprocess");
        assert_eq!(TaskType::Fit.to_string(), "Fit");
        assert_eq!(TaskType::Predict.to_string(), "Predict");
        assert_eq!(TaskType::Transform.to_string(), "Transform");
        assert_eq!(TaskType::Evaluate.to_string(), "Evaluate");
        assert_eq!(TaskType::FeatureExtraction.to_string(), "FeatureExtraction");
        assert_eq!(TaskType::Validate.to_string(), "Validate");
        assert_eq!(TaskType::Optimize.to_string(), "Optimize");
        assert_eq!(
            TaskType::Custom("etl".to_string()).to_string(),
            "Custom(etl)"
        );
    }

    #[test]
    fn test_health_status() {
        assert_eq!(Health::Healthy, Health::Healthy);
        assert_ne!(Health::Healthy, Health::Unhealthy);
        assert_eq!(format!("{}", Health::Healthy), "Healthy");
        assert_eq!(format!("{}", Health::Unhealthy), "Unhealthy");
        assert_eq!(format!("{}", Health::Degraded), "Degraded");
        assert_eq!(format!("{}", Health::Unknown), "Unknown");
    }

    /// The derived `Ord` follows the declared discriminants.
    #[test]
    fn test_task_priority_ordering() {
        assert!(TaskPriority::Low < TaskPriority::Normal);
        assert!(TaskPriority::Normal < TaskPriority::High);
        assert!(TaskPriority::High < TaskPriority::Critical);
    }

    #[test]
    fn test_default_implementations() {
        let requirements = TaskRequirements::default();
        assert_eq!(requirements.execution_location, ExecutionLocation::Any);
        assert_eq!(requirements.cpu_cores, None);
        assert!(requirements.gpu_devices.is_empty());

        let constraints = TaskConstraints::default();
        assert_eq!(constraints.max_retries, Some(3));
        assert_eq!(constraints.timeout, Some(Duration::from_secs(300)));
        assert!(constraints.dependencies.is_empty());

        let priority = TaskPriority::default();
        assert_eq!(priority, TaskPriority::Normal);

        let limits = ResourceLimits::default();
        assert_eq!(limits.max_concurrent_tasks, Some(10));
        assert_eq!(limits.max_cpu_cores, None);

        let targets = PerformanceTargets::default();
        assert_eq!(targets.target_utilization, Some(80.0));
        assert_eq!(targets.target_throughput, None);

        let config = StrategyConfig::default();
        assert_eq!(config.name, "default");
        assert!(config.parameters.is_empty());
        assert!(!config.debug_enabled);
        assert_eq!(config.resource_limits.max_concurrent_tasks, Some(10));
    }
}