use crate::dataset::binner::QuantileBinner;
use crate::dataset::{BinnedDataset, FeatureInfo, FeatureType};
use crate::defaults::{gbdt as gbdt_defaults, preprocessing as preprocessing_defaults};
use crate::encoding::{CategoryFilter, CategoryMapping, EncodingMap, OrderedTargetEncoder};
use crate::{Result, TreeBoostError};
use polars::prelude::*;
use rkyv::{Archive, Deserialize, Serialize};
use std::path::Path;
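/// Configuration for the preprocessing pipeline: quantile binning,
/// count-min-sketch (CMS) based category filtering, and ordered target
/// encoding of categorical features.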
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Number of quantile bins used per feature.
    pub num_bins: usize,
    /// Error bound (epsilon) for the count-min sketch that counts categories.
    pub cms_eps: f64,
    /// Confidence level for the count-min sketch.
    pub cms_confidence: f64,
    /// Minimum estimated count for a category to keep its own encoding;
    /// rarer categories are filtered before encoding.
    pub min_category_count: u64,
    /// Smoothing strength for ordered target encoding.
    pub target_encoding_smoothing: f64,
}
impl Default for PipelineConfig {
fn default() -> Self {
Self {
num_bins: gbdt_defaults::DEFAULT_NUM_BINS,
cms_eps: preprocessing_defaults::CMS_EPSILON,
cms_confidence: preprocessing_defaults::CMS_CONFIDENCE,
min_category_count: preprocessing_defaults::MIN_CATEGORY_COUNT,
target_encoding_smoothing: preprocessing_defaults::TARGET_ENCODING_SMOOTHING,
}
}
}
impl PipelineConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_num_bins(mut self, num_bins: usize) -> Self {
self.num_bins = num_bins;
self
}
pub fn with_cms_params(mut self, eps: f64, confidence: f64, min_count: u64) -> Self {
self.cms_eps = eps;
self.cms_confidence = confidence;
self.min_category_count = min_count;
self
}
pub fn with_smoothing(mut self, smoothing: f64) -> Self {
self.target_encoding_smoothing = smoothing;
self
}
}
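/// Per-column state captured while fitting a categorical feature; sufficient
/// to reproduce the same encoding and binning at inference time.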
#[derive(Debug, Clone, Archive, Serialize, Deserialize)]
pub struct CategoricalEncodingState {
pub name: String,
pub category_mapping: CategoryMapping,
pub encoding_map: EncodingMap,
pub bin_boundaries: Vec<f64>,
}
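/// Everything learned during training that is needed to preprocess new data
/// identically: feature metadata, fitted categorical encoders, the expected
/// column order, and which columns are categorical.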
#[derive(Debug, Clone, Archive, Serialize, Deserialize)]
pub struct PipelineState {
pub feature_info: Vec<FeatureInfo>,
pub categorical_encodings: Vec<CategoricalEncodingState>,
pub column_order: Vec<String>,
pub categorical_indices: Vec<usize>,
}
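/// End-to-end tabular preprocessing: categorical columns are filtered and
/// target-encoded, numeric columns are median-imputed, and every feature is
/// quantile-binned into at most 255 bins.
///
/// A minimal usage sketch (mirroring the tests at the bottom of this file):
///
/// ```ignore
/// let df = df! {
///     "feature" => &[1.0, 2.0, 3.0, 4.0],
///     "target" => &[0.0, 1.0, 0.0, 1.0],
/// }?;
/// let pipeline = DataPipeline::with_defaults();
/// let (dataset, state, _encoded_df) =
///     pipeline.process_for_training(df, "target", None)?;
/// // `state` can later be passed to `process_for_inference` to bin
/// // unseen rows with the exact same boundaries and encodings.
/// ```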
pub struct DataPipeline {
config: PipelineConfig,
binner: QuantileBinner,
}
impl DataPipeline {
pub fn new(config: PipelineConfig) -> Self {
Self {
binner: QuantileBinner::new(config.num_bins),
config,
}
}
    /// Creates a pipeline with the default configuration.
    pub fn with_defaults() -> Self {
        Self::new(PipelineConfig::default())
    }

    /// Alias for [`Self::with_defaults`].
    pub fn default_config() -> Self {
        Self::with_defaults()
    }
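    /// Reads a CSV file and runs the full training preprocessing pass.
    /// See [`Self::process_for_training`] for the returned values.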
pub fn load_csv_for_training(
&self,
path: impl AsRef<Path>,
target_column: &str,
categorical_columns: Option<&[&str]>,
) -> Result<(BinnedDataset, PipelineState, DataFrame)> {
let df = CsvReadOptions::default()
.try_into_reader_with_file_path(Some(path.as_ref().to_path_buf()))?
.finish()?;
self.process_for_training(df, target_column, categorical_columns)
}
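    /// Reads a Parquet file and runs the full training preprocessing pass.
    /// See [`Self::process_for_training`] for the returned values.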
pub fn load_parquet_for_training(
&self,
path: impl AsRef<Path>,
target_column: &str,
categorical_columns: Option<&[&str]>,
) -> Result<(BinnedDataset, PipelineState, DataFrame)> {
let pl_path = PlPath::new(&path.as_ref().to_string_lossy());
let df = LazyFrame::scan_parquet(pl_path, Default::default())?.collect()?;
self.process_for_training(df, target_column, categorical_columns)
}
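    /// Fits the pipeline on `df` and returns the binned training dataset, the
    /// fitted [`PipelineState`], and a copy of the DataFrame in which
    /// categorical columns have been replaced by their target-encoded values.
    ///
    /// If `categorical_columns` is `None`, all `String` and `Categorical`
    /// columns are treated as categorical.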
pub fn process_for_training(
&self,
df: DataFrame,
target_column: &str,
categorical_columns: Option<&[&str]>,
) -> Result<(BinnedDataset, PipelineState, DataFrame)> {
        let target_col = df.column(target_column).map_err(|e| {
            TreeBoostError::Data(format!(
                "Target column '{}' not found: {}",
                target_column, e
            ))
        })?;
        // Impute missing (NaN) targets with 0.0 so the target encoder always
        // sees finite values.
        let mut targets: Vec<f64> = self.series_to_f64(target_col.as_materialized_series())?;
        for t in targets.iter_mut() {
            if t.is_nan() {
                *t = 0.0;
            }
        }
        let targets_f32: Vec<f32> = targets.iter().map(|&t| t as f32).collect();
        let num_rows = targets_f32.len();
let feature_names: Vec<String> = df
.get_column_names()
.into_iter()
.filter(|name| *name != target_column)
.map(|s| s.to_string())
.collect();
let categorical_set: std::collections::HashSet<&str> = match categorical_columns {
Some(cols) => cols.iter().copied().collect(),
None => {
feature_names
.iter()
.filter(|name| {
matches!(
df.column(name.as_str()).map(|c| c.dtype().clone()),
Ok(DataType::String) | Ok(DataType::Categorical(_, _))
)
})
.map(|s| s.as_str())
.collect()
}
};
let mut all_binned: Vec<Vec<u8>> = Vec::with_capacity(feature_names.len());
let mut all_info: Vec<FeatureInfo> = Vec::with_capacity(feature_names.len());
let mut categorical_encodings: Vec<CategoricalEncodingState> = Vec::new();
let mut categorical_indices: Vec<usize> = Vec::new();
let mut encoded_series: Vec<(String, Vec<f64>)> = Vec::new();
for (col_idx, name) in feature_names.iter().enumerate() {
let col = df.column(name.as_str()).map_err(|e| {
TreeBoostError::Data(format!("Feature column '{}' not found: {}", name, e))
})?;
let series = col.as_materialized_series();
if categorical_set.contains(name.as_str()) {
let (binned, info, encoding_state, encoded_values) = self
.process_categorical_column_with_values(
name.clone(),
series,
                        &targets,
)?;
all_binned.push(binned);
all_info.push(info);
categorical_encodings.push(encoding_state);
categorical_indices.push(col_idx);
                encoded_series.push((name.clone(), encoded_values));
            } else {
let (binned, info) = self.process_numeric_column(name.clone(), series)?;
all_binned.push(binned);
all_info.push(info);
}
}
let mut features = Vec::with_capacity(num_rows * all_binned.len());
for binned_col in all_binned {
features.extend(binned_col);
}
        let dataset = BinnedDataset::new(num_rows, features, targets_f32, all_info.clone());
let state = PipelineState {
feature_info: all_info,
categorical_encodings,
column_order: feature_names.clone(),
categorical_indices,
};
let encoded_cat_map: std::collections::HashMap<&str, &Vec<f64>> = encoded_series
.iter()
.map(|(name, vals)| (name.as_str(), vals))
.collect();
        // Rebuild the DataFrame with categorical columns replaced by their encoded values.
        let mut new_columns: Vec<polars::prelude::Column> = Vec::new();
for name in feature_names.iter() {
if let Some(encoded_vals) = encoded_cat_map.get(name.as_str()) {
new_columns.push(Series::new(name.as_str().into(), *encoded_vals).into());
} else {
let col = df.column(name.as_str())?;
new_columns.push(col.clone());
}
}
let target_col = df.column(target_column)?;
new_columns.push(target_col.clone());
let filtered_df = DataFrame::new(new_columns).map_err(|e| {
TreeBoostError::Data(format!("Failed to build filtered DataFrame: {}", e))
})?;
Ok((dataset, state, filtered_df))
}
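    /// Reads a CSV file and bins it using a previously fitted [`PipelineState`].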
pub fn load_csv_for_inference(
&self,
path: impl AsRef<Path>,
state: &PipelineState,
) -> Result<BinnedDataset> {
let df = CsvReadOptions::default()
.try_into_reader_with_file_path(Some(path.as_ref().to_path_buf()))?
.finish()?;
let (_preprocessed_df, dataset) = self.process_for_inference(df, state)?;
Ok(dataset)
}
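    /// Reads a Parquet file and bins it using a previously fitted [`PipelineState`].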
pub fn load_parquet_for_inference(
&self,
path: impl AsRef<Path>,
state: &PipelineState,
) -> Result<BinnedDataset> {
let pl_path = PlPath::new(&path.as_ref().to_string_lossy());
let df = LazyFrame::scan_parquet(pl_path, Default::default())?.collect()?;
let (_preprocessed_df, dataset) = self.process_for_inference(df, state)?;
Ok(dataset)
}
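    /// Applies a fitted pipeline to new data: categorical columns are encoded
    /// with the stored mappings (categories unseen during training fall back
    /// to the encoder's default value) and all columns are binned with the
    /// stored boundaries. Returns the encoded DataFrame alongside the binned
    /// dataset.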
pub fn process_for_inference(
&self,
df: DataFrame,
state: &PipelineState,
) -> Result<(DataFrame, BinnedDataset)> {
let num_rows = df.height();
let cat_state_map: std::collections::HashMap<&str, &CategoricalEncodingState> = state
.categorical_encodings
.iter()
.map(|s| (s.name.as_str(), s))
.collect();
let cat_indices_set: std::collections::HashSet<usize> =
state.categorical_indices.iter().copied().collect();
let mut all_binned: Vec<Vec<u8>> = Vec::with_capacity(state.column_order.len());
let mut encoded_series: Vec<Series> = Vec::with_capacity(state.column_order.len());
for (col_idx, name) in state.column_order.iter().enumerate() {
let col = df.column(name.as_str()).map_err(|e| {
TreeBoostError::Data(format!("Feature column '{}' not found: {}", name, e))
})?;
let series = col.as_materialized_series();
if cat_indices_set.contains(&col_idx) {
let encoding_state = cat_state_map.get(name.as_str()).ok_or_else(|| {
TreeBoostError::Data(format!(
"Missing encoding state for categorical column '{}'",
name
))
})?;
let binned = self.apply_categorical_encoding(series, encoding_state)?;
let categories = self.series_to_strings(series)?;
let encoded_values: Vec<f64> = categories
.iter()
.map(|cat| {
let idx = encoding_state.category_mapping.get_index(cat);
if idx == encoding_state.category_mapping.unknown_idx {
encoding_state.encoding_map.default_value
} else {
encoding_state.encoding_map.encode(cat)
}
})
.collect();
encoded_series.push(Series::new(name.clone().into(), encoded_values));
all_binned.push(binned);
} else {
let info = &state.feature_info[col_idx];
let binned = self.apply_numeric_binning(series, info)?;
encoded_series.push(series.clone());
all_binned.push(binned);
}
}
let columns: Vec<_> = encoded_series
.into_iter()
.map(|s| s.into_column())
.collect();
let preprocessed_df = DataFrame::new(columns).map_err(|e| {
TreeBoostError::Data(format!("Failed to create preprocessed DataFrame: {}", e))
})?;
let mut features = Vec::with_capacity(num_rows * all_binned.len());
for binned_col in all_binned {
features.extend(binned_col);
}
        // Inference rows carry no labels; use zero-filled placeholder targets.
        let targets = vec![0.0f32; num_rows];
let dataset = BinnedDataset::new(num_rows, features, targets, state.feature_info.clone());
Ok((preprocessed_df, dataset))
}
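    /// Median-imputes missing values, fits quantile bin boundaries, and bins
    /// the column.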
fn process_numeric_column(
&self,
name: String,
series: &Series,
) -> Result<(Vec<u8>, FeatureInfo)> {
let mut values = self.series_to_f64(series)?;
let non_nan_values: Vec<f64> = values.iter().copied().filter(|v| !v.is_nan()).collect();
        // Impute missing values with the column median (0.0 if the column is all-NaN).
        let impute_value = if !non_nan_values.is_empty() {
            let mut sorted = non_nan_values;
            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
            let mid = sorted.len() / 2;
            if sorted.len() % 2 == 0 {
                (sorted[mid - 1] + sorted[mid]) / 2.0
            } else {
                sorted[mid]
            }
        } else {
            0.0
        };
for v in values.iter_mut() {
if v.is_nan() {
*v = impute_value;
}
}
let boundaries = self.binner.compute_boundaries(&values);
let binned: Vec<u8> = values
.iter()
.map(|&v| QuantileBinner::bin_value(v, &boundaries))
.collect();
let info = FeatureInfo {
name,
feature_type: FeatureType::Numeric,
num_bins: (boundaries.len() + 1).min(255) as u8,
bin_boundaries: boundaries,
};
Ok((binned, info))
}
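    /// Fits a categorical column: rare categories are removed with the
    /// count-min-sketch backed [`CategoryFilter`], the survivors are
    /// target-encoded with [`OrderedTargetEncoder`], and the encoded values
    /// are quantile-binned.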
fn process_categorical_column(
&self,
name: String,
series: &Series,
targets: &[f64],
) -> Result<(Vec<u8>, FeatureInfo, CategoricalEncodingState)> {
let categories = self.series_to_strings(series)?;
let mut filter = CategoryFilter::new(
self.config.cms_eps,
self.config.cms_confidence,
self.config.min_category_count,
);
for cat in &categories {
filter.count(cat);
}
let unique: Vec<String> = categories
.iter()
.cloned()
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect();
filter.finalize(unique);
let filtered: Vec<String> = categories
.iter()
.map(|c| filter.filter(c).to_string())
.collect();
let category_mapping = CategoryMapping::from_filter(&filter);
let mut encoder = OrderedTargetEncoder::new(self.config.target_encoding_smoothing);
let encoded = encoder.encode_column(&filtered, targets);
let encoding_map = encoder.get_encoding_map();
let boundaries = self.binner.compute_boundaries(&encoded);
let binned: Vec<u8> = encoded
.iter()
.map(|&v| QuantileBinner::bin_value(v, &boundaries))
.collect();
let info = FeatureInfo {
name: name.clone(),
feature_type: FeatureType::Categorical,
num_bins: (boundaries.len() + 1).min(255) as u8,
bin_boundaries: boundaries.clone(),
};
let encoding_state = CategoricalEncodingState {
name,
category_mapping,
encoding_map,
bin_boundaries: boundaries,
};
Ok((binned, info, encoding_state))
}
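    /// Like [`Self::process_categorical_column`], but also returns the
    /// per-row encoded values so the caller can rebuild the DataFrame.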
fn process_categorical_column_with_values(
&self,
name: String,
series: &Series,
targets: &[f64],
) -> Result<(Vec<u8>, FeatureInfo, CategoricalEncodingState, Vec<f64>)> {
let (binned, info, encoding_state) =
self.process_categorical_column(name, series, targets)?;
let categories = self.series_to_strings(series)?;
let encoded: Vec<f64> = categories
.iter()
.map(|cat| {
let idx = encoding_state.category_mapping.get_index(cat);
if idx == encoding_state.category_mapping.unknown_idx {
encoding_state.encoding_map.default_value
} else {
encoding_state.encoding_map.encode(cat)
}
})
.collect();
Ok((binned, info, encoding_state, encoded))
}
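    /// Encodes and bins a categorical column at inference time using stored
    /// state; categories unseen during training map to the default encoding.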
fn apply_categorical_encoding(
&self,
series: &Series,
state: &CategoricalEncodingState,
) -> Result<Vec<u8>> {
let categories = self.series_to_strings(series)?;
let encoded: Vec<f64> = categories
.iter()
.map(|cat| {
let idx = state.category_mapping.get_index(cat);
if idx == state.category_mapping.unknown_idx {
state.encoding_map.default_value
} else {
state.encoding_map.encode(cat)
}
})
.collect();
let binned: Vec<u8> = encoded
.iter()
.map(|&v| QuantileBinner::bin_value(v, &state.bin_boundaries))
.collect();
Ok(binned)
}
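    /// Bins a numeric column with boundaries fitted at training time.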
fn apply_numeric_binning(&self, series: &Series, info: &FeatureInfo) -> Result<Vec<u8>> {
let values = self.series_to_f64(series)?;
let binned: Vec<u8> = values
.iter()
.map(|&v| QuantileBinner::bin_value(v, &info.bin_boundaries))
.collect();
Ok(binned)
}
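    /// Casts a Series to `f64`, mapping nulls to `NaN`.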
fn series_to_f64(&self, series: &Series) -> Result<Vec<f64>> {
series
.cast(&DataType::Float64)
.map_err(|e| TreeBoostError::Data(format!("Failed to cast to f64: {}", e)))?
.f64()
.map_err(|e| TreeBoostError::Data(format!("Failed to get f64 chunked: {}", e)))?
.into_iter()
.map(|opt| Ok(opt.unwrap_or(f64::NAN)))
.collect()
}
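    /// Casts a Series to strings, mapping nulls to the empty string.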
fn series_to_strings(&self, series: &Series) -> Result<Vec<String>> {
let str_series = series
.cast(&DataType::String)
.map_err(|e| TreeBoostError::Data(format!("Failed to cast to String: {}", e)))?;
let str_chunked = str_series
.str()
.map_err(|e| TreeBoostError::Data(format!("Failed to get str chunked: {}", e)))?;
Ok(str_chunked
.into_iter()
.map(|opt| opt.unwrap_or("").to_string())
.collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pipeline_numeric_only() {
let df = df! {
"feature1" => &[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
"feature2" => &[10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0],
"target" => &[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
}
.unwrap();
let pipeline = DataPipeline::new(PipelineConfig::new().with_num_bins(4));
let (dataset, state, _filtered_df) =
pipeline.process_for_training(df, "target", None).unwrap();
assert_eq!(dataset.num_rows(), 10);
assert_eq!(dataset.num_features(), 2);
assert_eq!(state.column_order.len(), 2);
assert!(state.categorical_indices.is_empty());
}
#[test]
fn test_pipeline_with_categoricals() {
let df = df! {
"numeric" => &[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
"category" => &["a", "a", "b", "b", "c", "c", "a", "b", "c", "rare"],
"target" => &[10.0, 12.0, 20.0, 22.0, 30.0, 32.0, 11.0, 21.0, 31.0, 5.0]
}
.unwrap();
let pipeline = DataPipeline::new(
PipelineConfig::new()
.with_num_bins(4)
                .with_cms_params(0.01, 0.99, 2)
                .with_smoothing(1.0),
);
let (dataset, state, _filtered_df) = pipeline
.process_for_training(df, "target", Some(&["category"]))
.unwrap();
assert_eq!(dataset.num_rows(), 10);
assert_eq!(dataset.num_features(), 2);
        assert_eq!(state.categorical_indices, vec![1]);
        assert_eq!(state.categorical_encodings.len(), 1);
let cat_state = &state.categorical_encodings[0];
assert!(!cat_state
.category_mapping
.category_to_idx
.iter()
.any(|(name, _)| name == "rare"));
}
#[test]
fn test_pipeline_inference() {
let train_df = df! {
"numeric" => &[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
"category" => &["a", "a", "b", "b", "c", "c", "a", "b", "c", "a"],
"target" => &[10.0, 12.0, 20.0, 22.0, 30.0, 32.0, 11.0, 21.0, 31.0, 13.0]
}
.unwrap();
let pipeline = DataPipeline::new(
PipelineConfig::new()
.with_num_bins(4)
.with_cms_params(0.01, 0.99, 2)
.with_smoothing(1.0),
);
let (_train_dataset, state, _filtered_df) = pipeline
.process_for_training(train_df, "target", Some(&["category"]))
.unwrap();
let test_df = df! {
"numeric" => &[2.5, 5.5, 8.5],
"category" => &["a", "b", "unseen"]
}
.unwrap();
let (_test_preprocessed_df, test_dataset) =
pipeline.process_for_inference(test_df, &state).unwrap();
assert_eq!(test_dataset.num_rows(), 3);
assert_eq!(test_dataset.num_features(), 2);
for row in 0..3 {
for feat in 0..2 {
let bin = test_dataset.get_bin(row, feat);
assert!(bin < state.feature_info[feat].num_bins);
}
}
}
#[test]
fn test_target_encoding_ordering() {
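        // Smoke test: the pipeline should fit ordered target encoding with
        // zero smoothing end-to-end without error.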
let df = df! {
"category" => &["a", "a", "a", "b", "b", "b"],
"target" => &[10.0, 20.0, 30.0, 100.0, 200.0, 300.0]
}
.unwrap();
let pipeline = DataPipeline::new(
PipelineConfig::new()
.with_num_bins(4)
                .with_cms_params(0.01, 0.99, 1)
                .with_smoothing(0.0),
        );
let (dataset, state, _filtered_df) = pipeline
.process_for_training(df, "target", Some(&["category"]))
.unwrap();
assert_eq!(dataset.num_rows(), 6);
assert_eq!(state.categorical_encodings.len(), 1);
}
}