use futures_util::stream::StreamExt;
use rs2_stream::rs2::*;
use std::error::Error;
use std::time::Duration;
use tokio::runtime::Runtime;
/// A user record flowing through the demo pipelines in this file.
///
/// `Eq` is derived alongside `PartialEq` because every field (`u64`,
/// `String`, `bool`) has total equality; this lets `User` satisfy APIs that
/// are bounded on `Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct User {
    /// Numeric identifier of the user.
    id: u64,
    /// Display name.
    name: String,
    /// Address that `send_email` prints as the recipient.
    email: String,
    /// Whether the user passes the `active` filter applied in `main`.
    active: bool,
    /// Role label; used as the grouping key in `main`.
    role: String,
}
/// Returns the fixed list of five sample users used by the demo.
///
/// The function is `async` but contains no await points; the data is
/// hard-coded rather than loaded from anywhere.
async fn fetch_users() -> Vec<User> {
    // Small local constructor so each record fits on a single line.
    let user = |id: u64, name: &str, email: &str, active: bool, role: &str| User {
        id,
        name: name.to_string(),
        email: email.to_string(),
        active,
        role: role.to_string(),
    };

    vec![
        user(1, "Alice", "alice@example.com", true, "admin"),
        user(2, "Bob", "bob@example.com", true, "user"),
        // Charlie is the only inactive record in the fixture.
        user(3, "Charlie", "charlie@example.com", false, "user"),
        user(4, "Diana", "diana@example.com", true, "moderator"),
        user(5, "Eve", "eve@example.com", true, "user"),
    ]
}
/// Prints a "Sending email" line for `user` and `message`, then sleeps 100 ms.
///
/// No mail is actually sent by this function; it only logs and waits.
///
/// # Errors
///
/// The signature allows a boxed error, but this body always returns `Ok(())`.
async fn send_email(user: &User, message: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
    let recipient = &user.email;
    println!("Sending email to {}: {}", recipient, message);

    // Non-blocking delay; callers in this file wrap the call in a 200 ms timeout.
    let delay = Duration::from_millis(100);
    tokio::time::sleep(delay).await;

    Ok(())
}
/// Prints an "Updating user" line, sleeps 50 ms, and hands `user` back unchanged.
///
/// Nothing is persisted by this function; it only logs and waits.
///
/// # Errors
///
/// The signature allows a boxed error, but this body always returns `Ok(user)`.
async fn update_user(user: User) -> Result<User, Box<dyn Error + Send + Sync>> {
    println!("Updating user: {}", user.name);

    // Non-blocking delay before returning the record to the caller.
    let delay = Duration::from_millis(50);
    tokio::time::sleep(delay).await;

    Ok(user)
}
/// Entry point: creates a Tokio runtime and runs three demo pipelines over
/// the active users, printing the outcome of each:
///
/// 1. users grouped by role,
/// 2. users renamed and passed through `update_user`,
/// 3. a welcome email per user, each attempt capped by a 200 ms timeout.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be created.
fn main() {
    /// Builds the stream of active users that every pipeline below starts from.
    ///
    /// The fetched `Vec<User>` is flattened into individual items, passed
    /// through `auto_backpressure_rs2`, filtered on `active`, and finished
    /// with `prefetch_rs2(2)`.
    // NOTE(review): the exact buffering semantics of `auto_backpressure_rs2`
    // and `prefetch_rs2` come from rs2_stream — confirm against its docs.
    async fn get_active_users() -> RS2Stream<User> {
        eval(fetch_users())
            .flat_map(|batch| from_iter(batch))
            .auto_backpressure_rs2()
            .filter_rs2(|candidate| candidate.active)
            .prefetch_rs2(2)
    }

    let runtime = Runtime::new().unwrap();

    runtime.block_on(async {
        // --- Pipeline 1: group the active users by role and print each group.
        // NOTE(review): confirm whether `group_by_rs2` merges all items with the
        // same key or only adjacent ones; the output depends on it.
        let grouped: Vec<_> = get_active_users()
            .await
            .group_by_rs2(|member| member.role.clone())
            .collect()
            .await;

        for (role, members) in grouped {
            println!("Role: {}, Count: {}", role, members.len());
            for member in members {
                println!(" - {} ({})", member.name, member.email);
            }
        }

        // --- Pipeline 2: tag each name and run it through `update_user`.
        // The first argument (3) is presumably the concurrency limit — TODO confirm.
        let processed: Vec<_> = get_active_users()
            .await
            .par_eval_map_rs2(3, |mut user| async move {
                let tagged = format!("{} (Processed)", user.name);
                user.name = tagged;
                // A clone goes to `update_user` so the locally renamed record
                // is still available as the fallback on error.
                match update_user(user.clone()).await {
                    Ok(updated) => updated,
                    Err(_) => user,
                }
            })
            .collect()
            .await;

        println!("\nProcessed {} users", processed.len());

        // --- Pipeline 3: send a welcome email to each user with a 200 ms cap.
        let outcomes: Vec<_> = get_active_users()
            .await
            .eval_map_rs2(|user| async move {
                let attempt = tokio::time::timeout(
                    Duration::from_millis(200),
                    send_email(&user, "Welcome to our platform!"),
                )
                .await;
                // Success only when the timeout did not elapse AND the send returned Ok.
                let delivered = matches!(attempt, Ok(Ok(_)));
                (user.id, delivered)
            })
            .collect()
            .await;

        println!("\nEmail results:");
        for (user_id, delivered) in outcomes {
            let status = if delivered {
                "Email sent"
            } else {
                "Failed to send email"
            };
            println!("User {}: {}", user_id, status);
        }
    });
}