pub struct StreamProcessor<T> { /* private fields */ }
Expand description
Parallel streaming processor for applying operations to chunks
Implementations§
Source§impl<T> StreamProcessor<T>
impl<T> StreamProcessor<T>
Sourcepub fn new(config: StreamConfig) -> Self
pub fn new(config: StreamConfig) -> Self
Create a new stream processor
Examples found in repository?
examples/datasets_streaming_demo.rs (line 304)
286fn demonstrate_parallel_processing() -> Result<(), Box<dyn std::error::Error>> {
287 println!("⚡ PARALLEL STREAM PROCESSING");
288 println!("{}", "-".repeat(40));
289
290 let config = StreamConfig {
291 chunk_size: 1500,
292 buffer_size: 4,
293 num_workers: 4,
294 max_chunks: Some(8),
295 ..Default::default()
296 };
297
298 println!("Parallel processing configuration:");
299 println!(" Workers: {}", config.num_workers);
300 println!(" Chunk size: {}", config.chunk_size);
301 println!(" Buffer size: {}", config.buffer_size);
302
303 // Create a simple processor that computes statistics
304 let _processor: StreamProcessor<DataChunk> = StreamProcessor::new(config.clone());
305
306 // Define a processing function
307 let compute_stats = |chunk: DataChunk| -> Result<
308 HashMap<String, f64>,
309 Box<dyn std::error::Error + Send + Sync>,
310 > {
311 let mut stats = HashMap::new();
312
313 // Compute basic statistics
314 let mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
315 let std = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
316
317 stats.insert("mean_feature_0".to_string(), mean[0]);
318 stats.insert("std_feature_0".to_string(), std[0]);
319 stats.insert("n_samples".to_string(), chunk.n_samples() as f64);
320 stats.insert("chunk_index".to_string(), chunk.chunk_index as f64);
321
322 // Simulate some computation time
323 std::thread::sleep(std::time::Duration::from_millis(100));
324
325 Ok(stats)
326 };
327
328 println!("\nProcessing stream with parallel workers...");
329 let start_time = Instant::now();
330
331 let stream = stream_classification(12_000, 10, 3, config)?;
332
333 // For demonstration, we'll process chunks sequentially with timing
334 // In a real implementation, you'd use the processor.process_parallel method
335 let mut stream_iter = stream;
336 let mut chunk_results = Vec::new();
337
338 while let Some(chunk) = stream_iter.next_chunk()? {
339 let chunk_start = Instant::now();
340 let chunk_id = chunk.chunk_index;
341 let chunk_samples = chunk.n_samples();
342
343 // Process chunk
344 let stats = compute_stats(chunk)
345 .map_err(|e| -> Box<dyn std::error::Error> { Box::new(std::io::Error::other(e)) })?;
346 let chunk_time = chunk_start.elapsed();
347
348 println!(
349 " Chunk {}: {} samples, {:.2}ms",
350 chunk_id + 1,
351 chunk_samples,
352 chunk_time.as_millis()
353 );
354
355 chunk_results.push(stats);
356
357 if chunk_results.len() >= 8 {
358 break;
359 }
360 }
361
362 let total_time = start_time.elapsed();
363
364 println!("\nParallel Processing Results:");
365 println!(" Total chunks: {}", chunk_results.len());
366 println!(" Total time: {:.2}s", total_time.as_secs_f64());
367 println!(
368 " Average time per chunk: {:.2}ms",
369 total_time.as_millis() as f64 / chunk_results.len() as f64
370 );
371
372 // Aggregate statistics
373 let total_samples: f64 = chunk_results
374 .iter()
375 .map(|stats| stats.get("n_samples").unwrap_or(&0.0))
376 .sum();
377
378 println!(" Total samples processed: {total_samples}");
379 println!(
380 " Throughput: {:.1} samples/s",
381 total_samples / total_time.as_secs_f64()
382 );
383
384 println!();
385 Ok(())
386}
Auto Trait Implementations§
impl<T> Freeze for StreamProcessor<T>
impl<T> RefUnwindSafe for StreamProcessor<T> where
T: RefUnwindSafe,
impl<T> Send for StreamProcessor<T> where
T: Send,
impl<T> Sync for StreamProcessor<T> where
T: Sync,
impl<T> Unpin for StreamProcessor<T> where
T: Unpin,
impl<T> UnwindSafe for StreamProcessor<T> where
T: UnwindSafe,
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left` is `true`.
Converts `self` into a `Right` variant of `Either<Self, Self>` otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left(&self)` returns `true`.
Converts `self` into a `Right` variant of `Either<Self, Self>` otherwise. Read more
Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<SS, SP> SupersetOf<SS> for SP where
SS: SubsetOf<SP>,
impl<SS, SP> SupersetOf<SS> for SP where
SS: SubsetOf<SP>,
Source§fn to_subset(&self) -> Option<SS>
fn to_subset(&self) -> Option<SS>
The inverse inclusion map: attempts to construct `self` from the equivalent element of its superset. Read more
Source§fn is_in_subset(&self) -> bool
fn is_in_subset(&self) -> bool
Checks if `self` is actually part of its subset `T` (and can be converted to it).
Source§fn to_subset_unchecked(&self) -> SS
fn to_subset_unchecked(&self) -> SS
Use with care! Same as `self.to_subset` but without any property checks. Always succeeds.
Source§fn from_subset(element: &SS) -> SP
fn from_subset(element: &SS) -> SP
The inclusion map: converts `self` to the equivalent element of its superset.