use futures_util::stream::StreamExt;
use rs2_stream::queue::*;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::time::{sleep, Duration};
/// Runs a producer/consumer demo over a shared bounded queue.
///
/// A producer task enqueues the integers `1..=20`, sleeping 100 ms before each
/// attempt, and closes the queue when it is done. A consumer task pulls items
/// from the stream returned by `dequeue()`, sleeping 200 ms per item, until the
/// stream yields `None`. Both tasks run concurrently on a Tokio runtime and
/// `main` waits for both to finish.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be created.
fn main() {
    let rt = Runtime::new().expect("failed to create Tokio runtime");
    rt.block_on(async {
        // The queue is created with `Queue::bounded(10)`; each task gets its own
        // `Arc` handle so it can be moved into the spawned future.
        let queue = Arc::new(Queue::bounded(10));
        let producer_queue = Arc::clone(&queue);
        let consumer_queue = Arc::clone(&queue);

        let producer = tokio::spawn(async move {
            for i in 1..=20 {
                sleep(Duration::from_millis(100)).await;
                // A failed enqueue is reported but does not stop the producer.
                match producer_queue.enqueue(i).await {
                    Ok(_) => println!("Producer: Enqueued {}", i),
                    Err(e) => println!("Producer: Failed to enqueue {}: {:?}", i, e),
                }
            }
            // NOTE(review): closing presumably lets the consumer's stream end once
            // the remaining items are drained — confirm against rs2_stream docs.
            producer_queue.close().await;
            println!("Producer: Done, queue closed");
        });

        let consumer = tokio::spawn(async move {
            let mut items = consumer_queue.dequeue();
            while let Some(item) = items.next().await {
                println!("Consumer: Processing {}", item);
                // Simulated work; the consumer is slower than the producer.
                sleep(Duration::from_millis(200)).await;
                println!("Consumer: Finished processing {}", item);
            }
            println!("Consumer: Queue exhausted");
        });

        // Surface task failures (e.g. a panic inside a task) instead of silently
        // discarding the join results.
        let (producer_result, consumer_result) = tokio::join!(producer, consumer);
        if let Err(e) = producer_result {
            eprintln!("Producer task failed: {:?}", e);
        }
        if let Err(e) = consumer_result {
            eprintln!("Consumer task failed: {:?}", e);
        }
    });
}