use futures_util::stream::StreamExt;
use rs2_stream::queue::*;
use rs2_stream::rs2::*;
use std::error::Error;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::time::{sleep, Duration};
/// A unit of work that `main` routes through one of the per-priority queues
/// and that `process_message` then handles.
#[derive(Debug, Clone)]
struct Message {
/// Identifier printed in the enqueue, processing and completion log lines.
id: u64,
/// Human-readable description printed next to the id.
content: String,
/// Selects the target queue in `main` and the simulated delay in
/// `process_message`.
priority: Priority,
/// Numeric timestamp attached when the message is built. It is not read by
/// any code visible in this file.
// NOTE(review): unit and epoch are not established here — confirm before relying on it.
timestamp: u64,
}
/// Urgency class of a `Message`.
///
/// The priority decides which queue `main` places a message on and how long
/// `process_message` simulates work for it (High 5 ms, Medium 10 ms,
/// Low 20 ms).
///
/// The enum carries no data, so it is `Copy` and `Eq` in addition to the
/// usual `Debug`/`Clone`/`PartialEq`: a priority can be read out of a message
/// and compared without moving or cloning anything.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Priority {
    /// Drained last; longest simulated processing time.
    Low,
    /// Drained after all high-priority messages.
    Medium,
    /// Drained first; shortest simulated processing time.
    High,
}
async fn process_message(msg: Message) -> Result<(), Box<dyn Error + Send + Sync>> {
println!("Processing message {}: '{}'", msg.id, msg.content);
let delay = match msg.priority {
Priority::High => 5,
Priority::Medium => 10,
Priority::Low => 20,
};
sleep(Duration::from_millis(delay)).await;
println!("Completed message {}", msg.id);
Ok(())
}
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let high_priority_queue = Arc::new(Queue::bounded(5));
let medium_priority_queue = Arc::new(Queue::bounded(10));
let low_priority_queue = Arc::new(Queue::bounded(20));
let messages = vec![
Message {
id: 1,
content: "Critical system alert".to_string(),
priority: Priority::High,
timestamp: 1000,
},
Message {
id: 2,
content: "User login".to_string(),
priority: Priority::Medium,
timestamp: 1001,
},
Message {
id: 3,
content: "Log rotation".to_string(),
priority: Priority::Low,
timestamp: 1002,
},
Message {
id: 4,
content: "Security breach detected".to_string(),
priority: Priority::High,
timestamp: 1003,
},
Message {
id: 5,
content: "New user registration".to_string(),
priority: Priority::Medium,
timestamp: 1004,
},
Message {
id: 6,
content: "Daily report".to_string(),
priority: Priority::Low,
timestamp: 1005,
},
];
for msg in messages {
let queue = match msg.priority {
Priority::High => Arc::clone(&high_priority_queue),
Priority::Medium => Arc::clone(&medium_priority_queue),
Priority::Low => Arc::clone(&low_priority_queue),
};
println!(
"Enqueueing message {}: '{}' with {:?} priority",
msg.id, msg.content, msg.priority
);
queue.enqueue(msg).await.unwrap();
}
high_priority_queue.close().await;
medium_priority_queue.close().await;
low_priority_queue.close().await;
let high_stream = high_priority_queue.dequeue();
let medium_stream = medium_priority_queue.dequeue();
let low_stream = low_priority_queue.dequeue();
let prioritized_stream = high_stream.chain(medium_stream).chain(low_stream);
let results = prioritized_stream
.par_eval_map_rs2(2, |msg| async move {
let result = process_message(msg.clone()).await;
(msg, result)
})
.collect::<Vec<_>>()
.await;
println!("\nProcessing Summary:");
println!("Total messages processed: {}", results.len());
let successes = results.iter().filter(|(_, result)| result.is_ok()).count();
let failures = results.len() - successes;
println!("Successful: {}", successes);
println!("Failed: {}", failures);
});
}