StreamProcessor

Struct StreamProcessor 

Source
pub struct StreamProcessor<T> { /* private fields */ }
Expand description

Parallel streaming processor that applies a user-supplied operation to each chunk of a data stream.

Implementations§

Source§

impl<T> StreamProcessor<T>
where T: Send + Sync + 'static,

Source

pub fn new(config: StreamConfig) -> Self

Creates a new stream processor configured by the given `StreamConfig`.

Examples found in repository?
examples/datasets_streaming_demo.rs (line 304)
286fn demonstrate_parallel_processing() -> Result<(), Box<dyn std::error::Error>> {
287    println!("⚡ PARALLEL STREAM PROCESSING");
288    println!("{}", "-".repeat(40));
289
290    let config = StreamConfig {
291        chunk_size: 1500,
292        buffer_size: 4,
293        num_workers: 4,
294        max_chunks: Some(8),
295        ..Default::default()
296    };
297
298    println!("Parallel processing configuration:");
299    println!("  Workers: {}", config.num_workers);
300    println!("  Chunk size: {}", config.chunk_size);
301    println!("  Buffer size: {}", config.buffer_size);
302
303    // Create a simple processor that computes statistics
304    let _processor: StreamProcessor<DataChunk> = StreamProcessor::new(config.clone());
305
306    // Define a processing function
307    let compute_stats = |chunk: DataChunk| -> Result<
308        HashMap<String, f64>,
309        Box<dyn std::error::Error + Send + Sync>,
310    > {
311        let mut stats = HashMap::new();
312
313        // Compute basic statistics
314        let mean = chunk.data.mean_axis(scirs2_core::ndarray::Axis(0)).unwrap();
315        let std = chunk.data.std_axis(scirs2_core::ndarray::Axis(0), 0.0);
316
317        stats.insert("mean_feature_0".to_string(), mean[0]);
318        stats.insert("std_feature_0".to_string(), std[0]);
319        stats.insert("n_samples".to_string(), chunk.n_samples() as f64);
320        stats.insert("chunk_index".to_string(), chunk.chunk_index as f64);
321
322        // Simulate some computation time
323        std::thread::sleep(std::time::Duration::from_millis(100));
324
325        Ok(stats)
326    };
327
328    println!("\nProcessing stream with parallel workers...");
329    let start_time = Instant::now();
330
331    let stream = stream_classification(12_000, 10, 3, config)?;
332
333    // For demonstration, we'll process chunks sequentially with timing
334    // In a real implementation, you'd use the processor.process_parallel method
335    let mut stream_iter = stream;
336    let mut chunk_results = Vec::new();
337
338    while let Some(chunk) = stream_iter.next_chunk()? {
339        let chunk_start = Instant::now();
340        let chunk_id = chunk.chunk_index;
341        let chunk_samples = chunk.n_samples();
342
343        // Process chunk
344        let stats = compute_stats(chunk)
345            .map_err(|e| -> Box<dyn std::error::Error> { Box::new(std::io::Error::other(e)) })?;
346        let chunk_time = chunk_start.elapsed();
347
348        println!(
349            "  Chunk {}: {} samples, {:.2}ms",
350            chunk_id + 1,
351            chunk_samples,
352            chunk_time.as_millis()
353        );
354
355        chunk_results.push(stats);
356
357        if chunk_results.len() >= 8 {
358            break;
359        }
360    }
361
362    let total_time = start_time.elapsed();
363
364    println!("\nParallel Processing Results:");
365    println!("  Total chunks: {}", chunk_results.len());
366    println!("  Total time: {:.2}s", total_time.as_secs_f64());
367    println!(
368        "  Average time per chunk: {:.2}ms",
369        total_time.as_millis() as f64 / chunk_results.len() as f64
370    );
371
372    // Aggregate statistics
373    let total_samples: f64 = chunk_results
374        .iter()
375        .map(|stats| stats.get("n_samples").unwrap_or(&0.0))
376        .sum();
377
378    println!("  Total samples processed: {total_samples}");
379    println!(
380        "  Throughput: {:.1} samples/s",
381        total_samples / total_time.as_secs_f64()
382    );
383
384    println!();
385    Ok(())
386}
Source

pub fn process_parallel<F, R>( &self, iterator: StreamingIterator, processor: F, ) -> Result<Vec<R>>
where F: Fn(DataChunk) -> Result<R> + Send + Sync + Clone + 'static, R: Send + 'static,

Processes the chunks produced by `iterator` in parallel, applying the `processor` function to each chunk and returning the collected results.

Auto Trait Implementations§

§

impl<T> Freeze for StreamProcessor<T>

§

impl<T> RefUnwindSafe for StreamProcessor<T>
where T: RefUnwindSafe,

§

impl<T> Send for StreamProcessor<T>
where T: Send,

§

impl<T> Sync for StreamProcessor<T>
where T: Sync,

§

impl<T> Unpin for StreamProcessor<T>
where T: Unpin,

§

impl<T> UnwindSafe for StreamProcessor<T>
where T: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of the pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes an object with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<SS, SP> SupersetOf<SS> for SP
where SS: SubsetOf<SP>,

Source§

fn to_subset(&self) -> Option<SS>

The inverse inclusion map: attempts to construct self from the equivalent element of its superset. Read more
Source§

fn is_in_subset(&self) -> bool

Checks if self is actually part of its subset T (and can be converted to it).
Source§

fn to_subset_unchecked(&self) -> SS

Use with care! Same as self.to_subset but without any property checks. Always succeeds.
Source§

fn from_subset(element: &SS) -> SP

The inclusion map: converts self to the equivalent element of its superset.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V