1use crate::error::{CoreError, ErrorContext, ErrorLocation};
17use std::collections::{HashMap, VecDeque};
18use std::sync::{Arc, Mutex, RwLock};
19use std::time::{Duration, Instant, SystemTime};
20use thiserror::Error;
21
22#[cfg(feature = "parallel")]
23#[allow(unused_imports)]
24use crate::parallel_ops::*;
25
26#[cfg(feature = "async")]
27#[allow(unused_imports)]
28use tokio::sync::{mpsc, oneshot};
29
30#[derive(Error, Debug)]
32pub enum MLPipelineError {
33 #[error("Pipeline configuration error: {0}")]
35 ConfigurationError(String),
36
37 #[error("Pipeline execution failed: {0}")]
39 ExecutionError(String),
40
41 #[error("Model error: {0}")]
43 ModelError(String),
44
45 #[error("Feature processing error: {0}")]
47 FeatureError(String),
48
49 #[error("Data validation error: {0}")]
51 ValidationError(String),
52
53 #[error("Resource exhausted: {0}")]
55 ResourceExhausted(String),
56
57 #[error("Inference error: {0}")]
59 InferenceError(String),
60
61 #[error("Training error: {0}")]
63 TrainingError(String),
64
65 #[error("Monitoring error: {0}")]
67 MonitoringError(String),
68
69 #[error("Dependency error: {0}")]
71 DependencyError(String),
72}
73
74impl From<MLPipelineError> for CoreError {
75 fn from(err: MLPipelineError) -> Self {
76 match err {
77 MLPipelineError::ValidationError(msg) => CoreError::ValidationError(
78 ErrorContext::new(format!("{msg}"))
79 .with_location(ErrorLocation::new(file!(), line!())),
80 ),
81 MLPipelineError::ResourceExhausted(msg) => CoreError::ComputationError(
82 ErrorContext::new(format!("{msg}"))
83 .with_location(ErrorLocation::new(file!(), line!())),
84 ),
85 _ => CoreError::ComputationError(
86 ErrorContext::new(format!("{err}"))
87 .with_location(ErrorLocation::new(file!(), line!())),
88 ),
89 }
90 }
91}
92
/// Logical type of a feature, as referenced by [`FeatureSchema::datatype`].
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    /// 32-bit float.
    Float32,
    /// 64-bit float.
    Float64,
    /// 32-bit signed integer.
    Int32,
    /// 64-bit signed integer.
    Int64,
    /// UTF-8 text.
    String,
    /// `true` / `false`.
    Boolean,
    /// Categorical value; the vector presumably lists the permitted categories.
    Categorical(Vec<String>),
    /// Array whose elements all have the boxed type.
    Array(Box<DataType>),
    /// Record of named fields, each with its own type.
    Struct(HashMap<String, DataType>),
}
115
116#[derive(Debug, Clone)]
118pub struct FeatureSchema {
119 pub name: String,
121 pub datatype: DataType,
123 pub required: bool,
125 pub defaultvalue: Option<FeatureValue>,
127 pub description: Option<String>,
129 pub constraints: Vec<FeatureConstraint>,
131}
132
/// A constraint attached to a [`FeatureSchema`].
#[derive(Debug, Clone)]
pub enum FeatureConstraint {
    /// Presumably an inclusive lower bound on numeric values.
    MinValue(f64),
    /// Presumably an inclusive upper bound on numeric values.
    MaxValue(f64),
    /// Allowed values, compared as strings.
    ValidValues(Vec<String>),
    /// Pattern the string form must match (format not defined here — presumably a regex).
    Pattern(String),
    /// Named custom rule, interpreted elsewhere.
    Custom(String),
}
147
/// A single dynamically-typed feature value.
#[derive(Debug, Clone, PartialEq)]
pub enum FeatureValue {
    Float32(f32),
    Float64(f64),
    Int32(i32),
    Int64(i64),
    String(String),
    Boolean(bool),
    Array(Vec<FeatureValue>),
    Struct(HashMap<String, FeatureValue>),
    Null,
}

impl FeatureValue {
    /// Numeric view of the value as `f64`.
    ///
    /// Floats and integers are widened (`Int64` may lose precision beyond 2^53); strings,
    /// booleans, arrays, structs and `Null` yield `None`.
    pub fn as_f64(&self) -> Option<f64> {
        match *self {
            FeatureValue::Float64(x) => Some(x),
            FeatureValue::Float32(x) => Some(f64::from(x)),
            FeatureValue::Int32(x) => Some(f64::from(x)),
            FeatureValue::Int64(x) => Some(x as f64),
            FeatureValue::String(_)
            | FeatureValue::Boolean(_)
            | FeatureValue::Array(_)
            | FeatureValue::Struct(_)
            | FeatureValue::Null => None,
        }
    }

    /// String rendering of the value.
    ///
    /// Scalars use their `Display` form, `Null` becomes `"null"`, and arrays/structs fall
    /// back to their `Debug` form.
    pub fn as_string(&self) -> String {
        match self {
            FeatureValue::String(text) => text.clone(),
            FeatureValue::Float32(x) => x.to_string(),
            FeatureValue::Float64(x) => x.to_string(),
            FeatureValue::Int32(x) => x.to_string(),
            FeatureValue::Int64(x) => x.to_string(),
            FeatureValue::Boolean(flag) => flag.to_string(),
            FeatureValue::Null => String::from("null"),
            FeatureValue::Array(_) | FeatureValue::Struct(_) => format!("{:?}", self),
        }
    }

