rs2-stream 0.3.3

A high-performance, production-ready async streaming library for Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
use futures_util::stream::StreamExt;
use rand::{thread_rng, Rng};
use rs2_stream::rs2::*;
use rs2_stream::stream_performance_metrics::HealthThresholds;
use std::error::Error;
use std::time::Duration;
use tokio::runtime::Runtime;

// Simulate a slow operation that may fail sometimes
async fn process_item(item: i32) -> Result<i32, Box<dyn Error + Send + Sync>> {
    // Simulate processing time that varies based on the item value
    let delay = 10 + (item % 5) * 20;
    tokio::time::sleep(Duration::from_millis(delay as u64)).await;

    // Increased error probability from 5% (1/20) to 20% (1/5)
    if thread_rng().gen_ratio(1, 2) {
        return Err("Random processing error".into());
    }

    Ok(item * 2)
}

// Enhanced metrics display function
fn print_enhanced_metrics(
    name: &str,
    metrics: &rs2_stream::stream_performance_metrics::StreamMetrics,
) {
    println!("\n📊 {} Metrics:", name);
    println!("  ✅ Items processed: {}", metrics.items_processed);
    println!("  📦 Bytes processed: {}", metrics.bytes_processed);
    println!("  ⏱️  Processing time: {:?}", metrics.processing_time);
    println!(
        "  🚀 Throughput (processing): {:.2} items/sec",
        metrics.throughput_items_per_sec()
    );
    println!(
        "  📈 Throughput (wall-clock): {:.2} items/sec",
        metrics.items_per_second
    );
    println!(
        "  💾 Bandwidth (processing): {:.2} KB/sec",
        metrics.throughput_bytes_per_sec() / 1000.0
    );
    println!(
        "  📊 Bandwidth (wall-clock): {:.2} KB/sec",
        metrics.bytes_per_second / 1000.0
    );
    println!(
        "  📏 Average item size: {:.1} bytes",
        metrics.average_item_size
    );
    println!(
        "  ❌ Errors: {} ({:.1}%)",
        metrics.errors,
        metrics.error_rate * 100.0
    );
    println!("  🔄 Retries: {}", metrics.retries);
    println!("  ⚠️  Consecutive errors: {}", metrics.consecutive_errors);
    println!(
        "  🐌 Peak processing time: {:?}",
        metrics.peak_processing_time
    );
    println!("  📊 Backpressure events: {}", metrics.backpressure_events);
    println!("  📋 Queue depth: {}", metrics.queue_depth);
    println!(
        "  🏥 Health: {}",
        if metrics.is_healthy() {
            "✅ Good"
        } else {
            "⚠️ Issues"
        }
    );
}

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("🚀 Enhanced Stream Metrics Collection Example");

        println!("\n=== Basic Metrics Collection Example ===");

        // Create a stream of numbers
        let numbers = from_iter(1..=20);

        // Apply metrics collection to the stream
        let (metrics_stream, metrics) =
            numbers.with_metrics_rs2("numbers_stream".to_string(), HealthThresholds::default());

        // Process the stream with enhanced metrics tracking
        let mut results = Vec::new();
        let mut metrics_stream = std::pin::pin!(metrics_stream);

        while let Some(item) = metrics_stream.next().await {
            let start = std::time::Instant::now();

            // Simulate some processing
            let processed = item * 2;
            results.push(processed);

            // Record additional metrics
            {
                let mut m = metrics.lock().await;
                m.record_processing_time(start.elapsed());

                // Simulate occasional backpressure
                if thread_rng().gen_ratio(1, 10) {
                    m.record_backpressure();
                }

                // Update queue depth (simulated)
                m.update_queue_depth(thread_rng().gen_range(0..=10));
            }
        }

        println!("Processed {} numbers", results.len());

        // Print enhanced metrics
        let metrics_data = metrics.lock().await;
        print_enhanced_metrics("Basic Processing", &*metrics_data);

        println!("\n=== Error-Prone Async Processing Example ===");

        // Create a stream of numbers that might fail during processing
        let numbers = from_iter(1..=50);
        let (metrics_stream, metrics) =
            numbers.with_metrics_rs2("async_processing".to_string(), HealthThresholds::default());

        let mut success_count = 0;
        let mut error_count = 0;
        let mut retry_count = 0;

        let mut metrics_stream = std::pin::pin!(metrics_stream);

        while let Some(item) = metrics_stream.next().await {
            let start = std::time::Instant::now();

            // Try processing with retries
            let mut attempts = 0;
            let max_retries = 3;

            loop {
                match process_item(item).await {
                    Ok(_result) => {
                        success_count += 1;
                        {
                            let mut m = metrics.lock().await;
                            m.record_processing_time(start.elapsed());
                            if attempts > 0 {
                                m.retries += attempts; // Record total retry attempts
                            }
                        }
                        break;
                    }
                    Err(_e) => {
                        attempts += 1;
                        {
                            let mut m = metrics.lock().await;
                            m.record_error();
                            if attempts <= max_retries {
                                m.record_retry();
                            }
                        }

                        if attempts > max_retries {
                            error_count += 1;
                            println!(
                                "  ❌ Failed to process item {} after {} attempts",
                                item, attempts
                            );
                            break;
                        } else {
                            retry_count += 1;
                            // Exponential backoff
                            tokio::time::sleep(Duration::from_millis(
                                50 * 2_u64.pow(attempts as u32),
                            ))
                            .await;
                        }
                    }
                }
            }
        }

        println!(
            "✅ Successful: {} | ❌ Failed: {} | 🔄 Total retries: {}",
            success_count, error_count, retry_count
        );

        let metrics_data = metrics.lock().await;
        print_enhanced_metrics("Error-Prone Processing", &*metrics_data);

        println!("\n=== Stream Transformation Comparison ===");

        // Test different transformations with enhanced metrics

        // 1. Filter operation
        let (filter_stream, filter_metrics) = from_iter(1..=1000)
            .with_metrics_rs2("filter_operation".to_string(), HealthThresholds::default());

        // Clone metrics for use in the closure
        let filter_metrics_for_closure = filter_metrics.clone();

        let filter_results = filter_stream
            .filter_rs2(move |n| {
                // Simulate backpressure during heavy filtering
                if thread_rng().gen_ratio(1, 50) {
                    // In a real scenario, you'd record backpressure here
                    std::thread::sleep(Duration::from_micros(10));
                }

                // Simulate errors during filtering
                if thread_rng().gen_ratio(1, 10) {
                    // Record the error
                    tokio::spawn({
                        let metrics = filter_metrics_for_closure.clone();
                        async move {
                            let mut m = metrics.lock().await;
                            m.record_error();
                        }
                    });
                    return false; // Filter out this item due to "error"
                }

                n % 2 == 0 // Keep only even numbers
            })
            .collect::<Vec<_>>()
            .await;

        // Manually record some backpressure events for demonstration
        {
            let mut m = filter_metrics.lock().await;
            m.backpressure_events = filter_results.len() as u64 / 50; // Simulate some backpressure
        }

        // 2. Map operation with timing
        let (map_stream, map_metrics) = from_iter(1..=1000)
            .with_metrics_rs2("map_operation".to_string(), HealthThresholds::default());

        // Clone metrics for use in the closure
        let map_metrics_for_closure = map_metrics.clone();

        let map_results = map_stream
            .map_rs2(move |n| {
                // Simulate variable processing time
                std::thread::sleep(Duration::from_micros(n as u64 % 100));

                // Simulate errors during mapping
                if thread_rng().gen_ratio(1, 15) {
                    // Record the error
                    tokio::spawn({
                        let metrics = map_metrics_for_closure.clone();
                        async move {
                            let mut m = metrics.lock().await;
                            m.record_error();
                        }
                    });
                    // We still return a value since map_rs2 doesn't support filtering
                    return n * 3;
                }

                n * 3
            })
            .collect::<Vec<_>>()
            .await;

        // 3. Throttled operation
        let (throttled_stream, throttled_metrics) = from_iter(1..=100).with_metrics_rs2(
            "throttled_operation".to_string(),
            HealthThresholds::default(),
        );

        // We need to manually record errors for throttled operation
        // since throttle_rs2 doesn't take a closure where we could add error logic
        let throttled_metrics_for_errors = throttled_metrics.clone();

        // Spawn a task to simulate random errors during throttled processing
        tokio::spawn(async move {
            for _ in 0..20 {
                // Generate about 20 errors
                if thread_rng().gen_ratio(1, 5) {
                    let mut m = throttled_metrics_for_errors.lock().await;
                    m.record_error();
                }
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        });

        let throttled_results = throttled_stream
            .throttle_rs2(Duration::from_millis(10)) // Throttle to simulate load control
            .collect::<Vec<_>>()
            .await;

        // 4. Chunked operation with queue depth tracking
        let (chunked_stream, chunked_metrics) = from_iter(1..=200)
            .with_metrics_rs2("chunked_operation".to_string(), HealthThresholds::default());

        // Clone metrics before moving into closure
        let chunked_metrics_for_results = chunked_metrics.clone();
        let chunked_metrics_for_errors = chunked_metrics.clone();

        let chunked_results = chunked_stream
            .chunk_rs2(5) // Process in chunks of 5
            .enumerate()
            .map_rs2(move |(chunk_idx, chunk)| {
                // Simulate queue depth changes
                tokio::spawn({
                    let metrics = chunked_metrics.clone();
                    async move {
                        let mut m = metrics.lock().await;
                        m.update_queue_depth(chunk_idx as u64 % 10);
                    }
                });

                // Simulate errors during chunked processing
                if thread_rng().gen_ratio(1, 8) {
                    // Record the error
                    tokio::spawn({
                        let metrics = chunked_metrics_for_errors.clone();
                        async move {
                            let mut m = metrics.lock().await;
                            m.record_error();
                        }
                    });
                }

                chunk.len() // Return chunk size
            })
            .collect::<Vec<_>>()
            .await;

        // Print comparison
        println!("\n📊 Stream Transformation Comparison:");

        let filter_data = filter_metrics.lock().await;
        print_enhanced_metrics("Filter (even numbers)", &*filter_data);
        println!("  📉 Filtered from 1000 to {} items", filter_results.len());

        let map_data = map_metrics.lock().await;
        print_enhanced_metrics("Map (triple values)", &*map_data);
        println!("  🔢 Processed {} items", map_results.len());

        let throttled_data = throttled_metrics.lock().await;
        print_enhanced_metrics("Throttled Processing", &*throttled_data);
        println!("  🐌 Throttled {} items", throttled_results.len());

        let chunked_data = chunked_metrics_for_results.lock().await;
        print_enhanced_metrics("Chunked Processing", &*chunked_data);
        println!("  📦 Created {} chunks", chunked_results.len());

        println!("\n=== Performance Summary ===");
        println!(
            "🏆 Fastest throughput: Map operation ({:.2} items/sec)",
            map_data.throughput_items_per_sec()
        );
        println!(
            "🐌 Slowest throughput: Throttled operation ({:.2} items/sec)",
            throttled_data.throughput_items_per_sec()
        );
        println!(
            "📊 Most selective: Filter operation ({:.1}% pass rate)",
            filter_results.len() as f64 / 1000.0 * 100.0
        );

        // Health check summary
        println!("\n🏥 Health Check Summary:");
        println!(
            "  Metrics collection: {}",
            if metrics_data.is_healthy() {
                "✅ Healthy"
            } else {
                "⚠️ Issues"
            }
        );
        println!(
            "  Filter operation: {}",
            if filter_data.is_healthy() {
                "✅ Healthy"
            } else {
                "⚠️ Issues"
            }
        );
        println!(
            "  Map operation: {}",
            if map_data.is_healthy() {
                "✅ Healthy"
            } else {
                "⚠️ Issues"
            }
        );
        println!(
            "  Throttled operation: {}",
            if throttled_data.is_healthy() {
                "✅ Healthy"
            } else {
                "⚠️ Issues"
            }
        );
        println!(
            "  Chunked operation: {}",
            if chunked_data.is_healthy() {
                "✅ Healthy"
            } else {
                "⚠️ Issues"
            }
        );
    });
}