Skip to main content

feature_factory/transformers/
feature_creation.rs

//! ## Feature Creation Transformers
//!
//! This module provides transformers for generating new features from existing ones using mathematical operations, relationships, and cyclical encoding.
//!
//! ### Available Transformers
//!
//! - [`MathFeatures`]: Creates new features by applying arbitrary mathematical operations or expressions.
//! - [`RelativeFeatures`]: Combines features with reference variables using operations such as ratio, difference, or percent change.
//! - [`CyclicalFeatures`]: Encodes cyclical features using sine or cosine transformations, e.g., to represent hours or months in a periodic manner.
//!
//! Each transformer returns a new DataFrame containing the original columns plus the newly created ones.
//! Errors are returned as [`FeatureFactoryError`], and results are wrapped in [`FeatureFactoryResult`].
13
14use crate::exceptions::{FeatureFactoryError, FeatureFactoryResult};
15use crate::impl_transformer;
16use datafusion::dataframe::DataFrame;
17use datafusion_expr::{col, lit, Expr};
18use std::ops::{Div, Mul, Sub};
19
20/// Creates new features using arbitrary mathematical operations or expressions.
21/// The input is a vector of tuples with the following fields for each new feature:
22/// (new_feature_name, math expression to be computed).
23pub struct MathFeatures {
24    pub features: Vec<(String, Expr)>,
25}
26
27impl MathFeatures {
28    pub fn new(features: Vec<(String, Expr)>) -> Self {
29        // Check that each new feature name is not empty.
30        for (name, _) in &features {
31            if name.trim().is_empty() {
32                panic!("MathFeatures: feature name cannot be empty");
33            }
34        }
35        Self { features }
36    }
37
38    /// Stateless transformer: fit does nothing.
39    pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
40        Ok(())
41    }
42
43    /// Adds the new features to the existing DataFrame.
44    pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
45        let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
46        for (name, expr) in &self.features {
47            exprs.push(expr.clone().alias(name));
48        }
49        df.select(exprs).map_err(FeatureFactoryError::from)
50    }
51
52    // This transformer is stateless.
53    fn inherent_is_stateful(&self) -> bool {
54        false
55    }
56}
57
/// Operations available for computing relative features.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelativeOperation {
    /// `target / reference`
    Ratio,
    /// `target - reference`
    Difference,
    /// `(target - reference) / reference`
    PercentChange,
}

/// Creates new features by combining a target feature with a reference feature.
///
/// Input is a vector of tuples with the following fields for each new feature:
/// (new_feature_name, target_feature, reference_feature, operation).
#[derive(Debug, Clone)]
pub struct RelativeFeatures {
    /// `(new_feature_name, target_feature, reference_feature, operation)` entries.
    pub features: Vec<(String, String, String, RelativeOperation)>,
}
71
72impl RelativeFeatures {
73    pub fn new(features: Vec<(String, String, String, RelativeOperation)>) -> Self {
74        // Check that new feature names, target, and reference names are not empty.
75        for (new_name, target, reference, _) in &features {
76            if new_name.trim().is_empty() {
77                panic!("RelativeFeatures: new feature name cannot be empty");
78            }
79            if target.trim().is_empty() || reference.trim().is_empty() {
80                panic!("RelativeFeatures: target and reference names must be non-empty");
81            }
82        }
83        Self { features }
84    }
85
86    /// Stateless transformer: fit does nothing.
87    pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
88        Ok(())
89    }
90
91    /// Validates that the target and reference columns exist.
92    fn validate(&self, df: &DataFrame) -> FeatureFactoryResult<()> {
93        for (_, target, reference, _) in &self.features {
94            df.schema().field_with_name(None, target).map_err(|_| {
95                FeatureFactoryError::MissingColumn(format!("Target column '{}' not found", target))
96            })?;
97            df.schema().field_with_name(None, reference).map_err(|_| {
98                FeatureFactoryError::MissingColumn(format!(
99                    "Reference column '{}' not found",
100                    reference
101                ))
102            })?;
103        }
104        Ok(())
105    }
106
107    /// Adds the relative features to the DataFrame.
108    pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
109        self.validate(&df)?;
110        let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
111        for (new_name, target, reference, op) in &self.features {
112            let expr = match op {
113                RelativeOperation::Ratio => col(target).div(col(reference)),
114                RelativeOperation::Difference => col(target).sub(col(reference)),
115                RelativeOperation::PercentChange => {
116                    col(target).sub(col(reference)).div(col(reference))
117                }
118            };
119            exprs.push(expr.alias(new_name));
120        }
121        df.select(exprs).map_err(FeatureFactoryError::from)
122    }
123
124    // This transformer is stateless.
125    fn inherent_is_stateful(&self) -> bool {
126        false
127    }
128}
129
/// Methods for encoding cyclical features.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CyclicalMethod {
    /// Encode with the sine of the scaled value.
    Sine,
    /// Encode with the cosine of the scaled value.
    Cosine,
}