    /// `true` only for [`FeatureValue::Null`].
    pub fn is_null(&self) -> bool {
        *self == FeatureValue::Null
    }
}
193
194#[derive(Debug, Clone)]
196pub struct DataSample {
197 pub id: String,
199 pub features: HashMap<String, FeatureValue>,
201 pub target: Option<FeatureValue>,
203 pub timestamp: SystemTime,
205 pub metadata: HashMap<String, String>,
207}
208
209#[derive(Debug, Clone)]
211pub struct DataBatch {
212 pub samples: Vec<DataSample>,
214 pub metadata: HashMap<String, String>,
216 pub created_at: SystemTime,
218}
219
220impl DataBatch {
221 pub fn new() -> Self {
223 Self {
224 samples: Vec::new(),
225 metadata: HashMap::new(),
226 created_at: SystemTime::now(),
227 }
228 }
229
230 pub fn add_sample(&mut self, sample: DataSample) {
232 self.samples.push(sample);
233 }
234
235 pub fn size(&self) -> usize {
237 self.samples.len()
238 }
239
240 pub fn is_empty(&self) -> bool {
242 self.samples.is_empty()
243 }
244
245 pub fn extract_featurematrix(
247 &self,
248 featurenames: &[String],
249 ) -> Result<Vec<Vec<f64>>, MLPipelineError> {
250 let mut matrix = Vec::new();
251
252 for sample in &self.samples {
253 let mut row = Vec::new();
254 for feature_name in featurenames {
255 if let Some(value) = sample.features.get(feature_name) {
256 if let Some(numeric_value) = value.as_f64() {
257 row.push(numeric_value);
258 } else {
259 return Err(MLPipelineError::FeatureError(format!(
260 "Feature '{}' is not numeric",
261 feature_name
262 )));
263 }
264 } else {
265 return Err(MLPipelineError::FeatureError(format!(
266 "Feature '{}' not found in sample",
267 feature_name
268 )));
269 }
270 }
271 matrix.push(row);
272 }
273
274 Ok(matrix)
275 }
276}
277
278impl Default for DataBatch {
279 fn default() -> Self {
280 Self::new()
281 }
282}
283
284pub trait PipelineNode: Send + Sync {
286 fn name(&self) -> &str;
288
289 fn input_schema(&self) -> &[FeatureSchema];
291
292 fn output_schema(&self) -> &[FeatureSchema];
294
295 fn process(&self, batch: DataBatch) -> Result<DataBatch, MLPipelineError>;
297
298 fn validate(&self) -> Result<(), MLPipelineError>;
300
301 fn metrics(&self) -> HashMap<String, f64>;
303}
304
305#[derive(Debug, Clone)]
307pub struct FeatureTransformer {
308 name: String,
309 transform_type: TransformType,
310 input_features: Vec<String>,
311 output_features: Vec<String>,
312 parameters: HashMap<String, FeatureValue>,
313 metrics: Arc<Mutex<HashMap<String, f64>>>,
314}
315
/// Kinds of feature transformation a [`FeatureTransformer`] can be configured with.
#[derive(Debug, Clone)]
pub enum TransformType {
    /// Rescale to `[0, 1]` using the fitted min/max.
    MinMaxScaler,
    /// Standardise using the fitted mean/standard deviation.
    StandardScaler,
    /// One-hot encoding of categorical values.
    OneHotEncoder,
    /// Map categories to integer labels.
    LabelEncoder,
    /// Natural logarithm.
    LogTransform,
    /// Raise values to `power`.
    PowerTransform { power: f64 },
    /// Principal component analysis keeping `n_components`.
    PCA { n_components: usize },
    /// Named custom transform, interpreted elsewhere.
    Custom(String),
}
336
337impl FeatureTransformer {
338 pub fn new(
340 name: String,
341 transform_type: TransformType,
342 input_features: Vec<String>,
343 output_features: Vec<String>,
344 ) -> Self {
345 Self {
346 name,
347 transform_type,
348 input_features,
349 output_features,
350 parameters: HashMap::new(),
351 metrics: Arc::new(Mutex::new(HashMap::new())),
352 }
353 }
354
355 pub fn set_parameter(&mut self, key: String, value: FeatureValue) {
357 self.parameters.insert(key, value);
358 }
359
360 pub fn fit(&mut self, batch: &DataBatch) -> Result<(), MLPipelineError> {
362 match &self.transform_type {
363 TransformType::MinMaxScaler => self.fit_minmax_scaler(batch),
364 TransformType::StandardScaler => self.fit_standard_scaler(batch),
365 TransformType::OneHotEncoder => self.fit_onehot_encoder(batch),
366 TransformType::LabelEncoder => self.fit_label_encoder(batch),
367 _ => Ok(()), }
369 }
370
371 pub fn transform(&self, batch: DataBatch) -> Result<DataBatch, MLPipelineError> {
373 let start_time = Instant::now();
374
375 let mut transformed_batch = DataBatch::new();
376 transformed_batch.metadata = batch.metadata;
377
378 for sample in batch.samples {
379 let mut transformed_sample = sample.clone();
380
381 match &self.transform_type {
382 TransformType::MinMaxScaler => {
383 self.apply_minmax_transform(&mut transformed_sample)?;
384 }
385 TransformType::StandardScaler => {
386 self.apply_standard_transform(&mut transformed_sample)?;
387 }
388 TransformType::LogTransform => {
389 self.applylog_transform(&mut transformed_sample)?;
390 }
391 TransformType::PowerTransform { power } => {
392 self.apply_power_transform(&mut transformed_sample, *power)?;
393 }
394 _ => {
395 return Err(MLPipelineError::FeatureError(format!(
396 "Transform type {:?} not implemented",
397 self.transform_type
398 )));
399 }
400 }
401
402 transformed_batch.add_sample(transformed_sample);
403 }
404
405 let processing_time = start_time.elapsed().as_millis() as f64;
407 self.metrics
408 .lock()
409 .expect("Operation failed")
410 .insert("processing_time_ms".to_string(), processing_time);
411 self.metrics.lock().expect("Operation failed").insert(
412 "samples_processed".to_string(),
413 transformed_batch.size() as f64,
414 );
415
416 Ok(transformed_batch)
417 }
418
419 fn fit_minmax_scaler(&mut self, batch: &DataBatch) -> Result<(), MLPipelineError> {
421 for feature_name in &self.input_features {
422 let mut min_val = f64::INFINITY;
423 let mut max_val = f64::NEG_INFINITY;
424
425 for sample in &batch.samples {
426 if let Some(value) = sample.features.get(feature_name) {
427 if let Some(numeric_value) = value.as_f64() {
428 min_val = min_val.min(numeric_value);
429 max_val = max_val.max(numeric_value);
430 }
431 }
432 }
433
434 self.parameters.insert(
435 format!("{}_min", feature_name),
436 FeatureValue::Float64(min_val),
437 );
438 self.parameters.insert(
439 format!("{}_max", feature_name),
440 FeatureValue::Float64(max_val),
441 );
442 }
443
444 Ok(())
445 }
446
447 fn fit_standard_scaler(&mut self, batch: &DataBatch) -> Result<(), MLPipelineError> {
449 for feature_name in &self.input_features {
450 let mut values = Vec::new();
451
452 for sample in &batch.samples {
453 if let Some(value) = sample.features.get(feature_name) {
454 if let Some(numeric_value) = value.as_f64() {
455 values.push(numeric_value);
456 }
457 }
458 }
459
460 if !values.is_empty() {
461 let mean = values.iter().sum::<f64>() / values.len() as f64;
462 let variance =
463 values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
464 let std_dev = variance.sqrt();
465
466 self.parameters.insert(
467 format!("{}_mean", feature_name),
468 FeatureValue::Float64(mean),
469 );
470 self.parameters.insert(
471 format!("{}_std", feature_name),
472 FeatureValue::Float64(std_dev),
473 );
474 }
475 }
476
477 Ok(())
478 }
479
480 fn fit_onehot_encoder(&mut self, batch: &DataBatch) -> Result<(), MLPipelineError> {
482 for feature_name in &self.input_features {
483 let mut unique_values = std::collections::HashSet::new();
484
485 for sample in &batch.samples {
486 if let Some(value) = sample.features.get(feature_name) {
487 unique_values.insert(value.as_string());
488 }
489 }
490
491 let categories: Vec<_> = unique_values.into_iter().collect();
492 self.parameters.insert(
493 format!("{}_categories", feature_name),
494 FeatureValue::Array(categories.into_iter().map(FeatureValue::String).collect()),
495 );
496 }
497
498 Ok(())
499 }
500
501 fn fit_label_encoder(&mut self, batch: &DataBatch) -> Result<(), MLPipelineError> {
503 for feature_name in &self.input_features {
504 let mut unique_values = std::collections::HashSet::new();
505
506 for sample in &batch.samples {
507 if let Some(value) = sample.features.get(feature_name) {
508 unique_values.insert(value.as_string());
509 }
510 }
511
512 let mut categories: Vec<_> = unique_values.into_iter().collect();
513 categories.sort(); let label_map: HashMap<String, i64> = categories
516 .into_iter()
517 .enumerate()
518 .map(|(i, cat)| (cat, i as i64))
519 .collect();
520
521 for (category, label) in &label_map {
522 self.parameters.insert(
523 format!("{}_{}_label", feature_name, category),
524 FeatureValue::Int64(*label),
525 );
526 }
527 }
528
529 Ok(())
530 }
531
532 fn apply_minmax_transform(&self, sample: &mut DataSample) -> Result<(), MLPipelineError> {
534 for feature_name in &self.input_features {
535 if let Some(value) = sample.features.get(feature_name).cloned() {
536 if let Some(numeric_value) = value.as_f64() {
537 let min_key = format!("{}_min", feature_name);
538 let max_key = format!("{}_max", feature_name);
539
540 let min_val = self
541 .parameters
542 .get(&min_key)
543 .and_then(|v| v.as_f64())
544 .unwrap_or(0.0);
545 let max_val = self
546 .parameters
547 .get(&max_key)
548 .and_then(|v| v.as_f64())
549 .unwrap_or(1.0);
550
551 let scaled_value = if max_val > min_val {
552 (numeric_value - min_val) / (max_val - min_val)
553 } else {
554 0.0
555 };
556
557 sample
558 .features
559 .insert(feature_name.clone(), FeatureValue::Float64(scaled_value));
560 }
561 }
562 }
563
564 Ok(())
565 }
566
567 fn apply_standard_transform(&self, sample: &mut DataSample) -> Result<(), MLPipelineError> {
569 for feature_name in &self.input_features {
570 if let Some(value) = sample.features.get(feature_name).cloned() {
571 if let Some(numeric_value) = value.as_f64() {
572 let mean_key = format!("{}_mean", feature_name);
573 let std_key = format!("{}_std", feature_name);
574
575 let mean = self
576 .parameters
577 .get(&mean_key)
578 .and_then(|v| v.as_f64())
579 .unwrap_or(0.0);
580 let std_dev = self
581 .parameters
582 .get(&std_key)
583 .and_then(|v| v.as_f64())
584 .unwrap_or(1.0);
585
586 let standardized_value = if std_dev > 0.0 {
587 (numeric_value - mean) / std_dev
588 } else {
589 0.0
590 };
591
592 sample.features.insert(
593 feature_name.clone(),
594 FeatureValue::Float64(standardized_value),
595 );
596 }
597 }
598 }
599
600 Ok(())
601 }
602
603 fn applylog_transform(&self, sample: &mut DataSample) -> Result<(), MLPipelineError> {
605 for feature_name in &self.input_features {
606 if let Some(value) = sample.features.get(feature_name).cloned() {
607 if let Some(numeric_value) = value.as_f64() {
608 if numeric_value > 0.0 {
609 let log_value = numeric_value.ln();
610 sample
611 .features
612 .insert(feature_name.clone(), FeatureValue::Float64(log_value));
613 } else {
614 return Err(MLPipelineError::FeatureError(format!(
615 "Cannot apply log transform to non-positive value: {}",
616 numeric_value
617 )));
618 }
619 }
620 }
621 }
622
623 Ok(())
624 }
625
626 fn apply_power_transform(
628 &self,
629 sample: &mut DataSample,
630 power: f64,
631 ) -> Result<(), MLPipelineError> {
632 for feature_name in &self.input_features {
633 if let Some(value) = sample.features.get(feature_name).cloned() {
634 if let Some(numeric_value) = value.as_f64() {
635 let transformed_value = numeric_value.powf(power);
636 sample.features.insert(
637 feature_name.clone(),
638 FeatureValue::Float64(transformed_value),
639 );
640 }
641 }
642 }
643
644 Ok(())
645 }
646}
647
648impl PipelineNode for FeatureTransformer {
649 fn name(&self) -> &str {
650 &self.name
651 }
652
653 fn input_schema(&self) -> &[FeatureSchema] {
654 &[]
656 }
657
658 fn output_schema(&self) -> &[FeatureSchema] {
659 &[]
661 }
662
663 fn process(&self, batch: DataBatch) -> Result<DataBatch, MLPipelineError> {
664 self.transform(batch)
665 }
666
667 fn validate(&self) -> Result<(), MLPipelineError> {
668 if self.input_features.is_empty() {
669 return Err(MLPipelineError::ConfigurationError(
670 "No input features specified".to_string(),
671 ));
672 }
673
674 Ok(())
675 }
676
677 fn metrics(&self) -> HashMap<String, f64> {
678 self.metrics.lock().expect("Operation failed").clone()
679 }
680}
681
682pub struct ModelPredictor {
684 name: String,
685 model_type: ModelType,
686 input_features: Vec<String>,
687 output_features: Vec<String>,
688 model_data: Vec<u8>, metrics: Arc<Mutex<HashMap<String, f64>>>,
690}
691
/// Model families a [`ModelPredictor`] can be configured with.
#[derive(Debug, Clone)]
pub enum ModelType {
    /// Linear regression.
    LinearRegression,
    /// Logistic regression.
    LogisticRegression,
    /// Random forest.
    RandomForest,
    /// Neural network.
    NeuralNetwork,
    /// Support vector machine.
    SVM,
    /// Named custom model, interpreted elsewhere.
    Custom(String),
}
708
709impl ModelPredictor {
710 pub fn new(
712 name: String,
713 model_type: ModelType,
714 input_features: Vec<String>,
715 output_features: Vec<String>,
716 model_data: Vec<u8>,
717 ) -> Self {
718 Self {
719 name,
720 model_type,
721 input_features,
722 output_features,
723 model_data,
724 metrics: Arc::new(Mutex::new(HashMap::new())),
725 }
726 }
727
728 pub fn loadmodel(&mut self, modeldata: Vec<u8>) -> Result<(), MLPipelineError> {
730 self.model_data = modeldata;
731 Ok(())
733 }
734
735 pub fn predict(&self, batch: DataBatch) -> Result<DataBatch, MLPipelineError> {
737 let start_time = Instant::now();
738
739 let mut prediction_batch = DataBatch::new();
740 prediction_batch.metadata = batch.metadata;
741
742 for sample in batch.samples {
743 let mut prediction_sample = sample.clone();
744
745 let feature_values: Vec<f64> = self
747 .input_features
748 .iter()
749 .map(|feature_name| {
750 sample
751 .features
752 .get(feature_name)
753 .and_then(|v| v.as_f64())
754 .unwrap_or(0.0)
755 })
756 .collect();
757
758 let prediction = self.make_prediction(&feature_values)?;
760
761 for (i, output_feature) in self.output_features.iter().enumerate() {
763 let pred_value = prediction.get(i).copied().unwrap_or(0.0);
764 prediction_sample
765 .features
766 .insert(output_feature.clone(), FeatureValue::Float64(pred_value));
767 }
768
769 prediction_batch.add_sample(prediction_sample);
770 }
771
772 let processing_time = start_time.elapsed().as_millis() as f64;
774 self.metrics
775 .lock()
776 .expect("Operation failed")
777 .insert("inference_time_ms".to_string(), processing_time);
778 self.metrics.lock().expect("Operation failed").insert(
779 "samples_predicted".to_string(),
780 prediction_batch.size() as f64,
781 );
782
783 Ok(prediction_batch)
784 }
785
786 fn make_prediction(&self, features: &[f64]) -> Result<Vec<f64>, MLPipelineError> {
788 match &self.model_type {
790 ModelType::LinearRegression => {
791 let prediction = features.iter().sum::<f64>() / features.len() as f64;
793 Ok(vec![prediction])
794 }
795 ModelType::LogisticRegression => {
796 let linear_output = features.iter().sum::<f64>();
798 let prediction = 1.0 / (1.0 + (-linear_output).exp());
799 Ok(vec![prediction])
800 }
801 ModelType::RandomForest => {
802 let prediction =
804 features.iter().map(|&x| x.abs()).sum::<f64>() / features.len() as f64;
805 Ok(vec![prediction])
806 }
807 _ => Err(MLPipelineError::InferenceError(format!(
808 "Model type {:?} not implemented",
809 self.model_type
810 ))),
811 }
812 }
813}
814
815impl PipelineNode for ModelPredictor {
816 fn name(&self) -> &str {
817 &self.name
818 }
819
820 fn input_schema(&self) -> &[FeatureSchema] {
821 &[]
822 }
823
824 fn output_schema(&self) -> &[FeatureSchema] {
825 &[]
826 }
827
828 fn process(&self, batch: DataBatch) -> Result<DataBatch, MLPipelineError> {
829 self.predict(batch)
830 }
831
832 fn validate(&self) -> Result<(), MLPipelineError> {
833 if self.input_features.is_empty() {
834 return Err(MLPipelineError::ConfigurationError(
835 "No input features specified for model".to_string(),
836 ));
837 }
838
839 if self.output_features.is_empty() {
840 return Err(MLPipelineError::ConfigurationError(
841 "No output features specified for model".to_string(),
842 ));
843 }
844
845 if self.model_data.is_empty() {
846 return Err(MLPipelineError::ModelError(
847 "No model data loaded".to_string(),
848 ));
849 }
850
851 Ok(())
852 }
853
854 fn metrics(&self) -> HashMap<String, f64> {
855 self.metrics.lock().expect("Operation failed").clone()
856 }
857}
858
859pub struct MLPipeline {
861 name: String,
862 nodes: Vec<Box<dyn PipelineNode>>,
863 node_dependencies: HashMap<String, Vec<String>>,
864 pipeline_metrics: Arc<RwLock<PipelineMetrics>>,
865 config: PipelineConfig,
866}
867
868#[derive(Debug, Clone)]
870pub struct PipelineConfig {
871 pub max_batchsize: usize,
873 pub node_timeout: Duration,
875 pub parallel_processing: bool,
877 pub error_strategy: ErrorStrategy,
879 pub monitoring: MonitoringConfig,
881}
882
883impl Default for PipelineConfig {
884 fn default() -> Self {
885 Self {
886 max_batchsize: 1000,
887 node_timeout: Duration::from_secs(30),
888 parallel_processing: true,
889 error_strategy: ErrorStrategy::FailFast,
890 monitoring: MonitoringConfig::default(),
891 }
892 }
893}
894
895#[derive(Debug, Clone)]
897pub enum ErrorStrategy {
898 FailFast,
900 SkipErrors,
902 RetryWithBackoff {
904 maxretries: u32,
905 basedelay: Duration,
906 },
907}
908
909#[derive(Debug, Clone)]
911pub struct MonitoringConfig {
912 pub enable_metrics: bool,
914 pub metrics_interval: Duration,
916 pub enable_health_checks: bool,
918 pub health_check_interval: Duration,
920}
921
922impl Default for MonitoringConfig {
923 fn default() -> Self {
924 Self {
925 enable_metrics: true,
926 metrics_interval: Duration::from_secs(60),
927 enable_health_checks: true,
928 health_check_interval: Duration::from_secs(30),
929 }
930 }
931}
932
933#[derive(Debug, Clone)]
935pub struct PipelineMetrics {
936 pub samples_processed: u64,
938 pub total_processing_time: Duration,
940 pub error_count: u64,
942 pub success_rate: f64,
944 pub throughput: f64,
946 pub node_metrics: HashMap<String, HashMap<String, f64>>,
948 pub last_updated: SystemTime,
950}
951
952impl Default for PipelineMetrics {
953 fn default() -> Self {
954 Self {
955 samples_processed: 0,
956 total_processing_time: Duration::default(),
957 error_count: 0,
958 success_rate: 0.0,
959 throughput: 0.0,
960 node_metrics: HashMap::default(),
961 last_updated: SystemTime::UNIX_EPOCH,
962 }
963 }
964}
965
966impl MLPipeline {
967 pub fn new(name: String, config: PipelineConfig) -> Self {
969 Self {
970 name,
971 nodes: Vec::new(),
972 node_dependencies: HashMap::new(),
973 pipeline_metrics: Arc::new(RwLock::new(PipelineMetrics::default())),
974 config,
975 }
976 }
977
978 pub fn add_node(&mut self, node: Box<dyn PipelineNode>) -> Result<(), MLPipelineError> {
980 node.validate()?;
981 self.nodes.push(node);
982 Ok(())
983 }
984
985 pub fn set_dependencies(&mut self, nodename: String, dependencies: Vec<String>) {
987 self.node_dependencies.insert(nodename, dependencies);
988 }
989
990 pub fn execute(&self, mut batch: DataBatch) -> Result<DataBatch, MLPipelineError> {
992 let start_time = Instant::now();
993 let initial_size = batch.size();
994
995 if batch.size() > self.config.max_batchsize {
997 return Err(MLPipelineError::ValidationError(format!(
998 "Batch size {} exceeds maximum {}",
999 batch.size(),
1000 self.config.max_batchsize
1001 )));
1002 }
1003
1004 let execution_order = self.get_execution_order()?;
1006
1007 for node_name in execution_order {
1008 if let Some(node) = self.nodes.iter().find(|n| n.name() == node_name) {
1009 let node_start = Instant::now();
1010
1011 let batch_clone = batch.clone();
1012 batch = match node.process(batch) {
1013 Ok(processed_batch) => {
1014 let node_time = node_start.elapsed();
1016 self.update_node_metrics(&node_name, node_time, processed_batch.size());
1017 processed_batch
1018 }
1019 Err(e) => match &self.config.error_strategy {
1020 ErrorStrategy::FailFast => return Err(e),
1021 ErrorStrategy::SkipErrors => {
1022 eprintln!("Node {} failed: {}, continuing...", node_name, e);
1023 batch_clone
1024 }
1025 ErrorStrategy::RetryWithBackoff {
1026 maxretries,
1027 basedelay,
1028 } => {
1029 let mut retries = 0;
1030 loop {
1031 if retries >= *maxretries {
1032 return Err(e);
1033 }
1034
1035 std::thread::sleep(*basedelay * 2_u32.pow(retries));
1036
1037 match node.process(batch_clone.clone()) {
1038 Ok(processed_batch) => {
1039 break processed_batch;
1040 }
1041 Err(_) => {
1042 retries += 1;
1043 }
1044 }
1045 }
1046 }
1047 },
1048 }
1049 }
1050 }
1051
1052 let total_time = start_time.elapsed();
1054 self.update_pipeline_metrics(total_time, true, initial_size, total_time);
1055
1056 Ok(batch)
1057 }
1058
1059 fn get_execution_order(&self) -> Result<Vec<String>, MLPipelineError> {
1061 let mut order = Vec::new();
1062 let mut visited = std::collections::HashSet::new();
1063 let mut visiting = std::collections::HashSet::new();
1064
1065 for node in &self.nodes {
1066 if !visited.contains(node.name()) {
1067 self.dfs_visit(node.name(), &mut order, &mut visited, &mut visiting)?;
1068 }
1069 }
1070
1071 Ok(order)
1072 }
1073
1074 fn dfs_visit(
1076 &self,
1077 node_name: &str,
1078 order: &mut Vec<String>,
1079 visited: &mut std::collections::HashSet<String>,
1080 visiting: &mut std::collections::HashSet<String>,
1081 ) -> Result<(), MLPipelineError> {
1082 if visiting.contains(node_name) {
1083 return Err(MLPipelineError::DependencyError(
1084 "Circular dependency detected".to_string(),
1085 ));
1086 }
1087
1088 if visited.contains(node_name) {
1089 return Ok(());
1090 }
1091
1092 visiting.insert(node_name.to_string());
1093
1094 if let Some(dependencies) = self.node_dependencies.get(node_name) {
1095 for dep in dependencies {
1096 self.dfs_visit(dep, order, visited, visiting)?;
1097 }
1098 }
1099
1100 visiting.remove(node_name);
1101 visited.insert(node_name.to_string());
1102 order.push(node_name.to_string());
1103
1104 Ok(())
1105 }
1106
1107 fn update_node_metrics(&self, node_name: &str, processing_time: Duration, batchsize: usize) {
1109 if let Ok(mut metrics) = self.pipeline_metrics.write() {
1110 let node_metrics = metrics
1111 .node_metrics
1112 .entry(node_name.to_string())
1113 .or_insert_with(HashMap::new);
1114 node_metrics.insert(
1115 "processing_time_ms".to_string(),
1116 processing_time.as_millis() as f64,
1117 );
1118 node_metrics.insert("batchsize".to_string(), batchsize as f64);
1119 node_metrics.insert(
1120 "throughput".to_string(),
1121 batchsize as f64 / processing_time.as_secs_f64(),
1122 );
1123 }
1124 }
1125
1126 fn update_pipeline_metrics(
1128 &self,
1129 duration: Duration,
1130 success: bool,
1131 batchsize: usize,
1132 processing_time: Duration,
1133 ) {
1134 if let Ok(mut metrics) = self.pipeline_metrics.write() {
1135 metrics.samples_processed += batchsize as u64;
1136 metrics.total_processing_time += processing_time;
1137
1138 if !success {
1139 metrics.error_count += 1;
1140 }
1141
1142 let total_executions = metrics.samples_processed as f64 / batchsize as f64;
1143 metrics.success_rate =
1144 (total_executions - metrics.error_count as f64) / total_executions;
1145 metrics.throughput =
1146 metrics.samples_processed as f64 / metrics.total_processing_time.as_secs_f64();
1147 metrics.last_updated = SystemTime::now();
1148 }
1149 }
1150
1151 pub fn get_metrics(&self) -> PipelineMetrics {
1153 self.pipeline_metrics
1154 .read()
1155 .expect("Operation failed")
1156 .clone()
1157 }
1158
1159 pub fn config(&self) -> &PipelineConfig {
1161 &self.config
1162 }
1163
1164 pub fn validate(&self) -> Result<(), MLPipelineError> {
1166 for node in &self.nodes {
1168 node.validate()?;
1169 }
1170
1171 self.get_execution_order()?;
1173
1174 Ok(())
1175 }
1176}
1177
#[cfg(feature = "async")]
/// Buffers incoming samples and periodically runs them through an [`MLPipeline`] on a
/// background Tokio task started by `start`.
pub struct StreamingProcessor {
    /// Pipeline shared with the background task.
    pipeline: Arc<MLPipeline>,
    /// Samples waiting to be processed, FIFO.
    input_buffer: Arc<Mutex<VecDeque<DataSample>>>,
    /// Processed samples waiting to be collected, FIFO.
    output_buffer: Arc<Mutex<VecDeque<DataSample>>>,
    /// Maximum number of samples taken from `input_buffer` per tick.
    batchsize: usize,
    /// Period of the background processing loop.
    processing_interval: Duration,
    /// Flag the background loop checks on every tick.
    is_running: Arc<Mutex<bool>>,
}
1188
#[cfg(feature = "async")]
impl StreamingProcessor {
    /// Creates a stopped processor with empty buffers.
    pub fn with_interval(
        processing_interval: Duration,
        batchsize: usize,
        pipeline: Arc<MLPipeline>,
    ) -> Self {
        Self {
            pipeline,
            input_buffer: Arc::new(Mutex::new(VecDeque::new())),
            output_buffer: Arc::new(Mutex::new(VecDeque::new())),
            batchsize,
            processing_interval,
            is_running: Arc::new(Mutex::new(false)),
        }
    }

