1use scirs2_core::ndarray::{Array1, Array2};
8use serde::{Deserialize, Serialize};
9use sklears_core::{
10 error::{Result as SklResult, SklearsError},
11 types::Float,
12};
13use std::collections::{HashMap, HashSet, VecDeque};
14use std::time::{Duration, Instant};
15
16use super::component_registry::ComponentRegistry;
17use super::workflow_definitions::{Connection, ExecutionMode, StepDefinition, WorkflowDefinition};
18
/// Executes workflow definitions by validating them and running their steps
/// in dependency order, tracking per-run context and aggregate statistics.
#[derive(Debug)]
pub struct WorkflowExecutor {
    /// Registry of available components (algorithms) that steps may reference.
    registry: ComponentRegistry,
    /// Mutable state for the current (or most recent) workflow run.
    context: ExecutionContext,
    /// Aggregate statistics accumulated across all runs of this executor.
    stats: ExecutionStatistics,
}
29
/// Per-run execution state: the workflow being run, the data flowing between
/// steps, and timing/progress bookkeeping.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Unique identifier of this run (UUID string).
    pub execution_id: String,
    /// The workflow definition being executed.
    pub workflow: WorkflowDefinition,
    /// Step outputs keyed by `"step_id:port_name"`.
    pub data_flow: HashMap<String, StepData>,
    /// When this run started.
    pub start_time: Instant,
    /// Id of the step currently executing, if any.
    pub current_step: Option<String>,
    /// Execution mode copied from the workflow's execution settings.
    pub execution_mode: ExecutionMode,
}
46
/// Payload exchanged between workflow steps on a named port.
#[derive(Debug, Clone)]
pub struct StepData {
    /// Id of the step this payload belongs to.
    pub step_id: String,
    /// Port the payload is associated with (e.g. "input", "output").
    pub port_name: String,
    /// Named 2-D float matrices (e.g. a feature matrix under key "X").
    pub matrices: HashMap<String, Array2<Float>>,
    /// Named 1-D float arrays (e.g. a target vector under key "y").
    pub arrays: HashMap<String, Array1<Float>>,
    /// Free-form string metadata (e.g. model descriptors).
    pub metadata: HashMap<String, String>,
    /// When this payload was created.
    pub timestamp: Instant,
}
63
/// Outcome of a full workflow run, including per-step results and metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionResult {
    /// Unique identifier of the run that produced this result.
    pub execution_id: String,
    /// Whether every executed step succeeded.
    pub success: bool,
    /// Wall-clock duration of the whole run.
    pub duration: Duration,
    /// One entry per step that was attempted, in execution order.
    pub step_results: Vec<StepExecutionResult>,
    /// Final workflow outputs, keyed by output name (string descriptions).
    pub outputs: HashMap<String, String>,
    /// Error message when the run failed, `None` on success.
    pub error: Option<String>,
    /// Aggregate performance metrics for the run.
    pub performance: PerformanceMetrics,
}
82
83impl Default for ExecutionResult {
84 fn default() -> Self {
85 Self {
86 execution_id: "unknown".to_string(),
87 success: false,
88 duration: Duration::from_secs(0),
89 step_results: Vec::new(),
90 outputs: HashMap::new(),
91 error: Some("Execution failed".to_string()),
92 performance: PerformanceMetrics::default(),
93 }
94 }
95}
96
/// Outcome of a single step within a workflow run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepExecutionResult {
    /// Id of the executed step.
    pub step_id: String,
    /// Algorithm/component name the step ran.
    pub algorithm: String,
    /// Whether the step completed without error.
    pub success: bool,
    /// Wall-clock duration of the step.
    pub duration: Duration,
    /// Estimated memory used by the step, in bytes.
    pub memory_usage: u64,
    /// Element counts of produced matrices, keyed by output name.
    pub output_sizes: HashMap<String, usize>,
    /// Error message when the step failed, `None` on success.
    pub error: Option<String>,
}
115
/// Aggregate performance measurements for a workflow run.
///
/// NOTE(review): only `total_time` is currently measured; the remaining
/// fields are placeholders filled with fixed values (see
/// `calculate_performance_metrics`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Total wall-clock time of the run.
    pub total_time: Duration,
    /// Peak memory usage in bytes (placeholder).
    pub peak_memory: u64,
    /// CPU utilization fraction (placeholder).
    pub cpu_utilization: f64,
    /// Throughput metric (placeholder).
    pub throughput: f64,
    /// Parallel-execution efficiency (placeholder).
    pub parallelism_efficiency: f64,
}
130
/// Running statistics accumulated across every workflow run of an executor.
#[derive(Debug, Clone)]
pub struct ExecutionStatistics {
    /// Number of workflow runs that reached the execution phase.
    pub total_executions: u64,
    /// Runs in which every step succeeded.
    pub successful_executions: u64,
    /// Runs in which at least one step failed.
    pub failed_executions: u64,
    /// Incrementally maintained mean run duration.
    pub average_execution_time: Duration,
    /// Number of executions per algorithm name.
    pub step_execution_counts: HashMap<String, u64>,
}
145
/// Result of statically validating a workflow definition.
#[derive(Debug, Clone)]
pub struct ValidationResult {
    /// True when no errors were found (warnings do not affect validity).
    pub is_valid: bool,
    /// Hard errors that prevent execution.
    pub errors: Vec<ValidationError>,
    /// Non-fatal findings (e.g. deprecated components).
    pub warnings: Vec<ValidationWarning>,
    /// Topological step order, computed only when validation succeeds.
    pub execution_order: Option<Vec<String>>,
}
158
/// A hard validation failure found in a workflow definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationError {
    /// Machine-readable category (e.g. "EmptyWorkflow", "UnknownComponent").
    pub error_type: String,
    /// Human-readable description of the problem.
    pub message: String,
    /// Offending step id, when the error is step-specific.
    pub step_id: Option<String>,
    /// Offending connection as `"from:port -> to:port"`, when applicable.
    pub connection: Option<String>,
}
171
/// A non-fatal finding reported during workflow validation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationWarning {
    /// Machine-readable category (e.g. "DeprecatedComponent").
    pub warning_type: String,
    /// Human-readable description of the finding.
    pub message: String,
    /// Related step id, when the warning is step-specific.
    pub step_id: Option<String>,
}
182
183impl WorkflowExecutor {
184 #[must_use]
186 pub fn new() -> Self {
187 Self {
188 registry: ComponentRegistry::new(),
189 context: ExecutionContext::new(),
190 stats: ExecutionStatistics::new(),
191 }
192 }
193
194 #[must_use]
196 pub fn with_registry(registry: ComponentRegistry) -> Self {
197 Self {
198 registry,
199 context: ExecutionContext::new(),
200 stats: ExecutionStatistics::new(),
201 }
202 }
203
204 #[must_use]
206 pub fn validate_workflow(&self, workflow: &WorkflowDefinition) -> ValidationResult {
207 let mut errors = Vec::new();
208 let mut warnings = Vec::new();
209
210 if workflow.steps.is_empty() {
212 errors.push(ValidationError {
213 error_type: "EmptyWorkflow".to_string(),
214 message: "Workflow has no steps".to_string(),
215 step_id: None,
216 connection: None,
217 });
218 return ValidationResult {
219 is_valid: false,
220 errors,
221 warnings,
222 execution_order: None,
223 };
224 }
225
226 for step in &workflow.steps {
228 self.validate_step(step, &mut errors, &mut warnings);
229 }
230
231 for connection in &workflow.connections {
233 self.validate_connection(connection, workflow, &mut errors, &mut warnings);
234 }
235
236 if let Err(cycle_error) = self.check_circular_dependencies(workflow) {
238 errors.push(ValidationError {
239 error_type: "CircularDependency".to_string(),
240 message: cycle_error,
241 step_id: None,
242 connection: None,
243 });
244 }
245
246 let execution_order = if errors.is_empty() {
248 self.determine_execution_order(workflow).ok()
249 } else {
250 None
251 };
252
253 ValidationResult {
254 is_valid: errors.is_empty(),
255 errors,
256 warnings,
257 execution_order,
258 }
259 }
260
261 fn validate_step(
263 &self,
264 step: &StepDefinition,
265 errors: &mut Vec<ValidationError>,
266 warnings: &mut Vec<ValidationWarning>,
267 ) {
268 if !self.registry.has_component(&step.algorithm) {
270 errors.push(ValidationError {
271 error_type: "UnknownComponent".to_string(),
272 message: format!("Component '{}' not found in registry", step.algorithm),
273 step_id: Some(step.id.clone()),
274 connection: None,
275 });
276 return;
277 }
278
279 if let Err(param_error) = self
281 .registry
282 .validate_parameters(&step.algorithm, &step.parameters)
283 {
284 errors.push(ValidationError {
285 error_type: "InvalidParameters".to_string(),
286 message: param_error.to_string(),
287 step_id: Some(step.id.clone()),
288 connection: None,
289 });
290 }
291
292 if let Some(component) = self.registry.get_component(&step.algorithm) {
294 if component.deprecated {
295 warnings.push(ValidationWarning {
296 warning_type: "DeprecatedComponent".to_string(),
297 message: format!("Component '{}' is deprecated", step.algorithm),
298 step_id: Some(step.id.clone()),
299 });
300 }
301 }
302 }
303
304 fn validate_connection(
306 &self,
307 connection: &Connection,
308 workflow: &WorkflowDefinition,
309 errors: &mut Vec<ValidationError>,
310 _warnings: &mut Vec<ValidationWarning>,
311 ) {
312 let source_step = workflow.steps.iter().find(|s| s.id == connection.from_step);
314 if source_step.is_none() {
315 errors.push(ValidationError {
316 error_type: "InvalidConnection".to_string(),
317 message: format!("Source step '{}' not found", connection.from_step),
318 step_id: None,
319 connection: Some(format!(
320 "{}:{} -> {}:{}",
321 connection.from_step,
322 connection.from_output,
323 connection.to_step,
324 connection.to_input
325 )),
326 });
327 return;
328 }
329
330 let target_step = workflow.steps.iter().find(|s| s.id == connection.to_step);
332 if target_step.is_none() {
333 errors.push(ValidationError {
334 error_type: "InvalidConnection".to_string(),
335 message: format!("Target step '{}' not found", connection.to_step),
336 step_id: None,
337 connection: Some(format!(
338 "{}:{} -> {}:{}",
339 connection.from_step,
340 connection.from_output,
341 connection.to_step,
342 connection.to_input
343 )),
344 });
345 return;
346 }
347
348 let Some(source) = source_step else {
350 return;
351 };
352 if !source.outputs.contains(&connection.from_output) {
353 errors.push(ValidationError {
354 error_type: "InvalidConnection".to_string(),
355 message: format!(
356 "Step '{}' does not have output '{}'",
357 connection.from_step, connection.from_output
358 ),
359 step_id: Some(source.id.clone()),
360 connection: Some(format!(
361 "{}:{} -> {}:{}",
362 connection.from_step,
363 connection.from_output,
364 connection.to_step,
365 connection.to_input
366 )),
367 });
368 }
369
370 let Some(target) = target_step else {
372 return;
373 };
374 if !target.inputs.contains(&connection.to_input) {
375 errors.push(ValidationError {
376 error_type: "InvalidConnection".to_string(),
377 message: format!(
378 "Step '{}' does not have input '{}'",
379 connection.to_step, connection.to_input
380 ),
381 step_id: Some(target.id.clone()),
382 connection: Some(format!(
383 "{}:{} -> {}:{}",
384 connection.from_step,
385 connection.from_output,
386 connection.to_step,
387 connection.to_input
388 )),
389 });
390 }
391 }
392
393 pub fn check_circular_dependencies(&self, workflow: &WorkflowDefinition) -> Result<(), String> {
395 let mut graph = HashMap::new();
396
397 for step in &workflow.steps {
399 graph.insert(step.id.clone(), HashSet::new());
400 }
401
402 for connection in &workflow.connections {
403 if let Some(dependencies) = graph.get_mut(&connection.to_step) {
404 dependencies.insert(connection.from_step.clone());
405 }
406 }
407
408 let mut visited = HashSet::new();
410 let mut rec_stack = HashSet::new();
411
412 for step_id in graph.keys() {
413 if !visited.contains(step_id)
414 && self.has_cycle_dfs(step_id, &graph, &mut visited, &mut rec_stack)
415 {
416 return Err(format!(
417 "Circular dependency detected involving step '{step_id}'"
418 ));
419 }
420 }
421
422 Ok(())
423 }
424
425 fn has_cycle_dfs(
427 &self,
428 step_id: &str,
429 graph: &HashMap<String, HashSet<String>>,
430 visited: &mut HashSet<String>,
431 rec_stack: &mut HashSet<String>,
432 ) -> bool {
433 visited.insert(step_id.to_string());
434 rec_stack.insert(step_id.to_string());
435
436 if let Some(dependencies) = graph.get(step_id) {
437 for dep in dependencies {
438 if !visited.contains(dep) {
439 if self.has_cycle_dfs(dep, graph, visited, rec_stack) {
440 return true;
441 }
442 } else if rec_stack.contains(dep) {
443 return true;
444 }
445 }
446 }
447
448 rec_stack.remove(step_id);
449 false
450 }
451
452 pub fn determine_execution_order(
454 &self,
455 workflow: &WorkflowDefinition,
456 ) -> SklResult<Vec<String>> {
457 let mut in_degree = HashMap::new();
458 let mut adj_list = HashMap::new();
459
460 for step in &workflow.steps {
462 in_degree.insert(step.id.clone(), 0);
463 adj_list.insert(step.id.clone(), Vec::new());
464 }
465
466 for connection in &workflow.connections {
468 if let Some(deg) = in_degree.get_mut(&connection.to_step) {
469 *deg += 1;
470 }
471 if let Some(list) = adj_list.get_mut(&connection.from_step) {
472 list.push(connection.to_step.clone());
473 }
474 }
475
476 let mut queue = VecDeque::new();
478 let mut result = Vec::new();
479
480 for (step_id, degree) in &in_degree {
482 if *degree == 0 {
483 queue.push_back(step_id.clone());
484 }
485 }
486
487 while let Some(current) = queue.pop_front() {
488 result.push(current.clone());
489
490 for neighbor in &adj_list[¤t] {
492 if let Some(deg) = in_degree.get_mut(neighbor) {
493 *deg -= 1;
494 }
495 if in_degree[neighbor] == 0 {
496 queue.push_back(neighbor.clone());
497 }
498 }
499 }
500
501 if result.len() != workflow.steps.len() {
502 return Err(SklearsError::InvalidInput(
503 "Circular dependency detected".to_string(),
504 ));
505 }
506
507 Ok(result)
508 }
509
    /// Runs a workflow end to end: validates it, executes each step in
    /// topological order, updates rolling statistics, and assembles the result.
    ///
    /// Failures are reported *inside* the returned `ExecutionResult`
    /// (`success` / `error`); the outer `SklResult` is currently always `Ok`.
    pub fn execute_workflow(&mut self, workflow: WorkflowDefinition) -> SklResult<ExecutionResult> {
        let execution_start = Instant::now();
        let execution_id = uuid::Uuid::new_v4().to_string();

        // Validation failures short-circuit into an unsuccessful result and do
        // not touch the execution statistics below.
        let validation = self.validate_workflow(&workflow);
        if !validation.is_valid {
            return Ok(ExecutionResult {
                execution_id,
                success: false,
                duration: execution_start.elapsed(),
                step_results: Vec::new(),
                outputs: HashMap::new(),
                error: Some(format!(
                    "Workflow validation failed: {:?}",
                    validation.errors
                )),
                performance: PerformanceMetrics::default(),
            });
        }

        // Reset the run context; step outputs accumulate in `data_flow`
        // keyed by "step_id:port".
        self.context = ExecutionContext {
            execution_id: execution_id.clone(),
            workflow: workflow.clone(),
            data_flow: HashMap::new(),
            start_time: execution_start,
            current_step: None,
            execution_mode: workflow.execution.mode.clone(),
        };

        let execution_order = validation.execution_order.unwrap_or_default();
        let mut step_results = Vec::new();
        let mut success = true;
        let mut error_message = None;

        // Execute steps in dependency order; stop at the first failure.
        for step_id in execution_order {
            let Some(step) = workflow.steps.iter().find(|s| s.id == step_id) else {
                continue;
            };
            self.context.current_step = Some(step_id.clone());

            match self.execute_step(step) {
                Ok(step_result) => {
                    step_results.push(step_result);
                }
                Err(e) => {
                    success = false;
                    error_message = Some(e.to_string());
                    // Record a failed entry so the result shows exactly where
                    // execution stopped.
                    step_results.push(StepExecutionResult {
                        step_id: step_id.clone(),
                        algorithm: step.algorithm.clone(),
                        success: false,
                        duration: Duration::from_millis(0),
                        memory_usage: 0,
                        output_sizes: HashMap::new(),
                        error: Some(e.to_string()),
                    });
                    break;
                }
            }
        }

        self.stats.total_executions += 1;
        if success {
            self.stats.successful_executions += 1;
        } else {
            self.stats.failed_executions += 1;
        }

        // Incremental running average in milliseconds. `total_executions` was
        // just incremented, so the divisor is always >= 1.
        let total_duration = execution_start.elapsed();
        self.stats.average_execution_time = Duration::from_millis(
            (((self.stats.average_execution_time.as_millis()
                * u128::from(self.stats.total_executions - 1))
                + total_duration.as_millis())
                / u128::from(self.stats.total_executions))
            .try_into()
            .unwrap_or(u64::MAX),
        );

        Ok(ExecutionResult {
            execution_id,
            success,
            duration: total_duration,
            step_results,
            outputs: self.extract_final_outputs(&workflow),
            error: error_message,
            performance: self.calculate_performance_metrics(execution_start),
        })
    }
603
604 fn execute_step(&mut self, step: &StepDefinition) -> SklResult<StepExecutionResult> {
606 let step_start = Instant::now();
607
608 let component = self
610 .registry
611 .get_component(&step.algorithm)
612 .ok_or_else(|| {
613 SklearsError::InvalidInput(format!("Component '{}' not found", step.algorithm))
614 })?;
615
616 let input_data = self.prepare_step_input(step)?;
618
619 let output_data = self.simulate_step_execution(step, &input_data)?;
621
622 self.store_step_output(step, output_data.clone());
624
625 *self
627 .stats
628 .step_execution_counts
629 .entry(step.algorithm.clone())
630 .or_insert(0) += 1;
631
632 Ok(StepExecutionResult {
633 step_id: step.id.clone(),
634 algorithm: step.algorithm.clone(),
635 success: true,
636 duration: step_start.elapsed(),
637 memory_usage: self.estimate_memory_usage(&output_data),
638 output_sizes: output_data
639 .matrices
640 .iter()
641 .map(|(k, v)| (k.clone(), v.len()))
642 .collect(),
643 error: None,
644 })
645 }
646
647 fn prepare_step_input(&self, step: &StepDefinition) -> SklResult<StepData> {
649 let mut input_data = StepData {
650 step_id: step.id.clone(),
651 port_name: "input".to_string(),
652 matrices: HashMap::new(),
653 arrays: HashMap::new(),
654 metadata: HashMap::new(),
655 timestamp: Instant::now(),
656 };
657
658 for connection in &self.context.workflow.connections {
660 if connection.to_step == step.id {
661 let source_data_key =
662 format!("{}:{}", connection.from_step, connection.from_output);
663 if let Some(source_data) = self.context.data_flow.get(&source_data_key) {
664 for (key, matrix) in &source_data.matrices {
666 input_data.matrices.insert(key.clone(), matrix.clone());
667 }
668 for (key, array) in &source_data.arrays {
669 input_data.arrays.insert(key.clone(), array.clone());
670 }
671 }
672 }
673 }
674
675 Ok(input_data)
676 }
677
678 fn simulate_step_execution(
680 &self,
681 step: &StepDefinition,
682 input_data: &StepData,
683 ) -> SklResult<StepData> {
684 let mut output_data = StepData {
688 step_id: step.id.clone(),
689 port_name: "output".to_string(),
690 matrices: HashMap::new(),
691 arrays: HashMap::new(),
692 metadata: HashMap::new(),
693 timestamp: Instant::now(),
694 };
695
696 match step.algorithm.as_str() {
698 "StandardScaler" => {
699 if let Some(input_matrix) = input_data.matrices.get("X") {
701 let scaled_matrix = input_matrix.clone(); output_data
703 .matrices
704 .insert("X_scaled".to_string(), scaled_matrix);
705 }
706 }
707 "LinearRegression" => {
708 if input_data.matrices.contains_key("X") && input_data.arrays.contains_key("y") {
710 output_data
712 .metadata
713 .insert("model_type".to_string(), "LinearRegression".to_string());
714 output_data
715 .metadata
716 .insert("trained".to_string(), "true".to_string());
717 }
718 }
719 _ => {
720 output_data.matrices = input_data.matrices.clone();
722 output_data.arrays = input_data.arrays.clone();
723 }
724 }
725
726 Ok(output_data)
727 }
728
729 fn store_step_output(&mut self, step: &StepDefinition, output_data: StepData) {
731 for output_name in &step.outputs {
732 let key = format!("{}:{}", step.id, output_name);
733 self.context.data_flow.insert(key, output_data.clone());
734 }
735 }
736
737 fn extract_final_outputs(&self, workflow: &WorkflowDefinition) -> HashMap<String, String> {
739 let mut outputs = HashMap::new();
740
741 for output in &workflow.outputs {
742 for step in &workflow.steps {
744 if step.outputs.contains(&output.name) {
745 let key = format!("{}:{}", step.id, output.name);
746 if let Some(data) = self.context.data_flow.get(&key) {
747 outputs.insert(
748 output.name.clone(),
749 format!("Data from step '{}' port '{}'", step.id, output.name),
750 );
751 }
752 }
753 }
754 }
755
756 outputs
757 }
758
759 fn calculate_performance_metrics(&self, start_time: Instant) -> PerformanceMetrics {
761 PerformanceMetrics {
762 total_time: start_time.elapsed(),
763 peak_memory: 0, cpu_utilization: 0.0, throughput: 0.0, parallelism_efficiency: 1.0, }
768 }
769
770 fn estimate_memory_usage(&self, _data: &StepData) -> u64 {
772 1024 * 1024 }
775
    /// Returns a read-only view of the executor's accumulated run statistics.
    #[must_use]
    pub fn get_statistics(&self) -> &ExecutionStatistics {
        &self.stats
    }
781}
782
783impl ExecutionContext {
784 fn new() -> Self {
785 Self {
786 execution_id: String::new(),
787 workflow: WorkflowDefinition::default(),
788 data_flow: HashMap::new(),
789 start_time: Instant::now(),
790 current_step: None,
791 execution_mode: ExecutionMode::Sequential,
792 }
793 }
794}
795
796impl ExecutionStatistics {
797 fn new() -> Self {
798 Self {
799 total_executions: 0,
800 successful_executions: 0,
801 failed_executions: 0,
802 average_execution_time: Duration::from_secs(0),
803 step_execution_counts: HashMap::new(),
804 }
805 }
806}
807
808impl Default for PerformanceMetrics {
809 fn default() -> Self {
810 Self {
811 total_time: Duration::from_secs(0),
812 peak_memory: 0,
813 cpu_utilization: 0.0,
814 throughput: 0.0,
815 parallelism_efficiency: 0.0,
816 }
817 }
818}
819
impl Default for WorkflowExecutor {
    /// Equivalent to [`WorkflowExecutor::new`].
    fn default() -> Self {
        Self::new()
    }
}
825
/// Lifecycle state of a workflow execution.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ExecutionState {
    /// Execution object created, nothing started yet.
    Initializing,
    /// Inputs and resources are being prepared.
    Preparing,
    /// Steps are actively executing.
    Running,
    /// Execution is temporarily suspended.
    Paused,
    /// All steps finished successfully.
    Completed,
    /// Execution stopped because a step failed.
    Failed,
    /// Execution was cancelled by request.
    Cancelled,
    /// Execution exceeded its time budget.
    TimedOut,
}
846
/// Serializable progress snapshot of a running workflow execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionTracker {
    /// Current lifecycle state.
    pub state: ExecutionState,
    /// Completion fraction (presumably 0.0..=1.0 — TODO confirm with producer).
    pub progress: f64,
    /// Id of the step currently executing, if any.
    pub current_step: Option<String>,
    /// Ids of steps that have finished successfully.
    pub completed_steps: Vec<String>,
    /// Ids of steps that failed.
    pub failed_steps: Vec<String>,
    /// Start timestamp as a string (format defined by the producer).
    pub start_time: String,
    /// Estimated completion timestamp as a string, if known.
    pub estimated_completion: Option<String>,
    /// Error messages collected so far.
    pub errors: Vec<String>,
    /// Warning messages collected so far.
    pub warnings: Vec<String>,
}
869
/// Configuration for executing workflow steps in parallel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelExecutionConfig {
    /// Maximum number of concurrent workers.
    pub max_workers: usize,
    /// Capacity of the pending-work queue.
    pub queue_size: usize,
    /// How work is distributed among workers.
    pub load_balancing: LoadBalancingStrategy,
    /// Thread pool sizing and lifetime settings.
    pub thread_pool: ThreadPoolConfig,
    /// How workers share step data.
    pub resource_sharing: ResourceSharingStrategy,
}
884
/// Strategy for distributing work among parallel workers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LoadBalancingStrategy {
    /// Assign work to workers in rotation.
    RoundRobin,
    /// Assign work to the currently least-loaded worker.
    LeastLoaded,
    /// Assign work to a randomly chosen worker.
    Random,
    /// Idle workers steal queued work from busy ones.
    WorkStealing,
    /// Externally defined strategy, identified by name.
    Custom(String),
}
899
/// Sizing and lifetime parameters for a worker thread pool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadPoolConfig {
    /// Number of threads kept alive permanently.
    pub core_threads: usize,
    /// Upper bound on the number of threads.
    pub max_threads: usize,
    /// Seconds an idle non-core thread survives before shutdown.
    pub keep_alive_sec: u64,
    /// Per-thread stack size in bytes, or `None` for the platform default.
    pub stack_size: Option<usize>,
}
912
/// How parallel workers share step data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResourceSharingStrategy {
    /// Each worker gets exclusive access to its data.
    Exclusive,
    /// Workers share the same data.
    Shared,
    /// Data is shared until a worker writes, then copied.
    CopyOnWrite,
    /// Data is shared through memory-mapped storage.
    MemoryMapped,
}
925
/// Resources allocated to a single workflow execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceAllocation {
    /// CPU allocation (always present).
    pub cpu: CpuAllocation,
    /// Memory allocation (always present).
    pub memory: MemoryAllocation,
    /// Optional GPU allocation.
    pub gpu: Option<GpuAllocation>,
    /// Optional scratch-disk allocation.
    pub disk: Option<DiskAllocation>,
    /// Optional network bandwidth allocation.
    pub network: Option<NetworkAllocation>,
}
940
/// CPU resources granted to an execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CpuAllocation {
    /// Number of CPU cores.
    pub cores: usize,
    /// Maximum utilization fraction the execution may consume.
    pub utilization_limit: f64,
    /// Core ids to pin to; empty means no affinity.
    pub affinity: Vec<usize>,
}
951
/// Memory resources granted to an execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryAllocation {
    /// Maximum memory in megabytes.
    pub max_memory_mb: usize,
    /// Kind of memory requested.
    pub memory_type: MemoryType,
    /// Whether the execution may spill to swap.
    pub allow_swap: bool,
}
962
/// Kind of memory requested by an allocation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MemoryType {
    /// Conventional system RAM.
    Ram,
    /// High-bandwidth memory.
    Hbm,
    /// Non-volatile RAM.
    Nvram,
    /// No preference — any available memory type.
    Any,
}
975
/// GPU resources granted to an execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuAllocation {
    /// Ids of the GPU devices to use.
    pub device_ids: Vec<usize>,
    /// Memory budget per GPU, in megabytes.
    pub memory_per_gpu_mb: usize,
    /// Minimum required compute capability.
    pub min_compute_capability: f64,
}
986
/// Scratch-disk resources granted to an execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiskAllocation {
    /// Temporary storage budget in megabytes.
    pub temp_storage_mb: usize,
    /// Filesystem paths where scratch data may be written.
    pub storage_paths: Vec<String>,
    /// Optional I/O bandwidth cap in MB/s.
    pub io_bandwidth_mbs: Option<f64>,
}
997
/// Network resources granted to an execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkAllocation {
    /// Bandwidth budget in megabits per second.
    pub bandwidth_mbps: f64,
    /// Maximum number of simultaneous connections.
    pub max_connections: usize,
    /// Network interfaces the execution may use.
    pub interfaces: Vec<String>,
}
1008
/// Tracks available resources and per-execution allocations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceManager {
    /// Total resources the system can hand out.
    pub available_resources: ResourcePool,
    /// Active allocations, keyed by execution id.
    pub allocations: HashMap<String, ResourceAllocation>,
    /// Resource-usage monitoring settings.
    pub monitoring: ResourceMonitoring,
    /// Policy used to schedule competing resource requests.
    pub scheduling_strategy: ResourceSchedulingStrategy,
}
1021
/// Inventory of resources available to the resource manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourcePool {
    /// Total CPU cores.
    pub total_cpu_cores: usize,
    /// Total memory in megabytes.
    pub total_memory_mb: usize,
    /// Installed GPUs.
    pub gpus: Vec<GpuInfo>,
    /// Available disk space in megabytes.
    pub disk_space_mb: usize,
    /// Available network bandwidth in megabits per second.
    pub network_bandwidth_mbps: f64,
}
1036
/// Description of a single GPU in the resource pool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuInfo {
    /// Device id.
    pub id: usize,
    /// Human-readable device name.
    pub name: String,
    /// Device memory in megabytes.
    pub memory_mb: usize,
    /// Compute capability version.
    pub compute_capability: f64,
    /// Whether the device is currently free for allocation.
    pub available: bool,
}
1051
/// Settings controlling resource-usage monitoring.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceMonitoring {
    /// Whether monitoring is active.
    pub enabled: bool,
    /// Sampling interval in seconds.
    pub interval_sec: u64,
    /// Usage levels that trigger warnings.
    pub thresholds: ResourceThresholds,
}
1062
/// Usage fractions at which resource warnings are raised.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceThresholds {
    /// CPU utilization warning threshold.
    pub cpu_warning: f64,
    /// Memory utilization warning threshold.
    pub memory_warning: f64,
    /// Disk utilization warning threshold.
    pub disk_warning: f64,
}
1073
/// Policy for scheduling competing resource requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResourceSchedulingStrategy {
    /// First come, first served.
    Fcfs,
    /// Shortest job first.
    Sjf,
    /// Requests served in rotation.
    RoundRobin,
    /// Higher-priority requests served first.
    Priority,
    /// Resources split fairly among requesters.
    FairShare,
    /// Externally defined policy, identified by name.
    Custom(String),
}
1090
/// Errors that can arise while validating, scheduling, or running a workflow.
///
/// Serializable so failures can be reported across process boundaries; display
/// messages come from the `thiserror` `#[error]` attributes.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum WorkflowExecutionError {
    /// The workflow failed static validation.
    #[error("Workflow validation error: {0}")]
    ValidationError(String),
    /// Requested resources could not be granted.
    #[error("Resource allocation failed: {0}")]
    ResourceAllocationError(String),
    /// A step failed; carries the step id and the underlying message.
    #[error("Step execution failed for '{0}': {1}")]
    StepExecutionError(String, String),
    /// Step dependency resolution failed (e.g. unresolvable ordering).
    #[error("Dependency resolution failed: {0}")]
    DependencyError(String),
    /// Execution exceeded its time budget.
    #[error("Workflow timeout: {0}")]
    TimeoutError(String),
    /// Execution was cancelled by request.
    #[error("Workflow cancelled: {0}")]
    CancellationError(String),
    /// Invalid executor or workflow configuration.
    #[error("Configuration error: {0}")]
    ConfigurationError(String),
    /// A failure during execution not covered by the variants above.
    #[error("Runtime error: {0}")]
    RuntimeError(String),
    /// An underlying system-level failure.
    #[error("System error: {0}")]
    SystemError(String),
}
1122
// NOTE(review): `#[allow(non_snake_case)]` appears unnecessary here — all test
// names are snake_case; consider removing once confirmed.
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow_language::workflow_definitions::{DataType, StepType};

    /// A freshly created executor starts with zeroed statistics.
    #[test]
    fn test_workflow_executor_creation() {
        let executor = WorkflowExecutor::new();
        assert_eq!(executor.stats.total_executions, 0);
    }

    /// An empty workflow is rejected with an "EmptyWorkflow" error.
    #[test]
    fn test_empty_workflow_validation() {
        let executor = WorkflowExecutor::new();
        let workflow = WorkflowDefinition::default();

        let validation = executor.validate_workflow(&workflow);
        assert!(!validation.is_valid);
        assert!(!validation.errors.is_empty());
        assert_eq!(validation.errors[0].error_type, "EmptyWorkflow");
    }

    /// A workflow with one known component validates and gets an order.
    #[test]
    fn test_valid_workflow_validation() {
        let executor = WorkflowExecutor::new();
        let mut workflow = WorkflowDefinition::default();

        workflow.steps.push(StepDefinition::new(
            "step1",
            StepType::Transformer,
            "StandardScaler",
        ));

        let validation = executor.validate_workflow(&workflow);
        assert!(validation.is_valid);
        assert!(validation.errors.is_empty());
        assert!(validation.execution_order.is_some());
    }

    /// A step referencing an unregistered component fails validation.
    #[test]
    fn test_unknown_component_validation() {
        let executor = WorkflowExecutor::new();
        let mut workflow = WorkflowDefinition::default();

        workflow.steps.push(StepDefinition::new(
            "step1",
            StepType::Transformer,
            "UnknownComponent",
        ));

        let validation = executor.validate_workflow(&workflow);
        assert!(!validation.is_valid);
        assert!(!validation.errors.is_empty());
        assert_eq!(validation.errors[0].error_type, "UnknownComponent");
    }

    /// Connected steps are ordered so producers come before consumers.
    #[test]
    fn test_execution_order_determination() {
        let executor = WorkflowExecutor::new();
        let mut workflow = WorkflowDefinition::default();

        workflow.steps.push(
            StepDefinition::new("step1", StepType::Transformer, "StandardScaler")
                .with_output("X_scaled"),
        );
        workflow.steps.push(
            StepDefinition::new("step2", StepType::Trainer, "LinearRegression").with_input("X"),
        );

        workflow
            .connections
            .push(Connection::direct("step1", "X_scaled", "step2", "X"));

        let order = executor
            .determine_execution_order(&workflow)
            .unwrap_or_default();
        assert_eq!(order, vec!["step1".to_string(), "step2".to_string()]);
    }

    /// Two mutually dependent steps are reported as a circular dependency.
    #[test]
    fn test_circular_dependency_detection() {
        let executor = WorkflowExecutor::new();
        let mut workflow = WorkflowDefinition::default();

        workflow.steps.push(StepDefinition::new(
            "step1",
            StepType::Transformer,
            "StandardScaler",
        ));
        workflow.steps.push(StepDefinition::new(
            "step2",
            StepType::Trainer,
            "LinearRegression",
        ));

        workflow
            .connections
            .push(Connection::direct("step1", "output", "step2", "input"));
        workflow
            .connections
            .push(Connection::direct("step2", "output", "step1", "input"));

        let result = executor.check_circular_dependencies(&workflow);
        assert!(result.is_err());
    }
}