use futures::StreamExt;
use futures::executor::ThreadPool;
use iterstream::IterStream;
use std::thread::sleep;
use std::time::{Duration, Instant};
struct Sleep {
max: u32,
name: String,
}
impl Sleep {
fn new(max: u32, name: &str) -> Self {
Sleep {
max,
name: name.into(),
}
}
}
impl Iterator for Sleep {
type Item = (u32, String);
fn next(&mut self) -> Option<Self::Item> {
match self.max {
0 => None,
_ => {
sleep(Duration::from_secs(1));
self.max -= 1;
Some((self.max, self.name.clone()))
}
}
}
}
fn round_robin_iterator() {
let mut iterators = vec![
Sleep::new(5, "one"),
Sleep::new(5, "two"),
Sleep::new(5, "three"),
Sleep::new(5, "four"),
];
loop {
let mut one_yield = false;
for sleep in iterators.iter_mut() {
if let Some((val, name)) = sleep.next() {
println!("{} {}", name, val);
one_yield = true;
}
}
if !one_yield {
break;
}
}
}
async fn round_robin_streams() {
let pool = ThreadPool::new().unwrap();
let mut iterators = vec![
Sleep::new(5, "one").to_stream_with_pool(20, pool.clone()),
Sleep::new(5, "two").to_stream_with_pool(20, pool.clone()),
Sleep::new(5, "three").to_stream_with_pool(20, pool.clone()),
Sleep::new(5, "four").to_stream_with_pool(20, pool.clone()),
];
loop {
let mut one_yield = false;
for sleep in iterators.iter_mut() {
if let Some((val, name)) = sleep.next().await {
println!("{} {}", name, val);
one_yield = true;
}
}
if !one_yield {
break;
}
}
}
fn main() {
println!("With iterators");
let start = Instant::now();
round_robin_iterator();
println!("Duration: {} s", start.elapsed().as_secs_f32());
println!("With streams");
let start = Instant::now();
futures::executor::block_on(async {
round_robin_streams().await;
});
println!("Duration: {} s", start.elapsed().as_secs_f32());
}