StreamStats

Struct StreamStats 

Source
/// Statistics about a streaming operation.
///
/// A plain snapshot of counters and flags; all fields are public.
pub struct StreamStats {
    /// Current chunk being processed.
    // NOTE(review): not stated here whether this is a 0-based index or a
    // count of chunks already handed out — confirm against the producer.
    pub current_chunk: usize,
    /// Total number of chunks, if known; `None` when the total is not known.
    pub total_chunks: Option<usize>,
    /// Number of chunks currently buffered.
    pub buffer_size: usize,
    /// Maximum buffer capacity.
    // NOTE(review): presumably measured in chunks, like `buffer_size` — confirm.
    pub buffer_capacity: usize,
    /// Whether streaming is finished.
    pub finished: bool,
}
Expand description

Statistics about a streaming operation.

Fields§

§current_chunk: usize

Current chunk being processed

§total_chunks: Option<usize>

Total number of chunks (if known)

§buffer_size: usize

Number of chunks currently buffered

§buffer_capacity: usize

Maximum buffer capacity

§finished: bool

Whether streaming is finished

Implementations§

Source§

impl StreamStats

Source

pub fn progress_percent(&self) -> Option<f64>

Get progress as a percentage (if total is known)

Examples found in repository?
examples/datasets_streaming_demo.rs (line 89)
44fn demonstrate_basic_streaming() -> Result<(), Box<dyn std::error::Error>> {
45    println!("📊 BASIC STREAMING OPERATIONS");
46    println!("{}", "-".repeat(40));
47
48    // Configure streaming
49    let config = StreamConfig {
50        chunk_size: 1000,           // 1K samples per chunk
51        buffer_size: 3,             // Buffer 3 chunks
52        num_workers: 4,             // Use 4 worker threads
53        memory_limit_mb: Some(100), // Limit to 100MB
54        enable_compression: false,
55        enable_prefetch: true,
56        max_chunks: Some(10), // Process only 10 chunks for demo
57    };
58
59    println!("Streaming Configuration:");
60    println!("  Chunk size: {} samples", config.chunk_size);
61    println!("  Buffer size: {} chunks", config.buffer_size);
62    println!("  Workers: {}", config.num_workers);
63    println!("  Memory limit: {:?} MB", config.memory_limit_mb);
64    println!("  Max chunks: {:?}", config.max_chunks);
65
66    // Create streaming classification dataset
67    println!("\nStreaming synthetic classification data...");
68    let mut stream = stream_classification(100_000, 20, 5, config.clone())?;
69
70    let mut total_samples = 0;
71    let mut chunk_count = 0;
72    let mut class_distribution: HashMap<i32, usize> = HashMap::new();
73
74    let start_time = Instant::now();
75
76    while let Some(chunk) = stream.next_chunk()? {
77        total_samples += chunk.n_samples();
78        chunk_count += 1;
79
80        // Analyze this chunk
81        if let Some(target) = &chunk.target {
82            for &class in target.iter() {
83                *class_distribution.entry(class as i32).or_insert(0) += 1;
84            }
85        }
86
87        // Print progress
88        let stats = stream.stats();
89        if let Some(progress) = stats.progress_percent() {
90            println!(
91                "  Chunk {}: {} samples (Progress: {:.1}%, Buffer: {:.1}%)",
92                chunk.chunk_index + 1,
93                chunk.n_samples(),
94                progress,
95                stats.buffer_utilization()
96            );
97        } else {
98            println!(
99                "  Chunk {}: {} samples (Buffer: {:.1}%)",
100                chunk.chunk_index + 1,
101                chunk.n_samples(),
102                stats.buffer_utilization()
103            );
104        }
105
106        // Simulate processing time
107        std::thread::sleep(std::time::Duration::from_millis(50));
108
109        if chunk.is_last {
110            println!("  📋 Reached last chunk");
111            break;
112        }
113    }
114
115    let duration = start_time.elapsed();
116
117    println!("\nStreaming Results:");
118    println!("  Total chunks processed: {chunk_count}");
119    println!("  Total samples: {total_samples}");
120    println!("  Processing time: {:.2}s", duration.as_secs_f64());
121    println!(
122        "  Throughput: {:.1} samples/s",
123        total_samples as f64 / duration.as_secs_f64()
124    );
125    println!("  Class distribution: {class_distribution:?}");
126
127    println!();
128    Ok(())
129}
Source

