use crate::learner::StreamingLearner;
use crate::preprocessing::StreamingTargetPreprocessor;
/// A feature transformer that learns incrementally from a stream of samples.
///
/// A [`Pipeline`] chains implementors in order: training calls
/// [`update_and_transform`](Self::update_and_transform) on every stage, while
/// prediction only calls the read-only [`transform`](Self::transform).
pub trait StreamingPreprocessor: Send + Sync {
    /// Transforms `features` with the current state, without modifying it.
    fn transform(&self, features: &[f64]) -> Vec<f64>;

    /// Updates internal state from `features` and returns the transformed vector.
    fn update_and_transform(&mut self, features: &[f64]) -> Vec<f64>;

    /// Length of the vector this stage produces, if it is known.
    fn output_dim(&self) -> Option<usize>;

    /// Clears learned state; invoked by `Pipeline::reset`.
    fn reset(&mut self);
}
/// Collects feature preprocessors (and an optional target preprocessor)
/// until a learner is supplied, at which point a [`Pipeline`] is produced.
pub struct PipelineBuilder {
    // Feature stages in the order they were added via `pipe`.
    preprocessors: Vec<Box<dyn StreamingPreprocessor>>,
    // Optional target transform; `None` means targets pass through unchanged.
    target_preprocessor: Option<Box<dyn StreamingTargetPreprocessor>>,
}
impl PipelineBuilder {
    /// Creates a builder with no feature preprocessors and no target preprocessor.
    pub fn new() -> Self {
        Self {
            preprocessors: Vec::new(),
            target_preprocessor: None,
        }
    }

    /// Appends `preprocessor` as the next feature stage.
    ///
    /// Stages run in the order they are added: each stage receives the output
    /// of the previous one, and the last stage feeds the learner.
    pub fn pipe(mut self, preprocessor: impl StreamingPreprocessor + 'static) -> Self {
        self.preprocessors.push(Box::new(preprocessor));
        self
    }

    /// Sets the target preprocessor, replacing any previously configured one.
    ///
    /// The built pipeline transforms training targets with it and maps the
    /// learner's raw predictions back through its inverse transform.
    pub fn target_preprocessor(mut self, tp: impl StreamingTargetPreprocessor + 'static) -> Self {
        self.target_preprocessor = Some(Box::new(tp));
        self
    }

    /// Finishes the builder with `learner` as the final estimator.
    ///
    /// Convenience wrapper around [`Self::learner_boxed`] for concrete learner
    /// types; both produce identical pipelines.
    pub fn learner(self, learner: impl StreamingLearner + 'static) -> Pipeline {
        // Delegate so there is exactly one place that assembles a `Pipeline`.
        self.learner_boxed(Box::new(learner))
    }

    /// Finishes the builder with an already-boxed learner, e.g. one chosen at
    /// runtime.
    ///
    /// The returned pipeline starts with a sample count of zero.
    pub fn learner_boxed(self, learner: Box<dyn StreamingLearner>) -> Pipeline {
        Pipeline {
            preprocessors: self.preprocessors,
            learner,
            target_preprocessor: self.target_preprocessor,
            samples_seen: 0,
        }
    }
}
impl Default for PipelineBuilder {
fn default() -> Self {
Self::new()
}
}
/// A learner wrapped with a chain of feature preprocessors and an optional
/// target preprocessor; build one with [`Pipeline::builder`].
pub struct Pipeline {
    // Feature stages, applied in order before the learner sees the features.
    preprocessors: Vec<Box<dyn StreamingPreprocessor>>,
    // The final estimator trained on the preprocessed features.
    learner: Box<dyn StreamingLearner>,
    // Optional target transform: applied on train, inverted on predict.
    target_preprocessor: Option<Box<dyn StreamingTargetPreprocessor>>,
    // Number of `train_one` calls made through this pipeline since the last reset.
    samples_seen: u64,
}
impl Pipeline {
    /// Starts building a pipeline; equivalent to [`PipelineBuilder::new`].
    pub fn builder() -> PipelineBuilder {
        PipelineBuilder::new()
    }

    /// Number of feature preprocessors chained ahead of the learner.
    pub fn n_preprocessors(&self) -> usize {
        self.preprocessors.len()
    }

