use anyhow::{anyhow, Result};
use chrono::{Datelike, Timelike};
use scirs2_core::ndarray_ext::Array2;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info};
36
/// The value carried by a `Feature`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum FeatureValue {
    /// Real-valued scalar.
    Numeric(f64),
    /// String category.
    Categorical(String),
    /// Boolean flag (coerces to 1.0 / 0.0 via `as_numeric`).
    Boolean(bool),
    /// Vector of reals (coerces to NaN via `as_numeric`).
    NumericArray(Vec<f64>),
    /// Explicitly missing value.
    Missing,
}
51
52impl FeatureValue {
53 pub fn as_numeric(&self) -> f64 {
55 match self {
56 FeatureValue::Numeric(v) => *v,
57 FeatureValue::Boolean(b) => {
58 if *b {
59 1.0
60 } else {
61 0.0
62 }
63 }
64 _ => f64::NAN,
65 }
66 }
67
68 pub fn is_missing(&self) -> bool {
70 match self {
71 FeatureValue::Missing => true,
72 FeatureValue::Numeric(v) => v.is_nan(),
73 _ => false,
74 }
75 }
76}
77
/// A named feature with an optional importance score and free-form metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Feature {
    /// Feature name; used as the lookup key and as the streaming-history key.
    pub name: String,
    /// The feature's value.
    pub value: FeatureValue,
    /// Optional importance score (not populated by the visible code).
    pub importance: Option<f64>,
    /// Arbitrary key/value annotations.
    pub metadata: HashMap<String, String>,
}
90
91impl Feature {
92 pub fn numeric(name: impl Into<String>, value: f64) -> Self {
94 Self {
95 name: name.into(),
96 value: FeatureValue::Numeric(value),
97 importance: None,
98 metadata: HashMap::new(),
99 }
100 }
101
102 pub fn categorical(name: impl Into<String>, value: impl Into<String>) -> Self {
104 Self {
105 name: name.into(),
106 value: FeatureValue::Categorical(value.into()),
107 importance: None,
108 metadata: HashMap::new(),
109 }
110 }
111
112 pub fn boolean(name: impl Into<String>, value: bool) -> Self {
114 Self {
115 name: name.into(),
116 value: FeatureValue::Boolean(value),
117 importance: None,
118 metadata: HashMap::new(),
119 }
120 }
121}
122
/// An ordered collection of features captured at a point in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeatureSet {
    /// Features in insertion order (duplicate names are not prevented;
    /// `get_feature` returns the first match).
    pub features: Vec<Feature>,
    /// Creation time (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Schema/version tag; `"1.0"` by default.
    pub version: String,
}
133
134impl FeatureSet {
135 pub fn new() -> Self {
137 Self {
138 features: Vec::new(),
139 timestamp: chrono::Utc::now(),
140 version: "1.0".to_string(),
141 }
142 }
143
144 pub fn add_feature(&mut self, feature: Feature) {
146 self.features.push(feature);
147 }
148
149 pub fn get_feature(&self, name: &str) -> Option<&Feature> {
151 self.features.iter().find(|f| f.name == name)
152 }
153
154 pub fn to_numeric_array(&self) -> Vec<f64> {
156 self.features.iter().map(|f| f.value.as_numeric()).collect()
157 }
158
159 pub fn feature_names(&self) -> Vec<String> {
161 self.features.iter().map(|f| f.name.clone()).collect()
162 }
163}
164
165impl Default for FeatureSet {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
/// A single transformation step in a `FeaturePipeline`.
///
/// Scalers use statistics learned during `fit`; rolling/EWMA/lag/rate
/// transforms keep per-feature streaming state between `transform` calls.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FeatureTransform {
    /// (x - mean) / std, from fitted statistics.
    StandardScaler,
    /// Scale to [0, 1] from fitted min/max.
    /// NOTE(review): the `min`/`max` fields are not read by the current
    /// implementation — confirm whether a custom target range was intended.
    MinMaxScaler { min: f64, max: f64 },
    /// (x - median) / IQR — robust to outliers.
    RobustScaler,
    /// ln(x + offset).
    LogTransform { offset: f64 },
    /// Box-Cox style: (x^lambda - 1) / lambda, or ln(x) when lambda == 0.
    PowerTransform { lambda: f64 },
    /// Mean over a trailing window of `window` observations.
    RollingMean { window: usize },
    /// Standard deviation over a trailing window.
    RollingStd { window: usize },
    /// Sum over a trailing window.
    RollingSum { window: usize },
    /// Exponentially weighted moving average with smoothing factor `alpha`.
    EWMA { alpha: f64 },
    /// Emit `<name>_lag_<k>` features for each configured lag.
    LagFeatures { lags: Vec<usize> },
    /// Relative change vs. the observation `period` steps ago.
    RateOfChange { period: usize },
    /// Map values to bin indices given ascending bin edges.
    Binning { bins: Vec<f64> },
    /// Emit `<name>_<category>` indicator features for the listed columns.
    OneHotEncoder { columns: Vec<String> },
    /// Replace categories in the listed columns with fitted integer codes.
    LabelEncoder { columns: Vec<String> },
    /// Target (mean) encoding — not implemented yet (no-op at transform time).
    TargetEncoder { column: String },
    /// Emit `<name>_pow<d>` features for d in 2..=degree.
    PolynomialFeatures { degree: usize },
    /// Emit pairwise products for the configured feature pairs.
    InteractionFeatures { pairs: Vec<(String, String)> },
    /// Fill missing values using the given strategy.
    Imputation { strategy: ImputationStrategy },
    /// Keep only the top-k features — not implemented yet (no-op).
    FeatureSelection { top_k: usize },
    /// Principal component analysis — not implemented yet (no-op).
    PCA { n_components: usize },
    /// Placeholder for externally defined transforms — not implemented yet.
    Custom { name: String },
}
217
/// Strategy for filling missing feature values.
///
/// Only `Mean`, `Median` and `Constant` are currently applied by the
/// pipeline; the remaining strategies are accepted but leave values missing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImputationStrategy {
    /// Fitted per-feature mean.
    Mean,
    /// Fitted per-feature median.
    Median,
    /// Most frequent value — not applied by the current implementation.
    Mode,
    /// Constant 0.0.
    Constant,
    /// Carry the previous observation forward — not applied yet.
    ForwardFill,
    /// Use the next observation — not applied yet.
    BackwardFill,
    /// Linear interpolation — not applied yet.
    Interpolate,
}
236
/// Configuration for `FeaturePipeline::extract_features`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeatureExtractionConfig {
    /// Append calendar features (hour, weekday, day of month, month) derived
    /// from the current UTC time.
    pub extract_time_features: bool,
    /// Whether to derive statistical features.
    /// NOTE(review): not read by the visible code — confirm it is consumed
    /// elsewhere before relying on it.
    pub extract_statistical_features: bool,
    /// Window length for rolling statistics.
    /// NOTE(review): rolling transforms carry their own `window`; this field
    /// is not read by the visible code.
    pub rolling_window: usize,
    /// Whether to auto-generate features (not read by the visible code).
    pub auto_generate: bool,
    /// Upper bound on generated features (not read by the visible code).
    pub max_features: usize,
}
251
252impl Default for FeatureExtractionConfig {
253 fn default() -> Self {
254 Self {
255 extract_time_features: true,
256 extract_statistical_features: true,
257 rolling_window: 10,
258 auto_generate: false,
259 max_features: 100,
260 }
261 }
262}
263
/// Streaming feature-engineering pipeline: fit once on historical data, then
/// run feature sets through an ordered list of transforms.
pub struct FeaturePipeline {
    /// Transforms applied in insertion order by `transform`.
    transforms: Vec<FeatureTransform>,
    /// Settings consumed by `extract_features`.
    config: FeatureExtractionConfig,
    /// Per-feature streaming buffers (rolling windows, lags, EWMA state),
    /// keyed by feature name and shared by all stateful transforms.
    history: Arc<RwLock<HashMap<String, VecDeque<f64>>>>,
    /// Statistics learned during `fit`.
    fitted_params: Arc<RwLock<FittedParameters>>,
    /// Running throughput/latency counters.
    stats: Arc<RwLock<PipelineStats>>,
}
277
/// Per-feature statistics learned by `FeaturePipeline::fit`.
#[derive(Debug, Clone, Default)]
struct FittedParameters {
    /// Mean per numeric feature.
    means: HashMap<String, f64>,
    /// Population standard deviation per feature, floored at 1e-10.
    stds: HashMap<String, f64>,
    /// Observed minimum per feature.
    mins: HashMap<String, f64>,
    /// Observed maximum per feature.
    maxs: HashMap<String, f64>,
    /// Nearest-rank median per feature.
    medians: HashMap<String, f64>,
    /// Interquartile range per feature, floored at 1e-10.
    iqrs: HashMap<String, f64>,
    /// Category -> integer code maps per label-encoded column.
    label_encodings: HashMap<String, HashMap<String, usize>>,
    /// Fitted PCA basis; never populated by the visible code.
    pca_components: Option<Array2<f64>>,
    /// Feature importance scores; never populated by the visible code.
    feature_importances: HashMap<String, f64>,
}
300
/// Running counters maintained by `FeaturePipeline::transform`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PipelineStats {
    /// Total output features across all `transform` calls.
    pub total_features_processed: u64,
    /// Number of `transform` calls made.
    pub total_transformations: u64,
    /// Incremental running mean of per-call `transform` latency, in ms.
    pub avg_transform_time_ms: f64,
    /// Output feature count of the most recent `transform` call.
    pub features_generated: usize,
    /// Count of selected features; never updated by the visible code.
    pub features_selected: usize,
}
315
316impl FeaturePipeline {
317 pub fn new() -> Self {
319 Self {
320 transforms: Vec::new(),
321 config: FeatureExtractionConfig::default(),
322 history: Arc::new(RwLock::new(HashMap::new())),
323 fitted_params: Arc::new(RwLock::new(FittedParameters::default())),
324 stats: Arc::new(RwLock::new(PipelineStats::default())),
325 }
326 }
327
328 pub fn with_config(config: FeatureExtractionConfig) -> Self {
330 Self {
331 transforms: Vec::new(),
332 config,
333 history: Arc::new(RwLock::new(HashMap::new())),
334 fitted_params: Arc::new(RwLock::new(FittedParameters::default())),
335 stats: Arc::new(RwLock::new(PipelineStats::default())),
336 }
337 }
338
339 pub fn add_transform(&mut self, transform: FeatureTransform) -> &mut Self {
341 self.transforms.push(transform);
342 self
343 }
344
345 pub async fn fit(&mut self, data: &[FeatureSet]) -> Result<()> {
347 info!("Fitting feature pipeline on {} samples", data.len());
348
349 if data.is_empty() {
350 return Err(anyhow!("Cannot fit on empty data"));
351 }
352
353 let mut params = self.fitted_params.write().await;
354
355 let mut feature_values: HashMap<String, Vec<f64>> = HashMap::new();
357
358 for feature_set in data {
359 for feature in &feature_set.features {
360 if let FeatureValue::Numeric(value) = feature.value {
361 if !value.is_nan() {
362 feature_values
363 .entry(feature.name.clone())
364 .or_default()
365 .push(value);
366 }
367 }
368 }
369 }
370
371 for (name, values) in &feature_values {
373 if values.is_empty() {
374 continue;
375 }
376
377 let mean = values.iter().sum::<f64>() / values.len() as f64;
379 let variance =
380 values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
381 let std = variance.sqrt();
382
383 let mut sorted_values = values.clone();
384 sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
385
386 let min = sorted_values.first().copied().unwrap_or(0.0);
387 let max = sorted_values.last().copied().unwrap_or(1.0);
388 let median = sorted_values[sorted_values.len() / 2];
389
390 let q1_idx = sorted_values.len() / 4;
392 let q3_idx = 3 * sorted_values.len() / 4;
393 let q1 = sorted_values[q1_idx];
394 let q3 = sorted_values[q3_idx];
395 let iqr = q3 - q1;
396
397 params.means.insert(name.clone(), mean);
398 params.stds.insert(name.clone(), std.max(1e-10));
399 params.mins.insert(name.clone(), min);
400 params.maxs.insert(name.clone(), max);
401 params.medians.insert(name.clone(), median);
402 params.iqrs.insert(name.clone(), iqr.max(1e-10));
403 }
404
405 for transform in &self.transforms {
407 if let FeatureTransform::LabelEncoder { columns } = transform {
408 for column in columns {
409 let mut unique_values = std::collections::HashSet::new();
410 for feature_set in data {
411 if let Some(feature) = feature_set.get_feature(column) {
412 if let FeatureValue::Categorical(value) = &feature.value {
413 unique_values.insert(value.clone());
414 }
415 }
416 }
417
418 let encoding: HashMap<String, usize> = unique_values
419 .iter()
420 .enumerate()
421 .map(|(i, v)| (v.clone(), i))
422 .collect();
423
424 params.label_encodings.insert(column.clone(), encoding);
425 }
426 }
427 }
428
429 info!("Pipeline fitted successfully");
430 Ok(())
431 }
432
433 pub async fn transform(&self, input: &FeatureSet) -> Result<FeatureSet> {
435 let start = std::time::Instant::now();
436 let mut output = input.clone();
437
438 let params = self.fitted_params.read().await;
439 let mut history = self.history.write().await;
440
441 for transform in &self.transforms {
443 output = self
444 .apply_transform(&output, transform, ¶ms, &mut history)
445 .await?;
446 }
447
448 let mut stats = self.stats.write().await;
450 stats.total_features_processed += output.features.len() as u64;
451 stats.total_transformations += 1;
452 let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
453 stats.avg_transform_time_ms =
454 (stats.avg_transform_time_ms * (stats.total_transformations - 1) as f64 + elapsed_ms)
455 / stats.total_transformations as f64;
456 stats.features_generated = output.features.len();
457
458 Ok(output)
459 }
460
461 async fn apply_transform(
463 &self,
464 input: &FeatureSet,
465 transform: &FeatureTransform,
466 params: &FittedParameters,
467 history: &mut HashMap<String, VecDeque<f64>>,
468 ) -> Result<FeatureSet> {
469 let mut output = input.clone();
470
471 match transform {
472 FeatureTransform::StandardScaler => {
473 for feature in &mut output.features {
474 if let FeatureValue::Numeric(value) = &mut feature.value {
475 if let (Some(&mean), Some(&std)) = (
476 params.means.get(&feature.name),
477 params.stds.get(&feature.name),
478 ) {
479 *value = (*value - mean) / std;
480 }
481 }
482 }
483 }
484 FeatureTransform::MinMaxScaler { .. } => {
485 for feature in &mut output.features {
486 if let FeatureValue::Numeric(value) = &mut feature.value {
487 if let (Some(&min), Some(&max)) = (
488 params.mins.get(&feature.name),
489 params.maxs.get(&feature.name),
490 ) {
491 *value = (*value - min) / (max - min).max(1e-10);
492 }
493 }
494 }
495 }
496 FeatureTransform::RobustScaler => {
497 for feature in &mut output.features {
498 if let FeatureValue::Numeric(value) = &mut feature.value {
499 if let (Some(&median), Some(&iqr)) = (
500 params.medians.get(&feature.name),
501 params.iqrs.get(&feature.name),
502 ) {
503 *value = (*value - median) / iqr;
504 }
505 }
506 }
507 }
508 FeatureTransform::LogTransform { offset } => {
509 for feature in &mut output.features {
510 if let FeatureValue::Numeric(value) = &mut feature.value {
511 *value = (*value + offset).ln();
512 }
513 }
514 }
515 FeatureTransform::PowerTransform { lambda } => {
516 for feature in &mut output.features {
517 if let FeatureValue::Numeric(value) = &mut feature.value {
518 *value = if *lambda == 0.0 {
519 value.ln()
520 } else {
521 (value.powf(*lambda) - 1.0) / lambda
522 };
523 }
524 }
525 }
526 FeatureTransform::RollingMean { window } => {
527 self.apply_rolling_stat(input, &mut output, *window, history, |values| {
528 values.iter().sum::<f64>() / values.len() as f64
529 })?;
530 }
531 FeatureTransform::RollingStd { window } => {
532 self.apply_rolling_stat(input, &mut output, *window, history, |values| {
533 let mean = values.iter().sum::<f64>() / values.len() as f64;
534 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>()
535 / values.len() as f64;
536 variance.sqrt()
537 })?;
538 }
539 FeatureTransform::RollingSum { window } => {
540 self.apply_rolling_stat(input, &mut output, *window, history, |values| {
541 values.iter().sum()
542 })?;
543 }
544 FeatureTransform::EWMA { alpha } => {
545 for feature in &mut output.features {
546 if let FeatureValue::Numeric(value) = &mut feature.value {
547 let hist = history.entry(feature.name.clone()).or_default();
548 let ewma = if hist.is_empty() {
549 *value
550 } else {
551 alpha * (*value) + (1.0 - alpha) * hist.back().copied().unwrap_or(0.0)
552 };
553 hist.push_back(ewma);
554 *value = ewma;
555 }
556 }
557 }
558 FeatureTransform::LagFeatures { lags } => {
559 let mut new_features = Vec::new();
560 for feature in &input.features {
561 if let FeatureValue::Numeric(value) = feature.value {
562 let hist = history.entry(feature.name.clone()).or_default();
563
564 for &lag in lags {
565 if lag > 0 && lag <= hist.len() {
566 let lag_value = hist[hist.len() - lag];
567 new_features.push(Feature::numeric(
568 format!("{}_lag_{}", feature.name, lag),
569 lag_value,
570 ));
571 }
572 }
573
574 hist.push_back(value);
575 if hist.len() > lags.iter().max().copied().unwrap_or(10) {
576 hist.pop_front();
577 }
578 }
579 }
580 output.features.extend(new_features);
581 }
582 FeatureTransform::RateOfChange { period } => {
583 for feature in &mut output.features {
584 if let FeatureValue::Numeric(value) = &mut feature.value {
585 let hist = history.entry(feature.name.clone()).or_default();
586
587 if hist.len() >= *period {
588 let old_value = hist[hist.len() - period];
589 *value = (*value - old_value) / old_value.max(1e-10);
590 }
591
592 hist.push_back(*value);
593 if hist.len() > period + 1 {
594 hist.pop_front();
595 }
596 }
597 }
598 }
599 FeatureTransform::Binning { bins } => {
600 for feature in &mut output.features {
601 if let FeatureValue::Numeric(value) = &mut feature.value {
602 let bin_idx = bins.iter().position(|&b| *value < b).unwrap_or(bins.len());
603 *value = bin_idx as f64;
604 }
605 }
606 }
607 FeatureTransform::OneHotEncoder { columns } => {
608 let mut new_features = Vec::new();
609 for feature in &input.features {
610 if columns.contains(&feature.name) {
611 if let FeatureValue::Categorical(cat_value) = &feature.value {
612 new_features.push(Feature::numeric(
614 format!("{}_{}", feature.name, cat_value),
615 1.0,
616 ));
617 }
618 }
619 }
620 output.features.extend(new_features);
621 }
622 FeatureTransform::LabelEncoder { columns } => {
623 for feature in &mut output.features {
624 if columns.contains(&feature.name) {
625 if let FeatureValue::Categorical(cat_value) = &feature.value {
626 if let Some(encoding_map) = params.label_encodings.get(&feature.name) {
627 if let Some(&encoded) = encoding_map.get(cat_value) {
628 feature.value = FeatureValue::Numeric(encoded as f64);
629 }
630 }
631 }
632 }
633 }
634 }
635 FeatureTransform::PolynomialFeatures { degree } => {
636 let numeric_features: Vec<_> = input
637 .features
638 .iter()
639 .filter(|f| matches!(f.value, FeatureValue::Numeric(_)))
640 .collect();
641
642 let mut new_features = Vec::new();
643 for d in 2..=*degree {
644 for feature in &numeric_features {
645 if let FeatureValue::Numeric(value) = feature.value {
646 new_features.push(Feature::numeric(
647 format!("{}_pow{}", feature.name, d),
648 value.powi(d as i32),
649 ));
650 }
651 }
652 }
653 output.features.extend(new_features);
654 }
655 FeatureTransform::InteractionFeatures { pairs } => {
656 let mut new_features = Vec::new();
657 for (name1, name2) in pairs {
658 if let (Some(f1), Some(f2)) =
659 (input.get_feature(name1), input.get_feature(name2))
660 {
661 if let (FeatureValue::Numeric(v1), FeatureValue::Numeric(v2)) =
662 (&f1.value, &f2.value)
663 {
664 new_features.push(Feature::numeric(
665 format!("{}_{}_interaction", name1, name2),
666 v1 * v2,
667 ));
668 }
669 }
670 }
671 output.features.extend(new_features);
672 }
673 FeatureTransform::Imputation { strategy } => {
674 for feature in &mut output.features {
675 if feature.value.is_missing() {
676 let imputed_value = match strategy {
677 ImputationStrategy::Mean => params.means.get(&feature.name).copied(),
678 ImputationStrategy::Median => {
679 params.medians.get(&feature.name).copied()
680 }
681 ImputationStrategy::Constant => Some(0.0),
682 _ => None,
683 };
684
685 if let Some(value) = imputed_value {
686 feature.value = FeatureValue::Numeric(value);
687 }
688 }
689 }
690 }
691 _ => {
692 debug!("Transform {:?} not yet implemented", transform);
693 }
694 }
695
696 Ok(output)
697 }
698
699 fn apply_rolling_stat<F>(
701 &self,
702 _input: &FeatureSet,
703 output: &mut FeatureSet,
704 window: usize,
705 history: &mut HashMap<String, VecDeque<f64>>,
706 stat_fn: F,
707 ) -> Result<()>
708 where
709 F: Fn(&VecDeque<f64>) -> f64,
710 {
711 for feature in &mut output.features {
712 if let FeatureValue::Numeric(value) = &mut feature.value {
713 let hist = history.entry(feature.name.clone()).or_default();
714 hist.push_back(*value);
715
716 if hist.len() > window {
717 hist.pop_front();
718 }
719
720 if hist.len() >= window {
721 *value = stat_fn(hist);
722 }
723 }
724 }
725
726 Ok(())
727 }
728
729 pub async fn extract_features(
731 &self,
732 event_data: &HashMap<String, serde_json::Value>,
733 ) -> Result<FeatureSet> {
734 let mut feature_set = FeatureSet::new();
735
736 for (key, value) in event_data {
738 let feature = match value {
739 serde_json::Value::Number(n) => {
740 if let Some(f) = n.as_f64() {
741 Feature::numeric(key, f)
742 } else {
743 continue;
744 }
745 }
746 serde_json::Value::String(s) => Feature::categorical(key, s.clone()),
747 serde_json::Value::Bool(b) => Feature::boolean(key, *b),
748 _ => continue,
749 };
750
751 feature_set.add_feature(feature);
752 }
753
754 if self.config.extract_time_features {
756 let now = chrono::Utc::now();
757 feature_set.add_feature(Feature::numeric(
758 "hour_of_day",
759 now.format("%H").to_string().parse::<f64>().unwrap_or(0.0),
760 ));
761 feature_set.add_feature(Feature::numeric(
762 "day_of_week",
763 now.format("%u").to_string().parse::<f64>().unwrap_or(0.0),
764 ));
765 feature_set.add_feature(Feature::numeric(
766 "day_of_month",
767 now.format("%d").to_string().parse::<f64>().unwrap_or(0.0),
768 ));
769 feature_set.add_feature(Feature::numeric(
770 "month",
771 now.format("%m").to_string().parse::<f64>().unwrap_or(0.0),
772 ));
773 }
774
775 Ok(feature_set)
776 }
777
778 pub async fn get_stats(&self) -> PipelineStats {
780 self.stats.read().await.clone()
781 }
782
783 pub async fn clear_history(&mut self) {
785 self.history.write().await.clear();
786 }
787
788 pub fn transform_count(&self) -> usize {
790 self.transforms.len()
791 }
792}
793
794impl Default for FeaturePipeline {
795 fn default() -> Self {
796 Self::new()
797 }
798}
799
/// In-memory store mapping string ids to feature sets and their metadata.
pub struct FeatureStore {
    /// Feature sets keyed by id.
    features: Arc<RwLock<HashMap<String, FeatureSet>>>,
    /// Metadata keyed by the same id.
    metadata: Arc<RwLock<HashMap<String, FeatureMetadata>>>,
}
807
/// Descriptive metadata stored alongside a `FeatureSet`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeatureMetadata {
    /// Identifier (conventionally the same key used in the store).
    pub id: String,
    /// Human-readable description.
    pub description: String,
    /// Creation timestamp (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Last-update timestamp (UTC).
    pub updated_at: chrono::DateTime<chrono::Utc>,
    /// Version tag.
    pub version: String,
    /// Free-form tags.
    pub tags: Vec<String>,
}
824
825impl FeatureStore {
826 pub fn new() -> Self {
828 Self {
829 features: Arc::new(RwLock::new(HashMap::new())),
830 metadata: Arc::new(RwLock::new(HashMap::new())),
831 }
832 }
833
834 pub async fn store(
836 &self,
837 id: impl Into<String>,
838 features: FeatureSet,
839 metadata: FeatureMetadata,
840 ) -> Result<()> {
841 let id = id.into();
842 self.features.write().await.insert(id.clone(), features);
843 self.metadata.write().await.insert(id, metadata);
844 Ok(())
845 }
846
847 pub async fn retrieve(&self, id: &str) -> Option<FeatureSet> {
849 self.features.read().await.get(id).cloned()
850 }
851
852 pub async fn list_ids(&self) -> Vec<String> {
854 self.features.read().await.keys().cloned().collect()
855 }
856
857 pub async fn get_metadata(&self, id: &str) -> Option<FeatureMetadata> {
859 self.metadata.read().await.get(id).cloned()
860 }
861
862 pub async fn delete(&self, id: &str) -> Result<()> {
864 self.features.write().await.remove(id);
865 self.metadata.write().await.remove(id);
866 Ok(())
867 }
868}
869
870impl Default for FeatureStore {
871 fn default() -> Self {
872 Self::new()
873 }
874}
875
#[cfg(test)]
mod tests {
    use super::*;

