pub struct StreamStats {
pub current_chunk: usize,
pub total_chunks: Option<usize>,
pub buffer_size: usize,
pub buffer_capacity: usize,
pub finished: bool,
}
Expand description
Statistics about streaming operation
Fields§
§current_chunk: usize
Current chunk being processed
total_chunks: Option<usize>
Total number of chunks (if known)
buffer_size: usize
Number of chunks currently buffered
buffer_capacity: usize
Maximum buffer capacity
finished: bool
Whether streaming is finished
Implementations§
Source§impl StreamStats
impl StreamStats
Sourcepub fn progress_percent(&self) -> Option<f64>
pub fn progress_percent(&self) -> Option<f64>
Get progress as a percentage (if total is known)
Examples found in repository?
examples/datasets_streaming_demo.rs (line 89)
44fn demonstrate_basic_streaming() -> Result<(), Box<dyn std::error::Error>> {
45 println!("📊 BASIC STREAMING OPERATIONS");
46 println!("{}", "-".repeat(40));
47
48 // Configure streaming
49 let config = StreamConfig {
50 chunk_size: 1000, // 1K samples per chunk
51 buffer_size: 3, // Buffer 3 chunks
52 num_workers: 4, // Use 4 worker threads
53 memory_limit_mb: Some(100), // Limit to 100MB
54 enable_compression: false,
55 enable_prefetch: true,
56 max_chunks: Some(10), // Process only 10 chunks for demo
57 };
58
59 println!("Streaming Configuration:");
60 println!(" Chunk size: {} samples", config.chunk_size);
61 println!(" Buffer size: {} chunks", config.buffer_size);
62 println!(" Workers: {}", config.num_workers);
63 println!(" Memory limit: {:?} MB", config.memory_limit_mb);
64 println!(" Max chunks: {:?}", config.max_chunks);
65
66 // Create streaming classification dataset
67 println!("\nStreaming synthetic classification data...");
68 let mut stream = stream_classification(100_000, 20, 5, config.clone())?;
69
70 let mut total_samples = 0;
71 let mut chunk_count = 0;
72 let mut class_distribution: HashMap<i32, usize> = HashMap::new();
73
74 let start_time = Instant::now();
75
76 while let Some(chunk) = stream.next_chunk()? {
77 total_samples += chunk.n_samples();
78 chunk_count += 1;
79
80 // Analyze this chunk
81 if let Some(target) = &chunk.target {
82 for &class in target.iter() {
83 *class_distribution.entry(class as i32).or_insert(0) += 1;
84 }
85 }
86
87 // Print progress
88 let stats = stream.stats();
89 if let Some(progress) = stats.progress_percent() {
90 println!(
91 " Chunk {}: {} samples (Progress: {:.1}%, Buffer: {:.1}%)",
92 chunk.chunk_index + 1,
93 chunk.n_samples(),
94 progress,
95 stats.buffer_utilization()
96 );
97 } else {
98 println!(
99 " Chunk {}: {} samples (Buffer: {:.1}%)",
100 chunk.chunk_index + 1,
101 chunk.n_samples(),
102 stats.buffer_utilization()
103 );
104 }
105
106 // Simulate processing time
107 std::thread::sleep(std::time::Duration::from_millis(50));
108
109 if chunk.is_last {
110 println!(" 📋 Reached last chunk");
111 break;
112 }
113 }
114
115 let duration = start_time.elapsed();
116
117 println!("\nStreaming Results:");
118 println!(" Total chunks processed: {chunk_count}");
119 println!(" Total samples: {total_samples}");
120 println!(" Processing time: {:.2}s", duration.as_secs_f64());
121 println!(
122 " Throughput: {:.1} samples/s",
123 total_samples as f64 / duration.as_secs_f64()
124 );
125 println!(" Class distribution: {class_distribution:?}");
126
127 println!();
128 Ok(())
129}
Sourcepub fn buffer_utilization(&self) -> f64
pub fn buffer_utilization(&self) -> f64
Get buffer utilization as a percentage
Examples found in repository?
examples/datasets_streaming_demo.rs (line 95)
44fn demonstrate_basic_streaming() -> Result<(), Box<dyn std::error::Error>> {
45 println!("📊 BASIC STREAMING OPERATIONS");
46 println!("{}", "-".repeat(40));
47
48 // Configure streaming
49 let config = StreamConfig {
50 chunk_size: 1000, // 1K samples per chunk
51 buffer_size: 3, // Buffer 3 chunks
52 num_workers: 4, // Use 4 worker threads
53 memory_limit_mb: Some(100), // Limit to 100MB
54 enable_compression: false,
55 enable_prefetch: true,
56 max_chunks: Some(10), // Process only 10 chunks for demo
57 };
58
59 println!("Streaming Configuration:");
60 println!(" Chunk size: {} samples", config.chunk_size);
61 println!(" Buffer size: {} chunks", config.buffer_size);
62 println!(" Workers: {}", config.num_workers);
63 println!(" Memory limit: {:?} MB", config.memory_limit_mb);
64 println!(" Max chunks: {:?}", config.max_chunks);
65
66 // Create streaming classification dataset
67 println!("\nStreaming synthetic classification data...");
68 let mut stream = stream_classification(100_000, 20, 5, config.clone())?;
69
70 let mut total_samples = 0;
71 let mut chunk_count = 0;
72 let mut class_distribution: HashMap<i32, usize> = HashMap::new();
73
74 let start_time = Instant::now();
75
76 while let Some(chunk) = stream.next_chunk()? {
77 total_samples += chunk.n_samples();
78 chunk_count += 1;
79
80 // Analyze this chunk
81 if let Some(target) = &chunk.target {
82 for &class in target.iter() {
83 *class_distribution.entry(class as i32).or_insert(0) += 1;
84 }
85 }
86
87 // Print progress
88 let stats = stream.stats();
89 if let Some(progress) = stats.progress_percent() {
90 println!(
91 " Chunk {}: {} samples (Progress: {:.1}%, Buffer: {:.1}%)",
92 chunk.chunk_index + 1,
93 chunk.n_samples(),
94 progress,
95 stats.buffer_utilization()
96 );
97 } else {
98 println!(
99 " Chunk {}: {} samples (Buffer: {:.1}%)",
100 chunk.chunk_index + 1,
101 chunk.n_samples(),
102 stats.buffer_utilization()
103 );
104 }
105
106 // Simulate processing time
107 std::thread::sleep(std::time::Duration::from_millis(50));
108
109 if chunk.is_last {
110 println!(" 📋 Reached last chunk");
111 break;
112 }
113 }
114
115 let duration = start_time.elapsed();
116
117 println!("\nStreaming Results:");
118 println!(" Total chunks processed: {chunk_count}");
119 println!(" Total samples: {total_samples}");
120 println!(" Processing time: {:.2}s", duration.as_secs_f64());
121 println!(
122 " Throughput: {:.1} samples/s",
123 total_samples as f64 / duration.as_secs_f64()
124 );
125 println!(" Class distribution: {class_distribution:?}");
126
127 println!();
128 Ok(())
129}
Trait Implementations§
Source§impl Clone for StreamStats
impl Clone for StreamStats
Source§fn clone(&self) -> StreamStats
fn clone(&self) -> StreamStats
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source
. Read moreAuto Trait Implementations§
impl Freeze for StreamStats
impl RefUnwindSafe for StreamStats
impl Send for StreamStats
impl Sync for StreamStats
impl Unpin for StreamStats
impl UnwindSafe for StreamStats
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
Source§fn to_subset(&self) -> Option<SS>
fn to_subset(&self) -> Option<SS>
The inverse inclusion map: attempts to construct
self
from the equivalent element of its
superset. Read moreSource§fn is_in_subset(&self) -> bool
fn is_in_subset(&self) -> bool
Checks if
self
is actually part of its subset T
(and can be converted to it).Source§fn to_subset_unchecked(&self) -> SS
fn to_subset_unchecked(&self) -> SS
Use with care! Same as
self.to_subset
but without any property checks. Always succeeds.Source§fn from_subset(element: &SS) -> SP
fn from_subset(element: &SS) -> SP
The inclusion map: converts
self
to the equivalent element of its superset.