use scirs2_datasets::{
    make_classification, stream_classification, stream_regression, utils::train_test_split,
    DataChunk, StreamConfig, StreamProcessor, StreamTransformer,
};
use std::collections::HashMap;
use std::time::Instant;

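// End-to-end tour of the streaming dataset API: basic chunked reads,
// memory-efficient processing, per-chunk transformations, parallel workers,
// chunk-size tuning, and three simulated real-world scenarios.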
#[allow(dead_code)]
fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("🚀 Streaming Datasets Demonstration");
    println!("===================================\n");

    demonstrate_basic_streaming()?;

    demonstrate_memory_efficient_processing()?;

    demonstrate_stream_transformations()?;

    demonstrate_parallel_processing()?;

    demonstrate_performance_comparison()?;

    demonstrate_real_world_scenarios()?;

    println!("\n✅ Streaming demonstration completed!");
    Ok(())
}

#[allow(dead_code)]
fn demonstrate_basic_streaming() -> Result<(), Box<dyn std::error::Error>> {
    println!("📊 BASIC STREAMING OPERATIONS");
    println!("{}", "-".repeat(40));

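    // Every StreamConfig field is spelled out here for reference; the later
    // examples start from ..Default::default() and override only what they need.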
    let config = StreamConfig {
        chunk_size: 1000,
        buffer_size: 3,
        num_workers: 4,
        memory_limit_mb: Some(100),
        enable_compression: false,
        enable_prefetch: true,
        max_chunks: Some(10),
    };

    println!("Streaming Configuration:");
    println!("  Chunk size: {} samples", config.chunk_size);
    println!("  Buffer size: {} chunks", config.buffer_size);
    println!("  Workers: {}", config.num_workers);
    println!("  Memory limit: {:?} MB", config.memory_limit_mb);
    println!("  Max chunks: {:?}", config.max_chunks);

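    // stream_classification(n_samples, n_features, n_classes, config) produces
    // chunks on demand instead of materializing all 100K samples at once.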
    println!("\nStreaming synthetic classification data...");
    let mut stream = stream_classification(100_000, 20, 5, config.clone())?;

    let mut total_samples = 0;
    let mut chunk_count = 0;
    let mut class_distribution: HashMap<i32, usize> = HashMap::new();

    let start_time = Instant::now();

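    // next_chunk() yields Ok(Some(chunk)) until the stream is exhausted
    // (capped here by max_chunks), then Ok(None).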
    while let Some(chunk) = stream.next_chunk()? {
        total_samples += chunk.n_samples();
        chunk_count += 1;

        if let Some(target) = &chunk.target {
            for &class in target.iter() {
                *class_distribution.entry(class as i32).or_insert(0) += 1;
            }
        }

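        // progress_percent() is Some only when the total stream length is known.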
        let stats = stream.stats();
        if let Some(progress) = stats.progress_percent() {
            println!(
                "  Chunk {}: {} samples (Progress: {:.1}%, Buffer: {:.1}%)",
                chunk.chunk_index + 1,
                chunk.n_samples(),
                progress,
                stats.buffer_utilization()
            );
        } else {
            println!(
                "  Chunk {}: {} samples (Buffer: {:.1}%)",
                chunk.chunk_index + 1,
                chunk.n_samples(),
                stats.buffer_utilization()
            );
        }

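        // Simulate downstream per-chunk work.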
        std::thread::sleep(std::time::Duration::from_millis(50));

        if chunk.is_last {
            println!("  🏁 Reached last chunk");
            break;
        }
    }

    let duration = start_time.elapsed();

    println!("\nStreaming Results:");
    println!("  Total chunks processed: {chunk_count}");
    println!("  Total samples: {total_samples}");
    println!("  Processing time: {:.2}s", duration.as_secs_f64());
    println!(
        "  Throughput: {:.1} samples/s",
        total_samples as f64 / duration.as_secs_f64()
    );
    println!("  Class distribution: {class_distribution:?}");

    println!();
    Ok(())
}

#[allow(dead_code)]
fn demonstrate_memory_efficient_processing() -> Result<(), Box<dyn std::error::Error>> {
    println!("💾 MEMORY-EFFICIENT PROCESSING");
    println!("{}", "-".repeat(40));

    let dataset_size = 50_000;
    let n_features = 50;

    println!("Comparing memory usage for {dataset_size} samples with {n_features} features");

    println!("\n1. In-memory approach:");
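    // Baseline: generate the full dataset in memory, then split it.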
    let start_mem = get_memory_usage();
    let start_time = Instant::now();

    let in_memory_dataset = make_classification(dataset_size, n_features, 5, 2, 25, Some(42))?;
    let (train, test) = train_test_split(&in_memory_dataset, 0.2, Some(42))?;

    let in_memory_time = start_time.elapsed();
    let in_memory_mem = get_memory_usage() - start_mem;

    println!("  Time: {:.2}s", in_memory_time.as_secs_f64());
    println!("  Memory usage: ~{in_memory_mem:.1} MB");
    println!("  Train samples: {}", train.n_samples());
    println!("  Test samples: {}", test.n_samples());

    println!("\n2. Streaming approach:");
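    // Same workload, but processed chunk by chunk with a small buffer.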
    let stream_start_time = Instant::now();
    let stream_start_mem = get_memory_usage();

    let config = StreamConfig {
        chunk_size: 5_000,
        buffer_size: 2,
        num_workers: 2,
        memory_limit_mb: Some(50),
        ..Default::default()
    };

    let mut stream = stream_classification(dataset_size, n_features, 5, config)?;

    let mut total_processed = 0;
    let mut train_samples = 0;
    let mut test_samples = 0;

    while let Some(chunk) = stream.next_chunk()? {
        total_processed += chunk.n_samples();

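        // Simulate an 80/20 train/test split by counting, without keeping
        // either partition in memory.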
        let chunk_train_size = (chunk.n_samples() as f64 * 0.8) as usize;
        train_samples += chunk_train_size;
        test_samples += chunk.n_samples() - chunk_train_size;

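        // Compute summary statistics to stand in for real per-chunk processing.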
        let _mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0));
        let _std = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);

        if chunk.is_last {
            break;
        }
    }

    let stream_time = stream_start_time.elapsed();
    let stream_mem = get_memory_usage() - stream_start_mem;

    println!("  Time: {:.2}s", stream_time.as_secs_f64());
    println!("  Memory usage: ~{stream_mem:.1} MB");
    println!("  Train samples: {train_samples}");
    println!("  Test samples: {test_samples}");
    println!("  Total processed: {total_processed}");

    println!("\n3. Comparison:");
    println!(
        "  Memory savings: {:.1}x less memory",
        in_memory_mem / stream_mem.max(1.0)
    );
    println!(
        "  Time overhead: {:.1}x",
        stream_time.as_secs_f64() / in_memory_time.as_secs_f64()
    );
    println!("  Streaming is beneficial for large datasets that don't fit in memory");

    println!();
    Ok(())
}

#[allow(dead_code)]
fn demonstrate_stream_transformations() -> Result<(), Box<dyn std::error::Error>> {
    println!("🔄 STREAM TRANSFORMATIONS");
    println!("{}", "-".repeat(40));

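    // Build a per-chunk pipeline; transform_chunk() applies each stage to one
    // chunk in place.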
    let transformer = StreamTransformer::new()
        .add_standard_scaling()
        .add_missing_value_imputation();

    println!("Created transformation pipeline:");
    println!("  1. Standard scaling (z-score normalization)");
    println!("  2. Missing value imputation");

    let config = StreamConfig {
        chunk_size: 2000,
        buffer_size: 2,
        max_chunks: Some(5),
        ..Default::default()
    };

    let mut stream = stream_regression(10_000, 15, config)?;
    let mut transformed_chunks = 0;

    println!("\nProcessing and transforming chunks...");

    while let Some(mut chunk) = stream.next_chunk()? {
        println!("  Processing chunk {}", chunk.chunk_index + 1);

        let data_mean_before = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
        let data_std_before = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);

        println!(
            "    Before: mean = {:.3}, std = {:.3}",
            data_mean_before[0], data_std_before[0]
        );

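        // Apply the pipeline in place; after standard scaling the per-chunk
        // mean should be ~0 and the std ~1.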
        transformer.transform_chunk(&mut chunk)?;

        let data_mean_after = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
        let data_std_after = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);

        println!(
            "    After: mean = {:.3}, std = {:.3}",
            data_mean_after[0], data_std_after[0]
        );

        transformed_chunks += 1;

        if chunk.is_last {
            break;
        }
    }

    println!("\nTransformation Summary:");
    println!("  Chunks processed: {transformed_chunks}");
    println!("  Each chunk was transformed independently");
    println!("  Memory-efficient: only one chunk in memory at a time");

    println!();
    Ok(())
}

#[allow(dead_code)]
fn demonstrate_parallel_processing() -> Result<(), Box<dyn std::error::Error>> {
    println!("⚡ PARALLEL STREAM PROCESSING");
    println!("{}", "-".repeat(40));

    let config = StreamConfig {
        chunk_size: 1500,
        buffer_size: 4,
        num_workers: 4,
        max_chunks: Some(8),
        ..Default::default()
    };

    println!("Parallel processing configuration:");
    println!("  Workers: {}", config.num_workers);
    println!("  Chunk size: {}", config.chunk_size);
    println!("  Buffer size: {}", config.buffer_size);

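    // A StreamProcessor would drive the worker pool; it is only constructed
    // here to show the API surface.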
    let _processor: StreamProcessor<DataChunk> = StreamProcessor::new(config.clone());

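    // Per-chunk job each worker could run: summary statistics plus a sleep
    // standing in for expensive computation.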
    let compute_stats = |chunk: DataChunk| -> Result<
        HashMap<String, f64>,
        Box<dyn std::error::Error + Send + Sync>,
    > {
        let mut stats = HashMap::new();

        let mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
        let std = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);

        stats.insert("mean_feature_0".to_string(), mean[0]);
        stats.insert("std_feature_0".to_string(), std[0]);
        stats.insert("n_samples".to_string(), chunk.n_samples() as f64);
        stats.insert("chunk_index".to_string(), chunk.chunk_index as f64);

        std::thread::sleep(std::time::Duration::from_millis(100));

        Ok(stats)
    };

    println!("\nProcessing stream with parallel workers...");
    let start_time = Instant::now();

    let stream = stream_classification(12_000, 10, 3, config)?;

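    // For simplicity the chunks are consumed sequentially here; a full setup
    // would submit each chunk to the processor's workers instead.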
    let mut stream_iter = stream;
    let mut chunk_results = Vec::new();

    while let Some(chunk) = stream_iter.next_chunk()? {
        let chunk_start = Instant::now();
        let chunk_id = chunk.chunk_index;
        let chunk_samples = chunk.n_samples();

        let stats = compute_stats(chunk)
            .map_err(|e| -> Box<dyn std::error::Error> { Box::new(std::io::Error::other(e)) })?;
        let chunk_time = chunk_start.elapsed();

        println!(
            "  Chunk {}: {} samples, {:.2}ms",
            chunk_id + 1,
            chunk_samples,
            chunk_time.as_secs_f64() * 1000.0
        );

        chunk_results.push(stats);

        if chunk_results.len() >= 8 {
            break;
        }
    }

    let total_time = start_time.elapsed();

    println!("\nParallel Processing Results:");
    println!("  Total chunks: {}", chunk_results.len());
    println!("  Total time: {:.2}s", total_time.as_secs_f64());
    println!(
        "  Average time per chunk: {:.2}ms",
        total_time.as_millis() as f64 / chunk_results.len() as f64
    );

    let total_samples: f64 = chunk_results
        .iter()
        .map(|stats| stats.get("n_samples").unwrap_or(&0.0))
        .sum();

    println!("  Total samples processed: {total_samples}");
    println!(
        "  Throughput: {:.1} samples/s",
        total_samples / total_time.as_secs_f64()
    );

    println!();
    Ok(())
}

#[allow(dead_code)]
fn demonstrate_performance_comparison() -> Result<(), Box<dyn std::error::Error>> {
    println!("📈 PERFORMANCE COMPARISON");
    println!("{}", "-".repeat(40));

    let dataset_sizes = vec![10_000, 50_000, 100_000];
    let chunk_sizes = vec![1_000, 5_000, 10_000];

    println!("Comparing streaming performance across different configurations:");
    println!();

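    // Sweep every (dataset size, chunk size) pair and measure throughput.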
    for &dataset_size in &dataset_sizes {
        println!("Dataset size: {dataset_size} samples");

        for &chunk_size in &chunk_sizes {
            let config = StreamConfig {
                chunk_size,
                buffer_size: 3,
                num_workers: 2,
                max_chunks: Some(dataset_size / chunk_size),
                ..Default::default()
            };

            let start_time = Instant::now();
            let mut stream = stream_regression(dataset_size, 20, config)?;

            let mut processed_samples = 0;
            let mut processed_chunks = 0;

            while let Some(chunk) = stream.next_chunk()? {
                processed_samples += chunk.n_samples();
                processed_chunks += 1;

                let _stats = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0));

                if chunk.is_last || processed_samples >= dataset_size {
                    break;
                }
            }

            let duration = start_time.elapsed();
            let throughput = processed_samples as f64 / duration.as_secs_f64();

            println!(
                "  Chunk size {}: {:.2}s ({:.1} samples/s, {} chunks)",
                chunk_size,
                duration.as_secs_f64(),
                throughput,
                processed_chunks
            );
        }
        println!();
    }

    println!("Performance Insights:");
    println!("  • Larger chunks = fewer iterations, better throughput");
    println!("  • Smaller chunks = lower memory usage, more responsive");
    println!("  • Optimal chunk size depends on memory constraints and processing complexity");

    println!();
    Ok(())
}

#[allow(dead_code)]
fn demonstrate_real_world_scenarios() -> Result<(), Box<dyn std::error::Error>> {
    println!("🌍 REAL-WORLD STREAMING SCENARIOS");
    println!("{}", "-".repeat(40));

    println!("Scenario 1: Large dataset training with memory constraints");
    simulate_training_scenario()?;

    println!("\nScenario 2: Data preprocessing pipeline");
    simulate_preprocessing_pipeline()?;

    println!("\nScenario 3: Model evaluation on large test set");
    simulate_model_evaluation()?;

    println!();
    Ok(())
}

#[allow(dead_code)]
fn simulate_training_scenario() -> Result<(), Box<dyn std::error::Error>> {
    println!("  • Dataset: 500K samples, 100 features");
    println!("  • Memory limit: 200MB");
    println!("  • Goal: Train incrementally using mini-batches");

    let config = StreamConfig {
        chunk_size: 5_000,
        buffer_size: 2,
        memory_limit_mb: Some(200),
        max_chunks: Some(10),
        ..Default::default()
    };

    let mut stream = stream_classification(500_000, 100, 10, config)?;
    let mut total_batches = 0;
    let mut total_samples = 0;

    let start_time = Instant::now();

    while let Some(chunk) = stream.next_chunk()? {
        let batch_size = chunk.n_samples();

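        // Simulate one training step on the mini-batch.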
        std::thread::sleep(std::time::Duration::from_millis(20));

        total_batches += 1;
        total_samples += batch_size;

        if total_batches % 3 == 0 {
            println!("    Processed {total_batches} batches ({total_samples} samples)");
        }

        if chunk.is_last {
            break;
        }
    }

    let duration = start_time.elapsed();
    println!(
        "  ✅ Training simulation: {} batches, {:.2}s",
        total_batches,
        duration.as_secs_f64()
    );

    Ok(())
}

#[allow(dead_code)]
fn simulate_preprocessing_pipeline() -> Result<(), Box<dyn std::error::Error>> {
    println!("  • Raw data → Clean → Scale → Feature selection");
    println!("  • Process 200K samples in chunks");

    let config = StreamConfig {
        chunk_size: 8_000,
        buffer_size: 3,
        max_chunks: Some(5),
        ..Default::default()
    };

    let transformer = StreamTransformer::new()
        .add_missing_value_imputation()
        .add_standard_scaling();

    let mut stream = stream_regression(200_000, 50, config)?;
    let mut processed_chunks = 0;

    while let Some(mut chunk) = stream.next_chunk()? {
        transformer.transform_chunk(&mut chunk)?;

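        // Keep the first 30 of 50 features as a simple stand-in for
        // feature selection.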
        let selected_data = chunk
            .data
            .slice(scirs2_core::ndarray::s![.., ..30])
            .to_owned();

        processed_chunks += 1;
        println!(
            "    Chunk {}: {} → {} features",
            processed_chunks,
            chunk.n_features(),
            selected_data.ncols()
        );

        if chunk.is_last {
            break;
        }
    }

    println!("  ✅ Preprocessing pipeline: {processed_chunks} chunks processed");

    Ok(())
}

#[allow(dead_code)]
fn simulate_model_evaluation() -> Result<(), Box<dyn std::error::Error>> {
    println!("  • Evaluate model on 1M test samples");
    println!("  • Compute accuracy in streaming fashion");

    let config = StreamConfig {
        chunk_size: 10_000,
        buffer_size: 2,
        max_chunks: Some(8),
        ..Default::default()
    };

    let mut stream = stream_classification(1_000_000, 20, 5, config)?;
    let mut correct_predictions = 0;
    let mut total_predictions = 0;

    while let Some(chunk) = stream.next_chunk()? {
        if let Some(true_labels) = &chunk.target {
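            // Stand-in "model": predict a uniformly random class in [0, 5).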
            let predictions: Vec<f64> = (0..chunk.n_samples())
                .map(|_| (scirs2_core::random::random::<f64>() * 5.0).floor())
                .collect();

            let chunk_correct = true_labels
                .iter()
                .zip(predictions.iter())
                .filter(|(&true_label, &pred)| (true_label - pred).abs() < 0.5)
                .count();

            correct_predictions += chunk_correct;
            total_predictions += chunk.n_samples();
        }

        if chunk.is_last {
            break;
        }
    }

    let accuracy = correct_predictions as f64 / total_predictions as f64;
    println!(
        "  ✅ Model evaluation: {:.1}% accuracy on {} samples",
        accuracy * 100.0,
        total_predictions
    );

    Ok(())
}

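/// Best-effort resident-memory estimate in MB. When the platform probe fails,
/// it falls back to a synthetic value so the demo output stays readable.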
#[allow(dead_code)]
fn get_memory_usage() -> f64 {
    get_process_memory_usage()
        .unwrap_or_else(|_| scirs2_core::random::random::<f64>() * 50.0 + 10.0)
}

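// Linux: parse the VmRSS (resident set size) field, reported in kB, from
// /proc/self/status.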
#[cfg(target_os = "linux")]
#[allow(dead_code)]
fn get_process_memory_usage() -> Result<f64, Box<dyn std::error::Error>> {
    use std::fs;

    let status = fs::read_to_string("/proc/self/status")?;

    for line in status.lines() {
        if line.starts_with("VmRSS:") {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 2 {
                let kb: f64 = parts[1].parse()?;
                return Ok(kb / 1024.0);
            }
        }
    }

    Err("VmRSS not found in /proc/self/status".into())
}

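// macOS: query mach task_info(); resident_size in the basic info struct is
// in bytes.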
#[cfg(target_os = "macos")]
#[allow(dead_code)]
fn get_process_memory_usage() -> Result<f64, Box<dyn std::error::Error>> {
    use std::mem;

    extern "C" {
        fn mach_task_self() -> u32;
        fn task_info(
            target_task: u32,
            flavor: u32,
            task_info_out: *mut u8,
            task_info_out_cnt: *mut u32,
        ) -> i32;
    }

    const TASK_BASIC_INFO: u32 = 5;
    const TASK_BASIC_INFO_COUNT: u32 = 5;

    #[repr(C)]
    struct TaskBasicInfo {
        suspend_count: u32,
        virtual_size: u32,
        resident_size: u32,
        user_time: u64,
        system_time: u64,
    }

    unsafe {
        let mut info = mem::zeroed::<TaskBasicInfo>();
        let mut count = TASK_BASIC_INFO_COUNT;

        let result = task_info(
            mach_task_self(),
            TASK_BASIC_INFO,
            &mut info as *mut _ as *mut u8,
            &mut count,
        );

        if result == 0 {
            Ok(info.resident_size as f64 / (1024.0 * 1024.0))
        } else {
            Err(format!("task_info failed with code {result}").into())
        }
    }
}

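// Windows: query GetProcessMemoryInfo(); the working set size is in bytes.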
#[cfg(target_os = "windows")]
#[allow(dead_code)]
fn get_process_memory_usage() -> Result<f64, Box<dyn std::error::Error>> {
    use std::mem;

    // GetProcessMemoryInfo lives in psapi.lib, which is not linked by default.
    #[link(name = "psapi")]
    extern "system" {
        fn GetCurrentProcess() -> isize;
        fn GetProcessMemoryInfo(
            process: isize,
            counters: *mut ProcessMemoryCounters,
            cb: u32,
        ) -> i32;
    }

    #[repr(C)]
    struct ProcessMemoryCounters {
        cb: u32,
        page_fault_count: u32,
        peak_working_set_size: usize,
        working_set_size: usize,
        quota_peak_paged_pool_usage: usize,
        quota_paged_pool_usage: usize,
        quota_peak_non_paged_pool_usage: usize,
        quota_non_paged_pool_usage: usize,
        pagefile_usage: usize,
        peak_pagefile_usage: usize,
    }

    unsafe {
        let mut counters = mem::zeroed::<ProcessMemoryCounters>();
        counters.cb = mem::size_of::<ProcessMemoryCounters>() as u32;

        let result = GetProcessMemoryInfo(GetCurrentProcess(), &mut counters, counters.cb);

        if result != 0 {
            Ok(counters.working_set_size as f64 / (1024.0 * 1024.0))
        } else {
            Err("GetProcessMemoryInfo failed".into())
        }
    }
}

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
#[allow(dead_code)]
fn get_process_memory_usage() -> Result<f64, Box<dyn std::error::Error>> {
    Err("Memory usage monitoring not implemented for this platform".into())
}