    /// Marks the processor running and spawns the background processing loop.
    ///
    /// Every `processing_interval` the loop takes up to `batchsize` samples from the input
    /// buffer and runs them through the pipeline. Processed samples are appended to the
    /// output buffer; failures are logged to stderr and their samples dropped.
    ///
    /// `MLPipeline::execute` is synchronous and may block (retry backoff uses
    /// `std::thread::sleep`), so it runs on Tokio's blocking pool rather than on the async
    /// worker thread.
    ///
    /// NOTE(review): the loop only sees `stop` on its next tick. Calling `start` again
    /// before then re-sets the flag, and the old loop keeps running alongside the new one.
    ///
    /// # Errors
    /// `ExecutionError` if the processor is already running.
    pub async fn start(&self) -> Result<(), MLPipelineError> {
        {
            let mut running = self.is_running.lock().expect("Operation failed");
            if *running {
                return Err(MLPipelineError::ExecutionError(
                    "Processor already running".to_string(),
                ));
            }
            *running = true;
        }

        let pipeline = Arc::clone(&self.pipeline);
        let input_buffer = Arc::clone(&self.input_buffer);
        let output_buffer = Arc::clone(&self.output_buffer);
        let batchsize = self.batchsize;
        let processing_interval = self.processing_interval;
        let is_running = Arc::clone(&self.is_running);

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(processing_interval);

            loop {
                interval.tick().await;

                if !*is_running.lock().expect("Operation failed") {
                    break;
                }

                let mut batch = DataBatch::new();
                {
                    let mut input = input_buffer.lock().expect("Operation failed");
                    let take = batchsize.min(input.len());
                    batch.samples.extend(input.drain(..take));
                }

                if batch.is_empty() {
                    continue;
                }

                let worker_pipeline = Arc::clone(&pipeline);
                match tokio::task::spawn_blocking(move || worker_pipeline.execute(batch)).await {
                    Ok(Ok(processed_batch)) => {
                        output_buffer
                            .lock()
                            .expect("Operation failed")
                            .extend(processed_batch.samples);
                    }
                    Ok(Err(e)) => {
                        eprintln!("Streaming processing error: {}", e);
                    }
                    Err(join_error) => {
                        eprintln!("Streaming processing task failed: {}", join_error);
                    }
                }
            }
        });

        Ok(())
    }

    /// Asks the background loop to exit; it notices on its next tick.
    pub fn stop(&self) {
        *self.is_running.lock().expect("Operation failed") = false;
    }

    /// Queues `sample` for processing.
    pub fn add_sample(&self, sample: DataSample) {
        self.input_buffer
            .lock()
            .expect("Operation failed")
            .push_back(sample);
    }

    /// Removes and returns up to `maxcount` processed samples, oldest first.
    pub fn get_samples(&self, maxcount: usize) -> Vec<DataSample> {
        let mut output = self.output_buffer.lock().expect("Operation failed");
        let take = maxcount.min(output.len());
        output.drain(..take).collect()
    }

    /// Current `(input_buffer_len, output_buffer_len)`.
    pub fn get_buffer_stats(&self) -> (usize, usize) {
        let input_size = self.input_buffer.lock().expect("Operation failed").len();
        let output_size = self.output_buffer.lock().expect("Operation failed").len();
        (input_size, output_size)
    }
}
1304
1305pub mod utils {
1307 use super::*;
1308
1309 pub fn with_preprocessing(featurenames: Vec<String>) -> MLPipeline {
1311 let mut pipeline = MLPipeline::new("preprocessing".to_string(), PipelineConfig::default());
1312
1313 let scaler = FeatureTransformer::new(
1315 "standard_scaler".to_string(),
1316 TransformType::StandardScaler,
1317 featurenames.clone(),
1318 featurenames.clone(),
1319 );
1320 pipeline
1321 .add_node(Box::new(scaler))
1322 .expect("Operation failed");
1323
1324 pipeline
1325 }
1326
1327 pub fn with_model_type(
1329 model_name: String,
1330 model_type: ModelType,
1331 input_features: Vec<String>,
1332 output_features: Vec<String>,
1333 ) -> MLPipeline {
1334 let mut pipeline = MLPipeline::new("prediction".to_string(), PipelineConfig::default());
1335
1336 let predictor = ModelPredictor::new(
1338 model_name,
1339 model_type,
1340 input_features,
1341 output_features,
1342 vec![], );
1344 pipeline
1345 .add_node(Box::new(predictor))
1346 .expect("Operation failed");
1347
1348 pipeline
1349 }
1350
1351 pub fn create_sample_batch(featurenames: &[String], size: usize) -> DataBatch {
1353 let mut batch = DataBatch::new();
1354
1355 for i in 0..size {
1356 let mut features = HashMap::new();
1357 for (j, feature_name) in featurenames.iter().enumerate() {
1358 let value = (i * 10 + j) as f64 / 100.0; features.insert(feature_name.clone(), FeatureValue::Float64(value));
1360 }
1361
1362 let sample = DataSample {
1363 id: format!("{i}"),
1364 features,
1365 target: Some(FeatureValue::Float64((i as f64) % 2.0)), timestamp: SystemTime::now(),
1367 metadata: HashMap::new(),
1368 };
1369
1370 batch.add_sample(sample);
1371 }
1372
1373 batch
1374 }
1375
1376 pub fn calculate_feature_statistics(
1378 batch: &DataBatch,
1379 feature_name: &str,
1380 ) -> Option<(f64, f64, f64, f64)> {
1381 let mut values = Vec::new();
1382
1383 for sample in &batch.samples {
1384 if let Some(value) = sample.features.get(feature_name) {
1385 if let Some(numeric_value) = value.as_f64() {
1386 values.push(numeric_value);
1387 }
1388 }
1389 }
1390
1391 if values.is_empty() {
1392 return None;
1393 }
1394
1395 values.sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
1396
1397 let mean = values.iter().sum::<f64>() / values.len() as f64;
1398 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
1399 let std_dev = variance.sqrt();
1400 let min = values[0];
1401 let max = values[values.len() - 1];
1402
1403 Some((mean, std_dev, min, max))
1404 }
1405}
1406
#[cfg(test)]
mod tests {
    use super::*;

