// oxirs_stream/feature_engineering.rs

//! # Feature Engineering Pipelines for Stream Processing
//!
//! This module provides a comprehensive feature engineering framework for real-time
//! stream processing, enabling automatic feature extraction, transformation, and
//! selection for machine learning workflows.
//!
//! ## Features
//! - Automatic feature extraction from streaming events
//! - Real-time feature transformations (scaling, encoding, binning)
//! - Time-based features (rolling windows, lag features, rate of change)
//! - Categorical encoding (one-hot, label, target encoding)
//! - Feature selection and dimensionality reduction
//! - Feature store integration for reusability
//! - Pipeline composition with visual DAG representation
//!
//! ## Example Usage
//! ```rust,ignore
//! use oxirs_stream::feature_engineering::{FeaturePipeline, FeatureTransform};
//!
//! let mut pipeline = FeaturePipeline::new();
//! pipeline
//!     .add_transform(FeatureTransform::StandardScaler)
//!     .add_transform(FeatureTransform::RollingMean { window: 10 })
//!     .add_transform(FeatureTransform::OneHotEncoder { columns: vec!["category".into()] });
//!
//! let features = pipeline.transform(&event).await?;
//! ```
28
use anyhow::{anyhow, Result};
use chrono::{Datelike, Timelike};
use scirs2_core::ndarray_ext::Array2;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info};
36
/// A single feature value as produced by extraction or transformation.
///
/// A `Numeric` NaN is treated as missing by [`FeatureValue::is_missing`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum FeatureValue {
    /// Numeric value
    Numeric(f64),
    /// Categorical value (free-form string label)
    Categorical(String),
    /// Boolean value
    Boolean(bool),
    /// Array of numeric values
    NumericArray(Vec<f64>),
    /// Missing value
    Missing,
}
51
52impl FeatureValue {
53    /// Convert to numeric value (NaN for non-numeric)
54    pub fn as_numeric(&self) -> f64 {
55        match self {
56            FeatureValue::Numeric(v) => *v,
57            FeatureValue::Boolean(b) => {
58                if *b {
59                    1.0
60                } else {
61                    0.0
62                }
63            }
64            _ => f64::NAN,
65        }
66    }
67
68    /// Check if value is missing
69    pub fn is_missing(&self) -> bool {
70        match self {
71            FeatureValue::Missing => true,
72            FeatureValue::Numeric(v) => v.is_nan(),
73            _ => false,
74        }
75    }
76}
77
/// A named feature with its value and optional metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Feature {
    /// Feature name (unique within a [`FeatureSet`] by convention — not enforced)
    pub name: String,
    /// Feature value
    pub value: FeatureValue,
    /// Feature importance score (0-1); `None` until a selection step fills it
    pub importance: Option<f64>,
    /// Free-form feature metadata
    pub metadata: HashMap<String, String>,
}
90
91impl Feature {
92    /// Create a new numeric feature
93    pub fn numeric(name: impl Into<String>, value: f64) -> Self {
94        Self {
95            name: name.into(),
96            value: FeatureValue::Numeric(value),
97            importance: None,
98            metadata: HashMap::new(),
99        }
100    }
101
102    /// Create a new categorical feature
103    pub fn categorical(name: impl Into<String>, value: impl Into<String>) -> Self {
104        Self {
105            name: name.into(),
106            value: FeatureValue::Categorical(value.into()),
107            importance: None,
108            metadata: HashMap::new(),
109        }
110    }
111
112    /// Create a new boolean feature
113    pub fn boolean(name: impl Into<String>, value: bool) -> Self {
114        Self {
115            name: name.into(),
116            value: FeatureValue::Boolean(value),
117            importance: None,
118            metadata: HashMap::new(),
119        }
120    }
121}
122
/// Feature set: an ordered collection of features plus provenance info.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeatureSet {
    /// Features in this set, in insertion order
    pub features: Vec<Feature>,
    /// Timestamp when features were created (UTC)
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Version of the feature set schema ("1.0" by default)
    pub version: String,
}
133
impl FeatureSet {
    /// Create a new empty feature set stamped with the current UTC time.
    pub fn new() -> Self {
        Self {
            features: Vec::new(),
            timestamp: chrono::Utc::now(),
            version: "1.0".to_string(),
        }
    }

    /// Append a feature to the set (duplicate names are not checked).
    pub fn add_feature(&mut self, feature: Feature) {
        self.features.push(feature);
    }

    /// Get the first feature with the given name, if any (linear scan).
    pub fn get_feature(&self, name: &str) -> Option<&Feature> {
        self.features.iter().find(|f| f.name == name)
    }

