use futures_util::stream::StreamExt;
use rs2_stream::rs2::*;
use std::collections::{BTreeMap, HashSet};
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
/// A user record that the stream examples in this file operate on.
#[derive(Clone, Debug, PartialEq)]
struct User {
    /// Numeric identifier; used as the map key and in result tuples.
    id: u64,
    /// Display name printed in log lines and result messages.
    name: String,
    /// Contact e-mail address.
    email: String,
    /// Whether the account is active; the examples filter on this flag.
    active: bool,
    /// Role label as free-form text (the examples compare against
    /// `"admin"` and `"user"`).
    role: String,
}
/// Prints a `Sending to <user>: <message>` line to stdout, then
/// asynchronously sleeps for 50 ms before returning.
async fn send_notification(user: &str, message: &str) {
    let delivery_delay = Duration::from_millis(50);
    println!("Sending to {}: {}", user, message);
    tokio::time::sleep(delivery_delay).await;
}
/// Logs the user's name, asynchronously sleeps for 100 ms, and returns a
/// success message mentioning that name.
///
/// As written, this function always returns `Ok`; the `Result` return type
/// leaves room for a failing implementation.
async fn process_profile(user: &User) -> Result<String, Box<dyn Error + Send + Sync>> {
    let name = &user.name;
    println!("Processing profile for: {}", name);
    tokio::time::sleep(Duration::from_millis(100)).await;
    let outcome = format!("{}'s profile processed successfully", name);
    Ok(outcome)
}
/// Logs the user's name, asynchronously sleeps for 75 ms, and returns a
/// message stating that the user's permissions were updated.
///
/// As written, this function always returns `Ok`.
async fn update_permissions(user: &User) -> Result<String, Box<dyn Error + Send + Sync>> {
    let name = &user.name;
    println!("Updating permissions for: {}", name);
    tokio::time::sleep(Duration::from_millis(75)).await;
    let outcome = format!("{}'s permissions updated", name);
    Ok(outcome)
}
/// Logs the user's name, asynchronously sleeps for 150 ms, and returns a
/// message stating that analytics for the user completed.
///
/// As written, this function always returns `Ok`.
async fn generate_analytics(user: &User) -> Result<String, Box<dyn Error + Send + Sync>> {
    let name = &user.name;
    println!("Generating analytics for: {}", name);
    tokio::time::sleep(Duration::from_millis(150)).await;
    let outcome = format!("Analytics for {} completed", name);
    Ok(outcome)
}
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let users = vec![
User {
id: 1,
name: "Alice".to_string(),
email: "alice@example.com".to_string(),
active: true,
role: "admin".to_string(),
},
User {
id: 2,
name: "Bob".to_string(),
email: "bob@example.com".to_string(),
active: true,
role: "user".to_string(),
},
User {
id: 3,
name: "Charlie".to_string(),
email: "charlie@example.com".to_string(),
active: false,
role: "user".to_string(),
},
User {
id: 4,
name: "Diana".to_string(),
email: "diana@example.com".to_string(),
active: true,
role: "moderator".to_string(),
},
User {
id: 5,
name: "Eve".to_string(),
email: "eve@example.com".to_string(),
active: true,
role: "user".to_string(),
},
];
println!("\n=== Sequential Processing Example ===");
let notification_tracker = Arc::new(Mutex::new(Vec::new()));
let tracker_clone = notification_tracker.clone();
from_iter(users.clone())
.filter_rs2(|user| user.active)
.for_each_rs2(move |user| {
let tracker = tracker_clone.clone();
async move {
send_notification(&user.name, "Your account has been updated").await;
let mut guard = tracker.lock().await;
guard.push(user.id);
}
})
.await;
let notified_users = notification_tracker.lock().await;
println!("Notified {} users sequentially", notified_users.len());
println!("\n=== Parallel Processing with Order Preservation ===");
let start = std::time::Instant::now();
let processed_profiles = from_iter(users.clone())
.filter_rs2(|user| user.active)
.par_eval_map_rs2(3, |user| async move {
match process_profile(&user).await {
Ok(result) => (user.id, result),
Err(_) => (
user.id,
format!("Failed to process {}'s profile", user.name),
),
}
})
.collect::<Vec<_>>()
.await;
println!(
"Processed {} profiles in parallel (ordered) in {:?}",
processed_profiles.len(),
start.elapsed()
);
for (id, result) in &processed_profiles {
println!(" User {}: {}", id, result);
}
println!("\n=== Parallel Processing without Order Preservation ===");
let start = std::time::Instant::now();
let permission_updates = from_iter(users.clone())
.filter_rs2(|user| user.active)
.par_eval_map_unordered_rs2(2, |user| async move {
match update_permissions(&user).await {
Ok(result) => (user.id, result),
Err(_) => (
user.id,
format!("Failed to update {}'s permissions", user.name),
),
}
})
.collect::<Vec<_>>()
.await;
println!(
"Updated {} user permissions in parallel (unordered) in {:?}",
permission_updates.len(),
start.elapsed()
);
for (id, result) in &permission_updates {
println!(" User {}: {}", id, result);
}
println!("\n=== Parallel Stream Joining ===");
let start = std::time::Instant::now();
let active_users = from_iter(users.clone()).filter_rs2(|user| user.active);
let admin_users = from_iter(users.clone()).filter_rs2(|user| user.role == "admin");
let regular_users = from_iter(users.clone()).filter_rs2(|user| user.role == "user");
let streams = vec![
active_users
.map_rs2(|user| format!("Active: {}", user.name))
.boxed(),
admin_users
.map_rs2(|user| format!("Admin: {}", user.name))
.boxed(),
regular_users
.map_rs2(|user| format!("Regular: {}", user.name))
.boxed(),
];
let combined_results = from_iter(streams).par_join_rs2(3).collect::<Vec<_>>().await;
println!(
"Processed {} streams in parallel in {:?}",
combined_results.len(),
start.elapsed()
);
for result in &combined_results {
println!(" {}", result);
}
println!("\n=== Collection Examples ===");
let unique_roles = from_iter(users.clone())
.map_rs2(|user| user.role)
.collect_rs2::<HashSet<_>>()
.await;
println!("Unique roles: {:?}", unique_roles);
let users_by_id = from_iter(users.clone())
.map_rs2(|user| (user.id, user))
.collect_rs2::<BTreeMap<_, _>>()
.await;
println!("Users by ID (sorted):");
for (id, user) in users_by_id {
println!(" {}: {}", id, user.name);
}
});
}