    /// Numeric and null `FeatureValue` variants convert to `f64` / `String` as expected.
    #[test]
    fn test_feature_value_conversions() {
        let float_val = FeatureValue::Float64(3.15);
        assert_eq!(float_val.as_f64(), Some(3.15));
        assert_eq!(float_val.as_string(), "3.15");

        let int_val = FeatureValue::Int32(42);
        assert_eq!(int_val.as_f64(), Some(42.0));
        assert_eq!(int_val.as_string(), "42");

        let null_val = FeatureValue::Null;
        assert!(null_val.is_null());
        assert_eq!(null_val.as_f64(), None);
    }

    /// Adding a sample updates size/emptiness, and feature extraction follows
    /// the requested column order.
    #[test]
    fn test_data_batch_operations() {
        let mut batch = DataBatch::new();
        assert!(batch.is_empty());

        let sample = DataSample {
            id: "test1".to_string(),
            features: {
                let mut features = HashMap::new();
                features.insert("feature1".to_string(), FeatureValue::Float64(1.0));
                features.insert("feature2".to_string(), FeatureValue::Float64(2.0));
                features
            },
            target: Some(FeatureValue::Float64(1.0)),
            timestamp: SystemTime::now(),
            metadata: HashMap::new(),
        };

        batch.add_sample(sample);
        assert_eq!(batch.size(), 1);
        assert!(!batch.is_empty());

        let featurenames = vec!["feature1".to_string(), "feature2".to_string()];
        let matrix = batch
            .extract_featurematrix(&featurenames)
            .expect("Operation failed");
        assert_eq!(matrix.len(), 1);
        assert_eq!(matrix[0], vec![1.0, 2.0]);
    }