    /// Convert to a numeric array, one entry per feature in order.
    ///
    /// Non-numeric features are NOT skipped: they map to NaN via
    /// [`FeatureValue::as_numeric`], so the output length always equals
    /// the number of features.
    pub fn to_numeric_array(&self) -> Vec<f64> {
        self.features.iter().map(|f| f.value.as_numeric()).collect()
    }

    /// Get feature names in insertion order.
    pub fn feature_names(&self) -> Vec<String> {
        self.features.iter().map(|f| f.name.clone()).collect()
    }
}
164
165impl Default for FeatureSet {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
/// Feature transformation types applied by [`FeaturePipeline`].
///
/// NOTE(review): `TargetEncoder`, `FeatureSelection`, `PCA` and `Custom`
/// are declared but currently fall through to a debug log in the
/// pipeline's transform dispatch (not yet implemented).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FeatureTransform {
    /// Standard scaling (mean=0, std=1)
    StandardScaler,
    /// Min-max scaling; `min` and `max` declare the intended output range
    MinMaxScaler { min: f64, max: f64 },
    /// Robust scaling (using median and IQR)
    RobustScaler,
    /// Log transformation: ln(value + offset)
    LogTransform { offset: f64 },
    /// Power transformation (Box-Cox like); `lambda == 0` means ln
    PowerTransform { lambda: f64 },
    /// Rolling mean over window
    RollingMean { window: usize },
    /// Rolling standard deviation
    RollingStd { window: usize },
    /// Rolling sum
    RollingSum { window: usize },
    /// Exponential weighted moving average with smoothing factor `alpha`
    EWMA { alpha: f64 },
    /// Lag features (previous values), one feature per requested lag
    LagFeatures { lags: Vec<usize> },
    /// Rate of change (relative difference over `period` steps)
    RateOfChange { period: usize },
    /// Binning/discretization by the given ascending bin edges
    Binning { bins: Vec<f64> },
    /// One-hot encoding for categorical features
    OneHotEncoder { columns: Vec<String> },
    /// Label encoding for categorical features
    LabelEncoder { columns: Vec<String> },
    /// Target encoding for categorical features
    TargetEncoder { column: String },
    /// Polynomial features up to `degree` for each numeric feature
    PolynomialFeatures { degree: usize },
    /// Interaction features (cross products) for named pairs
    InteractionFeatures { pairs: Vec<(String, String)> },
    /// Missing value imputation
    Imputation { strategy: ImputationStrategy },
    /// Feature selection (keep top k by importance)
    FeatureSelection { top_k: usize },
    /// PCA dimensionality reduction
    PCA { n_components: usize },
    /// Custom transformation (user-defined function)
    Custom { name: String },
}
217
/// Imputation strategy for missing values.
///
/// NOTE(review): the pipeline currently applies only `Mean`, `Median`
/// and `Constant`; the remaining strategies are accepted but leave the
/// value missing (see the `Imputation` arm in the transform dispatch).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImputationStrategy {
    /// Fill with mean value
    Mean,
    /// Fill with median value
    Median,
    /// Fill with mode (most frequent)
    Mode,
    /// Fill with constant value (currently 0.0)
    Constant,
    /// Forward fill (carry last value)
    ForwardFill,
    /// Backward fill
    BackwardFill,
    /// Interpolate (linear)
    Interpolate,
}
236
/// Feature extraction configuration for [`FeaturePipeline`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeatureExtractionConfig {
    /// Extract calendar features (hour, day of week, …) during extraction
    pub extract_time_features: bool,
    /// Extract statistical features
    /// NOTE(review): not currently consulted in this module — confirm
    pub extract_statistical_features: bool,
    /// Window size for rolling statistics
    /// NOTE(review): rolling transforms take their own `window` — confirm use
    pub rolling_window: usize,
    /// Enable automatic feature generation
    pub auto_generate: bool,
    /// Maximum number of features to generate
    pub max_features: usize,
}
251
252impl Default for FeatureExtractionConfig {
253    fn default() -> Self {
254        Self {
255            extract_time_features: true,
256            extract_statistical_features: true,
257            rolling_window: 10,
258            auto_generate: false,
259            max_features: 100,
260        }
261    }
262}
263
/// Feature engineering pipeline: an ordered list of transforms applied to
/// incoming feature sets. Fitted parameters, per-feature history and stats
/// live behind `Arc<RwLock<…>>` so `transform` can run on `&self`.
pub struct FeaturePipeline {
    /// Ordered list of transformations
    transforms: Vec<FeatureTransform>,
    /// Configuration
    config: FeatureExtractionConfig,
    /// Historical value buffers for time-based features, keyed by feature name
    history: Arc<RwLock<HashMap<String, VecDeque<f64>>>>,
    /// Fitted transformation parameters (populated by `fit`)
    fitted_params: Arc<RwLock<FittedParameters>>,
    /// Statistics
    stats: Arc<RwLock<PipelineStats>>,
}
277
/// Fitted parameters for transformations, all keyed by feature name.
#[derive(Debug, Clone, Default)]
struct FittedParameters {
    /// Mean values for standard scaling
    means: HashMap<String, f64>,
    /// Standard deviations for standard scaling (clamped away from zero)
    stds: HashMap<String, f64>,
    /// Min values for min-max scaling
    mins: HashMap<String, f64>,
    /// Max values for min-max scaling
    maxs: HashMap<String, f64>,
    /// Median values for robust scaling
    medians: HashMap<String, f64>,
    /// IQR values for robust scaling (clamped away from zero)
    iqrs: HashMap<String, f64>,
    /// Label encodings: column name -> (category -> integer code)
    label_encodings: HashMap<String, HashMap<String, usize>>,
    /// PCA components (unused until PCA is implemented)
    pca_components: Option<Array2<f64>>,
    /// Feature importance scores
    feature_importances: HashMap<String, f64>,
}
300
/// Pipeline statistics, updated by each `transform` call.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PipelineStats {
    /// Total features emitted across all `transform` calls
    pub total_features_processed: u64,
    /// Number of `transform` calls completed
    pub total_transformations: u64,
    /// Running average `transform` latency (ms)
    pub avg_transform_time_ms: f64,
    /// Number of features in the most recent output set
    pub features_generated: usize,
    /// Number of features selected
    pub features_selected: usize,
}
315
316impl FeaturePipeline {
317    /// Create a new feature engineering pipeline
318    pub fn new() -> Self {
319        Self {
320            transforms: Vec::new(),
321            config: FeatureExtractionConfig::default(),
322            history: Arc::new(RwLock::new(HashMap::new())),
323            fitted_params: Arc::new(RwLock::new(FittedParameters::default())),
324            stats: Arc::new(RwLock::new(PipelineStats::default())),
325        }
326    }
327
328    /// Create a pipeline with configuration
329    pub fn with_config(config: FeatureExtractionConfig) -> Self {
330        Self {
331            transforms: Vec::new(),
332            config,
333            history: Arc::new(RwLock::new(HashMap::new())),
334            fitted_params: Arc::new(RwLock::new(FittedParameters::default())),
335            stats: Arc::new(RwLock::new(PipelineStats::default())),
336        }
337    }
338
    /// Append a transformation to the end of the pipeline.
    ///
    /// Returns `&mut self` so calls can be chained.
    pub fn add_transform(&mut self, transform: FeatureTransform) -> &mut Self {
        self.transforms.push(transform);
        self
    }
344
345    /// Fit the pipeline on training data
346    pub async fn fit(&mut self, data: &[FeatureSet]) -> Result<()> {
347        info!("Fitting feature pipeline on {} samples", data.len());
348
349        if data.is_empty() {
350            return Err(anyhow!("Cannot fit on empty data"));
351        }
352
353        let mut params = self.fitted_params.write().await;
354
355        // Collect all numeric features for fitting
356        let mut feature_values: HashMap<String, Vec<f64>> = HashMap::new();
357
358        for feature_set in data {
359            for feature in &feature_set.features {
360                if let FeatureValue::Numeric(value) = feature.value {
361                    if !value.is_nan() {
362                        feature_values
363                            .entry(feature.name.clone())
364                            .or_default()
365                            .push(value);
366                    }
367                }
368            }
369        }
370
371        // Fit transformations
372        for (name, values) in &feature_values {
373            if values.is_empty() {
374                continue;
375            }
376
377            // Compute statistics for scaling
378            let mean = values.iter().sum::<f64>() / values.len() as f64;
379            let variance =
380                values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
381            let std = variance.sqrt();
382
383            let mut sorted_values = values.clone();
384            sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
385
386            let min = sorted_values.first().copied().unwrap_or(0.0);
387            let max = sorted_values.last().copied().unwrap_or(1.0);
388            let median = sorted_values[sorted_values.len() / 2];
389
390            // Compute IQR
391            let q1_idx = sorted_values.len() / 4;
392            let q3_idx = 3 * sorted_values.len() / 4;
393            let q1 = sorted_values[q1_idx];
394            let q3 = sorted_values[q3_idx];
395            let iqr = q3 - q1;
396
397            params.means.insert(name.clone(), mean);
398            params.stds.insert(name.clone(), std.max(1e-10));
399            params.mins.insert(name.clone(), min);
400            params.maxs.insert(name.clone(), max);
401            params.medians.insert(name.clone(), median);
402            params.iqrs.insert(name.clone(), iqr.max(1e-10));
403        }
404
405        // Fit categorical encodings
406        for transform in &self.transforms {
407            if let FeatureTransform::LabelEncoder { columns } = transform {
408                for column in columns {
409                    let mut unique_values = std::collections::HashSet::new();
410                    for feature_set in data {
411                        if let Some(feature) = feature_set.get_feature(column) {
412                            if let FeatureValue::Categorical(value) = &feature.value {
413                                unique_values.insert(value.clone());
414                            }
415                        }
416                    }
417
418                    let encoding: HashMap<String, usize> = unique_values
419                        .iter()
420                        .enumerate()
421                        .map(|(i, v)| (v.clone(), i))
422                        .collect();
423
424                    params.label_encodings.insert(column.clone(), encoding);
425                }
426            }
427        }
428
429        info!("Pipeline fitted successfully");
430        Ok(())
431    }
432
    /// Transform a feature set using the fitted pipeline.
    ///
    /// Applies every transform in insertion order, then updates the
    /// pipeline statistics (counters and a running-average latency).
    ///
    /// # Errors
    /// Propagates any error returned by an individual transform.
    pub async fn transform(&self, input: &FeatureSet) -> Result<FeatureSet> {
        let start = std::time::Instant::now();
        let mut output = input.clone();

        // Hold the fitted parameters (read) and history buffers (write)
        // across the whole chain so the sequence of stateful transforms
        // is applied atomically with respect to concurrent callers.
        let params = self.fitted_params.read().await;
        let mut history = self.history.write().await;

        // Apply each transformation in order
        for transform in &self.transforms {
            output = self
                .apply_transform(&output, transform, &params, &mut history)
                .await?;
        }

        // Update statistics
        let mut stats = self.stats.write().await;
        stats.total_features_processed += output.features.len() as u64;
        stats.total_transformations += 1;
        let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
        // Incremental running mean over all transform() calls.
        stats.avg_transform_time_ms =
            (stats.avg_transform_time_ms * (stats.total_transformations - 1) as f64 + elapsed_ms)
                / stats.total_transformations as f64;
        stats.features_generated = output.features.len();

        Ok(output)
    }
460
461    /// Apply a single transformation
462    async fn apply_transform(
463        &self,
464        input: &FeatureSet,
465        transform: &FeatureTransform,
466        params: &FittedParameters,
467        history: &mut HashMap<String, VecDeque<f64>>,
468    ) -> Result<FeatureSet> {
469        let mut output = input.clone();
470
471        match transform {
472            FeatureTransform::StandardScaler => {
473                for feature in &mut output.features {
474                    if let FeatureValue::Numeric(value) = &mut feature.value {
475                        if let (Some(&mean), Some(&std)) = (
476                            params.means.get(&feature.name),
477                            params.stds.get(&feature.name),
478                        ) {
479                            *value = (*value - mean) / std;
480                        }
481                    }
482                }
483            }
484            FeatureTransform::MinMaxScaler { .. } => {
485                for feature in &mut output.features {
486                    if let FeatureValue::Numeric(value) = &mut feature.value {
487                        if let (Some(&min), Some(&max)) = (
488                            params.mins.get(&feature.name),
489                            params.maxs.get(&feature.name),
490                        ) {
491                            *value = (*value - min) / (max - min).max(1e-10);
492                        }
493                    }
494                }
495            }
496            FeatureTransform::RobustScaler => {
497                for feature in &mut output.features {
498                    if let FeatureValue::Numeric(value) = &mut feature.value {
499                        if let (Some(&median), Some(&iqr)) = (
500                            params.medians.get(&feature.name),
501                            params.iqrs.get(&feature.name),
502                        ) {
503                            *value = (*value - median) / iqr;
504                        }
505                    }
506                }
507            }
508            FeatureTransform::LogTransform { offset } => {
509                for feature in &mut output.features {
510                    if let FeatureValue::Numeric(value) = &mut feature.value {
511                        *value = (*value + offset).ln();
512                    }
513                }
514            }
515            FeatureTransform::PowerTransform { lambda } => {
516                for feature in &mut output.features {
517                    if let FeatureValue::Numeric(value) = &mut feature.value {
518                        *value = if *lambda == 0.0 {
519                            value.ln()
520                        } else {
521                            (value.powf(*lambda) - 1.0) / lambda
522                        };
523                    }
524                }
525            }
526            FeatureTransform::RollingMean { window } => {
527                self.apply_rolling_stat(input, &mut output, *window, history, |values| {
528                    values.iter().sum::<f64>() / values.len() as f64
529                })?;
530            }
531            FeatureTransform::RollingStd { window } => {
532                self.apply_rolling_stat(input, &mut output, *window, history, |values| {
533                    let mean = values.iter().sum::<f64>() / values.len() as f64;
534                    let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>()
535                        / values.len() as f64;
536                    variance.sqrt()
537                })?;
538            }
539            FeatureTransform::RollingSum { window } => {
540                self.apply_rolling_stat(input, &mut output, *window, history, |values| {
541                    values.iter().sum()
542                })?;
543            }
544            FeatureTransform::EWMA { alpha } => {
545                for feature in &mut output.features {
546                    if let FeatureValue::Numeric(value) = &mut feature.value {
547                        let hist = history.entry(feature.name.clone()).or_default();
548                        let ewma = if hist.is_empty() {
549                            *value
550                        } else {
551                            alpha * (*value) + (1.0 - alpha) * hist.back().copied().unwrap_or(0.0)
552                        };
553                        hist.push_back(ewma);
554                        *value = ewma;
555                    }
556                }
557            }
558            FeatureTransform::LagFeatures { lags } => {
559                let mut new_features = Vec::new();
560                for feature in &input.features {
561                    if let FeatureValue::Numeric(value) = feature.value {
562                        let hist = history.entry(feature.name.clone()).or_default();
563
564                        for &lag in lags {
565                            if lag > 0 && lag <= hist.len() {
566                                let lag_value = hist[hist.len() - lag];
567                                new_features.push(Feature::numeric(
568                                    format!("{}_lag_{}", feature.name, lag),
569                                    lag_value,
570                                ));
571                            }
572                        }
573
574                        hist.push_back(value);
575                        if hist.len() > lags.iter().max().copied().unwrap_or(10) {
576                            hist.pop_front();
577                        }
578                    }
579                }
580                output.features.extend(new_features);
581            }
582            FeatureTransform::RateOfChange { period } => {
583                for feature in &mut output.features {
584                    if let FeatureValue::Numeric(value) = &mut feature.value {
585                        let hist = history.entry(feature.name.clone()).or_default();
586
587                        if hist.len() >= *period {
588                            let old_value = hist[hist.len() - period];
589                            *value = (*value - old_value) / old_value.max(1e-10);
590                        }
591
592                        hist.push_back(*value);
593                        if hist.len() > period + 1 {
594                            hist.pop_front();
595                        }
596                    }
597                }
598            }
599            FeatureTransform::Binning { bins } => {
600                for feature in &mut output.features {
601                    if let FeatureValue::Numeric(value) = &mut feature.value {
602                        let bin_idx = bins.iter().position(|&b| *value < b).unwrap_or(bins.len());
603                        *value = bin_idx as f64;
604                    }
605                }
606            }
607            FeatureTransform::OneHotEncoder { columns } => {
608                let mut new_features = Vec::new();
609                for feature in &input.features {
610                    if columns.contains(&feature.name) {
611                        if let FeatureValue::Categorical(cat_value) = &feature.value {
612                            // Create binary features for each category
613                            new_features.push(Feature::numeric(
614                                format!("{}_{}", feature.name, cat_value),
615                                1.0,
616                            ));
617                        }
618                    }
619                }
620                output.features.extend(new_features);
621            }
622            FeatureTransform::LabelEncoder { columns } => {
623                for feature in &mut output.features {
624                    if columns.contains(&feature.name) {
625                        if let FeatureValue::Categorical(cat_value) = &feature.value {
626                            if let Some(encoding_map) = params.label_encodings.get(&feature.name) {
627                                if let Some(&encoded) = encoding_map.get(cat_value) {
628                                    feature.value = FeatureValue::Numeric(encoded as f64);
629                                }
630                            }
631                        }
632                    }
633                }
634            }
635            FeatureTransform::PolynomialFeatures { degree } => {
636                let numeric_features: Vec<_> = input
637                    .features
638                    .iter()
639                    .filter(|f| matches!(f.value, FeatureValue::Numeric(_)))
640                    .collect();
641
642                let mut new_features = Vec::new();
643                for d in 2..=*degree {
644                    for feature in &numeric_features {
645                        if let FeatureValue::Numeric(value) = feature.value {
646                            new_features.push(Feature::numeric(
647                                format!("{}_pow{}", feature.name, d),
648                                value.powi(d as i32),
649                            ));
650                        }
651                    }
652                }
653                output.features.extend(new_features);
654            }
655            FeatureTransform::InteractionFeatures { pairs } => {
656                let mut new_features = Vec::new();
657                for (name1, name2) in pairs {
658                    if let (Some(f1), Some(f2)) =
659                        (input.get_feature(name1), input.get_feature(name2))
660                    {
661                        if let (FeatureValue::Numeric(v1), FeatureValue::Numeric(v2)) =
662                            (&f1.value, &f2.value)
663                        {
664                            new_features.push(Feature::numeric(
665                                format!("{}_{}_interaction", name1, name2),
666                                v1 * v2,
667                            ));
668                        }
669                    }
670                }
671                output.features.extend(new_features);
672            }
673            FeatureTransform::Imputation { strategy } => {
674                for feature in &mut output.features {
675                    if feature.value.is_missing() {
676                        let imputed_value = match strategy {
677                            ImputationStrategy::Mean => params.means.get(&feature.name).copied(),
678                            ImputationStrategy::Median => {
679                                params.medians.get(&feature.name).copied()
680                            }
681                            ImputationStrategy::Constant => Some(0.0),
682                            _ => None,
683                        };
684
685                        if let Some(value) = imputed_value {
686                            feature.value = FeatureValue::Numeric(value);
687                        }
688                    }
689                }
690            }
691            _ => {
692                debug!("Transform {:?} not yet implemented", transform);
693            }
694        }
695
696        Ok(output)
697    }
698
699    /// Apply rolling statistics
700    fn apply_rolling_stat<F>(
701        &self,
702        _input: &FeatureSet,
703        output: &mut FeatureSet,
704        window: usize,
705        history: &mut HashMap<String, VecDeque<f64>>,
706        stat_fn: F,
707    ) -> Result<()>
708    where
709        F: Fn(&VecDeque<f64>) -> f64,
710    {
711        for feature in &mut output.features {
712            if let FeatureValue::Numeric(value) = &mut feature.value {
713                let hist = history.entry(feature.name.clone()).or_default();
714                hist.push_back(*value);
715
716                if hist.len() > window {
717                    hist.pop_front();
718                }
719
720                if hist.len() >= window {
721                    *value = stat_fn(hist);
722                }
723            }
724        }
725
726        Ok(())
727    }
728
729    /// Extract features from raw event data
730    pub async fn extract_features(
731        &self,
732        event_data: &HashMap<String, serde_json::Value>,
733    ) -> Result<FeatureSet> {
734        let mut feature_set = FeatureSet::new();
735
736        // Extract basic features from event data
737        for (key, value) in event_data {
738            let feature = match value {
739                serde_json::Value::Number(n) => {
740                    if let Some(f) = n.as_f64() {
741                        Feature::numeric(key, f)
742                    } else {
743                        continue;
744                    }
745                }
746                serde_json::Value::String(s) => Feature::categorical(key, s.clone()),
747                serde_json::Value::Bool(b) => Feature::boolean(key, *b),
748                _ => continue,
749            };
750
751            feature_set.add_feature(feature);
752        }
753
754        // Extract time-based features if enabled
755        if self.config.extract_time_features {
756            let now = chrono::Utc::now();
757            feature_set.add_feature(Feature::numeric(
758                "hour_of_day",
759                now.format("%H").to_string().parse::<f64>().unwrap_or(0.0),
760            ));
761            feature_set.add_feature(Feature::numeric(
762                "day_of_week",
763                now.format("%u").to_string().parse::<f64>().unwrap_or(0.0),
764            ));
765            feature_set.add_feature(Feature::numeric(
766                "day_of_month",
767                now.format("%d").to_string().parse::<f64>().unwrap_or(0.0),
768            ));
769            feature_set.add_feature(Feature::numeric(
770                "month",
771                now.format("%m").to_string().parse::<f64>().unwrap_or(0.0),
772            ));
773        }
774
775        Ok(feature_set)
776    }
777
    /// Get a snapshot (clone) of the current pipeline statistics.
    pub async fn get_stats(&self) -> PipelineStats {
        self.stats.read().await.clone()
    }
782
    /// Clear all per-feature history buffers used by the time-based
    /// transforms (rolling stats, EWMA, lags, rate of change).
    pub async fn clear_history(&mut self) {
        self.history.write().await.clear();
    }
787
    /// Number of transformations currently in the pipeline.
    pub fn transform_count(&self) -> usize {
        self.transforms.len()
    }
792}
793
794impl Default for FeaturePipeline {
795    fn default() -> Self {
796        Self::new()
797    }
798}
799
/// In-memory feature store for reusable feature sets, keyed by ID.
pub struct FeatureStore {
    /// Stored feature sets indexed by ID
    features: Arc<RwLock<HashMap<String, FeatureSet>>>,
    /// Feature metadata, kept under the same IDs as `features`
    metadata: Arc<RwLock<HashMap<String, FeatureMetadata>>>,
}
807
/// Metadata describing a stored feature set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeatureMetadata {
    /// Feature set ID (should match the store key)
    pub id: String,
    /// Human-readable description
    pub description: String,
    /// Creation timestamp (UTC)
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Last updated timestamp (UTC)
    pub updated_at: chrono::DateTime<chrono::Utc>,
    /// Version
    pub version: String,
    /// Tags for organization
    pub tags: Vec<String>,
}
824
825impl FeatureStore {
826    /// Create a new feature store
827    pub fn new() -> Self {
828        Self {
829            features: Arc::new(RwLock::new(HashMap::new())),
830            metadata: Arc::new(RwLock::new(HashMap::new())),
831        }
832    }
833
834    /// Store a feature set
835    pub async fn store(
836        &self,
837        id: impl Into<String>,
838        features: FeatureSet,
839        metadata: FeatureMetadata,
840    ) -> Result<()> {
841        let id = id.into();
842        self.features.write().await.insert(id.clone(), features);
843        self.metadata.write().await.insert(id, metadata);
844        Ok(())
845    }
846
847    /// Retrieve a feature set
848    pub async fn retrieve(&self, id: &str) -> Option<FeatureSet> {
849        self.features.read().await.get(id).cloned()
850    }
851
852    /// List all feature set IDs
853    pub async fn list_ids(&self) -> Vec<String> {
854        self.features.read().await.keys().cloned().collect()
855    }
856
857    /// Get feature metadata
858    pub async fn get_metadata(&self, id: &str) -> Option<FeatureMetadata> {
859        self.metadata.read().await.get(id).cloned()
860    }
861
862    /// Delete a feature set
863    pub async fn delete(&self, id: &str) -> Result<()> {
864        self.features.write().await.remove(id);
865        self.metadata.write().await.remove(id);
866        Ok(())
867    }
868}
869
870impl Default for FeatureStore {
871    fn default() -> Self {
872        Self::new()
873    }
874}
875
#[cfg(test)]
mod tests {
    use super::*;

