// feature_factory/transformers/datetime.rs
use std::ops::{Div, Sub};

use datafusion::arrow::datatypes::DataType;
use datafusion::dataframe::DataFrame;
use datafusion_expr::{col, ident, lit, Expr};
use datafusion_functions::datetime::{date_part, to_unixtime};

use crate::exceptions::{FeatureFactoryError, FeatureFactoryResult};
use crate::impl_transformer;
20
21fn validate_datetime_column(df: &DataFrame, col_name: &str) -> FeatureFactoryResult<()> {
23 let field = df.schema().field_with_name(None, col_name).map_err(|_| {
24 FeatureFactoryError::MissingColumn(format!("Column '{}' not found", col_name))
25 })?;
26 match field.data_type() {
27 DataType::Timestamp(_, _) | DataType::Date32 | DataType::Date64 => Ok(()),
28 dt => Err(FeatureFactoryError::InvalidParameter(format!(
29 "Column '{}' must be a datetime type (Timestamp, Date32, or Date64), but found {:?}",
30 col_name, dt
31 ))),
32 }
33}
34
35pub struct DatetimeFeatures {
40 pub columns: Vec<String>,
41}
42
43impl DatetimeFeatures {
44 pub fn new(columns: Vec<String>) -> Self {
45 Self { columns }
46 }
47
48 pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
50 Ok(())
51 }
52
53 pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
56 for col_name in &self.columns {
58 validate_datetime_column(&df, col_name)?;
59 }
60 let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
62 for col_name in &self.columns {
64 let base = col(col_name);
65 let year_expr = date_part()
66 .call(vec![lit("year"), base.clone()])
67 .alias(format!("{}_year", col_name));
68 let month_expr = date_part()
69 .call(vec![lit("month"), base.clone()])
70 .alias(format!("{}_month", col_name));
71 let day_expr = date_part()
72 .call(vec![lit("day"), base.clone()])
73 .alias(format!("{}_day", col_name));
74 let hour_expr = date_part()
75 .call(vec![lit("hour"), base.clone()])
76 .alias(format!("{}_hour", col_name));
77 let minute_expr = date_part()
78 .call(vec![lit("minute"), base.clone()])
79 .alias(format!("{}_minute", col_name));
80 let second_expr = date_part()
81 .call(vec![lit("second"), base.clone()])
82 .alias(format!("{}_second", col_name));
83 let weekday_expr = date_part()
84 .call(vec![lit("dow"), base.clone()])
85 .alias(format!("{}_weekday", col_name));
86 exprs.push(year_expr);
87 exprs.push(month_expr);
88 exprs.push(day_expr);
89 exprs.push(hour_expr);
90 exprs.push(minute_expr);
91 exprs.push(second_expr);
92 exprs.push(weekday_expr);
93 }
94 df.select(exprs)
95 .map_err(FeatureFactoryError::DataFusionError)
96 }
97
98 fn inherent_is_stateful(&self) -> bool {
100 false
101 }
102}
103
/// Unit in which [`DatetimeSubtraction`] reports the difference between two datetime columns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimeUnit {
    /// Seconds.
    Second,
    /// Minutes (60 seconds).
    Minute,
    /// Hours (3,600 seconds).
    Hour,
    /// Days (86,400 seconds).
    Day,
}

impl TimeUnit {
    /// Lower-case name of the unit: `"second"`, `"minute"`, `"hour"` or `"day"`.
    pub const fn as_str(&self) -> &'static str {
        match self {
            TimeUnit::Second => "second",
            TimeUnit::Minute => "minute",
            TimeUnit::Hour => "hour",
            TimeUnit::Day => "day",
        }
    }
}
122
123fn timestamp_diff_expr(left: Expr, right: Expr, unit: &str) -> Expr {
127 let left_sec = to_unixtime().call(vec![left]);
128 let right_sec = to_unixtime().call(vec![right]);
129 let diff_in_seconds = left_sec.sub(right_sec);
130 match unit {
131 "second" => diff_in_seconds,
132 "minute" => diff_in_seconds.div(lit(60.0)),
133 "hour" => diff_in_seconds.div(lit(3600.0)),
134 "day" => diff_in_seconds.div(lit(86400.0)),
135 _ => diff_in_seconds,
136 }
137}
138
139pub struct DatetimeSubtraction {
143 pub new_features: Vec<(String, String, String, TimeUnit)>,
144}
145
146impl DatetimeSubtraction {
147 pub fn new(new_features: Vec<(String, String, String, TimeUnit)>) -> Self {
148 Self { new_features }
149 }
150
151 pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
153 Ok(())
154 }
155
156 pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
159 for (_, left, right, _) in &self.new_features {
161 validate_datetime_column(&df, left)?;
163 validate_datetime_column(&df, right)?;
164 }
165 let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
166 for (new_name, left, right, unit) in &self.new_features {
167 let diff_expr =
168 timestamp_diff_expr(col(left), col(right), unit.as_str()).alias(new_name);
169 exprs.push(diff_expr);
170 }
171 df.select(exprs)
172 .map_err(FeatureFactoryError::DataFusionError)
173 }
174
175 fn inherent_is_stateful(&self) -> bool {
177 false
178 }
179}
180
// Register both transformers with the crate's `impl_transformer!` macro (imported from the
// crate root). NOTE(review): presumably the macro wires up a shared transformer trait using
// each type's inherent `fit` / `transform` / `inherent_is_stateful` methods — confirm against
// the macro definition.
impl_transformer!(DatetimeFeatures);
impl_transformer!(DatetimeSubtraction);