    /// Shared access to the wrapped learner.
    pub fn learner(&self) -> &dyn StreamingLearner {
        &*self.learner
    }

    /// Mutable access to the wrapped learner.
    ///
    /// Training the learner directly through this reference bypasses the
    /// preprocessors and does not advance this pipeline's sample counter.
    pub fn learner_mut(&mut self) -> &mut dyn StreamingLearner {
        &mut *self.learner
    }

    /// Runs `features` through every preprocessor in order using the read-only
    /// [`StreamingPreprocessor::transform`], leaving preprocessor state untouched.
    ///
    /// The first stage reads `features` directly, so the input is only copied
    /// when the pipeline has no preprocessors at all.
    fn transform_features(&self, features: &[f64]) -> Vec<f64> {
        match self.preprocessors.split_first() {
            None => features.to_vec(),
            Some((first, rest)) => rest
                .iter()
                .fold(first.transform(features), |x, stage| stage.transform(&x)),
        }
    }

    /// Like `transform_features`, but each stage learns from its input via
    /// [`StreamingPreprocessor::update_and_transform`].
    ///
    /// Stages are updated in pipeline order, each seeing the previous stage's
    /// output. As above, the input is only copied when there are no stages.
    fn update_and_transform_features(&mut self, features: &[f64]) -> Vec<f64> {
        match self.preprocessors.split_first_mut() {
            None => features.to_vec(),
            Some((first, rest)) => {
                let head = first.update_and_transform(features);
                rest.iter_mut()
                    .fold(head, |x, stage| stage.update_and_transform(&x))
            }
        }
    }
}
impl StreamingLearner for Pipeline {
    /// Trains on one sample.
    ///
    /// Every feature preprocessor is updated with the sample first. The target
    /// preprocessor (if any) is then fitted on and applied to `target`. Finally
    /// the learner is trained on the transformed pair with the given `weight`.
    fn train_one(&mut self, features: &[f64], target: f64, weight: f64) {
        let transformed = self.update_and_transform_features(features);
        let y = match self.target_preprocessor.as_mut() {
            Some(scaler) => scaler.fit_transform(target),
            None => target,
        };
        self.learner.train_one(&transformed, y, weight);
        self.samples_seen += 1;
    }

    /// Predicts without mutating any state.
    ///
    /// Features pass through each preprocessor's read-only `transform`. The
    /// learner's output is mapped back through the target preprocessor's
    /// inverse when one is configured.
    fn predict(&self, features: &[f64]) -> f64 {
        let raw = self.learner.predict(&self.transform_features(features));
        match self.target_preprocessor.as_ref() {
            Some(scaler) => scaler.inverse_transform(raw),
            None => raw,
        }
    }

    /// Number of `train_one` calls made through this pipeline since the last reset.
    fn n_samples_seen(&self) -> u64 {
        self.samples_seen
    }

    /// Resets every stage in the following order: preprocessors, then the
    /// learner, then the target preprocessor. Also zeroes the sample counter.
    fn reset(&mut self) {
        self.preprocessors.iter_mut().for_each(|stage| stage.reset());
        self.learner.reset();
        if let Some(scaler) = &mut self.target_preprocessor {
            scaler.reset();
        }
        self.samples_seen = 0;
    }

    // The remaining methods forward straight to the inner learner. They
    // presumably carry `#[deprecated]` on the trait, hence the `allow`s to
    // silence the warning raised by calling them on `self.learner`.

    #[allow(deprecated)]
    fn diagnostics_array(&self) -> [f64; 5] {
        self.learner.diagnostics_array()
    }

    #[allow(deprecated)]
    fn adjust_config(&mut self, lr_multiplier: f64, lambda_delta: f64) {
        self.learner.adjust_config(lr_multiplier, lambda_delta);
    }

    #[allow(deprecated)]
    fn apply_structural_change(&mut self, depth_delta: i32, steps_delta: i32) {
        self.learner.apply_structural_change(depth_delta, steps_delta);
    }

