use noworkers::Workers;
use noworkers::extensions::WithSysLimitCpus;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Parallel Processing Pipeline Example");
println!("====================================\n");
let dataset: Vec<i32> = (1..=100).collect();
let total_items = dataset.len();
println!("Processing {} items in parallel...\n", total_items);
example_parallel_map(dataset.clone()).await?;
example_pipeline(dataset.clone()).await?;
example_batch_processing(dataset).await?;
Ok(())
}
async fn example_parallel_map(dataset: Vec<i32>) -> anyhow::Result<()> {
println!("1. Simple Parallel Map");
println!("----------------------");
let mut workers = Workers::new();
workers.with_limit_to_system_cpus();
let results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let processed = Arc::new(AtomicUsize::new(0));
let total = dataset.len();
let start = Instant::now();
for item in dataset {
let results = results.clone();
let processed = processed.clone();
workers
.add(move |_cancel| async move {
let result = expensive_computation(item).await?;
let mut res = results.lock().await;
res.push(result);
let count = processed.fetch_add(1, Ordering::SeqCst) + 1;
if count.is_multiple_of(10) {
println!(" Processed {}/{} items", count, total);
}
Ok(())
})
.await?;
}
workers.wait().await?;
let results = results.lock().await;
let sum: i32 = results.iter().sum();
println!(" Completed in {}ms", start.elapsed().as_millis());
println!(" Sum of results: {}\n", sum);
Ok(())
}
async fn example_pipeline(dataset: Vec<i32>) -> anyhow::Result<()> {
println!("2. Multi-Stage Pipeline");
println!("-----------------------");
let stage1_output = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let mut stage1 = Workers::new();
stage1.with_limit(4);
println!(" Stage 1: Transforming data...");
for item in dataset {
let output = stage1_output.clone();
stage1
.add(move |_| async move {
let transformed = item * 2;
tokio::time::sleep(Duration::from_millis(10)).await;
let mut out = output.lock().await;
out.push(transformed);
Ok(())
})
.await?;
}
stage1.wait().await?;
let stage2_output = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let mut stage2 = Workers::new();
stage2.with_limit(2);
println!(" Stage 2: Filtering and aggregating...");
let stage1_data = stage1_output.lock().await.clone();
for chunk in stage1_data.chunks(10) {
let chunk = chunk.to_vec();
let output = stage2_output.clone();
stage2
.add(move |_| async move {
let filtered_sum: i32 = chunk.iter().filter(|&&x| x % 2 == 0).sum();
tokio::time::sleep(Duration::from_millis(20)).await;
let mut out = output.lock().await;
out.push(filtered_sum);
Ok(())
})
.await?;
}
stage2.wait().await?;
let final_results = stage2_output.lock().await;
let total: i32 = final_results.iter().sum();
println!(" Pipeline result: {}\n", total);
Ok(())
}
async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> {
println!("3. Batch Processing");
println!("-------------------");
let batch_size = 20;
let batches: Vec<Vec<i32>> = dataset
.chunks(batch_size)
.map(|chunk| chunk.to_vec())
.collect();
println!(
" Processing {} batches of {} items each",
batches.len(),
batch_size
);
let mut workers = Workers::new();
workers.with_limit(3);
let results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let start = Instant::now();
for (batch_idx, batch) in batches.into_iter().enumerate() {
let results = results.clone();
workers
.add(move |cancel| async move {
println!(" Batch {} started", batch_idx);
tokio::select! {
batch_result = process_batch(batch, batch_idx) => {
match batch_result {
Ok(result) => {
let mut res = results.lock().await;
res.push(result);
println!(" Batch {} completed", batch_idx);
Ok(())
}
Err(e) => {
println!(" Batch {} failed: {}", batch_idx, e);
Err(e)
}
}
}
_ = cancel.cancelled() => {
println!(" Batch {} cancelled", batch_idx);
Ok(())
}
}
})
.await?;
}
workers.wait().await?;
let results = results.lock().await;
let total_processed: usize = results.iter().sum();
println!(
" Processed {} items in {}ms\n",
total_processed,
start.elapsed().as_millis()
);
Ok(())
}
async fn expensive_computation(n: i32) -> anyhow::Result<i32> {
tokio::time::sleep(Duration::from_millis(5)).await;
let result = (n * n) + (n / 2) - 1;
Ok(result)
}
async fn process_batch(batch: Vec<i32>, _batch_idx: usize) -> anyhow::Result<usize> {
tokio::time::sleep(Duration::from_millis(100)).await;
let processed_count = batch.len();
Ok(processed_count)
}