feature_factory/transformers/
feature_creation.rs1use crate::exceptions::{FeatureFactoryError, FeatureFactoryResult};
15use crate::impl_transformer;
16use datafusion::dataframe::DataFrame;
17use datafusion_expr::{col, lit, Expr};
18use std::ops::{Div, Mul, Sub};
19
20pub struct MathFeatures {
24 pub features: Vec<(String, Expr)>,
25}
26
27impl MathFeatures {
28 pub fn new(features: Vec<(String, Expr)>) -> Self {
29 for (name, _) in &features {
31 if name.trim().is_empty() {
32 panic!("MathFeatures: feature name cannot be empty");
33 }
34 }
35 Self { features }
36 }
37
38 pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
40 Ok(())
41 }
42
43 pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
45 let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
46 for (name, expr) in &self.features {
47 exprs.push(expr.clone().alias(name));
48 }
49 df.select(exprs).map_err(FeatureFactoryError::from)
50 }
51
52 fn inherent_is_stateful(&self) -> bool {
54 false
55 }
56}
57
/// Arithmetic relation computed between a target column and a reference column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RelativeOperation {
    /// `target / reference`
    Ratio,
    /// `target - reference`
    Difference,
    /// `(target - reference) / reference`
    PercentChange,
}

/// Configuration for a transformer that derives columns relating two existing columns.
#[derive(Debug, Clone, PartialEq)]
pub struct RelativeFeatures {
    /// `(new column name, target column, reference column, operation)` tuples.
    pub features: Vec<(String, String, String, RelativeOperation)>,
}
71
72impl RelativeFeatures {
73 pub fn new(features: Vec<(String, String, String, RelativeOperation)>) -> Self {
74 for (new_name, target, reference, _) in &features {
76 if new_name.trim().is_empty() {
77 panic!("RelativeFeatures: new feature name cannot be empty");
78 }
79 if target.trim().is_empty() || reference.trim().is_empty() {
80 panic!("RelativeFeatures: target and reference names must be non-empty");
81 }
82 }
83 Self { features }
84 }
85
86 pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
88 Ok(())
89 }
90
91 fn validate(&self, df: &DataFrame) -> FeatureFactoryResult<()> {
93 for (_, target, reference, _) in &self.features {
94 df.schema().field_with_name(None, target).map_err(|_| {
95 FeatureFactoryError::MissingColumn(format!("Target column '{}' not found", target))
96 })?;
97 df.schema().field_with_name(None, reference).map_err(|_| {
98 FeatureFactoryError::MissingColumn(format!(
99 "Reference column '{}' not found",
100 reference
101 ))
102 })?;
103 }
104 Ok(())
105 }
106
107 pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
109 self.validate(&df)?;
110 let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
111 for (new_name, target, reference, op) in &self.features {
112 let expr = match op {
113 RelativeOperation::Ratio => col(target).div(col(reference)),
114 RelativeOperation::Difference => col(target).sub(col(reference)),
115 RelativeOperation::PercentChange => {
116 col(target).sub(col(reference)).div(col(reference))
117 }
118 };
119 exprs.push(expr.alias(new_name));
120 }
121 df.select(exprs).map_err(FeatureFactoryError::from)
122 }
123
124 fn inherent_is_stateful(&self) -> bool {
126 false
127 }
128}
129
/// Trigonometric function used to encode a periodic value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CyclicalMethod {
    /// Encode with the sine function.
    Sine,
    /// Encode with the cosine function.
    Cosine,
}

/// Configuration for a transformer that derives sine/cosine encodings of periodic columns.
#[derive(Debug, Clone, PartialEq)]
pub struct CyclicalFeatures {
    /// `(new column name, source column, period, method)` tuples.
    pub features: Vec<(String, String, f64, CyclicalMethod)>,
}
142
143impl CyclicalFeatures {
144 pub fn new(features: Vec<(String, String, f64, CyclicalMethod)>) -> Self {
145 for (new_name, source, period, _) in &features {
147 if new_name.trim().is_empty() {
148 panic!("CyclicalFeatures: new feature name cannot be empty");
149 }
150 if source.trim().is_empty() {
151 panic!("CyclicalFeatures: source feature name must be non-empty");
152 }
153 if *period <= 0.0 {
154 panic!("CyclicalFeatures: period must be positive, got {}", period);
155 }
156 }
157 Self { features }
158 }
159
160 pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
162 Ok(())
163 }
164
165 fn validate(&self, df: &DataFrame) -> FeatureFactoryResult<()> {
167 for (_, source, period, _) in &self.features {
168 df.schema().field_with_name(None, source).map_err(|_| {
169 FeatureFactoryError::MissingColumn(format!("Source column '{}' not found", source))
170 })?;
171 if *period <= 0.0 {
172 return Err(FeatureFactoryError::InvalidParameter(format!(
173 "CyclicalFeatures: period must be positive, got {}",
174 period
175 )));
176 }
177 }
178 Ok(())
179 }
180
181 pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
183 self.validate(&df)?;
184 let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
185 for (new_name, source, period, method) in &self.features {
186 let base_expr = lit(2.0 * std::f64::consts::PI)
187 .mul(col(source))
188 .div(lit(*period));
189 let cyc_expr = match method {
190 CyclicalMethod::Sine => datafusion_functions::math::sin().call(vec![base_expr]),
191 CyclicalMethod::Cosine => datafusion_functions::math::cos().call(vec![base_expr]),
192 };
193 exprs.push(cyc_expr.alias(new_name));
194 }
195 df.select(exprs).map_err(FeatureFactoryError::from)
196 }
197
198 fn inherent_is_stateful(&self) -> bool {
200 false
201 }
202}
203
// Apply the crate's `impl_transformer!` macro to each feature-creation type.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it wires the
// inherent `fit` / `transform` / `inherent_is_stateful` methods into a shared
// transformer trait — confirm against the macro definition.
impl_transformer!(MathFeatures);
impl_transformer!(RelativeFeatures);
impl_transformer!(CyclicalFeatures);