    /// Build `n` single-feature training sets with values 0, 10, 20, ...
    fn training_values(n: i32) -> Vec<FeatureSet> {
        (0..n)
            .map(|i| {
                let mut fs = FeatureSet::new();
                fs.add_feature(Feature::numeric("value", f64::from(i * 10)));
                fs
            })
            .collect()
    }

    #[test]
    fn test_feature_value_conversions() {
        // Numeric and boolean values map directly onto f64.
        assert_eq!(FeatureValue::Numeric(2.5).as_numeric(), 2.5);
        assert_eq!(FeatureValue::Boolean(true).as_numeric(), 1.0);
        assert_eq!(FeatureValue::Boolean(false).as_numeric(), 0.0);
        // Categorical values have no numeric form and become NaN.
        let cat = FeatureValue::Categorical("test".into());
        assert!(cat.as_numeric().is_nan());
        assert!(FeatureValue::Missing.is_missing());
    }

    #[test]
    fn test_feature_creation() {
        let numeric = Feature::numeric("value", 42.0);
        assert_eq!(numeric.name, "value");
        assert_eq!(numeric.value.as_numeric(), 42.0);

        let categorical = Feature::categorical("category", "A");
        assert_eq!(categorical.name, "category");

        let flag = Feature::boolean("flag", true);
        assert_eq!(flag.value.as_numeric(), 1.0);
    }

