stream_regression

Function stream_regression 

Source
pub fn stream_regression(
    total_samples: usize,
    n_features: usize,
    config: StreamConfig,
) -> Result<StreamingIterator>
Expand description

Stream synthetic regression data

Examples found in repository?
examples/datasets_streaming_demo.rs (line 240)
220fn demonstrate_stream_transformations() -> Result<(), Box<dyn std::error::Error>> {
221    println!("🔄 STREAM TRANSFORMATIONS");
222    println!("{}", "-".repeat(40));
223
224    // Create a transformer pipeline
225    let transformer = StreamTransformer::new()
226        .add_standard_scaling()
227        .add_missing_value_imputation();
228
229    println!("Created transformation pipeline:");
230    println!("  1. Standard scaling (z-score normalization)");
231    println!("  2. Missing value imputation");
232
233    let config = StreamConfig {
234        chunk_size: 2000,
235        buffer_size: 2,
236        max_chunks: Some(5),
237        ..Default::default()
238    };
239
240    let mut stream = stream_regression(10_000, 15, config)?;
241    let mut transformed_chunks = 0;
242
243    println!("\nProcessing and transforming chunks...");
244
245    while let Some(mut chunk) = stream.next_chunk()? {
246        println!("  Processing chunk {}", chunk.chunk_index + 1);
247
248        // Show statistics before transformation
249        let data_mean_before = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
250        let data_std_before = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
251
252        println!(
253            "    Before: mean = {:.3}, std = {:.3}",
254            data_mean_before[0], data_std_before[0]
255        );
256
257        // Apply transformations
258        transformer.transform_chunk(&mut chunk)?;
259
260        // Show statistics after transformation
261        let data_mean_after = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
262        let data_std_after = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
263
264        println!(
265            "    After:  mean = {:.3}, std = {:.3}",
266            data_mean_after[0], data_std_after[0]
267        );
268
269        transformed_chunks += 1;
270
271        if chunk.is_last {
272            break;
273        }
274    }
275
276    println!("\nTransformation Summary:");
277    println!("  Chunks processed: {transformed_chunks}");
278    println!("  Each chunk was transformed independently");
279    println!("  Memory-efficient: only one chunk in memory at a time");
280
281    println!();
282    Ok(())
283}
284
285#[allow(dead_code)]
286fn demonstrate_parallel_processing() -> Result<(), Box<dyn std::error::Error>> {
287    println!("⚡ PARALLEL STREAM PROCESSING");
288    println!("{}", "-".repeat(40));
289
290    let config = StreamConfig {
291        chunk_size: 1500,
292        buffer_size: 4,
293        num_workers: 4,
294        max_chunks: Some(8),
295        ..Default::default()
296    };
297
298    println!("Parallel processing configuration:");
299    println!("  Workers: {}", config.num_workers);
300    println!("  Chunk size: {}", config.chunk_size);
301    println!("  Buffer size: {}", config.buffer_size);
302
303    // Create a simple processor that computes statistics
304    let _processor: StreamProcessor<DataChunk> = StreamProcessor::new(config.clone());
305
306    // Define a processing function
307    let compute_stats = |chunk: DataChunk| -> Result<
308        HashMap<String, f64>,
309        Box<dyn std::error::Error + Send + Sync>,
310    > {
311        let mut stats = HashMap::new();
312
313        // Compute basic statistics
314        let mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
315        let std = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
316
317        stats.insert("mean_feature_0".to_string(), mean[0]);
318        stats.insert("std_feature_0".to_string(), std[0]);
319        stats.insert("n_samples".to_string(), chunk.n_samples() as f64);
320        stats.insert("chunk_index".to_string(), chunk.chunk_index as f64);
321
322        // Simulate some computation time
323        std::thread::sleep(std::time::Duration::from_millis(100));
324
325        Ok(stats)
326    };
327
328    println!("\nProcessing stream with parallel workers...");
329    let start_time = Instant::now();
330
331    let stream = stream_classification(12_000, 10, 3, config)?;
332
333    // For demonstration, we'll process chunks sequentially with timing
334    // In a real implementation, you'd use the processor.process_parallel method
335    let mut stream_iter = stream;
336    let mut chunk_results = Vec::new();
337
338    while let Some(chunk) = stream_iter.next_chunk()? {
339        let chunk_start = Instant::now();
340        let chunk_id = chunk.chunk_index;
341        let chunk_samples = chunk.n_samples();
342
343        // Process chunk
344        let stats = compute_stats(chunk)
345            .map_err(|e| -> Box<dyn std::error::Error> { Box::new(std::io::Error::other(e)) })?;
346        let chunk_time = chunk_start.elapsed();
347
348        println!(
349            "  Chunk {}: {} samples, {:.2}ms",
350            chunk_id + 1,
351            chunk_samples,
352            chunk_time.as_millis()
353        );
354
355        chunk_results.push(stats);
356
357        if chunk_results.len() >= 8 {
358            break;
359        }
360    }
361
362    let total_time = start_time.elapsed();
363
364    println!("\nParallel Processing Results:");
365    println!("  Total chunks: {}", chunk_results.len());
366    println!("  Total time: {:.2}s", total_time.as_secs_f64());
367    println!(
368        "  Average time per chunk: {:.2}ms",
369        total_time.as_millis() as f64 / chunk_results.len() as f64
370    );
371
372    // Aggregate statistics
373    let total_samples: f64 = chunk_results
374        .iter()
375        .map(|stats| stats.get("n_samples").unwrap_or(&0.0))
376        .sum();
377
378    println!("  Total samples processed: {total_samples}");
379    println!(
380        "  Throughput: {:.1} samples/s",
381        total_samples / total_time.as_secs_f64()
382    );
383
384    println!();
385    Ok(())
386}
387
388#[allow(dead_code)]
389fn demonstrate_performance_comparison() -> Result<(), Box<dyn std::error::Error>> {
390    println!("📊 PERFORMANCE COMPARISON");
391    println!("{}", "-".repeat(40));
392
393    let dataset_sizes = vec![10_000, 50_000, 100_000];
394    let chunk_sizes = vec![1_000, 5_000, 10_000];
395
396    println!("Comparing streaming performance across different configurations:");
397    println!();
398
399    for &datasetsize in &dataset_sizes {
400        println!("Dataset size: {datasetsize} samples");
401
402        for &chunksize in &chunk_sizes {
403            let config = StreamConfig {
404                chunk_size: chunksize,
405                buffer_size: 3,
406                num_workers: 2,
407                max_chunks: Some(datasetsize / chunksize),
408                ..Default::default()
409            };
410
411            let start_time = Instant::now();
412            let mut stream = stream_regression(datasetsize, 20, config)?;
413
414            let mut processed_samples = 0;
415            let mut processed_chunks = 0;
416
417            while let Some(chunk) = stream.next_chunk()? {
418                processed_samples += chunk.n_samples();
419                processed_chunks += 1;
420
421                // Simulate minimal processing
422                let _stats = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0));
423
424                if chunk.is_last || processed_samples >= datasetsize {
425                    break;
426                }
427            }
428
429            let duration = start_time.elapsed();
430            let throughput = processed_samples as f64 / duration.as_secs_f64();
431
432            println!(
433                "  Chunk size {}: {:.2}s ({:.1} samples/s, {} chunks)",
434                chunksize,
435                duration.as_secs_f64(),
436                throughput,
437                processed_chunks
438            );
439        }
440        println!();
441    }
442
443    println!("Performance Insights:");
444    println!("  • Larger chunks = fewer iterations, better throughput");
445    println!("  • Smaller chunks = lower memory usage, more responsive");
446    println!("  • Optimal chunk size depends on memory constraints and processing complexity");
447
448    println!();
449    Ok(())
450}
451
452#[allow(dead_code)]
453fn demonstrate_real_world_scenarios() -> Result<(), Box<dyn std::error::Error>> {
454    println!("🌍 REAL-WORLD STREAMING SCENARIOS");
455    println!("{}", "-".repeat(40));
456
457    // Scenario 1: Training on large dataset with limited memory
458    println!("Scenario 1: Large dataset training with memory constraints");
459    simulate_training_scenario()?;
460
461    // Scenario 2: Data preprocessing pipeline
462    println!("\nScenario 2: Data preprocessing pipeline");
463    simulate_preprocessing_pipeline()?;
464
465    // Scenario 3: Model evaluation on large test set
466    println!("\nScenario 3: Model evaluation on large test set");
467    simulate_model_evaluation()?;
468
469    println!();
470    Ok(())
471}
472
473#[allow(dead_code)]
474fn simulate_training_scenario() -> Result<(), Box<dyn std::error::Error>> {
475    println!("  • Dataset: 500K samples, 100 features");
476    println!("  • Memory limit: 200MB");
477    println!("  • Goal: Train incrementally using mini-batches");
478
479    let config = StreamConfig {
480        chunk_size: 5_000, // Mini-batch size
481        buffer_size: 2,    // Keep memory low
482        memory_limit_mb: Some(200),
483        max_chunks: Some(10), // Simulate partial processing
484        ..Default::default()
485    };
486
487    let mut stream = stream_classification(500_000, 100, 10, config)?;
488    let mut total_batches = 0;
489    let mut total_samples = 0;
490
491    let start_time = Instant::now();
492
493    while let Some(chunk) = stream.next_chunk()? {
494        // Simulate training on mini-batch
495        let batchsize = chunk.n_samples();
496
497        // Simulate gradient computation time
498        std::thread::sleep(std::time::Duration::from_millis(20));
499
500        total_batches += 1;
501        total_samples += batchsize;
502
503        if total_batches % 3 == 0 {
504            println!("    Processed {total_batches} batches ({total_samples} samples)");
505        }
506
507        if chunk.is_last {
508            break;
509        }
510    }
511
512    let duration = start_time.elapsed();
513    println!(
514        "  ✅ Training simulation: {} batches, {:.2}s",
515        total_batches,
516        duration.as_secs_f64()
517    );
518
519    Ok(())
520}
521
522#[allow(dead_code)]
523fn simulate_preprocessing_pipeline() -> Result<(), Box<dyn std::error::Error>> {
524    println!("  • Raw data → Clean → Scale → Feature selection");
525    println!("  • Process 200K samples in chunks");
526
527    let config = StreamConfig {
528        chunk_size: 8_000,
529        buffer_size: 3,
530        max_chunks: Some(5),
531        ..Default::default()
532    };
533
534    let transformer = StreamTransformer::new()
535        .add_missing_value_imputation()
536        .add_standard_scaling();
537
538    let mut stream = stream_regression(200_000, 50, config)?;
539    let mut processed_chunks = 0;
540
541    while let Some(mut chunk) = stream.next_chunk()? {
542        // Step 1: Clean data (remove outliers, handle missing values)
543        transformer.transform_chunk(&mut chunk)?;
544
545        // Step 2: Feature selection (simulate by keeping first 30 features)
546        let selecteddata = chunk
547            .data
548            .slice(scirs2_core::ndarray::s![.., ..30])
549            .to_owned();
550
551        processed_chunks += 1;
552        println!(
553            "    Chunk {}: {} → {} features",
554            processed_chunks,
555            chunk.n_features(),
556            selecteddata.ncols()
557        );
558
559        if chunk.is_last {
560            break;
561        }
562    }
563
564    println!("  ✅ Preprocessing pipeline: {processed_chunks} chunks processed");
565
566    Ok(())
567}