use scirs2_datasets::{
    make_classification, stream_classification, stream_regression, utils::train_test_split,
    DataChunk, StreamConfig, StreamProcessor, StreamTransformer,
};
use std::collections::HashMap;
use std::time::Instant;

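/// End-to-end tour of the streaming-dataset API: chunked iteration,
/// memory-efficient processing, per-chunk transformations, parallel
/// configuration, performance trade-offs, and simulated real-world
/// pipelines.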
#[allow(dead_code)]
fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("🚀 Streaming Datasets Demonstration");
    println!("===================================\n");

    demonstrate_basic_streaming()?;

    demonstrate_memory_efficient_processing()?;

    demonstrate_stream_transformations()?;

    demonstrate_parallel_processing()?;

    demonstrate_performance_comparison()?;

    demonstrate_real_world_scenarios()?;

    println!("\n🎉 Streaming demonstration completed!");
    Ok(())
}

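/// Streams a synthetic classification dataset and reports per-chunk
/// progress, buffer utilization, class distribution, and overall throughput.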
#[allow(dead_code)]
fn demonstrate_basic_streaming() -> Result<(), Box<dyn std::error::Error>> {
    println!("📊 BASIC STREAMING OPERATIONS");
    println!("{}", "-".repeat(40));

    let config = StreamConfig {
        chunk_size: 1000,
        buffer_size: 3,
        num_workers: 4,
        memory_limit_mb: Some(100),
        enable_compression: false,
        enable_prefetch: true,
        max_chunks: Some(10),
    };

    println!("Streaming Configuration:");
    println!(" Chunk size: {} samples", config.chunk_size);
    println!(" Buffer size: {} chunks", config.buffer_size);
    println!(" Workers: {}", config.num_workers);
    println!(" Memory limit: {:?} MB", config.memory_limit_mb);
    println!(" Max chunks: {:?}", config.max_chunks);

    println!("\nStreaming synthetic classification data...");
    let mut stream = stream_classification(100_000, 20, 5, config.clone())?;

    let mut total_samples = 0;
    let mut chunk_count = 0;
    let mut class_distribution: HashMap<i32, usize> = HashMap::new();

    let start_time = Instant::now();

    while let Some(chunk) = stream.next_chunk()? {
        total_samples += chunk.n_samples();
        chunk_count += 1;

        // Track how classes are distributed across the streamed chunks
        if let Some(target) = &chunk.target {
            for &class in target.iter() {
                *class_distribution.entry(class as i32).or_insert(0) += 1;
            }
        }

        // Report progress when the stream knows its total length
        let stats = stream.stats();
        if let Some(progress) = stats.progress_percent() {
            println!(
                " Chunk {}: {} samples (Progress: {:.1}%, Buffer: {:.1}%)",
                chunk.chunk_index + 1,
                chunk.n_samples(),
                progress,
                stats.buffer_utilization()
            );
        } else {
            println!(
                " Chunk {}: {} samples (Buffer: {:.1}%)",
                chunk.chunk_index + 1,
                chunk.n_samples(),
                stats.buffer_utilization()
            );
        }

        // Simulate downstream processing work
        std::thread::sleep(std::time::Duration::from_millis(50));

        if chunk.is_last {
            println!(" 🏁 Reached last chunk");
            break;
        }
    }

    let duration = start_time.elapsed();

    println!("\nStreaming Results:");
    println!(" Total chunks processed: {chunk_count}");
    println!(" Total samples: {total_samples}");
    println!(" Processing time: {:.2}s", duration.as_secs_f64());
    println!(
        " Throughput: {:.1} samples/s",
        total_samples as f64 / duration.as_secs_f64()
    );
    println!(" Class distribution: {class_distribution:?}");

    println!();
    Ok(())
}

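/// Contrasts loading an entire dataset into memory with streaming it in
/// fixed-size chunks, comparing wall-clock time and approximate memory use.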
#[allow(dead_code)]
fn demonstrate_memory_efficient_processing() -> Result<(), Box<dyn std::error::Error>> {
    println!("💾 MEMORY-EFFICIENT PROCESSING");
    println!("{}", "-".repeat(40));

    let datasetsize = 50_000;
    let n_features = 50;

    println!("Comparing memory usage for {datasetsize} samples with {n_features} features");

    println!("\n1. In-memory approach:");
    let start_mem = get_memory_usage();
    let start_time = Instant::now();

    let in_memorydataset = make_classification(datasetsize, n_features, 5, 2, 25, Some(42))?;
    let (train, test) = train_test_split(&in_memorydataset, 0.2, Some(42))?;

    let in_memory_time = start_time.elapsed();
    let in_memory_mem = get_memory_usage() - start_mem;

    println!(" Time: {:.2}s", in_memory_time.as_secs_f64());
    println!(" Memory usage: ~{in_memory_mem:.1} MB");
    println!(" Train samples: {}", train.n_samples());
    println!(" Test samples: {}", test.n_samples());

    println!("\n2. Streaming approach:");
    let stream_start_time = Instant::now();
    let stream_start_mem = get_memory_usage();

    let config = StreamConfig {
        chunk_size: 5_000,
        buffer_size: 2,
        num_workers: 2,
        memory_limit_mb: Some(50),
        ..Default::default()
    };

    let mut stream = stream_classification(datasetsize, n_features, 5, config)?;

    let mut total_processed = 0;
    let mut train_samples = 0;
    let mut test_samples = 0;

    while let Some(chunk) = stream.next_chunk()? {
        total_processed += chunk.n_samples();

        // Simulate a per-chunk 80/20 train/test split
        let chunk_trainsize = (chunk.n_samples() as f64 * 0.8) as usize;
        train_samples += chunk_trainsize;
        test_samples += chunk.n_samples() - chunk_trainsize;

        // Touch the data so each chunk is actually processed
        let _mean = chunk.data.mean_axis(ndarray::Axis(0));
        let _std = chunk.data.std_axis(ndarray::Axis(0), 0.0);

        if chunk.is_last {
            break;
        }
    }

    let stream_time = stream_start_time.elapsed();
    let stream_mem = get_memory_usage() - stream_start_mem;

    println!(" Time: {:.2}s", stream_time.as_secs_f64());
    println!(" Memory usage: ~{stream_mem:.1} MB");
    println!(" Train samples: {train_samples}");
    println!(" Test samples: {test_samples}");
    println!(" Total processed: {total_processed}");

    println!("\n3. Comparison:");
    println!(
        " Memory savings: {:.1}x less memory",
        in_memory_mem / stream_mem.max(1.0)
    );
    println!(
        " Time overhead: {:.1}x",
        stream_time.as_secs_f64() / in_memory_time.as_secs_f64()
    );
    println!(" Streaming is beneficial for large datasets that don't fit in memory");

    println!();
    Ok(())
}

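/// Builds a reusable transformation pipeline and applies it to each chunk
/// as it arrives, printing feature statistics before and after.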
#[allow(dead_code)]
fn demonstrate_stream_transformations() -> Result<(), Box<dyn std::error::Error>> {
    println!("🔄 STREAM TRANSFORMATIONS");
    println!("{}", "-".repeat(40));

    let transformer = StreamTransformer::new()
        .add_standard_scaling()
        .add_missing_value_imputation();

    println!("Created transformation pipeline:");
    println!(" 1. Standard scaling (z-score normalization)");
    println!(" 2. Missing value imputation");

    let config = StreamConfig {
        chunk_size: 2000,
        buffer_size: 2,
        max_chunks: Some(5),
        ..Default::default()
    };

    let mut stream = stream_regression(10_000, 15, config)?;
    let mut transformed_chunks = 0;

    println!("\nProcessing and transforming chunks...");

    while let Some(mut chunk) = stream.next_chunk()? {
        println!(" Processing chunk {}", chunk.chunk_index + 1);

        // Feature statistics before the transformation...
        let data_mean_before = chunk.data.mean_axis(ndarray::Axis(0)).unwrap();
        let data_std_before = chunk.data.std_axis(ndarray::Axis(0), 0.0);

        println!(
            " Before: mean = {:.3}, std = {:.3}",
            data_mean_before[0], data_std_before[0]
        );

        transformer.transform_chunk(&mut chunk)?;

        // ...and after: standard scaling should drive them toward 0 and 1
        let data_mean_after = chunk.data.mean_axis(ndarray::Axis(0)).unwrap();
        let data_std_after = chunk.data.std_axis(ndarray::Axis(0), 0.0);

        println!(
            " After: mean = {:.3}, std = {:.3}",
            data_mean_after[0], data_std_after[0]
        );

        transformed_chunks += 1;

        if chunk.is_last {
            break;
        }
    }

    println!("\nTransformation Summary:");
    println!(" Chunks processed: {transformed_chunks}");
    println!(" Each chunk was transformed independently");
    println!(" Memory-efficient: only one chunk in memory at a time");

    println!();
    Ok(())
}

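/// Configures a multi-worker stream and times a per-chunk statistics
/// closure to estimate end-to-end throughput.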
#[allow(dead_code)]
fn demonstrate_parallel_processing() -> Result<(), Box<dyn std::error::Error>> {
    println!("⚡ PARALLEL STREAM PROCESSING");
    println!("{}", "-".repeat(40));

    let config = StreamConfig {
        chunk_size: 1500,
        buffer_size: 4,
        num_workers: 4,
        max_chunks: Some(8),
        ..Default::default()
    };

    println!("Parallel processing configuration:");
    println!(" Workers: {}", config.num_workers);
    println!(" Chunk size: {}", config.chunk_size);
    println!(" Buffer size: {}", config.buffer_size);

    // Construct a processor for this configuration (unused below; the demo
    // pulls chunks sequentially so the per-chunk timing stays readable)
    let _processor: StreamProcessor<DataChunk> = StreamProcessor::new(config.clone());

    // Per-chunk statistics closure; the sleep simulates heavier work
    let compute_stats = |chunk: DataChunk| -> Result<
        HashMap<String, f64>,
        Box<dyn std::error::Error + Send + Sync>,
    > {
        let mut stats = HashMap::new();

        let mean = chunk.data.mean_axis(ndarray::Axis(0)).unwrap();
        let std = chunk.data.std_axis(ndarray::Axis(0), 0.0);

        stats.insert("mean_feature_0".to_string(), mean[0]);
        stats.insert("std_feature_0".to_string(), std[0]);
        stats.insert("n_samples".to_string(), chunk.n_samples() as f64);
        stats.insert("chunk_index".to_string(), chunk.chunk_index as f64);

        std::thread::sleep(std::time::Duration::from_millis(100));

        Ok(stats)
    };

    println!("\nProcessing stream with parallel workers...");
    let start_time = Instant::now();

    let stream = stream_classification(12_000, 10, 3, config)?;

    let mut stream_iter = stream;
    let mut chunk_results = Vec::new();

    while let Some(chunk) = stream_iter.next_chunk()? {
        let chunk_start = Instant::now();
        let chunk_id = chunk.chunk_index;
        let chunk_samples = chunk.n_samples();

        let stats = compute_stats(chunk)
            .map_err(|e| -> Box<dyn std::error::Error> { Box::new(std::io::Error::other(e)) })?;
        let chunk_time = chunk_start.elapsed();

        println!(
            " Chunk {}: {} samples, {:.2}ms",
            chunk_id + 1,
            chunk_samples,
            chunk_time.as_secs_f64() * 1000.0
        );

        chunk_results.push(stats);

        if chunk_results.len() >= 8 {
            break;
        }
    }

    let total_time = start_time.elapsed();

    println!("\nParallel Processing Results:");
    println!(" Total chunks: {}", chunk_results.len());
    println!(" Total time: {:.2}s", total_time.as_secs_f64());
    println!(
        " Average time per chunk: {:.2}ms",
        total_time.as_millis() as f64 / chunk_results.len() as f64
    );

    let total_samples: f64 = chunk_results
        .iter()
        .map(|stats| stats.get("n_samples").unwrap_or(&0.0))
        .sum();

    println!(" Total samples processed: {total_samples}");
    println!(
        " Throughput: {:.1} samples/s",
        total_samples / total_time.as_secs_f64()
    );

    println!();
    Ok(())
}

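/// Sweeps dataset and chunk sizes to show how chunk size trades throughput
/// against memory footprint and responsiveness.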
#[allow(dead_code)]
fn demonstrate_performance_comparison() -> Result<(), Box<dyn std::error::Error>> {
    println!("📈 PERFORMANCE COMPARISON");
    println!("{}", "-".repeat(40));

    let dataset_sizes = vec![10_000, 50_000, 100_000];
    let chunk_sizes = vec![1_000, 5_000, 10_000];

    println!("Comparing streaming performance across different configurations:");
    println!();

    for &datasetsize in &dataset_sizes {
        println!("Dataset size: {datasetsize} samples");

        for &chunksize in &chunk_sizes {
            let config = StreamConfig {
                chunk_size: chunksize,
                buffer_size: 3,
                num_workers: 2,
                max_chunks: Some(datasetsize / chunksize),
                ..Default::default()
            };

            let start_time = Instant::now();
            let mut stream = stream_regression(datasetsize, 20, config)?;

            let mut processed_samples = 0;
            let mut processed_chunks = 0;

            while let Some(chunk) = stream.next_chunk()? {
                processed_samples += chunk.n_samples();
                processed_chunks += 1;

                // Minimal per-chunk work so the timing is not a pure no-op
                let _stats = chunk.data.mean_axis(ndarray::Axis(0));

                if chunk.is_last || processed_samples >= datasetsize {
                    break;
                }
            }

            let duration = start_time.elapsed();
            let throughput = processed_samples as f64 / duration.as_secs_f64();

            println!(
                " Chunk size {}: {:.2}s ({:.1} samples/s, {} chunks)",
                chunksize,
                duration.as_secs_f64(),
                throughput,
                processed_chunks
            );
        }
        println!();
    }

    println!("Performance Insights:");
    println!(" • Larger chunks = fewer iterations, better throughput");
    println!(" • Smaller chunks = lower memory usage, more responsive");
    println!(" • Optimal chunk size depends on memory constraints and processing complexity");

    println!();
    Ok(())
}

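/// Runs three simulated scenarios: memory-constrained incremental training,
/// a preprocessing pipeline, and streaming model evaluation.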
#[allow(dead_code)]
fn demonstrate_real_world_scenarios() -> Result<(), Box<dyn std::error::Error>> {
    println!("🌍 REAL-WORLD STREAMING SCENARIOS");
    println!("{}", "-".repeat(40));

    println!("Scenario 1: Large dataset training with memory constraints");
    simulate_training_scenario()?;

    println!("\nScenario 2: Data preprocessing pipeline");
    simulate_preprocessing_pipeline()?;

    println!("\nScenario 3: Model evaluation on large test set");
    simulate_model_evaluation()?;

    println!();
    Ok(())
}

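/// Mini-batch training over a 500K-sample stream under a 200 MB memory
/// budget; a short sleep stands in for each training step.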
#[allow(dead_code)]
fn simulate_training_scenario() -> Result<(), Box<dyn std::error::Error>> {
    println!(" • Dataset: 500K samples, 100 features");
    println!(" • Memory limit: 200MB");
    println!(" • Goal: Train incrementally using mini-batches");

    let config = StreamConfig {
        chunk_size: 5_000,
        buffer_size: 2,
        memory_limit_mb: Some(200),
        max_chunks: Some(10),
        ..Default::default()
    };

    let mut stream = stream_classification(500_000, 100, 10, config)?;
    let mut total_batches = 0;
    let mut total_samples = 0;

    let start_time = Instant::now();

    while let Some(chunk) = stream.next_chunk()? {
        let batchsize = chunk.n_samples();

        // Simulate one training step on this mini-batch
        std::thread::sleep(std::time::Duration::from_millis(20));

        total_batches += 1;
        total_samples += batchsize;

        if total_batches % 3 == 0 {
            println!(" Processed {total_batches} batches ({total_samples} samples)");
        }

        if chunk.is_last {
            break;
        }
    }

    let duration = start_time.elapsed();
    println!(
        " ✅ Training simulation: {} batches, {:.2}s",
        total_batches,
        duration.as_secs_f64()
    );

    Ok(())
}

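/// Preprocessing pipeline (impute → scale → select) that keeps the first
/// 30 features of each transformed chunk.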
#[allow(dead_code)]
fn simulate_preprocessing_pipeline() -> Result<(), Box<dyn std::error::Error>> {
    println!(" • Raw data → Clean → Scale → Feature selection");
    println!(" • Process 200K samples in chunks");

    let config = StreamConfig {
        chunk_size: 8_000,
        buffer_size: 3,
        max_chunks: Some(5),
        ..Default::default()
    };

    let transformer = StreamTransformer::new()
        .add_missing_value_imputation()
        .add_standard_scaling();

    let mut stream = stream_regression(200_000, 50, config)?;
    let mut processed_chunks = 0;

    while let Some(mut chunk) = stream.next_chunk()? {
        transformer.transform_chunk(&mut chunk)?;

        // Simple feature selection: keep the first 30 columns
        let selecteddata = chunk.data.slice(ndarray::s![.., ..30]).to_owned();

        processed_chunks += 1;
        println!(
            " Chunk {}: {} → {} features",
            processed_chunks,
            chunk.n_features(),
            selecteddata.ncols()
        );

        if chunk.is_last {
            break;
        }
    }

    println!(" ✅ Preprocessing pipeline: {processed_chunks} chunks processed");

    Ok(())
}

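/// Streaming evaluation over a large test set: per-chunk predictions are
/// compared with the true labels to accumulate an overall accuracy.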
#[allow(dead_code)]
fn simulate_model_evaluation() -> Result<(), Box<dyn std::error::Error>> {
    println!(" • Evaluate model on 1M test samples");
    println!(" • Compute accuracy in streaming fashion");

    let config = StreamConfig {
        chunk_size: 10_000,
        buffer_size: 2,
        max_chunks: Some(8),
        ..Default::default()
    };

    let mut stream = stream_classification(1_000_000, 20, 5, config)?;
    let mut correct_predictions = 0;
    let mut total_predictions = 0;

    while let Some(chunk) = stream.next_chunk()? {
        if let Some(true_labels) = &chunk.target {
            // A real model would predict here; random guesses over the five
            // classes keep the demo self-contained
            let predictions: Vec<f64> = (0..chunk.n_samples())
                .map(|_| (rand::random::<f64>() * 5.0).floor())
                .collect();

            let chunk_correct = true_labels
                .iter()
                .zip(predictions.iter())
                .filter(|(&true_label, &pred)| (true_label - pred).abs() < 0.5)
                .count();

            correct_predictions += chunk_correct;
            total_predictions += chunk.n_samples();
        }

        if chunk.is_last {
            break;
        }
    }

    let accuracy = correct_predictions as f64 / total_predictions as f64;
    println!(
        " ✅ Model evaluation: {:.1}% accuracy on {} samples",
        accuracy * 100.0,
        total_predictions
    );

    Ok(())
}

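/// Best-effort resident-memory reading in MB; falls back to a synthetic
/// value so the demo keeps running where no platform probe is available.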
#[allow(dead_code)]
fn get_memory_usage() -> f64 {
    get_process_memory_usage().unwrap_or_else(|_| {
        // Fallback: a synthetic 10-60 MB estimate so the demo output stays
        // plausible when the platform probe fails
        rand::random::<f64>() * 50.0 + 10.0
    })
}

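/// Linux: read the VmRSS field (resident set size, in kB) from
/// /proc/self/status and convert it to MB.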
#[cfg(target_os = "linux")]
#[allow(dead_code)]
fn get_process_memory_usage() -> Result<f64, Box<dyn std::error::Error>> {
    use std::fs;

    let status = fs::read_to_string("/proc/self/status")?;

    for line in status.lines() {
        if line.starts_with("VmRSS:") {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 2 {
                let kb: f64 = parts[1].parse()?;
                return Ok(kb / 1024.0); // kB -> MB
            }
        }
    }

    Err("VmRSS not found in /proc/self/status".into())
}

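/// macOS: ask the Mach kernel for the task's basic info via task_info()
/// and report the resident size in MB.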
#[cfg(target_os = "macos")]
#[allow(dead_code)]
fn get_process_memory_usage() -> Result<f64, Box<dyn std::error::Error>> {
    use std::mem;

    extern "C" {
        fn mach_task_self() -> u32;
        fn task_info(
            target_task: u32,
            flavor: u32,
            task_info_out: *mut u8,
            task_info_outCnt: *mut u32,
        ) -> i32;
    }

    const TASK_BASIC_INFO: u32 = 5;
    const TASK_BASIC_INFO_COUNT: u32 = 5;

    #[repr(C)]
    struct TaskBasicInfo {
        suspend_count: u32,
        virtualsize: u32,
        residentsize: u32,
        user_time: u64,
        system_time: u64,
    }

    unsafe {
        let mut info = mem::zeroed::<TaskBasicInfo>();
        let mut count = TASK_BASIC_INFO_COUNT;

        let result = task_info(
            mach_task_self(),
            TASK_BASIC_INFO,
            &mut info as *mut _ as *mut u8,
            &mut count,
        );

        if result == 0 {
            Ok(info.residentsize as f64 / (1024.0 * 1024.0)) // bytes -> MB
        } else {
            Err(format!("task_info failed with code {}", result).into())
        }
    }
}

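/// Windows: call GetProcessMemoryInfo() (psapi) and report the working-set
/// size in MB.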
#[cfg(target_os = "windows")]
#[allow(dead_code)]
fn get_process_memory_usage() -> Result<f64, Box<dyn std::error::Error>> {
    use std::mem;

    // GetProcessMemoryInfo is exported from psapi.dll; link against psapi
    // so the symbol resolves (assumption: psapi.lib is visible to the linker)
    #[link(name = "psapi")]
    extern "system" {
        fn GetCurrentProcess() -> isize;
        fn GetProcessMemoryInfo(
            process: isize,
            counters: *mut ProcessMemoryCounters,
            cb: u32,
        ) -> i32;
    }

    #[repr(C)]
    struct ProcessMemoryCounters {
        cb: u32,
        page_fault_count: u32,
        peak_working_setsize: usize,
        working_setsize: usize,
        quota_peak_paged_pool_usage: usize,
        quota_paged_pool_usage: usize,
        quota_peak_non_paged_pool_usage: usize,
        quota_non_paged_pool_usage: usize,
        pagefile_usage: usize,
        peak_pagefile_usage: usize,
    }

    unsafe {
        let mut counters = mem::zeroed::<ProcessMemoryCounters>();
        counters.cb = mem::size_of::<ProcessMemoryCounters>() as u32;

        let result = GetProcessMemoryInfo(GetCurrentProcess(), &mut counters, counters.cb);

        if result != 0 {
            Ok(counters.working_setsize as f64 / (1024.0 * 1024.0)) // bytes -> MB
        } else {
            Err("GetProcessMemoryInfo failed".into())
        }
    }
}

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
#[allow(dead_code)]
fn get_process_memory_usage() -> Result<f64, Box<dyn std::error::Error>> {
    Err("Memory usage monitoring not implemented for this platform".into())
}