// feature_factory/transformers/datetime.rs

//! ## Datetime Feature Transformers
//!
//! This module provides transformers that extract or compute features from datetime data.
//!
//! ### Available Transformers
//!
//! - [`DatetimeFeatures`]: Extracts the year, month, day, hour, minute, second, and weekday
//!   components of datetime columns.
//! - [`DatetimeSubtraction`]: Computes the difference between two datetime columns in a chosen
//!   unit (seconds, minutes, hours, or days).
//!
//! Each transformer returns a new DataFrame that contains all original columns followed by the
//! newly added feature columns. Errors are returned as `FeatureFactoryError`, and successful
//! transformations are wrapped in `FeatureFactoryResult`.

use std::ops::{Div, Sub};

use datafusion::arrow::datatypes::DataType;
use datafusion::dataframe::DataFrame;
use datafusion_expr::{col, ident, lit, Expr};
use datafusion_functions::datetime::{date_part, to_unixtime};

use crate::exceptions::{FeatureFactoryError, FeatureFactoryResult};
use crate::impl_transformer;

21/// Validates that a column exists and is of a datetime type (Timestamp, Date32, or Date64).
22fn validate_datetime_column(df: &DataFrame, col_name: &str) -> FeatureFactoryResult<()> {
23    let field = df.schema().field_with_name(None, col_name).map_err(|_| {
24        FeatureFactoryError::MissingColumn(format!("Column '{}' not found", col_name))
25    })?;
26    match field.data_type() {
27        DataType::Timestamp(_, _) | DataType::Date32 | DataType::Date64 => Ok(()),
28        dt => Err(FeatureFactoryError::InvalidParameter(format!(
29            "Column '{}' must be a datetime type (Timestamp, Date32, or Date64), but found {:?}",
30            col_name, dt
31        ))),
32    }
33}
34
35/// Extracts features from datetime columns.
36/// For each column in `self.columns`, it adds the following new features:
37/// `<column>_year`, `<column>_month`, `<column>_day`, `<column>_hour`,
38/// `<column>_minute`, `<column>_second`, and `<column>_weekday`.
39pub struct DatetimeFeatures {
40    pub columns: Vec<String>,
41}
42
43impl DatetimeFeatures {
44    pub fn new(columns: Vec<String>) -> Self {
45        Self { columns }
46    }
47
48    /// Stateless transformer: fit does nothing.
49    pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
50        Ok(())
51    }
52
53    /// Transform validates that each target column exists and is a datetime type,
54    /// then returns a new DataFrame with the additional extracted features.
55    pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
56        // Validate each target column in transform.
57        for col_name in &self.columns {
58            validate_datetime_column(&df, col_name)?;
59        }
60        // Retain all original columns.
61        let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
62        // Add new features.
63        for col_name in &self.columns {
64            let base = col(col_name);
65            let year_expr = date_part()
66                .call(vec![lit("year"), base.clone()])
67                .alias(format!("{}_year", col_name));
68            let month_expr = date_part()
69                .call(vec![lit("month"), base.clone()])
70                .alias(format!("{}_month", col_name));
71            let day_expr = date_part()
72                .call(vec![lit("day"), base.clone()])
73                .alias(format!("{}_day", col_name));
74            let hour_expr = date_part()
75                .call(vec![lit("hour"), base.clone()])
76                .alias(format!("{}_hour", col_name));
77            let minute_expr = date_part()
78                .call(vec![lit("minute"), base.clone()])
79                .alias(format!("{}_minute", col_name));
80            let second_expr = date_part()
81                .call(vec![lit("second"), base.clone()])
82                .alias(format!("{}_second", col_name));
83            let weekday_expr = date_part()
84                .call(vec![lit("dow"), base.clone()])
85                .alias(format!("{}_weekday", col_name));
86            exprs.push(year_expr);
87            exprs.push(month_expr);
88            exprs.push(day_expr);
89            exprs.push(hour_expr);
90            exprs.push(minute_expr);
91            exprs.push(second_expr);
92            exprs.push(weekday_expr);
93        }
94        df.select(exprs)
95            .map_err(FeatureFactoryError::DataFusionError)
96    }
97
98    // This transformer is stateless.
99    fn inherent_is_stateful(&self) -> bool {
100        false
101    }
102}
103
/// Time units in which [`DatetimeSubtraction`] reports datetime differences.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimeUnit {
    /// One second.
    Second,
    /// Sixty seconds.
    Minute,
    /// 3,600 seconds.
    Hour,
    /// 86,400 seconds.
    Day,
}

impl TimeUnit {
    /// Returns the lowercase unit name: `"second"`, `"minute"`, `"hour"`, or `"day"`.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match self {
            TimeUnit::Second => "second",
            TimeUnit::Minute => "minute",
            TimeUnit::Hour => "hour",
            TimeUnit::Day => "day",
        }
    }
}

123/// Helper function to compute timestamp difference between two datetime expressions.
124/// It converts both expressions to Unix time (in seconds) using `to_unixtime`,
125/// subtracts them, and then converts the difference to the desired unit.
126fn timestamp_diff_expr(left: Expr, right: Expr, unit: &str) -> Expr {
127    let left_sec = to_unixtime().call(vec![left]);
128    let right_sec = to_unixtime().call(vec![right]);
129    let diff_in_seconds = left_sec.sub(right_sec);
130    match unit {
131        "second" => diff_in_seconds,
132        "minute" => diff_in_seconds.div(lit(60.0)),
133        "hour" => diff_in_seconds.div(lit(3600.0)),
134        "day" => diff_in_seconds.div(lit(86400.0)),
135        _ => diff_in_seconds,
136    }
137}
138
139/// Computes time differences between two datetime columns.
140/// `new_features` is a list of tuples: (new_feature_name, left_column, right_column, time_unit).
141/// Transform validates that each left and right column exists and is of a datetime type.
142pub struct DatetimeSubtraction {
143    pub new_features: Vec<(String, String, String, TimeUnit)>,
144}
145
146impl DatetimeSubtraction {
147    pub fn new(new_features: Vec<(String, String, String, TimeUnit)>) -> Self {
148        Self { new_features }
149    }
150
151    /// Stateless transformer: fit does nothing.
152    pub async fn fit(&mut self, _df: &DataFrame) -> FeatureFactoryResult<()> {
153        Ok(())
154    }
155
156    /// Transform validates that each left and right column exists and is a datetime type,
157    /// then adds a new column for each specified time difference.
158    pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
159        // Validate that each left and right column is a datetime type.
160        for (_, left, right, _) in &self.new_features {
161            // These calls now ensure that the column exists *and* is of a valid datetime type.
162            validate_datetime_column(&df, left)?;
163            validate_datetime_column(&df, right)?;
164        }
165        let mut exprs: Vec<Expr> = df.schema().fields().iter().map(|f| col(f.name())).collect();
166        for (new_name, left, right, unit) in &self.new_features {
167            let diff_expr =
168                timestamp_diff_expr(col(left), col(right), unit.as_str()).alias(new_name);
169            exprs.push(diff_expr);
170        }
171        df.select(exprs)
172            .map_err(FeatureFactoryError::DataFusionError)
173    }
174
175    // This transformer is stateless.
176    fn inherent_is_stateful(&self) -> bool {
177        false
178    }
179}
180
181// Implement the Transformer trait for the transformers in this module.
182impl_transformer!(DatetimeFeatures);
183impl_transformer!(DatetimeSubtraction);