// Example: advanced_patterns/advanced_patterns.rs

//! # Advanced Iterator Patterns - Complex Data Processing Workflows
//!
//! ## Overview
//!
//! This example demonstrates sophisticated iterator patterns and complex data processing
//! workflows using Train Station's tensor iterator system. It showcases advanced
//! functional programming techniques, data transformation pipelines, and real-world
//! processing scenarios.
//!
//! ## Learning Objectives
//!
//! - Master complex iterator chains and transformations
//! - Learn advanced functional programming patterns
//! - Understand data processing pipeline design
//! - Explore real-world tensor processing scenarios
//!
//! ## Prerequisites
//!
//! - Understanding of basic iterator concepts (see element_iteration.rs)
//! - Familiarity with functional programming patterns
//! - Knowledge of tensor operations and gradient tracking
//! - Experience with data processing workflows
//!
//! ## Key Concepts Demonstrated
//!
//! - **Pipeline Processing**: Multi-stage data transformation workflows
//! - **Conditional Processing**: Dynamic filtering and transformation based on data
//! - **Batch Operations**: Efficient processing of large datasets
//! - **Error Handling**: Robust processing with fallback strategies
//! - **Performance Optimization**: Memory-efficient processing patterns
//! - **GradTrack Integration**: Build complex graphs with iterator-based views
//!
//! When to choose an approach:
//! - Prefer per-slice iteration via `iter()` when your logic is row/outer-dimension-oriented.
//!   Use `collect_shape([..])` to preserve shape while composing transforms.
//! - For element-wise scalar transforms across entire tensors, use `iter_flat()` with
//!   `collect_shape` and rely on GradTrack to wire reshapes correctly.
//! - For big pipelines, chunk flattened tensors with `chunks()` to improve cache behavior.
//! - For read-only inference, `with_no_grad` and value streaming provide the fastest route.
//!
//! ## Example Code Structure
//!
//! 1. **Data Pipeline Processing**: Multi-stage transformation workflows
//! 2. **Conditional Processing**: Dynamic filtering and transformation
//! 3. **Batch Operations**: Efficient large-scale processing
//! 4. **Real-world Scenarios**: Practical data processing applications
//!
//! ## Expected Output
//!
//! The example will demonstrate complex data processing workflows, showing
//! how to build sophisticated transformation pipelines using iterator patterns
//! while maintaining performance and gradient tracking capabilities.
//!
//! ## Performance Notes
//!
//! - Pipeline processing minimizes memory allocations
//! - Conditional processing avoids unnecessary computations
//! - Batch operations leverage SIMD optimizations
//! - Lazy evaluation patterns improve memory efficiency