    /// A freshly constructed transformer reports its name and validates.
    #[test]
    fn test_feature_transformer_creation() {
        let transformer = FeatureTransformer::new(
            "test_scaler".to_string(),
            TransformType::StandardScaler,
            vec!["feature1".to_string()],
            vec!["feature1_scaled".to_string()],
        );

        assert_eq!(transformer.name(), "test_scaler");
        assert!(transformer.validate().is_ok());
    }

    /// A freshly constructed predictor reports its name and validates.
    #[test]
    fn test_model_predictor_creation() {
        let predictor = ModelPredictor::new(
            "test_model".to_string(),
            ModelType::LinearRegression,
            vec!["feature1".to_string(), "feature2".to_string()],
            vec!["prediction".to_string()],
            vec![1, 2, 3, 4],
        );

        assert_eq!(predictor.name(), "test_model");
        assert!(predictor.validate().is_ok());
    }

    /// A pipeline with a single transformer node validates.
    #[test]
    fn test_pipeline_creation_and_validation() {
        let mut pipeline = MLPipeline::new("test_pipeline".to_string(), PipelineConfig::default());

        let transformer = FeatureTransformer::new(
            "scaler".to_string(),
            TransformType::StandardScaler,
            vec!["feature1".to_string()],
            vec!["feature1_scaled".to_string()],
        );

        pipeline
            .add_node(Box::new(transformer))
            .expect("Operation failed");
        assert!(pipeline.validate().is_ok());
    }

