use async_stream::stream;
use futures_util::stream::StreamExt;
use rs2_stream::error::StreamError;
use rs2_stream::rs2::*;
use std::error::Error;
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;
/// Simulates a slow unit of work for the demos below.
///
/// Logs a start line, asynchronously sleeps for `delay_ms` milliseconds, logs a
/// completion line, and then yields a message that embeds `id`.
///
/// # Errors
///
/// The signature allows a boxed error, but the body as written always returns `Ok`.
async fn slow_operation(id: u32, delay_ms: u64) -> Result<String, Box<dyn Error + Send + Sync>> {
    println!("Starting operation {} with delay {}ms", id, delay_ms);

    // Non-blocking sleep: suspends this future without parking the runtime thread.
    let pause = Duration::from_millis(delay_ms);
    tokio::time::sleep(pause).await;

    println!("Completed operation {}", id);
    let outcome = format!("Result from operation {}", id);
    Ok(outcome)
}
/// Demo 1: wraps each `slow_operation` call in `tokio::time::timeout` by hand.
///
/// Four operations with delays of 50/150/300/500ms are each raced against a
/// 200ms limit; the outcome (success, error, or timeout) is rendered to a string
/// and printed per operation id.
async fn demo_timeout() {
    println!("\n=== 1. Timeout Example ===");
    let operations = from_iter(vec![(1, 50), (2, 150), (3, 300), (4, 500)]);
    let timeout_duration = Duration::from_millis(200);
    let results = operations
        .eval_map_rs2(move |(id, delay)| async move {
            // Outer `Result` comes from the timeout, inner one from the operation itself.
            match tokio::time::timeout(timeout_duration, slow_operation(id, delay)).await {
                Ok(Ok(result)) => (id, format!("Success: {}", result)),
                Ok(Err(e)) => (id, format!("Error: {}", e)),
                Err(_) => (
                    id,
                    format!("Timeout after {}ms", timeout_duration.as_millis()),
                ),
            }
        })
        .collect::<Vec<_>>()
        .await;
    println!("\nTimeout Results:");
    for (id, result) in results {
        println!(" Operation {}: {}", id, result);
    }
}

/// Demo 2: applies the library's `timeout_rs2` combinator (150ms) to a stream of
/// `slow_operation` results and prints each collected item.
///
/// NOTE(review): the exact timeout semantics (per item vs. whole stream) are
/// defined by rs2_stream and are not visible here; confirm against its docs.
async fn demo_timeout_rs2() {
    println!("\n=== 2. Using timeout_rs2 ===");
    let operations = from_iter(vec![(1, 50), (2, 250)]);
    let results = operations
        .eval_map_rs2(|(id, delay)| slow_operation(id, delay))
        .timeout_rs2(Duration::from_millis(150))
        .collect::<Vec<_>>()
        .await;
    println!("\nTimeout Results using timeout_rs2:");
    for result in results {
        // Outer `Result` is the stream-level outcome, inner one is `slow_operation`'s.
        match result {
            Ok(Ok(value)) => println!(" Success: {}", value),
            Ok(Err(e)) => println!(" Error: {}", e),
            Err(StreamError::Timeout) => println!(" Operation timed out"),
            Err(e) => println!(" Other error: {:?}", e),
        }
    }
}

/// Demo 3: pushes ten integers through `throttle_rs2` with a 100ms interval and
/// reports how long collection took versus the expected lower bound.
async fn demo_throttle() {
    println!("\n=== 3. Throttle Example ===");
    // Single source of truth for the interval, used both for throttling and for
    // the expected-time estimate printed below.
    let interval_ms: u64 = 100;
    let start = Instant::now();
    let rapid_stream = from_iter(0..10);
    let throttled = rapid_stream
        .throttle_rs2(Duration::from_millis(interval_ms))
        .collect::<Vec<_>>()
        .await;
    let elapsed = start.elapsed();
    println!("\nThrottled {} elements in {:?}", throttled.len(), elapsed);

    // n elements imply n - 1 gaps; `saturating_sub` keeps an empty result from
    // underflowing. `usize -> u64` is a widening (lossless) cast on supported targets.
    let gaps = throttled.len().saturating_sub(1) as u64;
    println!(
        "Expected minimum time: {:?}",
        Duration::from_millis(interval_ms * gaps)
    );
    println!("Elements: {:?}", throttled);
}

/// Demo 4: feeds a burst-y sequence of events into `debounce_rs2` (100ms) and
/// prints what survives.
///
/// Each tuple is `(value, pause_after_ms)`: the value is yielded first, then the
/// producer sleeps for the given number of milliseconds.
async fn demo_debounce() {
    println!("\n=== 4. Debounce Example ===");
    let start = Instant::now();
    let input_events = vec![
        ("a", 10),
        ("b", 20),
        ("c", 30),
        ("d", 200),
        ("e", 10),
        ("f", 300),
    ];
    let input_stream = stream! {
        for (value, delay) in input_events {
            yield value;
            tokio::time::sleep(Duration::from_millis(delay as u64)).await;
        }
    }
    .boxed();
    let debounced = input_stream
        .debounce_rs2(Duration::from_millis(100))
        .collect::<Vec<_>>()
        .await;
    let elapsed = start.elapsed();
    println!("\nDebounced stream collected in {:?}", elapsed);
    println!("Original events: a, b, c, d, e, f");
    // Only "d" and "f" are followed by a pause longer than the 100ms window.
    println!("Debounced events (expected 'd' and 'f'): {:?}", debounced);
}

/// Demo 5: produces a counter roughly every 10ms for about 500ms, samples it with
/// `sample_rs2` at 100ms, and prints the sampled values.
async fn demo_sample() {
    println!("\n=== 5. Sample Example ===");
    let start = Instant::now();
    let fast_stream = stream! {
        let mut i = 0;
        loop {
            yield i;
            i += 1;
            tokio::time::sleep(Duration::from_millis(10)).await;
            // Stop producing once ~500ms of wall-clock time have passed.
            if start.elapsed() > Duration::from_millis(500) {
                break;
            }
        }
    }
    .boxed();
    let sampled = fast_stream
        .sample_rs2(Duration::from_millis(100))
        .collect::<Vec<_>>()
        .await;
    let elapsed = start.elapsed();
    println!("\nSampled stream collected in {:?}", elapsed);
    println!(
        "Expected approximately {} samples",
        elapsed.as_millis() / 100
    );
    println!("Actual samples: {} - {:?}", sampled.len(), sampled);
}

/// Demo 6: collects the stream returned by `emit_after` with a 300ms delay and
/// prints the elapsed time and the first collected value.
async fn demo_emit_after() {
    println!("\n=== 6. Emit After Example ===");
    let start = Instant::now();
    let delayed_value = emit_after("Delayed value", Duration::from_millis(300))
        .collect::<Vec<_>>()
        .await;
    let elapsed = start.elapsed();
    println!("\nEmit after collected in {:?}", elapsed);
    println!("Expected delay: 300ms");
    // `first()` instead of `[0]`: report an empty result rather than panicking.
    match delayed_value.first() {
        Some(value) => println!("Value: {:?}", value),
        None => println!("Value: <none emitted>"),
    }
}

/// Entry point: builds a Tokio runtime and runs the six timing demos in order.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be created.
fn main() {
    let rt = Runtime::new().expect("failed to create Tokio runtime");
    rt.block_on(async {
        demo_timeout().await;
        demo_timeout_rs2().await;
        demo_throttle().await;
        demo_debounce().await;
        demo_sample().await;
        demo_emit_after().await;
    });
}