use train_station::{
    gradtrack::with_no_grad,
    tensor::{TensorCollectExt, ValuesCollectExt},
    Tensor,
};
66
67/// Main example function demonstrating advanced iterator patterns
68///
69/// This function showcases sophisticated data processing workflows
70/// using complex iterator chains and transformation pipelines.
71fn main() -> Result<(), Box<dyn std::error::Error>> {
72    println!("Starting Advanced Iterator Patterns Example");
73
74    demonstrate_data_pipeline()?;
75    demonstrate_conditional_processing()?;
76    demonstrate_batch_operations()?;
77    demonstrate_real_world_scenarios()?;
78
79    println!("Advanced Iterator Patterns Example completed successfully!");
80    Ok(())
81}
82
83/// Demonstrate multi-stage data processing pipeline
84///
85/// Shows how to build sophisticated transformation workflows using
86/// iterator chains for data preprocessing and feature engineering.
87fn demonstrate_data_pipeline() -> Result<(), Box<dyn std::error::Error>> {
88    println!("\n--- Data Processing Pipeline ---");
89
90    // Simulate raw sensor data with noise
91    let raw_data: Vec<f32> = (0..20)
92        .map(|i| {
93            let base = i as f32 * 0.5;
94            let noise = (i % 3) as f32 * 0.1;
95            base + noise
96        })
97        .collect();
98
99    let tensor = Tensor::from_slice(&raw_data, vec![20])?;
100    println!("Raw sensor data: {:?}", tensor.data());
101
102    // Multi-stage processing pipeline
103    println!("\nProcessing pipeline:");
104    println!("1. Normalize data (z-score)");
105    println!("2. Apply smoothing filter");
106    println!("3. Detect outliers");
107    println!("4. Apply feature scaling");
108
109    // Stage 1: Normalization
110    let mean = tensor.mean().value();
111    let std = tensor.std().value();
112    let normalized: Tensor = tensor
113        .iter()
114        .map(|elem| elem.sub_scalar(mean).div_scalar(std))
115        .collect();
116    println!(
117        "  Normalized (mean={:.3}, std={:.3}): {:?}",
118        mean,
119        std,
120        normalized.data()
121    );
122
123    // Stage 2: Smoothing (simple moving average)
124    let smoothed: Tensor = normalized
125        .iter()
126        .enumerate()
127        .map(|(i, elem)| {
128            if i == 0 || i == normalized.size() - 1 {
129                elem.clone()
130            } else {
131                // Simple 3-point average
132                let prev = normalized.element_view(i - 1);
133                let next = normalized.element_view(i + 1);
134                elem.add_tensor(&prev).add_tensor(&next).div_scalar(3.0)
135            }
136        })
137        .collect();
138    println!("  Smoothed: {:?}", smoothed.data());
139
140    // Stage 3: Outlier detection and removal
141    let outlier_threshold = 2.0;
142    let cleaned: Tensor = smoothed
143        .iter()
144        .filter(|elem| elem.value().abs() < outlier_threshold)
145        .collect();
146    println!(
147        "  Outliers removed (threshold={}): {:?}",
148        outlier_threshold,
149        cleaned.data()
150    );
151
152    // Stage 4: Feature scaling to [0, 1] range
153    let min_val = cleaned
154        .iter()
155        .map(|e| e.value())
156        .fold(f32::INFINITY, f32::min);
157    let max_val = cleaned
158        .iter()
159        .map(|e| e.value())
160        .fold(f32::NEG_INFINITY, f32::max);
161    let scaled: Tensor = cleaned
162        .iter()
163        .map(|elem| elem.sub_scalar(min_val).div_scalar(max_val - min_val))
164        .collect();
165    println!("  Scaled to [0,1]: {:?}", scaled.data());
166
167    Ok(())
168}
169
170/// Demonstrate conditional processing patterns
171///
172/// Shows how to implement dynamic filtering and transformation
173/// based on data characteristics and conditions.
174fn demonstrate_conditional_processing() -> Result<(), Box<dyn std::error::Error>> {
175    println!("\n--- Conditional Processing ---");
176
177    // Create data with mixed characteristics
178    let data = vec![1.0, -2.0, 3.0, -4.0, 5.0, -6.0, 7.0, -8.0, 9.0, -10.0];
179    let tensor = Tensor::from_slice(&data, vec![10])?;
180    println!("Input data: {:?}", tensor.data());
181
182    // Conditional transformation based on sign
183    println!("\nConditional transformation (positive/negative handling):");
184    let processed: Tensor = tensor
185        .iter()
186        .map(|elem| {
187            let val = elem.value();
188            if val > 0.0 {
189                elem.pow_scalar(2.0) // Square positive values
190            } else {
191                elem.mul_scalar(-1.0).sqrt() // Square root of absolute negative values
192            }
193        })
194        .collect();
195    println!("  Processed: {:?}", processed.data());
196
197    // Adaptive filtering based on local statistics
198    println!("\nAdaptive filtering (remove values > 2 std from local mean):");
199    let window_size = 3;
200    let adaptive_filtered: Tensor = tensor
201        .iter()
202        .enumerate()
203        .filter(|(i, elem)| {
204            let start = i.saturating_sub(window_size / 2);
205            let end = (i + window_size / 2 + 1).min(tensor.size());
206
207            // Calculate local mean and std
208            let local_values: Vec<f32> = (start..end)
209                .map(|j| tensor.element_view(j).value())
210                .collect();
211
212            let local_mean = local_values.iter().sum::<f32>() / local_values.len() as f32;
213            let local_variance = local_values
214                .iter()
215                .map(|v| (v - local_mean).powi(2))
216                .sum::<f32>()
217                / local_values.len() as f32;
218            let local_std = local_variance.sqrt();
219
220            let threshold = local_mean + 2.0 * local_std;
221            elem.value() <= threshold
222        })
223        .map(|(_, elem)| elem)
224        .collect();
225    println!("  Adaptive filtered: {:?}", adaptive_filtered.data());
226
227    // Multi-condition processing
228    println!("\nMulti-condition processing:");
229    let multi_processed: Tensor = tensor
230        .iter()
231        .map(|elem| {
232            let val = elem.value();
233            match () {
234                _ if val > 5.0 => elem.mul_scalar(2.0), // Double large values
235                _ if val < -5.0 => elem.div_scalar(2.0), // Halve small values
236                _ if val.abs() < 2.0 => elem.add_scalar(1.0), // Add 1 to small values
237                _ => elem.clone(),                      // Keep others unchanged
238            }
239        })
240        .collect();
241    println!("  Multi-condition: {:?}", multi_processed.data());
242
243    Ok(())
244}
245
246/// Demonstrate batch processing operations
247///
248/// Shows efficient processing of large datasets using iterator
249/// patterns and batch operations for performance optimization.
250fn demonstrate_batch_operations() -> Result<(), Box<dyn std::error::Error>> {
251    println!("\n--- Batch Operations ---");
252
253    // Create a larger dataset for batch processing
254    let size = 100;
255    let data: Vec<f32> = (0..size)
256        .map(|i| {
257            let x = i as f32 / size as f32;
258            x * x + 0.1 * (i % 7) as f32 // Quadratic with some noise
259        })
260        .collect();
261
262    let tensor = Tensor::from_slice(&data, vec![size])?;
263    println!("Dataset size: {}", tensor.size());
264
265    // Batch processing with windowing (iterator views)
266    println!("\nBatch processing with sliding windows:");
267    let batch_size = 10;
268    let batches: Vec<Tensor> = tensor
269        .iter()
270        .collect::<Vec<_>>()
271        .chunks(batch_size)
272        .map(|chunk| {
273            // Process each batch independently
274            chunk
275                .iter()
276                .map(|elem| elem.pow_scalar(2.0).add_scalar(1.0))
277                .collect()
278        })
279        .collect();
280
281    println!(
282        "  Processed {} batches of size {}",
283        batches.len(),
284        batch_size
285    );
286    for (i, batch) in batches.iter().enumerate() {
287        println!(
288            "    Batch {}: mean={:.3}, std={:.3}",
289            i,
290            batch.mean().value(),
291            batch.std().value()
292        );
293    }
294
295    // Parallel-like processing with stride
296    println!("\nStrided processing (every nth element):");
297    let stride = 5;
298    let strided: Tensor = tensor
299        .iter()
300        .enumerate()
301        .filter(|(i, _)| i % stride == 0)
302        .map(|(_, elem)| elem)
303        .collect();
304    println!("  Strided (every {}th): {:?}", stride, strided.data());
305
306    // Hierarchical processing
307    println!("\nHierarchical processing (coarse to fine):");
308    let coarse: Tensor = tensor
309        .iter()
310        .enumerate()
311        .filter(|(i, _)| i % 4 == 0) // Take every 4th element
312        .map(|(_, elem)| elem)
313        .collect();
314
315    let fine: Tensor = tensor
316        .iter()
317        .enumerate()
318        .filter(|(i, _)| i % 4 != 0) // Take the rest
319        .map(|(_, elem)| elem)
320        .collect();
321
322    println!("  Coarse (every 4th): {:?}", coarse.data());
323    println!("  Fine (rest): {:?}", fine.data());
324
325    // Combine coarse and fine with different processing
326    let combined: Tensor = coarse
327        .iter()
328        .map(|elem| elem.mul_scalar(2.0)) // Scale coarse
329        .chain(fine.iter().map(|elem| elem.div_scalar(2.0))) // Scale fine
330        .collect();
331    println!("  Combined: {:?}", combined.data());
332
333    Ok(())
334}
335
336/// Demonstrate real-world processing scenarios
337///
338/// Shows practical applications of iterator patterns for
339/// common data processing tasks in machine learning and analytics.
340fn demonstrate_real_world_scenarios() -> Result<(), Box<dyn std::error::Error>> {
341    println!("\n--- Real-world Scenarios ---");
342
343    // Scenario 1: Time series analysis
344    println!("\nScenario 1: Time Series Analysis");
345    let time_series: Vec<f32> = (0..24)
346        .map(|hour| {
347            let base = 20.0 + 10.0 * (hour as f32 * std::f32::consts::PI / 12.0).sin();
348            base + (hour % 3) as f32 * 2.0 // Add some noise
349        })
350        .collect();
351
352    let series = Tensor::from_slice(&time_series, vec![24])?;
353    println!("  Time series (24 hours): {:?}", series.data());
354
355    // Calculate moving average with view-based iteration
356    let window_size = 3;
357    let moving_avg: Tensor = series
358        .iter()
359        .enumerate()
360        .map(|(i, _)| {
361            let start = i.saturating_sub(window_size / 2);
362            let end = (i + window_size / 2 + 1).min(series.size());
363            let window = series.iter_range(start, end);
364            window.fold(0.0, |acc, elem| acc + elem.value()) / (end - start) as f32
365        })
366        .map(|val| Tensor::from_slice(&[val], vec![1]).unwrap())
367        .collect();
368    println!(
369        "  Moving average (window={}): {:?}",
370        window_size,
371        moving_avg.data()
372    );
373
374    // Inference pipeline with NoGrad + streaming
375    println!("\nInference pipeline (NoGrad + streaming)");
376    let features = Tensor::from_slice(
377        &(0..48).map(|i| i as f32 * 0.125).collect::<Vec<_>>(),
378        vec![6, 8],
379    )?;
380    let fast = with_no_grad(|| {
381        // Stream values directly, apply light affine, and collect back to same shape
382        features
383            .data()
384            .iter()
385            .copied()
386            .map(|x| 0.75 * x + 0.1)
387            .collect_shape(vec![6, 8])
388    });
389    println!(
390        "  NoGrad streamed transform shape: {:?}",
391        fast.shape().dims()
392    );
393
394    // Row-wise iteration with shape-preserving collection (GradTrack-friendly)
395    let per_row: Tensor = features
396        .iter()
397        .map(|row| row.mul_scalar(0.5).add_scalar(2.0))
398        .collect_shape(vec![6, 8]);
399    println!("  Row-wise mapped shape: {:?}", per_row.shape().dims());
400
401    // Scenario 2: Feature engineering
402    println!("\nScenario 2: Feature Engineering");
403    let features = Tensor::from_slice(&[1.0, 2.0, 3.0, 4.0, 5.0], vec![5])?;
404    println!("  Original features: {:?}", features.data());
405
406    // Create polynomial features
407    let poly_features: Tensor = features
408        .iter()
409        .flat_map(|elem| {
410            vec![
411                elem.clone(),         // x^1
412                elem.pow_scalar(2.0), // x^2
413                elem.pow_scalar(3.0), // x^3
414            ]
415        })
416        .collect();
417    println!(
418        "  Polynomial features (x, x^2, x^3): {:?}",
419        poly_features.data()
420    );
421
422    // Scenario 3: Data augmentation
423    println!("\nScenario 3: Data Augmentation");
424    let original = Tensor::from_slice(&[1.0, 2.0, 3.0], vec![3])?;
425    println!("  Original data: {:?}", original.data());
426
427    // Augment with noise and scaling
428    let augmented: Tensor = original
429        .iter()
430        .flat_map(|elem| {
431            vec![
432                elem.clone(),         // Original
433                elem.add_scalar(0.1), // Add noise
434                elem.sub_scalar(0.1), // Subtract noise
435                elem.mul_scalar(1.1), // Scale up
436                elem.mul_scalar(0.9), // Scale down
437            ]
438        })
439        .collect();
440    println!("  Augmented data: {:?}", augmented.data());
441
442    // Scenario 4: Statistical analysis
443    println!("\nScenario 4: Statistical Analysis");
444    let sample_data = Tensor::from_slice(&[1.1, 2.3, 1.8, 2.1, 1.9, 2.0, 1.7, 2.2], vec![8])?;
445    println!("  Sample data: {:?}", sample_data.data());
446
447    // Calculate various statistics
448    let mean = sample_data.mean().value();
449    let std = sample_data.std().value();
450    let min = sample_data
451        .iter()
452        .map(|e| e.value())
453        .fold(f32::INFINITY, f32::min);
454    let max = sample_data
455        .iter()
456        .map(|e| e.value())
457        .fold(f32::NEG_INFINITY, f32::max);
458
459    // Z-score normalization
460    let z_scores: Tensor = sample_data
461        .iter()
462        .map(|elem| elem.sub_scalar(mean).div_scalar(std))
463        .collect();
464
465    println!(
466        "  Statistics: mean={:.3}, std={:.3}, min={:.3}, max={:.3}",
467        mean, std, min, max
468    );
469    println!("  Z-scores: {:?}", z_scores.data());
470
471    Ok(())
472}
473
#[cfg(test)]
mod tests {
    use super::*;