    /// FeatureValue -> f64 coercion and missing detection.
    #[test]
    fn test_feature_value_conversions() {
        assert_eq!(FeatureValue::Numeric(2.5).as_numeric(), 2.5);
        assert_eq!(FeatureValue::Boolean(true).as_numeric(), 1.0);
        assert_eq!(FeatureValue::Boolean(false).as_numeric(), 0.0);
        // Categorical values have no numeric interpretation.
        assert!(FeatureValue::Categorical("test".into())
            .as_numeric()
            .is_nan());
        assert!(FeatureValue::Missing.is_missing());
    }

    /// The three typed Feature constructors.
    #[test]
    fn test_feature_creation() {
        let num_feature = Feature::numeric("value", 42.0);
        assert_eq!(num_feature.name, "value");
        assert_eq!(num_feature.value.as_numeric(), 42.0);

        let cat_feature = Feature::categorical("category", "A");
        assert_eq!(cat_feature.name, "category");

        let bool_feature = Feature::boolean("flag", true);
        assert_eq!(bool_feature.value.as_numeric(), 1.0);
    }

    /// FeatureSet add/lookup and numeric projection.
    #[test]
    fn test_feature_set() {
        let mut feature_set = FeatureSet::new();
        feature_set.add_feature(Feature::numeric("x", 1.0));
        feature_set.add_feature(Feature::numeric("y", 2.0));
        feature_set.add_feature(Feature::categorical("cat", "A"));

        assert_eq!(feature_set.features.len(), 3);
        assert!(feature_set.get_feature("x").is_some());
        assert!(feature_set.get_feature("missing").is_none());

        // Projection preserves insertion order; the categorical slot is NaN.
        let numeric_array = feature_set.to_numeric_array();
        assert_eq!(numeric_array.len(), 3);
        assert_eq!(numeric_array[0], 1.0);
        assert_eq!(numeric_array[1], 2.0);
    }