/// Encodes a cyclical variable by computing either a sine or cosine transformation.
///
/// The input is a vector of tuples with the following fields for each new feature:
/// (new_feature_name, source_feature, period, method).
#[derive(Debug, Clone)]
pub struct CyclicalFeatures {
    /// `(new_feature_name, source_feature, period, method)` entries.
    pub features: Vec<(String, String, f64, CyclicalMethod)>,
}
142
143impl CyclicalFeatures {
144    pub fn new(features: Vec<(String, String, f64, CyclicalMethod)>) -> Self {
145        // Validate that new feature names and source feature names are non-empty and period is positive.
146        for (new_name, source, period, _) in &features {
147            if new_name.trim().is_empty() {
148                panic!("CyclicalFeatures: new feature name cannot be empty");
149            }
150            if source.trim().is_empty() {
151                panic!("CyclicalFeatures: source feature name must be non-empty");
152            }
153            if *period <= 0.0 {
154                panic!("CyclicalFeatures: period must be positive, got {}", period);
155            }
156        }
157        Self { features }
158    }
159
160    /// Stateless transformer: fit does nothing.
161    pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
162        Ok(())
163    }
164
165    /// Validates that each source column exists.
166    fn validate(&self, df: &DataFrame) -> FeatureFactoryResult<()> {
167        for (_, source, period, _) in &self.features {
168            df.schema().field_with_name(None, source).map_err(|_| {
169                FeatureFactoryError::MissingColumn(format!("Source column '{}' not found", source))
170            })?;
171            if *period <= 0.0 {
172                return Err(FeatureFactoryError::InvalidParameter(format!(
173                    "CyclicalFeatures: period must be positive, got {}",
174                    period
175                )));
176            }
177        }
178        Ok(())
179    }
180
181    /// Adds the cyclical features to the DataFrame.
182    pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
183        self.validate(&df)?;
184        let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
185        for (new_name, source, period, method) in &self.features {
186            let base_expr = lit(2.0 * std::f64::consts::PI)
187                .mul(col(source))
188                .div(lit(*period));
189            let cyc_expr = match method {
190                CyclicalMethod::Sine => datafusion_functions::math::sin().call(vec![base_expr]),
191                CyclicalMethod::Cosine => datafusion_functions::math::cos().call(vec![base_expr]),
192            };
193            exprs.push(cyc_expr.alias(new_name));
194        }
195        df.select(exprs).map_err(FeatureFactoryError::from)
196    }
197
198    // This transformer is stateless.
199    fn inherent_is_stateful(&self) -> bool {
200        false
201    }
202}
203
204// Implement the Transformer trait for the transformers in this module.
205impl_transformer!(MathFeatures);
206impl_transformer!(RelativeFeatures);
207impl_transformer!(CyclicalFeatures);