pub fn stream_regression(
total_samples: usize,
n_features: usize,
config: StreamConfig,
) -> Result<StreamingIterator>
Stream synthetic regression data in fixed-size chunks for memory-efficient processing.
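A minimal usage sketch, assuming `stream_regression`, `StreamConfig`, and the chunk type are in scope (the exact crate import paths are omitted here; the repository example below shows them in a complete file):

```rust
// Minimal sketch: stream 10_000 synthetic regression samples with
// 15 features in 1_000-sample chunks. `stream_regression` and
// `StreamConfig` are assumed to be in scope; crate paths omitted.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = StreamConfig {
        chunk_size: 1_000,
        ..Default::default()
    };
    let mut stream = stream_regression(10_000, 15, config)?;

    // Chunks arrive one at a time, so only one chunk's worth of data
    // is resident in memory.
    while let Some(chunk) = stream.next_chunk()? {
        println!("chunk {}: {} samples", chunk.chunk_index + 1, chunk.n_samples());
        if chunk.is_last {
            break;
        }
    }
    Ok(())
}
```

Other `StreamConfig` fields exercised in the repository example below include `buffer_size`, `num_workers`, `max_chunks`, and `memory_limit_mb`.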
Examples found in repository
examples/datasets_streaming_demo.rs (line 240)
220fn demonstrate_stream_transformations() -> Result<(), Box<dyn std::error::Error>> {
221 println!("🔄 STREAM TRANSFORMATIONS");
222 println!("{}", "-".repeat(40));
223
224 // Create a transformer pipeline
225 let transformer = StreamTransformer::new()
226 .add_standard_scaling()
227 .add_missing_value_imputation();
228
229 println!("Created transformation pipeline:");
230 println!(" 1. Standard scaling (z-score normalization)");
231 println!(" 2. Missing value imputation");
232
233 let config = StreamConfig {
234 chunk_size: 2000,
235 buffer_size: 2,
236 max_chunks: Some(5),
237 ..Default::default()
238 };
239
240 let mut stream = stream_regression(10_000, 15, config)?;
241 let mut transformed_chunks = 0;
242
243 println!("\nProcessing and transforming chunks...");
244
245 while let Some(mut chunk) = stream.next_chunk()? {
246 println!(" Processing chunk {}", chunk.chunk_index + 1);
247
248 // Show statistics before transformation
249 let data_mean_before = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
250 let data_std_before = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
251
252 println!(
253 " Before: mean = {:.3}, std = {:.3}",
254 data_mean_before[0], data_std_before[0]
255 );
256
257 // Apply transformations
258 transformer.transform_chunk(&mut chunk)?;
259
260 // Show statistics after transformation
261 let data_mean_after = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
262 let data_std_after = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
263
264 println!(
265 " After: mean = {:.3}, std = {:.3}",
266 data_mean_after[0], data_std_after[0]
267 );
268
269 transformed_chunks += 1;
270
271 if chunk.is_last {
272 break;
273 }
274 }
275
276 println!("\nTransformation Summary:");
277 println!(" Chunks processed: {transformed_chunks}");
278 println!(" Each chunk was transformed independently");
279 println!(" Memory-efficient: only one chunk in memory at a time");
280
281 println!();
282 Ok(())
283}
284
285#[allow(dead_code)]
286fn demonstrate_parallel_processing() -> Result<(), Box<dyn std::error::Error>> {
287 println!("⚡ PARALLEL STREAM PROCESSING");
288 println!("{}", "-".repeat(40));
289
290 let config = StreamConfig {
291 chunk_size: 1500,
292 buffer_size: 4,
293 num_workers: 4,
294 max_chunks: Some(8),
295 ..Default::default()
296 };
297
298 println!("Parallel processing configuration:");
299 println!(" Workers: {}", config.num_workers);
300 println!(" Chunk size: {}", config.chunk_size);
301 println!(" Buffer size: {}", config.buffer_size);
302
303 // Create a simple processor that computes statistics
304 let _processor: StreamProcessor<DataChunk> = StreamProcessor::new(config.clone());
305
306 // Define a processing function
307 let compute_stats = |chunk: DataChunk| -> Result<
308 HashMap<String, f64>,
309 Box<dyn std::error::Error + Send + Sync>,
310 > {
311 let mut stats = HashMap::new();
312
313 // Compute basic statistics
314 let mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
315 let std = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
316
317 stats.insert("mean_feature_0".to_string(), mean[0]);
318 stats.insert("std_feature_0".to_string(), std[0]);
319 stats.insert("n_samples".to_string(), chunk.n_samples() as f64);
320 stats.insert("chunk_index".to_string(), chunk.chunk_index as f64);
321
322 // Simulate some computation time
323 std::thread::sleep(std::time::Duration::from_millis(100));
324
325 Ok(stats)
326 };
327
328 println!("\nProcessing stream with parallel workers...");
329 let start_time = Instant::now();
330
331 let stream = stream_classification(12_000, 10, 3, config)?;
332
333 // For demonstration, we'll process chunks sequentially with timing
334 // In a real implementation, you'd use the processor.process_parallel method; a hand-rolled sketch of this fan-out pattern follows the listing.
335 let mut stream_iter = stream;
336 let mut chunk_results = Vec::new();
337
338 while let Some(chunk) = stream_iter.next_chunk()? {
339 let chunk_start = Instant::now();
340 let chunk_id = chunk.chunk_index;
341 let chunk_samples = chunk.n_samples();
342
343 // Process chunk
344 let stats = compute_stats(chunk)
345 .map_err(|e| -> Box<dyn std::error::Error> { Box::new(std::io::Error::other(e)) })?;
346 let chunk_time = chunk_start.elapsed();
347
348 println!(
349 " Chunk {}: {} samples, {:.2}ms",
350 chunk_id + 1,
351 chunk_samples,
352 chunk_time.as_secs_f64() * 1000.0
353 );
354
355 chunk_results.push(stats);
356
357 if chunk_results.len() >= 8 {
358 break;
359 }
360 }
361
362 let total_time = start_time.elapsed();
363
364 println!("\nParallel Processing Results:");
365 println!(" Total chunks: {}", chunk_results.len());
366 println!(" Total time: {:.2}s", total_time.as_secs_f64());
367 println!(
368 " Average time per chunk: {:.2}ms",
369 total_time.as_millis() as f64 / chunk_results.len() as f64
370 );
371
372 // Aggregate statistics
373 let total_samples: f64 = chunk_results
374 .iter()
375 .map(|stats| stats.get("n_samples").unwrap_or(&0.0))
376 .sum();
377
378 println!(" Total samples processed: {total_samples}");
379 println!(
380 " Throughput: {:.1} samples/s",
381 total_samples / total_time.as_secs_f64()
382 );
383
384 println!();
385 Ok(())
386}
387
388#[allow(dead_code)]
389fn demonstrate_performance_comparison() -> Result<(), Box<dyn std::error::Error>> {
390 println!("📊 PERFORMANCE COMPARISON");
391 println!("{}", "-".repeat(40));
392
393 let dataset_sizes = vec![10_000, 50_000, 100_000];
394 let chunk_sizes = vec![1_000, 5_000, 10_000];
395
396 println!("Comparing streaming performance across different configurations:");
397 println!();
398
399 for &dataset_size in &dataset_sizes {
400 println!("Dataset size: {dataset_size} samples");
401
402 for &chunk_size in &chunk_sizes {
403 let config = StreamConfig {
404 chunk_size,
405 buffer_size: 3,
406 num_workers: 2,
407 max_chunks: Some(dataset_size / chunk_size),
408 ..Default::default()
409 };
410
411 let start_time = Instant::now();
412 let mut stream = stream_regression(dataset_size, 20, config)?;
413
414 let mut processed_samples = 0;
415 let mut processed_chunks = 0;
416
417 while let Some(chunk) = stream.next_chunk()? {
418 processed_samples += chunk.n_samples();
419 processed_chunks += 1;
420
421 // Simulate minimal processing
422 let _stats = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0));
423
424 if chunk.is_last || processed_samples >= dataset_size {
425 break;
426 }
427 }
428
429 let duration = start_time.elapsed();
430 let throughput = processed_samples as f64 / duration.as_secs_f64();
431
432 println!(
433 " Chunk size {}: {:.2}s ({:.1} samples/s, {} chunks)",
434 chunk_size,
435 duration.as_secs_f64(),
436 throughput,
437 processed_chunks
438 );
439 }
440 println!();
441 }
442
443 println!("Performance Insights:");
444 println!(" • Larger chunks = fewer iterations, better throughput");
445 println!(" • Smaller chunks = lower memory usage, more responsive");
446 println!(" • Optimal chunk size depends on memory constraints and processing complexity");
447
448 println!();
449 Ok(())
450}
451
452#[allow(dead_code)]
453fn demonstrate_real_world_scenarios() -> Result<(), Box<dyn std::error::Error>> {
454 println!("🌍 REAL-WORLD STREAMING SCENARIOS");
455 println!("{}", "-".repeat(40));
456
457 // Scenario 1: Training on large dataset with limited memory
458 println!("Scenario 1: Large dataset training with memory constraints");
459 simulate_training_scenario()?;
460
461 // Scenario 2: Data preprocessing pipeline
462 println!("\nScenario 2: Data preprocessing pipeline");
463 simulate_preprocessing_pipeline()?;
464
465 // Scenario 3: Model evaluation on large test set
466 println!("\nScenario 3: Model evaluation on large test set");
467 simulate_model_evaluation()?;
468
469 println!();
470 Ok(())
471}
472
473#[allow(dead_code)]
474fn simulate_training_scenario() -> Result<(), Box<dyn std::error::Error>> {
475 println!(" • Dataset: 500K samples, 100 features");
476 println!(" • Memory limit: 200MB");
477 println!(" • Goal: Train incrementally using mini-batches");
478
479 let config = StreamConfig {
480 chunk_size: 5_000, // Mini-batch size
481 buffer_size: 2, // Keep memory low
482 memory_limit_mb: Some(200),
483 max_chunks: Some(10), // Simulate partial processing
484 ..Default::default()
485 };
486
487 let mut stream = stream_classification(500_000, 100, 10, config)?;
488 let mut total_batches = 0;
489 let mut total_samples = 0;
490
491 let start_time = Instant::now();
492
493 while let Some(chunk) = stream.next_chunk()? {
494 // Simulate training on mini-batch
495 let batch_size = chunk.n_samples();
496
497 // Simulate gradient computation time
498 std::thread::sleep(std::time::Duration::from_millis(20));
499
500 total_batches += 1;
501 total_samples += batch_size;
502
503 if total_batches % 3 == 0 {
504 println!(" Processed {total_batches} batches ({total_samples} samples)");
505 }
506
507 if chunk.is_last {
508 break;
509 }
510 }
511
512 let duration = start_time.elapsed();
513 println!(
514 " ✅ Training simulation: {} batches, {:.2}s",
515 total_batches,
516 duration.as_secs_f64()
517 );
518
519 Ok(())
520}
521
522#[allow(dead_code)]
523fn simulate_preprocessing_pipeline() -> Result<(), Box<dyn std::error::Error>> {
524 println!(" • Raw data → Clean → Scale → Feature selection");
525 println!(" • Process 200K samples in chunks");
526
527 let config = StreamConfig {
528 chunk_size: 8_000,
529 buffer_size: 3,
530 max_chunks: Some(5),
531 ..Default::default()
532 };
533
534 let transformer = StreamTransformer::new()
535 .add_missing_value_imputation()
536 .add_standard_scaling();
537
538 let mut stream = stream_regression(200_000, 50, config)?;
539 let mut processed_chunks = 0;
540
541 while let Some(mut chunk) = stream.next_chunk()? {
542 // Step 1: Clean data (remove outliers, handle missing values)
543 transformer.transform_chunk(&mut chunk)?;
544
545 // Step 2: Feature selection (simulate by keeping first 30 features)
546 let selected_data = chunk
547 .data
548 .slice(scirs2_core::ndarray::s![.., ..30])
549 .to_owned();
550
551 processed_chunks += 1;
552 println!(
553 " Chunk {}: {} → {} features",
554 processed_chunks,
555 chunk.n_features(),
556 selected_data.ncols()
557 );
558
559 if chunk.is_last {
560 break;
561 }
562 }
563
564 println!(" ✅ Preprocessing pipeline: {processed_chunks} chunks processed");
565
566 Ok(())
567}
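The comment at line 334 of the listing defers real parallelism to a `processor.process_parallel` method. As a rough, hypothetical illustration of that fan-out pattern using only the standard library: chunks are pulled sequentially (the iterator requires `&mut` access), then dealt round-robin to worker threads over channels. This sketch assumes `DataChunk: Send + 'static` and the same in-scope items as the example above; it is not the crate's actual API.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical hand-rolled fan-out over worker threads. Assumes
// `DataChunk: Send + 'static` and that `stream_regression`,
// `StreamConfig`, and `DataChunk` are in scope as in the example.
fn parallel_sketch() -> Result<(), Box<dyn std::error::Error>> {
    let config = StreamConfig {
        chunk_size: 1_500,
        buffer_size: 4,
        num_workers: 4,
        max_chunks: Some(8),
        ..Default::default()
    };
    let n_workers: usize = 4; // mirrors config.num_workers above
    let mut stream = stream_regression(12_000, 10, config)?;

    // One channel per worker; chunks are dealt round-robin.
    let mut senders = Vec::with_capacity(n_workers);
    let mut handles = Vec::with_capacity(n_workers);
    for _ in 0..n_workers {
        let (tx, rx) = mpsc::channel::<DataChunk>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            for chunk in rx {
                // Per-chunk work: column means of the feature matrix.
                let _mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0));
            }
        }));
    }

    // The stream itself is pulled sequentially on this thread; only
    // the per-chunk work runs in parallel.
    let mut i = 0;
    while let Some(chunk) = stream.next_chunk()? {
        let is_last = chunk.is_last;
        senders[i % n_workers].send(chunk).expect("worker exited early");
        i += 1;
        if is_last {
            break;
        }
    }
    drop(senders); // closing the channels ends the workers' loops
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    Ok(())
}
```

Round-robin dealing over one channel per worker keeps the sketch dependency-free; a multi-consumer channel (e.g. crossbeam's) would balance uneven per-chunk costs better.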