sklears_compose/workflow_language/
workflow_definitions.rs

1//! Core Workflow Definitions and Data Structures
2//!
3//! This module provides the fundamental data structures for workflow definitions,
4//! including workflow metadata, input/output schemas, step definitions, data types,
5//! constraints, and execution configurations for machine learning pipelines.
6
7use chrono;
8use serde::{Deserialize, Serialize};
9use std::collections::BTreeMap;
10use std::fmt;
11
12/// Workflow description language for pipeline definitions
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct WorkflowDefinition {
15    /// Workflow metadata
16    pub metadata: WorkflowMetadata,
17    /// Input schema definition
18    pub inputs: Vec<InputDefinition>,
19    /// Output schema definition
20    pub outputs: Vec<OutputDefinition>,
21    /// Pipeline steps
22    pub steps: Vec<StepDefinition>,
23    /// Data flow connections
24    pub connections: Vec<Connection>,
25    /// Execution configuration
26    pub execution: ExecutionConfig,
27}
28
29/// Workflow metadata
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct WorkflowMetadata {
32    /// Workflow name
33    pub name: String,
34    /// Version
35    pub version: String,
36    /// Description
37    pub description: Option<String>,
38    /// Author
39    pub author: Option<String>,
40    /// Tags
41    pub tags: Vec<String>,
42    /// Creation timestamp
43    pub created_at: String,
44    /// Last modified timestamp
45    pub modified_at: String,
46}
47
48/// Input definition
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct InputDefinition {
51    /// Input name
52    pub name: String,
53    /// Data type
54    pub data_type: DataType,
55    /// Shape constraints
56    pub shape: Option<ShapeConstraint>,
57    /// Value constraints
58    pub constraints: Option<ValueConstraints>,
59    /// Description
60    pub description: Option<String>,
61}
62
63/// Output definition
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct OutputDefinition {
66    /// Output name
67    pub name: String,
68    /// Data type
69    pub data_type: DataType,
70    /// Shape
71    pub shape: Option<ShapeConstraint>,
72    /// Description
73    pub description: Option<String>,
74}
75
76/// Step definition in the workflow
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct StepDefinition {
79    /// Step identifier
80    pub id: String,
81    /// Step type
82    pub step_type: StepType,
83    /// Algorithm/component name
84    pub algorithm: String,
85    /// Parameters
86    pub parameters: BTreeMap<String, ParameterValue>,
87    /// Input mappings
88    pub inputs: Vec<String>,
89    /// Output mappings
90    pub outputs: Vec<String>,
91    /// Conditional execution
92    pub condition: Option<ExecutionCondition>,
93    /// Description
94    pub description: Option<String>,
95}
96
97/// Data type enumeration
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
99pub enum DataType {
100    /// 32-bit floating point
101    Float32,
102    /// 64-bit floating point
103    Float64,
104    /// 32-bit integer
105    Int32,
106    /// 64-bit integer
107    Int64,
108    /// Boolean
109    Boolean,
110    /// String
111    String,
112    /// Array of specific type
113    Array(Box<DataType>),
114    /// Matrix of specific type
115    Matrix(Box<DataType>),
116    /// Custom type
117    Custom(String),
118}
119
120/// Shape constraint
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct ShapeConstraint {
123    /// Dimensions
124    pub dimensions: Vec<DimensionConstraint>,
125    /// Optional shape validation
126    pub validation: Option<String>,
127}
128
129/// Dimension constraint
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub enum DimensionConstraint {
132    /// Fixed size
133    Fixed(usize),
134    /// Range of sizes
135    Range { min: usize, max: Option<usize> },
136    /// Any size
137    Any,
138    /// Size matches another dimension
139    MatchesDimension { step_id: String, dimension: usize },
140    /// Dynamic size determined at runtime
141    Dynamic(String),
142}
143
144/// Value constraints
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct ValueConstraints {
147    /// Minimum value
148    pub min: Option<f64>,
149    /// Maximum value
150    pub max: Option<f64>,
151    /// Allowed values
152    pub allowed_values: Option<Vec<String>>,
153    /// Regular expression pattern for strings
154    pub pattern: Option<String>,
155    /// Custom validation function name
156    pub custom_validator: Option<String>,
157}
158
159/// Step type enumeration
160#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
161pub enum StepType {
162    /// Data input step
163    Input,
164    /// Data output step
165    Output,
166    /// Data preprocessing step
167    Preprocessor,
168    /// Feature transformation step
169    Transformer,
170    /// Model training step
171    Trainer,
172    /// Model prediction step
173    Predictor,
174    /// Evaluation/metrics step
175    Evaluator,
176    /// Custom processing step
177    Custom(String),
178}
179
180/// Parameter value
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub enum ParameterValue {
183    /// Float value
184    Float(f64),
185    /// Integer value
186    Int(i64),
187    /// Boolean value
188    Bool(bool),
189    /// String value
190    String(String),
191    /// Array of values
192    Array(Vec<ParameterValue>),
193}
194
195/// Connection between steps
196#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct Connection {
198    /// Source step identifier
199    pub from_step: String,
200    /// Source output name
201    pub from_output: String,
202    /// Target step identifier
203    pub to_step: String,
204    /// Target input name
205    pub to_input: String,
206    /// Connection type
207    pub connection_type: ConnectionType,
208    /// Optional transformation applied to data
209    pub transform: Option<String>,
210}
211
212/// Connection type
213#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
214pub enum ConnectionType {
215    /// Direct data flow
216    Direct,
217    /// Data splitting/broadcasting
218    Split,
219    /// Data aggregation/joining
220    Join,
221    /// Conditional connection
222    Conditional,
223}
224
225/// Execution condition
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct ExecutionCondition {
228    /// Condition expression
229    pub expression: String,
230    /// Variables referenced in the condition
231    pub variables: Vec<String>,
232}
233
234/// Execution configuration
235#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct ExecutionConfig {
237    /// Execution mode
238    pub mode: ExecutionMode,
239    /// Parallel execution configuration
240    pub parallel: Option<ParallelConfig>,
241    /// Resource limits
242    pub resources: Option<ResourceLimits>,
243    /// Caching configuration
244    pub caching: Option<CachingConfig>,
245}
246
247/// Execution mode
248#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
249pub enum ExecutionMode {
250    /// Sequential execution
251    Sequential,
252    /// Parallel execution
253    Parallel,
254    /// Distributed execution
255    Distributed,
256    /// GPU-accelerated execution
257    GPU,
258    /// Adaptive execution (choose based on data size)
259    Adaptive,
260}
261
262/// Parallel execution configuration
263#[derive(Debug, Clone, Serialize, Deserialize)]
264pub struct ParallelConfig {
265    /// Number of worker threads
266    pub num_workers: usize,
267    /// Chunk size for data parallelism
268    pub chunk_size: Option<usize>,
269    /// Load balancing strategy
270    pub load_balancing: String,
271}
272
273/// Resource limits
274#[derive(Debug, Clone, Serialize, Deserialize)]
275pub struct ResourceLimits {
276    /// Maximum memory usage in MB
277    pub max_memory_mb: Option<usize>,
278    /// Maximum CPU time in seconds
279    pub max_cpu_time_sec: Option<usize>,
280    /// Maximum wall clock time in seconds
281    pub max_wall_time_sec: Option<usize>,
282}
283
284/// Caching configuration
285#[derive(Debug, Clone, Serialize, Deserialize)]
286pub struct CachingConfig {
287    /// Enable step-level caching
288    pub enable_step_caching: bool,
289    /// Cache directory
290    pub cache_directory: Option<String>,
291    /// Cache expiration time in seconds
292    pub cache_ttl_sec: Option<usize>,
293    /// Maximum cache size in MB
294    pub max_cache_size_mb: Option<usize>,
295}
296
297impl Default for WorkflowDefinition {
298    fn default() -> Self {
299        Self {
300            metadata: WorkflowMetadata {
301                name: "Untitled Workflow".to_string(),
302                version: "1.0.0".to_string(),
303                description: None,
304                author: None,
305                tags: Vec::new(),
306                created_at: chrono::Utc::now().to_rfc3339(),
307                modified_at: chrono::Utc::now().to_rfc3339(),
308            },
309            inputs: Vec::new(),
310            outputs: Vec::new(),
311            steps: Vec::new(),
312            connections: Vec::new(),
313            execution: ExecutionConfig {
314                mode: ExecutionMode::Sequential,
315                parallel: None,
316                resources: None,
317                caching: None,
318            },
319        }
320    }
321}
322
323impl fmt::Display for DataType {
324    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325        match self {
326            DataType::Float32 => write!(f, "f32"),
327            DataType::Float64 => write!(f, "f64"),
328            DataType::Int32 => write!(f, "i32"),
329            DataType::Int64 => write!(f, "i64"),
330            DataType::Boolean => write!(f, "bool"),
331            DataType::String => write!(f, "String"),
332            DataType::Array(inner) => write!(f, "Array<{inner}>"),
333            DataType::Matrix(inner) => write!(f, "Matrix<{inner}>"),
334            DataType::Custom(name) => write!(f, "{name}"),
335        }
336    }
337}
338
339impl WorkflowMetadata {
340    /// Create new workflow metadata with defaults
341    #[must_use]
342    pub fn new(name: &str) -> Self {
343        Self {
344            name: name.to_string(),
345            version: "1.0.0".to_string(),
346            description: None,
347            author: None,
348            tags: Vec::new(),
349            created_at: chrono::Utc::now().to_rfc3339(),
350            modified_at: chrono::Utc::now().to_rfc3339(),
351        }
352    }
353
354    /// Update the modified timestamp
355    pub fn touch(&mut self) {
356        self.modified_at = chrono::Utc::now().to_rfc3339();
357    }
358}
359
360impl StepDefinition {
361    /// Create a new step definition
362    #[must_use]
363    pub fn new(id: &str, step_type: StepType, algorithm: &str) -> Self {
364        Self {
365            id: id.to_string(),
366            step_type,
367            algorithm: algorithm.to_string(),
368            parameters: BTreeMap::new(),
369            inputs: Vec::new(),
370            outputs: Vec::new(),
371            condition: None,
372            description: None,
373        }
374    }
375
376    /// Add a parameter to the step
377    #[must_use]
378    pub fn with_parameter(mut self, name: &str, value: ParameterValue) -> Self {
379        self.parameters.insert(name.to_string(), value);
380        self
381    }
382
383    /// Add an input mapping
384    #[must_use]
385    pub fn with_input(mut self, input: &str) -> Self {
386        self.inputs.push(input.to_string());
387        self
388    }
389
390    /// Add an output mapping
391    #[must_use]
392    pub fn with_output(mut self, output: &str) -> Self {
393        self.outputs.push(output.to_string());
394        self
395    }
396
397    /// Set description
398    #[must_use]
399    pub fn with_description(mut self, description: &str) -> Self {
400        self.description = Some(description.to_string());
401        self
402    }
403}
404
405impl Connection {
406    /// Create a new direct connection between steps
407    #[must_use]
408    pub fn direct(from_step: &str, from_output: &str, to_step: &str, to_input: &str) -> Self {
409        Self {
410            from_step: from_step.to_string(),
411            from_output: from_output.to_string(),
412            to_step: to_step.to_string(),
413            to_input: to_input.to_string(),
414            connection_type: ConnectionType::Direct,
415            transform: None,
416        }
417    }
418
419    /// Create a new connection with transformation
420    #[must_use]
421    pub fn with_transform(mut self, transform: &str) -> Self {
422        self.transform = Some(transform.to_string());
423        self
424    }
425
426    /// Set connection type
427    #[must_use]
428    pub fn with_type(mut self, connection_type: ConnectionType) -> Self {
429        self.connection_type = connection_type;
430        self
431    }
432}
433
434impl InputDefinition {
435    /// Create a new input definition
436    #[must_use]
437    pub fn new(name: &str, data_type: DataType) -> Self {
438        Self {
439            name: name.to_string(),
440            data_type,
441            shape: None,
442            constraints: None,
443            description: None,
444        }
445    }
446
447    /// Add shape constraints
448    #[must_use]
449    pub fn with_shape(mut self, shape: ShapeConstraint) -> Self {
450        self.shape = Some(shape);
451        self
452    }
453
454    /// Add value constraints
455    #[must_use]
456    pub fn with_constraints(mut self, constraints: ValueConstraints) -> Self {
457        self.constraints = Some(constraints);
458        self
459    }
460
461    /// Add description
462    #[must_use]
463    pub fn with_description(mut self, description: &str) -> Self {
464        self.description = Some(description.to_string());
465        self
466    }
467}
468
469impl OutputDefinition {
470    /// Create a new output definition
471    #[must_use]
472    pub fn new(name: &str, data_type: DataType) -> Self {
473        Self {
474            name: name.to_string(),
475            data_type,
476            shape: None,
477            description: None,
478        }
479    }
480
481    /// Add shape constraints
482    #[must_use]
483    pub fn with_shape(mut self, shape: ShapeConstraint) -> Self {
484        self.shape = Some(shape);
485        self
486    }
487
488    /// Add description
489    #[must_use]
490    pub fn with_description(mut self, description: &str) -> Self {
491        self.description = Some(description.to_string());
492        self
493    }
494}
495
496impl ShapeConstraint {
497    /// Create a new shape constraint with fixed dimensions
498    pub fn fixed(dimensions: Vec<usize>) -> Self {
499        Self {
500            dimensions: dimensions
501                .into_iter()
502                .map(DimensionConstraint::Fixed)
503                .collect(),
504            validation: None,
505        }
506    }
507
508    /// Create a shape constraint with dynamic dimensions
509    #[must_use]
510    pub fn dynamic(constraints: Vec<DimensionConstraint>) -> Self {
511        Self {
512            dimensions: constraints,
513            validation: None,
514        }
515    }
516
517    /// Add validation expression
518    #[must_use]
519    pub fn with_validation(mut self, validation: &str) -> Self {
520        self.validation = Some(validation.to_string());
521        self
522    }
523}
524
525impl ValueConstraints {
526    /// Create new value constraints
527    #[must_use]
528    pub fn new() -> Self {
529        Self {
530            min: None,
531            max: None,
532            allowed_values: None,
533            pattern: None,
534            custom_validator: None,
535        }
536    }
537
538    /// Set minimum value
539    #[must_use]
540    pub fn with_min(mut self, min: f64) -> Self {
541        self.min = Some(min);
542        self
543    }
544
545    /// Set maximum value
546    #[must_use]
547    pub fn with_max(mut self, max: f64) -> Self {
548        self.max = Some(max);
549        self
550    }
551
552    /// Set range
553    #[must_use]
554    pub fn with_range(mut self, min: f64, max: f64) -> Self {
555        self.min = Some(min);
556        self.max = Some(max);
557        self
558    }
559
560    /// Set allowed values
561    #[must_use]
562    pub fn with_allowed_values(mut self, values: Vec<String>) -> Self {
563        self.allowed_values = Some(values);
564        self
565    }
566
567    /// Set pattern for string validation
568    #[must_use]
569    pub fn with_pattern(mut self, pattern: &str) -> Self {
570        self.pattern = Some(pattern.to_string());
571        self
572    }
573}
574
575impl Default for ValueConstraints {
576    fn default() -> Self {
577        Self::new()
578    }
579}
580
581/// Parameter definition for step configuration
582#[derive(Debug, Clone, Serialize, Deserialize)]
583pub struct ParameterDefinition {
584    /// Parameter name
585    pub name: String,
586    /// Data type of the parameter
587    pub data_type: DataType,
588    /// Default value if not specified
589    pub default_value: Option<ParameterValue>,
590    /// Whether the parameter is required
591    pub required: bool,
592    /// Parameter description
593    pub description: Option<String>,
594    /// Value constraints
595    pub constraints: Option<ValueConstraints>,
596}
597
598/// Resource requirements for execution
599#[derive(Debug, Clone, Serialize, Deserialize)]
600pub struct ResourceRequirements {
601    /// CPU cores required
602    pub cpu_cores: Option<usize>,
603    /// Memory required in MB
604    pub memory_mb: Option<usize>,
605    /// GPU memory required in MB
606    pub gpu_memory_mb: Option<usize>,
607    /// Disk space required in MB
608    pub disk_space_mb: Option<usize>,
609    /// Network bandwidth required in Mbps
610    pub network_bandwidth_mbps: Option<f64>,
611}
612
613/// Step execution status
614#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
615pub enum StepStatus {
616    /// Step is pending execution
617    Pending,
618    /// Step is currently running
619    Running,
620    /// Step completed successfully
621    Completed,
622    /// Step failed with error
623    Failed,
624    /// Step was skipped
625    Skipped,
626    /// Step was cancelled
627    Cancelled,
628}
629
630/// Validation result for workflow components
631#[derive(Debug, Clone, Serialize, Deserialize)]
632pub struct ValidationResult {
633    /// Whether validation passed
634    pub is_valid: bool,
635    /// Validation errors found
636    pub errors: Vec<String>,
637    /// Validation warnings
638    pub warnings: Vec<String>,
639    /// Step or component ID that was validated
640    pub component_id: Option<String>,
641}
642
643/// Overall workflow status
644#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
645pub enum WorkflowStatus {
646    /// Workflow is being initialized
647    Initializing,
648    /// Workflow is ready to run
649    Ready,
650    /// Workflow is currently executing
651    Running,
652    /// Workflow paused execution
653    Paused,
654    /// Workflow completed successfully
655    Completed,
656    /// Workflow failed during execution
657    Failed,
658    /// Workflow was cancelled
659    Cancelled,
660}
661
662impl Default for WorkflowMetadata {
663    fn default() -> Self {
664        Self {
665            name: "Untitled Workflow".to_string(),
666            version: "1.0.0".to_string(),
667            description: None,
668            author: None,
669            tags: vec![],
670            created_at: chrono::Utc::now().to_rfc3339(),
671            modified_at: chrono::Utc::now().to_rfc3339(),
672        }
673    }
674}
675
676impl Default for ExecutionConfig {
677    fn default() -> Self {
678        Self {
679            mode: ExecutionMode::Sequential,
680            parallel: None,
681            resources: None,
682            caching: None,
683        }
684    }
685}
686
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_workflow_definition_default() {
        let workflow = WorkflowDefinition::default();
        assert_eq!(workflow.metadata.name, "Untitled Workflow");
        assert_eq!(workflow.metadata.version, "1.0.0");
        assert!(workflow.inputs.is_empty());
        assert!(workflow.outputs.is_empty());
        assert!(workflow.steps.is_empty());
        assert!(workflow.connections.is_empty());
        assert_eq!(workflow.execution.mode, ExecutionMode::Sequential);
        assert!(workflow.execution.parallel.is_none());
    }

    #[test]
    fn test_step_definition_builder() {
        let step = StepDefinition::new("test_step", StepType::Transformer, "StandardScaler")
            .with_parameter("with_mean", ParameterValue::Bool(true))
            .with_parameter("with_std", ParameterValue::Bool(true))
            .with_input("X")
            .with_output("X_scaled")
            .with_description("Standard scaling step");

        assert_eq!(step.id, "test_step");
        assert_eq!(step.step_type, StepType::Transformer);
        assert_eq!(step.algorithm, "StandardScaler");
        assert_eq!(step.parameters.len(), 2);
        assert_eq!(step.inputs, vec!["X".to_string()]);
        assert_eq!(step.outputs, vec!["X_scaled".to_string()]);
        assert_eq!(step.description.as_deref(), Some("Standard scaling step"));
    }

    #[test]
    fn test_connection_builder() {
        let connection = Connection::direct("step1", "output", "step2", "input")
            .with_transform("normalize")
            .with_type(ConnectionType::Split);

        assert_eq!(connection.from_step, "step1");
        assert_eq!(connection.from_output, "output");
        assert_eq!(connection.to_step, "step2");
        assert_eq!(connection.to_input, "input");
        assert_eq!(connection.connection_type, ConnectionType::Split);
        assert_eq!(connection.transform.as_deref(), Some("normalize"));
    }

    #[test]
    fn test_data_type_display() {
        assert_eq!(format!("{}", DataType::Float32), "f32");
        assert_eq!(format!("{}", DataType::Boolean), "bool");
        assert_eq!(
            format!("{}", DataType::Array(Box::new(DataType::Float64))),
            "Array<f64>"
        );
        assert_eq!(
            format!("{}", DataType::Matrix(Box::new(DataType::Int32))),
            "Matrix<i32>"
        );
        assert_eq!(
            format!("{}", DataType::Custom("Tensor".to_string())),
            "Tensor"
        );
    }

    #[test]
    fn test_input_output_definitions() {
        let input = InputDefinition::new("features", DataType::Matrix(Box::new(DataType::Float64)))
            .with_shape(ShapeConstraint::fixed(vec![1000, 20]))
            .with_constraints(ValueConstraints::new().with_range(-1.0, 1.0))
            .with_description("Feature matrix");

        let output =
            OutputDefinition::new("predictions", DataType::Array(Box::new(DataType::Float64)))
                .with_shape(ShapeConstraint::fixed(vec![1000]))
                .with_description("Model predictions");

        assert_eq!(input.name, "features");
        assert_eq!(input.shape.as_ref().map(|s| s.dimensions.len()), Some(2));
        assert_eq!(input.constraints.as_ref().and_then(|c| c.min), Some(-1.0));
        assert_eq!(input.constraints.as_ref().and_then(|c| c.max), Some(1.0));
        assert_eq!(input.description.as_deref(), Some("Feature matrix"));

        assert_eq!(output.name, "predictions");
        assert_eq!(output.shape.as_ref().map(|s| s.dimensions.len()), Some(1));
        assert_eq!(output.description.as_deref(), Some("Model predictions"));
    }

    #[test]
    fn test_value_constraints_default_is_empty() {
        let constraints = ValueConstraints::default();
        assert!(constraints.min.is_none());
        assert!(constraints.max.is_none());
        assert!(constraints.allowed_values.is_none());
        assert!(constraints.pattern.is_none());
        assert!(constraints.custom_validator.is_none());
    }

    #[test]
    fn test_workflow_metadata() {
        let mut metadata = WorkflowMetadata::new("Test Workflow");
        let original_created = metadata.created_at.clone();
        let original_modified = metadata.modified_at.clone();

        std::thread::sleep(std::time::Duration::from_millis(10));
        metadata.touch();

        assert_eq!(metadata.name, "Test Workflow");
        assert_ne!(metadata.modified_at, original_modified);
        // `touch` must only advance the modification time.
        assert_eq!(metadata.created_at, original_created);
    }

    #[test]
    fn test_execution_config() {
        let config = ExecutionConfig {
            mode: ExecutionMode::Parallel,
            parallel: Some(ParallelConfig {
                num_workers: 4,
                chunk_size: Some(1000),
                load_balancing: "round_robin".to_string(),
            }),
            resources: Some(ResourceLimits {
                max_memory_mb: Some(1024),
                max_cpu_time_sec: Some(300),
                max_wall_time_sec: Some(600),
            }),
            caching: Some(CachingConfig {
                enable_step_caching: true,
                cache_directory: Some("/tmp/workflow_cache".to_string()),
                cache_ttl_sec: Some(3600),
                max_cache_size_mb: Some(512),
            }),
        };

        assert_eq!(config.mode, ExecutionMode::Parallel);
        assert_eq!(config.parallel.as_ref().map(|p| p.num_workers), Some(4));
        assert_eq!(
            config.resources.as_ref().and_then(|r| r.max_memory_mb),
            Some(1024)
        );
        assert_eq!(
            config.caching.as_ref().map(|c| c.enable_step_caching),
            Some(true)
        );
    }
}