    /// A fresh pipeline has no transforms.
    #[tokio::test]
    async fn test_pipeline_creation() {
        let pipeline = FeaturePipeline::new();
        assert_eq!(pipeline.transform_count(), 0);
    }

    /// add_transform chains and accumulates.
    #[tokio::test]
    async fn test_add_transforms() {
        let mut pipeline = FeaturePipeline::new();
        pipeline
            .add_transform(FeatureTransform::StandardScaler)
            .add_transform(FeatureTransform::RollingMean { window: 5 });

        assert_eq!(pipeline.transform_count(), 2);
    }

    /// StandardScaler: a value near the fitted mean maps near zero.
    #[tokio::test]
    async fn test_standard_scaler() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::StandardScaler);

        // Train on 0, 10, ..., 90 (mean 45).
        let mut training_data = Vec::new();
        for i in 0..10 {
            let mut fs = FeatureSet::new();
            fs.add_feature(Feature::numeric("value", (i * 10) as f64));
            training_data.push(fs);
        }

        pipeline.fit(&training_data).await.unwrap();

        let mut test_fs = FeatureSet::new();
        test_fs.add_feature(Feature::numeric("value", 50.0));

        let result = pipeline.transform(&test_fs).await.unwrap();
        let value = result.get_feature("value").unwrap().value.as_numeric();

        // 50 is close to the mean, so the z-score is well inside one sigma.
        assert!((value.abs()) < 1.0);
    }

    /// MinMaxScaler: the fitted maximum maps to ~1.0.
    #[tokio::test]
    async fn test_min_max_scaler() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::MinMaxScaler { min: 0.0, max: 1.0 });

        let mut training_data = Vec::new();
        for i in 0..10 {
            let mut fs = FeatureSet::new();
            fs.add_feature(Feature::numeric("value", (i * 10) as f64));
            training_data.push(fs);
        }

        pipeline.fit(&training_data).await.unwrap();

        let mut test_fs = FeatureSet::new();
        // 90 is the fitted maximum of the training range.
        test_fs.add_feature(Feature::numeric("value", 90.0));
        let result = pipeline.transform(&test_fs).await.unwrap();
        let value = result.get_feature("value").unwrap().value.as_numeric();

        assert!((value - 1.0).abs() < 0.01);
    }

    /// PolynomialFeatures adds `<name>_pow<d>` columns.
    #[tokio::test]
    async fn test_polynomial_features() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::PolynomialFeatures { degree: 2 });

        let mut fs = FeatureSet::new();
        fs.add_feature(Feature::numeric("x", 3.0));

        let result = pipeline.transform(&fs).await.unwrap();

        assert!(result.features.len() >= 2);
        let x_pow2 = result.get_feature("x_pow2").unwrap();
        assert_eq!(x_pow2.value.as_numeric(), 9.0);
    }

    /// InteractionFeatures adds a product column for each configured pair.
    #[tokio::test]
    async fn test_interaction_features() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::InteractionFeatures {
            pairs: vec![("x".to_string(), "y".to_string())],
        });

        let mut fs = FeatureSet::new();
        fs.add_feature(Feature::numeric("x", 2.0));
        fs.add_feature(Feature::numeric("y", 3.0));

        let result = pipeline.transform(&fs).await.unwrap();

        let interaction = result.get_feature("x_y_interaction").unwrap();
        assert_eq!(interaction.value.as_numeric(), 6.0);
    }

    /// LabelEncoder replaces fitted categories with numeric codes.
    #[tokio::test]
    async fn test_label_encoder() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::LabelEncoder {
            columns: vec!["category".to_string()],
        });

        let mut training_data = Vec::new();
        for cat in &["A", "B", "C", "A", "B"] {
            let mut fs = FeatureSet::new();
            fs.add_feature(Feature::categorical("category", *cat));
            training_data.push(fs);
        }

        pipeline.fit(&training_data).await.unwrap();

        let mut test_fs = FeatureSet::new();
        test_fs.add_feature(Feature::categorical("category", "B"));

        let result = pipeline.transform(&test_fs).await.unwrap();
        let encoded = result.get_feature("category").unwrap().value.as_numeric();

        // The specific code is unspecified (set order), but it must be a
        // non-negative number rather than a categorical NaN.
        assert!(!encoded.is_nan());
        assert!(encoded >= 0.0);
    }

    /// extract_features maps JSON values and appends calendar features.
    #[tokio::test]
    async fn test_feature_extraction() {
        let pipeline = FeaturePipeline::with_config(FeatureExtractionConfig {
            extract_time_features: true,
            ..Default::default()
        });

        let mut event_data = HashMap::new();
        event_data.insert("temperature".to_string(), serde_json::json!(23.5));
        event_data.insert("humidity".to_string(), serde_json::json!(65.0));
        event_data.insert("location".to_string(), serde_json::json!("room_A"));

        let features = pipeline.extract_features(&event_data).await.unwrap();

        assert!(features.get_feature("temperature").is_some());
        assert!(features.get_feature("humidity").is_some());
        assert!(features.get_feature("location").is_some());
        assert!(features.get_feature("hour_of_day").is_some());
    }

    /// FeatureStore round trip: store, retrieve, list, delete.
    #[tokio::test]
    async fn test_feature_store() {
        let store = FeatureStore::new();

        let mut fs = FeatureSet::new();
        fs.add_feature(Feature::numeric("value", 42.0));

        let metadata = FeatureMetadata {
            id: "test_1".to_string(),
            description: "Test features".to_string(),
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            version: "1.0".to_string(),
            tags: vec!["test".to_string()],
        };

        store.store("test_1", fs.clone(), metadata).await.unwrap();

        let retrieved = store.retrieve("test_1").await;
        assert!(retrieved.is_some());

        let ids = store.list_ids().await;
        assert_eq!(ids.len(), 1);
        assert_eq!(ids[0], "test_1");

        store.delete("test_1").await.unwrap();
        assert!(store.retrieve("test_1").await.is_none());
    }

    /// A single transform call updates the running stats.
    #[tokio::test]
    async fn test_pipeline_stats() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::StandardScaler);

        let mut fs = FeatureSet::new();
        fs.add_feature(Feature::numeric("value", 42.0));

        let _ = pipeline.transform(&fs).await;

        let stats = pipeline.get_stats().await;
        assert_eq!(stats.total_transformations, 1);
        assert!(stats.avg_transform_time_ms >= 0.0);
    }
}