    /// A declared dependency forces `node1` to run before `node2`.
    #[test]
    fn test_pipeline_execution_order() {
        let mut pipeline = MLPipeline::new("test_pipeline".to_string(), PipelineConfig::default());

        let node1 = FeatureTransformer::new(
            "node1".to_string(),
            TransformType::StandardScaler,
            vec!["feature1".to_string()],
            vec!["feature1_scaled".to_string()],
        );
        let node2 = FeatureTransformer::new(
            "node2".to_string(),
            TransformType::MinMaxScaler,
            vec!["feature1_scaled".to_string()],
            vec!["feature1_normalized".to_string()],
        );

        pipeline
            .add_node(Box::new(node1))
            .expect("Operation failed");
        pipeline
            .add_node(Box::new(node2))
            .expect("Operation failed");

        pipeline.set_dependencies("node2".to_string(), vec!["node1".to_string()]);

        let execution_order = pipeline.get_execution_order().expect("Operation failed");
        assert_eq!(execution_order, vec!["node1", "node2"]);
    }

    /// Every synthetic sample carries all requested features and a 0/1 target.
    #[test]
    fn test_utils_sample_batch_creation() {
        let featurenames = vec!["feature1".to_string(), "feature2".to_string()];
        let batch = utils::create_sample_batch(&featurenames, 10);

        assert_eq!(batch.size(), 10);
        assert!(!batch.is_empty());

        for sample in &batch.samples {
            assert!(sample.features.contains_key("feature1"));
            assert!(sample.features.contains_key("feature2"));
            assert!(sample.target.is_some());

            // Targets alternate between 0.0 and 1.0 (i % 2).
            let target = sample.target.as_ref().and_then(|t| t.as_f64());
            assert!(
                matches!(target, Some(t) if t == 0.0 || t == 1.0),
                "unexpected target {target:?}"
            );
        }
    }

