use rs2_stream::rs2::*;
use serde::{Deserialize, Serialize};
use futures_util::StreamExt;
use std::time::{Duration, Instant};
use tokio::time::sleep;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct UserProfile {
id: u64,
name: String,
email: String,
age: u32,
preferences: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ProcessedUser {
id: u64,
name: String,
email: String,
age: u32,
preferences: Vec<String>,
processed_at: u64,
processing_time_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ApiResponse {
user_id: u64,
data: String,
response_time_ms: u64,
status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DatabaseRecord {
id: u64,
content: String,
created_at: u64,
updated_at: u64,
}
async fn cpu_intensive_processing(user: UserProfile) -> ProcessedUser {
let start = Instant::now();
let mut _result = 0.0;
for i in 0..100_000 {
_result += (i as f64).sqrt();
}
sleep(Duration::from_millis(10)).await;
let processing_time = start.elapsed().as_millis() as u64;
ProcessedUser {
id: user.id,
name: user.name,
email: user.email,
age: user.age,
preferences: user.preferences,
processed_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
processing_time_ms: processing_time,
}
}
async fn api_call(user_id: u64) -> ApiResponse {
let start = Instant::now();
sleep(Duration::from_millis(50 + (user_id % 100))).await;
let response_time = start.elapsed().as_millis() as u64;
ApiResponse {
user_id,
data: format!("API data for user {}", user_id),
response_time_ms: response_time,
status: "success".to_string(),
}
}
async fn database_operation(user_id: u64) -> DatabaseRecord {
let start = Instant::now();
sleep(Duration::from_millis(20 + (user_id % 50))).await;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
DatabaseRecord {
id: user_id,
content: format!("Database content for user {}", user_id),
created_at: now,
updated_at: now,
}
}
async fn file_processing(user_id: u64) -> String {
let start = Instant::now();
sleep(Duration::from_millis(30 + (user_id % 80))).await;
format!("Processed file for user {} in {}ms", user_id, start.elapsed().as_millis())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 RS2 Comprehensive Parallel Processing Example");
println!("================================================\n");
let users: Vec<UserProfile> = (1..=1000)
.map(|id| UserProfile {
id,
name: format!("User {}", id),
email: format!("user{}@example.com", id),
age: 20 + (id as u32 % 50),
preferences: vec![
"music".to_string(),
"sports".to_string(),
"technology".to_string(),
],
})
.collect();
println!("📊 Performance Comparison: Sequential vs Parallel Processing");
println!("-----------------------------------------------------------");
println!("\n1️⃣ Sequential Processing (Baseline)");
let start = Instant::now();
let sequential_results: Vec<ProcessedUser> = from_iter(users.clone())
.eval_map_rs2(|user| Box::pin(cpu_intensive_processing(user)))
.collect()
.await;
let sequential_time = start.elapsed();
println!(" ✅ Sequential: {} users in {:?}", sequential_results.len(), sequential_time);
println!("\n2️⃣ Parallel Processing (Ordered)");
let start = Instant::now();
let parallel_ordered_results: Vec<ProcessedUser> = from_iter(users.clone())
.par_eval_map_rs2(10, |user| Box::pin(cpu_intensive_processing(user)))
.collect()
.await;
let parallel_ordered_time = start.elapsed();
println!(" ✅ Parallel (Ordered): {} users in {:?}", parallel_ordered_results.len(), parallel_ordered_time);
println!(" 📈 Speedup: {:.2}x", sequential_time.as_secs_f64() / parallel_ordered_time.as_secs_f64());
println!("\n3️⃣ Parallel Processing (Unordered)");
let start = Instant::now();
let parallel_unordered_results: Vec<ProcessedUser> = from_iter(users.clone())
.par_eval_map_unordered_rs2(10, |user| Box::pin(cpu_intensive_processing(user)))
.collect()
.await;
let parallel_unordered_time = start.elapsed();
println!(" ✅ Parallel (Unordered): {} users in {:?}", parallel_unordered_results.len(), parallel_unordered_time);
println!(" 📈 Speedup: {:.2}x", sequential_time.as_secs_f64() / parallel_unordered_time.as_secs_f64());
println!("\n4️⃣ Mixed Workload Processing (CPU + I/O)");
let start = Instant::now();
let mixed_results: Vec<(ProcessedUser, ApiResponse, DatabaseRecord)> = from_iter(users.clone())
.par_eval_map_rs2(8, |user| {
Box::pin(async move {
let processed_user = cpu_intensive_processing(user.clone()).await;
let api_response = api_call(user.id).await;
let db_record = database_operation(user.id).await;
(processed_user, api_response, db_record)
})
})
.collect()
.await;
let mixed_time = start.elapsed();
println!(" ✅ Mixed Workload: {} users in {:?}", mixed_results.len(), mixed_time);
println!("\n5️⃣ Pipeline Processing");
let start = Instant::now();
let pipeline_results: Vec<String> = from_iter(users.clone())
.par_eval_map_rs2(6, |user| Box::pin(cpu_intensive_processing(user)))
.par_eval_map_rs2(4, |processed_user| Box::pin(api_call(processed_user.id)))
.par_eval_map_rs2(8, |api_response| Box::pin(file_processing(api_response.user_id)))
.collect()
.await;
let pipeline_time = start.elapsed();
println!(" ✅ Pipeline: {} users in {:?}", pipeline_results.len(), pipeline_time);
println!("\n6️⃣ Adaptive Concurrency (Different Concurrency Levels)");
let concurrency_levels = vec![1, 2, 4, 8, 16, 32];
for concurrency in concurrency_levels {
let start = Instant::now();
let results: Vec<ProcessedUser> = from_iter(users.clone())
.par_eval_map_rs2(concurrency, |user| Box::pin(cpu_intensive_processing(user)))
.collect()
.await;
let time = start.elapsed();
println!(" 🔧 Concurrency {}: {} users in {:?} ({:.2} users/sec)",
concurrency, results.len(), time,
results.len() as f64 / time.as_secs_f64());
}
println!("\n7️⃣ Error Handling in Parallel Processing");
let users_with_errors: Vec<UserProfile> = (1..=100)
.map(|id| UserProfile {
id,
name: format!("User {}", id),
email: format!("user{}@example.com", id),
age: 20 + (id as u32 % 50),
preferences: vec!["music".to_string()],
})
.collect();
let error_results: Vec<Result<ProcessedUser, String>> = from_iter(users_with_errors)
.par_eval_map_rs2(5, |user| {
Box::pin(async move {
if user.id % 10 == 0 {
Err(format!("Processing failed for user {}", user.id))
} else {
Ok(cpu_intensive_processing(user).await)
}
})
})
.collect()
.await;
let success_count = error_results.iter().filter(|r| r.is_ok()).count();
let error_count = error_results.iter().filter(|r| r.is_err()).count();
println!(" ✅ Success: {}, Errors: {}", success_count, error_count);
println!("\n8️⃣ Resource Management and Backpressure");
let start = Instant::now();
let resource_results: Vec<ProcessedUser> = from_iter(users.clone())
.par_eval_map_rs2(4, |user| Box::pin(cpu_intensive_processing(user)))
.collect()
.await;
let resource_time = start.elapsed();
println!(" ✅ Resource Managed: {} users in {:?}", resource_results.len(), resource_time);
println!("\n9️⃣ Real-World Scenario: E-commerce Order Processing");
let orders: Vec<u64> = (1..=500).collect();
let start = Instant::now();
let order_results: Vec<(u64, ApiResponse, DatabaseRecord, String)> = from_iter(orders)
.par_eval_map_rs2(12, |order_id| {
Box::pin(async move {
let api_response = api_call(order_id).await;
let db_record = database_operation(order_id).await;
let file_result = file_processing(order_id).await;
(order_id, api_response, db_record, file_result)
})
})
.collect()
.await;
let order_time = start.elapsed();
println!(" ✅ E-commerce Processing: {} orders in {:?}", order_results.len(), order_time);
println!("\n📊 Performance Summary");
println!("=====================");
println!("Sequential Processing: {:?}", sequential_time);
println!("Parallel (Ordered): {:?} ({:.2}x speedup)",
parallel_ordered_time,
sequential_time.as_secs_f64() / parallel_ordered_time.as_secs_f64());
println!("Parallel (Unordered): {:?} ({:.2}x speedup)",
parallel_unordered_time,
sequential_time.as_secs_f64() / parallel_unordered_time.as_secs_f64());
println!("Mixed Workload: {:?}", mixed_time);
println!("Pipeline Processing: {:?}", pipeline_time);
println!("Resource Managed: {:?}", resource_time);
println!("E-commerce Processing: {:?}", order_time);
println!("\n🎯 Key Takeaways:");
println!("• Parallel processing provides significant speedup for CPU-intensive tasks");
println!("• Unordered processing is faster than ordered when order doesn't matter");
println!("• Optimal concurrency depends on workload type (CPU vs I/O bound)");
println!("• Error handling works seamlessly in parallel operations");
println!("• Resource management prevents memory issues at scale");
println!("• Pipeline processing enables complex workflows");
println!("\n✅ Comprehensive parallel processing example completed!");
Ok(())
}