    /// Test data pipeline processing: a simple scalar map over all elements
    #[test]
    fn test_data_pipeline() {
        let tensor = Tensor::from_slice(&[1.0, 2.0, 3.0, 4.0], vec![4]).unwrap();
        let normalized: Tensor = tensor.iter().map(|elem| elem.mul_scalar(2.0)).collect();

        assert_eq!(normalized.data(), &[2.0, 4.0, 6.0, 8.0]);
    }

    /// Test conditional processing: branch on sign of each element
    #[test]
    fn test_conditional_processing() {
        let tensor = Tensor::from_slice(&[1.0, -2.0, 3.0], vec![3]).unwrap();
        let processed: Tensor = tensor
            .iter()
            .map(|elem| {
                if elem.value() > 0.0 {
                    elem.mul_scalar(2.0)
                } else {
                    elem.abs()
                }
            })
            .collect();

        assert_eq!(processed.data(), &[2.0, 2.0, 6.0]);
    }

    /// Test batch operations: strided selection keeps every 2nd element
    #[test]
    fn test_batch_operations() {
        let tensor = Tensor::from_slice(&[1.0, 2.0, 3.0, 4.0], vec![4]).unwrap();
        let strided: Tensor = tensor
            .iter()
            .enumerate()
            .filter(|(i, _)| i % 2 == 0)
            .map(|(_, elem)| elem)
            .collect();

        assert_eq!(strided.data(), &[1.0, 3.0]);
    }
}