use futures_util::stream::StreamExt;
use rs2_stream::pipe::*;
use rs2_stream::rs2::*;
use std::collections::HashMap;
use tokio::runtime::Runtime;
#[derive(Debug, Clone, PartialEq)]
struct User {
id: u64,
name: String,
email: String,
active: bool,
role: String,
login_count: u32,
}
#[derive(Debug, Clone, PartialEq)]
struct UserStats {
id: u64,
name: String,
role: String,
is_active: bool,
login_frequency: &'static str,
}
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let users = vec![
User {
id: 1,
name: "Alice".to_string(),
email: "alice@example.com".to_string(),
active: true,
role: "admin".to_string(),
login_count: 120,
},
User {
id: 2,
name: "Bob".to_string(),
email: "bob@example.com".to_string(),
active: true,
role: "user".to_string(),
login_count: 45,
},
User {
id: 3,
name: "Charlie".to_string(),
email: "charlie@example.com".to_string(),
active: false,
role: "user".to_string(),
login_count: 5,
},
User {
id: 4,
name: "Diana".to_string(),
email: "diana@example.com".to_string(),
active: true,
role: "moderator".to_string(),
login_count: 80,
},
User {
id: 5,
name: "Eve".to_string(),
email: "eve@example.com".to_string(),
active: true,
role: "user".to_string(),
login_count: 30,
},
];
let active_filter = filter(|user: &User| user.active);
let stats_transform = map(|user: User| {
let frequency = match user.login_count {
0..=10 => "Low",
11..=50 => "Medium",
_ => "High",
};
UserStats {
id: user.id,
name: user.name,
role: user.role,
is_active: user.active,
login_frequency: frequency,
}
});
let process_users = active_filter.compose(stats_transform);
let user_stream = from_iter(users.clone());
let stats_stream = process_users.apply(user_stream);
let user_stats = stats_stream.collect::<Vec<_>>().await;
println!("User Statistics:");
for stats in user_stats {
println!(
" - {} ({}): {} usage",
stats.name, stats.role, stats.login_frequency
);
}
let group_by_frequency = map(|stats: UserStats| (stats.login_frequency, stats));
let user_stream = from_iter(users);
let stats_stream = process_users.apply(user_stream);
let grouped_stream = group_by_frequency.apply(stats_stream);
let mut frequency_groups: HashMap<&str, Vec<UserStats>> = HashMap::new();
let grouped_stats = grouped_stream.collect::<Vec<_>>().await;
for (frequency, stats) in grouped_stats {
frequency_groups
.entry(frequency)
.or_insert_with(Vec::new)
.push(stats);
}
println!("\nUsers by Login Frequency:");
for (frequency, users) in &frequency_groups {
println!(" {} Usage ({} users):", frequency, users.len());
for user in users {
println!(" - {} ({})", user.name, user.role);
}
}
});
}