    #[test]
    fn test_feature_set() {
        let mut fs = FeatureSet::new();
        fs.add_feature(Feature::numeric("x", 1.0));
        fs.add_feature(Feature::numeric("y", 2.0));
        fs.add_feature(Feature::categorical("cat", "A"));

        assert_eq!(fs.features.len(), 3);
        assert!(fs.get_feature("x").is_some());
        assert!(fs.get_feature("missing").is_none());

        let values = fs.to_numeric_array();
        assert_eq!(values.len(), 3);
        assert_eq!(values[0], 1.0);
        assert_eq!(values[1], 2.0);
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        // A fresh pipeline starts with no transforms registered.
        assert_eq!(FeaturePipeline::new().transform_count(), 0);
    }

    #[tokio::test]
    async fn test_add_transforms() {
        let mut pipeline = FeaturePipeline::new();
        pipeline
            .add_transform(FeatureTransform::StandardScaler)
            .add_transform(FeatureTransform::RollingMean { window: 5 });

        assert_eq!(pipeline.transform_count(), 2);
    }

    #[tokio::test]
    async fn test_standard_scaler() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::StandardScaler);

        // Fit on values 0..=90 (mean 45), then scale a new sample.
        pipeline.fit(&training_values(10)).await.unwrap();

        let mut sample = FeatureSet::new();
        sample.add_feature(Feature::numeric("value", 50.0));

        let scaled = pipeline.transform(&sample).await.unwrap();
        let value = scaled.get_feature("value").unwrap().value.as_numeric();

        // After standard scaling (mean 0, std 1), 50 — close to the
        // training mean of 45 — should land near zero.
        assert!(value.abs() < 1.0);
    }

    #[tokio::test]
    async fn test_min_max_scaler() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::MinMaxScaler { min: 0.0, max: 1.0 });

        pipeline.fit(&training_values(10)).await.unwrap();

        let mut sample = FeatureSet::new();
        sample.add_feature(Feature::numeric("value", 90.0)); // Max value

        let scaled = pipeline.transform(&sample).await.unwrap();
        let value = scaled.get_feature("value").unwrap().value.as_numeric();

        assert!((value - 1.0).abs() < 0.01); // Should be close to 1.0
    }

    #[tokio::test]
    async fn test_polynomial_features() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::PolynomialFeatures { degree: 2 });

        let mut input = FeatureSet::new();
        input.add_feature(Feature::numeric("x", 3.0));

        let output = pipeline.transform(&input).await.unwrap();

        // The original feature is kept and polynomial terms are added.
        assert!(output.features.len() >= 2);
        let squared = output.get_feature("x_pow2").unwrap();
        assert_eq!(squared.value.as_numeric(), 9.0);
    }

    #[tokio::test]
    async fn test_interaction_features() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::InteractionFeatures {
            pairs: vec![("x".to_string(), "y".to_string())],
        });

        let mut input = FeatureSet::new();
        input.add_feature(Feature::numeric("x", 2.0));
        input.add_feature(Feature::numeric("y", 3.0));

        let output = pipeline.transform(&input).await.unwrap();

        // Interaction feature is the product of the pair: 2 * 3 = 6.
        let product = output.get_feature("x_y_interaction").unwrap();
        assert_eq!(product.value.as_numeric(), 6.0);
    }

    #[tokio::test]
    async fn test_label_encoder() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::LabelEncoder {
            columns: vec!["category".to_string()],
        });

        let training_data: Vec<FeatureSet> = ["A", "B", "C", "A", "B"]
            .iter()
            .map(|cat| {
                let mut fs = FeatureSet::new();
                fs.add_feature(Feature::categorical("category", *cat));
                fs
            })
            .collect();

        pipeline.fit(&training_data).await.unwrap();

        let mut sample = FeatureSet::new();
        sample.add_feature(Feature::categorical("category", "B"));

        let result = pipeline.transform(&sample).await.unwrap();
        let encoded = result.get_feature("category").unwrap().value.as_numeric();

        // Encoded labels are finite, non-negative indices.
        assert!(!encoded.is_nan());
        assert!(encoded >= 0.0);
    }

    #[tokio::test]
    async fn test_feature_extraction() {
        let pipeline = FeaturePipeline::with_config(FeatureExtractionConfig {
            extract_time_features: true,
            ..Default::default()
        });

        let event_data: HashMap<String, serde_json::Value> = [
            ("temperature".to_string(), serde_json::json!(23.5)),
            ("humidity".to_string(), serde_json::json!(65.0)),
            ("location".to_string(), serde_json::json!("room_A")),
        ]
        .into_iter()
        .collect();

        let features = pipeline.extract_features(&event_data).await.unwrap();

        // All raw fields plus the configured time feature must be present.
        for name in ["temperature", "humidity", "location", "hour_of_day"] {
            assert!(features.get_feature(name).is_some());
        }
    }

    #[tokio::test]
    async fn test_feature_store() {
        let store = FeatureStore::new();

        let mut fs = FeatureSet::new();
        fs.add_feature(Feature::numeric("value", 42.0));

        let metadata = FeatureMetadata {
            id: "test_1".to_string(),
            description: "Test features".to_string(),
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            version: "1.0".to_string(),
            tags: vec!["test".to_string()],
        };

        store.store("test_1", fs.clone(), metadata).await.unwrap();

        assert!(store.retrieve("test_1").await.is_some());

        let ids = store.list_ids().await;
        assert_eq!(ids.len(), 1);
        assert_eq!(ids[0], "test_1");

        // Deleting removes both the feature set and its metadata.
        store.delete("test_1").await.unwrap();
        assert!(store.retrieve("test_1").await.is_none());
    }

    #[tokio::test]
    async fn test_pipeline_stats() {
        let mut pipeline = FeaturePipeline::new();
        pipeline.add_transform(FeatureTransform::StandardScaler);

        let mut fs = FeatureSet::new();
        fs.add_feature(Feature::numeric("value", 42.0));

        // Outcome is deliberately ignored; only the recorded stats matter.
        let _ = pipeline.transform(&fs).await;

        let stats = pipeline.get_stats().await;
        assert_eq!(stats.total_transformations, 1);
        assert!(stats.avg_transform_time_ms >= 0.0);
    }
}