pub fn buffer_utilization(&self) -> f64

Get buffer utilization as a percentage

Examples found in repository?
examples/datasets_streaming_demo.rs (line 95)
44fn demonstrate_basic_streaming() -> Result<(), Box<dyn std::error::Error>> {
45    println!("📊 BASIC STREAMING OPERATIONS");
46    println!("{}", "-".repeat(40));
47
48    // Configure streaming
49    let config = StreamConfig {
50        chunk_size: 1000,           // 1K samples per chunk
51        buffer_size: 3,             // Buffer 3 chunks
52        num_workers: 4,             // Use 4 worker threads
53        memory_limit_mb: Some(100), // Limit to 100MB
54        enable_compression: false,
55        enable_prefetch: true,
56        max_chunks: Some(10), // Process only 10 chunks for demo
57    };
58
59    println!("Streaming Configuration:");
60    println!("  Chunk size: {} samples", config.chunk_size);
61    println!("  Buffer size: {} chunks", config.buffer_size);
62    println!("  Workers: {}", config.num_workers);
63    println!("  Memory limit: {:?} MB", config.memory_limit_mb);
64    println!("  Max chunks: {:?}", config.max_chunks);
65
66    // Create streaming classification dataset
67    println!("\nStreaming synthetic classification data...");
68    let mut stream = stream_classification(100_000, 20, 5, config.clone())?;
69
70    let mut total_samples = 0;
71    let mut chunk_count = 0;
72    let mut class_distribution: HashMap<i32, usize> = HashMap::new();
73
74    let start_time = Instant::now();
75
76    while let Some(chunk) = stream.next_chunk()? {
77        total_samples += chunk.n_samples();
78        chunk_count += 1;
79
80        // Analyze this chunk
81        if let Some(target) = &chunk.target {
82            for &class in target.iter() {
83                *class_distribution.entry(class as i32).or_insert(0) += 1;
84            }
85        }
86
87        // Print progress
88        let stats = stream.stats();
89        if let Some(progress) = stats.progress_percent() {
90            println!(
91                "  Chunk {}: {} samples (Progress: {:.1}%, Buffer: {:.1}%)",
92                chunk.chunk_index + 1,
93                chunk.n_samples(),
94                progress,
95                stats.buffer_utilization()
96            );
97        } else {
98            println!(
99                "  Chunk {}: {} samples (Buffer: {:.1}%)",
100                chunk.chunk_index + 1,
101                chunk.n_samples(),
102                stats.buffer_utilization()
103            );
104        }
105
106        // Simulate processing time
107        std::thread::sleep(std::time::Duration::from_millis(50));
108
109        if chunk.is_last {
110            println!("  📋 Reached last chunk");
111            break;
112        }
113    }
114
115    let duration = start_time.elapsed();
116
117    println!("\nStreaming Results:");
118    println!("  Total chunks processed: {chunk_count}");
119    println!("  Total samples: {total_samples}");
120    println!("  Processing time: {:.2}s", duration.as_secs_f64());
121    println!(
122        "  Throughput: {:.1} samples/s",
123        total_samples as f64 / duration.as_secs_f64()
124    );
125    println!("  Class distribution: {class_distribution:?}");
126
127    println!();
128    Ok(())
129}

Trait Implementations§

Source§

impl Clone for StreamStats

Source§

fn clone(&self) -> StreamStats

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for StreamStats

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<SS, SP> SupersetOf<SS> for SP
where SS: SubsetOf<SP>,

Source§

fn to_subset(&self) -> Option<SS>

The inverse inclusion map: attempts to construct self from the equivalent element of its superset. Read more
Source§

fn is_in_subset(&self) -> bool

Checks if self is actually part of its subset T (and can be converted to it).
Source§

fn to_subset_unchecked(&self) -> SS

Use with care! Same as self.to_subset but without any property checks. Always succeeds.
Source§

fn from_subset(element: &SS) -> SP

The inclusion map: converts self to the equivalent element of its superset.
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V