    /// Statistics over the deterministic sample batch match closed-form values.
    #[test]
    fn test_feature_statistics() {
        let featurenames = vec!["feature1".to_string()];
        let batch = utils::create_sample_batch(&featurenames, 100);

        let stats =
            utils::calculate_feature_statistics(&batch, "feature1").expect("Operation failed");
        let (mean, std_dev, min, max) = stats;

        // With a single feature (j = 0), sample i holds i / 10 for i in 0..100,
        // i.e. 0.0, 0.1, ..., 9.9.
        let tol = 1e-9;
        assert!((mean - 4.95).abs() < tol, "mean = {mean}");
        // Population variance of {0, 1, ..., 99} is (100^2 - 1) / 12 = 833.25;
        // scaling every value by 1/10 divides the variance by 100.
        let expected_std = 8.3325_f64.sqrt();
        assert!(
            (std_dev - expected_std).abs() < tol,
            "std_dev = {std_dev}, expected {expected_std}"
        );
        assert!(min.abs() < tol, "min = {min}");
        assert!((max - 9.9).abs() < tol, "max = {max}");
    }

    /// No usable values (unknown feature or empty batch) yields `None`.
    #[test]
    fn test_feature_statistics_without_values() {
        let featurenames = vec!["feature1".to_string()];
        let batch = utils::create_sample_batch(&featurenames, 5);
        assert!(utils::calculate_feature_statistics(&batch, "missing").is_none());

        let empty = DataBatch::new();
        assert!(utils::calculate_feature_statistics(&empty, "feature1").is_none());
    }

    /// Default pipeline configuration values.
    #[test]
    fn test_pipeline_config_default() {
        let config = PipelineConfig::default();
        assert_eq!(config.max_batchsize, 1000);
        assert_eq!(config.node_timeout, Duration::from_secs(30));
        assert!(config.parallel_processing);
        assert!(matches!(config.error_strategy, ErrorStrategy::FailFast));
    }
}
1571}