pub struct StreamTransformer { /* private fields */ }
A memory-efficient data transformer for streaming datasets. Transformation steps are registered with the builder-style add_* methods and then applied to each DataChunk in place via transform_chunk.
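A minimal usage sketch assembled from the repository example further down this page (stream_regression, StreamConfig, and DataChunk come from the same crate; exact import paths are not shown here, and the snippet assumes a Result-returning context for the ? operator):

let transformer = StreamTransformer::new()
    .add_standard_scaling()
    .add_missing_value_imputation();

let mut stream = stream_regression(10_000, 15, StreamConfig::default())?;
while let Some(mut chunk) = stream.next_chunk()? {
    // Each chunk is transformed in place, so only one chunk
    // needs to be resident in memory at a time.
    transformer.transform_chunk(&mut chunk)?;
    if chunk.is_last {
        break;
    }
}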
Implementations
impl StreamTransformer
pub fn new() -> Self
Create a new stream transformer
Examples found in repository
examples/datasets_streaming_demo.rs (line 225)
220fn demonstrate_stream_transformations() -> Result<(), Box<dyn std::error::Error>> {
221 println!("🔄 STREAM TRANSFORMATIONS");
222 println!("{}", "-".repeat(40));
223
224 // Create a transformer pipeline
225 let transformer = StreamTransformer::new()
226 .add_standard_scaling()
227 .add_missing_value_imputation();
228
229 println!("Created transformation pipeline:");
230 println!(" 1. Standard scaling (z-score normalization)");
231 println!(" 2. Missing value imputation");
232
233 let config = StreamConfig {
234 chunk_size: 2000,
235 buffer_size: 2,
236 max_chunks: Some(5),
237 ..Default::default()
238 };
239
240 let mut stream = stream_regression(10_000, 15, config)?;
241 let mut transformed_chunks = 0;
242
243 println!("\nProcessing and transforming chunks...");
244
245 while let Some(mut chunk) = stream.next_chunk()? {
246 println!(" Processing chunk {}", chunk.chunk_index + 1);
247
248 // Show statistics before transformation
249 let data_mean_before = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
250 let data_std_before = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
251
252 println!(
253 " Before: mean = {:.3}, std = {:.3}",
254 data_mean_before[0], data_std_before[0]
255 );
256
257 // Apply transformations
258 transformer.transform_chunk(&mut chunk)?;
259
260 // Show statistics after transformation
261 let data_mean_after = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
262 let data_std_after = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
263
264 println!(
265 " After: mean = {:.3}, std = {:.3}",
266 data_mean_after[0], data_std_after[0]
267 );
268
269 transformed_chunks += 1;
270
271 if chunk.is_last {
272 break;
273 }
274 }
275
276 println!("\nTransformation Summary:");
277 println!(" Chunks processed: {transformed_chunks}");
278 println!(" Each chunk was transformed independently");
279 println!(" Memory-efficient: only one chunk in memory at a time");
280
281 println!();
282 Ok(())
283}
284
285#[allow(dead_code)]
286fn demonstrate_parallel_processing() -> Result<(), Box<dyn std::error::Error>> {
287 println!("⚡ PARALLEL STREAM PROCESSING");
288 println!("{}", "-".repeat(40));
289
290 let config = StreamConfig {
291 chunk_size: 1500,
292 buffer_size: 4,
293 num_workers: 4,
294 max_chunks: Some(8),
295 ..Default::default()
296 };
297
298 println!("Parallel processing configuration:");
299 println!(" Workers: {}", config.num_workers);
300 println!(" Chunk size: {}", config.chunk_size);
301 println!(" Buffer size: {}", config.buffer_size);
302
303 // Create a simple processor that computes statistics
304 let _processor: StreamProcessor<DataChunk> = StreamProcessor::new(config.clone());
305
306 // Define a processing function
307 let compute_stats = |chunk: DataChunk| -> Result<
308 HashMap<String, f64>,
309 Box<dyn std::error::Error + Send + Sync>,
310 > {
311 let mut stats = HashMap::new();
312
313 // Compute basic statistics
314 let mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
315 let std = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
316
317 stats.insert("mean_feature_0".to_string(), mean[0]);
318 stats.insert("std_feature_0".to_string(), std[0]);
319 stats.insert("n_samples".to_string(), chunk.n_samples() as f64);
320 stats.insert("chunk_index".to_string(), chunk.chunk_index as f64);
321
322 // Simulate some computation time
323 std::thread::sleep(std::time::Duration::from_millis(100));
324
325 Ok(stats)
326 };
327
328 println!("\nProcessing stream with parallel workers...");
329 let start_time = Instant::now();
330
331 let stream = stream_classification(12_000, 10, 3, config)?;
332
333 // For demonstration, we'll process chunks sequentially with timing
334 // In a real implementation, you'd use the processor.process_parallel method
335 let mut stream_iter = stream;
336 let mut chunk_results = Vec::new();
337
338 while let Some(chunk) = stream_iter.next_chunk()? {
339 let chunk_start = Instant::now();
340 let chunk_id = chunk.chunk_index;
341 let chunk_samples = chunk.n_samples();
342
343 // Process chunk
344 let stats = compute_stats(chunk)
345 .map_err(|e| -> Box<dyn std::error::Error> { Box::new(std::io::Error::other(e)) })?;
346 let chunk_time = chunk_start.elapsed();
347
348 println!(
349 " Chunk {}: {} samples, {:.2}ms",
350 chunk_id + 1,
351 chunk_samples,
352 chunk_time.as_millis()
353 );
354
355 chunk_results.push(stats);
356
357 if chunk_results.len() >= 8 {
358 break;
359 }
360 }
361
362 let total_time = start_time.elapsed();
363
364 println!("\nParallel Processing Results:");
365 println!(" Total chunks: {}", chunk_results.len());
366 println!(" Total time: {:.2}s", total_time.as_secs_f64());
367 println!(
368 " Average time per chunk: {:.2}ms",
369 total_time.as_millis() as f64 / chunk_results.len() as f64
370 );
371
372 // Aggregate statistics
373 let total_samples: f64 = chunk_results
374 .iter()
375 .map(|stats| stats.get("n_samples").unwrap_or(&0.0))
376 .sum();
377
378 println!(" Total samples processed: {total_samples}");
379 println!(
380 " Throughput: {:.1} samples/s",
381 total_samples / total_time.as_secs_f64()
382 );
383
384 println!();
385 Ok(())
386}
387
388#[allow(dead_code)]
389fn demonstrate_performance_comparison() -> Result<(), Box<dyn std::error::Error>> {
390 println!("📊 PERFORMANCE COMPARISON");
391 println!("{}", "-".repeat(40));
392
393 let dataset_sizes = vec![10_000, 50_000, 100_000];
394 let chunk_sizes = vec![1_000, 5_000, 10_000];
395
396 println!("Comparing streaming performance across different configurations:");
397 println!();
398
399 for &datasetsize in &dataset_sizes {
400 println!("Dataset size: {datasetsize} samples");
401
402 for &chunksize in &chunk_sizes {
403 let config = StreamConfig {
404 chunk_size: chunksize,
405 buffer_size: 3,
406 num_workers: 2,
407 max_chunks: Some(datasetsize / chunksize),
408 ..Default::default()
409 };
410
411 let start_time = Instant::now();
412 let mut stream = stream_regression(datasetsize, 20, config)?;
413
414 let mut processed_samples = 0;
415 let mut processed_chunks = 0;
416
417 while let Some(chunk) = stream.next_chunk()? {
418 processed_samples += chunk.n_samples();
419 processed_chunks += 1;
420
421 // Simulate minimal processing
422 let _stats = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0));
423
424 if chunk.is_last || processed_samples >= datasetsize {
425 break;
426 }
427 }
428
429 let duration = start_time.elapsed();
430 let throughput = processed_samples as f64 / duration.as_secs_f64();
431
432 println!(
433 " Chunk size {}: {:.2}s ({:.1} samples/s, {} chunks)",
434 chunksize,
435 duration.as_secs_f64(),
436 throughput,
437 processed_chunks
438 );
439 }
440 println!();
441 }
442
443 println!("Performance Insights:");
444 println!(" • Larger chunks = fewer iterations, better throughput");
445 println!(" • Smaller chunks = lower memory usage, more responsive");
446 println!(" • Optimal chunk size depends on memory constraints and processing complexity");
447
448 println!();
449 Ok(())
450}
451
452#[allow(dead_code)]
453fn demonstrate_real_world_scenarios() -> Result<(), Box<dyn std::error::Error>> {
454 println!("🌍 REAL-WORLD STREAMING SCENARIOS");
455 println!("{}", "-".repeat(40));
456
457 // Scenario 1: Training on large dataset with limited memory
458 println!("Scenario 1: Large dataset training with memory constraints");
459 simulate_training_scenario()?;
460
461 // Scenario 2: Data preprocessing pipeline
462 println!("\nScenario 2: Data preprocessing pipeline");
463 simulate_preprocessing_pipeline()?;
464
465 // Scenario 3: Model evaluation on large test set
466 println!("\nScenario 3: Model evaluation on large test set");
467 simulate_model_evaluation()?;
468
469 println!();
470 Ok(())
471}
472
473#[allow(dead_code)]
474fn simulate_training_scenario() -> Result<(), Box<dyn std::error::Error>> {
475 println!(" • Dataset: 500K samples, 100 features");
476 println!(" • Memory limit: 200MB");
477 println!(" • Goal: Train incrementally using mini-batches");
478
479 let config = StreamConfig {
480 chunk_size: 5_000, // Mini-batch size
481 buffer_size: 2, // Keep memory low
482 memory_limit_mb: Some(200),
483 max_chunks: Some(10), // Simulate partial processing
484 ..Default::default()
485 };
486
487 let mut stream = stream_classification(500_000, 100, 10, config)?;
488 let mut total_batches = 0;
489 let mut total_samples = 0;
490
491 let start_time = Instant::now();
492
493 while let Some(chunk) = stream.next_chunk()? {
494 // Simulate training on mini-batch
495 let batchsize = chunk.n_samples();
496
497 // Simulate gradient computation time
498 std::thread::sleep(std::time::Duration::from_millis(20));
499
500 total_batches += 1;
501 total_samples += batchsize;
502
503 if total_batches % 3 == 0 {
504 println!(" Processed {total_batches} batches ({total_samples} samples)");
505 }
506
507 if chunk.is_last {
508 break;
509 }
510 }
511
512 let duration = start_time.elapsed();
513 println!(
514 " ✅ Training simulation: {} batches, {:.2}s",
515 total_batches,
516 duration.as_secs_f64()
517 );
518
519 Ok(())
520}
521
522#[allow(dead_code)]
523fn simulate_preprocessing_pipeline() -> Result<(), Box<dyn std::error::Error>> {
524 println!(" • Raw data → Clean → Scale → Feature selection");
525 println!(" • Process 200K samples in chunks");
526
527 let config = StreamConfig {
528 chunk_size: 8_000,
529 buffer_size: 3,
530 max_chunks: Some(5),
531 ..Default::default()
532 };
533
534 let transformer = StreamTransformer::new()
535 .add_missing_value_imputation()
536 .add_standard_scaling();
537
538 let mut stream = stream_regression(200_000, 50, config)?;
539 let mut processed_chunks = 0;
540
541 while let Some(mut chunk) = stream.next_chunk()? {
542 // Step 1: Clean data (remove outliers, handle missing values)
543 transformer.transform_chunk(&mut chunk)?;
544
545 // Step 2: Feature selection (simulate by keeping first 30 features)
546 let selecteddata = chunk
547 .data
548 .slice(scirs2_core::ndarray::s![.., ..30])
549 .to_owned();
550
551 processed_chunks += 1;
552 println!(
553 " Chunk {}: {} → {} features",
554 processed_chunks,
555 chunk.n_features(),
556 selecteddata.ncols()
557 );
558
559 if chunk.is_last {
560 break;
561 }
562 }
563
564 println!(" ✅ Preprocessing pipeline: {processed_chunks} chunks processed");
565
566 Ok(())
567}
pub fn add_transform<F>(self, transform: F) -> Self
Add a custom transformation function to the pipeline.
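The trait bound on F is private on this page, so the following is a hedged sketch: it assumes the closure receives a mutable DataChunk and returns Result<()>, matching how transform_chunk applies the built-in steps. The clamping step itself is hypothetical, purely for illustration:

let transformer = StreamTransformer::new()
    .add_transform(|chunk: &mut DataChunk| {
        // Hypothetical custom step: clamp extreme values into [-10.0, 10.0].
        chunk.data.mapv_inplace(|v| v.clamp(-10.0, 10.0));
        Ok(())
    });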
pub fn transform_chunk(&self, chunk: &mut DataChunk) -> Result<()>
Apply every registered transformation to a chunk, mutating its data in place.
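A short sketch of the call shape, assuming a stream and transformer built as in the overview above (hedged; see the repository listing referenced below for a complete run):

// transform_chunk mutates chunk.data in place and returns Result<()>,
// so a failure in any registered transformation surfaces at this call.
if let Some(mut chunk) = stream.next_chunk()? {
    transformer.transform_chunk(&mut chunk)?;
    // chunk.data now reflects every registered transformation.
}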
Examples found in repository
examples/datasets_streaming_demo.rs (line 258); the listing is identical to the one shown under new above.
pub fn add_standard_scaling(self) -> Self
Add a standard scaling (z-score normalization) transformation.
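Standard scaling rescales each feature column to zero mean and unit variance. The repository example's before/after output suggests statistics are computed per chunk; under that assumption, a minimal sketch of the per-column math (illustrative, not the crate's actual implementation):

use scirs2_core::ndarray::Axis;

// Per-feature z-score: x' = (x - mean) / std, guarding against a zero std.
let mean = chunk.data.mean_axis(Axis(0)).unwrap();
let std = chunk.data.std_axis(Axis(0), 0.0);
for (j, mut col) in chunk.data.axis_iter_mut(Axis(1)).enumerate() {
    col.mapv_inplace(|v| (v - mean[j]) / std[j].max(1e-12));
}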
Examples found in repository
examples/datasets_streaming_demo.rs (line 226); the listing is identical to the one shown under new above.
pub fn add_missing_value_imputation(self) -> Self
Add missing value imputation
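The imputation strategy is not documented on this page. As one plausible behavior, here is a hedged sketch of per-column mean imputation over a chunk (illustrative only, not the crate's actual implementation):

use scirs2_core::ndarray::Axis;

// Replace non-finite entries (NaN/inf) with the mean of the column's finite values.
for mut col in chunk.data.axis_iter_mut(Axis(1)) {
    let finite: Vec<f64> = col.iter().copied().filter(|v| v.is_finite()).collect();
    if finite.is_empty() {
        continue; // nothing to impute from
    }
    let mean = finite.iter().sum::<f64>() / finite.len() as f64;
    col.mapv_inplace(|v| if v.is_finite() { v } else { mean });
}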
Examples found in repository
examples/datasets_streaming_demo.rs (line 227); the listing is identical to the one shown under new above.
Trait Implementations
Auto Trait Implementations
impl Freeze for StreamTransformer
impl !RefUnwindSafe for StreamTransformer
impl Send for StreamTransformer
impl Sync for StreamTransformer
impl Unpin for StreamTransformer
impl !UnwindSafe for StreamTransformer
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
impl<T> Pointable for T
impl<SS, SP> SupersetOf<SS> for SP
where
    SS: SubsetOf<SP>,
fn to_subset(&self) -> Option<SS>
The inverse inclusion map: attempts to construct self from the equivalent element of its superset. Read more
fn is_in_subset(&self) -> bool
Checks if self is actually part of its subset T (and can be converted to it).
fn to_subset_unchecked(&self) -> SS
Use with care! Same as self.to_subset but without any property checks. Always succeeds.
fn from_subset(element: &SS) -> SP
The inclusion map: converts self to the equivalent element of its superset.