pub struct DataChunk {
pub data: Array2<f64>,
pub target: Option<Array1<f64>>,
pub chunk_index: usize,
pub sample_indices: Vec<usize>,
pub is_last: bool,
}
A chunk of data from a streaming dataset
Fields

data: Array2<f64>
Feature data for this chunk

target: Option<Array1<f64>>
Target values for this chunk (if available)

chunk_index: usize
Chunk index in the stream

sample_indices: Vec<usize>
Global sample indices for this chunk

is_last: bool
Whether this is the last chunk in the stream
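
For orientation, here is a minimal consumption loop. This is a sketch, not canonical usage: `StreamConfig`, `stream_classification`, and `next_chunk` follow the repository examples shown below, and the shape relationships asserted in the comments are inferred from the field descriptions above.

// Sketch: pull chunks until the stream signals the end via `is_last`.
let config = StreamConfig {
    chunk_size: 1_000,
    ..Default::default()
};
let mut stream = stream_classification(10_000, 20, 5, config)?;

while let Some(chunk) = stream.next_chunk()? {
    // `data` has shape (n_samples, n_features); `sample_indices`
    // carries one global index per sample in the chunk.
    assert_eq!(chunk.data.nrows(), chunk.n_samples());
    assert_eq!(chunk.sample_indices.len(), chunk.n_samples());

    if chunk.is_last {
        break;
    }
}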
Implementations

impl DataChunk

pub fn n_samples(&self) -> usize

Number of samples in this chunk
Examples found in repository
examples/datasets_streaming_demo.rs (line 77)
44 fn demonstrate_basic_streaming() -> Result<(), Box<dyn std::error::Error>> {
45 println!("📊 BASIC STREAMING OPERATIONS");
46 println!("{}", "-".repeat(40));
47
48 // Configure streaming
49 let config = StreamConfig {
50 chunk_size: 1000, // 1K samples per chunk
51 buffer_size: 3, // Buffer 3 chunks
52 num_workers: 4, // Use 4 worker threads
53 memory_limit_mb: Some(100), // Limit to 100MB
54 enable_compression: false,
55 enable_prefetch: true,
56 max_chunks: Some(10), // Process only 10 chunks for demo
57 };
58
59 println!("Streaming Configuration:");
60 println!(" Chunk size: {} samples", config.chunk_size);
61 println!(" Buffer size: {} chunks", config.buffer_size);
62 println!(" Workers: {}", config.num_workers);
63 println!(" Memory limit: {:?} MB", config.memory_limit_mb);
64 println!(" Max chunks: {:?}", config.max_chunks);
65
66 // Create streaming classification dataset
67 println!("\nStreaming synthetic classification data...");
68 let mut stream = stream_classification(100_000, 20, 5, config.clone())?;
69
70 let mut total_samples = 0;
71 let mut chunk_count = 0;
72 let mut class_distribution: HashMap<i32, usize> = HashMap::new();
73
74 let start_time = Instant::now();
75
76 while let Some(chunk) = stream.next_chunk()? {
77 total_samples += chunk.n_samples();
78 chunk_count += 1;
79
80 // Analyze this chunk
81 if let Some(target) = &chunk.target {
82 for &class in target.iter() {
83 *class_distribution.entry(class as i32).or_insert(0) += 1;
84 }
85 }
86
87 // Print progress
88 let stats = stream.stats();
89 if let Some(progress) = stats.progress_percent() {
90 println!(
91 " Chunk {}: {} samples (Progress: {:.1}%, Buffer: {:.1}%)",
92 chunk.chunk_index + 1,
93 chunk.n_samples(),
94 progress,
95 stats.buffer_utilization()
96 );
97 } else {
98 println!(
99 " Chunk {}: {} samples (Buffer: {:.1}%)",
100 chunk.chunk_index + 1,
101 chunk.n_samples(),
102 stats.buffer_utilization()
103 );
104 }
105
106 // Simulate processing time
107 std::thread::sleep(std::time::Duration::from_millis(50));
108
109 if chunk.is_last {
110 println!(" 📋 Reached last chunk");
111 break;
112 }
113 }
114
115 let duration = start_time.elapsed();
116
117 println!("\nStreaming Results:");
118 println!(" Total chunks processed: {chunk_count}");
119 println!(" Total samples: {total_samples}");
120 println!(" Processing time: {:.2}s", duration.as_secs_f64());
121 println!(
122 " Throughput: {:.1} samples/s",
123 total_samples as f64 / duration.as_secs_f64()
124 );
125 println!(" Class distribution: {class_distribution:?}");
126
127 println!();
128 Ok(())
129 }
130
131 #[allow(dead_code)]
132 fn demonstrate_memory_efficient_processing() -> Result<(), Box<dyn std::error::Error>> {
133 println!("💾 MEMORY-EFFICIENT PROCESSING");
134 println!("{}", "-".repeat(40));
135
136 // Compare memory usage: streaming vs. in-memory
137 let datasetsize = 50_000;
138 let n_features = 50;
139
140 println!("Comparing memory usage for {datasetsize} samples with {n_features} features");
141
142 // In-memory approach (for comparison)
143 println!("\n1. In-memory approach:");
144 let start_mem = get_memory_usage();
145 let start_time = Instant::now();
146
147 let in_memorydataset = make_classification(datasetsize, n_features, 5, 2, 25, Some(42))?;
148 let (train, test) = train_test_split(&in_memorydataset, 0.2, Some(42))?;
149
150 let in_memory_time = start_time.elapsed();
151 let in_memory_mem = get_memory_usage() - start_mem;
152
153 println!(" Time: {:.2}s", in_memory_time.as_secs_f64());
154 println!(" Memory usage: ~{in_memory_mem:.1} MB");
155 println!(" Train samples: {}", train.n_samples());
156 println!(" Test samples: {}", test.n_samples());
157
158 // Streaming approach
159 println!("\n2. Streaming approach:");
160 let stream_start_time = Instant::now();
161 let stream_start_mem = get_memory_usage();
162
163 let config = StreamConfig {
164 chunk_size: 5_000, // Smaller chunks for memory efficiency
165 buffer_size: 2, // Smaller buffer
166 num_workers: 2,
167 memory_limit_mb: Some(50),
168 ..Default::default()
169 };
170
171 let mut stream = stream_classification(datasetsize, n_features, 5, config)?;
172
173 let mut total_processed = 0;
174 let mut train_samples = 0;
175 let mut test_samples = 0;
176
177 while let Some(chunk) = stream.next_chunk()? {
178 total_processed += chunk.n_samples();
179
180 // Simulate train/test split on chunk level
181 let chunk_trainsize = (chunk.n_samples() as f64 * 0.8) as usize;
182 train_samples += chunk_trainsize;
183 test_samples += chunk.n_samples() - chunk_trainsize;
184
185 // Process chunk (simulate some computation)
186 let _mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0));
187 let _std = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
188
189 if chunk.is_last {
190 break;
191 }
192 }
193
194 let stream_time = stream_start_time.elapsed();
195 let stream_mem = get_memory_usage() - stream_start_mem;
196
197 println!(" Time: {:.2}s", stream_time.as_secs_f64());
198 println!(" Memory usage: ~{stream_mem:.1} MB");
199 println!(" Train samples: {train_samples}");
200 println!(" Test samples: {test_samples}");
201 println!(" Total processed: {total_processed}");
202
203 // Comparison
204 println!("\n3. Comparison:");
205 println!(
206 " Memory savings: {:.1}x less memory",
207 in_memory_mem / stream_mem.max(1.0)
208 );
209 println!(
210 " Time overhead: {:.1}x",
211 stream_time.as_secs_f64() / in_memory_time.as_secs_f64()
212 );
213 println!(" Streaming is beneficial for large datasets that don't fit in memory");
214
215 println!();
216 Ok(())
217 }
218
219 #[allow(dead_code)]
220 fn demonstrate_stream_transformations() -> Result<(), Box<dyn std::error::Error>> {
221 println!("🔄 STREAM TRANSFORMATIONS");
222 println!("{}", "-".repeat(40));
223
224 // Create a transformer pipeline
225 let transformer = StreamTransformer::new()
226 .add_standard_scaling()
227 .add_missing_value_imputation();
228
229 println!("Created transformation pipeline:");
230 println!(" 1. Standard scaling (z-score normalization)");
231 println!(" 2. Missing value imputation");
232
233 let config = StreamConfig {
234 chunk_size: 2000,
235 buffer_size: 2,
236 max_chunks: Some(5),
237 ..Default::default()
238 };
239
240 let mut stream = stream_regression(10_000, 15, config)?;
241 let mut transformed_chunks = 0;
242
243 println!("\nProcessing and transforming chunks...");
244
245 while let Some(mut chunk) = stream.next_chunk()? {
246 println!(" Processing chunk {}", chunk.chunk_index + 1);
247
248 // Show statistics before transformation
249 let data_mean_before = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
250 let data_std_before = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
251
252 println!(
253 " Before: mean = {:.3}, std = {:.3}",
254 data_mean_before[0], data_std_before[0]
255 );
256
257 // Apply transformations
258 transformer.transform_chunk(&mut chunk)?;
259
260 // Show statistics after transformation
261 let data_mean_after = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
262 let data_std_after = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
263
264 println!(
265 " After: mean = {:.3}, std = {:.3}",
266 data_mean_after[0], data_std_after[0]
267 );
268
269 transformed_chunks += 1;
270
271 if chunk.is_last {
272 break;
273 }
274 }
275
276 println!("\nTransformation Summary:");
277 println!(" Chunks processed: {transformed_chunks}");
278 println!(" Each chunk was transformed independently");
279 println!(" Memory-efficient: only one chunk in memory at a time");
280
281 println!();
282 Ok(())
283 }
284
285 #[allow(dead_code)]
286 fn demonstrate_parallel_processing() -> Result<(), Box<dyn std::error::Error>> {
287 println!("⚡ PARALLEL STREAM PROCESSING");
288 println!("{}", "-".repeat(40));
289
290 let config = StreamConfig {
291 chunk_size: 1500,
292 buffer_size: 4,
293 num_workers: 4,
294 max_chunks: Some(8),
295 ..Default::default()
296 };
297
298 println!("Parallel processing configuration:");
299 println!(" Workers: {}", config.num_workers);
300 println!(" Chunk size: {}", config.chunk_size);
301 println!(" Buffer size: {}", config.buffer_size);
302
303 // Create a simple processor that computes statistics
304 let _processor: StreamProcessor<DataChunk> = StreamProcessor::new(config.clone());
305
306 // Define a processing function
307 let compute_stats = |chunk: DataChunk| -> Result<
308 HashMap<String, f64>,
309 Box<dyn std::error::Error + Send + Sync>,
310 > {
311 let mut stats = HashMap::new();
312
313 // Compute basic statistics
314 let mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
315 let std = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
316
317 stats.insert("mean_feature_0".to_string(), mean[0]);
318 stats.insert("std_feature_0".to_string(), std[0]);
319 stats.insert("n_samples".to_string(), chunk.n_samples() as f64);
320 stats.insert("chunk_index".to_string(), chunk.chunk_index as f64);
321
322 // Simulate some computation time
323 std::thread::sleep(std::time::Duration::from_millis(100));
324
325 Ok(stats)
326 };
327
328 println!("\nProcessing stream with parallel workers...");
329 let start_time = Instant::now();
330
331 let stream = stream_classification(12_000, 10, 3, config)?;
332
333 // For demonstration, we'll process chunks sequentially with timing
334 // In a real implementation, you'd use the processor.process_parallel method
335 let mut stream_iter = stream;
336 let mut chunk_results = Vec::new();
337
338 while let Some(chunk) = stream_iter.next_chunk()? {
339 let chunk_start = Instant::now();
340 let chunk_id = chunk.chunk_index;
341 let chunk_samples = chunk.n_samples();
342
343 // Process chunk
344 let stats = compute_stats(chunk)
345 .map_err(|e| -> Box<dyn std::error::Error> { Box::new(std::io::Error::other(e)) })?;
346 let chunk_time = chunk_start.elapsed();
347
348 println!(
349 " Chunk {}: {} samples, {:.2}ms",
350 chunk_id + 1,
351 chunk_samples,
352 chunk_time.as_millis()
353 );
354
355 chunk_results.push(stats);
356
357 if chunk_results.len() >= 8 {
358 break;
359 }
360 }
361
362 let total_time = start_time.elapsed();
363
364 println!("\nParallel Processing Results:");
365 println!(" Total chunks: {}", chunk_results.len());
366 println!(" Total time: {:.2}s", total_time.as_secs_f64());
367 println!(
368 " Average time per chunk: {:.2}ms",
369 total_time.as_millis() as f64 / chunk_results.len() as f64
370 );
371
372 // Aggregate statistics
373 let total_samples: f64 = chunk_results
374 .iter()
375 .map(|stats| stats.get("n_samples").unwrap_or(&0.0))
376 .sum();
377
378 println!(" Total samples processed: {total_samples}");
379 println!(
380 " Throughput: {:.1} samples/s",
381 total_samples / total_time.as_secs_f64()
382 );
383
384 println!();
385 Ok(())
386 }
387
388 #[allow(dead_code)]
389 fn demonstrate_performance_comparison() -> Result<(), Box<dyn std::error::Error>> {
390 println!("📊 PERFORMANCE COMPARISON");
391 println!("{}", "-".repeat(40));
392
393 let dataset_sizes = vec![10_000, 50_000, 100_000];
394 let chunk_sizes = vec![1_000, 5_000, 10_000];
395
396 println!("Comparing streaming performance across different configurations:");
397 println!();
398
399 for &datasetsize in &dataset_sizes {
400 println!("Dataset size: {datasetsize} samples");
401
402 for &chunksize in &chunk_sizes {
403 let config = StreamConfig {
404 chunk_size: chunksize,
405 buffer_size: 3,
406 num_workers: 2,
407 max_chunks: Some(datasetsize / chunksize),
408 ..Default::default()
409 };
410
411 let start_time = Instant::now();
412 let mut stream = stream_regression(datasetsize, 20, config)?;
413
414 let mut processed_samples = 0;
415 let mut processed_chunks = 0;
416
417 while let Some(chunk) = stream.next_chunk()? {
418 processed_samples += chunk.n_samples();
419 processed_chunks += 1;
420
421 // Simulate minimal processing
422 let _stats = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0));
423
424 if chunk.is_last || processed_samples >= datasetsize {
425 break;
426 }
427 }
428
429 let duration = start_time.elapsed();
430 let throughput = processed_samples as f64 / duration.as_secs_f64();
431
432 println!(
433 " Chunk size {}: {:.2}s ({:.1} samples/s, {} chunks)",
434 chunksize,
435 duration.as_secs_f64(),
436 throughput,
437 processed_chunks
438 );
439 }
440 println!();
441 }
442
443 println!("Performance Insights:");
444 println!(" • Larger chunks = fewer iterations, better throughput");
445 println!(" • Smaller chunks = lower memory usage, more responsive");
446 println!(" • Optimal chunk size depends on memory constraints and processing complexity");
447
448 println!();
449 Ok(())
450 }
451
452 #[allow(dead_code)]
453 fn demonstrate_real_world_scenarios() -> Result<(), Box<dyn std::error::Error>> {
454 println!("🌍 REAL-WORLD STREAMING SCENARIOS");
455 println!("{}", "-".repeat(40));
456
457 // Scenario 1: Training on large dataset with limited memory
458 println!("Scenario 1: Large dataset training with memory constraints");
459 simulate_training_scenario()?;
460
461 // Scenario 2: Data preprocessing pipeline
462 println!("\nScenario 2: Data preprocessing pipeline");
463 simulate_preprocessing_pipeline()?;
464
465 // Scenario 3: Model evaluation on large test set
466 println!("\nScenario 3: Model evaluation on large test set");
467 simulate_model_evaluation()?;
468
469 println!();
470 Ok(())
471 }
472
473 #[allow(dead_code)]
474 fn simulate_training_scenario() -> Result<(), Box<dyn std::error::Error>> {
475 println!(" • Dataset: 500K samples, 100 features");
476 println!(" • Memory limit: 200MB");
477 println!(" • Goal: Train incrementally using mini-batches");
478
479 let config = StreamConfig {
480 chunk_size: 5_000, // Mini-batch size
481 buffer_size: 2, // Keep memory low
482 memory_limit_mb: Some(200),
483 max_chunks: Some(10), // Simulate partial processing
484 ..Default::default()
485 };
486
487 let mut stream = stream_classification(500_000, 100, 10, config)?;
488 let mut total_batches = 0;
489 let mut total_samples = 0;
490
491 let start_time = Instant::now();
492
493 while let Some(chunk) = stream.next_chunk()? {
494 // Simulate training on mini-batch
495 let batchsize = chunk.n_samples();
496
497 // Simulate gradient computation time
498 std::thread::sleep(std::time::Duration::from_millis(20));
499
500 total_batches += 1;
501 total_samples += batchsize;
502
503 if total_batches % 3 == 0 {
504 println!(" Processed {total_batches} batches ({total_samples} samples)");
505 }
506
507 if chunk.is_last {
508 break;
509 }
510 }
511
512 let duration = start_time.elapsed();
513 println!(
514 " ✅ Training simulation: {} batches, {:.2}s",
515 total_batches,
516 duration.as_secs_f64()
517 );
518
519 Ok(())
520 }
521
522 #[allow(dead_code)]
523 fn simulate_preprocessing_pipeline() -> Result<(), Box<dyn std::error::Error>> {
524 println!(" • Raw data → Clean → Scale → Feature selection");
525 println!(" • Process 200K samples in chunks");
526
527 let config = StreamConfig {
528 chunk_size: 8_000,
529 buffer_size: 3,
530 max_chunks: Some(5),
531 ..Default::default()
532 };
533
534 let transformer = StreamTransformer::new()
535 .add_missing_value_imputation()
536 .add_standard_scaling();
537
538 let mut stream = stream_regression(200_000, 50, config)?;
539 let mut processed_chunks = 0;
540
541 while let Some(mut chunk) = stream.next_chunk()? {
542 // Step 1: Clean data (remove outliers, handle missing values)
543 transformer.transform_chunk(&mut chunk)?;
544
545 // Step 2: Feature selection (simulate by keeping first 30 features)
546 let selecteddata = chunk
547 .data
548 .slice(scirs2_core::ndarray::s![.., ..30])
549 .to_owned();
550
551 processed_chunks += 1;
552 println!(
553 " Chunk {}: {} → {} features",
554 processed_chunks,
555 chunk.n_features(),
556 selecteddata.ncols()
557 );
558
559 if chunk.is_last {
560 break;
561 }
562 }
563
564 println!(" ✅ Preprocessing pipeline: {processed_chunks} chunks processed");
565
566 Ok(())
567 }
568
569 #[allow(dead_code)]
570 fn simulate_model_evaluation() -> Result<(), Box<dyn std::error::Error>> {
571 println!(" • Evaluate model on 1M test samples");
572 println!(" • Compute accuracy in streaming fashion");
573
574 let config = StreamConfig {
575 chunk_size: 10_000,
576 buffer_size: 2,
577 max_chunks: Some(8),
578 ..Default::default()
579 };
580
581 let mut stream = stream_classification(1_000_000, 20, 5, config)?;
582 let mut correct_predictions = 0;
583 let mut total_predictions = 0;
584
585 while let Some(chunk) = stream.next_chunk()? {
586 if let Some(true_labels) = &chunk.target {
587 // Simulate model predictions (random for demo)
588 let predictions: Vec<f64> = (0..chunk.n_samples())
589 .map(|_| (scirs2_core::random::random::<f64>() * 5.0).floor())
590 .collect();
591
592 // Calculate accuracy for this chunk
593 let chunk_correct = true_labels
594 .iter()
595 .zip(predictions.iter())
596 .filter(|(&true_label, &pred)| (true_label - pred).abs() < 0.5)
597 .count();
598
599 correct_predictions += chunk_correct;
600 total_predictions += chunk.n_samples();
601 }
602
603 if chunk.is_last {
604 break;
605 }
606 }
607
608 let accuracy = correct_predictions as f64 / total_predictions as f64;
609 println!(
610 " ✅ Model evaluation: {:.1}% accuracy on {} samples",
611 accuracy * 100.0,
612 total_predictions
613 );
614
615 Ok(())
616 }
pub fn n_features(&self) -> usize

Number of features in this chunk
Examples found in repository
examples/datasets_streaming_demo.rs (line 555)
(Identical to the simulate_preprocessing_pipeline listing above, lines 523–567.)
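
As a smaller self-contained illustration, n_features can size per-feature accumulators once, from the first chunk. This is a sketch assuming a stream built as in the examples above; the `rows()` iteration comes from the ndarray API re-exported as `scirs2_core::ndarray`.

// Sketch: allocate per-feature sums lazily, sized from the first chunk.
let mut sums: Option<Vec<f64>> = None;
while let Some(chunk) = stream.next_chunk()? {
    let sums = sums.get_or_insert_with(|| vec![0.0; chunk.n_features()]);
    for row in chunk.data.rows() {
        for (s, v) in sums.iter_mut().zip(row.iter()) {
            *s += v;
        }
    }
    if chunk.is_last {
        break;
    }
}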
pub fn to_dataset(&self) -> Dataset

Convert chunk to a Dataset
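
A minimal sketch of how this might bridge streaming chunks into code that expects an owned Dataset; the stream construction follows the examples above, and `Dataset::n_samples` is assumed from its use in the train_test_split example.

// Sketch: materialize each chunk as an owned Dataset.
while let Some(chunk) = stream.next_chunk()? {
    let ds = chunk.to_dataset();
    assert_eq!(ds.n_samples(), chunk.n_samples());
    // ... hand `ds` to any chunk-at-a-time consumer ...
    if chunk.is_last {
        break;
    }
}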
Trait Implementations

Auto Trait Implementations
impl Freeze for DataChunk
impl RefUnwindSafe for DataChunk
impl Send for DataChunk
impl Sync for DataChunk
impl Unpin for DataChunk
impl UnwindSafe for DataChunk
Blanket Implementations

impl<T> BorrowMut<T> for T
where
    T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where
    T: Clone,

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> Pointable for T

impl<SS, SP> SupersetOf<SS> for SP
where
    SS: SubsetOf<SP>,

fn to_subset(&self) -> Option<SS>

The inverse inclusion map: attempts to construct self from the equivalent element of its superset.

fn is_in_subset(&self) -> bool

Checks if self is actually part of its subset T (and can be converted to it).

fn to_subset_unchecked(&self) -> SS

Use with care! Same as self.to_subset but without any property checks. Always succeeds.

fn from_subset(element: &SS) -> SP

The inclusion map: converts self to the equivalent element of its superset.