use crate::exceptions::{FeatureFactoryError, FeatureFactoryResult};
use async_trait::async_trait;
use datafusion::prelude::*;
use std::time::Instant;
/// A single named stage that a [`Pipeline`] runs: it is first fitted on a `DataFrame`
/// and then used to map one `DataFrame` to another.
#[async_trait]
pub trait Transformer {
    /// Reports whether this transformer is stateful.
    ///
    /// NOTE(review): presumably `true` when `fit` records state that `transform` relies on;
    /// the pipeline code in this file does not consult it — confirm against implementors.
    fn is_stateful(&self) -> bool;

    /// Fits the transformer on `df`, possibly updating internal state.
    ///
    /// # Errors
    ///
    /// Returns an error from the implementor if fitting fails.
    async fn fit(&mut self, df: &DataFrame) -> FeatureFactoryResult<()>;

    /// Consumes `df` and returns the transformed `DataFrame`.
    ///
    /// # Errors
    ///
    /// Returns an error from the implementor if the transformation cannot be built.
    fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame>;
}
/// Implements [`Transformer`](crate::pipeline::Transformer) for `$ty` by forwarding each
/// trait method to a method on the type itself:
///
/// - `fit` forwards to `<$ty>::fit` (awaited).
/// - `transform` forwards to `<$ty>::transform`.
/// - `is_stateful` forwards to `<$ty>::inherent_is_stateful`.
///
/// NOTE(review): `$ty` is expected to define inherent `fit` and `transform` with matching
/// signatures. Type-relative paths like `<$ty>::fit` look up inherent items first. If the
/// inherent method is missing and the trait is in scope at the expansion site, the call
/// could resolve to the trait method itself and recurse — confirm every user type provides
/// them.
///
/// NOTE(review): the expansion names `async_trait::async_trait` directly, so the invoking
/// crate presumably needs `async_trait` as a dependency — verify.
#[macro_export]
macro_rules! impl_transformer {
($ty:ty) => {
#[async_trait::async_trait]
impl $crate::pipeline::Transformer for $ty {
// Delegates to the type's own async `fit`.
async fn fit(
&mut self,
df: &datafusion::prelude::DataFrame,
) -> $crate::exceptions::FeatureFactoryResult<()> {
<$ty>::fit(self, df).await
}
// Delegates to the type's own `transform`.
fn transform(
&self,
df: datafusion::prelude::DataFrame,
) -> $crate::exceptions::FeatureFactoryResult<datafusion::prelude::DataFrame> {
<$ty>::transform(self, df)
}
// A distinctly named inherent method is used here, presumably so this call cannot
// resolve back to the trait's own `is_stateful`.
fn is_stateful(&self) -> bool {
<$ty>::inherent_is_stateful(self)
}
}
};
}
/// An ordered sequence of named [`Transformer`] steps, each applied to the output of the
/// previous one.
///
/// Construct it with [`Pipeline::new`] or the `make_pipeline!` macro.
pub struct Pipeline {
    /// When `true`, per-step progress messages are printed to stdout.
    verbose: bool,
    /// Steps in execution order; the `String` is the step's name, used in log and error text.
    steps: Vec<(String, Box<dyn Transformer + Send + Sync>)>,
}
impl Pipeline {
    /// Creates a pipeline from an ordered list of `(name, transformer)` steps.
    ///
    /// Steps run in the order given. When `verbose` is `true`, progress messages are printed
    /// to stdout during [`Pipeline::fit`] and [`Pipeline::transform`].
    pub fn new(steps: Vec<(String, Box<dyn Transformer + Send + Sync>)>, verbose: bool) -> Self {
        Self { steps, verbose }
    }

    /// Rejects an empty pipeline; shared guard for `fit` and `transform`.
    fn ensure_has_steps(&self) -> FeatureFactoryResult<()> {
        if self.steps.is_empty() {
            return Err(FeatureFactoryError::InvalidParameter(
                "Pipeline must have at least one transformer.".to_string(),
            ));
        }
        Ok(())
    }

    /// Fits every step in order and returns the fully transformed `DataFrame`.
    ///
    /// Each step is fitted on the output of the previous step. That step then transforms the
    /// data before it is passed on, so later steps are fitted on already-transformed input.
    /// The input `df` is cloned and is not consumed.
    ///
    /// # Errors
    ///
    /// Returns `FeatureFactoryError::InvalidParameter` in these cases:
    ///
    /// - The pipeline has no steps.
    /// - Any step fails to fit or transform. The message names the failing step and embeds
    ///   the underlying error's `Debug` output.
    pub async fn fit(&mut self, df: &DataFrame) -> FeatureFactoryResult<DataFrame> {
        self.ensure_has_steps()?;
        let mut current_df = df.clone();
        for (name, step) in self.steps.iter_mut() {
            if self.verbose {
                println!("Fitting step: {}", name);
            }
            let start = Instant::now();
            step.fit(&current_df).await.map_err(|e| {
                FeatureFactoryError::InvalidParameter(format!(
                    "Error fitting transformer '{}': {:?}",
                    name, e
                ))
            })?;
            // The fitted step must also transform the data so the next step is fitted on
            // this step's output.
            current_df = step.transform(current_df).map_err(|e| {
                FeatureFactoryError::InvalidParameter(format!(
                    "Error transforming in '{}': {:?}",
                    name, e
                ))
            })?;
            if self.verbose {
                println!("Step '{}' completed in {:?}", name, start.elapsed());
            }
        }
        Ok(current_df)
    }

    /// Applies every step's `transform`, in order, to `df` without refitting.
    ///
    /// # Errors
    ///
    /// Returns `FeatureFactoryError::InvalidParameter` in these cases:
    ///
    /// - The pipeline has no steps.
    /// - Any step's `transform` fails. The message names the failing step.
    pub fn transform(&self, df: DataFrame) -> FeatureFactoryResult<DataFrame> {
        self.ensure_has_steps()?;
        let mut current_df = df;
        for (name, step) in self.steps.iter() {
            if self.verbose {
                println!("Applying transformer: {}", name);
            }
            current_df = step.transform(current_df).map_err(|e| {
                FeatureFactoryError::InvalidParameter(format!(
                    "Error in transformer '{}': {:?}",
                    name, e
                ))
            })?;
        }
        Ok(current_df)
    }

    /// Fits the pipeline and returns the transformed data.
    ///
    /// This is identical to [`Pipeline::fit`], which already returns the output of the final
    /// step.
    ///
    /// # Errors
    ///
    /// Same as [`Pipeline::fit`].
    pub async fn fit_transform(&mut self, df: &DataFrame) -> FeatureFactoryResult<DataFrame> {
        self.fit(df).await
    }
}
/// Builds a [`Pipeline`](crate::pipeline::Pipeline) from a verbosity flag followed by one
/// or more `(name, transformer)` pairs.
///
/// Each `name` is converted with `.to_string()`. Each transformer is boxed as
/// `Box<dyn Transformer + Send + Sync>`. A trailing comma is accepted.
///
/// The arguments are evaluated left to right: each name and its transformer in turn, then
/// `$verbose` last.
#[macro_export]
macro_rules! make_pipeline {
    ($verbose:expr, $(($name:expr, $transformer:expr)),+ $(,)?) => {{
        // The explicit element type is what coerces each `Box<T>` into a trait object.
        let mut steps: Vec<(String, Box<dyn $crate::pipeline::Transformer + Send + Sync>)> =
            Vec::new();
        $(
            steps.push(($name.to_string(), Box::new($transformer)));
        )+
        $crate::pipeline::Pipeline::new(steps, $verbose)
    }};
}