use async_stream::stream;
use futures_util::stream::StreamExt;
use rs2_stream::rs2::*;
use std::time::Duration;
use tokio::runtime::Runtime;
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("Demonstrating rate_limit_backpressure extension method");
let fast_stream = stream! {
for i in 1..=20 {
yield Ok::<i32, String>(i);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
.boxed();
println!("Stream created. Applying rate_limit_backpressure with capacity 5...");
let controlled_stream = fast_stream.rate_limit_backpressure_rs2(5);
println!("Processing elements with a delay to simulate slow consumption...");
let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let start = std::time::Instant::now();
let result = controlled_stream
.eval_map_rs2(move |x| {
let counter = counter.clone();
async move {
tokio::time::sleep(Duration::from_millis(50)).await;
let count = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
println!("Processed item {}: {:?}", count, x);
x
}
})
.collect::<Vec<_>>()
.await;
let elapsed = start.elapsed();
println!(
"Processed {} elements with backpressure in {:?}",
result.len(),
elapsed
);
println!(
"Without backpressure, all elements would have been produced in {:?}",
Duration::from_millis(10 * 20)
);
println!("With backpressure, the production rate is limited by the consumption rate");
});
}