    #[allow(deprecated)]
    fn replacement_count(&self) -> u64 {
        self.learner.replacement_count()
    }
}
impl fmt::Debug for Pipeline {
    /// Summarises the pipeline's shape rather than dumping its stages.
    ///
    /// `StreamingPreprocessor` has no `Debug` supertrait, so the boxed stages
    /// themselves cannot be printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let has_target_preprocessor = self.target_preprocessor.is_some();
        f.debug_struct("Pipeline")
            .field("n_preprocessors", &self.n_preprocessors())
            .field("has_target_preprocessor", &has_target_preprocessor)
            .field("samples_seen", &self.samples_seen)
            .finish()
    }
}
use std::fmt;
impl crate::automl::DiagnosticSource for Pipeline {
    /// The pipeline reports no configuration diagnostics of its own.
    ///
    /// NOTE(review): this does not forward to the wrapped learner; confirm
    /// whether the inner learner's diagnostics should be surfaced here.
    fn config_diagnostics(&self) -> Option<crate::automl::ConfigDiagnostics> {
        Option::None
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::preprocessing::IncrementalNormalizer;

    // Absolute tolerance for floating-point comparisons in these tests.
    const EPS: f64 = 1e-6;

    /// Test preprocessor that multiplies every feature by `factor`.
    ///
    /// `dim` records the length of the last vector passed to
    /// `update_and_transform`; `transform` never touches it. Tests can
    /// therefore tell whether a stage was updated or only applied.
    #[derive(Clone)]
    struct ScalePreprocessor {
        factor: f64,
        dim: Option<usize>,
    }

    impl ScalePreprocessor {
        fn new(factor: f64) -> Self {
            Self { factor, dim: None }
        }
    }

    impl StreamingPreprocessor for ScalePreprocessor {
        fn update_and_transform(&mut self, features: &[f64]) -> Vec<f64> {
            self.dim = Some(features.len());
            features.iter().map(|&x| x * self.factor).collect()
        }
        fn transform(&self, features: &[f64]) -> Vec<f64> {
            features.iter().map(|&x| x * self.factor).collect()
        }
        fn output_dim(&self) -> Option<usize> {
            self.dim
        }
        fn reset(&mut self) {
            self.dim = None;
        }
    }

    /// Test learner that ignores targets and predicts the mean of the
    /// (already preprocessed) feature vector, or 0.0 for an empty vector.
    /// Training only increments a sample counter.
    struct MeanLearner {
        samples: u64,
    }

    impl MeanLearner {
        fn new() -> Self {
            Self { samples: 0 }
        }
    }

    impl StreamingLearner for MeanLearner {
        fn train_one(&mut self, _features: &[f64], _target: f64, _weight: f64) {
            self.samples += 1;
        }
        fn predict(&self, features: &[f64]) -> f64 {
            if features.is_empty() {
                return 0.0;
            }
            features.iter().sum::<f64>() / features.len() as f64
        }
        fn n_samples_seen(&self) -> u64 {
            self.samples
        }
        fn reset(&mut self) {
            self.samples = 0;
        }
    }

    // A freshly built pipeline reports its stage count and has seen no samples.
    #[test]
    fn builder_creates_pipeline() {
        let p = Pipeline::builder()
            .pipe(ScalePreprocessor::new(2.0))
            .learner(MeanLearner::new());
        assert_eq!(p.n_preprocessors(), 1);
        assert_eq!(p.n_samples_seen(), 0);
    }

    // [1, 2, 3] scaled by 2 is [2, 4, 6]; MeanLearner predicts its mean, 4.
    #[test]
    fn pipeline_trains_and_predicts() {
        let mut p = Pipeline::builder()
            .pipe(ScalePreprocessor::new(2.0))
            .learner(MeanLearner::new());
        p.train(&[1.0, 2.0, 3.0], 0.0);
        assert_eq!(p.n_samples_seen(), 1);
        let pred = p.predict(&[1.0, 2.0, 3.0]);
        assert!((pred - 4.0).abs() < EPS, "pred = {}", pred);
    }

    // Stages compose in order: x2 then x3 turns [1, 1] into [6, 6].
    #[test]
    fn multi_preprocessor_chaining() {
        let mut p = Pipeline::builder()
            .pipe(ScalePreprocessor::new(2.0))
            .pipe(ScalePreprocessor::new(3.0))
            .learner(MeanLearner::new());
        p.train(&[1.0, 1.0], 0.0);
        let pred = p.predict(&[1.0, 1.0]);
        assert!((pred - 6.0).abs() < EPS, "pred = {}", pred);
    }

    // `predict` must leave preprocessor state and the sample counter alone;
    // only `train` sets `dim` and bumps the count.
    #[test]
    fn predict_does_not_update_preprocessor() {
        let mut p = Pipeline::builder()
            .pipe(ScalePreprocessor::new(2.0))
            .learner(MeanLearner::new());
        assert_eq!(p.preprocessors[0].output_dim(), None);
        let _ = p.predict(&[1.0, 2.0]);
        assert_eq!(p.preprocessors[0].output_dim(), None);
        assert_eq!(p.n_samples_seen(), 0);
        p.train(&[1.0, 2.0], 0.0);
        assert_eq!(p.preprocessors[0].output_dim(), Some(2));
        assert_eq!(p.n_samples_seen(), 1);
    }

    // `reset` clears the pipeline counter, preprocessor state and the learner.
    #[test]
    fn reset_clears_all_state() {
        let mut p = Pipeline::builder()
            .pipe(ScalePreprocessor::new(2.0))
            .learner(MeanLearner::new());
        p.train(&[1.0], 0.0);
        p.train(&[2.0], 0.0);
        assert_eq!(p.n_samples_seen(), 2);
        assert_eq!(p.preprocessors[0].output_dim(), Some(1));
        p.reset();
        assert_eq!(p.n_samples_seen(), 0);
        assert_eq!(p.preprocessors[0].output_dim(), None);
        assert_eq!(p.learner().n_samples_seen(), 0);
    }

    // A pipeline is itself a `StreamingLearner` and works behind a trait object.
    #[test]
    fn pipeline_as_trait_object() {
        let p = Pipeline::builder()
            .pipe(ScalePreprocessor::new(1.0))
            .learner(MeanLearner::new());
        let mut boxed: Box<dyn StreamingLearner> = Box::new(p);
        boxed.train(&[5.0, 10.0], 0.0);
        let pred = boxed.predict(&[5.0, 10.0]);
        assert!((pred - 7.5).abs() < EPS);
        assert_eq!(boxed.n_samples_seen(), 1);
    }

    // Features i and 2i for i in 0..100 have means 49.5 and 99.0. This assumes
    // IncrementalNormalizer centres on running means (TODO confirm), so the
    // normalised mean vector — and hence MeanLearner's output — is near zero.
    #[test]
    fn pipeline_with_normalizer() {
        let mut p = Pipeline::builder()
            .pipe(IncrementalNormalizer::new())
            .learner(MeanLearner::new());
        for i in 0..100 {
            p.train(&[i as f64, (i as f64) * 2.0], 0.0);
        }
        let pred = p.predict(&[49.5, 99.0]);
        assert!(
            pred.abs() < 0.5,
            "prediction on mean features should be near zero, got {}",
            pred
        );
    }

    // With no preprocessors, features reach the learner unchanged: mean(10, 20) = 15.
    #[test]
    fn empty_preprocessor_pipeline() {
        let mut p = Pipeline::builder().learner(MeanLearner::new());
        assert_eq!(p.n_preprocessors(), 0);
        p.train(&[10.0, 20.0], 0.0);
        let pred = p.predict(&[10.0, 20.0]);
        assert!((pred - 15.0).abs() < EPS);
    }

    // `learner_boxed` builds the same pipeline as `learner`: 3 * 2 = 6.
    #[test]
    fn learner_boxed_constructor() {
        let learner: Box<dyn StreamingLearner> = Box::new(MeanLearner::new());
        let mut p = Pipeline::builder()
            .pipe(ScalePreprocessor::new(2.0))
            .learner_boxed(learner);
        p.train(&[3.0], 0.0);
        let pred = p.predict(&[3.0]);
        assert!((pred - 6.0).abs() < EPS);
    }

    // The pipeline must invert the target scaler on predict.
    #[test]
    fn pipeline_with_target_preprocessor_inverts_on_predict() {
        use crate::preprocessing::{StreamingTargetPreprocessor, TargetScaler};
        // Warm two scalers identically so `reference` can replay exactly what
        // the pipeline's internal scaler observes.
        let mut scaler = TargetScaler::new();
        let mut reference = TargetScaler::new();
        for i in 0..100_i32 {
            scaler.fit_transform(i as f64);
            reference.fit_transform(i as f64);
        }
        let warm_mean = scaler.mean();
        let mut pipeline = Pipeline::builder()
            .target_preprocessor(scaler)
            .learner_boxed(Box::new(MeanLearner::new()));
        pipeline.train(&[1.0, 2.0], warm_mean);
        // Mirror the pipeline's `fit_transform` on the training target.
        reference.fit_transform(warm_mean);
        // MeanLearner outputs mean([1, 2]) = 1.5 in transformed space; the
        // pipeline should map that back through the scaler's inverse.
        let pred = pipeline.predict(&[1.0, 2.0]);
        let expected = reference.inverse_transform(1.5);
        assert!(
            (pred - expected).abs() < 1e-9,
            "predict should invert target scaler: expected {}, got {}",
            expected,
            pred
        );
    }

    // Resetting also resets the target scaler; prediction on a cleared scaler
    // must still be finite (presumably guarding against NaN from empty stats).
    #[test]
    fn pipeline_target_preprocessor_reset_clears_scaler() {
        use crate::preprocessing::TargetScaler;
        let mut pipeline = Pipeline::builder()
            .target_preprocessor(TargetScaler::new())
            .learner_boxed(Box::new(MeanLearner::new()));
        for i in 0..10_i32 {
            pipeline.train(&[1.0], i as f64);
        }
        assert_eq!(pipeline.n_samples_seen(), 10);
        pipeline.reset();
        assert_eq!(pipeline.n_samples_seen(), 0);
        let pred = pipeline.predict(&[1.0]);
        assert!(
            pred.is_finite(),
            "predict after reset should be finite, got {}",
            pred
        );
    }

    // `make_pipeline!` (defined elsewhere in the crate) treats its last argument
    // as the learner and any preceding `=>`-separated arguments as preprocessors.
    #[test]
    fn make_pipeline_compiles_with_one_stage() {
        let mut p = crate::make_pipeline!(MeanLearner::new());
        assert_eq!(
            p.n_preprocessors(),
            0,
            "no preprocessors in single-stage pipeline"
        );
        p.train(&[3.0, 7.0], 0.0);
        let pred = p.predict(&[3.0, 7.0]);
        assert!(
            (pred - 5.0).abs() < EPS,
            "single-stage pipeline pred = {pred}"
        );
    }

    // x2 then x3 on [1, 1] gives [6, 6], mean 6.
    #[test]
    fn make_pipeline_chains_three_stages() {
        let mut p = crate::make_pipeline!(
            ScalePreprocessor::new(2.0) =>
            ScalePreprocessor::new(3.0) =>
            MeanLearner::new()
        );
        assert_eq!(
            p.n_preprocessors(),
            2,
            "two preprocessors in three-stage pipeline"
        );
        p.train(&[1.0, 1.0], 0.0);
        let pred = p.predict(&[1.0, 1.0]);
        assert!(
            (pred - 6.0).abs() < EPS,
            "three-stage make_pipeline! expected 6.0, got {pred}"
        );
    }

    // [2, 4] scaled by 2 is [4, 8], mean 6.
    #[test]
    fn make_pipeline_two_stages() {
        let mut p = crate::make_pipeline!(
            ScalePreprocessor::new(2.0) =>
            MeanLearner::new()
        );
        assert_eq!(
            p.n_preprocessors(),
            1,
            "one preprocessor in two-stage pipeline"
        );
        p.train(&[2.0, 4.0], 0.0);
        let pred = p.predict(&[2.0, 4.0]);
        assert!(
            (pred - 6.0).abs() < EPS,
            "two-stage make_pipeline! expected 6.0, got {pred}"
        );
    }

    // The macro's output can be boxed as a `dyn StreamingLearner`.
    #[test]
    fn make_pipeline_implements_streaming_learner() {
        let mut p: Box<dyn StreamingLearner> =
            Box::new(crate::make_pipeline!(ScalePreprocessor::new(1.0) => MeanLearner::new()));
        p.train(&[5.0, 10.0], 0.0);
        let pred = p.predict(&[5.0, 10.0]);
        assert!(
            (pred - 7.5).abs() < EPS,
            "boxed make_pipeline! pred = {pred}"
        );
    }
}